Bootstrap, ServerBootstrap
- Bootstrap 意思是引导, 一个Netty应用通常由一个Bootstrap开始, 主要作用的配置整个Netty程序, 串联各个组件, Netty中Bootstrap类是客户端程序的启动引导类, ServerBootstrap是服务器端程序的启动引导类
- 常见的方法有:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 用于设置服务端的BossGroup和WorkerGroup
- public B group(EventLoopGroup group), 用于设置客户单的EventLoopGroup
- public B channel(Class<? extends C> channelClass), 该方法用来设置一个服务器端的通道实现
- public
B option(ChannelOption option, T value), 该方法用来ServerChannel添加配置 - public
ServerBootstrap childOption(ChannelOption childOption, T value), 用来给接收到的通道添加配置 - public ServerBootstrap childHandler(ChannelHandler childHandler), 该方法用来设置业务处理类(自定义的Handler)
- public ChannelFuture bind(int inetPort), 该方法用于服务端,用来设置绑定的端口
- public ChannelFuture connect(InetAddress inetHost, int inetPort), 该方法用于客户端, 用来连接服务器端
Future, ChannelFuture
Netty中所有的IO操作都是异步的, 不能立刻得知消息是否被正确处理, 但是可以过一会等它执行完成或者直接注册一个监听器, 具体的实现就是通过Future和ChannelFuture, 他们可以注册一个监听, 当操作执行成功或者失败时监听会自动触发注册的监听事件
常见方法:
- Netty网络通信的组件,能够用于执行网络IO操作
- 通过channel可获得当前网络连接的通道的状态
- 通过channel可获得网络连接的配置参数(例如接收缓冲区大小)
- channel提供异步的网络IO操作(如建立连接, 读写,绑定端口), 异步调用意味着任何IO调用都将立即返回,并且不保证在调用结束时所请求的IO操作已完成
- 调用立即返回一个ChannelFuture实例, 通过注册监听器到ChannelFuture上, 可以IO操作成功, 失败或取消时回调通知方
- 支持关联IO操作与对应的处理程序
不同协议, 不同的阻塞类型的连接都有不同的Channel类型与之对应,常用的Channel类型:
Netty基于Selector对象实现IO多路复用, 通过Selector一个线程可以监听多个连接的Channel事件
当一个Selector中注册Channel后, Selector内部的机制就可以自动不断的查询(Select)这些注册的Channel是否有已就绪的IO事件(例如可读, 可写, 网络连接完成等), 这样程序就可以很简单的使用一个线程高效的管理多个Channel
ChannelHandler 及其实现类
ChannelHandler是一个接口, 处理IO事件或者拦截IO操作, 并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序
- ChannelHandler本身并没有提供很多方法, 因为这个接口有很多的方法需要去实现, 方便使用期间, 可以集成他的子类
- ChannelHandler及其实现类一览图(后)
- 我们经常需要自定义一个handler类去继承ChannelInboundHandlerAdapter, 然后重写相应的方法实现业务逻辑, 我们接下来看看一般都需要重写那些方法
Pipeline 和 ChannelPipeline
ChannelPipeline是一个重点:
- ChannelPipeline是一个Handler的集合, 它负责处理和拦截inbound和outbound的事件和操作, 相当于一个贯穿Netty 的链. (也可以这样理解, ChannelPipeline是保存ChannelHandler的List, 用于处理或拦截Channel的入站事件和出站操作)
- ChannelPipeline实现了一种高效形式的拦截过滤器模式, 使用户可以完全控制事件的处理方式, 以及Channel 中各个的ChannelHandler如何交互
- 在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应, 他们的组成关系如下
- 常用方法
- ChannelPipeline addFirst(ChannelHandler… handlers), 把一个业务处理类(Handler), 添加到链中的第一个位置
- ChannelPipeline addLast(ChannelHandler… handlers), 把一个业务处理类(Handler), 添加到链中的最后一个位置
入站 出站
ChannelHandlerContext
- 保存Channel相关信息的上下文对象, 同时关联一个CHannel对象
- 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler, 同时ChannelHandlerContext 中也绑定了对应的pipeline和Channel信息,方便对ChannelHandler进行调用
- 常用方法:
ChannelOption
- Netty在创建Channel实例后, 一般都需要设置ChannelOption参数
- ChannelOption参数如下:
EventLoopGroup和其实现类NioEventLoopGroup
- EventLoopGroup是一组EventLoop的抽象,Netty 为了更好的利用多核CPU资源, 一般会有多个EventLoop同时工作, 每个EventLoop维护者一个Selector实例
- EventLoopGroup提供next接口, 可以从组里面按照一定规则获取其中一个EventLoop来处理任务, 在Netty服务器端编程中, 我们一般都需要提供两个EventLoopGroup 例如:BossEventLoopGroup 和 WorkerEventLoopGroup
- 通常一个服务器端口即一个ServerSocketChannel 对应一个 Selector 和一个EventLoop线程, BOSSEventLoop负责接收客户端的连接并将SocketChannel交给WorkerEventLoopGroup来进行IO处理,如下图所示
常用方法
Netty提供了一个专门用来操作缓冲区(即 Netty的数据容器)的工具类
- 常用方法如下
- 举例说明Unpooled获取Netty的数据容器ByteBuf的基本使用[案例演示]
案例一
package com.dance.netty.netty.buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class NettyByteBuf01 {
public static void main(String[] args) {
/*
* 创建一个ByteBuf
* 创建对象, 该对象包含一个数组arr, 是一个byte[10]
* 在Netty的Buffer中, 不需要使用flip进行反转
* 底层维护了readerIndex 和 writerIndex
* 通过readerIndex, writerIndex, capacity 将buffer分为三个区域
* 0 -> readerIndex : 已读区域
* readerIndex -> writerIndex : 未读区域
* writerIndex -> capacity : 可写区域
*/
ByteBuf buffer = Unpooled.buffer(10);
for (int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("capacity is "+buffer.capacity());
// readByte会使 readerIndex增长 如果指定位置则不会
for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}
}
}
案例二
package com.dance.netty.netty.buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
public class NettyByteBuf02 {
public static void main(String[] args) {
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", StandardCharsets.UTF_8);
// 调用相关的方法
if(byteBuf.hasArray()){
// 获取字节数组
byte[] array = byteBuf.array();
// 转为字符串输出
System.out.println(new String(array,StandardCharsets.UTF_8));
// 输出ByteBuf
System.out.println("byte buf is " + byteBuf);
// 数组偏移量
System.out.println(byteBuf.arrayOffset());
// readerIndex 位置
System.out.println(byteBuf.readerIndex());
// writerIndex 位置
System.out.println(byteBuf.writerIndex());
// capacity 边界
System.out.println(byteBuf.capacity());
// 读取宇哥字节
System.out.println(byteBuf.readByte());
// 读取指定位置的一个字节
System.out.println(byteBuf.getByte(0));
// 可读的字节数
System.out.println(byteBuf.readableBytes());
for (int i = 0; i < byteBuf.readableBytes(); i++) {
System.out.println((char)byteBuf.getByte(i));
}
// 从0开始读取4个字节
System.out.println(byteBuf.getCharSequence(0, 4, StandardCharsets.UTF_8));
// 从4开始读取6个字节
System.out.println(byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8));
}
}
}
Netty应用实例 - 群聊系统
需求
- 编写一个Netty群聊系统, 实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多人群聊
- 服务器端: 可以监测用户上下, 离线, 并实现消息转发功能
- 客户端: 通过channel可以无阻塞发送消息给其他所有用户同时可以接收其他用户发送的消息(由服务器转发得到)
- 目的: 进一步理解Netty 非阻塞网络编程机制
- 看老师代码演示
NettyServer
package com.dance.netty.netty.groupchar;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;
public class NettyServer {
private final int port;
public NettyServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", new StringDecoder(StandardCharsets.UTF_8))
.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8))
.addLast("myHandler", new NettyServerHandler());
}
});
System.out.println("netty server is starter......");
ChannelFuture sync = serverBootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyServer(7000).run();
}
}
NettyServerhandler
package com.dance.netty.netty.groupchar;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个Channel组, 用于管理所有的channel
* GlobalEventExecutor.INSTANCE : 是一个全局的事件执行器, 是一个单例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 表示建立连接, 一旦连接, 第一个执行
* @param ctx 上下文对象
* @throws Exception 异常
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
/*
* 将该用户加入聊天的信息提送给其他客户端
* 该方法会将 channelGroup 中 所有的Channel遍历 并发送
*/
channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() +" " + dateTimeFormatter.format(LocalDateTime.now()) + " 加入聊天\n" );
// 加入到组中 因为先发送后加入, 就不会发送给自己了
channelGroup.add(channel);
}
/**
* 表示Channel处于活跃的状态
* @param ctx 上下文对象
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[客户端]: " +ctx.channel().remoteAddress() + " 上线了~" +" " + dateTimeFormatter.format(LocalDateTime.now()));
}
/**
* 表示Channel处于非活跃状态
* @param ctx 上下文对象
* @throws Exception 异常
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[客户端]: " + ctx.channel().remoteAddress() + " 离线了~" +" " + dateTimeFormatter.format(LocalDateTime.now()));
}
/**
* 表示断开连接
* @param ctx 上下文对象
* @throws Exception 异常
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 会自动将channel从ChannelGroup中移除, 不需要手动
channelGroup.writeAndFlush(ctx.channel().remoteAddress() + " 离开了~" +" " + dateTimeFormatter.format(LocalDateTime.now()));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
channelGroup.forEach(x -> {
// 排除自己
if(channel != x){
x.writeAndFlush("[客户] : " + channel.remoteAddress() +" " + dateTimeFormatter.format(LocalDateTime.now()) + " 发送了 -> " +msg + "\n");
}else{
// 回显自己
x.writeAndFlush("[自己] : " + channel.remoteAddress()+" " + dateTimeFormatter.format(LocalDateTime.now()) + " 发送了 -> " +msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}
NettyClient
package com.dance.netty.netty.groupchar;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class NettyClient {
private final int port;
private final String ip;
public NettyClient(int port, String ip) {
this.port = port;
this.ip = ip;
}
public void run() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder())
.addLast("encoder", new StringEncoder())
.addLast("myHandler", new NettyClientHandler());
}
});
ChannelFuture sync = bootstrap.connect(ip, port).sync();
System.out.println("----------"+sync.channel().localAddress()+"----------");
Scanner scanner = new Scanner(System.in);
Channel channel = sync.channel();
while (scanner.hasNextLine()){
String s = scanner.nextLine();
channel.writeAndFlush(s+ "\r\n");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyClient(7000, "127.0.0.1").run();
}
}
NettyClientHandler
package com.dance.netty.netty.groupchar;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
执行结果
NettyServer
netty server is starter......
[客户端]: /127.0.0.1:61298 上线了~ 2022-01-16 18:59:32
[客户端]: /127.0.0.1:61342 上线了~ 2022-01-16 18:59:35
[客户端]: /127.0.0.1:61402 上线了~ 2022-01-16 18:59:49
[客户端]: /127.0.0.1:61402 离线了~ 2022-01-16 19:03:34
NettyClient1
----------/127.0.0.1:61298----------
[客户端]:/127.0.0.1:61342 2022-01-16 18:59:35 加入聊天
[客户端]:/127.0.0.1:61402 2022-01-16 18:59:49 加入聊天
hi flower
[自己] : /127.0.0.1:61298 2022-01-16 19:00:16 发送了 -> hi flower
[客户] : /127.0.0.1:61342 2022-01-16 19:00:37 发送了 -> hi dance
/127.0.0.1:61402 离开了~ 2022-01-16 19:03:34
NettyClient2
----------/127.0.0.1:61342----------
[客户端]:/127.0.0.1:61402 2022-01-16 18:59:49 加入聊天
[客户] : /127.0.0.1:61298 2022-01-16 19:00:16 发送了 -> hi flower
hi dance
[自己] : /127.0.0.1:61342 2022-01-16 19:00:37 发送了 -> hi dance
/127.0.0.1:61402 离开了~ 2022-01-16 19:03:34
NettyClient3
----------/127.0.0.1:61402----------
[客户] : /127.0.0.1:61298 2022-01-16 19:00:16 发送了 -> hi flower
[客户] : /127.0.0.1:61342 2022-01-16 19:00:37 发送了 -> hi dance
Netty心跳检测机制
需求
- 编写一个Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲
- 当服务器超过5秒没有写操作时, 就提示写空闲
- 实现当服务器超过7秒没有读或者写操作时, 就提示读写空闲
- 代码如下
NettyServerHertbeat
package com.dance.netty.netty.heartbeat;
import com.dance.netty.netty.groupchar.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
public class NettyServerHertBeat {
private final int port;
public NettyServerHertBeat(int port) {
this.port = port;
}
public void run() throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 在BossGroup中增加一个日志处理器 日志级别为INFO
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 是Netty提供的处理空闲状态的处理器(这个只是检测,如果存在就会激活一个IDLEStateEvent事件)
/*
* 第一个: 读空闲时间, 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
* 第二个: 写空闲时间 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
* 第三个: 所有空闲时间 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
* 第四个: 时间单位
* 文档说明: triggers on {@link IdleStateEvent} when a {@link Channel} has not performed read
* ,write, or both operation for a while
* 当 IdleStateEvent 触发后, 就会传递给管道的下一个handler去处理
* 通过调用(触发)下一个handler的userEventTriggered
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 加入一个对空闲检测进一步处理的Handler(自定义)
pipeline.addLast(new NettyServerIdleStateHandler());
}
});
System.out.println("netty server is starter......");
ChannelFuture sync = serverBootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyServerHertBeat(7000).run();
}
}
NettyServerIdleStatHandler
package com.dance.netty.netty.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class NettyServerIdleStateHandler extends ChannelInboundHandlerAdapter {
/**
* 用户事件触发器
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception 异常
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果属于空闲事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String msg = "";
switch (idleStateEvent.state()) {
// 读空闲事件
case READER_IDLE:
msg = "读空闲";
break;
// 写空闲事件
case WRITER_IDLE:
msg = "写空闲";
break;
// 读写空闲事件
case ALL_IDLE:
msg = "读写空闲";
break;
default:
msg = "没有对应的事件";
}
System.out.println(ctx.channel().remoteAddress() + "--超时事件--" + msg);
}
}
}
客户端采用Telnet
执行结果
netty server is starter......
一月 16, 2022 9:58:00 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xd704efa3] REGISTERED
一月 16, 2022 9:58:00 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xd704efa3] BIND: 0.0.0.0/0.0.0.0:7000
一月 16, 2022 9:58:00 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xd704efa3, L:/0:0:0:0:0:0:0:0:7000] ACTIVE
一月 16, 2022 9:58:06 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xd704efa3, L:/0:0:0:0:0:0:0:0:7000] READ: [id: 0x3f6ff8a9, L:/127.0.0.1:7000 - R:/127.0.0.1:62391]
一月 16, 2022 9:58:06 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xd704efa3, L:/0:0:0:0:0:0:0:0:7000] READ COMPLETE
/127.0.0.1:62391--超时事件--读空闲
/127.0.0.1:62391--超时事件--写空闲
/127.0.0.1:62391--超时事件--读空闲
/127.0.0.1:62391--超时事件--读写空闲
/127.0.0.1:62391--超时事件--读空闲
/127.0.0.1:62391--超时事件--写空闲
/127.0.0.1:62391--超时事件--读空闲
/127.0.0.1:62391--超时事件--读写空闲
/127.0.0.1:62391--超时事件--写空闲
/127.0.0.1:62391--超时事件--读空闲
/127.0.0.1:62391--超时事件--读空闲
Netty通过WebSocket编程实现服务器与客户端长连接
需求
- Http协议是无状态的,浏览器和服务器间的请求响应一次, 下一次会重新创建连接
- 要求: 实现基于WebSocket的长链接的全双工的交互
- 改变Http协议多次请求的约束, 实现长链接, 服务器可以发送消息给浏览器
- 客户端浏览器和服务器端会相互感知, 比如服务器关闭了, 浏览器会感知, 同样浏览器关闭了,服务器也会感知
运行界面
WebSocketServer
package com.dance.netty.netty.websocket;
import com.dance.netty.netty.heartbeat.NettyServerHertBeat;
import com.dance.netty.netty.heartbeat.NettyServerIdleStateHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class WebSocketServer {
private final int port;
public WebSocketServer(int port) {
this.port = port;
}
public void run() throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 在BossGroup中增加一个日志处理器 日志级别为INFO
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 因为是基于Http协议的所以采用Http的编解码器
pipeline.addLast(new HttpServerCodec());
// 是以块的方式写, 添加ChunkedWriteHandler(分块写入处理程序)
pipeline.addLast(new ChunkedWriteHandler());
/*
* http 数据在传输过程中是分段的 http Object aggregator 就是可以将多个段聚合
* 这就是为什么 当浏览器发送大量数据时, 就会出现多次http请求
* 参数为 : 最大内容长度
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
* 对应WebSocket 他的数据时以桢(frame) 形式传递
* 可以看到WebSocketFrame下面有6个子类
* 浏览器请求时: ws://localhost:7000/xxx 请求的url
* 核心功能是将http协议升级为ws协议 保持长链接
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义Handler, 处理业务逻辑
pipeline.addLast(new WebSocketTextFrameHandler());
}
});
System.out.println("netty server is starter......");
ChannelFuture sync = serverBootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new WebSocketServer(7000).run();
}
}
WebSocketTextFrameHandler
package com.dance.netty.netty.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* TextWebSocketFrame 表示一个文本桢
*/
public class WebSocketTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("[服务器] : 收到消息 -> " + msg.text());
// 回复浏览器
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间: " + LocalDateTime.now() + "->"+ msg.text()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id 表示唯一的值 LongText是唯一的
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText());
// shortText 可能会重复
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// id 表示唯一的值 LongText是唯一的
System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText());
// shortText 可能会重复
System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asShortText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
cause.printStackTrace();
}
}
页面
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持WebSocket
if(window.WebSocket){
socket = new WebSocket("ws://localhost:7000/hello");
// 相当于ChannelRead ev就是消息回送
socket.onmessage = function (ev){
console.log(ev)
let textArea = document.getElementById("responseText")
textArea.value = textArea.value + "\n" + ev.data
}
// 相当于连接开启 ChannelAdded ev
socket.onopen = function (ev) {
console.log(ev)
let textArea = document.getElementById("responseText")
textArea.value = "连接开启了"
}
// 相当于连接关闭 ChannelRemove ev
socket.onclose = function (ev) {
console.log(ev)
let textArea = document.getElementById("responseText")
textArea.value = textArea.value + '\n' + "连接关闭了"
}
}else{
alert("当前浏览器不支持WebSocket")
}
function send(msg) {
if(!window.socket){
return;
}
if(socket.readyState === WebSocket.OPEN){
// 通过WebSocket发送消息
socket.send(msg)
}else{
alert('连接没有开启')
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="width: 300px; height: 300px;"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="width: 300px; height: 300px;"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
<script>
</script>
</body>
</html>
测试
启动服务器
netty server is starter......
一月 16, 2022 11:19:27 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xb0f42cfd] REGISTERED
一月 16, 2022 11:19:27 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xb0f42cfd] BIND: 0.0.0.0/0.0.0.0:7000
一月 16, 2022 11:19:27 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xb0f42cfd, L:/0:0:0:0:0:0:0:0:7000] ACTIVE
启动页面
可以看到连接开启了
可以看到浏览器发送了三个请求
第一个ws请求是和IDEA建立连接的不用管
第二个http协议是请求html文件的
我们主要看第三个请求, 这个就是我们自己的ws请求, 状态为101 Switching Protocols切换协议, 并且协议升级为ws协议
并且服务器感知,建立Channel连接
一月 16, 2022 11:20:46 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xb0f42cfd, L:/0:0:0:0:0:0:0:0:7000] READ: [id: 0x9b6bccb3, L:/0:0:0:0:0:0:0:1:7000 - R:/0:0:0:0:0:0:0:1:55639]
一月 16, 2022 11:20:46 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xb0f42cfd, L:/0:0:0:0:0:0:0:0:7000] READ COMPLETE
handlerAdded 被调用:005056fffec00008-00006534-00000002-37865b9aaa734014-9b6bccb3
handlerAdded 被调用:9b6bccb3
页面发送消息, 后端连接返回消息,并且浏览器中并没有新的请求
服务器
[服务器] : 收到消息 -> hi netty
关闭浏览器后服务端感知,同样的关闭服务器浏览器也会感知
handlerRemoved 被调用:005056fffec00008-00006534-00000002-37865b9aaa734014-9b6bccb3
handlerRemoved 被调用:9b6bccb3
并且在建立WebSocket连接时需要请求路径和后端配置路径一致
前端: ws://localhost:7000/hello
后端配置: /hello