异步执行可以用多线程(ThreadPoolExecutor),或多进程(ProcessPoolExecutor)。
两者都实现了相同的接口——Executor 抽象类。

Executor 对象

抽象类,不能直接用,要通过子类(具体类)来实例化。

submit(fn, args, *kwargs)

安排什么时候调用 fn 。 fn(args, *kwargs),submit 方法返回一个 Future 对象。

  1. from concurrent.futures import ThreadPoolExecutor
  2. with ThreadPoolExecutor(max_workers=1) as executor:
  3. future = executor.submit(pow, 3, 4)
  4. print(future) # <Future at 0x984930 state=finished returned int>
  5. print(future.result()) # 81

with 语句会自动调用 excutor 的 shutdown 方法释放资源。

注意了,调用函数除了显式地用括号调用,还可以把函数 submit 到 executor 中,让它在合适的时候调用。
但是,当函数 submit 到 executor 来执行时,函数如果出现未定义的变量,解析器不会报错而是跳过!!!

map(func, iterable…)

如果多次 submit 同一个函数,只是函数参数不同,可以用 map 简化。
注意,返回的生成器生成的不是 future 对象,而是返回结果。而 submit 方法返回的是一个 Future 对象。

  1. from concurrent.futures import ThreadPoolExecutor
  2. import math
  3. with ThreadPoolExecutor(max_workers=1) as executor:
  4. results = executor.map(math.sqrt, [9, 81, 64])
  5. print(results) # <generator object ...>
  6. print(list(results)) # [3.0, 9.0, 8.0]

ThreadPoolExecutor

多少次 submit 就要多少个线程,若 submit 数大于 max_workers 会导致死锁。整个模块占一个线程,不算入 max_workers 。

  1. def wait_on_future():
  2. f = executor.submit(pow, 4, 2)
  3. print(f.result())
  4. executor = ThreadPoolExecutor(max_workers=1)
  5. executor.submit(wait_on_future)

两次 submit,一个线程,死锁。
max_workers 为 1,最开始 submit 的函数是 wait_on_future,霸占了这个线程,后面 submit 的 pow 函数就永远无法执行。

map——并发执行,有序返回

因为返回有序,所以做不到立刻返回。
返回生成器,直接生成调用函数的返回结果。

  1. # a.py
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. import requests
  4. URLS = [
  5. 'https://www.baidu.com/',
  6. 'https://github.com/96chh',
  7. 'http://www.gdut.edu.cn/',
  8. 'https://www.processon.com/',
  9. ]
  10. def parse(url):
  11. r = requests.get(url)
  12. print(url)
  13. return url
  14. with ThreadPoolExecutor(max_workers=3) as executor:
  15. results = executor.map(parse, URLS)
  16. for r in results:
  17. print(r)
  1. C:\Users\ct7a10>python a.py
  2. http://www.gdut.edu.cn/
  3. https://www.processon.com/
  4. https://www.baidu.com/
  5. https://www.baidu.com/ # 有序返回
  6. https://github.com/96chh
  7. https://github.com/96chh # 有序返回
  8. http://www.gdut.edu.cn/ # 有序返回
  9. https://www.processon.com/ # 有序返回

as_completed——并发执行,无序立刻返回

模块方法。
返回生成器,生成 Future 对象。
参数是多个 Future 对象(例如一个 dict 或 list,反正里面包含多个 future)。

无序并发执行,无序立刻返回:

  1. with ThreadPoolExecutor(max_workers=3) as executor:
  2. fs = [executor.submit(parse, url) for url in URLS]
  3. for f in as_completed(fs):
  4. print(f.result())
  1. C:\Users\ct7a10>python b.py
  2. http://www.gdut.edu.cn/
  3. http://www.gdut.edu.cn/
  4. https://www.baidu.com/
  5. https://www.baidu.com/
  6. https://www.processon.com/
  7. https://www.processon.com/
  8. https://github.com/96chh
  9. https://github.com/96chh

ProcessPoolExecutor

The __main__ module must be importable by worker subprocesses.This means that ProcessPoolExecutor will not work in the interactive interpreter.

  1. import concurrent.futures
  2. import math
  3. PRIMES = [
  4. 112272535095293,
  5. 112582705942171,
  6. 112272535095293,
  7. 115280095190773,
  8. 115797848077099,
  9. 1099726899285419]
  10. def is_prime(n):
  11. if n % 2 == 0:
  12. return False
  13. sqrt_n = int(math.sqrt(n))
  14. for i in range(3, sqrt_n + 1, 2):
  15. if n % i == 0:
  16. return False
  17. return True
  18. def main():
  19. with concurrent.futures.ProcessPoolExecutor() as executor:
  20. for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
  21. print('%d is prime: %s' % (number, prime)) # 顺序返回
  22. if __name__ == '__main__': # 不能省略!
  23. main()

Future 对象

包裹着一个可调用对象的异步执行。由 submit 方法生成 Future 对象,不应手动创建。

  • cancel():正在执行不能取消返回 False;取消成功返回 True

  • cancelled():成功取消则返回 True

  • running():正在执行则返回 True,且此时不能 cancel

  • done():成功取消或执行结束则返回 True

  • result(timeout=None):返回可调用对象的返回值。

  • add_done_callback(fn):fn 是一个函数,future 是它唯一参数。当这个 future 执行完毕或取消时,调用 fn 。

参考文档:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures