Skip to main content

おうち IoT 用の LINE ボットをもう少し賢くする

これまでの LINE ボットの課題

2018 年に、家のエアコンを操作してくれる LINE ボットを作って以来、すでに一年半くらい運用しています。概要は 当時のエントリでも紹介しています が、LINE での会話を元に自宅のエアコンを操作してくれるものです。

これ、作った当初に想定していた以上にたいへん便利で、外出先で家族と『そろそろロボくんにお願いしておこう』などの会話が発生する程度には実際に活用されていました。この手の『作ってみた』系は、長期的な運用が定着する前に使わなくなることが多い印象もあり、これは小規模ではあるもののうまくいった例と言えるかと思います。

当時の LINE ボットの様子

一方で、作りが甘い部分もあって、

  • Lambda の Node.js のランタイムで EoL が迫っていた
    • そもそも Node.js のランタイム側の更新に追従していくことに将来的にけっこう体力を使いそうな印象がある
    • 当時 Node.js を選んだ理由は『とりあえずいちど触ってみたかった』からというだけで、すでにその目的は達成できた
  • ひとつの Lambda に全機能を詰め込んでいて、どう考えてもイケてないアーキテクチャだ
  • 機能が足りない
    • エアコンのオンオフと温度の変更はできたが、冷房と暖房の切り替え機能を実装していない

など、長期的な運用に耐えられるよう全体を作り直したいモチベーションも強くなってきていました。

そんな中、あるイベントに参加して、『Raspberry Pi とセンサとクラウドサービスを使って何でもいいから個人で何かを作る』という活動をすることになり、タイミングもよかったので、この LINE ボットの作り直しを進めることにしました。

できたもの

で、こういう風に進化して、今も元気に動いています。

現在の LINE ボットの様子

以下のような機能が付いています。

  • エアコンの操作
    • オンオフ、冷暖房の切り替え、温度の変更、現在の設定の確認をしてくれます
  • 加湿器の操作
    • オンオフをしてくれます
  • 空調センサ
    • 室内に設置したセンサを使って、気温、湿度、気圧、二酸化炭素濃度の現在の値を教えてくれます
    • 任意の時間分の履歴をグラフ化して送ってくれます
  • 家計簿の記録
    • 支払者、使途、金額を Google Sheets に追記してくれます
  • 毎朝の自動制御
    • 毎朝、部屋が寒い(暑い)場合は、暖房(冷房)を付けてくれます
    • 毎朝、部屋が乾燥している場合は、加湿器を付けてくれます

実装

実装を簡単に紹介します。末尾には GitHub へのリンクも載せています。

全体像

全体はこのような構成です。

こけおどし全体像

参加したイベントの性質上、なるべくたくさんの要素技術を取り込んだほうが評価が上がる仕組みだったため、あえてまわりくどく冗長な構成になっている部分もあります。

センシングと蓄積

センシングと蓄積は、図の以下の部分です。これまで複数回にわたって紹介してきた EdgeX Foundry を中心に構成しています。

センシングと蓄積の部分

EdgeX Foundry 自体は、家庭内の IoT ゲートウェイサーバとして構築した Ubuntu 仮想マシン(ESXi 上で動作)の中で動いています。このゲートウェイサーバ上では MQTT ブローカも動かしています。

Raspberry Pi は、センサ値を読んで MQTT トピックにひたすら放り込み続けるだけの係です。EdgeX Foundry は、この Raspberry Pi を MQTT デバイスとして制御するよう構成されており、所定の MQTT トピックに投げ込まれたデータを取り込んで、アプリケーションサービスの機能を使ってクラウド上の所定の宛先にデータをエクスポートしています。

エクスポート先は、Mosquitto のパブリック MQTT トピックと、Pivotal Web Service(PWS)上のワーカアプリケーションで、なんやかんやされて最終的にデータは InfluxDB Cloud 2.0、Redis Cloud、MongoDB mLab に蓄積されます。

十数年前に PIC や Arduino でセンサやアクチュエータを操作していた頃は、データシートとにらめっこをしながら必要な抵抗を計算したり回路を考えたりしていましたが、最近のヤツは単にデータを読み取るだけなら恐ろしいくらい簡単ですね…… けっこう衝撃でした。

今回は BME280 と MH-Z19 を使っています。安いですし精度は悪いのでしょうが、厳密な値は必要としていないのでよしとしています。最初は DS18B20 も使っていましたが、BME280 を組み込んでからは使っていません。

アクチュエーション

最終的にはエアコンと加湿器が操作されるわけですが、この部分の実装は、前者は Nature Remo の API、後者は非公式の野良 API で制御しています。

アクチュエーションの部分

Nature Remo の API を叩く役は、AWS の所定の Lambda に一任しています。加湿器も同様ですが、こちらは非公式 API の都合上、ローカルネットワーク内からしか制御できなかったので、Lambda からいちど IoT Core の MQTT トピックに命令をなげ、それをローカルネットワーク側から購読して後続の処理をトリガさせています。ここだけは Node.js です。

加湿器は、モデルの選定がなかなかたいへんでした。加湿器に限らず何らかの家電を外部から自動制御する場合、

  • スマートコンセントを利用する
    • コンセントが通電したら目的の動作が開始される仕様である必要がある
  • スマートスイッチ(SwitchBot など)を利用する
    • 目的のボタンが、物理的にスマートスイッチで押下できる形状であり、かつスマートスイッチの力で押せるだけの硬さである必要がある
  • スマートリモコンを利用する
    • RF ではなく赤外線を利用したリモコンで制御できる機器である必要がある
  • 専用の API を利用する
    • API が公開されている必要がある

のいずれかの条件を満たす必要がありますが、加湿器でこれに合致するモデルが全然見つけられませんでした。大体の加湿器は、コンセントの通電後にさらにボタンを押さないと動作しませんし、ボタンの周辺の形状や寸法やボタンの硬さは公開されていませんし、リモコン付きのモデルでも赤外線でなく RF ですし、公式の API はなさそうですし。

結局、公開されていない API を無理やり叩ける非公式のライブラリが存在していてハックの余地がありそう、ということがわかった Oittm のアロマディフューザー を採用しています。ただしこれも、

  • 非公式のライブラリ で制御できるようにするには、公式アプリケーション Tuya Smart の旧バージョン(3.12.6 以前)を使って、MITM 攻撃的なヤツでアプリケーションの通信を自分で盗聴してキーを入手しないといけない
  • ライブラリが Node.js 向けしかない
  • そもそもの加湿器自体が小型でとても非力で、リビングなど広い空間の加湿には向かない

など、ベストとは言い難い状態です。改善の余地ありです。

インタフェイスとインタラクション

インタフェイス役の LINE ボットに話しかけると、LINE の Messaging API 経由で AWS の Lambda に届いて、メッセージ本文がパタンマッチされてその内容に応じて後続の処理がトリガされます。

インタラクションの部分

エアコンの操作であれば Nature Remo を制御する Lambda を呼びますし、加湿器の操作であればそれ用の別の Lambda を呼びます。センサのデータが必要な処理であれば、外部のデータベースに必要なデータを取りに行く Lambda を呼びます。

インタフェイスとしての LINE

グラフを要求された場合は、InfluxDB からデータを取ってきて matplotlib で描画したあとに S3 に保存し、それをプッシュでユーザに送ります。描画する対象(温度、湿度、気圧……)と長さ(N 分、M 時間)はメッセージ本文から都度判断します。

実際に生成されたグラフの例

この辺りの処理では、

  • グラフ要求のメッセージが来たら、受理した旨の返事を後続処理に先行して即座にしてしまい、画像の生成と送信はあとから非同期で行う
    • グラフの生成に時間がかかるため、 同期的に処理させると(Lambda ではなく)API Gateway がタイムアウトする
  • S3 のバケット名や画像ファイル名をなるべく短くする
    • LINE で画像を送るときは、画像そのものではなく画像の URL を送る必要がある
    • Lambda で発行する S3 の署名付き URL は 1,000 文字前後とめちゃくちゃ長い(x-amz-security-token が含まれるため)
    • LINE 側の制約で、URL は 1,000 文字以内でないとエラーで送れない
    • 署名付き URL ではなくパブリックな URL にしてしまうのは気が引ける
    • バケット名やファイル名は URL に含まれるため、短くすることでギリギリ 1,000 文字以内になる

などの工夫が必要でした。特に二点目はひどい回避策ですが、あまり権限をがばがばにはしたくなかったので仕方なく……。

また、LINE とはまったく関係ないところで、Grafana を PWS で動かしており、そこでもグラフを見られるようにしています。

感触とロードマップ

センサの値がグラフで見られるのが、実際に使ってみると思っていた以上におもしろかったです。

例えばエアコンをつけてから温度が上がっていく様子や、朝起きて部屋で人間が活動しだしてから二酸化炭素濃度が一気に上がる様子、逆に家が無人になってから下がる様子、キッチンで火を使う調理を始めたタイミングなど、思っていた以上に生活パタンが可視化されることがわかりました。

また、二酸化炭素濃度や気圧の変化では眠気や頭痛の誘発なども懸念されるので、体調に違和感を覚えたときに客観的な変化をすぐ確認できるのはうれしく、体調管理面でも地味に役立っています。

運用面では、Lambda まわりをすべて CloudFormation で定義したことで、コードの管理とデプロイがだいぶに楽になりました。クラウド側がマネージドサービスとサーバレスアーキテクチャだけで組めているのも安心感があります。

ただ、本エントリの途中で、

参加したイベントの性質上、なるべくたくさんの要素技術を取り込んだほうが評価が上がる仕組みだったため、あえてまわりくどく冗長な構成になっている部分もあります

と書いた通り、正直なところ機能に比較して実装が大げさすぎるので、この辺はスリムにしたいと思っています。

イベントはもう終わったので、現状のゴテゴテ感を保つ意味はあまりなく、例えば、

  • この規模だと EdgeX Foundry の恩恵は受けにくいので、まるっと削って Raspberry Pi から直接クラウドに投げてもよいだろう
  • データの蓄積場所も AWS の DynamoDB などにしてしまえば、PWS も InfluxDB も Redis も MongoDB もいらなくなり、AWS に全部寄せられるだろう
  • いっそ Raspberry Pi も AWS IoT Greengrass で AWS から管理させるとよいのでは

などのダイエットや試行錯誤を検討中です。とくに Greengrass は完全に未修分野なので純粋に興味もあり触ってみたいですね。

物理的には、加湿器をもう少し高機能(大容量)かつ自動制御しやすいものに替えたい気持ちがあります。オンオフは自動で制御できても、給水が自動化しづらいので、結局は人間の介入が必要な状態になってしまっており、人間が手を抜くにはタンクの容量がキモになりそうだと考えています。

関連リポジトリ

一覧します。

  • raspi-airmeasurment
    • Raspberry Pi 上で動作させる、各種センサ値を読み取って MQTT トピックに送る Python プログラム
  • edgex-lab-raspi
    • Raspberry Pi から MQTT トピックに送られた値の取り込みや、クラウドへのエクスポートを行えるように構成した EdgeX Foundry の Docker Compose ファイル
  • edgex-lab-export2db
    • PWS 上で動作させる、EdgeX Foundry からエクスポートされたデータを各種データベースに保存するためのプログラム群
    • Telegraf 用のイメージは Docker Hub の kurokobo/edgex-lab-telegraf に配置済み
  • grafana-with-flux
    • PWS 上で動作させる Grafana
  • sam-smarthome-api
    • AWS 上で動作させる一連の Lambda 群
  • tuya-mqtt
    • MQTT の配信を受けて加湿器をコントロールする Node.js プログラム


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (6) アプリケーションサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポート方法の新旧

エクスポートサービスを取り扱ったエントリの最後で、以下のように書きました。

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

アプリケーションサービスに関する公式ドキュメント では、エクスポートサービスの利用はまだサポートされるものの、今後はアプリケーションサービスの利用が推奨されています。

エクスポートサービスを利用したエクスポートの欠点として、

  • エクスポートサービスは、登録されているすべてのクライアントにデータを順次配信するが、これがボトルネックになり、エンドポイント側の受信が遅延する
  • 登録されているクライアントを管理することそれ自体のオーバヘッドと複雑さが EdgeX Foundry にとって負担である
  • 特定のクラウドへのエクスポートに限定して作りこむのではなく、SDK を提供してクラウドプロバイダにとらわれない状態にすべきだ

などが挙げられています。ここでいう SDK とそれを使った実装がアプリケーションサービスなわけですが、アプリケーションサービスでは、

  • コアサービス(データを ZeroMQ で配る Core Data)に直接つなげられることで、パフォーマンスの問題を排除する
  • 開発者には、データが利用可能になったら直ちにそれに対して何らかの処理を行える手段が提供される
  • 登録作業が不要である

などの点でよい感じになっています。

いまいち正しくない気もしますが、ざっくりまとめると、下図の上が下になったので、

上がこれまで、下がこれから

エクスポートサービス(のディストリビューションサービス)がボトルネックにならなくなったし、値を好きに処理できる場所もできたよ! ということかと思います。

今回試すこと

そういうわけで、エクスポートサービスのなくなった世界でも生きていけるよう、エクスポートサービスを扱ったエントリと同様に、MQTT トピックと REST エンドポイントへ、アプリケーションサービスを使ってエクスポートできる状態を作ります。

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab05

アプリケーションサービス

上図のアプリケーションサービスの箱の中には、ファンクション(Fn で表記)を並べました。アプリケーションサービスでは、このように任意のファンクション群をパイプライン化してデータフローを定義できます。

例えば簡単な例では、標準で用意されているファンクションを並べるだけでも、

  1. デバイス名でフィルタするファンクション
  2. リソース名でフィルタするファンクション
  3. JSON 形式に変換するファンクション
  4. MQTT でエクスポートするファンクション

などのパイプラインが構成できます。現時点で利用できるファンクションは、SDK の README.md で確認できます。

構成の考え方

標準で実装されているアプリケーションサービスとして、app-service-configurable がありますが、これは、ひとつのパイプラインをひとつのインスタンスで処理する前提で作られているようです。また、パイプラインそのものは、インスタンスごとの設定ファイル configuration.toml で定義します。

つまり、コンテナ環境の例でいえば、冒頭の絵のように、

  • 最終的に MQTT トピックにエクスポートする app-service-configurable コンテナ
  • 最終的に REST エンドポイントにエクスポートする app-service-configurable コンテナ

など、app-service-configurable が、目的に応じて複数起動している状態を作るということです。

なお、コアサービスのホスト名やポート番号、MQTT ではブローカのホスト名など環境に依存する情報は、コンテナの場合は環境変数で Docker Compose ファイルから渡せるようになっています。エクスポートサービスでは API を叩いてクライアントとして登録する作業が必要でしたが、すべての構成をあらかじめファイルとして定義しておけるということですね。

設定ファイルの確認

コンテナの場合、デフォルトでコンテナ内の /res に、目的別に事前構成された configuration.toml がディレクトリを分けて配置されています。そして、起動時に渡す環境変数で、どのディレクトリの中身を利用するか指定します。

例えば、MQTT トピックへのエクスポートで利用する configuration.toml は以下です。

$ cat mqtt-export/configuration.toml
...
  [Writable.Pipeline]
    ExecutionOrder = "TransformToJSON, MQTTSend, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.MQTTSend]
      [Writable.Pipeline.Functions.MQTTSend.Parameters]
        qos="0"
        key=""
        autoreconnect="false"
        retain="false"
        cert=""
        persistOnError = "false"
      [Writable.Pipeline.Functions.MQTTSend.Addressable]
        Address=   "localhost"
        Port=      1883
        Protocol=  "tcp"
        Publisher= "AppServiceConfigurable-mqtt-export"
        User=      ""
        Password=  ""
        Topic=     "edgex-events"
...

ExecutionOrder に含まれる関数がパイプラインとして順次実行されていきます。この例だと、JSON に変換してから MQTT トピックに送信しています。

利用する関数は下で列挙されていますが、ExecutionOrder に含まれないものは実際には利用されないようです。デバイス名でフィルタする関数(FilterByDeviceName)なども書かれていますが、使いたいなら自分で ExecutionOrder に書いてね、ということでしょう。

また、MQTT ブローカのホスト名に localhost が指定されていたり、トピック名が edgex-events になっていたりしますが、先述のとおり、この辺りは環境変数で書き換えますのでこのままで問題ありません。

同様に、REST エンドポイントへのエクスポートで利用する設定ファイルは以下です。

$ cat http-export/configuration.toml
...
  [Writable.StoreAndForward]
    Enabled = false
    RetryInterval = "5m"
    MaxRetryCount = 10

  [Writable.Pipeline]
    UseTargetTypeOfByteArray = false
    ExecutionOrder = "FilterByDeviceName, TransformToJSON, HTTPPostJSON, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.HTTPPostJSON]
      [Writable.Pipeline.Functions.HTTPPostJSON.Parameters]
        url = "http://"
        persistOnError = "false"
...

環境変数による柔軟なオーバライド

設定ファイルに含まれるパラメータは、実装から推測すると、おそらくすべて環境変数で書き換えが可能です。セクション名の区切り文字を _ に置換して、さらに _<パラメータ名> を付与して環境変数として与えれば、オーバライドされるようでした。

例えば、ExecutionOrder 自体も、Writable_Pipeline_ExecutionOrder 環境変数として与えられます。例えば先述の mqtt-exportExecutionOrder には FilterByDeviceName が含まれせんが、環境変数で以下を与えることで、このフィルタも有効化できます。

  • 環境変数 Writable_Pipeline_ExecutionOrder
    • FilterByDeviceName, TransformToJSON, MQTTSend, MarkAsPushed
  • 環境変数 Writable_Pipeline_Functions_FilterByDeviceName_Parameters_DeviceNames
    • <デバイス名>

今回の構成の定義

では、今回用の構成を作ります。先の通り、環境依存の値は環境変数経由で与えればよいので、今回編集するのは、Docker Compose ファイルです。

Docker Compose ファイルには、もともと、ルールエンジンにデータを送る用の app-service-rules が定義されています。

$ cat docker-composer.yml
...
  app-service-rules:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48096:48096"
    container_name: edgex-app-service-configurable-rules
    hostname: edgex-app-service-configurable-rules
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-rules
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-rules:48096
      edgex_profile: rules-engine
      Service_Host: edgex-app-service-configurable-rules
      MessageBus_SubscribeHost_Host: edgex-core-data
    depends_on:
      - consul
      - logging
      - data
...

今回は、これに加えて、MQTT トピックにエクスポートする app-service-mqtt-export と、REST エンドポイントにエクスポートする app-service-http-export を追加しています。

$ cat docker-composer.yml
...
  app-service-mqtt-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48097:48097"
    container_name: edgex-app-service-configurable-mqtt-export
    hostname: edgex-app-service-configurable-mqtt-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-mqtt-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-mqtt-export:48097
      edgex_profile: mqtt-export
      Service_Host: edgex-app-service-configurable-mqtt-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_MQTTSend_Addressable_Address: 192.168.0.100
      Writable_Pipeline_Functions_MQTTSend_Addressable_Port: 1883
      Writable_Pipeline_Functions_MQTTSend_Addressable_Protocol: tcp
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Publisher: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_User: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Password: 
      Writable_Pipeline_Functions_MQTTSend_Addressable_Topic: edgex-handson-topic
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Qos: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Key: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Cert: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Autoreconnect: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Retain: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_PersistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data

  app-service-http-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48098:48098"
    container_name: edgex-app-service-configurable-http-export
    hostname: edgex-app-service-configurable-http-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-http-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-http-export:48098
      edgex_profile: http-export
      Service_Host: edgex-app-service-configurable-http-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_HTTPPostJSON_Parameters_url: http://192.168.0.100:5000/api/v1/echo
      # Writable_Pipeline_Functions_HTTPPostJSON_Parameters_persistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data 
...

使っているコンテナイメージは、いずれもまったく同じです。ボリュームも同じマウントのしかたをしていますが、使う設定ファイルを edgex_profile で制御しています。また、読ませる設定ファイルに応じて、必要な設定を環境変数で入れています。

MQTT ブローカや REST エンドポイントのホスト名などは環境に合わせて適宜修正してください。ほかにも、configuration.toml で変更したい値があれば、環境変数として与えると変えられます。

また、今回はエクスポートサービスが起動しないようにコメントアウトしています。

起動

下準備として、MQTT ブローカを起動して、購読を開始します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v

別のターミナルで REST エンドポイントも起動します。

$ cd rest-endpoint

$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

この状態で、EdgeX Foundry を起動します。

$ docker-compose up -d

動作確認

必要な設定はすべて環境変数を使って与えていたので、何もしないでも MQTT トピックと REST エンドポイントに値が届き出します。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
edgex-handson-topic {"id":"a11011bb-30cb-44fa-885c-8929c64e35b1","device":"Random-UnsignedInteger-Device","origin":1580015685340757500,"readings":[{"id":"26f50397-1e8b-4ca0-8db2-eeeee958a643","origin":1580015685326443600,"device":"Random-UnsignedInteger-Device","name":"Uint8","value":"90"}]}
edgex-handson-topic {"id":"526631fd-a0f7-437d-98a9-4f6eb8a06775","device":"Random-UnsignedInteger-Device","origin":1580015685369824800,"readings":[{"id":"91ef8066-5c31-4377-84e6-a6d517457f62","origin":1580015685356418200,"device":"Random-UnsignedInteger-Device","name":"Uint16","value":"27009"}]}
...
$ python ./main.py
...
--
2020-01-26 14:15:10.318596
{"id":"be20b21b-f0a4-4550-95bb-a78b5c597f56","device":"Random-Integer-Device","origin":1580015710310854200,"readings":[{"id":"d62fdacc-bfa4-4d05-a537-25c03e151f78","origin":1580015710295318700,"device":"Random-Integer-Device","name":"Int8","value":"3"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-26 14:15:10.375595
{"id":"c0275732-2e3d-4945-9435-d6e77e0ccb74","device":"Random-Integer-Device","origin":1580015710364584600,"readings":[{"id":"067d71a3-7c1f-4fea-968d-ac5db0410e67","origin":1580015710350134100,"device":"Random-Integer-Device","name":"Int16","value":"-23040"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
...

簡単ですね。

これらの値は、冒頭で紹介した通り、それぞれのアプリケーションサービスが直接コアサービス層から値を受け取って処理したうえでエクスポートしています。データパスが分散して並列化するので、エクスポートサービスを使っていた頃に存在していたボトルネックが回避されます。

まとめ

エクスポートサービスを利用しないエクスポート手段として、アプリケーションサービスを構成して試しました。

使ってみると、確かにこのアーキテクチャのほうが素直で圧倒的に使い勝手が良いです。家の環境もこっちに置き換えました。

現段階ではバンドルされている関数は多くないですが、今後この辺りも充実してくることが期待できそうです。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (5) ルールエンジンによる自動制御

EdgeX Foundry 関連エントリ

今回のゴール

これまでのエントリでは、仮想デバイスや MQTT デバイスを用いて、デバイスからの情報の収集やデバイスへのコマンドの実行を試してきました。

今回は、ルールエンジンの動作、つまり、

  • 何らかのデバイスのリソースの値が
  • 何らかの条件を満たしたら
  • 別のデバイスでコマンドを実行する

ような自動制御を実際に試します。

前回のエントリ で構成した MQTT デバイスだけを利用してもすぐできるのですが、それだけだとおもしろくないので、新しいデバイスサービスをひとつ追加して、それを組み込みます。具体的には、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ような状態を目指します。図示すると以下のような状態です。

REST デバイスをトリガに MQTT デバイスが自動制御される

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab04

REST デバイスの追加

まだ開発途中で正式リリースには至っていないようですが、device-rest-go と名付けられたデバイスサービスがあります。

REST デバイスサービスの概要

これは、以下のようなデバイスサービスです。

  • REST で POST してくるデバイスに利用する
  • POST されたリクエストボディの中身を値として EdgeX Foundry に取り込む

このデバイスサービスを構成すると、エンドポイントとして

  • /api/v1/resource/<デバイス名>/<リソース名>

が作成され、これに対して POST することで値を取り込んでくれるようになります。デバイス名やリソース名は、前回取り扱った MQTT デバイスと同様、デバイスプロファイルなどの設定ファイルで定義できます。

なお、現時点では情報のやりとりは一方向であり、つまり、デバイス側から POST された値を取り込むのみで、逆にデバイス側へのリクエストの発行はできないようです。 また、JSON を投げても、現時点の実装ではパースはしてくれず単なる文字列として扱われるようです。

今回試す設計

今回は、 device-rest-goREADME.md を参考に、以下のデバイスサービスとデバイスを定義しています。

  • デバイスプロファイル
    • rest/rest.test.device.profile.yml ファイル
    • リソース intfloat を定義
  • デバイスサービス設定
    • rest/configuration.toml ファイル
    • 上記のデバイスプロファイルに紐づけた REST_DEVICE を定義

これにより、

  • /api/v1/resource/REST_DEVICE/int
  • /api/v1/resource/REST_DEVICE/float

に値を POST すれば取り込んでもらえるようになるはずです。

この REST デバイスサービスを含め、今回分の環境は、Docker Compose ファイルに反映済みです

起動

では、今回分の環境をまとめて起動させます。

MQTT デバイスは今回も使うので、 環境に合わせて mqtt/configuration.toml は修正してください。

MQTT ブローカ、MQTT デバイス、EdgeX Foundry の順に起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd)/mqtt-scripts:/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts

$ docker-compose up -d

REST デバイスサービスの動作確認

起動が確認できたら、実際にエンドポイントに POST して、データの取り込みを確認します。待ち受けポートは 49986 です。Content-Typetext/plain である必要があります。

$ curl -X POST -H "Content-Type: text/plain" -d 12345 http://localhost:49986/api/v1/resource/REST_DEVICE/int

edgex-device-rest コンテナのログで Pushed event to core data が記録されていれば、値は正常に受け付けられています。

$ docker logs --tail 5 edgex-device-rest
level=DEBUG ts=2020-01-25T07:49:17.0621062Z app=edgex-device-rest source=resthandler.go:82 msg="Received POST for Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0673842Z app=edgex-device-rest source=resthandler.go:101 msg="Content Type is 'text/plain' & Media Type is 'text/plain' and Type is 'Int64'"
level=DEBUG ts=2020-01-25T07:49:17.0726606Z app=edgex-device-rest source=resthandler.go:142 msg="Incoming reading received: Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0766349Z app=edgex-device-rest source=utils.go:75 correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: EventClient.MarshalEvent encoded event"
level=INFO ts=2020-01-25T07:49:17.0899243Z app=edgex-device-rest source=utils.go:93 Content-Type=application/json correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: Pushed event to core data"

実際に取り込まれているようです。

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
b43f7300-1377-4fdb-a82f-44eb497d9568    int     REST_DEVICE     1579938557072646900     12345   3 minutes       3 minutes       50 years

ルールエンジンの利用

お膳立てができたので、本題のルールエンジンを構成していきます。

ルールエンジンの概要

詳細は 公式のドキュメント がわかりやすいですが、端的に言えば、

  • コアサービスまたはエクスポートサービスからデータを受け取る
  • 事前に定義されたルールの条件との合致を確認する
  • 合致していた場合は、そのルールで定義されたアクションを実行する

ような処理をしてくれるサービスです。

実装は現段階では BRMS の Drools ベースとのことなので、Java 製ですね。

ルールエンジンへデータを配信する構成パタンは、ドキュメントでは以下の二つが説明されています。

  • エクスポートサービスのクライアントとして動作させるパタン
    • データは Export Distribution(edgex-export-distro コンテナ)から配信される
    • データの流れが全体で統一されてシンプルに
    • レイテンシやエクスポートサービスのキャパシティなどの面にデメリット
  • コアサービスに直結させるパタン
    • データは Core Data(edgex-core-data コンテナ)から配信される
    • エクスポートサービスをバイパスすることで、レイテンシやパフォーマンス面ではメリットがある
    • データの流れはやや複雑に

この構成パタンの選択や、具体的な接続先の情報は、次で紹介する設定ファイルで指定できます。

ルールエンジンの設定

設定がデフォルトのままでよければ、コンテナイメージの中にすべて含まれているので、敢えて手で投入する必要はありませんが、今回はせっかくなので触れるようにしています。rulesengine フォルダの中のファイル群がそれです。

公式のドキュメント で触れられている設定が含まれるファイルが、rulesengine/application.properties です。

$ cat rulesengine/application.properties
...
export.client=false
...
export.client.registration.url=http://edgex-export-client:48071/api/v1
export.client.registration.name=EdgeXRulesEngine
...
export.zeromq.port=5563
export.zeromq.host=tcp://edgex-core-data
...

export.clientfalse なので、今回はコアサービスから直接データを受け取ります。export.zeromq.host は現時点のデフォルトでは tcp://edgex-app-service-configurable-rules になっており、アプリケーションサービス からデータを受け取る形になっていますが、今回はシンプルにドキュメントに従って tcp://edgex-core-data にしています。

もう一つのファイルが、rulesengine/rule-template.drl です。これはルールそのもののテンプレートとして利用されます。これについては後述します。

ルールの設定

では、実際にルールを設定していきます。現状、GUI でも CLI でもできないので、API を叩きます。標準 GUI には設定画面はあるものの、プルダウンに項目が出てこなくて設定できずでした。

ルールの検討

ルールは以下ような JSON で投入します。

{
    "name": "<ルール名>",
    "condition": {
        "device": "<トリガ条件にするデバイス名>",
        "checks": [
            {
                "parameter": "<トリガ条件にするリソース名>",
                "operand1": "<トリガ条件にする値>",
                "operation": "<比較演算子>",
                "operand2": "<トリガする閾値>"
            }
        ]
    },
    "action": {
        "device": "<操作対象のデバイス ID>",
        "command": "<操作対象のコマンド ID>",
        "body": "<操作対象のコマンドに PUT される内容>"
    },
    "log": "<トリガされたときのログ出力文字列>"
}

全体の構造は、condition で指定した条件を満たしたら action で指定したコマンドがトリガされる、と思えばよいでしょう。

condition での指定値は以下のように組んでいきます。

  • device
    • トリガ条件にするデバイスの名称を指定します。ID では動かないので注意です
    • 今回だと、REST デバイスの値を元にコマンドを実行したいので、REST_DEVICE を指定します
  • parameter
    • トリガ条件にするデバイスのリソース名を指定します。リソース名なので、デバイスプロファイルで指定した名称であり、すなわち Reading の名前でもあります
    • 今回の REST_DEVICE はリソース intfloat を持ちますが、今回は int を指定します
  • operand1
    • 比較元になる値を作ります。デバイスの値は内部では文字列値として扱われているため、数値であれば適切な型にキャストする必要があります
    • Drools が Java ベースのため、ここでは Java の文法で書きます。値に応じて、例えば以下などが使い分けられるでしょう
      • Integer.parseInt(value)
      • Float.parseFloat(value)
  • operation
    • 比較演算子です。<=> などが考えられます
  • operand2
    • 閾値です

action での指定値は、以下のように組みます。

  • device
    • 先ほどと異なり、ここでは ID で指定します
    • 今回は MQ_DEVICE の ID です
    • デバイスの ID は API で確認できます(方法は 過去のエントリ で)
  • command
    • コマンドも ID で指定します
    • 今回は testmessage の ID です
    • コマンドの ID も API で確認できます(方法は 過去のエントリ で)
  • body
    • コマンドの PUT 命令のボディを JSON 形式で指定します
    • めっちゃエスケープが必要です
    • 例えば今回だと、{\\\"message\\\":\\\"HIGH\\\"}{\\\"message\\\":\\\"LOW\\\"} です

最終的に、今回作りたい以下のルール群は、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ひとつめは以下の JSON で、

{
    "name": "rule_int_high",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": ">",
                "operand2": "80"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"HIGH\\\"}"
    },
    "log": "Action triggered: The value is too high."
}

ふたつめは以下の JSON であらわされます。

{
    "name": "rule_int_low",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": "<",
                "operand2": "20"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"LOW\\\"}"
    },
    "log": "Action triggered: The value is too low."
}

ルールの投入

では、ルールを投入します。エンドポイントは以下です。

  • http://localhost:48075/api/v1/rule

ここに、先の JSON を POST します。

ひとつめのルールの投入。true が返れば成功
ふたつめのルールの投入

ルールの確認

ルールが投入されると、rule-template.drl を元に新しい <ルール名>.drl ファイルが生成されて利用されます。edgex-support-rulesengine 内に配置されるので、ここではコンテナ内のファイルを cat して覗きます。

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_high.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_high"
when
  $e:Event($rlist: readings && device=="REST_DEVICE")
  $r0:Reading(name=="int" && Integer.parseInt(value) > 80) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"HIGH\"}");
logger.info("Action triggered: The value is too high.");
end

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_low.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_low"
when
  $e:Event($rlist: readings && device=="REST_DEVICE")
  $r0:Reading(name=="int" && Integer.parseInt(value) < 20) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"LOW\"}");
logger.info("Action triggered: The value is too low.");
end

手元の rule-template.drl と見比べると、テンプレートを元に展開されているっぽさがわかりますね。Drools の drl ファイルの文法にあまり詳しくないですが、行われているのが単純な文字列連結なのであれば、operand に指定する値あたりは工夫すると、もうちょっと複雑な計算もできるのかもしれません。試していませんし、インジェクション攻撃っぽいですけど。

なお、現在の API ではルールの中身までは確認できず、ルール名の一覧が取得できるのみのようです。悲しい。

$ curl -s http://localhost:48075/api/v1/rule | jq
[
  "rule_int_high",
  "rule_int_low"
]

ルールエンジンの動作確認

動作を確認していきます。

動きを追いやすくするため、下準備として edgex-support-rulesengine のログを常時表示させて、さらに別のターミナルで MQTT ブローカの全トピックを購読しておくとわかりやすいです。

$ docker logs -f --tail=10 edgex-support-rulesengine
[2020-01-25 13:44:15.023] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:15.024] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
[2020-01-25 13:44:17.214] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:17.216] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
...
$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
logic/connected 2
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.0"}
...

では、REST デバイスの気持ちになって、まずはルールに合致しないデータを REST デバイスサービスに POST します。

$ curl -X POST -H "Content-Type: text/plain" -d 50 http://localhost:49986/api/v1/resource/REST_DEVICE/int

値は取り込まれていますし、

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
66461804-a894-470f-a099-631ffd4c32cb    int     REST_DEVICE     1579960045357298000     50      About a minute  About a minute  50 years

ルールエンジンにも REST_DEVICE からの値は届いているようなログが出ますが、実際には何もトリガされません。正常です。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:47:25.370] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:47:25.466] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  REST_DEVICE
...

つづいて、ルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 90 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

ルールエンジンのログでは、指定したログメッセージが記録され、 {"message":"HIGH"} が指定したコマンドにリクエストされたことがわかります。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:57:26.805] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:57:26.853] boot - 6  INFO [main] --- RuleEngine: Action triggered: The value is too high.
[2020-01-25 13:57:26.853] boot - 6  INFO [SimpleAsyncTaskExecutor-2] --- CommandExecutor: Sending request to:  09dae1fc-e2be-4388-9677-639d2f24c58bfor command:  1c7a50e7-5424-4bce-9b9a-29849510580e with body: {"message":"HIGH"}
[2020-01-25 13:57:26.866] boot - 6  INFO [main] --- RuleEngine: Event triggered 1rules: Event [pushed=0, device=REST_DEVICE, readings=[Reading [pushed=0, name=int, value=90, device=REST_DEVICE]], toString()=BaseObject [id=538aea34-b4ce-4342-88a9-6c08cdac080e, created=0, modified=0, origin=1579960646793054700]]
...

MQTT ブローカ上では、MQTT デバイスに対するコマンド実行が行われた様子がわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
CommandTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
ResponseTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
...

MQTT デバイスの testmessage コマンドに GET すると、値が狙い通りに変更されていることがわかります。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579960983817888000,
  "readings": [
    {
      "origin": 1579960983809258000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "HIGH"
    }
  ],
  "EncodedEvent": null
}

同様に、REST デバイスの気持ちになってふたつめのルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 10 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

もろもろの処理が動き、MQTT デバイス側の値が変わったことが確認できます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579961107845061000,
  "readings": [
    {
      "origin": 1579961107837083000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "LOW"
    }
  ],
  "EncodedEvent": null
}

まとめ

あるデバイスの値の変化をトリガにして、別のデバイスを制御できることが確認できました。

今回は、REST デバイスを使った関係で、実質手動でトリガさせたに等しい状況でしたが、本来はセンサの値をトリガにアクチュエータを動かすような使い方になるでしょう。REST デバイスの代わりに MQTT デバイスや仮想デバイスのランダム値を condition に指定すれば、似たような状況のテストが可能です。

現段階ではシンプルなルールエンジンしか積まれていませんが、今後エコシステムが成熟してアプリケーションサービスなどが充実してくれば、より複雑な処理も可能になることが期待できます。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (4) MQTT デバイスサービスの追加

EdgeX Foundry 関連エントリ

おさらい

前々回のエントリ では、下図のとおり、バンドルされている仮想デバイスを利用して動作を確認していました。

前々回のエントリで構成されていたデバイスとデバイスサービス

これらの仮想デバイスは、デバイスからのデータの受け取りやデバイスへのコマンド実行などをテストする目的ではたいへん便利ですが、現実世界とのインタラクションはできません。

そこで今回は、MQTT をインタフェイスにもつデバイス(のシミュレータ)を用意し、下図のように EdgeX Foundry がそのデバイスと実際に(MQTT ブローカを介して)インタラクションできる状態を構成します。

MQTT 対応デバイス(のシミュレータ)を EdgeX Foundry の制御下におく

本エントリの作業内容は、エントリ執筆時点の 公式ドキュメント を基にしています。

今回も、GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab03

新しいデバイスの仕様と動作確認

まずは、今回新しく EdgeX Foundry の制御下におきたいデバイスの仕様を整理します。その後、実際にそのデバイスのシミュレータを動作させ、仕様通りに動くことを確認します。

新しいデバイスの仕様

今回は、以下のような仕様のデバイスを考えます。

  • 二種類のセンサを持つ
    • randfloat32
    • randfloat64
  • 二種類の文字列情報を持つ
    • ping
    • message
  • トピック DataTopic にセンサ randfloat32 の値を 15 秒ごとに配信する
    • {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"<値>"} の形式で配信する
  • トピック CommandTopic でコマンドを待ち受ける
    • {"name":"MQTT_DEVICE","method":"<get または set>","cmd":"<コマンド名>"} の形式のコマンドを受け取る
  • コマンドを受け取ったら、コマンド応じて処理を実行し、結果をトピック ResponseTopic に配信する
    • コマンドのメソッドが set だった場合は、保存されている message を書き換える
    • コマンドが ping だった場合は、pong を返す
    • コマンドが message だった場合は、保存されている message を返す
    • コマンドが randfloat32 または randfloat64 だった場合は、それぞれ対応するセンサの値を返す
    • 結果は {"name":"MQ_DEVICE","method":"<メソッド>","cmd":"<コマンド名>","<コマンド名>":"<値>"} の形式で配信する

デバイス側が定期的にデータを発信するだけでなく、コマンド操作も受け付けるようなデバイスです。ただしよく見ると、二つあるセンサのうち randfloat64 は、値の定期的な自動配信はありません(これは伏線です)。

シミュレータの起動

今回は、上記のようなデバイスを模したシミュレータを利用します。MQTT を扱うにはブローカが不可欠なので、前回のエントリ と同様にまずはこれを起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
6de5986ebb1b3715179253857952f143fbf19fa77e201980f8573d96558850fd

続けてデバイス(のシミュレータ)の用意です。

Node.js で MQTT を扱えるコンテナ dersimn/mqtt-scripts で、次のようなスクリプトを起動させます。

function getRandomFloat(min, max) {
    return Math.random() * (max - min) + min;
}

const deviceName = "MQ_DEVICE";
let message = "test-message";

// 1. Publish random number every 15 seconds
schedule('*/15 * * * * *', () => {
    let body = {
        "name": deviceName,
        "cmd": "randfloat32",
        "randfloat32": getRandomFloat(25, 29).toFixed(1)
    };
    publish('DataTopic', JSON.stringify(body));
});

// 2. Receive the reading request, then return the response
// 3. Receive the put request, then change the device value
subscribe("CommandTopic", (topic, val) => {
    var data = val;
    if (data.method == "set") {
        message = data[data.cmd]
    } else {
        switch (data.cmd) {
            case "ping":
                data.ping = "pong";
                break;
            case "message":
                data.message = message;
                break;
            case "randfloat32":
                data.randfloat32 = getRandomFloat(25, 29).toFixed(1);
                break;
            case "randfloat64":
                data.randfloat64 = getRandomFloat(10, 1).toFixed(5);
                break;
        }
    }
    publish("ResponseTopic", JSON.stringify(data));
});

これを起動するには、以下のように作業します。IP アドレスは適宜読み替えてください。

$ cd mqtt-scripts
$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd):/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts
Unable to find image 'dersimn/mqtt-scripts:latest' locally
...
9d52b4956ebbec60dd0f93bdf9750ff915fefd8e8c58d3ce8bc7ba1429693d2b

シミュレータの動作確認

ブローカの全トピックを購読します。15 秒ごとに DataTopic にセンサ randfloat32 の値が届いていることがわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.4"}
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"28.6"}

コマンド操作ができることも確認します。別のターミナルで CommandTopic にコマンドを配信します。Windows 環境だと "\" にしないと動かないかもしれません。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"ping"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

購読している側では、コマンドが CommandTopic に届き、そのコマンドに応じた応答が ResponseTopic に配信されていることが確認できます。また、message コマンドの結果が、set メソッドの実行前後で変更されていることも確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping","ping":"pong"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32","randfloat32":"27.6"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64","randfloat64":"8.39883"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"test-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"modified-message"}

ここまでで、デバイス(のシミュレータ)が動作している状態が作れました。ここまでは EdgeX Foundry はまったく関係なく、ただ単に前述の仕様のデバイスを論理的に作っただけです。

デバイスサービスの構成

さて、ここからが本題です。先ほど作ったデバイスを EdgeX Foundry の制御下におくことを考えます。

デバイスサービスの概念

現実世界に存在するデバイスは、それぞれが異なるセンサやスイッチを持っていて、異なるプロトコルをインタフェイスに持つため、それぞれに応じた適切な方法で値の取得や制御命令の発行をする必要があります。

この実態を踏まえ、EdgeX Foundry では、実デバイスと EdgeX Foundry の間を取り持ってくれる存在としてデバイスサービスとよばれるマイクロサービスを定義しています。デバイスサービスは、おおむねデバイスの種類ごとに用意するものと思えばよさそうで、つまり、ひとつのデバイスサービスに対してひとつ以上のデバイスの制御を任せられるようです。

デバイスサービスは、以下のような役割を持ちます。

  • コアサービス層に対して、デバイスからの情報取得やデバイスへの制御命令の発行を行える REST エンドポイントを提供する
  • コアサービス層からの制御命令を実デバイスに合わせた命令に翻訳して実行し、実デバイスを制御する
  • デバイスから送られてきた情報をコアサービス層に合わせた情報に翻訳し、コアサービス層に届ける
  • 事前に定められた場合は、定期的にデバイスに対して特定の処理を行い、結果をコアサービス層に届ける

また、デバイスサービスそのものは、以下のような特徴を持ちます。

  • MQTT や Modbus など業界で多く使われるプロトコルに合わせたデバイスサービスはすでに参考実装が用意されており、複雑でない構成の場合は充分に流用できる
  • 参考実装の流用では不足する場合は、提供されている SDK を利用して気軽に自製できる(C 言語または Go 言語)
  • すべてのデバイスサービスはこの SDK を用いて共通のフレームワークのもとで実装されるため、操作方法や設定方法は自ずと共通化され、相互運用性は高まる
  • EdgeX Foundry を取り巻くエコシステムの成熟により、デバイスのベンダがそのデバイス用のデバイスサービスを提供したり、デバイスにデバイスサービスが組み込まれたり、第三者により開発されたデバイスサービスの充実も期待できる

EdgeX Foundry に限った話ではないですが、プラットフォーム系の製品の場合、製品の発展にはエコシステムの成熟が非常に重要です。この観点では、EdgeX Foundry は、商用環境への利用を謳ったバージョン 1.0 のリリースが 2019 年の冬とついこの前なので、まだまだこれからですね。

デバイスサービスの構成要素

デバイスを制御するデバイスサービスを追加するには、以下の 3 つの要素を考える必要があります。

  • デバイスサービスそのもの
    • コアサービスとデバイスとの間を仲介する、C 言語または Go 言語で実装されたマイクロサービス
    • 今回はコンテナとして動作させる
    • 後述のデバイスプロファイルとデバイスサービス設定を読み込んで動作する
  • デバイスプロファイル(Device Profile)
    • YAML ファイルとして定義
    • 管理対象のデバイス種別が持つリソースや、そのリソースが持つ値の意味、それぞれのリソースに対して行える操作などを定義する
  • デバイスサービス設定(Drvice Service Configuration)
    • configuration.toml ファイルとして定義
    • デバイスサービス自身の設定や、連携する EdgeX Foundry の他のマイクロサービスの情報などを定義する
    • デバイスプロファイルと実際のデバイスを紐づけてデバイスを定義する。デバイスやリソースに対する自動実行処理なども定義する
    • その他、デバイスサービスの動作に必要なパラメータを指定する

デバイスプロファイル

今回は、mqtt.test.device.profile.yml を利用します。中身は以下の通りです。

$ cat mqtt/mqtt.test.device.profile.yml
name: "Test.Device.MQTT.Profile"
manufacturer: "Dell"
model: "MQTT-2"
labels:
- "test"
description: "Test device profile"

deviceResources:
- name: randfloat32
  description: "device random number with Base64 encoding"
  properties:
    value:
      { type: "Float32", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "Base64" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: randfloat64
  description: "device random number with e notion"
  properties:
    value:
      { type: "Float64", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "eNotation" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: ping
  description: "device awake"
  properties:
    value:
      { type: "String", size: "0", readWrite: "R", defaultValue: "oops" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: message
  description: "device notification message"
  properties:
    value:
      { type: "String", size: "0", readWrite: "W" ,scale: "", offset: "", base: ""  }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }

deviceCommands:
- name: testrandfloat32
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat32"}
- name: testrandfloat64
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat64"}
- name: testping
  get:
    - { index: "1", operation: "get", deviceResource: "ping"}
- name: testmessage
  get:
    - { index: "1", operation: "get", deviceResource: "message"}
  set:
    - { index: "1", operation: "set", deviceResource: "message"}

coreCommands:
- name: testrandfloat32
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat32"
    responses:
    - code: "200"
      description: "get the random float32 value"
      expectedValues: ["randfloat32"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testrandfloat64
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat64"
    responses:
    - code: "200"
      description: "get the random float64 value"
      expectedValues: ["randfloat64"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testping
  get:
    path: "/api/v1/device/{deviceId}/testping"
    responses:
    - code: "200"
      description: "ping the device"
      expectedValues: ["ping"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testmessage
  get:
    path: "/api/v1/device/{deviceId}/testmessage"
    responses:
    - code: "200"
      description: "get the message"
      expectedValues: ["message"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
  put:
    path: "/api/v1/device/{deviceId}/testmessage"
    parameterNames: ["message"]
    responses:
    - code: "204"
      description: "set the message."
      expectedValues: []
    - code: "500"
      description: "internal server error"
      expectedValues: []

デバイスプロファイルは、デバイスと一対一ではなく、例えばデバイスのモデルごとにひとつ用意するイメージです 。

この例では、冒頭の 6 行でデバイスのモデルを定義しています。先ほど動作させたデバイス(のシミュレータ)は、Dell 製の MQTT-2 というモデルである、という想定で書かれています。

その後、大きく deviceResourcesdeviceCommandscoreCommands、の三つのセクションが続いています。

deviceResources セクションでは、デバイスが持っているリソースと、それが持つ値の意味を決めています。この例では、このデバイスにリソース randfloat64 があり、それは 0.00 から 100.00 の範囲の Float64 型の値を持つと定義されています。また、message リソースは String 型の値を持ち、他のリソースと違って書き換えが可能(W)と定義されていることがわかります。

deviceCommands セクションでは、このデバイスに対して実行できるコマンドを定義し、そのコマンドのメソッド(get または set)ごとに参照または操作されるリソースを紐付けています。例えば、testrandfloat64 コマンドの get メソッドは、先に deviecResources で定義したリソース randfloat64 からの値の読み取りを行えるように定義されています。また、testmessage コマンドでは、set または get メソッドにより、リソース message の値の取得や変更ができるように定義されています。なお、ここでは実装されていませんが、ひとつのメソッドに複数のリソースを紐付けることも可能です。つまり、一回の get 操作で複数のリソースの値を同時に取得するようにも構成できます。Edge Xpert のドキュメントでは、そのような例が解説され ています。

coreCommands セクションでは、コアサービス層からデバイスに対して実行できるコマンドを定義しています。例えば、コアサービスが testrandfloat64 コマンドに GET リクエストを実行すると、その命令はパス /api/v1/device/{deviceId}/testrandfloat64 に渡されます。このパスは、先の deviceCommands で定義した testrandfloat64 コマンドを表すものです。そしてコアサービスは、結果としてレスポンスコード 200 が返ってきた場合は、そのレスポンスボディに含まれる値を randfloat32 の値として取り込むように構成されています。また、testmessage コマンドのみ、ほかのコマンドと異なり PUT リクエストの動作が定義されています。

デバイスサービス設定

今回は、configuration.toml を利用します。中身は以下の通りです。

このファイルは環境依存の固定値を含みます。自分の環境で試す場合は、IP アドレスを環境に合わせて書き換えてください。

$ cat mqtt/configuration.toml
# configuration.toml
[Writable]
LogLevel = 'DEBUG'

[Service]
Host = "edgex-device-mqtt"
Port = 49982
ConnectRetries = 3
Labels = []
OpenMsg = "device mqtt started"
Timeout = 5000
EnableAsyncReadings = true
AsyncBufferSize = 16

[Registry]
Host = "edgex-core-consul"
Port = 8500
CheckInterval = "10s"
FailLimit = 3
FailWaitTime = 10
Type = "consul"

[Logging]
EnableRemote = false
File = "./device-mqtt.log"

[Clients]
  [Clients.Data]
  Name = "edgex-core-data"
  Protocol = "http"
  Host = "edgex-core-data"
  Port = 48080
  Timeout = 50000

  [Clients.Metadata]
  Name = "edgex-core-metadata"
  Protocol = "http"
  Host = "edgex-core-metadata"
  Port = 48081
  Timeout = 50000

  [Clients.Logging]
  Name = "edgex-support-logging"
  Protocol = "http"
  Host ="edgex-support-logging"
  Port = 48061

[Device]
  DataTransform = true
  InitCmd = ""
  InitCmdArgs = ""
  MaxCmdOps = 128
  MaxCmdValueLen = 256
  RemoveCmd = ""
  RemoveCmdArgs = ""
  ProfilesDir = "/custom-config"

# Pre-define Devices
[[DeviceList]]
  Name = "MQ_DEVICE"
  Profile = "Test.Device.MQTT.Profile"
  Description = "General MQTT device"
  Labels = [ "MQTT"]
  [DeviceList.Protocols]
    [DeviceList.Protocols.mqtt]
       Schema = "tcp"
       Host = "192.168.0.100"
       Port = "1883"
       ClientId = "CommandPublisher"
       User = ""
       Password = ""
       Topic = "CommandTopic"
  [[DeviceList.AutoEvents]]
    Frequency = "30s"
    OnChange = false
    Resource = "testrandfloat64"

# Driver configs
[Driver]
IncomingSchema = "tcp"
IncomingHost = "192.168.0.100"
IncomingPort = "1883"
IncomingUser = ""
IncomingPassword = ""
IncomingQos = "0"
IncomingKeepAlive = "3600"
IncomingClientId = "IncomingDataSubscriber"
IncomingTopic = "DataTopic"
ResponseSchema = "tcp"
ResponseHost = "192.168.0.100"
ResponsePort = "1883"
ResponseUser = ""
ResponsePassword = ""
ResponseQos = "0"
ResponseKeepAlive = "3600"
ResponseClientId = "CommandResponseSubscriber"
ResponseTopic = "ResponseTopic"

このファイルで、実際のデバイスを定義したり、必要なパラメータを指定したりします。

前半部分では EdgeX Foundry 内の他のサービスの情報などを指定しています。

実際のデバイスの定義は [[DeviceList]] の部分です。デバイスプロファイル Test.Device.MQTT.Profile に紐づけてデバイス MQ_DEVICE を定義しています。また、このデバイスに対するコマンドの実行に利用する MQTT ブローカも指定しています。

[[DeviceList.AutoEvents]] の部分では、デバイス側からは自動では配信されない値であった randfloat64 の値を定期的に取得するため、30 秒ごとに testrandfloat64 コマンドを実行するように設定しています。

[Driver] の部分では、デバイスサービスがデバイスから情報を受け取るために購読する MQTT ブローカやトピック名を設定しています。

なお、今回はこのファイルの中で実デバイスの定義もしてしまっていますが、デバイスはあとからも追加が可能です。また、このファイルはあくまで MQTT デバイスサービスの設定ファイルなので、利用したいデバイスサービスが違えば当然この設定ファイルで書くべき内容も変わります。

デバイスサービスを含んだ EdgeX Foundry の起動

設定ファイルができたら、デバイスサービスを含む EdgeX Foundry を起動します。

今回は、Docker Compose ファイルに以下を足しています。これにより、 edgexfoundry/docker-device-mqtt-go イメージがコンテナとして実行され、今回用の設定ファイルが含まれる mqtt ディレクトリがコンテナにマウントさたうえで、起動時にはその中身を利用して EdgeX Foundry にデバイスサービスとデバイスが登録されます。

  device-mqtt:
    image: edgexfoundry/docker-device-mqtt-go:1.1.1
    ports:
      - "49982:49982"
    container_name: edgex-device-mqtt
    hostname: edgex-device-mqtt
    networks:
      - edgex-network
    volumes:
      - db-data:/data/db
      - log-data:/edgex/logs
      - consul-config:/consul/config
      - consul-data:/consul/data
      - ./mqtt:/custom-config
    depends_on:
      - data
      - command
    entrypoint:
      - /device-mqtt
      - --registry=consul://edgex-core-consul:8500
      - --confdir=/custom-config

では、実際にこのファイルを利用して EdgeX Foundry を起動させます。

$ docker-compose up -d

動作確認

まずは、デバイスサービスやデバイスが正常に EdgeX Foundry に認識されているか確認し、その後、値の収集やコマンド操作を確認します。

登録状態の確認

以前のエントリで device-virtual の存在を確認した方法 で、今回はデバイスサービス edgex-device-mqtt とデバイス MQ_DEVICE が確認できれば、登録は成功しています。ラクなのは UI か CLI ですね。

また、これまで紹介しませんでしたが、EdgeX Foundry はマイクロサービス群の状態管理に HashiCorp の Consul を利用しており、各サービスの起動状態や登録状態は、この Consul を通じても確認できます。

Consul の GUI には、http://192.168.0.100:8500/ でアクセスできます。

Consul の GUI

ここで、

  • [Services] タブで edgex-device-mqtt が緑色である
  • [Key/Value] タブで edgex > devices > 1.0edgex-device-mqtt がある
  • [Key/Value] タブで edgex > devices > 1.0 > edgex-device-mqtt > DeviceList > 0 > NameMQ_DEVICE である

あたりが確認できれば、登録はできていると考えてよいでしょう。

デバイスから配信されている値の収集の確認

今回のデバイスは、センサ randfloat32 の値を 15 秒ごとに自動的に MQTT トピックに配信していました。デバイスサービスはこれを受け取れるように構成しましたので、EdgeX Foundry に取り込まれているはずです。

これも、以前のエントリで値を確認した方法 で確認できます。これもラクなのは GUI か CLI ですが、例えば、API で確認する場合は以下の通りです。

$ curl -s http://localhost:48080/api/v1/reading/name/randfloat32/3 | jq
[
  {
    "id": "23fdac76-0870-4b9d-b653-561d8a7c0423",
    "created": 1579707885029,
    "origin": 1579707885005379300,
    "modified": 1579707885029,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QdpmZg=="
  },
  {
    "id": "5c086b1d-6b38-48c0-9aef-9b800f9c72ab",
    "created": 1579707870018,
    "origin": 1579707870004742000,
    "modified": 1579707870018,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QeGZmg=="
  },
  {
    "id": "de67c865-0406-408d-b238-d33de1fe1032",
    "created": 1579707855022,
    "origin": 1579707855011006200,
    "modified": 1579707855022,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "Qd2Zmg=="
  }
]

時刻はエポックミリ秒(origin はエポックナノ秒)なので、変換すると 15 秒おきであることが確認できます。

$ date -d "@1579707885.029"
Wed 22 Jan 2020 03:44:45 PM UTC

$ date -d "@1579707870.018"
Wed 22 Jan 2020 03:44:30 PM UTC

$ date -d "@1579707855.022"
Wed 22 Jan 2020 03:44:15 PM UTC

デバイスサービスの自動実行イベントの確認

randfloat64 の値は、自動では配信されなかったため、デバイスサービスの設定(configuration.toml)に [[DeviceList.AutoEvents]] を定義して、デバイスサービス側から 30 秒ごとに自動で収集させていました。

これもデータの蓄積を任意の方法で確認できます。例えば CLI で確認する場合は以下の通りです。

$ edgex-cli reading list MQ_DEVICE -l 10 | grep randfloat64
5df1da1a-91db-41cf-99fa-962e87077395    randfloat64     MQ_DEVICE       1579709342806935600     4.924730e+00    15 seconds      15 seconds      50 years
7c33bb76-c9e9-4ef8-a410-80227e236d13    randfloat64     MQ_DEVICE       1579709312695223100     3.734020e+00    45 seconds      45 seconds      50 years
053987f1-5a89-4cc2-a433-03432caa9039    randfloat64     MQ_DEVICE       1579709282611878800     9.320210e+00    About a minute  About a minute  50 years

これも変換すると 30 秒おきであることが確認できます。

$ date -d "@1579709342.806935600"
Wed Jan 22 16:09:02 UTC 2020

$ date -d "@1579709312.695223100"
Wed Jan 22 16:08:32 UTC 2020

$ date -d "@1579709282.611878800"
Wed Jan 22 16:08:02 UTC 2020

デバイスへのコマンド実行の確認

このデバイスは、リソース message の値は EdgeX Foundry から変更できるような仕様で、デバイスプロファイルでもそれに合わせて PUT リクエストの挙動を定義していました。

では、この操作ができることを確認します。

デバイスプロファイルで定義した各コマンドは、内部では REST エンドポイントでリクエストを待ち受けてており、そしてこの URL は、API 経由で確認できます。具体的には、デバイスを示すエンドポイントへの GET リクエストの結果に含まれます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE | jq
...
  "id": "77fccbb8-f33b-42d8-bf21-6d55cdde2068",
  "name": "MQ_DEVICE",
...
  "commands": [
...
      "name": "testmessage",
...
      "get": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4"
      },
      "put": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4",
...

EdgeX Foundry がデバイスにコマンドを実行するときは、内部ではこの URL に対して GET なり PUT なりをリクエストするわけです。そしてデバイスサービスがそれを受け取って、設定したとおりに翻訳してデバイスへの操作が実行され応答が返ることになります。

ここでは実際に、確認できた URL にリクエストを投げます。外部からこの URL を直接叩くときは、ホスト名部分を環境に合わせて書き換えたうえで叩きます。今回であれば、次のような操作で現在値が確認できます。

$ curl -s http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4 | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710432121478400,
  "readings": [
    {
      "origin": 1579710432109675800,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "test-message"
    }
  ],
  "EncodedEvent": null
}

まだ何も変更していないので、初期値(mqtt-script.js で定義されている)が返ってきました。

では、変更の PUT を投げます。JSON をリクエストボディに含めて PUT します。

$ curl -s -X PUT -d '{"message":"modified-message"}' http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4

エラーがなければ、再度 GET すると、値が変わっていることが確認できます。

ところで、URL は /api/v1/device/<デバイス ID>/command/<コマンド ID> なパスですが、実はわざわざ ID を調べなくても、/api/v1/device/name/<デバイス名>/command/<コマンド名> でも機能します。というわけで、お好みに合わせてどちらを使ってもよいですが、せっかくなので今度はこちらに GET します。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710963293832000,
  "readings": [
    {
      "origin": 1579710963283338200,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "modified-message"
    }
  ],
  "EncodedEvent": null
}

値が変わっていることが確認できました。

なお、このような GET や PUT に相当する操作は、GUI でも実行できます。例えば Closure UI であれば、[DEVICES] 画面のデバイス MQ_DEVICEACTION の [Control Device] アイコンから現在値が確認できます。この裏側では、この画面を開く際に、全部のリソースに対して順番に GET が発行されています。

現在値が確認できる

この画面では、PUT リクエストが可能なリソースには ACTION 欄に Set Value アイコンが出るのですが、実行しようとしても手元の環境では入力欄が表示されず使えませんでした……。

Closure UI ではない標準 UI であれば、PUT も実行できました。

Method を set にして send するだけ

まとめ

新しいデバイス(のシミュレータ)を用意し、それに合わせたデバイスサービスを構成して、このデバイスを EdgeX Foundry の制御下においたうえで、実際の動作を確認できました。

実際のシーンでは、本当のデバイスの本当の仕様に合わせてデバイスサービスを構成する必要があり、場合によってはデバイスサービスそれ自体の実装を含めてカスタマイズが必要になる可能性ももちろんあるわけです。

が、今回確認した通り、デバイスや環境に依存する部分(リソースやコマンドの名前や数、MQTT ブローカのホスト情報やトピック名など)は、デバイスプロファイルやデバイスサービス設定として別ファイルで定義できるようになっていて、デバイスサービスの実装それ自体は、設定ファイルに従って淡々と動くだけでもあります。

例えば Raspberry Pi に接続したセンサの値を取り込みたい場合を考えると、実際のセンサ値を MQTT トピックに投げる部分は自製する必要がありますが、であればその際に、逆に今回使ったこのデバイスサービスの仕様を前提にした投げ方にしてしまうことも可能です。そうすれば、このデバイスサービスをそのまま実際に流用できることになります。

今回は MQTT の例でしたが、それ以外のプロトコル用の参考実装も、環境依存の部分が設定ファイル群で外在化できるような汎用性のある形で作られているようなので、それなりに様々なデバイスで使いまわせそうです。

なお、EdgeX Foundry に取り込まれたデータは、前回のエントリで扱ったエクスポートの設定 や、アプリケーションサービスを構成することで、簡単に持ち出せます。コアサービス層に集約される段階で、すでにデバイスの差異は抽象化されているので、もちろん今回の MQTT で収集したデータも他のデバイスの情報とまったく同じ方法でエクスポート可能です。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポートの仕組み

前回のエントリ では、EdgeX Foundry を稼働させて、仮想デバイスのデータが蓄積される様子を観察しました。続いては、外部へのエクスポートを試します。

EdgeX Foundry は、エッジ内で閉じて利用することももちろん可能ですが、例えば処理したデータをクラウドに投げたい、オンプレミスの別のシステムに投げたい、など、EdgeX Foundry の外部へデータを連携したいシーンも考えられます。

このようなときに、エクスポートサービスを構成することで、コアサービスに届けられたデータをリアルタイムに外部に送れるようになります。

赤枠が今回関係するところ

EdgeX Foundry では、エクスポート先をエクスポートサービスのクライアントとして定義しています。クライアントは、クライアント登録サービス(図中の CLIENT REGISTRATION、今回の構成では edgex-export-client コンテナ)を通じてデータベースに接続情報を保存することで新規に登録できます。

エクスポート先へのデータの配信は、実際には配信サービス(図中 DISTRIBUTION、edgex-export-distro コンテナ)が行っています。コアサービス層からのイベントの配信を受けて、データベースに登録されたクライアントにデータを送出する役割を担っています。

エクスポートの準備

今回は、エクスポート方法として、

  • MQTT トピックへの配信によるエクスポート
  • REST エンドポイントへの POST によるエクスポート

の二つのパターンを構成します。

前述の通り、エクスポート先は エクスポートサービスのクライアントと考えるため、実際には、

  • MQTT トピックをエクスポートクライアントとして登録する
  • REST エンドポイントをエクスポートクライアントとして登録する

という操作を行います。

さて、今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab02

MQTT ブローカの準備

エクスポートクライアントとして MQTT トピックを登録するため、そのトピックを提供する MQTT ブローカを作ります。

本来の用途を考えれば、エッジからさらにフォグかクラウドに送るイメージなので、test.mosquitto.orgCloudMQTT などのクラウドっぽいサービスに投げるほうがそれっぽいですが、今回はお試しなので、ブローカはローカルに立ててしまいます。

といっても、コンテナイメージがあるので、おもむろに起動させるだけです。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
Unable to find image 'eclipse-mosquitto:latest' locally
...
52e107224959223a3132466b5356278f637daa855aadc3a1345c71c193deb4df

起動できたら、簡単にテストします。クライアントもコンテナのイメージが(非公式ですが)あるのでお借りして、適当なトピック名を指定して購読を開始してから、

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
Unable to find image 'efrecon/mqtt-client:latest' locally
...
Client mosqsub|6-1f6d3fb35f68 sending CONNECT
Client mosqsub|6-1f6d3fb35f68 received CONNACK (0)
Client mosqsub|6-1f6d3fb35f68 sending SUBSCRIBE (Mid: 1, Topic: edgex-handson-topic, QoS: 0)
Client mosqsub|6-1f6d3fb35f68 received SUBACK
Subscribed (mid: 1): 0

別のコンテナで同じトピックに配信します。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "edgex-handson-topic" -d -m "TEST MESSAGE"
Client mosqpub|6-96175cb072bd sending CONNECT
Client mosqpub|6-96175cb072bd received CONNACK (0)
Client mosqpub|6-96175cb072bd sending PUBLISH (d0, q0, r0, m1, 'edgex-handson-topic', ... (20 bytes))
Client mosqpub|6-96175cb072bd sending DISCONNECT

購読している側で、メッセージが配信されてくることが確認できれば成功です。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
TEST MESSAGE

デバッグメッセージが出て邪魔な場合は、-d オプションを消すと静かになります。

REST エンドポイントの準備

続いて、エクスポートクライアントとして登録する REST エンドポイントを作ります。

登録すると、REST エンドポイント側には単なる POST リクエストで届くので、POST されたリクエストボディが確認できれば動作確認には充分です。ここでは、Python の Flask で、リクエストボディを標準出力に吐くだけの簡単な Web サーバを作ります。

from flask import Flask, request
from datetime import datetime

app = Flask(__name__)


@app.route("/api/v1/echo", methods=["POST"])
def echo():
    print("--\n{}".format(datetime.now()))
    print(request.get_data().decode("utf-8"))
    return "OK", 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

リポジトリには配置済みなので、クローンしてある場合はそのまま起動できます。

$ cd lab02/rest-endpoint
$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

待ち受けができたら、cURL や Postman などで適当な POST リクエストを発行します。

$ curl -X POST -d '{"message": "TEST MESSAGE"}' http://192.168.0.100:5000/api/v1/echo
OK

Web サーバの出力に POST したリクエストボディが表示されれば成功です。

$ python ./main.py
...
--
2020-01-21 00:03:26.503181
{"message": "TEST MESSAGE"}
192.168.0.220 - - [21/Jan/2020 00:03:26] "POST /api/v1/echo HTTP/1.1" 200 -

エクスポートクライアントの登録

ここまでで、エクスポートされればその中身が確認できる状態が整ったので、続けて EdgeX Foundry 側に実際に MQTT トピックや REST エンドポイントをエクスポートクライアントとして登録していきます。

EdgeX Foundry が起動していない場合は、前回のエントリ なども参考にして起動させます。

$ cd lab02
$ docker-compose up -d

エクスポートクライアントの追加は、API や GUI で行えます。CLI では現時点では難しそうです。

API でのエクスポートクライアントの登録

API によるエクスポートクライアントの登録は、edgex-export-client のエンドポイントに JSON で POST することで行えます。cURL や Postman などで実行できます。

MQTT トピックをエクスポートクライアントとして登録するには、以下のような JSON を組んで、

{
    "name": "edgex-handson-mqtt",
    "addressable": {
        "name": "edgex-handson-mqttbroker",
        "protocol": "tcp",
        "address": "192.168.0.100",
        "port": 1883,
        "topic": "edgex-handson-topic"
    },
    "format": "JSON",
    "enable": true,
    "destination": "MQTT_TOPIC"
}

これを http://localhost:48071/api/v1/registration に POST します。 登録されたクライアントの ID が返ってきます。

$ curl -X POST -d '{"name":"edgex-handson-mqtt","addressable":{"name":"edgex-handson-mqttbroker","protocol":"tcp","address":"192.168.0.100","port":1883,"topic":"edgex-handson-topic"},"format":"JSON","enable":true,"destination":"MQTT_TOPIC"}' http://localhost:48071/api/v1/registration
ebdde555-001b-4798-817a-da75b92406c7

登録したらその時点からエクスポートが開始されるので、指定した MQTT トピックを購読すれば、値が届いていることが確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (258 bytes))
{"id":"ac6d9f85-7c9f-4300-b6e1-f51f8b8e0e69","device":"Random-Integer-Device","origin":1579534468929275000,"readings":[{"id":"8c9267a0-c8a1-4e9e-9a60-90d7df8966fc","origin":1579534468917108600,"device":"Random-Integer-Device","name":"Int16","value":"8409"}]}
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (263 bytes))
{"id":"0539ed5c-68c3-4a12-a936-743ced169f7d","device":"Random-Integer-Device","origin":1579534468955468400,"readings":[{"id":"4b52dc78-efd9-4234-982c-1a17f21640f3","origin":1579534468943312700,"device":"Random-Integer-Device","name":"Int32","value":"-54648128"}]}

続けて、REST エンドポイントへのエクスポートを追加します。POST する JSON は以下です。

{
    "name": "edgex-handson-rest",
    "addressable": {
        "name": "edgex-handson-restendpoint",
        "protocol": "http",
        "method": "POST",
        "address": "192.168.0.100",
        "port": 5000,
        "path": "/api/v1/echo"
    },
    "format": "JSON",
    "enable": true,
    "destination": "REST_ENDPOINT"
}

POST する先は同様に http://localhost:48071/api/v1/registration です。

$ curl -X POST -d '{"name":"edgex-handson-rest","addressable":{"name":"edgex-handson-restendpoint","protocol":"http","method":"POST","address":"192.168.0.100","port":5000,"path":"/api/v1/echo"},"format":"JSON","enable":true,"destination":"REST_ENDPOINT"}' http://localhost:48071/api/v1/registration
6092b921-b173-4de7-8441-f89461734c17

こちらも追加した段階からエクスポートが開始されるので、REST エンドポイント側で、POST リクエストが来ていることが確認できます。

$ python ./main.py
...
--
2020-01-21 00:47:45.055939
{"id":"a0f8347d-e3d0-4bfc-ae20-98fcb9714224","device":"Random-Integer-Device","origin":1579535264923285900,"readings":[{"id":"9c5313ef-4a04-48f1-bcd4-cea06a08c0f5","origin":1579535264898458900,"device":"Random-Integer-Device","name":"Int16","value":"-18516"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-21 00:47:45.181942
{"id":"7cf0d138-ee3d-47e0-8875-877a5ea1111d","device":"Random-Integer-Device","origin":1579535264973814800,"readings":[{"id":"372eb7bf-9037-43ed-aa53-6d9cd4cafc5a","origin":1579535264962783700,"device":"Random-Integer-Device","name":"Int32","value":"-1557838489"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -

API でのエクスポートクライアントの確認

登録済みのエクスポート設定は、GET リクエストで確認できます。

$ curl -s http://localhost:48071/api/v1/registration | jq
[
  {
    "id": "ebdde555-001b-4798-817a-da75b92406c7",
    "created": 1579534374854,
    "modified": 1579534374854,
    "origin": 0,
    "name": "edgex-handson-mqtt",
...
  },
  {
    "id": "6092b921-b173-4de7-8441-f89461734c17",
    "created": 1579535149239,
    "modified": 1579535149239,
    "origin": 0,
    "name": "edgex-handson-rest",
...
  }
]

API でのエクスポートクライアントの削除

削除は DELETE リクエストで行えます。エンドポイントはそれぞれ以下です。

  • 削除対象を ID で指定する場合
    • http://localhost:48071/api/v1/registration/id/<ID>
  • 削除対象を名前で指定する場合
    • http://localhost:48071/api/v1/registration/name/<名前>

cURL で実行する場合は以下が例です。

$ curl -X DELETE http://localhost:48071/api/v1/registration/id/ebdde555-001b-4798-817a-da75b92406c7
true

$ curl -X DELETE http://localhost:48071/api/v1/registration/name/edgex-handson-rest
true

GUI でのエクスポートクライアントの登録

ここまで API でがんばってきましたが、前回のエントリ で紹介した Closure UI では、エクスポートクライアントの追加も削除も非常に楽に行えます。

http://localhost:8080/ にアクセスしてログイン後、[EXPORT] を開きます。

まだ何も登録されていない

ここから、画面に従って値を入れていけば完了です。例えば今回の MQTT トピックをクライアントとして登録する場合は、以下のように入力します。

項目
DestinationMQTT Topic
Nameedgex-handson-mqtt
Enableオン(緑)
Export formatJSON
ProtocolTCP
Address192.168.0.100
Port1883
Topicedgex-handson-topic
上記以外デフォルト値

REST エンドポイントの場合は次のようにします。

項目
DestinationREST Endpoint
Nameedgex-handson-rest
Enableオン(緑)
Export formatJSON
ProtocolHTTP
Address192.168.0.100
Port5000
Path/api/v1/echo
上記以外デフォルト値
登録後の状態

GUI でのエクスポートクライアントの削除

削除は、登録後の画面で、削除したいクライアントの右端、[ACTION] 欄にある [Delete Export] アイコンをクリックするだけです。簡単ですね。

削除前に確認が出る

アプリケーションサービスによるエクスポート

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

アプリケーションサービスは、データに対して変換やフィルタなどさまざまな処理を行えますが、その処理のひとつとして、今回行ったようなエクスポートも行えます。複数の処理をパイプライン化してつなげることで、例えば特定のデバイスの値だけフィルタしたあとにエクスポート、などが可能です。

エクスポートサービスでは、すべてのデータがエクスポートサービスを通るため、データ量やエクスポート先が多い場合にパフォーマンス面での問題が考えられますが、アプリケーションサービスでは、各々がコアサービス層のメッセージバスに直接つないでデータを受け取れるようにすることで、こうした問題に対応しているようですね。

SDK も提供されており、自製しなくても単純なフィルタやエクスポートは app-service-configurable を利用して簡単に行えそうです。

アプリケーションサービスを使ったエクスポートは、別のエントリで紹介 していますので、併せてどうぞ。

まとめ

エクスポートサービスを構成して、EdgeX Foundry で収集したデータを外部に送れることが確認できました。エクスポート先の任意のシステムでは、これらの値を取り込んで処理すればよいわけですね。

GUI の画面上やドキュメントでは、GCP や AWS、Azure の IoT サービスもエクスポート先として挙げられていますが、現段階でどこまで動くかは確認できていません。すでに証明書などを用いた認証はできそうなので、どこかで試してみます。

EdgeX Foundry 関連エントリ