Echo服务器
所有的Netty服务器都需要以下两部分。
- 至少一个
ChannelHandler——该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。 - 引导 ——这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。
Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler 接口,用来定义响应入站 事件的方法。继承ChannelInboundHandlerAdapter 类,它提供了ChannelInboundHandler 的默认实现。
EchoServerHandler
channelRead()——对于每个传入的消息都要调用;channelReadComplete()——通知ChannelInboundHandler最后一次对channel-Read()的调用是当前批量读取中的最后一条消息;exceptionCaught()——在读取操作期间,有异常抛出时会调用。
ChannelInboundHandlerAdapter 的每个方法都可以被重写。
除了ChannelInboundHandlerAdapter 之外,还有很多需要学习的ChannelHandler 的子类型和实现.
- 针对不同类型的事件来调用
ChannelHandler;
package com.github.twx;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/*** ChannelInBoundHandler: 响应入站事件* ChannelInboundHandlerAdapter提供了ChannelInboundHandler 的默认实现* @author tangwx@soyuan.com.cn* @date 2019-05-22 23:51*///标示ChannelHandler可以被多个Channel安全地共享@ChannelHandler.Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {/*** 对于每个传入的消息都要调用* @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("server received: "+byteBuf.toString(CharsetUtil.UTF_8));//原封不动的发送给客户端ctx.write(byteBuf);}/*** 对channelRead()的调用是当前批量读取中的最后一条消息* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//将未处理的消息冲刷到远程节点,并且关闭该Channelctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}/*** 每个Channel都拥有一个关联的ChannelPipeline,其持有一个ChannelHandler的实例链。* ChannelHandler会把方法的调用转发给链中的下一个Channel-Handler 。** 如果exceptionCaught() 方法没有被实现,* 那么异常将会被传递到ChannelPipeline的尾端并被记录* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
引导服务器
引导服务器可以概况成两个步骤:
- 绑定到服务器将在其上监听并接受传入连接请求的端口;
- 配置
Channel,以将有关的入站消息通知给EchoServerHandler实例。
EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
package com.github.twx;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress;/*** @author tangwx@soyuan.com.cn* @date 2019-05-22 23:58*/public class EchoServer {private final int port;public EchoServer(int port) {this.port = port;}public static void main(String[] args) {//设置端口值int port = Integer.parseInt(args[0]);new EchoServer(port).start();}private void start() {//这是我们自己实现的ChannelHandlerEchoServerHandler serverHandler = new EchoServerHandler();//创建EventLoopGroupEventLoopGroup group = new NioEventLoopGroup();//创建ServerBootstrapServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(group)//指定所使用的NIO传输Channel//使用NIO是因为它的可扩展性和彻底的异步性,它是目前使用最广泛的传输//如果你想要在自己的服务器中使用OIO传输,//将需要指定OioServerSocketChannel和OioEventLoopGroup 。.channel(NioServerSocketChannel.class)//使用指定的端口设置套接字地址.localAddress(new InetSocketAddress(port))//添加一个EchoServerHandler到子Channel的ChannelPipeline//当有一个新的连接进入时,新的子Channel将会被创建.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例ch.pipeline().addLast(serverHandler);}});try {//异步绑定服务器:调用sync()方法阻塞等待直到绑定完成ChannelFuture future = bootstrap.bind().sync();//获取Channel的CloseFuture,并且阻塞当前线程直到它完成future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {try {//关闭EventLoopGroup,释放所有的资源group.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}}}
注意看代码解释。
Echo客户端
扩展SimpleChannelInboundHandler 类处理入站消息。
EchoClientHandler
package com.github.twx;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/*** @author tangwx@soyuan.com.cn* @date 2019-05-23 00:18*///标记Sharable的实例可以被多个Channel共享@ChannelHandler.Sharablepublic class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {/*** 在与服务器建立连接之后将被调用* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//建立连接后给服务器发送信息ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端,我已与服务器连接...", CharsetUtil.UTF_8));}/*** 当从服务器接收到一条消息时被调用* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("【客户端从服务器收到消息:】 "+msg.toString(CharsetUtil.UTF_8));}/*** 在处理过程中引发异常时被调用。* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
重写了channelRead0() 方法。每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接收。即使是对于这么少量的数据,channelRead0() 方法也可能会被调用两次.
引导客户端
客户端是使用主机和端口参数来连接远程地址.
与服务器引导还是非常类似的,不同的点我已经在代码注释中标明了。
package com.github.twx;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoop;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;/*** @author tangwx@soyuan.com.cn* @date 2019-05-23 00:12*/public class EchoClient {private final String host;private final int port;public EchoClient(String host, int port) {this.host = host;this.port = port;}public static void main(String[] args) {new EchoClient(args[0], Integer.parseInt(args[1])).start();}private void start() {//指定EventLoopGroup以处理客户端事件;需要适用于NIO的实现EventLoopGroup group = new NioEventLoopGroup();//创建Bootstrap//服务端是:ServerBootstrap bootstrap = new ServerBootstrap();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group)//适用于NIO传输的Channel类型.channel(NioSocketChannel.class)//设置服务器的InetSocketAddr-ess//服务端是localAddress.remoteAddress(new InetSocketAddress(host,port))//在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//特定的处理器ch.pipeline().addLast(new EchoClientHandler());}});try {//连接到远程节点,阻塞等待直到连接完成//服务器是bind()ChannelFuture future = bootstrap.connect().sync();//阻塞,直到Channel关闭future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {try {//关闭线程池并且释放所有的资源group.shutdownGracefully().sync();} catch (InterruptedException e) {e.printStackTrace();}}}}
