多线程&多任务介绍

多线程&多任务通常是指将一个任务或多个任务运行在子线程,并且子线程可以独立启动,或通过线程池启动。子线程通常用于执行以下类型的任务:

  • 长时间运行的任务:子线程适合处理那些耗时较长的任务,以避免阻塞主线程的执行。例如,进行复杂的计算、图像处理、视频编解码等任务可以放在子线程中执行,以保持应用程序的响应性。

  • 阻塞型任务:如果有一些阻塞型的操作,可能会导致主线程被阻塞,例如进行网络请求、文件读写、数据库查询等。将这些任务放在子线程中执行可以确保主线程的流畅运行,同时避免应用程序的无响应状态。

  • 并行处理任务:如果有多个独立的任务需要同时执行,可以将它们分配给多个子线程来实现并行处理。例如,批量下载多个文件、同时进行多个数据处理任务等。

  • 异步操作:子线程可以用于执行异步操作,例如在后台处理数据、定时任务、监听外部设备的输入等。这样可以保持应用程序的其他部分正常运行,同时处理异步任务。

需要注意的是,使用子线程需要谨慎处理线程间的数据共享和同步问题,以避免出现竞态条件或其他并发问题。在 PyQt 中,可以使用线程间通信机制(如信号槽机制)来安全地传递数据和操作UI元素。

多线程管理

通常线程有两种启动方式,分别对应不同的使用场景:

  • 将任务直接运行在一个独立的线程里

用于执行一次性长期任务(例如:并行任务,多线程下载),长期循环阻塞式接收任务(例如:接收蓝牙、网络、串口消息)

  • 将任务运行在一个线程池的线程里

用于执行频繁触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载),因为线程池可以重复利用其内部维护的N个线程,按需创建,减少资源的申请与释放操作。

拷贝依赖

  1. """
  2. 线程管理器,用于管理线程的创建、启动、停止等。
  3. 包含两种线程启动方式:
  4. 1. start 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
  5. 2. start_in_thread_pool 在一个线程池里运行任务,用于执行非定期触发型短期任务,避免线程资源浪费(例如:数据库操作、文件写入、网络数据下载)
  6. 其中包含了一个 Worker 类
  7. ● Worker 类用于创建线程任务
  8. Worker 类继承自 QRunnable 类,用于创建线程任务。
  9. ● 其中包含了一个 run 方法,用于执行线程任务
  10. ● 一个 signal_connect 方法,用于连接信号槽
  11. ● 一个 stop 方法,用于停止线程任务
  12. ● 一个 emit_msg 方法,用于发送消息。
  13. """
  14. import inspect
  15. from typing import Callable
  16. from PyQt5.QtCore import pyqtSignal, QObject, QRunnable, QThread, QThreadPool
  17. class WorkerSignals(QObject):
  18. signal_finished = pyqtSignal()
  19. signal_error = pyqtSignal(Exception)
  20. signal_result = pyqtSignal(object)
  21. signal_msg = pyqtSignal(object)
  22. class Worker(QRunnable):
  23. def __init__(self, target: Callable, args=None, kwargs=None):
  24. super().__init__()
  25. self.setAutoDelete(True) # 自动删除,避免内存泄漏
  26. self.__func = target
  27. self.__args = args if args else ()
  28. self.__kwargs = kwargs if kwargs else {}
  29. self.__signals = WorkerSignals()
  30. self.is_running = False
  31. self.worker_thread: WorkerThread = None
  32. def run(self):
  33. self.is_running = True
  34. try:
  35. # 如果func的第一个参数是Worker类型,则将self作为第一个参数传入
  36. if self.__is_worker_func(self.__func):
  37. result = self.__func(self, *self.__args, **self.__kwargs)
  38. else:
  39. # 否则,直接传入参数
  40. result = self.__func(*self.__args, **self.__kwargs)
  41. self.__signals.signal_result.emit(result)
  42. except Exception as e:
  43. self.__signals.signal_error.emit(e)
  44. finally:
  45. self.is_running = False
  46. self.__signals.signal_finished.emit()
  47. def signal_connect(self, msg_handler=None, result_handler=None, finished_handler=None, error_handler=None):
  48. if msg_handler:
  49. self.__signals.signal_msg.connect(msg_handler)
  50. if result_handler:
  51. self.__signals.signal_result.connect(result_handler)
  52. if finished_handler:
  53. self.__signals.signal_finished.connect(finished_handler)
  54. if error_handler:
  55. self.__signals.signal_error.connect(error_handler)
  56. return self
  57. def stop(self):
  58. self.is_running = False
  59. def emit_msg(self, msg):
  60. self.__signals.signal_msg.emit(msg)
  61. def start(self, daemon=True):
  62. """
  63. 1. 运行一个独立线程,用于执行一次性短期任务(例如:并行任务,多线程下载)、长期循环接收任务(例如:阻塞式接收蓝牙、串口消息)
  64. :return:
  65. """
  66. self.worker_thread = WorkerThread(self)
  67. self.worker_thread.daemon = daemon
  68. self.worker_thread.start()
  69. return self.worker_thread
  70. def start_in_thread_pool(self):
  71. """
  72. 2. 在一个线程池里运行任务,用于执行非定期的短期任务,避免线程资源浪费(例如:文件写出、网络数据下载)
  73. :param refresh_worker: 任务
  74. """
  75. QThreadPool.globalInstance().start(self)
  76. @classmethod
  77. def __is_worker_func(cls, func: Callable):
  78. """
  79. 判断一个函数是否是worker函数,worker函数的第一个参数必须是Worker类型
  80. :param func:
  81. :return:
  82. """
  83. sig = inspect.signature(func)
  84. # 判断第一个参数是否是Worker类型,或者参数名是否是worker
  85. param_keys = list(sig.parameters.keys())
  86. if len(param_keys) > 0:
  87. first_param = sig.parameters[param_keys[0]]
  88. if first_param.annotation == Worker:
  89. return True
  90. if first_param.name == "worker":
  91. return True
  92. return False
  93. class WorkerThread(QThread):
  94. def __init__(self, worker: Worker):
  95. super().__init__()
  96. self.__worker = worker
  97. def run(self):
  98. self.__worker.run()

使用示例

以下代码分别显示了如下两个应用场景:

  • 使用方式1:单线程循环接收消息示例
  • 使用方式2:利用线程池异步执行多个独立任务示例 ```python

    使用示例

    import sys import os import time import threading

from PyQt5.QtCore import pyqtSlot from PyQt5.QtWidgets import QApplication import requests

from qt_worker import Worker

def long_time_recv_task(worker: Worker, title, start): counter = start thread_name = threading.currentThread().name while worker.is_running:

  1. # 模拟阻塞(等待网络、串口、蓝牙等)
  2. time.sleep(1)
  3. # 模拟收到消息
  4. worker.emit_msg(f"{title} long time task {counter} : {thread_name}")
  5. counter += 1
  6. if counter >= 110:
  7. break
  8. return "refresh_worker done: {}".format(counter)

@pyqtSlot(object) def on_result_received(msg): thread_name = threading.currentThread().name print(f”result: < {msg} > {thread_name}”)

def single_recv_thread_test(): “”” 单线程循环接收消息 :return: “”” worker = Worker(long_time_recv_task, args=(“消息接收”,), kwargs={“start”: 100}) worker.signal_connect( msg_handler=lambda msg: print(msg), result_handler=on_result_received, ) worker.start()

def pic_download_task(url): “”” 下载图片, 保存到pic目录 :param url: 图片地址 :return: “”” response = requests.get(url) if response.status_code != 200: print(“连接图片服务器失败:”, response.status_code) return file_name = url.split(‘/‘)[-1]

  1. # 如果pic目录不存在,则创建
  2. if not os.path.exists('pic'):
  3. os.mkdir('pic')
  4. with open(f"pic/{file_name}", 'wb') as f:
  5. f.write(response.content)
  6. # 返回f的绝对路径
  7. return "{} -> {}".format(url, os.path.abspath(f.name))

def thread_pool_test(): “”” 利用线程池连续下载多个图片文件 :return: “””

  1. pics = [
  2. "https://www.baidu.com/img/bd_logo1.png",
  3. "https://c-ssl.duitang.com/uploads/blog/202305/26/EWSwLxqBhV5zZJa.jpg",
  4. "https://c-ssl.duitang.com/uploads/blog/202305/26/lGSxjBMefx04z33.jpg",
  5. "https://c-ssl.duitang.com/uploads/blog/202305/26/XxSLogyQCQd9emB.jpg",
  6. "https://c-ssl.duitang.com/uploads/item/202002/26/20200226215648_yynrr.jpg",
  7. ]
  8. for pic in pics:
  9. worker = Worker(pic_download_task, args=(pic,))
  10. worker.signal_connect(result_handler=lambda msg: print("保存成功:", msg))
  11. worker.start_in_thread_pool()

if name == ‘main‘: app = QApplication(sys.argv)

  1. # 使用方式1:单线程循环接收消息示例
  2. single_recv_thread_test()
  3. # 使用方式2:利用线程池异步执行多个任务示例(下载多个图片文件)
  4. thread_pool_test()
  5. sys.exit(app.exec_())
  1. <a name="rWkay"></a>
  2. ## 多任务管理
  3. 指开启一个长期运行的线程,在线程内部,运行一个循环,循环中阻塞式地接收用户发来的任务(定期或非定期),并及时按照用户预定的函数进行执行(通常要消耗一小段时间)。这个多任务管理器应有如下特点:
  4. - 不会阻塞主线程(保障主界面操作顺滑)
  5. - 一旦任务完成,将执行结果返回给任务的发布者。这可以通过回调函数、事件或其他适当的机制来实现。
  6. - 如果任务执行过程中发生异常,需要将异常信息返回给任务的发布者,以便了解并采取适当的处理措施。
  7. - 可以保障任务的执行顺序和发布任务的顺序一致(通过队列保证要求)
  8. - 这个长期任务管理器可以通过调用方法进行停止
  9. <a name="woh64"></a>
  10. ### 拷贝依赖
  11. 这段代码定义了一个 `TaskWorker` 类,该类继承自 `QThread` 类,并使用队列实现了多个任务的异步执行。
  12. ```python
  13. """
  14. 运行一个独立线程,用于执行长期循环发送任务,可以随时执行异步任务,内部维护一个消息队列(例如:发送蓝牙、串口消息)
  15. 这段代码定义了一个 TaskWorker 类,该类继承自 QThread 类,并使用队列实现了任务的异步执行。
  16. """
  17. from queue import Queue
  18. from PyQt5.QtCore import QThread, pyqtSignal
  19. class TaskWorker(QThread):
  20. taskResult = pyqtSignal(object)
  21. taskError = pyqtSignal(Exception)
  22. taskFinished = pyqtSignal()
  23. def __init__(self, do_task, parent=None):
  24. super(TaskWorker, self).__init__(parent)
  25. self.do_task = do_task
  26. self.task_queue = Queue()
  27. self.is_running = True
  28. def run(self):
  29. while self.is_running:
  30. # 不断从队列中取出任务并执行,如果没有任务则阻塞
  31. task_arg = self.task_queue.get()
  32. # 如果取出的任务为 None,且线程已设置为关闭,则退出线程
  33. if task_arg is None and not self.is_running:
  34. break
  35. try:
  36. result = self.do_task(task_arg)
  37. self.taskResult.emit(result)
  38. except Exception as e:
  39. self.taskError.emit(e)
  40. self.taskFinished.emit()
  41. def signal_connect(self, result_handler=None, finished_handler=None, error_handler=None):
  42. # Connect the worker's signal to the handler slot
  43. if result_handler is not None:
  44. self.taskResult.connect(result_handler)
  45. if finished_handler is not None:
  46. self.taskFinished.connect(finished_handler)
  47. if error_handler is not None:
  48. self.taskError.connect(error_handler)
  49. return
  50. def join_queue(self, task):
  51. if not self.is_running:
  52. return
  53. self.task_queue.put(task)
  54. def stop(self):
  55. self.is_running = False
  56. # Put a None task to the queue to stop the thread
  57. self.task_queue.put(None)

使用示例

主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用手动/定时器定期添加任务。
当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。

代码具体步骤如下:

  1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
  2. 在主程序中定义两个槽函数 on_resulton_error,分别用于处理任务完成和任务出错的信号。
  3. 创建一个 TaskWorker 实例,并传入任务函数,函数里是任务要执行的内容
  4. TaskWorker 实例的信号连接到槽函数。
  5. 运行 TaskWorker 实例的start方法启动线程
  6. 运行主程序,通过各种方式把任务及任务参数通过join_queue加入队列
  7. 等待多个任务执行完成。 ```python “”” 主程序中创建了一个 TaskWorker 实例,将任务添加到任务队列中,并使用定时器定期添加任务。 当任务完成时,TaskWorker 实例会发出信号,主程序中的槽函数会接收到这些信号并进行处理。

    代码具体步骤如下:

    1. 在主程序中定义一个 do_task 函数,该函数用于执行任务。
    2. 在主程序中定义两个槽函数 on_result 和 on_error,分别用于处理任务完成和任务出错的信号。
    3. 创建一个 TaskWorker 实例。
    4. 将 TaskWorker 实例的信号连接到槽函数。
    5. 将任务添加到任务队列中,并使用定时器定期添加任务。
    6. 运行主程序,等待任务执行完成。 “”” from PyQt5.QtWidgets import QApplication from PyQt5.QtCore import pyqtSlot, QTimer import sys import threading import time

from qt_task import TaskWorker

def do_task(task_arg): a, b = task_arg rst = a / b

  1. # Simulate a time-consuming task
  2. time.sleep(1) # Pause for 1 seconds
  3. return f"Task signal_result {a} / {b} = {rst}"

@pyqtSlot(object) def on_result(result):

  1. # print(threading.currentThread())
  2. print("on result: ", result)

@pyqtSlot(Exception) def on_error(e):

  1. # print(threading.currentThread())
  2. print("on error: ", e)

if name == ‘main‘: app = QApplication(sys.argv)

  1. print("main: ", threading.currentThread())
  2. task_worker = TaskWorker(do_task)
  3. task_worker.signal_connect(
  4. result_handler=on_result,
  5. finished_handler=lambda: print("on finished"),
  6. error_handler=on_error,
  7. )
  8. task_worker.start()
  9. # Add tasks to the task manager
  10. task_worker.join_queue((3, 2))
  11. task_worker.join_queue((4, 2))
  12. task_worker.join_queue((5, 2))
  13. task_worker.join_queue((5, 0))
  14. # Use a timer to add tasks periodically
  15. timer = QTimer()
  16. timer.timeout.connect(lambda: task_worker.join_queue((5, 2)))
  17. timer.start(3000) # Add a task every 5 seconds
  18. # 执行一个10秒后的延时任务
  19. QTimer.singleShot(10000, lambda: task_worker.stop())
  20. sys.exit(app.exec_())

```