select
poll
epoll
都是linux系统提供的 IO多路复用的函数
#include<sys/select.h>
#include<sys/poll.h>
#include<sys/epoll.h>
1. 从操作系统角度看多路复用
2. epoll与select, poll区别
- epoll每次调用只会向多路复用器传入新需要关注的文件描述符
- select和poll, 每次调用都会把所有需要关注的文件描述符都传进去, 非常浪费空间
- select 入参是个数组, select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
- poll 入参是个链表
- select 和 poll 是水平触发, 只要有IO数据可读, 就会触发通知
- epoll开辟了一个内存空间, 专门保存需要监听的文件描述符(红黑树)
- epoll是边缘触发, 只有新的IO活动到来, 才会触发通知; 即使通道中有数据也不会再触发
举个例子:
第一轮轮询
- 需要关注100个文件描述符fd, epoll传入100个, select和poll也是传入100个
第二轮轮询
- 我只需要在原来的100个基础上再多关注1个, 那么第二次轮询
- epoll只需要传那新的一个就行了;
- 而select和poll函数需要传入101个文件描述符
3. epoll_wait函数
函数声明:
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
该函数用于轮询I/O事件的发生;
参数:
epfd
:由epoll_create 生成的epoll专用的文件描述符;epoll_event
:用于回传代处理事件的数组;maxevents
:每次能处理的事件数;timeout
:等待I/O事件发生的超时值(单位我也不太清楚); -1相当于阻塞,0相当于非阻塞。一般用 -1 即可返回发生事件数。
4. 用C实现服务端
启动:
socket();
创建一个文件描述符, 例如4bind();
将这个文件描述符, 绑定到端口上, 例如4绑定到9090listen();
开始监听9090端口的网络事件epoll_create();
创建一个多路复用器的空间, 例如这个空间用文件描述符8表示;相当于java中的 Selector.open()
5. epoll_ctl(8,EPOLL_CTL_ADD,4,{EPOLLIN,{u32=4,u64=12412342345234345345345}})=0;
把监听网络的文件描述符4, 注册到文件描述符8的空间中, 并标识监听EPOLLIN
事件; 相当于java的server.register(selector, SelectorKey.OP_ACCEPT);
当新客户端连接发生的时候:
epoll_wait(8,{{EPOLLIN,{u32=4,u64=4234234234234234}},4096,-1})=1
入参最后一个参数是timeout,-1是阻塞,0相当于非阻塞, 大于0的数相当于等待时间, 也就是说可以阻塞也可以非阻塞,正好对应了 java中的: select.select(0); 注意: epoll_wait()函数的入参中有个是指针, 函数会把发生事件的文件描述符存入那个指针的epoll_event结构体中 相当于java中的select.select(0);
返回值=1 是告诉你有几个新的连接accept(4, .......) = 9
//9就是新拿到的文件描述符, 相当于java中的(ServerSocketChannel)(key.channel()).accept();
方法那么拿到的这个9就是新的连接, 分配到的文件描述符!!! 在java中相当于拿到了一个新的客户端的连接
客户端发送数据后:
epoll_ctl(8, EPOLL_CTL_ADD, 9, {EPOLLIN, {....}}) = 0;
8是多路复用器的文件描述符, 9是新的客户端连接的文件描述符; 这个操作由java中的client.register(selector, SelectionKey.OP_READ, buffer);
函数完成; 功能是: 把客户端的文件描述符9, 放入多路复用器8的空间中, 注册监听9的读取事件这时候客户端发送数据; 服务端:
epoll_wait(7, 接收文件描述符的结构体指针, -1) =1
; 相当于java的,select.select();
;那么C的函数,会把发生事件的文件描述符, 存入那个指针指向的结构体实例中; 我们就能从selector.selectedKeys()
中找到那个客户端的文件描述符- 然后服务端就可以读取socket缓冲区的数据了; C函数:
read(8, ....);
; java中, 是直接读取到了buffer中
服务端发送数据:
- 服务端向客户端发送数据 C函数:
write(8, ....);
5. 示例
#include <netinet/in.h> // sockaddr_in
#include <sys/types.h> // socket
#include <sys/socket.h> // socket
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h> // epoll
#include <sys/ioctl.h>
#include <sys/time.h>
#include <iostream>
#include <vector>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <cstring>
using namespace std;
#define BUFFER_SIZE 1024
#define EPOLLSIZE 100
struct PACKET_HEAD
{
int length;
};
class Server
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int listen_fd; // 监听的fd
int epfd; // epoll fd
struct epoll_event events[EPOLLSIZE]; // epoll_wait返回的就绪事件
public:
Server(int port);
~Server();
void Bind();
void Listen(int queue_len = 20);
void Accept();
void Run();
void Recv(int fd);
};
Server::Server(int port)
{
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(port);
// create socket to listen
listen_fd = socket(PF_INET, SOCK_STREAM, 0);
if (listen_fd < 0)
{
cout << "Create Socket Failed!";
exit(1);
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
Server::~Server()
{
close(epfd);
}
void Server::Bind()
{
if (-1 == (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr))))
{
cout << "Server Bind Failed!";
exit(1);
}
cout << "Bind Successfully.\n";
}
void Server::Listen(int queue_len)
{
if (-1 == listen(listen_fd, queue_len))
{
cout << "Server Listen Failed!";
exit(1);
}
cout << "Listen Successfully.\n";
}
void Server::Accept()
{
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int new_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);
if (new_fd < 0)
{
cout << "Server Accept Failed!";
exit(1);
}
cout << "new connection was accepted.\n";
// 在epfd中注册新建立的连接
struct epoll_event event;
event.data.fd = new_fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);
}
void Server::Run()
{
epfd = epoll_create(1); // 创建epoll句柄
struct epoll_event event;
event.data.fd = listen_fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event); // 注册listen_fd
while (1)
{
int nums = epoll_wait(epfd, events, EPOLLSIZE, -1);
if (nums < 0)
{
cout << "poll() error!";
exit(1);
}
if (nums == 0)
{
continue;
}
for (int i = 0; i < nums; ++i) // 遍历所有就绪事件
{
int fd = events[i].data.fd;
if ((fd == listen_fd) && (events[i].events & EPOLLIN))
Accept(); // 有新的客户端请求
else if (events[i].events & EPOLLIN)
Recv(fd); // 读数据
else
;
}
}
}
void Server::Recv(int fd)
{
bool close_conn = false; // 标记当前连接是否断开了
PACKET_HEAD head;
recv(fd, &head, sizeof(head), 0); // 先接受包头,即数据总长度
char *buffer = new char[head.length];
bzero(buffer, head.length);
int total = 0;
while (total < head.length)
{
int len = recv(fd, buffer + total, head.length - total, 0);
if (len < 0)
{
cout << "recv() error!";
close_conn = true;
break;
}
total = total + len;
}
if (total == head.length) // 将收到的消息原样发回给客户端
{
int ret1 = send(fd, &head, sizeof(head), 0);
int ret2 = send(fd, buffer, head.length, 0);
if (ret1 < 0 || ret2 < 0)
{
cout << "send() error!";
close_conn = true;
}
}
delete buffer;
if (close_conn) // 当前这个连接有问题,关闭它
{
close(fd);
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event); // Delete一个fd
}
}
int main()
{
Server server(15000);
server.Bind();
server.Listen();
server.Run();
return 0;
}
这有几个文章不错 https://www.cnblogs.com/apprentice89/p/3234677.html https://www.cnblogs.com/apprentice89/archive/2013/05/06/3063039.html https://zhuanlan.zhihu.com/p/422229233 https://gcmurphy.wordpress.com/2012/11/30/using-epoll-in-go/