1.python并发编程简介
1.1为什么要引入并发编程
一句话,提高速度
场景1:一个网络爬虫,按顺序爬取花了1小时,采用并发下载减少到20分钟
场景2:一个APP应用,优化前每次打开页面需要3秒,采用异步并发提升到每次200毫秒
1.2有哪些程序提速的方法
1.3Python对并发编程的支持
多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务
异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
使用Lock对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果
使用subprocess启动外部程序的进程,并进行输入输出交互
2.怎样选择多线程多进程多协程
2.1什么是CPU密集型计算、IO密集型计算
CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型(I/O bound)
IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,CPU占用率仍然较低
例如:文件处理程序、网络爬虫程序、读写数据库程序
2.2多线程、多进程、多协程的对比
2.3怎样根据任务选择对应技术
3.Python速度慢的罪魁祸首,全局解释器锁GIL
3.1Python速度慢的两大原因
- 动态类型语言边解释边执行
- GIL 无法利用多核CPU并发执行
3.2GIL是什么
3.3为什么有GIL这个东西
3.4怎样规避GIL带来的限制
4.使用多线程,Python爬虫被加速10倍
4.1基础知识
4,2代码实现
import requests
from bs4 import BeautifulSoup
urls = [
f"https://www.cnblogs.com/sitehome/p/{page}" //列表生成式
for page in range(1, 50 + 1)
]
def craw(url): //发起网络请求
#print("craw url: ", url)
r = requests.get(url)
return r.text
def parse(html): //数据提取,我会选择使用Xpath,但他使用的BeautifulSoup
# class="post-item-title"
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_="post-item-title")
return [(link["href"], link.get_text()) for link in links]
if __name__ == "__main__":
for result in parse(craw(urls[2])):
print(result)
import blog_spider
import threading
import time
def single_thread(): //单线程
print("single_thread begin")
for url in blog_spider.urls:
blog_spider.craw(url)
print("single_thread end")
def multi_thread(): //多线程
print("multi_thread begin")
threads = []
for url in blog_spider.urls:
threads.append(
threading.Thread(target=blog_spider.craw, args=(url,))
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("multi_thread end")
if __name__ == "__main__":
start = time.time()
single_thread()
end = time.time()
print("single thread cost:", end - start, "seconds")
start = time.time()
multi_thread()
end = time.time()
print("multi thread cost:", end - start, "seconds")
4,3自学知识补充
很多同学到这一节勉强能够看懂一些,但其实你根本没懂,强行进入下一节,你只会选择放弃,但你无法真正放弃,只是换个视频学习而已
理解不了不是你的问题,你需要足够的知识作为补充,利用文档进行学习尤为必要,与其要求别人,不如来要求自己,因为后者我有绝对的主动权
接下来进行多线程的学习
多线程简介
Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁 所以我们只需要学习threading就行
线程模块threading
使用Threading模块创建线程
import threading
import time
def process(): //这个函数很容易理解,就是分别执行输入三次
for i in range(3):
time.sleep(1)
print("thread name is %s\n"% threading.current_thread().name)
if __name__ == '__main__':
print("----主线程开始----")
threads = [threading.Thread(target=process) for i in range(4)] #创建四个线程,存入列表
for i in threads:
i.start() #启动线程
for i in threads:
i.join() #等待子线程结束
print("----主线程结束----")
简单来说这几行代码的意思就是,创建了4个线程,然后分别用for循环执行4次start()和join()方法。每个子线程分析执行输出3次
里面关键点是 threading.Thread
5.Python实现生产者消费者爬虫
multiProcessing Queue队列可以实现线程间通信。使用Quene队列在线程间通信通常应用于生产者消费者模式
生产者:产生数据的模块
消费者:处理数据点模块
再生产者与消费者之间的缓冲区称之为仓库。生产者负责往仓库运输商品,而消费者负责从仓库里取出商品,这就构成了生产者消费者模式
生产者消费者爬虫的架构
多线程数据通信的queue.Queue
Queue.qsize(): 返回queue的近似值。注意:qsize>0 不保证(get)取元素不阻塞。qsize<maxsize不保证(put)存元素不会阻塞
Queue.empty(): 判断队列是否为空。和上面一样注意
Queue.full(): 判断是否满了。和上面一样注意
Queue.put(item, block=True, timeout=None): 往队列里放数据。如果满了的话,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。
Queue.put_nowait(item): 往队列里存放元素,不等待
Queue.get(item, block=True, timeout=None): 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常
Queue.get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费
Queue.task_done(): 表示队列中某个元素呗消费进程使用,消费结束发送的信息。每个get()方法会拿到一个任务,其随后调用task_done()表示这个队列,这个队列的线程的任务完成。就是发送消息,告诉完成啦!
如果当前的join()当前处于阻塞状态,当前的所有元素执行后都会重启(意味着收到加入queue的每一个对象task_done()调用的信息)
如果调用的次数操作放入队列的items的个数多的话,会触发ValueError异常
Queue.join(): 一直阻塞直到队列中的所有元素都被取出和执行未完成的个数,只要有元素添加到queue中就会增加。未完成的个数,只要消费者线程调用task_done()表明其被取走,其调用结束。当未完成任务的计数等于0,join()就会不阻塞
代码示例
from multiprocessing import Queue
if __name__ == '__main__':
q = Queue(3) #初始化Quene,括号内填写num值为可接受消息数量,如果num值没有指定或者为负值就代表消息数量没有上限
q.put("消息1")
q.put("消息2")
print(q.full()) #判断是否满了,这里返回False
q.put("消息3") //put(item, block=True, timeout=None): item代表数据,blocking = False 直接报 Full异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能存入,报Full异常。
print(q.full()) # 判断是否满了,这里返回True
try:
q.put("消息4",True,2)
except:
print("消息队列已满,现有消息数量:%s"%q.qsize()) //返回queue的近似值,即元素数量
try:
q.put_nowait("消息4") //put_nowait(item): 往队列里存放元素,不等待
except:
print("消息队列已满,现有消息数量:%s" % q.qsize()) //返回queue的近似值,即元素数量
if not q.empty():
print("-----从队列中获取消息------")
for i in range(q.qsize()):
print(q.get_nowait())
if not q.full():
q.put_nowait("消息4") //get_nowait(item): 从队列里取元素,不等待两个方法跟踪入队的任务是否被消费者daemon进程完全消费
from multiprocessing import Process,Queue
import time
#向队列中写入数据
def write_task(q):
if not q.full():
for i in range(5):
message = "消息"+str(i)
q.put(message)
print("写入:%s"%message)
def read_task(q):
time.sleep(1)
while not q.empty():
print("读取:%s"% q.get(True,2))
if __name__ == '__main__':
print("---父进程开始-----")
q = Queue()
pw = Process(target=write_task,args=(q,))
pr = Process(target=read_task,args=(q,))
pw.start()
pr.start()
pw.join()
pr.join()
print("---父进程结束-----")
6.Python线程安全问题以及解决方案
6.1线程安全概念介绍
6.2Lock 用于解决线程安全问题
with模式
import threading
import time
lock = threading.Lock() #创建锁
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
with lock: #将存有问题的代码放在锁里面
if account.balance >= amount:
time.sleep(0.1)
print(threading.current_thread().name,
"取钱成功")
account.balance -= amount
print(threading.current_thread().name,
"余额", account.balance)
else:
print(threading.current_thread().name,
"取钱失败,余额不足")
if __name__ == "__main__":
account = Account(1000)
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
tb = threading.Thread(name="tb", target=draw, args=(account, 800))
ta.start()
tb.start()
try-finally模式
这里使用多线程和互斥锁模拟实现多人同时订购电影票的功能,假设电影院某个场次只有100张电影票,10个用户同时抢购该电影票。每售出一张,显示一次剩余的电影票张数
from threading import Thread,Lock
import time
n = 100
def task():
global n
mutex.acquire() #上锁
temp = n #赋值给临时变量
time.sleep(0.1)
n = temp - 1
print("购买成功,剩余%d张电影票"%n)
mutex.release() #释放锁
if __name__ == '__main__':
mutex = Lock() #实例化Lock类
t_1 = [] #初始化一个列表
for i in range(10):
t = Thread(target=task) #实例化线程类
t_1.append(t) #将线程实例化存入列表中
t.start() #创建线程
for t in t_1:
t.join() #等待子线程结束
7.Python好用的线程池ThreadPoolExecutor
8.Python使用线程池在Web服务中实现加速
9.使用多进程multiprocessing模块加速程序的运行
9.1有了多线程threading,为什么还要用多进程multiprocessing
其实前面章节已经介绍清楚了
9.2多进程multiprocessing知识梳理
9.3代码实战:单线程、多线程、多进程对比CPU密集计算速度
10.Python在Flask服务中使用多进程池加速程序运行
11.Python异步IO实现并发爬虫
12.在异步IO中使用信号量控制爬虫并发度
13.Python使用subprocess播放歌曲解压文件