💡 如何拿到 cpu 个数

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 JDK 10 才修复(之后也移植到了 JDK 8u191),由 JVM 参数 -XX:+UseContainerSupport 控制,默认开启

分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
    ```java
    public class ChannelDemo7 {
        public static void main(String[] args) throws IOException {
            new BossEventLoop().register();
        }
  1. @Slf4j
  2. static class BossEventLoop implements Runnable {
  3. private Selector boss;
  4. private WorkerEventLoop[] workers;
  5. private volatile boolean start = false;
  6. AtomicInteger index = new AtomicInteger();
  7. public void register() throws IOException {
  8. if (!start) {
  9. ServerSocketChannel ssc = ServerSocketChannel.open();
  10. ssc.bind(new InetSocketAddress(8080));
  11. ssc.configureBlocking(false);
  12. boss = Selector.open();
  13. SelectionKey ssckey = ssc.register(boss, 0, null);
  14. ssckey.interestOps(SelectionKey.OP_ACCEPT);
  15. workers = initEventLoops();
  16. new Thread(this, "boss").start();
  17. log.debug("boss start...");
  18. start = true;
  19. }
  20. }
  21. public WorkerEventLoop[] initEventLoops() {

// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()]; WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2]; for (int i = 0; i < workerEventLoops.length; i++) { workerEventLoops[i] = new WorkerEventLoop(i); } return workerEventLoops; }

  1. @Override
  2. public void run() {
  3. while (true) {
  4. try {
  5. boss.select();
  6. Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
  7. while (iter.hasNext()) {
  8. SelectionKey key = iter.next();
  9. iter.remove();
  10. if (key.isAcceptable()) {
  11. ServerSocketChannel c = (ServerSocketChannel) key.channel();
  12. SocketChannel sc = c.accept();
  13. sc.configureBlocking(false);
  14. log.debug("{} connected", sc.getRemoteAddress());
  15. workers[index.getAndIncrement() % workers.length].register(sc);
  16. }
  17. }
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. }
  24. @Slf4j
  25. static class WorkerEventLoop implements Runnable {
  26. private Selector worker;
  27. private volatile boolean start = false;
  28. private int index;
  29. private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
  30. public WorkerEventLoop(int index) {
  31. this.index = index;
  32. }
  33. public void register(SocketChannel sc) throws IOException {
  34. if (!start) {
  35. worker = Selector.open();
  36. new Thread(this, "worker-" + index).start();
  37. start = true;
  38. }
  39. tasks.add(() -> {
  40. try {
  41. SelectionKey sckey = sc.register(worker, 0, null);
  42. sckey.interestOps(SelectionKey.OP_READ);
  43. worker.selectNow();
  44. } catch (IOException e) {
  45. e.printStackTrace();
  46. }
  47. });
  48. worker.wakeup();
  49. }
  50. @Override
  51. public void run() {
  52. while (true) {
  53. try {
  54. worker.select();
  55. Runnable task = tasks.poll();
  56. if (task != null) {
  57. task.run();
  58. }
  59. Set<SelectionKey> keys = worker.selectedKeys();
  60. Iterator<SelectionKey> iter = keys.iterator();
  61. while (iter.hasNext()) {
  62. SelectionKey key = iter.next();
  63. if (key.isReadable()) {
  64. SocketChannel sc = (SocketChannel) key.channel();
  65. ByteBuffer buffer = ByteBuffer.allocate(128);
  66. try {
  67. int read = sc.read(buffer);
  68. if (read == -1) {
  69. key.cancel();
  70. sc.close();
  71. } else {
  72. buffer.flip();
  73. log.debug("{} message:", sc.getRemoteAddress());
  74. debugAll(buffer);
  75. }
  76. } catch (IOException e) {
  77. e.printStackTrace();
  78. key.cancel();
  79. sc.close();
  80. }
  81. }
  82. iter.remove();
  83. }
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. }
  89. }

    }
    ```