Preface

asyncio concepts:

Asynchronous IO follows a message-loop model, repeating a "read message, handle message" cycle. In other words, the async IO model needs a message loop in which the main thread continuously repeats this "read message, handle message" process.

event_loop (event loop):

The program starts an endless loop, and the programmer registers functions on it; when the corresponding event fires, the matching coroutine is invoked.

coroutine:

A coroutine object. Calling a function defined with the async keyword does not execute the function immediately; instead it returns a coroutine object. The coroutine object must be registered on the event loop, which then drives it.

task:

A coroutine object is a function that can natively be suspended; a task wraps a coroutine further and tracks the task's various states.

async/await keywords: the keywords used to define coroutines. async defines a coroutine; await suspends execution at a blocking asynchronous call.
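The relationship between these concepts can be shown in a minimal, self-contained sketch (the function names `double` and `main` are illustrative, not from the original):

```python
import asyncio

async def double(x):
    # await suspends this coroutine and hands control back to the event loop
    await asyncio.sleep(0.01)
    return x * 2

async def main():
    coro = double(21)                 # calling an async def function returns a coroutine object...
    task = asyncio.create_task(coro)  # ...which a Task wraps and schedules on the running loop
    return await task                 # await suspends main() until the task finishes

result = asyncio.run(main())          # asyncio.run creates the loop, runs main(), and closes it
print(result)
```

Nothing runs until `asyncio.run` starts the event loop; the coroutine object by itself is inert.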

Reference articles

https://cloud.tencent.com/developer/article/1907288?from=article.detail.1949693
https://cloud.tencent.com/developer/article/1738086?from=article.detail.1949693
https://cloud.tencent.com/developer/article/1785143?from=article.detail.1949693

Installation

  pip install aiohttp

Note: asyncio itself ships with the Python standard library (3.4+) and does not need to be installed separately; it is aiohttp, used in the examples below, that must be installed from PyPI.

Examples

  from aiohttp import ClientSession
  import aiohttp, time
  import asyncio

  async def main(url):
      t1 = time.time()
      print("start time: %s" % t1)
      async with aiohttp.ClientSession() as session:
          async with session.get(url=url) as resp:
              t2 = time.time()
              print("end time: %s" % t2)
              print('elapsed cost: %.6f' % (t2 - t1))

  if __name__ == '__main__':
      url = "http://www.baidu.com"
      asyncio.run(main(url=url))

  """ Output:
  start time: 1651936215.269375
  end time: 1651936215.3499389
  elapsed cost: 0.080564
  """
  from aiohttp import ClientSession
  import aiohttp, time
  import asyncio

  tasks = []
  url = "https://www.baidu.com/{}"

  async def main(url):
      t1 = time.time()
      async with ClientSession() as session:
          async with session.get(url) as response:
              # response = await response.read()
              t2 = time.time()
              print(f'elapsed cost: {t2 - t1} {url}')

  def run():
      for i in range(5):
          task = asyncio.ensure_future(main(url.format(i)))
          tasks.append(task)

  if __name__ == '__main__':
      loop = asyncio.get_event_loop()
      run()
      loop.run_until_complete(asyncio.wait(tasks))

  """ Output:
  elapsed cost: 0.1047825813293457 https://www.baidu.com/1
  elapsed cost: 0.1047821044921875 https://www.baidu.com/2
  elapsed cost: 0.2055821418762207 https://www.baidu.com/0
  elapsed cost: 0.11085987091064453 https://www.baidu.com/4
  elapsed cost: 0.11085987091064453 https://www.baidu.com/3
  Process finished with exit code 0
  """
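The ensure_future/get_event_loop pattern above predates Python 3.7. A sketch of the modern equivalent using asyncio.run and asyncio.gather follows; asyncio.sleep stands in for the aiohttp request here so the example needs no network, and the `fetch` name is illustrative:

```python
import asyncio
import time

async def fetch(url):
    # asyncio.sleep stands in for the aiohttp request in the example above
    await asyncio.sleep(0.1)
    return url

async def main():
    urls = [f"https://www.baidu.com/{i}" for i in range(5)]
    # gather schedules all five coroutines concurrently and returns results in order
    return await asyncio.gather(*(fetch(u) for u in urls))

t1 = time.time()
results = asyncio.run(main())
elapsed = time.time() - t1
print(results, f"{elapsed:.2f}s")
```

Because the five 0.1 s waits overlap, the total elapsed time stays close to 0.1 s rather than 0.5 s.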

threading (multiple threads) + aiohttp + multiple loop iterations

This code combines threading + aiohttp: multiple threads, each looping a given number of times.

  asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # avoid the Windows event-loop error
  new_loop = asyncio.new_event_loop()
  th = threading.Thread(target=self.Start_task,
                        args=(new_loop, data[0], data[1], data[2], index,),
                        name=f'threadQt_index:{index}')
  self.ThreadList.append(th)  # keep a reference so the thread can be joined later
  th.setDaemon(False)  # not a daemon thread
  # ========= new_loop is passed into Start_task, which runs: =========
  new_loop.run_until_complete(self.async_BaiduStatistics(id, Referer_url))
  # ===================================================================
  async with aiohttp.ClientSession() as session:
      async with session.get(url=url, params=payload, headers=headers, timeout=1) as resp:
          pass

Inside the for loop, threading starts a thread that calls the Start_task function; that function runs each URL for the specified number of iterations.

  class Runthread_batch(QtCore.QThread):
      start_print = pyqtSignal(int, int, str)
      start_print_starus = pyqtSignal()
      start_print_api = pyqtSignal(dict)

      def __init__(self, batchlist, dataObj):
          super(Runthread_batch, self).__init__()
          self.batchlist = batchlist
          # Therade_num / For_num / requestRate used below come from dataObj (elided in the original)
          self.ThreadList = []  # threads are kept here so they can be destroyed later
          self.start_num = 0

      def __del__(self):
          print("del: triggered when the program closes")

      def run(self):
          for index, item in enumerate(self.batchlist):
              isgo = True
              if stop_status_batch == 1: break
              data = item.split("----")
              # dispatch the task
              asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # avoid the Windows event-loop error
              new_loop = asyncio.new_event_loop()
              th = threading.Thread(target=self.Start_task,
                                    args=(new_loop, data[0], data[1], data[2], index,),
                                    name=f'threadQt_index:{index}')
              self.ThreadList.append(th)  # add to the list
              th.setDaemon(False)  # not a daemon thread
              th.start()
              self.start_num += 1
              # Cap the number of threads running in parallel: once a slot frees
              # up, break out of the wait and move on to the next URL.
              if self.start_num >= self.Therade_num:
                  while isgo:
                      if self.start_num < self.Therade_num:
                          isgo = False
                      self.sleep(2)
              isgo = True
              # print("current thread count:", len(threading.enumerate()))
          for th in self.ThreadList:
              print("destroying dispatch thread:", th)
              th.join()
          self.ThreadList = []
          self.start_print_starus.emit()  # the whole job is done; signal the slot function

      # Runs the iteration loop; new_loop was created so we can call
      # new_loop.run_until_complete(self.async_BaiduStatistics(id, Referer_url))
      def Start_task(self, new_loop, Referer_url, type, id, num):
          print(f"dispatched task: index {str(num)} {Referer_url} {id} {type}")
          try:
              for i in range(1, self.For_num + 1):
                  if stop_status_batch == 1: break
                  new_loop.run_until_complete(self.async_BaiduStatistics(id, Referer_url))
                  if i < 10:
                      self.start_print.emit(num, i, Referer_url)
                  elif i % 10 == 0:
                      self.start_print.emit(num, i, Referer_url)
              self.start_num -= 1  # one fewer running task thread
              print("Start_task current thread count:", len(threading.enumerate()))
          except Exception as e:
              print("task failed:", e)

      # The code that actually sends the requests, via aiohttp
      async def async_BaiduStatistics(self, si, Referer_url):
          # ua, url and payload are module-level globals in the original
          requestCounter = 0
          while requestCounter < self.requestRate:
              try:
                  headers = {'Referer': Referer_url, 'User-Agent': ua.random}
                  async with aiohttp.ClientSession() as session:
                      async with session.get(url=url, params=payload, headers=headers, timeout=1) as resp:
                          pass
                          # to wait for the response body, use await resp.text()
              except Exception as e:
                  print("request failed:", e)
              requestCounter += 1

threading (single thread) + aiohttp + multiple loop iterations

  # initialisation
  self.new_loop = asyncio.new_event_loop()
  asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # avoid the Windows event-loop error
  # called from run() to execute the asyncio coroutine
  self.new_loop.run_until_complete(self.async_BaiduStatistics(self.id, self.url))
  # the function that actually hits the network uses an async request, ~0.2 s each
  async with aiohttp.ClientSession() as session:
      async with session.get(url=url, params=payload, headers=headers, timeout=1) as resp:
          pass
  # single-URL runner, ok 0429
  class Runthread_specified(QtCore.QThread):
      start_printSP_Ui = pyqtSignal(int, int, str)
      start_print_starus = pyqtSignal()

      def __init__(self, specified):
          super(Runthread_specified, self).__init__()
          self.ThreadList = []
          self.Radio_start = specified["Radio_start"]
          self.api_url = specified["api_url"]
          self.requestRate = int(specified["then_mun"])
          self.for_num = int(specified["for_num"])
          self.id = specified["id"]
          self.url = specified["url"]
          self.type = specified["type"]
          self.UA = specified["UA"]
          self.keyData = specified["key"]
          self.locationX = specified["X"]
          self.start_num = 0  # started threads
          self.new_loop = asyncio.new_event_loop()
          asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())  # avoid the Windows event-loop error

      def run(self):
          for cnum in range(self.for_num + 1):
              isgo = True  # True means keep waiting, False exits the loop
              if stop_status_specified == 1: break
              self.start_num += 1
              # dispatch the task
              try:
                  self.new_loop.run_until_complete(self.async_BaiduStatistics(self.id, self.url))
              except Exception as e:
                  print("eeee", e)
              if cnum < 10:
                  self.start_printSP_Ui.emit(self.locationX, cnum, self.url)
              elif cnum % 10 == 0:
                  self.start_printSP_Ui.emit(self.locationX, cnum, self.url)
          self.start_print_starus.emit()

      async def async_BaiduStatistics(self, si, Referer_url):
          requestCounter = 0
          while requestCounter < self.requestRate:
              try:
                  ua = UserAgent()
                  if self.keyData.rfind("\n") != -1:
                      self.key = random.choice(self.keyData.split("\n"))
                  else:
                      self.key = self.keyData
                  text = urllib.parse.quote(self.key)
                  url = "https://hm.baidu.com/hm.gif?"
                  payload = {}
                  headers = {'Referer': Referer_url, 'User-Agent': ua.random}
                  async with aiohttp.ClientSession() as session:
                      async with session.get(url=url, params=payload, headers=headers, timeout=1) as resp:
                          pass
              except Exception as e:
                  print("request failed:", e)
              requestCounter += 1

============

Examples of the various aiohttp parameters

aiohttp core request features

Making a GET request

  # -*- encoding: utf-8 -*-
  import asyncio
  import aiohttp

  async def main():
      async with aiohttp.ClientSession() as session:
          async with session.get('http://www.baidu.com') as resp:
              print(resp.status)
              res = await resp.text()
              print(res[:100])

  if __name__ == '__main__':
      # Note:
      # Python 3.7+:
      # asyncio.run(main())
      # Python 3.6 and below:
      event_loop = asyncio.get_event_loop()
      result = event_loop.run_until_complete(asyncio.gather(main()))
      event_loop.close()

Making a POST request

  # -*- encoding: utf-8 -*-
  import asyncio
  import aiohttp

  async def post_v1():
      data = b'\x00Binary-data\x00'  # upload raw, unencoded bytes
      data = 'text'                  # send text data
      data = {'key': 'value'}        # send a form body
      async with aiohttp.ClientSession() as sess:
          async with sess.post('http://httpbin.org/post', data=data) as resp:
              print(resp.status)

  # a more complex POST request
  async def post_v2():
      payload = {'key': 'value'}  # send a JSON payload
      async with aiohttp.ClientSession() as sess:
          async with sess.post('http://httpbin.org/post', json=payload) as resp:
              print(resp.status)

  if __name__ == '__main__':
      event_loop = asyncio.get_event_loop()
      result = event_loop.run_until_complete(asyncio.gather(post_v1(), post_v2()))
      event_loop.close()

Passing parameters in the URL

Some scenarios require building up the request URL from parameters; in that case, this pattern handles it.

  # -*- encoding: utf-8 -*-
  import asyncio
  import aiohttp

  async def main():
      """ All three of the following forms work """
      params = {'key1': 'value1', 'key2': 'value2'}
      params = [('key', 'value1'), ('key', 'value2')]
      params = 'key=value+1'
      async with aiohttp.ClientSession() as sess:
          async with sess.get('http://httpbin.org/get', params=params) as resp:
              print(resp.status)

  if __name__ == '__main__':
      event_loop = asyncio.get_event_loop()
      result = event_loop.run_until_complete(asyncio.gather(main()))
      event_loop.close()

Uploading files to the server

Sometimes we genuinely need to upload files to a server, e.g. receipts or images. At the scale of 100 or 10,000 files you would reach for multithreading, but as the volume grows further, the multithreading + requests approach starts producing large numbers of errors. For such scenarios, the following cases apply.

  import aiohttp

  async def main():
      """ Upload a file """
      files = {'file': open('report.xls', 'rb')}
      async with aiohttp.ClientSession() as sess:
          async with sess.post('http://httpbin.org/post', data=files) as resp:
              print(resp.status)
              print(await resp.text())

  async def main2():
      """ Instantiating FormData lets you set filename and content_type """
      data = aiohttp.FormData()
      data.add_field('file',
                     open('report.xls', 'rb'),
                     filename='report.xls',
                     content_type='application/vnd.ms-excel')
      async with aiohttp.ClientSession() as sess:
          async with sess.post('http://httpbin.org/post', data=data) as resp:
              print(resp.status)
              print(await resp.text())

  async def main3():
      """ Stream the file upload """
      async with aiohttp.ClientSession() as sess:
          with open('report.xls', 'rb') as f:
              async with sess.post('http://httpbin.org/post', data=f) as resp:
                  print(resp.status)
                  print(await resp.text())

  async def main4():
      """ Because the content attribute is a StreamReader (which implements the
      async iterator protocol), a GET and a POST can be chained together.
      Requires Python 3.6+. """
      async with aiohttp.ClientSession() as sess:
          async with sess.get('http://python.org') as resp:
              async with sess.post('http://httpbin.org/post', data=resp.content) as r:
                  print(r.status)
                  print(await r.text())

Setting a request timeout

Sometimes, if a request sent to a server carries no timeout, it will block until the system errors out, which our systems cannot tolerate; so always remember to set a timeout when sending requests.

  import aiohttp

  timeout = aiohttp.ClientTimeout(total=60)

  async def main():
      async with aiohttp.ClientSession(timeout=timeout) as sess:
          async with sess.get('http://httpbin.org/get') as resp:
              print(resp.status)
              print(await resp.text())

aiohttp core crawler features

Custom cookies

  import aiohttp
  import asyncio

  cookies = {'cookies_are': 'working'}

  async def main():
      async with aiohttp.ClientSession(cookies=cookies) as session:
          async with session.get('http://httpbin.org/cookies') as resp:
              print(resp.status)
              print(await resp.text())
              assert await resp.json() == {"cookies": {"cookies_are": "working"}}

  if __name__ == "__main__":
      event_loop = asyncio.get_event_loop()
      result = event_loop.run_until_complete(asyncio.gather(main()))
      event_loop.close()

Sharing cookies across multiple requests

  import aiohttp
  import asyncio

  async def main():
      async with aiohttp.ClientSession() as session:
          await session.get('http://httpbin.org/cookies/set?my_cookie=my_value')
          filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
          print(filtered)
          assert filtered['my_cookie'].value == 'my_value'
          async with session.get('http://httpbin.org/cookies') as r:
              json_body = await r.json()
              print(json_body)
              assert json_body['cookies']['my_cookie'] == 'my_value'

  if __name__ == "__main__":
      event_loop = asyncio.get_event_loop()
      result = event_loop.run_until_complete(asyncio.gather(main()))
      event_loop.close()

Cookie safety: by default, ClientSession uses the strict-mode aiohttp.CookieJar. Per RFC 2109 it explicitly refuses cookies set by URLs using a bare IP address; only cookies from DNS-resolvable hostnames are accepted.
This can be relaxed by constructing aiohttp.CookieJar with unsafe=True:

  jar = aiohttp.CookieJar(unsafe=True)
  session = aiohttp.ClientSession(cookie_jar=jar)

Using a dummy cookie jar: sometimes you do not want to deal with cookies at all. Passing aiohttp.DummyCookieJar to the session achieves that:

  jar = aiohttp.DummyCookieJar()
  session = aiohttp.ClientSession(cookie_jar=jar)

Custom request headers

  import aiohttp
  import asyncio

  async def main():
      headers = {'User-Agent': 'your agent', 'Referer': 'http://httpbin.org'}
      async with aiohttp.ClientSession(headers=headers) as session:
          async with session.get('http://httpbin.org/headers') as resp:
              print(resp.status)
              print(await resp.text())

SSL verification warnings

By default, aiohttp applies strict certificate checks to HTTPS connections. If you want to skip certificate verification, set ssl to False:

  r = await session.get('https://example.com', ssl=False)

Proxies (proxy IPs)

  # option 1: explicit proxy auth
  async with aiohttp.ClientSession() as session:
      proxy_auth = aiohttp.BasicAuth('user', 'pass')
      async with session.get("http://python.org", proxy="http://proxy.com", proxy_auth=proxy_auth) as resp:
          print(resp.status)

  # option 2: credentials embedded in the proxy URL
  session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

aiohttp connection pooling

1. Using a connector

To tune the transport layer of requests, pass a custom connector to ClientSession and its related components. For example:

  conn = aiohttp.TCPConnector()
  session = aiohttp.ClientSession(connector=conn)

Note: never share one connector between multiple sessions; a session takes ownership of its connector.

2. Limiting the pool size

To limit the number of connections open at the same time, pass the limit parameter:

  conn = aiohttp.TCPConnector(limit=30)

This caps the total at 30; the default is 100. If you want no limit at all, pass 0:

  conn = aiohttp.TCPConnector(limit=0)
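Besides the connector-level limit, concurrency can also be capped at the task level with a stdlib-only asyncio.Semaphore; this is an alternative technique, not part of aiohttp itself, and asyncio.sleep stands in for the real request in this sketch:

```python
import asyncio

async def fetch(i, sem):
    async with sem:                # at most 3 coroutines run this body at once
        await asyncio.sleep(0.01)  # stands in for the actual aiohttp request
        return i

async def main():
    sem = asyncio.Semaphore(3)
    # gather still returns results in submission order
    return await asyncio.gather(*(fetch(i, sem) for i in range(10)))

results = asyncio.run(main())
```

This is useful when one session must be shared but different groups of requests need different concurrency budgets.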

Summary:

The crawler-oriented features are written up separately mainly because aiohttp still has one unresolved problem: reading the source code did not yield a good fix, and searching online for the better part of a day turned up no effective solution either. So I will share the workaround I eventually found in an upcoming article.