一、说明
生产者消费者模式链接
二、代码
# -*- coding:utf-8 -*-
import os
import threading
from queue import Empty, Queue

import requests
from lxml import etree
class Producer_mulu(threading.Thread):
    """Producer thread: takes category-page URLs from ``mulu_queue``,
    scrapes each page for novel index links and feeds those URLs into
    ``zhangjie_queue``.
    """

    def __init__(self, mulu_queue, zhangjie_queue, *args, **kwargs):
        super(Producer_mulu, self).__init__(*args, **kwargs)
        self.mulu_queue = mulu_queue          # input: category page URLs
        self.zhangjie_queue = zhangjie_queue  # output: novel index-page URLs

    def run(self):
        while True:
            # Non-blocking get fixes the empty()/get() race: with several
            # producer threads, another thread may drain the queue between
            # our empty() check and a blocking get(), hanging this thread
            # forever. Catching Empty makes check-and-take atomic enough.
            try:
                xuanhuan_url = self.mulu_queue.get(block=False)
            except Empty:
                print("所有目录path都已生产完成,生产者停止")
                break
            self.get_mulu(xuanhuan_url)

    def get_mulu(self, xuanhuan_url):
        """Scrape one category page and enqueue every novel's index URL."""
        # timeout guards against a dead connection stalling the thread forever
        req_text = requests.get(xuanhuan_url, timeout=10).content.decode('utf-8')
        text_html = etree.HTML(req_text)
        file_name = text_html.xpath('//div[@id="newscontent"]/div[1]/ul/li/span[1]/a/text()')
        mulu_url = text_html.xpath('//div[@id="newscontent"]/div[1]/ul/li/span[1]/a/@href')
        for xiaoshuo_name, url in zip(file_name, mulu_url):
            # only the URL is needed downstream; the title is re-scraped later
            self.zhangjie_queue.put(url)
class Producer_zhangjie(threading.Thread):
    """Producer thread: takes novel index-page URLs from ``zhangjie_queue``,
    scrapes each index for (chapter name, chapter URL) pairs and puts them
    into ``content_queue``. ``mulu_queue`` is only inspected to decide when
    upstream work is finished.
    """

    def __init__(self, mulu_queue, zhangjie_queue, content_queue, *args, **kwargs):
        super(Producer_zhangjie, self).__init__(*args, **kwargs)
        self.mulu_queue = mulu_queue          # upstream: category page URLs
        self.zhangjie_queue = zhangjie_queue  # input: novel index-page URLs
        self.content_queue = content_queue    # output: (chapter name, chapter URL)

    def run(self):
        while True:
            if self.mulu_queue.empty() and self.zhangjie_queue.empty():
                print("所有章节目录都已生产完成,生产者停止")
                break
            # Timed get instead of a blocking one: zhangjie_queue may be
            # momentarily empty while Producer_mulu is still filling it.
            # A plain get() would block forever; on timeout we loop back
            # and re-check the termination condition.
            try:
                mulu_url = self.zhangjie_queue.get(timeout=1)
            except Empty:
                continue
            self.get_zj_path(mulu_url)

    # 获取章节path
    def get_zj_path(self, mulu_url):
        """Scrape one novel index page; enqueue every (name, URL) chapter pair."""
        base_url = 'http://www.xbiquge.la/'
        # timeout guards against a dead connection stalling the thread forever
        req_mulu = requests.get(mulu_url, timeout=10).content.decode('utf-8')
        mulu_html = etree.HTML(req_mulu)
        base_path_list = mulu_html.xpath("//div[@class='box_con'][2]/div[@id='list']/dl/dd/a/@href")
        zhang_name_list = mulu_html.xpath("//div[@class='box_con'][2]/div[@id='list']/dl/dd/a/text()")
        # hrefs on the page are site-relative, so prefix the site root
        for zhangjie_name, content_url in zip(zhang_name_list,
                                              (base_url + x for x in base_path_list)):
            self.content_queue.put((zhangjie_name, content_url))
class Consumers_content(threading.Thread):
    """Consumer thread: takes (chapter name, chapter URL) pairs from
    ``content_queue``, downloads the chapter text and saves it as
    ``./xiaoshuo/<novel>/<chapter>.txt``. The other two queues are only
    inspected to decide when the whole pipeline has drained.
    """

    def __init__(self, mulu_queue, zhangjie_queue, content_queue, *args, **kwargs):
        super(Consumers_content, self).__init__(*args, **kwargs)
        self.mulu_queue = mulu_queue          # upstream stage 1
        self.zhangjie_queue = zhangjie_queue  # upstream stage 2
        self.content_queue = content_queue    # input: (chapter name, chapter URL)

    def run(self):
        while True:
            # Stop only when every stage of the pipeline is drained.
            if (self.mulu_queue.empty() and self.zhangjie_queue.empty()
                    and self.content_queue.empty()):
                break
            # Timed get: content_queue may be momentarily empty while the
            # producers are still working; a blocking get() would hang.
            # On timeout, loop back and re-check the termination condition.
            try:
                zhangjie_name, content_url = self.content_queue.get(timeout=1)
            except Empty:
                continue
            self.get_text(content_url, zhangjie_name)

    # 获取内容
    def get_text(self, text_url, text_name):
        """Download one chapter and write it under the novel's directory."""
        # timeout guards against a dead connection stalling the thread forever
        req_text = requests.get(text_url, timeout=10).content.decode('utf-8')
        text_html = etree.HTML(req_text)
        xiaoshuo_name = text_html.xpath("//div[@class='box_con']/div[@class='con_top']/a[3]/text()")[0]
        data = text_html.xpath("//div[@class='box_con']/div[@id='content']/text()")
        # Crude cleanup: stringify the node list, then strip &nbsp;/CR artifacts,
        # quote chars and Chinese quotation marks; data[1:-1] below drops the
        # '[' and ']' of the stringified list.
        data = (str(data).replace('\\xa0', '')).replace("\\r',", '').replace('\'', '').replace('”', '').replace('“', '')
        # os.path.join keeps the path portable (the original hard-coded
        # Windows backslashes); exist_ok avoids the exists()/makedirs() race
        # when 100 consumer threads hit the same new novel at once.
        # NOTE(review): xiaoshuo_name/text_name come straight from the page —
        # filesystem-illegal characters are not sanitized; confirm acceptable.
        dirs = os.path.join('.', 'xiaoshuo', xiaoshuo_name)
        os.makedirs(dirs, exist_ok=True)
        with open(os.path.join(dirs, text_name + '.txt'), 'w', encoding='utf-8') as f:
            f.write(text_name)
            f.write('\n')
            f.write(data[1:-1])
            f.write('\n')
        print("《" + xiaoshuo_name + "》" + text_name + "下载成功")
if __name__ == '__main__':
    # Three pipeline stages share three work queues:
    # category pages -> novel index pages -> (chapter name, chapter URL) jobs.
    mulu_Queue = Queue(1000000)
    zhangjie_Queue = Queue(1000000)
    content_Queue = Queue(1000000)

    # Seed the pipeline with the paginated fantasy-category URLs (pages 2..99).
    for page in range(2, 100):
        mulu_Queue.put("http://www.xbiquge.la/fenlei/1_{}.html".format(page))

    # 10 threads scrape category pages, 10 scrape novel indexes,
    # 100 download chapter bodies.
    for _ in range(10):
        Producer_mulu(mulu_Queue, zhangjie_Queue).start()
    for _ in range(10):
        Producer_zhangjie(mulu_Queue, zhangjie_Queue, content_Queue).start()
    for _ in range(100):
        Consumers_content(mulu_Queue, zhangjie_Queue, content_Queue).start()
三、总结
流程说明
使用 threading.Thread 创建两类生产者、一类消费者;
Producer_mulu 生产玄幻小说分类下所有小说的 href,
Producer_zhangjie 生产每个小说的章节名称和章节 URL,
Consumers_content 使用 Producer_zhangjie 生产的内容下载小说正文并分类保存
zip的使用
mulu_path = list(zip(file_name, mulu_url))
for zhangjie_name, content_url in zj_path:
a = [1,2,3]
b = [4,5,6]
c = [4,5,6,7,8]
>>> zipped = list(zip(a, b))  # 打包为元组的列表(Python 3 中 zip 返回迭代器,需用 list() 取值)
>>> zipped
[(1, 4), (2, 5), (3, 6)]
>>> list(zip(a, c))  # 元素个数与最短的列表一致
[(1, 4), (2, 5), (3, 6)]
>>> list(zip(*zipped))  # 与 zip 相反,*zipped 可理解为解压,返回二维矩阵式
[(1, 2, 3), (4, 5, 6)]
相关操作
a_list = ["a1", "a2", "a3"]
b_list = ["b1", "b2", "b3"]
c_list = ["c1", "c2", "c3"] # 元素个数与最短的列表一致
lis = list(map(list, zip(a_list, b_list, c_list)))
lisb = list(zip(a_list, b_list, c_list)) # 打包为元组的列表
new_list = []
for i in lis:
new_dict = {'A': i[0], 'B': i[1], 'C': i[2]}
new_list.append(new_dict)
print(lis)
print(lisb)
print(new_list)
for a, b, c in lis:
print(a, b, c)
lis [['a1', 'b1', 'c1'], ['a2', 'b2', 'c2'], ['a3', 'b3', 'c3']]
lisb [('a1', 'b1', 'c1'), ('a2', 'b2', 'c2'), ('a3', 'b3', 'c3')]
new_list [{'A': 'a1', 'B': 'b1', 'C': 'c1'}, {'A': 'a2', 'B': 'b2', 'C': 'c2'}, {'A': 'a3', 'B': 'b3', 'C': 'c3'}]
a1 b1 c1
a2 b2 c2
a3 b3 c3
lxml
req_mulu = requests.get(mulu_url).content.decode('utf-8')
mulu_html = etree.HTML(req_mulu)
base_path_list = mulu_html.xpath("//div[@class='box_con'][2]/div[@id='list']/dl/dd/a/@href")
Queue
for i in range(2, 100):
xh_url = "http://www.xbiquge.la/fenlei/1_{}.html".format(i)
mulu_Queue.put(xh_url)
for i in range(10):
mulu = Producer_mulu(mulu_Queue, zhangjie_Queue)
mulu.start()
for i in range(10):
zhangjie = Producer_zhangjie(mulu_Queue, zhangjie_Queue, content_Queue)
zhangjie.start()
for i in range(100):
content = Consumers_content(mulu_Queue, zhangjie_Queue, content_Queue)
content.start()
将xh_url即翻页url,循环后存入mulu_Queue队列中
mulu_Queue中的url在Producer_mulu中取出,并生成zhangjie_Queue队列;Producer_zhangjie生产content_Queue队列
Consumers_content中mulu_Queue,zhangjie_Queue,content_Queue全为空后,循环停止