到目前为止, IotHub 一次只能对一个设备下发指令,假如业务系统需要对多台设备同时下发指令,应该怎么做呢?我们可以将设备进行分组,业务系统可以通过指定指令的设备组,来实现指令的批量下发。
设备分组
来看一下这个场景,业务系统要关闭二楼所有的 10 个传感器,在目前的 IotHub 功能设计下,业务系统需要调用 10 次下发指令接口,这显然是不合理的。 以 MQTT 的解决方案来说,这 10 个传感器都订阅一个 topic,比如: sensors/2ndfloor,只要往这个主题上发布一条消息就可以了,不需要每个设备都发布一次。
以上就是 IotHub 设备分组要实现的功能,业务系统可以通过 IotHub Server API 提供的接口,给设备设置一个或者多个标签,同时Server API提供接口,可以根据标签批量下发指令。 这样就实现了类似于设备分组的功能,拥有相同的标签的设备就相当于属于同一分组。
有两个问题是设备分组功能需要去解决的。
第一个是设备如何去订阅相应的标签的主题,到目前为止,IotHub 的设备端是通过 EMQ X 的服务器订阅功能完成订阅的,这设备分组的场景下,设备的标签是可以动态增加和删除的,所以无法使用 EMQ X 的服务器订阅功能。那么我们只能使用 MQTT 的 subscribe 和 unsubcribe 功能来完成标签的订阅。
第二个是设备如何知道自己应该订阅哪些标签的主题,在业务系统修改了设备的标签,IotHub 需要将设备的标签信息告知设备,这样设备才能去 subscribe 和 unsubscribe 相应的标签主题。IotHub 会同时使用 Push 和 Pull 模式来告知设备它的标签信息。
功能设计
我们会为设备添加标签,设备会根据标签的内容去订阅相应的主题,为了确保标签的内容可以正确同步到设备,我们会设计标签同步的 Push 和 Pull 模式。
标签存储
Device 模型将新增一个 tags 字段,类型为数组。一个设备可以拥有多个标签。
标签信息同步
Push模式:当设备的标签信息发生变化,即业务系统调用 Server API 修改设备标签后,IotHub 将设备标签数组通过指令下发给设备,指令名为$set_tags
。
Pull模式:当设备连接到 IotHub 后,会发起一个 Resource 名为$tags
的数据请求,IotHub 收到请求之后会将设备标签数组通过指令下发给设备,指令名为$set_tags
。
结合 Push 和 Pull 模式,我们基本上可以保证设备能够准确地获取自己的标签。
熟悉 MQTT 的读者这里可能会有一个疑问,根据 MQTT 协议的内容,还有一个更简单的方案:每次设备标签信息发生变化以后,向一个设备相关的主题上发布一个 retained 消息,里面包含标签信息不就可以了吗?这样无论设备在什么时候连接到 IotHub 都能获取到标签,不再需要 Pull 了。
按照 MQTT 协议,理论上这样是更优的。但是是实际情况和 MQTT 协议是有一点出入的。MQTT 协议规定了如果 Client 不主动设置 clean_session = true,那么 Broker 应该永久为 Client 保存 Session,包括设备订阅的主题、未应答的 QoS>1 的消息等。但在实际情况中,Broker 的存储空间是有限的,Broker 是不会永久保存session的,大部分的 Broker 都会设置一个 session 的过期时间,可以在< EMQ X 安装目录>/etc/emqx.conf
里找到 EMQ X client session 过期时间的配置:
zone.external.session_expiry_interval = 2h
默认情况下 EMQ X Client Session 的过期时间是 2 小时,换句话来说,QoS1 消息的保存时间是 2 小时,你可以根据项目的实际情况调整成更大的值。 但是 Broker 的存储空间是有限的, Session 始终是有过期时间的,这个是你在设计和架构中需要考虑到的。
阿里云 IoT的 QoS1 消息保存时间是7天。
如果我们修改了设备的标签以后,恰好设备离线超出了设置的 session 过期时间,那么设备就收不到标签相关的指令了。 所以这里我们加上了 Pull 模式来保证设备能获取标签数据。
设备订阅
这里我们约定设备通过标签接收下发指令的主题名为:tags/:ProductName/:tag/cmd/:CommandName/:Encoding/:RequestID/:ExpiresAt
。
当设备收到$set_tags
指令后,用自己已订阅的标签和$set_tags
指令数据里的标签数组来对比。来确定需要 subscribe 和 unsubscribe 的标签。 在 MQTT 的架构里,client 是无法知道自己订阅了哪些主题的,所以设备需要在本地保存自己的标签,以便和$set_tags
指令数据里面的标签进行对比,设备需要提供持续性的存储。
假设说设备的存储坏了(这是不可避免的),存储的标签数据没有了,更换了存储重新接入以后,设备对比 IotHub 发来的标签数组,是无法知道它应该 unsubscribe 哪些标签的,所以设备可能会订阅到它不应该订阅的主题。这种情况下我的建议是设备使用新的 ClientID 接入。
在实际的项目我们一般会使用 EMQ X Broker 集群,加上设备的网络状态不是很稳定的情况下,有可能会出现标签指令乱序的情况,比如业务系统连续对一个设备的标签修改两次,结果第二次修改的指令比第一次修改的指令先到达,在设备端,第一次修改的内容就覆盖了第二次修改的内容。
为了避免这种情况的发生,$set_tags
指令会带一个标签信息的版本号 tags_version:
- 业务系统每次修改设备信息时,tags_version 会加 1;
- 设备端收到
$set_tags
指令时,用指令里的 tags_version 和本地保存的 tags_version 对比,如果指令里的 tags_version 大于本地保存的 tags_version,才会执行后续的处理。这里的 tags_version 只是用来应对 MQTT Pubulish 包未按照预定顺序到达设备时的情况,对于业务系统调用 Server API 对设备标签的并发修改,需要其他机制来应对,比如乐观锁,这个和本课程的主题无关,就暂行跳过,不再赘述和实现了。
这一节我们完成了 IotHub 设备分组的功能设计,下一节我们开始服务端的功能实现。