asyncio

异步io,就是你发起一个io操作,却不用等它结束,你可以继续做其他事,当它结果时,你会得到通知。

asyncio是并发concurrency的一种方式。对python来说,并发还可以通过线程(threading) 和多进程(multiprocessing)来实现。

asyncio并不能带来真正的并行(parallelism)。当然因为GIL的存在,python的多线程也不能事画真正的并行。

协程

协程的定义,需要使用 async def 语句

  1. async def do_some_work(x): pass

do_some_work 便是一个协程。
准备来说,do_some_work是一个协程函数,可以通过 asyncio.iscoroutinefunction 来验证

  1. print(asyncio.iscoroutinefunction(do_some_work)) # True

这个协程什么都没有做,我们让它sleep账簿,模拟实际的工作量

  1. async def do_some_work(x):
  2. print("Waiting " + str(x))
  3. await asyncio.sleep(x)

在解释await之前,有必要说明一下协程可以做哪些事。协程可以:

  • 等待一个 future 结束
  • 等待另一个协程(产生一个结果,或者引发一个异常)
  • 产生一个结果给正在等它的协程
  • 引发一个异常给正在等待它的协程

asyncio.sleep 也是一个协程,所以 await asyncio.sleep(x) 就是等待另一个协程。可以参数asyncio.sleep文档:

  1. sleep(delay, result=None, *, loop=None)
  2. Coroutine that completes after a given time (in seconds).

运行协程

调用协程函数,协程并不会开始运行,只是返回一个协程对象,可以通过 asyncio.isfcoroutine来验证:

  1. print(asyncio.iscoroutine(do_some_work(3)) # True
  2. async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited
  3. print(asyncio.iscoroutine(do_some_work(3)))

要让这个协程对象运行的话,有两种方式:

  • 在另一个已经运行的协程中用 await 来等待它
  • 通过 ensure_future 函数计划它的执行

简单来说只有 loop运行了,协程才可能运行。

下面:

  1. 先拿到当前线程缺少的loop,
  2. 然后把 协程对象 交给loop.run_until_complete,
  3. 协程对象随后会在loop里得到运行。
  1. loop = asyncio.get_event_loop()
  2. loop.ru_until_complete(do_some_work(3))

run_until_complete是一个了阻塞(blocking)调用,直到协程运行结事,它才返回。这一点从函数名可以看出。
fun_until_complete的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它的内部做了检查,通过 ensure_future 孟尝君把协程对象包装wrap成了future。所以,我们可以写得更明显五经:

  1. loop.run_until_complete(asyncio.ensurefuture(do_some_work(3)))

完整代码

  1. import asyncio
  2. async def_some_work(x):
  3. print("Waiting " + str(x))
  4. await asyncio.sleep(x)
  5. loop = asyncio.get_event_loop()
  6. loop.run_until_complete(do_some_work(3))
  7. '''
  8. output:
  9. Waiting 3
  10. <三秒后程序结果>
  11. '''

回调

假如协程是一个io的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需要可以通过往future添加回调来实现。

  1. def done_callback(futu):
  2. print('Done')
  3. futu = asyncio.ensure_future(do_some_work(3))
  4. futu.add_done_callback(done_callback)
  5. loop.run_until_complete(futu)

多个协程

实际项目中,往往有多个协程,同时在一个loop里运行。为了把多个协程交给loop,需要借助 asyncio.gather函数

  1. loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

或者先把协程存在列表里

  1. coros = [do_some_work(1), do_some_work(3)]
  2. loop/run_until_complete(asyncio.gather(*coros))

运行结果:

  1. Waiting 3
  2. Waiting 1
  3. <等待三秒种>
  4. Done

这两个协程是并发运行的,所以等待的时间不是 1+3=4,而是心耗时较长的那个协程为准。
参考函数gather的文档

gather(*coros_or_futures, loop=None, return_exceptions=False) Return a future aggregating results from the given coroutines or futures.

发现也可以传futures给它

  1. futus = [asyncio.ensure_future(do_some_work(1)),
  2. asyncio.ensure_future(do_some_work(3))]
  3. loop.run_until_complete(asyncio.gather(*futus))

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

run_until_complete 和 run_forever

我们一直通过 run_until_complete 来运行 loop ,等到 future 完成,run_until_complete 也就返回了。

  1. async def do_some_work(x):
  2. print('Waiting ' + str(x))
  3. await asyncio.sleep(x)
  4. print('Done')
  5. loop = asyncio.get_event_loop()
  6. coro = do_some_work(3)
  7. loop.run_until_complete(coro) # future 完成后返回
  8. '''
  9. Waiting 3
  10. <等待三秒钟>
  11. Done
  12. <程序退出>
  13. '''

现在改用 run_forever

  1. async def do_some_work(x):
  2. print('Waiting ' + str(x))
  3. await asyncio.sleep(x)
  4. print('Done')
  5. loop = asyncio.get_event_loop()
  6. coro = do_some_work(3)
  7. asyncio.ensure_future(coro)
  8. loop.run_forever()

三秒钟过后,future 结束,但是程序并不会退出。run_forever 会一直运行,直到 stop 被调用,但是你不能像下面这样调 stop

  1. loop.run_forever()
  2. loop.stop() # 上一步,永远不返回,stop不会被调用到

run_forever 不返回,stop 永远也不会被调用。所以,只能在协程中调 stop

  1. async def do_some_work(loop, x):
  2. print('Waiting ' + str(x))
  3. await asyncio.sleep(x)
  4. print('Done')
  5. loop.stop()

这样并非没有问题,假如有多个协程在 loop 里运行:

  1. asyncio.ensure_future(do_some_work(loop, 1))
  2. asyncio.ensure_future(do_some_work(loop, 3))
  3. loop.run_forever()

第二个协程没结束,loop 就停止了——被先结束的那个协程给停掉的。
要解决这个问题,可以用 gather 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

  1. async def do_some_work(loop, x):
  2. print('Waiting ' + str(x))
  3. await asyncio.sleep(x)
  4. print('Done')
  5. def done_callback(loop, futu):
  6. loop.stop()
  7. loop = asyncio.get_event_loop()
  8. futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
  9. futus.add_done_callback(functools.partial(done_callback, loop))
  10. loop.run_forever()

其实这基本上就是 run_until_complete的实现了,run_until_complete在内部也是调用 run_forever.

close loop

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行。:

  1. loop.run_until_complete(do_some_work(loop, 1))
  2. loop.run_until_complete(do_some_work(loop, 3))
  3. loop.close()

但是如果关闭了,就不能再运行了:

  1. loop.run_until_complete(do_some_work(loop, 1))
  2. loop.close()
  3. loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

建议调用 loop.close,以彻底清理 loop 对象防止误用。

gather vs wait

asyncio.gatherasyncio.wait 功能相似。

  1. coros = [do_some_work(loop, 1), do_some_work(loop, 3)]
  2. loop.run_until_complete(asyncio.wait(coros))

具体差别可请参见 StackOverflow 的讨论:Asyncio.gather vs asyncio.wait

timer

C++ Boost.Asio 提供了 IO 对象 timer,但是 Python 并没有原生支持 timer,不过可以用 asyncio.sleep 模拟。

  1. async def timer(x, cb):
  2. futu = asyncio.ensure_future(asyncio.sleep(x))
  3. futu.add_done_callback(cb)
  4. await futu
  5. t = timer(3, lambda futu: print('Done'))
  6. loop.run_until_complete(t)