接收设备端消息,在_on_connectd函数里订阅了mqtt.json配置文件里的一下主题:
- sensor/data 设备的遥测数据
- sensor/connect 设备连接
- sensor/disconnect 设备断开连接
- v1/devices/me/attributes/request 获取设备属性
1、设备遥测数据消息
request_handled = self.put_data_to_convert(converter, message, content)
put_data_to_convert:
def put_data_to_convert(self, converter, message, content) -> bool:
if not self.__msg_queue.full():
self.__msg_queue.put((converter.convert, message.topic, content), True, 100)
return True
return False
设备遥测数据是直接扔到消息队列__msg_queue里去的,没有做处理,在ConverterWorker线程里处理好了再交由gatewayservice处理
2、设备连接消息
self.__gateway.add_device(found_device_name, {"connector": self}, device_type=found_device_type)
3、设备断开连接消息
self.__gateway.del_device(found_device_name)
4、获取设备属性消息
self.__gateway.tb_client.client.gw_request_shared_attributes(
found_device_name,
[found_attribute_name],
lambda data, *args: self.notify_attribute(
data,
found_attribute_name,
handler.get("topicExpression"),
handler.get("valueExpression"),
handler.get('retain', False)))
5、设备的回复服务端的rpc消息
if self.__gateway.is_rpc_in_progress(message.topic):
self.__gateway.rpc_with_reply_processing(message.topic, content)
return None
说明:把消息交由gatewayservice处理