Skip to main content
Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (7) Kubernetes 上で Fuji リリースを動かす

EdgeX Foundry 関連エントリ

EdgeX Foundry と Kubernetes

これまでの EdgeX Foundry 関連エントリでは、一貫してその動作を Docker Compose に任せていました。公式でも Docker Compose や Snaps を利用して動作させる手順が紹介されています。

が、最近よく Kubernetes(や OpenShift)を触っていることもあり、コンテナなら Kubernetes でも動かせるよね! という気持ちになったので、やってみました。

なお、現段階では、Kubernetes 上で動作させるためのマニフェストファイルは、公式には用意されていません。また、そもそも EdgeX Foundry は HA 構成を明確にはサポートしておらず、実装上も考慮されていないようです。つまり、仮に Kubernetes で動作させられたとしても、レプリカを増やしてロードバランスするような状態での正常な動作は何の保証もないことになります。

というわけで、現状ではあくまで実験程度に捉えておくのがよいと思います。

今回の目標

後述しますが、EdgeX Foundry の Kubernetes 上での動作は、過去にすでに試みられているので、今回はそれらの先行実装で解決されていない以下の 2 点を解消できる実装を目標としました。

  1. 現時点での Fuji リリースを動作させられること
  2. マルチノード構成の Kubernetes で動かせること

幸いなことに、自宅の vSphere 環境では Cluster API を使ってマルチノードの Kubernetes 環境がすぐに作れるようになっている ので、この環境を使います。

できあがったマニフェスト

紆余曲折を経てひとまず動くものができたので、リポジトリ に置いてあります。

以下、使い方を簡単に紹介します。リポジトリはあらかじめクローンしておきます。

$ git clone https://github.com/kurokobo/edgex-on-kubernetes.git
$ cd edgex-on-kubernetes

Persistent Volume の用意

PV を 4 つ使うので、PV の実体となるものを用意します。今回は QNAP を NFS ストレージとして使いました。

エクスポートされた NFS 領域を適当な作業用の Linux 端末からマウントして、PV の実体となるサブディレクトリを作っておきます。

$ mount -t nfs 192.168.0.200:/k8s /mnt/k8s
$ for i in $(seq -f %03g 100); do sudo mkdir /mnt/k8s/pv$i; done
$ umount /mnt/k8s

続けて、これらの NFS 領域を Kubernetes の PersistentVolume として定義します。マニフェストの中身はこんな感じです。

$ cat persistentvolumes/nfs/pv001-persistentvolume.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv001
spec:
  capacity:
    storage: 512Mi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Recycle
  nfs:
    server: 192.168.0.200
    path: /k8s/pv001

当然ですが、これはぼくの環境に合わせたものなので、それ以外の環境で動かす場合は適宜修正は必要です。今回は nfs で書いていますが、hostPath でも gcePersistentDisk でも好きなようにしてください。

逆にいえば、Deployment ではボリュームを PVC で割り当てているので、環境に合わせて編集するのは PV だけでどうにかなる気がします。

マニフェストができたら、kubectl create します。

$ ls persistentvolumes/nfs/*.yaml | xargs -n 1 kubectl create -f
persistentvolume/pv001 created
persistentvolume/pv002 created
persistentvolume/pv003 created
persistentvolume/pv004 created

$ kubectl get pv
NAME    CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM   STORAGECLASS   REASON   AGE
pv001   512Mi      RWX            Recycle          Available                                   22s
pv002   128Mi      RWX            Recycle          Available                                   22s
pv003   128Mi      RWX            Recycle          Available                                   22s
pv004   128Mi      RWX            Recycle          Available                                   22s

Persistent Volume Claim の用意

PV ができたら、PVC を作ります。

$ ls persistentvolumeclaims/*.yaml | xargs -n 1 kubectl create -f
persistentvolumeclaim/consul-config created
persistentvolumeclaim/consul-data created
persistentvolumeclaim/db-data created
persistentvolumeclaim/log-data created

$ kubectl get pv,pvc
NAME                     CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                   STORAGECLASS   REASON   AGE
persistentvolume/pv001   512Mi      RWX            Recycle          Bound    default/db-data                                 11m
persistentvolume/pv002   128Mi      RWX            Recycle          Bound    default/consul-data                             11m
persistentvolume/pv003   128Mi      RWX            Recycle          Bound    default/log-data                                11m
persistentvolume/pv004   128Mi      RWX            Recycle          Bound    default/consul-config                           11m

NAME                                  STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE
persistentvolumeclaim/consul-config   Bound    pv004    128Mi      RWX                           8s
persistentvolumeclaim/consul-data     Bound    pv002    128Mi      RWX                           8s
persistentvolumeclaim/db-data         Bound    pv001    512Mi      RWX                           8s
persistentvolumeclaim/log-data        Bound    pv003    128Mi      RWX                           7s

PV の STATUSBound になって、PVC の VOLUME で PV が対応付けられていれば大丈夫です。

Service の作成

続けて SVC を作ります。マニフェストファイル内で、外部から触りそうなヤツだけ type: LoadBalancer を書いて外部 IP アドレスを持たせています。

$ ls services/*.yaml | xargs -n 1 kubectl create -f
service/edgex-app-service-configurable-rules created
service/edgex-core-command created
service/edgex-core-consul created
service/edgex-core-data created
service/edgex-core-metadata created
service/edgex-device-virtual created
service/edgex-mongo created
service/edgex-support-logging created
service/edgex-support-notifications created
service/edgex-support-rulesengine created
service/edgex-support-scheduler created
service/edgex-sys-mgmt-agent created
service/edgex-ui-go created

$ kubectl get svc
NAME                                   TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                          AGE
edgex-app-service-configurable-rules   ClusterIP      100.67.112.145   <none>         48100/TCP                        8s
edgex-core-command                     LoadBalancer   100.69.10.125    192.168.0.50   48082:31022/TCP                  8s
edgex-core-consul                      LoadBalancer   100.71.56.117    192.168.0.51   8400:30675/TCP,8500:32711/TCP    8s
edgex-core-data                        LoadBalancer   100.70.131.230   192.168.0.52   48080:31879/TCP,5563:31195/TCP   8s
edgex-core-metadata                    LoadBalancer   100.68.152.47    192.168.0.53   48081:30265/TCP                  8s
edgex-device-virtual                   ClusterIP      100.70.115.198   <none>         49990/TCP                        8s
edgex-mongo                            LoadBalancer   100.69.209.129   192.168.0.54   27017:30701/TCP                  7s
edgex-support-logging                  ClusterIP      100.65.29.76     <none>         48061/TCP                        7s
edgex-support-notifications            ClusterIP      100.65.127.176   <none>         48060/TCP                        7s
edgex-support-rulesengine              ClusterIP      100.64.125.224   <none>         48075/TCP                        7s
edgex-support-scheduler                ClusterIP      100.70.168.47    <none>         48085/TCP                        7s
edgex-sys-mgmt-agent                   LoadBalancer   100.68.113.150   192.168.0.55   48090:32466/TCP                  7s
edgex-ui-go                            LoadBalancer   100.64.153.22    192.168.0.56   4000:32252/TCP                   7s
kubernetes                             ClusterIP      100.64.0.1       <none>         443/TCP                          14m

外部からの触りっぷりをもうちょっときれいにしたい場合は、Ingress でも作るとよいと思います。

Deployment の作成と Consul の初期化

ここまでできたら、あとは順次マイクロサービスを起動していきます。

最初に、edgex-files を起動させて、PV 内にファイルを配置します。その後、Consul(edgex-core-consul)を起動させます。

$ kubectl create -f pods/edgex-files-pod.yaml
pod/edgex-files created

$ while [[ $(kubectl get pods edgex-files -o "jsonpath={.status.phase}") != "Succeeded" ]]; do sleep 1; done

$ kubectl create -f deployments/edgex-core-consul-deployment.yaml
deployment.apps/edgex-core-consul created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-core-consul
deployment.apps/edgex-core-consul condition met

Consul が起動したら、edgex-core-config-seed を動作させて、Consul に初期設定値を登録させます。

$ kubectl create -f pods/edgex-core-config-seed-pod.yaml
pod/edgex-core-config-seed created

$ while [[ $(kubectl get pods edgex-core-config-seed -o "jsonpath={.status.phase}") != "Succeeded" ]]; do sleep 1; done

edgex-filesedgex-core-config-seed は、それぞれ edgex-files はファイルのコピーが終わったら、edgex-core-config-seed は Consul に値を登録し終わったら不要になります。そのため、ここではこれらは Deployment リソースとしてではなく Pod リソースとして定義しています。

残りの Deployment の作成

残りのマイクロサービスを起動していきます。

$ kubectl create -f deployments/edgex-mongo-deployment.yaml
deployment.apps/edgex-mongo created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-mongo
deployment.apps/edgex-mongo condition met

$ kubectl create -f deployments/edgex-support-logging-deployment.yaml
deployment.apps/edgex-support-logging created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-support-logging
deployment.apps/edgex-support-logging condition met

$ kubectl create -f deployments/edgex-support-notifications-deployment.yaml
deployment.apps/edgex-support-notifications created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-support-notifications
deployment.apps/edgex-support-notifications condition met

$ kubectl create -f deployments/edgex-core-metadata-deployment.yaml
deployment.apps/edgex-core-metadata created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-core-metadata
deployment.apps/edgex-core-metadata condition met

$ kubectl create -f deployments/edgex-core-data-deployment.yaml
deployment.apps/edgex-core-data created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-core-data
deployment.apps/edgex-core-data condition met

$ kubectl create -f deployments/edgex-core-command-deployment.yaml
deployment.apps/edgex-core-command created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-core-command
deployment.apps/edgex-core-command condition met

$ kubectl create -f deployments/edgex-support-scheduler-deployment.yaml
deployment.apps/edgex-support-scheduler created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-support-scheduler
deployment.apps/edgex-support-scheduler condition met

$ kubectl create -f deployments/edgex-support-rulesengine-deployment.yaml
deployment.apps/edgex-support-rulesengine created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-support-rulesengine
deployment.apps/edgex-support-rulesengine condition met

$ kubectl create -f deployments/edgex-app-service-configurable-rules-deployment.yaml
deployment.apps/edgex-app-service-configurable-rules created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-app-service-configurable-rules
deployment.apps/edgex-app-service-configurable-rules condition met

$ kubectl create -f deployments/edgex-device-virtual-deployment.yaml
deployment.apps/edgex-device-virtual created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-device-virtual
deployment.apps/edgex-device-virtual condition met

$ kubectl create -f deployments/edgex-sys-mgmt-agent-deployment.yaml
deployment.apps/edgex-sys-mgmt-agent created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-sys-mgmt-agent
deployment.apps/edgex-sys-mgmt-agent condition met

$ kubectl create -f deployments/edgex-ui-go-deployment.yaml
deployment.apps/edgex-ui-go created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-ui-go
deployment.apps/edgex-ui-go condition met

完成

完成です。

$ kubectl get all,pv,pvc
NAME                                                        READY   STATUS      RESTARTS   AGE
pod/edgex-app-service-configurable-rules-6c794d754d-xj4v4   1/1     Running     0          92s
pod/edgex-core-command-cc8465b6f-kgwhm                      1/1     Running     0          98s
pod/edgex-core-config-seed                                  0/1     Completed   0          2m13s
pod/edgex-core-consul-5b8cdbbfc8-g7j6t                      1/1     Running     0          2m26s
pod/edgex-core-data-c7775c56b-t8rdf                         1/1     Running     0          100s
pod/edgex-core-metadata-66f86d86b-mfqfg                     1/1     Running     0          102s
pod/edgex-device-virtual-597f9d77f-vqlln                    1/1     Running     0          90s
pod/edgex-files                                             0/1     Completed   0          6m13s
pod/edgex-mongo-99c7d8957-fsz6c                             1/1     Running     0          110s
pod/edgex-support-logging-8566649bf8-dj8fw                  1/1     Running     0          107s
pod/edgex-support-notifications-586cf544f-n8md7             1/1     Running     0          104s
pod/edgex-support-rulesengine-6cfc49f89b-xn8fc              1/1     Running     0          94s
pod/edgex-support-scheduler-7658f985bb-9bmz2                1/1     Running     0          96s
pod/edgex-sys-mgmt-agent-5dd56c65bb-tnqth                   1/1     Running     0          88s
pod/edgex-ui-go-54668dcc94-mdx2d                            1/1     Running     0          86s

NAME                                           TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                          AGE
service/edgex-app-service-configurable-rules   ClusterIP      100.67.11.126    <none>         48100/TCP                        7m33s
service/edgex-core-command                     LoadBalancer   100.71.84.78     192.168.0.50   48082:32108/TCP                  7m33s
service/edgex-core-consul                      LoadBalancer   100.66.136.131   192.168.0.51   8400:31442/TCP,8500:31806/TCP    7m32s
service/edgex-core-data                        LoadBalancer   100.64.210.255   192.168.0.52   48080:31808/TCP,5563:30664/TCP   7m32s
service/edgex-core-metadata                    LoadBalancer   100.67.207.51    192.168.0.53   48081:32379/TCP                  7m32s
service/edgex-device-virtual                   ClusterIP      100.69.130.2     <none>         49990/TCP                        7m32s
service/edgex-mongo                            LoadBalancer   100.66.8.30      192.168.0.54   27017:31973/TCP                  7m32s
service/edgex-support-logging                  ClusterIP      100.68.171.73    <none>         48061/TCP                        7m32s
service/edgex-support-notifications            ClusterIP      100.67.242.204   <none>         48060/TCP                        7m32s
service/edgex-support-rulesengine              ClusterIP      100.67.203.199   <none>         48075/TCP                        7m32s
service/edgex-support-scheduler                ClusterIP      100.65.53.193    <none>         48085/TCP                        7m31s
service/edgex-sys-mgmt-agent                   LoadBalancer   100.65.122.248   192.168.0.55   48090:31476/TCP                  7m31s
service/edgex-ui-go                            LoadBalancer   100.70.107.81    192.168.0.56   4000:30852/TCP                   7m31s
service/kubernetes                             ClusterIP      100.64.0.1       <none>         443/TCP                          47m

NAME                                                   READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/edgex-app-service-configurable-rules   1/1     1            1           92s
deployment.apps/edgex-core-command                     1/1     1            1           98s
deployment.apps/edgex-core-consul                      1/1     1            1           2m26s
deployment.apps/edgex-core-data                        1/1     1            1           100s
deployment.apps/edgex-core-metadata                    1/1     1            1           102s
deployment.apps/edgex-device-virtual                   1/1     1            1           90s
deployment.apps/edgex-mongo                            1/1     1            1           110s
deployment.apps/edgex-support-logging                  1/1     1            1           107s
deployment.apps/edgex-support-notifications            1/1     1            1           104s
deployment.apps/edgex-support-rulesengine              1/1     1            1           94s
deployment.apps/edgex-support-scheduler                1/1     1            1           96s
deployment.apps/edgex-sys-mgmt-agent                   1/1     1            1           88s
deployment.apps/edgex-ui-go                            1/1     1            1           86s

NAME                                                              DESIRED   CURRENT   READY   AGE
replicaset.apps/edgex-app-service-configurable-rules-6c794d754d   1         1         1       92s
replicaset.apps/edgex-core-command-cc8465b6f                      1         1         1       98s
replicaset.apps/edgex-core-consul-5b8cdbbfc8                      1         1         1       2m26s
replicaset.apps/edgex-core-data-c7775c56b                         1         1         1       100s
replicaset.apps/edgex-core-metadata-66f86d86b                     1         1         1       102s
replicaset.apps/edgex-device-virtual-597f9d77f                    1         1         1       90s
replicaset.apps/edgex-mongo-99c7d8957                             1         1         1       110s
replicaset.apps/edgex-support-logging-8566649bf8                  1         1         1       107s
replicaset.apps/edgex-support-notifications-586cf544f             1         1         1       104s
replicaset.apps/edgex-support-rulesengine-6cfc49f89b              1         1         1       94s
replicaset.apps/edgex-support-scheduler-7658f985bb                1         1         1       96s
replicaset.apps/edgex-sys-mgmt-agent-5dd56c65bb                   1         1         1       88s
replicaset.apps/edgex-ui-go-54668dcc94                            1         1         1       86s

NAME                     CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                   STORAGECLASS   REASON   AGE
persistentvolume/pv001   512Mi      RWX            Recycle          Bound    default/db-data                                 8m11s
persistentvolume/pv002   128Mi      RWX            Recycle          Bound    default/consul-config                           8m11s
persistentvolume/pv003   128Mi      RWX            Recycle          Bound    default/consul-data                             8m10s
persistentvolume/pv004   128Mi      RWX            Recycle          Bound    default/log-data                                8m10s

NAME                                  STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE
persistentvolumeclaim/consul-config   Bound    pv002    128Mi      RWX                           8m4s
persistentvolumeclaim/consul-data     Bound    pv003    128Mi      RWX                           8m4s
persistentvolumeclaim/db-data         Bound    pv001    512Mi      RWX                           8m4s
persistentvolumeclaim/log-data        Bound    pv004    128Mi      RWX                           8m4s

動作確認

標準で edgex-device-virtual が動いているので、edgex-core-data のログなどを除けば値が取り込まれていることが観察できます。

$ kubectl logs edgex-core-data-c7775c56b-t8rdf --tail 4
level=INFO ts=2020-03-01T07:41:47.454970177Z app=edgex-core-data source=event.go:240 msg="Putting event on message queue"
level=INFO ts=2020-03-01T07:41:47.455410868Z app=edgex-core-data source=event.go:258 msg="Event Published on message queue. Topic: events, Correlation-id: 4d142e84-f3b2-49cd-a1d4-8be7dc7e5d36 "
level=INFO ts=2020-03-01T07:41:47.47515949Z app=edgex-core-data source=event.go:240 msg="Putting event on message queue"
level=INFO ts=2020-03-01T07:41:47.47523845Z app=edgex-core-data source=event.go:258 msg="Event Published on message queue. Topic: events, Correlation-id: 6e6f7f2a-da7f-4fc7-ac1f-85e5918c31ed "

適当に MQTT トピックにエクスポートさせると、実際に値が届いていることが確認できます。エクスポート先の MQTT ブローカは、マニフェストファイルで環境変数として定義しているので、適宜修正してください。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
598878ed584699c61b4ccb95cdaeb53cc70b3a56793b53c83db1511483dc037a

$ kubectl create -f deployments/edgex-app-service-configurable-mqtt-export-deployment.yaml
deployment.apps/edgex-app-service-configurable-mqtt-export created

$ kubectl wait --for=condition=available --timeout=60s deployment edgex-app-service-configurable-mqtt-export
deployment.apps/edgex-app-service-configurable-mqtt-export condition met

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
edgex-handson-topic {"id":"5a79e954-6c3f-4635-9e60-d93379a2a7ec","device":"Random-UnsignedInteger-Device","origin":1583049398041588772,"readings":[{"id":"529f5003-47b6-45ab-a743-7b4a21d937f4","origin":1583049398026423142,"device":"Random-UnsignedInteger-Device","name":"Uint8","value":"80"}]}
edgex-handson-topic {"id":"2cd0b56e-28c8-4035-9351-75028b37afef","device":"Random-UnsignedInteger-Device","origin":1583049398427444993,"readings":[{"id":"0a041b11-76f9-426c-9098-75affe58ea89","origin":1583049398412848451,"device":"Random-UnsignedInteger-Device","name":"Uint64","value":"2542183823530459689"}]}
edgex-handson-topic {"id":"00aaffb1-27ca-43ff-8150-ace2eefe2a32","device":"Random-Boolean-Device","origin":1583049398815297954,"readings":[{"id":"7ecc8518-ae56-4c78-8527-dd3442b14c6d","origin":1583049398800419572,"device":"Random-Boolean-Device","name":"Bool","value":"false"}]}
...

残されている問題

そんなわけで、とりあえずは動いていますが、

  • 起動・停止が面倒
    • マイクロサービス間に依存関係があり、起動順序に配慮が必要(依存関係自体は Docker Compose でも定義されているし起動順序も公式ドキュメントに書いてあるので無視できない)
    • Kubernetes では起動順序を制御しにくい(実装が検討されているっぽい のでそれに期待)
    • 現状では、コマンド群をシェルスクリプトにするか、InitContianers などでがんばるのが精々。Kustomize とか Helm でも細かい順序制御はしんどい気がする
  • 設定ファイルの流し込みがしにくい
    • configuration.toml などをコンテナに渡しにくい
    • PV を新しく作って InitContainers で git clone するなどの工夫が必要
  • ヘルスチェックをしていない
    • いわゆる Liveness Probe、Rediness Probe の追加をサボった
  • edgex-sys-mgmt-agent が部分的に動かないかも(未確認)
    • /var/run/docker.sock を消したが影響を確認していない

あたりに本気で使うにはハードルがありそうです。将来的には公式に参考実装やベストプラクティスが出てくるといいですね。

マニフェストの作成過程

いきなりできあがったモノを紹介しましたが、作る過程をざっくり書いておきます。

とりあえず Kompose で変換する

Docker Compose ファイルを Kubernetes 用のマニフェストに変換できる Kompose というものがあります。おもむろにこれに突っ込めば完成!

$ kompose convert -f docker-compose.yml
INFO Network edgex-network is detected at Source, shall be converted to equivalent NetworkPolicy at Destination
...
INFO Kubernetes file "app-service-rules-service.yaml" created
...
INFO Kubernetes file "edgex-network-networkpolicy.yaml" created

となるはずもなく、このままだと全然動きません。この作業で、

  • Services(*-service.yaml
  • Deployments(*-deployment.yaml
  • Persistent Volyme Claims(*-persistentvolumeclaim.yaml
  • Network Policy(*-networkpolicy.yaml

の 4 種類のファイルができあがっています。

このままだと動かないとは言いつつ、ベースとしては充分使えるので、これを編集していく形で実装を考えます。

アーキテクチャを考える

この段階で、テストだからがばがばでよしとすることにして、Network Policy のマニフェストは消しました。

残りのマニフェストで定義された構成を、以下のように考えて組み替えていきます。

  • 共有ボリュームの持たせ方
    • Docker Compose ファイルだと、ボリュームが無駄に広い範囲で共有されすぎている気がする
    • 最新の Nightly Build リリースの Docker Compose ファイルをみると、最小限に絞られていそうだ
    • Nightly Build 版での共有っぷりを参考に、今回も共有範囲は最小限に絞ることにする
  • 共有ボリュームへのファイルの配置の仕方
    • edgex-files のコンテナイメージ内に共有ボリューム上に置くべきファイルが保存されている
    • Docker Compose と同じようにマウントしてしまうと、既存のファイルがオーバライドされて見えなくなる
    • 別のパスにマウントして、commandargs で必要なファイルをコピーさせるようにする
    • 一度だけ動けばよいので、restartPolicyNever にした Pod にする
  • 設定の配り方
    • 先行実装では edgex-config-seed を消し去っていた(ローカルの設定ファイルを使わせていた)例がある
    • とはいえ EdgeX Foundry は Consul ありきでできているフシもあるので、できれば Docker Compose のときと同じようにしたい
    • 一度だけ動けばよいので、restartPolicyNever にした Pod にする
  • Docker ならではの実装の排除
    • Portainer は Docker 前提なので消すことにする
    • /var/run/docker.sock のマウントも Docker 前提なので消すことにする
  • PV の作り方
    • Deployment に直接 PV を持たせるのではなく、お作法通り PVC を使ってマウントさせるようにする

マニフェストの整形

方針が決まったら、Kompose で変換されたマニフェストファイルを実際にきれいにしていきます。

  • Services
    • metadata を削除、または修正
    • ほかのマイクロサービスから正しく名前解決できるように name を各マイクロサービスのホスト名に修正
    • selector を Deployment に合わせて修正
    • 外部からアクセスしそうなサービスに type: LoadBalancer を追加
  • Deployments
    • API バージョンを修正、併せて必須パラメータ(selector)追加
    • metadata を削除、または修正
    • Deployment と Pod の name を各マイクロサービスのホスト名に統一
    • 不要なボリュームの削除
    • edgex-core-config-seededgex-files の削除
  • Pods
    • edgex-core-config-seededgex-files を新規に追加
    • restartPolicy やボリュームの変更、commandargs の追加
  • Persistent Volume Claims
    • metadata を削除、または修正
    • accessModesReadWriteMany に変更
    • サイズを変更(値は このとき の実使用量を基に決定)
  • Persistent Volumes
    • 新規に追加

こまごまトラブルシュート

で、だいたいできたと思ったら、一部のマイクロサービスで、REST エンドポイントとして必要な HTTP サーバが起動しない状態になりました。以下のような具合で、起動しようとした直後に停止してしまっています。

$ kubectl logs edgex-support-logging-66fc4f6c98-28bk4
...
level=INFO ts=2020-02-25T12:47:50.622662811Z app=edgex-support-logging source=httpserver.go:72 msg="Web server starting (edgex-support-logging:48061)"
level=INFO ts=2020-02-25T12:47:50.624106767Z app=edgex-support-logging source=httpserver.go:80 msg="Web server stopped"
...

いろいろ調べたら、以下のような状態のようでした。

  • edgex-go リポジトリで管理されているマイクロサービスで起きる
  • HTTP サーバは、マイクロサービスの起動時に <ホスト名>:<ポート番号> で待ち受けを指示される
    • 上記のログの例だと edgex-support-logging:48061
    • 待ち受けのホスト名とポート番号は Consul から受け取る
  • HTTP サーバは、指示された <ホスト名> の名前解決を試みる
    • コンテナ内の /etc/hosts を無視して、Kubernetes による名前解決が行われる
    • 名前解決したいホスト名が Service の名前と一致しているため、Kubernetes の ClusterIP に解決される
  • Pod 内のプロセスからは ClusterIP は利用できない(Pod の IP アドレスしか認識されない)ため、待ち受けできずに死ぬ

Consul 上のホスト名を書き換えるのも、Pod と Service でホスト名を別にするのも、副作用がわからないのでできれば避けたいところです。そもそも Kubernetes の仕様上、コンテナ内の /etc/hosts には Pod の IP アドレスと Pod のホスト名が正しく書いてあるわけですから、これを参照してくれさえすればよいだけの話です。

が、Go 言語のデフォルトの名前解決の仕組みでは、/etc/nsswitch.conf が存在しない(または存在していても中で files が指定されていない)場合に、/etc/hosts は無視されるようです。この挙動の修正は 1.15 のロードマップに乗っている ものの、要するに、現時点ではそういう仕様ということになります。

そして、問題が起きているマイクロサービスのコンテナイメージは、scratchalpine などがベースになっているので、/etc/nsswitch.conf が存在しません。

というわけで、いろいろな仕様が重なった結果として、こういう問題が引き起こされています。がんばりましょう。

この解決にあたって、以下の 3 案を考えました。今回はいちばんマニフェストへの取り込みがラクな最初の案を採用しています。

  • 環境変数 GODEBUG に、値 netdns=cgo を与える
    • 名前解決に Go 標準ではなく CGO の仕組みを使うことを強制するオプション
    • CGO では /etc/nsswitch.conf が存在していなくても /etc/hosts が読み取られる
  • /etc/nsswitch.conf(中身は hosts: files dns)をコンテナ内に配置する
    • コンテナイメージを自製するか、PV や InitContainer などを使ってがんばる必要がある
  • Consul の K/V ストアの /edgex/core/1.0/*/Service/Host を全部 0.0.0.0 にする
    • Consul を起動させて、手で書き換えてから残りのサービスを起動するようにする
    • 副作用が不明

先行実装

冒頭で書いた通り、EdgeX Foundry の Kubernetes 上での動作は、すでに過去に試みられています。現状で確認できているものを簡単に紹介します。

EdgeX on Kubernetes

EdgeX Foundry のブログの 2018 年のエントリで、当時のリリースを Kubernetes で動作させる例が紹介されています。

2018 年当時なので、使われているイメージは古く、どうやら Barcelona ベースのようです。

また、共有ボリュームとして利用する PV が hostPath で定義されているので、シングルノード上での動作が前提になっているものと推測されます。

edgex-kubernetes-support

GitHub の edgexfoundry-holding 配下のリポジトリとして用意されています。比較的新しめです。

こちらは Edinburgh をベースにしたものと Nightly Build をベースにしたものがありました。Nightly Build とはいえ、半年以上前のそれなので、実際は Fuji 相当に近そうです。Helm チャートも用意されています。

K8s だけでなく K3s も対象としている点はうれしいですが、これも PV が hostPath で定義されているので、シングルノードでの動作を前提としているようです。

まとめ

EdgeX Foundry の Fuji リリースを、マルチノード構成の Kubernetes 上で動作させるためのマニフェストファイルを作成して、動作を確認しました。

Kompose は便利ですが、やはりそれなりの規模の Docker Compose ファイルだと完全に無編集でそのまま使えるというわけにはいかないですね。よい経験になりました。

EdgeX Foundry も Kubernetes も、本当に日々更新されていっているので、追従し続けるのは相当しんどそうです。このブログの EdgeX Foundry 関連エントリも、来月に予定されている次期バージョン(Geneva)のリリースですぐに時代遅れになるわけです。がんばりましょう。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (6) アプリケーションサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポート方法の新旧

エクスポートサービスを取り扱ったエントリの最後で、以下のように書きました。

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

アプリケーションサービスに関する公式ドキュメント では、エクスポートサービスの利用はまだサポートされるものの、今後はアプリケーションサービスの利用が推奨されています。

エクスポートサービスを利用したエクスポートの欠点として、

  • エクスポートサービスは、登録されているすべてのクライアントにデータを順次配信するが、これがボトルネックになり、エンドポイント側の受信が遅延する
  • 登録されているクライアントを管理することそれ自体のオーバヘッドと複雑さが EdgeX Foundry にとって負担である
  • 特定のクラウドへのエクスポートに限定して作りこむのではなく、SDK を提供してクラウドプロバイダにとらわれない状態にすべきだ

などが挙げられています。ここでいう SDK とそれを使った実装がアプリケーションサービスなわけですが、アプリケーションサービスでは、

  • コアサービス(データを ZeroMQ で配る Core Data)に直接つなげられることで、パフォーマンスの問題を排除する
  • 開発者には、データが利用可能になったら直ちにそれに対して何らかの処理を行える手段が提供される
  • 登録作業が不要である

などの点でよい感じになっています。

いまいち正しくない気もしますが、ざっくりまとめると、下図の上が下になったので、

上がこれまで、下がこれから

エクスポートサービス(のディストリビューションサービス)がボトルネックにならなくなったし、値を好きに処理できる場所もできたよ! ということかと思います。

今回試すこと

そういうわけで、エクスポートサービスのなくなった世界でも生きていけるよう、エクスポートサービスを扱ったエントリと同様に、MQTT トピックと REST エンドポイントへ、アプリケーションサービスを使ってエクスポートできる状態を作ります。

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab05

アプリケーションサービス

上図のアプリケーションサービスの箱の中には、ファンクション(Fn で表記)を並べました。アプリケーションサービスでは、このように任意のファンクション群をパイプライン化してデータフローを定義できます。

例えば簡単な例では、標準で用意されているファンクションを並べるだけでも、

  1. デバイス名でフィルタするファンクション
  2. リソース名でフィルタするファンクション
  3. JSON 形式に変換するファンクション
  4. MQTT でエクスポートするファンクション

などのパイプラインが構成できます。現時点で利用できるファンクションは、SDK の README.md で確認できます。

構成の考え方

標準で実装されているアプリケーションサービスとして、app-service-configurable がありますが、これは、ひとつのパイプラインをひとつのインスタンスで処理する前提で作られているようです。また、パイプラインそのものは、インスタンスごとの設定ファイル configuration.toml で定義します。

つまり、コンテナ環境の例でいえば、冒頭の絵のように、

  • 最終的に MQTT トピックにエクスポートする app-service-configurable コンテナ
  • 最終的に REST エンドポイントにエクスポートする app-service-configurable コンテナ

など、app-service-configurable が、目的に応じて複数起動している状態を作るということです。

なお、コアサービスのホスト名やポート番号、MQTT ではブローカのホスト名など環境に依存する情報は、コンテナの場合は環境変数で Docker Compose ファイルから渡せるようになっています。エクスポートサービスでは API を叩いてクライアントとして登録する作業が必要でしたが、すべての構成をあらかじめファイルとして定義しておけるということですね。

設定ファイルの確認

コンテナの場合、デフォルトでコンテナ内の /res に、目的別に事前構成された configuration.toml がディレクトリを分けて配置されています。そして、起動時に渡す環境変数で、どのディレクトリの中身を利用するか指定します。

例えば、MQTT トピックへのエクスポートで利用する configuration.toml は以下です。

$ cat mqtt-export/configuration.toml
...
  [Writable.Pipeline]
    ExecutionOrder = "TransformToJSON, MQTTSend, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.MQTTSend]
      [Writable.Pipeline.Functions.MQTTSend.Parameters]
        qos="0"
        key=""
        autoreconnect="false"
        retain="false"
        cert=""
        persistOnError = "false"
      [Writable.Pipeline.Functions.MQTTSend.Addressable]
        Address=   "localhost"
        Port=      1883
        Protocol=  "tcp"
        Publisher= "AppServiceConfigurable-mqtt-export"
        User=      ""
        Password=  ""
        Topic=     "edgex-events"
...

ExecutionOrder に含まれる関数がパイプラインとして順次実行されていきます。この例だと、JSON に変換してから MQTT トピックに送信しています。

利用する関数は下で列挙されていますが、ExecutionOrder に含まれないものは実際には利用されないようです。デバイス名でフィルタする関数(FilterByDeviceName)なども書かれていますが、使いたいなら自分で ExecutionOrder に書いてね、ということでしょう。

また、MQTT ブローカのホスト名に localhost が指定されていたり、トピック名が edgex-events になっていたりしますが、先述のとおり、この辺りは環境変数で書き換えますのでこのままで問題ありません。

同様に、REST エンドポイントへのエクスポートで利用する設定ファイルは以下です。

$ cat http-export/configuration.toml
...
  [Writable.StoreAndForward]
    Enabled = false
    RetryInterval = "5m"
    MaxRetryCount = 10

  [Writable.Pipeline]
    UseTargetTypeOfByteArray = false
    ExecutionOrder = "FilterByDeviceName, TransformToJSON, HTTPPostJSON, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.HTTPPostJSON]
      [Writable.Pipeline.Functions.HTTPPostJSON.Parameters]
        url = "http://"
        persistOnError = "false"
...

環境変数による柔軟なオーバライド

設定ファイルに含まれるパラメータは、実装から推測すると、おそらくすべて環境変数で書き換えが可能です。セクション名の区切り文字を _ に置換して、さらに _<パラメータ名> を付与して環境変数として与えれば、オーバライドされるようでした。

例えば、ExecutionOrder 自体も、Writable_Pipeline_ExecutionOrder 環境変数として与えられます。例えば先述の mqtt-exportExecutionOrder には FilterByDeviceName が含まれせんが、環境変数で以下を与えることで、このフィルタも有効化できます。

  • 環境変数 Writable_Pipeline_ExecutionOrder
    • FilterByDeviceName, TransformToJSON, MQTTSend, MarkAsPushed
  • 環境変数 Writable_Pipeline_Functions_FilterByDeviceName_Parameters_DeviceNames
    • <デバイス名>

今回の構成の定義

では、今回用の構成を作ります。先の通り、環境依存の値は環境変数経由で与えればよいので、今回編集するのは、Docker Compose ファイルです。

Docker Compose ファイルには、もともと、ルールエンジンにデータを送る用の app-service-rules が定義されています。

$ cat docker-composer.yml
...
  app-service-rules:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48096:48096"
    container_name: edgex-app-service-configurable-rules
    hostname: edgex-app-service-configurable-rules
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-rules
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-rules:48096
      edgex_profile: rules-engine
      Service_Host: edgex-app-service-configurable-rules
      MessageBus_SubscribeHost_Host: edgex-core-data
    depends_on:
      - consul
      - logging
      - data
...

今回は、これに加えて、MQTT トピックにエクスポートする app-service-mqtt-export と、REST エンドポイントにエクスポートする app-service-http-export を追加しています。

$ cat docker-composer.yml
...
  app-service-mqtt-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48097:48097"
    container_name: edgex-app-service-configurable-mqtt-export
    hostname: edgex-app-service-configurable-mqtt-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-mqtt-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-mqtt-export:48097
      edgex_profile: mqtt-export
      Service_Host: edgex-app-service-configurable-mqtt-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_MQTTSend_Addressable_Address: 192.168.0.100
      Writable_Pipeline_Functions_MQTTSend_Addressable_Port: 1883
      Writable_Pipeline_Functions_MQTTSend_Addressable_Protocol: tcp
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Publisher: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_User: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Password: 
      Writable_Pipeline_Functions_MQTTSend_Addressable_Topic: edgex-handson-topic
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Qos: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Key: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Cert: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Autoreconnect: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Retain: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_PersistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data

  app-service-http-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48098:48098"
    container_name: edgex-app-service-configurable-http-export
    hostname: edgex-app-service-configurable-http-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-http-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-http-export:48098
      edgex_profile: http-export
      Service_Host: edgex-app-service-configurable-http-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_HTTPPostJSON_Parameters_url: http://192.168.0.100:5000/api/v1/echo
      # Writable_Pipeline_Functions_HTTPPostJSON_Parameters_persistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data 
...

使っているコンテナイメージは、いずれもまったく同じです。ボリュームも同じマウントのしかたをしていますが、使う設定ファイルを edgex_profile で制御しています。また、読ませる設定ファイルに応じて、必要な設定を環境変数で入れています。

MQTT ブローカや REST エンドポイントのホスト名などは環境に合わせて適宜修正してください。ほかにも、configuration.toml で変更したい値があれば、環境変数として与えると変えられます。

また、今回はエクスポートサービスが起動しないようにコメントアウトしています。

起動

下準備として、MQTT ブローカを起動して、購読を開始します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v

別のターミナルで REST エンドポイントも起動します。

$ cd rest-endpoint

$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

この状態で、EdgeX Foundry を起動します。

$ docker-compose up -d

動作確認

必要な設定はすべて環境変数を使って与えていたので、何もしないでも MQTT トピックと REST エンドポイントに値が届き出します。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
edgex-handson-topic {"id":"a11011bb-30cb-44fa-885c-8929c64e35b1","device":"Random-UnsignedInteger-Device","origin":1580015685340757500,"readings":[{"id":"26f50397-1e8b-4ca0-8db2-eeeee958a643","origin":1580015685326443600,"device":"Random-UnsignedInteger-Device","name":"Uint8","value":"90"}]}
edgex-handson-topic {"id":"526631fd-a0f7-437d-98a9-4f6eb8a06775","device":"Random-UnsignedInteger-Device","origin":1580015685369824800,"readings":[{"id":"91ef8066-5c31-4377-84e6-a6d517457f62","origin":1580015685356418200,"device":"Random-UnsignedInteger-Device","name":"Uint16","value":"27009"}]}
...
$ python ./main.py
...
--
2020-01-26 14:15:10.318596
{"id":"be20b21b-f0a4-4550-95bb-a78b5c597f56","device":"Random-Integer-Device","origin":1580015710310854200,"readings":[{"id":"d62fdacc-bfa4-4d05-a537-25c03e151f78","origin":1580015710295318700,"device":"Random-Integer-Device","name":"Int8","value":"3"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-26 14:15:10.375595
{"id":"c0275732-2e3d-4945-9435-d6e77e0ccb74","device":"Random-Integer-Device","origin":1580015710364584600,"readings":[{"id":"067d71a3-7c1f-4fea-968d-ac5db0410e67","origin":1580015710350134100,"device":"Random-Integer-Device","name":"Int16","value":"-23040"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
...

簡単ですね。

これらの値は、冒頭で紹介した通り、それぞれのアプリケーションサービスが直接コアサービス層から値を受け取って処理したうえでエクスポートしています。データパスが分散して並列化するので、エクスポートサービスを使っていた頃に存在していたボトルネックが回避されます。

まとめ

エクスポートサービスを利用しないエクスポート手段として、アプリケーションサービスを構成して試しました。

使ってみると、確かにこのアーキテクチャのほうが素直で圧倒的に使い勝手が良いです。家の環境もこっちに置き換えました。

現段階ではバンドルされている関数は多くないですが、今後この辺りも充実してくることが期待できそうです。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (5) ルールエンジンによる自動制御

EdgeX Foundry 関連エントリ

今回のゴール

これまでのエントリでは、仮想デバイスや MQTT デバイスを用いて、デバイスからの情報の収集やデバイスへのコマンドの実行を試してきました。

今回は、ルールエンジンの動作、つまり、

  • 何らかのデバイスのリソースの値が
  • 何らかの条件を満たしたら
  • 別のデバイスでコマンドを実行する

ような自動制御を実際に試します。

前回のエントリ で構成した MQTT デバイスだけを利用してもすぐできるのですが、それだけだとおもしろくないので、新しいデバイスサービスをひとつ追加して、それを組み込みます。具体的には、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ような状態を目指します。図示すると以下のような状態です。

REST デバイスをトリガに MQTT デバイスが自動制御される

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab04

REST デバイスの追加

まだ開発途中で正式リリースには至っていないようですが、device-rest-go と名付けられたデバイスサービスがあります。

REST デバイスサービスの概要

これは、以下のようなデバイスサービスです。

  • REST で POST してくるデバイスに利用する
  • POST されたリクエストボディの中身を値として EdgeX Foundry に取り込む

このデバイスサービスを構成すると、エンドポイントとして

  • /api/v1/resource/<デバイス名>/<リソース名>

が作成され、これに対して POST することで値を取り込んでくれるようになります。デバイス名やリソース名は、前回取り扱った MQTT デバイスと同様、デバイスプロファイルなどの設定ファイルで定義できます。

なお、現時点では情報のやりとりは一方向であり、つまり、デバイス側から POST された値を取り込むのみで、逆にデバイス側へのリクエストの発行はできないようです。 また、JSON を投げても、現時点の実装ではパースはしてくれず単なる文字列として扱われるようです。

今回試す設計

今回は、 device-rest-goREADME.md を参考に、以下のデバイスサービスとデバイスを定義しています。

  • デバイスプロファイル
    • rest/rest.test.device.profile.yml ファイル
    • リソース intfloat を定義
  • デバイスサービス設定
    • rest/configuration.toml ファイル
    • 上記のデバイスプロファイルに紐づけた REST_DEVICE を定義

これにより、

  • /api/v1/resource/REST_DEVICE/int
  • /api/v1/resource/REST_DEVICE/float

に値を POST すれば取り込んでもらえるようになるはずです。

この REST デバイスサービスを含め、今回分の環境は、Docker Compose ファイルに反映済みです

起動

では、今回分の環境をまとめて起動させます。

MQTT デバイスは今回も使うので、 環境に合わせて mqtt/configuration.toml は修正してください。

MQTT ブローカ、MQTT デバイス、EdgeX Foundry の順に起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd)/mqtt-scripts:/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts

$ docker-compose up -d

REST デバイスサービスの動作確認

起動が確認できたら、実際にエンドポイントに POST して、データの取り込みを確認します。待ち受けポートは 49986 です。Content-Typetext/plain である必要があります。

$ curl -X POST -H "Content-Type: text/plain" -d 12345 http://localhost:49986/api/v1/resource/REST_DEVICE/int

edgex-device-rest コンテナのログで Pushed event to core data が記録されていれば、値は正常に受け付けられています。

$ docker logs --tail 5 edgex-device-rest
level=DEBUG ts=2020-01-25T07:49:17.0621062Z app=edgex-device-rest source=resthandler.go:82 msg="Received POST for Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0673842Z app=edgex-device-rest source=resthandler.go:101 msg="Content Type is 'text/plain' & Media Type is 'text/plain' and Type is 'Int64'"
level=DEBUG ts=2020-01-25T07:49:17.0726606Z app=edgex-device-rest source=resthandler.go:142 msg="Incoming reading received: Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0766349Z app=edgex-device-rest source=utils.go:75 correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: EventClient.MarshalEvent encoded event"
level=INFO ts=2020-01-25T07:49:17.0899243Z app=edgex-device-rest source=utils.go:93 Content-Type=application/json correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: Pushed event to core data"

実際に取り込まれているようです。

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
b43f7300-1377-4fdb-a82f-44eb497d9568    int     REST_DEVICE     1579938557072646900     12345   3 minutes       3 minutes       50 years

ルールエンジンの利用

お膳立てができたので、本題のルールエンジンを構成していきます。

ルールエンジンの概要

詳細は 公式のドキュメント がわかりやすいですが、端的に言えば、

  • コアサービスまたはエクスポートサービスからデータを受け取る
  • 事前に定義されたルールの条件との合致を確認する
  • 合致していた場合は、そのルールで定義されたアクションを実行する

ような処理をしてくれるサービスです。

実装は現段階では BRMS の Drools ベースとのことなので、Java 製ですね。

ルールエンジンへデータを配信する構成パタンは、ドキュメントでは以下の二つが説明されています。

  • エクスポートサービスのクライアントとして動作させるパタン
    • データは Export Distribution(edgex-export-distro コンテナ)から配信される
    • データの流れが全体で統一されてシンプルに
    • レイテンシやエクスポートサービスのキャパシティなどの面にデメリット
  • コアサービスに直結させるパタン
    • データは Core Data(edgex-core-data コンテナ)から配信される
    • エクスポートサービスをバイパスすることで、レイテンシやパフォーマンス面ではメリットがある
    • データの流れはやや複雑に

この構成パタンの選択や、具体的な接続先の情報は、次で紹介する設定ファイルで指定できます。

ルールエンジンの設定

設定がデフォルトのままでよければ、コンテナイメージの中にすべて含まれているので、敢えて手で投入する必要はありませんが、今回はせっかくなので触れるようにしています。rulesengine フォルダの中のファイル群がそれです。

公式のドキュメント で触れられている設定が含まれるファイルが、rulesengine/application.properties です。

$ cat rulesengine/application.properties
...
export.client=false
...
export.client.registration.url=http://edgex-export-client:48071/api/v1
export.client.registration.name=EdgeXRulesEngine
...
export.zeromq.port=5563
export.zeromq.host=tcp://edgex-core-data
...

export.clientfalse なので、今回はコアサービスから直接データを受け取ります。export.zeromq.host は現時点のデフォルトでは tcp://edgex-app-service-configurable-rules になっており、アプリケーションサービス からデータを受け取る形になっていますが、今回はシンプルにドキュメントに従って tcp://edgex-core-data にしています。

もう一つのファイルが、rulesengine/rule-template.drl です。これはルールそのもののテンプレートとして利用されます。これについては後述します。

ルールの設定

では、実際にルールを設定していきます。現状、GUI でも CLI でもできないので、API を叩きます。標準 GUI には設定画面はあるものの、プルダウンに項目が出てこなくて設定できずでした。

ルールの検討

ルールは以下ような JSON で投入します。

{
    "name": "<ルール名>",
    "condition": {
        "device": "<トリガ条件にするデバイス名>",
        "checks": [
            {
                "parameter": "<トリガ条件にするリソース名>",
                "operand1": "<トリガ条件にする値>",
                "operation": "<比較演算子>",
                "operand2": "<トリガする閾値>"
            }
        ]
    },
    "action": {
        "device": "<操作対象のデバイス ID>",
        "command": "<操作対象のコマンド ID>",
        "body": "<操作対象のコマンドに PUT される内容>"
    },
    "log": "<トリガされたときのログ出力文字列>"
}

全体の構造は、condition で指定した条件を満たしたら action で指定したコマンドがトリガされる、と思えばよいでしょう。

condition での指定値は以下のように組んでいきます。

  • device
    • トリガ条件にするデバイスの名称を指定します。ID では動かないので注意です
    • 今回だと、REST デバイスの値を元にコマンドを実行したいので、REST_DEVICE を指定します
  • parameter
    • トリガ条件にするデバイスのリソース名を指定します。リソース名なので、デバイスプロファイルで指定した名称であり、すなわち Reading の名前でもあります
    • 今回の REST_DEVICE はリソース intfloat を持ちますが、今回は int を指定します
  • operand1
    • 比較元になる値を作ります。デバイスの値は内部では文字列値として扱われているため、数値であれば適切な型にキャストする必要があります
    • Drools が Java ベースのため、ここでは Java の文法で書きます。値に応じて、例えば以下などが使い分けられるでしょう
      • Integer.parseInt(value)
      • Float.parseFloat(value)
  • operation
    • 比較演算子です。<=> などが考えられます
  • operand2
    • 閾値です

action での指定値は、以下のように組みます。

  • device
    • 先ほどと異なり、ここでは ID で指定します
    • 今回は MQ_DEVICE の ID です
    • デバイスの ID は API で確認できます(方法は 過去のエントリ で)
  • command
    • コマンドも ID で指定します
    • 今回は testmessage の ID です
    • コマンドの ID も API で確認できます(方法は 過去のエントリ で)
  • body
    • コマンドの PUT 命令のボディを JSON 形式で指定します
    • めっちゃエスケープが必要です
    • 例えば今回だと、{\\\"message\\\":\\\"HIGH\\\"}{\\\"message\\\":\\\"LOW\\\"} です

最終的に、今回作りたい以下のルール群は、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ひとつめは以下の JSON で、

{
    "name": "rule_int_high",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": ">",
                "operand2": "80"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"HIGH\\\"}"
    },
    "log": "Action triggered: The value is too high."
}

ふたつめは以下の JSON であらわされます。

{
    "name": "rule_int_low",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": "<",
                "operand2": "20"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"LOW\\\"}"
    },
    "log": "Action triggered: The value is too low."
}

ルールの投入

では、ルールを投入します。エンドポイントは以下です。

  • http://localhost:48075/api/v1/rule

ここに、先の JSON を POST します。

ひとつめのルールの投入。true が返れば成功
ふたつめのルールの投入

ルールの確認

ルールが投入されると、rule-template.drl を元に新しい <ルール名>.drl ファイルが生成されて利用されます。edgex-support-rulesengine 内に配置されるので、ここではコンテナ内のファイルを cat して覗きます。

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_high.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_high"
when
  $e:Event($rlist: readings && device=="REST_DEVICE")
  $r0:Reading(name=="int" && Integer.parseInt(value) > 80) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"HIGH\"}");
logger.info("Action triggered: The value is too high.");
end

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_low.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_low"
when
  $e:Event($rlist: readings && device=="REST_DEVICE")
  $r0:Reading(name=="int" && Integer.parseInt(value) < 20) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"LOW\"}");
logger.info("Action triggered: The value is too low.");
end

手元の rule-template.drl と見比べると、テンプレートを元に展開されているっぽさがわかりますね。Drools の drl ファイルの文法にあまり詳しくないですが、行われているのが単純な文字列連結なのであれば、operand に指定する値あたりは工夫すると、もうちょっと複雑な計算もできるのかもしれません。試していませんし、インジェクション攻撃っぽいですけど。

なお、現在の API ではルールの中身までは確認できず、ルール名の一覧が取得できるのみのようです。悲しい。

$ curl -s http://localhost:48075/api/v1/rule | jq
[
  "rule_int_high",
  "rule_int_low"
]

ルールエンジンの動作確認

動作を確認していきます。

動きを追いやすくするため、下準備として edgex-support-rulesengine のログを常時表示させて、さらに別のターミナルで MQTT ブローカの全トピックを購読しておくとわかりやすいです。

$ docker logs -f --tail=10 edgex-support-rulesengine
[2020-01-25 13:44:15.023] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:15.024] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
[2020-01-25 13:44:17.214] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:17.216] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
...
$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
logic/connected 2
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.0"}
...

では、REST デバイスの気持ちになって、まずはルールに合致しないデータを REST デバイスサービスに POST します。

$ curl -X POST -H "Content-Type: text/plain" -d 50 http://localhost:49986/api/v1/resource/REST_DEVICE/int

値は取り込まれていますし、

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
66461804-a894-470f-a099-631ffd4c32cb    int     REST_DEVICE     1579960045357298000     50      About a minute  About a minute  50 years

ルールエンジンにも REST_DEVICE からの値は届いているようなログが出ますが、実際には何もトリガされません。正常です。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:47:25.370] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:47:25.466] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  REST_DEVICE
...

つづいて、ルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 90 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

ルールエンジンのログでは、指定したログメッセージが記録され、 {"message":"HIGH"} が指定したコマンドにリクエストされたことがわかります。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:57:26.805] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:57:26.853] boot - 6  INFO [main] --- RuleEngine: Action triggered: The value is too high.
[2020-01-25 13:57:26.853] boot - 6  INFO [SimpleAsyncTaskExecutor-2] --- CommandExecutor: Sending request to:  09dae1fc-e2be-4388-9677-639d2f24c58bfor command:  1c7a50e7-5424-4bce-9b9a-29849510580e with body: {"message":"HIGH"}
[2020-01-25 13:57:26.866] boot - 6  INFO [main] --- RuleEngine: Event triggered 1rules: Event [pushed=0, device=REST_DEVICE, readings=[Reading [pushed=0, name=int, value=90, device=REST_DEVICE]], toString()=BaseObject [id=538aea34-b4ce-4342-88a9-6c08cdac080e, created=0, modified=0, origin=1579960646793054700]]
...

MQTT ブローカ上では、MQTT デバイスに対するコマンド実行が行われた様子がわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
CommandTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
ResponseTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
...

MQTT デバイスの testmessage コマンドに GET すると、値が狙い通りに変更されていることがわかります。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579960983817888000,
  "readings": [
    {
      "origin": 1579960983809258000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "HIGH"
    }
  ],
  "EncodedEvent": null
}

同様に、REST デバイスの気持ちになってふたつめのルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 10 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

もろもろの処理が動き、MQTT デバイス側の値が変わったことが確認できます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579961107845061000,
  "readings": [
    {
      "origin": 1579961107837083000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "LOW"
    }
  ],
  "EncodedEvent": null
}

まとめ

あるデバイスの値の変化をトリガにして、別のデバイスを制御できることが確認できました。

今回は、REST デバイスを使った関係で、実質手動でトリガさせたに等しい状況でしたが、本来はセンサの値をトリガにアクチュエータを動かすような使い方になるでしょう。REST デバイスの代わりに MQTT デバイスや仮想デバイスのランダム値を condition に指定すれば、似たような状況のテストが可能です。

現段階ではシンプルなルールエンジンしか積まれていませんが、今後エコシステムが成熟してアプリケーションサービスなどが充実してくれば、より複雑な処理も可能になることが期待できます。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (4) MQTT デバイスサービスの追加

EdgeX Foundry 関連エントリ

おさらい

前々回のエントリ では、下図のとおり、バンドルされている仮想デバイスを利用して動作を確認していました。

前々回のエントリで構成されていたデバイスとデバイスサービス

これらの仮想デバイスは、デバイスからのデータの受け取りやデバイスへのコマンド実行などをテストする目的ではたいへん便利ですが、現実世界とのインタラクションはできません。

そこで今回は、MQTT をインタフェイスにもつデバイス(のシミュレータ)を用意し、下図のように EdgeX Foundry がそのデバイスと実際に(MQTT ブローカを介して)インタラクションできる状態を構成します。

MQTT 対応デバイス(のシミュレータ)を EdgeX Foundry の制御下におく

本エントリの作業内容は、エントリ執筆時点の 公式ドキュメント を基にしています。

今回も、GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab03

新しいデバイスの仕様と動作確認

まずは、今回新しく EdgeX Foundry の制御下におきたいデバイスの仕様を整理します。その後、実際にそのデバイスのシミュレータを動作させ、仕様通りに動くことを確認します。

新しいデバイスの仕様

今回は、以下のような仕様のデバイスを考えます。

  • 二種類のセンサを持つ
    • randfloat32
    • randfloat64
  • 二種類の文字列情報を持つ
    • ping
    • message
  • トピック DataTopic にセンサ randfloat32 の値を 15 秒ごとに配信する
    • {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"<値>"} の形式で配信する
  • トピック CommandTopic でコマンドを待ち受ける
    • {"name":"MQTT_DEVICE","method":"<get または set>","cmd":"<コマンド名>"} の形式のコマンドを受け取る
  • コマンドを受け取ったら、コマンド応じて処理を実行し、結果をトピック ResponseTopic に配信する
    • コマンドのメソッドが set だった場合は、保存されている message を書き換える
    • コマンドが ping だった場合は、pong を返す
    • コマンドが message だった場合は、保存されている message を返す
    • コマンドが randfloat32 または randfloat64 だった場合は、それぞれ対応するセンサの値を返す
    • 結果は {"name":"MQ_DEVICE","method":"<メソッド>","cmd":"<コマンド名>","<コマンド名>":"<値>"} の形式で配信する

デバイス側が定期的にデータを発信するだけでなく、コマンド操作も受け付けるようなデバイスです。ただしよく見ると、二つあるセンサのうち randfloat64 は、値の定期的な自動配信はありません(これは伏線です)。

シミュレータの起動

今回は、上記のようなデバイスを模したシミュレータを利用します。MQTT を扱うにはブローカが不可欠なので、前回のエントリ と同様にまずはこれを起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
6de5986ebb1b3715179253857952f143fbf19fa77e201980f8573d96558850fd

続けてデバイス(のシミュレータ)の用意です。

Node.js で MQTT を扱えるコンテナ dersimn/mqtt-scripts で、次のようなスクリプトを起動させます。

function getRandomFloat(min, max) {
    return Math.random() * (max - min) + min;
}

const deviceName = "MQ_DEVICE";
let message = "test-message";

// 1. Publish random number every 15 seconds
schedule('*/15 * * * * *', () => {
    let body = {
        "name": deviceName,
        "cmd": "randfloat32",
        "randfloat32": getRandomFloat(25, 29).toFixed(1)
    };
    publish('DataTopic', JSON.stringify(body));
});

// 2. Receive the reading request, then return the response
// 3. Receive the put request, then change the device value
subscribe("CommandTopic", (topic, val) => {
    var data = val;
    if (data.method == "set") {
        message = data[data.cmd]
    } else {
        switch (data.cmd) {
            case "ping":
                data.ping = "pong";
                break;
            case "message":
                data.message = message;
                break;
            case "randfloat32":
                data.randfloat32 = getRandomFloat(25, 29).toFixed(1);
                break;
            case "randfloat64":
                data.randfloat64 = getRandomFloat(10, 1).toFixed(5);
                break;
        }
    }
    publish("ResponseTopic", JSON.stringify(data));
});

これを起動するには、以下のように作業します。IP アドレスは適宜読み替えてください。

$ cd mqtt-scripts
$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd):/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts
Unable to find image 'dersimn/mqtt-scripts:latest' locally
...
9d52b4956ebbec60dd0f93bdf9750ff915fefd8e8c58d3ce8bc7ba1429693d2b

シミュレータの動作確認

ブローカの全トピックを購読します。15 秒ごとに DataTopic にセンサ randfloat32 の値が届いていることがわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.4"}
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"28.6"}

コマンド操作ができることも確認します。別のターミナルで CommandTopic にコマンドを配信します。Windows 環境だと "\" にしないと動かないかもしれません。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"ping"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

購読している側では、コマンドが CommandTopic に届き、そのコマンドに応じた応答が ResponseTopic に配信されていることが確認できます。また、message コマンドの結果が、set メソッドの実行前後で変更されていることも確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping","ping":"pong"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32","randfloat32":"27.6"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64","randfloat64":"8.39883"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"test-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"modified-message"}

ここまでで、デバイス(のシミュレータ)が動作している状態が作れました。ここまでは EdgeX Foundry はまったく関係なく、ただ単に前述の仕様のデバイスを論理的に作っただけです。

デバイスサービスの構成

さて、ここからが本題です。先ほど作ったデバイスを EdgeX Foundry の制御下におくことを考えます。

デバイスサービスの概念

現実世界に存在するデバイスは、それぞれが異なるセンサやスイッチを持っていて、異なるプロトコルをインタフェイスに持つため、それぞれに応じた適切な方法で値の取得や制御命令の発行をする必要があります。

この実態を踏まえ、EdgeX Foundry では、実デバイスと EdgeX Foundry の間を取り持ってくれる存在としてデバイスサービスとよばれるマイクロサービスを定義しています。デバイスサービスは、おおむねデバイスの種類ごとに用意するものと思えばよさそうで、つまり、ひとつのデバイスサービスに対してひとつ以上のデバイスの制御を任せられるようです。

デバイスサービスは、以下のような役割を持ちます。

  • コアサービス層に対して、デバイスからの情報取得やデバイスへの制御命令の発行を行える REST エンドポイントを提供する
  • コアサービス層からの制御命令を実デバイスに合わせた命令に翻訳して実行し、実デバイスを制御する
  • デバイスから送られてきた情報をコアサービス層に合わせた情報に翻訳し、コアサービス層に届ける
  • 事前に定められた場合は、定期的にデバイスに対して特定の処理を行い、結果をコアサービス層に届ける

また、デバイスサービスそのものは、以下のような特徴を持ちます。

  • MQTT や Modbus など業界で多く使われるプロトコルに合わせたデバイスサービスはすでに参考実装が用意されており、複雑でない構成の場合は充分に流用できる
  • 参考実装の流用では不足する場合は、提供されている SDK を利用して気軽に自製できる(C 言語または Go 言語)
  • すべてのデバイスサービスはこの SDK を用いて共通のフレームワークのもとで実装されるため、操作方法や設定方法は自ずと共通化され、相互運用性は高まる
  • EdgeX Foundry を取り巻くエコシステムの成熟により、デバイスのベンダがそのデバイス用のデバイスサービスを提供したり、デバイスにデバイスサービスが組み込まれたり、第三者により開発されたデバイスサービスの充実も期待できる

EdgeX Foundry に限った話ではないですが、プラットフォーム系の製品の場合、製品の発展にはエコシステムの成熟が非常に重要です。この観点では、EdgeX Foundry は、商用環境への利用を謳ったバージョン 1.0 のリリースが 2019 年の冬とついこの前なので、まだまだこれからですね。

デバイスサービスの構成要素

デバイスを制御するデバイスサービスを追加するには、以下の 3 つの要素を考える必要があります。

  • デバイスサービスそのもの
    • コアサービスとデバイスとの間を仲介する、C 言語または Go 言語で実装されたマイクロサービス
    • 今回はコンテナとして動作させる
    • 後述のデバイスプロファイルとデバイスサービス設定を読み込んで動作する
  • デバイスプロファイル(Device Profile)
    • YAML ファイルとして定義
    • 管理対象のデバイス種別が持つリソースや、そのリソースが持つ値の意味、それぞれのリソースに対して行える操作などを定義する
  • デバイスサービス設定(Drvice Service Configuration)
    • configuration.toml ファイルとして定義
    • デバイスサービス自身の設定や、連携する EdgeX Foundry の他のマイクロサービスの情報などを定義する
    • デバイスプロファイルと実際のデバイスを紐づけてデバイスを定義する。デバイスやリソースに対する自動実行処理なども定義する
    • その他、デバイスサービスの動作に必要なパラメータを指定する

デバイスプロファイル

今回は、mqtt.test.device.profile.yml を利用します。中身は以下の通りです。

$ cat mqtt/mqtt.test.device.profile.yml
name: "Test.Device.MQTT.Profile"
manufacturer: "Dell"
model: "MQTT-2"
labels:
- "test"
description: "Test device profile"

deviceResources:
- name: randfloat32
  description: "device random number with Base64 encoding"
  properties:
    value:
      { type: "Float32", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "Base64" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: randfloat64
  description: "device random number with e notion"
  properties:
    value:
      { type: "Float64", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "eNotation" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: ping
  description: "device awake"
  properties:
    value:
      { type: "String", size: "0", readWrite: "R", defaultValue: "oops" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: message
  description: "device notification message"
  properties:
    value:
      { type: "String", size: "0", readWrite: "W" ,scale: "", offset: "", base: ""  }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }

deviceCommands:
- name: testrandfloat32
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat32"}
- name: testrandfloat64
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat64"}
- name: testping
  get:
    - { index: "1", operation: "get", deviceResource: "ping"}
- name: testmessage
  get:
    - { index: "1", operation: "get", deviceResource: "message"}
  set:
    - { index: "1", operation: "set", deviceResource: "message"}

coreCommands:
- name: testrandfloat32
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat32"
    responses:
    - code: "200"
      description: "get the random float32 value"
      expectedValues: ["randfloat32"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testrandfloat64
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat64"
    responses:
    - code: "200"
      description: "get the random float64 value"
      expectedValues: ["randfloat64"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testping
  get:
    path: "/api/v1/device/{deviceId}/testping"
    responses:
    - code: "200"
      description: "ping the device"
      expectedValues: ["ping"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testmessage
  get:
    path: "/api/v1/device/{deviceId}/testmessage"
    responses:
    - code: "200"
      description: "get the message"
      expectedValues: ["message"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
  put:
    path: "/api/v1/device/{deviceId}/testmessage"
    parameterNames: ["message"]
    responses:
    - code: "204"
      description: "set the message."
      expectedValues: []
    - code: "500"
      description: "internal server error"
      expectedValues: []

デバイスプロファイルは、デバイスと一対一ではなく、例えばデバイスのモデルごとにひとつ用意するイメージです 。

この例では、冒頭の 6 行でデバイスのモデルを定義しています。先ほど動作させたデバイス(のシミュレータ)は、Dell 製の MQTT-2 というモデルである、という想定で書かれています。

その後、大きく deviceResourcesdeviceCommandscoreCommands、の三つのセクションが続いています。

deviceResources セクションでは、デバイスが持っているリソースと、それが持つ値の意味を決めています。この例では、このデバイスにリソース randfloat64 があり、それは 0.00 から 100.00 の範囲の Float64 型の値を持つと定義されています。また、message リソースは String 型の値を持ち、他のリソースと違って書き換えが可能(W)と定義されていることがわかります。

deviceCommands セクションでは、このデバイスに対して実行できるコマンドを定義し、そのコマンドのメソッド(get または set)ごとに参照または操作されるリソースを紐付けています。例えば、testrandfloat64 コマンドの get メソッドは、先に deviecResources で定義したリソース randfloat64 からの値の読み取りを行えるように定義されています。また、testmessage コマンドでは、set または get メソッドにより、リソース message の値の取得や変更ができるように定義されています。なお、ここでは実装されていませんが、ひとつのメソッドに複数のリソースを紐付けることも可能です。つまり、一回の get 操作で複数のリソースの値を同時に取得するようにも構成できます。Edge Xpert のドキュメントでは、そのような例が解説され ています。

coreCommands セクションでは、コアサービス層からデバイスに対して実行できるコマンドを定義しています。例えば、コアサービスが testrandfloat64 コマンドに GET リクエストを実行すると、その命令はパス /api/v1/device/{deviceId}/testrandfloat64 に渡されます。このパスは、先の deviceCommands で定義した testrandfloat64 コマンドを表すものです。そしてコアサービスは、結果としてレスポンスコード 200 が返ってきた場合は、そのレスポンスボディに含まれる値を randfloat32 の値として取り込むように構成されています。また、testmessage コマンドのみ、ほかのコマンドと異なり PUT リクエストの動作が定義されています。

デバイスサービス設定

今回は、configuration.toml を利用します。中身は以下の通りです。

このファイルは環境依存の固定値を含みます。自分の環境で試す場合は、IP アドレスを環境に合わせて書き換えてください。

$ cat mqtt/configuration.toml
# configuration.toml
[Writable]
LogLevel = 'DEBUG'

[Service]
Host = "edgex-device-mqtt"
Port = 49982
ConnectRetries = 3
Labels = []
OpenMsg = "device mqtt started"
Timeout = 5000
EnableAsyncReadings = true
AsyncBufferSize = 16

[Registry]
Host = "edgex-core-consul"
Port = 8500
CheckInterval = "10s"
FailLimit = 3
FailWaitTime = 10
Type = "consul"

[Logging]
EnableRemote = false
File = "./device-mqtt.log"

[Clients]
  [Clients.Data]
  Name = "edgex-core-data"
  Protocol = "http"
  Host = "edgex-core-data"
  Port = 48080
  Timeout = 50000

  [Clients.Metadata]
  Name = "edgex-core-metadata"
  Protocol = "http"
  Host = "edgex-core-metadata"
  Port = 48081
  Timeout = 50000

  [Clients.Logging]
  Name = "edgex-support-logging"
  Protocol = "http"
  Host ="edgex-support-logging"
  Port = 48061

[Device]
  DataTransform = true
  InitCmd = ""
  InitCmdArgs = ""
  MaxCmdOps = 128
  MaxCmdValueLen = 256
  RemoveCmd = ""
  RemoveCmdArgs = ""
  ProfilesDir = "/custom-config"

# Pre-define Devices
[[DeviceList]]
  Name = "MQ_DEVICE"
  Profile = "Test.Device.MQTT.Profile"
  Description = "General MQTT device"
  Labels = [ "MQTT"]
  [DeviceList.Protocols]
    [DeviceList.Protocols.mqtt]
       Schema = "tcp"
       Host = "192.168.0.100"
       Port = "1883"
       ClientId = "CommandPublisher"
       User = ""
       Password = ""
       Topic = "CommandTopic"
  [[DeviceList.AutoEvents]]
    Frequency = "30s"
    OnChange = false
    Resource = "testrandfloat64"

# Driver configs
[Driver]
IncomingSchema = "tcp"
IncomingHost = "192.168.0.100"
IncomingPort = "1883"
IncomingUser = ""
IncomingPassword = ""
IncomingQos = "0"
IncomingKeepAlive = "3600"
IncomingClientId = "IncomingDataSubscriber"
IncomingTopic = "DataTopic"
ResponseSchema = "tcp"
ResponseHost = "192.168.0.100"
ResponsePort = "1883"
ResponseUser = ""
ResponsePassword = ""
ResponseQos = "0"
ResponseKeepAlive = "3600"
ResponseClientId = "CommandResponseSubscriber"
ResponseTopic = "ResponseTopic"

このファイルで、実際のデバイスを定義したり、必要なパラメータを指定したりします。

前半部分では EdgeX Foundry 内の他のサービスの情報などを指定しています。

実際のデバイスの定義は [[DeviceList]] の部分です。デバイスプロファイル Test.Device.MQTT.Profile に紐づけてデバイス MQ_DEVICE を定義しています。また、このデバイスに対するコマンドの実行に利用する MQTT ブローカも指定しています。

[[DeviceList.AutoEvents]] の部分では、デバイス側からは自動では配信されない値であった randfloat64 の値を定期的に取得するため、30 秒ごとに testrandfloat64 コマンドを実行するように設定しています。

[Driver] の部分では、デバイスサービスがデバイスから情報を受け取るために購読する MQTT ブローカやトピック名を設定しています。

なお、今回はこのファイルの中で実デバイスの定義もしてしまっていますが、デバイスはあとからも追加が可能です。また、このファイルはあくまで MQTT デバイスサービスの設定ファイルなので、利用したいデバイスサービスが違えば当然この設定ファイルで書くべき内容も変わります。

デバイスサービスを含んだ EdgeX Foundry の起動

設定ファイルができたら、デバイスサービスを含む EdgeX Foundry を起動します。

今回は、Docker Compose ファイルに以下を足しています。これにより、 edgexfoundry/docker-device-mqtt-go イメージがコンテナとして実行され、今回用の設定ファイルが含まれる mqtt ディレクトリがコンテナにマウントさたうえで、起動時にはその中身を利用して EdgeX Foundry にデバイスサービスとデバイスが登録されます。

  device-mqtt:
    image: edgexfoundry/docker-device-mqtt-go:1.1.1
    ports:
      - "49982:49982"
    container_name: edgex-device-mqtt
    hostname: edgex-device-mqtt
    networks:
      - edgex-network
    volumes:
      - db-data:/data/db
      - log-data:/edgex/logs
      - consul-config:/consul/config
      - consul-data:/consul/data
      - ./mqtt:/custom-config
    depends_on:
      - data
      - command
    entrypoint:
      - /device-mqtt
      - --registry=consul://edgex-core-consul:8500
      - --confdir=/custom-config

では、実際にこのファイルを利用して EdgeX Foundry を起動させます。

$ docker-compose up -d

動作確認

まずは、デバイスサービスやデバイスが正常に EdgeX Foundry に認識されているか確認し、その後、値の収集やコマンド操作を確認します。

登録状態の確認

以前のエントリで device-virtual の存在を確認した方法 で、今回はデバイスサービス edgex-device-mqtt とデバイス MQ_DEVICE が確認できれば、登録は成功しています。ラクなのは UI か CLI ですね。

また、これまで紹介しませんでしたが、EdgeX Foundry はマイクロサービス群の状態管理に HashiCorp の Consul を利用しており、各サービスの起動状態や登録状態は、この Consul を通じても確認できます。

Consul の GUI には、http://192.168.0.100:8500/ でアクセスできます。

Consul の GUI

ここで、

  • [Services] タブで edgex-device-mqtt が緑色である
  • [Key/Value] タブで edgex > devices > 1.0edgex-device-mqtt がある
  • [Key/Value] タブで edgex > devices > 1.0 > edgex-device-mqtt > DeviceList > 0 > NameMQ_DEVICE である

あたりが確認できれば、登録はできていると考えてよいでしょう。

デバイスから配信されている値の収集の確認

今回のデバイスは、センサ randfloat32 の値を 15 秒ごとに自動的に MQTT トピックに配信していました。デバイスサービスはこれを受け取れるように構成しましたので、EdgeX Foundry に取り込まれているはずです。

これも、以前のエントリで値を確認した方法 で確認できます。これもラクなのは GUI か CLI ですが、例えば、API で確認する場合は以下の通りです。

$ curl -s http://localhost:48080/api/v1/reading/name/randfloat32/3 | jq
[
  {
    "id": "23fdac76-0870-4b9d-b653-561d8a7c0423",
    "created": 1579707885029,
    "origin": 1579707885005379300,
    "modified": 1579707885029,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QdpmZg=="
  },
  {
    "id": "5c086b1d-6b38-48c0-9aef-9b800f9c72ab",
    "created": 1579707870018,
    "origin": 1579707870004742000,
    "modified": 1579707870018,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QeGZmg=="
  },
  {
    "id": "de67c865-0406-408d-b238-d33de1fe1032",
    "created": 1579707855022,
    "origin": 1579707855011006200,
    "modified": 1579707855022,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "Qd2Zmg=="
  }
]

時刻はエポックミリ秒(origin はエポックナノ秒)なので、変換すると 15 秒おきであることが確認できます。

$ date -d "@1579707885.029"
Wed 22 Jan 2020 03:44:45 PM UTC

$ date -d "@1579707870.018"
Wed 22 Jan 2020 03:44:30 PM UTC

$ date -d "@1579707855.022"
Wed 22 Jan 2020 03:44:15 PM UTC

デバイスサービスの自動実行イベントの確認

randfloat64 の値は、自動では配信されなかったため、デバイスサービスの設定(configuration.toml)に [[DeviceList.AutoEvents]] を定義して、デバイスサービス側から 30 秒ごとに自動で収集させていました。

これもデータの蓄積を任意の方法で確認できます。例えば CLI で確認する場合は以下の通りです。

$ edgex-cli reading list MQ_DEVICE -l 10 | grep randfloat64
5df1da1a-91db-41cf-99fa-962e87077395    randfloat64     MQ_DEVICE       1579709342806935600     4.924730e+00    15 seconds      15 seconds      50 years
7c33bb76-c9e9-4ef8-a410-80227e236d13    randfloat64     MQ_DEVICE       1579709312695223100     3.734020e+00    45 seconds      45 seconds      50 years
053987f1-5a89-4cc2-a433-03432caa9039    randfloat64     MQ_DEVICE       1579709282611878800     9.320210e+00    About a minute  About a minute  50 years

これも変換すると 30 秒おきであることが確認できます。

$ date -d "@1579709342.806935600"
Wed Jan 22 16:09:02 UTC 2020

$ date -d "@1579709312.695223100"
Wed Jan 22 16:08:32 UTC 2020

$ date -d "@1579709282.611878800"
Wed Jan 22 16:08:02 UTC 2020

デバイスへのコマンド実行の確認

このデバイスは、リソース message の値は EdgeX Foundry から変更できるような仕様で、デバイスプロファイルでもそれに合わせて PUT リクエストの挙動を定義していました。

では、この操作ができることを確認します。

デバイスプロファイルで定義した各コマンドは、内部では REST エンドポイントでリクエストを待ち受けてており、そしてこの URL は、API 経由で確認できます。具体的には、デバイスを示すエンドポイントへの GET リクエストの結果に含まれます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE | jq
...
  "id": "77fccbb8-f33b-42d8-bf21-6d55cdde2068",
  "name": "MQ_DEVICE",
...
  "commands": [
...
      "name": "testmessage",
...
      "get": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4"
      },
      "put": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4",
...

EdgeX Foundry がデバイスにコマンドを実行するときは、内部ではこの URL に対して GET なり PUT なりをリクエストするわけです。そしてデバイスサービスがそれを受け取って、設定したとおりに翻訳してデバイスへの操作が実行され応答が返ることになります。

ここでは実際に、確認できた URL にリクエストを投げます。外部からこの URL を直接叩くときは、ホスト名部分を環境に合わせて書き換えたうえで叩きます。今回であれば、次のような操作で現在値が確認できます。

$ curl -s http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4 | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710432121478400,
  "readings": [
    {
      "origin": 1579710432109675800,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "test-message"
    }
  ],
  "EncodedEvent": null
}

まだ何も変更していないので、初期値(mqtt-script.js で定義されている)が返ってきました。

では、変更の PUT を投げます。JSON をリクエストボディに含めて PUT します。

$ curl -s -X PUT -d '{"message":"modified-message"}' http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4

エラーがなければ、再度 GET すると、値が変わっていることが確認できます。

ところで、URL は /api/v1/device/<デバイス ID>/command/<コマンド ID> なパスですが、実はわざわざ ID を調べなくても、/api/v1/device/name/<デバイス名>/command/<コマンド名> でも機能します。というわけで、お好みに合わせてどちらを使ってもよいですが、せっかくなので今度はこちらに GET します。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710963293832000,
  "readings": [
    {
      "origin": 1579710963283338200,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "modified-message"
    }
  ],
  "EncodedEvent": null
}

値が変わっていることが確認できました。

なお、このような GET や PUT に相当する操作は、GUI でも実行できます。例えば Closure UI であれば、[DEVICES] 画面のデバイス MQ_DEVICEACTION の [Control Device] アイコンから現在値が確認できます。この裏側では、この画面を開く際に、全部のリソースに対して順番に GET が発行されています。

現在値が確認できる

この画面では、PUT リクエストが可能なリソースには ACTION 欄に Set Value アイコンが出るのですが、実行しようとしても手元の環境では入力欄が表示されず使えませんでした……。

Closure UI ではない標準 UI であれば、PUT も実行できました。

Method を set にして send するだけ

まとめ

新しいデバイス(のシミュレータ)を用意し、それに合わせたデバイスサービスを構成して、このデバイスを EdgeX Foundry の制御下においたうえで、実際の動作を確認できました。

実際のシーンでは、本当のデバイスの本当の仕様に合わせてデバイスサービスを構成する必要があり、場合によってはデバイスサービスそれ自体の実装を含めてカスタマイズが必要になる可能性ももちろんあるわけです。

が、今回確認した通り、デバイスや環境に依存する部分(リソースやコマンドの名前や数、MQTT ブローカのホスト情報やトピック名など)は、デバイスプロファイルやデバイスサービス設定として別ファイルで定義できるようになっていて、デバイスサービスの実装それ自体は、設定ファイルに従って淡々と動くだけでもあります。

例えば Raspberry Pi に接続したセンサの値を取り込みたい場合を考えると、実際のセンサ値を MQTT トピックに投げる部分は自製する必要がありますが、であればその際に、逆に今回使ったこのデバイスサービスの仕様を前提にした投げ方にしてしまうことも可能です。そうすれば、このデバイスサービスをそのまま実際に流用できることになります。

今回は MQTT の例でしたが、それ以外のプロトコル用の参考実装も、環境依存の部分が設定ファイル群で外在化できるような汎用性のある形で作られているようなので、それなりに様々なデバイスで使いまわせそうです。

なお、EdgeX Foundry に取り込まれたデータは、前回のエントリで扱ったエクスポートの設定 や、アプリケーションサービスを構成することで、簡単に持ち出せます。コアサービス層に集約される段階で、すでにデバイスの差異は抽象化されているので、もちろん今回の MQTT で収集したデータも他のデバイスの情報とまったく同じ方法でエクスポート可能です。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポートの仕組み

前回のエントリ では、EdgeX Foundry を稼働させて、仮想デバイスのデータが蓄積される様子を観察しました。続いては、外部へのエクスポートを試します。

EdgeX Foundry は、エッジ内で閉じて利用することももちろん可能ですが、例えば処理したデータをクラウドに投げたい、オンプレミスの別のシステムに投げたい、など、EdgeX Foundry の外部へデータを連携したいシーンも考えられます。

このようなときに、エクスポートサービスを構成することで、コアサービスに届けられたデータをリアルタイムに外部に送れるようになります。

赤枠が今回関係するところ

EdgeX Foundry では、エクスポート先をエクスポートサービスのクライアントとして定義しています。クライアントは、クライアント登録サービス(図中の CLIENT REGISTRATION、今回の構成では edgex-export-client コンテナ)を通じてデータベースに接続情報を保存することで新規に登録できます。

エクスポート先へのデータの配信は、実際には配信サービス(図中 DISTRIBUTION、edgex-export-distro コンテナ)が行っています。コアサービス層からのイベントの配信を受けて、データベースに登録されたクライアントにデータを送出する役割を担っています。

エクスポートの準備

今回は、エクスポート方法として、

  • MQTT トピックへの配信によるエクスポート
  • REST エンドポイントへの POST によるエクスポート

の二つのパターンを構成します。

前述の通り、エクスポート先は エクスポートサービスのクライアントと考えるため、実際には、

  • MQTT トピックをエクスポートクライアントとして登録する
  • REST エンドポイントをエクスポートクライアントとして登録する

という操作を行います。

さて、今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab02

MQTT ブローカの準備

エクスポートクライアントとして MQTT トピックを登録するため、そのトピックを提供する MQTT ブローカを作ります。

本来の用途を考えれば、エッジからさらにフォグかクラウドに送るイメージなので、test.mosquitto.orgCloudMQTT などのクラウドっぽいサービスに投げるほうがそれっぽいですが、今回はお試しなので、ブローカはローカルに立ててしまいます。

といっても、コンテナイメージがあるので、おもむろに起動させるだけです。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
Unable to find image 'eclipse-mosquitto:latest' locally
...
52e107224959223a3132466b5356278f637daa855aadc3a1345c71c193deb4df

起動できたら、簡単にテストします。クライアントもコンテナのイメージが(非公式ですが)あるのでお借りして、適当なトピック名を指定して購読を開始してから、

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
Unable to find image 'efrecon/mqtt-client:latest' locally
...
Client mosqsub|6-1f6d3fb35f68 sending CONNECT
Client mosqsub|6-1f6d3fb35f68 received CONNACK (0)
Client mosqsub|6-1f6d3fb35f68 sending SUBSCRIBE (Mid: 1, Topic: edgex-handson-topic, QoS: 0)
Client mosqsub|6-1f6d3fb35f68 received SUBACK
Subscribed (mid: 1): 0

別のコンテナで同じトピックに配信します。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "edgex-handson-topic" -d -m "TEST MESSAGE"
Client mosqpub|6-96175cb072bd sending CONNECT
Client mosqpub|6-96175cb072bd received CONNACK (0)
Client mosqpub|6-96175cb072bd sending PUBLISH (d0, q0, r0, m1, 'edgex-handson-topic', ... (20 bytes))
Client mosqpub|6-96175cb072bd sending DISCONNECT

購読している側で、メッセージが配信されてくることが確認できれば成功です。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
TEST MESSAGE

デバッグメッセージが出て邪魔な場合は、-d オプションを消すと静かになります。

REST エンドポイントの準備

続いて、エクスポートクライアントとして登録する REST エンドポイントを作ります。

登録すると、REST エンドポイント側には単なる POST リクエストで届くので、POST されたリクエストボディが確認できれば動作確認には充分です。ここでは、Python の Flask で、リクエストボディを標準出力に吐くだけの簡単な Web サーバを作ります。

from flask import Flask, request
from datetime import datetime

app = Flask(__name__)


@app.route("/api/v1/echo", methods=["POST"])
def echo():
    print("--\n{}".format(datetime.now()))
    print(request.get_data().decode("utf-8"))
    return "OK", 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=5000)

リポジトリには配置済みなので、クローンしてある場合はそのまま起動できます。

$ cd lab02/rest-endpoint
$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

待ち受けができたら、cURL や Postman などで適当な POST リクエストを発行します。

$ curl -X POST -d '{"message": "TEST MESSAGE"}' http://192.168.0.100:5000/api/v1/echo
OK

Web サーバの出力に POST したリクエストボディが表示されれば成功です。

$ python ./main.py
...
--
2020-01-21 00:03:26.503181
{"message": "TEST MESSAGE"}
192.168.0.220 - - [21/Jan/2020 00:03:26] "POST /api/v1/echo HTTP/1.1" 200 -

エクスポートクライアントの登録

ここまでで、エクスポートされればその中身が確認できる状態が整ったので、続けて EdgeX Foundry 側に実際に MQTT トピックや REST エンドポイントをエクスポートクライアントとして登録していきます。

EdgeX Foundry が起動していない場合は、前回のエントリ なども参考にして起動させます。

$ cd lab02
$ docker-compose up -d

エクスポートクライアントの追加は、API や GUI で行えます。CLI では現時点では難しそうです。

API でのエクスポートクライアントの登録

API によるエクスポートクライアントの登録は、edgex-export-client のエンドポイントに JSON で POST することで行えます。cURL や Postman などで実行できます。

MQTT トピックをエクスポートクライアントとして登録するには、以下のような JSON を組んで、

{
    "name": "edgex-handson-mqtt",
    "addressable": {
        "name": "edgex-handson-mqttbroker",
        "protocol": "tcp",
        "address": "192.168.0.100",
        "port": 1883,
        "topic": "edgex-handson-topic"
    },
    "format": "JSON",
    "enable": true,
    "destination": "MQTT_TOPIC"
}

これを http://localhost:48071/api/v1/registration に POST します。 登録されたクライアントの ID が返ってきます。

$ curl -X POST -d '{"name":"edgex-handson-mqtt","addressable":{"name":"edgex-handson-mqttbroker","protocol":"tcp","address":"192.168.0.100","port":1883,"topic":"edgex-handson-topic"},"format":"JSON","enable":true,"destination":"MQTT_TOPIC"}' http://localhost:48071/api/v1/registration
ebdde555-001b-4798-817a-da75b92406c7

登録したらその時点からエクスポートが開始されるので、指定した MQTT トピックを購読すれば、値が届いていることが確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "edgex-handson-topic" -d
...
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (258 bytes))
{"id":"ac6d9f85-7c9f-4300-b6e1-f51f8b8e0e69","device":"Random-Integer-Device","origin":1579534468929275000,"readings":[{"id":"8c9267a0-c8a1-4e9e-9a60-90d7df8966fc","origin":1579534468917108600,"device":"Random-Integer-Device","name":"Int16","value":"8409"}]}
Client mosqsub|6-1f6d3fb35f68 received PUBLISH (d0, q0, r0, m0, 'edgex-handson-topic', ... (263 bytes))
{"id":"0539ed5c-68c3-4a12-a936-743ced169f7d","device":"Random-Integer-Device","origin":1579534468955468400,"readings":[{"id":"4b52dc78-efd9-4234-982c-1a17f21640f3","origin":1579534468943312700,"device":"Random-Integer-Device","name":"Int32","value":"-54648128"}]}

続けて、REST エンドポイントへのエクスポートを追加します。POST する JSON は以下です。

{
    "name": "edgex-handson-rest",
    "addressable": {
        "name": "edgex-handson-restendpoint",
        "protocol": "http",
        "method": "POST",
        "address": "192.168.0.100",
        "port": 5000,
        "path": "/api/v1/echo"
    },
    "format": "JSON",
    "enable": true,
    "destination": "REST_ENDPOINT"
}

POST する先は同様に http://localhost:48071/api/v1/registration です。

$ curl -X POST -d '{"name":"edgex-handson-rest","addressable":{"name":"edgex-handson-restendpoint","protocol":"http","method":"POST","address":"192.168.0.100","port":5000,"path":"/api/v1/echo"},"format":"JSON","enable":true,"destination":"REST_ENDPOINT"}' http://localhost:48071/api/v1/registration
6092b921-b173-4de7-8441-f89461734c17

こちらも追加した段階からエクスポートが開始されるので、REST エンドポイント側で、POST リクエストが来ていることが確認できます。

$ python ./main.py
...
--
2020-01-21 00:47:45.055939
{"id":"a0f8347d-e3d0-4bfc-ae20-98fcb9714224","device":"Random-Integer-Device","origin":1579535264923285900,"readings":[{"id":"9c5313ef-4a04-48f1-bcd4-cea06a08c0f5","origin":1579535264898458900,"device":"Random-Integer-Device","name":"Int16","value":"-18516"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-21 00:47:45.181942
{"id":"7cf0d138-ee3d-47e0-8875-877a5ea1111d","device":"Random-Integer-Device","origin":1579535264973814800,"readings":[{"id":"372eb7bf-9037-43ed-aa53-6d9cd4cafc5a","origin":1579535264962783700,"device":"Random-Integer-Device","name":"Int32","value":"-1557838489"}]}
192.168.0.100 - - [21/Jan/2020 00:47:45] "POST /api/v1/echo HTTP/1.1" 200 -

API でのエクスポートクライアントの確認

登録済みのエクスポート設定は、GET リクエストで確認できます。

$ curl -s http://localhost:48071/api/v1/registration | jq
[
  {
    "id": "ebdde555-001b-4798-817a-da75b92406c7",
    "created": 1579534374854,
    "modified": 1579534374854,
    "origin": 0,
    "name": "edgex-handson-mqtt",
...
  },
  {
    "id": "6092b921-b173-4de7-8441-f89461734c17",
    "created": 1579535149239,
    "modified": 1579535149239,
    "origin": 0,
    "name": "edgex-handson-rest",
...
  }
]

API でのエクスポートクライアントの削除

削除は DELETE リクエストで行えます。エンドポイントはそれぞれ以下です。

  • 削除対象を ID で指定する場合
    • http://localhost:48071/api/v1/registration/id/<ID>
  • 削除対象を名前で指定する場合
    • http://localhost:48071/api/v1/registration/name/<名前>

cURL で実行する場合は以下が例です。

$ curl -X DELETE http://localhost:48071/api/v1/registration/id/ebdde555-001b-4798-817a-da75b92406c7
true

$ curl -X DELETE http://localhost:48071/api/v1/registration/name/edgex-handson-rest
true

GUI でのエクスポートクライアントの登録

ここまで API でがんばってきましたが、前回のエントリ で紹介した Closure UI では、エクスポートクライアントの追加も削除も非常に楽に行えます。

http://localhost:8080/ にアクセスしてログイン後、[EXPORT] を開きます。

まだ何も登録されていない

ここから、画面に従って値を入れていけば完了です。例えば今回の MQTT トピックをクライアントとして登録する場合は、以下のように入力します。

項目
DestinationMQTT Topic
Nameedgex-handson-mqtt
Enableオン(緑)
Export formatJSON
ProtocolTCP
Address192.168.0.100
Port1883
Topicedgex-handson-topic
上記以外デフォルト値

REST エンドポイントの場合は次のようにします。

項目
DestinationREST Endpoint
Nameedgex-handson-rest
Enableオン(緑)
Export formatJSON
ProtocolHTTP
Address192.168.0.100
Port5000
Path/api/v1/echo
上記以外デフォルト値
登録後の状態

GUI でのエクスポートクライアントの削除

削除は、登録後の画面で、削除したいクライアントの右端、[ACTION] 欄にある [Delete Export] アイコンをクリックするだけです。簡単ですね。

削除前に確認が出る

アプリケーションサービスによるエクスポート

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

アプリケーションサービスは、データに対して変換やフィルタなどさまざまな処理を行えますが、その処理のひとつとして、今回行ったようなエクスポートも行えます。複数の処理をパイプライン化してつなげることで、例えば特定のデバイスの値だけフィルタしたあとにエクスポート、などが可能です。

エクスポートサービスでは、すべてのデータがエクスポートサービスを通るため、データ量やエクスポート先が多い場合にパフォーマンス面での問題が考えられますが、アプリケーションサービスでは、各々がコアサービス層のメッセージバスに直接つないでデータを受け取れるようにすることで、こうした問題に対応しているようですね。

SDK も提供されており、自製しなくても単純なフィルタやエクスポートは app-service-configurable を利用して簡単に行えそうです。

アプリケーションサービスを使ったエクスポートは、別のエントリで紹介 していますので、併せてどうぞ。

まとめ

エクスポートサービスを構成して、EdgeX Foundry で収集したデータを外部に送れることが確認できました。エクスポート先の任意のシステムでは、これらの値を取り込んで処理すればよいわけですね。

GUI の画面上やドキュメントでは、GCP や AWS、Azure の IoT サービスもエクスポート先として挙げられていますが、現段階でどこまで動くかは確認できていません。すでに証明書などを用いた認証はできそうなので、どこかで試してみます。

EdgeX Foundry 関連エントリ