本文介绍Java的网络编程, 如何从BIO走到NIO再走到今天的Netty
1. BIO
就是最基本的Socket操作, 可以随便找一本大学Java教材, 一般都会有
public static void main(String[] args) throws IOException {//...ServerSocket server = new ServerSocket(8080);while (true) {Socket client = server.accept(); //主线程: 阻塞操作1//一般情况下, 在accept之后会把这个client交给其他线程处理InputStream inputStream = client.getInputStream(); //其他线程: 阻塞操作2}//... 省略若干代码}
2. 第一代 Java NIO
NIO的出现主要是为了解决BIO的问题
优点:
- 解决了BIO线程数量限制, 个位数的线程就可以处理大量的连接
- 将建立连接 和 接收数据进行了区分
public static void main(String[] args) throws IOException {LinkedList<SocketChannel> clientList = new LinkedList<>();ServerSocketChannel ss = ServerSocketChannel.open();ss.bind(new InetSocketAddress(8080));ss.configureBlocking(false);//设置为非阻塞while (true) {//--------------------主线程----------------------------//这里的代码块, 实际上就是可以用一个线程轮询, 是否有新的连接进入SocketChannel client = ss.accept();//非阻塞, 可能返回nullif (client != null) {//这里非空,就表示有新的连接进入client.configureBlocking(false);//设置为非阻塞clientList.add(client);}//--------------------主线程----------------------------//--------------------工作线程(池)----------------------------ByteBuffer buffer = ByteBuffer.allocateDirect(4096);//这里可以在其他线程池处理, 用一个或者几个线程疯狂遍历已经连接的客户端, 检查有没有新的数据进来//当然这里的代码就是在主线程中遍历,并读取数据,也是可以的, 就是效率肯定没有另外拉个线程读取数据来得高for (SocketChannel c : clientList) {//!!!!!!!!!!!!!!注意这里的轮询非常浪费CPU, 需要每个连接都去检查int num = c.read(buffer);//!!!!!!!!!!! 从客户端读取数据, 虽然这一步不是阻塞的, 但是这一步会发生用户态到内核态的转换if (num > 0) {//读得到就处理, 读不到就拉倒buffer.flip();byte[] aaa = new byte[buffer.limit()];buffer.get(aaa);String b = new String(aaa);System.out.println(b);buffer.clear();}}//--------------------工作线程(池)----------------------------}}
缺点:
- 建立连接的线程,和读取数据的线程, 都需要不停的轮询, 造成了大量的无效的操作, 浪费了大量CPU资源
- accept和read函数实际上会导致一次用户态到内核态的切换,也会浪费CPU
3. 第二代 Java NIO
使用selector的NIO:
用一条线程绑定多路复用器来处理所有连接(包括accept事件,read事件,write事件等)
优点:
- 不在需要不停的轮询了, 当发生连接或者读取事件的时候, 是由多路复用器来通知我们, 节约了CPU资源; (这里可以选择 1.阻塞等待多路复用器 或者 2.轮询查看多路复用器)
public class SelectorNIO_1 {public static void main(String[] args) throws IOException {SocketMultiplexSingleThreadV1 service = new SocketMultiplexSingleThreadV1();service.start();}/*** 单线程处理多路复用器的demo*/static class SocketMultiplexSingleThreadV1 {private ServerSocketChannel server = null;private Selector selector = null;private int port = 8080;public void initServer() throws IOException {server = ServerSocketChannel.open();server.configureBlocking(false);//非阻塞server.bind(new InetSocketAddress(port));selector = Selector.open();//创建一个多路复用器server.register(selector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件}public void start() throws IOException {//1. 启动服务器,同时注册号多路复用器initServer();while (true) {//2. 从多路复用器中拿需要处理的文件描述符int readyNum = selector.select();//会阻塞,直到有响应事件, 这个阻塞是针对多路复用器的,要注意!!//select():阻塞到至少有一个通道在你注册的事件上就绪了。//select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。//selectNow():非阻塞,只要有通道就绪就立刻返回。if (readyNum > 0) {//3. 遍历需要处理的keySet<SelectionKey> selectionKeys = selector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼//4. 处理这个keyif (key.isAcceptable()) {acceptHandler(key);//建立连接操作} else if (key.isReadable()) {readHandler(key);//读取数据曹邹}}}}}//监听到客户端的连接请求事件之后private void acceptHandler(SelectionKey key) throws IOException {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//1. 创建连接SocketChannel client = ssc.accept();//建立连接client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocateDirect(8192);//2. 将这个连接和一个buffer绑定到一起, buffer当做key的attachment存进去//3. 向多路复用器注册监听 该连接的读取事件SelectionKey selectionKey = client.register(selector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件//这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它System.out.println("新客户端: " + client.getRemoteAddress());}private void readHandler(SelectionKey key) throws IOException {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的bufferbuffer.clear();int read;while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {//这里是把数据写回客户端//这里可以写其他的自己的业务逻辑client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else { //-1 可能是客户端关闭了, close_waitclient.close();break;}}}}}
缺点:
- 由于只有一个线程在处理连接, 当我们线程处理读写事件的时候, 没法立刻对连接事件作出响应
- 多个读写事件都在等待一个线程处理, 也就无法将网卡的IO能力完全发挥出来
4. 第三代Java NIO
使用Selector的NIO
�1个boss线程专门处理accept事件
n个worker线程专门处理读写事件
�优点:
- 1个线程专门处理
ACCEPT事件, 对新连接的响应非常快 - 专门有n个线程, 来处理读写事件, 效率非常的高
- 思想上, 这种设计是能够最大的压榨机器性能的
public class SelectorNIO_2 {public static void main(String[] args) throws IOException, InterruptedException {//new的时候,就创建了boss线程和worker线程,只是没有startSocketMultiplexingThreads socketMultiplexingThreads = new SocketMultiplexingThreads(9090, 4);//start boss线程和worker线程, 此时可以接收请求了socketMultiplexingThreads.start();}//包装了boss线程和worker线程的创建初始化过程static class SocketMultiplexingThreads {//服务private ServerSocketChannel server;//boss线程private BossThread bossThread;//worker线程private List<WorkerThread> workerThreads;//服务绑定的端口号private int port;//构造方法, 同时初始化server, bossThread, workerThreadpublic SocketMultiplexingThreads(int port, int workerNum) throws IOException {this.port = port;server = ServerSocketChannel.open();server.configureBlocking(false);//非阻塞server.bind(new InetSocketAddress(this.port));this.bossThread = new BossThread(server, this);//创建boss线程workerThreads = new ArrayList<>(workerNum);for (int i = 0; i < workerNum; i++) {//创建worker线程workerThreads.add(new WorkerThread(server));}}//粗糙的启动方法public void start() throws InterruptedException {bossThread.start();for (WorkerThread workerThread : workerThreads) {workerThread.start();}Thread.sleep(1000 * 60 * 60);}//随机获取一个worker线程WorkerThread randomGetWorkerThread() {Random random = new Random();int index = random.nextInt(workerThreads.size());return workerThreads.get(index);}}//boss线程只处理accept事件static class BossThread extends Thread {private Selector bossSelector;private SocketMultiplexingThreads socketMultiplexingThreads;public BossThread(ServerSocketChannel server, SocketMultiplexingThreads socketMultiplexingThreads) throws IOException {this.socketMultiplexingThreads = socketMultiplexingThreads;//创建Boss多路复用器this.bossSelector = Selector.open();//向主线程注册ACCEPT事件server.register(bossSelector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件System.out.println("Boss 线程创建完毕");}@lombok.SneakyThrows@Overridepublic void run() {//boss线程不停的轮询, 等待新的连接, 专门处理accept事件while (true) {int readyNum = bossSelector.select(100);if (readyNum > 0) {Set<SelectionKey> selectionKeys = bossSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {System.out.println("boss 线程处理事件");SelectionKey key = iterator.next();iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼//Boss线程只处理Accept事件if (key.isAcceptable()) {acceptHandler(key);//建立连接操作}}}}}//处理accept事件,将读事件绑定到worker线程上去private void acceptHandler(SelectionKey key) throws IOException {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//1. 创建连接SocketChannel client = ssc.accept();//建立连接client.configureBlocking(false);//2. 分配buffer给这个连接, 其实这里如果把分配内存放到worker线程中去做, boss能处理更多的连接, 这里就不演示了, 不然代码就不清晰了ByteBuffer buffer = ByteBuffer.allocateDirect(8192);//3. 获取到worker线程的多路复用器WorkerThread workerThread = socketMultiplexingThreads.randomGetWorkerThread();//这里用的是随机获取一个, 算法可以自己实现Selector workerSelector = workerThread.getWorkerSelector();//4. 向多路复用器注册监听 该连接的读取事件SelectionKey selectionKey = client.register(workerSelector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件//这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它System.out.println("向worker线程注册读取事件: " + client.getRemoteAddress());}}//worker线程只处理读写事件static class WorkerThread extends Thread {private Selector workerSelector;private static AtomicInteger accumulator = new AtomicInteger(0);private int id = accumulator.getAndIncrement();//作为线程的标识public WorkerThread(ServerSocketChannel server) throws IOException {//创建Worker多路复用器this.workerSelector = Selector.open();System.out.println(this + " worker 线程创建完毕");}public Selector getWorkerSelector() {return this.workerSelector;}@SneakyThrows@Overridepublic void run() {//worker线程不停的轮询, 等待新的连接, 专门处理读事件while (true) {int readyNum = workerSelector.select(100);if (readyNum > 0) {Set<SelectionKey> selectionKeys = workerSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {System.out.println(this + " worker 线程处理事件");SelectionKey key = iterator.next();iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼//Boss线程只处理Accept事件if (key.isReadable()) {readHandler(key);//建立连接操作}}}}}//worker线程处理读写事件private void readHandler(SelectionKey key) throws IOException {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的bufferbuffer.clear();int read;while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {//这里是把数据写回客户端//这里可以写其他的自己的业务逻辑client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else { //-1 可能是客户端关闭了, close_waitclient.close();break;}}}@Overridepublic String toString() {return "worker线程[" + id + "]";}}}
缺点:
- 需要自己直接使用NIO的接口, 那么意味着可能会遇到各种坑
- 已经有Netty这种成熟框架包装了NIO的接口, 没有必要自己再写一套基于NIO的框架(但是方便帮助自己理解netty的思想)
5. Netty的使用
- Netty对JDK的NIO又再进行了一次包装, 底层还是调用JDK的NIO接口进行操作
- Netty的设计哲学是将所有网络操作抽象成 事件, 再将事件交给 EventLoop 去处理; (包括建立连接,读,写 都是事件)
-
5.1 单线程Netty
用一个线程去处理所有 事件 ```java public class NettyIO_1 {
public static void main(String[] args) throws InterruptedException {
//boss线程只设置1个NioEventLoopGroup boss = new NioEventLoopGroup(1);//group可以设置多个线程,但是我们这只设置了1个ServerBootstrap boot = new ServerBootstrap();//注意,这里特地写成两个boss//意味着: boss线程不但处理accept事件,也处理读写事件boot.group(boss, boss).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new MyInbound());}});ChannelFuture future = boot.bind(8080).sync();future.channel().closeFuture().sync();
}
static class MyInbound extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//todo}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//todo}
}
}
<a name="anM7H"></a>## 5.2 多线程Netty```javapublic class NettyIO_2 {public static void main(String[] args) throws InterruptedException {//boss线程设置4个//线程既是boss也是workerNioEventLoopGroup boss = new NioEventLoopGroup(4);ServerBootstrap boot = new ServerBootstrap();boot.group(boss, boss).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new MyInbound());}});ChannelFuture future = boot.bind(8080).sync();future.channel().closeFuture().sync();}static class MyInbound extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.write(msg);//echo的作用, 将读到的东西,写回客户端}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}}}
5.3 Netty的经典使用
- 在多线程的基础上,将线程分为boss和worker
- boss 专门用来处理连接事件, 只有一个线程
worker 专门用来处理读写事件, 可以有多个线程 ```java public class NettyIO_3 {
public static void main(String[] args) throws InterruptedException {
//bossGroup, 专门处理连接事件NioEventLoopGroup boss = new NioEventLoopGroup(99);//即使这里传入了99, 会创建99个EventLoop对象, 但是线程被使用的只有一个, 也就是说只有一个线程处理ACCEPT事件//workerGroup 专门处理读写事件NioEventLoopGroup worker = new NioEventLoopGroup(20);//20个线程用来处理读写事件ServerBootstrap boot = new ServerBootstrap();boot.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast(new MyInbound());}});ChannelFuture future = boot.bind(8080).sync();future.channel().closeFuture().sync();
}
static class MyInbound extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.write(msg);//echo的作用, 将读到的东西,写回客户端}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}
}
} ```
