目次 [非表示]
EdgeX Foundry 関連エントリ
- 冬休みの自由研究: EdgeX Foundry (1) 概要
- 冬休みの自由研究: EdgeX Foundry (2) 導入とデータの確認
- 冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート
- 冬休みの自由研究: EdgeX Foundry (4) MQTT デバイスサービスの追加
- 冬休みの自由研究: EdgeX Foundry (5) ルールエンジンによる自動制御
- 冬休みの自由研究: EdgeX Foundry (6) アプリケーションサービスによるエクスポート
- 冬休みの自由研究: EdgeX Foundry (7) Kubernetes 上で Fuji リリースを動かす
- EdgeX Foundry ハンズオンラボガイド公開
- EdgeX Foundry: Geneva から Hanoi へ
エクスポートの仕組み
前回のエントリ では、EdgeX Foundry を稼働させて、仮想デバイスのデータが蓄積される様子を観察しました。続いては、外部へのエクスポートを試します。
EdgeX Foundry は、エッジ内で閉じて利用することももちろん可能ですが、例えば処理したデータをクラウドに投げたい、オンプレミスの別のシステムに投げたい、など、EdgeX Foundry の外部へデータを連携したいシーンも考えられます。
このようなときに、エクスポートサービスを構成することで、コアサービスに届けられたデータをリアルタイムに外部に送れるようになります。
EdgeX Foundry では、エクスポート先をエクスポートサービスのクライアントとして定義しています。クライアントは、クライアント登録サービス(図中の CLIENT REGISTRATION、今回の構成では edgex-export-client
コンテナ)を通じてデータベースに接続情報を保存することで新規に登録できます。
エクスポート先へのデータの配信は、実際には配信サービス(図中 DISTRIBUTION、edgex-export-distro
コンテナ)が行っています。コアサービス層からのイベントの配信を受けて、データベースに登録されたクライアントにデータを送出する役割を担っています。
エクスポートの準備
今回は、エクスポート方法として、
- MQTT トピックへの配信によるエクスポート
- REST エンドポイントへの POST によるエクスポート
の二つのパターンを構成します。
前述の通り、エクスポート先は エクスポートサービスのクライアントと考えるため、実際には、
- MQTT トピックをエクスポートクライアントとして登録する
- REST エンドポイントをエクスポートクライアントとして登録する
という操作を行います。
さて、今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。
1 2 3 | $ git clone https://github.com/kurokobo/edgex-lab-handson.git $ cd lab02 |
MQTT ブローカの準備
エクスポートクライアントとして MQTT トピックを登録するため、そのトピックを提供する MQTT ブローカを作ります。
本来の用途を考えれば、エッジからさらにフォグかクラウドに送るイメージなので、test.mosquitto.org や CloudMQTT などのクラウドっぽいサービスに投げるほうがそれっぽいですが、今回はお試しなので、ブローカはローカルに立ててしまいます。
といっても、コンテナイメージがあるので、おもむろに起動させるだけです。
1 2 3 4 | $ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto Unable to find image 'eclipse-mosquitto:latest' locally ... 52e107224959223a3132466b5356278f637daa855aadc3a1345c71c193deb4df |
起動できたら、簡単にテストします。クライアントもコンテナのイメージが(非公式ですが)あるのでお借りして、適当なトピック名を指定して購読を開始してから、
1 2 3 4 5 6 7 8 | $ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d Unable to find image 'efrecon/mqtt-client:latest' locally ... Client mosqsub|6-1f6d3fb35f68 sending CONNECT Client mosqsub|6-1f6d3fb35f68 received CONNACK (0) Client mosqsub|6-1f6d3fb35f68 sending SUBSCRIBE (Mid: 1, Topic: edgex-handson-topic, QoS: 0) Client mosqsub|6-1f6d3fb35f68 received SUBACK Subscribed (mid: 1): 0 |
別のコンテナで同じトピックに配信します。
1 2 3 4 5 | $ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "edgex-handson-topic" -d -m "TEST MESSAGE" Client mosqpub|6-96175cb072bd sending CONNECT Client mosqpub|6-96175cb072bd received CONNACK (0) Client mosqpub|6-96175cb072bd sending PUBLISH (d0, q0, r0, m1, 'edgex-handson-topic', ... (20 bytes)) Client mosqpub|6-96175cb072bd sending DISCONNECT |
購読している側で、メッセージが配信されてくることが確認できれば成功です。
1 2 3 | $ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d ... TEST MESSAGE |
デバッグメッセージが出て邪魔な場合は、-d
オプションを消すと静かになります。
REST エンドポイントの準備
続いて、エクスポートクライアントとして登録する REST エンドポイントを作ります。
登録すると、REST エンドポイント側には単なる POST リクエストで届くので、POST されたリクエストボディが確認できれば動作確認には充分です。ここでは、Python の Flask で、リクエストボディを標準出力に吐くだけの簡単な Web サーバを作ります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | from flask import Flask, request from datetime import datetime app = Flask(__name__) @app .route( "/api/v1/echo" , methods = [ "POST" ]) def echo(): print ( "--\n{}" . format (datetime.now())) print (request.get_data().decode( "utf-8" )) return "OK" , 200 if __name__ = = "__main__" : app.run(debug = True , host = "0.0.0.0" , port = 5000 ) |
リポジトリには配置済みなので、クローンしてある場合はそのまま起動できます。
1 2 3 4 5 6 7 8 9 10 11 | $ cd lab02/rest-endpoint $ python ./main.py * Serving Flask app "main" (lazy loading) * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: on * Restarting with stat * Debugger is active! * Debugger PIN: 159-538-312 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit) |
待ち受けができたら、cURL や Postman などで適当な POST リクエストを発行します。
1 2 | $ curl -X POST -d '{"message": "TEST MESSAGE"}' http://192.168.0.100:5000/api/v1/echo OK |
Web サーバの出力に POST したリクエストボディが表示されれば成功です。
1 2 3 4 5 6 | $ python ./main.py ... -- 2020-01-21 00:03:26.503181 {"message": "TEST MESSAGE"} 192.168.0.220 - - [21/Jan/2020 00:03:26] "POST /api/v1/echo HTTP/1.1" 200 - |
エクスポートクライアントの登録
ここまでで、エクスポートされればその中身が確認できる状態が整ったので、続けて EdgeX Foundry 側に実際に MQTT トピックや REST エンドポイントをエクスポートクライアントとして登録していきます。
EdgeX Foundry が起動していない場合は、前回のエントリ なども参考にして起動させます。
1 2 | $ cd lab02 $ docker-compose up -d |
エクスポートクライアントの追加は、API や GUI で行えます。CLI では現時点では難しそうです。
API でのエクスポートクライアントの登録
API によるエクスポートクライアントの登録は、edgex-export-client
のエンドポイントに JSON で POST することで行えます。cURL や Postman などで実行できます。
MQTT トピックをエクスポートクライアントとして登録するには、以下のような JSON を組んで、
1 2 3 4 5 6 7 8 9 10 11 12 13 | { "name" : "edgex-handson-mqtt" , "addressable" : { "name" : "edgex-handson-mqttbroker" , "protocol" : "tcp" , "address" : "192.168.0.100" , "port" : 1883, "topic" : "edgex-handson-topic" }, "format" : "JSON" , "enable" : true , "destination" : "MQTT_TOPIC" } |
これを http://localhost:48071/api/v1/registration
に POST します。 登録されたクライアントの ID が返ってきます。
1 2 | $ curl -X POST -d '{"name":"edgex-handson-mqtt","addressable":{"name":"edgex-handson-mqttbroker","protocol":"tcp","address":"192.168.0.100","port":1883,"topic":"edgex-handson-topic"},"format":"JSON","enable":true,"destination":"MQTT_TOPIC"}' http://localhost:48071/api/v1/registration ebdde555-001b-4798-817a-da75b92406c7 |
登録したらその時点からエクスポートが開始されるので、指定した MQTT トピックを購読すれば、値が届いていることが確認できます。
1 2 3 4 5 6 | $ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d ... Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (258 bytes)) {"id":"ac6d9f85-7c9f-4300-b6e1-f51f8b8e0e69","device":"Random-Integer-Device","origin":1579534468929275000,"readings":[{"id":"8c9267a0-c8a1-4e9e-9a60-90d7df8966fc","origin":1579534468917108600,"device":"Random-Integer-Device","name":"Int16","value":"8409"}]} Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (263 bytes)) {"id":"0539ed5c-68c3-4a12-a936-743ced169f7d","device":"Random-Integer-Device","origin":1579534468955468400,"readings":[{"id":"4b52dc78-efd9-4234-982c-1a17f21640f3","origin":1579534468943312700,"device":"Random-Integer-Device","name":"Int32","value":"-54648128"}]} |
続けて、REST エンドポイントへのエクスポートを追加します。POST する JSON は以下です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | { "name" : "edgex-handson-rest" , "addressable" : { "name" : "edgex-handson-restendpoint" , "protocol" : "http" , "method" : "POST" , "address" : "192.168.0.100" , "port" : 5000, "path" : "/api/v1/echo" }, "format" : "JSON" , "enable" : true , "destination" : "REST_ENDPOINT" } |
POST する先は同様に http://localhost:48071/api/v1/registration
です。
1 2 | $ curl -X POST -d '{"name":"edgex-handson-rest","addressable":{"name":"edgex-handson-restendpoint","protocol":"http","method":"POST","address":"192.168.0.100","port":5000,"path":"/api/v1/echo"},"format":"JSON","enable":true,"destination":"REST_ENDPOINT"}' http://localhost:48071/api/v1/registration 6092b921-b173-4de7-8441-f89461734c17 |
こちらも追加した段階からエクスポートが開始されるので、REST エンドポイント側で、POST リクエストが来ていることが確認できます。
1 2 3 4 5 6 7 8 9 10 | $ python ./main.py ... -- 2020-01-21 00:47:45.055939 {"id":"a0f8347d-e3d0-4bfc-ae20-98fcb9714224","device":"Random-Integer-Device","origin":1579535264923285900,"readings":[{"id":"9c5313ef-4a04-48f1-bcd4-cea06a08c0f5","origin":1579535264898458900,"device":"Random-Integer-Device","name":"Int16","value":"-18516"}]} 192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 - -- 2020-01-21 00:47:45.181942 {"id":"7cf0d138-ee3d-47e0-8875-877a5ea1111d","device":"Random-Integer-Device","origin":1579535264973814800,"readings":[{"id":"372eb7bf-9037-43ed-aa53-6d9cd4cafc5a","origin":1579535264962783700,"device":"Random-Integer-Device","name":"Int32","value":"-1557838489"}]} 192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 - |
API でのエクスポートクライアントの確認
登録済みのエクスポート設定は、GET リクエストで確認できます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | $ curl -s http://localhost:48071/api/v1/registration | jq [ { "id": "ebdde555-001b-4798-817a-da75b92406c7", "created": 1579534374854, "modified": 1579534374854, "origin": 0, "name": "edgex-handson-mqtt", ... }, { "id": "6092b921-b173-4de7-8441-f89461734c17", "created": 1579535149239, "modified": 1579535149239, "origin": 0, "name": "edgex-handson-rest", ... } ] |
API でのエクスポートクライアントの削除
削除は DELETE リクエストで行えます。エンドポイントはそれぞれ以下です。
- 削除対象を ID で指定する場合
http://localhost:48071/api/v1/registration/id/<ID>
- 削除対象を名前で指定する場合
-
http://localhost:48071/api/v1/registration/name/<名前>
-
cURL で実行する場合は以下が例です。
1 2 3 4 5 | $ curl -X DELETE http://localhost:48071/api/v1/registration/id/ebdde555-001b-4798-817a-da75b92406c7 true $ curl -X DELETE http://localhost:48071/api/v1/registration/name/edgex-handson-rest true |
GUI でのエクスポートクライアントの登録
ここまで API でがんばってきましたが、前回のエントリ で紹介した Closure UI では、エクスポートクライアントの追加も削除も非常に楽に行えます。
http://localhost:8080/
にアクセスしてログイン後、[EXPORT
] を開きます。
ここから、画面に従って値を入れていけば完了です。例えば今回の MQTT トピックをクライアントとして登録する場合は、以下のように入力します。
項目 | 値 |
Destination | MQTT Topic |
Name | edgex-handson-mqtt |
Enable | オン(緑) |
Export format | JSON |
Protocol | TCP |
Address | 192.168.0.100 |
Port | 1883 |
Topic | edgex-handson-topic |
上記以外 | デフォルト値 |
REST エンドポイントの場合は次のようにします。
項目 | 値 |
Destination | REST Endpoint |
Name | edgex-handson-rest |
Enable | オン(緑) |
Export format | JSON |
Protocol | HTTP |
Address | 192.168.0.100 |
Port | 5000 |
Path | /api/v1/echo |
上記以外 | デフォルト値 |
GUI でのエクスポートクライアントの削除
削除は、登録後の画面で、削除したいクライアントの右端、[ACTION
] 欄にある [Delete Export
] アイコンをクリックするだけです。簡単ですね。
アプリケーションサービスによるエクスポート
……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。
アプリケーションサービスは、データに対して変換やフィルタなどさまざまな処理を行えますが、その処理のひとつとして、今回行ったようなエクスポートも行えます。複数の処理をパイプライン化してつなげることで、例えば特定のデバイスの値だけフィルタしたあとにエクスポート、などが可能です。
エクスポートサービスでは、すべてのデータがエクスポートサービスを通るため、データ量やエクスポート先が多い場合にパフォーマンス面での問題が考えられますが、アプリケーションサービスでは、各々がコアサービス層のメッセージバスに直接つないでデータを受け取れるようにすることで、こうした問題に対応しているようですね。
SDK も提供されており、自製しなくても単純なフィルタやエクスポートは app-service-configurable を利用して簡単に行えそうです。
アプリケーションサービスを使ったエクスポートは、別のエントリで紹介 していますので、併せてどうぞ。
まとめ
エクスポートサービスを構成して、EdgeX Foundry で収集したデータを外部に送れることが確認できました。エクスポート先の任意のシステムでは、これらの値を取り込んで処理すればよいわけですね。
GUI の画面上やドキュメントでは、GCP や AWS、Azure の IoT サービスもエクスポート先として挙げられていますが、現段階でどこまで動くかは確認できていません。すでに証明書などを用いた認証はできそうなので、どこかで試してみます。
EdgeX Foundry 関連エントリ
- 冬休みの自由研究: EdgeX Foundry (1) 概要
- 冬休みの自由研究: EdgeX Foundry (2) 導入とデータの確認
- 冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート
- 冬休みの自由研究: EdgeX Foundry (4) MQTT デバイスサービスの追加
- 冬休みの自由研究: EdgeX Foundry (5) ルールエンジンによる自動制御
- 冬休みの自由研究: EdgeX Foundry (6) アプリケーションサービスによるエクスポート
- 冬休みの自由研究: EdgeX Foundry (7) Kubernetes 上で Fuji リリースを動かす
- EdgeX Foundry ハンズオンラボガイド公開
- EdgeX Foundry: Geneva から Hanoi へ