Echo服务器

所有的Netty服务器都需要以下两部分。

  • 至少一个 ChannelHandler ——该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
  • 引导 ——这是配置服务器的启动代码。至少,它会将服务器绑定到它要监听连接请求的端口上。

Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler 接口,用来定义响应入站 事件的方法。继承ChannelInboundHandlerAdapter 类,它提供了ChannelInboundHandler 的默认实现。

EchoServerHandler

  • channelRead() ——对于每个传入的消息都要调用;
  • channelReadComplete() ——通知ChannelInboundHandler 最后一次对channel-Read() 的调用是当前批量读取中的最后一条消息;
  • exceptionCaught() ——在读取操作期间,有异常抛出时会调用。

ChannelInboundHandlerAdapter 的每个方法都可以被重写。

除了ChannelInboundHandlerAdapter 之外,还有很多需要学习的ChannelHandler 的子类型和实现.

  • 针对不同类型的事件来调用ChannelHandler


  1. package com.github.twx;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelFutureListener;
  5. import io.netty.channel.ChannelHandler;
  6. import io.netty.channel.ChannelHandlerContext;
  7. import io.netty.channel.ChannelInboundHandlerAdapter;
  8. import io.netty.util.CharsetUtil;
  9. /**
  10. * ChannelInBoundHandler: 响应入站事件
  11. * ChannelInboundHandlerAdapter提供了ChannelInboundHandler 的默认实现
  12. * @author tangwx@soyuan.com.cn
  13. * @date 2019-05-22 23:51
  14. */
  15. //标示ChannelHandler可以被多个Channel安全地共享
  16. @ChannelHandler.Sharable
  17. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  18. /**
  19. * 对于每个传入的消息都要调用
  20. * @param ctx
  21. * @param msg
  22. * @throws Exception
  23. */
  24. @Override
  25. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  26. ByteBuf byteBuf = (ByteBuf) msg;
  27. System.out.println("server received: "+byteBuf.toString(CharsetUtil.UTF_8));
  28. //原封不动的发送给客户端
  29. ctx.write(byteBuf);
  30. }
  31. /**
  32. * 对channelRead()的调用是当前批量读取中的最后一条消息
  33. * @param ctx
  34. * @throws Exception
  35. */
  36. @Override
  37. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  38. //将未处理的消息冲刷到远程节点,并且关闭该Channel
  39. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
  40. .addListener(ChannelFutureListener.CLOSE);
  41. }
  42. /**
  43. * 每个Channel都拥有一个关联的ChannelPipeline,其持有一个ChannelHandler的实例链。
  44. * ChannelHandler会把方法的调用转发给链中的下一个Channel-Handler 。
  45. *
  46. * 如果exceptionCaught() 方法没有被实现,
  47. * 那么异常将会被传递到ChannelPipeline的尾端并被记录
  48. * @param ctx
  49. * @param cause
  50. * @throws Exception
  51. */
  52. @Override
  53. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  54. cause.printStackTrace();
  55. ctx.close();
  56. }
  57. }

引导服务器

引导服务器可以概况成两个步骤:

  • 绑定到服务器将在其上监听并接受传入连接请求的端口;
  • 配置Channel ,以将有关的入站消息通知给EchoServerHandler 实例。

EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例

  1. package com.github.twx;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import java.net.InetSocketAddress;
  10. /**
  11. * @author tangwx@soyuan.com.cn
  12. * @date 2019-05-22 23:58
  13. */
  14. public class EchoServer {
  15. private final int port;
  16. public EchoServer(int port) {
  17. this.port = port;
  18. }
  19. public static void main(String[] args) {
  20. //设置端口值
  21. int port = Integer.parseInt(args[0]);
  22. new EchoServer(port).start();
  23. }
  24. private void start() {
  25. //这是我们自己实现的ChannelHandler
  26. EchoServerHandler serverHandler = new EchoServerHandler();
  27. //创建EventLoopGroup
  28. EventLoopGroup group = new NioEventLoopGroup();
  29. //创建ServerBootstrap
  30. ServerBootstrap bootstrap = new ServerBootstrap();
  31. bootstrap.group(group)
  32. //指定所使用的NIO传输Channel
  33. //使用NIO是因为它的可扩展性和彻底的异步性,它是目前使用最广泛的传输
  34. //如果你想要在自己的服务器中使用OIO传输,
  35. //将需要指定OioServerSocketChannel和OioEventLoopGroup 。
  36. .channel(NioServerSocketChannel.class)
  37. //使用指定的端口设置套接字地址
  38. .localAddress(new InetSocketAddress(port))
  39. //添加一个EchoServerHandler到子Channel的ChannelPipeline
  40. //当有一个新的连接进入时,新的子Channel将会被创建
  41. .childHandler(new ChannelInitializer<SocketChannel>() {
  42. @Override
  43. protected void initChannel(SocketChannel ch) throws Exception {
  44. //EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例
  45. ch.pipeline().addLast(serverHandler);
  46. }
  47. });
  48. try {
  49. //异步绑定服务器:调用sync()方法阻塞等待直到绑定完成
  50. ChannelFuture future = bootstrap.bind().sync();
  51. //获取Channel的CloseFuture,并且阻塞当前线程直到它完成
  52. future.channel().closeFuture().sync();
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }finally {
  56. try {
  57. //关闭EventLoopGroup,释放所有的资源
  58. group.shutdownGracefully().sync();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }
  64. }

注意看代码解释。

Echo客户端

扩展SimpleChannelInboundHandler 类处理入站消息。

EchoClientHandler

  1. package com.github.twx;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandler;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.SimpleChannelInboundHandler;
  7. import io.netty.util.CharsetUtil;
  8. /**
  9. * @author tangwx@soyuan.com.cn
  10. * @date 2019-05-23 00:18
  11. */
  12. //标记Sharable的实例可以被多个Channel共享
  13. @ChannelHandler.Sharable
  14. public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  15. /**
  16. * 在与服务器建立连接之后将被调用
  17. * @param ctx
  18. * @throws Exception
  19. */
  20. @Override
  21. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  22. //建立连接后给服务器发送信息
  23. ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端,我已与服务器连接...", CharsetUtil.UTF_8));
  24. }
  25. /**
  26. * 当从服务器接收到一条消息时被调用
  27. * @param ctx
  28. * @param msg
  29. * @throws Exception
  30. */
  31. @Override
  32. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  33. System.out.println("【客户端从服务器收到消息:】 "+msg.toString(CharsetUtil.UTF_8));
  34. }
  35. /**
  36. * 在处理过程中引发异常时被调用。
  37. * @param ctx
  38. * @param cause
  39. * @throws Exception
  40. */
  41. @Override
  42. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  43. cause.printStackTrace();
  44. ctx.close();
  45. }
  46. }

重写了channelRead0() 方法。每当接收数据时,都会调用这个方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接收。即使是对于这么少量的数据,channelRead0() 方法也可能会被调用两次.

引导客户端

客户端是使用主机和端口参数来连接远程地址.

与服务器引导还是非常类似的,不同的点我已经在代码注释中标明了。

  1. package com.github.twx;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.EventLoop;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioSocketChannel;
  11. import java.net.InetSocketAddress;
  12. /**
  13. * @author tangwx@soyuan.com.cn
  14. * @date 2019-05-23 00:12
  15. */
  16. public class EchoClient {
  17. private final String host;
  18. private final int port;
  19. public EchoClient(String host, int port) {
  20. this.host = host;
  21. this.port = port;
  22. }
  23. public static void main(String[] args) {
  24. new EchoClient(args[0], Integer.parseInt(args[1])).start();
  25. }
  26. private void start() {
  27. //指定EventLoopGroup以处理客户端事件;需要适用于NIO的实现
  28. EventLoopGroup group = new NioEventLoopGroup();
  29. //创建Bootstrap
  30. //服务端是:ServerBootstrap bootstrap = new ServerBootstrap();
  31. Bootstrap bootstrap = new Bootstrap();
  32. bootstrap.group(group)
  33. //适用于NIO传输的Channel类型
  34. .channel(NioSocketChannel.class)
  35. //设置服务器的InetSocketAddr-ess
  36. //服务端是localAddress
  37. .remoteAddress(new InetSocketAddress(host,port))
  38. //在创建Channel时,向ChannelPipeline中添加一个EchoClientHandler实例
  39. .handler(new ChannelInitializer<SocketChannel>() {
  40. @Override
  41. protected void initChannel(SocketChannel ch) throws Exception {
  42. //特定的处理器
  43. ch.pipeline().addLast(new EchoClientHandler());
  44. }
  45. });
  46. try {
  47. //连接到远程节点,阻塞等待直到连接完成
  48. //服务器是bind()
  49. ChannelFuture future = bootstrap.connect().sync();
  50. //阻塞,直到Channel关闭
  51. future.channel().closeFuture().sync();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }finally {
  55. try {
  56. //关闭线程池并且释放所有的资源
  57. group.shutdownGracefully().sync();
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. }