实现目标
- 客户端建立连接,发送消息-Netty rocks!
- 服务器报告接收到消息,并将其回送给客户端
- 客户端报告返回的消息并退出
所有的netty服务器都需要以下两个部分:
- 至少一个ChannelHandler,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑
- 引导,配置服务器的启动代码。它会将服务器绑定到它要监听连接请求的端口上。
ChannelHandler和业务逻辑
ChannelHandler是一个接口族的父接口,它的实现负责接收并响应事件通知,在Netty应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中。
因为Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。这个应用程序只需要用到少量的方法,所以继承ChannelInboundHandlerAdapter类足够了,它提供了ChannelInboundHandler的默认实现。
在Echo中会用到的方法:
- channelRead(),对于每个传入的消息都要调用
- ChannelReadCompute(),通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取数据中最后一条消息
- exceptionCaught(),在读取操作期间,在异常抛出时会调用
package cn.linguo.netty.handler;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler.Sharable;import io.netty.util.CharsetUtil;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/*** 标识一个ChannelHandler可以被多个Channel安全地共享** @author linguo**/@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {/*** 数据入站调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;// 将消息内容打印到控制台System.out.println("server receiver:" + in.toString(CharsetUtil.UTF_8));// 将接收到的消息写给发送者,而不冲刷出站消息ctx.write(in);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 将目前暂存于ChannelOutBoundBuffer中的消息(在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字)冲刷到远程节点,并且关闭该channelctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}/** 捕获异常*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 关闭该channelctx.close();}}
ChannerRead()和ChannelReadCompute()的区别:
channelRead表示接收消息,可以看到msg转换成了ByteBuf,然后打印,也就是把Client传过来的消息打印了一下,你会发现每次打印完后,channelReadComplete也会调用,如果你试着传一个超长的字符串过来,超过1024个字母长度,你会发现channelRead会调用多次,而channelReadComplete只调用一次。
因为ByteBuf是有长度限制的,所以超长了,就会多次读取,也就是调用多次channelRead,而channelReadComplete则是每条消息只会调用一次,无论你多长,分多少次读取,只在该条消息最后一次读取完成的时候调用,所以这段代码把关闭Channel的操作放在channelReadComplete里,放到channelRead里可能消息太长了,结果第一次读完就关掉连接了,后面的消息全丢了。
编写引导服务
引导服务器具体涉及以下内容:
- 绑定到服务器将在其上监听并接收传入连接请求的端口
- 配置Channel,以将有关的入站消息通知给EchoServerHandler实例
具体的代码示例:
package cn.linguo.netty.server;import java.net.InetSocketAddress;import cn.linguo.netty.handler.EchoServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/*** 引导服务器** @author linguo**/public class EchoServer {private final int port;public EchoServer(int port) {this.port = port;}public void start() throws InterruptedException {final EchoServerHandler serverHandler = new EchoServerHandler();// 创建EventLoopGroupEventLoopGroup group = new NioEventLoopGroup();try {// 创建ServerBootstrapServerBootstrap b = new ServerBootstrap();b.group(group).channel(NioServerSocketChannel.class) // 指定所使用的的NIO传输Channel.localAddress(new InetSocketAddress(port)) // 使用指定的端口设置套接字地址// 添加一个EchoServerHandler到子Channel的ChannelPipeline.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// EchoServerHandler被标注为@Sharaable,所以我们可以总是使用同样的实例// 这里对于所有的客户端连接来说,都会使用同一个EchoServerHandler,因为其被标注为@Sharable。ch.pipeline().addLast(serverHandler);}});// 异步地绑定服务器;调用sync()方法阻塞等待知道绑定完成ChannelFuture f = b.bind().sync();// 获取Channel的CloseFuture,并且阻塞当前线程直到它完成f.channel().closeFuture().sync();} finally {// 关闭EventLoopGroup,释放所有资源group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {if (args.length != 1) {System.out.println("Usage:" + EchoServer.class.getSimpleName() + "<port>");return;}// 设置端口置(如果端口参数不正确,抛出转换异常)int port = Integer.parseInt(args[0]);// 调用服务器的start()方法new EchoServer(port).start();}}
主要步骤回顾
- EchoServerHandler实现了业务逻辑
- main()方法引导了服务器
引导过程中所需要的步骤如下:
- 创建一个ServerBootstrap的实例以引导和绑定服务器
- 创建并分配一个NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读写数据
- 指定服务器绑定的本地InetSocketAddress
- 使用一个EchoServerHandler的实例初始化每一个新的Channel
- 调用ServerBootstrap.bind()方法以绑定服务器
