1. class ConverterWorker(Thread):
    2. def __init__(self, name, incoming_queue, send_result):
    3. super().__init__()
    4. self.stopped = False
    5. self.setName(name)
    6. self.setDaemon(True)
    7. self.__msg_queue = incoming_queue
    8. self.in_progress = False
    9. self.__send_result = send_result
    10. def run(self):
    11. while not self.stopped:
    12. if not self.__msg_queue.empty():
    13. self.in_progress = True
    14. convert_function, config, incoming_data = self.__msg_queue.get(True, 100)
    15. # 转换
    16. converted_data = convert_function(config, incoming_data)
    17. log.debug(converted_data)
    18. # 发送gatewayservice处理
    19. self.__send_result(config, converted_data)
    20. self.in_progress = False
    21. else:
    22. sleep(.2)

    tip:

    1. 连接器的_on_message回调里把没有转换的消息放到__msg_queue队列
    2. 在消息转换线程里取消息并且进行转换,转换好的数据发送给gatewayservice处理