昨日内容回顾

协程实际上是一个线程,执行了多个任务,遇到 IO 就切换

切换,可以使用 yield,greenlet

遇到 IO gevent: 检测到 IO,能够使用 greenlet 实现自动切换,规避了 IO 阻塞问题。

昨天没有讲到的小问题,看下面的例子:

  1. import gevent
  2. def func():
  3. print('eating')
  4. gevent.spawn(func) # 协程任务开启

执行程序,没有输出结果

加上 join

  1. import gevent
  2. def func():
  3. print('eating')
  4. g = gevent.spawn(func)
  5. g.join() # 阻塞,等待协程执行完毕

执行输出:eating

加上睡眠

  1. import time
  2. import gevent
  3. def func():
  4. print('eating')
  5. g = gevent.spawn(func)
  6. time.sleep(1) # 等待1秒

执行程序,没有输出结果

使用 gevent.sleep()

  1. import gevent
  2. def func():
  3. print('eating')
  4. g = gevent.spawn(func)
  5. gevent.sleep(1)

执行输出:eating

导入 monkey 模块,再使用内置的 time.sleep()

  1. from gevent import monkey;monkey.patch_all()
  2. import time
  3. import gevent
  4. def func():
  5. print('eating')
  6. g = gevent.spawn(func)
  7. time.sleep(1)

执行输出:eating

修改为 sleep(0)

  1. from gevent import monkey;monkey.patch_all()
  2. import time
  3. import gevent
  4. def func():
  5. print('eating')
  6. g = gevent.spawn(func)
  7. time.sleep(0)

执行输出:eating

time.sleep(0) 虽然时间为 0,它也是阻塞,gevent 检测到了 IO

总结:

协程任务开启,不会立即执行,它需要 IO 才能执行

  1. from gevent import monkey;monkey.patch_all()
  2. import time
  3. import gevent
  4. def func():
  5. print('eating1') # 执行
  6. time.sleep(0.1) # 遇到IO
  7. print('eating2')
  8. time.sleep(0.1)
  9. print('eating3')
  10. time.sleep(0.1)
  11. g = gevent.spawn(func) # 协程任务开启
  12. time.sleep(0) # 阻塞遇到IO

执行输出:

eating1

结果为什么是 eating1?下面的 2 和 3 为啥不执行呢?

因为 time.sleep(0) 比 time.sleep(0.1)要快,执行之后,就结束了。

切换到主进程时,发现主进程已经结束了。所以 eating2 和 eating3 没有执行。

如果用 join,就可以让 eating2 和 earting3 执行

  1. from gevent import monkey;monkey.patch_all()
  2. import time
  3. import gevent
  4. def func():
  5. print('eating1') # 执行
  6. time.sleep(0.1) # 遇到IO
  7. print('eating2')
  8. time.sleep(0.1)
  9. print('eating3')
  10. time.sleep(0.1)
  11. g = gevent.spawn(func) # 协程任务开启
  12. #time.sleep(0) # 阻塞遇到IO
  13. g.join() # 等待协程结束

待协程结束

执行输出:

eating1

eating2

eating3

总结:

内部执行 switch 时,必须保证协程不结束之前,主线程不结束

在昨天的 socket 例子中,没有用到 join

因为 accept 是永久阻塞,它不需要 join

并发编程,在面试很重要。一定要熟练掌握。

一、IO 模型介绍

为了更好地了解 IO 模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞

同步(synchronous) IO 和异步(asynchronous) IO,阻塞(blocking) IO 和非阻塞(non-blocking)IO 分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如 wiki,就认为 asynchronous IO 和 non-blocking IO 是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。

本文讨论的背景是 Linux 环境下的 network IO。本文最重要的参考文献是 Richard Stevens 的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2 节“I/O Models ”,Stevens 在这节中详细说明了各种 IO 的特点和区别,如果英文够好的话,推荐直接阅读。Stevens 的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。

Stevens 在文章中一共比较了五种 IO Model:

  1. * blocking IO 阻塞 IO
  2. * nonblocking IO 非阻塞 IO
  3. * IO multiplexing IO 多路复用
  4. * signal driven IO 信号驱动 IO
  5. * asynchronous IO 异步 IO

由 signal driven IO(信号驱动 IO)在实际中并不常用,所以主要介绍其余四种 IO Model。

再说一下 IO 发生时涉及的对象和步骤。对于一个 network IO (这里我们以 read 举例),它会涉及到两个系统对象,一个是调用这个 IO 的 process (or thread),另一个就是系统内核(kernel)。当一个 read 操作发生时,该操作会经历两个阶段:

  1. #1)等待数据准备 (Waiting for the data to be ready)
  2. #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些 IO 模型的区别就是在两个阶段上各有不同的情况。

异步:多个任务,并发执行。再执行一个操作的同时,又有另外一个任务也在执行。

input accept recv recvfrom read(文件) 这些表示长时间 IO

sleep join

print send write log.debug 这些表示短暂 IO

二、阻塞 IO(blocking IO)

在 linux 中,默认情况下所有的 socket 都是 blocking,一个典型的读操作流程大概是这样:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图1

当用户进程调用了 recvfrom 这个系统调用,kernel 就开始了 IO 的第一个阶段:准备数据。对于 network io 来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的 UDP 包),这个时候 kernel 就要等待足够的数据到来。

而在用户进程这边,整个进程会被阻塞。当 kernel 一直等到数据准备好了,它就会将数据从 kernel 中拷贝到用户内存,然后 kernel 返回结果,用户进程才解除 block 的状态,重新运行起来。

所以,blocking IO 的特点就是在 IO 执行的两个阶段(等待数据和拷贝数据两个阶段)都被 block 了

几乎所有的程序员第一次接触到的网络编程都是从 listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的 socket 接口都是阻塞型的。如下图

ps:所谓阻塞型接口是指系统调用(一般是 IO 接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图2

实际上,除非特别指定,几乎所有的 IO 接口 ( 包括 socket 接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用 recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

一个简单的解决方案:

  1. #在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

该方案的问题是:

  1. #开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案:

  1. #很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

  1. #“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图3

发送数据,首先由操作系统接收数据

操作系统陷入阻塞,等待数据阶段

因为它不知道,它时候数据来了

上图的几个绿色箭头,都是操作系统做的。

用户态接收到消息,结束阻塞状态

上图是网络 IO 模型

之前的并发,是多创建了几个进程

那么阻塞在进程中。用户虽然感觉不到,本质上是不能解决 IO 问题

阻塞 IO 是比较低效的

所有的阻塞都不会调用 CPU

IO 多的线程:比如任务过多时,会耗费 CPU

协程 500 个任务,不耗费 CPU,切换速度非常快。

所以协程比线程快

如果是高计算型的,需要用到多进程,在多进程,再起协程

这样效率会更高,它会规避很多 IO 操作

如果任务比较少,起线程和协程区别很小。但是协程会更好一点

总结:

协程能解决的事情,不要用线程

在其他语言里面:

  1. 多进程 数据隔离 可以利用多核
  2. 多线程 数据不隔离,可以利用多核
  3. 协程 数据不隔离 不能利用多核
  4. 所以,协程很鸡肋

在 Cpython 解释器下的 python 语言

  1. 多进程 数据隔离 可以利用多核
  2. 多线程 数据不隔离,不能利用多核
  3. 协程 数据不隔离 不能利用多核
  4. 但是协程的优势大于线程。在 Cpython 下,效率非常快。

三、非阻塞 IO(non-blocking IO)

Linux 下,可以通过设置 socket 使其变为 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图4

从图中可以看出,当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起 read 询问的时间间隔内做其他事情,或者直接再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

也就是说非阻塞的 recvform 系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个 error。进程在返回之后,可以干点别的事情,然后再发起 recvform 系统调用。重复上面的过程,循环往复的进行 recvform 系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

所以,在非阻塞式 IO 中,用户进程其实是需要不断的主动询问 kernel 数据准备好了没有。

非阻塞IO实例

  1. #服务端
  2. from socket import *
  3. import time
  4. s=socket(AF_INET,SOCK_STREAM)
  5. s.bind(('127.0.0.1',8080))
  6. s.listen(5)
  7. s.setblocking(False) #设置socket的接口为非阻塞
  8. conn_l=[]
  9. del_l=[]
  10. while True:
  11. try:
  12. conn,addr=s.accept()
  13. conn_l.append(conn)
  14. except BlockingIOError:
  15. print(conn_l)
  16. for conn in conn_l:
  17. try:
  18. data=conn.recv(1024)
  19. if not data:
  20. del_l.append(conn)
  21. continue
  22. conn.send(data.upper())
  23. except BlockingIOError:
  24. pass
  25. except ConnectionResetError:
  26. del_l.append(conn)
  27. for conn in del_l:
  28. conn_l.remove(conn)
  29. conn.close()
  30. del_l=[]
  31. #客户端
  32. from socket import *
  33. c=socket(AF_INET,SOCK_STREAM)
  34. c.connect(('127.0.0.1',8080))
  35. while True:
  36. msg=input('>>: ')
  37. if not msg:continue
  38. c.send(msg.encode('utf-8'))
  39. data=c.recv(1024)
  40. print(data.decode('utf-8'))

但是非阻塞 IO 模型绝不被推荐。

我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

但是也难掩其缺点:

  1. #1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
  2. #2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

此外,在这个方案中 recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如 select()多路复用模式,可以一次检测多个连接是否活跃。

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图5

用户态,表示 python 代码

如果只接受一个连接,只有一个 conn,只有一个 recv

那么它和阻塞 IO 效果是一样的

如果不止有一个连接,可以利用等待时间,做其他事情

比如再接收一个新的连接。

看下面的例子:

起一个进程,多个客户端同时连接,实现并发效果,完成非阻塞 IO 模型

服务端

  1. import socket
  2. sk = socket.socket() # 创建套接字
  3. sk.bind(('127.0.0.1', 9000)) # 监听回环地址的9000端口
  4. sk.setblocking(False) # 将套接字设为非阻塞模式
  5. sk.listen() # 开始监听
  6. '''
  7. 只存储正常连接列表。是为了之前的连接,不会被覆盖。
  8. 可能之前一个连接,还没有处理完,新的连接过来时
  9. 之前的连接,就被覆盖了。为了解决这个问题,需要一个列表来存储
  10. '''
  11. conn_lst = []
  12. '''
  13. 关闭连接列表,这个列表的元素如果存在conn_lst中,就会被删除。
  14. 因为在循环一个列表conn_lst时,不能对conn_lst做增加/删除操作。
  15. 所以需要遍历del_lst列表,每次从conn_lst中删除一个元素。
  16. '''
  17. del_lst = []
  18. while True:
  19. try:
  20. conn, addr = sk.accept() # 非阻塞的模型
  21. print(conn, addr) # 打印连接以及客户端地址
  22. conn_lst.append(conn) # 添加正常的连接
  23. except BlockingIOError as e: # 不正确的连接,刚开始是没有发送数据
  24. for conn in conn_lst: # [conn1,conn2,conn3]
  25. try:
  26. msg = conn.recv(1024) # 非阻塞,接收1024字节
  27. if not msg: # msg = b'' 判断接收数据为空
  28. conn.close() # 关闭连接
  29. del_lst.append(conn) # 添加到删除列表
  30. continue # 返回程序最开始的地方,即try
  31. print(msg) # 打印接收信息
  32. msg = msg.decode('utf-8').upper() # 解码并将接收信息全部大写
  33. conn.send(msg.encode('utf-8')) # 发送处理后的数据给客户端
  34. except NameError:pass # 没有客户端连接
  35. except BlockingIOError:pass # 没有数据来
  36. for conn in del_lst: # 遍历删除列表
  37. conn_lst.remove(conn) # 每次从正常的连接中,删除已经关闭的连接。为了不做OSError异常处理,直接删掉连接
  38. del_lst.clear() # 清空列表。

客户端

  1. import time
  2. import socket
  3. sk = socket.socket()
  4. sk.connect(('127.0.0.1',9000))
  5. for i in range(20): # 循环20次
  6. sk.send(b'hello')
  7. print(sk.recv(1024))
  8. time.sleep(1) # 模拟自动发送,间隔1秒
  9. sk.close()

执行输出,效果如下:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图6

假设 conn1.recv 阻塞了,它不会就干等着。它会继续执行新的连接,比如 conn2.recv

所以 CPU,会一直工作,不会闲着。它一直在 while True

那么问题来了,非阻塞 IO,非常耗费 CPU

非阻塞 IO 虽然完成了异步,但是它非常耗费 CPU

四、多路复用 IO(IO multiplexing)

IO multiplexing 这个词可能有点陌生,但是如果我说 select/epoll,大概就都能明白了。有些地方也称这种 IO 方式为事件驱动 IO(event driven IO)。我们都知道,select/epoll 的好处就在于单个 process 就可以同时处理多个网络连接的 IO。它的基本原理就是 select/epoll 这个 function 会不断的轮询所负责的所有 socket,当某个 socket 有数据到达了,就通知用户进程。它的流程如图:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图7

当用户进程调用了 select,那么整个进程会被 block,而同时,kernel 会“监视”所有 select 负责的 socket,当任何一个 socket 中的数据准备好了,select 就会返回。这个时候用户进程再调用 read 操作,将数据从 kernel 拷贝到用户进程。

这个图和 blocking IO 的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select 和 recvfrom),而 blocking IO 只调用了一个系统调用(recvfrom)。但是,用 select 的优势在于它可以同时处理多个 connection。

强调:

1. 如果处理的连接数不是很高的话,使用 select/epoll 的 web server 不一定比使用 multi-threading + blocking IO 的 web server 性能更好,可能延迟还更大。select/epoll 的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

2. 在多路复用模型中,对于每一个 socket,一般都设置成为 non-blocking,但是,如上图所示,整个用户的 process 其实是一直被 block 的。只不过 process 是被 select 这个函数 block,而不是被 socket IO 给 block。

结论: select 的优势在于可以处理多个连接,不适用于单个连接

select网络IO模型

服务端

  1. from socket import *
  2. import select
  3. s=socket(AF_INET,SOCK_STREAM)
  4. s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
  5. s.bind(('127.0.0.1',8081))
  6. s.listen(5)
  7. s.setblocking(False) #设置socket的接口为非阻塞
  8. read_l=[s,]
  9. while True:
  10. r_l,w_l,x_l=select.select(read_l,[],[])
  11. print(r_l)
  12. for ready_obj in r_l:
  13. if ready_obj == s:
  14. conn,addr=ready_obj.accept() #此时的ready_obj等于s
  15. read_l.append(conn)
  16. else:
  17. try:
  18. data=ready_obj.recv(1024) #此时的ready_obj等于conn
  19. if not data:
  20. ready_obj.close()
  21. read_l.remove(ready_obj)
  22. continue
  23. ready_obj.send(data.upper())
  24. except ConnectionResetError:
  25. ready_obj.close()
  26. read_l.remove(ready_obj)

客户端

  1. from socket import *
  2. c=socket(AF_INET,SOCK_STREAM)
  3. c.connect(('127.0.0.1',8081))
  4. while True:
  5. msg=input('>>: ')
  6. if not msg:continue
  7. c.send(msg.encode('utf-8'))
  8. data=c.recv(1024)
  9. print(data.decode('utf-8'))

select 监听 fd 变化的过程分析:

#用户进程创建 socket 对象,拷贝监听的 fd 到内核空间,每一个 fd 会对应一张系统文件表,内核空间的 fd 响应到数据后,就会发送信号给用户进程数据已到; #用户进程再发送系统调用,比如(accept)将内核空间的数据 copy 到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时 fd 再有新的数据又可以响应到了(发送端因为基于 TCP 协议所以需要收到应答后才会清除)。

该模型的优点:

#相比其他模型,使用 select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

该模型的缺点:

#首先 select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。 #很多操作系统提供了更为高效的接口,如 linux 提供了 epoll,BSD 提供了 kqueue,Solaris 提供了/dev/poll,…。 #如果需要实现更高效的服务器程序,类似 epoll 这样的接口更被推荐。遗憾的是不同的操作系统特供的 epoll 接口有很大差异, #所以使用类似于 epoll 的接口实现具有较好跨平台能力的服务器会比较困难。 #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

IO 多路复用跟阻塞模型的区别在于下图的紫色部分

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图8

多了一个代理,以及中间的发送过程

针对一个连接,它的效率没有阻塞模型块

但是在多个连接中,它的优点在于多路复用

假设收快递的例子:

全班有 100 个人,每个人的快递,不需要自己去楼下收取,让指定一个人去收取。

假设说,这个人就是老师。这老师得有多闲啊,收快递…. anyway,这都不是重点。

老师这个角色,就相当于是代理。肩负了所有 IO 操作。

当快递来了,发起一个 recv 请求。接收之后,通知全班同学,xx 快递到了。

操作系统本身提供了代理机制

python 提供了 select 模块,来操作代理

操作系统中的 IO 多路复用的机制 select :

windows 操作系统提供给你的一种

监听接收数据 IO 的一个代理

select 模块:

python 使用操作系统 select 机制的功能

如果没有 select 模块,只有操作系统能用,但是 python 不能使用

如果有 select 模块,python 就能操作了

使用 socket 模拟 IO 多路复用

需要导入 select 模块

服务端

  1. import socket
  2. import select
  3. sk = socket.socket() # 创建套接字
  4. sk.bind(('127.0.0.1',9000))
  5. sk.setblocking(False) # 将套接字设为非阻塞模式
  6. sk.listen() # 监听
  7. l = [sk] # 列表放入一个套接字对象
  8. while True: #接收多个客户端连接
  9. r,w,x = select.select(l,[],[]) # 阻塞行为。如果没有数据,程序会一直等待,直到有数据为止<br> print(r) # 打印用户连接信息
  10. for obj in r: # obj表示用户连接信息
  11. if obj is sk: # 当obj等于sk时
  12. conn,addr = obj.accept() #接受一个客户端的连接请求,并返回一个新的套接字。address是连接客户端的地址
  13. l.append(conn) # conn是新的套接字对象,追加到列表中
  14. else:
  15. msg = obj.recv(1024) # 接收客户端发送的数据
  16. if not msg: # 当数据为空时
  17. obj.close() # 关闭客户端连接
  18. l.remove(obj) # 删除连接对象
  19. continue # 返回while循环最开始的地方
  20. print(msg) # 打印接收数据
  21. obj.send(b'bye') # 回复信息bye给客户端

客户端

  1. import socket
  2. sk = socket.socket()
  3. sk.connect(('127.0.0.1',9000))
  4. while True:
  5. sk.send(b'hello')
  6. print(sk.recv(1024))
  7. sk.close()

执行输出,效果如下:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图9

通过一个线程,实现了多个客户端同时连接。

看 select 源码

  1. def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__
  2. """
  3. select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
  4. ...

解释一下 select 参数

  1. select内部自动监听sk1,sk2,sk3三个对象,监听三个句柄是否发生变化,把发生变化的元素放入r_list中。
  2. 如果有人连接sk1,则r_list = [sk1]
  3. 如果有人连接sk1sk2,则r_list = [sk1,sk2]
  4. select中第1个参数表示inputs中发生变化的句柄放入r_list
  5. select中第2个参数表示[]中的值原封不动的传递给w_list
  6. select中第3个参数表示inputs中发生错误的句柄放入e_list
  7. timeout 参数1表示1秒监听一次
  8. 当有用户连接时,r_list里面的内容[<socket.socket fd=316, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 64305)>]

上面的 IO 多路复用模型,必须要熟悉。它是很重要的。

select 也有弊端,在操作系统内部就相当于执行了 for 循环,轮询了所有连接。

那么问题来了,操作系统,需要从头到尾,无时无刻,都在访问。名单越长,耗费的时间越长。

回到上面收快递的例子,假如有 100 个人,那么第 99 个人快递来了

老师必须从头到尾,问一遍,才能到 99。极大的浪费性能。

select - windows/linux/ios

poll - linux/ios 内部采用了一种数据结构的优化,让能监听的数据量变多了

epoll -linux # 回调函数

windows 只有 select,poll 和 epoll 是 linux 才有的。

poll 和 seelce 类似,poll 能代理的数量更大,速度没有很快

高级别的是 epoll,它是效率最快的

假设吴泰的快递来了,直接告诉吴泰,你的快递来了。

而不是用轮询方式,每一个人都问一遍。

五、异步 IO(Asynchronous I/O)

Linux 下的 asynchronous IO 其实用得不多,从内核 2.6 版本才开始引入。先看一下它的流程:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图10

用户进程发起 read 操作之后,立刻就可以开始去做其它的事。而另一方面,从 kernel 的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了。

六、IO 模型比较分析

到目前为止,已经将四个 IO Model 都介绍完了。现在回过头来回答最初的那几个问题:blocking 和 non-blocking 的区别在哪,synchronous IO 和 asynchronous IO 的区别在哪。

先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用 blocking IO 会一直 block 住对应的进程直到操作完成,而 non-blocking IO 在 kernel 还准备数据的情况下会立刻返回。

再说明 synchronous IO 和 asynchronous IO 的区别之前,需要先给出两者的定义。Stevens 给出的定义(其实是 POSIX 的定义)是这样子的:

A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;

An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于 synchronous IO 做”IO operation”的时候会将 process 阻塞。按照这个定义,四个 IO 模型可以分为两大类,之前所述的 blocking IO,non-blocking IO,IO multiplexing 都属于 synchronous IO 这一类,而 asynchronous I/O 后一类 。

有人可能会说,non-blocking IO 并没有被 block 啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的 IO 操作,就是例子中的 recvfrom 这个 system call。non-blocking IO 在执行 recvfrom 这个 system call 的时候,如果 kernel 的数据没有准备好,这时候不会 block 进程。但是,当 kernel 中数据准备好的时候,recvfrom 会将数据从 kernel 拷贝到用户内存中,这个时候进程是被 block 了,在这段时间内,进程是被 block 的。而 asynchronous IO 则不一样,当进程发起 IO 操作之后,就直接返回再也不理睬了,直到 kernel 发送一个信号,告诉进程说 IO 完成。在这整个过程中,进程完全没有被 block。

各个 IO Model 的比较如图所示:

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图11

经过上面的介绍,会发现 non-blocking IO 和 asynchronous IO 的区别还是很明显的。在 non-blocking IO 中,虽然进程大部分时间都不会被 block,但是它仍然要求进程去主动的 check,并且当数据准备完成以后,也需要进程主动的再次调用 recvfrom 来将数据拷贝到用户内存。而 asynchronous IO 则完全不同。它就像是用户进程将整个 IO 操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查 IO 操作的状态,也不需要主动的去拷贝数据。

从上图可以看出,异步模型是最快的,但是不好实现。目前能实现的是 IO 多路复用

后面学习的 web 框架,自带异步。所以直接使用就可以了。

七、selectors 模块

select,poll,epoll

IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象,为此,咱们来理解下复用在通信领域的使用,在通信领域中为了充分利用网络连接的物理介质,往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里我们就基本上理解了复用的含义,即公用某个“介质”来尽可能多的做同一类(性质)的事,那IO复用的“介质”是什么呢?为此我们首先来看看服务器编程的模型,客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,因此为了解决大量客户端访问的问题,引入了IO复用技术,即:一个进程可以同时对多个客户请求进行服务。也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,因为进程也是靠调用select和poll来实现的),复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的但是IO所需的读写数据多数情况下是没有准备好的,因此就可以利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据可以进行读写了,进程就来对这样的IO进行服务。

理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系

select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:

  1. int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  2. int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  3. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

1.select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制__FD_SETSIZE(1024),位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,所以每次调用select前都需要重新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。

select的调用步骤如下:

(1)使用copy_from_user从用户空间拷贝fdset到内核空间

(2)注册回调函数__pollwait

(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

(6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

(7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

(8)把fd_set从内核空间拷贝到用户空间。

总结下select的几大缺点:

(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

(3)select支持的文件描述符数量太小了,默认是1024

2.poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。

poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。

3.直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll 和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注 册要监听的事件类型;epoll_wait则是等待事件的产生。

对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。

对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。

对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

总结:

(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。

(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个ep

这三种 IO 多路复用模型在不同的平台有着不同的支持,而 epoll 在 windows 下就不支持,好在我们有 selectors 模块,帮我们默认选择当前平台下最合适的

基于selectors模块实现聊天

服务端

  1. from socket import *
  2. import selectors
  3. sel=selectors.DefaultSelector()
  4. def accept(server_fileobj,mask):
  5. conn,addr=server_fileobj.accept()
  6. sel.register(conn,selectors.EVENT_READ,read)
  7. def read(conn,mask):
  8. try:
  9. data=conn.recv(1024)
  10. if not data:
  11. print('closing',conn)
  12. sel.unregister(conn)
  13. conn.close()
  14. return
  15. conn.send(data.upper()+b'_SB')
  16. except Exception:
  17. print('closing', conn)
  18. sel.unregister(conn)
  19. conn.close()
  20. server_fileobj=socket(AF_INET,SOCK_STREAM)
  21. server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
  22. server_fileobj.bind(('127.0.0.1',8088))
  23. server_fileobj.listen(5)
  24. server_fileobj.setblocking(False) #设置socket的接口为非阻塞
  25. sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
  26. while True:
  27. events=sel.select() #检测所有的fileobj,是否有完成wait data的
  28. for sel_obj,mask in events:
  29. callback=sel_obj.data #callback=accpet
  30. callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

客户端

  1. from socket import *
  2. c=socket(AF_INET,SOCK_STREAM)
  3. c.connect(('127.0.0.1',8088))
  4. while True:
  5. msg=input('>>: ')
  6. if not msg:continue
  7. c.send(msg.encode('utf-8'))
  8. data=c.recv(1024)
  9. print(data.decode('utf-8'))

selectors 模块,会自动判断操作系统,自动选择最优的方案。

一般是不会用 windows 系统做服务器。最常用的是 linux。

selectors 模块很少用。

可以尝试看 socketservr 的源码

它是 IO 多路复用 + 多线程 实现的

请参数链接:

解读socketserver源码 —— http://www.cnblogs.com/Eva-J/p/5081851.html

重点掌握 IO 多路复用

八、垃圾回收机制

GC(Garbage Collection),翻译过来,就是垃圾回收。

GC 作为现代编程语言的自动内存管理机制,专注于两件事:

  1. 找到内存中无用的垃圾资源

  2. 清除这些垃圾并把内存让出来给其他对象使用。

在 C 语言中,需要码农写 GC。但是 python 就不需要,它自动帮你做了。

链表

它内部有一个链表:

链表介绍

链表是一种物理存储单元上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接次序实现的。链表由一系列结点(链表中每一个元素称为结点)组成,结点可以在运行时动态生成。每个结点包括两个部分:一个是存储数据元素的数据域,另一个是存储下一个结点地址的指针域。

看下图

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图12

看到图片上面的 2 个粘在一起的方块没有?

左边表示数据,右边是下个数据的指针。

链表是可以存无限的,但是查找,是采用轮询方式。

也就是说,有 100 个数据,我需要找 99。必须从 1 开始,一个个去找才可以。

引用计数

Python 语言默认采用的垃圾收集机制是【引用计数法 Reference Counting】

Python 使用引用计数追踪内存中的对象,当对象的引用计数变为 0,它将被垃圾回收

对象首次被创建,并且被赋值给变量时,该对象的引用计数为 1

每当该对象的引用被赋值给其他变量时,该对象引用计数自动加 1

当对象的一个引用被销毁时,引用计数会自动减 1

del 语句会删除对象的一个引用

这将导致该引用指向的对象的引用计数减 1

注意:任何追踪或调试程序会给对象增加一个额外引用,这会推迟该对象被回收的时间。

  1. A = 1 #对象A的引用计数为 1
  2. B = 2 #对象B的引用计数为 1
  3. del A #A的引用减 1,最后A对象的引用为 0
  4. del B #B的引用减 1, 最后B对象的引用为 0

垃圾回收器是一块独立的代码,用来寻找引用计数为 0 的对象。

垃圾回收器也负责检查那些虽然引用计数大于 0 但也该被销毁的对象,因为特定情形下会导致循环引用。

循环引用发生在当你有两个对象互相引用时,及时所有的引用都消失了,这些引用仍然存在。下面是一个循环引用的例子:

  1. A = []
  2. B = []
  3. A.append(B)
  4. B.append(A)

循环引用

Day44 IO模型介绍,阻塞IO,非阻塞IO,多路复用IO,异步IO,IO模型比较分析,selectors模块,垃圾回收机制 - 图13

一旦发生循环引用

就相互减 1

链表关系解决了循环引用的计数统计问题

分代回收

一个变量在引用计数为 0 的时候并不会立刻被解释器删掉,它是有生命周期的。

分代回收是一种以空间换时间的操作方式,Python 将内存根据对象的存活时间划分为不同的集合,每个集合称为一个代,Python 将内存分为了 3“代”,分别为年轻代(第 0 代)、中年代(第 1 代)、老年代(第 2 代),他们对应的是 3 个链表,它们的垃圾收集频率与对象的存活时间的增大而减小。

新创建的对象都会分配在年轻代,年轻代链表的总数达到上限时,Python 垃圾收集机制就会被触发,把那些可以被回收的对象回收掉,而那些不会回收的对象就会被移到中年代去,依此类推,老年代中的对象是存活时间最久的对象,甚至是存活于整个系统的生命周期内。

同时,分代回收是建立在标记清除技术基础之上。分代回收同样作为 Python 的辅助垃圾收集技术处理那些容器对象

垃圾回收时,python 不能进行其它的任务。频繁的垃圾回收会大大降低 python 的效率。如果内存中的对象不多,就没有必要总启动垃圾回收。所以,python 只会在特定的条件下,自动启动垃圾回收。当 python 运行时,会记录其中分配对象和取消分配对象的次数。当两者差值高于某个阈值,垃圾回收才会启动。垃圾回收时一定会扫描所有的 0 代对象,0 代经过一定次数的垃圾回收,就会启动对 0 代和 1 代的扫描清理,当 1 代也经历一定次数的垃圾回收后, 就会启动对 0 代 1 代和 2 代所有对象的扫描。

python 使用 gc 模块,来处理垃圾回收。

查看阈值上限:

  1. import gc
  2. print(gc.get_threshold())

执行输出:

(700, 10, 10)

分别表示第 0 代,第 1 代,第 2 代。

700 即是垃圾回收启动的阈值,返回的两个 10,每 10 次 0 代垃圾回收,会配合 1 次 1 代的垃圾回收。

而每 10 次 1 代的垃圾回收,才会有 1 次的 2 代垃圾回收。