A Web Crawler With asyncio Coroutines

A. Jesse Jiryu Davis works for MongoDB in New York. He wrote Motor, the async MongoDB Python driver, and he is the lead developer of the MongoDB C driver and a member of the PyMongo team. He also contributes to asyncio and Tornado. He writes at http://emptysqua.re

Guido van Rossum is the creator of Python, one of the major programming languages both on and off the web. The community refers to him as the BDFL (Benevolent Dictator For Life), a title that comes from a Monty Python sketch. Guido's home on the web is http://www.python.org/~guido/ .

Introduction

Classical computer science emphasizes efficient algorithms that complete computations as quickly as possible. But many networked programs spend their time not computing, but holding open many connections that are slow or waiting for events that rarely occur. These programs present a very different challenge: to wait for a huge number of network events efficiently. A contemporary approach to this problem is asynchronous I/O.

This chapter implements a simple web crawler. The crawler is an archetypal async application because it waits for many responses but does little computation. The more pages it can fetch at once, the sooner it finishes. If it devotes a thread to each in-flight request, then as the number of concurrent requests rises it will run out of memory or other thread-related resources before it runs out of sockets. It avoids the need for a large number of threads by using asynchronous I/O.

We present the example in three stages. First, we show an async event loop and sketch a crawler that uses the event loop with callbacks: it is very efficient, but extending it to more complex problems leads to unmanageable code. Second, we show a Python coroutine version that is both efficient and extensible. Third, we use the full-featured coroutines from Python's standard asyncio library and coordinate them using an async queue.

The Task

A web crawler finds and downloads all the pages on a website, perhaps to archive or index them. Beginning with a root URL, it fetches each page, parses it for links to unseen pages, and adds these to a queue. It stops when it fetches a page with no unseen links and the queue is empty.
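
To make the shape of this algorithm concrete, here is a hedged, purely sequential sketch of the crawl loop just described; fetch(url) and parse_links(response) are hypothetical blocking helpers, not code from this chapter:

    urls_todo = set(['/'])
    seen_urls = set(['/'])

    while urls_todo:
        url = urls_todo.pop()
        response = fetch(url)              # blocking download of one page
        for link in parse_links(response):
            if link not in seen_urls:
                seen_urls.add(link)
                urls_todo.add(link)        # queue unseen links for later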

We can speed this process up by downloading many pages concurrently. As the crawler finds new links, it launches simultaneous fetch operations for the new pages on separate sockets. It parses responses as they arrive, adding new links to the queue. Too much concurrency can degrade performance, so we cap the number of concurrent requests and leave the remaining links in the queue until some in-flight requests complete.

The Traditional Approach

How do we make the crawler handle requests concurrently? Traditionally we would create a thread pool. Each thread would be in charge of downloading one page at a time over a socket. For example, to download a page from xkcd.com:

    def fetch(url):
        sock = socket.socket()
        sock.connect(('xkcd.com', 80))
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)

        # Page is now downloaded.
        links = parse_links(response)
        q.add(links)

Socket operations are blocking by default: when a thread calls a socket method like connect or recv, it pauses until the operation completes. Consequently, to download many pages at once, we need many threads. A sophisticated application amortizes the cost of thread creation by keeping idle threads in a thread pool and checking them out to reuse them for subsequent tasks; it does the same with sockets in a connection pool.
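
As a rough sketch of that traditional design (an illustration, not the chapter's code), the standard library's ThreadPoolExecutor reuses a fixed set of threads, each running the blocking fetch above on one URL at a time; the seed URLs here are hypothetical:

    from concurrent.futures import ThreadPoolExecutor

    start_urls = ['/353/', '/1/']  # hypothetical seed URLs

    # Each worker thread blocks inside fetch() on its own socket.
    with ThreadPoolExecutor(max_workers=100) as pool:
        pool.map(fetch, start_urls)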

And yet, threads are expensive, and operating systems enforce a variety of hard caps on the number of processes and threads. On Jesse's system, a Python thread costs around 50k of memory, and starting tens of thousands of threads causes failures. If we scale up to tens of thousands of simultaneous operations on concurrent sockets, we run out of threads before we run out of sockets. Per-thread overhead or system limits on threads are the bottleneck.

In his influential article "The C10K problem", Dan Kegel outlines the limitations of multithreading for I/O concurrency:

It's time for web servers to handle ten thousand clients simultaneously, don't you think? After all, the web is a big place now.

Kegel coined the term "C10K" in 1999. Ten thousand connections sounds small now, but the point of the problem is the number of connections, not the kind. Back then, using one thread per connection for C10K was impractical. Now the cap is several orders of magnitude higher. Indeed, our toy crawler would work just fine with threads. Yet for very large scale applications, with hundreds of thousands of connections, the flaw with threads remains: most operating systems can still create sockets, but cannot keep creating threads. How can we overcome this?

Async

Asynchronous I/O frameworks do concurrent operations on a single thread using non-blocking sockets. In our async crawler, we set the socket non-blocking before we begin to connect to the server:

    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass

A non-blocking socket throws an exception from connect, even when it is working normally. This exception replicates the behavior of the underlying C function, which sets errno to EINPROGRESS to tell you it has begun.

Now our crawler needs a way to know when the connection is established, so it can send the HTTP request. We could simply keep trying in a tight loop:

    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    encoded = request.encode('ascii')

    while True:
        try:
            sock.send(encoded)
            break  # Done.
        except OSError as e:
            pass

    print('sent')

This method not only wastes electricity, it cannot efficiently await events on multiple sockets. In the past, BSD Unix's solution to this problem was select, a C function that waits for an event to occur on a non-blocking socket or a small array of them. Nowadays the demand for Internet applications with huge numbers of connections has led to replacements like poll, then kqueue on BSD and epoll on Linux. These APIs are similar to select, but perform well with very large numbers of connections.

Python 3.4's DefaultSelector uses the best select-like function available on your system. To register for notifications about network I/O, we create a non-blocking socket and register it with the default selector:

    from selectors import DefaultSelector, EVENT_WRITE

    selector = DefaultSelector()

    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect(('xkcd.com', 80))
    except BlockingIOError:
        pass

    def connected():
        selector.unregister(sock.fileno())
        print('connected!')

    selector.register(sock.fileno(), EVENT_WRITE, connected)

We disregard the error and call selector.register, passing in the socket's file descriptor and a constant that expresses what event we are waiting for. To be notified when the connection is established, we pass EVENT_WRITE: that is, we want to know when the socket is "writable". We also pass a Python function, connected, to run when that event occurs. Such a function is known as a callback.

We process I/O notifications as the selector receives them, in a loop:

    def loop():
        while True:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()

The connected callback is stored as event_key.data, which we retrieve and execute once the non-blocking socket is connected.

Unlike the fast-spinning loop above, the call to select here pauses, awaiting the next I/O event. Then the loop runs callbacks that are waiting for these events. Operations that have not completed remain pending until some future tick of the event loop.

What have we demonstrated already? We showed how to begin an operation and execute a callback when the operation is ready. An async framework builds on the two features we have shown, non-blocking sockets and the event loop, to run concurrent operations on a single thread.

We have achieved "concurrency" here, but not what is traditionally called "parallelism". That is, we built a tiny system that does overlapping I/O. It is capable of beginning new operations while others are in flight. It does not actually utilize multiple cores to execute computation in parallel. But then, this system is designed for I/O-bound problems, not CPU-bound ones.[^14]

So our event loop is efficient at concurrent I/O because it does not devote thread resources to each connection. But before we proceed, it is important to correct a common misconception that async is faster than multithreading. Often it is not; in fact, in Python, an event loop like ours is slower than multithreading at serving a small number of very active connections. In a runtime without a global interpreter lock, threads would perform even better on such a workload. What asynchronous I/O is right for, is applications with many slow or sleepy connections and infrequent events.[^11][^bayer]

Programming With Callbacks

With the runty async framework we have built so far, how can we build a web crawler? Even a simple URL fetcher is painful to write. We begin with global sets of the URLs we have yet to fetch, and the URLs we have seen:

    urls_todo = set(['/'])
    seen_urls = set(['/'])

The seen_urls set includes urls_todo plus completed URLs. The two sets are initialized with the root URL "/".

Fetching a page will require a series of callbacks. The connected callback fires when a socket is connected, and sends a GET request to the server. But then it must await a response, so it registers another callback. If, when that callback fires, it cannot read the full response yet, it registers again, and so on.

Let us collect these callbacks into a Fetcher object. It needs a URL, a socket object, and a place to accumulate the response bytes:

    class Fetcher:
        def __init__(self, url):
            self.response = b''  # Empty array of bytes.
            self.url = url
            self.sock = None

We begin by calling Fetcher.fetch:

    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

The fetch method begins connecting a socket. But notice that the method returns before the connection is established. It must return control to the event loop to wait for the connection. To understand why, imagine our whole application were structured like so:

    # Begin fetching http://xkcd.com/353/
    fetcher = Fetcher('/353/')
    fetcher.fetch()

    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

All event notifications are processed in the event loop when it calls select. Hence fetch must hand control back to the event loop, so that the program knows when the socket has connected. Only then does the loop run the connected callback, which was registered at the end of fetch above. Here is the implementation of connected:

    # Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

The method sends a GET request. A real application would check the return value of send, in case the whole message cannot be sent at once, but our requests are small and our application unsophisticated. It calls send, then awaits a response. Of course, it must register yet another callback and relinquish control to the event loop. The next and final callback, read_response, processes the server's reply:

    # Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()

            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # <- New Fetcher.

            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

The callback is executed whenever the selector sees that the socket is "readable", which could mean two things: the socket has data, or it is closed.

The callback asks the socket for up to 4K bytes of data. If less is ready, chunk contains whatever data is available. If there is more, chunk is 4K bytes long and the socket remains readable, so the event loop runs this callback again on the next tick. When the response is complete, the server has closed the socket and chunk is empty.

The parse_links method, not shown here, returns a set of URLs; a plausible sketch of it follows the loop code below. We start a new fetcher for each new URL, with no concurrency cap. Note a nice feature of async programming with callbacks: we need no mutex around changes to shared data, such as when we add links to seen_urls. There is no preemptive multitasking, so we cannot be interrupted at arbitrary points in our code. We add a global stopped variable and use it to control the loop:

    stopped = False

    def loop():
        while not stopped:
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
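
As promised above, here is one plausible sketch of parse_links as a method on Fetcher. It is an assumption for illustration only, not the authors' implementation; a real crawler would use an HTML parser and resolve relative URLs:

    import re

    # Method on Fetcher class (hypothetical sketch).
    def parse_links(self):
        # Split off the HTTP headers, then collect double-quoted href values.
        body = self.response.split(b'\r\n\r\n', 1)[-1].decode('utf-8', 'replace')
        return set(re.findall(r'href="([^"]+)"', body))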

Once all pages are downloaded, the fetcher stops the global event loop and the program exits.

This example makes async's problem plain: spaghetti code. We need some way to express a series of computations and I/O operations, and schedule multiple such series of operations to run concurrently. But without threads, a series of operations cannot be collected into a single function: whenever a function begins an I/O operation, it explicitly saves whatever state will be needed in the future, then returns. You are responsible for thinking about and writing this state-saving code.

Let us explain what we mean by that. Consider how simply we could fetch a URL on a thread with a conventional blocking socket:

    # Blocking version.
    def fetch(url):
        sock = socket.socket()
        sock.connect(('xkcd.com', 80))
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)

        # Page is now downloaded.
        links = parse_links(response)
        q.add(links)

What state does this function remember between one socket operation and the next? It has the socket, a URL, and the accumulating response. A function that runs on a thread uses basic features of the programming language to store this temporary state in local variables, on its stack. The function also has a "continuation", that is, the code it plans to execute after I/O completes. The runtime remembers the continuation by storing the thread's instruction pointer. You need not think about restoring these local variables and the continuation after I/O. It is built into the language.

But with a callback-based async framework, these language features are no help. While waiting for I/O, a function must save its state explicitly, because the function returns and loses its stack frame before I/O completes. In lieu of local variables, our callback-based example stores sock and response as attributes of self, the Fetcher instance. In lieu of the instruction pointer, it stores its continuation by registering the callbacks connected and read_response. As the application's features grow, so does the complexity of the state we manually save across callbacks. Such onerous bookkeeping makes the coder prone to migraines.

Even worse, what happens if a callback throws an exception before it schedules the next callback in the chain? Say we did a poor job on the parse_links method and it throws an exception parsing some HTML:

    Traceback (most recent call last):
      File "loop-with-callbacks.py", line 111, in <module>
        loop()
      File "loop-with-callbacks.py", line 106, in loop
        callback(event_key, event_mask)
      File "loop-with-callbacks.py", line 51, in read_response
        links = self.parse_links()
      File "loop-with-callbacks.py", line 67, in parse_links
        raise Exception('parse error')
    Exception: parse error

The stack trace shows only that the event loop was running a callback. We do not remember what led to the error. The chain is broken on both ends: we forgot where we were going and whence we came. This loss of context is called "stack ripping", and in many cases it confounds the investigator. Stack ripping also prevents us from installing an exception handler for a chain of callbacks, the way a "try / except" block wraps a function call and its tree of descendants.[^7]
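
To make the point concrete, here is a hedged illustration using the callback-based Fetcher above: a try/except around fetch catches nothing useful, because fetch has already returned by the time parse_links raises inside a later callback.

    try:
        Fetcher('/353/').fetch()
    except Exception:
        print('never reached: fetch() returns before any parse error occurs')

    loop()  # the parse error surfaces here instead, far from its cause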

So, besides the long-running debate about the relative efficiency of multithreading and async, there is this other debate about which is more error-prone: threads are susceptible to data races if you make a mistake synchronizing them, while callbacks are stubborn to debug due to stack ripping.

Coroutines

We entice you with a promise: it is possible to write asynchronous code that combines the efficiency of callbacks with the classic good looks of multithreaded programming. This combination is achieved with a pattern called "coroutines". Using Python 3.4's standard asyncio library and a package called "aiohttp", fetching a URL in a coroutine is very direct[^10]:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

It is also scalable. Compared to the 50k of memory per thread and the operating system's hard limits on threads, a Python coroutine takes barely 3k of memory on Jesse's system. Python can easily start hundreds of thousands of coroutines.

The concept of a coroutine, dating back to the ancestors of computer science, is simple: it is a subroutine that can be paused and resumed. Whereas threads are preemptively multitasked by the operating system, coroutines multitask cooperatively: they choose when to pause, and which coroutine runs next.

There are many implementations of coroutines; even in Python there are several. The coroutines in the standard asyncio library in Python 3.4 are built upon generators, a Future class, and the "yield from" statement. Starting in Python 3.5, coroutines are a native feature of the language itself[^17]; however, understanding coroutines as they were first implemented in Python 3.4, using pre-existing language facilities, is the foundation for tackling Python 3.5's native coroutines.

To explain Python 3.4's generator-based coroutines, we will introduce generators and how they are used as coroutines in asyncio, and trust you will enjoy reading it as much as we enjoyed writing it. Once we have explained generator-based coroutines, we will use them in our async web crawler.

How Python Generators Work

Before you grasp Python generators, you have to understand how regular Python functions work. Normally, when a Python function calls a subroutine, the subroutine retains control until it returns or throws an exception. Then control returns to the caller:

    >>> def foo():
    ...     bar()
    ...
    >>> def bar():
    ...     pass

The standard Python interpreter is written in C. The C function that executes a Python function is called PyEval_EvalFrameEx. It takes a Python stack frame object and evaluates Python bytecode in the context of the frame. Here is the bytecode for foo:

    >>> import dis
    >>> dis.dis(foo)
      2           0 LOAD_GLOBAL              0 (bar)
                  3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
                  6 POP_TOP
                  7 LOAD_CONST               0 (None)
                 10 RETURN_VALUE

The foo function loads bar onto its stack and calls it, then pops its return value from the stack, loads None onto the stack, and returns None.

PyEval_EvalFrameEx遇到CALL_FUNCTION字节码时,它创建一个新的Python栈框架和递归:也就是说,它用新的框架递归调用PyEval_EvalFrameEx,用来执行bar

It is crucial to understand that Python stack frames are allocated in heap memory! The Python interpreter is a normal C program, so its stack frames are normal stack frames. But the Python stack frames it manipulates are on the heap. Among other things, this means a Python stack frame can outlive its function call. To see this interactively, save the current frame from within bar:

    >>> import inspect
    >>> frame = None
    >>> def foo():
    ...     bar()
    ...
    >>> def bar():
    ...     global frame
    ...     frame = inspect.currentframe()
    ...
    >>> foo()
    >>> # The frame was executing the code for 'bar'.
    >>> frame.f_code.co_name
    'bar'
    >>> # Its back pointer refers to the frame for 'foo'.
    >>> caller_frame = frame.f_back
    >>> caller_frame.f_code.co_name
    'foo'

The stage is now set for Python generators, which use the same building blocks, code objects and stack frames, to marvelous effect. This is a generator function:

    >>> def gen_fn():
    ...     result = yield 1
    ...     print('result of yield: {}'.format(result))
    ...     result2 = yield 2
    ...     print('result of 2nd yield: {}'.format(result2))
    ...     return 'done'
    ...

When Python compiles gen_fn to bytecode, it sees the yield statement and knows that gen_fn is a generator function, not a regular one. It sets a flag to remember this fact:

    >>> # The generator flag is bit position 5.
    >>> generator_bit = 1 << 5
    >>> bool(gen_fn.__code__.co_flags & generator_bit)
    True

When you call a generator function, Python sees the generator flag, and it does not actually run the function. Instead, it creates a generator:

    >>> gen = gen_fn()
    >>> type(gen)
    <class 'generator'>

A Python generator encapsulates a stack frame plus a reference to some code, the body of gen_fn:

    >>> gen.gi_code.co_name
    'gen_fn'

All generators from calls to gen_fn point to this same code, but each has its own stack frame. This stack frame is not on any actual stack; it sits in heap memory waiting to be used:

\aosafigure[240pt]{crawler-images/generator.png}{Generators}{500l.crawler.generators}
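
A quick interactive check of that claim, as a small sketch in the same session:

    >>> gen2 = gen_fn()
    >>> gen2.gi_code is gen.gi_code      # Same code object as gen...
    True
    >>> gen2.gi_frame is gen.gi_frame    # ...but its own separate frame.
    False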

The frame has a "last instruction" pointer, the instruction it executed most recently. In the beginning, the last instruction pointer is -1, meaning the generator has not begun:

    >>> gen.gi_frame.f_lasti
    -1

When we call send, the generator reaches its first yield, and pauses. The return value of send is 1, since that is what gen passes to the yield expression:

    >>> gen.send(None)
    1

The generator's instruction pointer is now 3 bytecodes in, part way through the 56 bytes of compiled Python:

    >>> gen.gi_frame.f_lasti
    3
    >>> len(gen.gi_code.co_code)
    56

The generator can be resumed at any time, from any function, because its stack frame is not actually on the stack: it is on the heap. Its position in the call hierarchy is not fixed, and it need not obey the first-in, last-out order of execution that regular functions do. It is liberated, floating free like a cloud.

We can send the value "hello" into the generator and it becomes the result of the yield expression, and the generator continues until it yields 2:

    >>> gen.send('hello')
    result of yield: hello
    2

Its stack frame now contains the local variable result:

    >>> gen.gi_frame.f_locals
    {'result': 'hello'}

Other generators created from gen_fn will have their own stack frames and local variables. When we call send again, the generator continues from its second yield, and finishes by raising the special StopIteration exception:

    >>> gen.send('goodbye')
    result of 2nd yield: goodbye
    Traceback (most recent call last):
      File "<input>", line 1, in <module>
    StopIteration: done

The exception has a value, which is the return value of the generator: the string "done".
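
A caller can recover that return value by catching StopIteration explicitly; a small sketch with a fresh generator:

    >>> gen = gen_fn()
    >>> gen.send(None)
    1
    >>> gen.send('hello')
    result of yield: hello
    2
    >>> try:
    ...     gen.send('goodbye')
    ... except StopIteration as exc:
    ...     print('return value: {}'.format(exc.value))
    ...
    result of 2nd yield: goodbye
    return value: done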

Building Coroutines With Generators

So a generator can pause, it can be resumed with a value, and it has a return value. Sounds like a good primitive upon which to build an async programming model, without spaghetti callbacks! We want to build a "coroutine": a routine that is cooperatively scheduled with other routines in the program. Our coroutines will be a simplified version of those in Python's standard asyncio library. As in asyncio, we will use generators, futures, and the "yield from" statement.

First, we need a way to represent some future result that a coroutine is waiting for. A stripped-down version:

    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []

        def add_done_callback(self, fn):
            self._callbacks.append(fn)

        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
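
Before wiring this into the fetcher, here is a tiny standalone sketch of how this Future behaves: a callback registered with add_done_callback runs as soon as set_result is called.

    f = Future()
    f.add_done_callback(lambda fut: print('resolved with', fut.result))
    f.set_result(42)  # prints "resolved with 42"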

A future is initially "pending". It is "resolved" by a call to set_result.[^12] Let us adapt our fetcher to use futures and coroutines. We wrote fetch with a callback:

    class Fetcher:
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(),
                              EVENT_WRITE,
                              self.connected)

        def connected(self, key, mask):
            print('connected!')
            # And so on....

The fetch method begins connecting a socket, then registers the callback, connected, to be executed when the socket is ready. Now we can combine these two steps into one coroutine:

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)

        yield f
        selector.unregister(sock.fileno())
        print('connected!')

Now fetch is a generator function, rather than a regular one, because it contains a yield statement. We create a pending future, then yield it to pause fetch until the socket is ready. The inner function on_connected resolves the future.

But when the future resolves, what resumes the generator? We need a coroutine driver. Let us call it "task":

    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)

        def step(self, future):
            try:
                next_future = self.coro.send(future.result)
            except StopIteration:
                return

            next_future.add_done_callback(self.step)

    # Begin fetching http://xkcd.com/353/
    fetcher = Fetcher('/353/')
    Task(fetcher.fetch())
    loop()

The task starts the fetch generator by sending None into it. Then fetch runs until it yields a future, which the task captures as next_future. When the socket is connected, the event loop runs the callback on_connected, which resolves the future, which calls step, which resumes fetch.

Factoring Coroutines With yield from

Once the socket is connected, we send the HTTP GET request and read the server response. These steps need no longer be scattered among callbacks; we gather them into the same generator function:

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

This code, which reads a whole message from a socket, seems generally useful. How can we factor it from fetch into a subroutine? Now Python 3's celebrated yield from takes the stage. It lets one generator delegate to another. To see how, let us return to our simple generator example:

    >>> def gen_fn():
    ...     result = yield 1
    ...     print('result of yield: {}'.format(result))
    ...     result2 = yield 2
    ...     print('result of 2nd yield: {}'.format(result2))
    ...     return 'done'
    ...

To call this generator from another generator, delegate to it with yield from:

    >>> # Generator function:
    >>> def caller_fn():
    ...     gen = gen_fn()
    ...     rv = yield from gen
    ...     print('return value of yield-from: {}'
    ...           .format(rv))
    ...
    >>> # Make a generator from the
    >>> # generator function.
    >>> caller = caller_fn()

The caller generator acts as if it were gen, the generator it is delegating to:

    >>> caller.send(None)
    1
    >>> caller.gi_frame.f_lasti
    15
    >>> caller.send('hello')
    result of yield: hello
    2
    >>> caller.gi_frame.f_lasti  # Hasn't advanced.
    15
    >>> caller.send('goodbye')
    result of 2nd yield: goodbye
    return value of yield-from: done
    Traceback (most recent call last):
      File "<input>", line 1, in <module>
    StopIteration

While caller yields from gen, caller does not advance. Notice that its instruction pointer remains at 15, the site of its yield from statement, even while the inner generator gen advances from one yield statement to the next.[^13] From our perspective outside caller, we cannot tell if the values it yields are from caller or from the generator it delegates to. And from inside gen, we cannot tell if values are sent in from caller or from outside it. The yield from statement is a frictionless channel, through which values flow in and out of gen until gen completes.

A coroutine can delegate work to a sub-coroutine with yield from and receive the result of the work. Notice, above, that caller printed "return value of yield-from: done". When gen completed, its return value became the value of the yield from statement in caller:

    rv = yield from gen

Earlier, when we criticized callback-based async programming, our most strident complaint was about "stack ripping": when a callback throws an exception, the stack trace is typically useless. It only shows that the event loop was running the callback, not why. How do coroutines fare?

    >>> def gen_fn():
    ...     raise Exception('my error')
    >>> caller = caller_fn()
    >>> caller.send(None)
    Traceback (most recent call last):
      File "<input>", line 1, in <module>
      File "<input>", line 3, in caller_fn
      File "<input>", line 2, in gen_fn
    Exception: my error

This is much more useful! The stack trace shows caller_fn was delegating to gen_fn when it threw the error. Even more comforting, we can wrap the call to a sub-coroutine in an exception handler, the same as with normal subroutines:

    >>> def gen_fn():
    ...     yield 1
    ...     raise Exception('uh oh')
    ...
    >>> def caller_fn():
    ...     try:
    ...         yield from gen_fn()
    ...     except Exception as exc:
    ...         print('caught {}'.format(exc))
    ...
    >>> caller = caller_fn()
    >>> caller.send(None)
    1
    >>> caller.send('hello')
    caught uh oh

So we factor logic with sub-coroutines just like with regular subroutines. Let us factor some useful sub-coroutines from our fetcher. We write a read coroutine to receive one chunk:

    def read(sock):
        f = Future()

        def on_readable():
            f.set_result(sock.recv(4096))

        selector.register(sock.fileno(), EVENT_READ, on_readable)
        chunk = yield f  # Read one chunk.
        selector.unregister(sock.fileno())
        return chunk

We build on read with a read_all coroutine that receives a whole message:

    def read_all(sock):
        response = []
        # Read whole response.
        chunk = yield from read(sock)
        while chunk:
            response.append(chunk)
            chunk = yield from read(sock)

        return b''.join(response)
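
In the same style, the send could be factored into a sub-coroutine as well; this write sketch is an assumption for illustration, not part of the chapter's code. It waits for the socket to become writable before sending:

    def write(sock, data):
        f = Future()

        def on_writable():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_writable)
        yield f  # Pause until the socket is ready for writing.
        selector.unregister(sock.fileno())
        sock.send(data)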

If you squint the right way, the yield from statements disappear and these look like conventional functions doing blocking I/O. But in fact, read and read_all are coroutines. Yielding from read pauses read_all until the I/O completes. While read_all is paused, asyncio's event loop does other work and awaits other I/O events; read_all is resumed with the result of read on the next loop tick once its event is ready.

At the stack's root, fetch calls read_all:

    class Fetcher:
        def fetch(self):
            # ... connection logic from above, then:
            sock.send(request.encode('ascii'))
            self.response = yield from read_all(sock)

Miraculously, the Task class needs no modification. It drives the outer fetch coroutine just the same as before:

    Task(fetcher.fetch())
    loop()

When read yields a future, the task receives it through the channel of yield from statements, precisely as if the future were yielded directly from fetch. When the loop resolves a future, the task sends its result into fetch, and the value is received by read, exactly as if the task were driving read directly:

\aosafigure[240pt]{crawler-images/yield-from.png}{Yield From}{500l.crawler.yieldfrom}

To perfect our coroutine implementation, we polish out one mar: our code uses yield when it waits for a future, but yield from when it delegates to a sub-coroutine. It would be more refined if we used yield from whenever a coroutine pauses. Then a coroutine need not concern itself with what type of thing it awaits.

We take advantage of the deep correspondence in Python between generators and iterators. Advancing a generator is, to the caller, the same as advancing an iterator. So we make our Future class iterable by implementing a special method:

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

The future's __iter__ method is a coroutine that yields the future itself. Now when we replace code like this:

    # f is a Future.
    yield f

…with this:

    # f is a Future.
    yield from f

…the outcome is the same! The driving Task receives the future from its call to send, and when the future is resolved it sends the new result back into the coroutine.

What is the advantage of using yield from everywhere? Why is that better than waiting for futures with yield and delegating to sub-coroutines with yield from? It is better because now, a method can freely change its implementation without affecting the caller: it might be a normal method that returns a future that will resolve to a value, or it might be a coroutine that contains yield from statements and returns a value. In either case, the caller need only yield from the method in order to wait for the result.

Gentle reader, we have reached the end of our enjoyable exposition of coroutines in asyncio. We peered into the machinery of generators, and sketched an implementation of futures and tasks. We outlined how asyncio attains the best of both worlds: concurrent I/O that is more efficient than threads and more legible than callbacks. Of course, the real asyncio is much more sophisticated than our sketch. The real framework addresses zero-copy I/O, fair scheduling, exception handling, and an abundance of other features.

To an asyncio user, coding with coroutines is much simpler than you saw here. In the code above we implemented coroutines from first principles, so you saw callbacks, tasks, and futures. You even saw non-blocking sockets and the call to select. But when it comes time to build an application with asyncio, none of this appears in your code. As we promised, you can now sleekly fetch a URL:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

Satisfied with this exposition, we return to our original assignment: to write an async web crawler, using asyncio.

Coordinating Coroutines

We began by describing how we want our crawler to work. Now it is time to implement it with asyncio coroutines.

Our crawler will fetch the first page, parse its links, and add them to a queue. After this it fans out across the website, fetching pages concurrently. But to limit load on the client and server, we want some maximum number of workers to run, and no more. Whenever a worker finishes fetching a page, it should immediately pull the next link from the queue. We will pass through periods when there is not enough work to go around, so some workers must pause. But when a worker hits a page rich with new links, then the queue suddenly grows and any paused workers should wake and get cracking. Finally, our program must quit once its work is done.

Imagine if the workers were threads. How would we express the crawler's algorithm? We could use a synchronized queue[^5] from the Python standard library. Each time an item is put in the queue, the queue increments its count of "tasks". Worker threads call task_done after completing work on an item. The main thread blocks on Queue.join until each item put in the queue is matched by a task_done call, then it exits.

Coroutines use the exact same pattern with an asyncio queue! First we import it[^6]:

    try:
        from asyncio import JoinableQueue as Queue
    except ImportError:
        # In Python 3.5, asyncio.JoinableQueue is
        # merged into Queue.
        from asyncio import Queue
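
For comparison, here is a hedged sketch of the threaded pattern just described, using the standard library's synchronized queue; the worker body and seed URL are illustrative only:

    import queue
    import threading

    q = queue.Queue()

    def worker():
        while True:
            url = q.get()
            print('fetch and parse', url)  # a real worker would do the work here
            q.task_done()

    for _ in range(10):
        threading.Thread(target=worker, daemon=True).start()

    q.put('http://xkcd.com/')
    q.join()  # returns once every queued item has been matched by task_done()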

We collect the workers' shared state in a crawler class, and write the main logic in its crawl method. We start crawl on a coroutine and run asyncio's event loop until crawl finishes:

    loop = asyncio.get_event_loop()

    crawler = crawling.Crawler('http://xkcd.com',
                               max_redirect=10)

    loop.run_until_complete(crawler.crawl())

The crawler begins with a root URL and max_redirect, the number of redirects it is willing to follow to fetch any one URL. It puts the pair (URL, max_redirect) in the queue. (For the reason why, stay tuned.)

    class Crawler:
        def __init__(self, root_url, max_redirect):
            self.max_tasks = 10
            self.max_redirect = max_redirect
            self.q = Queue()
            self.seen_urls = set()

            # aiohttp's ClientSession does connection pooling and
            # HTTP keep-alives for us.
            self.session = aiohttp.ClientSession(loop=loop)

            # Put (URL, max_redirect) in the queue.
            self.q.put((root_url, self.max_redirect))

The number of unfinished tasks in the queue is now one. Back in our main script, we launch the event loop and the crawl method:

    loop.run_until_complete(crawler.crawl())

The crawl coroutine kicks off the workers. It is like a main thread: it blocks on join until all tasks are finished, while the workers run in the background.

    @asyncio.coroutine
    def crawl(self):
        """Run the crawler until all work is done."""
        workers = [asyncio.Task(self.work())
                   for _ in range(self.max_tasks)]

        # When all work is done, exit.
        yield from self.q.join()
        for w in workers:
            w.cancel()

If the workers were threads we might not wish to start them all at once. To avoid creating expensive threads until it is certain they are necessary, a thread pool typically grows on demand. But coroutines are cheap, so we simply start the maximum number allowed.

It is interesting to note how we shut down the crawler. When the join future resolves, the worker tasks are alive but suspended: they wait for more URLs but none come. So, the main coroutine cancels them before exiting. Otherwise, as the Python interpreter shuts down and calls all objects' destructors, living tasks cry out:

    ERROR:asyncio:Task was destroyed but it is pending!

And how does cancel work? Generators have a feature we have not yet shown you. You can throw an exception into a generator from outside:

    >>> gen = gen_fn()
    >>> gen.send(None)  # Start the generator as usual.
    1
    >>> gen.throw(Exception('error'))
    Traceback (most recent call last):
      File "<input>", line 3, in <module>
      File "<input>", line 2, in gen_fn
    Exception: error

The generator is resumed by throw, but it is now raising an exception. If no code in the generator's call stack catches it, the exception bubbles back up to the top. So to cancel a task's coroutine:

    # Method of Task class.
    def cancel(self):
        self.coro.throw(CancelledError)

Wherever the generator is paused, at some yield from statement, it resumes and throws an exception. We handle cancellation in the task's step method:

    # Method of Task class.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

Now the task knows it is cancelled, so when it is destroyed it does not rage against the dying of the light.

Once crawl has canceled the workers, it exits. The event loop sees that the coroutine is complete (we shall see how later), and it too exits:

    loop.run_until_complete(crawler.crawl())

The crawl method comprises all that our main coroutine must do. It is the worker coroutines that get URLs from the queue, fetch them, and parse them for new links. Each worker runs the work coroutine independently:

    @asyncio.coroutine
    def work(self):
        while True:
            url, max_redirect = yield from self.q.get()

            # Download page and add new links to self.q.
            yield from self.fetch(url, max_redirect)
            self.q.task_done()

Python sees that this code contains yield from statements, and compiles it into a generator function. So in crawl, when the main coroutine calls self.work ten times, it does not actually execute this method: it only creates ten generator objects with references to this code. It wraps each in a Task. The Task receives each future the generator yields, and drives the generator by calling send with each future's result when the future resolves. Because the generators have their own stack frames, they run independently, with separate local variables and instruction pointers.

The worker coordinates with its fellows via the queue. It waits for new URLs with:

    url, max_redirect = yield from self.q.get()

The queue's get method is itself a coroutine: it pauses until someone puts an item in the queue, then resumes and returns the item.

Incidentally, this is where the worker will be paused at the end of the crawl, when the main coroutine cancels it. From the coroutine's perspective, its last trip around the loop ends when yield from raises a CancelledError.

When a worker fetches a page it parses the links and puts new ones in the queue, then calls task_done to decrement the counter. Eventually, a worker fetches a page whose URLs have all been fetched already, and there is also no work left in the queue. Thus this worker's call to task_done decrements the counter to zero. Then crawl, which is waiting for the queue's join method, is unpaused and finishes.

We promised to explain why the items in the queue are pairs, like:

    # URL to fetch, and the number of redirects left.
    ('http://xkcd.com/353', 10)

New URLs have ten redirects remaining. Fetching this particular URL results in a redirect to a new location with a trailing slash. We decrement the number of redirects remaining, and put the next location in the queue:

    # URL with a trailing slash. Nine redirects left.
    ('http://xkcd.com/353/', 9)

The aiohttp package we use would follow redirects by default and give us the final response. We tell it not to, however, and handle redirects in the crawler, so it can coalesce redirect paths that lead to the same destination: if we have already seen this URL, it is in self.seen_urls and we have already started on this path from a different entry point:

\aosafigure[240pt]{crawler-images/redirects.png}{Redirects}{500l.crawler.redirects}

The crawler fetches "foo" and sees it redirects to "baz", so it adds "baz" to the queue and to seen_urls. If the next page it fetches is "bar", which also redirects to "baz", the fetcher does not enqueue "baz" again. If the response is a page, rather than a redirect, fetch parses it for links and puts new ones in the queue.

    @asyncio.coroutine
    def fetch(self, url, max_redirect):
        # Handle redirects ourselves.
        response = yield from self.session.get(
            url, allow_redirects=False)

        try:
            if is_redirect(response):
                if max_redirect > 0:
                    next_url = response.headers['location']
                    if next_url in self.seen_urls:
                        # We have been down this path before.
                        return

                    # Remember we have seen this URL.
                    self.seen_urls.add(next_url)

                    # Follow the redirect. One less redirect remains.
                    self.q.put_nowait((next_url, max_redirect - 1))
            else:
                links = yield from self.parse_links(response)
                # Python set-logic:
                for link in links.difference(self.seen_urls):
                    self.q.put_nowait((link, self.max_redirect))
                self.seen_urls.update(links)
        finally:
            # Return connection to pool.
            yield from response.release()
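
The is_redirect helper is not shown in the chapter; one plausible sketch (an assumption, not the authors' code) simply checks for the usual HTTP redirect status codes on the aiohttp response:

    def is_redirect(response):
        # Hypothetical helper: treat standard redirect statuses as redirects.
        return response.status in (300, 301, 302, 303, 307)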

If this were multithreaded code, it would be lousy with race conditions. For example, the worker checks if a link is in seen_urls, and if not the worker puts it in the queue and adds it to seen_urls. If it were interrupted between the two operations, then another worker might parse the same link from a different page, also observe that it is not in seen_urls, and also add it to the queue. Now that same link is in the queue twice, leading (at best) to duplicated work and wrong statistics.

However, a coroutine is only vulnerable to interruption at yield from statements. This is a key difference that makes coroutine code far less prone to races than multithreaded code: multithreaded code must enter a critical section explicitly, by grabbing a lock, otherwise it is interruptible. A Python coroutine is uninterruptible by default, and only cedes control when it explicitly yields.

We no longer need a fetcher class like we had in the callback-based program. That class was a workaround for a deficiency of callbacks: they need some place to store state while waiting for I/O, since their local variables are not preserved across calls. But the fetch coroutine can store its state in local variables like a regular function does, so there is no more need for a class.

When fetch finishes processing the server response it returns to the caller, work. The work method calls task_done on the queue and then gets the next URL from the queue to be fetched.

When fetch puts new links in the queue it increments the count of unfinished tasks and keeps the main coroutine, which is waiting for q.join, paused. If, however, there are no unseen links and this was the last URL in the queue, then when work calls task_done the count of unfinished tasks falls to zero. That event unpauses join and the main coroutine completes.

The queue code that coordinates the workers and the main coroutine is like this[^9]:

    class Queue:
        def __init__(self):
            self._join_future = Future()
            self._unfinished_tasks = 0
            # ... other initialization ...

        def put_nowait(self, item):
            self._unfinished_tasks += 1
            # ... store the item ...

        def task_done(self):
            self._unfinished_tasks -= 1
            if self._unfinished_tasks == 0:
                self._join_future.set_result(None)

        @asyncio.coroutine
        def join(self):
            if self._unfinished_tasks > 0:
                yield from self._join_future

The main coroutine, crawl, yields from join. So when the last worker decrements the count of unfinished tasks to zero, it signals crawl to resume, and finish.

The ride is almost over. Our program began with the call to crawl:

    loop.run_until_complete(self.crawler.crawl())

How does the program end? Since crawl is a generator function, calling it returns a generator. To drive the generator, asyncio wraps it in a task:

    class EventLoop:
        def run_until_complete(self, coro):
            """Run until the coroutine is done."""
            task = Task(coro)
            task.add_done_callback(stop_callback)
            try:
                self.run_forever()
            except StopError:
                pass

    class StopError(BaseException):
        """Raised to stop the event loop."""

    def stop_callback(future):
        raise StopError

When the task completes, it raises StopError, which the loop uses as a signal that it has arrived at normal completion.

But what's this? The task has methods called add_done_callback and result? You might think that a task resembles a future. Your instinct is correct. We must admit a detail about the Task class we hid from you: a task is a future.

    class Task(Future):
        """A coroutine wrapped in a Future."""

Normally a future is resolved by someone else calling set_result on it. But a task resolves itself when its coroutine stops. Remember from our earlier exploration of Python generators that when a generator returns, it throws the special StopIteration exception:

    # Method of class Task.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration as exc:

            # Task resolves itself with coro's return
            # value.
            self.set_result(exc.value)
            return

        next_future.add_done_callback(self.step)

So when the event loop calls task.add_done_callback(stop_callback), it prepares to be stopped by the task. Here is run_until_complete again:

    # Method of event loop.
    def run_until_complete(self, coro):
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

When the task catches StopIteration and resolves itself, the callback raises StopError from within the loop. The loop stops and the call stack is unwound to run_until_complete. Our program is finished.

Conclusion

Increasingly often, modern programs are I/O-bound instead of CPU-bound. For such programs, Python threads are the worst of both worlds: the global interpreter lock prevents them from actually executing computations in parallel, and preemptive switching makes them prone to races. Async is often the right pattern. But as callback-based async code grows, it tends to become a dishevelled mess. Coroutines are a tidy alternative. They factor naturally into subroutines, with sane exception handling and stack traces.

If we squint so that the yield from statements blur, a coroutine looks like a thread doing traditional blocking I/O. We can even coordinate coroutines with classic patterns from multi-threaded programming. There is no need for reinvention. Thus, compared to callbacks, coroutines are an inviting idiom to the coder experienced with multithreading.

But when we open our eyes and focus on the yield from statements, we see they mark points when the coroutine cedes control and allows others to run. Unlike threads, coroutines display where our code can be interrupted and where it cannot. In his illuminating essay "Unyielding"[^4], Glyph Lefkowitz writes, "Threads make local reasoning difficult, and local reasoning is perhaps the most important thing in software development." Explicitly yielding, however, makes it possible to "understand the behavior (and thereby, the correctness) of a routine by examining the routine itself rather than examining the entire system."

This chapter was written during a renaissance in the history of Python and async. Generator-based coroutines, whose devising you have just learned, were released in the asyncio module with Python 3.4 in March 2014. In September 2015, Python 3.5 was released with coroutines built in to the language itself. These native coroutines are declared with the new syntax "async def", and instead of "yield from", they use the new "await" keyword to delegate to a coroutine or wait for a Future.

Despite these advances, the core ideas remain. Python's new native coroutines will be syntactically distinct from generators but work very similarly; indeed, they will share an implementation within the Python interpreter. Task, Future, and the event loop will continue to play their roles in asyncio.

Now that you know how asyncio coroutines work, you can largely forget the details. The machinery is tucked behind a dapper interface. But your grasp of the fundamentals empowers you to code correctly and efficiently in modern async environments.

[^7]: For a complex solution to this problem, see http://www.tornadoweb.org/en/stable/stack_context.html

[^9]: The actual asyncio.Queue implementation uses an asyncio.Event in place of the Future shown here. The difference is an Event can be reset, whereas a Future cannot transition from resolved back to pending.

[^10]: The @asyncio.coroutine decorator is not magical. In fact, if it decorates a generator function and the PYTHONASYNCIODEBUG environment variable is not set, the decorator does practically nothing. It just sets an attribute, _is_coroutine, for the convenience of other parts of the framework. It is possible to use asyncio with bare generators not decorated with @asyncio.coroutine at all.

[^11]: Jesse listed indications and contraindications for using async in "What Is Async, How Does It Work, And When Should I Use It?", available at pyvideo.org.

[^bayer]: Mike Bayer compared the throughput of asyncio and multithreading for different workloads in his "Asynchronous Python and Databases": http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases/

[^12]: This future has many deficiencies. For example, once this future is resolved, a coroutine that yields it should resume immediately instead of pausing, but with our code it does not. See asyncio's Future class for a complete implementation.

[^13]: In fact, this is exactly how "yield from" works in CPython. A function increments its instruction pointer before executing each statement. But after the outer generator executes "yield from", it subtracts 1 from its instruction pointer to keep itself pinned at the "yield from" statement. Then it yields to its caller. The cycle repeats until the inner generator throws StopIteration, at which point the outer generator finally allows itself to advance to the next instruction.

[^14]: Python's global interpreter lock prohibits running Python code in parallel in one process anyway. Parallelizing CPU-bound algorithms in Python requires multiple processes, or writing the parallel portions of the code in C. But that is a topic for another day.

[^15]: Even calls to send can block, if the recipient is slow to acknowledge outstanding messages and the system's buffer of outgoing data is full.

[^16]: Guido introduced the standard asyncio library, called "Tulip" then, at PyCon 2013.

[^17]: Python 3.5’s built-in coroutines are described in PEP 492, “Coroutines with async and await syntax.”