1 __threading.Thread
此方法难以控制同时运行的线程数,如同时只运行4个线
示例1
import timeimport randomimport loggingimport threading# 显示线程名字: threadNamelogging.basicConfig(format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)logger = logging.getLogger(__name__)logger.info('start')def func(n):sec = random.randint(1, 7)logger.info('number {}, sleep {}'.format(n, sec))time.sleep(sec)threads = []for i in range(5):t = threading.Thread(target=func, args=(i, ))threads.append(t)for t in threads: # 开始,执行全部线程t.start()for t in threads: # 阻塞,所有线程都结束后才向后执行t.join()logger.info('done')
运行结果
示例2 - 继承Thread类
import timeimport randomimport loggingimport threadinglogging.basicConfig(format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)logger = logging.getLogger(__name__)logger.info('start')class MyThread(threading.Thread):def __init__(self, n):super(MyThread, self).__init__()self.n = ndef run(self):logger.info('\033[32mstarting {}\033[0m'.format(self.name))sec = random.randint(1,7)logger.info('sleep {}, value: {}'.format(sec, self.n))time.sleep(sec)logger.info('\033[36mexiting {}\033[0m'.format(self.name))threads = []for i in range(1, 6):t = MyThread(i)threads.append(t)for t in threads:t.start()for t in threads:t.join()logger.info('done')
运行结果
2 __mulitprocessing.dummy.Pool
用法类似于多进程池 multiprocessing.Pool multiprocessing.dummy.Pool可以用于类中 multiprocessing.pool.ThreadPool的target不能是类的方法
import timeimport loggingfrom multiprocessing.dummy import Pool as ThreadPool# from multiprocessing.pool import ThreadPoollogging.basicConfig(format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)logger = logging.getLogger(__name__)start_time = time.time()logger.info('start')def func(n):logger.info('starting {}'.format(n))sec = 2logger.info('number {}, sleep {}'.format(n, sec))time.sleep(sec)logger.info('exiting {}'.format(n))p = ThreadPool(3)for i in range(1, 6):p.apply_async(func, args=(i,))p.close() # 关闭线程池,不可以再添加子线程p.join() # 阻塞,之前需要先closelogger.info('done')logger.info('time used: {:.2f}s'.format(time.time() - start_time))
运行结果
3 _concurrent.futures.ThreadPoolExecutor_
第三方包 可以指定线程数
示例
import timeimport loggingfrom concurrent.futures import ThreadPoolExecutorlogging.basicConfig(format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',datefmt='%Y-%m-%d %H:%M:%S',level=logging.DEBUG)logger = logging.getLogger(__name__)start_time = time.time()logger.info('start')def func(n):logger.info('starting {}'.format(n))sec = 2logger.info('number {}, sleep {}'.format(n, sec))time.sleep(sec)logger.info('exiting {}'.format(n))t = ThreadPoolExecutor(2)for i in range(1, 6):t.submit(func, i)t.shutdown()logger.info('done')logger.info('time used: {:.2f}s'.format(time.time() - start_time))
4 map, map_async, apply, apply_async
| Method | Multi-args | Concurrence | Blocking | Ordered-results |
|---|---|---|---|---|
| apply | YES | NO | YES | NO |
| apply_async | YES | YES | NO | NO |
| map | NO | YES | YES | YES |
| map_async | NO | YES | NO | YES |
