使用Netty创建一个简易的聊天室案例,不用的用户可以看到其他用户的上下线通知
代码:
服务端:
package chatdemo;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/* 聊天室服务端 */public class GroupChatServer {private int port; //监听端口public GroupChatServer(int port){this.port = port;}public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1); //管理连接请求线程组EventLoopGroup workGroup = new NioEventLoopGroup(); //进行业务处理线程组try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workGroup) //设置两个线程组.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现.option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("decoder",new StringDecoder()); //向PipeLine加入解码器ch.pipeline().addLast("encoder",new StringEncoder()); //向PipeLine加入编码器ch.pipeline().addLast(new GroupChatServerHandler()); //自定义业务处理Handler}});System.out.println("netty服务器启动");ChannelFuture channelFuture = serverBootstrap.bind(port).sync();channelFuture.channel().closeFuture().sync(); //对关闭通道进行监听(当有关闭通道的消息时才进行监听)} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new GroupChatServer(7000).run();}}
服务端Handler:
package chatdemo;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat;import java.time.LocalDateTime;/* 自定义服务端业务处理Handler*/public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {/*定义一个Channel组,用于管理所有的Channel,当handlerRemoved方法执行时自动移除对应的Channel* GlobalEventExecutor.INSTANCE 全局的事件执行器,是一个单例*/private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private LocalDateTime now = LocalDateTime.now();/* 一旦连接建立便执行该方法* 功能: 将当前channel加入到channelGroup,*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();/* channelGroup的writeAndFlush方法会将自身所有的Channel进行遍历并发送消息 */channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"在["+now+"]加入聊天\n");channelGroup.add(channel);}/** 断开连接时触发*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"在["+now+"]离开了\n");System.out.println("当前ChannelGroup容量: "+channelGroup.size());}/** 通道就绪(活动)时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("[客户端]"+ctx.channel().remoteAddress()+"上线了!");}/** 通道非活动时触发*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("[客户端]"+ctx.channel().remoteAddress()+"下线了!");}/* 读取数据 */@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {//获取到当前channelChannel channel = ctx.channel();//遍历ChannelGroup,根据不同的情况发送不同的消息channelGroup.forEach(ch->{if(ch!=channel){//不是当前Channel,进行转发ch.writeAndFlush("[客户]"+channel.remoteAddress()+" 发送消息 "+msg+"\n");}else{ch.writeAndFlush("[自己]发送了消息: "+msg+"\n");}});}/* 发生异常时触发 */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close(); //关闭通道}}
客户端:
package chatdemo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;/* 客户端 */public class GroupChatClient {private final String host;private final int port;public GroupChatClient(String host, int port) {this.host = host;this.port = port;}public void run(){//客户端需要一个事件循环组EventLoopGroup eventGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(eventGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decoder",new StringDecoder()); //向PipeLine加入解码器pipeline.addLast("encoder",new StringEncoder()); //向PipeLine加入编码器pipeline.addLast(new GroupChatClientHandler()); //添加自定义Handler}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();//给关闭通道进行监听(关闭通道事件发生后触发)Channel channel = channelFuture.channel();System.out.println("======"+channel.localAddress()+"======");//输入信息并发送Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String msg = scanner.nextLine();channel.writeAndFlush(msg+"\r\n");}} catch (InterruptedException e) {e.printStackTrace();}finally{eventGroup.shutdownGracefully();}}public static void main(String[] args) {new GroupChatClient("127.0.0.1",7000).run();}}
客户端Handler:
package chatdemo;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/* 客户端Handler */public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg.trim());}}
测试:
允许客户端多实例运行后,可以实现不同用户的群聊功能与上下线通知
