课程地址

1.python并发编程简介

1.1为什么要引入并发编程

一句话,提高速度

场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟

场景2:一个APP应用,优化前每次打开页面需要3秒,采用异步并发提升到每次200毫秒

1.2有哪些程序提速的方法

image.png

1.3Python对并发编程的支持

多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行

使用Lock对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
使用subprocess启动外部程序的进程,并进行输入输出交互

2.怎样选择多线程多进程多协程

2.1什么是CPU密集型计算、IO密集型计算

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高

例如:压缩解压缩、加密解密、正则表达式搜索

IO密集型(I/O bound)

IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作CPU占用率仍然较低

例如:文件处理程序、网络爬虫程序、读写数据库程序

2.2多线程、多进程、多协程的对比

image.png

2.3怎样根据任务选择对应技术

image.png

3.Python速度慢的罪魁祸首,全局解释器锁GIL

3.1Python速度慢的两大原因

  1. 动态类型语言边解释边执行
  2. GIL 无法利用多核CPU并发执行

3.2GIL是什么

image.png

3.3为什么有GIL这个东西

image.png

3.4怎样规避GIL带来的限制

image.png

4.使用多线程,Python爬虫被加速10倍

4.1基础知识

image.png

4,2代码实现

  1. import requests
  2. from bs4 import BeautifulSoup
  3. urls = [
  4. f"https://www.cnblogs.com/sitehome/p/{page}" //列表生成式
  5. for page in range(1, 50 + 1)
  6. ]
  7. def craw(url): //发起网络请求
  8. #print("craw url: ", url)
  9. r = requests.get(url)
  10. return r.text
  11. def parse(html): //数据提取,我会选择使用Xpath,但他使用的BeautifulSoup
  12. # class="post-item-title"
  13. soup = BeautifulSoup(html, "html.parser")
  14. links = soup.find_all("a", class_="post-item-title")
  15. return [(link["href"], link.get_text()) for link in links]
  16. if __name__ == "__main__":
  17. for result in parse(craw(urls[2])):
  18. print(result)
  1. import blog_spider
  2. import threading
  3. import time
  4. def single_thread(): //单线程
  5. print("single_thread begin")
  6. for url in blog_spider.urls:
  7. blog_spider.craw(url)
  8. print("single_thread end")
  9. def multi_thread(): //多线程
  10. print("multi_thread begin")
  11. threads = []
  12. for url in blog_spider.urls:
  13. threads.append(
  14. threading.Thread(target=blog_spider.craw, args=(url,))
  15. )
  16. for thread in threads:
  17. thread.start()
  18. for thread in threads:
  19. thread.join()
  20. print("multi_thread end")
  21. if __name__ == "__main__":
  22. start = time.time()
  23. single_thread()
  24. end = time.time()
  25. print("single thread cost:", end - start, "seconds")
  26. start = time.time()
  27. multi_thread()
  28. end = time.time()
  29. print("multi thread cost:", end - start, "seconds")

4,3自学知识补充

很多同学到这一节勉强能够看懂一些,但其实你根本没懂,强行进入下一节,你只会选择放弃,但你无法真正放弃,只是换个视频学习而已

理解不了不是你的问题,你需要足够的知识作为补充,利用文档进行学习尤为必要,与其要求别人,不如来要求自己,因为后者我有绝对的主动权

接下来进行多线程的学习

多线程简介

Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁 所以我们只需要学习threading就行

线程模块threading

image.png

使用Threading模块创建线程

  1. import threading
  2. import time
  3. def process(): //这个函数很容易理解,就是分别执行输入三次
  4. for i in range(3):
  5. time.sleep(1)
  6. print("thread name is %s\n"% threading.current_thread().name)
  7. if __name__ == '__main__':
  8. print("----主线程开始----")
  9. threads = [threading.Thread(target=process) for i in range(4)] #创建四个线程,存入列表
  10. for i in threads:
  11. i.start() #启动线程
  12. for i in threads:
  13. i.join() #等待子线程结束
  14. print("----主线程结束----")

image.png
image.png

简单来说这几行代码的意思就是,创建了4个线程,然后分别用for循环执行4次start()和join()方法。每个子线程分析执行输出3次

里面关键点是 threading.Thread

image.png

image.png

image.png

5.Python实现生产者消费者爬虫

multiProcessing Queue队列可以实现线程间通信。使用Quene队列在线程间通信通常应用于生产者消费者模式

生产者:产生数据的模块
消费者:处理数据点模块

再生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式

生产者消费者爬虫的架构

image.png

多线程数据通信的queue.Queue

image.png

  1. Queue.qsize(): 返回queue的近似值。注意:qsize>0 不保证(get)取元素不阻塞。qsize<maxsize不保证(put)存元素不会阻塞
  2. Queue.empty(): 判断队列是否为空。和上面一样注意
  3. Queue.full(): 判断是否满了。和上面一样注意
  4. Queue.put(item, block=True, timeout=None): 往队列里放数据。如果满了的话,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。
  5. Queue.put_nowait(item): 往队列里存放元素,不等待
  6. Queue.get(item, block=True, timeout=None): 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常
  7. Queue.get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费
  8. Queue.task_done(): 表示队列中某个元素呗消费进程使用,消费结束发送的信息。每个get()方法会拿到一个任务,其随后调用task_done()表示这个队列,这个队列的线程的任务完成。就是发送消息,告诉完成啦!
  9. 如果当前的join()当前处于阻塞状态,当前的所有元素执行后都会重启(意味着收到加入queue的每一个对象task_done()调用的信息)
  10. 如果调用的次数操作放入队列的items的个数多的话,会触发ValueError异常
  11. Queue.join(): 一直阻塞直到队列中的所有元素都被取出和执行未完成的个数,只要有元素添加到queue中就会增加。未完成的个数,只要消费者线程调用task_done()表明其被取走,其调用结束。当未完成任务的计数等于0join()就会不阻塞

代码示例

  1. from multiprocessing import Queue
  2. if __name__ == '__main__':
  3. q = Queue(3) #初始化Quene,括号内填写num值为可接受消息数量,如果num值没有指定或者为负值就代表消息数量没有上限
  4. q.put("消息1")
  5. q.put("消息2")
  6. print(q.full()) #判断是否满了,这里返回False
  7. q.put("消息3") //put(item, block=True, timeout=None): item代表数据,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。
  8. print(q.full()) # 判断是否满了,这里返回True
  9. try:
  10. q.put("消息4",True,2)
  11. except:
  12. print("消息队列已满,现有消息数量:%s"%q.qsize()) //返回queue的近似值,即元素数量
  13. try:
  14. q.put_nowait("消息4") //put_nowait(item): 往队列里存放元素,不等待
  15. except:
  16. print("消息队列已满,现有消息数量:%s" % q.qsize()) //返回queue的近似值,即元素数量
  17. if not q.empty():
  18. print("-----从队列中获取消息------")
  19. for i in range(q.qsize()):
  20. print(q.get_nowait())
  21. if not q.full():
  22. q.put_nowait("消息4") //get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费

image.png

  1. from multiprocessing import Process,Queue
  2. import time
  3. #向队列中写入数据
  4. def write_task(q):
  5. if not q.full():
  6. for i in range(5):
  7. message = "消息"+str(i)
  8. q.put(message)
  9. print("写入:%s"%message)
  10. def read_task(q):
  11. time.sleep(1)
  12. while not q.empty():
  13. print("读取:%s"% q.get(True,2))
  14. if __name__ == '__main__':
  15. print("---父进程开始-----")
  16. q = Queue()
  17. pw = Process(target=write_task,args=(q,))
  18. pr = Process(target=read_task,args=(q,))
  19. pw.start()
  20. pr.start()
  21. pw.join()
  22. pr.join()
  23. print("---父进程结束-----")

image.png

6.Python线程安全问题以及解决方案

6.1线程安全概念介绍

image.png

6.2Lock 用于解决线程安全问题

image.png

with模式

  1. import threading
  2. import time
  3. lock = threading.Lock() #创建锁
  4. class Account:
  5. def __init__(self, balance):
  6. self.balance = balance
  7. def draw(account, amount):
  8. with lock: #将存有问题的代码放在锁里面
  9. if account.balance >= amount:
  10. time.sleep(0.1)
  11. print(threading.current_thread().name,
  12. "取钱成功")
  13. account.balance -= amount
  14. print(threading.current_thread().name,
  15. "余额", account.balance)
  16. else:
  17. print(threading.current_thread().name,
  18. "取钱失败,余额不足")
  19. if __name__ == "__main__":
  20. account = Account(1000)
  21. ta = threading.Thread(name="ta", target=draw, args=(account, 800))
  22. tb = threading.Thread(name="tb", target=draw, args=(account, 800))
  23. ta.start()
  24. tb.start()

try-finally模式

这里使用多线程和互斥锁模拟实现多人同时订购电影票的功能,假设电影院某个场次只有100张电影票,10个用户同时抢购该电影票。每售出一张,显示一次剩余的电影票张数

global用法

  1. from threading import Thread,Lock
  2. import time
  3. n = 100
  4. def task():
  5. global n
  6. mutex.acquire() #上锁
  7. temp = n #赋值给临时变量
  8. time.sleep(0.1)
  9. n = temp - 1
  10. print("购买成功,剩余%d张电影票"%n)
  11. mutex.release() #释放锁
  12. if __name__ == '__main__':
  13. mutex = Lock() #实例化Lock类
  14. t_1 = [] #初始化一个列表
  15. for i in range(10):
  16. t = Thread(target=task) #实例化线程类
  17. t_1.append(t) #将线程实例化存入列表中
  18. t.start() #创建线程
  19. for t in t_1:
  20. t.join() #等待子线程结束

image.png

7.Python好用的线程池ThreadPoolExecutor

8.Python使用线程池在Web服务中实现加速

9.使用多进程multiprocessing模块加速程序的运行

9.1有了多线程threading,为什么还要用多进程multiprocessing

其实前面章节已经介绍清楚了

image.png

9.2多进程multiprocessing知识梳理

image.png

9.3代码实战:单线程、多线程、多进程对比CPU密集计算速度

10.Python在Flask服务中使用多进程池加速程序运行

11.Python异步IO实现并发爬虫

12.在异步IO中使用信号量控制爬虫并发度

13.Python使用subprocess播放歌曲解压文件