环境
➜ ~ uname -a
Darwin chenshundeMacBook-Pro.local 20.3.0 Darwin Kernel Version 20.3.0: Thu Jan 21 00:07:06 PST 2021; root:xnu-7195.81.3~1/RELEASE_X86_64 x86_64
kqueue==> kernel queue
事件驱动: 事件由内核提供,例如内核告知哪些socket可读/可写,然后应用拿到这些socket对应的fd,读取数据/写入数据
API
kqueue: kernel event notification mechanism (内核事件通知机制)
int kqueue(void);
The kqueue() system call allocates a kqueue file descriptor. This file descriptor provides a generic
method of notifying the user when a kernel event (kevent) happens or a condition holds, based on the
results of small pieces of kernel code termed filters.
这一步由内核完成: 内核负责通知哪些fd发生了变更(可读/可写)
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
注册kevent到kqueue上,或者是从kqueue中查找发生变更的kevent。注册和查找放在一个方法是为了减少系统调用的次数 (论文这么描述的)
- kq: kqueue对应的文件描述符
- changelist: 注册到kqueue的event数组
- nchanges: changelist的数量
- eventlist: 用于接收发生变更的event的数组
- nevents: eventlist的容量(一次最多返回的event个数);实际返回的个数由kevent()的返回值给出
- timeout: 时间参数, If timeout is a non-NULL pointer, it specifies a maximum interval to wait for an event, which will be interpreted as a struct timespec. If timeout is a NULL pointer, both kevent() and kevent64() wait indefinitely.
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
设置数据, ident: 通常是文件描述符fd filter: 可以理解为当事件发生时,内核会如何处理这些event,例如可读事件/可写事件发生时
- EVFILT_READ
- EVFILT_WRITE
flags: 可以理解为添加到队列的时候如何操作,例如ADD/DEL/Modify
- EV_ADD
- EV_DELETE
- EV_ENABLE
fflags: Filter-specific flags. 与具体filter相关的标志位,常设置成0
data: Filter-specific data value. 与具体filter相关的数据,常设置成0
udata: Opaque user-defined value. 用户自定义数据,内核原样传回,不使用时设置成NULL
使用方式
下面的示例还缺少以下几个handler:
- acceptHandler, 客户端连接时,将socket对应的fd注册到kqueue
- closeHandler, 客户端断开时,将socket对应的fd从kqueue移除
- write and read handler, 处理socket对应的读写事件
// Illustrative fragment, not a complete program: create a kqueue, register one
// client fd for read readiness, then poll for events in a loop.
int kqfd = kqueue();
// Author's note: set kqfd to non-blocking with fcntl (that step is not shown here).
// NOTE(review): the next line is pseudo-code -- getClientFd(...) is a placeholder
// and the statement has no terminating semicolon.
int clientFd = getClientFd(...)//.... obtain the fd that backs a client socket
struct kevent ke;
// In Java terms this instantiates/populates ke and asks for it to be added to the queue (EV_ADD).
// Once the fd becomes readable it is reported as active through the eventlist
// of a later kevent() call.
EV_SET(&ke, clientFd, EVFILT_READ, EV_ADD,0,0,NULL);
// Register the kevent on the queue; eventlist is NULL and nevents is 0, so the call returns immediately.
int ret = kevent(kqfd, &ke, 1, NULL, 0 , NULL);
// The loop below is taken from redis; state, eventLoop and timeout are defined there, not in this fragment.
for (; ; ){
int retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout);
// Handle the returned fds here.
}
注册kevent
/*
 * Register interest in `fd` on the event loop's kqueue.
 *
 * For each of AE_READABLE / AE_WRITABLE set in `mask`, one kevent() call adds
 * the matching filter (EVFILT_READ / EVFILT_WRITE) with EV_ADD. The readable
 * filter is submitted first. Returns 0 on success, or -1 as soon as a kevent()
 * call fails (a filter added before the failure is left in place).
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct kevent change;
    /* (mask bit, kqueue filter) pairs, processed in this order. */
    const int wanted[][2] = {
        {AE_READABLE, EVFILT_READ},
        {AE_WRITABLE, EVFILT_WRITE},
    };
    const int pairs = (int)(sizeof wanted / sizeof wanted[0]);

    for (int i = 0; i < pairs; i++) {
        if ((mask & wanted[i][0]) == 0) {
            continue; /* caller did not ask for this direction */
        }
        /* No event list is supplied, so kevent() only applies the change. */
        EV_SET(&change, fd, wanted[i][1], EV_ADD, 0, 0, NULL);
        if (kevent(state->kqfd, &change, 1, NULL, 0, NULL) == -1) {
            return -1;
        }
    }
    return 0;
}
文档
-
java
/**
 * Minimal single-threaded NIO server that mirrors the kqueue/kevent workflow
 * described in these notes: open a listening channel, register it with a
 * Selector, then loop on select() and dispatch accept/read/write readiness.
 *
 * <p>Each message read from a client is printed and answered with {@code ACK}.
 * The inline "conceptually" comments relate each Selector call to the kqueue
 * call it resembles; they are an analogy, not a statement about what the JDK
 * does internally.
 */
public class KqueueExample {
    /** Outbound buffers queued per client channel; drained when the channel is writable. */
    private static final Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>();
    private static final byte[] ACK = "Data logged successfully\n".getBytes();
    /** Fallback payload sent when a channel is writable but nothing is queued for it. */
    private static final byte[] GREETING = "hello,world!".getBytes();

    /**
     * Binds to localhost:19998 and serves clients forever.
     *
     * @param args unused
     * @throws IOException if the server channel or selector cannot be set up,
     *                     or if select() itself fails
     */
    public static void main(String[] args) throws IOException {
        // Conceptually: int fd = socket(...)
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // Conceptually: int kqfd = kqueue()
        final Selector selector = Selector.open();
        serverSocketChannel.socket().setReuseAddress(true);
        // Conceptually: bind(fd, ...)
        serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 19998));
        // Conceptually: fcntl(fd, F_SETFL, flags | O_NONBLOCK)
        serverSocketChannel.configureBlocking(false);
        // Conceptually: register the listening fd on the queue with EV_ADD
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
        System.out.println("-- server ready");
        while (true) {
            // Conceptually: int nums = kevent(kqfd, NULL, 0, events, nevents, &timeout)
            selector.select(10_000);
            Set<SelectionKey> keysToConsume = selector.selectedKeys();
            for (SelectionKey key : keysToConsume) {
                dispatch(selector, key);
            }
            // The selector never removes keys from the selected set itself.
            keysToConsume.clear();
        }
    }

    /** Routes one ready key to its handler; a failing client is closed instead of being left registered. */
    private static void dispatch(Selector selector, SelectionKey key) {
        try {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                handleAccept(selector, key);
            } else if (key.isReadable()) {
                handleRead(key);
            } else if (key.isWritable()) {
                handleWrite(key);
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Only client connections are torn down; the listening channel keeps running.
            if (key.channel() instanceof SocketChannel) {
                closeClient(key);
            }
        }
    }

    /** Accepts a pending connection and registers it for read readiness. */
    private static void handleAccept(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        final SocketChannel client = server.accept();
        // accept() returns null on a non-blocking channel when no connection is pending.
        if (client != null) {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        }
    }

    /** Reads one chunk from the client, logs it, queues an ACK and switches the key to write interest. */
    private static void handleRead(SelectionKey key) throws IOException {
        final SocketChannel channel = (SocketChannel) key.channel();
        final ByteBuffer inbound = ByteBuffer.allocate(64);
        final int read = channel.read(inbound);
        if (read == -1) {
            // Peer closed the connection: release it, otherwise the key keeps firing forever.
            closeClient(key);
            return;
        }
        inbound.flip();
        System.out.println("收到了消息:===>" + new String(inbound.array(), 0, inbound.limit()).trim());
        channelToPendingWrites
                .computeIfAbsent(channel, c -> new ConcurrentLinkedQueue<>())
                .add(ByteBuffer.wrap(ACK));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** Writes the next queued buffer (or the greeting if none) and returns to read interest once drained. */
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        final Queue<ByteBuffer> pending = channelToPendingWrites.get(socketChannel);
        if (pending == null || pending.isEmpty()) {
            // wrap() yields a buffer positioned at 0, so the bytes are actually sent.
            socketChannel.write(ByteBuffer.wrap(GREETING));
        } else {
            final ByteBuffer head = pending.peek();
            socketChannel.write(head);
            if (head.hasRemaining()) {
                // Partial write: keep the buffer queued and stay on write interest.
                return;
            }
            pending.poll();
            if (!pending.isEmpty()) {
                return;
            }
        }
        key.interestOps(SelectionKey.OP_READ);
    }

    /** Drops queued writes, cancels the key and closes the client channel. */
    private static void closeClient(SelectionKey key) {
        channelToPendingWrites.remove(key.channel());
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}