1 threading.Thread

此方法难以控制同时运行的线程数,例如无法限制同时只运行 4 个线程。

示例1

  # Example 1: run five tasks, each in its own threading.Thread, and wait
  # for all of them before finishing.
  import time
  import random
  import logging
  import threading

  # %(threadName)s puts the name of the emitting thread into every record.
  logging.basicConfig(
      level=logging.DEBUG,
      datefmt='%Y-%m-%d %H:%M:%S',
      format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s')
  logger = logging.getLogger(__name__)
  logger.info('start')


  def func(n):
      """Log task number ``n`` together with a random delay, then sleep for it.

      The delay is an integer number of seconds drawn from 1 to 7 inclusive.
      """
      delay = random.randint(1, 7)
      logger.info('number {}, sleep {}'.format(n, delay))
      time.sleep(delay)


  # Build one thread per task; nothing runs until start() is called.
  threads = [threading.Thread(target=func, args=(i, )) for i in range(5)]

  # Start every thread.
  for t in threads:
      t.start()

  # Block here until all threads have finished.
  for t in threads:
      t.join()

  logger.info('done')

运行结果
image.png
示例2 - 继承Thread类

  # Example 2: same idea as example 1, but the work lives in the run() method
  # of a threading.Thread subclass.
  import time
  import random
  import logging
  import threading

  # %(threadName)s puts the name of the emitting thread into every record.
  logging.basicConfig(
      level=logging.DEBUG,
      datefmt='%Y-%m-%d %H:%M:%S',
      format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s')
  logger = logging.getLogger(__name__)
  logger.info('start')


  class MyThread(threading.Thread):
      """Thread that logs its start, sleeps a random 1-7 seconds, and logs its exit."""

      def __init__(self, n):
          """Store ``n``, the value this thread reports while running."""
          super(MyThread, self).__init__()
          self.n = n

      def run(self):
          """Thread body, executed in the new thread after start() is called."""
          # The \033[..m sequences are ANSI colour codes wrapped around the message.
          logger.info('\033[32mstarting {}\033[0m'.format(self.name))
          delay = random.randint(1, 7)
          logger.info('sleep {}, value: {}'.format(delay, self.n))
          time.sleep(delay)
          logger.info('\033[36mexiting {}\033[0m'.format(self.name))


  # One thread per value 1..5; nothing runs until start() is called.
  threads = [MyThread(i) for i in range(1, 6)]

  # Start every thread.
  for t in threads:
      t.start()

  # Block here until all threads have finished.
  for t in threads:
      t.join()

  logger.info('done')

运行结果
image.png

2 multiprocessing.dummy.Pool

用法类似于多进程池 multiprocessing.Pool。multiprocessing.dummy.Pool 可以用于类中;multiprocessing.pool.ThreadPool 的 target 不能是类的方法。(注:按照 Python 官方文档,multiprocessing.dummy.Pool 返回的就是 ThreadPool 实例,二者行为应当一致,上述差异有待验证。)

  # Example: run five tasks on a pool of three threads, so at most three
  # tasks execute at the same time.
  import time
  import logging
  from multiprocessing.dummy import Pool as ThreadPool
  # Alternative import: from multiprocessing.pool import ThreadPool

  # %(threadName)s puts the name of the emitting thread into every record.
  logging.basicConfig(
      format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
      datefmt='%Y-%m-%d %H:%M:%S',
      level=logging.DEBUG)
  logger = logging.getLogger(__name__)

  start_time = time.time()
  logger.info('start')


  def func(n):
      """Log the start of task ``n``, sleep for two seconds, and log its exit."""
      logger.info('starting {}'.format(n))
      sec = 2
      logger.info('number {}, sleep {}'.format(n, sec))
      time.sleep(sec)
      logger.info('exiting {}'.format(n))


  p = ThreadPool(3)

  # Keep every AsyncResult: an exception raised inside func is stored on its
  # result object and only surfaces when get() is called. Dropping the
  # results would hide worker failures.
  results = [p.apply_async(func, args=(i,)) for i in range(1, 6)]

  p.close()  # Close the pool: no further tasks can be submitted.
  p.join()   # Block until the workers are done; close() must be called first.

  # All tasks have finished, so get() returns immediately or re-raises the
  # exception raised by the corresponding task.
  for result in results:
      result.get()

  logger.info('done')
  logger.info('time used: {:.2f}s'.format(time.time() - start_time))

运行结果
image.png

3 concurrent.futures.ThreadPoolExecutor

Python 3.2 起属于标准库(Python 2 需要安装第三方 backport 包 futures),可以指定线程数。

示例

  # Example: run five tasks on a ThreadPoolExecutor limited to two worker
  # threads.
  import time
  import logging
  from concurrent.futures import ThreadPoolExecutor

  # %(threadName)s puts the name of the emitting thread into every record.
  logging.basicConfig(
      format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
      datefmt='%Y-%m-%d %H:%M:%S',
      level=logging.DEBUG)
  logger = logging.getLogger(__name__)

  start_time = time.time()
  logger.info('start')


  def func(n):
      """Log the start of task ``n``, sleep for two seconds, and log its exit."""
      logger.info('starting {}'.format(n))
      sec = 2
      logger.info('number {}, sleep {}'.format(n, sec))
      time.sleep(sec)
      logger.info('exiting {}'.format(n))


  # Leaving the with-block calls shutdown(wait=True), so the executor is
  # released even if submitting a task fails, and every task has finished
  # once the block exits.
  with ThreadPoolExecutor(2) as t:
      futures = [t.submit(func, i) for i in range(1, 6)]

  # An exception raised inside func is stored on its Future; result()
  # re-raises it so worker failures are not silently dropped.
  for future in futures:
      future.result()

  logger.info('done')
  logger.info('time used: {:.2f}s'.format(time.time() - start_time))

运行结果
image.png

4 map, map_async, apply, apply_async

https://www.dazhuanlan.com/2019/12/13/5df300f615954/

| Method | Multi-args | Concurrence | Blocking | Ordered-results |
| --- | --- | --- | --- | --- |
| apply | YES | NO | YES | NO |
| apply_async | YES | YES | NO | NO |
| map | NO | YES | YES | YES |
| map_async | NO | YES | NO | YES |