date: 2021-08-29title: python中协程的概念及使用 #标题
tags: #标签
categories: python # 分类

参考:

协程的概念

什么是协程

协程,又称微线程。协程的作用是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。

协程的优势

  • 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。

说明:协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。

如何实现协程

Python2对协程的支持比较有限,生成器的yield实现了一部分但不完全,gevent模块倒是有比较好的实现;Python3.4以后引入了asyncio模块,可以很好的使用协程。

gevent使用说明

gevent是第三方库,通过greenlet(底层是C语言写的)实现协程,其基本思想就是当一个greenlet遇到I/O操作时,比如访问网络,就自动切换到其他的greenlet,等待I/O操作完成,再在适当的时候切换回来继续执行。由于I/O操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待I/O。gevent是第三方库,需要安装gevent模块。

  • monkey:可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)(将爬虫代码换成这个,效果一样可以达到切换上下文)。
  • gevent.spawn:启动协程,参数为函数名称,参数名称。
  • gevent.joinall:停止协程。

asynico

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。asyncio的异步操作,需要在coroutine中通过yield from完成。

  • asynico + yield from(python3.4) # 3.4中是结合yield来实现的
  • asynico + await(python3.5) # 在3.5中是通过await来实现的

gevent使用示例

import gevent


# 定义一个函数,假装里面存放的是一些I/O操作(磁盘I/O、网络I/O...)
def func():
    print('func start')
    gevent.sleep(2)  # 假装这里是I/O操作
    print('func end')


# 带有I/O操作的代码写在函数func中,然后提交func给event
g1 = gevent.spawn(func)
g2 = gevent.spawn(func)
g3 = gevent.spawn(func)

# 手动阻塞,如果不手动阻塞,那么函数不会执行,因为整个代码没有发生阻塞,所以线程不会切换到函数中去执行
# 下面是手动对每一个对象进行阻塞,比较low,g1.join()表示阻塞直至协程g1执行完毕,g2.join()表示阻塞直至协程g2执行完毕
# g1.join()
# g2.join()
# g3.join()

# 一般是使用joinall来进行阻塞,joinall()中的参数是一个协程对象列表,会等待所有的协程都执行完毕再退出
gevent.joinall([g1, g2, g3])

# 如果不使用join、joinall来手动阻塞,也可以手动阻塞一下,只是阻塞时间无法自己掌握(需要阻塞到函数中的任务都执行完成)
# gevent.sleep(3)


# 打印结果如下:

上面是通过gevent提供的sleep来实现模仿I/O操作的,那么如果我们就想使用time模块提供的sleep方法呢?

import gevent
import time
def func():
    print('start func')
    time.sleep(2)
    print('end func')


g1=gevent.spawn(func)
g2=gevent.spawn(func)
g3=gevent.spawn(func)

gevent.joinall([g1,g2,g3])


# 输出结果如下:
start func
end func
start func
end func
start func
end func

可以看到当使用time.sleep来实现模仿I/O操作时,就没有了并发效果,那么该如何解决呢?下面也将是一个完整的gevent模块使用示例:

from gevent import monkey

monkey.patch_all()
# 上面的代码一般要写在导入time、socket等模块前(我试了下,在import time后写也是可以的,但不建议)
import gevent
import time


def func(n):
    print(f'start func{n}')
    time.sleep(2)
    print(f'end func{n}')


def main():
    l1 = []
    for i in range(3):
        res = gevent.spawn(func, i)  # 函数名后面可以跟参数,spawn可以接收位置参数和关键字参数
        l1.append(res)

    gevent.joinall(l1)


if __name__ == '__main__':
    main()

通过协程实现聊天服务器

# server端代码如下
from gevent import monkey

monkey.patch_all()
import socket
import gevent


def func(conn):
    while True:
        msg = conn.recv(1024).decode('utf-8')
        MSG = msg.upper()
        print(MSG)
        conn.send(MSG.encode('utf-8'))


sk = socket.socket()
sk.bind(('127.0.0.1', 9001))
sk.listen()

while 1:
    conn, _ = sk.accept()
    g1 = gevent.spawn(func, conn)







# clinet代码如下:
import time
import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 9001))

while True:
    sk.send(b'hello01')
    msg = sk.recv(1024)
    print(msg)
    time.sleep(2)

上面的代码,无论启多少个client端,server端都可以正常通过协程去工作。

gevent判断是否可以实现协程


# 方式一,通过在执行monkey.patch_all()前后各打印下模块,通过打印结果是否一致即可判断
# 两次打印结果一致,则表明此模块无法实现协程,如果不一致,则表明可以实现
import socket

print(socket.socket)
from gevent import monkey

monkey.patch_all()
print(socket.socket)

# 打印如下:
# <class 'socket.socket'>
# <class 'gevent._socket3.socket'>

# 如果不使用monkey这个功能,那么打印结果应该是一样的:
# <class 'socket.socket'>
# <class 'socket.socket'>






# 方式二,直接看monkey.patch_all方法的源码:
# 默认参数中,模块名的值为True的表示支持,为False则表示不支持
def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, httplib=False,
              subprocess=True, sys=False, aggressive=True, Event=False,
              builtins=True, signal=True):

asyncio模块

代码示例

import asyncio


# 只要函数中包含了await关键字,那么def前面必须写async
async def func(name, age):
    print(f'start...我叫{name},今年{age}岁了')
    '''
    await 后面跟的是可能会发生阻塞的方法
    await 关键字必须写在一个async开头的函数里
    await是一个只能在协程函数中使用的关键字,用于遇到IO操作时挂起 当前协程(任务)
    当前协程(任务)挂起过程中 事件循环可以去执行其他的协程(任务)
    当前协程IO处理完成时,可以再次切换回来执行await之后的代码。代码如下:
    '''

    await asyncio.sleep(2)
    print('end...')


# 初始化一个事件循环对象
loop = asyncio.get_event_loop()
# 运行函数必须放在下面的方法中
loop.run_until_complete(func('alex', 36))

上面是简单的运行了一个函数,如果要运行多个函数只需要将要运行的函数放到一个列表中,再将函数列表放置到asyncio.wait()方法中,然后传给run_until_complete()方法,如下:

import asyncio


async def func(name, age):
    print(f'start... 我叫{name},今年{age}岁了')
    await asyncio.sleep(2)
    print('end...')


loop = asyncio.get_event_loop()
func_list = [func('张三', 35), func('李四', 24)]
loop.run_until_complete(asyncio.wait(func_list))

代码实践

"""
通过协程实现爬取图片并保存到本地
下载图片使用第三方模块aiohttp,请提前安装 aiohttp 模块
"""


# -*- coding:utf-8 -*-

import aiohttp
import asyncio


async def fetch(session, url):
    print('发送请求:', url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('_')[-1]
        with open(file_name, mode='wb') as file_obj:
            file_obj.write(content)


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
            'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
            'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

task对象

Tasks用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用 asyncio.create_task() 函数以外,还可以用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。

本质上是将协程对象封装成task对象,并将协程立即加入事件循环,同时追踪协程的状态。

注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

示例1
import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main开始")

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func())

    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task2 = asyncio.create_task(func())

    print("main结束")

    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待相对应的协程全都执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


asyncio.run(main())

示例2
import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main开始")

    # 创建协程,将协程封装到Task对象中并添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    # 在调用
    task_list = [
        asyncio.create_task(func(), name="n1"),
        asyncio.create_task(func(), name="n2")
    ]

    print("main结束")

    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待所有协程执行完毕,并将所有协程的返回值保存到done
    # 如果设置了timeout值,则意味着此处最多等待的秒,完成的协程返回值写入到done中,未完成则写到pending中。
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done, pending)


asyncio.run(main())
# 注意:asyncio.wait 源码内部会对列表中的每个协程执行ensure_future从而封装为Task对象,
# 所以在和wait配合使用时task_list的值为[func(),func()] 也是可以的。