asyncio模块

  • 包含了各种特定系统实现的事件循环模块(比如selectepoll
  • 对传输方式和通信协议进行了很好的封装,方便使用
  • TCPUDPSSL、子进程、延时调用等功能的支持
  • 有和futures模块中类似的Future类(这个在concurrent线程池里讲过)
  • 基于yield from的协议和任务,可以采用顺序逻辑去编写并发代码
  • 当遇到设计I/O操作的调用时,可以将这个任务转移到线程池,不单单地只支持协程,融合了前面的多线程工具,一起解决异步I/O问题
  • Python用于解决异步I/O编程的一整套解决方案

事件循环

引言

在计算系统中,可以产生事件的实体叫做事件源,能处理事件的实体叫做事件处理者。此外,还有一些第三方实体叫做事件循环。它的作用是管理所有的事件,在整个程序运行过程中不断循环执行,追踪事件发生的顺序将它们放到队列中,当主线程空闲的时候,调用相应的事件处理者处理事件。

所有的事件都在事件循环中被捕捉,然后经过事件事件处理者的处理,前面Python socket编程中的例子就是采用的这种处理方式。

协程+事件循环

  1. import asyncio
  2. import time
  3. async def deal_html(url):
  4. print("start dealing with url")
  5. await asyncio.sleep(2) # 模拟耗时操作
  6. print("Done!")
  7. if __name__ == '__main__':
  8. start_time = time.time()
  9. loop = asyncio.get_event_loop() # 得到一个事件循环
  10. loop.run_until_complete(deal_html("https://www.google.com")) # 把协程放入时间循环
  11. end_time = time.time()
  12. print(end_time - start_time)
  13. # result
  14. # start dealing with url
  15. # Done!
  16. # 2.0022695064544678

在协程函数中,不能使用time.sleep(2)time模块提供的休眠方法是针对整个线程、整个程序的,用这种方式休眠,事件循环也被终止了,对单个协程任务还好,如果有多个协程任务,显然是不符合我们的逻辑的。于是asyncio模块提供了一个休眠方法,针对每个协程休眠,可以保证事件循环还在进行。

下面的这个例子中,一次提交了3个同样的协程任务,可以看出,最后的运行时间还是 2s 左右。

  1. import asyncio
  2. import time
  3. async def deal_html(url):
  4. print("start dealing with url")
  5. await asyncio.sleep(2) # 模拟耗时操作
  6. # time.sleep(2) # 如果采用这种方式休眠,大家可以试一下结果如何
  7. print("Done!")
  8. if __name__ == '__main__':
  9. start_time = time.time()
  10. loop = asyncio.get_event_loop() # 得到一个事件循环
  11. tasks = [deal_html("https://www.google.com") for i in range(3)]
  12. loop.run_until_complete(asyncio.wait(tasks)) # 把协程放入事件循环,等待它们全部结束
  13. end_time = time.time()
  14. print(end_time - start_time)
  15. # result
  16. # start dealing with url
  17. # start dealing with url
  18. # start dealing with url
  19. # Done!
  20. # Done!
  21. # Done!
  22. # 2.0020759105682373

也可以参考 这篇文章(科学上网)

获得协程的返回值

前面介绍过,asyncio有和concurrent.futures模块相似的Future对象(其实就是它的子类),通过它我们就可以获得协程执行的状态和结果。从代码中可以看出,这种使用方式和前面使用线程池的语法和逻辑是非常相似的。

  1. import asyncio
  2. import time
  3. async def deal_html(url):
  4. print("start dealing with url")
  5. await asyncio.sleep(2) # 模拟耗时操作
  6. return "task finished"
  7. if __name__ == '__main__':
  8. start_time = time.time()
  9. loop = asyncio.get_event_loop() # 得到一个事件循环
  10. task = loop.create_task(deal_html("https://www.google.com")) # 提交一个协程任务,返回一个对象
  11. loop.run_until_complete(task) # 把协程放入事件循环
  12. print(task.result()) # 拿到返回结果
  13. end_time = time.time()
  14. print(end_time - start_time)
  15. # result:
  16. # start dealing with url
  17. # task finished
  18. # 2.00205659866333

说道事件循环,就想到了回调。如果一个协程任务完后,需要给相关人员发送通知。比如,我们在 12306 上购票成功后都会收到短信提醒,这就可以用回调来解决.

  1. import asyncio
  2. import time
  3. from functools import partial
  4. async def deal_html(url):
  5. print("start dealing with url")
  6. await asyncio.sleep(2)
  7. return "task finished"
  8. """
  9. 这里就是回调函数,它必须接收 Future 对象作为参数,便于我们获取相关信息
  10. 如果函数还接收其他参数,这个 Future 参数要放到列表的最后,调用的时候回自动传递进来
  11. """
  12. def send_email(name, future):
  13. print("Sending email to {whom}".format(whom=name))
  14. if __name__ == '__main__':
  15. start_time = time.time()
  16. loop = asyncio.get_event_loop() # 得到一个事件循环
  17. task = loop.create_task(deal_html("https://www.google.com"))
  18. task.add_done_callback(partial(send_email, "MetaTian")) # 把 MetaTian 先传递进去,封装成新的函数,就不用考虑参数问题了
  19. loop.run_until_complete(task) # 把协程放入事件循环
  20. print(task.result()) # 拿到返回结果
  21. end_time = time.time()
  22. print(end_time - start_time)
  23. # result:
  24. # start dealing with url
  25. # Sending email to MetaTian
  26. # task finished
  27. # 2.0017893314361572

task.add_done_callback()中,只接受一个函数名称Future对象会自动传递进去作为参数,如果你熟悉PyQt信号与槽机制,这种调用方式会很眼熟。但是,我们的回调函数是带参数的,怎么办呢?我们用到了很早很早以前讲过的 偏函数,它将回调函数进行了二次封装,成为了一个新的函数。

gather and wait

简化前面的一个例子,使用asyncio.wait()可以等待协程任务队列的执行,它的功能是比较单一的。

  1. import asyncio
  2. import time
  3. async def deal_html(url):
  4. print("start dealing with {0}".format(url))
  5. await asyncio.sleep(2) # 模拟耗时操作
  6. print("Done!")
  7. if __name__ == '__main__':
  8. loop = asyncio.get_event_loop() # 得到一个事件循环
  9. tasks = [deal_html("https://www.google.com") for i in range(3)]
  10. loop.run_until_complete(asyncio.wait(tasks)) # 等待任务完成
  11. # loop.run_until_complete() 参数要求:A Future, a coroutine or an awaitable is required

看看asyncio.gather()能做什么。

  1. import asyncio
  2. import time
  3. async def deal_html(url):
  4. print("start dealing with {0}".format(url))
  5. await asyncio.sleep(2) # 模拟耗时操作
  6. print("Done!")
  7. if __name__ == '__main__':
  8. loop = asyncio.get_event_loop() # 得到一个事件循环
  9. tasks = [deal_html("https://www.google.com") for i in range(3)]
  10. # 协程任务分组
  11. group1 = asyncio.gather(*[deal_html("https://www.google.com") for i in range(3)])
  12. group2 = asyncio.gather(*[deal_html("https://www.yuque.com") for i in range(2)])
  13. # 批量取消任务
  14. group1.cancel()
  15. try:
  16. loop.run_until_complete(asyncio.gather(group2, group1))
  17. except Exeption:
  18. pass
  19. # result:
  20. # start dealing with https://www.yuque.com
  21. # start dealing with https://www.yuque.com

Asyncio.gather vs asyncio.wait

task的取消和子协程调用

如何取消task

  1. import asyncio
  2. import time
  3. async def deal_html(times):
  4. print("start dealing with url")
  5. await asyncio.sleep(times) # 模拟耗时操作
  6. print("Done!")
  7. if __name__ == '__main__':
  8. loop = asyncio.get_event_loop()
  9. tasks = [deal_html(i) for i in range(2,5)]
  10. try:
  11. loop.run_until_complete(asyncio.wait(tasks))
  12. except KeyboardInterrupt as e: # 键盘按下 Ctrl + c
  13. all_tasks = asyncio.Task.all_tasks() # 获得所有的任务,比我们提交的要多一个
  14. for task in all_tasks:
  15. print("canceling")
  16. task.cancel()
  17. loop.stop()
  18. loop.run_forever()
  19. finally:
  20. loop.close()
  21. # result:
  22. # start dealing with url
  23. # start dealing with url
  24. # start dealing with url
  25. # canceling
  26. # canceling
  27. # canceling
  28. # canceling

和事件循环相关的api官方文档
除了我们提交的三个协程任务deal_html(),还有一个wait()协程,它是为了协助任务完成自动加进去的。

asyncio并发编程 - 图1

子协程调用

  1. import asyncio
  2. async def compute(x, y):
  3. print("Computing {n1} + {n2}...".format(n1=x, n2=y))
  4. await asyncio.sleep(1)
  5. return x + y
  6. async def print_sum(x, y):
  7. res = await compute(x, y)
  8. print("{n1} + {n2} is {result}".format(n1=x, n2=y, result=res))
  9. if __name__ == '__main__':
  10. loop = asyncio.get_event_loop()
  11. loop.run_until_complete(print_sum(1, 2))
  12. # result:
  13. # Computing 1 + 2...
  14. # 1 + 2 is 3

整个调用过程的时序图如下:

asyncio并发编程 - 图2

计划回调

asyncio提供的事件循环中,我们不仅可以向里面提交协程任务,还可以直接传递相关的任务函数,等到条件满足后进行回调。

参考:Scheduling callbacksScheduling delayed callbacks

call_soon(callback, *args)

向事件循环中添加一个函数,等到下一次循环的时候进行回调,如果有多个函数进行了注册,则按顺序调用,且只调用一次。

  1. import asyncio
  2. def learn(lan):
  3. print("Learning {0}...".format(lan))
  4. def stop_loop(loop):
  5. loop.stop()
  6. if __name__ == '__main__':
  7. loop = asyncio.get_event_loop()
  8. loop.call_soon(learn, "Python")
  9. loop.call_soon(stop_loop, loop) # 用来结束事件循环,注册时的先后顺序很重要
  10. """
  11. 不能调用 run_until_complete(),这个方法只针对协程任务
  12. 而且协程任务完成后,事件循环结束,run_forever()会让事件循环一直进行下去
  13. 要通过其他的方式来结束掉任务循环
  14. """
  15. loop.run_forever()
  16. # result:
  17. # Learning Python...

call_later(delay, callback, *args)

向事件循环中添加任务函数,经过给定的时间参数后开始调用,函数调用的顺序按照delay参数(单位是)进行排序。

  1. import asyncio
  2. def learn(lan):
  3. print("Learning {0}...".format(lan))
  4. if __name__ == '__main__':
  5. loop = asyncio.get_event_loop()
  6. """
  7. 这里的时间调用顺序是相对的,如果 learn(Python) 非常耗时
  8. 当 learn 结束后,再过1s, 调用learn(Ruby), 再过3s, 调用learn(Java)
  9. """
  10. loop.call_later(3, learn, "Java") # 3s后调用
  11. loop.call_later(1, learn, "Ruby") # 1s后调用
  12. loop.call_soon(learn, "Python") # 立刻调用
  13. loop.run_forever()
  14. # result:
  15. # Learning Python...
  16. # Learning Ruby...
  17. # Learning Java...

call_at(when, callback, *args)

向事件循环中添加任务函数,它们严格地按照规定的绝对时间进行调用。通过loop.time()我们可以看到这个绝对时间。

  1. import asyncio
  2. def learn(lan, when):
  3. print("Learning {0}... at {1}".format(lan, when))
  4. if __name__ == '__main__':
  5. loop = asyncio.get_event_loop()
  6. now = loop.time()
  7. print("Now at: {}".format(now))
  8. loop.call_at(now+4, learn, "Java", now+4)
  9. loop.call_at(now+2, learn, "Ruby", now+2)
  10. loop.run_forever()
  11. # result:
  12. # Now at: 3692.0
  13. # Learning Ruby... at 3694.0
  14. # Learning Java... at 3696.0

call_soon_threadsafe(callback, *args)

call_soon_threadsafe()的线程安全版本,虽然协程是单线程的,但是如果一个任务中使用了其他线程也需要的变量,就要考虑线程安全的问题,这种调用就是为了解决这个问题的。

使用asynio模拟http请求

原始的方式

  1. import socket
  2. from urllib.parse import urlparse
  3. def get_url(url):
  4. # 通过 socket 请求 html
  5. url = urlparse(url)
  6. host = url.netloc
  7. path = "/" if not url.path else url.path
  8. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  9. client.connect((host, 80))
  10. client.send("GET {0} HTTP/1.1\r\nHost:{1}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
  11. data = b""
  12. while True:
  13. d = client.recv(1024) # 内容会很多,多次接收
  14. if d:
  15. data += d
  16. else:
  17. break
  18. data = data.decode("utf8")
  19. html_data = data.split("\r\n\r\n")[1] # 去响应头,保留页面数据
  20. # print(html_data) # 可以打印出来看下结果,就是 html 代码,这里只关注时间
  21. client.close()
  22. if __name__ == '__main__':
  23. import time
  24. start_time = time.time()
  25. get_url("https://www.baidu.com")
  26. end_time = time.time()
  27. print(end_time-start_time)
  28. # result:
  29. # 0.7720680236816406

使用asyncio模块

  1. import socket
  2. import asyncio
  3. from urllib.parse import urlparse
  4. async def get_url(url):
  5. # 通过 socket 请求 html
  6. url = urlparse(url)
  7. host = url.netloc
  8. path = "/" if not url.path else url.path
  9. """
  10. 用协程的方式控制 socket
  11. 底层还是使用了 select, 通过读写状态进行操作
  12. 因为这个模块封装了很多细节,使用起来会简洁许多
  13. """
  14. reader, writer = await asyncio.open_connection(host, 80)
  15. writer.write("GET {0} HTTP/1.1\r\nHost:{1}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
  16. html_lines = []
  17. async for line in reader:
  18. data = line.decode("utf8")
  19. html_lines.append(data)
  20. html_data = "\n".join(html_lines)
  21. return html_data
  22. if __name__ == '__main__':
  23. import time
  24. start_time = time.time()
  25. loop = asyncio.get_event_loop()
  26. tasks = [get_url("https://www.baidu.com") for i in range(10)] # 10个请求协程
  27. loop.run_until_complete(asyncio.wait(tasks))
  28. end_time = time.time()
  29. print(end_time-start_time)
  30. # result:
  31. # 0.11810922622680664

concurrent线程池工具很类似,这里也提供了一个as_completed()函数,只要有一个协程任务完成了,就返回,然后可以每次只处理一个协程任务。

  1. import socket
  2. import asyncio
  3. from urllib.parse import urlparse
  4. async def get_url(url):
  5. # 通过 socket 请求 html
  6. url = urlparse(url)
  7. host = url.netloc
  8. path = "/" if not url.path else url.path
  9. """
  10. 用协程的方式控制 socket
  11. 底层还是使用了 select, 通过读写状态进行操作
  12. 为了这个模块封装了很多细节
  13. """
  14. reader, writer = await asyncio.open_connection(host, 80)
  15. writer.write("GET {0} HTTP/1.1\r\nHost:{1}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
  16. html_lines = []
  17. async for line in reader:
  18. data = line.decode("utf8")
  19. html_lines.append(data)
  20. html_data = "\n".join(html_lines)
  21. return html_data
  22. # 再创建一个协程,封装一部分协程任务
  23. async def main():
  24. tasks = [get_url("https://www.baidu.com") for i in range(5)]
  25. for task in asyncio.as_completed(tasks):
  26. result = await task # result: html_data
  27. print("done")
  28. if __name__ == '__main__':
  29. import time
  30. start_time = time.time()
  31. loop = asyncio.get_event_loop()
  32. loop.run_until_complete(main())
  33. end_time = time.time()
  34. print(end_time-start_time)
  35. # result:
  36. # done
  37. # done
  38. # done
  39. # done
  40. # done

结束

协程算是Python中比较复杂的一个部分,对它的理解和使用都比多线程多进程要复杂的多。初学编程,我们都是采用同步的方式来编写代码,事件循环回调异步这些编程思路的转换需要一定的时间,通过使用asyncio模块,我们能够比较简单地去使用协程,但弄清楚协程的设计思想和相关的小知识点是很有必要的,比如:

  • async
  • await
  • yield
  • yield from

这些关键字的含义和作用要理解的很清楚才行。附上几篇参考资料,便于巩固: