Server
public class GroupChatServer { private int port; public GroupChatServer(int port) { this.port = port; } //run方法处理客户端请求 public void run() throws InterruptedException { //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline中加入一个解码器 pipeline.addLast("decoder", new StringDecoder()); //加入编码器 pipeline.addLast("encoder", new StringEncoder()); //自定义handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服务器启动了 "); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //监听关闭 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatServer(7000).run(); }}
ServerHandler
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { //定义一个Channel组 管理所有的Channel //GlobalEventExecutor.INSTANCE是一个全局的事件执行器,是一个单列 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //处理时间 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //表示连接建立之后调用的方法,一但连接第一个被执行 //将当前channel加入到channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将客户加入聊天的信息推送给其他在线的客户端 //该方法会将channelGroup中所有的Channel遍历并发送消息 channelGroup.writeAndFlush("客户端:" + channel.remoteAddress() + "加入聊天\n"); channelGroup.add(channel); } //表示断开连接被触发,并把当前channel自动从ChannelGroup中移除 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("客户端:" + channel.remoteAddress() + "退出聊天\n"); } //表示channel 处于活动状态,提示用户上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + "--上线了"); } //channel 处于不活动状态,提示用户离线 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + "--离线了"); } //读取数据并转发给当前在线的用户 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获取到当前Channel Channel channel = ctx.channel(); //分别处理,不能给自己转发消息,根据不同的用户回复不同的消息 channelGroup.forEach(ch -> { if (channel != ch) { //不是当前的channel,直接转发消息 ch.writeAndFlush("客户" + channel.remoteAddress() + ":" + msg + "\n"); } else { //回显自己发送的消息 ch.writeAndFlush("自己发送了:" + msg + "\n"); } }); } //异常处理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}
Client
public class GroupChatClient { private final String HOST; private final int PORT; public GroupChatClient(String host, int port) { this.HOST = host; this.PORT = port; } public void run() throws InterruptedException { EventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync(); Channel channel = channelFuture.channel(); System.out.println(channel.localAddress() + "准备好了~~~~~~~~~"); //客户端需要输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过Channel到服务器端 channel.writeAndFlush(msg + "\r\n"); } } finally { eventExecutors.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatClient("127.0.0.1", 7000).run(); }}
ClientHandler
/**
 * Client-side handler that prints every text message received from the server.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message to standard output with leading and
     * trailing whitespace stripped.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String text = msg.trim();
        System.out.println(text);
    }
}