image.png

Servier
  1. public class GroupChatServer {
  2. private int port;
  3. public GroupChatServer(int port) {
  4. this.port = port;
  5. }
  6. //run方法处理客户端请求
  7. public void run() throws InterruptedException {
  8. //创建两个线程组
  9. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  10. EventLoopGroup workerGroup = new NioEventLoopGroup();
  11. try {
  12. ServerBootstrap serverBootstrap = new ServerBootstrap();
  13. serverBootstrap.group(bossGroup, workerGroup)
  14. .channel(NioServerSocketChannel.class)
  15. .option(ChannelOption.SO_BACKLOG, 128)
  16. .childOption(ChannelOption.SO_KEEPALIVE, true)
  17. .childHandler(new ChannelInitializer<SocketChannel>() {
  18. @Override
  19. protected void initChannel(SocketChannel ch) throws Exception {
  20. //获取pipeline
  21. ChannelPipeline pipeline = ch.pipeline();
  22. //向pipeline中加入一个解码器
  23. pipeline.addLast("decoder", new StringDecoder());
  24. //加入编码器
  25. pipeline.addLast("encoder", new StringEncoder());
  26. //自定义handler
  27. pipeline.addLast(new GroupChatServerHandler());
  28. }
  29. });
  30. System.out.println("netty 服务器启动了 ");
  31. ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
  32. //监听关闭
  33. channelFuture.channel().closeFuture().sync();
  34. } finally {
  35. bossGroup.shutdownGracefully();
  36. workerGroup.shutdownGracefully();
  37. }
  38. }
  39. public static void main(String[] args) throws InterruptedException {
  40. new GroupChatServer(7000).run();
  41. }
  42. }

ServerHandler
  1. public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
  2. //定义一个Channel组 管理所有的Channel
  3. //GlobalEventExecutor.INSTANCE是一个全局的事件执行器,是一个单列
  4. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  5. //处理时间
  6. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  7. //表示连接建立之后调用的方法,一但连接第一个被执行
  8. //将当前channel加入到channelGroup
  9. @Override
  10. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  11. Channel channel = ctx.channel();
  12. //将客户加入聊天的信息推送给其他在线的客户端
  13. //该方法会将channelGroup中所有的Channel遍历并发送消息
  14. channelGroup.writeAndFlush("客户端:" + channel.remoteAddress() + "加入聊天\n");
  15. channelGroup.add(channel);
  16. }
  17. //表示断开连接被触发,并把当前channel自动从ChannelGroup中移除
  18. @Override
  19. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  20. Channel channel = ctx.channel();
  21. channelGroup.writeAndFlush("客户端:" + channel.remoteAddress() + "退出聊天\n");
  22. }
  23. //表示channel 处于活动状态,提示用户上线
  24. @Override
  25. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  26. System.out.println(ctx.channel().remoteAddress() + "--上线了");
  27. }
  28. //channel 处于不活动状态,提示用户离线
  29. @Override
  30. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  31. System.out.println(ctx.channel().remoteAddress() + "--离线了");
  32. }
  33. //读取数据并转发给当前在线的用户
  34. @Override
  35. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  36. //获取到当前Channel
  37. Channel channel = ctx.channel();
  38. //分别处理,不能给自己转发消息,根据不同的用户回复不同的消息
  39. channelGroup.forEach(ch -> {
  40. if (channel != ch) { //不是当前的channel,直接转发消息
  41. ch.writeAndFlush("客户" + channel.remoteAddress() + ":" + msg + "\n");
  42. } else { //回显自己发送的消息
  43. ch.writeAndFlush("自己发送了:" + msg + "\n");
  44. }
  45. });
  46. }
  47. //异常处理
  48. @Override
  49. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  50. ctx.close();
  51. }
  52. }

Client
  1. public class GroupChatClient {
  2. private final String HOST;
  3. private final int PORT;
  4. public GroupChatClient(String host, int port) {
  5. this.HOST = host;
  6. this.PORT = port;
  7. }
  8. public void run() throws InterruptedException {
  9. EventLoopGroup eventExecutors = new NioEventLoopGroup();
  10. try {
  11. Bootstrap bootstrap = new Bootstrap();
  12. bootstrap.group(eventExecutors)
  13. .channel(NioSocketChannel.class)
  14. .handler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ChannelPipeline pipeline = ch.pipeline();
  18. pipeline.addLast("decoder", new StringDecoder());
  19. pipeline.addLast("encoder", new StringEncoder());
  20. pipeline.addLast(new GroupChatClientHandler());
  21. }
  22. });
  23. ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
  24. Channel channel = channelFuture.channel();
  25. System.out.println(channel.localAddress() + "准备好了~~~~~~~~~");
  26. //客户端需要输入信息
  27. Scanner scanner = new Scanner(System.in);
  28. while (scanner.hasNextLine()) {
  29. String msg = scanner.nextLine();
  30. //通过Channel到服务器端
  31. channel.writeAndFlush(msg + "\r\n");
  32. }
  33. } finally {
  34. eventExecutors.shutdownGracefully();
  35. }
  36. }
  37. public static void main(String[] args) throws InterruptedException {
  38. new GroupChatClient("127.0.0.1", 7000).run();
  39. }
  40. }

ClientHandler
  1. public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  4. System.out.println(msg.trim());
  5. }
  6. }