8.1 自定义Protobuf编解码器
使用Netty内置的Protobuf系列编解码器,虽然可以解决简单的Protobuf协议的传输问题,但是对复杂Head-Content协议(例如数据包头部存在魔数、版本号字段,具体如图8-1所示)的解析,内置Protobuf系列编解码器就显得无能为力了,这种情况下需要自定义Protobuf编码器和解码器。
8.1.1 自定义Protobuf编码器
- 写入待发送的Protobuf POJO实例的二进制字节长度。
- 写入其他的字段,如魔数、版本号
写入Protobuf POJO实例的二进制字节码内容。
@Slf4j
public class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ProtoMsg.Message message, ByteBuf byteBuf) throws Exception {
encode0(message, byteBuf);
}
public static void encode0(ProtoMsg.Message msg, ByteBuf out) {
out.writeShort(ProtoInstant.MAGIC_CODE);
out.writeShort(ProtoInstant.VERSION_CODE);
byte[] bytes = msg.toByteArray(); // 将ProtoMsg.Message对象转换为byte
int length = bytes.length; // 读取消息长度
Logger.cfo("encoder length = " + length);
// 先将消息长度写入
out.writeInt(length);
// 消息体中包含我们要发送的数据
out.writeBytes(bytes);
}
}
8.1.2 自定义Protobuf解码器
读取长度,如果长度位数不够,就终止读取。
- 读取魔数、版本号等其他字段。
按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是长度的位置),然后终止读取。
public class ProtobufDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
Object outMsg = decode0(channelHandlerContext, byteBuf);
if (outMsg != null) {
// 获取业务消息
list.add(outMsg);
}
}
public static Object decode0(ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {
//标记下当前readIndex的位置
in.markReaderIndex();
// 判断包头的长度
if (in.readableBytes() < 8) {
return null;
}
//读取魔数
short magic = in.readShort();
if (magic != ProtoInstant.MAGIC_CODE) {
String error = "客户端口令不对: " + ctx.channel().remoteAddress(); // 可以从ctx中得到channel,然后从channel中移出remoteAddress
//异常连接,直接报错,关闭连接
throw new InvalidFrameException(error);
}
// 读取版本
short version = in.readShort();
if (version != ProtoInstant.VERSION_CODE){
String error = "协议的版本不对:" + ctx.channel().remoteAddress();
throw new InvalidFrameException(error);
}
// 读取传送过来的消息长度
int length = in.readInt();
// 长度如果小于0
if (length < 0) {
// 非法数据,关闭连接
ctx.close();
}
if (length > in.readableBytes()){ // 读到的消息体长度如果小于传送过来的消息长度
// 重置读取位置
in.resetReaderIndex();
return null;
}
Logger.cfo("decoder length = " + in.readableBytes());
byte[] array;
if (in.hasArray()) { // 说明是堆缓冲
// array = new byte[length];
// in.readBytes(array, 0, length);
ByteBuf slice = in.slice(in.readerIndex(), length);
Logger.cfo("slice length=" + slice.readableBytes());
array = slice.array();
}else {
//直接缓冲
array = new byte[length];
in.readBytes(array, 0, length);
}
//字节转对象
ProtoMsg.Message outMsg = ProtoMsg.Message.parseFrom(array);
return outMsg;
}
}
8.1.3 IM系统中Protobuf消息格式的设计
原则一:消息类型使用enum定义 ```java enum HeadType {
LOGIN_REQUEST = 0; //登录请求
LOGIN_RESPONSE = 1; //登录响应
LOGOUT_REQUEST = 2; //登出请求
LOGOUT_RESPONSE = 3; //登出响应
KEEPALIVE_REQUEST = 4; //心跳请求
KEEPALIVE_RESPONSE = 5; //心跳响应
MESSAGE_REQUEST = 6; //聊天消息请求
MESSAGE_RESPONSE = 7; //聊天消息响应
MESSAGE_NOTIFICATION = 8; //服务器通知
}
2. 原则二:使用一个Protobuf消息结构定义一类消息
```java
/*登录请求信息*/
message LoginRequest {
string uid = 1; //用户唯一ID
string deviceId = 2; //设备ID
string token = 3; //用户token
uint32 platform = 4; //客户端平台 Windows、MAC、Android、IOS、Web
string appVersion = 5; //APP版本号
}
- 原则三:建议给应答消息加上成功标记和应答序号 ```java /聊天响应/ message MessageResponse { bool result = 1; //true表示发送成功,false表示发送失败 uint32 code = 2; //错误码 string info = 3; //错误描述 uint32 expose = 4; //错误描述是否提示给用户:1 提示; 0 不提示 bool lastBlock = 5; //是否为最后的应答 fixed32 blockIndex = 6; //应答的序号 }
4. 原则四:编解码从顶层消息开始
```java
/*外层消息*/
message Message {
HeadType type = 1; //消息类型
uint64 sequence = 2; //序列号
string sessionId = 3; //会话ID
LoginRequest loginRequest = 4; //登录请求
LoginResponse loginResponse = 5; //登录响应
MessageRequest messageRequest = 6; //聊天请求
MessageResponse messageResponse = 7; //聊天响应
MessageNotification notification = 8; //通知消息
}
8.2 IM的登录流程
8.2.1 图解登录/响应流程的环节
从客户端到服务端再到客户端,9个环节的相关介绍如下:
- 客户端收集用户ID和密码,需要使用LoginConsoleCommand控制台命令类。
- 客户端发送Protobuf数据包到客户端通道,需要通过LoginSender发送器组装Protobuf数据包。
- 客户端通道将Protobuf数据包发送到对端,需要通过Netty底层来完成。
- 服务器子通道收到Protobuf数据包,需要通过Netty底层来完成。
- 服务端UserLoginHandler入站处理器收到登录消息,交给业务处理器LoginMsgProcesser处理异步的业务逻辑。
- 服务端LoginMsgProcesser处理完异步的业务逻辑,将处理结果写入用户绑定的子通道。
- 服务器子通道将登录响应Protobuf数据帧发送到客户端,需要通过Netty底层来完成。
- 客户端通道收到Protobuf登录响应数据包,需要通过Netty底层来完成。
客户端LoginResponseHandler业务处理器处理登录响应,例如设置登录的状态、保存会话的Session ID等。
8.2.2 客户端涉及的主要模块
ClientCommand模块:控制台命令收集器。
- ProtobufBuilder模块:Protobuf数据包构造者。
- Sender模块:数据包发送器。
-
8.2.3 服务端涉及的主要模块
Handler模块:客户端请求的处理。
- Processer模块:以异步方式完成请求的业务逻辑处理。
-
8.3 客户端的登录处理的实战案例
聊天命令的信息收集类:ChatConsoleCommand。
- 登录命令的信息收集类:LoginConsoleCommand。
- 退出命令的信息收集类:LogoutConsoleCommand。
- 命令的类型收集类:ClientCommandMenu。
8.3.1 LoginConsoleCommand和User POJO
```java package com.crazymakercircle.imClient.clientCommand; //… public class LoginConsoleCommand implements BaseCommand { public static final String KEY = “1”; private String userName; //简单起见,假设用户名称和id一致 private String password; //登录密码 @Override public void exec(Scanner scanner) {
} //… }System.out.println("请输入用户信息(id:password) ");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split(":");
if (info.length != 2) {
System.out.println("请按照格式输入(id:password):");
}else {
break;
}
}
userName=info[0];
password = info[1];
成功获取到用户密码和ID获取后,客户端CommandClient将这些内容组装成User POJO用户对象,然后通过客户端登录消息发送器loginSender开始向服务端发送登录请求,主要代码如下:
```java
package com.crazymakercircle.imClient.client;
//…
@Service("CommandClient")
public class CommandClient {
//…
//命令收集线程
public void startCommandThread() throws InterruptedException {
Thread.currentThread().setName("主线程");
while (true) {
//建立连接
while (connectFlag == false) {
//开始连接
startConnectServer();
waitCommandThread();
}
//处理命令
while (null != session &&session.isConnected()) {
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
//取到命令收集类POJO
BaseCommand command = commandMap.get(key);
switch (key) {
//登录的命令
case LoginConsoleCommand.KEY:
command.exec(scanner); //收集用户name和password
startLogin((LoginConsoleCommand) command);
break;
case… //省略其他的命令收集代码
}
}
}
}
//开始发送登录请求
private void startLogin(LoginConsoleCommand command) {
//…
User user = new User();
user.setUid(command.getUserName());
user.setToken(command.getPassword());
user.setDevId("1111");
loginSender.setUser(user);
loginSender.setSession(session);
loginSender.sendLoginMsg();
}
//…
}
8.3.2 LoginSender
package com.crazymakercircle.imClient.sender;
//…
@Slf4j
@Service("LoginSender")
public class LoginSender extends BaseSender {
public void sendLoginMsg() {
log.info("生成登录消息");
ProtoMsg.Message message =
LoginMsgBuilder.buildLoginMsg(getUser(), getSession());
log.info ("发送登录消息");
super.sendMsg(message);
}
}
BaseSender基类的代码如下
package com.crazymakercircle.imClient.sender;
//…
public abstract class BaseSender {
private User user;
private ClientSession session;
//…
public void sendMsg(ProtoMsg.Message message) {
if (null == getSession() || !isConnected()) {
log.info("连接还没成功");
return;
}
Channel channel=getSession().getChannel();
ChannelFuture f = channel.writeAndFlush(message);
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
…{
//回调
if (future.isSuccess()) {
sendSucceed(message);
} else {
sendfailed(message);
}
}
});
//…
}
protected void sendSucceed(ProtoMsg.Message message) {
log.info("发送成功");
}
protected void sendfailed(ProtoMsg.Message message) {
log.info("发送失败");
}
}
在处理加入通道时,可以为处理器设置一个单独的处理器线程,大致代码如下:
//创建一个独立的线程池,假定有32条线程
EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32);
final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
protected void initChannel(EmbeddedChannel ch){
//handlerA的执行,从threadGroup池中绑定一条线程
ch.pipeline().addLast(threadGroup,handler);
}
};
8.3.3 ClientSession
ClientSession是一个很重要的胶水类,包含两个成员:一个是user,代表用户;另一个是channel,代表连接的通道。在实际开发中,这两个成员的作用是:
- 通过user,ClientSession可以获得当前的用户信息。
- 通过channel,ClientSession可以向服务端发送消息。
客户端会话ClientSession保存着当前的状态:
- 是否成功连接isConnected。
- 是否成功登录isLogin。
ClientSession客户端会话的主要代码如下:
package com.crazymakercircle.imClient.client;
//…
public class ClientSession {
public static final AttributeKey<ClientSession> SESSION_KEY =
AttributeKey.valueOf("SESSION_KEY");
private Channel channel;
private User user;
private String sessionId; //保存登录后的服务端sessionid
private Boolean isConnected = false;
private Boolean isLogin = false;
//绑定通道
public ClientSession(Channel channel) {
this.channel = channel;
this.sessionId = String.valueOf(-1);
//重要:ClientSession绑定到Channel上
channel.attr(ClientSession.SESSION_KEY).set(this);
}
//登录成功之后,设置sessionId
public static void loginSuccess(
ChannelHandlerContext ctx, ProtoMsg.Message pkg) {
Channel channel = ctx.channel();
ClientSession session =
channel.attr(ClientSession.SESSION_KEY).get();
session.setSessionId(pkg.getSessionId());
session.setLogin(true);
log.info("登录成功");
}
//获取通道
public static ClientSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
ClientSession session =
channel.attr(ClientSession.SESSION_KEY).get();
return session;
}
//把Protobuf数据包写入通道
public ChannelFuture witeAndFlush(Object pkg) {
ChannelFuture f = channel.writeAndFlush(pkg);
return f;
}
//…
}
什么时候创建客户端会话呢?在Netty客户端发起连接请求之后,增加一个连接建立完成的异步回调任务,代码如下:
package com.crazymakercircle.imClient.client;
//…
public class CommandController {
//…
GenericFutureListener<ChannelFuture> connectedListener =
(ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败!在10秒之后准备尝试重连!");
eventLoop.schedule(() ->nettyClient.doConnect(),
10, TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info("疯狂创客圈 IM 服务器连接成功!");
channel = f.channel();
//创建会话
session= new ClientSession(channel);
channel.closeFuture().addListener(closeListener);
//唤醒用户线程
notifyCommandThread();
}
};
//…
}
8.3.4 LoginResponseHandler
package com.crazymakercircle.imClient.handler;
//…
public class LoginResponseHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
…{
//判断消息实例
if (null == msg || !(msg instanceofProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
//判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {
super.channelRead(ctx, msg);
return;
}
//判断返回是否成功
ProtoMsg.LoginResponse info = pkg.getLoginResponse();
ProtoInstant.ResultCodeEnum result =
ProtoInstant.ResultCodeEnum.values()[info.getCode()];
if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
//登录失败
log.info(result.getDesc());
} else {
//登录成功
ClientSession.loginSuccess(ctx, pkg);
ChannelPipeline p = ctx.pipeline();
//移除登录响应处理器
p.remove(this);
//在编码器后面动态插入心跳处理器
p.addAfter("encoder", "heartbeat",
new HeartBeatClientHandler());
}
}
}
在登录成功之后,需要将LoginResponseHandler登录响应处理器实例从流水线上移除,因为不需要再处理登录响应了。同时,需要在客户端和服务端(即服务器器)之间开启定时的心跳处理。心跳是一个比较复杂的议题,后面会专门详细介绍客户端和服务器之间的心跳。
8.3.5 客户端流水线的装配
package com.crazymakercircle.imClient.client;
//省略部分代码
public class NettyClient {
@Autowired
private ChatMsgHandler chatMsgHandler; //聊天消息处理器
@Autowired
private LoginResponseHandler loginResponceHandler; //登录响应处理器
//连接异步监听
private GenericFutureListener<ChannelFuture> connectedListener;
private Bootstrap b;
private EventLoopGroup g;
//省略部分代码
public void doConnect() {
try {
b = new Bootstrap();
//省略设置通道初始化参数
b.handler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("decoder",
new ProtobufDecoder());
ch.pipeline().addLast("encoder",
new ProtobufEncoder());
ch.pipeline().addLast(loginResponseHandler);
ch.pipeline().addLast(chatMsgHandler);
ch.pipeline().addLast("exception",
new ExceptionHandler());
}
});
log.info("客户端开始连接 [疯狂创客圈IM]");
ChannelFuture f = b.connect();
f.addListener(connectedListener);
} catch (Exception e) {
log.info("客户端连接失败!" + e.getMessage());
}
}
//…
}
8.4 服务端的登录响应的实战案例
服务端的登录处理流程是:
- ProtobufDecoder解码器把请求ByteBuf数据包解码成Protobuf数据包。
- UserLoginRequestHandler登录处理器负责处理Protobuf数据包,进行一些必要的判断和预处理后,启动LoginProcesser登录业务处理器,以异步方式进行登录验证处理。
- LoginProcesser通过数据库或者远程接口完成用户验证,根据验证处理的结果生成登录成功/失败的登录响应报文,并发送给客户端。
8.4.1 服务端流水线的装配
```java package com.crazymakercircle.imServer.server; //… public class ChatServer {//…
@Autowired
private LoginRequestHandler loginRequestHandler; //登录请求处理器
@Autowired
private ServerExceptionHandler serverExceptionHandler; //服务器异常处理器
public void run() {
try {
//省略Bootstrap的配置选项
//5 装配流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个子通道
protected void initChannel(SocketChannel ch) …{
//装配子通道流水线中的Handler业务处理器
<a name="Z1uiS"></a>
## 8.4.2 LoginRequestHandler
```java
package com.crazymakercircle.imServer.handler;
//…
@Slf4j
@Service("LoginRequestHandler")
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
@Autowired
LoginProcesser loginProcesser;
public void channelRead(ChannelHandlerContext ctx, Object msg) …{
if (null == msg || !(msg instanceofProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
//取得请求类型
ProtoMsg.HeadType headType = pkg.getType();
if (!headType.equals(loginProcesser.type())) {
super.channelRead(ctx, msg);
return;
}
ServerSession session = new ServerSession(ctx.channel());
//异步任务,处理登录的逻辑
CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
@Override
public Boolean execute()…{
boolean r = loginProcesser.action(session, pkg);
return r;
}
//异步任务返回
@Override
public void onBack(Boolean r) {
if (r) {
ctx.pipeline().remove(LoginRequestHandler.this);
log.info("登录成功:" + session.getUser());
} else {
ServerSession.closeSession(ctx);
log.info("登录失败:" + session.getUser());
8.4.3 LoginProcesser
package com.crazymakercircle.imServer.processer;
//…
@Slf4j
@Service("LoginProcesser")
public class LoginProcesser extends AbstractServerProcesser {
@Autowired
LoginResponseBuilderloginResponseBuilder;
@Override
public ProtoMsg.HeadTypetype() {
return ProtoMsg.HeadType.LOGIN_REQUEST;
}
@Override
public boolean action(ServerSession session,ProtoMsg.Message proto){
//取出token验证
ProtoMsg.LoginRequest info = proto.getLoginRequest();
long seqNo = proto.getSequence();
User user = User.fromMsg(info);
//检查用户
booleanisValidUser = checkUser(user);
if (!isValidUser) {
ProtoInstant.ResultCodeEnum resultcode =
ProtoInstant.ResultCodeEnum.NO_TOKEN;
//生成登录失败的报文
ProtoMsg.Message response =
loginResponseBuilder.loginResponse(resultcode, seqNo, "-1");
//发送登录失败的报文
session.writeAndFlush(response);
return false;
}
session.setUser(user);
session.bind();
//登录成功
ProtoInstant.ResultCodeEnum resultcode =
ProtoInstant.ResultCodeEnum.SUCCESS;
//生成登录成功的报文
ProtoMsg.Message response = loginResponseBuilder.
loginResponse(resultcode, seqNo, session.getSessionId());
//发送登录成功的报文
session.writeAndFlush(response);
return true;
}
private booleancheckUser(User user) {
if (SessionMap.inst().hasLogin(user)) {
return false;
}
//验证用户,比较耗时的操作,需要200毫秒以上的时间甚至更多
//方法1:调用远程用户RESTful校验服务
//方法2:调用数据库接口校验
return true;
}
}
8.4.4 EventLoop线程和业务线程相互隔离
创建Netty的EventLoopGroup线程池,专用于处理耗时任务。 ```java //创建一个独立的线程池,假定有32条线程
EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32); final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器 ChannelInitializer i = new ChannelInitializer
(){ protected void initChannel(EmbeddedChannel ch){
//处理器加入通道时,从专用threadGroup池中绑定一条线程
ch.pipeline().addLast(threadGroup,handler);
}
};
2. 创建一个专门的Java线程池,专用于处理耗时任务。
```java
package com.crazymakercircle.cocurrent;
//…
public class FutureTaskScheduler extends Thread
{
//方法二是使用自建的线程池时专用于处理耗时操作
private static final ExecutorService POOL=
Executors.newFixedThreadPool(10);
//添加耗时任务
public static void add(Runnable executeTask)
{
POOL.submit(executeTask);
}
}
8.5 详解Session服务器会话
8.5.1 通道的容器属性
- AttributeKey不是原始的键(如Map中的键),而是一个键的包装类。AttributeKey确保了键的唯一性,在单个Netty应用中,AttributeKey必须唯一。
- 这里的Attribute值不是原始的值(如Map中的值),也是值的包装类。原始的值就放置在Attribute包装实例中,可以通过Attribute包装类实现值的读取(get)和设置(set)。
在Netty中,接口AttributeMap的源代码如下:
package io.netty.util;
public interface AttributeMap {
<T> Attribute<T> attr(AttributeKey<T> key);
}
- Attribute的设值
```java
//定义键
public static final AttributeKey
SESSION_KEY = AttributeKey.valueOf(“SESSION_KEY”); //… //通过设置将会话绑定到通道 channel.attr(SESSION_KEY).set(session);
AttributeKey的创建需要用到静态方法AttributeKey.valueOf(String)方法。该方法的返回值为一个AttributeKey实例,其泛型参数为实际键-值对中值的实际类型。如果实际的值是ServerSession类型,则定义键的泛型参数为ServerSession,整个AttributeKey定义为AttributeKey<ServerSession>。
```java
//键的泛型形参是设置的值类型
public static final AttributeKey<ServerSession> SESSION_KEY =
AttributeKey.valueOf("SESSION_KEY");
创建完AttributeKey后,就可以通过通道完成键-值对的设值(set)、取值(get)了。常常使用链式调用,首先通过通道的attr(AttributeKey)方法取得value的包装类Attribute实例,然后通过Attribute的set()方法设置真正的值。在例子中,值是一个会话(Session)实例。
这里的AttributeKey一般定义为一个常量,需要提前定义;它的泛型参数是最终的Attribute的包装值value的数据类型。
- Attribute取值
```java
//取得Attribute实例
Attribute
attribute = ctx.channel().attr(SESSION_KEY); ServerSession session=attribute.get();
<a name="ag0PU"></a>
## 8.5.2 ServerSession服务端会话类
```java
package com.crazymakercircle.imServer.server;
//…
public class ServerSession {
public static final AttributeKey<ServerSession> SESSION_KEY =
AttributeKey.valueOf("SESSION_KEY");
private Channel channel; //通道
private User user; //用户
private final String sessionId;//会话唯一标识
private boolean isLogin = false;//登录状态
public ServerSession(Channel channel) {
this.channel = channel;
this.sessionId = buildNewSessionId();
}
//反向导航
public static ServerSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ServerSession.SESSION_KEY).get();
}
//和通道实现双向绑定
public ServerSession bind() {
log.info(" ServerSession绑定会话 " + channel.remoteAddress());
channel.attr(ServerSession.SESSION_KEY).set(this);
SessionMap.inst().addSession(getSessionId(), this);
isLogin = true;
return this;
}
//构造session id
private static String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
}
//省略不是太重要的方法
}
8.5.3 SessionMap会话管理器
这里使用一个会话容器SessionMap,负责管理服务端所有的ServerSession,其内部使用一个线程安全的ConcurrentHashMap类型的映射成员,保持sessionId到服务端ServerSession的映射。
package com.crazymakercircle.imServer.server;
//…
public final class SessionMap {
private ConcurrentHashMap<String, ServerSession> map =
new ConcurrentHashMap<String, ServerSession>();
//增加会话对象
public void addSession(String sessionId, ServerSession s) {
map.put(sessionId, s);
log.info("用户登录:id= " + s.getUser().getUid()
+ " 在线总数: " + map.size());
}
//获取会话对象
public ServerSession getSession(String sessionId) {
if (map.containsKey(sessionId)) {
return map.get(sessionId);
} else {
return null;
}
}
//省略不是太重要的方法
}
8.6 点对点单聊的实战案例
8.6.1 单聊的端到端流程
8.6.2 客户端的ChatConsoleCommand收集聊天内容
聊天消息收集类ChatConsoleCommand负责从控制台Scanner实例收集用户输入的聊天消息(格式为id:message),代码如下:
package com.crazymakercircle.imClient.command;
//…
@Data
@Service("ChatConsoleCommand")
public class ChatConsoleCommand implements BaseCommand {
private String toUserId; //目标用户id(这里为登录的用户名称)
private String message; //聊天内容
public static final String KEY = "2";
@Override
public void exec(Scanner scanner) {
System.out.print("请输入聊天的消息(id:message):");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split(":");
if (info.length != 2) {
System.out.println("请输入聊天的消息(id:message):");
}else {
break;
}
}
toUserId = info[0];
message = info[1];
}
//…
}
8.6.3 客户端的CommandController发送POJO
ChatConsoleCommand的调用者是CommandController命令控制类,该控制类在收集完成聊天内容和目标用户后,在自己的startOneChat()方法中调用ChatSender发送实例,将聊天消息组装成Protobuf数据包,通过客户端的通道发往服务端。
package com.crazymakercircle.imClient.client;
//…
public class CommandController {
@Autowired
ChatConsoleCommand chatConsoleCommand; //聊天命令收集器实例
//省略其他成员
public void startCommandThread()throws InterruptedException {
Thread.currentThread().setName("命令线程");
while (true) {
//建立连接
while (connectFlag == false) {
//开始连接
startConnectServer();
waitCommandThread();
}
//处理命令
while (null != session ) {
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);
//…
switch (key) {
case ChatConsoleCommand.KEY:
command.exec(scanner);
startOneChat((ChatConsoleCommand) command);
break;
//省略其他命令
}
}
}
}
//发送单聊消息
private void startOneChat(ChatConsoleCommand c) {
chatSender.setSession(session);
chatSender.setUser(user);
chatSender.sendChatMsg(c.getToUserId(), c.getMessage());
}
//省略其他的命令处理
}
8.6.4 服务端的ChatRedirectHandler进行消息转发
服务端收到聊天消息后会进行消息的转发,主要由消息转发处理器ChatRedirectHandler负责,其大致的工作如下:
- 对消息类型进行判断:判断是否为聊天请求Protobuf数据包。如果不是,通过调用super.channelRead(ctx, msg)将消息交给流水线的下一站。
- 对消息发送方用户登录进行判断:如果没有登录,则不能发送消息。
开启异步的消息转发,由其ChatRedirectProcesser实例负责完成消息转发。
package com.crazymakercircle.imServer.handler;
//…
public class ChatRedirectHandler extends ChannelInboundHandlerAdapter {
@Autowired
ChatRedirectProcesserchatRedirectProcesser;
public void channelRead(ChannelHandlerContext ctx, Object msg) …{
//判断消息实例
if (null == msg || !(msg instanceofProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
}
//判断消息类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
if (!headType.equals(chatRedirectProcesser.type())) {
super.channelRead(ctx, msg);
return;
}
//判断是否登录
ServerSession session = ServerSession.getSession(ctx);
if (null == session || !session.isLogin()) {
log.error("用户尚未登录,不能发送消息");
return;
}
//异步处理IM消息转发的逻辑
FutureTaskScheduler.add(() ->
{
chatRedirectProcesser.action(session, pkg);
});
}
}
8.6.5 服务端的ChatRedirectProcesser进行异步消息转发
ChatRedirectProcesser异步消息转发类负责将消息发送到目标用户,这是一个异步执行的任务,其大致功能如下:
根据目标用户ID找出所有的服务端的会话列表。
- 为每一个会话转发一份消息
```java
package com.crazymakercircle.imServer.processer;
//…
public class ChatRedirectProcesser extends AbstractServerProcesser {
@Override
public ProtoMsg.HeadTypetype() {
} @Override public boolean action(ServerSessionfromSession,return ProtoMsg.HeadType.MESSAGE_REQUEST;
} } //由于一个用户可能有多个会话,因此需要通过调用SessionMap会话管理器的SessionMap.inst().getSessionsBy(uid)方法来取得这个用户的所有会话。 package com.crazymakercircle.imServer.server; //… @Slf4j @Data public final class SessionMap { //全部的会话映射 “uid->session” private ConcurrentHashMapProtoMsg.Message proto) {
//聊天处理
ProtoMsg.MessageRequest msg = proto.getMessageRequest();
//获取接收方的chatID
String to = msg.getTo();
List<ServerSession> toSessions =
SessionMap.inst().getSessionsBy(to);
if (toSessions == null) {
//接收方离线,这里一般会做离线消息处理
Print.tcfo("[" + to + "] 不在线,发送失败!");
} else {
toSessions.forEach((session) -> {
//将IM消息发送到每一个接收方的通道
session.writeAndFlush(proto);
});
}
return true;
map = new ConcurrentHashMap<String, ServerSession>();
//根据用户id获取会话集合
public List
<a name="i6zBf"></a>
## 8.6.6 客户端的ChatMsgHandler聊天消息处理器
1. 对消息类型进行判断,判断是否为聊天请求Protobuf数据包。如果不是,通过super.channelRead(ctx, msg)将消息交给流水线的下一站。
1. 如果是聊天消息,就将聊天消息显示在控制台。
```java
package com.crazymakercircle.imClient.handler;
//…
public class ChatMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) …{
//判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = pkg.getType();
if (!headType.equals(ProtoMsg.HeadType.MESSAGE_REQUEST)) {
super.channelRead(ctx, msg);
return; //不是聊天消息
}
ProtoMsg.MessageRequest req = pkg.getMessageRequest();
String content = req.getContent();
String uid = req.getFrom();
System.out.println(" 收到消息 from uid:" + uid + " -> " + content);
}
}
8.7 详解心跳检测
8.7.1 网络连接的假死现象
什么是连接假死呢?如果底层的TCP连接(socket连接)已经断开,但是服务端并没有正常关闭套接字,服务端认为这条TCP连接仍然是存在的,则该连接处于“假死”状态。连接假死的具体表现如下:
- 在服务端,会有一些处于TCP_ESTABLISHED状态的“正常”连接。
- 在客户端,TCP客户端显示连接已经断开。
- 虽然客户端可以进行断线重连操作,但是上一次的连接状态依然被服务端认为有效,并且服务端的资源得不到正确释放,包括套接字上下文以及接收/发送缓冲区。
连接假死通常是由以下多个原因造成的,例如:
- 应用程序出现线程堵塞,无法进行数据的读写。
- 网络相关的设备出现故障,例如网卡、机房故障。
- 网络丢包。公网环境非常容易出现丢包和网络抖动等现象。
8.7.2 服务端的空闲检测
空闲检测就是每隔一段时间检测子通道是否有数据读写,如果有,则子通道是正常的;如果没有,则子通道被判定为假死,关掉子通道。
服务端如何实现空闲检测呢?使用Netty自带的IdleStateHandler空闲状态处理器就可以实现这个功能。下面的示例程序继承自IdleStateHandler,定义一个假死处理类:
package com.crazymakercircle.imServer.handler;
//…
public class HeartBeatServerHandler extends IdleStateHandler {
private static final int READ_IDLE_GAP = 150; //最大空闲,单位秒
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx,
IdleStateEventevt) …{
System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
ServerSession.closeSession(ctx);
}
public void channelRead(ChannelHandlerContext ctx, Object msg){
//…
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
//判断和处理心跳数据包
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {
//异步处理,将心跳数据包直接回复给客户端
FutureTaskScheduler.add(() -> {
if (ctx.channel().isActive()) {
ctx.writeAndFlush(msg);
}
});
}
super.channelRead(ctx, msg);
}
}
在HeartBeatServerHandler的构造函数中,调用了基类IdleStateHandler的构造函数,传递了四个参数:
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
其中,第一个参数表示入站(Inbound)空闲时长,指的是一段时间内如果没有数据入站,就判定连接假死;第二个参数是出站(Outbound)空闲时长,指的是一段时间内如果没有数据出站,就判定连接假死;第三个参数是出/入站检测时长,表示在一段时间内如果没有出站或者入站,就判定连接假死;最后一个参数表示时间单位,TimeUnit.SECONDS表示秒。<br />假死被判定之后,IdleStateHandler类会回调自己的channelIdle()方法。在这个子类的重写版本中,重写了空闲回调方法,手动关闭连接。
@Override
protected void channelIdle(ChannelHandlerContext ctx,
IdleStateEventevt) …{
System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
ServerSession.closeSession(ctx);
}
8.7.3 客户端的心跳发送
与服务端的空闲检测相配合,客户端需要定期发送数据包到服务端,通常这个数据包称为心跳数据包。接下来,定义一个Handler业务处理器定期发送心跳数据包给服务端。
package com.crazymakercircle.imClient.handler;
//…
public class HeartBeatClientHandler
extends ChannelInboundHandlerAdapter {
//心跳的时间间隔,单位为秒
private static final int HEARTBEAT_INTERVAL = 50;
//在Handler业务处理器被加入到流水线时,开始发送心跳数据包
@Override
public void handlerAdded(ChannelHandlerContext ctx) …{
ClientSession session = ClientSession.getSession(ctx);
User user = session.getUser();
HeartBeatMsgBuilder builder =
new HeartBeatMsgBuilder(user, session);
ProtoMsg.Message message = builder.buildMsg();
//发送心跳数据包
heartBeat(ctx, message);
}
//使用定时器,定期发送心跳数据包
public void heartBeat(ChannelHandlerContext ctx,
ProtoMsg.MessageheartbeatMsg) {
//提交一个一次性的定时任务
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info(" 发送HEART_BEAT 消息to server");
ctx.writeAndFlush(heartbeatMsg);
//递归调用:提交下一个一次性的定时任务,发送下一次的心跳
heartBeat(ctx, heartbeatMsg);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
//接收到服务器的心跳回写
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
//判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {
log.info(" 收到回写的HEART_BEAT 消息 from server");
return;
} else {
HeartBeatClientHandler实例并不是一开始就装配到了流水线中的,它装配的时机是在登录成功之后。登录处理器LoginResponseHandler的相关代码如下:
package com.crazymakercircle.imClient.clientHandler;
//…
public class LoginResponseHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)…{
//省略登录数据包的预处理
if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
//登录失败
log.info(result.getDesc());
} else {
//登录成功
//省略其他处理
//在编码器后面动态插入心跳处理器
ChannelPipeline p=ctx.pipeline();
p.addAfter("encoder","heartbeat",new HeartBeatClientHandler());
}
}
}