线程模型基本介绍
- 不同的线程模型, 对程序的性能有很大影响,为了搞清Netty线程模式,我们来系统的讲解下各个线程模式,最后看看Netty线程模型有什么优越性
- 目前存在的线程模型有
- 传统阻塞IO服务模型
- Reactor模式
- 根据Reactor的数量和处理资源线程池的数量不同,有三种典型的实现
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
Netty线程模式(Netty 主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor)
传统阻塞IO服务模型
工作原理图
黄色的框表示对象,蓝色的框表示线程
-
模型特点
采用阻塞IO模式获取驶入的数据
-
问题分析
当并发数很大,就会创建大量的线程,占用很大系统资源
连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在Read操作造成线程资源浪费
Reactor模式
针对传统阻塞IO服务模型的两个缺点,解决方案
基于IO复用模型,多个连接公用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接,当某个连接有新的数据可以处理时,擦欧洲哦系统通知应用程序,线程从阻塞状态返回,开始进行业务处理,Reactor对应的叫法:
- 反应器模式
- 分发者模式(Dispatcher)
- 通知者模式(notifier)
基于线程池复用线程资源, 不必为每个连接都创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务
IO复用结合线程池, 就是Reactor模式基本设计思想,如图
对上图说明:Reactor模式,通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将他们同步分派到相应的处理线程,因此Reactor模式也叫dispatcher模式
Reactor模式使用IO复用监听事件, 收到事件后分发给某个线程(进程),这点就是网络服务器高并发处理关键
Reactor模式中 核心组成
Reactor: Reactor在一个单独的线程中运行, 负责监听和分发事件, 分发给适当的处理程序来对IO事件做出反应, 他就像公司的电话接线员,他接听来自客户的电话并将线路转移到适当的联系人
- Handlers: 处理程序执行IO事件要完成的实际事件, 类似于客户想要与之交谈的公司中的实际官员,Reactor通过调度适当的处理程序来响应IO事件,处理程序执行非阻塞操作
Reactor模式分类
根据Reactor的数量和处理资源池线程的数量不同, 有3种典型的实现
- Select是前面IO复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求
2. Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发
3. 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
4. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
5. Handler会完成Read→业务处理→Send的完整业务流程
结合实例:服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,
但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。方案优缺点分析
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
3. 缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
3. 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
4. 使用场景:客户端的数量有限,业务处理非常快速,比如Redis在业务处理的时间复杂度O(1)的情况单Reactor 多线程
原理图
图小结
- Reactor对象通过select监控客户端请求事件,收到事件后,通过dispatch进行分发
2. 如果建立连接请求,则右F Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
3. 如果不是连接请求,则由reactor分发调用连接对应的handler来处理
4. handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
5. worker线程池会分配独立线程完成真正的业务,并将结果返回给handle
6. handler收到响应后,通过send将结果返回给client方案优缺点分析
- 优点:可以充分的利用多核cpu的处理能力
2. 缺点:多线程数据共享和访问比较复杂,reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈主从Reactor 多线程
工作原理图
针对单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行方案说明
- Reactor主线程MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件
2. 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor
3. subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
4. 当有新事件发生时,subreactor就会调用对应的Jhandler处理
5. handler通过read读取数据,分发给后面的worker线程处理
6. worker线程池分配独立的worker线程进行业务处理,并返回结果
7. handler收到响应的结果后,再通过send将结果返回给client
8. Reactor主线程可以对应多个Reactor子线程, 即MainRecator可以关联多个SubReactorScalable IO in java 对 Multiple Reactors的原理图解
方案优缺点说明
- 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理
2. 优点:父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
3. 缺点:编程复杂度较高
结合实例:这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持Reactor模式小结
三种模式用生活案例来理解
- 单Reactor单线程,前台接待员和服务员是同一个人,全程为顾客服
2. 单Reactor多线程,1个前台接待员,多个服务员,接待员只负责接待
3. 主从Reactor多线程,多个前台接待员,多个服务生Reactor模式具有以下的优点
- 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的
2. 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
3. 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源
4. 复用性好,Reactor相模型本身与具体事件处理逻辑无关,具有很高的复用性
Netty模型
工作原理示意图 1-简单版
Netty主要是基于主从Reactors多线程模型(如图)做了一定的改进, 其中主从Reactor 多线程模型有多个Reactor
对上图的说明
- BossGroup线程维护Selector,只关注Accecpt
- 当接收到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护
当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意handler经加入到通道
工作原理示意图 2-进阶版
工作原理示意图3-详细版
对上图的说明小结
Netty抽象出两组线程池Boss Group 专门负责接收客户端的连接, WorkerGroup 专门负责网路的读写
- Boss Group 和 WorkerGroup 类型都是NIOEventLoopGroup
- NioEventLoopGroup相当于一个事件循环组, 这个组中含有多个事件循环, 每一个事件循环是NioEventLoop
- NioEventLoop表示一个不断循环的执行处理任务的线程, 每个NioEventLoop都有一个Selector, 用于监听绑定在其上的Socket的网络通讯
- NioEventLoopGroup 可以有多个线程, 即可以含有多个NIOEventLoop
- 每个Boss NioEventLoop 循环执行的步骤有3步
- 轮训accept事件
- 处理accept事件, 与client建立连接, 生成NioSocketChannel, 并将其注册到某个worker NioEventLoop上的selector
- 处理任务队列任务,即runAllTask
- 每个Worker NIOEventLoop循环执行的步骤
- 轮训read, write事件
- 处理IO事件, 即read, write事件, 在NIOSocketChannel处理
- 处理任务队列的任务, 即runAllTask
每个Worker NioEventLoop 处理业务时, 会使用pipeline(管道), pipeline中包含了channel, 即通过pipeline可以获取到对应的通道,通道中维护了很多的处理器
Netty快速入门实例-TCP服务
需求
使用IDEA创建Netty项目
- Netty服务器在6668端口监听, 客户端能发送消息给服务器”Hello, 服务器~”
- 服务器可以回复消息给客户端”hello, 客户端~”
- 目的: 对Netty线程模型 有一个初步认识, 便于理解Netty 模型理论
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
/*
* 创建BossGroup 和 WorkerGroup
* 说明:
* 1. 创建连个线程组 bossGroup 和 workerGroup
* 2. bossGroup只处理连接请求(accept), 真正的和客户端业务处理, 会交给workerGroup完成
* 3. 两个都是无线循环
* 4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
* 默认: 实际CPU核数 * 2
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象, 配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
/**
* 初始化通道
* 创建一个通道初始化对象(匿名对象)
* 给pipeline设置处理器
* @param socketChannel SocketChannel
* @throws Exception err
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置 处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道(pipeline)设置处理器
System.out.println("server is ready......");
// 绑定一个接口 并且同步 生成一个 ChannelFuture 对象
// 启动服务器(并绑定端口)
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
<a name="Z9jtB"></a>
### 编写ServerHandler
```java
package com.dance.netty.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* 说明:
* 1. 我们自定义一个handler 需要集成Netty 规定好的 HandlerAdapter
* 2. 这时我们自定义一个Handler 才能称为一个Handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取消息
* @param ctx 通道处理器上下文 含有管道 pipeline对象 地址
* @param msg 消息
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ctx = " + ctx);
// 将msg 转换为 byteBuf
// 这个是Netty的 ByteBuf 不是 NIO的ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送的消息是: " + byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址: " + ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx 通道处理器上下文
* @throws Exception 异常
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// write + flush
// 将数据写入到缓存并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端", StandardCharsets.UTF_8));
}
/**
* 处理异常
* @param ctx 上下文对象
* @param cause 异常
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常一般是关闭通道
ctx.close();
cause.printStackTrace();
}
}
编写Client
package com.dance.netty.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 创建客户端启动对象
// 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok !");
// 启动客户端去连接服务端
// 关于 ChannelFuture 要分析, 涉及到Netty的异步模型
ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道进行监听
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
编写ClientHandler
package com.dance.netty.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道被激活触发
* @param ctx 上下文
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx is " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 服务器!", StandardCharsets.UTF_8));
}
/**
* 当有通道读取事件触发
* @param ctx 上下文
* @param msg 消息
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息: " + byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址: " + ctx.channel().remoteAddress());
}
/**
* 异常
* @param ctx 上下文
* @param cause 异常
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
测试
启动服务器端
启动客户端
服务器端输出
server is ready......
ctx = ChannelHandlerContext(NettyServerHandler#0, [id: 0x565d4ec5, L:/127.0.0.1:6668 - R:/127.0.0.1:51028])
客户端发送的消息是: Hello 服务器!
客户端地址: /127.0.0.1:51028
客户端输出
客户端 ok !
client ctx is ChannelHandlerContext(NettyClientHandler#0, [id: 0xd0a47750, L:/127.0.0.1:51028 - R:/127.0.0.1:6668])
服务器回复的消息: Hello 客户端
服务器地址: /127.0.0.1:6668
TaskQueue 任务队列
任务队列中的Task有三种典型使用场景
- 用户程序自定义的普通任务 [举例说明]
- 用户自定义定时任务
非当前Reactor线程调用Channel的各种方法
- 例如在推送系统的业务线程里面, 根据用户的标识, 找到对应的Channel引用,然后调用Write类方法向该用户推送消息, 就会进入到该场景,最终的write会提交到任务队列中后被异步消费
问题
如果在读取或者操作时碰到非常耗时的业务, 那么他就会阻塞在业务执行的地方,我们希望他可以异步执行,而不是阻塞事件, 我们可以提交到该Channel对应的NioEventLoop的taskQueue中
修改NettyServerHandler的ChannelRead方法
这样测试,我们服务器端,channelRead方法就会阻塞10秒才返回Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端 我读取完成喽", StandardCharsets.UTF_8)); System.out.println("go on....");
用户自定义普通任务
使用用户自定义普通任务解决
通过上下文对象获取Channel,获取EventLoop,并提交一个任务ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端 我读取完成喽", StandardCharsets.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("发生异常"); } } });
测试可以,这样的话,ChannelRead事件不会阻塞,并且任务是在10秒后返回的
但是这里如果是多任务那么时间是累加的// 应为是任务队列所以第一个任务返回是延迟10秒 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端 我读取完成喽", StandardCharsets.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("发生异常"); } } }); // 第二个任务返回是 上面的 10 + 下面的 20 = 30 秒 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端 我读取完成喽", StandardCharsets.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("发生异常"); } } });
用户自定义定时任务
延时5秒// 用户自定义定时任务 ctx.channel().eventLoop().schedule(() -> { try { Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端 我读取完成喽", StandardCharsets.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("发生异常"); } }, 5, TimeUnit.SECONDS);
执行测试 ok
那么普通任务和延时任务并存,那么他的执行时间如何呢?
经过测试一个普通任务10秒,一个定时任务5秒+执行10秒, 总共会在20秒执行完成,在10秒返回一个, 在20秒返回一个
预测普通任务和延时任务同时开始, 普通任务开始执行,延时任务开始倒计时,在延时任务倒计时5秒后进入等待,而普通任务10秒执行完成后延时任务直接开始执行,所以在20秒返回方案再说明
- 例如在推送系统的业务线程里面, 根据用户的标识, 找到对应的Channel引用,然后调用Write类方法向该用户推送消息, 就会进入到该场景,最终的write会提交到任务队列中后被异步消费
Netty抽象出两组线程池, BOSSGroup专门负责接收客户端连接, WorkerGroup专门负责网络读写操作
- NioEventLoop表示一个不断循环执行处理任务的线程, 每个NioEventLoop都有一个Selector, 用于监听绑定在其上的Socket网络通道
- NioEventLoop内部采用串行化设计, 从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
NioEventLoopGroup 下包含多个NioEventLoop
异步模型和同步模型相对, 当一个异步过程调用发出后, 调用者不能立刻得到结果, 实际处理这个调用的组件在完成后,通过状态, 通知和回调来通知调用者
- Netty中的IO操作是异步的,包括bind, write, Connect等操作会简单的返回一个ChannelFuture
- 调用者不能立刻获得结果, 而是通过Future-listener机制, 用户可以方便的主动获取或者通过通知机制获得IO操作结果
Netty的异步模型是建立在future 和 callback之上的, callback就是回调, 重点说Future, 他的核心思想是: 假设一个方法fun, 计算过程可能非常耗时, 等待Fun返回显然不合适, 那么可以再调用Fun的时候,立马写一个Future, 后续可以通过Future去监控方法Fun的处理过程, (即Future-Listener 机制)
Futurn说明
表示异步的执行结果, 可以通过他提供的方法来检查执行是否完成,比如检索计算等等
ChannelFuture是一个接口: public interface ChannelFuture extends Future
{} 在使用Netty进行编程时拦截操作和转换出入站数据只需要提供callback或者利用future即可, 这使链式操作简单, 高效, 并有利于编写可重用的通用的代码
Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来, 解脱出来
future-listener机制
当Future对象刚刚创建时, 处于非完成状态, 调用者可以通过返回的ChannelFuture来获取操作执行的状态, 注册监听函数来执行完成后的操作
- 常见有如下操作
- 通过isDone方法来判断当前操作是否完成
- 通过isSuccess方法来判断已完成的当前操作是否成功
- 通过getCause方法来获取已完成的当前操作失败的原因
- 通过isCancelled方法来判断已完成的当前操作是否被取消
- 通过addListener方法注册监听器, 当操作已完成(isDone 方法返回完成), 将会通知指定的监听器, 如果Future对象已完成, 则通知指定的监听器
举例说明
```java // 绑定一个接口 并且同步 生成一个 ChannelFuture 对象 // 启动服务器(并绑定端口) ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 给channelFuture注册监听器 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { // 如果操作成功 if (channelFuture.isSuccess()){ System.out.println(“监听端口 6668 成功”); }else{ System.out.println(“监听端口 6668 失败”); } } });
执行结果
```java
server is ready......
监听端口 6668 成功
快速入门实例-HTTP 服务
- D实例要求:使用IDEA创建Netty项目
- Netty服务器在6668端口监听,浏览器发出请求“http://localhost:6668/
- 服务器可以回复消息给客户端“Hello!我是服务器5”,并对特定请求资源进行过滤
- 目的:Netty可以做Http服务开发,并且理解Handler实例和客户端及其请求的关系
- 看老师代码演示
新建HttpServer
```java package com.dance.netty.netty.http;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
HTTP 服务器 */ public class HttpServer {
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new HttpServerInitializer()); ChannelFuture sync = serverBootstrap.bind("127.0.0.1",80).sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
}
}
<a name="On1iN"></a>
### 新建HttpServerInitializer
```java
package com.dance.netty.netty.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
* 通道初始化
*/
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个Netty提供的 HttpServerCodec codec => [coder - decoder]
/*
HttpServerCodec 是Netty提供的处理HTTP的
*/
pipeline.addLast("MyHttpServer", new HttpServerCodec());
// 添加自己的处理器
pipeline.addLast("MyHandler", new HttpServerHandler());
}
}
新建HttpServerHandler
package com.dance.netty.netty.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
/**
* 管道处理器
* SimpleChannelInboundHandler 继承了 ChannelInboundHandlerAdapter
* HttpObject 客户端和服务器相互通讯的数据定义
*/
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据
* @param channelHandlerContext 上下文对象
* @param msg httpObject对象
* @throws Exception 异常
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
// 判断是不是HttpRequest
if(msg instanceof HttpRequest){
System.out.println("msg type is " + msg.getClass());
System.out.println("client address is " + channelHandlerContext.channel().remoteAddress());
// 回复信息给浏览器 [HTTP协议]
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello 我是服务器~", StandardCharsets.UTF_8);
// 构造一个HTTP的响应, 即 HttpResponse
DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
// 设置响应信息
httpResponse.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "text/plain;Charset=UTF-8")
.set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(httpResponse);
}
}
}
测试
这里存在一个问题,那就是端口问题,之前是6668 chrome 访问直接被阻止
还有就是,在返回的类型中需要设置字符编码.不然会乱码
// 设置响应信息
httpResponse.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "text/plain;Charset=UTF-8")
.set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
但是还是有一个问题的, 我访问了一次,但是控制台却显示的是两次
原因是应为在浏览器加载资源 的时候,同时请求了页面浏览器角标
就是这个东西
修改HttpServerHandler
package com.dance.netty.netty.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import java.nio.charset.StandardCharsets;
/**
* 管道处理器
* SimpleChannelInboundHandler 继承了 ChannelInboundHandlerAdapter
* HttpObject 客户端和服务器相互通讯的数据定义
*/
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取客户端数据
* @param channelHandlerContext 上下文对象
* @param msg httpObject对象
* @throws Exception 异常
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
// 判断是不是HttpRequest
if(msg instanceof HttpRequest){
// 进行资源过滤
String uri = ((HttpRequest) msg).uri();
// 如果是图标直接结束
if(uri.endsWith("favicon.ico")){
System.out.println("request is favicon.ico, no result");
return;
}
.............
}
}
}
重启后测试
可以看到角标被过滤了