跟着视频学习netty;这个资源地址 【https://www.bilibili.com/video/BV11X4y1K7VZ?p=10】 这个可学学网络基础还行,后边的内容丝毫不连贯,差评
跟着学netty的,看看尚硅谷的课程吧 【https://www.bilibili.com/video/BV1DJ411m7NR?p=2】,这个课程的笔记有人上传了;
笔记:【https://blog.csdn.net/youth_lql/category_10959696.html 目录
https://blog.csdn.net/Youth_lql/article/details/115524052 第1弹 nio等基础
https://blog.csdn.net/Youth_lql/article/details/115734142 第2弹 netty服务器客户端示例
https://blog.csdn.net/Youth_lql/article/details/116015820 第3弹 编解码器+协议+粘包等】
netty: 异步!基于事件驱动! 的网路应用框架!
有一点需要注意 nio 对应的是网络编程,
1 nio
1 三大核心
NIO 有三大核心部分:Channel(通道,对应BIO的socket)、Buffer(缓冲区)、Selector(选择器) 。
这是一个NIO的简易图
这是nio三大组件的结构图
0 三大组件的关系
- 每个channel都对应一个buffer
- 一个selector对应一个线程; 一个线程可以对应多个channel(channel就理解成“链接”、socket的概念就行)
- 程序切换到哪个channel是由事件(event)决定的; event是一个很重要的概念;Sector会根据不同的事件,在各个通道上来回的切换
- Buffer就是一个内存块,底层是一个数组;
- 数据的读取、写入时通过buffer;这个和BIO是不同的,BIO中要么是输入流要么是输出流,不能双向;但是NIO中的buffer可以读也可以写,需要
flip
方法切换channel
;1 Channel 通道
通道的状态会发生变化,一但有变化,就会通知selector;然后selector去处理
可以粗俗的 将channel理解为流或者链接;
我们常用的channel 有 FileChannel (用于文件的读写) DatagramChannel(用于UDP数据的读写) ServerSocketChannel 和 SocketChannel (用于 TCP的数据的读写)1 FileChannel 🌟🌟🌟🌟🌟
主要用于对本地文件进行IO操作
public int read(ByteBuffer dst)
: 从通道读取数据到缓冲区中 🌟🌟🌟🌟🌟🌟🌟public int write(ByteBuffer src)
: 把缓冲区的数据写到通道中🌟🌟🌟🌟🌟🌟public long transferFrom(ReadableByteChannel src,long position,long count)
: 从目标通道中复制数据到当前通道中- public long transferTo(long position,long count,WritableByteChannel target): 把数据从当前通道赋值给目标通道
2 Buffer 缓冲区
buffer相关的一些基本概念和操作、方法什么的之前写过一点;
1 nio基础,浅识ByteBuffer —> IoBuffer —> mina
当时是为了用mina网络应用框架才学习了一下相关的操作api;这里不在单独写了,到时候看这个文章吧;
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如图:【后面举例说明】
Buffer 是一个顶级抽象类,他的子类有 IntBuffer DoubleBuffer LongBuffer等,每个基本类型都有一个对应的buffer类,但是最长用的还是ByteBuffer(字节数据buffer),我们网络上传输的数据都是二进制数据,所以ByteBuffer很常用;
3 Selector 选择器
Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
对于上图的解读:
- 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel。
- Selector 进行监听 select 方法,返回有事件发生的通道的个数。
- 将 socketChannel 注册到 Selector 上,register(Selector sel, int ops),一个 Selector 上可以注册多个 SocketChannel。
- 注册后返回一个 SelectionKey,会和该 Selector 关联(集合)。
- 进一步得到各个 SelectionKey(有事件发生)。
- 在通过 SelectionKey 反向获取 SocketChannel,方法 channel()。
- 可以通过得到的 channel,完成业务处理。
解读过程变成代码:
package com.binc.testspring.study.nio;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* FileName: Test2
* Autho: binC
* Date: 2022/4/14 17:49
*/
public class Test2 {
/**
* 服务端
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//创建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//serverSocketChannel绑定一个端口6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//得到一个Selecor对象
Selector selector = Selector.open();
/*
* 注意:我们从所有的流程图中看到的都是将socketChannel注册到selector中去;
* 实际上我们隐藏的一个操作时,在创建了服务端的serverSocketChannel 也需要注册到selector中去
* */
//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT pos_1
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1
//循环等待客户端连接
while (true) {
//这里我们等待1秒,如果没有事件(我们现在这个程序关注的事件是 serverSocketChannel的连接(OP_ACCEPT)事件)发生, 直接返回
if (selector.select(1000) == 0) { //没有事件发生
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的>0, 就获取到相关的 selectionKey集合
//1.如果返回的>0, 表示已经获取到关注的事件
//2. selector.selectedKeys() 返回关注事件的集合
// 通过 selectionKeys 反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 数量 = " + selectionKeys.size());
//遍历 Set<SelectionKey>, 使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据key 对应的通道发生的事件做相应处理
if (key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
//该该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
//关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..
}
if (key.isReadable()) { //发生 OP_READ
//通过key 反向获取到对应channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment(); // 得到与之关联的共享数据 即buffer
channel.read(buffer);
System.out.println("form 客户端 " + new String(buffer.array()));
}
//手动从集合中移动当前的selectionKey, 防止重复操作
keyIterator.remove();
}
}
}
/**
* 客户端
* @throws IOException
*/
@Test
public void test() throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的ip 和 端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
// 客户端这是成非阻塞了, 建立链接是需要时间的所以很容易走到这里; 我们尝试不启动服务端,直接启动给客户端,程序就一直在这里执行别的逻辑;
while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
}
//...如果连接成功,就发送数据
String str = "hello, 尚硅谷~";
//Wraps a byte array into a buffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将 buffer 数据写入 channel
socketChannel.write(buffer);
System.in.read();
}
}
selector api
open() 获取一个selector select() 阻塞 直到获取到有事件发生 select(int time) 阻塞time毫秒,然后返回 selectNow() 非阻塞,立马返回 keys() 获取到所有注册到selector的selectionKey的集合 selectorKeys() 手动获取一下有事件发生的key;
selectorKey api
key.selector() 得到selectorKey与之关联的selector对象 key.channel() 得到与之关联的channel key.attachment() 得到与之关联的共享数据 即: channel key.interestOps(int ops) 设置或者改变监听事件
serverSocketChannel api
ServerSocketChannel 在服务器端监听新的客户端socket连接
open() 得到一个ServerSocketChannel 通道 bind() 设置服务器端口 configureBlocking(Boolean b) 设置阻塞或者非阻塞模式 nio编程中通常都是设置成false;否则就不用nio了 accept(*) 接受一个链接,返回代表这个链接的通道对象 即 socketChannel register(selector,int) 将当前channel注册到selector 并这是监听事件
socketChannel api
SocketChannel ,网络IO通道,具体负责进行读写操作; nio把缓冲区的数据写入到通道,或者把通道的数据读到缓冲区
public int read(ByteBuffer dst)
: 从通道读取数据到缓冲区中 🌟🌟🌟🌟🌟🌟🌟public int write(ByteBuffer src)
: 把缓冲区的数据写到通道中🌟🌟🌟🌟🌟🌟open 得到一个socketChannel 通道 configureBlocking(Boolean b) 设置阻塞或者非阻塞模式 nio编程中通常都是设置成false;否则就不用nio了 connect(SocketAddres addr) 链接服务器 finnishConnect 如果上边方法链接失败,接下来通过该方法完成链接操作; write(buffer) 往通道写数据 read(buffer) 从通道里数据 register(selector,int) 将当前channel注册到selector 并这是监听事件 close 关闭通道
2 netty
这部分的资料:https://blog.csdn.net/Youth_lql/article/details/115734142
nio已经能够完成对同步非阻塞编程的开发了,为什么还要有netty呢,其实就是原生代码nio 编程很复杂,并且粘包 半包等问题得自己解决,
1原生 NIO 存在的问题
- NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
- 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
- JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU100%。直到 JDK1.7 版本该问题仍旧存在,没有被根本解决。
因为有这些问题,所以引出了对nio的封装的框架;netty就是用的nio做的进一步封装;这个封装过程还是涉及到了reactor模型;
2 reactor模型
- 单reactor单线程模型
- 单reactor 多线程模型
- 主从reactor多线程模型
netty就是一句的 主从reactor多线程模型; ngix也是用的这个模型;
3