一、单线程模型

1.1 模型图

image.png

1.2 代码实现

  1. public class Main {
  2. public static void main(String[] args) throws IOException {
  3. TCPReactor tcpReactor = new TCPReactor(9090);
  4. tcpReactor.run();
  5. }
  6. }
  7. public class TCPReactor implements Runnable {
  8. private ServerSocketChannel serverSocketChannel;
  9. private Selector selector;
  10. public TCPReactor(int port) throws IOException {
  11. selector = Selector.open();
  12. serverSocketChannel = ServerSocketChannel.open();
  13. ServerSocket socket = serverSocketChannel.socket();
  14. socket.bind(new InetSocketAddress(port)); // 绑定端口
  15. serverSocketChannel.configureBlocking(false); // 设置非阻塞
  16. SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 处理 accept 事件
  17. selectionKey.attach(new Acceptor(selector, serverSocketChannel)); // 附件一个对象
  18. }
  19. @Override
  20. public void run() {
  21. while (!Thread.interrupted()) {
  22. System.out.println("Waiting for new event on port: " + serverSocketChannel.socket().getLocalPort());
  23. try {
  24. if (selector.select() == 0) {
  25. continue;
  26. }
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  31. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  32. while (iterator.hasNext()) {
  33. SelectionKey selectionKey = iterator.next();
  34. dispatch(selectionKey);
  35. iterator.remove();
  36. }
  37. }
  38. }
  39. private void dispatch(SelectionKey selectionKey) {
  40. // 第一次是 accept 事件(Acceptor)
  41. // 后面就有可能是 read 事件(TCPHandler),根据需要进行 dispatch
  42. Runnable r = (Runnable) selectionKey.attachment();
  43. if (r != null) {
  44. r.run();
  45. }
  46. }
  47. }
  48. public class Acceptor implements Runnable {
  49. Selector selector;
  50. ServerSocketChannel serverSocketChannel;
  51. public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
  52. this.selector = selector;
  53. this.serverSocketChannel = serverSocketChannel;
  54. }
  55. @Override
  56. public void run() {
  57. try {
  58. SocketChannel socketChannel = serverSocketChannel.accept(); // 接收客户端请求
  59. System.out.println(socketChannel.socket().getRemoteSocketAddress().toString() + " is connected");
  60. if (socketChannel != null) {
  61. socketChannel.configureBlocking(false); // 设置非阻塞
  62. SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
  63. selectionKey.attach(new TCPHandler(selectionKey, socketChannel));
  64. selector.wakeup(); // 让一个阻塞的 selector 操作立即返回
  65. }
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  /**
   * Per-connection handler for the single-threaded reactor. It alternates between reading a
   * request and sending a fixed reply, switching the key's interest set accordingly.
   */
  public class TCPHandler implements Runnable {

      private static final int READING = 0;
      private static final int SENDING = 1;

      SelectionKey selectionKey;
      SocketChannel socketChannel;
      int state = READING;

      /**
       * @param selectionKey  key under which the channel is registered
       * @param socketChannel client connection served by this handler
       */
      public TCPHandler(SelectionKey selectionKey, SocketChannel socketChannel) {
          this.selectionKey = selectionKey;
          this.socketChannel = socketChannel;
          state = READING; // initial state
      }

      /**
       * Reads or sends depending on the current state; any failure closes the channel.
       */
      @Override
      public void run() {
          try {
              if (state == READING) {
                  read();
              } else {
                  send();
              }
          } catch (Exception ex) {
              System.out.println("[warning] A client has been closed");
              closeChannel();
          }
      }

      /** Closes the client channel, printing (not propagating) any I/O error. */
      private void closeChannel() {
          try {
              socketChannel.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Reads one chunk (up to 1024 bytes) from the client, processes it and switches to sending.
       *
       * @throws IOException if the read fails
       */
      private synchronized void read() throws IOException {
          ByteBuffer buffer = ByteBuffer.allocate(1024);
          int len = socketChannel.read(buffer);
          if (len == -1) {
              System.out.println("[warning] A client has been closed");
              closeChannel();
              return;
          }
          if (len == 0) {
              return; // nothing was read; stay in the reading state
          }
          // Decode only the bytes actually read; the rest of the backing array is zero padding.
          String content = new String(buffer.array(), 0, len);
          if (!content.equals(" ")) {
              System.out.println(socketChannel.socket().getRemoteSocketAddress().toString() + " > " + content);
              process(content);
              state = SENDING;
              selectionKey.interestOps(SelectionKey.OP_WRITE); // change the registered interest via the key
              selectionKey.selector().wakeup(); // wake the selector
          }
      }

      /**
       * Writes the fixed acknowledgement to the client and switches back to reading.
       *
       * @throws IOException if the write fails
       */
      private void send() throws IOException {
          String response = "Your message has send to " + socketChannel.socket().getRemoteSocketAddress().toString() + "\r\n";
          ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
          while (buffer.hasRemaining()) {
              socketChannel.write(buffer);
          }
          state = READING;
          selectionKey.interestOps(SelectionKey.OP_READ); // change the registered interest via the key
          selectionKey.selector().wakeup(); // wake the selector
      }

      /**
       * Simulated business logic: sleeps for two seconds on the calling thread.
       *
       * @param content the received message (unused)
       */
      private void process(String content) {
          try {
              TimeUnit.SECONDS.sleep(2);
          } catch (InterruptedException e) {
              e.printStackTrace();
              Thread.currentThread().interrupt(); // keep the interrupt visible to the reactor loop
          }
      }
  }

二、多线程模型

2.1 模型图

image.png

2.2 代码实现

  1. public class Main {
  2. public static void main(String[] args) throws IOException {
  3. TCPReactor tcpReactor = new TCPReactor(9090);
  4. tcpReactor.run();
  5. }
  6. }
  7. public class TCPReactor implements Runnable {
  8. private ServerSocketChannel serverSocketChannel;
  9. private Selector selector;
  10. public TCPReactor(int port) throws IOException {
  11. selector = Selector.open();
  12. serverSocketChannel = ServerSocketChannel.open();
  13. ServerSocket socket = serverSocketChannel.socket();
  14. socket.bind(new InetSocketAddress(port)); // 绑定端口
  15. serverSocketChannel.configureBlocking(false); // 设置非阻塞
  16. SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 处理 accept 事件
  17. selectionKey.attach(new Acceptor(selector, serverSocketChannel)); // 附件一个对象
  18. }
  19. @Override
  20. public void run() {
  21. while (!Thread.interrupted()) {
  22. System.out.println("Waiting for new event on port: " + serverSocketChannel.socket().getLocalPort());
  23. try {
  24. if (selector.select() == 0) {
  25. continue;
  26. }
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. }
  30. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  31. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  32. while (iterator.hasNext()) {
  33. SelectionKey selectionKey = iterator.next();
  34. dispatch(selectionKey);
  35. iterator.remove();
  36. }
  37. }
  38. }
  39. private void dispatch(SelectionKey selectionKey) {
  40. // 第一次是 accept 事件(Acceptor)
  41. // 后面就有可能是 read 事件(TCPHandler),根据需要进行 dispatch
  42. Runnable r = (Runnable) selectionKey.attachment();
  43. if (r != null) {
  44. r.run();
  45. }
  46. }
  47. }
  48. public class TCPHandler implements Runnable {
  49. SelectionKey selectionKey;
  50. SocketChannel socketChannel;
  51. HandlerState state;
  52. private static final int THREAD_COUNTING = 10;
  53. static ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_COUNTING, THREAD_COUNTING,
  54. 120, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
  55. public TCPHandler(SelectionKey selectionKey, SocketChannel socketChannel) {
  56. this.selectionKey = selectionKey;
  57. this.socketChannel = socketChannel;
  58. state = new ReadState(); // 初始状态设置未 READING
  59. pool.setMaximumPoolSize(32);
  60. }
  61. @Override
  62. public void run() {
  63. try {
  64. state.handle(this, selectionKey, socketChannel, pool);
  65. } catch (Exception ex) {
  66. System.out.println("[warning] A client has been closed");
  67. closeChannel();
  68. }
  69. }
  70. public void closeChannel() {
  71. try {
  72. socketChannel.close();
  73. } catch (IOException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. public void setState(HandlerState state) {
  78. this.state = state;
  79. }
  80. }
  81. public interface HandlerState {
  82. void changeState(TCPHandler handler);
  83. void handle(TCPHandler handler, SelectionKey key, SocketChannel sc, ThreadPoolExecutor pool) throws IOException;
  84. }
  85. public class ReadState implements HandlerState {
  86. private SelectionKey sk;
  87. @Override
  88. public void changeState(TCPHandler handler) {
  89. handler.setState(new WorkState());
  90. }
  91. @Override
  92. public void handle(TCPHandler handler, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) throws IOException {
  93. this.sk = sk;
  94. byte[] arr = new byte[1024];
  95. ByteBuffer buf = ByteBuffer.wrap(arr);
  96. int numBytes = sc.read(buf); // 讀取字符串
  97. if (numBytes == -1) {
  98. System.out.println("[Warning!] A client has been closed.");
  99. handler.closeChannel();
  100. return;
  101. }
  102. String str = new String(arr);
  103. if (!str.equals(" ")) {
  104. handler.setState(new WorkState()); // 改變狀態(READING->WORKING)
  105. pool.execute(new WorkerThread(handler, str)); // do process in worker thread
  106. System.out.println(sc.socket().getRemoteSocketAddress().toString() + " > " + str);
  107. }
  108. }
  109. /*
  110. * 執行邏輯處理之函數
  111. */
  112. synchronized void process(TCPHandler h, String str) {
  113. try {
  114. TimeUnit.SECONDS.sleep(2);
  115. } catch (InterruptedException e) {
  116. e.printStackTrace();
  117. }
  118. h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)
  119. this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
  120. this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
  121. }
  122. class WorkerThread implements Runnable {
  123. TCPHandler h;
  124. String str;
  125. public WorkerThread(TCPHandler h, String str) {
  126. this.h = h;
  127. this.str = str;
  128. }
  129. @Override
  130. public void run() {
  131. process(h, str);
  132. }
  133. }
  134. }
  135. public class WorkState implements HandlerState {
  136. @Override
  137. public void changeState(TCPHandler h) {
  138. h.setState(new WriteState());
  139. }
  140. @Override
  141. public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
  142. ThreadPoolExecutor pool) throws IOException {
  143. }
  144. }
  145. public class WriteState implements HandlerState {
  146. @Override
  147. public void changeState(TCPHandler h) {
  148. h.setState(new ReadState());
  149. }
  150. @Override
  151. public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
  152. ThreadPoolExecutor pool) throws IOException {
  153. // get message from message queue
  154. String str = "Your message has sent to " + sc.socket().getLocalSocketAddress().toString() + "\r\n";
  155. ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()
  156. while (buf.hasRemaining()) {
  157. sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容
  158. }
  159. h.setState(new ReadState()); // 改變狀態(SENDING->READING)
  160. sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
  161. sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回
  162. }
  163. }

三、主从模型

3.1 模型图

image.png

3.2 代码实现

/**
 * Entry point of the master/slave reactor demo.
 */
public class Main {

    /**
     * Creates the main reactor on port 9090 and runs it on a new thread.
     * A construction failure is printed and the program simply returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        TCPReactor mainReactor;
        try {
            mainReactor = new TCPReactor(9090);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        new Thread(mainReactor).start();
    }

}

/**
 * Main reactor of the master/slave variant: it only watches the listening channel and runs
 * the attached Acceptor whenever a connection is pending.
 */
public class TCPReactor implements Runnable {

    private final ServerSocketChannel ssc;
    private final Selector selector; // selector used by the main reactor

    /**
     * Opens the selector and the non-blocking server channel, creates the Acceptor, registers
     * the channel for accept events with the Acceptor attached, and finally binds the port.
     *
     * @param port TCP port to listen on
     * @throws IOException if any channel/selector operation fails
     */
    public TCPReactor(int port) throws IOException {
        this.selector = Selector.open();
        this.ssc = ServerSocketChannel.open();
        this.ssc.configureBlocking(false); // non-blocking server channel

        Acceptor acceptor = new Acceptor(this.ssc);

        // register for OP_ACCEPT and attach the Acceptor to the returned key
        this.ssc.register(this.selector, SelectionKey.OP_ACCEPT).attach(acceptor);

        this.ssc.socket().bind(new InetSocketAddress(port)); // bind the listening port
    }

    /**
     * Loops until the thread is interrupted, dispatching every key the selector reports ready.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            System.out.println("mainReactor waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
            int readyCount = -1; // stays non-zero when select() fails, so the key set is still drained
            try {
                readyCount = selector.select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (readyCount == 0) { // nothing ready, wait again
                continue;
            }
            for (Iterator<SelectionKey> ready = selector.selectedKeys().iterator(); ready.hasNext(); ) {
                dispatch(ready.next());
                ready.remove();
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key on the calling thread, if there is one.
     *
     * @param key a ready selection key
     */
    private void dispatch(SelectionKey key) {
        Object attached = key.attachment();
        Runnable handler = (Runnable) attached;
        if (handler == null) {
            return;
        }
        handler.run();
    }

}

// 接受連線請求線程
package org.wesoft.study.reactor.masterandslave;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Accept handler of the main reactor. It owns one selector and one sub-reactor thread per CPU
 * core and distributes accepted connections over them in round-robin order.
 */
public class Acceptor implements Runnable {

    private final ServerSocketChannel ssc; // listening channel watched by the main reactor
    private final int cores = Runtime.getRuntime().availableProcessors(); // number of CPU cores
    private final Selector[] selectors = new Selector[cores]; // one selector per sub-reactor
    private int selIdx = 0; // index of the sub-reactor that receives the next connection
    private TCPSubReactor[] r = new TCPSubReactor[cores]; // sub-reactors
    private Thread[] t = new Thread[cores]; // threads running the sub-reactors

    /**
     * Creates and starts one sub-reactor thread (with its own selector) per core.
     *
     * @param ssc listening channel to accept from
     * @throws IOException if a selector cannot be opened
     */
    public Acceptor(ServerSocketChannel ssc) throws IOException {
        this.ssc = ssc;
        for (int i = 0; i < cores; i++) {
            selectors[i] = Selector.open();
            r[i] = new TCPSubReactor(selectors[i], ssc, i);
            t[i] = new Thread(r[i]);
            t[i].start();
        }
    }

    /**
     * Accepts one pending connection, if any, and registers it for read events with the next
     * sub-reactor's selector. I/O failures are printed and otherwise ignored.
     */
    @Override
    public synchronized void run() {
        try {
            SocketChannel sc = ssc.accept(); // accept the client connection
            if (sc == null) {
                // Non-blocking accept returns null when no connection is pending;
                // check before touching the channel to avoid a NullPointerException.
                return;
            }
            System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");

            sc.configureBlocking(false); // non-blocking mode
            // Ask the sub-reactor to leave its select loop and wake its selector before registering
            // (presumably so register() does not block behind an in-progress select()).
            r[selIdx].setRestart(true);
            selectors[selIdx].wakeup();
            SelectionKey sk = sc.register(selectors[selIdx], SelectionKey.OP_READ);
            selectors[selIdx].wakeup();
            r[selIdx].setRestart(false); // let the sub-reactor resume selecting
            sk.attach(new TCPHandler(sk, sc)); // attach the per-connection handler
            if (++selIdx == selectors.length) {
                selIdx = 0; // wrap around (round-robin)
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

/**
 * Sub-reactor: runs a selector loop on its own thread and dispatches the ready keys of the
 * client connections registered with its selector.
 */
public class TCPSubReactor implements Runnable {

    private final ServerSocketChannel ssc;
    private final Selector selector;
    // Set by the acceptor thread and polled by this reactor's thread, hence volatile.
    private volatile boolean restart = false;
    int num;

    /**
     * @param selector selector this sub-reactor waits on
     * @param ssc      the listening channel (stored, not used by the loop)
     * @param num      index of this sub-reactor
     */
    public TCPSubReactor(Selector selector, ServerSocketChannel ssc, int num) {
        this.ssc = ssc;
        this.selector = selector;
        this.num = num;
    }

    /**
     * Runs until the thread is interrupted. The inner loop selects and dispatches while
     * {@code restart} is false; while it is true the outer loop just re-checks the flag.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) { // run until interrupted
            System.out.println("waiting for restart");
            // isInterrupted() does not clear the flag, so the outer loop still sees the interrupt
            // and the thread can actually terminate.
            while (!Thread.currentThread().isInterrupted() && !restart) {
                try {
                    if (selector.select() == 0) {
                        continue; // nothing ready
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // keys of all ready events
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    dispatch(it.next()); // run the handler bound to the key
                    it.remove();
                }
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key on the calling thread, if there is one.
     *
     * @param key a ready selection key
     */
    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());
        if (r != null) {
            r.run();
        }
    }

    /**
     * Pauses ({@code true}) or resumes ({@code false}) the select loop.
     *
     * @param restart whether the select loop should be suspended
     */
    public void setRestart(boolean restart) {
        this.restart = restart;
    }
}

/**
 * Per-connection handler of the master/slave reactor, implemented with the state pattern.
 * Processing is offloaded to a shared thread pool by the states.
 */
public class TCPHandler implements Runnable {

    private final SelectionKey sk;
    private final SocketChannel sc;
    private static final int THREAD_COUNTING = 10;
    private static ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_COUNTING, THREAD_COUNTING,
            120, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); // worker pool
    // Written by pool worker threads (ReadState.process) and read by the sub-reactor thread,
    // so it must be volatile for the state change to be reliably visible.
    volatile HandlerState state;

    /**
     * @param sk key under which the channel is registered
     * @param sc client connection served by this handler
     */
    public TCPHandler(SelectionKey sk, SocketChannel sc) {
        this.sk = sk;
        this.sc = sc;
        state = new ReadState(); // initial state is READING
        pool.setMaximumPoolSize(32); // maximum pool size
    }

    /**
     * Lets the current state handle the ready event; an I/O failure closes the connection.
     */
    @Override
    public void run() {
        try {
            state.handle(this, sk, sc, pool);
        } catch (IOException e) {
            System.out.println("[Warning!] A client has been closed.");
            closeChannel();
        }
    }

    /** Cancels the key and closes the channel, printing (not propagating) any I/O error. */
    public void closeChannel() {
        try {
            sk.cancel();
            sc.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * Replaces the current state.
     *
     * @param state the state that handles the next event
     */
    public void setState(HandlerState state) {
        this.state = state;
    }
}

/**
 * One state of a {@link TCPHandler} (state pattern).
 */
public interface HandlerState {

    /**
     * Moves the given handler to the state that follows this one.
     *
     * @param handler handler whose state is replaced
     */
    void changeState(TCPHandler handler);

    /**
     * Handles a ready event for the handler while it is in this state.
     *
     * @param handler handler the event belongs to
     * @param key     selection key of the connection
     * @param sc      client channel
     * @param pool    thread pool available for offloading work
     * @throws IOException if channel I/O fails
     */
    void handle(TCPHandler handler, SelectionKey key, SocketChannel sc, ThreadPoolExecutor pool) throws IOException;

}

/**
 * Handler state that reads a request from the client and hands the processing over to a
 * worker thread from the pool.
 */
public class ReadState implements HandlerState {

    private SelectionKey sk;

    /** Moves the handler to a new {@link WorkState}. */
    @Override
    public void changeState(TCPHandler handler) {
        handler.setState(new WorkState());
    }

    /**
     * Reads up to 1024 bytes. On end-of-stream the connection is closed; otherwise the handler
     * is switched to {@link WorkState} and the message is processed on the pool.
     *
     * @throws IOException if the read fails
     */
    @Override
    public void handle(TCPHandler handler, SelectionKey sk, SocketChannel sc, ThreadPoolExecutor pool) throws IOException {
        this.sk = sk;
        byte[] arr = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(arr);

        int numBytes = sc.read(buf); // read the request
        if (numBytes == -1) {
            System.out.println("[Warning!] A client has been closed.");
            handler.closeChannel();
            return;
        }
        if (numBytes == 0) {
            return; // nothing was read; stay in the reading state
        }
        // Decode only the bytes actually read; the rest of the array is zero padding.
        String str = new String(arr, 0, numBytes);
        if (!str.equals(" ")) {
            handler.setState(new WorkState()); // READING -> WORKING
            pool.execute(new WorkerThread(handler, str)); // do process in worker thread
            System.out.println(sc.socket().getRemoteSocketAddress().toString() + " > " + str);
        }
    }

    /**
     * Simulated business logic: sleeps two seconds, then switches the handler to
     * {@link WriteState} and re-arms the key for write events.
     */
    synchronized void process(TCPHandler h, String str) {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // do not swallow the interrupt
        }
        h.setState(new WriteState()); // WORKING -> SENDING
        this.sk.interestOps(SelectionKey.OP_WRITE); // change the registered interest via the key
        this.sk.selector().wakeup(); // make a blocked select() return immediately
    }

    /** Pool task that runs {@link #process(TCPHandler, String)} for one message. */
    class WorkerThread implements Runnable {

        TCPHandler h;
        String str;

        public WorkerThread(TCPHandler h, String str) {
            this.h = h;
            this.str = str;
        }

        @Override
        public void run() {
            process(h, str);
        }

    }
}

/**
 * Handler state that writes the fixed acknowledgement to the client and then returns the
 * handler to reading.
 */
public class WriteState implements HandlerState {

    /** Moves the handler back to a new {@link ReadState}. */
    @Override
    public void changeState(TCPHandler h) {
        h.setState(new ReadState());
    }

    /**
     * Sends the reply, switches the handler to {@link ReadState} and re-arms the key for reads.
     *
     * @throws IOException if the write fails
     */
    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException {
        String reply = "Your message has sent to " + sc.socket().getLocalSocketAddress().toString() + "\r\n";

        // wrap() leaves position at 0 and limit at the array length, so no flip() is needed
        for (ByteBuffer out = ByteBuffer.wrap(reply.getBytes()); out.hasRemaining(); ) {
            sc.write(out); // sends the bytes between position and limit
        }

        HandlerState reading = new ReadState();
        h.setState(reading); // SENDING -> READING
        sk.interestOps(SelectionKey.OP_READ); // change the registered interest via the key
        sk.selector().wakeup(); // make a blocked select() return immediately
    }
}