15、线程池 ThreadPoolExecutor

author:14组开发周博文 欢迎各位老师与同学指导和指正!

  • 在标准库模块 concurrent.futures 中提供了一个类对象 ThreadPoolExecutor ,也用于表示线程池。与 ThreadPool 相比, ThreadPoolExecutor 的功能和性能更加强大。

15-1、一般使用

  • 基本使用
  1. import time, random
  2. from concurrent.futures import ThreadPoolExecutor
  3. def do_sth(i):
  4. print(f'子线程 {i} 启动')
  5. start = time.time()
  6. time.sleep(random.random() * 10)
  7. end = time.time()
  8. print(f'子线程结束,耗时 {end - start} 秒')
  9. def main():
  10. print('父线程启动')
  11. # 将线程池所能容纳的最大线程数指定为3
  12. tpe = ThreadPoolExecutor(max_workers=3)
  13. # 将线程池处理的任务全部交给线程池,此后会创建并启动由线程池所管理的子线程
  14. for i in range(1, 11):
  15. # 与方法start类似,不同的是,创建并启动由线程池管理的子线程
  16. tpe.submit(do_sth, i)
  17. # 父线程被阻塞
  18. # 线程池中所管理的所有子线程都执行完之后,父线程再从被阻塞的地方继续执行
  19. tpe.shutdown(wait=True)
  20. print('父线程结束')
  21. if __name__ == "__main__":
  22. main()

运行结果

父线程启动
子线程 1 启动
子线程 2 启动
子线程 3 启动
子线程结束,耗时 1.3141014575958252 秒
子线程 4 启动
子线程结束,耗时 2.937268018722534 秒
子线程 5 启动
子线程结束,耗时 5.958585023880005 秒
子线程 6 启动
子线程结束,耗时 9.515044927597046 秒
子线程 7 启动
子线程结束,耗时 6.779180288314819 秒
子线程 8 启动
子线程结束,耗时 9.496912956237793 秒
子线程 9 启动
子线程结束,耗时 5.9886391162872314 秒
子线程 10 启动
子线程结束,耗时 9.875327110290527 秒
子线程结束,耗时 5.99570369720459 秒
子线程结束,耗时 5.6961729526519775 秒
父线程结束

  • 使用 with 关键字简化创建进程池
  • 类对象 ThreadPoolExecutor 遵守了上下文管理协议,所以可以使用 with 语句,这样,在离开运行时上下文时会自动的调用方法 shutdown(wait=True)
import time, random
from concurrent.futures import ThreadPoolExecutor
def do_sth(i):
    print(f'子线程 {i} 启动')
    start = time.time()
    time.sleep(random.random() * 10)
    end = time.time()
    print(f'子线程结束,耗时 {end - start} 秒')
def main():
    print('父线程启动')
    # 将线程池所能容纳的最大线程数指定为3
    with ThreadPoolExecutor(max_workers=3) as tpe:
        for i in range(1, 11):
            tpe.submit(do_sth, i)
    print('父线程结束')
if __name__ == "__main__":
    main()

运行结果

父线程启动
子线程 1 启动
子线程 2 启动
子线程 3 启动
子线程结束,耗时 3.3316078186035156 秒
子线程 4 启动
子线程结束,耗时 4.002740383148193 秒
子线程 5 启动
子线程结束,耗时 0.7241055965423584 秒
子线程 6 启动
子线程结束,耗时 1.4055047035217285 秒
子线程 7 启动
子线程结束,耗时 6.345177412033081 秒
子线程 8 启动
子线程结束,耗时 3.8100180625915527 秒
子线程 9 启动
子线程结束,耗时 7.2050559520721436 秒
子线程 10 启动
子线程结束,耗时 1.193450689315796 秒
子线程结束,耗时 1.8979597091674805 秒
子线程结束,耗时 7.9613683223724365 秒
父线程结束

  • 使用 map 简化创建任务
import time, random
from concurrent.futures import ThreadPoolExecutor
def do_sth(i):
    print(f'子线程 {i} 启动')
    start = time.time()
    time.sleep(random.random() * 10)
    end = time.time()
    print(f'子线程结束,耗时 {end - start} 秒')
def main():
    print('父线程启动')
    # 将线程池所能容纳的最大线程数指定为3
    with ThreadPoolExecutor(max_workers=3) as tpe:
        tpe.map(do_sth, range(1, 11))
    print('父线程结束')
if __name__ == "__main__":
    main()

运行结果

父线程启动 子线程 1 启动 子线程 2 启动 子线程 3 启动 子线程结束,耗时 4.230120897293091 秒 子线程 4 启动 子线程结束,耗时 5.368730306625366 秒 子线程 5 启动 子线程结束,耗时 6.6971752643585205 秒 子线程 6 启动 子线程结束,耗时 4.179962396621704 秒 子线程 7 启动 子线程结束,耗时 3.384809970855713 秒 子线程 8 启动 子线程结束,耗时 6.470539331436157 秒 子线程 9 启动 子线程结束,耗时 5.21962833404541 秒 子线程 10 启动 子线程结束,耗时 4.528219223022461 秒 子线程结束,耗时 1.5640325546264648 秒 子线程结束,耗时 4.147211790084839 秒 父线程结束

15-2、 Future 对象和 result 方法

  • 方法 submit() 的返回值是一个 Future 实例对象,表示子线程所调用的那个函数的执行(比如: do_sth() ),可以调用 Future 的方法 result() 得到的这个函数的返回值。
  • 方法 result() 是一个同步方法,也就是说,知道这个函数执行完毕之后方法 result () 才会返回。
import time
from concurrent.futures import ThreadPoolExecutor
def do_sth(i):
    print(f'子线程 {i} 启动')
    time.sleep(2)
    print(f'子线程 {i} 结束')
    return i * i
def main():
    with ThreadPoolExecutor(max_workers=3) as tpe:
        for i in range(1, 5):
            future = tpe.submit(do_sth, i)
            # 同步方法,需要等待do_sth执行完毕
            print(future.result())
if __name__ == "__main__":
    main()

运行结果

子线程 1 启动
子线程 1 结束
1
子线程 2 启动
子线程 2 结束
4
子线程 3 启动
子线程 3 结束
9
子线程 4 启动
子线程 4 结束
16

  • 运行结果中,得到的四个结果是一个一个的完成的,并不是我们所期待的那样,同时启动三个子线程处理任务。这是因为 result() 是同步方法,每次都需要等待 do_sth() 执行完毕才会执行下一个任务。

  • 如果想要即得到函数的返回值,又想让方法异步的执行。新建一个空列表用来保存 Future 实例对象吧。

python
import time
from concurrent.futures import ThreadPoolExecutor
def do_sth(i):
    print(f'子线程 {i} 启动')
    time.sleep(2)
    print(f'子线程 {i} 结束')
    return i * i
def main():
    print("父线程开始")
    with ThreadPoolExecutor(max_workers=3) as tpe:
        objs = []
        for i in range(1, 5):
            future = tpe.submit(do_sth, i)
            # 异步方法,无需等待do_sth执行完毕
            print(future)
            objs.append(future)

    for obj in objs:
        print(obj.result())

    print("父线程结束")
if __name__ == "__main__":
    main()

运行结果

父线程开始 子线程 1 启动

子线程 2 启动

子线程 3 启动

子线程 1 结束 子线程 4 启动 子线程 2 结束 子线程 3 结束 子线程 4 结束 1 4 9 16 父线程结束

  • 运行结果中,首先打印四个 Future 对象,其中三个是处于 running 状态的,第四个是在 pending 状态中。证明了同时启动了三个子线程去执行任务,等然后等四个子线程全都执行完毕之后一次性的将四个子线程所调用 do_sth() 的返回值全都打印出来。

15-3、 wait() 函数与 as_completed() 函数

  • 标准库模块 concurrent.futures 提供了两个函数: wait()as_completed()
  • (1) wait(fs, timeout=None, return_when=ALL_COMPLETED)
    • 函数用于阻塞父线程,以等待指定的 Future 实例对象序列,直到满足所指定的条件。
    • 参数 fs 用于指定要等待的 Future 实例对象序列。
    • 参数 timeout 用于指定等到的最长时间。如果指定为 None 或者不指定,则一直等待。
    • 参数 return_when 用于指定函数何时返回,有三种取值: FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETED ,分别表示:当第一个 Future 实例对象已经完成或者被取消时、当第一个 Future 实例对象抛出异常时、当所有的 Future 实例对象已经完成或者被取消时。
    • 该函数的返回值是由两个集合组成的元组,第一个集合包含了已经完成或者已被取消的所有 Future 实例对象,第二个集合包含了没有完成并且没有被取消的 Future 实例对象。
import time, random
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
def do_sth(i):
    time.sleep(random.random() * 10)
    return i * i
def main():
    ppe = ProcessPoolExecutor(max_workers=3)
    objs = []
    for i in range(1, 5):
        future = ppe.submit(do_sth, i)
        objs.append(future)

    # wait 的返回值是一个元组,其中包括两个集合,分别表示已完成和完成的任务的Future对象集合
    done, not_done = wait(objs, return_when=ALL_COMPLETED)
    # done, not_done = wait(objs, return_when=FIRST_COMPLETED)
    print('done set: ', done)
    print('not_done set: ', not_done)
if __name__ == "__main__":
    main()

return_when 指定为 ALL_COMPLETED 的运行结果

父线程启动 子线程 1 启动 子线程 2 启动 子线程 3 启动 子线程 1 结束 子线程 4 启动 子线程 3 结束 子线程 4 结束 子线程 2 结束 done set: {, , , } not_done set: set() 父线程结束

return_when 指定为 FIRST_COMPLETED 的运行结果

父线程启动 子线程 1 启动 子线程 2 启动 子线程 3 启动 子线程 2 结束 子线程 4 启动 done set: {} not_done set: {, , } 父线程结束 子线程 3 结束 子线程 4 结束 子线程 1 结束

  • 运行结果可以看出,return_when 指定为 FIRST_COMPLETED时,父线程会在第一个线程结束后停止阻塞状态。但不影响其它子线程的运行。

  • (2) as_completed(*fs*, *timeout*=None)
    • 该函数用于将指定的 Future 实例对象序列转换为一个迭代器,当序列中的任意一个 Future 实例对象已经完成或已被取消时,都会被 yield 。这样遍历得到的迭代器,就可以在任意一个 Future 实例对象已经完成或已被取消时立即做一些处理,比如调用方法reslut 得到执行结果。简单的说, as_completed 函数可以帮我们实现在 Future 实例对象已经完成或已被取消时立即做一些处理的需求。
    • 参数 fs 用于指定 Future 实例对象序列。
    • 参数 timeout 用于指定超时时间。如果指定为 None 或者不指定,则不会超时。
    • 该函数的返回值是 Future 实例对象的迭代器。
import time, random
from concurrent.futures import ThreadPoolExecutor, as_completed
def do_sth(i):
    print(f'子线程 {i} 启动')
    time.sleep(random.random() * 10)
    print(f'子线程 {i} 结束')
    return i * i
def main():
    print("父线程开始")
    tpe = ThreadPoolExecutor(max_workers=3)
    objs = []
    for i in range(1, 5):
        future = tpe.submit(do_sth, i)
        objs.append(future)

    future_iterator = as_completed(objs)
    for future in future_iterator:
        print(future.result())
    print("父线程结束")
if __name__ == "__main__":
    main()

运行结果

父线程开始 子线程 1 启动 子线程 2 启动 子线程 3 启动 子线程 1 结束 子线程 4 启动 1 子线程 3 结束 9 子线程 4 结束 16 子线程 2 结束 4 父线程结束

  • 运行结果得知,当子线程处理完成一个任务并得到 Future 实例对象后,就会立刻调用指定的 result() 函数得到任务返回值。并且任务依旧是同步进行的。

15线程池 ThreadPoolExecutor - 图1