协程

.. testsetup::

from tornado import gen

Tornado中推荐使用 协程 写异步代码. 协程使用了Python的 yield 关键字代替链式回调来将程序挂起和恢复执行(像在 gevent <http://www.gevent.org>_ 中出现的轻量级线程合作方式有时也被称为协程, 但是在Tornado中所有的协程使用明确的上下文切换,并被称为异步函数).

使用协程几乎像写同步代码一样简单, 并且不需要浪费额外的线程. 它们还通过减少上下文切换来 使并发编程更简单 <https://glyph.twistedmatrix.com/2014/02/unyielding.html>_ .

例子::

  1. from tornado import gen
  2. @gen.coroutine
  3. def fetch_coroutine(url):
  4. http_client = AsyncHTTPClient()
  5. response = yield http_client.fetch(url)
  6. # 在Python 3.3之前, 在generator中是不允许有返回值的
  7. # 必须通过抛出异常来代替.
  8. # 就像 raise gen.Return(response.body).
  9. return response.body

.. _native_coroutines:

Python 3.5: async and await

  1. Python 3.5 引入了 ``async`` ``await`` 关键字(使用这些关键字的
  2. 函数也被称为"原生协程"). Tornado 4.3, 你可以用它们代替 ``yield`` 为基础的协程.
  3. 只需要简单的使用 ``async def foo()`` 在函数定义的时候代替 ``@gen.coroutine`` 装饰器, ``await`` 代替yield. 本文档的其他部分会继续使用 ``yield`` 的风格来和旧版本的Python兼容, 但是如果 ``async`` ``await`` 可用的话,它们运行起来会更快::
  4. async def fetch_coroutine(url):
  5. http_client = AsyncHTTPClient()
  6. response = await http_client.fetch(url)
  7. return response.body
  8. ``await`` 关键字比 ``yield`` 关键字功能要少一些.
  9. 例如,在一个使用 ``yield`` 的协程中, 你可以得到
  10. ``Futures`` 列表, 但是在原生协程中,你必须把列表用 `tornado.gen.multi` 包起来. 你也可以使用 `tornado.gen.convert_yielded`
  11. 来把任何使用 ``yield`` 工作的代码转换成使用 ``await`` 的形式.
  12. 虽然原生协程没有明显依赖于特定框架(例如它们没有使用装饰器,例如 `tornado.gen.coroutine`
  13. `asyncio.coroutine`), 不是所有的协程都和其他的兼容. 有一个 *协程执行者(coroutine runner)* 在第一个协程被调用的时候进行选择, 然后被所有用 ``await`` 直接调用的协程共享.
  14. Tornado 的协程执行者(coroutine runner)在设计上是多用途的,可以接受任何来自其他框架的awaitable对象;
  15. 其他的协程运行时可能有很多限制(例如, ``asyncio`` 协程执行者不接受来自其他框架的协程).
  16. 基于这些原因,我们推荐组合了多个框架的应用都使用Tornado的协程执行者来进行协程调度.
  17. 为了能使用Tornado来调度执行asyncio的协程, 可以使用
  18. `tornado.platform.asyncio.to_asyncio_future` 适配器.
  19. 它是如何工作的
  20. ~~~~~~~~~~~~~~~~~~~~
  21. 包含了 ``yield`` 关键字的函数是一个 **生成器(generator)**. 所有的生成器都是异步的;
  22. 当调用它们的时候,会返回一个生成器对象,而不是一个执行完的结果.
  23. ``@gen.coroutine`` 装饰器通过 ``yield`` 表达式和生成器进行交流, 而且通过返回一个 `.Future` 与协程的调用方进行交互.
  24. 下面是一个协程装饰器内部循环的简单版本::
  25. # tornado.gen.Runner 简化的内部循环
  26. def run(self):
  27. # send(x) makes the current yield return x.
  28. # It returns when the next yield is reached
  29. future = self.gen.send(self.next)
  30. def callback(f):
  31. self.next = f.result()
  32. self.run()
  33. future.add_done_callback(callback)
  34. 装饰器从生成器接收一个 `.Future` 对象, 等待(非阻塞的)这个 `.Future`
  35. 对象执行完成, 然后"解开(unwraps)" 这个 `.Future` 对象,并把结果作为
  36. ``yield`` 表达式的结果传回给生成器. 大多数异步代码从来不会直接接触 `.Future`
  37. 除非 `.Future` 立即通过异步函数返回给 ``yield`` 表达式.
  38. 如何调用协程

协程一般不会抛出异常: 它们抛出的任何异常将被 .Future 捕获 直到它被得到. 这意味着用正确的方式调用协程是重要的, 否则你可能有被 忽略的错误::

  1. @gen.coroutine
  2. def divide(x, y):
  3. return x / y
  4. def bad_call():
  5. # 这里应该抛出一个 ZeroDivisionError 的异常, 但事实上并没有
  6. # 因为协程的调用方式是错误的.
  7. divide(1, 0)

几乎所有的情况下, 任何一个调用协程的函数都必须是协程它自身, 并且在 调用的时候使用 yield 关键字. 当你复写超类中的方法, 请参阅文档, 看看协程是否支持(文档应该会写该方法 “可能是一个协程” 或者 “可能返回 一个 .Future “)::

  1. @gen.coroutine
  2. def good_call():
  3. # yield 将会解开 divide() 返回的 Future 并且抛出异常
  4. yield divide(1, 0)

有时你可能想要对一个协程”一劳永逸”而且不等待它的结果. 在这种情况下, 建议使用 .IOLoop.spawn_callback, 它使得 .IOLoop 负责调用. 如果 它失败了, .IOLoop 会在日志中把调用栈记录下来::

  1. # IOLoop 将会捕获异常,并且在日志中打印栈记录.
  2. # 注意这不像是一个正常的调用, 因为我们是通过
  3. # IOLoop 调用的这个函数.
  4. IOLoop.current().spawn_callback(divide, 1, 0)

最后, 在程序顶层, 如果 .IOLoop 尚未运行, 你可以启动 .IOLoop, 执行协程,然后使用 .IOLoop.run_sync 方法停止 .IOLoop . 这通常被 用来启动面向批处理程序的 main 函数::

  1. # run_sync() 不接收参数,所以我们必须把调用包在lambda函数中.
  2. IOLoop.current().run_sync(lambda: divide(1, 0))

协程模式 ~~~~~~

结合 callback ^^^^^^^^^^^^^^^^^^^^^^^^^^

为了使用回调而不是 .Future 与异步代码进行交互, 把调用包在 .Task 中. 这将为你添加一个回调参数并且返回一个可以yield的 .Future :

.. testcode::

  1. @gen.coroutine
  2. def call_task():
  3. # 注意这里没有传进来some_function.
  4. # 这里会被Task翻译成
  5. # some_function(other_args, callback=callback)
  6. yield gen.Task(some_function, other_args)

.. testoutput:: :hide:

调用阻塞函数 ^^^^^^^^^^^^^^^^^^^^^^^^^^

从协程调用阻塞函数最简单的方式是使用 ~concurrent.futures.ThreadPoolExecutor, 它将返回和协程兼容的 Futures ::

  1. thread_pool = ThreadPoolExecutor(4)
  2. @gen.coroutine
  3. def call_blocking():
  4. yield thread_pool.submit(blocking_func, args)

并行 ^^^^^^^^^^^

协程装饰器能识别列表或者字典对象中各自的 Futures, 并且并行的等待这些 Futures :

.. testcode::

  1. @gen.coroutine
  2. def parallel_fetch(url1, url2):
  3. resp1, resp2 = yield [http_client.fetch(url1),
  4. http_client.fetch(url2)]
  5. @gen.coroutine
  6. def parallel_fetch_many(urls):
  7. responses = yield [http_client.fetch(url) for url in urls]
  8. # 响应是和HTTPResponses相同顺序的列表
  9. @gen.coroutine
  10. def parallel_fetch_dict(urls):
  11. responses = yield {url: http_client.fetch(url)
  12. for url in urls}
  13. # 响应是一个字典 {url: HTTPResponse}

.. testoutput:: :hide:

交叉存取 ^^^^^^^^^^^^

有时候保存一个 .Future 比立即yield它更有用, 所以你可以在等待之前 执行其他操作:

.. testcode::

  1. @gen.coroutine
  2. def get(self):
  3. fetch_future = self.fetch_next_chunk()
  4. while True:
  5. chunk = yield fetch_future
  6. if chunk is None: break
  7. self.write(chunk)
  8. fetch_future = self.fetch_next_chunk()
  9. yield self.flush()

.. testoutput:: :hide:

循环 ^^^^^^^

协程的循环是棘手的, 因为在Python中没有办法在 for 循环或者 while 循环 yield 迭代器,并且捕获yield的结果. 相反,你需要将 循环条件从访问结果中分离出来, 下面是一个使用 Motor <http://motor.readthedocs.org/en/stable/>_ 的例子::

  1. import motor
  2. db = motor.MotorClient().test
  3. @gen.coroutine
  4. def loop_example(collection):
  5. cursor = db.collection.find()
  6. while (yield cursor.fetch_next):
  7. doc = cursor.next_object()

在后台运行 ^^^^^^^^^^^^^^^^^^^^^^^^^

.PeriodicCallback 通常不使用协程. 相反,一个协程可以包含一个 while True: 循环并使用 tornado.gen.sleep::

  1. @gen.coroutine
  2. def minute_loop():
  3. while True:
  4. yield do_something()
  5. yield gen.sleep(60)
  6. # Coroutines that loop forever are generally started with
  7. # spawn_callback().
  8. IOLoop.current().spawn_callback(minute_loop)

有时可能会遇到一个更复杂的循环. 例如, 上一个循环运行每次花费 60+N 秒, 其中 Ndo_something() 花费的时间. 为了 准确的每60秒运行,使用上面的交叉模式::

  1. @gen.coroutine
  2. def minute_loop2():
  3. while True:
  4. nxt = gen.sleep(60) # 开始计时.
  5. yield do_something() # 计时后运行.
  6. yield nxt # 等待计时结束.