💡 利用多线程优化
现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费
前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?
分两组选择器
- 单线程配一个选择器,专门处理 accept 事件
- 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
1、第一种多线程实现方式
思路 : 主线程监听接收事件,主线程将客户端通道注册 到worker线程选择器,并设置监听可读事件,由worker线程进行读操作
> 问题:> 主线程的>clientChannel.register(worker.selector,SelectionKey.OP_READ);
> 语句和worker线程的>selector.select();
> 执行顺序问题 一旦,worker线程陷入阻塞,那么客户端通道注册可读事件无效,worker线程无法感知到读事件
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8989));
Selector bossSelector = Selector.open();
SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
//创建固定数量的worker
Worker worker = new Worker("worker-0");
worker.register();
while (true){
bossSelector.select();
Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel clientChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
log.info("connected...{}",clientChannel.getRemoteAddress());
//关联worker的selector
log.info("before register...{}",clientChannel.getRemoteAddress());
clientChannel.register(worker.selector,SelectionKey.OP_READ);
log.info("after register...{}",clientChannel.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
public Worker(String name) {
this.name = name;
}
public synchronized void register() throws IOException {
if(!start){
thread = new Thread(this);
selector = Selector.open();
start = true;
thread.start();
}
}
@Override
public void run() {
while (true){
try {
//选择器阻塞时,无法反映通道注册时间
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel clientChannel = (SocketChannel) key.channel();
log.info("read...{}",clientChannel.getRemoteAddress());
clientChannel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2、使用队列解决第一种方式出现的问题
把注册选择器事件通过队列也交给worker线程处理,然后唤醒selector,从队列取出注册事件执行
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8989));
Selector bossSelector = Selector.open();
SelectionKey bossKey = serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
//创建固定数量的worker
Worker worker = new Worker("worker-0");
while (true){
bossSelector.select();
Iterator<SelectionKey> iterator = bossSelector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel clientChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
log.info("connected...{}",clientChannel.getRemoteAddress());
//关联worker的selector
log.info("before register...{}",clientChannel.getRemoteAddress());
worker.register(clientChannel);
log.info("after register...{}",clientChannel.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque();
private volatile boolean start = false;
public Worker(String name) {
this.name = name;
}
/**
* 第一次调用会创建work线程 每次注册新事件后唤醒selector
* @param clientChannel
* @throws IOException
*/
public synchronized void register(SocketChannel clientChannel) throws IOException {
if(!start){
thread = new Thread(this);
selector = Selector.open();
start = true;
thread.start();
}
//通过队列传递任务,让worker线程自己注册
queue.add(() -> {
try {
clientChannel.register(selector,SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
//唤醒selector
selector.wakeup();
}
@Override
public void run() {
while (true){
try {
selector.select();
Runnable task = queue.poll();
if(task != null){
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel clientChannel = (SocketChannel) key.channel();
log.info("read...{}",clientChannel.getRemoteAddress());
clientChannel.read(buffer);
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
buffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}