示例一:向协程池动态添加任务
效果:可以运行,但任务数量过多时会卡死
# -*- coding: utf-8 -*-
"""Coroutine pool that runs on an event loop living in a background thread.

@author: chenchen
@software: PyCharm
@file: task_pool.py
@time: 2022-09-18 20:30

asyncio notes:
- Dynamic task submission: an event loop is created and kept running forever
  inside a dedicated thread, so coroutines can be handed to it at any time.
- The thread is a daemon thread, so it ends together with the main process.
- Automatic stop: the loop is stopped once the task queue has drained.
- Blocking wait until all submitted tasks are finished.
- Coroutine pool: concurrency is capped with ``asyncio.Semaphore``.
"""
import asyncio
import queue
import time
from threading import Thread


class AsyncPool(object):
    """Coroutine pool backed by an event loop running in a daemon thread.

    1. Tasks can be added dynamically.
    2. The event loop can be stopped automatically once the pool is empty.
    3. The number of concurrently running coroutines is limited.
    """

    def __init__(self, maxsize=1, loop=None):
        """Start the loop thread and create the concurrency limiter.

        :param maxsize: maximum number of coroutines allowed to run at the
            same time (defaults to 1).
        :param loop: optional event loop to run in the background thread; a
            new one is created when omitted.
        """
        # Inside Jupyter the following is required, otherwise asyncio fails:
        #   import nest_asyncio
        #   nest_asyncio.apply()

        # FIFO queue holding one placeholder per unfinished task. It is what
        # ``wait()`` joins on and what the auto-stop coroutine polls.
        self.task = queue.Queue()
        self.loop, _ = self.start_loop(loop)
        # The ``loop=`` argument of asyncio.Semaphore was removed in
        # Python 3.10. Creating the semaphore from a coroutine that runs on
        # the pool's loop binds it to that loop on older versions as well.
        self.semaphore = asyncio.run_coroutine_threadsafe(
            self._create_semaphore(maxsize), self.loop
        ).result()

    @staticmethod
    async def _create_semaphore(maxsize):
        """Build the semaphore while running on the pool's event loop."""
        return asyncio.Semaphore(maxsize)

    def task_add(self, item=1):
        """Record one more unfinished task."""
        self.task.put(item)

    def task_done(self, fn):
        """Done-callback: mark one task as finished.

        :param fn: the finished future (unused; required by the
            ``add_done_callback`` signature).
        """
        self.task.get()
        self.task.task_done()

    def wait(self):
        """Block until every submitted task has finished."""
        self.task.join()

    @property
    def running(self):
        """Number of submitted tasks that have not finished yet."""
        return self.task.qsize()

    @staticmethod
    def _start_thread_loop(loop):
        """Thread target: run ``loop`` until it is stopped.

        :param loop: the event loop to run.
        """
        # Make ``loop`` the current event loop of this thread, then run it.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def _stop_thread_loop(self, loop_time=1):
        """Stop the event loop as soon as the task queue is empty.

        :param loop_time: polling interval in seconds.
        """
        while True:
            if self.task.empty():
                self.loop.stop()
                break
            await asyncio.sleep(loop_time)

    def start_loop(self, loop):
        """Run an event loop in a new daemon thread.

        :param loop: event loop to run, or a falsy value to create a new one.
        :return: ``(loop, thread)``.
        """
        if not loop:
            loop = asyncio.new_event_loop()
        # ``daemon=True`` replaces the deprecated ``Thread.setDaemon(True)``.
        loop_thread = Thread(
            target=self._start_thread_loop, args=(loop,), daemon=True
        )
        # Starting the thread also starts the event loop.
        loop_thread.start()
        return loop, loop_thread

    def stop_loop(self, loop_time=1):
        """Schedule the auto-stop coroutine on the loop.

        :param loop_time: polling interval in seconds.
        """
        asyncio.run_coroutine_threadsafe(
            self._stop_thread_loop(loop_time), self.loop
        )

    def release(self, loop_time=1):
        """Stop the loop once all tasks are done (alias of ``stop_loop``)."""
        self.stop_loop(loop_time)

    async def async_semaphore_func(self, func):
        """Await ``func`` while holding a semaphore slot.

        :param func: an awaitable (e.g. a coroutine object).
        :return: the result of ``func``.
        """
        async with self.semaphore:
            return await func

    def submit(self, func, callback=None):
        """Submit a coroutine to the background event loop.

        :param func: coroutine object to run.
        :param callback: optional callable invoked with the finished
            ``concurrent.futures.Future``.
        :return: the ``concurrent.futures.Future`` of the scheduled task.
        """
        self.task_add()
        # run_coroutine_threadsafe is the way to hand a coroutine to a loop
        # that is running in another thread.
        future = asyncio.run_coroutine_threadsafe(
            self.async_semaphore_func(func), self.loop
        )
        # Callbacks are invoked in the order they are added, so the user
        # callback runs before the task is marked as done. Registering
        # ``None`` would only produce a logged error, so it is skipped.
        if callback is not None:
            future.add_done_callback(callback)
        future.add_done_callback(self.task_done)
        return future


async def thread_example(i):
    """Demo task: GET a local test endpoint and return the response text.

    :param i: number sent as the ``num`` query parameter.
    """
    # Imported here so that AsyncPool itself does not require aiohttp.
    import aiohttp

    url = "http://127.0.0.1:3001/?num={}".format(i)
    # NOTE(review): every call opens its own ClientSession; with many tasks
    # sharing a single session is presumably cheaper - confirm.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            return await res.text()


def my_callback(future):
    """Demo callback: print the result of a finished task."""
    result = future.result()
    print('返回值: ', result)


def main():
    """Demo: submit one request, then stop the pool when it has drained."""
    # Task group with the maximum number of concurrent coroutines.
    pool = AsyncPool(maxsize=100000)

    # Add a task. More can be added the same way, e.g. in a loop:
    #   pool.submit(thread_example(i), my_callback)
    pool.submit(thread_example(1), my_callback)

    print("等待子线程结束1...")
    # Ask the event loop to stop once the queue is empty.
    pool.release()

    # Number of unfinished tasks.
    print(pool.running)
    print("等待子线程结束2...")

    # Block until everything has finished.
    pool.wait()
    print("等待子线程结束3...")


if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print("run time: ", end_time - start_time)
示例二:较完善的异步网络请求
只需共享同一个 session,即可在处理一条数据时向多个不同的接口发起请求
# -*- coding: utf-8 -*-
"""Polling worker thread that checks shops and sends "meal ready" requests.

@author: chenchen
@software: PyCharm
@file: Thread.py
@time: 2022-09-18 1:07
"""
import asyncio
import json
import platform
import queue
import re
import time
import traceback

# On Linux uvloop can be installed as a faster event loop:
#   pip install uvloop -i https://pypi.douban.com/simple
import aiohttp
from aiohttp import ClientSession, ClientTimeout, TCPConnector, client_exceptions
from PyQt5 import QtCore
from PyQt5.QtCore import Qt, pyqtSignal

from Meal import Meal, Shop
from utils import api


class RunTherad(QtCore.QThread):
    """First worker thread to start; the whole polling loop runs here."""

    start_prin = pyqtSignal(dict)
    start_prin_status = pyqtSignal(dict)

    def __init__(self):
        super(RunTherad, self).__init__()
        self.dataList = []
        self.Meal = Meal()
        self.Shop = Shop()

        # Event loop policy per platform:
        #   Linux:   uvloop.EventLoopPolicy() (optional, not enabled here)
        #   Windows: asyncio.WindowsSelectorEventLoopPolicy()
        system_name = platform.system()
        if system_name == 'Windows':
            print('windows系统')
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        elif system_name == 'Linux':
            # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            print('linux系统')

        # NOTE(review): this loop is not used by any code visible in this
        # file (asyncio.run creates its own loop); kept because it is a
        # public attribute.
        self.new_loop = asyncio.new_event_loop()
        self.queue_data = queue.Queue()  # sequence numbers of successful requests
        self.timeout_domains = []  # shop ids whose request timed out
        self.unknown_error_domains = []  # shop ids that failed with any other error

    def run(self):
        """Poll for shop rows forever and process each batch.

        Sleeps 5 seconds after a processed batch and 10 seconds when there
        was nothing to do or the batch raised. An exception is reported and
        the loop carries on, so a transient failure does not end the thread.
        """
        while True:
            try:
                delay = 5 if self._process_batch() else 10
            except Exception as e:
                print('run报错:', e)
                delay = 10
            time.sleep(delay)

    def _process_batch(self):
        """Fetch one batch of shop rows and run the async requests for it.

        :return: True when a batch was processed, False when there was none.
        """
        mtshop = self.Meal.getData()
        rows = mtshop['data']
        print('本次处理数据:', len(rows))
        if not rows:
            print('数据库没有任务, 等待10秒后再次检测=====>>>>>')
            return False

        # One JSON round trip yields plain JSON types. The previous extra
        # step that replaced ' with " on the serialized text broke the JSON
        # whenever a value contained an apostrophe.
        self.dataList = json.loads(json.dumps(rows, ensure_ascii=False))

        # Run the asynchronous requests for the whole batch.
        asyncio.run(self.main(self.dataList))
        print("成功连接 number: {}, 超时连接 number: {}, 失败连接数 number: {} ====================".format(
            self.queue_data.qsize(), len(self.timeout_domains), len(self.unknown_error_domains)))

        # Reset the per-batch bookkeeping.
        self.timeout_domains = []
        self.unknown_error_domains = []
        self.queue_data = queue.Queue()
        return True

    async def fetch(self, session, n, linedata):
        """Process one shop row.

        Updates the shop's expiry flag when needed, then either queries the
        business status (shop not open) or queries its orders and sends the
        "meal ready" request for every order that is due.

        :param session: aiohttp.ClientSession shared by all requests.
        :param n: sequence number of the row (1-based).
        :param linedata: the complete shop row (a dict).
        """
        shop_id = linedata.get('shop_id')
        start_time = time.time()
        try:
            # Extraction happens inside the try so that a malformed row is
            # recorded in unknown_error_domains like any other failure.
            cookie = linedata['cookie']
            _state = linedata['state']
            _shop_id = linedata['shop_id']
            region_id = re.findall('region_id=(.*?);', cookie)[0]
            region_version = re.findall('region_version=(.*?);', cookie)[0]

            await self._refresh_expiry(session, linedata)

            # Shops that are not open (state != 1) only get a status query;
            # open shops get their orders queried and dispatched.
            if _state != 1:
                await self.Shop.businessSatus(session, cookie, region_id, region_version, _state)
                cost = time.time() - start_time
                print(f"营业状态请求:{n}, 耗时:{cost}, 店铺: {_shop_id}")
            else:
                url, param, headers = api.dataShopOrder(self, cookie, region_id, region_version)
                # ssl=False replaces the deprecated verify_ssl=False, and a
                # ClientTimeout replaces the deprecated bare number.
                async with session.get(url=url, params=param, headers=headers,
                                       timeout=ClientTimeout(total=5), ssl=False) as response:
                    d = await response.json()  # order data, if any
                    if d['code'] == 1001:
                        # Cookie is no longer valid; the shop must log in again.
                        await self.Meal.shop_cookie_update(session, {'shop_id': _shop_id})
                    elif d['code'] == 0:
                        # Cookie is valid: dispatch every order that is due.
                        await self._dispatch_orders(
                            session, linedata, d['data']['wmOrderList'],
                            cookie, region_id, region_version)
                        cost = time.time() - start_time
                        print(f"本次请求:{n}, 耗时:{cost}, 店铺: {_shop_id}")
                    else:
                        # NOTE(review): original indentation was lost; this
                        # branch is assumed to pair with the code checks.
                        print('查询店铺订单, 网络错误, 请检查是否有网络信号')
            # NOTE(review): original indentation was lost; the success
            # marker is assumed to apply to every row that did not raise.
            self.queue_data.put(n)
        except asyncio.TimeoutError as timeout_error:
            # aiohttp's ServerTimeoutError derives from asyncio.TimeoutError,
            # so this also covers the total-timeout case.
            print("fetch函数 请求超时的店铺 error: {}, url: {}".format(timeout_error, shop_id))
            self.timeout_domains.append(shop_id)
        except Exception:
            print("fetc函数 错误捕捉 unknown error: {}".format(traceback.format_exc()))
            self.unknown_error_domains.append(shop_id)

    async def _refresh_expiry(self, session, linedata):
        """Mark the shop as expired (1) or expiring soon (2) when needed."""
        _expire = linedata['expire']
        _shop_id = linedata['shop_id']
        # The subscription runs until the end of the expiry date.
        expire_text = linedata['expiretime'] + ' 23:59:59'
        _expiretime = int(time.mktime(time.strptime(expire_text, '%Y-%m-%d %H:%M:%S')))
        _currenttime = int(time.time())
        # NOTE(review): ``self`` is passed explicitly to a bound method of
        # self.Meal, unlike the other Meal calls in this class - confirm
        # against the signature of Meal.shop_expire_update.
        if _currenttime > _expiretime:
            if _expire != 1:
                await self.Meal.shop_expire_update(self, session, _shop_id, 1)
        elif _expiretime - _currenttime < 86399:
            # Less than one day left.
            if _expire != 2:
                await self.Meal.shop_expire_update(self, session, _shop_id, 2)

    async def _dispatch_orders(self, session, linedata, orders, cookie, region_id, region_version):
        """Send the "meal ready" request for each order whose hold time passed."""
        for item in orders:
            js = json.loads(item['orderInfo'])
            num = js['num']
            order_time = js['order_time']
            order_num = js['wm_order_id_view_str']
            cityId = js['poi_city_id']
            wmPoiId = js['wm_poi_id']
            utime = js['utime']
            vals = {
                'region_id': region_id,
                'region_version': region_version,
                'wmPoiId': wmPoiId,
                'num': num,
                'order_num': order_num,
                'order_time': order_time,
                'cityId': cityId,
                'utime': utime,
                'cookie': cookie,
            }

            # The hold time depends on the current hour: 11 uses the
            # morning value, 17 and 18 the night value, otherwise default.
            hour = time.localtime().tm_hour
            if hour == 11:
                label, hold = '午高峰时间出餐', linedata['meal_morning_time']
            elif hour == 17 or hour == 18:
                label, hold = '晚高峰时间出餐', linedata['meal_night_time']
            else:
                label, hold = '正常时间出餐', linedata['meal_time']

            if utime + hold < int(time.time()):
                print(f"{label} 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
                await self.Meal.Receive_send(session, vals)

    async def chunks(self, sem, session, i, oneData):
        """Run ``fetch`` for one row while holding a semaphore slot.

        :param sem: asyncio.Semaphore limiting concurrency.
        :param session: shared aiohttp.ClientSession.
        :param i: zero-based index of the row.
        :param oneData: the shop row.
        """
        try:
            async with sem:
                await self.fetch(session, i + 1, oneData)
        except Exception as e:
            print("chunks报错:", e)

    async def main(self, data):
        """Process all rows concurrently over one shared session.

        :param data: iterable of shop rows.
        """
        sem = asyncio.Semaphore(100)
        timeout = ClientTimeout(total=10, connect=10000, sock_connect=5, sock_read=5)
        async with ClientSession(connector=TCPConnector(limit=100), timeout=timeout) as session:
            tasks = [
                asyncio.create_task(self.chunks(sem, session, index, shop_data))
                for index, shop_data in enumerate(data)
            ]
            # asyncio.wait raises ValueError on an empty collection.
            if tasks:
                await asyncio.wait(tasks)
