1. 多组件的 pipeline 技术架构
- 背景
- 很多事都不会一次性完成,都会分成很多中间步骤一步步完成
2. 生产者、消费者爬虫架构
3. 多线程数据通信
4.代码编写实现生产者消费者爬虫
# -*- coding: utf-8 -*-
"""Producer/consumer demo: threads exchange values through a bounded Queue."""
from queue import Empty, Full, Queue
import random
import threading
import time


class Producer(threading.Thread):
    """Thread that periodically puts a random value into a shared queue.

    Args:
        q: The queue shared with the consumers.
        name: Thread name, used as a prefix in every log line and queued item.
    """

    def __init__(self, q, name):
        super(Producer, self).__init__()
        self.q = q
        self.name = name
        print(self.name + ":Producer-init 完成: ")

    def _produce_once(self):
        """Try to enqueue one random value without blocking.

        Returns:
            True if a value was enqueued, False if the queue was full.
        """
        value = random.randint(0, 10)
        try:
            # put_nowait avoids the race between a separate full() check and
            # put(): another producer may fill the queue in between, which
            # would make a plain put() block.
            self.q.put_nowait(self.name + ":" + str(value))
        except Full:
            print(self.name + ':Producer-Queue is full')
            return False
        print(self.name + ': put value:' + str(value) + '-into queue')
        return True

    def run(self):
        """Loop forever: sleep 6-10 seconds, then attempt one put."""
        while True:
            time.sleep(random.randint(6, 10))
            self._produce_once()


class Consumer(threading.Thread):
    """Thread that periodically takes one value from a shared queue.

    Args:
        q: The queue shared with the producers.
        name: Thread name, used as a prefix in every log line.
    """

    def __init__(self, q, name):
        super(Consumer, self).__init__()
        self.q = q
        self.name = name
        print(self.name + ":Consumer-init 完成:")

    def _consume_once(self):
        """Try to dequeue one value without blocking.

        Returns:
            True if a value was consumed, False if the queue was empty.
        """
        try:
            # get_nowait avoids the race between a separate empty() check and
            # get(): another consumer may drain the queue in between.
            value = self.q.get_nowait()
        except Empty:
            print(self.name + ':Consumer-Queue-empty')
            return False
        print(self.name + ':Consumer:取值消费:get Queue-' + str(value))
        return True

    def run(self):
        """Loop forever: sleep 6-10 seconds, then attempt one get."""
        while True:
            time.sleep(random.randint(6, 10))
            self._consume_once()


if __name__ == "__main__":
    q = Queue(10)
    p1 = Producer(q, 'P1')
    p2 = Producer(q, 'P2')
    p1.start()
    p2.start()
    # Only one consumer is started; create more Consumer instances on the
    # same queue to drain it faster.
    c1 = Consumer(q, 'C1')
    c1.start()
5.多线程锁实现
- 如果每个线程只处理自己的 http 请求,则不需要加锁。但是如果线程涉及全局操作(比如对全局变量进行统计计数),就需要加锁。
```
# -*- coding: utf-8 -*-
import threading
import time
from decimal import Decimal

# Condition variable that guards the shared stock counter `num`.
condition = threading.Condition()
# Current number of goods in the box; changed by producer and consumer threads.
num = 0
# Maximum number of goods the box may hold.
box_size = 15

class GoodsProduce(threading.Thread):
    """Producer thread that adds goods to the shared box.

    Relies on the module-level `condition`, `num` and `box_size`.

    Args:
        company_name: Label used in log output.
        produce_speed: Speed factor; one item takes ``2 / produce_speed``
            seconds (rounded to two decimals) to produce.
        info: Free-form description, only printed by `show`.
    """

    def __init__(self, company_name, produce_speed, info):
        super(GoodsProduce, self).__init__()
        self.companyName = company_name
        self.produceSpeed = Decimal(2 / produce_speed).quantize(Decimal('0.00'))
        self.info = info

    def run(self):
        """Loop forever: produce one item, then store it once there is room."""
        global num
        while True:
            # Simulate the production time outside the lock so other threads
            # are not blocked meanwhile. time.sleep needs a float, not Decimal.
            time.sleep(float(self.produceSpeed))
            # `with` pairs every acquire with exactly one release; the manual
            # acquire()/wait() loop re-acquired the lock after each wakeup
            # without releasing it.
            with condition:
                # Re-check the predicate after each wakeup.
                while num >= box_size:
                    print("NOTE: BOX is full , size -{} ,生产完成后数量: - {}".format(box_size, num))
                    condition.wait()
                num += 1
                print("GoodsProduce : {} Produce one , 现有数量 :{}".format(self.companyName, num))
                condition.notify_all()  # wake waiting consumers

    def show(self):
        """Print this producer's name, speed and info."""
        print("show companyName -- {} ,produceSpeed -- {}, info -- {}".format(
            self.companyName, self.produceSpeed, self.info))

class GoodsConsume(threading.Thread):
    """Consumer thread that removes goods from the shared box.

    Relies on the module-level `condition`, `num` and `box_size`.

    Args:
        cname: Label used in log output.
        area: Free-form area label, only printed by `show`.
        info: Free-form description, only printed by `show`.
    """

    def __init__(self, cname, area, info):
        super(GoodsConsume, self).__init__()
        self.cname = cname
        self.area = area
        self.info = info

    def run(self):
        """Loop forever: take one item when available, then pause a second."""
        global num
        while True:
            # `with` pairs every acquire with exactly one release; the manual
            # acquire()/wait() loop re-acquired the lock after each wakeup
            # without releasing it.
            with condition:
                # Re-check the predicate after each wakeup.
                while num < 1:
                    print("NOTE: BOX is null ,please wait ... size {} ,消费完后数量: {}".format(box_size, num))
                    condition.wait()
                num -= 1
                print("GoodsConsumer {} Consume one , 现有数量:{}".format(self.cname, num))
                condition.notify_all()  # wake waiting producers
            # Pace consumption outside the lock so other threads can proceed.
            time.sleep(1)

    def show(self):
        """Print this consumer's name, area and info."""
        print("show GoodsConsume {} area -- {} ,info -- {}".format(
            self.cname, self.area, self.info))

if __name__ == "__main__":
    # Two producers with speed factors 1 and 2; both are started before
    # their details are printed.
    producers = [
        GoodsProduce("Prd-{}".format(i), speed, "this is {} prd company".format(i))
        for i, speed in enumerate((1, 2))
    ]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.show()

    # Four consumers; each is started and then described in turn.
    customers = [
        GoodsConsume("cus-{}".format(i), "area-{}".format(i), "this is {} customer".format(i))
        for i in range(4)
    ]
    for customer in customers:
        customer.start()
        customer.show()
```
