这一节我们来设计和实现 IotHub 设备影子服务端的功能。
服务端需要对设备影子进行存储,在业务系统修改设备影子时,需要将设备影子同步到设备端,同时还需要处理来自于设备的设备影子同步消息来将设备端的数据同步到数据库中。
最后服务端还要提供接口供业务系统查询和修改设备影子。
存储设备影子
我们在 Device 模型里新增一个字段shadow来保存设备的影子,一个空的设备影子应该是:
{"state":{},"metadata":{},"version":0}
我们按照这个来设置这个字段的默认值:
//IotHub_Server/models/device.jsconst deviceSchema = new Schema({...shadow:{type: String,default: JSON.stringify({"state":{},"metadata":{},"version":0})}})
下发设备影子相关的指令
IotHub 需要向设备发送两种设备影子相关的指令,一个是更新影子,这里使用指令名$update_shadow,另外一个是成功更新设备影子后,对设备的回复信息,这里使用指令名$shadow_reply。发送这两条指令使用 IotHub 指令下发的通道就可以了。
设备端发送影子相关消息
设备端会向 IotHub 发送三种影子相关的消息,IotHub Server 需要对这些消息进行回应:
- 设备主动请求影子数据,使用设备数据请求的通道,resource 名为”$shadow”;
- 设备更新完状态后向 IotHub 回复的消息,这里我们使用上传数据的通道,将 DataType 设为”$shadow_updated”;
- 设备主动更新影子数据,这里我们使用上传数据的通道,将 DataType 设为”$shadow_reporeted”。
服务端更新设备影子
Server API
IotHub 提供一个接口供业务系统修改设备的影子,接收一个 JSON 对象{desired:{key1=value1, ...}, version=xx}作为参数,业务系统在调用时需要提供影子的版本,以避免业务系统用老版本的数据覆盖当前的。
如果业务系统请求的 version 大于当前的影子 version,则更新影子的 desired 字段,以及相关的 metadata 字段,更新成功以后向设备下发指令”$update_shadow”://IotHub_Server/routes/devices.jsrouter.put("/:productName/:deviceName/shadow", function (req, res) {var productName = req.params.productNamevar deviceName = req.params.deviceNameDevice.findOne({"product_name": productName, "device_name": deviceName}, function (err, device) {if (err != null) {res.send(err)} else if (device != null) {if(device.updateShadowDesired(req.body.desired, req.body.version)){res.status(200).send("ok")}else{res.status(409).send("version out of date")}} else {res.status(404).send("device not found")}})})
因为设备在连接时还会主动请求一次影子数据,所以这里使用 qos=0 就可以了。//IotHub_Server/models/device.jsdeviceSchema.methods.updateShadowDesired = function (desired, version) {var ts = Math.floor(Date.now() / 1000)var shadow = JSON.parse(this.shadow)if (version > shadow.version) {shadow.state.desired = shadow.state.desired || {}shadow.metadata.desired = shadow.metadata.desired || {}for (var key in desired) {shadow.state.desired[key] = desired[key]shadow.metadata.desired[key] = {timestamp: ts}}shadow.version = versionshadow.timestamp = tsthis.shadow = JSON.stringify(shadow)this.save()this.sendUpdateShadow()return true} else {return false}}deviceSchema.methods.sendUpdateShadow= function(){this.sendCommand({commandName: "$update_shadow",data: this.shadow,qos: 0})
响应设备端影子消息
影子数据请求
在收到 resource 名为$shadow的数据请求后,IotHub 应该下发”$update_shadow”指令://IotHub_Server/services/message_service.jsstatic handleDataRequest({productName, deviceName, resource, payload, ts}) {if (resource.startsWith("$")) {...} else if (resource == "$shadow_updated") {Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {if (device != null) {device.sendUpdateShadow()}})}}...}
状态更新回复
在收到 DataType=”$shadow_updated” 的上传数据后,IotHub 应该按照数据的内容对设备影子进行更新:
更新时需要先检查回复的 version,同时如果 desired 中的字段值为 null 的话,需要在 reported 里面删除相应的字段,更新成功后需要回复设备://IotHub_Server/service/message_service.jsstatic handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {if (dataType.startsWith("$")) {if (dataType == "$shadow") {Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {if (device != null) {device.updateShadow(JSON.parse(payload.toString()))}})}} else {...}}
//IotHub_Server/models/device.jsdeviceSchema.methods.updateShadow = function (shadowUpdated) {var ts = Math.floor(Date.now() / 1000)var shadow = JSON.parse(this.shadow)if (shadow.version == shadowUpdated.version) {if (shadowUpdated.state.desired == null) {shadow.state.desired = shadow.state.desired || {}shadow.state.reported = shadow.state.reported || {}shadow.metadata.reported = shadow.metadata.reported || {}for (var key in shadow.state.desired) {if (shadow.state.desired[key] != null) {shadow.state.reported[key] = shadowUpdated.state.desired[key]shadow.metadata.reported[key] = {timestamp: ts}} else {delete(shadow.state.reported[key])delete(shadow.metadata.reported[key])}}shadow.timestamp = tsshadow.version = shadow.version + 1delete(shadow.state.desired)delete(shadow.metadata.desired)this.shadow = JSON.stringify(shadow)this.save()this.sendCommand({commandName: "$shadow_reply",data: JSON.stringify({status: "success", timestamp: ts, version: shadow.version}),qos: 0})}} else {this.sendUpdateShadow()}}
设备主动更新影子
在收到 DataType=”$shadow_reported” 的上传数据后,IotHub 应该按照数据的内容对设备影子进行更新:
在更新影子时也需要检查 version 和 null 字段://IotHub_Server/services/message_service.jsstatic handleUploadData({productName, deviceName, ts, payload, messageId, dataType} = {}) {if (dataType.startsWith("$")) {...else if("$shadow_updated"){Device.findOne({product_name: productName, device_name: deviceName}, function (err, device) {if (device != null) {device.reportShadow(JSON.parse(payload.toString()))}})}}...}
//IotHub_Server/models/device.jsdeviceSchema.methods.reportShadow = function (shadowReported) {var ts = Math.floor(Date.now() / 1000)var shadow = JSON.parse(this.shadow)if (shadow.version == shadowReported.version) {shadow.state.reported = shadow.state.reported || {}shadow.metadata.reported = shadow.metadata.reported || {}for (var key in shadowReported.state.reported) {if (shadowReported.state.reported[key] != null) {shadow.state.reported[key] = shadowReported.state.reported[key]shadow.metadata.reported[key] = {timestamp: ts}} else {delete(shadow.state.reported[key])delete(shadow.metadata.reported[key])}}shadow.timestamp = tsshadow.version = shadow.version + 1this.shadow = JSON.stringify(shadow)this.save()this.sendCommand({commandName: "$shadow_reply",data: JSON.stringify({status: "success", timestamp: ts, version: shadow.version}),qos: 0})} else {this.sendUpdateShadow()}}
查询设备影子详情
最后只需要在设备详情接口返回设备影子的数据就可以了:
这一节我们完成了 IotHub 设备影子的服务端实现,下一节我们来实现设备影子的设备端实现,并写一些代码来验证这个功能。//IotHub_Server/models/device.jsdeviceSchema.methods.toJSONObject = function () {return {...shadow: JSON.parse(this.shadow),}}
