服务端

  1. package cn.inetty.nio.worker;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.*;
  6. import java.util.Iterator;
  7. import java.util.concurrent.ConcurrentLinkedDeque;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. import static cn.inetty.nio.ByteBufferUtil.debugAll;
  10. // 编写多线程服务器
  11. public class iMultiThreadServer {
  12. public static void main(String[] args) throws IOException {
  13. Thread.currentThread().setName("boss");
  14. ServerSocketChannel ssc = ServerSocketChannel.open();
  15. Selector selector = Selector.open();
  16. ssc.configureBlocking(false);
  17. SelectionKey bossKey = ssc.register(selector, 0, null);
  18. bossKey.interestOps(SelectionKey.OP_ACCEPT);
  19. ssc.bind(new InetSocketAddress(8888));
  20. // 创建工作线程并初始化 此工作线程是固定数量的
  21. System.out.println(Runtime.getRuntime().availableProcessors());
  22. Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
  23. for (int i = 0; i < workers.length; i++) {
  24. workers[i] = new Worker("worker-"+i);
  25. }
  26. AtomicInteger index = new AtomicInteger();
  27. while (true) {
  28. selector.select();
  29. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  30. while (iterator.hasNext()) {
  31. SelectionKey key = iterator.next();
  32. iterator.remove();
  33. if (key.isAcceptable()) {
  34. // 线程的 boss 负责连接
  35. SocketChannel channel = ssc.accept();
  36. channel.configureBlocking(false);
  37. System.out.println("connected: " + channel.getRemoteAddress());
  38. // 关联worker
  39. System.out.println("before: " + channel.getRemoteAddress());
  40. // 轮询 worker round robin
  41. workers[index.getAndIncrement() % workers.length].register(channel);
  42. System.out.println("after: " + channel.getRemoteAddress());
  43. }
  44. }
  45. }
  46. }
  47. // 工作线程
  48. static class Worker implements Runnable {
  49. private Thread thread;
  50. private Selector selector;
  51. private String name;
  52. private volatile boolean isStart = false;
  53. private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
  54. public Worker(String name) {
  55. this.name = name;
  56. }
  57. // 初始化线程 和 Selector
  58. public void register(SocketChannel channel) throws IOException {
  59. if (!isStart) {
  60. thread = new Thread(this, name);
  61. thread.start();
  62. selector = Selector.open();
  63. isStart = true;
  64. }
  65. // 添加任务到队列,但此时任务还没有被执行
  66. queue.add(()->{
  67. try {
  68. channel.register(selector, SelectionKey.OP_READ, null);
  69. } catch (ClosedChannelException e) {
  70. e.printStackTrace();
  71. }
  72. });
  73. selector.wakeup(); // 唤醒 worker-0 线程
  74. }
  75. @Override
  76. public void run() {
  77. while (true) {
  78. try {
  79. selector.select(); // worker-0 线程阻塞
  80. Runnable poll = queue.poll();// 取出队列
  81. if (poll!=null) {
  82. poll.run(); // 在此时真正执行了 channel.register(selector, SelectionKey.OP_READ, null);
  83. }
  84. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  85. while (iterator.hasNext()) {
  86. SelectionKey key = iterator.next();
  87. SocketChannel channel = (SocketChannel) key.channel();
  88. System.out.println("thread: " + Thread.currentThread().getName() + "read: " + channel.getRemoteAddress());
  89. ByteBuffer buffer = ByteBuffer.allocate(16);
  90. channel.read(buffer);
  91. debugAll(buffer);
  92. }
  93. } catch (IOException e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. }
  98. }
  99. }

客户端

  1. package cn.inetty.nio.worker;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.channels.SocketChannel;
  5. import java.nio.charset.Charset;
  6. public class Client1 {
  7. public static void main(String[] args) throws IOException {
  8. SocketChannel open = SocketChannel.open();
  9. open.connect(new InetSocketAddress("localhost", 8888));
  10. open.write(Charset.defaultCharset().encode("0123456789asdfghjkl"));
  11. System.in.read();
  12. }
  13. }