目的:在爬虫中使用异步实现高性能的数据爬取操作。

get方法是一个阻塞的方法

异步爬虫的方式

1.多线程,多进程(不建议):

  1. 好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步执行。<br /> 弊端:无法无限制的开启多线程或者多进程。

2.线程池、进程池(适当的使用):

  1. 好处:我们可以降低系统对 进程或者线程 创建和销毁的一个频率,从而很好的降低系统的开销。<br /> 弊端:池中 线程或进程 的数量是有上限。
  1. import time
  2. #导入线程池模块对应的类
  3. from multiprocessing.dummy import Pool
  4. #使用线程池方式执行
  5. start_time = time.time()
  6. def get_page(str):
  7. print("正在下载 :",str)
  8. time.sleep(2)
  9. print('下载成功:',str)
  10. name_list =['xiaozi','aa','bb','cc']
  11. #实例化一个线程池对象
  12. pool = Pool(4)
  13. #将列表中每一个列表元素传递给get_page进行处理。
  14. pool.map(get_page,name_list)
  15. pool.close()
  16. pool.join()
  17. end_time = time.time()
  18. print(end_time-start_time)

实战

原则:线程池处理的是阻塞且较为耗时的操作

有些数据是动态加载的,XHR里没有就是不通过ajax请求到的,是动态加载的

xpath和BS4只能定位标签,不能定位对应的js代码

用正则去提取

import requests
from lxml import etree
import re
from multiprocessing.dummy import Pool
#需求:爬取梨视频的视频数据
headers = {
    'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'
}
#原则:线程池处理的是阻塞且较为耗时的操作

#对下述url发起请求解析出视频详情页的url和视频的名称
url = 'https://www.pearvideo.com/category_5'
page_text = requests.get(url=url,headers=headers).text

tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
urls = [] #存储所有视频的链接and名字
for li in li_list:
    detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0]+'.mp4'
    #对详情页的url发起请求
    detail_page_text = requests.get(url=detail_url,headers=headers).text
    #从详情页中解析出视频的地址(url)
    ex = 'srcUrl="(.*?)",vdoUrl'
    video_url = re.findall(ex,detail_page_text)[0]
    dic = {
        'name':name,
        'url':video_url
    }
    urls.append(dic)
#对视频链接发起请求获取视频的二进制数据,然后将视频数据进行返回
def get_video_data(dic):
    url = dic['url']
    print(dic['name'],'正在下载......')
    data = requests.get(url=url,headers=headers).content
    #持久化存储操作
    with open(dic['name'],'wb') as fp:
        fp.write(data)
        print(dic['name'],'下载成功!')
#使用线程池对视频数据进行请求(较为耗时的阻塞操作)
pool = Pool(4)
pool.map(get_video_data,urls)

pool.close()
pool.join()

3.单线程+异步协程(推荐):

协程 补充 基于python3.6

event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行。

coroutine:协程对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。<br />   <br />我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。

task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

future:代表将来执行或还没有执行的任务,实际上和 task 没有本质区别。

async:定义一个协程.

await:用来挂起阻塞方法的执行。

实现协程

pending,待定状态。

#!/usr/bin/env python 
# -*- coding:utf-8 -*-
# @Time     : 2022/4/11 19:44
# @Author   : hq

import asyncio


async def request(url):
    print("正在请求的url是", url)
    print("请求成功", url)
    return url


# async修饰的函数,调用之后返回的一个协程对象

c = request('www.baidu.com')


# # 创建一个事件循环对象
# loop = asyncio.get_event_loop()
#
# # 将协程对象注册到loop中,然后启动loop
# loop.run_until_complete(c)  # 包括了2部分

# # task的使用
# loop = asyncio.get_event_loop()
# # 基于loop创建了一个task对象
# task = loop.create_task(c)
# print(task)
#
# loop.run_until_complete(task)
# print(task)


# future的使用
# loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(c)
# print(task)
# loop.run_until_complete(task)
# print(task)


def callback_func(task):
    # resultf返回的就是任务对象中封装的协程对象对应函数的返回值
    print(task.result())  # 可以用协程对象的一个result()方法,返回一个返回值


# 绑定回调,回调函数得绑定
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(c)  # 将协程对象注册到任务中
# 将回调函数绑定到任务中,任务执行完后,作为回调函数
task.add_done_callback(callback_func)  # 将任务对象传递回去了
loop.run_until_complete(task)

多任务异步协程

多个任务的协程,是不是异步的。

在异步协程中,如果出现了同步模块相关的代码,那么就无法实现异步
# time.sleep(2)

当在asyncio中遇到阻塞操作必须进行手动挂起

#!/usr/bin/env python 
# -*- coding:utf-8 -*-
# @Time     : 2022/4/11 20:15
# @Author   : hq
import asyncio
import time
from pyxllib.xl import TicToc, Timer


async def request(url):
    print('正在下载', url)
    # 在异步协程中,如果出现了同步模块相关的代码,那么就无法实现异步
    # time.sleep(2)

    # 当在asyncio中遇到阻塞操作必须进行手动挂起
    await asyncio.sleep(2)
    print('下载成功', url)


tt = TicToc(title='pachong')
tt.tic()

urls = [
    'www.baidu.com',
    'www.sougou.com',
    'www.doubanjiang.com'
]

# 任务列表:存放多个任务对象
tasks = []

for url in urls:
    c = request(url)  # 协程对象
    task = asyncio.ensure_future(c)  # 任务对象
    tasks.append(task)

# 创建事件循环
loop = asyncio.get_event_loop()
# 固定写法,必须封装在wait方法中
# 需要将任务列表封装到wait中
loop.run_until_complete(asyncio.wait(tasks))

# 执行程序

tt.toc('jieshu')

进程可以分成一个或多个线程,线程可以分成一个或者多个协程

from flask import Flask
import time

app = Flask(__name__)


@app.route('/bobo')
def index_bobo():
    time.sleep(2)
    return 'Hello bobo'

@app.route('/jay')
def index_jay():
    time.sleep(2)
    return 'Hello jay'

@app.route('/tom')
def index_tom():
    time.sleep(2)
    return 'Hello tom'

if __name__ == '__main__':
    app.run(threaded=True)
import requests
import asyncio
import time

start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom'
]


async def get_page(url):
    print('正在下载', url)
    # requests.get是基于同步,必须使用基于异步的网络请求模块进行指定url的请求发送
    # requests不支持异步
    # aiohttp:基于异步网络请求的模块
    response = requests.get(url=url)
    print('下载完毕:', response.text)


tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
# 固定写法
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('总耗时:', end - start)

aiohttp模块

await session.get(url) 也需要挂起。
注意:获取响应数据操作之前一定要使用await进行手动挂起

# 环境安装:pip install aiohttp
# 使用该模块中的ClientSession
import requests
import asyncio
import time
import aiohttp

start = time.time()
# urls = [
#     'http://127.0.0.1:5000/bobo','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#     'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom',
#
# ]
from multiprocessing.dummy import Pool

pool = Pool(2)

urls = []
for i in range(10):
    urls.append('http://127.0.0.1:5000/bobo')
print(urls)


async def get_page(url):
    # session对象
    async with aiohttp.ClientSession() as session:
        # get()、post():
        # headers,params/data,proxy='http://ip:port'
        # 阻塞,耗时,需要手动挂起
        async with await session.get(url) as response:
            # text()返回字符串形式的响应数据
            # read()返回的二进制形式的响应数据
            # json()返回的就是json对象
            # 注意:获取响应数据操作之前一定要使用await进行手动挂起
            page_text = await response.text()
            print(page_text)


tasks = []

for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('总耗时:', end - start)