💡 利用多线程优化
现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费
前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?
分两组选择器
- 单线程配一个选择器,专门处理 accept 事件
- 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
1、第一种多线程实现方式
思路 : 主线程监听接收事件,主线程将客户端通道注册 到worker线程选择器,并设置监听可读事件,由worker线程进行读操作
> 问题:> 主线程的>clientChannel.register(worker.selector,SelectionKey.OP_READ);> 语句和worker线程的>selector.select();> 执行顺序问题 一旦,worker线程陷入阻塞,那么客户端通道注册可读事件无效,worker线程无法感知到读事件
@Slf4jpublic class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8989));Selector bossSelector = Selector.open();SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);//创建固定数量的workerWorker worker = new Worker("worker-0");worker.register();while (true){bossSelector.select();Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {SocketChannel clientChannel = serverSocketChannel.accept();clientChannel.configureBlocking(false);log.info("connected...{}",clientChannel.getRemoteAddress());//关联worker的selectorlog.info("before register...{}",clientChannel.getRemoteAddress());clientChannel.register(worker.selector,SelectionKey.OP_READ);log.info("after register...{}",clientChannel.getRemoteAddress());}}}}static class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private volatile boolean start = false;public Worker(String name) {this.name = name;}public synchronized void register() throws IOException {if(!start){thread = new Thread(this);selector = Selector.open();start = true;thread.start();}}@Overridepublic void run() {while (true){try {//选择器阻塞时,无法反映通道注册时间selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024);SocketChannel clientChannel = (SocketChannel) key.channel();log.info("read...{}",clientChannel.getRemoteAddress());clientChannel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}}}
2、使用队列解决第一种方式出现的问题
把注册选择器事件通过队列也交给worker线程处理,然后唤醒selector,从队列取出注册事件执行
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8989));Selector bossSelector = Selector.open();SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);//创建固定数量的workerWorker worker = new Worker("worker-0");while (true){bossSelector.select();Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isAcceptable()) {SocketChannel clientChannel = serverSocketChannel.accept();clientChannel.configureBlocking(false);log.info("connected...{}",clientChannel.getRemoteAddress());//关联worker的selectorlog.info("before register...{}",clientChannel.getRemoteAddress());worker.register(clientChannel);log.info("after register...{}",clientChannel.getRemoteAddress());}}}}static class Worker implements Runnable{private Thread thread;private Selector selector;private String name;private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque();private volatile boolean start = false;public Worker(String name) {this.name = name;}/*** 第一次调用会创建work线程 每次注册新事件后唤醒selector* @param clientChannel* @throws IOException*/public synchronized void register(SocketChannel clientChannel) throws IOException {if(!start){thread = new Thread(this);selector = Selector.open();start = true;thread.start();}//通过队列传递任务,让worker线程自己注册queue.add(() -> {try {clientChannel.register(selector,SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}});//唤醒selectorselector.wakeup();}@Overridepublic void run() {while (true){try {selector.select();Runnable task = queue.poll();if(task != null){task.run();}Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (iterator.hasNext()){SelectionKey key = iterator.next();iterator.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(1024);SocketChannel clientChannel = (SocketChannel) key.channel();log.info("read...{}",clientChannel.getRemoteAddress());clientChannel.read(buffer);buffer.flip();System.out.println(StandardCharsets.UTF_8.decode(buffer));buffer.clear();}}} catch (IOException e) {e.printStackTrace();}}}}}
