服务端
package cn.inetty.nio.worker;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.concurrent.ConcurrentLinkedDeque;import java.util.concurrent.atomic.AtomicInteger;import static cn.inetty.nio.ByteBufferUtil.debugAll;// 编写多线程服务器public class iMultiThreadServer { public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); Selector selector = Selector.open(); ssc.configureBlocking(false); SelectionKey bossKey = ssc.register(selector, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888)); // 创建工作线程并初始化 此工作线程是固定数量的 System.out.println(Runtime.getRuntime().availableProcessors()); Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker("worker-"+i); } AtomicInteger index = new AtomicInteger(); while (true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { // 线程的 boss 负责连接 SocketChannel channel = ssc.accept(); channel.configureBlocking(false); System.out.println("connected: " + channel.getRemoteAddress()); // 关联worker System.out.println("before: " + channel.getRemoteAddress()); // 轮询 worker round robin workers[index.getAndIncrement() % workers.length].register(channel); System.out.println("after: " + channel.getRemoteAddress()); } } } } // 工作线程 static class Worker implements Runnable { private Thread thread; private Selector selector; private String name; private volatile boolean isStart = false; private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public Worker(String name) { this.name = name; } // 初始化线程 和 Selector public void register(SocketChannel channel) throws IOException { if (!isStart) { thread = new Thread(this, name); thread.start(); selector = Selector.open(); isStart = true; } // 添加任务到队列,但此时任务还没有被执行 queue.add(()->{ try { channel.register(selector, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); selector.wakeup(); // 唤醒 worker-0 线程 } @Override public void run() { while (true) { try { selector.select(); // worker-0 线程阻塞 Runnable poll = queue.poll();// 取出队列 if (poll!=null) { poll.run(); // 在此时真正执行了 channel.register(selector, SelectionKey.OP_READ, null); } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); SocketChannel channel = (SocketChannel) key.channel(); System.out.println("thread: " + Thread.currentThread().getName() + "read: " + channel.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(16); channel.read(buffer); debugAll(buffer); } } catch (IOException e) { e.printStackTrace(); } } } }}
客户端
package cn.inetty.nio.worker;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;public class Client1 { public static void main(String[] args) throws IOException { SocketChannel open = SocketChannel.open(); open.connect(new InetSocketAddress("localhost", 8888)); open.write(Charset.defaultCharset().encode("0123456789asdfghjkl")); System.in.read(); }}