class ConverterWorker(Thread):
def __init__(self, name, incoming_queue, send_result):
super().__init__()
self.stopped = False
self.setName(name)
self.setDaemon(True)
self.__msg_queue = incoming_queue
self.in_progress = False
self.__send_result = send_result
def run(self):
while not self.stopped:
if not self.__msg_queue.empty():
self.in_progress = True
convert_function, config, incoming_data = self.__msg_queue.get(True, 100)
# 转换
converted_data = convert_function(config, incoming_data)
log.debug(converted_data)
# 发送gatewayservice处理
self.__send_result(config, converted_data)
self.in_progress = False
else:
sleep(.2)
tip:
- 连接器的_on_message回调里把没有转换的消息放到__msg_queue队列
- 在消息转换线程里取消息并且进行转换,转换好的数据发送给gatewayservice处理