转发 Linux下I/O多路复用select, poll, epoll 三种模型的Python使用

select, poll, epoll本质上都是同步的 I/O,因为它们都是在读写事件就绪后自己负责进行读写,这个读写的过程是阻塞的。

select, poll, epoll 都是一种 I/O 复用的机制。它们都是通过一种机制(由系统提供的)来监视多个描述符,一旦某个描述符就绪了,就能通知程序进行相应的读写操作。

select

select 的原理

select 是通过系统调用来监视着一个由多个文件描述符(file descriptor)组成的数组,当 select() 返回后,数组中就绪的文件描述符会被内核修改标记位 (其实就是一个整数),使得进程可以获得这些文件描述符从而进行后续的读写操作。select 饰通过遍历来监视整个数组的,而且每次遍历都是线性的。

select 的缺点

  1. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多的时候会很大
  2. 单个进程能够监视的 fd 数量存在最大限制,在 linux 上默认为 1024(可以通过修改宏定义或者重新编译内核的方式提升这个限制)
  3. 并且由于 select 的 fd 是放在数组中,并且每次都要线性遍历整个数组,当 fd 很多的时候,开销也很大

在 Python 中调用 select

Python 中,select,poll,epoll 和 unix 的 kqueue() 都在模块select中。

调用 select 的函数为select.select(rlist, wlist, xlist[, timeout]),前三个参数都分别是三个数组,数组中的对象均为waitable object: 均是整数的文件描述符(file descriptor)或者一个拥有返回文件描述符方法fileno()的对象;

  • rlist: 等待读就绪的 list
  • wlist: 等待写就绪的 list
  • xlist: 等待 “异常” 的 list

这三个 list 可以是一个空的 list,但是接收 3 个空的 list 是依赖于系统的 (在 Linux 上是可以接受的,但是在 window 上是不可以的)。

timeout参数是接受一个 float 的数字,单位是。当缺省timeout时,select 会一直阻塞之道至少有一个文件描述符 (fd) 准备就绪。如果timeout设为 0 时,则 select 不会阻塞。

函数的返回值是返回三个准备就绪的 list: 对应者rlist, wlist, xlist这三个 list 的子集。如果 timeout,会返回 3 个空的 list。

在 list 中可以接受 Ptython 的的file对象 (比如sys.stdin,或者会被open()os.open()返回的 object),socket object 将会返回socket.socket()。也可以自定义类,只要有一个合适的fileno()的方法 (需要真实返回一个文件描述符,而不是一个随机的整数)。

Python 的简单示例

  1. import select, socket
  2. response = b"hello world"
  3. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  4. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  5. serversocket.bind(('localhost', 8080))
  6. serversocket.listen(1)
  7. serversocket.setblocking(0)
  8. inputs = [serversocket, ]
  9. while True:
  10. rlist, wlist, xlist = select.select(inputs, [], [])
  11. for sock in rlist:
  12. server socket读就绪
  13. if sock == serversocket:
  14. con, addr = serversocket.accept()
  15. #将这个connection添加到读就绪中
  16. inputs.append(con)
  17. else:
  18. data = sock.recv(1024)
  19. if data:
  20. sock.send(response)
  21. #从读就绪的list中删除
  22. inputs.remove(sock)
  23. sock.close()

poll

poll 的原理

poll 本质上和 select 没有区别,只是没有了最大连接数 (linux 上默认 1024 个) 的限制,原因是它基于链表存储的。

poll 的缺点

poll 除了没有了最大连接数的缺点,其他都和 select 一样

在 Python 中调用 poll

  • select.poll(),返回一个 poll 的对象,支持注册和注销文件描述符。
  • poll.register(fd[, eventmask])注册一个文件描述符,注册后,可以通过poll()方法来检查是否有对应的 I/O 事件发生。fd可以是 i 个整数,或者有返回整数的fileno()方法对象。如果 File 对象实现了 fileno(),也可以当作参数使用。
  • eventmask是一个你想去检查的事件类型,它可以是常量POLLIN, POLLPRIPOLLOUT的组合。如果缺省,默认会去检查所有的 3 种事件类型。 | 事件常量 | 意义 | | —- | —- | | POLLIN | 有数据读取 | | POLLPRT | 有数据紧急读取 | | POLLOUT | 准备输出: 输出不会阻塞 | | POLLERR | 某些错误情况出现 | | POLLHUP | 挂起 | | POLLNVAL | 无效请求: 描述无法打开 |
  • poll.modify(fd, eventmask) 修改一个已经存在的 fd,和poll.register(fd, eventmask)有相同的作用。如果去尝试修改一个未经注册的 fd,会引起一个errnoENOENTIOError
  • poll.unregister(fd)从 poll 对象中注销一个 fd。尝试去注销一个未经注册的 fd,会引起KeyError
  • poll.poll([timeout])去检测已经注册了的文件描述符。会返回一个可能为空的 list,list 中包含着(fd, event)这样的二元组。 fd是文件描述符, event是文件描述符对应的事件。如果返回的是一个空的 list,则说明超时了且没有文件描述符有事件发生。timeout的单位是milliseconds,如果设置了timeout,系统将会等待对应的时间。如果timeout缺省或者是None,这个方法将会阻塞直到对应的 poll 对象有一个事件发生。

Python 简单示例

  1. import select, socket
  2. response = b"hello world"
  3. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  4. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  5. serversocket.bind(('192.168.199.197', 8080))
  6. serversocket.listen(1)
  7. serversocket.setblocking(0)
  8. poll = select.poll()
  9. poll.register(serversocket.fileno(), select.POLLIN)
  10. connections = {}
  11. while True:
  12. for fd, event in poll.poll():
  13. if event == select.POLLIN:
  14. if fd == serversocket.fileno():
  15. con, addr = serversocket.accept()
  16. poll.register(con.fileno(), select.POLLIN)
  17. connections[con.fileno()] = con
  18. else:
  19. con = connections[fd]
  20. data = con.recv(1024)
  21. if data:
  22. poll.modify(con.fileno(), select.POLLOUT)
  23. elif event == select.POLLOUT:
  24. con = connections[fd]
  25. con.send(response)
  26. poll.unregister(con.fileno())
  27. con.close()

epoll

epoll 的原理及改进

在 linux2.6(准确来说是 2.5.44)由内核直接支持的方法。epoll 解决了 select 和 poll 的缺点。

  • 对于第一个缺点,epoll 的解决方法是每次注册新的事件到 epoll 中,会把所有的 fd 拷贝进内核,而不是在等待的时候重复拷贝,保证了每个 fd 在整个过程中只会拷贝 1 次。
  • 对于第二个缺点,epoll 没有这个限制,它所支持的 fd 上限是最大可以打开文件的数目,具体数目可以 cat /proc/sys/fs/file-max 查看,一般来说这个数目和系统内存关系比较大。
  • 对于第三个缺点,epoll 的解决方法不像 select 和 poll 每次对所有 fd 进行遍历轮询所有 fd 集合,而是在注册新的事件时,为每个 fd 指定一个回调函数,当设备就绪的时候,调用这个回调函数,这个回调函数就会把就绪的 fd 加入一个就绪表中。(所以 epoll 实际只需要遍历就绪表)。

epoll 同时支持水平触发和边缘触发:

  • 水平触发(level-triggered):只要满足条件,就触发一个事件 (只要有数据没有被获取,内核就不断通知你)。e.g: 在水平触发模式下,重复调用epoll.poll()会重复通知关注的 event,直到与该 event 有关的所有数据都已被处理。(select, poll 是水平触发, epoll 默认水平触发)
  • 边缘触发(edge-triggered):每当状态变化时,触发一个事件。e.g: 在边沿触发模式中,epoll.poll() 在读或者写 event 在 socket 上面发生后,将只会返回一次 event。调用epoll.poll()的程序必须处理所有和这个 event 相关的数据,随后的epoll.poll()调用不会再有这个 event 的通知。

在 Python 中调用 epoll

  • select.epoll([sizehint=-1])返回一个 epoll 对象。
  • eventmask | 事件常量 | 意义 | | —- | —- | | EPOLLIN | 读就绪 | | EPOLLOUT | 写就绪 | | EPOLLPRI | 有数据紧急读取 | | EPOLLERR | assoc. fd 有错误情况发生 | | EPOLLHUP | assoc. fd 发生挂起 | | EPOLLRT | 设置边缘触发 (ET)(默认的是水平触发) | | EPOLLONESHOT | 设置为 one-short 行为,一个事件 (event) 被拉出后,对应的 fd 在内部被禁用 | | EPOLLRDNORM | 和 EPOLLIN 相等 | | EPOLLRDBAND | 优先读取的数据带 (data band) | | EPOLLWRNORM | 和 EPOLLOUT 相等 | | EPOLLWRBAND | 优先写的数据带 (data band) | | EPOLLMSG | 忽视 |
  • epoll.close()关闭 epoll 对象的文件描述符。
  • epoll.fileno返回 control fd 的文件描述符 number。
  • epoll.fromfd(fd)用给予的 fd 来创建一个 epoll 对象。
  • epoll.register(fd[, eventmask])在 epoll 对象中注册一个文件描述符。(如果文件描述符已经存在,将会引起一个IOError
  • epoll.modify(fd, eventmask)修改一个已经注册的文件描述符。
  • epoll.unregister(fd)注销一个文件描述符。
  • epoll.poll(timeout=-1[, maxevnets=-1])等待事件,timeout(float) 的单位是秒(second)。

Ptython 示例

epoll 的示例就直接引用这篇出名的 blog

  1. import socket, select
  2. EOL1 = b'\n\n'
  3. EOL2 = b'\n\r\n'
  4. response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
  5. response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
  6. response += b'Hello, world!'
  7. serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  8. serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  9. serversocket.bind(('0.0.0.0', 8080))
  10. serversocket.listen(1)
  11. serversocket.setblocking(0)
  12. epoll = select.epoll()
  13. epoll.register(serversocket.fileno(), select.EPOLLIN)
  14. try:
  15. connections = {}; requests = {}; responses = {}
  16. while True:
  17. events = epoll.poll(1)
  18. for fileno, event in events:
  19. if fileno == serversocket.fileno():
  20. connection, address = serversocket.accept()
  21. connection.setblocking(0)
  22. epoll.register(connection.fileno(), select.EPOLLIN)
  23. connections[connection.fileno()] = connection
  24. requests[connection.fileno()] = b''
  25. responses[connection.fileno()] = response
  26. elif event & select.EPOLLIN:
  27. requests[fileno] += connections[fileno].recv(1024)
  28. if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
  29. epoll.modify(fileno, select.EPOLLOUT)
  30. print('-'*40 + '\n' + requests[fileno].decode()[:-2])
  31. elif event & select.EPOLLOUT:
  32. byteswritten = connections[fileno].send(responses[fileno])
  33. responses[fileno] = responses[fileno][byteswritten:]
  34. if len(responses[fileno]) == 0:
  35. epoll.modify(fileno, 0)
  36. connections[fileno].shutdown(socket.SHUT_RDWR)
  37. elif event & select.EPOLLHUP:
  38. epoll.unregister(fileno)
  39. connections[fileno].close()
  40. del connections[fileno]
  41. finally:
  42. epoll.unregister(serversocket.fileno())
  43. epoll.close()
  44. serversocket.close()