Skip to main content

Cluster API で vSphere 上の Kubernetes クラスタを管理する

きっかけ

実験したいことが出てきてしまい、自宅で Kubernetes を触りたくなりました。

これまで Kubernetes を触る場合は Google Kubernetes Engine(GKE)ばかりを使っていたのですが、 今回実験したいのは IoT の世界でいうエッジ側の話なので、できればオンプレミス相当の Kuberentes クラスタが欲しいところです。

そんなわけで、これ幸いと自宅の vSphere 環境で Cluster API を叩いてゲストクラスタを作ることにしました。Cluster API は、2019 年に VMware から発表された VMware Tanzu や Project Pacific の実装でも使われているそうで、そういう意味でも興味のあるところです。

VMware Tanzu や Project Pacific は、エントリの本筋ではないので細かくは書きませんが、めっちゃ雑に書くと、vSphere と Kubernetes がイケてる感じにくっついたヤツです。

Cluster API とは

Cluster API は、Kubernetes っぽいお作法で Kubernetes クラスタそれ自体を管理できる仕組みです。Kubernetes の SIG のプロジェクト として開発が進められています。

最新バージョンは 2019 年 9 月にリリースされた v1alpha2 で、現在は v1alpha3 が開発中です。バージョン名を見てもわかる通り、現段階ではいわゆるアルファ版ですし、ドキュメントでも『プロトタイプだよ』『どんどん変わるからね』と記載されているので、ごりごりに使い込むのはまだちょっと待ったほうがよさそうですね。

Cluster API is still in a prototype stage while we get feedback on the API types themselves. All of the code here is to experiment with the API and demo its abilities, in order to drive more technical feedback to the API design. Because of this, all of the codebase is rapidly changing.

https://github.com/kubernetes-sigs/cluster-api/blob/master/README.md

Kubernetes クラスタを構成するノードは、多くの場合は仮想マシンです。その仮想マシンは、パブリッククラウド上だったりオンプレミスの vSphere 上や OpenStack 上だったりで動いているわけですが、Cluster API では、そうしたプラットフォームごとに Provider なる実装が用意されており、環境差異を抽象化してくれます。これにより、異なる環境でも同一の操作感で Kubernetes クラスタを管理できます。

今回は vSphere 環境上の Kubernetes クラスタの構成が目的なので、vSphere 用の Provider を使って作業します。

構成要素と構成の流れ

最終的には、いわゆる Kubernetes らしく業務や開発で様々なアプリケーションを動作させることになる Kubernetes クラスタと、それらを管理するためだけの Kubernetes クラスタ、の大きく二種類の Kubernetes クラスタができあがります。

前者がゲストクラスタ(ワークロードクラスタ)、後者がマネジメントクラスタなどと呼ばれるようです。Cluster API はこのうちのマネジメントクラスタに組み込まれており、この Cluster API によってゲストクラスタのライフサイクルを簡単に管理できるということです。

ざっくりイメージ

もう少し具体的にいえば、例えば Kubernetes クラスタ自体は cluster リソースとして、あるいはそれを構成するノードの仮想マシンは machine リソースとして扱えるようになり、通常の pod リソースや deployment リソースと同じように、自分以外の Kubernetes クラスタの構成が管理できるということです。

構築の観点では、つまりマネジメントクラスタができさえすれば Cluster API 環境はほぼ完成と言えるわけですが、実際にはマネジメントクラスタ自身もマネジメントクラスタで管理するため、手順はちょっと複雑です。

マネジメントクラスタの作り方はいくつかあるようですが、今回は vSphere 用 Provider の Getting Started の通り、以下のような流れで構成を進めます。

作業の流れのイメージ
  1. 作業用端末(図中 Workstation)に Docker や kuberctl など必要なモノを揃えて、マニフェストファイルを生成する
  2. Docker 上に作業用の Kubernetes クラスタ(ブートストラップクラスタ)を作り、Cluster API を導入する
  3. 導入した Cluster API を使って、マニフェストファイルに従って本当のマネジメントクラスタを作る
  4. マネジメントクラスタに Cluster API 環境を移行(Pivoting)して、ブートストラップクラスタを消す

最終的なマネジメントクラスタを作るためにさらに別の Kubernetes クラスタ(ブートストラップクラスタ)が必要なあたりがわりとややこしいですが、そういうものみたいです。

ここまでできたら、Cluster API の本来の利用方法の通り、マニフェストファイルに従ってゲストクラスタを作ったり消したり拡張したり縮小したりできます。

ここまでの作業はこれをやるためのただのお膳立てです

構築の準備

前述した流れの通り、マネジメントクラスタを構築するには、ブートストラップクラスタが動作できる環境が必要です。また、マネジメントクラスタが動作する vSphere 環境でも、少し準備が必要です。

作業端末の整備

作業前提を整えます。作業用の端末は、ブートストラップクラスタの動作と、それを用いたマネジメントクラスタの構築ができる必要があり、このために、

が必要です。Windows でもおそらく動くとは思いますが、今回は作業用の Ubuntu 端末を別に用意して使っています。

Docker は適当に入れます。今回の端末は Ubuntu 19.10 なので、19.04 用のバイナリを無理やり入れます。

$ sudo apt update
$ sudo apt install apt-transport-https ca-certificates curl software-properties-common
$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
$ sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu disco stable"
$ sudo apt update
$ sudo apt install docker-ce docker-ce-cli containerd.io
$ sudo usermod -aG docker ${USER}

clusterctl と Kind、kubectl はバイナリをダウンロードするだけです。

$ curl -Lo ./clusterctl https://github.com/kubernetes-sigs/cluster-api/releases/download/v0.2.9/clusterctl-darwin-amd64
$ chmod +x ./clusterctl
$ mv ./clusterctl ~/bin
$ curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/v0.6.1/kind-$(uname)-amd64
$ chmod +x ./kind
$ mv ./kind ~/bin
$ curl -Lo ./kubectl https://storage.googleapis.com/kubernetes-release/release/`curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt`/bin/linux/amd64/kubectl
$ chmod +x ./kubectl
$ mv ./kubectl ~/bin

が、clusterctl は v1alpha2 の時点ですでに DEPRECATED 扱いでした。

しかしながら代替手段がよくわかっていないし、Cluster API のドキュメント 通りではなにやらうまく動かない(kubeconfig 用の secret ができあがらない)ので、v1alpha3 が出て情報が増えてきたらどうにかします。

あと、Kind の最新リリースは現時点で 0.7.0 ですが、それを使うと後続の作業がうまく動かなかったので、ひとつ古い 0.6.1 を指定しています。

vSphere 環境での OVA テンプレートの展開

vSphere 環境用の Provider を使って Kubernetes クラスタを作った場合、最終的にできあがる Kubernetes のノードは vSphere 環境上の仮想マシンです。

この仮想マシンの元になるテンプレートが用意されているので、これをあらかじめデプロイして、テンプレートに変換しておきます。

Ubuntu 版と CentOS 版がありますが、今回は Ubuntu 版の最新の 1.16.3 を使いました。

その他のイメージのリンクは リポジトリに一覧 されています。

vSphere 環境でのフォルダとリソースプールの作成

マネジメントクラスタとゲストクラスタを入れるフォルダ(仮想マシンインベントリのヤツ)とリソースプールを作ります。入れ物としてただあればよいので、設定は適当で大丈夫です。

マネジメントクラスタの構成

では、実際の構築を進めます。ブートストラップクラスタを作り、そこで Cluster API を動作させて、それを通じて最終的なマネジメントクラスタを作ります。

はじめに、構成に必要な環境変数を、envvars.txt にまとめて定義します。内容はそれぞれの環境に依存するので書き換えが必要です。

$ cat envvars.txt
# vCenter config/credentials
export VSPHERE_SERVER='192.168.0.201'                  # (required) The vCenter server IP or FQDN
export VSPHERE_USERNAME='administrator@vsphere.local'  # (required) The username used to access the remote vSphere endpoint
export VSPHERE_PASSWORD='my-secure-password'           # (required) The password used to access the remote vSphere endpoint

# vSphere deployment configs
export VSPHERE_DATACENTER='kuro-dc01'                                 # (required) The vSphere datacenter to deploy the management cluster on
export VSPHERE_DATASTORE='nfs-ds01'                                   # (required) The vSphere datastore to deploy the management cluster on
export VSPHERE_NETWORK='ext-vm'                                       # (required) The VM network to deploy the management cluster on
export VSPHERE_RESOURCE_POOL='k8s'                                    # (required) The vSphere resource pool for your VMs
export VSPHERE_FOLDER='k8s'                                           # (optional) The VM folder for your VMs, defaults to the root vSphere folder if not set.
export VSPHERE_TEMPLATE='template_ubuntu-1804-kube-v1.16.3'           # (required) The VM template to use for your management cluster.
export VSPHERE_DISK_GIB='50'                                          # (optional) The VM Disk size in GB, defaults to 20 if not set
export VSPHERE_NUM_CPUS='2'                                           # (optional) The # of CPUs for control plane nodes in your management cluster, defaults to 2 if not set
export VSPHERE_MEM_MIB='2048'                                         # (optional) The memory (in MiB) for control plane nodes in your management cluster, defaults to 2048 if not set
export SSH_AUTHORIZED_KEY='ssh-rsa AAAAB...6Ix0= kuro@kuro-ubuntu01'  # (optional) The public ssh authorized key on all machines in this cluster

# Kubernetes configs
export KUBERNETES_VERSION='1.16.3'     # (optional) The Kubernetes version to use, defaults to 1.16.2
export SERVICE_CIDR='100.64.0.0/13'    # (optional) The service CIDR of the management cluster, defaults to "100.64.0.0/13"
export CLUSTER_CIDR='100.96.0.0/11'    # (optional) The cluster CIDR of the management cluster, defaults to "100.96.0.0/11"
export SERVICE_DOMAIN='cluster.local'  # (optional) The k8s service domain of the management cluster, defaults to "cluster.local"

続けて、このファイルを使って、マネジメントクラスタの構成を定義したマニフェストファイル群を生成します。この作業のための専用のコンテナイメージが用意されているので、これに食べさせます。

$ docker run --rm \
  -v "$(pwd):/out" \
  -v "$(pwd)/envvars.txt":/envvars.txt:ro \
  gcr.io/cluster-api-provider-vsphere/release/manifests:latest \
  -c management-cluster
Checking 192.168.0.201 for vSphere version
Detected vSphere version 6.7.1
Generated ./out/management-cluster/addons.yaml
Generated ./out/management-cluster/cluster.yaml
Generated ./out/management-cluster/controlplane.yaml
Generated ./out/management-cluster/machinedeployment.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-cluster-api.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-kubeadm.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-vsphere.yaml
Generated ./out/management-cluster/provider-components.yaml
WARNING: ./out/management-cluster/provider-components.yaml includes vSphere credentials

これで、./out/management-cluster 配下にマニフェストファイル群が出力されました。

$ ls -l ./out/management-cluster/
total 268
-rw-r--r-- 1 kuro kuro  19656 Feb 11 08:54 addons.yaml
-rw-r--r-- 1 kuro kuro    933 Feb 11 08:54 cluster.yaml
-rw-r--r-- 1 kuro kuro   3649 Feb 11 08:54 controlplane.yaml
-rw-r--r-- 1 kuro kuro   2576 Feb 11 08:54 machinedeployment.yaml
-rw-r--r-- 1 kuro kuro 240747 Feb 11 08:54 provider-components.yaml

で、あとはこれらを clusterctl に食べさせるだけです。

$ clusterctl create cluster \
  --bootstrap-type kind \
  --bootstrap-flags name=management-cluster \
  --cluster ./out/management-cluster/cluster.yaml \
  --machines ./out/management-cluster/controlplane.yaml \
  --provider-components ./out/management-cluster/provider-components.yaml \
  --addon-components ./out/management-cluster/addons.yaml \
  --kubeconfig-out ./out/management-cluster/kubeconfig
NOTICE: clusterctl has been deprecated in v1alpha2 and will be removed in a future version.
I0211 08:55:09.414116    2332 createbootstrapcluster.go:27] Preparing bootstrap cluster
I0211 08:55:59.745617    2332 clusterdeployer.go:82] Applying Cluster API stack to bootstrap cluster
...
I0211 08:56:02.302440    2332 clusterdeployer.go:87] Provisioning target cluster via bootstrap cluster
...
I0211 08:56:02.454960    2332 applymachines.go:46] Creating machines in namespace "default"
I0211 08:59:12.512879    2332 clusterdeployer.go:105] Creating target cluster
...
I0211 08:59:13.394821    2332 clusterdeployer.go:123] Pivoting Cluster API stack to target cluster
...
I0211 08:59:56.665878    2332 clusterdeployer.go:164] Done provisioning cluster. You can now access your cluster with kubectl --kubeconfig ./out/management-cluster/kubeconfig
I0211 08:59:56.666586    2332 createbootstrapcluster.go:36] Cleaning up bootstrap cluster.

主要なログだけ抜粋していますが、これだけで、

  1. Kind を使って、作業端末の Docker 上にブートストラップクラスタを構築する
  2. ブートストラップクラスタに Cluster API を導入する
  3. その Cluster API を使って、マニフェスト通りに vSphere 環境上にマネジメントクラスタをデプロイする
    • 仮想マシンをテンプレートからデプロイする
    • 仮想マシンをクラスタのコントロールプレーンとして構成する
  4. ブートストラップクラスタ上に構成していた Cluster API の環境をマネジメントクラスタに移行する(Pivoting)

が実行され、最終的な形でマネジメントクラスタができあがります。

完成したマネジメントクラスタへの接続に必要な情報は、./out/management-cluster/kubeconfig に保存されています。kubectl の設定ファイルをこれに切り替えてコマンドを実行すると、マネジメントクラスタ自体の情報や、machine リソースとしての自分自身の存在が確認できます。

$ export KUBECONFIG="$(pwd)/out/management-cluster/kubeconfig"

$ kubectl cluster-info
Kubernetes master is running at https://192.168.0.27:6443
KubeDNS is running at https://192.168.0.27:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.

$ kubectl get machines
NAME                                       PROVIDERID                                       PHASE
management-cluster-controlplane-0          vsphere://42069f1f-21ac-45be-69b9-e94702d3062b   running

vSphere 環境では、仮想マシン management-cluster-controlplane-0 の存在が確認できるはずです。

ここまででマネジメントクラスタができたので、Cluster API 環境は完成です。あとは好きなように Cluster API を使ってゲストクラスタを作ったり消したり拡張したり縮小したりできます。

ゲストクラスタの構成

実際に、Cluster API を使って、新しくゲストクラスタを構成します。

Cluster API は、Kubernetes のお作法で Kubernetes クラスタ自体が管理できるので、つまり、クラスタの構成情報も、Pod や Deployment などほかの Kubernetes リソースと同じように、マニフェストファイルで定義されます。よって、ゲストクラスタを作るには、その構成を定義したマニフェストファイルが必要です。

本来はきちんと中身を書くべきっぽいですが、マネジメントクラスタ用のマニフェストファイルを作ったのと同じ方法でゲストクラスタ用のマニフェストファイル群も作れるので、ここではそれを利用します。

$ docker run --rm \
  -v "$(pwd):/out" \
  -v "$(pwd)/envvars.txt":/envvars.txt:ro \
  gcr.io/cluster-api-provider-vsphere/release/manifests:latest \
  -c workload-cluster-1
Checking 192.168.0.201 for vSphere version
Detected vSphere version 6.7.1
Generated ./out/workload-cluster-1/addons.yaml
Generated ./out/workload-cluster-1/cluster.yaml
Generated ./out/workload-cluster-1/controlplane.yaml
Generated ./out/workload-cluster-1/machinedeployment.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-cluster-api.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-kubeadm.yaml
Generated /build/examples/pre-67u3/provider-components/provider-components-vsphere.yaml
Generated ./out/workload-cluster-1/provider-components.yaml
WARNING: ./out/workload-cluster-1/provider-components.yaml includes vSphere credentials

ゲストクラスタの定義と、コントロールプレーンの定義が、

  • ./out/workload-cluster-1/cluster.yaml
  • ./out/workload-cluster-1/controlplane.yaml

に含まれます。また、ワーカノードの定義は、

  • ./out/workload-cluster-1/machinedeployment.yaml

です。自分でマニフェストをいじる場合は、この辺をどうにかする必要があるということですね。

実際にデプロイするには、kubectl の接続先をマネジメントクラスタに切り替えてから、先の 3 つのファイルをマネジメントクラスタに突っ込みます。

$ export KUBECONFIG="$(pwd)/out/management-cluster/kubeconfig"

$ kubectl apply -f ./out/workload-cluster-1/cluster.yaml
cluster.cluster.x-k8s.io/workload-cluster-1 created
vspherecluster.infrastructure.cluster.x-k8s.io/workload-cluster-1 created

$ kubectl apply -f ./out/workload-cluster-1/controlplane.yaml
kubeadmconfig.bootstrap.cluster.x-k8s.io/workload-cluster-1-controlplane-0 created
machine.cluster.x-k8s.io/workload-cluster-1-controlplane-0 created
vspheremachine.infrastructure.cluster.x-k8s.io/workload-cluster-1-controlplane-0 created

$ kubectl apply -f ./out/workload-cluster-1/machinedeployment.yaml
kubeadmconfigtemplate.bootstrap.cluster.x-k8s.io/workload-cluster-1-md-0 created
machinedeployment.cluster.x-k8s.io/workload-cluster-1-md-0 created
vspheremachinetemplate.infrastructure.cluster.x-k8s.io/workload-cluster-1-md-0 created

この作業によって、まずコントロールプレーンがデプロイされ、続けてワーカノードがデプロイされます。vSphere 環境でも順次仮想マシンがデプロイされパワーオンされていく様子が観察できるでしょう。

デプロイが完了すると、以下のように、マネジメントクラスタが Kubernetes クラスタ自体を cluster リソースや machine リソースとして管理できている状態になります。

$ kubectl get clusters
NAME                 PHASE
management-cluster   provisioned
workload-cluster-1   provisioned

$ kubectl get machines
NAME                                       PROVIDERID                                       PHASE
management-cluster-controlplane-0          vsphere://42069f1f-21ac-45be-69b9-e94702d3062b   running
workload-cluster-1-controlplane-0          vsphere://4206f835-1e8d-3473-943f-0e2cc4b04319   running
workload-cluster-1-md-0-78469c8cf9-fr22j   vsphere://4206661b-9be8-cede-eeee-68ddbbdeb872   running
workload-cluster-1-md-0-78469c8cf9-jnqnw   vsphere://4206861f-e133-4a53-2008-3346c81ed8e3   running

ゲストクラスタへの接続情報は、secret リソースとして保持されています。これを kubeconfig として保存することで、kubectl でゲストクラスタに接続できるようになります。

$ kubectl get secrets
NAME                            TYPE                                  DATA   AGE
...
workload-cluster-1-kubeconfig   Opaque                                1      
...

$ kubectl get secret workload-cluster-1-kubeconfig -o=jsonpath='{.data.value}' | \
  { base64 -d 2>/dev/null || base64 -D; } >./out/workload-cluster-1/kubeconfig

実際に接続すれば、当たり前ですがゲストクラスタやノードの情報が確認できます。

$ export KUBECONFIG="$(pwd)/out/workload-cluster-1/kubeconfig"

$ kubectl cluster-info
Kubernetes master is running at https://192.168.0.28:6443
KubeDNS is running at https://192.168.0.28:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.

$ kubectl get podsnodes
NAME                                       STATUS     ROLES    AGE     VERSION
workload-cluster-1-controlplane-0          NotReady   master   3m14s   v1.16.3
workload-cluster-1-md-0-78469c8cf9-fr22j   NotReady   <none>   90s     v1.16.3

表示されている通り、この段階ではノードは NotReady です。これは CNI プラグインが構成されていないからで、雑にいえば、この Kubernetes クラスタ内のコンテナネットワークに使う実装を明示する必要があるということです。

コンテナネットワークの実装には Flannel とか Calico とかいろいろありますが、ゲストクラスタ用マニフェスト群の中にある ./out/workload-cluster-1/addons.yaml で Calico を構成できるので、今回はこれを使います。

適用してしばらく待つと、ノードが Ready になり、ゲストクラスタの完成です。

$ kubectl apply -f ./out/workload-cluster-1/addons.yaml 
configmap/calico-config created
...
serviceaccount/calico-kube-controllers created

$ kubectl get nodes
NAME                                       STATUS   ROLES    AGE     VERSION
workload-cluster-1-controlplane-0          Ready    master   4m22s   v1.16.3
workload-cluster-1-md-0-78469c8cf9-fr22j   Ready    <none>   2m38s   v1.16.3

ゲストクラスタの拡張

Kubernetes って、アプリケーションのスケールアウトが kubectl scale だけでできてめっちゃラクですよね。

というのと同じノリで、ゲストクラスタもめっちゃラクにスケールアウトできるので、やってみましょう。

ノードの管理はマネジメントクラスタから行うので、接続先を切り替えて、machinedeployment をスケールさせます。

$ export KUBECONFIG="$(pwd)/out/management-cluster/kubeconfig"

$ kubectl scale md workload-cluster-1-md-0 --replicas=3
machinedeployment.cluster.x-k8s.io/workload-cluster-1-md-0 scaled

$ kubectl get machines
NAME                                       PROVIDERID                                       PHASE
management-cluster-controlplane-0          vsphere://42069f1f-21ac-45be-69b9-e94702d3062b   running
workload-cluster-1-controlplane-0          vsphere://4206f835-1e8d-3473-943f-0e2cc4b04319   running
workload-cluster-1-md-0-78469c8cf9-fr22j   vsphere://4206661b-9be8-cede-eeee-68ddbbdeb872   running
workload-cluster-1-md-0-78469c8cf9-jnqnw   vsphere://4206861f-e133-4a53-2008-3346c81ed8e3   running
workload-cluster-1-md-0-78469c8cf9-nxvht   vsphere://4206ec88-6607-1699-8051-e1447a448983   running

これだけでノードが 3 台になりました。仮想マシンも増えています。

ノードが増えた様子

簡単ですね。

ゲストクラスタのロードバランサの構成

おまけです。

vSphere が組み込みでロードバランサを持っていないから仕方ないですが、 現状、Cluster API でゲストクラスタを構成するときには、ロードバランサは構成できません。

このままだと、ゲストクラスタで kubectl expose--type=LoadBalancer しても EXTERNAL-IP が永遠に pending のままで、外部に公開できません。

GitHub でも issue があります し、めっちゃがんばると多分オンプレミス環境でも NSX-T とかでどうにかできるとは思いますが、その域に達していないので、とりあえず MetalLB を突っ込んで解決します。

MetalLB とは

ロードバランサが使えずにサービスを外部に公開できない、というのは、ベアメタル環境で Kubernetes を使うときによく遭遇する話題のようで、そういうヒト向けの仮想ロードバランサの実装です。

実際には正しい負荷分散にはならないみたいですが、ものすごく気軽に使えますし、アクセス経路の提供という意味では充分機能するので便利です。

MetalLB の構成と初期設定

公式のドキュメント に従います。インストール用のマニフェストファイルを突っ込んだあと、

$ kubectl apply -f https://raw.githubusercontent.com/google/metallb/v0.8.3/manifests/metallb.yaml
namespace/metallb-system created
...
deployment.apps/controller created

ドキュメントの Layer 2 Configuration の通りに設定用マニフェストファイルを作って突っ込みます。IP アドレスのレンジは任意で修正します。

$ cat metallb.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: metallb-system
  name: config
data:
  config: |
    address-pools:
    - name: default
      protocol: layer2
      addresses:
      - 192.168.0.50-192.168.0.99

$ kubectl apply -f metallb.yaml 
configmap/config created

これだけです。

動作確認

Kubernetes のチュートリアルのヤツ を作って、

$ kubectl apply -f https://k8s.io/examples/service/load-balancer-example.yaml
deployment.apps/hello-world created

$ kubectl get pods
NAME                          READY   STATUS    RESTARTS   AGE
hello-world-f9b447754-6rrt9   1/1     Running   0          68s
hello-world-f9b447754-b6psq   1/1     Running   0          68s
hello-world-f9b447754-ml5w8   1/1     Running   0          68s
hello-world-f9b447754-nprbn   1/1     Running   0          68s
hello-world-f9b447754-wpgv5   1/1     Running   0          68s

expose します。

$ kubectl expose deployment hello-world --type=LoadBalancer
service/hello-world exposed

$ kubectl get services
NAME          TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)          AGE
hello-world   LoadBalancer   100.69.189.221   192.168.0.50   8080:32015/TCP   6s
kubernetes    ClusterIP      100.64.0.1       <none>         443/TCP          4h55m

$ kubectl describe services hello-world
...
Endpoints:                100.113.190.67:8080,100.113.190.68:8080,100.97.109.3:8080 + 2 more...
...

EXTERNAL-IP に MetalLB で設定したレンジの IP アドレスが振られていますし、エンドポイントも 5 つです。

実際にこの IP アドレスにアクセスすると、正しく表示が返ってきます。無事に動いているようです。

$ curl http://192.168.0.50:8080/
Hello Kubernetes!

まとめ

vSphere 環境で Cluster API を使って Kubernetes クラスタを管理できる状態を整えました。

最初の作業はちょっとだけ手間ですが、いちどできてしまうとスケールも相当楽なので使いやすそうです。ロードバランサ周りが NSX-T などふくめいい感じにできるようになってくると、活用の幅も広がりそうですね。

VMware Tanzu や Project Pacific でも Kubernetes クラスタのライフサイクル管理は謳われているので、この手の作業が導入作業も含めて GUI や API 経由でポチポチ簡単にできるようになるのだろうと勝手に思っています。GA になったら触ってみます。


おうち IoT 用の LINE ボットをもう少し賢くする

これまでの LINE ボットの課題

2018 年に、家のエアコンを操作してくれる LINE ボットを作って以来、すでに一年半くらい運用しています。概要は 当時のエントリでも紹介しています が、LINE での会話を元に自宅のエアコンを操作してくれるものです。

これ、作った当初に想定していた以上にたいへん便利で、外出先で家族と『そろそろロボくんにお願いしておこう』などの会話が発生する程度には実際に活用されていました。この手の『作ってみた』系は、長期的な運用が定着する前に使わなくなることが多い印象もあり、これは小規模ではあるもののうまくいった例と言えるかと思います。

当時の LINE ボットの様子

一方で、作りが甘い部分もあって、

  • Lambda の Node.js のランタイムで EoL が迫っていた
    • そもそも Node.js のランタイム側の更新に追従していくことに将来的にけっこう体力を使いそうな印象がある
    • 当時 Node.js を選んだ理由は『とりあえずいちど触ってみたかった』からというだけで、すでにその目的は達成できた
  • ひとつの Lambda に全機能を詰め込んでいて、どう考えてもイケてないアーキテクチャだ
  • 機能が足りない
    • エアコンのオンオフと温度の変更はできたが、冷房と暖房の切り替え機能を実装していない

など、長期的な運用に耐えられるよう全体を作り直したいモチベーションも強くなってきていました。

そんな中、あるイベントに参加して、『Raspberry Pi とセンサとクラウドサービスを使って何でもいいから個人で何かを作る』という活動をすることになり、タイミングもよかったので、この LINE ボットの作り直しを進めることにしました。

できたもの

で、こういう風に進化して、今も元気に動いています。

現在の LINE ボットの様子

以下のような機能が付いています。

  • エアコンの操作
    • オンオフ、冷暖房の切り替え、温度の変更、現在の設定の確認をしてくれます
  • 加湿器の操作
    • オンオフをしてくれます
  • 空調センサ
    • 室内に設置したセンサを使って、気温、湿度、気圧、二酸化炭素濃度の現在の値を教えてくれます
    • 任意の時間分の履歴をグラフ化して送ってくれます
  • 家計簿の記録
    • 支払者、使途、金額を Google Sheets に追記してくれます
  • 毎朝の自動制御
    • 毎朝、部屋が寒い(暑い)場合は、暖房(冷房)を付けてくれます
    • 毎朝、部屋が乾燥している場合は、加湿器を付けてくれます

実装

実装を簡単に紹介します。末尾には GitHub へのリンクも載せています。

全体像

全体はこのような構成です。

こけおどし全体像

参加したイベントの性質上、なるべくたくさんの要素技術を取り込んだほうが評価が上がる仕組みだったため、あえてまわりくどく冗長な構成になっている部分もあります。

センシングと蓄積

センシングと蓄積は、図の以下の部分です。これまで複数回にわたって紹介してきた EdgeX Foundry を中心に構成しています。

センシングと蓄積の部分

EdgeX Foundry 自体は、家庭内の IoT ゲートウェイサーバとして構築した Ubuntu 仮想マシン(ESXi 上で動作)の中で動いています。このゲートウェイサーバ上では MQTT ブローカも動かしています。

Raspberry Pi は、センサ値を読んで MQTT トピックにひたすら放り込み続けるだけの係です。EdgeX Foundry は、この Raspberry Pi を MQTT デバイスとして制御するよう構成されており、所定の MQTT トピックに投げ込まれたデータを取り込んで、アプリケーションサービスの機能を使ってクラウド上の所定の宛先にデータをエクスポートしています。

エクスポート先は、Mosquitto のパブリック MQTT トピックと、Pivotal Web Service(PWS)上のワーカアプリケーションで、なんやかんやされて最終的にデータは InfluxDB Cloud 2.0、Redis Cloud、MongoDB mLab に蓄積されます。

十数年前に PIC や Arduino でセンサやアクチュエータを操作していた頃は、データシートとにらめっこをしながら必要な抵抗を計算したり回路を考えたりしていましたが、最近のヤツは単にデータを読み取るだけなら恐ろしいくらい簡単ですね…… けっこう衝撃でした。

今回は BME280 と MH-Z19 を使っています。安いですし精度は悪いのでしょうが、厳密な値は必要としていないのでよしとしています。最初は DS18B20 も使っていましたが、BME280 を組み込んでからは使っていません。

アクチュエーション

最終的にはエアコンと加湿器が操作されるわけですが、この部分の実装は、前者は Nature Remo の API、後者は非公式の野良 API で制御しています。

アクチュエーションの部分

Nature Remo の API を叩く役は、AWS の所定の Lambda に一任しています。加湿器も同様ですが、こちらは非公式 API の都合上、ローカルネットワーク内からしか制御できなかったので、Lambda からいちど IoT Core の MQTT トピックに命令をなげ、それをローカルネットワーク側から購読して後続の処理をトリガさせています。ここだけは Node.js です。

加湿器は、モデルの選定がなかなかたいへんでした。加湿器に限らず何らかの家電を外部から自動制御する場合、

  • スマートコンセントを利用する
    • コンセントが通電したら目的の動作が開始される仕様である必要がある
  • スマートスイッチ(SwitchBot など)を利用する
    • 目的のボタンが、物理的にスマートスイッチで押下できる形状であり、かつスマートスイッチの力で押せるだけの硬さである必要がある
  • スマートリモコンを利用する
    • RF ではなく赤外線を利用したリモコンで制御できる機器である必要がある
  • 専用の API を利用する
    • API が公開されている必要がある

のいずれかの条件を満たす必要がありますが、加湿器でこれに合致するモデルが全然見つけられませんでした。大体の加湿器は、コンセントの通電後にさらにボタンを押さないと動作しませんし、ボタンの周辺の形状や寸法やボタンの硬さは公開されていませんし、リモコン付きのモデルでも赤外線でなく RF ですし、公式の API はなさそうですし。

結局、公開されていない API を無理やり叩ける非公式のライブラリが存在していてハックの余地がありそう、ということがわかった Oittm のアロマディフューザー を採用しています。ただしこれも、

  • 非公式のライブラリ で制御できるようにするには、公式アプリケーション Tuya Smart の旧バージョン(3.12.6 以前)を使って、MITM 攻撃的なヤツでアプリケーションの通信を自分で盗聴してキーを入手しないといけない
  • ライブラリが Node.js 向けしかない
  • そもそもの加湿器自体が小型でとても非力で、リビングなど広い空間の加湿には向かない

など、ベストとは言い難い状態です。改善の余地ありです。

インタフェイスとインタラクション

インタフェイス役の LINE ボットに話しかけると、LINE の Messaging API 経由で AWS の Lambda に届いて、メッセージ本文がパタンマッチされてその内容に応じて後続の処理がトリガされます。

インタラクションの部分

エアコンの操作であれば Nature Remo を制御する Lambda を呼びますし、加湿器の操作であればそれ用の別の Lambda を呼びます。センサのデータが必要な処理であれば、外部のデータベースに必要なデータを取りに行く Lambda を呼びます。

インタフェイスとしての LINE

グラフを要求された場合は、InfluxDB からデータを取ってきて matplotlib で描画したあとに S3 に保存し、それをプッシュでユーザに送ります。描画する対象(温度、湿度、気圧……)と長さ(N 分、M 時間)はメッセージ本文から都度判断します。

実際に生成されたグラフの例

この辺りの処理では、

  • グラフ要求のメッセージが来たら、受理した旨の返事を後続処理に先行して即座にしてしまい、画像の生成と送信はあとから非同期で行う
    • グラフの生成に時間がかかるため、 同期的に処理させると(Lambda ではなく)API Gateway がタイムアウトする
  • S3 のバケット名や画像ファイル名をなるべく短くする
    • LINE で画像を送るときは、画像そのものではなく画像の URL を送る必要がある
    • Lambda で発行する S3 の署名付き URL は 1,000 文字前後とめちゃくちゃ長い(x-amz-security-token が含まれるため)
    • LINE 側の制約で、URL は 1,000 文字以内でないとエラーで送れない
    • 署名付き URL ではなくパブリックな URL にしてしまうのは気が引ける
    • バケット名やファイル名は URL に含まれるため、短くすることでギリギリ 1,000 文字以内になる

などの工夫が必要でした。特に二点目はひどい回避策ですが、あまり権限をがばがばにはしたくなかったので仕方なく……。

また、LINE とはまったく関係ないところで、Grafana を PWS で動かしており、そこでもグラフを見られるようにしています。

感触とロードマップ

センサの値がグラフで見られるのが、実際に使ってみると思っていた以上におもしろかったです。

例えばエアコンをつけてから温度が上がっていく様子や、朝起きて部屋で人間が活動しだしてから二酸化炭素濃度が一気に上がる様子、逆に家が無人になってから下がる様子、キッチンで火を使う調理を始めたタイミングなど、思っていた以上に生活パタンが可視化されることがわかりました。

また、二酸化炭素濃度や気圧の変化では眠気や頭痛の誘発なども懸念されるので、体調に違和感を覚えたときに客観的な変化をすぐ確認できるのはうれしく、体調管理面でも地味に役立っています。

運用面では、Lambda まわりをすべて CloudFormation で定義したことで、コードの管理とデプロイがだいぶに楽になりました。クラウド側がマネージドサービスとサーバレスアーキテクチャだけで組めているのも安心感があります。

ただ、本エントリの途中で、

参加したイベントの性質上、なるべくたくさんの要素技術を取り込んだほうが評価が上がる仕組みだったため、あえてまわりくどく冗長な構成になっている部分もあります

と書いた通り、正直なところ機能に比較して実装が大げさすぎるので、この辺はスリムにしたいと思っています。

イベントはもう終わったので、現状のゴテゴテ感を保つ意味はあまりなく、例えば、

  • この規模だと EdgeX Foundry の恩恵は受けにくいので、まるっと削って Raspberry Pi から直接クラウドに投げてもよいだろう
  • データの蓄積場所も AWS の DynamoDB などにしてしまえば、PWS も InfluxDB も Redis も MongoDB もいらなくなり、AWS に全部寄せられるだろう
  • いっそ Raspberry Pi も AWS IoT Greengrass で AWS から管理させるとよいのでは

などのダイエットや試行錯誤を検討中です。とくに Greengrass は完全に未修分野なので純粋に興味もあり触ってみたいですね。

物理的には、加湿器をもう少し高機能(大容量)かつ自動制御しやすいものに替えたい気持ちがあります。オンオフは自動で制御できても、給水が自動化しづらいので、結局は人間の介入が必要な状態になってしまっており、人間が手を抜くにはタンクの容量がキモになりそうだと考えています。

関連リポジトリ

一覧します。

  • raspi-airmeasurment
    • Raspberry Pi 上で動作させる、各種センサ値を読み取って MQTT トピックに送る Python プログラム
  • edgex-lab-raspi
    • Raspberry Pi から MQTT トピックに送られた値の取り込みや、クラウドへのエクスポートを行えるように構成した EdgeX Foundry の Docker Compose ファイル
  • edgex-lab-export2db
    • PWS 上で動作させる、EdgeX Foundry からエクスポートされたデータを各種データベースに保存するためのプログラム群
    • Telegraf 用のイメージは Docker Hub の kurokobo/edgex-lab-telegraf に配置済み
  • grafana-with-flux
    • PWS 上で動作させる Grafana
  • sam-smarthome-api
    • AWS 上で動作させる一連の Lambda 群
  • tuya-mqtt
    • MQTT の配信を受けて加湿器をコントロールする Node.js プログラム


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (6) アプリケーションサービスによるエクスポート

EdgeX Foundry 関連エントリ

エクスポート方法の新旧

エクスポートサービスを取り扱ったエントリの最後で、以下のように書きました。

……と、意気揚々と書いてきたものの、今回利用したエクスポートサービスは最近のリリースではすでに廃止されており、エクスポートの機能は アプリケーションサービス に統合されているようです。

冬休みの自由研究: EdgeX Foundry (3) エクスポートサービスによるエクスポート

アプリケーションサービスに関する公式ドキュメント では、エクスポートサービスの利用はまだサポートされるものの、今後はアプリケーションサービスの利用が推奨されています。

エクスポートサービスを利用したエクスポートの欠点として、

  • エクスポートサービスは、登録されているすべてのクライアントにデータを順次配信するが、これがボトルネックになり、エンドポイント側の受信が遅延する
  • 登録されているクライアントを管理することそれ自体のオーバヘッドと複雑さが EdgeX Foundry にとって負担である
  • 特定のクラウドへのエクスポートに限定して作りこむのではなく、SDK を提供してクラウドプロバイダにとらわれない状態にすべきだ

などが挙げられています。ここでいう SDK とそれを使った実装がアプリケーションサービスなわけですが、アプリケーションサービスでは、

  • コアサービス(データを ZeroMQ で配る Core Data)に直接つなげられることで、パフォーマンスの問題を排除する
  • 開発者には、データが利用可能になったら直ちにそれに対して何らかの処理を行える手段が提供される
  • 登録作業が不要である

などの点でよい感じになっています。

いまいち正しくない気もしますが、ざっくりまとめると、下図の上が下になったので、

上がこれまで、下がこれから

エクスポートサービス(のディストリビューションサービス)がボトルネックにならなくなったし、値を好きに処理できる場所もできたよ! ということかと思います。

今回試すこと

そういうわけで、エクスポートサービスのなくなった世界でも生きていけるよう、エクスポートサービスを扱ったエントリと同様に、MQTT トピックと REST エンドポイントへ、アプリケーションサービスを使ってエクスポートできる状態を作ります。

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab05

アプリケーションサービス

上図のアプリケーションサービスの箱の中には、ファンクション(Fn で表記)を並べました。アプリケーションサービスでは、このように任意のファンクション群をパイプライン化してデータフローを定義できます。

例えば簡単な例では、標準で用意されているファンクションを並べるだけでも、

  1. デバイス名でフィルタするファンクション
  2. リソース名でフィルタするファンクション
  3. JSON 形式に変換するファンクション
  4. MQTT でエクスポートするファンクション

などのパイプラインが構成できます。現時点で利用できるファンクションは、SDK の README.md で確認できます。

構成の考え方

標準で実装されているアプリケーションサービスとして、app-service-configurable がありますが、これは、ひとつのパイプラインをひとつのインスタンスで処理する前提で作られているようです。また、パイプラインそのものは、インスタンスごとの設定ファイル configuration.toml で定義します。

つまり、コンテナ環境の例でいえば、冒頭の絵のように、

  • 最終的に MQTT トピックにエクスポートする app-service-configurable コンテナ
  • 最終的に REST エンドポイントにエクスポートする app-service-configurable コンテナ

など、app-service-configurable が、目的に応じて複数起動している状態を作るということです。

なお、コアサービスのホスト名やポート番号、MQTT ではブローカのホスト名など環境に依存する情報は、コンテナの場合は環境変数で Docker Compose ファイルから渡せるようになっています。エクスポートサービスでは API を叩いてクライアントとして登録する作業が必要でしたが、すべての構成をあらかじめファイルとして定義しておけるということですね。

設定ファイルの確認

コンテナの場合、デフォルトでコンテナ内の /res に、目的別に事前構成された configuration.toml がディレクトリを分けて配置されています。そして、起動時に渡す環境変数で、どのディレクトリの中身を利用するか指定します。

例えば、MQTT トピックへのエクスポートで利用する configuration.toml は以下です。

$ cat mqtt-export/configuration.toml
...
  [Writable.Pipeline]
    ExecutionOrder = "TransformToJSON, MQTTSend, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.MQTTSend]
      [Writable.Pipeline.Functions.MQTTSend.Parameters]
        qos="0"
        key=""
        autoreconnect="false"
        retain="false"
        cert=""
        persistOnError = "false"
      [Writable.Pipeline.Functions.MQTTSend.Addressable]
        Address=   "localhost"
        Port=      1883
        Protocol=  "tcp"
        Publisher= "AppServiceConfigurable-mqtt-export"
        User=      ""
        Password=  ""
        Topic=     "edgex-events"
...

ExecutionOrder に含まれる関数がパイプラインとして順次実行されていきます。この例だと、JSON に変換してから MQTT トピックに送信しています。

利用する関数は下で列挙されていますが、ExecutionOrder に含まれないものは実際には利用されないようです。デバイス名でフィルタする関数(FilterByDeviceName)なども書かれていますが、使いたいなら自分で ExecutionOrder に書いてね、ということでしょう。

また、MQTT ブローカのホスト名に localhost が指定されていたり、トピック名が edgex-events になっていたりしますが、先述のとおり、この辺りは環境変数で書き換えますのでこのままで問題ありません。

同様に、REST エンドポイントへのエクスポートで利用する設定ファイルは以下です。

$ cat http-export/configuration.toml
...
  [Writable.StoreAndForward]
    Enabled = false
    RetryInterval = "5m"
    MaxRetryCount = 10

  [Writable.Pipeline]
    UseTargetTypeOfByteArray = false
    ExecutionOrder = "FilterByDeviceName, TransformToJSON, HTTPPostJSON, MarkAsPushed"

    [Writable.Pipeline.Functions.TransformToJSON]
    [Writable.Pipeline.Functions.MarkAsPushed]
    [Writable.Pipeline.Functions.FilterByDeviceName]
      [Writable.Pipeline.Functions.FilterByDeviceName.Parameters]
        DeviceNames = ""
    [Writable.Pipeline.Functions.FilterByValueDescriptor]
      [Writable.Pipeline.Functions.FilterByValueDescriptor.Parameters]
        ValueDescriptors = ""
    [Writable.Pipeline.Functions.HTTPPostJSON]
      [Writable.Pipeline.Functions.HTTPPostJSON.Parameters]
        url = "http://"
        persistOnError = "false"
...

環境変数による柔軟なオーバライド

設定ファイルに含まれるパラメータは、実装から推測すると、おそらくすべて環境変数で書き換えが可能です。セクション名の区切り文字を _ に置換して、さらに _<パラメータ名> を付与して環境変数として与えれば、オーバライドされるようでした。

例えば、ExecutionOrder 自体も、Writable_Pipeline_ExecutionOrder 環境変数として与えられます。例えば先述の mqtt-exportExecutionOrder には FilterByDeviceName が含まれせんが、環境変数で以下を与えることで、このフィルタも有効化できます。

  • 環境変数 Writable_Pipeline_ExecutionOrder
    • FilterByDeviceName, TransformToJSON, MQTTSend, MarkAsPushed
  • 環境変数 Writable_Pipeline_Functions_FilterByDeviceName_Parameters_DeviceNames
    • <デバイス名>

今回の構成の定義

では、今回用の構成を作ります。先の通り、環境依存の値は環境変数経由で与えればよいので、今回編集するのは、Docker Compose ファイルです。

Docker Compose ファイルには、もともと、ルールエンジンにデータを送る用の app-service-rules が定義されています。

$ cat docker-composer.yml
...
  app-service-rules:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48096:48096"
    container_name: edgex-app-service-configurable-rules
    hostname: edgex-app-service-configurable-rules
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-rules
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-rules:48096
      edgex_profile: rules-engine
      Service_Host: edgex-app-service-configurable-rules
      MessageBus_SubscribeHost_Host: edgex-core-data
    depends_on:
      - consul
      - logging
      - data
...

今回は、これに加えて、MQTT トピックにエクスポートする app-service-mqtt-export と、REST エンドポイントにエクスポートする app-service-http-export を追加しています。

$ cat docker-composer.yml
...
  app-service-mqtt-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48097:48097"
    container_name: edgex-app-service-configurable-mqtt-export
    hostname: edgex-app-service-configurable-mqtt-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-mqtt-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-mqtt-export:48097
      edgex_profile: mqtt-export
      Service_Host: edgex-app-service-configurable-mqtt-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_MQTTSend_Addressable_Address: 192.168.0.100
      Writable_Pipeline_Functions_MQTTSend_Addressable_Port: 1883
      Writable_Pipeline_Functions_MQTTSend_Addressable_Protocol: tcp
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Publisher: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_User: 
      # Writable_Pipeline_Functions_MQTTSend_Addressable_Password: 
      Writable_Pipeline_Functions_MQTTSend_Addressable_Topic: edgex-handson-topic
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Qos: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Key: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Cert: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Autoreconnect: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_Retain: 
      # Writable_Pipeline_Functions_MQTTSend_Parameters_PersistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data

  app-service-http-export:
    image: edgexfoundry/docker-app-service-configurable:1.0.0
    ports:
      - "48098:48098"
    container_name: edgex-app-service-configurable-http-export
    hostname: edgex-app-service-configurable-http-export
    networks:
      edgex-network:
        aliases:
          - edgex-app-service-configurable-http-export
    environment:
      <<: *common-variables
      edgex_service: http://edgex-app-service-configurable-http-export:48098
      edgex_profile: http-export
      Service_Host: edgex-app-service-configurable-http-export
      MessageBus_SubscribeHost_Host: edgex-core-data
      Writable_Pipeline_Functions_HTTPPostJSON_Parameters_url: http://192.168.0.100:5000/api/v1/echo
      # Writable_Pipeline_Functions_HTTPPostJSON_Parameters_persistOnError: 
    volumes:
      - ./app-service-configurable:/res
    depends_on:
      - consul
      - logging
      - data 
...

使っているコンテナイメージは、いずれもまったく同じです。ボリュームも同じマウントのしかたをしていますが、使う設定ファイルを edgex_profile で制御しています。また、読ませる設定ファイルに応じて、必要な設定を環境変数で入れています。

MQTT ブローカや REST エンドポイントのホスト名などは環境に合わせて適宜修正してください。ほかにも、configuration.toml で変更したい値があれば、環境変数として与えると変えられます。

また、今回はエクスポートサービスが起動しないようにコメントアウトしています。

起動

下準備として、MQTT ブローカを起動して、購読を開始します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v

別のターミナルで REST エンドポイントも起動します。

$ cd rest-endpoint

$ python ./main.py
 * Serving Flask app "main" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 159-538-312
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

この状態で、EdgeX Foundry を起動します。

$ docker-compose up -d

動作確認

必要な設定はすべて環境変数を使って与えていたので、何もしないでも MQTT トピックと REST エンドポイントに値が届き出します。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
edgex-handson-topic {"id":"a11011bb-30cb-44fa-885c-8929c64e35b1","device":"Random-UnsignedInteger-Device","origin":1580015685340757500,"readings":[{"id":"26f50397-1e8b-4ca0-8db2-eeeee958a643","origin":1580015685326443600,"device":"Random-UnsignedInteger-Device","name":"Uint8","value":"90"}]}
edgex-handson-topic {"id":"526631fd-a0f7-437d-98a9-4f6eb8a06775","device":"Random-UnsignedInteger-Device","origin":1580015685369824800,"readings":[{"id":"91ef8066-5c31-4377-84e6-a6d517457f62","origin":1580015685356418200,"device":"Random-UnsignedInteger-Device","name":"Uint16","value":"27009"}]}
...
$ python ./main.py
...
--
2020-01-26 14:15:10.318596
{"id":"be20b21b-f0a4-4550-95bb-a78b5c597f56","device":"Random-Integer-Device","origin":1580015710310854200,"readings":[{"id":"d62fdacc-bfa4-4d05-a537-25c03e151f78","origin":1580015710295318700,"device":"Random-Integer-Device","name":"Int8","value":"3"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
--
2020-01-26 14:15:10.375595
{"id":"c0275732-2e3d-4945-9435-d6e77e0ccb74","device":"Random-Integer-Device","origin":1580015710364584600,"readings":[{"id":"067d71a3-7c1f-4fea-968d-ac5db0410e67","origin":1580015710350134100,"device":"Random-Integer-Device","name":"Int16","value":"-23040"}]}
192.168.0.100 - - [26/Jan/2020 14:15:10] "POST /api/v1/echo HTTP/1.1" 200 -
...

簡単ですね。

これらの値は、冒頭で紹介した通り、それぞれのアプリケーションサービスが直接コアサービス層から値を受け取って処理したうえでエクスポートしています。データパスが分散して並列化するので、エクスポートサービスを使っていた頃に存在していたボトルネックが回避されます。

まとめ

エクスポートサービスを利用しないエクスポート手段として、アプリケーションサービスを構成して試しました。

使ってみると、確かにこのアーキテクチャのほうが素直で圧倒的に使い勝手が良いです。家の環境もこっちに置き換えました。

現段階ではバンドルされている関数は多くないですが、今後この辺りも充実してくることが期待できそうです。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (5) ルールエンジンによる自動制御

EdgeX Foundry 関連エントリ

今回のゴール

これまでのエントリでは、仮想デバイスや MQTT デバイスを用いて、デバイスからの情報の収集やデバイスへのコマンドの実行を試してきました。

今回は、ルールエンジンの動作、つまり、

  • 何らかのデバイスのリソースの値が
  • 何らかの条件を満たしたら
  • 別のデバイスでコマンドを実行する

ような自動制御を実際に試します。

前回のエントリ で構成した MQTT デバイスだけを利用してもすぐできるのですが、それだけだとおもしろくないので、新しいデバイスサービスをひとつ追加して、それを組み込みます。具体的には、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ような状態を目指します。図示すると以下のような状態です。

REST デバイスをトリガに MQTT デバイスが自動制御される

今回も GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git

$ cd lab04

REST デバイスの追加

まだ開発途中で正式リリースには至っていないようですが、device-rest-go と名付けられたデバイスサービスがあります。

REST デバイスサービスの概要

これは、以下のようなデバイスサービスです。

  • REST で POST してくるデバイスに利用する
  • POST されたリクエストボディの中身を値として EdgeX Foundry に取り込む

このデバイスサービスを構成すると、エンドポイントとして

  • /api/v1/resource/<デバイス名>/<リソース名>

が作成され、これに対して POST することで値を取り込んでくれるようになります。デバイス名やリソース名は、前回取り扱った MQTT デバイスと同様、デバイスプロファイルなどの設定ファイルで定義できます。

なお、現時点では情報のやりとりは一方向であり、つまり、デバイス側から POST された値を取り込むのみで、逆にデバイス側へのリクエストの発行はできないようです。 また、JSON を投げても、現時点の実装ではパースはしてくれず単なる文字列として扱われるようです。

今回試す設計

今回は、 device-rest-goREADME.md を参考に、以下のデバイスサービスとデバイスを定義しています。

  • デバイスプロファイル
    • rest/rest.test.device.profile.yml ファイル
    • リソース intfloat を定義
  • デバイスサービス設定
    • rest/configuration.toml ファイル
    • 上記のデバイスプロファイルに紐づけた REST_DEVICE を定義

これにより、

  • /api/v1/resource/REST_DEVICE/int
  • /api/v1/resource/REST_DEVICE/float

に値を POST すれば取り込んでもらえるようになるはずです。

この REST デバイスサービスを含め、今回分の環境は、Docker Compose ファイルに反映済みです

起動

では、今回分の環境をまとめて起動させます。

MQTT デバイスは今回も使うので、 環境に合わせて mqtt/configuration.toml は修正してください。

MQTT ブローカ、MQTT デバイス、EdgeX Foundry の順に起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto

$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd)/mqtt-scripts:/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts

$ docker-compose up -d

REST デバイスサービスの動作確認

起動が確認できたら、実際にエンドポイントに POST して、データの取り込みを確認します。待ち受けポートは 49986 です。Content-Typetext/plain である必要があります。

$ curl -X POST -H "Content-Type: text/plain" -d 12345 http://localhost:49986/api/v1/resource/REST_DEVICE/int

edgex-device-rest コンテナのログで Pushed event to core data が記録されていれば、値は正常に受け付けられています。

$ docker logs --tail 5 edgex-device-rest
level=DEBUG ts=2020-01-25T07:49:17.0621062Z app=edgex-device-rest source=resthandler.go:82 msg="Received POST for Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0673842Z app=edgex-device-rest source=resthandler.go:101 msg="Content Type is 'text/plain' &amp; Media Type is 'text/plain' and Type is 'Int64'"
level=DEBUG ts=2020-01-25T07:49:17.0726606Z app=edgex-device-rest source=resthandler.go:142 msg="Incoming reading received: Device=REST_DEVICE Resource=int"
level=DEBUG ts=2020-01-25T07:49:17.0766349Z app=edgex-device-rest source=utils.go:75 correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: EventClient.MarshalEvent encoded event"
level=INFO ts=2020-01-25T07:49:17.0899243Z app=edgex-device-rest source=utils.go:93 Content-Type=application/json correlation-id=946f482c-db1f-44b5-8a76-437c4ca3d3e9 msg="SendEvent: Pushed event to core data"

実際に取り込まれているようです。

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
b43f7300-1377-4fdb-a82f-44eb497d9568    int     REST_DEVICE     1579938557072646900     12345   3 minutes       3 minutes       50 years

ルールエンジンの利用

お膳立てができたので、本題のルールエンジンを構成していきます。

ルールエンジンの概要

詳細は 公式のドキュメント がわかりやすいですが、端的に言えば、

  • コアサービスまたはエクスポートサービスからデータを受け取る
  • 事前に定義されたルールの条件との合致を確認する
  • 合致していた場合は、そのルールで定義されたアクションを実行する

ような処理をしてくれるサービスです。

実装は現段階では BRMS の Drools ベースとのことなので、Java 製ですね。

ルールエンジンへデータを配信する構成パタンは、ドキュメントでは以下の二つが説明されています。

  • エクスポートサービスのクライアントとして動作させるパタン
    • データは Export Distribution(edgex-export-distro コンテナ)から配信される
    • データの流れが全体で統一されてシンプルに
    • レイテンシやエクスポートサービスのキャパシティなどの面にデメリット
  • コアサービスに直結させるパタン
    • データは Core Data(edgex-core-data コンテナ)から配信される
    • エクスポートサービスをバイパスすることで、レイテンシやパフォーマンス面ではメリットがある
    • データの流れはやや複雑に

この構成パタンの選択や、具体的な接続先の情報は、次で紹介する設定ファイルで指定できます。

ルールエンジンの設定

設定がデフォルトのままでよければ、コンテナイメージの中にすべて含まれているので、敢えて手で投入する必要はありませんが、今回はせっかくなので触れるようにしています。rulesengine フォルダの中のファイル群がそれです。

公式のドキュメント で触れられている設定が含まれるファイルが、rulesengine/application.properties です。

$ cat rulesengine/application.properties
...
export.client=false
...
export.client.registration.url=http://edgex-export-client:48071/api/v1
export.client.registration.name=EdgeXRulesEngine
...
export.zeromq.port=5563
export.zeromq.host=tcp://edgex-core-data
...

export.clientfalse なので、今回はコアサービスから直接データを受け取ります。export.zeromq.host は現時点のデフォルトでは tcp://edgex-app-service-configurable-rules になっており、アプリケーションサービス からデータを受け取る形になっていますが、今回はシンプルにドキュメントに従って tcp://edgex-core-data にしています。

もう一つのファイルが、rulesengine/rule-template.drl です。これはルールそのもののテンプレートとして利用されます。これについては後述します。

ルールの設定

では、実際にルールを設定していきます。現状、GUI でも CLI でもできないので、API を叩きます。標準 GUI には設定画面はあるものの、プルダウンに項目が出てこなくて設定できずでした。

ルールの検討

ルールは以下ような JSON で投入します。

{
    "name": "<ルール名>",
    "condition": {
        "device": "<トリガ条件にするデバイス名>",
        "checks": [
            {
                "parameter": "<トリガ条件にするリソース名>",
                "operand1": "<トリガ条件にする値>",
                "operation": "<比較演算子>",
                "operand2": "<トリガする閾値>"
            }
        ]
    },
    "action": {
        "device": "<操作対象のデバイス ID>",
        "command": "<操作対象のコマンド ID>",
        "body": "<操作対象のコマンドに PUT される内容>"
    },
    "log": "<トリガされたときのログ出力文字列>"
}

全体の構造は、condition で指定した条件を満たしたら action で指定したコマンドがトリガされる、と思えばよいでしょう。

condition での指定値は以下のように組んでいきます。

  • device
    • トリガ条件にするデバイスの名称を指定します。ID では動かないので注意です
    • 今回だと、REST デバイスの値を元にコマンドを実行したいので、REST_DEVICE を指定します
  • parameter
    • トリガ条件にするデバイスのリソース名を指定します。リソース名なので、デバイスプロファイルで指定した名称であり、すなわち Reading の名前でもあります
    • 今回の REST_DEVICE はリソース intfloat を持ちますが、今回は int を指定します
  • operand1
    • 比較元になる値を作ります。デバイスの値は内部では文字列値として扱われているため、数値であれば適切な型にキャストする必要があります
    • Drools が Java ベースのため、ここでは Java の文法で書きます。値に応じて、例えば以下などが使い分けられるでしょう
      • Integer.parseInt(value)
      • Float.parseFloat(value)
  • operation
    • 比較演算子です。<=> などが考えられます
  • operand2
    • 閾値です

action での指定値は、以下のように組みます。

  • device
    • 先ほどと異なり、ここでは ID で指定します
    • 今回は MQ_DEVICE の ID です
    • デバイスの ID は API で確認できます(方法は 過去のエントリ で)
  • command
    • コマンドも ID で指定します
    • 今回は testmessage の ID です
    • コマンドの ID も API で確認できます(方法は 過去のエントリ で)
  • body
    • コマンドの PUT 命令のボディを JSON 形式で指定します
    • めっちゃエスケープが必要です
    • 例えば今回だと、{\\\"message\\\":\\\"HIGH\\\"}{\\\"message\\\":\\\"LOW\\\"} です

最終的に、今回作りたい以下のルール群は、

  • ルール (1)
    • REST デバイスから送られる値が
    • 80 を越えたら
    • MQTT デバイスにメッセージ HIGH を送信する
  • ルール (2)
    • REST デバイスから送られる値が
    • 20 を下回ったら
    • MQTT デバイスにメッセージ LOW を送信する

ひとつめは以下の JSON で、

{
    "name": "rule_int_high",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": ">",
                "operand2": "80"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"HIGH\\\"}"
    },
    "log": "Action triggered: The value is too high."
}

ふたつめは以下の JSON であらわされます。

{
    "name": "rule_int_low",
    "condition": {
        "device": "REST_DEVICE",
        "checks": [
            {
                "parameter": "int",
                "operand1": "Integer.parseInt(value)",
                "operation": "<",
                "operand2": "20"
            }
        ]
    },
    "action": {
        "device": "09dae1fc-e2be-4388-9677-639d2f24c58b",
        "command": "1c7a50e7-5424-4bce-9b9a-29849510580e",
        "body": "{\\\"message\\\":\\\"LOW\\\"}"
    },
    "log": "Action triggered: The value is too low."
}

ルールの投入

では、ルールを投入します。エンドポイントは以下です。

  • http://localhost:48075/api/v1/rule

ここに、先の JSON を POST します。

ひとつめのルールの投入。true が返れば成功
ふたつめのルールの投入

ルールの確認

ルールが投入されると、rule-template.drl を元に新しい <ルール名>.drl ファイルが生成されて利用されます。edgex-support-rulesengine 内に配置されるので、ここではコンテナ内のファイルを cat して覗きます。

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_high.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_high"
when
  $e:Event($rlist: readings &amp;&amp; device=="REST_DEVICE")
  $r0:Reading(name=="int" &amp;&amp; Integer.parseInt(value) > 80) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"HIGH\"}");
logger.info("Action triggered: The value is too high.");
end

$ docker exec edgex-support-rulesengine cat /edgex/edgex-support-rulesengine/rules/rule_int_low.drl
package org.edgexfoundry.rules;
global org.edgexfoundry.engine.CommandExecutor executor;
global org.edgexfoundry.support.logging.client.EdgeXLogger logger;
import org.edgexfoundry.domain.core.Event;
import org.edgexfoundry.domain.core.Reading;
import java.util.Map;
rule "rule_int_low"
when
  $e:Event($rlist: readings &amp;&amp; device=="REST_DEVICE")
  $r0:Reading(name=="int" &amp;&amp; Integer.parseInt(value) < 20) from $rlist
then
executor.fireCommand("09dae1fc-e2be-4388-9677-639d2f24c58b", "1c7a50e7-5424-4bce-9b9a-29849510580e", "{\"message\":\"LOW\"}");
logger.info("Action triggered: The value is too low.");
end

手元の rule-template.drl と見比べると、テンプレートを元に展開されているっぽさがわかりますね。Drools の drl ファイルの文法にあまり詳しくないですが、行われているのが単純な文字列連結なのであれば、operand に指定する値あたりは工夫すると、もうちょっと複雑な計算もできるのかもしれません。試していませんし、インジェクション攻撃っぽいですけど。

なお、現在の API ではルールの中身までは確認できず、ルール名の一覧が取得できるのみのようです。悲しい。

$ curl -s http://localhost:48075/api/v1/rule | jq
[
  "rule_int_high",
  "rule_int_low"
]

ルールエンジンの動作確認

動作を確認していきます。

動きを追いやすくするため、下準備として edgex-support-rulesengine のログを常時表示させて、さらに別のターミナルで MQTT ブローカの全トピックを購読しておくとわかりやすいです。

$ docker logs -f --tail=10 edgex-support-rulesengine
[2020-01-25 13:44:15.023] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:15.024] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
[2020-01-25 13:44:17.214] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:44:17.216] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  MQ_DEVICE
...
$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
logic/connected 2
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.0"}
...

では、REST デバイスの気持ちになって、まずはルールに合致しないデータを REST デバイスサービスに POST します。

$ curl -X POST -H "Content-Type: text/plain" -d 50 http://localhost:49986/api/v1/resource/REST_DEVICE/int

値は取り込まれていますし、

$ edgex-cli reading list REST_DEVICE -l 10
Reading ID                              Name    Device          Origin                  Value   Created         Modified        Pushed
66461804-a894-470f-a099-631ffd4c32cb    int     REST_DEVICE     1579960045357298000     50      About a minute  About a minute  50 years

ルールエンジンにも REST_DEVICE からの値は届いているようなログが出ますが、実際には何もトリガされません。正常です。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:47:25.370] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:47:25.466] boot - 6  INFO [main] --- ZeroMQEventSubscriber: Event sent to rules engine for device id:  REST_DEVICE
...

つづいて、ルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 90 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

ルールエンジンのログでは、指定したログメッセージが記録され、 {"message":"HIGH"} が指定したコマンドにリクエストされたことがわかります。

$ docker logs -f --tail=10 edgex-support-rulesengine
...
[2020-01-25 13:57:26.805] boot - 6  INFO [main] --- ZeroMQEventSubscriber: JSON event received
[2020-01-25 13:57:26.853] boot - 6  INFO [main] --- RuleEngine: Action triggered: The value is too high.
[2020-01-25 13:57:26.853] boot - 6  INFO [SimpleAsyncTaskExecutor-2] --- CommandExecutor: Sending request to:  09dae1fc-e2be-4388-9677-639d2f24c58bfor command:  1c7a50e7-5424-4bce-9b9a-29849510580e with body: {"message":"HIGH"}
[2020-01-25 13:57:26.866] boot - 6  INFO [main] --- RuleEngine: Event triggered 1rules: Event [pushed=0, device=REST_DEVICE, readings=[Reading [pushed=0, name=int, value=90, device=REST_DEVICE]], toString()=BaseObject [id=538aea34-b4ce-4342-88a9-6c08cdac080e, created=0, modified=0, origin=1579960646793054700]]
...

MQTT ブローカ上では、MQTT デバイスに対するコマンド実行が行われた様子がわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
CommandTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
ResponseTopic {"cmd":"message","message":"HIGH","method":"set","uuid":"5e2c4946b8dd790001754b8b"}
...

MQTT デバイスの testmessage コマンドに GET すると、値が狙い通りに変更されていることがわかります。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579960983817888000,
  "readings": [
    {
      "origin": 1579960983809258000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "HIGH"
    }
  ],
  "EncodedEvent": null
}

同様に、REST デバイスの気持ちになってふたつめのルールに合致する値を投げます。

$ curl -X POST -H "Content-Type: text/plain" -d 10 http://192.168.0.100:49986/api/v1/resource/REST_DEVICE/int

もろもろの処理が動き、MQTT デバイス側の値が変わったことが確認できます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579961107845061000,
  "readings": [
    {
      "origin": 1579961107837083000,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "LOW"
    }
  ],
  "EncodedEvent": null
}

まとめ

あるデバイスの値の変化をトリガにして、別のデバイスを制御できることが確認できました。

今回は、REST デバイスを使った関係で、実質手動でトリガさせたに等しい状況でしたが、本来はセンサの値をトリガにアクチュエータを動かすような使い方になるでしょう。REST デバイスの代わりに MQTT デバイスや仮想デバイスのランダム値を condition に指定すれば、似たような状況のテストが可能です。

現段階ではシンプルなルールエンジンしか積まれていませんが、今後エコシステムが成熟してアプリケーションサービスなどが充実してくれば、より複雑な処理も可能になることが期待できます。

EdgeX Foundry 関連エントリ


Rapsberry Pi

冬休みの自由研究: EdgeX Foundry (4) MQTT デバイスサービスの追加

EdgeX Foundry 関連エントリ

おさらい

前々回のエントリ では、下図のとおり、バンドルされている仮想デバイスを利用して動作を確認していました。

前々回のエントリで構成されていたデバイスとデバイスサービス

これらの仮想デバイスは、デバイスからのデータの受け取りやデバイスへのコマンド実行などをテストする目的ではたいへん便利ですが、現実世界とのインタラクションはできません。

そこで今回は、MQTT をインタフェイスにもつデバイス(のシミュレータ)を用意し、下図のように EdgeX Foundry がそのデバイスと実際に(MQTT ブローカを介して)インタラクションできる状態を構成します。

MQTT 対応デバイス(のシミュレータ)を EdgeX Foundry の制御下におく

本エントリの作業内容は、エントリ執筆時点の公式ドキュメント(6.5. MQTT – Adding a Device to EdgeX)を基にしています。

今回も、GitHub にもろもろ置いてあります ので、こちらをクローンして使います。

$ git clone https://github.com/kurokobo/edgex-lab-handson.git
 
$ cd lab03

新しいデバイスの仕様と動作確認

まずは、今回新しく EdgeX Foundry の制御下におきたいデバイスの仕様を整理します。その後、実際にそのデバイスのシミュレータを動作させ、仕様通りに動くことを確認します。

新しいデバイスの仕様

今回は、以下のような仕様のデバイスを考えます。

  • 二種類のセンサを持つ
    • randfloat32
    • randfloat64
  • 二種類の文字列情報を持つ
    • ping
    • message
  • トピック DataTopic にセンサ randfloat32 の値を 15 秒ごとに配信する
    • {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"<値>"} の形式で配信する
  • トピック CommandTopic でコマンドを待ち受ける
    • {"name":"MQTT_DEVICE","method":"<get または set>","cmd":"<コマンド名>"} の形式のコマンドを受け取る
  • コマンドを受け取ったら、コマンド応じて処理を実行し、結果をトピック ResponseTopic に配信する
    • コマンドのメソッドが set だった場合は、保存されている message を書き換える
    • コマンドが ping だった場合は、pong を返す
    • コマンドが message だった場合は、保存されている message を返す
    • コマンドが randfloat32 または randfloat64 だった場合は、それぞれ対応するセンサの値を返す
    • 結果は {"name":"MQ_DEVICE","method":"<メソッド>","cmd":"<コマンド名>","<コマンド名>":"<値>"} の形式で配信する

デバイス側が定期的にデータを発信するだけでなく、コマンド操作も受け付けるようなデバイスです。ただしよく見ると、二つあるセンサのうち randfloat64 は、値の定期的な自動配信はありません(これは伏線です)。

シミュレータの起動

今回は、上記のようなデバイスを模したシミュレータを利用します。MQTT を扱うにはブローカが不可欠なので、前回のエントリ と同様にまずはこれを起動します。

$ docker run -d --rm --name broker -p 1883:1883 eclipse-mosquitto
6de5986ebb1b3715179253857952f143fbf19fa77e201980f8573d96558850fd

続けてデバイス(のシミュレータ)の用意です。

Node.js で MQTT を扱えるコンテナ dersimn/mqtt-scripts で、次のようなスクリプトを起動させます。

function getRandomFloat(min, max) {
    return Math.random() * (max - min) + min;
}

const deviceName = "MQ_DEVICE";
let message = "test-message";

// 1. Publish random number every 15 seconds
schedule('*/15 * * * * *', () => {
    let body = {
        "name": deviceName,
        "cmd": "randfloat32",
        "randfloat32": getRandomFloat(25, 29).toFixed(1)
    };
    publish('DataTopic', JSON.stringify(body));
});

// 2. Receive the reading request, then return the response
// 3. Receive the put request, then change the device value
subscribe("CommandTopic", (topic, val) => {
    var data = val;
    if (data.method == "set") {
        message = data[data.cmd]
    } else {
        switch (data.cmd) {
            case "ping":
                data.ping = "pong";
                break;
            case "message":
                data.message = message;
                break;
            case "randfloat32":
                data.randfloat32 = getRandomFloat(25, 29).toFixed(1);
                break;
            case "randfloat64":
                data.randfloat64 = getRandomFloat(10, 1).toFixed(5);
                break;
        }
    }
    publish("ResponseTopic", JSON.stringify(data));
});

これを起動するには、以下のように作業します。IP アドレスは適宜読み替えてください。

$ cd mqtt-scripts
$ docker run -d --restart=always --name=mqtt-scripts -v "$(pwd):/scripts" dersimn/mqtt-scripts --url mqtt://192.168.0.100 --dir /scripts
Unable to find image 'dersimn/mqtt-scripts:latest' locally
...
9d52b4956ebbec60dd0f93bdf9750ff915fefd8e8c58d3ce8bc7ba1429693d2b

シミュレータの動作確認

ブローカの全トピックを購読します。15 秒ごとに DataTopic にセンサ randfloat32 の値が届いていることがわかります。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"27.4"}
DataTopic {"name":"MQ_DEVICE","cmd":"randfloat32","randfloat32":"28.6"}

コマンド操作ができることも確認します。別のターミナルで CommandTopic にコマンドを配信します。Windows 環境だと "\" にしないと動かないかもしれません。

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"ping"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}'

$ docker run --init -it --rm efrecon/mqtt-client pub -h 192.168.0.100 -t "CommandTopic" -m '{"name":"MQTT_DEVICE","method":"get","cmd":"message"}'

購読している側では、コマンドが CommandTopic に届き、そのコマンドに応じた応答が ResponseTopic に配信されていることが確認できます。また、message コマンドの結果が、set メソッドの実行前後で変更されていることも確認できます。

$ docker run --init --rm -it efrecon/mqtt-client sub -h 192.168.0.100 -t "#" -v
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"ping","ping":"pong"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat32","randfloat32":"27.6"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"randfloat64","randfloat64":"8.39883"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"test-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"set","cmd":"message","message":"modified-message"}
...
CommandTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message"}
ResponseTopic {"name":"MQTT_DEVICE","method":"get","cmd":"message","message":"modified-message"}

ここまでで、デバイス(のシミュレータ)が動作している状態が作れました。ここまでは EdgeX Foundry はまったく関係なく、ただ単に前述の仕様のデバイスを論理的に作っただけです。

デバイスサービスの構成

さて、ここからが本題です。先ほど作ったデバイスを EdgeX Foundry の制御下におくことを考えます。

デバイスサービスの概念

現実世界に存在するデバイスは、それぞれが異なるセンサやスイッチを持っていて、異なるプロトコルをインタフェイスに持つため、それぞれに応じた適切な方法で値の取得や制御命令の発行をする必要があります。

この実態を踏まえ、EdgeX Foundry では、実デバイスと EdgeX Foundry の間を取り持ってくれる存在としてデバイスサービスとよばれるマイクロサービスを定義しています。デバイスサービスは、おおむねデバイスの種類ごとに用意するものと思えばよさそうで、つまり、ひとつのデバイスサービスに対してひとつ以上のデバイスの制御を任せられるようです。

デバイスサービスは、以下のような役割を持ちます。

  • コアサービス層に対して、デバイスからの情報取得やデバイスへの制御命令の発行を行える REST エンドポイントを提供する
  • コアサービス層からの制御命令を実デバイスに合わせた命令に翻訳して実行し、実デバイスを制御する
  • デバイスから送られてきた情報をコアサービス層に合わせた情報に翻訳し、コアサービス層に届ける
  • 事前に定められた場合は、定期的にデバイスに対して特定の処理を行い、結果をコアサービス層に届ける

また、デバイスサービスそのものは、以下のような特徴を持ちます。

  • MQTT や Modbus など業界で多く使われるプロトコルに合わせたデバイスサービスはすでに参考実装が用意されており、複雑でない構成の場合は充分に流用できる
  • 参考実装の流用では不足する場合は、提供されている SDK を利用して気軽に自製できる(C 言語または Go 言語)
  • すべてのデバイスサービスはこの SDK を用いて共通のフレームワークのもとで実装されるため、操作方法や設定方法は自ずと共通化され、相互運用性は高まる
  • EdgeX Foundry を取り巻くエコシステムの成熟により、デバイスのベンダがそのデバイス用のデバイスサービスを提供したり、デバイスにデバイスサービスが組み込まれたり、第三者により開発されたデバイスサービスの充実も期待できる

EdgeX Foundry に限った話ではないですが、プラットフォーム系の製品の場合、製品の発展にはエコシステムの成熟が非常に重要です。この観点では、EdgeX Foundry は、商用環境への利用を謳ったバージョン 1.0 のリリースが 2019 年の冬とついこの前なので、まだまだこれからですね。

デバイスサービスの構成要素

デバイスを制御するデバイスサービスを追加するには、以下の 3 つの要素を考える必要があります。

  • デバイスサービスそのもの
    • コアサービスとデバイスとの間を仲介する、C 言語または Go 言語で実装されたマイクロサービス
    • 今回はコンテナとして動作させる
    • 後述のデバイスプロファイルとデバイスサービス設定を読み込んで動作する
  • デバイスプロファイル(Device Profile)
    • YAML ファイルとして定義
    • 管理対象のデバイス種別が持つリソースや、そのリソースが持つ値の意味、それぞれのリソースに対して行える操作などを定義する
  • デバイスサービス設定(Drvice Service Configuration)
    • configuration.toml ファイルとして定義
    • デバイスサービス自身の設定や、連携する EdgeX Foundry の他のマイクロサービスの情報などを定義する
    • デバイスプロファイルと実際のデバイスを紐づけてデバイスを定義する。デバイスやリソースに対する自動実行処理なども定義する
    • その他、デバイスサービスの動作に必要なパラメータを指定する

デバイスプロファイル

今回は、mqtt.test.device.profile.yml を利用します。中身は以下の通りです。

$ cat mqtt/mqtt.test.device.profile.yml
name: "Test.Device.MQTT.Profile"
manufacturer: "Dell"
model: "MQTT-2"
labels:
- "test"
description: "Test device profile"

deviceResources:
- name: randfloat32
  description: "device random number with Base64 encoding"
  properties:
    value:
      { type: "Float32", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "Base64" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: randfloat64
  description: "device random number with e notion"
  properties:
    value:
      { type: "Float64", size: "4", readWrite: "R", defaultValue: "0.00", minimum: "100.00", maximum: "0.00", floatEncoding: "eNotation" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: ping
  description: "device awake"
  properties:
    value:
      { type: "String", size: "0", readWrite: "R", defaultValue: "oops" }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }
- name: message
  description: "device notification message"
  properties:
    value:
      { type: "String", size: "0", readWrite: "W" ,scale: "", offset: "", base: ""  }
    units:
      { type: "String", readWrite: "R", defaultValue: "" }

deviceCommands:
- name: testrandfloat32
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat32"}
- name: testrandfloat64
  get:
    - { index: "1", operation: "get", deviceResource: "randfloat64"}
- name: testping
  get:
    - { index: "1", operation: "get", deviceResource: "ping"}
- name: testmessage
  get:
    - { index: "1", operation: "get", deviceResource: "message"}
  set:
    - { index: "1", operation: "set", deviceResource: "message"}

coreCommands:
- name: testrandfloat32
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat32"
    responses:
    - code: "200"
      description: "get the random float32 value"
      expectedValues: ["randfloat32"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testrandfloat64
  get:
    path: "/api/v1/device/{deviceId}/testrandfloat64"
    responses:
    - code: "200"
      description: "get the random float64 value"
      expectedValues: ["randfloat64"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testping
  get:
    path: "/api/v1/device/{deviceId}/testping"
    responses:
    - code: "200"
      description: "ping the device"
      expectedValues: ["ping"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
- name: testmessage
  get:
    path: "/api/v1/device/{deviceId}/testmessage"
    responses:
    - code: "200"
      description: "get the message"
      expectedValues: ["message"]
    - code: "500"
      description: "internal server error"
      expectedValues: []
  put:
    path: "/api/v1/device/{deviceId}/testmessage"
    parameterNames: ["message"]
    responses:
    - code: "204"
      description: "set the message."
      expectedValues: []
    - code: "500"
      description: "internal server error"
      expectedValues: []

デバイスプロファイルは、デバイスと一対一ではなく、例えばデバイスのモデルごとにひとつ用意するイメージです 。

この例では、冒頭の 6 行でデバイスのモデルを定義しています。先ほど動作させたデバイス(のシミュレータ)は、Dell 製の MQTT-2 というモデルである、という想定で書かれています。

その後、大きく deviceResourcesdeviceCommandscoreCommands、の三つのセクションが続いています。

deviceResources セクションでは、デバイスが持っているリソースと、それが持つ値の意味を決めています。この例では、このデバイスにリソース randfloat64 があり、それは 0.00 から 100.00 の範囲の Float64 型の値を持つと定義されています。また、message リソースは String 型の値を持ち、他のリソースと違って書き換えが可能(W)と定義されていることがわかります。

deviceCommands セクションでは、このデバイスに対して実行できるコマンドを定義し、そのコマンドのメソッド(get または set)ごとに参照または操作されるリソースを紐付けています。例えば、testrandfloat64 コマンドの get メソッドは、先に deviecResources で定義したリソース randfloat64 からの値の読み取りを行えるように定義されています。また、testmessage コマンドでは、set または get メソッドにより、リソース message の値の取得や変更ができるように定義されています。なお、ここでは実装されていませんが、ひとつのメソッドに複数のリソースを紐付けることも可能です。つまり、一回の get 操作で複数のリソースの値を同時に取得するようにも構成できます。Edge Xpert のドキュメントでは、そのような例が解説され ています。

coreCommands セクションでは、コアサービス層からデバイスに対して実行できるコマンドを定義しています。例えば、コアサービスが testrandfloat64 コマンドに GET リクエストを実行すると、その命令はパス /api/v1/device/{deviceId}/testrandfloat64 に渡されます。このパスは、先の deviceCommands で定義した testrandfloat64 コマンドを表すものです。そしてコアサービスは、結果としてレスポンスコード 200 が返ってきた場合は、そのレスポンスボディに含まれる値を randfloat32 の値として取り込むように構成されています。また、testmessage コマンドのみ、ほかのコマンドと異なり PUT リクエストの動作が定義されています。

デバイスサービス設定

今回は、configuration.toml を利用します。中身は以下の通りです。

このファイルは環境依存の固定値を含みます。自分の環境で試す場合は、IP アドレスを環境に合わせて書き換えてください。

$ cat mqtt/configuration.toml
# configuration.toml
[Writable]
LogLevel = 'DEBUG'

[Service]
Host = "edgex-device-mqtt"
Port = 49982
ConnectRetries = 3
Labels = []
OpenMsg = "device mqtt started"
Timeout = 5000
EnableAsyncReadings = true
AsyncBufferSize = 16

[Registry]
Host = "edgex-core-consul"
Port = 8500
CheckInterval = "10s"
FailLimit = 3
FailWaitTime = 10
Type = "consul"

[Logging]
EnableRemote = false
File = "./device-mqtt.log"

[Clients]
  [Clients.Data]
  Name = "edgex-core-data"
  Protocol = "http"
  Host = "edgex-core-data"
  Port = 48080
  Timeout = 50000

  [Clients.Metadata]
  Name = "edgex-core-metadata"
  Protocol = "http"
  Host = "edgex-core-metadata"
  Port = 48081
  Timeout = 50000

  [Clients.Logging]
  Name = "edgex-support-logging"
  Protocol = "http"
  Host ="edgex-support-logging"
  Port = 48061

[Device]
  DataTransform = true
  InitCmd = ""
  InitCmdArgs = ""
  MaxCmdOps = 128
  MaxCmdValueLen = 256
  RemoveCmd = ""
  RemoveCmdArgs = ""
  ProfilesDir = "/custom-config"

# Pre-define Devices
[[DeviceList]]
  Name = "MQ_DEVICE"
  Profile = "Test.Device.MQTT.Profile"
  Description = "General MQTT device"
  Labels = [ "MQTT"]
  [DeviceList.Protocols]
    [DeviceList.Protocols.mqtt]
       Schema = "tcp"
       Host = "192.168.0.100"
       Port = "1883"
       ClientId = "CommandPublisher"
       User = ""
       Password = ""
       Topic = "CommandTopic"
  [[DeviceList.AutoEvents]]
    Frequency = "30s"
    OnChange = false
    Resource = "testrandfloat64"

# Driver configs
[Driver]
IncomingSchema = "tcp"
IncomingHost = "192.168.0.100"
IncomingPort = "1883"
IncomingUser = ""
IncomingPassword = ""
IncomingQos = "0"
IncomingKeepAlive = "3600"
IncomingClientId = "IncomingDataSubscriber"
IncomingTopic = "DataTopic"
ResponseSchema = "tcp"
ResponseHost = "192.168.0.100"
ResponsePort = "1883"
ResponseUser = ""
ResponsePassword = ""
ResponseQos = "0"
ResponseKeepAlive = "3600"
ResponseClientId = "CommandResponseSubscriber"
ResponseTopic = "ResponseTopic"

このファイルで、実際のデバイスを定義したり、必要なパラメータを指定したりします。

前半部分では EdgeX Foundry 内の他のサービスの情報などを指定しています。

実際のデバイスの定義は [[DeviceList]] の部分です。デバイスプロファイル Test.Device.MQTT.Profile に紐づけてデバイス MQ_DEVICE を定義しています。また、このデバイスに対するコマンドの実行に利用する MQTT ブローカも指定しています。

[[DeviceList.AutoEvents]] の部分では、デバイス側からは自動では配信されない値であった randfloat64 の値を定期的に取得するため、30 秒ごとに testrandfloat64 コマンドを実行するように設定しています。

[Driver] の部分では、デバイスサービスがデバイスから情報を受け取るために購読する MQTT ブローカやトピック名を設定しています。

なお、今回はこのファイルの中で実デバイスの定義もしてしまっていますが、デバイスはあとからも追加が可能です。また、このファイルはあくまで MQTT デバイスサービスの設定ファイルなので、利用したいデバイスサービスが違えば当然この設定ファイルで書くべき内容も変わります。

デバイスサービスを含んだ EdgeX Foundry の起動

設定ファイルができたら、デバイスサービスを含む EdgeX Foundry を起動します。

今回は、Docker Compose ファイルに以下を足しています。これにより、 edgexfoundry/docker-device-mqtt-go イメージがコンテナとして実行され、今回用の設定ファイルが含まれる mqtt ディレクトリがコンテナにマウントさたうえで、起動時にはその中身を利用して EdgeX Foundry にデバイスサービスとデバイスが登録されます。

  device-mqtt:
    image: edgexfoundry/docker-device-mqtt-go:1.1.1
    ports:
      - "49982:49982"
    container_name: edgex-device-mqtt
    hostname: edgex-device-mqtt
    networks:
      - edgex-network
    volumes:
      - db-data:/data/db
      - log-data:/edgex/logs
      - consul-config:/consul/config
      - consul-data:/consul/data
      - ./mqtt:/custom-config
    depends_on:
      - data
      - command
    entrypoint:
      - /device-mqtt
      - --registry=consul://edgex-core-consul:8500
      - --confdir=/custom-config

では、実際にこのファイルを利用して EdgeX Foundry を起動させます。

$ docker-compose up -d

動作確認

まずは、デバイスサービスやデバイスが正常に EdgeX Foundry に認識されているか確認し、その後、値の収集やコマンド操作を確認します。

登録状態の確認

以前のエントリで device-virtual の存在を確認した方法 で、今回はデバイスサービス edgex-device-mqtt とデバイス MQ_DEVICE が確認できれば、登録は成功しています。ラクなのは UI か CLI ですね。

また、これまで紹介しませんでしたが、EdgeX Foundry はマイクロサービス群の状態管理に HashiCorp の Consul を利用しており、各サービスの起動状態や登録状態は、この Consul を通じても確認できます。

Consul の GUI には、http://192.168.0.100:8500/ でアクセスできます。

Consul の GUI

ここで、

  • [Services] タブで edgex-device-mqtt が緑色である
  • [Key/Value] タブで edgex > devices > 1.0edgex-device-mqtt がある
  • [Key/Value] タブで edgex > devices > 1.0 > edgex-device-mqtt > DeviceList > 0 > NameMQ_DEVICE である

あたりが確認できれば、登録はできていると考えてよいでしょう。

デバイスから配信されている値の収集の確認

今回のデバイスは、センサ randfloat32 の値を 15 秒ごとに自動的に MQTT トピックに配信していました。デバイスサービスはこれを受け取れるように構成しましたので、EdgeX Foundry に取り込まれているはずです。

これも、以前のエントリで値を確認した方法 で確認できます。これもラクなのは GUI か CLI ですが、例えば、API で確認する場合は以下の通りです。

$ curl -s http://localhost:48080/api/v1/reading/name/randfloat32/3 | jq
[
  {
    "id": "23fdac76-0870-4b9d-b653-561d8a7c0423",
    "created": 1579707885029,
    "origin": 1579707885005379300,
    "modified": 1579707885029,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QdpmZg=="
  },
  {
    "id": "5c086b1d-6b38-48c0-9aef-9b800f9c72ab",
    "created": 1579707870018,
    "origin": 1579707870004742000,
    "modified": 1579707870018,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "QeGZmg=="
  },
  {
    "id": "de67c865-0406-408d-b238-d33de1fe1032",
    "created": 1579707855022,
    "origin": 1579707855011006200,
    "modified": 1579707855022,
    "device": "MQ_DEVICE",
    "name": "randfloat32",
    "value": "Qd2Zmg=="
  }
]

時刻はエポックミリ秒(origin はエポックナノ秒)なので、変換すると 15 秒おきであることが確認できます。

$ date -d "@1579707885.029"
Wed 22 Jan 2020 03:44:45 PM UTC

$ date -d "@1579707870.018"
Wed 22 Jan 2020 03:44:30 PM UTC

$ date -d "@1579707855.022"
Wed 22 Jan 2020 03:44:15 PM UTC

デバイスサービスの自動実行イベントの確認

randfloat64 の値は、自動では配信されなかったため、デバイスサービスの設定(configuration.toml)に [[DeviceList.AutoEvents]] を定義して、デバイスサービス側から 30 秒ごとに自動で収集させていました。

これもデータの蓄積を任意の方法で確認できます。例えば CLI で確認する場合は以下の通りです。

$ edgex-cli reading list MQ_DEVICE -l 10 | grep randfloat64
5df1da1a-91db-41cf-99fa-962e87077395    randfloat64     MQ_DEVICE       1579709342806935600     4.924730e+00    15 seconds      15 seconds      50 years
7c33bb76-c9e9-4ef8-a410-80227e236d13    randfloat64     MQ_DEVICE       1579709312695223100     3.734020e+00    45 seconds      45 seconds      50 years
053987f1-5a89-4cc2-a433-03432caa9039    randfloat64     MQ_DEVICE       1579709282611878800     9.320210e+00    About a minute  About a minute  50 years

これも変換すると 30 秒おきであることが確認できます。

$ date -d "@1579709342.806935600"
Wed Jan 22 16:09:02 UTC 2020

$ date -d "@1579709312.695223100"
Wed Jan 22 16:08:32 UTC 2020

$ date -d "@1579709282.611878800"
Wed Jan 22 16:08:02 UTC 2020

デバイスへのコマンド実行の確認

このデバイスは、リソース message の値は EdgeX Foundry から変更できるような仕様で、デバイスプロファイルでもそれに合わせて PUT リクエストの挙動を定義していました。

では、この操作ができることを確認します。

デバイスプロファイルで定義した各コマンドは、内部では REST エンドポイントでリクエストを待ち受けてており、そしてこの URL は、API 経由で確認できます。具体的には、デバイスを示すエンドポイントへの GET リクエストの結果に含まれます。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE | jq
...
  "id": "77fccbb8-f33b-42d8-bf21-6d55cdde2068",
  "name": "MQ_DEVICE",
...
  "commands": [
...
      "name": "testmessage",
...
      "get": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4"
      },
      "put": {
        "path": "/api/v1/device/{deviceId}/testmessage",
...
        "url": "http://edgex-core-command:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4",
...

EdgeX Foundry がデバイスにコマンドを実行するときは、内部ではこの URL に対して GET なり PUT なりをリクエストするわけです。そしてデバイスサービスがそれを受け取って、設定したとおりに翻訳してデバイスへの操作が実行され応答が返ることになります。

ここでは実際に、確認できた URL にリクエストを投げます。外部からこの URL を直接叩くときは、ホスト名部分を環境に合わせて書き換えたうえで叩きます。今回であれば、次のような操作で現在値が確認できます。

$ curl -s http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4 | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710432121478400,
  "readings": [
    {
      "origin": 1579710432109675800,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "test-message"
    }
  ],
  "EncodedEvent": null
}

まだ何も変更していないので、初期値(mqtt-script.js で定義されている)が返ってきました。

では、変更の PUT を投げます。JSON をリクエストボディに含めて PUT します。

$ curl -s -X PUT -d '{"message":"modified-message"}' http://localhost:48082/api/v1/device/77fccbb8-f33b-42d8-bf21-6d55cdde2068/command/4daa4148-7c0e-4cfd-81cc-b2b740bb4de4

エラーがなければ、再度 GET すると、値が変わっていることが確認できます。

ところで、URL は /api/v1/device/<デバイス ID>/command/<コマンド ID> なパスですが、実はわざわざ ID を調べなくても、/api/v1/device/name/<デバイス名>/command/<コマンド名> でも機能します。というわけで、お好みに合わせてどちらを使ってもよいですが、せっかくなので今度はこちらに GET します。

$ curl -s http://localhost:48082/api/v1/device/name/MQ_DEVICE/command/testmessage | jq
{
  "device": "MQ_DEVICE",
  "origin": 1579710963293832000,
  "readings": [
    {
      "origin": 1579710963283338200,
      "device": "MQ_DEVICE",
      "name": "message",
      "value": "modified-message"
    }
  ],
  "EncodedEvent": null
}

値が変わっていることが確認できました。

なお、このような GET や PUT に相当する操作は、GUI でも実行できます。例えば Closure UI であれば、[DEVICES] 画面のデバイス MQ_DEVICEACTION の [Control Device] アイコンから現在値が確認できます。この裏側では、この画面を開く際に、全部のリソースに対して順番に GET が発行されています。

現在値が確認できる

この画面では、PUT リクエストが可能なリソースには ACTION 欄に Set Value アイコンが出るのですが、実行しようとしても手元の環境では入力欄が表示されず使えませんでした……。

Closure UI ではない標準 UI であれば、PUT も実行できました。

Method を set にして send するだけ

まとめ

新しいデバイス(のシミュレータ)を用意し、それに合わせたデバイスサービスを構成して、このデバイスを EdgeX Foundry の制御下においたうえで、実際の動作を確認できました。

実際のシーンでは、本当のデバイスの本当の仕様に合わせてデバイスサービスを構成する必要があり、場合によってはデバイスサービスそれ自体の実装を含めてカスタマイズが必要になる可能性ももちろんあるわけです。

が、今回確認した通り、デバイスや環境に依存する部分(リソースやコマンドの名前や数、MQTT ブローカのホスト情報やトピック名など)は、デバイスプロファイルやデバイスサービス設定として別ファイルで定義できるようになっていて、デバイスサービスの実装それ自体は、設定ファイルに従って淡々と動くだけでもあります。

例えば Raspberry Pi に接続したセンサの値を取り込みたい場合を考えると、実際のセンサ値を MQTT トピックに投げる部分は自製する必要がありますが、であればその際に、逆に今回使ったこのデバイスサービスの仕様を前提にした投げ方にしてしまうことも可能です。そうすれば、このデバイスサービスをそのまま実際に流用できることになります。

今回は MQTT の例でしたが、それ以外のプロトコル用の参考実装も、環境依存の部分が設定ファイル群で外在化できるような汎用性のある形で作られているようなので、それなりに様々なデバイスで使いまわせそうです。

なお、EdgeX Foundry に取り込まれたデータは、前回のエントリで扱ったエクスポートの設定 や、アプリケーションサービスを構成することで、簡単に持ち出せます。コアサービス層に集約される段階で、すでにデバイスの差異は抽象化されているので、もちろん今回の MQTT で収集したデータも他のデバイスの情報とまったく同じ方法でエクスポート可能です。

EdgeX Foundry 関連エントリ