1. mqtt客户端连接
    2. 启动消息转换线程,处理的队列为__msg_queue

    连接器启动函数open:

    1. def open(self):
    2. self.__stopped = False
    3. self.start()

    selft.start()启动线程,执行run()函数:

    1. def run(self):
    2. try:
    3. # 核心
    4. self.__connect()
    5. except Exception as e:
    6. self.__log.exception(e)
    7. try:
    8. self.close()
    9. except Exception as e:
    10. self.__log.exception(e)
    11. while True:
    12. if self.__stopped:
    13. break
    14. elif not self._connected:
    15. self.__connect()
    16. self.__threads_manager()
    17. sleep(.2)

    核心是connect()函数,需要注意的是threads_manager()函数

    1. def __connect(self):
    2. while not self._connected and not self.__stopped:
    3. try:
    4. # 核心
    5. self._client.connect(self.__broker['host'],
    6. self.__broker.get('port', 1883))
    7. self._client.loop_start()
    8. if not self._connected:
    9. sleep(1)
    10. except ConnectionRefusedError as e:
    11. self.__log.error(e)
    12. sleep(10)

    最终是由_client进行连接,_client是paho.mqtt.client。
    __threads_manager()函数:

    1. def __threads_manager(self):
    2. if len(self.__workers_thread_pool) == 0:
    3. # 创建转换线程,self.__msg_queue是处理的消息队列,self._save_converted_msg是处理消息的函数
    4. worker = MqttConnector.ConverterWorker("Main", self.__msg_queue, self._save_converted_msg)
    5. self.__workers_thread_pool.append(worker)
    6. worker.start()
    7. # 根据消息队列容量的大小和每个工作线程处理最大处理消息的数量计算出需要的线程
    8. number_of_needed_threads = round(self.__msg_queue.qsize() / self.__max_msg_number_for_worker, 0)
    9. threads_count = len(self.__workers_thread_pool)
    10. if number_of_needed_threads > threads_count < self.__max_number_of_workers:
    11. thread = MqttConnector.ConverterWorker(
    12. "Worker " + ''.join(random.choice(string.ascii_lowercase) for _ in range(5)), self.__msg_queue,
    13. self._save_converted_msg)
    14. self.__workers_thread_pool.append(thread)
    15. thread.start()
    16. elif number_of_needed_threads < threads_count and threads_count > 1:
    17. worker: MqttConnector.ConverterWorker = self.__workers_thread_pool[-1]
    18. if not worker.in_progress:
    19. worker.stopped = True
    20. self.__workers_thread_pool.remove(worker)

    说明:__threads_manager函数作用就是调整转换消息的线程池