异步执行可以用多线程(ThreadPoolExecutor),或多进程(ProcessPoolExecutor)。
两者都实现了相同的接口——Executor 抽象类。
Executor 对象
抽象类,不能直接用,要通过子类(具体类)来实例化。
submit(fn, args, *kwargs)
安排什么时候调用 fn 。 fn(args, *kwargs),submit 方法返回一个 Future 对象。
from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 3, 4)print(future) # <Future at 0x984930 state=finished returned int>print(future.result()) # 81
with 语句会自动调用 excutor 的 shutdown 方法释放资源。
注意了,调用函数除了显式地用括号调用,还可以把函数 submit 到 executor 中,让它在合适的时候调用。
但是,当函数 submit 到 executor 来执行时,函数如果出现未定义的变量,解析器不会报错而是跳过!!!
map(func, iterable…)
如果多次 submit 同一个函数,只是函数参数不同,可以用 map 简化。
注意,返回的生成器生成的不是 future 对象,而是返回结果。而 submit 方法返回的是一个 Future 对象。
from concurrent.futures import ThreadPoolExecutorimport mathwith ThreadPoolExecutor(max_workers=1) as executor:results = executor.map(math.sqrt, [9, 81, 64])print(results) # <generator object ...>print(list(results)) # [3.0, 9.0, 8.0]
ThreadPoolExecutor
多少次 submit 就要多少个线程,若 submit 数大于 max_workers 会导致死锁。整个模块占一个线程,不算入 max_workers 。
def wait_on_future():f = executor.submit(pow, 4, 2)print(f.result())executor = ThreadPoolExecutor(max_workers=1)executor.submit(wait_on_future)
两次 submit,一个线程,死锁。
max_workers 为 1,最开始 submit 的函数是 wait_on_future,霸占了这个线程,后面 submit 的 pow 函数就永远无法执行。
map——并发执行,有序返回
因为返回有序,所以做不到立刻返回。
返回生成器,直接生成调用函数的返回结果。
# a.pyfrom concurrent.futures import ThreadPoolExecutor, as_completedimport requestsURLS = ['https://www.baidu.com/','https://github.com/96chh','http://www.gdut.edu.cn/','https://www.processon.com/',]def parse(url):r = requests.get(url)print(url)return urlwith ThreadPoolExecutor(max_workers=3) as executor:results = executor.map(parse, URLS)for r in results:print(r)
C:\Users\ct7a10>python a.pyhttp://www.gdut.edu.cn/https://www.processon.com/https://www.baidu.com/https://www.baidu.com/ # 有序返回https://github.com/96chhhttps://github.com/96chh # 有序返回http://www.gdut.edu.cn/ # 有序返回https://www.processon.com/ # 有序返回
as_completed——并发执行,无序立刻返回
模块方法。
返回生成器,生成 Future 对象。
参数是多个 Future 对象(例如一个 dict 或 list,反正里面包含多个 future)。
无序并发执行,无序立刻返回:
with ThreadPoolExecutor(max_workers=3) as executor:fs = [executor.submit(parse, url) for url in URLS]for f in as_completed(fs):print(f.result())
C:\Users\ct7a10>python b.pyhttp://www.gdut.edu.cn/http://www.gdut.edu.cn/https://www.baidu.com/https://www.baidu.com/https://www.processon.com/https://www.processon.com/https://github.com/96chhhttps://github.com/96chh
ProcessPoolExecutor
The
__main__module must be importable by worker subprocesses.This means that ProcessPoolExecutor will not work in the interactive interpreter.
import concurrent.futuresimport mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n % 2 == 0:return Falsesqrt_n = int(math.sqrt(n))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime)) # 顺序返回if __name__ == '__main__': # 不能省略!main()
Future 对象
包裹着一个可调用对象的异步执行。由 submit 方法生成 Future 对象,不应手动创建。
cancel():正在执行不能取消返回 False;取消成功返回 True
cancelled():成功取消则返回 True
running():正在执行则返回 True,且此时不能 cancel
done():成功取消或执行结束则返回 True
result(timeout=None):返回可调用对象的返回值。
add_done_callback(fn):fn 是一个函数,future 是它唯一参数。当这个 future 执行完毕或取消时,调用 fn 。
参考文档:
https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures
