A Web Crawler With asyncio Coroutines

This is an early chapter from 500 Lines or Less, the fourth book in the Architecture of Open Source Applications series. If you find any problems, you can report them on our GitHub tracker. Follow the AOSA blog or the Twitter announcements for news of new chapters and the final publication plan.


A. Jesse Jiryu Davis is a staff engineer at MongoDB in New York. He wrote Motor, the async MongoDB Python driver, and he is the lead developer of the MongoDB C Driver and a member of the PyMongo team. He contributes to asyncio and Tornado. He writes at http://emptysqua.re.

Guido van Rossum is the creator of Python, one of the major programming languages on and off the web. The Python community refers to him as the BDFL (Benevolent Dictator For Life), a title straight from a Monty Python skit. Guido’s home on the web is http://www.python.org/~guido/.

Introduction

Classical computer science emphasizes efficient algorithms that complete computations as quickly as possible. But many networked programs spend their time not computing, but holding open many connections that are slow, or have infrequent events. These programs present a very different challenge: to wait for a huge number of network events efficiently. A contemporary approach to this problem is asynchronous I/O, or “async”.

This chapter presents a simple web crawler. The crawler is an archetypal async application because it waits for many responses, but does little computation. The more pages it can fetch at once, the sooner it completes. If it devotes a thread to each in-flight request, then as the number of concurrent requests rises it will run out of memory or other thread-related resource before it runs out of sockets. It avoids the need for threads by using asynchronous I/O.

We present the example in three stages. First, we show an async event loop and sketch a crawler that uses the event loop with callbacks: it is very efficient, but extending it to more complex problems would lead to unmanageable spaghetti code. Second, therefore, we show that Python coroutines are both efficient and extensible. We implement simple coroutines in Python using generator functions. In the third stage, we use the full-featured coroutines from Python’s standard “asyncio” library [1], and coordinate them using an async queue.

The Task

A web crawler finds and downloads all pages on a website, perhaps to archive or index them. Beginning with a root URL, it fetches each page, parses it for links to pages it has not seen, and adds the new links to a queue. When it fetches a page with no unseen links and the queue is empty, it stops.
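In outline, the crawl is just a loop over a work queue. The following is a minimal synchronous sketch, not the chapter's final code; fetch and parse_links stand in for the downloading and parsing shown later.

    def crawl(root_url):
        seen = {root_url}          # URLs we have already queued or fetched
        queue = [root_url]
        while queue:
            url = queue.pop()
            body = fetch(url)                  # download one page
            for link in parse_links(body):     # look for links we have not seen
                if link not in seen:
                    seen.add(link)
                    queue.append(link)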

We can hasten this process by downloading many pages concurrently. As the crawler finds new links, it launches simultaneous fetch operations for the new pages on separate sockets. It parses responses as they arrive, adding new links to the queue. There may come some point of diminishing returns where too much concurrency degrades performance, so we cap the number of concurrent requests, and leave the remaining links in the queue until some in-flight requests complete.

The Traditional Approach

How do we make the crawler concurrent? Traditionally we would create a thread pool. Each thread would be in charge of downloading one page at a time over a socket. For example, to download a page from xkcd.com:

    def fetch(url):
        sock = socket.socket()
        sock.connect(('xkcd.com', 80))
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
        sock.send(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            chunk = sock.recv(4096)

        # Page is now downloaded.
        links = parse_links(response)
        q.add(links)

By default, socket operations are blocking: when the thread calls a method like connect or recv, it pauses until the operation completes. [2] Consequently, to download many pages at once, we need many threads. A sophisticated application amortizes the cost of thread-creation by keeping idle threads in a thread pool, then checking them out to reuse them for subsequent tasks; it does the same with sockets in a connection pool.
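For illustration only (this snippet is not from the chapter), Python's standard concurrent.futures module provides such a thread pool, and could run the blocking fetch shown above for many pages at once while capping the number of threads:

    from concurrent.futures import ThreadPoolExecutor

    # A fixed pool of worker threads is reused for every URL,
    # amortizing the cost of thread creation across requests.
    # The URL paths here are just example values.
    with ThreadPoolExecutor(max_workers=20) as pool:
        pool.map(fetch, ['/353/', '/354/', '/355/'])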

And yet, threads are expensive, and operating systems enforce a variety of hard caps on the number of threads a process, user, or machine may have. On Jesse’s system, a Python thread costs around 50k of memory, and starting tens of thousands of threads causes failures. If we scale up to tens of thousands of simultaneous operations on concurrent sockets, we run out of threads before we run out of sockets. Per-thread overhead or system limits on threads are the bottleneck.

In his influential article “The C10K problem” [3], Dan Kegel outlines the limitations of multithreading for I/O concurrency. He begins,

It’s time for web servers to handle ten thousand clients simultaneously, don’t you think? After all, the web is a big place now.

Kegel coined the term “C10K” in 1999. Ten thousand connections sounds dainty now, but the problem has changed only in size, not in kind. Back then, using a thread per connection for C10K was impractical. Now the cap is orders of magnitude higher. Indeed, our toy web crawler would work just fine with threads. Yet for very large scale applications, with hundreds of thousands of connections, the cap remains: there is a limit beyond which most systems can still create sockets, but have run out of threads. How can we overcome this?

Async

Callbacks

Coroutines

How Python Generators Work

Building Coroutines With Generators

So a generator can pause, and it can be resumed with a value, and it has a return value. Sounds like a good primitive upon which to build an async programming model, without spaghetti callbacks! We want to build a “coroutine”: a routine that is cooperatively scheduled with other routines in the program. Our coroutines will be a simplified version of those in Python’s standard “asyncio” library. As in asyncio, we will use generators, futures, and the “yield from” statement.
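As a quick refresher (not code from the chapter), those three properties of a generator look like this in plain Python:

    def gen():
        received = yield 1       # pause here and hand 1 to the caller
        return received * 2      # the return value travels in StopIteration

    g = gen()
    print(g.send(None))          # 1: runs gen until the first yield
    try:
        g.send(10)               # resume the paused generator with a value
    except StopIteration as exc:
        print(exc.value)         # 20: the generator's return value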

First we need a way to represent some future result that a coroutine is waiting for. A stripped-down version:

    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []

        def add_done_callback(self, fn):
            self._callbacks.append(fn)

        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)

A future is initially “pending”. It is “resolved” by a call to set_result. [9]
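A tiny usage example of the class above: a callback registered while the future is pending fires as soon as set_result is called.

    f = Future()
    f.add_done_callback(lambda fut: print('resolved with', fut.result))
    f.set_result(42)             # prints "resolved with 42"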

Let us adapt our fetcher to use futures and coroutines. Review how we wrote fetch with a callback:

    class Fetcher:
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('xkcd.com', 80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(),
                              EVENT_WRITE,
                              self.connected)

        def connected(self, key, mask):
            print('connected!')
            # And so on....

The fetch method begins connecting a socket, then registers the callback, connected, to be executed when the socket is ready. Now we can combine these two steps into one coroutine:

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

Now fetch is a generator function, rather than a regular one, because it contains a yield statement. We create a pending future, then yield it to pause fetch until the socket is ready. The inner function on_connected resolves the future.
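Purely to illustrate when each piece runs, we could step this generator by hand instead of handing it to a driver:

    fetcher = Fetcher('/353/')
    coro = fetcher.fetch()       # nothing runs yet; we merely get a generator
    future = coro.send(None)     # runs fetch up to "yield f"; we receive the pending future
    # Once the selector reports the socket writable and on_connected resolves
    # the future, coro.send(future.result) would resume fetch past the yield.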

But when the future resolves, what resumes the generator? We need a coroutine driver. Let us call it “task”:

    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)

        def step(self, future):
            try:
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)

    # Begin fetching http://xkcd.com/353/
    fetcher = Fetcher('/353/')
    Task(fetcher.fetch())
    loop()

The task starts the fetch generator by sending None into it. Then fetch runs until it yields a future, which the task captures as next_future. When the socket is connected, the event loop runs the callback on_connected, which resolves the future, which calls step, which resumes fetch.
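The loop() call at the end of that snippet is the chapter's selector-based event loop. A minimal sketch of it, assuming selector is a module-level selectors.DefaultSelector and that registered callbacks such as on_connected take no arguments, looks roughly like this:

    from selectors import DefaultSelector, EVENT_WRITE

    selector = DefaultSelector()   # shared by fetch(), which registers with EVENT_WRITE

    def loop():
        while True:
            events = selector.select()        # block until some socket is ready
            for event_key, event_mask in events:
                callback = event_key.data     # the callback passed to register()
                callback()                    # e.g. on_connected, which resolves a future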

Factoring Coroutines With yield from

Coordinating Coroutines

Conclusion


  1. Guido introduced the standard asyncio library, called “Tulip” then, at PyCon 2013.
  2. Even calls to send can block, if the recipient is slow to acknowledge outstanding messages and the system’s buffer of outgoing data is full.
  3. http://www.kegel.com/c10k.html
  4. Python’s global interpreter lock prohibits running Python code in parallel in one process anyway. Parallelizing CPU-bound algorithms in Python requires multiple processes, or writing the parallel portions of the code in C. But that is a topic for another day.

  5. Jesse listed indications and contraindications for using async in “What Is Async, How Does It Work, And When Should I Use It?”. Mike Bayer compared the throughput of asyncio and multithreading for different workloads in “Asynchronous Python and Databases”.

  6. For a complex solution to this problem, see http://www.tornadoweb.org/en/stable/stack_context.html

  7. The @asyncio.coroutine decorator is not magical. In fact, if it decorates a generator function and the PYTHONASYNCIODEBUG environment variable is not set, the decorator does practically nothing. It just sets an attribute, _is_coroutine, for the convenience of other parts of the framework. It is possible to use asyncio with bare generators not decorated with @asyncio.coroutine at all.

  8. Python 3.5’s built-in coroutines are described in PEP 492, “Coroutines with async and await syntax.”

  9. This future has many deficiencies. For example, once this future is resolved, a coroutine that yields it should resume immediately instead of pausing, but with our code it does not. See asyncio’s Future class for a complete implementation.

  10. In fact, this is exactly how “yield from” works in CPython. A function increments its instruction pointer before executing each statement. But after the outer generator executes “yield from”, it subtracts 1 from its instruction pointer to keep itself pinned at the “yield from” statement. Then it yields to its caller. The cycle repeats until the inner generator throws StopIteration, at which point the outer generator finally allows itself to advance to the next instruction.

  11. https://docs.python.org/3/library/queue.html

  12. https://docs.python.org/3/library/asyncio-sync.html
  13. The actual asyncio.Queue implementation uses an asyncio.Event in place of the Future shown here. The difference is an Event can be reset, whereas a Future cannot transition from resolved back to pending.

  14. https://glyph.twistedmatrix.com/2014/02/unyielding.html