本文介绍Java的网络编程, 如何从BIO走到NIO再走到今天的Netty

1. BIO

就是最基本的Socket操作, 可以随便找一本大学Java教材, 一般都会有

  1. public static void main(String[] args) throws IOException {
  2. //...
  3. ServerSocket server = new ServerSocket(8080);
  4. while (true) {
  5. Socket client = server.accept(); //主线程: 阻塞操作1
  6. //一般情况下, 在accept之后会把这个client交给其他线程处理
  7. InputStream inputStream = client.getInputStream(); //其他线程: 阻塞操作2
  8. }
  9. //... 省略若干代码
  10. }

2. 第一代 Java NIO

NIO的出现主要是为了解决BIO的问题
优点:

  1. 解决了BIO线程数量限制, 个位数的线程就可以处理大量的连接
  2. 将建立连接 和 接收数据进行了区分
  1. public static void main(String[] args) throws IOException {
  2. LinkedList<SocketChannel> clientList = new LinkedList<>();
  3. ServerSocketChannel ss = ServerSocketChannel.open();
  4. ss.bind(new InetSocketAddress(8080));
  5. ss.configureBlocking(false);//设置为非阻塞
  6. while (true) {
  7. //--------------------主线程----------------------------
  8. //这里的代码块, 实际上就是可以用一个线程轮询, 是否有新的连接进入
  9. SocketChannel client = ss.accept();//非阻塞, 可能返回null
  10. if (client != null) {
  11. //这里非空,就表示有新的连接进入
  12. client.configureBlocking(false);//设置为非阻塞
  13. clientList.add(client);
  14. }
  15. //--------------------主线程----------------------------
  16. //--------------------工作线程(池)----------------------------
  17. ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
  18. //这里可以在其他线程池处理, 用一个或者几个线程疯狂遍历已经连接的客户端, 检查有没有新的数据进来
  19. //当然这里的代码就是在主线程中遍历,并读取数据,也是可以的, 就是效率肯定没有另外拉个线程读取数据来得高
  20. for (SocketChannel c : clientList) {//!!!!!!!!!!!!!!注意这里的轮询非常浪费CPU, 需要每个连接都去检查
  21. int num = c.read(buffer);//!!!!!!!!!!! 从客户端读取数据, 虽然这一步不是阻塞的, 但是这一步会发生用户态到内核态的转换
  22. if (num > 0) {//读得到就处理, 读不到就拉倒
  23. buffer.flip();
  24. byte[] aaa = new byte[buffer.limit()];
  25. buffer.get(aaa);
  26. String b = new String(aaa);
  27. System.out.println(b);
  28. buffer.clear();
  29. }
  30. }
  31. //--------------------工作线程(池)----------------------------
  32. }
  33. }

缺点:

  1. 建立连接的线程,和读取数据的线程, 都需要不停的轮询, 造成了大量的无效的操作, 浪费了大量CPU资源
  2. accept和read函数实际上会导致一次用户态到内核态的切换,也会浪费CPU

3. 第二代 Java NIO

使用selector的NIO:
用一条线程绑定多路复用器来处理所有连接(包括accept事件,read事件,write事件等)

优点:

  • 不在需要不停的轮询了, 当发生连接或者读取事件的时候, 是由多路复用器来通知我们, 节约了CPU资源; (这里可以选择 1.阻塞等待多路复用器 或者 2.轮询查看多路复用器)
  1. public class SelectorNIO_1 {
  2. public static void main(String[] args) throws IOException {
  3. SocketMultiplexSingleThreadV1 service = new SocketMultiplexSingleThreadV1();
  4. service.start();
  5. }
  6. /**
  7. * 单线程处理多路复用器的demo
  8. */
  9. static class SocketMultiplexSingleThreadV1 {
  10. private ServerSocketChannel server = null;
  11. private Selector selector = null;
  12. private int port = 8080;
  13. public void initServer() throws IOException {
  14. server = ServerSocketChannel.open();
  15. server.configureBlocking(false);//非阻塞
  16. server.bind(new InetSocketAddress(port));
  17. selector = Selector.open();//创建一个多路复用器
  18. server.register(selector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件
  19. }
  20. public void start() throws IOException {
  21. //1. 启动服务器,同时注册号多路复用器
  22. initServer();
  23. while (true) {
  24. //2. 从多路复用器中拿需要处理的文件描述符
  25. int readyNum = selector.select();//会阻塞,直到有响应事件, 这个阻塞是针对多路复用器的,要注意!!
  26. //select():阻塞到至少有一个通道在你注册的事件上就绪了。
  27. //select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
  28. //selectNow():非阻塞,只要有通道就绪就立刻返回。
  29. if (readyNum > 0) {
  30. //3. 遍历需要处理的key
  31. Set<SelectionKey> selectionKeys = selector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
  32. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  33. while (iterator.hasNext()) {
  34. SelectionKey key = iterator.next();
  35. iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
  36. //4. 处理这个key
  37. if (key.isAcceptable()) {
  38. acceptHandler(key);//建立连接操作
  39. } else if (key.isReadable()) {
  40. readHandler(key);//读取数据曹邹
  41. }
  42. }
  43. }
  44. }
  45. }
  46. //监听到客户端的连接请求事件之后
  47. private void acceptHandler(SelectionKey key) throws IOException {
  48. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  49. //1. 创建连接
  50. SocketChannel client = ssc.accept();//建立连接
  51. client.configureBlocking(false);
  52. ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
  53. //2. 将这个连接和一个buffer绑定到一起, buffer当做key的attachment存进去
  54. //3. 向多路复用器注册监听 该连接的读取事件
  55. SelectionKey selectionKey = client.register(selector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件
  56. //这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它
  57. System.out.println("新客户端: " + client.getRemoteAddress());
  58. }
  59. private void readHandler(SelectionKey key) throws IOException {
  60. SocketChannel client = (SocketChannel) key.channel();
  61. ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的buffer
  62. buffer.clear();
  63. int read;
  64. while (true) {
  65. read = client.read(buffer);
  66. if (read > 0) {
  67. buffer.flip();
  68. while (buffer.hasRemaining()) {
  69. //这里是把数据写回客户端
  70. //这里可以写其他的自己的业务逻辑
  71. client.write(buffer);
  72. }
  73. buffer.clear();
  74. } else if (read == 0) {
  75. break;
  76. } else { //-1 可能是客户端关闭了, close_wait
  77. client.close();
  78. break;
  79. }
  80. }
  81. }
  82. }
  83. }

缺点:

  1. 由于只有一个线程在处理连接, 当我们线程处理读写事件的时候, 没法立刻对连接事件作出响应
  2. 多个读写事件都在等待一个线程处理, 也就无法将网卡的IO能力完全发挥出来

4. 第三代Java NIO

使用Selector的NIO
�1个boss线程专门处理accept事件
n个worker线程专门处理读写事件

�优点:

  1. 1个线程专门处理ACCEPT事件, 对新连接的响应非常快
  2. 专门有n个线程, 来处理读写事件, 效率非常的高
  3. 思想上, 这种设计是能够最大的压榨机器性能的
  1. public class SelectorNIO_2 {
  2. public static void main(String[] args) throws IOException, InterruptedException {
  3. //new的时候,就创建了boss线程和worker线程,只是没有start
  4. SocketMultiplexingThreads socketMultiplexingThreads = new SocketMultiplexingThreads(9090, 4);
  5. //start boss线程和worker线程, 此时可以接收请求了
  6. socketMultiplexingThreads.start();
  7. }
  8. //包装了boss线程和worker线程的创建初始化过程
  9. static class SocketMultiplexingThreads {
  10. //服务
  11. private ServerSocketChannel server;
  12. //boss线程
  13. private BossThread bossThread;
  14. //worker线程
  15. private List<WorkerThread> workerThreads;
  16. //服务绑定的端口号
  17. private int port;
  18. //构造方法, 同时初始化server, bossThread, workerThread
  19. public SocketMultiplexingThreads(int port, int workerNum) throws IOException {
  20. this.port = port;
  21. server = ServerSocketChannel.open();
  22. server.configureBlocking(false);//非阻塞
  23. server.bind(new InetSocketAddress(this.port));
  24. this.bossThread = new BossThread(server, this);
  25. //创建boss线程
  26. workerThreads = new ArrayList<>(workerNum);
  27. for (int i = 0; i < workerNum; i++) {
  28. //创建worker线程
  29. workerThreads.add(new WorkerThread(server));
  30. }
  31. }
  32. //粗糙的启动方法
  33. public void start() throws InterruptedException {
  34. bossThread.start();
  35. for (WorkerThread workerThread : workerThreads) {
  36. workerThread.start();
  37. }
  38. Thread.sleep(1000 * 60 * 60);
  39. }
  40. //随机获取一个worker线程
  41. WorkerThread randomGetWorkerThread() {
  42. Random random = new Random();
  43. int index = random.nextInt(workerThreads.size());
  44. return workerThreads.get(index);
  45. }
  46. }
  47. //boss线程只处理accept事件
  48. static class BossThread extends Thread {
  49. private Selector bossSelector;
  50. private SocketMultiplexingThreads socketMultiplexingThreads;
  51. public BossThread(ServerSocketChannel server, SocketMultiplexingThreads socketMultiplexingThreads) throws IOException {
  52. this.socketMultiplexingThreads = socketMultiplexingThreads;
  53. //创建Boss多路复用器
  54. this.bossSelector = Selector.open();
  55. //向主线程注册ACCEPT事件
  56. server.register(bossSelector, SelectionKey.OP_ACCEPT);//向多路复用器 注册server, 并且设置监听ACCEPT事件
  57. System.out.println("Boss 线程创建完毕");
  58. }
  59. @lombok.SneakyThrows
  60. @Override
  61. public void run() {
  62. //boss线程不停的轮询, 等待新的连接, 专门处理accept事件
  63. while (true) {
  64. int readyNum = bossSelector.select(100);
  65. if (readyNum > 0) {
  66. Set<SelectionKey> selectionKeys = bossSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
  67. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  68. while (iterator.hasNext()) {
  69. System.out.println("boss 线程处理事件");
  70. SelectionKey key = iterator.next();
  71. iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
  72. //Boss线程只处理Accept事件
  73. if (key.isAcceptable()) {
  74. acceptHandler(key);//建立连接操作
  75. }
  76. }
  77. }
  78. }
  79. }
  80. //处理accept事件,将读事件绑定到worker线程上去
  81. private void acceptHandler(SelectionKey key) throws IOException {
  82. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  83. //1. 创建连接
  84. SocketChannel client = ssc.accept();//建立连接
  85. client.configureBlocking(false);
  86. //2. 分配buffer给这个连接, 其实这里如果把分配内存放到worker线程中去做, boss能处理更多的连接, 这里就不演示了, 不然代码就不清晰了
  87. ByteBuffer buffer = ByteBuffer.allocateDirect(8192);
  88. //3. 获取到worker线程的多路复用器
  89. WorkerThread workerThread = socketMultiplexingThreads.randomGetWorkerThread();//这里用的是随机获取一个, 算法可以自己实现
  90. Selector workerSelector = workerThread.getWorkerSelector();
  91. //4. 向多路复用器注册监听 该连接的读取事件
  92. SelectionKey selectionKey = client.register(workerSelector, SelectionKey.OP_READ, buffer);//向多路复用器绑定,读取事件
  93. //这个selectionKey和到时候发生读取事件读取的key是一个对象, 这里暂时没用它
  94. System.out.println("向worker线程注册读取事件: " + client.getRemoteAddress());
  95. }
  96. }
  97. //worker线程只处理读写事件
  98. static class WorkerThread extends Thread {
  99. private Selector workerSelector;
  100. private static AtomicInteger accumulator = new AtomicInteger(0);
  101. private int id = accumulator.getAndIncrement();//作为线程的标识
  102. public WorkerThread(ServerSocketChannel server) throws IOException {
  103. //创建Worker多路复用器
  104. this.workerSelector = Selector.open();
  105. System.out.println(this + " worker 线程创建完毕");
  106. }
  107. public Selector getWorkerSelector() {
  108. return this.workerSelector;
  109. }
  110. @SneakyThrows
  111. @Override
  112. public void run() {
  113. //worker线程不停的轮询, 等待新的连接, 专门处理读事件
  114. while (true) {
  115. int readyNum = workerSelector.select(100);
  116. if (readyNum > 0) {
  117. Set<SelectionKey> selectionKeys = workerSelector.selectedKeys();//获取有事件的key, 这个实际上拿到的是多路复用器中的一个成员变量, 搞完只要要把key remove出去, 非常麻烦
  118. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  119. while (iterator.hasNext()) {
  120. System.out.println(this + " worker 线程处理事件");
  121. SelectionKey key = iterator.next();
  122. iterator.remove();//这里需要remove掉, 如果不remove掉,下次循环还会遍历到这个key, 设计非常傻逼
  123. //Boss线程只处理Accept事件
  124. if (key.isReadable()) {
  125. readHandler(key);//建立连接操作
  126. }
  127. }
  128. }
  129. }
  130. }
  131. //worker线程处理读写事件
  132. private void readHandler(SelectionKey key) throws IOException {
  133. SocketChannel client = (SocketChannel) key.channel();
  134. ByteBuffer buffer = (ByteBuffer) key.attachment();//这个就是在建立连接的时候,当做attachment存进去的buffer
  135. buffer.clear();
  136. int read;
  137. while (true) {
  138. read = client.read(buffer);
  139. if (read > 0) {
  140. buffer.flip();
  141. while (buffer.hasRemaining()) {
  142. //这里是把数据写回客户端
  143. //这里可以写其他的自己的业务逻辑
  144. client.write(buffer);
  145. }
  146. buffer.clear();
  147. } else if (read == 0) {
  148. break;
  149. } else { //-1 可能是客户端关闭了, close_wait
  150. client.close();
  151. break;
  152. }
  153. }
  154. }
  155. @Override
  156. public String toString() {
  157. return "worker线程[" + id + "]";
  158. }
  159. }
  160. }

缺点:

  1. 需要自己直接使用NIO的接口, 那么意味着可能会遇到各种坑
  2. 已经有Netty这种成熟框架包装了NIO的接口, 没有必要自己再写一套基于NIO的框架(但是方便帮助自己理解netty的思想)

5. Netty的使用

  • Netty对JDK的NIO又再进行了一次包装, 底层还是调用JDK的NIO接口进行操作
  • Netty的设计哲学是将所有网络操作抽象成 事件, 再将事件交给 EventLoop 去处理; (包括建立连接,读,写 都是事件)
  • 让网络编程的程序员可以更聚焦于业务的处理

    5.1 单线程Netty

  • 用一个线程去处理所有 事件 ```java public class NettyIO_1 {

    public static void main(String[] args) throws InterruptedException {

    1. //boss线程只设置1个
    2. NioEventLoopGroup boss = new NioEventLoopGroup(1);//group可以设置多个线程,但是我们这只设置了1个
    3. ServerBootstrap boot = new ServerBootstrap();
    4. //注意,这里特地写成两个boss
    5. //意味着: boss线程不但处理accept事件,也处理读写事件
    6. boot.group(boss, boss)
    7. .channel(NioServerSocketChannel.class)
    8. .childHandler(new ChannelInitializer<NioSocketChannel>() {
    9. @Override
    10. protected void initChannel(NioSocketChannel channel) throws Exception {
    11. ChannelPipeline pipeline = channel.pipeline();
    12. pipeline.addLast(new MyInbound());
    13. }
    14. });
    15. ChannelFuture future = boot.bind(8080).sync();
    16. future.channel().closeFuture().sync();

    }

    static class MyInbound extends ChannelInboundHandlerAdapter {

    1. @Override
    2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    3. //todo
    4. }
    5. @Override
    6. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    7. //todo
    8. }

    }

}

  1. <a name="anM7H"></a>
  2. ## 5.2 多线程Netty
  3. ```java
  4. public class NettyIO_2 {
  5. public static void main(String[] args) throws InterruptedException {
  6. //boss线程设置4个
  7. //线程既是boss也是worker
  8. NioEventLoopGroup boss = new NioEventLoopGroup(4);
  9. ServerBootstrap boot = new ServerBootstrap();
  10. boot.group(boss, boss)
  11. .channel(NioServerSocketChannel.class)
  12. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  13. @Override
  14. protected void initChannel(NioSocketChannel channel) throws Exception {
  15. ChannelPipeline pipeline = channel.pipeline();
  16. pipeline.addLast(new MyInbound());
  17. }
  18. });
  19. ChannelFuture future = boot.bind(8080).sync();
  20. future.channel().closeFuture().sync();
  21. }
  22. static class MyInbound extends ChannelInboundHandlerAdapter {
  23. @Override
  24. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. ctx.write(msg);//echo的作用, 将读到的东西,写回客户端
  26. }
  27. @Override
  28. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  29. ctx.flush();
  30. }
  31. }
  32. }

5.3 Netty的经典使用

  • 在多线程的基础上,将线程分为boss和worker
  • boss 专门用来处理连接事件, 只有一个线程
  • worker 专门用来处理读写事件, 可以有多个线程 ```java public class NettyIO_3 {

    public static void main(String[] args) throws InterruptedException {

    1. //bossGroup, 专门处理连接事件
    2. NioEventLoopGroup boss = new NioEventLoopGroup(99);//即使这里传入了99, 会创建99个EventLoop对象, 但是线程被使用的只有一个, 也就是说只有一个线程处理ACCEPT事件
    3. //workerGroup 专门处理读写事件
    4. NioEventLoopGroup worker = new NioEventLoopGroup(20);//20个线程用来处理读写事件
    5. ServerBootstrap boot = new ServerBootstrap();
    6. boot.group(boss, worker)
    7. .channel(NioServerSocketChannel.class)
    8. .childHandler(new ChannelInitializer<NioSocketChannel>() {
    9. @Override
    10. protected void initChannel(NioSocketChannel channel) throws Exception {
    11. ChannelPipeline pipeline = channel.pipeline();
    12. pipeline.addLast(new MyInbound());
    13. }
    14. });
    15. ChannelFuture future = boot.bind(8080).sync();
    16. future.channel().closeFuture().sync();

    }

    static class MyInbound extends ChannelInboundHandlerAdapter {

    1. @Override
    2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    3. ctx.write(msg);//echo的作用, 将读到的东西,写回客户端
    4. }
    5. @Override
    6. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    7. ctx.flush();
    8. }

    }

} ```