昨日内容回顾
I/O 模型,面试会问到
I/O 操作,不占用 CPU。它内部有一个专门的处理 I/O 模块。
print 和写 log 属于 I/O 操作,它不占用 CPU
线程
GIL 保证一个进程中的多个线程在同一时刻只有一个可以被 CPU 执行
后续的项目,特别是处理网络请求,非常多。
实例化一个 Lock(),它就是一个互斥锁
LCOK 和 RLOCK
互斥锁 LCOK
死锁
rlock 递归锁
递归锁不会发生死锁现象
2 个进程中的线程,不会受到 GIL 影响。
GIL 是针对一个进程中的多个线程,同一时间,只能有一个线程访问 CPU
针对 GIL 的 CPU 利用率问题
起多个进程,就可以解决 CPU 利用率问题。
昨天的科学家吃面的例子,它不能用一把锁,必须 2 个锁。
def eat1(noodle_lock,fork_lock,name):
noodle_lock.acquire()
print('%s抢到了面'%name)
fork_lock.acquire()
print('%s抢到了叉子'%name)
print('%s正在吃面'%name)
fork_lock.release()
print('%s归还了叉子' % name)
noodle_lock.release()
print('%s归还了面' % name)
看下图

假设有三个人,
A 要面和叉子
B 只要面
C 只要叉子
如果只有一个锁,那么就无法处理这 3 个人的需求,会发生数据不安全的情况。
semaphore 在一开始固定一个线程的流量
condition 通过一个信号控制线程的流量
event 通过一个信号控制所有线程
timer 定时器
队列 线程数据安全
线程池
能够在多线程的基础上进一步节省内存和时间开销
一、引子
之前我们学习了线程、进程的概念,了解了在操作系统中进程是资源分配的最小单位,线程是 CPU 调度的最小单位。按道理来说我们已经算是把 cpu 的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换。
随着我们对于效率的追求不断提高,基于单线程来实现并发又成为一个新的课题,即只用一个主线程(很明显可利用的 cpu 只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。
为此我们需要先回顾下并发的本质:切换+保存状态
cpu 正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长

ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态
一:其中第二种情况并不能提升效率,只是为了让 cpu 能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。
为此我们可以基于 yield 来验证。yield 本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:
#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
#2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
单纯地切换反而会降低运行效率
#串行执行
import time
def consumer(res):
'''任务1:接收数据,处理数据'''
pass
def producer():
'''任务2:生产数据'''
res=[]
for i in range(10000000):
res.append(i)
return res
start=time.time()
#串行执行
res=producer()
consumer(res) #写成consumer(producer())会降低执行效率
stop=time.time()
print(stop-start) #1.5536692142486572
#基于yield并发执行
import time
def consumer():
'''任务1:接收数据,处理数据'''
while True:
x=yield
def producer():
'''任务2:生产数据'''
g=consumer()
next(g)
for i in range(10000000):
g.send(i)
start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()
stop=time.time()
print(stop-start) #2.0272178649902344
二:第一种情况的切换。在任务一遇到 io 情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。
import time
def consumer():
'''任务1:接收数据,处理数据'''
while True:
x=yield
def producer():
'''任务2:生产数据'''
g=consumer()
next(g)
for i in range(10000000):
g.send(i)
time.sleep(2)
start=time.time()
producer() #并发执行,但是任务producer遇到io就会阻塞住,并不会切到该线程内的其他任务去执行
stop=time.time()
print(stop-start)
yield无法做到遇到io阻塞
对于单线程下,我们不可避免程序中出现 io 操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到 io 阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被 cpu 执行的状态,相当于我们在用户程序级别将自己的 io 操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io 比较少,从而更多的将 cpu 的执行权限分配给我们的线程。
协程的本质就是在单线程下,由用户自己控制一个任务遇到 io 阻塞了就切换另外一个任务去执行,以此来提升效率。为了实现它,我们需要找寻一种可以同时满足以下条件的解决方案:
#1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来,以便重新运行时,可以基于暂停的位置继续执行。
#2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换
红色表示忙(IO),绿色表示正常运行
左边是正常的线程,右边是协程

在执行程序的过程中,遇到 IO 操作就切换其他线程执行,比如 b。
网络的 recv,访问网页,都存在 IO
协程
协程并不是实际存在的实体
它的本质 就是一个线程的多个部分
比线程的单位更小 —— 协程、纤程
它的本质就是一个线程的多个部分
在一个线程中可以开启很多协程
在执行程序的过程中,遇到 IO 操作就冻结当前位置的状态
去执行其他任务,在执行其他任务过程中,
会不断的检测上一个冻结的任务是否 IO 结束
如果 IO 结束了,就继续从冻结的位置开始执行
一个线程不会遇到阻塞 —— 一直在使用CPU
多个线程 —— 只能有一个线程使用CPU
协程比线程之间的切换和线程的创建销毁
所花费的时间、空间开销要小的多
协程的特点
冻结当前程序/任务的执行状态 —— 技能解锁
可以规避 IO 操作的时间
它的特点,是线程没有的。
冻结函数状态—>生成器
def func():
print(1)
yield 'aaa'
print(2)
yield 'bbb'
print(3)
yield 'ccc'
g = func()
next(g)
执行输出:1
def func():
x = yield 1
print(x)
yield 2
g = func()
print(next(g))
print(g.send('aaa'))
执行输出:
1
aaa
2
上面代码的运行过程如下。
1.当调用 next(g)方法时,python 首先会执行 func 方法的 yield 1 语句。由于是一个 yield 语句,因此方法的执行过程被挂起,而 next 方法返回值为 yield 关键字后面表达式的值,即为 1。
2.当调用 g.send(‘aaa’)方法时,python 首先恢复 func 方法的运行环境。同时,将表达式 yield 1 的返回值定义为 send 方法参数的值,即为 aaa
这样,接下来 x = yield 1 这一赋值语句会将 x 的值置为 aaa。继续运行会遇到 yield 2 语句。
因此,func 方法再次被挂起。同时,send 方法的返回值为 yield 关键字后面表达式的值,为 2。
最终输出:
1
aaa
2
单纯的切换状态,会不会影响程序执行时间?
看上面的例子:单纯地切换反而会降低运行效率
结论:
单纯的切换 还是要耗费一些时间的 记住当前执行的状态
上面的列表虽然执行快,但是它占用了大量内存。它是用时间换了空间。
二、协程介绍
协程:是单线程下的并发,又称微线程,纤程。英文名 Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
1.必须在只有一个单线程里实现并发 2.修改共享数据不需加锁 3.用户程序里自己保存多个控制流的上下文栈附加:一个协程遇到 IO 操作自动切换到其它协程(如何实现检测 IO,yield、greenlet 都无法实现,就用到了 gevent 模块(select 机制))
三、Greenlet 模块
简介:
Greenlet 是 python 的一个 C 扩展,来源于 Stackless python,旨在提供可自行调度的‘微线程’, 即协程。generator 实现的协程在 yield value 时只能将 value 返回给调用者(caller)。 而在 greenlet 中,target.switch(value)可以切换到指定的协程(target), 然后 yield value。greenlet 用 switch 来表示协程的切换,从一个协程切换到另一个协程需要显式指定。
安装 :pip3 install greenlet
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('egon')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
greenlet实现状态切换
单纯的切换(在没有 io 的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度
效率对比
#顺序执行
import time
def f1():
res=1
for i in range(100000000):
res+=i
def f2():
res=1
for i in range(100000000):
res*=i
start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337
#切换
from greenlet import greenlet
import time
def f1():
res=1
for i in range(100000000):
res+=i
g2.switch()
def f2():
res=1
for i in range(100000000):
res*=i
g1.switch()
start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
greenlet 只是提供了一种比 generator 更加便捷的切换方式,当切到一个任务执行时如果遇到 io,那就原地阻塞,仍然是没有解决遇到 IO 自动切换来提升效率的问题。
单线程里的这 20 个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务 1 时遇到阻塞,就利用阻塞的时间去执行任务 2。。。。如此,才能提高效率,这就用到了 Gevent 模块。
greenlet 不是创造协程的模块
它是用来做多个协程任务切换的
它到底是怎么实现切换的呢?
from greenlet import greenlet
def func():
print(123)
def func2():
print(456)
g1 = greenlet(func) # 实例化
g2 = greenlet(func2)
g1.switch() # 开始运行,它会运行到下一个switch结束。否则一直运行
执行输出:123
from greenlet import greenlet
def test1():
print 12
gr2.switch()
print 34
def test2():
print 56
gr1.switch()
print 78
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
执行输出:
12
56
34
执行过程:
当创建一个 greenlet 时,首先初始化一个空的栈,switch 到这个栈的时候,会运行在 greenlet 构造时传入的函数(首先在 test1 中打印 12), 如果在这个函数(test1)中 switch 到其他协程(到了 test2 打印 34),那么该协程会被挂起,等到切换回来(在 test2 中切换回来 打印 34)。当这个协程对应函数执行完毕,那么这个协程就变成 dead 状态。
注意 上面没有打印 test2 的最后一行输出 78,因为在 test2 中切换到 gr1 之后挂起,但是没有地方再切换回来。这个可能造成泄漏,后面细说。
上面的例子,有几个缺点
1.手动切换
2.不能规避 I/O 操作(睡眠)
四、Gevent 模块
安装:pip3 install gevent
Gevent 是一个第三方库,可以轻松通过 gevent 实现并发同步或异步编程,在 gevent 中用到的主要模式是 Greenlet, 它是以 C 扩展模块形式接入 Python 的轻量级协程。Greenlet 全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
用法介绍
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
例:遇到io主动切换
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')
我们可以用 threading.current_thread().getName()来查看每个 g1 和 g2,查看的结果为 DummyThread-n,即假线程

from gevent import monkey;monkey.patch_all()
import threading
import gevent
import time
def eat():
print(threading.current_thread().getName())
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print(threading.current_thread().getName())
print('play 1')
time.sleep(1)
print('play 2')
g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')
查看threading.current_thread().getName()
真正能实现协程的模块 gevent
import gevent
def eat():
print('eating1')
print('eating2')
g1 = gevent.spawn(eat) #创建一个协程对象g1
执行输出为空,表示它还没执行。
import gevent
def eat():
print('eating1')
print('eating2')
g1 = gevent.spawn(eat) #创建一个协程对象g1
g1.join() #等待g1结束
执行输出:
eating1
eating2
import time
import gevent
def eat():
print('eating1')
time.sleep(1)
print('eating2')
def play():
print('playing1')
time.sleep(1)
print('playing2')
g1 = gevent.spawn(eat) #创建一个协程对象g1
g2 = gevent.spawn(play)
g1.join() #等待g1结束
g2.join()
执行输出:
eating1
eating2
playing1
playing2
如果想顺序执行呢?需要用到 gevent.sleep
import time
import gevent
def eat():
print('eating1')
gevent.sleep(1) #延时调用
print('eating2')
def play():
print('playing1')
gevent.sleep(1) #延时调用
print('playing2')
g1 = gevent.spawn(eat) #创建一个协程对象g1
g2 = gevent.spawn(play)
g1.join() #等待g1结束
g2.join()
执行输出:
eating1
playing1
eating2
playing2
如果想让协程执行 time.sleep()呢?由于默认,协程无法识别 time.sleep()方法,需要导入一个模块 monkey
monkey patch (猴子补丁)
用来在运行时动态修改已有的代码,而不需要修改原始代码。
from gevent import monkey;monkey.patch_all()
# 它会把下面导入的所有的模块中的IO操作都打成一个包,gevent就能够认识这些IO了
import time
import gevent
def eat():
print('eating1')
time.sleep(1) #延时调用
print('eating2')
def play():
print('playing1')
time.sleep(1) #延时调用
print('playing2')
g1 = gevent.spawn(eat) #创建一个协程对象g1
g2 = gevent.spawn(play)
g1.join() #等待g1结束
g2.join()
执行输出:
eating1
playing1
eating2
playing2
结论:
使用 gevent 模块来执行多个函数,表示在这些函数遇到 IO 操作的时候可以在同一个线程中进行切换
利用其他任务的 IO 阻塞时间来切换到其他的任务继续执行
前提是:
spawn 来发布协程任务
join 负责开启并等待任务执行结束
gevent 本身不认识其他模块中的 IO 操作,但是如果我们在导入其他模块之前执行 from gevent import monkey;monkey.patch_all() 这行代码,必须在文件最开头
gevent 就能够认识在这句话之后导入的模块中的所有 IO 操作了
五、Gevent 之同步与异步
from gevent import spawn,joinall,monkey;monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 异步
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
print('DONE')
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
# 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
# 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,
# 后者阻塞当前流程,并执行所有给定的greenlet任务。执行流程只会在 所有greenlet执行完后才会继续向下走。
当一个任务执行时,依赖另外一个任务的结果时,这种情况不适合异步,只能用同步
Gevent 之应用举例一
手动安装模块 requests
pip3 install requests
协程应用:爬虫
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' %url)
response=requests.get(url)
if response.status_code == 200:
print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.python.org/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
等待网页请求结果是,去执行其他任务
红色表示等待,绿色表示执行任务

另外一个爬虫例子:
对比使用普通函数和使用协程,谁更快一点
由于操作系统,访问一次网页后,会有缓存。
所以测试时,先访问一遍网页。再分别测试协程和普通函数。
from gevent import monkey;monkey.patch_all()
from urllib.request import urlopen
import gevent
import time
def get_page(url):
res = urlopen(url)
#print(len(res.read()))
url_lst = [
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.sohu.com',
'http://www.qq.com',
'http://www.cnblogs.com',
]
start = time.time()
gevent.joinall([gevent.spawn(get_page,url) for url in url_lst])
print('先执行一次',time.time() - start)
start = time.time()
gevent.joinall([gevent.spawn(get_page,url) for url in url_lst])
print('协程',time.time() - start)
start = time.time()
for url in url_lst:get_page(url)
print('普通',time.time() - start)
执行输出:
先执行一次 0.6465449333190918
协程 0.34525322914123535
普通 0.570899486541748
结论
以后用爬虫,可以使用协程,它的速度更快。
Gevent 之应用举例二
通过 gevent 实现单线程下的 socket 并发
注意 :from gevent import monkey;monkey.patch_all()一定要放到导入 socket 模块之前,否则 gevent 无法识别 socket 的阻塞
server
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
client
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
多线程并发多个客户端
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()
进程 5 个,线程 20 个,协程 500 个 —— 通用的组合 —— 50000qps
0.3s 以内,用户是感觉不到的
只有进程能处理并行
重点掌握进程,线程,协程
这 3 者之间的区别,优缺点,理论知识。面试会问道。
task,翻译是任务
1 多进程/多线程网络编程都是一个进程或者线程处理一个task,当task过多时,就会导致巨量的进程/线程。
2 巨量的进程/线程会导致 上下文切换极其频繁! 大家知道:上下文切换是要消耗cpu资源的 所以当进程/线程数量过多时,cpu资源就得不到有效利用
3 而协程实际上就是:在用户空间实现task的上下文切换! 这种上下文切换消耗的代价相较而言微乎其微。这就是协程的优势!
4 当然协程也有劣势:就是无法利用多核cpu,但是我们有解决办法:多进程 + 协程
看下图

playing2 没有输出,是因为阻塞结束,不再切换。
明天默写:
socket_server
from gevent import monkey;monkey.patch_all()
import socket
import gevent
def async_talk(conn):
try:
while True:
conn.send(b'hello')
ret = conn.recv(1024)
print(ret)
finally:
conn.close()
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
conn,addr = sk.accept()
gevent.spawn(async_talk,conn)
sk.close()
socket_client
import socket
from threading import Thread
def socket_client():
sk = socket.socket()
sk.connect(('127.0.0.1',9000))
while True:
print(sk.recv(1024))
sk.send(b'bye')
sk.close()
for i in range(500):
Thread(target=socket_client).start()