一、实例要求

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)。
  2. 实现多人群聊。
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能。
  4. 客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)。
  5. 目的:进一步理解 Netty 非阻塞网络编程机制。

    二、代码

    GroupChatClient

    1. public class GroupChatClient {
    2. private final String host;
    3. private final int port;
    4. public GroupChatClient(String host, int port) {
    5. this.host = host;
    6. this.port = port;
    7. }
    8. public void run() throws InterruptedException {
    9. EventLoopGroup group = new NioEventLoopGroup();
    10. try {
    11. Bootstrap bootstrap = new Bootstrap();
    12. bootstrap.group(group)
    13. .channel(NioSocketChannel.class)
    14. .handler(new ChannelInitializer<SocketChannel>() {
    15. @Override
    16. protected void initChannel(SocketChannel ch) throws Exception {
    17. ChannelPipeline pipeline = ch.pipeline();
    18. pipeline.addLast("decoder", new StringDecoder());
    19. pipeline.addLast("encoder", new StringEncoder());
    20. pipeline.addLast(new GroupChatClientHandler());
    21. }
    22. });
    23. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
    24. // 得到channel
    25. Channel channel = channelFuture.channel();
    26. System.out.println("------" + channel.localAddress() + "--------------");
    27. // 客户端需要输入信息,创建一个扫描器
    28. Scanner scanner = new Scanner(System.in);
    29. while (scanner.hasNextLine()) {
    30. String msg = scanner.nextLine();
    31. // 通过channel 发送到服务器端
    32. channel.writeAndFlush(msg + "\r\n");
    33. }
    34. } finally {
    35. group.shutdownGracefully();
    36. }
    37. }
    38. public static void main(String[] args) throws InterruptedException {
    39. new GroupChatClient("127.0.0.1", 6699).run();
    40. }
    41. }

    GroupChatClientHandler

    1. public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    2. @Override
    3. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    4. System.out.println("客户端读取:" + msg);
    5. }
    6. }

    GroupChatServer

    ```java public class GroupChatServer { private int port; // 监听端口

    GroupChatServer(int port) {

    1. this.port = port;

    }

    // 编写 run 方法,处理客户端的请求 public void run() throws InterruptedException {

    1. // 创建 两个线程组
    2. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    3. NioEventLoopGroup workerGroup = new NioEventLoopGroup(3);
    4. try {
    5. ServerBootstrap serverBootstrap = new ServerBootstrap();
    6. serverBootstrap.group(bossGroup, workerGroup)
    7. .channel(NioServerSocketChannel.class)
    8. .option(ChannelOption.SO_BACKLOG, 128)
    9. .childOption(ChannelOption.SO_KEEPALIVE, true)
    10. .childHandler(new ChannelInitializer<SocketChannel>() {
    11. @Override
    12. protected void initChannel(SocketChannel ch) throws Exception {
    13. // 获取到 pipeline
    14. ChannelPipeline pipeline = ch.pipeline();
    15. // 向 pipeline 假如解码器
    16. pipeline.addLast("decoder", new StringDecoder());
    17. // 加入 编码器
    18. pipeline.addLast("coder", new StringEncoder());
    19. // 处理业务
    20. pipeline.addLast(new GroupChatServerHandler());
    21. }
    22. });
    23. System.out.println("netty 服务器启动成功");
    24. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    25. channelFuture.channel().closeFuture().sync();
    26. } finally {
    27. bossGroup.shutdownGracefully();
    28. workerGroup.shutdownGracefully();
    29. }

    }

    public static void main(String[] args) throws InterruptedException {

    1. new GroupChatServer(6699).run();

    }

}

  1. <a name="Dg22U"></a>
  2. ## GroupChatServerHandler
  3. ```java
  4. public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
  5. // 定义一个channel组,管理所有的channel
  6. // GlobalEventExecutor.INSTANCE 全局事件执行器,是一个单例
  7. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  8. private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  9. private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  10. /**
  11. * 表示连接建立,一旦连接,第一个被执行
  12. * 将当前 channel 加入到 channelGroup
  13. *
  14. * @param ctx
  15. * @throws Exception
  16. */
  17. @Override
  18. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  19. Channel channel = ctx.channel();
  20. // 将该客户加入聊天的信息推送给其它在线的客户端
  21. /**
  22. * 该方法会将 channelGroup 中所有的 channel 遍历,并发送 消息
  23. * 我们不需要自己遍历
  24. */
  25. channelGroup.writeAndFlush("[客户端]" + LocalDateTime.now().format(dtf) + " " + channel.remoteAddress() + " 加入聊天");
  26. channelGroup.add(channel);
  27. }
  28. /**
  29. * 表示 断开连接,将 xx客户离开信息推送给当前在线的客户
  30. *
  31. * @param ctx
  32. * @throws Exception
  33. */
  34. @Override
  35. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  36. Channel channel = ctx.channel();
  37. channelGroup.writeAndFlush("[客户端]" + LocalDateTime.now().format(dtf) + " " + channel.remoteAddress() + " 离开了");
  38. System.out.println("channelGroupSize is " + channelGroup.size());
  39. }
  40. /**
  41. * 表示 channel 处于活动状态,提示 xx 上线
  42. *
  43. * @param ctx
  44. * @throws Exception
  45. */
  46. @Override
  47. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  48. System.out.println(LocalDateTime.now().format(dtf) + " " + ctx.channel().remoteAddress() + " 上线了~");
  49. }
  50. /**
  51. * 表示 channel 处于不活跃状态,提示 xx 下线
  52. *
  53. * @param ctx
  54. * @throws Exception
  55. */
  56. @Override
  57. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  58. System.out.println(LocalDateTime.now().format(dtf) + " " + ctx.channel().remoteAddress() + " 离线了~");
  59. }
  60. /**
  61. * 读取数据并执行业务操作
  62. *
  63. * @param ctx
  64. * @param msg
  65. * @throws Exception
  66. */
  67. @Override
  68. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  69. // 获取到当前 channel
  70. Channel channel = ctx.channel();
  71. // 遍历 channelgroup,根据不同的情况,回送不同的消息
  72. channelGroup.forEach(ch -> {
  73. if (channel != ch) {
  74. // 不是当前的 channel,转发消息 (不发给自己)
  75. ch.writeAndFlush("[客户]" + LocalDateTime.now().format(dtf) + " " + channel.remoteAddress() + "发送了消息:" + msg);
  76. } else {
  77. // 发给自己
  78. ch.writeAndFlush("[自动发送了消息]" + sdf.format(new Date()) + " " + msg);
  79. }
  80. });
  81. }
  82. /**
  83. * 异常
  84. *
  85. * @param ctx
  86. * @param cause
  87. * @throws Exception
  88. */
  89. @Override
  90. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  91. System.out.println("【服务器异常】" + LocalDateTime.now().format(dtf) + " " + ctx.toString());
  92. cause.printStackTrace();
  93. ctx.close();
  94. }
  95. }

三、结果展示

服务端

image.png

客户端

client 1

image.png

client 2

image.png

client 3

image.png