在运行完成Python的模拟设备数据上云,想必大家对设备模拟是啥应该没有很大的疑问了,以物联网设备端到硬件端的流程来,我们接下来介绍下这两个Python文件的源码做了哪些事情。
but,首先有一些通讯上的概念需要你先知道,这对了解平台与设备间通讯非常重要,在我们周围许多的设备通过无线网络(WiFi、ZigBee、2G、NB-IoT、LoRa)等和云平台保持实时通讯,这个过程的建立对于开发者来说非常重要。

MQTT协议

实际上我们的Python脚本使用paho-mqtt包提供的mqtt client客户端与平台建立了MQTT连接,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
质量较高的文档推荐:(如有版权问题请联系作者取消连接引用):

除去保证通讯质量的QoS(Quality of Service)之外,MQTT主要提供了一套基于中间代理服务器的订阅机制,很好地将端和端通讯进行异步解耦,解决了同步可能造成过多资源占用的情况,适合物联网使用场景。
Python源码详解 - 图1

iot_device.py详解

iot_device.py文件中包含两个class类对象

  • MessageThread

  • IoTDevice

MessageThread使用多线程(Python的假多线程)来处理接收到平台的下行消息,防止出现下行消息阻塞,作为IoTDevice class的工具类使用,可以方便地将平台下发的消息给MessageThread来处理。
IoTDevice为连接MQTT broker(MQTT服务器)的客户端类,通过paho-mqtt包提供的MQTT客户端进行MQTT的协议封装,与平台进行通讯。

  1. class IoTDevice(object):
  2. PORT = 1883
  3. HOST = ".iot-as-mqtt.cn-shanghai.aliyuncs.com"
  4. PROPERTY_TOPIC = "/sys/%s/%s/thing/event/property/post"
  5. EVENT_TOPIC = "/sys/%s/%s/thing/event/%s/post"
  6. SERVICE_TOPIC = "/sys/%s/%s/thing/service/%s"
  7. OTA_VERSION_TOPIC = "/ota/device/inform/%s/%s"
  8. OTA_UPGRADE_TOPIC = "/ota/device/upgrade/%s/%s"
  9. OTA_PROGRESS_TOPIC = "/ota/device/progress/%s/%s"

以上代码,初始化一些类内作用域的变量,有连接的地址和端口,以及设备需要使用的一些MQTT Topic,Topic是MQTT服务器为各个订阅关系所开辟出来的通道,客户端通过MQTT协议订阅了Topic A,当设备向Topic A发送消息时,MQTT服务器会使用标准MQTT协议将此消息推送给客户端。
基于设备“属性”、“事件”、“服务”的设备模型,设备需要向这些Topic发送消息:

设备模型内容 Topic 什么时候发生
属性上报 /sys/{productKey}/{deviceName}/thing/event/property/post 设备进行属性上报,直接发布(Publish)即可,MQTT服务器维护发布订阅权限和关系
属性回复 /sys/{productKey}/{deviceName}/thing/event/property/post_reply 设备进行属性上报,平台服务器收到消息会进行回复,回复Topic需要设备提前完成订阅
事件上报 /sys/{productKey}/{deviceName}/thing/event/{eventId}/post 设备进行事件上报,每个事件类型具有单独的Topic(eventId)
事件回复 /sys/{productKey}/{deviceName}/thing/event/{eventId}/post_reply 设备进行事件上报,平台服务器接收到会进行回复,需要设备提前订阅
服务订阅 /sys/%s/%s/thing/service/{serviceId} 设备订阅服务Topic,平台可以发起服务并送入参数,分为同步和异步,目前我们使用的异步,即收到消息不需要进行回复
  1. # 初始化类,注入类内元素变量
  2. def __init__(self, pk, dn, ds):
  3. self._product_key = pk
  4. self._device_name = dn
  5. self._device_secret = ds
  6. self._mqtt_client = None
  7. self._version = None
  8. # self._host = pk + self.HOST
  9. self._host = self.HOST
  10. self._port = self.PORT
  11. self._property_topic = self.PROPERTY_TOPIC % (self._product_key, self._device_name)
  12. self._property_reply_topic = self._property_topic + "_reply"
  13. self._version_topic = self.OTA_VERSION_TOPIC % (self._product_key, self._device_name)
  14. self._upgrade_topic = self.OTA_UPGRADE_TOPIC % (self._product_key, self._device_name)
  15. self._progress_topic = self.OTA_PROGRESS_TOPIC % (self._product_key, self._device_name)
  16. self._event_topic_reply_list = []
  17. self._service_topic_dict = {}
  18. self._service_topic_list = None
  19. self._conn_cb = None
  20. self._post_cb = None
  21. self._service_cb = None
  22. self._upgrade_cb = None
  23. # 设备的多线程接收类
  24. self._thread = MessageThread(self._on_async_message)

以上代码主要实现类内一些变量的初始化,init()函数是类被初始化的构造函数。没啥可说的。

  1. # 执行设备各状态回调函数的初始化
  2. def callback_set(self, conn_cb, post_cb, service_cb, upgrade_cb):
  3. self._conn_cb = conn_cb
  4. self._post_cb = post_cb
  5. self._service_cb = service_cb
  6. self._upgrade_cb = upgrade_cb

为IoTDevice类注入一些回调函数的函数,cb(callback),回调函数就是一个通过函数指针调用的函数。 如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。

run_device.py详解

run_device.py中使用了iot_device.py定义的IoTDevice类(key:value类型),在文件中定义了和业务逻辑相关的函数和方法,具体来说,需要开发者关心和修正的有:

  • device_conf :device_conf是一个python的dict类,其中包含设备连接平台的三元组(productKeydeviceNamedeviceSecret)以及设备上报的固件版本信息(用于平台对设备进行OTA管理)

  • device_random_datadevice_random_data 也是Python的dict类,类中定义了在云平台定义的设备模型中的属性、事件、服务,在后面业务主函数random_loop中会对device_random_data进行处理,