1 __threading.Thread
此方法难以控制同时运行的线程数,如同时只运行4个线
示例1
import time
import random
import logging
import threading
# 显示线程名字: threadName
logging.basicConfig(
format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.info('start')
def func(n):
sec = random.randint(1, 7)
logger.info('number {}, sleep {}'.format(n, sec))
time.sleep(sec)
threads = []
for i in range(5):
t = threading.Thread(target=func, args=(i, ))
threads.append(t)
for t in threads: # 开始,执行全部线程
t.start()
for t in threads: # 阻塞,所有线程都结束后才向后执行
t.join()
logger.info('done')
运行结果
示例2 - 继承Thread类
import time
import random
import logging
import threading
logging.basicConfig(
format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.info('start')
class MyThread(threading.Thread):
def __init__(self, n):
super(MyThread, self).__init__()
self.n = n
def run(self):
logger.info('\033[32mstarting {}\033[0m'.format(self.name))
sec = random.randint(1,7)
logger.info('sleep {}, value: {}'.format(sec, self.n))
time.sleep(sec)
logger.info('\033[36mexiting {}\033[0m'.format(self.name))
threads = []
for i in range(1, 6):
t = MyThread(i)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
logger.info('done')
运行结果
2 __mulitprocessing.dummy.Pool
用法类似于多进程池 multiprocessing.Pool multiprocessing.dummy.Pool可以用于类中 multiprocessing.pool.ThreadPool的target不能是类的方法
import time
import logging
from multiprocessing.dummy import Pool as ThreadPool
# from multiprocessing.pool import ThreadPool
logging.basicConfig(
format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
start_time = time.time()
logger.info('start')
def func(n):
logger.info('starting {}'.format(n))
sec = 2
logger.info('number {}, sleep {}'.format(n, sec))
time.sleep(sec)
logger.info('exiting {}'.format(n))
p = ThreadPool(3)
for i in range(1, 6):
p.apply_async(func, args=(i,))
p.close() # 关闭线程池,不可以再添加子线程
p.join() # 阻塞,之前需要先close
logger.info('done')
logger.info('time used: {:.2f}s'.format(time.time() - start_time))
运行结果
3 _concurrent.futures.ThreadPoolExecutor_
第三方包 可以指定线程数
示例
import time
import logging
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(
format='[%(levelname)s %(asctime)s %(filename)s:%(lineno)d %(threadName)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)
logger = logging.getLogger(__name__)
start_time = time.time()
logger.info('start')
def func(n):
logger.info('starting {}'.format(n))
sec = 2
logger.info('number {}, sleep {}'.format(n, sec))
time.sleep(sec)
logger.info('exiting {}'.format(n))
t = ThreadPoolExecutor(2)
for i in range(1, 6):
t.submit(func, i)
t.shutdown()
logger.info('done')
logger.info('time used: {:.2f}s'.format(time.time() - start_time))
4 map, map_async, apply, apply_async
Method | Multi-args | Concurrence | Blocking | Ordered-results |
---|---|---|---|---|
apply | YES | NO | YES | NO |
apply_async | YES | YES | NO | NO |
map | NO | YES | YES | YES |
map_async | NO | YES | NO | YES |