本文基于python3 asyncio, aiohttp
首先要声明一个点,python中创建的async协程函数无法直接运行,必须要添加到loop循环时间中才能运行。
一般采用两种方式运行协程函数
_
例1:
import asyncio
loop = asyncio.get_event_loop()
tasks = [asyncio.create_task( 此处放入协程函数)] # 创建协程事件
loop.run_until_complete(asyncio.wait(tasks))
# 如果你的tasks是个列表,run_until_complete中需要添加asyncio.wait,
# 如果是单个协程对象 比如上方例子中的 async def main()
# 可以省略 tasks = ...步骤 直接写 loop.run_until_complete(main())
例2:
不创建事件循环 直接使用 asyncio.run() 逻辑与例1一致
其他的一些注意事项可以百度搜一搜
_
首先创建一个单例模式的异步请求类
from aiohttp import ClientSession, TCPConnector
import asyncio
class CS:
# 实现单例模式
def __new__(cls, *args, **kwargs):
if not hasattr(CS, '_single'):
CS._single = object.__new__(cls)
return CS._single
# 创建一个异步请求类session, connector=TCPConnector(ssl=False)等同于requests.post(verify=False)
# 跳过证书验证
def __init__(self, headers=None):
self._cs = ClientSession(connector=TCPConnector(ssl=False), headers=headers)
# 添加请求头
def add_headers(self, headers):
self._cs._prepare_headers(headers)
# 不建议在类内写逻辑, 比如什么获取token之类的,尽量减少耦合度
async def post(self, url, **kwargs):
async with self._cs.post(url, **kwargs) as resp:
return await resp.json()
async def get(self, url, **kwargs):
async with self._cs.get(url, **kwargs) as resp:
return await resp.json()
async def close(self):
await self._cs.close()
async def main():
cs = CS()
# 获取token
response = await cs.get(url, params)
token = response['token']
# 将token写入请求头
cs.add_headers({'token': token})
result = await cs.post(url, json)
await cs.close()
return result
asyncio.run(main())
后续添加协程函数时, 不可使用同一个类中的self.post(),self.get()之类的方法。因为在init的时候
已经创建了一个session, 直接使用self.post()将会导致创建两个session接着报错。
在调用完协程函数后记得close()
self.post(),self.get()是在实例化后,类内没有满足需求时还需要去请求其他接口时使用。此时可以保证用的是同一个session
例子:
async def main(): #非类内
cs = CS(headers={‘x-stream-id’: get_x_stream_id()}
response = await cs.post(url=url, json=json) # post方法内已经将返回的response转化成json了
cs.close() # 必须关闭
return response