环境

  1. ~ uname -a
  2. Darwin chenshundeMacBook-Pro.local 20.3.0 Darwin Kernel Version 20.3.0: Thu Jan 21 00:07:06 PST 2021; root:xnu-7195.81.3~1/RELEASE_X86_64 x86_64

kqueue==> kernel queue

事件驱动: 事件由内核提供,例如内核告诉应用哪些socket可读/可写,然后应用拿到这些fd,对其读取数据/写入数据

API

kqueue: kernel event notification mechanism (内核事件通知机制)

  • int kqueue(void);

    1. The kqueue() system call allocates a kqueue file descriptor. This file descriptor provides a generic
    2. method of notifying the user when a kernel event (kevent) happens or a condition holds, based on the
    3. results of small pieces of kernel code termed filters.

    这一步由内核完成: 由内核负责发现哪些fd发生了变更(可读/可写)

  • int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);

    注册kevent到kqueue上,或者从kqueue中取出已就绪(发生变更)的kevent;注册和查询放在同一个调用里是为了减少系统调用的次数 (论文这么描述的)。 kq: kqueue对应的文件描述符 changelist: 要注册到kqueue的kevent数组 nchanges: changelist中的元素个数 eventlist: 用来接收已就绪kevent的数组 nevents: eventlist的容量(一次调用最多返回的事件个数),实际就绪的事件个数由kevent()的返回值给出 timeout: 超时参数, If timeout is a non-NULL pointer, it specifies a maximum interval to wait for an event, which will be interpreted as a struct timespec. If timeout is a NULL pointer, both kevent() and kevent64() wait indefinitely.

  • EV_SET(&kev, ident, filter, flags, fflags, data, udata);

    初始化一个kevent结构体。 ident: 事件源的标识,通常是文件描述符fd filter: 内核用来判断该事件是否就绪的过滤器,例如关注可读事件/可写事件:

    • EVFILT_READ
    • EVFILT_WRITE

    flags: 可以理解为添加到队列的时候如何操作,例如ADD/DEL/Modify

    • EV_ADD
    • EV_DELETE
    • EV_ENABLE

    fflags: Filter-specific flags. 由具体filter解释的标志位,对EVFILT_READ/EVFILT_WRITE通常设置成0 data: Filter-specific data value. 由具体filter解释/产生的数据,注册时通常设置成0 udata: 用户自定义数据,内核原样带回,不需要时传NULL

使用方式

下面的示例还缺失以下几个部分:

  • acceptHandler, 客户端连接,将socket对应的fd文件注册到kqueue
  • closeHandler,客户端断开,将socket对应的fd从kqueue移除
  • write and read handler,socket对应的读写事件

    1. int kqfd = kqueue(); // allocate the kqueue file descriptor
    2. // (not shown) set kqfd to non-blocking with fcntl
    3. int clientFd = getClientFd(...)// ... obtain the fd that backs the client socket
    4. struct kevent ke;
    5. // In Java terms this instantiates/assigns ke; EV_ADD asks for the kevent to be added to the queue
    6. // When the fd becomes readable it is marked active and handed back in the eventlist of a later kevent call
    7. EV_SET(&ke, clientFd, EVFILT_READ, EV_ADD,0,0,NULL);
    8. // Register the kevent on the queue; the eventlist is empty (NULL, 0), so the call returns immediately.
    9. int ret = kevent(kqfd, &ke, 1, NULL, 0 , NULL);
    10. // The loop below is taken from Redis; state, eventLoop and timeout are defined there, not in this snippet
    11. for (; ; ){
    12. int retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize, &timeout); // no changelist: only fetch ready events into state->events
    13. // handle the returned fds here
    14. }

    注册kevent

    /*
     * Register fd on the kqueue stored in eventLoop->apidata for every event
     * kind selected in mask.
     *
     * AE_READABLE maps to the EVFILT_READ filter and AE_WRITABLE to
     * EVFILT_WRITE. Each selected filter is added with its own kevent() call
     * (EV_ADD, empty eventlist), readable first.
     *
     * Returns 0 on success, or -1 as soon as one kevent() call fails; a filter
     * registered before the failure is left in place.
     */
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        const struct {
            int mask_bit; /* AE_* bit tested against mask */
            int filter;   /* kqueue filter registered when the bit is set */
        } wanted[] = {
            { AE_READABLE, EVFILT_READ },
            { AE_WRITABLE, EVFILT_WRITE },
        };
        aeApiState *kq_state = eventLoop->apidata;
        unsigned int idx;

        for (idx = 0; idx < sizeof wanted / sizeof wanted[0]; idx++) {
            struct kevent change;

            if ((mask & wanted[idx].mask_bit) == 0) {
                continue;
            }
            /* Describe the change: add this filter for fd to the queue. */
            EV_SET(&change, fd, wanted[idx].filter, EV_ADD, 0, 0, NULL);
            /* Submit it without asking for any ready events back. */
            if (kevent(kq_state->kqfd, &change, 1, NULL, 0, NULL) == -1) {
                return -1;
            }
        }
        return 0;
    }

    文档

  • kqueue.pdf

  • redis ae_kqueue.c

    java

    /**
     * Minimal single-threaded NIO server used to illustrate the selector loop.
     *
     * <p>It listens on localhost:19998, prints every chunk a client sends and
     * answers each chunk with {@link #ACK}. The inline comments relate each
     * NIO call to the kqueue call these notes compare it with.
     */
    public class KqueueExample {

        /** Replies waiting to be written, keyed by the client channel they belong to. */
        private static final Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>();
        /** Reply sent for every chunk received from a client. */
        private static final byte[] ACK = "Data logged successfully\n".getBytes();
        /** Fallback payload written when a channel is writable but nothing is queued. */
        private static final byte[] GREETING = "hello,world!".getBytes();

        /**
         * Opens the listening socket and runs the select loop forever.
         *
         * @param args unused
         * @throws IOException if the server socket or selector cannot be set up,
         *                     or if {@code select} itself fails
         */
        public static void main(String[] args) throws IOException {
            // Counterpart of ServerSocket in blocking socket I/O (int fd = getFd()).
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // Compared in these notes to: int kqfd = kqueue()
            final Selector selector = Selector.open();
            serverSocketChannel.socket().setReuseAddress(true);
            // bind
            serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 19998));
            // Compared to: fcntl(fd, flags | O_NONBLOCK)
            serverSocketChannel.configureBlocking(false);
            // Compared to: registering the listening fd with EV_ADD via kevent()
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, serverSocketChannel);
            System.out.println("-- server ready");
            while (true) {
                // Compared to: int nums = kevent(kqfd, NULL, 0, events, nevents, &timeout)
                selector.select(10_000);
                Set<SelectionKey> keysToConsume = selector.selectedKeys();
                for (SelectionKey key : keysToConsume) {
                    handleKey(selector, key);
                }
                // The selector never removes keys from the selected set on its own.
                keysToConsume.clear();
            }
        }

        /** Dispatches one ready key; an I/O failure on a client closes that client only. */
        private static void handleKey(Selector selector, SelectionKey key) {
            try {
                if (!key.isValid()) {
                    return;
                }
                if (key.isAcceptable()) {
                    onAcceptable(selector, key);
                } else if (key.isReadable()) {
                    onReadable(key);
                } else if (key.isWritable()) {
                    onWritable(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // A failed client connection would otherwise stay registered and
                // keep failing; the listening channel is left open.
                if (key.channel() instanceof SocketChannel) {
                    closeClient(key);
                }
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }

        /** Accepts a pending connection and starts watching it for readability. */
        private static void onAcceptable(Selector selector, SelectionKey key) throws IOException {
            ServerSocketChannel listener = (ServerSocketChannel) key.channel();
            final SocketChannel client = listener.accept();
            // accept() returns null on a non-blocking channel when nothing is pending.
            if (client != null) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
            }
        }

        /** Reads one chunk, prints it, queues an ACK and switches the key to OP_WRITE. */
        private static void onReadable(SelectionKey key) throws IOException {
            final SocketChannel channel = (SocketChannel) key.channel();
            final ByteBuffer buffer = ByteBuffer.allocate(64);
            final int read = channel.read(buffer);
            if (read == -1) {
                // Peer closed the connection: release the channel instead of
                // leaving it registered.
                closeClient(key);
                return;
            }
            buffer.flip();
            System.out.println("收到了消息:===>" + new String(buffer.array(), 0, buffer.limit()).trim());
            channelToPendingWrites
                    .computeIfAbsent(channel, ignored -> new ConcurrentLinkedQueue<>())
                    .add(ByteBuffer.wrap(ACK));
            // Only switch interest after the read succeeded.
            key.interestOps(SelectionKey.OP_WRITE);
        }

        /** Writes the next queued reply; returns to OP_READ once the queue is drained. */
        private static void onWritable(SelectionKey key) throws IOException {
            final SocketChannel channel = (SocketChannel) key.channel();
            final Queue<ByteBuffer> pending = channelToPendingWrites.get(channel);
            if (pending == null || pending.isEmpty()) {
                // wrap() yields a buffer positioned at 0, ready to be written
                // (a put() without flip() would send nothing).
                channel.write(ByteBuffer.wrap(GREETING));
            } else {
                final ByteBuffer head = pending.peek();
                channel.write(head);
                if (head.hasRemaining()) {
                    // Partial write: keep the buffer queued and stay on OP_WRITE.
                    return;
                }
                pending.poll();
                if (!pending.isEmpty()) {
                    return;
                }
            }
            key.interestOps(SelectionKey.OP_READ);
        }

        /** Forgets queued replies for the key's channel, cancels the key and closes the channel. */
        private static void closeClient(SelectionKey key) {
            channelToPendingWrites.remove(key.channel());
            key.cancel();
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }