使用Netty创建一个简易的聊天室案例,不同的用户可以看到其他用户的上下线通知
代码:
服务端:
package chatdemo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Chat room server.
 *
 * <p>Binds a Netty {@link ServerBootstrap} to the configured port and installs a
 * string decoder, a string encoder and a {@link GroupChatServerHandler} on every
 * accepted connection.
 */
public class GroupChatServer {

    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates a server that will listen on the given port once {@link #run()} is called.
     *
     * @param port the port to bind to
     */
    public GroupChatServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * <p>Both event loop groups are shut down when this method exits, whether
     * normally or because of an exception.
     *
     * @throws Exception if binding the port fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        // One thread for accepting connections; default-sized group for the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    // NioServerSocketChannel is the server-side (listening) channel implementation.
                    .channel(NioServerSocketChannel.class)
                    // Requested queue length for connections waiting to be accepted.
                    .option(ChannelOption.SO_BACKLOG, 120)
                    // Enable TCP keep-alive on every accepted connection.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // NOTE(review): no frame decoder is installed in front of StringDecoder, so
                            // one read is not guaranteed to be exactly one chat message — consider
                            // adding a line-based frame decoder.
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new GroupChatServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // Announce start-up only after the bind has actually succeeded.
            System.out.println("netty服务器启动");

            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    /**
     * Launches the chat server on port 7000.
     *
     * @param args ignored
     * @throws Exception if the server fails to start or is interrupted
     */
    public static void main(String[] args) throws Exception {
        new GroupChatServer(7000).run();
    }
}
服务端Handler:
package chatdemo;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
/**
 * Server-side business handler for the chat room.
 *
 * <p>Tracks every connected channel in a shared {@link ChannelGroup}, broadcasts
 * join/leave notifications, and relays each received message to all tracked channels.
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Group holding all client channels, shared by every handler instance.
     * {@code GlobalEventExecutor.INSTANCE} is the singleton executor passed to the
     * group's constructor. Per the Netty documentation, a closed channel is removed
     * from the group automatically, so no explicit {@code remove} call is made here.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Called when this handler is added to a channel's pipeline.
     *
     * <p>Notifies the channels already in the group that a new client joined, then
     * adds the new channel. Broadcasting before adding means the newcomer does not
     * receive its own join notification.
     *
     * @param ctx context of the channel this handler was added to
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // The timestamp is taken at the moment of the event, not at handler construction.
        // ChannelGroup.writeAndFlush sends the message to every channel in the group.
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "在[" + LocalDateTime.now() + "]加入聊天\n");
        channelGroup.add(channel);
    }

    /**
     * Called when this handler is removed from a channel's pipeline.
     *
     * <p>Broadcasts a leave notification stamped with the current time and prints
     * the current group size to standard output.
     *
     * @param ctx context of the channel this handler was removed from
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Use the actual departure time; a timestamp cached at construction would repeat the join time.
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "在[" + LocalDateTime.now() + "]离开了\n");
        System.out.println("当前ChannelGroup容量: " + channelGroup.size());
    }

    /**
     * Called when the channel becomes active; logs the client as online.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[客户端]" + ctx.channel().remoteAddress() + "上线了!");
    }

    /**
     * Called when the channel becomes inactive; logs the client as offline.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("[客户端]" + ctx.channel().remoteAddress() + "下线了!");
    }

    /**
     * Relays a received message to every channel in the group.
     *
     * <p>Other clients get the message prefixed with the sender's remote address;
     * the sender gets an echo marked as its own message.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (ch != channel) {
                // Not the sender: forward the message with the sender's address.
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送消息 " + msg + "\n");
            } else {
                // The sender itself: echo back.
                ch.writeAndFlush("[自己]发送了消息: " + msg + "\n");
            }
        });
    }

    /**
     * Called when an exception reaches this handler; reports the cause and closes the channel.
     *
     * @param ctx   context of the channel on which the exception occurred
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure instead of dropping it silently, then close the channel.
        System.err.println("[客户端]" + ctx.channel().remoteAddress() + "发生异常: " + cause);
        ctx.close();
    }
}
客户端:
package chatdemo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
 * Chat room client.
 *
 * <p>Connects to the chat server, prints the local address, and then sends every
 * line read from standard input to the server.
 */
public class GroupChatClient {

    /** Host name or IP address of the chat server. */
    private final String host;

    /** TCP port of the chat server. */
    private final int port;

    /**
     * Creates a client for the given server address.
     *
     * @param host host name or IP address of the server
     * @param port TCP port of the server
     */
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards console input until standard input ends.
     *
     * <p>The event loop group is shut down when this method exits.
     */
    public void run() {
        // The client needs only a single event loop group.
        EventLoopGroup eventGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });

            // Block until the connection attempt completes.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("======" + channel.localAddress() + "======");

            // Read console input line by line and send each line to the server.
            // The Scanner is deliberately left open: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() is the matching check for nextLine(); hasNext() waits for a
            // non-blank token and would hold blank lines back until one arrives.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg + "\r\n");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            eventGroup.shutdownGracefully();
        }
    }

    /**
     * Launches a client that connects to 127.0.0.1:7000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
客户端Handler:
package chatdemo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side inbound handler: prints each decoded text message to standard output.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message with leading and trailing whitespace removed.
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String text = msg.trim();
        System.out.println(text);
    }
}
测试:
允许客户端以多实例方式运行后,启动多个客户端,即可实现不同用户之间的群聊功能与上下线通知