💡 如何拿到 cpu 个数
- Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
- 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
分两组选择器
- 单线程配一个选择器,专门处理 accept 事件
- 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
```java public class ChannelDemo7 { public static void main(String[] args) throws IOException {
}new BossEventLoop().register();
@Slf4jstatic class BossEventLoop implements Runnable {private Selector boss;private WorkerEventLoop[] workers;private volatile boolean start = false;AtomicInteger index = new AtomicInteger();public void register() throws IOException {if (!start) {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);boss = Selector.open();SelectionKey ssckey = ssc.register(boss, 0, null);ssckey.interestOps(SelectionKey.OP_ACCEPT);workers = initEventLoops();new Thread(this, "boss").start();log.debug("boss start...");start = true;}}public WorkerEventLoop[] initEventLoops() {
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()]; WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2]; for (int i = 0; i < workerEventLoops.length; i++) { workerEventLoops[i] = new WorkerEventLoop(i); } return workerEventLoops; }
@Overridepublic void run() {while (true) {try {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();SocketChannel sc = c.accept();sc.configureBlocking(false);log.debug("{} connected", sc.getRemoteAddress());workers[index.getAndIncrement() % workers.length].register(sc);}}} catch (IOException e) {e.printStackTrace();}}}}@Slf4jstatic class WorkerEventLoop implements Runnable {private Selector worker;private volatile boolean start = false;private int index;private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();public WorkerEventLoop(int index) {this.index = index;}public void register(SocketChannel sc) throws IOException {if (!start) {worker = Selector.open();new Thread(this, "worker-" + index).start();start = true;}tasks.add(() -> {try {SelectionKey sckey = sc.register(worker, 0, null);sckey.interestOps(SelectionKey.OP_READ);worker.selectNow();} catch (IOException e) {e.printStackTrace();}});worker.wakeup();}@Overridepublic void run() {while (true) {try {worker.select();Runnable task = tasks.poll();if (task != null) {task.run();}Set<SelectionKey> keys = worker.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);try {int read = sc.read(buffer);if (read == -1) {key.cancel();sc.close();} else {buffer.flip();log.debug("{} message:", sc.getRemoteAddress());debugAll(buffer);}} catch (IOException e) {e.printStackTrace();key.cancel();sc.close();}}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}
} ```
