1.python并发编程简介
1.1为什么要引入并发编程
一句话,提高速度
场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟
场景2:一个APP应用,优化前每次打开页面需要3秒,采用异步并发提升到每次200毫秒
1.2有哪些程序提速的方法

1.3Python对并发编程的支持
多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
使用Lock对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
使用subprocess启动外部程序的进程,并进行输入输出交互
2.怎样选择多线程多进程多协程
2.1什么是CPU密集型计算、IO密集型计算
CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型(I/O bound)
IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,CPU占用率仍然较低
例如:文件处理程序、网络爬虫程序、读写数据库程序
2.2多线程、多进程、多协程的对比

2.3怎样根据任务选择对应技术

3.Python速度慢的罪魁祸首,全局解释器锁GIL
3.1Python速度慢的两大原因
- 动态类型语言边解释边执行
- GIL 无法利用多核CPU并发执行
3.2GIL是什么

3.3为什么有GIL这个东西

3.4怎样规避GIL带来的限制

4.使用多线程,Python爬虫被加速10倍
4.1基础知识

4,2代码实现
import requestsfrom bs4 import BeautifulSoupurls = [f"https://www.cnblogs.com/sitehome/p/{page}" //列表生成式for page in range(1, 50 + 1)]def craw(url): //发起网络请求#print("craw url: ", url)r = requests.get(url)return r.textdef parse(html): //数据提取,我会选择使用Xpath,但他使用的BeautifulSoup# class="post-item-title"soup = BeautifulSoup(html, "html.parser")links = soup.find_all("a", class_="post-item-title")return [(link["href"], link.get_text()) for link in links]if __name__ == "__main__":for result in parse(craw(urls[2])):print(result)
import blog_spiderimport threadingimport timedef single_thread(): //单线程print("single_thread begin")for url in blog_spider.urls:blog_spider.craw(url)print("single_thread end")def multi_thread(): //多线程print("multi_thread begin")threads = []for url in blog_spider.urls:threads.append(threading.Thread(target=blog_spider.craw, args=(url,)))for thread in threads:thread.start()for thread in threads:thread.join()print("multi_thread end")if __name__ == "__main__":start = time.time()single_thread()end = time.time()print("single thread cost:", end - start, "seconds")start = time.time()multi_thread()end = time.time()print("multi thread cost:", end - start, "seconds")
4,3自学知识补充
很多同学到这一节勉强能够看懂一些,但其实你根本没懂,强行进入下一节,你只会选择放弃,但你无法真正放弃,只是换个视频学习而已
理解不了不是你的问题,你需要足够的知识作为补充,利用文档进行学习尤为必要,与其要求别人,不如来要求自己,因为后者我有绝对的主动权
接下来进行多线程的学习
多线程简介
Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁 所以我们只需要学习threading就行
线程模块threading

使用Threading模块创建线程
import threadingimport timedef process(): //这个函数很容易理解,就是分别执行输入三次for i in range(3):time.sleep(1)print("thread name is %s\n"% threading.current_thread().name)if __name__ == '__main__':print("----主线程开始----")threads = [threading.Thread(target=process) for i in range(4)] #创建四个线程,存入列表for i in threads:i.start() #启动线程for i in threads:i.join() #等待子线程结束print("----主线程结束----")


简单来说这几行代码的意思就是,创建了4个线程,然后分别用for循环执行4次start()和join()方法。每个子线程分析执行输出3次
里面关键点是 threading.Thread



5.Python实现生产者消费者爬虫
multiProcessing Queue队列可以实现线程间通信。使用Quene队列在线程间通信通常应用于生产者消费者模式
生产者:产生数据的模块
消费者:处理数据点模块
再生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式
生产者消费者爬虫的架构

多线程数据通信的queue.Queue

Queue.qsize(): 返回queue的近似值。注意:qsize>0 不保证(get)取元素不阻塞。qsize<maxsize不保证(put)存元素不会阻塞Queue.empty(): 判断队列是否为空。和上面一样注意Queue.full(): 判断是否满了。和上面一样注意Queue.put(item, block=True, timeout=None): 往队列里放数据。如果满了的话,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。Queue.put_nowait(item): 往队列里存放元素,不等待Queue.get(item, block=True, timeout=None): 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常Queue.get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费Queue.task_done(): 表示队列中某个元素呗消费进程使用,消费结束发送的信息。每个get()方法会拿到一个任务,其随后调用task_done()表示这个队列,这个队列的线程的任务完成。就是发送消息,告诉完成啦!如果当前的join()当前处于阻塞状态,当前的所有元素执行后都会重启(意味着收到加入queue的每一个对象task_done()调用的信息)如果调用的次数操作放入队列的items的个数多的话,会触发ValueError异常Queue.join(): 一直阻塞直到队列中的所有元素都被取出和执行未完成的个数,只要有元素添加到queue中就会增加。未完成的个数,只要消费者线程调用task_done()表明其被取走,其调用结束。当未完成任务的计数等于0,join()就会不阻塞
代码示例
from multiprocessing import Queueif __name__ == '__main__':q = Queue(3) #初始化Quene,括号内填写num值为可接受消息数量,如果num值没有指定或者为负值就代表消息数量没有上限q.put("消息1")q.put("消息2")print(q.full()) #判断是否满了,这里返回Falseq.put("消息3") //put(item, block=True, timeout=None): item代表数据,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。print(q.full()) # 判断是否满了,这里返回Truetry:q.put("消息4",True,2)except:print("消息队列已满,现有消息数量:%s"%q.qsize()) //返回queue的近似值,即元素数量try:q.put_nowait("消息4") //put_nowait(item): 往队列里存放元素,不等待except:print("消息队列已满,现有消息数量:%s" % q.qsize()) //返回queue的近似值,即元素数量if not q.empty():print("-----从队列中获取消息------")for i in range(q.qsize()):print(q.get_nowait())if not q.full():q.put_nowait("消息4") //get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费

from multiprocessing import Process,Queueimport time#向队列中写入数据def write_task(q):if not q.full():for i in range(5):message = "消息"+str(i)q.put(message)print("写入:%s"%message)def read_task(q):time.sleep(1)while not q.empty():print("读取:%s"% q.get(True,2))if __name__ == '__main__':print("---父进程开始-----")q = Queue()pw = Process(target=write_task,args=(q,))pr = Process(target=read_task,args=(q,))pw.start()pr.start()pw.join()pr.join()print("---父进程结束-----")

6.Python线程安全问题以及解决方案
6.1线程安全概念介绍

6.2Lock 用于解决线程安全问题
with模式
import threadingimport timelock = threading.Lock() #创建锁class Account:def __init__(self, balance):self.balance = balancedef draw(account, amount):with lock: #将存有问题的代码放在锁里面if account.balance >= amount:time.sleep(0.1)print(threading.current_thread().name,"取钱成功")account.balance -= amountprint(threading.current_thread().name,"余额", account.balance)else:print(threading.current_thread().name,"取钱失败,余额不足")if __name__ == "__main__":account = Account(1000)ta = threading.Thread(name="ta", target=draw, args=(account, 800))tb = threading.Thread(name="tb", target=draw, args=(account, 800))ta.start()tb.start()
try-finally模式
这里使用多线程和互斥锁模拟实现多人同时订购电影票的功能,假设电影院某个场次只有100张电影票,10个用户同时抢购该电影票。每售出一张,显示一次剩余的电影票张数
from threading import Thread,Lockimport timen = 100def task():global nmutex.acquire() #上锁temp = n #赋值给临时变量time.sleep(0.1)n = temp - 1print("购买成功,剩余%d张电影票"%n)mutex.release() #释放锁if __name__ == '__main__':mutex = Lock() #实例化Lock类t_1 = [] #初始化一个列表for i in range(10):t = Thread(target=task) #实例化线程类t_1.append(t) #将线程实例化存入列表中t.start() #创建线程for t in t_1:t.join() #等待子线程结束

7.Python好用的线程池ThreadPoolExecutor
8.Python使用线程池在Web服务中实现加速
9.使用多进程multiprocessing模块加速程序的运行
9.1有了多线程threading,为什么还要用多进程multiprocessing
其实前面章节已经介绍清楚了

9.2多进程multiprocessing知识梳理

9.3代码实战:单线程、多线程、多进程对比CPU密集计算速度
10.Python在Flask服务中使用多进程池加速程序运行
11.Python异步IO实现并发爬虫
12.在异步IO中使用信号量控制爬虫并发度
13.Python使用subprocess播放歌曲解压文件
