- 异步基础概念 https://ruanyifeng.com/blog/2019/11/python-asyncio.html
- yield & yield from https://zhuanlan.zhihu.com/p/75347080
- yield & yield from https://www.jianshu.com/p/b036e6e97c18
- 异步入门 https://zhuanlan.zhihu.com/p/25228075
- asyncio 模块 https://mp.weixin.qq.com/s/JyhBVZPeKM5E44jrrA3LEA
- asyncio 模块 https://zhuanlan.zhihu.com/p/59621713
- async | await 关键字 https://zhuanlan.zhihu.com/p/27258289
- 回调函数示例
```python
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
selector = DefaultSelector()
stopped = False
urls = {'/', '/1', '/2', '/3', '/4'}
class Crawler:
    """Fetch one URL via callbacks driven by the shared selector event loop."""

    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.response = b''
        self.request = "GET {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(self.url, self.host)

    def fetch(self):
        # Start a non-blocking connect; completion is signalled by writability.
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.host, 80))
        except BlockingIOError:
            pass  # expected: non-blocking connect is still in progress
        selector.register(self.sock.fileno(), EVENT_WRITE, self.on_connected)

    def on_connected(self, key, mask):
        # Socket became writable -> connection established; send and switch to reading.
        selector.unregister(key.fd)
        print(f"Connected: {self.request.strip()}")
        self.sock.send(self.request.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # Read exactly once per readiness event.  Looping on recv() here
        # would raise BlockingIOError as soon as the kernel buffer drains,
        # because a non-blocking recv() returns b'' only at EOF.
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            # b'' means EOF: the server closed the connection.
            print(f"Finished: {self.request.strip()}")
            selector.unregister(key.fd)
            self.sock.close()  # release the socket instead of leaking it
            urls.remove(self.url)
            if not urls:
                stopped = True
def loop():
    """Run the event loop until the last crawler sets ``stopped``."""
    while not stopped:
        # select() blocks until at least one registered socket is ready.
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
if __name__ == '__main__':
    start = time.time()
    # Kick off every download first, then drive them all with one loop.
    for url in urls:
        crawler = Crawler(url, "baidu.com")
        crawler.fetch()
    loop()
    print(time.time() - start)
```
> 可以在 `urls` 中只放一个元素,然后断点调试,观察程序流程
- 生成器(yield)示例
```python
import socket
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# Shared mutable module state driving the toy event loop.
selector = DefaultSelector()  # one selector shared by every crawler
urls = {'/', '/1', '/2', '/3', '/4'}  # paths still to fetch; shrinks as fetches finish
stopped = False  # flipped to True by the crawler that empties `urls`
class Future:
    """Minimal placeholder for a pending result.

    Holds the eventual result plus the callbacks to fire once it is set.
    """

    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        """Register *fn* to be invoked with this future upon completion."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Store *result* and fire every registered callback in order."""
        self.result = result
        for callback in self._callbacks:
            callback(self)
class Crawler:
    """Fetch one URL with a generator-based coroutine.

    ``fetch`` yields a Future whenever it must wait on the socket; the
    driving Task resumes the generator when the awaited event fires.
    """

    def __init__(self, url, host):
        self.url = url
        self.host = host
        self.request = "GET {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(self.url, self.host)
        self.response = b''

    def fetch(self):
        """Generator coroutine: connect, send the request, read until EOF."""
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((self.host, 80))
        except BlockingIOError:
            pass  # expected: non-blocking connect is still in progress
        f = Future()

        def on_connected():
            # Writability means the connect finished; resume the coroutine.
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f  # suspend until the connection is established
        selector.unregister(sock.fileno())
        print(f"Connected: {self.request.strip()}")
        sock.send(self.request.encode('ascii'))
        global stopped
        while True:
            f = Future()

            def on_readable():
                # Deliver the next chunk as this future's result.
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f  # suspend until data (or EOF) arrives
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # b'' means EOF: the server closed the connection.
                print(f"Finished: {self.request.strip()}")
                urls.remove(self.url)
                if not urls:
                    stopped = True
                break
        sock.close()  # fix: release the socket instead of leaking it
class Task:
    """Drive a generator coroutine to completion.

    Each Future the coroutine yields gets ``step`` registered as its
    done-callback, so resolving the future resumes the coroutine with
    that future's result.
    """

    def __init__(self, coro):
        self.coro = coro
        # Prime the coroutine immediately with a dummy (unset) future.
        self.step(Future())

    def step(self, future):
        """Resume the coroutine, sending it *future*'s result."""
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            # Coroutine ran to completion; nothing left to drive.
            return
        else:
            next_future.add_done_callback(self.step)
def loop():
    """Spin the selector until the last crawler sets ``stopped``."""
    while not stopped:
        # Block until at least one registered socket is ready, then
        # fire the bare callback stored as each event key's data.
        for key, mask in selector.select():
            key.data()
if __name__ == '__main__':
    start = time.time()
    # Wrap one fetch coroutine per URL in a Task; the Task constructor
    # immediately advances it to its first yield (the connect wait).
    for url in urls:
        crawler = Crawler(url, "baidu.com")
        Task(crawler.fetch())
    # Run the event loop; it returns once every URL has been fetched.
    loop()
    print(time.time() - start)