netty核心技术与源码剖析

1、netty的介绍和应用场景

netty是基于异步的网络io通讯框架,并且基于事件驱动。

具有非阻塞的特点,什么是非阻塞呢?

比如我们的传统bs架构,当我们的浏览器发送请求,这个线程就必须等到服务器的响应才会继续执行,在这个过程中线程就是阻塞的,而ajax则支持异步通讯,当请求发送过去,线程并不需要等待响应结果,而是可以继续执行下面的逻辑,而是有一个专门的回调函数去收到响应。此时线程就是非阻塞的。
image-20201125152349860.png

netty的基本架构
image-20201125153318828.png
NIO对原生的IO进行了优化,netty对NIO进行封装和二次优化,最底层是基于TCP的ip协议。

RPC框架底层都是基于netty来进行数据传输的,比如游戏行业也是基于netty进行网络数据传输。

BIO模型
image-20201125160907387.png

NIO模型
image-20201125161104020.png

BIO与NIO的区别

BIO是传统的IO模型,我们的客户端发送一次请求,服务器会单独提供一个线程来与客户端建立通道,此时线程是阻塞的,并不会长时间工作,此时就会占用系统资源,而我们的服务器也会有性能的瓶颈。

而NIO则是具有异步的效果,它会单独开一个selector选择器,来监听所有的通道,使用轮询和事件驱动的方式,这样当我们的通道在与客户端进行读写时,就不需要等待着它执行完成,而是去处理另外的通道,当某个通道发生了一个事件,此时便会与对应的通道进行连接来进行处理。我们分多个选择器来进行交互。

BIO适合连接数较小,对服务器的性能要求较高,并发只能发生在应用程序中,JDK1.4之前只支持这种方式。

NIO适合连接数较多且连接时间较短的架构,比如即时通讯,弹幕。

BIO的简单通讯步骤

  1. 客户发送请求建立一个socket,服务端建立一个socket
  2. 服务端创建一条单独的线程与之进行通讯
  3. 客户发送请求先咨询是否有线程响应,没有则等待或者超时
  4. 正常情况下,客户端与服务端建立连接通讯过程中,本地客户端线程处于阻塞状态。
  5. 收到响应,客户端线程继续执行。

NIO的基本介绍

NIO是对BIO的扩展和重写,并且是非阻塞模式

NIO有三个组件,channel管道,buffer缓冲区和selector选择器。

当一次请求发送到服务器,建立一个socket,然后会创建一个线程,这个线程维护一个selector,而selector会与多个客户端之间通过一个管道连接,即channel,管道和客户端之间有一个buffer缓冲区,当我们客户端进行读写时数据都会存放到缓冲区中,我们的selector会采用轮询的方式来获取我们管道中的数据,如果没有可用的数据,那么就会处理别的管道,去干别的事情,而BIO模型就会阻塞等待完全的数据读写完成,假设有10000个请求,BIO就得分配10000条线程来进行处理,而我们的NIO只需要创建200条线程,每个线程的selector管理50个通道。

selector 、buffer、channel的关系
image-20201125175814008.png

  • 每一个channel都会对应一个buffer,并且是双向绑定。
  • 每一个buffer其实底层都是一个数组,与传统的BIO不同,BIO不是输入流就是输出流,并不能双向操作,而buffer可以进行读也可以进行写,只不过要使用flip切换。
  • 每个selector对应一个线程,一个线程对应多个channel,selector本质就是一个对象的形式存在。
  • 程序切换到哪一个channel是由事件决定的,Event就是一个重要的概念
  • 每一个selector根据不同的事件在channel中切换
  • channel是双向的,可以返回底层操作系统的情况,比如linux底层就是双向的。

buffer类的常用api
image-20201125221828381.png

buffer类的常用四个属性
image-20201125222221264.png

buffer的limit属性是指缓冲区的重点,当position所指向的数字指向limit值时,此时就表示一次读写完成,并且需要我们用flip方法重置position的值,让下一次的读写正常进行,不然读到的都是空数据。

buffer类是最顶级的抽象类,除了boolean其他基本数据类型都有他对应的子类

在网络传输中由于我们都是通过字节的方式来传输,所以bytebuffer用的是比较多的。

channel

使用channel进行写入本地文件。

  1. public class NioChannelFile {
  2. public static void main(String[] args) throws Exception {
  3. String str = "hello,chaosheng";
  4. // 创建一个文件输出流
  5. FileOutputStream out = new FileOutputStream("d://123.txt");
  6. // 从输出流中获取一个filechannel 这个channel的真实类型是它的子类
  7. FileChannel channel = out.getChannel();
  8. // 创建一个buffer缓冲区
  9. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  10. // 将string放入到到缓冲区中
  11. byteBuffer.put(str.getBytes());
  12. // 写入完毕 应该转换成读取状态
  13. byteBuffer.flip();
  14. // 写入到通道中
  15. channel.write(byteBuffer);
  16. out.close();
  17. }
  18. }

代码流程示意图
image-20201125230725232.png
filechannel类的重要方法
image-20201125231835351.png
读取本地文件

  1. public class NioChannelFileRead {
  2. public static void main(String[] args) throws Exception {
  3. // 创建一个文件输入流
  4. File file = new File("d://123.txt");
  5. FileInputStream fileInputStream = new FileInputStream(file);
  6. // 获取通道
  7. FileChannel channel = fileInputStream.getChannel();
  8. // 创建缓冲区
  9. ByteBuffer byteBuffer =ByteBuffer.allocate(20);
  10. // 缓冲区读取channel中的数据
  11. channel.read(byteBuffer);
  12. System.out.println(new String(byteBuffer.array()));
  13. }
  14. }

利用一个buffer完成文件的读和写

  1. public class NioChannelReadAndWrite {
  2. public static void main(String[] args) throws Exception{
  3. FileInputStream fileInputStream = new FileInputStream("123.txt");
  4. FileChannel channel = fileInputStream.getChannel();
  5. FileOutputStream fileOutputStream = new FileOutputStream("12.txt");
  6. FileChannel channel1 = fileOutputStream.getChannel();
  7. ByteBuffer byteBuffer = ByteBuffer.allocate(512);
  8. while (true){
  9. // byteBuffer.clear();
  10. int read = channel.read(byteBuffer);
  11. System.out.println(read);
  12. if (read == -1){
  13. break;
  14. }
  15. byteBuffer.flip();
  16. channel1.write(byteBuffer);
  17. byteBuffer.flip();
  18. // 在这里程序有一个bug,如果我们的程序在此处跳到下一处循环,那么此时read等于0,如果不做clear或者再次filp,那么会死循环。
  19. // 因为此时limit等于poposition 下一次读就是0
  20. }
  21. fileInputStream.close();
  22. fileOutputStream.close();
  23. }
  24. }

使用transferFrom完成文件的赋值。

  1. public class FileTransferFrom {
  2. public static void main(String[] args) throws Exception {
  3. FileInputStream fileInputStream = new FileInputStream("D:\\BaiduNetdiskDownload\\尚硅谷Netty学习资料.zip");
  4. FileOutputStream fileOutputStream = new FileOutputStream("d://a");
  5. FileChannel channel = fileInputStream.getChannel();
  6. FileChannel channel1 = fileOutputStream.getChannel();
  7. long l1 = System.nanoTime() / 1000000L;
  8. channel1.transferFrom(channel,0,channel.size());
  9. long l2 = System.nanoTime() / 1000000L;
  10. System.out.println(l2-l1);
  11. fileOutputStream.close();
  12. fileInputStream.close();
  13. }
  14. }

注意事项
image-20201126011713130.png

MappedByteBuffer 可以直接在内存中修改文件,并不需要从jvm中再进行一次IO,所以是与操作系统的交互,操作系统不需要再复制。

当我们一个buffer不够用时们使用一个buffer数组来进行传输,因为buffer本质上是一个byte数组,而jvm的内存都是碎片化的,所以很难找到一块大空间且内存地址连续,所以说创建一个大容量buffer效率不够高。

1、NIO梳理

NIO的通道类似于流,但是传统的BIO中是包含channel的,类似于流但是还是有本质的区别。

  • 通道可以同时读写,而流只能分output和input
  • 通道可以异步读取数据
  • 通道可以从缓冲区中读写数据

2、selector选择器

image-20201126174912799.png
当客户端发送一次请求建立一个socket,服务器也会对应的创建一个socket建立连接,然后分配一条线程与之通讯,然后会注册一个selector对象,selector对象来通过channel与之进行交互,以事件驱动的方式监听着每一个channel,一旦一个channel发生了一个事件,selector就会来处理,而传统IO在这个过程中是阻塞的,线程会挂起浪费服务器资源,而NIO通过一个线程管理一个selector,一个selector监听多个channel的方式达到多路复用的效果,大大减少了系统的线程开销。

我们通过调用open()方法会得到一个selector对象,selector对象可以调用它的以下方法来监听channel。

public abstract int select(); 这种方式会阻塞,会一直监听channel直到返回一个key
public abstract int select(long timeout) 可以设置超时时间,时间一到就返回。
public abstract int selectNow() ; 非阻塞,调用查询后无论有没有都返回。

select()方法获取到channel对应selectionkey后会返回并加入到内部集合,然后通过selectedKeys方法可以得到内部集合中保存的keys,每一个keys就是一个对应的事件,然后可以获取到这个key对应的channel然后对它进行操作。
image-20201126192807717.png

1、理论原理

  1. 服务器会创建一个serversocketchannel来监听一个端口,服务器与之建立socket连接之后,每个客户端都会生成一个socketchannel
  2. 会和客户端的socketchannel连接。
  3. 然后将socketchannel注册到selector上面,通过register()方法,一个channel可以注册多个socketchannel
  4. 注册后返回一个selectionKey,然后存放进selector中,以集合的形式保存。
  5. 然后selector会通过select()方法来进行各个socketchannel的监听,这个select方法是可选的,有阻塞的也有不阻塞的。
  6. 监听到有事件发生,会得到selectionkey,反向获取socketchannel(channel方法)得到channel
  7. 完成业务处理。

image-20201126215605463.png

2、代码层面补充

服务器

  1. /**
  2. * @Author chaosheng
  3. * @Date 2020/11/26 21:59
  4. * @Version 1.0
  5. */
  6. public class NioServer {
  7. public static void main(String[] args) throws Exception{
  8. // 创建一个serversocketchannel
  9. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  10. // 得到一个Selecor对象,来监听每一个socketchannel
  11. Selector selector = Selector.open();
  12. //绑定一个端口6666, 在服务器端监听
  13. serverSocketChannel.socket().bind(new InetSocketAddress(6666));
  14. //设置为非阻塞
  15. serverSocketChannel.configureBlocking(false);
  16. //把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT
  17. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  18. System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1
  19. // 等待客户端连接,客户端连接6666端口,会产生一个socketchannel
  20. while (true){
  21. if (selector.select(1000)==0){
  22. System.out.println("客户端在等待连接");
  23. continue;
  24. }
  25. // 如果走到这,就说明有客户端向6666端口发起了请求,并创建一个socketchannel
  26. //1.如果返回的>0, 表示已经获取到关注的事件
  27. //2. selector.selectedKeys() 返回关注事件的集合
  28. // 通过 selectionKeys 反向获取通道
  29. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  30. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  31. while (iterator.hasNext()){
  32. SelectionKey selectionKey = iterator.next();
  33. // 遍历 得到selectkey 判断它发生了什么事件 如果是OP_ACCEPT 就为他注册一个socketchannel
  34. if (selectionKey.isAcceptable()){
  35. // accept方法是阻塞的,但是我们的传统IO是不知道有连接而是阻塞着等待,而我们这里是已经知道了事件的发生,所以马上会执行并不会阻塞。
  36. SocketChannel socketChannel = serverSocketChannel.accept();
  37. System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
  38. // 将这个socketchannel也注册到selector中,并且设置为非阻塞
  39. socketChannel.configureBlocking(false);
  40. // 将这个socketchannel注册到selector中,为他绑定一个读的事件,并绑定一个bytebuffer
  41. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  42. socketChannel.register(selector,SelectionKey.OP_READ,byteBuffer);
  43. System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size());
  44. }
  45. // 如果selectkey中发生了读的事件,就将它的buffer中的数据打印到控制台
  46. if (selectionKey.isReadable()){
  47. // 我们通过selectkey的channel方法获取到对应的channel
  48. SocketChannel channel = (SocketChannel) selectionKey.channel();
  49. // 获取到该channel关联的buffer
  50. ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
  51. // 将channel中的数据写入buffer
  52. buffer.flip();
  53. channel.read(buffer);
  54. System.out.println("form 客户端 " + new java.lang.String(buffer.array()));
  55. }
  56. }
  57. }
  58. }
  59. }

客户端

  1. public class NIOClient {
  2. public static void main(String[] args) throws Exception {
  3. // 创建一个socketchannel与服务器进行连接
  4. SocketChannel channel = SocketChannel.open();
  5. // 创建一个客户端IP和端口的address
  6. InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
  7. // 设置为非阻塞
  8. channel.configureBlocking(false);
  9. // 连接服务器
  10. if (!channel.connect(inetSocketAddress)){
  11. while (!channel.finishConnect()){
  12. System.out.println("因为连接需要事件,客户端并不会阻塞在这,可以做别的事");
  13. }
  14. }
  15. String s = "hello world";
  16. ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
  17. channel.write(wrap);
  18. System.in.read();
  19. }
  20. }

selector的keys()和selectkeys()不是一个含义,前者是所有发生的连接数,后者是执行时发生了多少次事件。
image-20201126232114848.png
selectkey相关的API

image-20201126233848715.png
serversocketchannelAPI
image-20201126234434795.png

socketchannel和serversocketchannel的区别

  • serversocketchannel负责监听某一个端口,当服务端有socket连接时会创建一个socketchannel。
  • socketchannel主要负责读写操作
  • 相同点:他们都要注册到selector中。

3、基于NIO的简单群聊系统

思路:

  • 分客户端和服务端,服务端连接客户端时提示客户端上线,显示ip地址和端口
  • 客户端离线即广播离线通知,客户端离线会抛出异常,捕获到异常来实现
  • 服务器端进行消息转发到其他在线客户端,并且排除自己。
  • 每一个客户端其实就是一个socketchannel,可以使用selector的key()来获取所有的事件,每一个事件可以得到一个channel
  • 每一个客户端可以发送消息给其他客户端,能收到其他客户端的所有消息。

客户端代码

3、零拷贝

零拷贝是指从操作系统看的,指不经过cpu拷贝。

DMA是避免不了的。 DMA指的是直接内存拷贝

传统IO是需要经过4次转换,4次拷贝的。

到了LINUX优化后的sendfile时,只需要两次切换和2次拷贝,所以就是一次零拷贝了。

直接从内核中DMA数据到缓冲,再从缓存DMA到协议栈。

对比
image-20201128134010055.png

2、netty

为什么有了nio还需要用netty

首先,NIO使用太复杂了,需要熟练掌握serversocketchannel,selector,socketchannel,buffer,以及了解他们之间的关系

image-20201128151112511.png
netty的优点
image-20201128151214836.png

1、线程模型

目前有两种线程模型

传统I/O线程模式

Reactor线程模式

  1. 单reactor单线程
  2. 单reacto多线程
  3. 主从reactor多线程

netty在主从reactor多线程模型上做了一定的改进,其中主从reactor中有多个reactor

传统阻塞I/O线程模型
image-20201128152847059.png

Reactor线程模式
image-20201128153847262.png

改进:

传统的io服务模型是一个请求对应一个线程和一个handle来处理,handle是阻塞对象

而reactor则是创建一个阻塞的hanle对象,然后在他之后有一个线程池,这个阻塞对象在收到请求时会去线程池里中一条线程来完成业务处理。
image-20201128154708788.png

reactor和传统io模型最大的不同点就是传统io是一次请求会有一个处理的线程和handle

而reactor则是多路复用一个handle,然后将请求同步分派给线程池中的某个处理线程来处理。

并且reactor模式是通过事件驱动监听方式,收到事件后就会分发给一个线程或者进程来处理请求。

eventhandle是不阻塞的,只有当我们的dispatchhandle监听到IO事件发生完毕,才会派分给eventhandler去执行,并且是一个线程池,所以他们没有工作要干的时候是处于闲置状态的,不会占用系统资源。

2、reactor线程模式的三种实现

单reactor单线程无法处理多并发请求
image-20201128160746179.png

我们前面的NIO群聊就属于这种模型,都是通过main线程来处理一套完整的流程,reactor调用我们的selector的select()方法来监听事件,然后是连接事件就通过acceptor调用accept()然后通过handle来处理,但是我们是直接通过api操作的,我们也可以将方法封装到一个类中,本质上它们并没有区别,然后通过handle对象来完成read 业务处理 send这个流程,但是这种模式的缺点就是只有一个线程无法应对多线程场景,此时假设当一个read事件正在处理,reactor监听到了另一个客户端发生的事件此时再来分配给handle处理,实际上是需要等待前面的read-业务处理-send完成之后才可以执行,是需要等待线程执行完毕才会执行的。

image-20201128162130915.png
单reactor多线程
image-20201128163506573.png

优点:可以充分利用多核cpu的性能

缺点:由于业务处理涉及到了多线程场景,所以对共享数据的安全不能够保障,并且reactor运行在主线程中监听派发事件,高并发下也有瓶颈。

主从reactor
image-20201128170317768.png

image-20201128170346800.png
分成三层,每层处理相应的工作。

优点:父reactor和子reactor分工明确,父来监听连接事件然后通过acceptor注册一个事件交给子reactor,然后又子reactor完成后续。

数据交互也简单,只需要将新连接传给子reactor,数据由子reactor通过对应的channel返回给客户端。

缺点:编程复杂度高

Nginx和netty都使用了这种模式。

3、netty的模型

image-20201128173601465.png
netty客户端

  1. public static void main(String[] args) throws InterruptedException {
  2. // 创建bossgroup和 workgroup
  3. // 1.创建了两个线程组,分别是bossgroup和workgroup
  4. // 2.bossgroup只会处理连接请求,真正的业务端处理会交给workgroup完成
  5. // 3.两个都是无限循环
  6. EventLoopGroup boosGroup = new NioEventLoopGroup();
  7. EventLoopGroup workGroup = new NioEventLoopGroup();
  8. // 创建服务器端启动的对象,配置启动参数 是链式编程
  9. ServerBootstrap bootstrap = new ServerBootstrap();
  10. bootstrap.group(boosGroup,workGroup) // 设置两个线程组
  11. .channel(NioServerSocketChannel.class) // NioServerSocketChannel作为通道实现
  12. .option(ChannelOption.SO_BACKLOG,128) // 设置线程队列得到连接的个数
  13. .childOption(ChannelOption.SO_KEEPALIVE,true)// 设置保持活动连接状态
  14. .childHandler(new ChannelInitializer<SocketChannel>() { // 给我们的workgroup的eventloop对应的管道设置处理器
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. // 可以通过socketchannel获取pipeline 给他添加一个自己的handle处理器
  18. ch.pipeline().addLast(new NettyServerHandle());
  19. }
  20. });
  21. System.out.println("服务器 is ready");
  22. // 绑定一个端口并且同步处理 启动服务器
  23. // 对关闭通道进行监听 并不是马上关闭通道
  24. ChannelFuture sync = bootstrap.bind(6666).sync();
  25. }

自定义handle,通过channel可以获取对应的pipeline,设置它的handle方法

继承 ChannelInboundHandlerAdapter

  1. public class NettyServerHandle extends ChannelInboundHandlerAdapter {
  2. @Override
  3. /**
  4. * ctx 上下文对象 ,所有的通道 管道 ip地址都可以通过它获取
  5. * msg 数据对象
  6. */
  7. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  8. System.out.println(ctx);
  9. ByteBuf buf = (ByteBuf) msg;
  10. System.out.println("客户端发送的消息是:"+buf.toString(CharsetUtil.UTF_8));
  11. System.out.println("客户端地址是:"+ctx.channel().remoteAddress());
  12. }
  13. @Override
  14. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  15. ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server".getBytes(CharsetUtil.UTF_8)));
  16. }
  17. @Override
  18. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  19. ctx.close();
  20. }
  21. }

客户端代码

  1. public class NettyClientHandle extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  4. System.out.println(ctx);
  5. ctx.writeAndFlush(Unpooled.copiedBuffer("来自曾说潮生".getBytes(CharsetUtil.UTF_8)));
  6. }
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. System.out.println("服务器的地址" + ctx.channel().remoteAddress());
  10. ByteBuf byteBuf = (ByteBuf) msg;
  11. System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
  12. }
  13. @Override
  14. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  15. cause.printStackTrace();
  16. ctx.close();
  17. }
  18. }

netty执行流程中的taskQueue
image-20201129144558222.png

当我们的业务流程执行时间过长时,此时还是会阻塞,但是我们可以通过将任务提交到Queue中异步执行,这时就会并发执行,NIOEventgroup中还维护了一个线程,这个线程和它的线程组中的线程不是同一个线程,可以异步执行任务。

第一种方式,我们将我们的异步任务提交到TaskQueue中执行。

第二种方式是定时任务,需要提交到ScheduleTaskQueue中执行,并且指定一个参数和时间单位。

第三种方式是在我们客户端刚开始注册的时候,会拿到他的socketchannel,我们可以用一个集合来管理每一个注册来的socketchannel,然后在queue或者schedule中进行定时消息推送。

比如每一个客户端连接一次,拿到它的socketchannel并且根据用户标识存入一个map中,在我们服务器的handle方法中,拿到这个集合获取所有的channel,将channel提交到queue或者schedule队列中,调用write方法给注册的用户推送消息。

小总结
image-20201129150444366.png

4、异步模型

future-listener机制

异步的概念和同步相对,就是当一个异步调用发出后,调用者不能立马得到执行结果。

Netty的I/O操作都是异步的,包括bind,write,read,connect都会简单的返回一个channelfuture

netty的异步模型是建立在future和fallback之上的,futrue的核心思想就是当一个fun函数的执行过程中非常耗时,等待它的返回结果显然不合适,会给我们返回一个future,后续通过future来监控执行过程。

待续。。

3、后置知识补充

1、Java IO的演进之路

1、 阻塞IO与非阻塞IO的区别

2、为什么要用netty,netty能解决什么