1. 多组件的 pipeline 技术架构

  • 背景
    • 很多事都不会一次性完成,都会分成很多中间步骤一步步完成

image.png

2. 生产者、消费者爬虫架构

image.png

3. 多线程数据通信

image.png

4.代码编写实现生产者消费者爬虫

  1. # -*- coding: utf-8 -*-
  2. # 生产者、消费者
  3. from queue import Queue
  4. import threading
  5. import random, time
  class Producer(threading.Thread):
      """Thread that periodically puts a random value on a shared queue.

      Args:
          q: Queue shared with the consumer threads.
          name: Thread name; it prefixes every log line and every queued item.
      """

      def __init__(self, q, name):
          super().__init__()
          self.q = q
          self.name = name
          print(self.name + ":Producer-init 完成: ")

      def run(self):
          """Attempt to produce one item every 6-10 seconds, forever."""
          while True:
              time.sleep(random.randint(6, 10))
              self._produce_once()

      def _produce_once(self):
          """Try to enqueue one "<name>:<value>" item without blocking.

          Returns:
              The string that was queued, or None if the queue was full.
          """
          # Local import: the file's import block only brings in Queue.
          from queue import Full

          value = random.randint(0, 10)
          item = self.name + ":" + str(value)
          try:
              # put_nowait() checks for room and inserts in one step. A separate
              # full() check can be stale by the time put() runs when several
              # producers share the queue.
              self.q.put_nowait(item)
          except Full:
              print(self.name + ':Producer-Queue is full')
              return None
          print(self.name + ': put value:' + str(value) + '-into queue')
          return item
  class Consumer(threading.Thread):
      """Thread that periodically takes one item off a shared queue.

      Args:
          q: Queue shared with the producer threads.
          name: Thread name; it prefixes every log line.
      """

      def __init__(self, q, name):
          super().__init__()
          self.q = q
          self.name = name
          print(self.name + ":Consumer-init 完成:")

      def run(self):
          """Attempt to consume one item every 6-10 seconds, forever."""
          while True:
              time.sleep(random.randint(6, 10))
              self._consume_once()

      def _consume_once(self):
          """Try to dequeue one item without blocking.

          Returns:
              The dequeued item, or None if the queue was empty.
          """
          # Local import: the file's import block only brings in Queue.
          from queue import Empty

          try:
              # get_nowait() tests and removes in one step. A separate empty()
              # check can be stale by the time get() runs when several
              # consumers share the queue.
              value = self.q.get_nowait()
          except Empty:
              print(self.name + ':Consumer-Queue-empty')
              return None
          print(self.name + ':Consumer:取值消费:get Queue-' + str(value))
          return value
  if __name__ == "__main__":
      # Bounded queue shared by every thread: at most 10 pending items.
      q = Queue(10)

      # Two producers are created and started first.
      p1 = Producer(q, 'P1')
      p2 = Producer(q, 'P2')
      p1.start()
      p2.start()

      # A single consumer drains the queue. More consumers (e.g. 'C2', 'C3')
      # can be created and started the same way.
      c1 = Consumer(q, 'C1')
      c1.start()

5.多线程锁实现

  • 一个线程处理自己的http请求,不需要加锁。但是如果线程会有全局操作,比如全局变量的统计计数等,就需要加锁。

```python

    # -*- coding: utf-8 -*-

import threading
import time
from decimal import Decimal

# Condition variable shared by all producer and consumer threads; it guards
# `num` and is used to wait for / signal changes to it.
condition = threading.Condition()
# Number of goods currently in the box (shared state, changed under `condition`).
num = 0
# Maximum number of goods the box may hold.
box_size = 15

class GoodsProduce(threading.Thread):
    """Producer thread that adds goods to the shared box while it has room.

    Args:
        company_name: Label used in the log output.
        produce_speed: Production rate; the pause per item is ``2 / produce_speed``
            seconds, rounded to two decimal places.
        info: Free-form description printed by ``show()``.
    """

    def __init__(self, company_name, produce_speed, info):
        super().__init__()
        self.companyName = company_name
        self.produceSpeed = Decimal(2 / produce_speed).quantize(Decimal('0.00'))
        self.info = info

    def run(self):
        """Produce goods forever, waiting whenever the box is full."""
        global num
        while True:
            # Simulate production time without holding the lock, so other
            # threads are not blocked while this one sleeps. time.sleep() is
            # documented for int/float seconds, so convert the Decimal explicitly.
            time.sleep(float(self.produceSpeed))
            # `with` pairs every acquire with exactly one release, including
            # after wait() returns with the lock re-acquired.
            with condition:
                # Re-check after each wake-up: another producer may have filled
                # the box again before this thread got the lock back.
                while num >= box_size:
                    print("NOTE: BOX is full , size -{} ,生产完成后数量: - {}".format(box_size, num))
                    condition.wait()
                num += 1
                print("GoodsProduce : {} Produce one , 现有数量 :{}".format(self.companyName, num))
                # Wake waiting consumers.
                condition.notify_all()

    def show(self):
        """Print this producer's name, per-item pause and description."""
        print("show companyName -- {} ,produceSpeed -- {}, info -- {}".format(self.companyName, self.produceSpeed, self.info))

class GoodsConsume(threading.Thread):
    """Consumer thread that removes goods from the shared box.

    Args:
        cname: Label used in the log output.
        area: Area label printed by ``show()``.
        info: Free-form description printed by ``show()``.
    """

    def __init__(self, cname, area, info):
        super().__init__()
        self.cname = cname
        self.area = area
        self.info = info

    def run(self):
        """Consume goods forever, waiting whenever the box is empty."""
        global num
        while True:
            # `with` pairs every acquire with exactly one release, including
            # after wait() returns with the lock re-acquired.
            with condition:
                # Re-check after each wake-up: another consumer may have taken
                # the item before this thread got the lock back.
                while num < 1:
                    print("NOTE: BOX is null ,please wait ... size {} ,消费完后数量: {}".format(box_size, num))
                    condition.wait()
                num -= 1
                print("GoodsConsumer {} Consume one , 现有数量:{}".format(self.cname, num))
                # Wake waiting producers.
                condition.notify_all()
            # Pause between items outside the lock so other threads can run.
            time.sleep(1)

    def show(self):
        """Print this consumer's name, area and description."""
        print("show GoodsConsume {} area -- {} ,info -- {}".format(self.cname, self.area, self.info))

if __name__ == "__main__":
    # Producers "Prd-0" and "Prd-1" with speeds 1 and 2: both are started
    # before either one's details are printed.
    producers = [
        GoodsProduce("Prd-{}".format(i), speed, "this is {} prd company".format(i))
        for i, speed in enumerate((1, 2))
    ]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.show()

    # Four consumers: all are created first, then each is started and shown in turn.
    customers = [
        GoodsConsume("cus-{}".format(i), "area-{}".format(i), "this is {} customer".format(i))
        for i in range(4)
    ]
    for customer in customers:
        customer.start()
        customer.show()

```