date: 2021-08-29title: python中协程的概念及使用 #标题
tags: #标签
categories: python # 分类
参考:
协程的概念
什么是协程
协程,又称微线程。协程的作用是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。
协程的优势
- 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
- 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。
说明:协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。
如何实现协程
Python2对协程的支持比较有限,生成器的yield实现了一部分但不完全,gevent模块倒是有比较好的实现;Python3.4以后引入了asyncio模块,可以很好的使用协程。
gevent使用说明
gevent是第三方库,通过greenlet(底层是C语言写的)实现协程,其基本思想就是当一个greenlet遇到I/O操作时,比如访问网络,就自动切换到其他的greenlet,等待I/O操作完成,再在适当的时候切换回来继续执行。由于I/O操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待I/O。gevent是第三方库,需要安装gevent
模块。
- monkey:可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)(将爬虫代码换成这个,效果一样可以达到切换上下文)。
- gevent.spawn:启动协程,参数为函数名称,参数名称。
- gevent.joinall:停止协程。
asynico
asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。asyncio的异步操作,需要在coroutine中通过yield from完成。
- asynico + yield from(python3.4) # 3.4中是结合yield来实现的
- asynico + await(python3.5) # 在3.5中是通过await来实现的
gevent使用示例
import gevent
# 定义一个函数,假装里面存放的是一些I/O操作(磁盘I/O、网络I/O...)
def func():
print('func start')
gevent.sleep(2) # 假装这里是I/O操作
print('func end')
# 带有I/O操作的代码写在函数func中,然后提交func给event
g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)
# 手动阻塞,如果不手动阻塞,那么函数不会执行,因为整个代码没有发生阻塞,所以线程不会切换到函数中去执行
# 下面是手动对每一个对象进行阻塞,比较low,g1.join()表示阻塞直至协程g1执行完毕,g2.join()表示阻塞直至协程g2执行完毕
# g1.join()
# g2.join()
# g3.join()
# 一般是使用joinall来进行阻塞,joinall()中的参数是一个协程对象列表,会等待所有的协程都执行完毕再退出
gevent.joinall([g1, g2, g3])
# 如果不使用join、joinall来手动阻塞,也可以手动阻塞一下,只是阻塞时间无法自己掌握(需要阻塞到函数中的任务都执行完成)
# gevent.sleep(3)
# 打印结果如下:
上面是通过gevent提供的sleep来实现模仿I/O操作的,那么如果我们就想使用time模块提供的sleep方法呢?
import gevent
import time
def func():
print('start func')
time.sleep(2)
print('end func')
g1=gevent.spawn(func)
g2=gevent.spawn(func)
g3=gevent.spawn(func)
gevent.joinall([g1,g2,g3])
# 输出结果如下:
start func
end func
start func
end func
start func
end func
可以看到当使用time.sleep来实现模仿I/O操作时,就没有了并发效果,那么该如何解决呢?下面也将是一个完整的gevent模块使用示例:
from gevent import monkey
monkey.patch_all()
# 上面的代码一般要写在导入time、socket等模块前(我试了下,在import time后写也是可以的,但不建议)
import gevent
import time
def func(n):
print(f'start func{n}')
time.sleep(2)
print(f'end func{n}')
def main():
l1 = []
for i in range(3):
res = gevent.spawn(func, i) # 函数名后面可以跟参数,spawn可以接收位置参数和关键字参数
l1.append(res)
gevent.joinall(l1)
if __name__ == '__main__':
main()
通过协程实现聊天服务器
# server端代码如下
from gevent import monkey
monkey.patch_all()
import socket
import gevent
def func(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
MSG = msg.upper()
print(MSG)
conn.send(MSG.encode('utf-8'))
sk = socket.socket()
sk.bind(('127.0.0.1', 9001))
sk.listen()
while 1:
conn, _ = sk.accept()
g1 = gevent.spawn(func, conn)
# clinet代码如下:
import time
import socket
sk = socket.socket()
sk.connect(('127.0.0.1', 9001))
while True:
sk.send(b'hello01')
msg = sk.recv(1024)
print(msg)
time.sleep(2)
上面的代码,无论启多少个client端,server端都可以正常通过协程去工作。
gevent判断是否可以实现协程
# 方式一,通过在执行monkey.patch_all()前后各打印下模块,通过打印结果是否一致即可判断
# 两次打印结果一致,则表明此模块无法实现协程,如果不一致,则表明可以实现
import socket
print(socket.socket)
from gevent import monkey
monkey.patch_all()
print(socket.socket)
# 打印如下:
# <class 'socket.socket'>
# <class 'gevent._socket3.socket'>
# 如果不使用monkey这个功能,那么打印结果应该是一样的:
# <class 'socket.socket'>
# <class 'socket.socket'>
# 方式二,直接看monkey.patch_all方法的源码:
# 默认参数中,模块名的值为True的表示支持,为False则表示不支持
def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, httplib=False,
subprocess=True, sys=False, aggressive=True, Event=False,
builtins=True, signal=True):
asyncio模块
代码示例
import asyncio
# 只要函数中包含了await关键字,那么def前面必须写async
async def func(name, age):
print(f'start...我叫{name},今年{age}岁了')
'''
await 后面跟的是可能会发生阻塞的方法
await 关键字必须写在一个async开头的函数里
await是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起 当前协程(任务)
当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务)
当前协程IO处理完成时,可以再次切换回来执行await之后的代码。代码如下:
'''
await asyncio.sleep(2)
print('end...')
# 初始化一个事件循环对象
loop = asyncio.get_event_loop()
# 运行函数必须放在下面的方法中
loop.run_until_complete(func('alex', 36))
上面是简单的运行了一个函数,如果要运行多个函数只需要将要运行的函数放到一个列表中,再将函数列表放置到asyncio.wait()方法中,然后传给run_until_complete()方法,如下:
import asyncio
async def func(name, age):
print(f'start... 我叫{name},今年{age}岁了')
await asyncio.sleep(2)
print('end...')
loop = asyncio.get_event_loop()
func_list = [func('张三', 35), func('李四', 24)]
loop.run_until_complete(asyncio.wait(func_list))
代码实践
"""
通过协程实现爬取图片并保存到本地
下载图片使用第三方模块aiohttp,请提前安装 aiohttp 模块
"""
# -*- coding:utf-8 -*-
import aiohttp
import asyncio
async def fetch(session, url):
print('发送请求:', url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_obj:
file_obj.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
task对象
Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。
本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。
注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
示例1
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task1 = asyncio.create_task(func())
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task2 = asyncio.create_task(func())
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待相对应的协程全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run(main())
示例2
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"
async def main():
print("main开始")
# 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
# 在调用
task_list = [
asyncio.create_task(func(), name="n1"),
asyncio.create_task(func(), name="n2")
]
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
# 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
done, pending = await asyncio.wait(task_list, timeout=None)
print(done, pending)
asyncio.run(main())
# 注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,
# 所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。