14、进程池 ProcessPoolExecutor

author:14组开发周博文 欢迎各位老师与同学指导和指正!

  • 在标准库模块 concurrent.futures 中提供了一个类对象 ProcessPoolExecutor ,也用于表示进程池。与 Pool 相比, ProcessPoolExecutor 的功能和性能更加强大。

14-1、一般使用

  • 基本使用
  1. import time, random
  2. from concurrent.futures import ProcessPoolExecutor
  3. def do_sth(i):
  4. print(f'子进程 {i} 启动')
  5. start = time.time()
  6. time.sleep(random.random() * 10)
  7. end = time.time()
  8. print(f'子进程结束,耗时 {end - start} 秒')
  9. def main():
  10. print('父进程启动')
  11. # 将进程池所能容纳的最大进程数指定为3
  12. ppe = ProcessPoolExecutor(max_workers=3)
  13. # 将进程池处理的任务全部交给进程池,此后会创建并启动由进程池所管理的子进程
  14. for i in range(1, 11):
  15. # 与方法start类似,不同的是,创建并启动由进程池管理的子进程
  16. ppe.submit(do_sth, i)
  17. # 父进程被阻塞
  18. # 进程池中所管理的所有子进程都执行完之后,父进程再从被阻塞的地方继续执行
  19. ppe.shutdown(wait=True)
  20. print('父进程结束')
  21. if __name__ == "__main__":
  22. main()

运行结果

父进程启动
子进程 1 启动
子进程 2 启动
子进程 3 启动
子进程结束,耗时 4.644615173339844 秒
子进程 4 启动
子进程结束,耗时 5.5947425365448 秒
子进程 5 启动
子进程结束,耗时 8.126129627227783 秒
子进程 6 启动
子进程结束,耗时 4.460269212722778 秒
子进程 7 启动
子进程结束,耗时 1.6025967597961426 秒 子进程 8 启动
子进程结束,耗时 5.34043288230896 秒
子进程 9 启动
子进程结束,耗时 2.1274640560150146 秒 子进程 10 启动
子进程结束,耗时 5.02870774269104 秒
子进程结束,耗时 3.3813669681549072 秒 子进程结束,耗时 7.3181703090667725 秒 父进程结束

  • 使用 with 关键字简化创建进程池

    • 类对象 ProcessPoolExecutor 遵守了上下文管理协议,所以可以使用 with 语句,这样,在离开运行时上下文时会自动的调用方法 shutdown(wait=True)
  1. import time, random
  2. from concurrent.futures import ProcessPoolExecutor
  3. def do_sth(i):
  4. print(f'子进程 {i} 启动')
  5. start = time.time()
  6. time.sleep(random.random() * 10)
  7. end = time.time()
  8. print(f'子进程结束,耗时 {end - start} 秒')
  9. def main():
  10. print('父进程启动')
  11. # 将进程池所能容纳的最大进程数指定为3
  12. with ProcessPoolExecutor(max_workers=3) as ppe:
  13. for i in range(1, 11):
  14. ppe.submit(do_sth, i)
  15. print('父进程结束')
  16. if __name__ == "__main__":
  17. main()

运行结果

父进程启动
子进程 1 启动
子进程 2 启动
子进程 3 启动
子进程结束,耗时 3.266294240951538 秒
子进程 4 启动
子进程结束,耗时 3.5326197147369385 秒
子进程 5 启动
子进程结束,耗时 0.11972618103027344 秒
子进程 6 启动
子进程结束,耗时 6.573461532592773 秒
子进程 7 启动
子进程结束,耗时 1.381535530090332 秒
子进程 8 启动
子进程结束,耗时 5.36028265953064 秒
子进程 9 启动
子进程结束,耗时 6.590775966644287 秒
子进程 10 启动
子进程结束,耗时 1.9599616527557373 秒
子进程结束,耗时 5.0140204429626465 秒
子进程结束,耗时 9.511806726455688 秒
父进程结束

  • 使用 map 简化创建任务
  1. import time, random
  2. from concurrent.futures import ProcessPoolExecutor
  3. def do_sth(i):
  4. print(f'子进程 {i} 启动')
  5. start = time.time()
  6. time.sleep(random.random() * 10)
  7. end = time.time()
  8. print(f'子进程结束,耗时 {end - start} 秒')
  9. def main():
  10. print('父进程启动')
  11. # 将进程池所能容纳的最大进程数指定为3
  12. with ProcessPoolExecutor(max_workers=3) as ppe:
  13. ppe.map(do_sth, range(1, 11))
  14. print('父进程结束')
  15. if __name__ == "__main__":
  16. main()

运行结果

父进程启动
子进程 1 启动
子进程 3 启动
子进程 2 启动
子进程结束,耗时 2.013543128967285 秒
子进程 4 启动
子进程结束,耗时 2.692397356033325 秒
子进程 5 启动
子进程结束,耗时 0.5211577415466309 秒
子进程 6 启动
子进程结束,耗时 7.59312629699707 秒
子进程 7 启动
子进程结束,耗时 6.858060598373413 秒
子进程 8 启动
子进程结束,耗时 0.4232463836669922 秒
子进程 9 启动
子进程结束,耗时 3.7634811401367188 秒
子进程 10 启动
子进程结束,耗时 9.153268575668335 秒
子进程结束,耗时 3.5927655696868896 秒
子进程结束,耗时 9.247799396514893 秒
父进程结束

14-2、 Future 对象和 result 方法

  • 方法 submit() 的返回值是一个 Future 实例对象,表示子进程所调用的那个函数的执行(比如: do_sth() ),可以调用 Future 的方法 result() 得到的这个函数的返回值。

  • 方法 result() 是一个同步方法,也就是说,知道这个函数执行完毕之后方法 result() 才会返回。

  1. import time
  2. from concurrent.futures import ProcessPoolExecutor
  3. def do_sth(i):
  4. time.sleep(2)
  5. return i * i
  6. def main():
  7. with ProcessPoolExecutor(max_workers=3) as ppe:
  8. for i in range(1, 5):
  9. future = ppe.submit(do_sth, i)
  10. # 同步方法,需要等待do_sth执行完毕
  11. print(future.result())
  12. if __name__ == "__main__":
  13. main()

运行结果

1
4
9
16

  • 运行结果中,得到的四个结果是一个一个的完成的,并不是我们所期待的那样,同时启动三个子进程处理任务。这是因为 result() 是同步方法,每次都需要等待 do_sth() 执行完毕才会执行下一个任务。
  • 如果想要即得到函数的返回值,又想让方法异步的执行。新建一个空列表用来保存 Future 实例对象吧。
  1. import time
  2. from concurrent.futures import ProcessPoolExecutor
  3. def do_sth(i):
  4. time.sleep(2)
  5. return i * i
  6. def main():
  7. with ProcessPoolExecutor(max_workers=3) as ppe:
  8. objs = []
  9. for i in range(1, 5):
  10. future = ppe.submit(do_sth, i)
  11. # 异步方法,无需等待do_sth执行完毕
  12. print(future)
  13. objs.append(future)
  14. for obj in objs:
  15. print(obj.result())
  16. if __name__ == "__main__":
  17. main()

运行结果

1
4
9
16

  • 运行结果中,首先打印四个 Future 对象,然后等四个子进程全都执行完毕之后一次性的将四个子进程所调用 do_sth() 的返回值全都打印出来。

14-3、 wait() 函数与 as_completed() 函数

  • 标准库模块 concurrent.futures 提供了两个函数: wait()as_completed()

  • (1) wait(fs, timeout=None, return_when=ALL_COMPLETED)

    • 函数用于阻塞父进程,以等待指定的 Future 实例对象序列,直到满足所指定的条件。
    • 参数 fs 用于指定要等待的Future 实例对象序列。
    • 参数 timeout 用于指定等到的最长时间。如果指定为 None 或者不指定,则一直等待。
    • 参数 return_when 用于指定函数何时返回,有三种取值: FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETED ,分别表示:当第一个 Future 实例对象已经完成或者被取消时、当第一个 Future 实例对象抛出异常时、当所有的 Future 实例对象已经完成或者被取消时。
    • 该函数的返回值是由两个集合组成的元组,第一个集合包含了已经完成或者已被取消的所有 Future 实例对象,第二个集合包含了没有完成并且没有被取消的 Future 实例对象。
  1. import time, random
  2. from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
  3. def do_sth(i):
  4. time.sleep(random.random() * 10)
  5. return i * i
  6. def main():
  7. ppe = ProcessPoolExecutor(max_workers=3)
  8. objs = []
  9. for i in range(1, 5):
  10. future = ppe.submit(do_sth, i)
  11. objs.append(future)
  12. # wait 的返回值是一个元组,其中包括两个集合,分别表示已完成和完成的任务的Future对象集合
  13. done, not_done = wait(objs, return_when=ALL_COMPLETED)
  14. # done, not_done = wait(objs, return_when=FIRST_COMPLETED)
  15. print('done set: ', done)
  16. print('not_done set: ', not_done)
  17. if __name__ == "__main__":
  18. main()

return_when 指定为 ALL_COMPLETED 的运行结果

  1. done set: {<Future at 0x2536a41f400 state=finished returned int>, <Future at 0x2536a41f4a8 state=finished returned int>, <Future at 0x2536a41f358 state=finished returned int>, <Future at 0x2536a3dc4e0 state=finished returned int>}
  2. not_done set: set()

return_when 指定为 FIRST_COMPLETED 的运行结果

  1. done set: {<Future at 0x1c5e0640400 state=finished returned int>}
  2. not_done set: {<Future at 0x1c5e0640358 state=running>, <Future at 0x1c5e0640550 state=running>, <Future at 0x1c5e05fc4e0 state=running>}
  • (2) as_completed(*fs*, *timeout*=None)

    • 该函数用于将指定的 Future 实例对象序列转换为一个迭代器,当序列中的任意一个 Future 实例对象已经完成或已被取消时,都会被 yield 。这样遍历得到的迭代器,就可以在任意一个 Future 实例对象已经完成或已被取消时立即做一些处理,比如调用方法 reslut 得到执行结果。简单的说, as_completed 函数可以帮我们实现在 Future 实例对象已经完成或已被取消时立即做一些处理的需求。
    • 参数 fs 用于指定 Future 实例对象序列。
    • 参数 timeout 用于指定超时时间。如果指定为 None 或者不指定,则不会超时。
    • 该函数的返回值是 Future 实例对象的迭代器。
  1. import time, random
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. def do_sth(i):
  4. time.sleep(random.random() * 10)
  5. return i * i
  6. def main():
  7. print("父进程开始")
  8. ppe = ProcessPoolExecutor(max_workers=3)
  9. objs = []
  10. for i in range(1, 5):
  11. future = ppe.submit(do_sth, i)
  12. objs.append(future)
  13. future_iterator = as_completed(objs)
  14. for future in future_iterator:
  15. print(future.result())
  16. print("父进程结束")
  17. if __name__ == "__main__":
  18. main()

运行结果

父进程开始 1
9
16
4
父进程结束

  • 运行结果得知,当子进程处理完成一个任务并得到 Future实例对象后,就会立刻调用result()函数得到任务运行结果。并且任务依旧是同步进行的。