Code Implementation
The script below crawls product pages on ZOL (detail.zol.com.cn) for PC components. For each category it first walks the paginated listing pages to collect every product detail URL, then spins up a multiprocessing pool whose workers pull URLs from a shared queue, parse each product's name, price, and spec parameters with BeautifulSoup, and append the results to a shared list. Each category is finally written to its own CSV under ../load/. The categories are split into two groups because ZOL renders spec parameters in two different layouts: products_A pages use <p><span>key</span>value</p> blocks, while products_B pages use a key/value table.
import multiprocessing as mp
import os
import re
import time
from queue import Empty

import pandas as pd
import requests
from bs4 import BeautifulSoup
headers = {
    'cookie': 'ip_ck=58aI7/31j7QuMDE5ODcyLjE2MaTA5NTU2NTI%3D; z_pro_city=s_provice%3Dzhejiang%26s_city%3Dningbo; userProvinceId=26; userCityId=154; userCountyId=929; userLocationId=12374; realLocationId=12374; userFidLocationId=12374; dwhis=%22s%22%3A%2228%22%2C%22m%22%3A%22125%22%2C%22p%22%3A%221296179%22; lv=1614734211; vn=11; visited_subcateId=6|0|443|529|28; visited_subcateProId=6-0|0-0|443-0|529-0|28-0,1289592|57-0; visited_serachKw=RTX%20%203090%7CRTX%20%203090/robots.txt%7CRTX%203090%7CGT%20210%7C210%7CRTX%203080; listSubcateId=6; Adshow=4; questionnaire_pv=1614729617',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36 Edg/88.0.705.81'
}
base_url = "https://detail.zol.com.cn"
# Matches the "history-tips" text on a listing page; it contains a digit only
# on the last page, which is how pagination is terminated below.
pattern = re.compile(r'.+[0-9]+.+')
products_A = {
    "hdd": "https://detail.zol.com.cn/hard_drives/s5994/",         # mechanical hard drives
    "ssd": "https://detail.zol.com.cn/solid_state_drive/s8155/",   # solid-state drives
    "motherboard": "https://detail.zol.com.cn/motherboard/",       # motherboards
    "power": "https://detail.zol.com.cn/power/",                   # power supplies
    "memory": "https://detail.zol.com.cn/memory/s5974/zhejiang/",  # memory
    "case": "https://detail.zol.com.cn/case/",                     # cases
    "lcd": "https://detail.zol.com.cn/lcd/",                       # LCD monitors
}
products_B = {
    "vga": "https://detail.zol.com.cn/vga/",                         # graphics cards
    "cooling_product": "https://detail.zol.com.cn/cooling_product/", # coolers
}
def get_all_urls(base_page: str) -> list:
    """Walk a category's paginated listing and collect every product detail URL."""
    url_list = []
    url_pattern = base_page + "{0}.html"
    index = 0
    while True:
        index += 1
        cur_page = url_pattern.format(index)
        response = requests.get(cur_page, headers=headers)
        soup = BeautifulSoup(response.text, features="lxml")
        items = soup.find("div", attrs={"class": "content"}) \
            .find("div", attrs={"class": "pic-mode-box"}) \
            .find("ul").find_all("li")
        for item in items:
            try:
                url_list.append(base_url + item.find("a").get("href"))
            except AttributeError:
                # Some <li> entries are placeholders without an <a> tag.
                continue
        # The "history-tips" text only matches the pattern on the last page.
        end_flag = soup.find("div", attrs={"class": "history-tips"}).get_text()
        if pattern.match(end_flag) is not None:
            break
    return url_list
class MyCrawler:
    def __init__(self, url_list: list, product_type: str):
        self.product_type = product_type
        self.workerNum = mp.cpu_count()
        self.urlQueue = mp.Manager().Queue()   # work queue shared across worker processes
        self.resultList = mp.Manager().list()  # result container shared across workers
        for url in url_list:
            self.urlQueue.put(url)

    def job(self, index: int):
        process_id = 'Process-' + str(index)
        while True:
            # get_nowait() avoids the race between empty() and get() when
            # several workers drain the queue at the same time.
            try:
                url = self.urlQueue.get_nowait()
            except Empty:
                break
            try:
                start = time.time()
                record = {"link": url}
                response = requests.get(record["link"], headers=headers)
                soup = BeautifulSoup(response.text, features="lxml")
                record["name"] = soup.find("h1").get_text()
                record["price"] = soup.find("b", attrs={"class": "price-type"}).get_text()
                if self.product_type == "A":
                    # Type-A pages render each parameter as <p><span>key</span>value</p>.
                    section = soup.find("div", attrs={"class": "section"}) \
                        .find("div", attrs={"class": "section-content"})
                    for param in section.find_all("p"):
                        key = param.find("span").get_text()[:-1].replace("\n", " ").replace("\r", " ")
                        value = str(param.contents[1]).replace("\n", " ").replace("\r", " ")
                        record[key] = value
                else:
                    # Type-B pages render parameters as <tr><td>key</td><td>value</td> rows.
                    section = soup.find("div", attrs={"class": "section", "id": "proParamSection"}) \
                        .find("div", attrs={"class": "section-content"})
                    for param in section.find_all("tr"):
                        tds = param.find_all("td")
                        key = str(tds[0].get_text()).replace("\n", " ").replace("\r", " ")
                        value = str(tds[1].get_text()).replace("\n", " ").replace("\r", " ")
                        record[key] = value
                print("Fetched one item in {0:.2f}s".format(time.time() - start))
                print(record)
                self.resultList.append(record)
            except Exception as e:
                print(process_id, self.urlQueue.qsize(), url, "Error", e)

    def run(self):
        start = time.time()
        pool = mp.Pool(processes=self.workerNum)
        for i in range(self.workerNum):
            pool.apply_async(self.job, args=(i,))
        print('Started process')
        pool.close()
        pool.join()
        print('Pool + Queue :', time.time() - start)
        print('Main process Ended!')
if __name__ == "__main__":
    os.makedirs("../load", exist_ok=True)  # make sure the output directory exists
    for products, product_type in ((products_A, "A"), (products_B, "B")):
        for key, value in products.items():
            # Skip categories that have already been crawled.
            file_path = "../load/{0}.csv".format(key)
            if os.path.exists(file_path):
                print("{0} already crawled".format(key))
                continue
            # Collect every product detail URL for this category.
            url_list = get_all_urls(value)
            print("Number of products: {0}".format(len(url_list)))
            crawler = MyCrawler(url_list, product_type)
            crawler.run()
            # Copy the managed list into a plain list before building the DataFrame.
            native_list = crawler.resultList[:]
            df = pd.DataFrame(native_list)
            df.to_csv(file_path)
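
Once the crawl finishes, each category sits in its own CSV under ../load/, named after the dictionary keys above. Below is a minimal sketch for loading the output back and sanity-checking it; this snippet is illustrative and not part of the crawler itself, and it only assumes the script above has run in the same directory layout.

import os
import pandas as pd

# Category names, matching the file names the crawler writes.
categories = ["hdd", "ssd", "motherboard", "power", "memory", "case", "lcd",
              "vga", "cooling_product"]

frames = {}
for name in categories:
    path = "../load/{0}.csv".format(name)
    if os.path.exists(path):
        frames[name] = pd.read_csv(path)
        # Rows = products; columns = link/name/price plus the union of spec keys.
        print(name, frames[name].shape)

# Peek at two columns every record is guaranteed to have, if SSDs were crawled.
if "ssd" in frames:
    print(frames["ssd"][["name", "price"]].head())

Because every record always carries "link", "name", and "price" while the spec keys vary per product, pandas fills missing spec columns with NaN, which is convenient for later cleaning.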