Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポートの仕組み

前回のエントリ では、EdgeX Foundry を稼働させて、仮想デバイスのデータが蓄積される様子を観察しました。続いては、外部へのエクスポートを試します。

EdgeX Foundry は、エッジ内で閉じて利用することももちろん可能ですが、例えば処理したデータをクラウドに投げたい、オンプレミスの別のシステムに投げたい、など、EdgeX Foundry の外部へデータを連携したいシーンも考えられます。

このようなときに、エクスポートサービスを構成することで、コアサービスに届けられたデータをリアルタイムに外部に送れるようになります。

赤枠が今回関係するところ

EdgeX Foundry では、エクスポート先をエクスポートサービスのクライアントとして定義しています。クライアントは、クライアント登録サービス(図中の CLIENT REGISTRATION、今回の構成では edgex-export-client コンテナ)を通じてデータベースに接続情報を保存することで新規に登録できます。

エクスポート先へのデータの配信は、実際には配信サービス(図中 DISTRIBUTION、edgex-export-distro コンテナ)が行っています。コアサービス層からのイベントの配信を受けて、データベースに登録されたクライアントにデータを送出する役割を担っています。

エクスポートの準備

今回は、エクスポート方法として、

  • MQTT トピックへの配信によるエクスポート
  • REST エンドポイントへの POST によるエクスポート

の二つのパターンを構成します。

前述の通り、エクスポート先は エクスポートサービスのクライアントと考えるため、実際には、

  • MQTT トピックをエクスポートクライアントとして登録する
  • REST エンドポイントをエクスポートクライアントとして登録する

という操作を行います。

さて、今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab02

MQTT ブローカの準備

エクスポートクライアントとして MQTT トピックを登録するため、そのトピックを提供する MQTT ブローカを作ります。

本来の用途を考えれば、エッジからさらにフォグかクラウドに送るイメージなので、test.mosquitto.orgCloudMQTT などのクラウドっぽいサービスに投げるほうがそれっぽいですが、今回はお試しなので、ブローカはローカルに立ててしまいます。

といっても、コンテナイメージがあるので、おもむろに起動させるだけです。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
Unable to find image 'eclipse-mosquitto:latest' locally
...
52e107224959223a3132466b5356278f637daa855aadc3a1345c71c193deb4df

起動できたら、簡単にテストします。クライアントもコンテナのイメージが(非公式ですが)あるのでお借りして、適当なトピック名を指定して購読を開始してから、

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
Unable to find image 'efrecon/mqtt-client:latest' locally
...
Client mosqsub|6-1f6d3fb35f68 sending CONNECT
Client mosqsub|6-1f6d3fb35f68 received CONNACK (0)
Client mosqsub|6-1f6d3fb35f68 sending SUBSCRIBE (Mid: 1, Topic: edgex-handson-topic, QoS: 0)
Client mosqsub|6-1f6d3fb35f68 received SUBACK
Subscribed (mid: 1): 0

別のコンテナで同じトピックに配信します。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "edgex-handson-topic" -d -m "TEST MESSAGE"
Client mosqpub|6-96175cb072bd sending CONNECT
Client mosqpub|6-96175cb072bd received CONNACK (0)
Client mosqpub|6-96175cb072bd sending PUBLISH (d0, q0, r0, m1, 'edgex-handson-topic', ... (20 bytes))
Client mosqpub|6-96175cb072bd sending DISCONNECT

購読している側で、メッセージが配信されてくることが確認できれば成功です。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
TEST MESSAGE

デバッグメッセージが出て邪魔な場合は、-d オプションを消すと静かになります。

REST エンドポイントの準備

続いて、エクスポートクライアントとして登録する REST エンドポイントを作ります。

登録すると、REST エンドポイント側には単なる POST リクエストで届くので、POST されたリクエストボディが確認できれば動作確認には充分です。ここでは、Python の Flask で、リクエストボディを標準出力に吐くだけの簡単な Web サーバを作ります。

from flask import Flask, request
from datetime import datetime

app = Flask(__name__)


@app.route("/api/v1/echo", methods=["POST"])
def echo():
    print("--\n{}".format(datetime.now()))
    print(request.get_data().decode("utf-8"))
    return "OK", 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

リポジトリには配置済みなので、クローンしてある場合はそのまま起動できます。

$ cd lab02/rest-endpoint
$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

待ち受けができたら、cURL や Postman などで適当な POST リクエストを発行します。

$ curl -X POST -d '{"message": "TEST MESSAGE"}' http://192.168.0.100:5000/api/v1/echo
OK

Web サーバの出力に POST したリクエストボディが表示されれば成功です。

$ python ./main.py
...
--
2020-01-21 00:03:26.503181
{"message": "TEST MESSAGE"}
192.168.0.220 - - [21/Jan/2020 00:03:26] "POST /api/v1/echo HTTP/1.1" 200 -

エクスポートクライアントの登録

ここまでで、エクスポートされればその中身が確認できる状態が整ったので、続けて EdgeX Foundry 側に実際に MQTT トピックや REST エンドポイントをエクスポートクライアントとして登録していきます。

EdgeX Foundry が起動していない場合は、前回のエントリ なども参考にして起動させます。

$ cd lab02
$ docker-compose up -d

エクスポートクライアントの追加は、API や GUI で行えます。CLI では現時点では難しそうです。

API でのエクスポートクライアントの登録

API によるエクスポートクライアントの登録は、edgex-export-client のエンドポイントに JSON で POST することで行えます。cURL や Postman などで実行できます。

MQTT トピックをエクスポートクライアントとして登録するには、以下のような JSON を組んで、

{
    "name": "edgex-handson-mqtt",
    "addressable": {
        "name": "edgex-handson-mqttbroker",
        "protocol": "tcp",
        "address": "192.168.0.100",
        "port": 1883,
        "topic": "edgex-handson-topic"
    },
    "format": "JSON",
    "enable": true,
    "destination": "MQTT_TOPIC"
}

これを http://localhost:48071/api/v1/registration に POST します。 登録されたクライアントの ID が返ってきます。

$ curl -X POST -d '{"name":"edgex-handson-mqtt","addressable":{"name":"edgex-handson-mqttbroker","protocol":"tcp","address":"192.168.0.100","port":1883,"topic":"edgex-handson-topic"},"format":"JSON","enable":true,"destination":"MQTT_TOPIC"}' http://localhost:48071/api/v1/registration
ebdde555-001b-4798-817a-da75b92406c7

登録したらその時点からエクスポートが開始されるので、指定した MQTT トピックを購読すれば、値が届いていることが確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (258 bytes))
{"id":"ac6d9f85-7c9f-4300-b6e1-f51f8b8e0e69","device":"Random-Integer-Device","origin":1579534468929275000,"readings":[{"id":"8c9267a0-c8a1-4e9e-9a60-90d7df8966fc","origin":1579534468917108600,"device":"Random-Integer-Device","name":"Int16","value":"8409"}]}
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (263 bytes))
{"id":"0539ed5c-68c3-4a12-a936-743ced169f7d","device":"Random-Integer-Device","origin":1579534468955468400,"readings":[{"id":"4b52dc78-efd9-4234-982c-1a17f21640f3","origin":1579534468943312700,"device":"Random-Integer-Device","name":"Int32","value":"-54648128"}]}

続けて、REST エンドポイントへのエクスポートを追加します。POST する JSON は以下です。

{
    "name": "edgex-handson-rest",
    "addressable": {
        "name": "edgex-handson-restendpoint",
        "protocol": "http",
        "method": "POST",
        "address": "192.168.0.100",
        "port": 5000,
        "path": "/api/v1/echo"
    },
    "format": "JSON",
    "enable": true,
    "destination": "REST_ENDPOINT"
}

POST する先は同様に http://localhost:48071/api/v1/registration です。

$ curl -X POST -d '{"name":"edgex-handson-rest","addressable":{"name":"edgex-handson-restendpoint","protocol":"http","method":"POST","address":"192.168.0.100","port":5000,"path":"/api/v1/echo"},"format":"JSON","enable":true,"destination":"REST_ENDPOINT"}' http://localhost:48071/api/v1/registration
6092b921-b173-4de7-8441-f89461734c17

こちらも追加した段階からエクスポートが開始されるので、REST エンドポイント側で、POST リクエストが来ていることが確認できます。

$ python ./main.py
...
--
2020-01-21 00:47:45.055939
{"id":"a0f8347d-e3d0-4bfc-ae20-98fcb9714224","device":"Random-Integer-Device","origin":1579535264923285900,"readings":[{"id":"9c5313ef-4a04-48f1-bcd4-cea06a08c0f5","origin":1579535264898458900,"device":"Random-Integer-Device","name":"Int16","value":"-18516"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-21 00:47:45.181942
{"id":"7cf0d138-ee3d-47e0-8875-877a5ea1111d","device":"Random-Integer-Device","origin":1579535264973814800,"readings":[{"id":"372eb7bf-9037-43ed-aa53-6d9cd4cafc5a","origin":1579535264962783700,"device":"Random-Integer-Device","name":"Int32","value":"-1557838489"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -

API でのエクスポートクライアントの確認

登録済みのエクスポート設定は、GET リクエストで確認できます。

$ curl -s http://localhost:48071/api/v1/registration | jq
[
  {
    "id": "ebdde555-001b-4798-817a-da75b92406c7",
    "created": 1579534374854,
    "modified": 1579534374854,
    "origin": 0,
    "name": "edgex-handson-mqtt",
...
  },
  {
    "id": "6092b921-b173-4de7-8441-f89461734c17",
    "created": 1579535149239,
    "modified": 1579535149239,
    "origin": 0,
    "name": "edgex-handson-rest",
...
  }
]

API でのエクスポートクライアントの削除

削除は DELETE リクエストで行えます。エンドポイントはそれぞれ以下です。

  • 削除対象を ID で指定する場合
    • http://localhost:48071/api/v1/registration/id/<ID>
  • 削除対象を名前で指定する場合
    • http://localhost:48071/api/v1/registration/name/<名前>

cURL で実行する場合は以下が例です。

$ curl -X DELETE http://localhost:48071/api/v1/registration/id/ebdde555-001b-4798-817a-da75b92406c7
true

$ curl -X DELETE http://localhost:48071/api/v1/registration/name/edgex-handson-rest
true

GUI でのエクスポートクライアントの登録

ここまで API でがんばってきましたが、前回のエントリ で紹介した Closure UI では、エクスポートクライアントの追加も削除も非常に楽に行えます。

http://localhost:8080/ にアクセスしてログイン後、[EXPORT] を開きます。

まだ何も登録されていない

ここから、画面に従って値を入れていけば完了です。例えば今回の MQTT トピックをクライアントとして登録する場合は、以下のように入力します。

項目
DestinationMQTT Topic
Nameedgex-handson-mqtt
Enableオン(緑)
Export formatJSON
ProtocolTCP
Address192.168.0.100
Port1883
Topicedgex-handson-topic
上記以外デフォルト値

REST エンドポイントの場合は次のようにします。

項目
DestinationREST Endpoint
Nameedgex-handson-rest
Enableオン(緑)
Export formatJSON
ProtocolHTTP
Address192.168.0.100
Port5000
Path/api/v1/echo
上記以外デフォルト値
登録後の状態

GUI でのエクスポートクライアントの削除

削除は、登録後の画面で、削除したいクライアントの右端、[ACTION] 欄にある [Delete Export] アイコンをクリックするだけです。簡単ですね。

削除前に確認が出る

アプリケーションサービスによるエクスポート

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

アプリケーションサービスは、データに対して変換やフィルタなどさまざまな処理を行えますが、その処理のひとつとして、今回行ったようなエクスポートも行えます。複数の処理をパイプライン化してつなげることで、例えば特定のデバイスの値だけフィルタしたあとにエクスポート、などが可能です。

エクスポートサービスでは、すべてのデータがエクスポートサービスを通るため、データ量やエクスポート先が多い場合にパフォーマンス面での問題が考えられますが、アプリケーションサービスでは、各々がコアサービス層のメッセージバスに直接つないでデータを受け取れるようにすることで、こうした問題に対応しているようですね。

SDK も提供されており、自製しなくても単純なフィルタやエクスポートは app-service-configurable を利用して簡単に行えそうです。

アプリケーションサービスを使ったエクスポートは、別のエントリで紹介 していますので、併せてどうぞ。

まとめ

エクスポートサービスを構成して、EdgeX Foundry で収集したデータを外部に送れることが確認できました。エクスポート先の任意のシステムでは、これらの値を取り込んで処理すればよいわけですね。

GUI の画面上やドキュメントでは、GCP や AWS、Azure の IoT サービスもエクスポート先として挙げられていますが、現段階でどこまで動くかは確認できていません。すでに証明書などを用いた認証はできそうなので、どこかで試してみます。

EdgeX Foundry 関連エントリ

@kurokobo

くろいです。ギターアンサンブルやら音響やらがフィールドの IT やさんなアルトギター弾き。たまこう 48 期ぎたさん、SFC '07 おんぞう、新日本ギターアンサンブル、Rubinetto。今は野良気味。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です