class ConverterWorker(Thread):
    """Daemon thread that converts queued messages and forwards the results.

    Every item taken from the incoming queue is expected to be a
    ``(convert_function, config, incoming_data)`` tuple. The worker calls
    ``convert_function(config, incoming_data)`` and passes the outcome to
    ``send_result(config, converted_data)``.

    Attributes:
        stopped: Set to ``True`` to make ``run()`` leave its loop.
        in_progress: ``True`` while a message is being fetched/processed.
    """

    def __init__(self, name, incoming_queue, send_result):
        """Create the worker; the thread is not started here.

        Args:
            name: Thread name.
            incoming_queue: Queue-like object providing ``empty()`` and
                ``get()``; presumably a ``queue.Queue`` -- confirm with caller.
            send_result: Callable invoked as
                ``send_result(config, converted_data)``.
        """
        super().__init__()
        self.stopped = False
        # Attribute assignment replaces setName()/setDaemon(), which are
        # deprecated since Python 3.10.
        self.name = name
        self.daemon = True
        self.__msg_queue = incoming_queue
        self.in_progress = False
        self.__send_result = send_result

    def run(self):
        """Poll the queue until ``stopped`` is set, converting each message.

        A failure while handling one message is logged and the loop carries
        on, so a single bad message cannot kill the worker thread.
        """
        # Function-scope import: the file-level imports are not visible here.
        from queue import Empty

        while not self.stopped:
            if self.__msg_queue.empty():
                sleep(.2)
                continue

            self.in_progress = True
            try:
                try:
                    # Non-blocking: emptiness was checked just above. If
                    # another consumer won the race we simply poll again
                    # instead of blocking (and ignoring `stopped`).
                    message = self.__msg_queue.get(False)
                except Empty:
                    continue
                convert_function, config, incoming_data = message
                # Convert the raw message.
                converted_data = convert_function(config, incoming_data)
                log.debug(converted_data)
                # Hand the converted data over (to the gateway service,
                # per the original comment).
                self.__send_result(config, converted_data)
            except Exception:
                log.exception(
                    "ConverterWorker %s failed to process a message",
                    self.name,
                )
            finally:
                # Always clear the flag, even when processing failed.
                self.in_progress = False
tip:
- 连接器在 `_on_message` 回调中,把尚未转换的消息(转换函数、配置、原始数据)放入 `__msg_queue` 队列。
- 消息转换线程(`ConverterWorker`)从队列中取出消息并进行转换,再把转换好的数据发送给 gatewayservice 处理。
