14、进程池 ProcessPoolExecutor
author:14组开发周博文 欢迎各位老师与同学指导和指正!
- 在标准库模块
concurrent.futures
中提供了一个类对象ProcessPoolExecutor
,也用于表示进程池。与Pool
相比,ProcessPoolExecutor
的功能和性能更加强大。
14-1、一般使用
- 基本使用
import time, random
from concurrent.futures import ProcessPoolExecutor
def do_sth(i):
print(f'子进程 {i} 启动')
start = time.time()
time.sleep(random.random() * 10)
end = time.time()
print(f'子进程结束,耗时 {end - start} 秒')
def main():
print('父进程启动')
# 将进程池所能容纳的最大进程数指定为3
ppe = ProcessPoolExecutor(max_workers=3)
# 将进程池处理的任务全部交给进程池,此后会创建并启动由进程池所管理的子进程
for i in range(1, 11):
# 与方法start类似,不同的是,创建并启动由进程池管理的子进程
ppe.submit(do_sth, i)
# 父进程被阻塞
# 进程池中所管理的所有子进程都执行完之后,父进程再从被阻塞的地方继续执行
ppe.shutdown(wait=True)
print('父进程结束')
if __name__ == "__main__":
main()
运行结果
父进程启动
子进程 1 启动
子进程 2 启动
子进程 3 启动
子进程结束,耗时 4.644615173339844 秒
子进程 4 启动
子进程结束,耗时 5.5947425365448 秒
子进程 5 启动
子进程结束,耗时 8.126129627227783 秒
子进程 6 启动
子进程结束,耗时 4.460269212722778 秒
子进程 7 启动
子进程结束,耗时 1.6025967597961426 秒 子进程 8 启动
子进程结束,耗时 5.34043288230896 秒
子进程 9 启动
子进程结束,耗时 2.1274640560150146 秒 子进程 10 启动
子进程结束,耗时 5.02870774269104 秒
子进程结束,耗时 3.3813669681549072 秒 子进程结束,耗时 7.3181703090667725 秒 父进程结束
使用
with
关键字简化创建进程池- 类对象
ProcessPoolExecutor
遵守了上下文管理协议,所以可以使用with
语句,这样,在离开运行时上下文时会自动的调用方法shutdown(wait=True)
- 类对象
import time, random
from concurrent.futures import ProcessPoolExecutor
def do_sth(i):
print(f'子进程 {i} 启动')
start = time.time()
time.sleep(random.random() * 10)
end = time.time()
print(f'子进程结束,耗时 {end - start} 秒')
def main():
print('父进程启动')
# 将进程池所能容纳的最大进程数指定为3
with ProcessPoolExecutor(max_workers=3) as ppe:
for i in range(1, 11):
ppe.submit(do_sth, i)
print('父进程结束')
if __name__ == "__main__":
main()
运行结果
父进程启动
子进程 1 启动
子进程 2 启动
子进程 3 启动
子进程结束,耗时 3.266294240951538 秒
子进程 4 启动
子进程结束,耗时 3.5326197147369385 秒
子进程 5 启动
子进程结束,耗时 0.11972618103027344 秒
子进程 6 启动
子进程结束,耗时 6.573461532592773 秒
子进程 7 启动
子进程结束,耗时 1.381535530090332 秒
子进程 8 启动
子进程结束,耗时 5.36028265953064 秒
子进程 9 启动
子进程结束,耗时 6.590775966644287 秒
子进程 10 启动
子进程结束,耗时 1.9599616527557373 秒
子进程结束,耗时 5.0140204429626465 秒
子进程结束,耗时 9.511806726455688 秒
父进程结束
- 使用
map
简化创建任务
import time, random
from concurrent.futures import ProcessPoolExecutor
def do_sth(i):
print(f'子进程 {i} 启动')
start = time.time()
time.sleep(random.random() * 10)
end = time.time()
print(f'子进程结束,耗时 {end - start} 秒')
def main():
print('父进程启动')
# 将进程池所能容纳的最大进程数指定为3
with ProcessPoolExecutor(max_workers=3) as ppe:
ppe.map(do_sth, range(1, 11))
print('父进程结束')
if __name__ == "__main__":
main()
运行结果
父进程启动
子进程 1 启动
子进程 3 启动
子进程 2 启动
子进程结束,耗时 2.013543128967285 秒
子进程 4 启动
子进程结束,耗时 2.692397356033325 秒
子进程 5 启动
子进程结束,耗时 0.5211577415466309 秒
子进程 6 启动
子进程结束,耗时 7.59312629699707 秒
子进程 7 启动
子进程结束,耗时 6.858060598373413 秒
子进程 8 启动
子进程结束,耗时 0.4232463836669922 秒
子进程 9 启动
子进程结束,耗时 3.7634811401367188 秒
子进程 10 启动
子进程结束,耗时 9.153268575668335 秒
子进程结束,耗时 3.5927655696868896 秒
子进程结束,耗时 9.247799396514893 秒
父进程结束
14-2、 Future
对象和 result
方法
方法
submit()
的返回值是一个Future
实例对象,表示子进程所调用的那个函数的执行(比如:do_sth()
),可以调用Future
的方法result()
得到的这个函数的返回值。方法
result()
是一个同步方法,也就是说,知道这个函数执行完毕之后方法result()
才会返回。
import time
from concurrent.futures import ProcessPoolExecutor
def do_sth(i):
time.sleep(2)
return i * i
def main():
with ProcessPoolExecutor(max_workers=3) as ppe:
for i in range(1, 5):
future = ppe.submit(do_sth, i)
# 同步方法,需要等待do_sth执行完毕
print(future.result())
if __name__ == "__main__":
main()
运行结果
1
4
9
16
- 运行结果中,得到的四个结果是一个一个的完成的,并不是我们所期待的那样,同时启动三个子进程处理任务。这是因为
result()
是同步方法,每次都需要等待do_sth()
执行完毕才会执行下一个任务。
- 如果想要即得到函数的返回值,又想让方法异步的执行。新建一个空列表用来保存
Future
实例对象吧。
import time
from concurrent.futures import ProcessPoolExecutor
def do_sth(i):
time.sleep(2)
return i * i
def main():
with ProcessPoolExecutor(max_workers=3) as ppe:
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
# 异步方法,无需等待do_sth执行完毕
print(future)
objs.append(future)
for obj in objs:
print(obj.result())
if __name__ == "__main__":
main()
运行结果
1
4
9
16
- 运行结果中,首先打印四个
Future
对象,然后等四个子进程全都执行完毕之后一次性的将四个子进程所调用do_sth()
的返回值全都打印出来。
14-3、 wait()
函数与 as_completed()
函数
标准库模块
concurrent.futures
提供了两个函数:wait()
和as_completed()
(1)
wait(fs, timeout=None, return_when=ALL_COMPLETED)
- 函数用于阻塞父进程,以等待指定的
Future
实例对象序列,直到满足所指定的条件。 - 参数
fs
用于指定要等待的Future
实例对象序列。 - 参数
timeout
用于指定等到的最长时间。如果指定为None
或者不指定,则一直等待。 - 参数
return_when
用于指定函数何时返回,有三种取值:FIRST_COMPLETED
、FIRST_EXCEPTION
和ALL_COMPLETED
,分别表示:当第一个Future
实例对象已经完成或者被取消时、当第一个Future
实例对象抛出异常时、当所有的Future
实例对象已经完成或者被取消时。 - 该函数的返回值是由两个集合组成的元组,第一个集合包含了已经完成或者已被取消的所有
Future
实例对象,第二个集合包含了没有完成并且没有被取消的Future
实例对象。
- 函数用于阻塞父进程,以等待指定的
import time, random
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
def do_sth(i):
time.sleep(random.random() * 10)
return i * i
def main():
ppe = ProcessPoolExecutor(max_workers=3)
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
objs.append(future)
# wait 的返回值是一个元组,其中包括两个集合,分别表示已完成和完成的任务的Future对象集合
done, not_done = wait(objs, return_when=ALL_COMPLETED)
# done, not_done = wait(objs, return_when=FIRST_COMPLETED)
print('done set: ', done)
print('not_done set: ', not_done)
if __name__ == "__main__":
main()
return_when
指定为 ALL_COMPLETED
的运行结果
done set: {<Future at 0x2536a41f400 state=finished returned int>, <Future at 0x2536a41f4a8 state=finished returned int>, <Future at 0x2536a41f358 state=finished returned int>, <Future at 0x2536a3dc4e0 state=finished returned int>}
not_done set: set()
return_when
指定为 FIRST_COMPLETED
的运行结果
done set: {<Future at 0x1c5e0640400 state=finished returned int>}
not_done set: {<Future at 0x1c5e0640358 state=running>, <Future at 0x1c5e0640550 state=running>, <Future at 0x1c5e05fc4e0 state=running>}
(2)
as_completed(*fs*, *timeout*=None)
- 该函数用于将指定的
Future
实例对象序列转换为一个迭代器,当序列中的任意一个Future
实例对象已经完成或已被取消时,都会被yield
。这样遍历得到的迭代器,就可以在任意一个Future
实例对象已经完成或已被取消时立即做一些处理,比如调用方法reslut
得到执行结果。简单的说,as_completed
函数可以帮我们实现在Future
实例对象已经完成或已被取消时立即做一些处理的需求。 - 参数
fs
用于指定Future
实例对象序列。 - 参数
timeout
用于指定超时时间。如果指定为None
或者不指定,则不会超时。 - 该函数的返回值是
Future
实例对象的迭代器。
- 该函数用于将指定的
import time, random
from concurrent.futures import ProcessPoolExecutor, as_completed
def do_sth(i):
time.sleep(random.random() * 10)
return i * i
def main():
print("父进程开始")
ppe = ProcessPoolExecutor(max_workers=3)
objs = []
for i in range(1, 5):
future = ppe.submit(do_sth, i)
objs.append(future)
future_iterator = as_completed(objs)
for future in future_iterator:
print(future.result())
print("父进程结束")
if __name__ == "__main__":
main()
运行结果
父进程开始 1
9
16
4
父进程结束
- 运行结果得知,当子进程处理完成一个任务并得到
Future
实例对象后,就会立刻调用result()
函数得到任务运行结果。并且任务依旧是同步进行的。