Java Socket Netty

什么是Netty

Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
上面是来自于百度百科给出的解释,能清晰的看到,Netty是一个基于NIO的模型,使用Netty的地方很多就是socket服务开发,而关于NIO,相信大家肯定不陌生。

对比Netty和传统的Socket

既然要说Netty,那么肯定要对Netty还有Socket不同的代码进行一个分析,分析的透彻了,才能真的选择使用Netty,而不再进行Socket的开发了,相信到时候,大家肯定会做出最正确的选择。

传统Socket编程服务端

  1. import java.io.IOException;
  2. import java.net.ServerSocket;
  3. import java.net.Socket;
  4. /**
  5. * @ClassName SocketDemo
  6. * @Date 2021/4/19 10:33
  7. * @Description SocketDemo
  8. */
  9. public class SocketServerDemo {
  10. public static void main(String[] args) {
  11. ServerSocket server=null;
  12. try {
  13. server=new ServerSocket(18080);
  14. System.out.println("时间服务已经启动--端口号为:18080...");
  15. while (true){
  16. Socket client = server.accept();
  17. //每次接收到一个新的客户端连接,启动一个新的线程来处理
  18. new Thread(new TimeServerHandler(client)).start();
  19. }
  20. } catch (IOException e) {
  21. e.printStackTrace();
  22. }finally {
  23. try {
  24. server.close();
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }
  30. }
  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.io.PrintWriter;
  5. import java.net.Socket;
  6. import java.util.Calendar;
  7. /**
  8. * @ClassName TimeServerHandler
  9. * @Date 2021/4/19 10:35
  10. * @Description TimeServerHandler
  11. */
  12. public class TimeServerHandler implements Runnable {
  13. private Socket clientProxxy;
  14. public TimeServerHandler(Socket clientProxxy) {
  15. this.clientProxxy = clientProxxy;
  16. }
  17. @Override
  18. public void run() {
  19. BufferedReader reader = null;
  20. PrintWriter writer = null;
  21. try {
  22. reader = new BufferedReader(new InputStreamReader(clientProxxy.getInputStream()));
  23. writer =new PrintWriter(clientProxxy.getOutputStream()) ;
  24. while (true) {//因为一个client可以发送多次请求,这里的每一次循环,相当于接收处理一次请求
  25. String request = reader.readLine();
  26. if (!"GET CURRENT TIME".equals(request)) {
  27. writer.println("BAD_REQUEST");
  28. } else {
  29. writer.println(Calendar.getInstance().getTime().toLocaleString());
  30. }
  31. writer.flush();
  32. }
  33. } catch (Exception e) {
  34. throw new RuntimeException(e);
  35. } finally {
  36. try {
  37. writer.close();
  38. reader.close();
  39. clientProxxy.close();
  40. } catch (IOException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. }

传统Socket编程客户端

  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.io.PrintWriter;
  5. import java.net.Socket;
  6. /**
  7. * @ClassName SocketClientDemo
  8. * @Date 2021/4/19 10:42
  9. * @Description SocketClientDemo
  10. */
  11. public class SocketClientDemo {
  12. public static void main(String[] args) {
  13. BufferedReader reader = null;
  14. PrintWriter writer = null;
  15. Socket client=null;
  16. try {
  17. client=new Socket("127.0.0.1",18080);
  18. writer = new PrintWriter(client.getOutputStream());
  19. reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
  20. while (true){//每隔5秒发送一次请求
  21. writer.println("GET CURRENT TIME");
  22. writer.flush();
  23. String response = reader.readLine();
  24. System.out.println("Current Time:"+response);
  25. Thread.sleep(5000);
  26. }
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. } finally {
  30. try {
  31. writer.close();
  32. reader.close();
  33. client.close();
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. }

来执行一下才能知道效果,
首先运行服务端:

  1. TimeServer Started on 18080...

接着启动客户端

  1. Current Time:2021-4-19 10:48:21
  2. Current Time:2021-4-19 10:48:26
  3. Current Time:2021-4-19 10:48:31
  4. Current Time:2021-4-19 10:48:36
  5. Current Time:2021-4-19 10:48:41
  6. Current Time:2021-4-19 10:48:46
  7. Current Time:2021-4-19 10:48:51

大家看一下,这是不是就是相当于一个Socket的客户端和服务端之间进行通信的过程,在client端可以发送请求指令”GET CURRENT TIME”给server端,每隔5秒钟发送一次,每次server端都返回当前时间。
而这也是传统的BIO的做法,每一个client都需要去对应一个线程去进行处理,client越多,那么要开启的线程也就会越多,也就是说,如果采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,当接收到客户端的连接请求后,会为每一个客户端请求创建新的线程进行请求的处理,处理完成后通过输出流返回信息给客户端,响应完成后销毁线程。
模型图如下
使用Netty进行Socket编程 - 图1
这时候就有大佬说,不会用线程池么?使用线程池的话,它实际上并没有解决任何实际性的问题,他实际上就是对BIO做了一个优化,属于伪异步IO通信。
伪异步IO通信模型图
使用Netty进行Socket编程 - 图2
异步IO通信确实能缓解一部分的压力,但是这种模型也是有缺陷的,当有大量客户端请求的时候,随着并发访问量的增长,伪异步IO就会造成线程池阻塞。
这时候就取决于是想选择,系统发生线程堆栈溢出、创建新线程失败等问题呢,还是选择大量客户端请求,造成线程池阻塞。
都说,技术是为了解决问题而出现的,那么接下来就有了解决这个问题的技术出现了,Netty,来看看Netty吧。

Netty环境搭建

在这里使用的依旧是Springboot来整合Netty的环境,然后在后续过程中,使用Netty实现服务端程序和客户端程序,虽然Netty并没有实现传说中的AIO,但是已经算是吧这个NIO的模型,实现到了极致了。

先创建出来一个项目

image.png

加入Netty的pom的依赖

  1. <!--Netty-->
  2. <dependency>
  3. <groupId>io.netty</groupId>
  4. <artifactId>netty-all</artifactId>
  5. <version>4.1.31.Final</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.projectlombok</groupId>
  9. <artifactId>lombok</artifactId>
  10. <version>1.16.22</version>
  11. </dependency>
  12. <!-- logger -->
  13. <dependency>
  14. <groupId>org.slf4j</groupId>
  15. <artifactId>slf4j-api</artifactId>
  16. <version>1.7.25</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>ch.qos.logback</groupId>
  20. <artifactId>logback-core</artifactId>
  21. <version>1.2.3</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>ch.qos.logback</groupId>
  25. <artifactId>logback-classic</artifactId>
  26. <version>1.2.3</version>
  27. </dependency>

Netty服务端程序

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.handler.codec.LineBasedFrameDecoder;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. /**
  11. * @ClassName NettyServerDemo
  12. * @Date 2021/4/19 11:11
  13. * @Description NettyServerDemo
  14. */
  15. public class NettyServerDemo {
  16. private int port=18081;
  17. public void run() throws Exception {
  18. EventLoopGroup bossGroup = new NioEventLoopGroup();
  19. EventLoopGroup workerGroup = new NioEventLoopGroup();
  20. try {
  21. ServerBootstrap b = new ServerBootstrap();
  22. b.group(bossGroup, workerGroup)
  23. .channel(NioServerSocketChannel.class)
  24. .childHandler(new ChannelInitializer<SocketChannel>() {
  25. @Override
  26. public void initChannel(SocketChannel ch) throws Exception {
  27. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  28. ch.pipeline().addLast(new StringDecoder());
  29. ch.pipeline().addLast(new TimeServerHandler());
  30. }
  31. });
  32. ChannelFuture f = b.bind(port).sync();
  33. System.out.println("TimeServer Started on 18081...");
  34. f.channel().closeFuture().sync();
  35. } finally {
  36. workerGroup.shutdownGracefully();
  37. bossGroup.shutdownGracefully();
  38. }
  39. }
  40. public static void main(String[] args) throws Exception {
  41. new NettyServerDemo().run();
  42. }
  43. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import java.util.Date;
  6. /**
  7. * @ClassName TimeServerHandler
  8. * @Date 2021/4/19 11:19
  9. * @Description TimeServerHandler
  10. */
  11. public class TimeServerHandler extends ChannelInboundHandlerAdapter {
  12. @Override
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  14. String request = (String) msg;
  15. String response = null;
  16. if ("QUERY TIME ORDER".equals(request)) {
  17. response = new Date(System.currentTimeMillis()).toString();
  18. } else {
  19. response = "BAD REQUEST";
  20. }
  21. response = response + System.getProperty("line.separator");
  22. ByteBuf resp = Unpooled.copiedBuffer(response.getBytes());
  23. ctx.writeAndFlush(resp);
  24. }
  25. }

Netty客户端程序

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelFuture;
  3. import io.netty.channel.ChannelInitializer;
  4. import io.netty.channel.EventLoopGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import io.netty.handler.codec.LineBasedFrameDecoder;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. /**
  11. * @ClassName NettyClientDemo
  12. * @Date 2021/4/19 11:21
  13. * @Description NettyClientDemo
  14. */
  15. public class NettyClientDemo {
  16. public static void main(String[] args) throws Exception {
  17. String host = "localhost";
  18. int port = 18081;
  19. EventLoopGroup workerGroup = new NioEventLoopGroup();
  20. try {
  21. Bootstrap b = new Bootstrap();
  22. b.group(workerGroup);
  23. b.channel(NioSocketChannel.class);
  24. b.handler(new ChannelInitializer<SocketChannel>() {
  25. @Override
  26. public void initChannel(SocketChannel ch) throws Exception {
  27. ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
  28. ch.pipeline().addLast(new StringDecoder());
  29. ch.pipeline().addLast(new TimeClientHandler());
  30. }
  31. });
  32. // 开启客户端.
  33. ChannelFuture f = b.connect(host, port).sync();
  34. // 等到连接关闭.
  35. f.channel().closeFuture().sync();
  36. } finally {
  37. workerGroup.shutdownGracefully();
  38. }
  39. }
  40. }
  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.buffer.Unpooled;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. /**
  6. * @ClassName TimeClientHandler
  7. * @Date 2021/4/19 11:22
  8. * @Description TimeClientHandler
  9. */
  10. public class TimeClientHandler extends ChannelInboundHandlerAdapter {
  11. private byte[] req=("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
  12. @Override
  13. public void channelActive(ChannelHandlerContext ctx) {//1
  14. ByteBuf message = Unpooled.buffer(req.length);
  15. message.writeBytes(req);
  16. ctx.writeAndFlush(message);
  17. }
  18. @Override
  19. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  20. String body = (String) msg;
  21. System.out.println("Now is:" + body);
  22. }
  23. }

首先启动服务端,控制台输出:

  1. TimeServer Started on 18081...

接着启动客户端,控制要输出:

  1. Now is:Mon Apr 19 11:34:21 CST 2021

既然代码写了,那是不是就得来分析一下这个Netty在中间都干了什么东西,他的类是什么样子的,都有哪些方法。
大家先从代码的源码上开始看起,因为在代码中分别使用到了好几个类,而这些类的父类,或者是接口定义者追根到底,也就是这个样子的,从IDEA中打开他的类图可以清晰的看到。
使用Netty进行Socket编程 - 图4
而在源码中,最重要的就是这个Channel,接下来就来分析一波吧。

Channel

All I/O operations are asynchronous.一句话点出核心所有的IO操作都是异步的,这意味着任何I/O调用都将立即返回,但不保证请求的I/O操作已完成。这是在源码的注释上面给出的解释。

Channel分类

  • 服务端: NioServerSocketChannel
  • 客户端: NioSocketChannel

看到这个,大家肯定也都不陌生,因为Channel即可以在JDK的Socket中充当管道出现,同时,也在Netty的服务端和客户端进行IO数据交互,充当一个媒介的存在,那么他的区别在哪?
Netty对Jdk原生的ServerSocketChannel进行了封装和增强封装成了NioXXXChannel, 相对于原生的JdkChannel,Netty的Channel增加了如下的组件。

  • id 标识唯一身份信息
  • 可能存在的parent Channel
  • 管道 pepiline
  • 用于数据读写的unsafe内部类
  • 关联上相伴终生的NioEventLoop

在官网可以了解这个这个类的API有更多的信息io.netty.channel
而关于Channel,其实换成大家容易理解的话的话,那就是由它负责同对端进行网络通信、注册和数据操作等功能

A Channel can have a parent depending on how it was created. For instance, a SocketChannel, that was accepted by ServerSocketChannel, will return the ServerSocketChannel as its parent on parent().
The semantics of the hierarchical structure depends on the transport implementation where the Channel belongs to. For example, you could write a new Channel implementation that creates the sub-channels that share one socket connection, as BEEP and SSH do.

一个Channel可以有一个父Channel,这取决于它是如何创建的。例如,被ServerSocketChannel接受的SocketChannel将返回ServerSocketChannel作为其parent()上的父对象。层次结构的语义取决于通道所属的传输实现。
Channel的抽象类AbstractChannel中有一个受保护的构造方法,而AbstractChannel内部有一个pipeline属性,Netty在对Channel进行初始化的时候将该属性初始化为DefaultChannelPipeline的实例。

为什么选择Netty

同步阻塞I/O(BIO) 伪异步I/O 非阻塞I/O (NIO) 异步I/O (AIO)
I/O类型(同步) 同步I/O 同步I/O 同步I/O (I/O多路复用) 异步I/O
API使用难度 简单 简单 非常复杂 复杂
调试难度 简单 简单 复杂 复杂
可靠性 非常差
吞吐量

其实在上面的图中,已经能看出来了,不同的I/O模型,效率,使用难度,吞吐量都是非常重要的,所以选择的时候,肯定要慎重选择,而为什么不使用Java原生的呢?
实际上很简单,1.复杂,2.不好用
对于Java的NIO的类库和API繁杂使用麻烦,需要熟练掌握Selectol,ServerSocketChannel,SocketChannel,ByteBuffer
JDK NIO的BUG,比如epoll bug,这个BUG会在linux上导致cpu 100%,使得nio server/client不可用,而且在1.7中都没有解决完这个bug,只不过发生频率比较低。
而Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。