💡 利用多线程优化

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?
分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

1、第一种多线程实现方式

思路:主线程监听 accept 事件,并将客户端通道注册到 worker 线程的选择器上,设置监听可读事件,由 worker 线程进行读操作
> 问题:主线程的 `clientChannel.register(worker.selector, SelectionKey.OP_READ);` 语句和 worker 线程的 `selector.select();` 语句存在执行顺序问题。一旦 worker 线程先阻塞在 `select()` 上,此后客户端通道注册的可读事件就不会生效,worker 线程无法感知到读事件

  1. @Slf4j
  2. public class MultiThreadServer {
  3. public static void main(String[] args) throws IOException {
  4. Thread.currentThread().setName("boss");
  5. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  6. serverSocketChannel.configureBlocking(false);
  7. serverSocketChannel.bind(new InetSocketAddress(8989));
  8. Selector bossSelector = Selector.open();
  9. SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
  10. //创建固定数量的worker
  11. Worker worker = new Worker("worker-0");
  12. worker.register();
  13. while (true){
  14. bossSelector.select();
  15. Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();
  16. while (iterator.hasNext()){
  17. SelectionKey key = iterator.next();
  18. iterator.remove();
  19. if (key.isAcceptable()) {
  20. SocketChannel clientChannel = serverSocketChannel.accept();
  21. clientChannel.configureBlocking(false);
  22. log.info("connected...{}",clientChannel.getRemoteAddress());
  23. //关联worker的selector
  24. log.info("before register...{}",clientChannel.getRemoteAddress());
  25. clientChannel.register(worker.selector,SelectionKey.OP_READ);
  26. log.info("after register...{}",clientChannel.getRemoteAddress());
  27. }
  28. }
  29. }
  30. }
  31. static class Worker implements Runnable{
  32. private Thread thread;
  33. private Selector selector;
  34. private String name;
  35. private volatile boolean start = false;
  36. public Worker(String name) {
  37. this.name = name;
  38. }
  39. public synchronized void register() throws IOException {
  40. if(!start){
  41. thread = new Thread(this);
  42. selector = Selector.open();
  43. start = true;
  44. thread.start();
  45. }
  46. }
  47. @Override
  48. public void run() {
  49. while (true){
  50. try {
  51. //选择器阻塞时,无法反映通道注册时间
  52. selector.select();
  53. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  54. while (iterator.hasNext()){
  55. SelectionKey key = iterator.next();
  56. iterator.remove();
  57. if (key.isReadable()) {
  58. ByteBuffer buffer = ByteBuffer.allocate(1024);
  59. SocketChannel clientChannel = (SocketChannel) key.channel();
  60. log.info("read...{}",clientChannel.getRemoteAddress());
  61. clientChannel.read(buffer);
  62. buffer.flip();
  63. debugAll(buffer);
  64. }
  65. }
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }
  72. }

2、使用队列解决第一种方式出现的问题

把注册选择器事件通过队列也交给worker线程处理,然后唤醒selector,从队列取出注册事件执行

  1. public class MultiThreadServer {
  2. public static void main(String[] args) throws IOException {
  3. Thread.currentThread().setName("boss");
  4. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  5. serverSocketChannel.configureBlocking(false);
  6. serverSocketChannel.bind(new InetSocketAddress(8989));
  7. Selector bossSelector = Selector.open();
  8. SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
  9. //创建固定数量的worker
  10. Worker worker = new Worker("worker-0");
  11. while (true){
  12. bossSelector.select();
  13. Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();
  14. while (iterator.hasNext()){
  15. SelectionKey key = iterator.next();
  16. iterator.remove();
  17. if (key.isAcceptable()) {
  18. SocketChannel clientChannel = serverSocketChannel.accept();
  19. clientChannel.configureBlocking(false);
  20. log.info("connected...{}",clientChannel.getRemoteAddress());
  21. //关联worker的selector
  22. log.info("before register...{}",clientChannel.getRemoteAddress());
  23. worker.register(clientChannel);
  24. log.info("after register...{}",clientChannel.getRemoteAddress());
  25. }
  26. }
  27. }
  28. }
  29. static class Worker implements Runnable{
  30. private Thread thread;
  31. private Selector selector;
  32. private String name;
  33. private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque();
  34. private volatile boolean start = false;
  35. public Worker(String name) {
  36. this.name = name;
  37. }
  38. /**
  39. * 第一次调用会创建work线程 每次注册新事件后唤醒selector
  40. * @param clientChannel
  41. * @throws IOException
  42. */
  43. public synchronized void register(SocketChannel clientChannel) throws IOException {
  44. if(!start){
  45. thread = new Thread(this);
  46. selector = Selector.open();
  47. start = true;
  48. thread.start();
  49. }
  50. //通过队列传递任务,让worker线程自己注册
  51. queue.add(() -> {
  52. try {
  53. clientChannel.register(selector,SelectionKey.OP_READ);
  54. } catch (ClosedChannelException e) {
  55. e.printStackTrace();
  56. }
  57. });
  58. //唤醒selector
  59. selector.wakeup();
  60. }
  61. @Override
  62. public void run() {
  63. while (true){
  64. try {
  65. selector.select();
  66. Runnable task = queue.poll();
  67. if(task != null){
  68. task.run();
  69. }
  70. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  71. while (iterator.hasNext()){
  72. SelectionKey key = iterator.next();
  73. iterator.remove();
  74. if (key.isReadable()) {
  75. ByteBuffer buffer = ByteBuffer.allocate(1024);
  76. SocketChannel clientChannel = (SocketChannel) key.channel();
  77. log.info("read...{}",clientChannel.getRemoteAddress());
  78. clientChannel.read(buffer);
  79. buffer.flip();
  80. System.out.println(StandardCharsets.UTF_8.decode(buffer));
  81. buffer.clear();
  82. }
  83. }
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. }
  89. }
  90. }