实例 动态添加任务

效果：任务数量过多时会卡死

  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: chenchen
  4. @software: PyCharm
  5. @file: task_pool.py
  6. @功能:
  7. @time: 2022-09-18 20:30
  8. """
  9. """
  10. asyncio 协程介绍:
  11. - 动态添加任务:
  12. - 方案是创建一个线程,使事件循环在线程内永久运行
  13. - 设置为守护线程,随着主线程退出一起关闭
  14. - 自动停止任务
  15. - 阻塞任务完成
  16. - 协程池
  17. - asyncio.Semaphore() 进行控制
  18. """
  19. import asyncio
  20. import aiohttp
  21. import time
  22. import queue
  23. from threading import Thread
  24. class AsyncPool(object):
  25. """
  26. 1. 支持动态添加任务
  27. 2. 支持自动停止事件循环
  28. 3. 支持最大协程数
  29. """
  30. def __init__(self, maxsize=1, loop=None):
  31. """
  32. 初始化
  33. :param loop:
  34. :param maxsize: 默认为1
  35. """
  36. # 在jupyter需要这个,不然asyncio运行出错
  37. # import nest_asyncio
  38. # nest_asyncio.apply()
  39. # 队列,先进先出,根据队列是否为空判断,退出协程
  40. self.task = queue.Queue()
  41. # 协程池
  42. self.loop, _ = self.start_loop(loop)
  43. # 限制并发量为500
  44. self.semaphore = asyncio.Semaphore(maxsize, loop=self.loop)
  45. """ 添加任务 """
  46. def task_add(self, item=1):
  47. self.task.put(item)
  48. """ 任务完成 回调函数 """
  49. def task_done(self, fn):
  50. if fn:
  51. pass
  52. self.task.get()
  53. self.task.task_done()
  54. """ 等待任务执行完毕 """
  55. def wait(self):
  56. self.task.join()
  57. @property # 获取当前线程数
  58. def running(self):
  59. return self.task.qsize()
  60. @staticmethod # 运行事件循环
  61. def _start_thread_loop(loop):
  62. """
  63. 运行事件循环
  64. :param loop: loop以参数的形式传递进来运行
  65. :return:
  66. """
  67. # 将当前上下文的事件循环设置为循环。
  68. asyncio.set_event_loop(loop)
  69. # 开始事件循环
  70. loop.run_forever()
  71. """ 停止协程 关闭线程 """
  72. async def _stop_thread_loop(self, loop_time=1):
  73. while True:
  74. if self.task.empty():
  75. self.loop.stop() # 停止协程
  76. break
  77. await asyncio.sleep(loop_time)
  78. """ 运行事件循环 开启新线程 :param loop: 协程"""
  79. def start_loop(self, loop):
  80. # 获取一个事件循环
  81. if not loop:
  82. loop = asyncio.new_event_loop()
  83. loop_thread = Thread(target=self._start_thread_loop, args=(loop,))
  84. loop_thread.setDaemon(True) # 设置守护进程
  85. loop_thread.start() # 运行线程,同时协程事件循环也会运行
  86. return loop, loop_thread
  87. """ 队列为空,则关闭线程 :param loop_time """
  88. def stop_loop(self, loop_time=1):
  89. # 关闭线程任务
  90. asyncio.run_coroutine_threadsafe(self._stop_thread_loop(loop_time), self.loop)
  91. """ 释放线程 """
  92. def release(self, loop_time=1):
  93. self.stop_loop(loop_time)
  94. """ 信号包装 """
  95. async def async_semaphore_func(self, func):
  96. async with self.semaphore:
  97. return await func
  98. """ 提交任务到事件循环 :param func: 异步函数对象, callback: 回调函数"""
  99. def submit(self, func, callback=None):
  100. self.task_add()
  101. # 将协程注册一个到运行在线程中的循环,thread_loop 会获得一个环任务
  102. # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
  103. # future = asyncio.run_coroutine_threadsafe(func, self.loop)
  104. future = asyncio.run_coroutine_threadsafe(self.async_semaphore_func(func), self.loop)
  105. # 添加回调函数,添加顺序调用
  106. future.add_done_callback(callback)
  107. future.add_done_callback(self.task_done)
  108. async def thread_example(i):
  109. url = "http://127.0.0.1:3001/?num={}".format(i)
  110. async with aiohttp.ClientSession() as session:
  111. async with session.get(url) as res:
  112. # print(res.status)
  113. # print(res.content)
  114. return await res.text()
  115. def my_callback(future):
  116. result = future.result()
  117. print('返回值: ', result)
  118. def main():
  119. # 任务组, 最大协程数
  120. pool = AsyncPool(maxsize=100000)
  121. # 增加任务
  122. pool.submit(thread_example(1), my_callback)
  123. # 插入任务任务
  124. # for i in range(10):
  125. # pool.submit(thread_example(i), my_callback)
  126. print("等待子线程结束1...")
  127. # 停止事件循环
  128. pool.release()
  129. # 获取线程数
  130. print(pool.running)
  131. print("等待子线程结束2...")
  132. # 等待
  133. pool.wait()
  134. print("等待子线程结束3...")
  135. if __name__ == '__main__':
  136. start_time = time.time()
  137. main()
  138. end_time = time.time()
  139. print("run time: ", end_time - start_time)

完美的异步网络发送

只要传递同一个 session, 即可在处理一条数据时向多个不同的接口发起请求

  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: chenchen
  4. @software: PyCharm
  5. @file: Thread.py
  6. @功能:
  7. @time: 2022-09-18 1:07
  8. """
  9. import json, platform, time, re
  10. from PyQt5 import QtCore
  11. from PyQt5.QtCore import Qt, pyqtSignal
  12. # uvloop linux系统的时候安装这个 pip install uvloop -i https://pypi.douban.com/simple
  13. import aiohttp, asyncio, queue, traceback
  14. from aiohttp import ClientSession, client_exceptions, TCPConnector, ClientTimeout
  15. from Meal import Meal, Shop
  16. from utils import api
  17. """ 这是第一个执行的线程 整个循环都在这处理 """
  18. class RunTherad(QtCore.QThread):
  19. start_prin = pyqtSignal(dict)
  20. start_prin_status = pyqtSignal(dict)
  21. def __init__(self):
  22. super(RunTherad, self).__init__()
  23. self.dataList = []
  24. self.Meal = Meal()
  25. self.Shop = Shop()
  26. """
  27. linux: uvloop.EventLoopPolicy())
  28. win: asyncio.WindowsSelectorEventLoopPolicy()
  29. """
  30. sys = platform.system()
  31. if sys == 'Windows':
  32. print('windows系统')
  33. asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  34. elif sys == 'Linux':
  35. # asyncio.set_event_loop_policy(uvloop.EventLoopPolicy)
  36. print('linux系统')
  37. else:
  38. pass
  39. self.new_loop = asyncio.new_event_loop()
  40. self.queue_data = queue.Queue() # 任务队列
  41. self.timeout_domains = [] # 超时任务队列存放
  42. self.unknown_error_domains = [] # 超时未知错误队列存放
  43. def run(self):
  44. """ 单词运行代码 """
  45. # asyncio.run(self.main(self.dataList))
  46. # print("成功连接 number: {}, 超时连接 number: {}, 失败连接数 number: {} ====================".format(self.queue_data.qsize(), len(self.timeout_domains),len(self.unknown_error_domains)))
  47. # mtshop = self.Meal.getData()
  48. # if len(mtshop['data']) > 0:
  49. # mtshop = self.Meal.getData()
  50. # mtshop = json.dumps(mtshop['data'], ensure_ascii=False)
  51. # mtshop = mtshop.replace('\'', '\"')
  52. # mtshop= json.loads(json.dumps(mtshop, ensure_ascii=False))
  53. # self.dataList = json.loads(mtshop)
  54. # print(self.dataList, type(self.dataList))
  55. # asyncio.run(self.main(self.dataList))
  56. # print("成功连接 number: {}, 超时连接 number: {}, 失败连接数 number: {} ====================".format(self.queue_data.qsize(), len(self.timeout_domains), len( self.unknown_error_domains)))
  57. # self.timeout_domains = [] # 超时任务队列存放
  58. # self.unknown_error_domains = [] # 超时未知错误队列存放
  59. # self.queue_data = queue.Queue()
  60. # time.sleep(1)
  61. try:
  62. """ 批量运行代码 """
  63. while True:
  64. mtshop = self.Meal.getData()
  65. print('本次处理数据:', len(mtshop['data']))
  66. if len(mtshop['data']) > 0:
  67. # mtshop = self.Meal.getData()
  68. mtshop = json.dumps(mtshop['data'], ensure_ascii=False)
  69. mtshop = mtshop.replace('\'', '\"')
  70. mtshop = json.loads(json.dumps(mtshop, ensure_ascii=False))
  71. self.dataList = json.loads(mtshop)
  72. """ 启动异步网络请求 """
  73. asyncio.run(self.main(self.dataList))
  74. print("成功连接 number: {}, 超时连接 number: {}, 失败连接数 number: {} ====================".format(self.queue_data.qsize(), len(self.timeout_domains), len( self.unknown_error_domains)))
  75. self.timeout_domains = [] # 超时任务队列存放
  76. self.unknown_error_domains = [] # 超时未知错误队列存放
  77. self.queue_data = queue.Queue()
  78. time.sleep(5)
  79. else:
  80. print('数据库没有任务, 等待10秒后再次检测=====>>>>>')
  81. time.sleep(10)
  82. except Exception as e:
  83. print('run报错:', e)
  84. async def fetch(self, session, n, linedata):
  85. """
  86. :param session: aiohttp.ClientSession
  87. :param n: 数量 序号
  88. :param linedata: 整条的onedata
  89. """
  90. cookie = linedata['cookie']
  91. _state = linedata['state']
  92. _expire = linedata['expire']
  93. _mark = linedata['mark']
  94. _expiretime = linedata['expiretime']
  95. _shop_id = linedata['shop_id']
  96. region_id = re.findall('region_id=(.*?);', cookie)[0]
  97. region_version = re.findall('region_version=(.*?);', cookie)[0]
  98. start_time = time.time()
  99. try:
  100. """ 判断店铺 是否过期, 即将过期 """
  101. _expiretime = _expiretime + ' 23:59:59'
  102. _expiretime = int(time.mktime(time.strptime(_expiretime, '%Y-%m-%d %H:%M:%S')))
  103. _currenttime =int(time.time())
  104. if _currenttime > _expiretime:
  105. # print('过期了:', _shop_id)
  106. if _expire != 1:
  107. await self.Meal.shop_expire_update(self, session,_shop_id, 1)
  108. elif _expiretime - _currenttime < 86399:
  109. # print('即将过期:', _shop_id)
  110. if _expire != 2:
  111. await self.Meal.shop_expire_update(self, session, _shop_id, 2)
  112. """ 不是营业中状态的 就只查询店铺状态, 否则就查询店铺订单出餐并出单"""
  113. if _state != 1:
  114. await self.Shop.businessSatus( session, cookie, region_id, region_version, _state)
  115. end_time = time.time()
  116. cost = end_time - start_time
  117. print(f"营业状态请求:{n}, 耗时:{cost}, 店铺: {linedata['shop_id']}")
  118. else:
  119. url, param, headers = api.dataShopOrder(self, cookie, region_id, region_version)
  120. async with session.get(url=url, params=param, headers=headers, timeout=5, verify_ssl=False) as response:
  121. d = await response.json() # 返回订单的数据 是否有
  122. if d['code'] == 1001: # """ cookie 失效请重新登录 """
  123. await self.Meal.shop_cookie_update(session, {'shop_id': linedata['shop_id']})
  124. elif d['code'] == 0: # """ cookie 正常 """
  125. if len(d['data']['wmOrderList']) > 0: # 判断是否有订单数据, 循环遍历订单是否可以出餐
  126. for index, item in enumerate(d['data']['wmOrderList']):
  127. js = json.loads(item['orderInfo'])
  128. ts = time.localtime().tm_hour # 1. 这里还需要判断 当前的时间, 选择使用 指定时间
  129. num = js['num']
  130. order_time = js['order_time']
  131. order_num = js['wm_order_id_view_str']
  132. cityId = js['poi_city_id']
  133. wmPoiId = js['wm_poi_id']
  134. utime = js['utime']
  135. # 按指定使用出餐
  136. vals = {'region_id': region_id, 'region_version': region_version, 'wmPoiId': wmPoiId, 'num': num, 'order_num': order_num, 'order_time': order_time, 'cityId': cityId, 'utime': utime, 'cookie': cookie }
  137. if ts == 11:
  138. if utime + linedata['meal_morning_time'] < int(time.time()):
  139. print(f"午高峰时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
  140. await self.Meal.Receive_send(session, vals)
  141. elif ts == 17 or ts == 18:
  142. if utime + linedata['meal_night_time'] < int(time.time()):
  143. print(f"晚高峰时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
  144. await self.Meal.Receive_send(session, vals)
  145. else:
  146. if utime + linedata['meal_time'] < int(time.time()):
  147. print(f"正常时间出餐 订餐号: {num}, 下单时间: {order_time}, 订单号: {order_num}, 店铺id: {wmPoiId}, 城市id: {cityId}")
  148. await self.Meal.Receive_send(session, vals)
  149. end_time = time.time()
  150. cost = end_time - start_time
  151. print(f"本次请求:{n}, 耗时:{cost}, 店铺: {linedata['shop_id']}")
  152. else:
  153. print('查询店铺订单, 网络错误, 请检查是否有网络信号')
  154. self.queue_data.put(n)
  155. except client_exceptions.ServerTimeoutError as timeout_error: # 记录超时的店铺请求
  156. print("fetch函数 请求超时的店铺 error: {}, url: {}".format(timeout_error, linedata['shop_id']))
  157. self.timeout_domains.append(linedata['shop_id'])
  158. except Exception:
  159. print("fetc函数 错误捕捉 unknown error: {}".format(traceback.format_exc()))
  160. self.unknown_error_domains.append(linedata['shop_id'])
  161. async def chunks(self, sem, session, i, oneData):
  162. try:
  163. async with sem:
  164. await self.fetch(session, i + 1, oneData)
  165. # self.new_loop.call_soon_threadsafe(self.new_loop.stop)
  166. except Exception as e:
  167. print("chunks报错:", e)
  168. async def main(self, data):
  169. sem = asyncio.Semaphore(100)
  170. # timeout = ClientTimeout(total=5, connect=5, sock_connect=15, sock_read=5)
  171. timeout = ClientTimeout(total=10, connect=10000, sock_connect=5, sock_read=5)
  172. async with ClientSession(connector=TCPConnector(limit=100), timeout=timeout) as session:
  173. tasks = [asyncio.create_task(self.chunks(sem, session, index, shop_data)) for index, shop_data in enumerate(data)]
  174. await asyncio.wait(tasks)
  175. # print('结束:' ,await asyncio.wait(tasks))