使用Netty创建一个简易的聊天室案例,不用的用户可以看到其他用户的上下线通知

代码:

服务端:

  1. package chatdemo;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. /* 聊天室服务端 */
  13. public class GroupChatServer {
  14. private int port; //监听端口
  15. public GroupChatServer(int port){
  16. this.port = port;
  17. }
  18. public void run() throws Exception {
  19. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //管理连接请求线程组
  20. EventLoopGroup workGroup = new NioEventLoopGroup(); //进行业务处理线程组
  21. try {
  22. ServerBootstrap serverBootstrap = new ServerBootstrap();
  23. serverBootstrap.group(bossGroup,workGroup) //设置两个线程组
  24. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  25. .option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
  26. .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
  27. .childHandler(new ChannelInitializer<SocketChannel>() {
  28. @Override
  29. protected void initChannel(SocketChannel ch) throws Exception {
  30. ch.pipeline().addLast("decoder",new StringDecoder()); //向PipeLine加入解码器
  31. ch.pipeline().addLast("encoder",new StringEncoder()); //向PipeLine加入编码器
  32. ch.pipeline().addLast(new GroupChatServerHandler()); //自定义业务处理Handler
  33. }
  34. });
  35. System.out.println("netty服务器启动");
  36. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  37. channelFuture.channel().closeFuture().sync(); //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  38. } finally {
  39. bossGroup.shutdownGracefully();
  40. workGroup.shutdownGracefully();
  41. }
  42. }
  43. public static void main(String[] args) throws Exception {
  44. new GroupChatServer(7000).run();
  45. }
  46. }

服务端Handler:

  1. package chatdemo;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.SimpleChannelInboundHandler;
  5. import io.netty.channel.group.ChannelGroup;
  6. import io.netty.channel.group.DefaultChannelGroup;
  7. import io.netty.util.concurrent.GlobalEventExecutor;
  8. import java.text.SimpleDateFormat;
  9. import java.time.LocalDateTime;
  10. /* 自定义服务端业务处理Handler*/
  11. public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
  12. /*定义一个Channel组,用于管理所有的Channel,当handlerRemoved方法执行时自动移除对应的Channel
  13. * GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单例
  14. */
  15. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  16. private LocalDateTime now = LocalDateTime.now();
  17. /* 一旦连接建立便执行该方法
  18. * 功能: 将当前channel加入到channelGroup,
  19. */
  20. @Override
  21. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  22. Channel channel = ctx.channel();
  23. /* channelGroup的writeAndFlush方法会将自身所有的Channel进行遍历并发送消息 */
  24. channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"在["+now+"]加入聊天\n");
  25. channelGroup.add(channel);
  26. }
  27. /*
  28. * 断开连接时触发
  29. */
  30. @Override
  31. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  32. Channel channel = ctx.channel();
  33. channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"在["+now+"]离开了\n");
  34. System.out.println("当前ChannelGroup容量: "+channelGroup.size());
  35. }
  36. /*
  37. * 通道就绪(活动)时触发
  38. */
  39. @Override
  40. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  41. System.out.println("[客户端]"+ctx.channel().remoteAddress()+"上线了!");
  42. }
  43. /*
  44. * 通道非活动时触发
  45. */
  46. @Override
  47. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  48. System.out.println("[客户端]"+ctx.channel().remoteAddress()+"下线了!");
  49. }
  50. /* 读取数据 */
  51. @Override
  52. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  53. //获取到当前channel
  54. Channel channel = ctx.channel();
  55. //遍历ChannelGroup,根据不同的情况发送不同的消息
  56. channelGroup.forEach(ch->{
  57. if(ch!=channel){
  58. //不是当前Channel,进行转发
  59. ch.writeAndFlush("[客户]"+channel.remoteAddress()+" 发送消息 "+msg+"\n");
  60. }else{
  61. ch.writeAndFlush("[自己]发送了消息: "+msg+"\n");
  62. }
  63. });
  64. }
  65. /* 发生异常时触发 */
  66. @Override
  67. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  68. ctx.close(); //关闭通道
  69. }
  70. }

客户端:

  1. package chatdemo;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioSocketChannel;
  7. import io.netty.handler.codec.string.StringDecoder;
  8. import io.netty.handler.codec.string.StringEncoder;
  9. import java.util.Scanner;
  10. /* 客户端 */
  11. public class GroupChatClient {
  12. private final String host;
  13. private final int port;
  14. public GroupChatClient(String host, int port) {
  15. this.host = host;
  16. this.port = port;
  17. }
  18. public void run(){
  19. //客户端需要一个事件循环组
  20. EventLoopGroup eventGroup = new NioEventLoopGroup();
  21. Bootstrap bootstrap = new Bootstrap();
  22. try {
  23. bootstrap.group(eventGroup)
  24. .channel(NioSocketChannel.class)
  25. .handler(new ChannelInitializer<SocketChannel>() {
  26. @Override
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. ChannelPipeline pipeline = ch.pipeline();
  29. pipeline.addLast("decoder",new StringDecoder()); //向PipeLine加入解码器
  30. pipeline.addLast("encoder",new StringEncoder()); //向PipeLine加入编码器
  31. pipeline.addLast(new GroupChatClientHandler()); //添加自定义Handler
  32. }
  33. });
  34. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
  35. //给关闭通道进行监听(关闭通道事件发生后触发)
  36. Channel channel = channelFuture.channel();
  37. System.out.println("======"+channel.localAddress()+"======");
  38. //输入信息并发送
  39. Scanner scanner = new Scanner(System.in);
  40. while (scanner.hasNext()){
  41. String msg = scanner.nextLine();
  42. channel.writeAndFlush(msg+"\r\n");
  43. }
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }finally{
  47. eventGroup.shutdownGracefully();
  48. }
  49. }
  50. public static void main(String[] args) {
  51. new GroupChatClient("127.0.0.1",7000).run();
  52. }
  53. }

客户端Handler:

  1. package chatdemo;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. /* 客户端Handler */
  5. public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  8. System.out.println(msg.trim());
  9. }
  10. }

测试:

允许客户端多实例运行后,可以实现不同用户的群聊功能与上下线通知
image.png