前言

asyncio 不是多进程, 也不是多线程, 单单是一个线程, 但是是在 Python 的功能间切换着执行

说实话go协程比较舒服点,但库可能没py丰富,有时候用得到py

对同步代码和异步代码进行对比学习
用sleep(1)模拟耗时1秒的i/o操作

  1. event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。
  2. coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
  3. task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  4. future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
  5. async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

    切换的点用 await 来标记, 使用async关键词将其变成协程方法, 比如 async def function():。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

前置知识点

异步:好比一个函数调用后,原来的函数继续干自己的事情,等那个函数干完后,借助某种手段通知原来的函数执行结果。也是一种目的,一般是通过多线程技术去实现。粗略理解为:在等待一件事情的处理结果时, 对方是否提供通知服务, 如果对方提供通知服务, 则为异步。
异步是指代码在进行IO操作或者等待的时候,不必等待IO操作完成或者等待完成就直接返回的调用方式。

阻塞:阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。粗略理解为:在等待一件事情的处理结果时, 你是否还去干点其他的事情, 如果不去, 则为阻塞。

非阻塞:在不能立刻得到结果之前,该调用不会阻塞当前线程。粗略理解为:在等待一件事情的处理结果时, 你是否还去干点其他的事情, 如果去了, 则为非阻塞。

分析

同步:

  1. import time
  2. def yanmu(t):
  3. print('Start job ', t)
  4. time.sleep(t)
  5. print('Job ', t, ' takes ', t, ' s')
  6. def main():
  7. [yanmu(t) for t in range(1, 3)]
  8. t1 = time.time()
  9. main()
  10. print("没有 async 总共消耗时间 : ", time.time() - t1)

image.png
job 是按顺序执行的, 必须执行完 job 1 才能开始执行 job 2, 而且 job 1 需要1秒的执行时间, 而 job 2 需要2秒. 所以总时间是 3 秒多.

而asyncio 可以对此优化
job 1 在等待 time.sleep(t) 结束的时候, 比如是等待一个网页的下载成功, 在这个地方是可以切换给 job 2, 让它开始执行

异步:

  1. import asyncio
  2. import time
  3. async def yanmu(t): # async 形式的功能
  4. print('Start job ', t)
  5. await asyncio.sleep(t) # 等待 "t" 秒, 期间切换其他任务
  6. print('Job ', t, ' takes ', t, ' s')
  7. async def main(loop): # async 形式的功能
  8. tasks = [
  9. loop.create_task(yanmu(t)) for t in range(1, 3)
  10. ] # 创建任务, 但是不执行
  11. await asyncio.wait(tasks) # 执行并等待所有任务完成
  12. t1 = time.time()
  13. loop = asyncio.get_event_loop() # 建立 loop
  14. loop.run_until_complete(main(loop)) # 执行 loop,并且等待所有任务结束
  15. loop.close() # 关闭 loop
  16. print("Async 总共消耗时间 : ", time.time() - t1)

image.png

我们没有等待 job 1 的结束才开始 job 2, 而是 job 1 触发了 await 的时候就切换到了 job 2 了.
这时, job 1 和 job 2 同时在等待 await asyncio.sleep(t), 所以最终的程序完成时间, 取决于等待最长的 t, 也就是 2秒. 4

这和上面用普通形式的代码相比(3秒), 的确快了很多.由于协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果
image.png

  1. import asyncio
  2. import requests
  3. async def scan(url):
  4. r = requests.get(url).status_code
  5. return r
  6. task = asyncio.ensure_future(scan('http://www.baidu.com'))
  7. loop = asyncio.get_event_loop()
  8. loop.run_until_complete(task)
  9. print(task.result())
  1. 把任务赋值给task
  2. 然后loop为申请调度(这么理解)
  3. 然后执行

但注意因为requests这个库是同步堵塞的,所以没办法变成异步执行(await)
3.7版本可以使用create_task,老版本比较通用的是asyncio.ensure_future

开发工具为了兼容可以作出选择

image.png

简单总结

async def 用来定义异步函数,其内部有异步操作。
每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。

aiohttp

发包还是安全的基础呀

requests这个库是堵塞的,并不支持异步,而aiohttp是支持异步的网络请求的库
所以这里要用到aiohttp

基本用法

  1. async with ClientSession() as session: #首先要建立一个session对象,然后用session对象去打开网页
  2. async with session.get(url) as response: #session可以进行多项操作,比如post, get, put, head等

简单例子

  1. 首先async def 关键字定义了这是个异步函数
  2. await 关键字加在需要等待的操作前面,response.read()等待request响应,是个耗IO操作。
  3. 然后使用ClientSession类发起http请求。
  1. import asyncio
  2. from aiohttp import ClientSession
  3. tasks = []
  4. url="https://www.baidu.com/"
  5. async def hello(url):
  6. async with ClientSession() as session:
  7. async with session.get(url) as response:
  8. response=await response.read()
  9. print(response)
  10. if __name__ == '__main__':
  11. loop=asyncio.get_event_loop()
  12. loop.run_until_complete(hello(url))

多链接异步访问

如果我们需要请求多个URL该怎么办呢,同步的做法访问多个URL只需要加个for循环就可以了。但异步的实现方式并没那么容易,在之前的基础上需要将hello()包装在asyncio的Future对象中,然后将Future对象列表作为任务传递给事件循环。

  1. import asyncio
  2. from aiohttp import ClientSession
  3. tasks = []
  4. url="https://www.baidu.com/"
  5. async def hello(url):
  6. async with ClientSession() as session:
  7. async with session.get(url) as response:
  8. response=await response.text(encoding="utf-8")
  9. return response
  10. def run():
  11. for i in range(5):
  12. task=asyncio.ensure_future(hello(url.format(i)))
  13. tasks.append(task)
  14. result = loop.run_until_complete(asyncio.gather(*tasks))
  15. print(result)
  16. if __name__ == '__main__':
  17. loop=asyncio.get_event_loop()
  18. run()
  1. import asyncio
  2. from aiohttp import ClientSession
  3. tasks = []
  4. url="https://www.baidu.com/"
  5. async def hello(url):
  6. async with ClientSession() as session:
  7. async with session.get(url) as response:
  8. response=response.status
  9. return response
  10. def run():
  11. for i in range(5):
  12. task=asyncio.ensure_future(hello(url.format(i)))
  13. tasks.append(task)
  14. result = loop.run_until_complete(asyncio.gather(*tasks))
  15. print(result)
  16. if __name__ == '__main__':
  17. loop=asyncio.get_event_loop()
  18. run()