服务端
package cn.inetty.nio.worker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import static cn.inetty.nio.ByteBufferUtil.debugAll;
/**
 * Multi-threaded NIO server: a single "boss" thread accepts connections on
 * port 8888 and hands each accepted channel to one of a fixed set of
 * {@link Worker}s (round robin). Each worker owns its own {@link Selector}
 * and thread and handles the read events of the channels assigned to it.
 */
public class iMultiThreadServer {

    /**
     * Runs the boss loop: accepts connections and distributes them to workers.
     *
     * @param args unused
     * @throws IOException if opening, binding or selecting on the server channel fails
     */
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT, null);
            ssc.bind(new InetSocketAddress(8888));

            // Create a fixed number of workers, one per available processor.
            int workerCount = Runtime.getRuntime().availableProcessors();
            System.out.println(workerCount);
            Worker[] workers = new Worker[workerCount];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-" + i);
            }

            AtomicInteger index = new AtomicInteger();
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isAcceptable()) {
                        continue;
                    }
                    // The boss thread is only responsible for accepting connections.
                    SocketChannel channel = ssc.accept();
                    if (channel == null) {
                        // Non-blocking accept may find no pending connection.
                        continue;
                    }
                    channel.configureBlocking(false);
                    System.out.println("connected: " + channel.getRemoteAddress());
                    // Hand the channel over to a worker.
                    System.out.println("before: " + channel.getRemoteAddress());
                    // Round robin; floorMod keeps the slot non-negative after the counter overflows.
                    int slot = Math.floorMod(index.getAndIncrement(), workers.length);
                    workers[slot].register(channel);
                    System.out.println("after: " + channel.getRemoteAddress());
                }
            }
        }
    }

    /**
     * Worker that owns one selector and one thread. Channels are registered
     * with the selector on the worker thread itself, via a task queue, rather
     * than on the caller's thread.
     */
    static class Worker implements Runnable {
        private Thread thread;
        private Selector selector;
        private final String name;
        private volatile boolean isStart = false;
        private final ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();

        /**
         * @param name name given to the worker thread when it is started
         */
        public Worker(String name) {
            this.name = name;
        }

        /**
         * Lazily starts the worker on first use, then queues the channel for
         * registration (OP_READ) on the worker thread and wakes the selector.
         * In this file it is only called from the boss thread.
         *
         * @param channel accepted, non-blocking channel to hand to this worker
         * @throws IOException if the selector cannot be opened
         */
        public void register(SocketChannel channel) throws IOException {
            if (!isStart) {
                // Open the selector BEFORE starting the thread: run() uses it immediately.
                selector = Selector.open();
                thread = new Thread(this, name);
                thread.start();
                isStart = true;
            }
            // Queue the registration; it is executed later by the worker thread.
            queue.add(() -> {
                try {
                    channel.register(selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            // Wake the worker so it leaves select() and processes the queue.
            selector.wakeup();
        }

        /**
         * Worker loop: waits on the selector, runs all pending registration
         * tasks, then handles every selected key.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select(); // blocks until an event arrives or wakeup() is called
                    // Drain the whole queue: several register() calls may share one wakeup.
                    Runnable task;
                    while ((task = queue.poll()) != null) {
                        task.run(); // performs channel.register(selector, OP_READ, null)
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        // Selected keys must be removed manually or they are seen again.
                        iterator.remove();
                        if (key.isValid() && key.isReadable()) {
                            handleRead(key);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /** Reads once from the key's channel; closes the channel on end-of-stream or I/O error. */
        private void handleRead(SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            try {
                System.out.println("thread: " + Thread.currentThread().getName() + "read: " + channel.getRemoteAddress());
                ByteBuffer buffer = ByteBuffer.allocate(16);
                int read = channel.read(buffer);
                if (read == -1) {
                    // Peer closed the connection: without this the key stays readable forever.
                    closeQuietly(key, channel);
                    return;
                }
                debugAll(buffer);
            } catch (IOException e) {
                e.printStackTrace();
                closeQuietly(key, channel);
            }
        }

        /** Cancels the key and closes the channel, ignoring a failure to close. */
        private void closeQuietly(SelectionKey key, SocketChannel channel) {
            key.cancel();
            try {
                channel.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing an already-broken channel fails.
            }
        }
    }
}
客户端
package cn.inetty.nio.worker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
 * Minimal test client: connects to localhost:8888, sends one short message,
 * then keeps the connection open until a byte is read from standard input.
 */
public class Client1 {

    /**
     * @param args unused
     * @throws IOException if connecting, writing or reading standard input fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources so the channel is closed even when an exception is thrown.
        try (SocketChannel open = SocketChannel.open()) {
            open.connect(new InetSocketAddress("localhost", 8888));
            open.write(Charset.defaultCharset().encode("0123456789asdfghjkl"));
            // Block here so the connection stays open until the user presses a key.
            System.in.read();
        }
    }
}