- mqtt客户端连接
- 启动消息转换线程,处理的队列为__msg_queue
连接器启动函数open:
def open(self):self.__stopped = Falseself.start()
selft.start()启动线程,执行run()函数:
def run(self):try:# 核心self.__connect()except Exception as e:self.__log.exception(e)try:self.close()except Exception as e:self.__log.exception(e)while True:if self.__stopped:breakelif not self._connected:self.__connect()self.__threads_manager()sleep(.2)
核心是connect()函数,需要注意的是threads_manager()函数
def __connect(self):while not self._connected and not self.__stopped:try:# 核心self._client.connect(self.__broker['host'],self.__broker.get('port', 1883))self._client.loop_start()if not self._connected:sleep(1)except ConnectionRefusedError as e:self.__log.error(e)sleep(10)
最终是由_client进行连接,_client是paho.mqtt.client。
__threads_manager()函数:
def __threads_manager(self):if len(self.__workers_thread_pool) == 0:# 创建转换线程,self.__msg_queue是处理的消息队列,self._save_converted_msg是处理消息的函数worker = MqttConnector.ConverterWorker("Main", self.__msg_queue, self._save_converted_msg)self.__workers_thread_pool.append(worker)worker.start()# 根据消息队列容量的大小和每个工作线程处理最大处理消息的数量计算出需要的线程number_of_needed_threads = round(self.__msg_queue.qsize() / self.__max_msg_number_for_worker, 0)threads_count = len(self.__workers_thread_pool)if number_of_needed_threads > threads_count < self.__max_number_of_workers:thread = MqttConnector.ConverterWorker("Worker " + ''.join(random.choice(string.ascii_lowercase) for _ in range(5)), self.__msg_queue,self._save_converted_msg)self.__workers_thread_pool.append(thread)thread.start()elif number_of_needed_threads < threads_count and threads_count > 1:worker: MqttConnector.ConverterWorker = self.__workers_thread_pool[-1]if not worker.in_progress:worker.stopped = Trueself.__workers_thread_pool.remove(worker)
说明:__threads_manager函数作用就是调整转换消息的线程池
