示例: 动态添加任务
已知问题: 任务数量过多时会卡死
# -*- coding: utf-8 -*-
"""
@author: chenchen
@software: PyCharm
@file: task_pool.py
@功能:
@time: 2022-09-18 20:30
"""
"""
asyncio 协程介绍:
- 动态添加任务:
- 方案是创建一个线程, 让事件循环在该线程内一直运行
- 将该线程设置为守护线程, 随主线程退出而一起关闭
- 自动停止任务
- 阻塞任务完成
- 协程池
- asyncio.Semaphore() 进行控制
"""
import asyncio
import aiohttp
import time
import queue
from threading import Thread
class AsyncPool(object):
    """Run coroutines on an event loop that lives in a background daemon thread.

    1. Tasks can be submitted at any time from ordinary (non-async) code.
    2. The event loop can be asked to stop itself once all tasks are done.
    3. At most ``maxsize`` submitted coroutines run concurrently.
    """

    def __init__(self, maxsize=1, loop=None):
        """Start the loop thread and create the concurrency limiter.

        :param maxsize: maximum number of coroutines allowed to run at the
            same time (defaults to 1).
        :param loop: event loop to run in the background thread; a new one
            is created when omitted.
        """
        import sys

        # Under Jupyter, nest_asyncio may be required:
        #   import nest_asyncio
        #   nest_asyncio.apply()

        # FIFO queue holding one placeholder per unfinished task. It is used
        # to count pending work, to block in wait(), and to decide when the
        # loop may stop.
        self.task = queue.Queue()
        self.loop, self._loop_thread = self.start_loop(loop)
        # Semaphore that caps concurrency at ``maxsize``.
        if sys.version_info >= (3, 10):
            # The ``loop`` parameter was removed in Python 3.10; the
            # semaphore binds to the running loop on first contended use.
            self.semaphore = asyncio.Semaphore(maxsize)
        else:
            # Older versions bind at construction time, so the background
            # loop has to be passed explicitly.
            self.semaphore = asyncio.Semaphore(maxsize, loop=self.loop)

    def task_add(self, item=1):
        """Record one more unfinished task by queueing a placeholder."""
        self.task.put(item)

    def task_done(self, fn):
        """Done-callback: remove one placeholder and mark it finished.

        :param fn: the finished future (unused; required by the
            ``add_done_callback`` calling convention).
        """
        self.task.get()
        self.task.task_done()

    def wait(self):
        """Block until every submitted task has been marked done."""
        self.task.join()

    @property
    def running(self):
        """Number of submitted tasks that have not finished yet."""
        return self.task.qsize()

    @staticmethod
    def _start_thread_loop(loop):
        """Thread target: install ``loop`` as the thread's loop and run it.

        :param loop: the event loop to run forever in this thread.
        """
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def _stop_thread_loop(self, loop_time=1):
        """Stop the event loop once the pending-task queue is empty.

        :param loop_time: seconds to sleep between checks of the queue.
        """
        while not self.task.empty():
            await asyncio.sleep(loop_time)
        self.loop.stop()

    def start_loop(self, loop):
        """Run an event loop in a new daemon thread.

        :param loop: event loop to run, or a falsy value to create one.
        :return: ``(loop, thread)`` tuple.
        """
        if not loop:
            loop = asyncio.new_event_loop()
        # A daemon thread does not keep the interpreter alive on exit.
        loop_thread = Thread(
            target=self._start_thread_loop, args=(loop,), daemon=True
        )
        loop_thread.start()
        return loop, loop_thread

    def stop_loop(self, loop_time=1):
        """Schedule the watcher that stops the loop when the queue is empty.

        :param loop_time: polling interval in seconds.
        """
        asyncio.run_coroutine_threadsafe(
            self._stop_thread_loop(loop_time), self.loop
        )

    def release(self, loop_time=1):
        """Alias of :meth:`stop_loop`."""
        self.stop_loop(loop_time)

    async def async_semaphore_func(self, func):
        """Await ``func`` while holding the pool's semaphore.

        :param func: the awaitable (coroutine object) to run.
        :return: whatever ``func`` returns.
        """
        async with self.semaphore:
            return await func

    def submit(self, func, callback=None):
        """Schedule a coroutine on the background loop.

        :param func: coroutine object to run.
        :param callback: optional callable invoked with the finished
            ``concurrent.futures.Future``.
        :return: the ``concurrent.futures.Future`` tracking the coroutine.
        """
        self.task_add()
        # run_coroutine_threadsafe is the supported way to hand a coroutine
        # to a loop that is running in another thread.
        future = asyncio.run_coroutine_threadsafe(
            self.async_semaphore_func(func), self.loop
        )
        # Callbacks run in registration order: the user's callback first,
        # then the bookkeeping that marks the task as done.
        if callback is not None:
            future.add_done_callback(callback)
        future.add_done_callback(self.task_done)
        return future
async def thread_example(i):
    """Request the local demo endpoint for ``i`` and return the body text.

    :param i: value substituted into the ``num`` query parameter.
    :return: the response body decoded as text.
    """
    target = "http://127.0.0.1:3001/?num={}".format(i)
    async with aiohttp.ClientSession() as client, client.get(target) as reply:
        body = await reply.text()
    return body
def my_callback(future):
    """Done-callback that prints the value produced by a finished future.

    :param future: completed future; ``future.result()`` re-raises the
        task's exception if it failed.
    """
    print('返回值: ', future.result())
def main():
    """Demo driver: submit one request to an AsyncPool and wait for it."""
    # Pool with a very high concurrency cap.
    coroutine_pool = AsyncPool(maxsize=100000)

    # Queue a single request; my_callback prints its result.
    coroutine_pool.submit(thread_example(1), my_callback)
    # To queue several requests instead:
    #   for i in range(10):
    #       coroutine_pool.submit(thread_example(i), my_callback)

    print("等待子线程结束1...")
    # Ask the event loop to stop itself once no task is pending.
    coroutine_pool.release()
    # Number of tasks still pending.
    print(coroutine_pool.running)
    print("等待子线程结束2...")
    # Block until every submitted task has been marked done.
    coroutine_pool.wait()
    print("等待子线程结束3...")
if __name__ == '__main__':
    # Time the whole demo run and report the elapsed seconds.
    started_at = time.time()
    main()
    elapsed = time.time() - started_at
    print("run time: ", elapsed)
完整的异步网络请求示例
只需传递同一个 session, 处理一条数据时即可向多个不同的接口发起请求
# -*- coding: utf-8 -*-
"""
@author: chenchen
@software: PyCharm
@file: Thread.py
@功能:
@time: 2022-09-18 1:07
"""
import json, platform, time, re
from PyQt5 import QtCore
from PyQt5.QtCore import Qt, pyqtSignal
# uvloop linux系统的时候安装这个 pip install uvloop -i https://pypi.douban.com/simple
import aiohttp, asyncio, queue, traceback
from aiohttp import ClientSession, client_exceptions, TCPConnector, ClientTimeout
from Meal import Meal, Shop
from utils import api
""" 这是第一个执行的线程 整个循环都在这处理 """
# QThread subclass; its run() method hosts the polling loop.
class RunTherad(QtCore.QThread):
# Qt signals carrying a dict payload. Neither is emitted in the visible
# part of this file.
start_prin = pyqtSignal(dict)
start_prin_status = pyqtSignal(dict)
def __init__(self):
    """Create the helpers, pick the asyncio policy and reset the counters."""
    super().__init__()
    self.dataList = []
    self.Meal = Meal()
    self.Shop = Shop()

    # Event-loop policy per platform:
    #   Linux:   uvloop.EventLoopPolicy() (currently disabled)
    #   Windows: asyncio.WindowsSelectorEventLoopPolicy()
    os_name = platform.system()
    if os_name == 'Windows':
        print('windows系统')
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    elif os_name == 'Linux':
        # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy)
        print('linux系统')

    self.new_loop = asyncio.new_event_loop()
    # Sequence numbers queued by fetch().
    self.queue_data = queue.Queue()
    # Shop ids whose request timed out.
    self.timeout_domains = []
    # Shop ids that failed with any other error.
    self.unknown_error_domains = []
def run(self):
    """Poll for shop rows forever and process each batch with asyncio.

    Each cycle calls ``self.Meal.getData()``. When its ``'data'`` entry is
    non-empty, the rows are handed to ``self.main`` through ``asyncio.run``,
    the per-batch counters are printed and reset, and the thread sleeps for
    5 seconds. When there is nothing to do it sleeps for 10 seconds.

    Any exception is printed and ends the loop (and therefore the thread).
    """
    try:
        while True:
            response = self.Meal.getData()
            rows = response['data']
            print('本次处理数据:', len(rows))
            if len(rows) > 0:
                # A JSON round trip yields a plain, independent copy of the
                # rows. The serialized text must not be edited: replacing
                # apostrophes with double quotes turns any value containing
                # "'" into invalid JSON.
                self.dataList = json.loads(json.dumps(rows, ensure_ascii=False))
                # Run all network requests for this batch.
                asyncio.run(self.main(self.dataList))
                print("成功连接 number: {}, 超时连接 number: {}, 失败连接数 number: {} ====================".format(self.queue_data.qsize(), len(self.timeout_domains), len(self.unknown_error_domains)))
                # Reset the per-batch bookkeeping.
                self.timeout_domains = []
                self.unknown_error_domains = []
                self.queue_data = queue.Queue()
                time.sleep(5)
            else:
                print('数据库没有任务, 等待10秒后再次检测=====>>>>>')
                time.sleep(10)
    except Exception as e:
        print('run报错:', e)
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the statements below cannot be recovered from here. The comments
# describe individual statements only; the code itself is left untouched.
async def fetch(self, session, n, linedata):
"""
Process one shop row: compare its expiry date with the current time, then
either call Shop.businessSatus or query the shop's orders and call
Meal.Receive_send for orders whose wait time has elapsed.
:param session: aiohttp.ClientSession
:param n: sequence number of this row (used in log output and put on queue_data)
:param linedata: the complete row dict for one shop
"""
# Fields read from the row.
cookie = linedata['cookie']
_state = linedata['state']
_expire = linedata['expire']
_mark = linedata['mark']
_expiretime = linedata['expiretime']
_shop_id = linedata['shop_id']
# First match of each pattern in the cookie; raises IndexError when absent.
region_id = re.findall('region_id=(.*?);', cookie)[0]
region_version = re.findall('region_version=(.*?);', cookie)[0]
start_time = time.time()
try:
# Check whether the shop has expired or is about to expire.
""" 判断店铺 是否过期, 即将过期 """
# The expiry date counts until the end of that day; converted to epoch seconds.
_expiretime = _expiretime + ' 23:59:59'
_expiretime = int(time.mktime(time.strptime(_expiretime, '%Y-%m-%d %H:%M:%S')))
_currenttime =int(time.time())
# Already past the expiry time: report expire=1 unless the row already has it.
if _currenttime > _expiretime:
# print('expired:', _shop_id)
if _expire != 1:
# NOTE(review): `self` is passed as an explicit extra argument here, unlike the
# other self.Meal calls in this method — confirm against Meal.shop_expire_update.
await self.Meal.shop_expire_update(self, session,_shop_id, 1)
# Fewer than 86399 seconds left: report expire=2 unless the row already has it.
elif _expiretime - _currenttime < 86399:
# print('about to expire:', _shop_id)
if _expire != 2:
await self.Meal.shop_expire_update(self, session, _shop_id, 2)
# Shops whose state is not 1 only get a business-status check; otherwise the
# shop's orders are queried and processed.
""" 不是营业中状态的 就只查询店铺状态, 否则就查询店铺订单出餐并出单"""
if _state != 1:
await self.Shop.businessSatus( session, cookie, region_id, region_version, _state)
end_time = time.time()
cost = end_time - start_time
print(f"营业状态请求:{n}, 耗时:{cost}, 店铺: {linedata['shop_id']}")
else:
url, param, headers = api.dataShopOrder(self, cookie, region_id, region_version)
# NOTE(review): a total timeout given as `timeout=5` presumably surfaces as
# asyncio.TimeoutError, which the ServerTimeoutError handler below would not
# catch (it would fall through to the generic handler) — verify against aiohttp.
async with session.get(url=url, params=param, headers=headers, timeout=5, verify_ssl=False) as response:
d = await response.json() # order payload returned by the API
if d['code'] == 1001: # cookie expired, login required
await self.Meal.shop_cookie_update(session, {'shop_id': linedata['shop_id']})
elif d['code'] == 0: # cookie is valid
if len(d['data']['wmOrderList']) > 0: # only iterate when there are orders; check each one
for index, item in enumerate(d['data']['wmOrderList']):
js = json.loads(item['orderInfo'])
ts = time.localtime().tm_hour # current local hour, selects which wait time applies
num = js['num']
order_time = js['order_time']
order_num = js['wm_order_id_view_str']
cityId = js['poi_city_id']
wmPoiId = js['wm_poi_id']
utime = js['utime']
# Arguments passed to Receive_send.
vals = {'region_id': region_id, 'region_version': region_version, 'wmPoiId': wmPoiId, 'num': num, 'order_num': order_num, 'order_time': order_time, 'cityId': cityId, 'utime': utime, 'cookie': cookie }
# Hour 11: use the row's meal_morning_time as the wait time.
if ts == 11:
if utime + linedata['meal_morning_time'] < int(time.time()):
print(f"午高峰时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
await self.Meal.Receive_send(session, vals)
# Hours 17 and 18: use the row's meal_night_time as the wait time.
elif ts == 17 or ts == 18:
if utime + linedata['meal_night_time'] < int(time.time()):
print(f"晚高峰时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
await self.Meal.Receive_send(session, vals)
# Any other hour: use the row's meal_time as the wait time.
else:
if utime + linedata['meal_time'] < int(time.time()):
print(f"正常时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
await self.Meal.Receive_send(session, vals)
end_time = time.time()
cost = end_time - start_time
print(f"本次请求:{n}, 耗时:{cost}, 店铺: {linedata['shop_id']}")
else:
print('查询店铺订单, 网络错误, 请检查是否有网络信号')
# The sequence number is queued; run() reports queue_data.qsize() per batch.
self.queue_data.put(n)
except client_exceptions.ServerTimeoutError as timeout_error: # record shops whose request timed out
print("fetch函数 请求超时的店铺 error: {}, url: {}".format(timeout_error, linedata['shop_id']))
self.timeout_domains.append(linedata['shop_id'])
# Any other failure: print the traceback and record the shop id.
except Exception:
print("fetc函数 错误捕捉 unknown error: {}".format(traceback.format_exc()))
self.unknown_error_domains.append(linedata['shop_id'])
async def chunks(self, sem, session, i, oneData):
    """Run fetch() for one row while holding the shared semaphore.

    :param sem: semaphore that caps the number of concurrent fetches
    :param session: aiohttp.ClientSession handed through to fetch()
    :param i: zero-based index of the row; fetch() receives ``i + 1``
    :param oneData: the row passed to fetch()
    """
    try:
        async with sem:
            sequence_no = i + 1
            await self.fetch(session, sequence_no, oneData)
    except Exception as failure:
        # An error is reported and swallowed so that the other tasks of the
        # batch are not affected.
        print("chunks报错:", failure)
async def main(self, data):
    """Process every row of ``data`` concurrently over one shared session.

    One task per row is created with ``self.chunks``; at most 100 run at the
    same time (semaphore) over at most 100 connections (connector limit).
    Returns once all tasks have finished. Empty input returns immediately
    without opening a session.

    :param data: iterable of shop rows
    """
    rows = list(data)
    if not rows:
        # asyncio.wait() raises ValueError when given no tasks.
        return
    sem = asyncio.Semaphore(100)
    # NOTE(review): connect=10000 is far larger than total=10 — confirm intent.
    timeout = ClientTimeout(total=10, connect=10000, sock_connect=5, sock_read=5)
    async with ClientSession(connector=TCPConnector(limit=100), timeout=timeout) as session:
        tasks = [
            asyncio.create_task(self.chunks(sem, session, index, shop_data))
            for index, shop_data in enumerate(rows)
        ]
        await asyncio.wait(tasks)