8.1 自定义Protobuf编解码器

使用Netty内置的Protobuf系列编解码器,虽然可以解决简单的Protobuf协议的传输问题,但是对复杂Head-Content协议(例如数据包头部存在魔数、版本号字段,具体如图8-1所示)的解析,内置Protobuf系列编解码器就显得无能为力了,这种情况下需要自定义Protobuf编码器和解码器。
image.png

8.1.1 自定义Protobuf编码器

  1. 写入待发送的Protobuf POJO实例的二进制字节长度。
  2. 写入其他的字段,如魔数、版本号
  3. 写入Protobuf POJO实例的二进制字节码内容。

    1. @Slf4j
    2. public class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
    3. @Override
    4. protected void encode(ChannelHandlerContext channelHandlerContext, ProtoMsg.Message message, ByteBuf byteBuf) throws Exception {
    5. encode0(message, byteBuf);
    6. }
    7. public static void encode0(ProtoMsg.Message msg, ByteBuf out) {
    8. out.writeShort(ProtoInstant.MAGIC_CODE);
    9. out.writeShort(ProtoInstant.VERSION_CODE);
    10. byte[] bytes = msg.toByteArray(); // 将ProtoMsg.Message对象转换为byte
    11. int length = bytes.length; // 读取消息长度
    12. Logger.cfo("encoder length = " + length);
    13. // 先将消息长度写入
    14. out.writeInt(length);
    15. // 消息体中包含我们要发送的数据
    16. out.writeBytes(bytes);
    17. }
    18. }

    8.1.2 自定义Protobuf解码器

  4. 读取长度,如果长度位数不够,就终止读取。

  5. 读取魔数、版本号等其他字段。
  6. 按照净长度读取内容。如果内容的字节数不够,则恢复到之前的起始位置(也就是长度的位置),然后终止读取。

    1. public class ProtobufDecoder extends ByteToMessageDecoder{
    2. @Override
    3. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    4. Object outMsg = decode0(channelHandlerContext, byteBuf);
    5. if (outMsg != null) {
    6. // 获取业务消息
    7. list.add(outMsg);
    8. }
    9. }
    10. public static Object decode0(ChannelHandlerContext ctx, ByteBuf in) throws InvalidFrameException, InvalidProtocolBufferException {
    11. //标记下当前readIndex的位置
    12. in.markReaderIndex();
    13. // 判断包头的长度
    14. if (in.readableBytes() < 8) {
    15. return null;
    16. }
    17. //读取魔数
    18. short magic = in.readShort();
    19. if (magic != ProtoInstant.MAGIC_CODE) {
    20. String error = "客户端口令不对: " + ctx.channel().remoteAddress(); // 可以从ctx中得到channel,然后从channel中移出remoteAddress
    21. //异常连接,直接报错,关闭连接
    22. throw new InvalidFrameException(error);
    23. }
    24. // 读取版本
    25. short version = in.readShort();
    26. if (version != ProtoInstant.VERSION_CODE){
    27. String error = "协议的版本不对:" + ctx.channel().remoteAddress();
    28. throw new InvalidFrameException(error);
    29. }
    30. // 读取传送过来的消息长度
    31. int length = in.readInt();
    32. // 长度如果小于0
    33. if (length < 0) {
    34. // 非法数据,关闭连接
    35. ctx.close();
    36. }
    37. if (length > in.readableBytes()){ // 读到的消息体长度如果小于传送过来的消息长度
    38. // 重置读取位置
    39. in.resetReaderIndex();
    40. return null;
    41. }
    42. Logger.cfo("decoder length = " + in.readableBytes());
    43. byte[] array;
    44. if (in.hasArray()) { // 说明是堆缓冲
    45. // array = new byte[length];
    46. // in.readBytes(array, 0, length);
    47. ByteBuf slice = in.slice(in.readerIndex(), length);
    48. Logger.cfo("slice length=" + slice.readableBytes());
    49. array = slice.array();
    50. }else {
    51. //直接缓冲
    52. array = new byte[length];
    53. in.readBytes(array, 0, length);
    54. }
    55. //字节转对象
    56. ProtoMsg.Message outMsg = ProtoMsg.Message.parseFrom(array);
    57. return outMsg;
    58. }
    59. }

    8.1.3 IM系统中Protobuf消息格式的设计

  7. 原则一:消息类型使用enum定义 ```java enum HeadType {

    1. LOGIN_REQUEST = 0; //登录请求
    2. LOGIN_RESPONSE = 1; //登录响应
    3. LOGOUT_REQUEST = 2; //登出请求
    4. LOGOUT_RESPONSE = 3; //登出响应
    5. KEEPALIVE_REQUEST = 4; //心跳请求
    6. KEEPALIVE_RESPONSE = 5; //心跳响应
    7. MESSAGE_REQUEST = 6; //聊天消息请求
    8. MESSAGE_RESPONSE = 7; //聊天消息响应
    9. MESSAGE_NOTIFICATION = 8; //服务器通知

    }

  1. 2. 原则二:使用一个Protobuf消息结构定义一类消息
  2. ```java
  3. /*登录请求信息*/
  4. message LoginRequest {
  5. string uid = 1; //用户唯一ID
  6. string deviceId = 2; //设备ID
  7. string token = 3; //用户token
  8. uint32 platform = 4; //客户端平台 Windows、MAC、Android、IOS、Web
  9. string appVersion = 5; //APP版本号
  10. }
  1. 原则三:建议给应答消息加上成功标记和应答序号 ```java /聊天响应/ message MessageResponse { bool result = 1; //true表示发送成功,false表示发送失败 uint32 code = 2; //错误码 string info = 3; //错误描述 uint32 expose = 4; //错误描述是否提示给用户:1 提示; 0 不提示 bool lastBlock = 5; //是否为最后的应答 fixed32 blockIndex = 6; //应答的序号 }
  1. 4. 原则四:编解码从顶层消息开始
  2. ```java
  3. /*外层消息*/
  4. message Message {
  5. HeadType type = 1; //消息类型
  6. uint64 sequence = 2; //序列号
  7. string sessionId = 3; //会话ID
  8. LoginRequest loginRequest = 4; //登录请求
  9. LoginResponse loginResponse = 5; //登录响应
  10. MessageRequest messageRequest = 6; //聊天请求
  11. MessageResponse messageResponse = 7; //聊天响应
  12. MessageNotification notification = 8; //通知消息
  13. }

8.2 IM的登录流程

8.2.1 图解登录/响应流程的环节

第8章 基于Netty单体IM系统的开发实战 - 图2
从客户端到服务端再到客户端,9个环节的相关介绍如下:

  1. 客户端收集用户ID和密码,需要使用LoginConsoleCommand控制台命令类。
  2. 客户端发送Protobuf数据包到客户端通道,需要通过LoginSender发送器组装Protobuf数据包。
  3. 客户端通道将Protobuf数据包发送到对端,需要通过Netty底层来完成。
  4. 服务器子通道收到Protobuf数据包,需要通过Netty底层来完成。
  5. 服务端UserLoginHandler入站处理器收到登录消息,交给业务处理器LoginMsgProcesser处理异步的业务逻辑。
  6. 服务端LoginMsgProcesser处理完异步的业务逻辑,将处理结果写入用户绑定的子通道。
  7. 服务器子通道将登录响应Protobuf数据帧发送到客户端,需要通过Netty底层来完成。
  8. 客户端通道收到Protobuf登录响应数据包,需要通过Netty底层来完成。
  9. 客户端LoginResponseHandler业务处理器处理登录响应,例如设置登录的状态、保存会话的Session ID等。

    8.2.2 客户端涉及的主要模块

  10. ClientCommand模块:控制台命令收集器。

  11. ProtobufBuilder模块:Protobuf数据包构造者。
  12. Sender模块:数据包发送器。
  13. Handler模块:服务器响应处理器。

    8.2.3 服务端涉及的主要模块

  14. Handler模块:客户端请求的处理。

  15. Processer模块:以异步方式完成请求的业务逻辑处理。
  16. Session模块:管理用户与通道的绑定关系。

    8.3 客户端的登录处理的实战案例

  17. 聊天命令的信息收集类:ChatConsoleCommand。

  18. 登录命令的信息收集类:LoginConsoleCommand。
  19. 退出命令的信息收集类:LogoutConsoleCommand。
  20. 命令的类型收集类:ClientCommandMenu。

    8.3.1 LoginConsoleCommand和User POJO

    ```java package com.crazymakercircle.imClient.clientCommand; //… public class LoginConsoleCommand implements BaseCommand { public static final String KEY = “1”; private String userName; //简单起见,假设用户名称和id一致 private String password; //登录密码 @Override public void exec(Scanner scanner) {
    1. System.out.println("请输入用户信息(id:password) ");
    2. String[] info = null;
    3. while (true) {
    4. String input = scanner.next();
    5. info = input.split(":");
    6. if (info.length != 2) {
    7. System.out.println("请按照格式输入(id:password):");
    8. }else {
    9. break;
    10. }
    11. }
    12. userName=info[0];
    13. password = info[1];
    } //… }
  1. 成功获取到用户密码和ID获取后,客户端CommandClient将这些内容组装成User POJO用户对象,然后通过客户端登录消息发送器loginSender开始向服务端发送登录请求,主要代码如下:
  2. ```java
  3. package com.crazymakercircle.imClient.client;
  4. //…
  5. @Service("CommandClient")
  6. public class CommandClient {
  7. //…
  8. //命令收集线程
  9. public void startCommandThread() throws InterruptedException {
  10. Thread.currentThread().setName("主线程");
  11. while (true) {
  12. //建立连接
  13. while (connectFlag == false) {
  14. //开始连接
  15. startConnectServer();
  16. waitCommandThread();
  17. }
  18. //处理命令
  19. while (null != session &&session.isConnected()) {
  20. Scanner scanner = new Scanner(System.in);
  21. clientCommandMenu.exec(scanner);
  22. String key = clientCommandMenu.getCommandInput();
  23. //取到命令收集类POJO
  24. BaseCommand command = commandMap.get(key);
  25. switch (key) {
  26. //登录的命令
  27. case LoginConsoleCommand.KEY:
  28. command.exec(scanner); //收集用户name和password
  29. startLogin((LoginConsoleCommand) command);
  30. break;
  31. case… //省略其他的命令收集代码
  32. }
  33. }
  34. }
  35. }
  36. //开始发送登录请求
  37. private void startLogin(LoginConsoleCommand command) {
  38. //…
  39. User user = new User();
  40. user.setUid(command.getUserName());
  41. user.setToken(command.getPassword());
  42. user.setDevId("1111");
  43. loginSender.setUser(user);
  44. loginSender.setSession(session);
  45. loginSender.sendLoginMsg();
  46. }
  47. //…
  48. }

8.3.2 LoginSender

  1. package com.crazymakercircle.imClient.sender;
  2. //…
  3. @Slf4j
  4. @Service("LoginSender")
  5. public class LoginSender extends BaseSender {
  6. public void sendLoginMsg() {
  7. log.info("生成登录消息");
  8. ProtoMsg.Message message =
  9. LoginMsgBuilder.buildLoginMsg(getUser(), getSession());
  10. log.info ("发送登录消息");
  11. super.sendMsg(message);
  12. }
  13. }

BaseSender基类的代码如下

  1. package com.crazymakercircle.imClient.sender;
  2. //…
  3. public abstract class BaseSender {
  4. private User user;
  5. private ClientSession session;
  6. //…
  7. public void sendMsg(ProtoMsg.Message message) {
  8. if (null == getSession() || !isConnected()) {
  9. log.info("连接还没成功");
  10. return;
  11. }
  12. Channel channel=getSession().getChannel();
  13. ChannelFuture f = channel.writeAndFlush(message);
  14. f.addListener(new GenericFutureListener<Future<? super Void>>() {
  15. @Override
  16. public void operationComplete(Future<? super Void> future)
  17. …{
  18. //回调
  19. if (future.isSuccess()) {
  20. sendSucceed(message);
  21. } else {
  22. sendfailed(message);
  23. }
  24. }
  25. });
  26. //…
  27. }
  28. protected void sendSucceed(ProtoMsg.Message message) {
  29. log.info("发送成功");
  30. }
  31. protected void sendfailed(ProtoMsg.Message message) {
  32. log.info("发送失败");
  33. }
  34. }

在处理加入通道时,可以为处理器设置一个单独的处理器线程,大致代码如下:

  1. //创建一个独立的线程池,假定有32条线程
  2. EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32);
  3. final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器
  4. ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>(){
  5. protected void initChannel(EmbeddedChannel ch){
  6. //handlerA的执行,从threadGroup池中绑定一条线程
  7. ch.pipeline().addLast(threadGroup,handler);
  8. }
  9. };

8.3.3 ClientSession

ClientSession是一个很重要的胶水类,包含两个成员:一个是user,代表用户;另一个是channel,代表连接的通道。在实际开发中,这两个成员的作用是:

  1. 通过user,ClientSession可以获得当前的用户信息。
  2. 通过channel,ClientSession可以向服务端发送消息。

客户端会话ClientSession保存着当前的状态:

  1. 是否成功连接isConnected。
  2. 是否成功登录isLogin。

ClientSession客户端会话的主要代码如下:

  1. package com.crazymakercircle.imClient.client;
  2. //…
  3. public class ClientSession {
  4. public static final AttributeKey<ClientSession> SESSION_KEY =
  5. AttributeKey.valueOf("SESSION_KEY");
  6. private Channel channel;
  7. private User user;
  8. private String sessionId; //保存登录后的服务端sessionid
  9. private Boolean isConnected = false;
  10. private Boolean isLogin = false;
  11. //绑定通道
  12. public ClientSession(Channel channel) {
  13. this.channel = channel;
  14. this.sessionId = String.valueOf(-1);
  15. //重要:ClientSession绑定到Channel上
  16. channel.attr(ClientSession.SESSION_KEY).set(this);
  17. }
  18. //登录成功之后,设置sessionId
  19. public static void loginSuccess(
  20. ChannelHandlerContext ctx, ProtoMsg.Message pkg) {
  21. Channel channel = ctx.channel();
  22. ClientSession session =
  23. channel.attr(ClientSession.SESSION_KEY).get();
  24. session.setSessionId(pkg.getSessionId());
  25. session.setLogin(true);
  26. log.info("登录成功");
  27. }
  28. //获取通道
  29. public static ClientSession getSession(ChannelHandlerContext ctx) {
  30. Channel channel = ctx.channel();
  31. ClientSession session =
  32. channel.attr(ClientSession.SESSION_KEY).get();
  33. return session;
  34. }
  35. //把Protobuf数据包写入通道
  36. public ChannelFuture witeAndFlush(Object pkg) {
  37. ChannelFuture f = channel.writeAndFlush(pkg);
  38. return f;
  39. }
  40. //…
  41. }
  1. 什么时候创建客户端会话呢?在Netty客户端发起连接请求之后,增加一个连接建立完成的异步回调任务,代码如下:
  1. package com.crazymakercircle.imClient.client;
  2. //…
  3. public class CommandController {
  4. //…
  5. GenericFutureListener<ChannelFuture> connectedListener =
  6. (ChannelFuture f) -> {
  7. final EventLoop eventLoop = f.channel().eventLoop();
  8. if (!f.isSuccess()) {
  9. log.info("连接失败!在10秒之后准备尝试重连!");
  10. eventLoop.schedule(() ->nettyClient.doConnect(),
  11. 10, TimeUnit.SECONDS);
  12. connectFlag = false;
  13. } else {
  14. connectFlag = true;
  15. log.info("疯狂创客圈 IM 服务器连接成功!");
  16. channel = f.channel();
  17. //创建会话
  18. session= new ClientSession(channel);
  19. channel.closeFuture().addListener(closeListener);
  20. //唤醒用户线程
  21. notifyCommandThread();
  22. }
  23. };
  24. //…
  25. }

8.3.4 LoginResponseHandler

  1. package com.crazymakercircle.imClient.handler;
  2. //…
  3. public class LoginResponseHandler
  4. extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg)
  7. …{
  8. //判断消息实例
  9. if (null == msg || !(msg instanceofProtoMsg.Message)) {
  10. super.channelRead(ctx, msg);
  11. return;
  12. }
  13. //判断类型
  14. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
  15. ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
  16. if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {
  17. super.channelRead(ctx, msg);
  18. return;
  19. }
  20. //判断返回是否成功
  21. ProtoMsg.LoginResponse info = pkg.getLoginResponse();
  22. ProtoInstant.ResultCodeEnum result =
  23. ProtoInstant.ResultCodeEnum.values()[info.getCode()];
  24. if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
  25. //登录失败
  26. log.info(result.getDesc());
  27. } else {
  28. //登录成功
  29. ClientSession.loginSuccess(ctx, pkg);
  30. ChannelPipeline p = ctx.pipeline();
  31. //移除登录响应处理器
  32. p.remove(this);
  33. //在编码器后面动态插入心跳处理器
  34. p.addAfter("encoder", "heartbeat",
  35. new HeartBeatClientHandler());
  36. }
  37. }
  38. }

在登录成功之后,需要将LoginResponseHandler登录响应处理器实例从流水线上移除,因为不需要再处理登录响应了。同时,需要在客户端和服务端(即服务器器)之间开启定时的心跳处理。心跳是一个比较复杂的议题,后面会专门详细介绍客户端和服务器之间的心跳。

8.3.5 客户端流水线的装配

  1. package com.crazymakercircle.imClient.client;
  2. //省略部分代码
  3. public class NettyClient {
  4. @Autowired
  5. private ChatMsgHandler chatMsgHandler; //聊天消息处理器
  6. @Autowired
  7. private LoginResponseHandler loginResponceHandler; //登录响应处理器
  8. //连接异步监听
  9. private GenericFutureListener<ChannelFuture> connectedListener;
  10. private Bootstrap b;
  11. private EventLoopGroup g;
  12. //省略部分代码
  13. public void doConnect() {
  14. try {
  15. b = new Bootstrap();
  16. //省略设置通道初始化参数
  17. b.handler(new ChannelInitializer<SocketChannel>() {
  18. public void initChannel(SocketChannel ch) {
  19. ch.pipeline().addLast("decoder",
  20. new ProtobufDecoder());
  21. ch.pipeline().addLast("encoder",
  22. new ProtobufEncoder());
  23. ch.pipeline().addLast(loginResponseHandler);
  24. ch.pipeline().addLast(chatMsgHandler);
  25. ch.pipeline().addLast("exception",
  26. new ExceptionHandler());
  27. }
  28. });
  29. log.info("客户端开始连接 [疯狂创客圈IM]");
  30. ChannelFuture f = b.connect();
  31. f.addListener(connectedListener);
  32. } catch (Exception e) {
  33. log.info("客户端连接失败!" + e.getMessage());
  34. }
  35. }
  36. //…
  37. }

8.4 服务端的登录响应的实战案例

服务端的登录处理流程是:

  1. ProtobufDecoder解码器把请求ByteBuf数据包解码成Protobuf数据包。
  2. UserLoginRequestHandler登录处理器负责处理Protobuf数据包,进行一些必要的判断和预处理后,启动LoginProcesser登录业务处理器,以异步方式进行登录验证处理。
  3. LoginProcesser通过数据库或者远程接口完成用户验证,根据验证处理的结果生成登录成功/失败的登录响应报文,并发送给客户端。

    8.4.1 服务端流水线的装配

    ```java package com.crazymakercircle.imServer.server; //… public class ChatServer {
    1. //…
    2. @Autowired
    3. private LoginRequestHandler loginRequestHandler; //登录请求处理器
    4. @Autowired
    5. private ServerExceptionHandler serverExceptionHandler; //服务器异常处理器
    6. public void run() {
    7. try {
    8. //省略Bootstrap的配置选项
    9. //5 装配流水线
    10. b.childHandler(new ChannelInitializer<SocketChannel>() {
    11. //有连接到达时会创建一个子通道
    12. protected void initChannel(SocketChannel ch) …{
    13. //装配子通道流水线中的Handler业务处理器
  1. <a name="Z1uiS"></a>
  2. ## 8.4.2 LoginRequestHandler
  3. ```java
  4. package com.crazymakercircle.imServer.handler;
  5. //…
  6. @Slf4j
  7. @Service("LoginRequestHandler")
  8. @ChannelHandler.Sharable
  9. public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
  10. @Autowired
  11. LoginProcesser loginProcesser;
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) …{
  13. if (null == msg || !(msg instanceofProtoMsg.Message)) {
  14. super.channelRead(ctx, msg);
  15. return;
  16. }
  17. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
  18. //取得请求类型
  19. ProtoMsg.HeadType headType = pkg.getType();
  20. if (!headType.equals(loginProcesser.type())) {
  21. super.channelRead(ctx, msg);
  22. return;
  23. }
  24. ServerSession session = new ServerSession(ctx.channel());
  25. //异步任务,处理登录的逻辑
  26. CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
  27. @Override
  28. public Boolean execute()…{
  29. boolean r = loginProcesser.action(session, pkg);
  30. return r;
  31. }
  32. //异步任务返回
  33. @Override
  34. public void onBack(Boolean r) {
  35. if (r) {
  36. ctx.pipeline().remove(LoginRequestHandler.this);
  37. log.info("登录成功:" + session.getUser());
  38. } else {
  39. ServerSession.closeSession(ctx);
  40. log.info("登录失败:" + session.getUser());

8.4.3 LoginProcesser

  1. package com.crazymakercircle.imServer.processer;
  2. //…
  3. @Slf4j
  4. @Service("LoginProcesser")
  5. public class LoginProcesser extends AbstractServerProcesser {
  6. @Autowired
  7. LoginResponseBuilderloginResponseBuilder;
  8. @Override
  9. public ProtoMsg.HeadTypetype() {
  10. return ProtoMsg.HeadType.LOGIN_REQUEST;
  11. }
  12. @Override
  13. public boolean action(ServerSession session,ProtoMsg.Message proto){
  14. //取出token验证
  15. ProtoMsg.LoginRequest info = proto.getLoginRequest();
  16. long seqNo = proto.getSequence();
  17. User user = User.fromMsg(info);
  18. //检查用户
  19. booleanisValidUser = checkUser(user);
  20. if (!isValidUser) {
  21. ProtoInstant.ResultCodeEnum resultcode =
  22. ProtoInstant.ResultCodeEnum.NO_TOKEN;
  23. //生成登录失败的报文
  24. ProtoMsg.Message response =
  25. loginResponseBuilder.loginResponse(resultcode, seqNo, "-1");
  26. //发送登录失败的报文
  27. session.writeAndFlush(response);
  28. return false;
  29. }
  30. session.setUser(user);
  31. session.bind();
  32. //登录成功
  33. ProtoInstant.ResultCodeEnum resultcode =
  34. ProtoInstant.ResultCodeEnum.SUCCESS;
  35. //生成登录成功的报文
  36. ProtoMsg.Message response = loginResponseBuilder.
  37. loginResponse(resultcode, seqNo, session.getSessionId());
  38. //发送登录成功的报文
  39. session.writeAndFlush(response);
  40. return true;
  41. }
  42. private booleancheckUser(User user) {
  43. if (SessionMap.inst().hasLogin(user)) {
  44. return false;
  45. }
  46. //验证用户,比较耗时的操作,需要200毫秒以上的时间甚至更多
  47. //方法1:调用远程用户RESTful校验服务
  48. //方法2:调用数据库接口校验
  49. return true;
  50. }
  51. }

8.4.4 EventLoop线程和业务线程相互隔离

  1. 创建Netty的EventLoopGroup线程池,专用于处理耗时任务。 ```java //创建一个独立的线程池,假定有32条线程

    EventExecutorGroup threadGroup = new DefaultEventExecutorGroup(32); final OutHandlerDemo handlerA = new OutHandlerDemo();//创建处理器 ChannelInitializer i = new ChannelInitializer(){

    1. protected void initChannel(EmbeddedChannel ch){
    2. //处理器加入通道时,从专用threadGroup池中绑定一条线程
    3. ch.pipeline().addLast(threadGroup,handler);
    4. }

    };

  1. 2. 创建一个专门的Java线程池,专用于处理耗时任务。
  2. ```java
  3. package com.crazymakercircle.cocurrent;
  4. //…
  5. public class FutureTaskScheduler extends Thread
  6. {
  7. //方法二是使用自建的线程池时专用于处理耗时操作
  8. private static final ExecutorService POOL=
  9. Executors.newFixedThreadPool(10);
  10. //添加耗时任务
  11. public static void add(Runnable executeTask)
  12. {
  13. POOL.submit(executeTask);
  14. }
  15. }

8.5 详解Session服务器会话

8.5.1 通道的容器属性

image.png

  1. AttributeKey不是原始的键(如Map中的键),而是一个键的包装类。AttributeKey确保了键的唯一性,在单个Netty应用中,AttributeKey必须唯一。
  2. 这里的Attribute值不是原始的值(如Map中的值),也是值的包装类。原始的值就放置在Attribute包装实例中,可以通过Attribute包装类实现值的读取(get)和设置(set)。

在Netty中,接口AttributeMap的源代码如下:

  1. package io.netty.util;
  2. public interface AttributeMap {
  3. <T> Attribute<T> attr(AttributeKey<T> key);
  4. }
  1. Attribute的设值 ```java //定义键 public static final AttributeKey SESSION_KEY = AttributeKey.valueOf(“SESSION_KEY”); //… //通过设置将会话绑定到通道 channel.attr(SESSION_KEY).set(session);
  1. AttributeKey的创建需要用到静态方法AttributeKey.valueOf(String)方法。该方法的返回值为一个AttributeKey实例,其泛型参数为实际键-值对中值的实际类型。如果实际的值是ServerSession类型,则定义键的泛型参数为ServerSession,整个AttributeKey定义为AttributeKey<ServerSession>。
  2. ```java
  3. //键的泛型形参是设置的值类型
  4. public static final AttributeKey<ServerSession> SESSION_KEY =
  5. AttributeKey.valueOf("SESSION_KEY");

创建完AttributeKey后,就可以通过通道完成键-值对的设值(set)、取值(get)了。常常使用链式调用,首先通过通道的attr(AttributeKey)方法取得value的包装类Attribute实例,然后通过Attribute的set()方法设置真正的值。在例子中,值是一个会话(Session)实例。
这里的AttributeKey一般定义为一个常量,需要提前定义;它的泛型参数是最终的Attribute的包装值value的数据类型。

  1. Attribute取值 ```java //取得Attribute实例 Attribute attribute = ctx.channel().attr(SESSION_KEY); ServerSession session=attribute.get();
  1. <a name="ag0PU"></a>
  2. ## 8.5.2 ServerSession服务端会话类
  3. ```java
  4. package com.crazymakercircle.imServer.server;
  5. //…
  6. public class ServerSession {
  7. public static final AttributeKey<ServerSession> SESSION_KEY =
  8. AttributeKey.valueOf("SESSION_KEY");
  9. private Channel channel; //通道
  10. private User user; //用户
  11. private final String sessionId;//会话唯一标识
  12. private boolean isLogin = false;//登录状态
  13. public ServerSession(Channel channel) {
  14. this.channel = channel;
  15. this.sessionId = buildNewSessionId();
  16. }
  17. //反向导航
  18. public static ServerSession getSession(ChannelHandlerContext ctx) {
  19. Channel channel = ctx.channel();
  20. return channel.attr(ServerSession.SESSION_KEY).get();
  21. }
  22. //和通道实现双向绑定
  23. public ServerSession bind() {
  24. log.info(" ServerSession绑定会话 " + channel.remoteAddress());
  25. channel.attr(ServerSession.SESSION_KEY).set(this);
  26. SessionMap.inst().addSession(getSessionId(), this);
  27. isLogin = true;
  28. return this;
  29. }
  30. //构造session id
  31. private static String buildNewSessionId() {
  32. String uuid = UUID.randomUUID().toString();
  33. return uuid.replaceAll("-", "");
  34. }
  35. //省略不是太重要的方法
  36. }

8.5.3 SessionMap会话管理器

这里使用一个会话容器SessionMap,负责管理服务端所有的ServerSession,其内部使用一个线程安全的ConcurrentHashMap类型的映射成员,保持sessionId到服务端ServerSession的映射。

  1. package com.crazymakercircle.imServer.server;
  2. //…
  3. public final class SessionMap {
  4. private ConcurrentHashMap<String, ServerSession> map =
  5. new ConcurrentHashMap<String, ServerSession>();
  6. //增加会话对象
  7. public void addSession(String sessionId, ServerSession s) {
  8. map.put(sessionId, s);
  9. log.info("用户登录:id= " + s.getUser().getUid()
  10. + " 在线总数: " + map.size());
  11. }
  12. //获取会话对象
  13. public ServerSession getSession(String sessionId) {
  14. if (map.containsKey(sessionId)) {
  15. return map.get(sessionId);
  16. } else {
  17. return null;
  18. }
  19. }
  20. //省略不是太重要的方法
  21. }

8.6 点对点单聊的实战案例

8.6.1 单聊的端到端流程

image.png

8.6.2 客户端的ChatConsoleCommand收集聊天内容

聊天消息收集类ChatConsoleCommand负责从控制台Scanner实例收集用户输入的聊天消息(格式为id:message),代码如下:

  1. package com.crazymakercircle.imClient.command;
  2. //…
  3. @Data
  4. @Service("ChatConsoleCommand")
  5. public class ChatConsoleCommand implements BaseCommand {
  6. private String toUserId; //目标用户id(这里为登录的用户名称)
  7. private String message; //聊天内容
  8. public static final String KEY = "2";
  9. @Override
  10. public void exec(Scanner scanner) {
  11. System.out.print("请输入聊天的消息(id:message):");
  12. String[] info = null;
  13. while (true) {
  14. String input = scanner.next();
  15. info = input.split(":");
  16. if (info.length != 2) {
  17. System.out.println("请输入聊天的消息(id:message):");
  18. }else {
  19. break;
  20. }
  21. }
  22. toUserId = info[0];
  23. message = info[1];
  24. }
  25. //…
  26. }

8.6.3 客户端的CommandController发送POJO

ChatConsoleCommand的调用者是CommandController命令控制类,该控制类在收集完成聊天内容和目标用户后,在自己的startOneChat()方法中调用ChatSender发送实例,将聊天消息组装成Protobuf数据包,通过客户端的通道发往服务端。

  1. package com.crazymakercircle.imClient.client;
  2. //…
  3. public class CommandController {
  4. @Autowired
  5. ChatConsoleCommand chatConsoleCommand; //聊天命令收集器实例
  6. //省略其他成员
  7. public void startCommandThread()throws InterruptedException {
  8. Thread.currentThread().setName("命令线程");
  9. while (true) {
  10. //建立连接
  11. while (connectFlag == false) {
  12. //开始连接
  13. startConnectServer();
  14. waitCommandThread();
  15. }
  16. //处理命令
  17. while (null != session ) {
  18. Scanner scanner = new Scanner(System.in);
  19. clientCommandMenu.exec(scanner);
  20. String key = clientCommandMenu.getCommandInput();
  21. BaseCommand command = commandMap.get(key);
  22. //…
  23. switch (key) {
  24. case ChatConsoleCommand.KEY:
  25. command.exec(scanner);
  26. startOneChat((ChatConsoleCommand) command);
  27. break;
  28. //省略其他命令
  29. }
  30. }
  31. }
  32. }
  33. //发送单聊消息
  34. private void startOneChat(ChatConsoleCommand c) {
  35. chatSender.setSession(session);
  36. chatSender.setUser(user);
  37. chatSender.sendChatMsg(c.getToUserId(), c.getMessage());
  38. }
  39. //省略其他的命令处理
  40. }

8.6.4 服务端的ChatRedirectHandler进行消息转发

服务端收到聊天消息后会进行消息的转发,主要由消息转发处理器ChatRedirectHandler负责,其大致的工作如下:

  1. 对消息类型进行判断:判断是否为聊天请求Protobuf数据包。如果不是,通过调用super.channelRead(ctx, msg)将消息交给流水线的下一站。
  2. 对消息发送方用户登录进行判断:如果没有登录,则不能发送消息。
  3. 开启异步的消息转发,由其ChatRedirectProcesser实例负责完成消息转发。

    1. package com.crazymakercircle.imServer.handler;
    2. //…
    3. public class ChatRedirectHandler extends ChannelInboundHandlerAdapter {
    4. @Autowired
    5. ChatRedirectProcesserchatRedirectProcesser;
    6. public void channelRead(ChannelHandlerContext ctx, Object msg) …{
    7. //判断消息实例
    8. if (null == msg || !(msg instanceofProtoMsg.Message)) {
    9. super.channelRead(ctx, msg);
    10. return;
    11. }
    12. //判断消息类型
    13. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
    14. ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
    15. if (!headType.equals(chatRedirectProcesser.type())) {
    16. super.channelRead(ctx, msg);
    17. return;
    18. }
    19. //判断是否登录
    20. ServerSession session = ServerSession.getSession(ctx);
    21. if (null == session || !session.isLogin()) {
    22. log.error("用户尚未登录,不能发送消息");
    23. return;
    24. }
    25. //异步处理IM消息转发的逻辑
    26. FutureTaskScheduler.add(() ->
    27. {
    28. chatRedirectProcesser.action(session, pkg);
    29. });
    30. }
    31. }

    8.6.5 服务端的ChatRedirectProcesser进行异步消息转发

    ChatRedirectProcesser异步消息转发类负责将消息发送到目标用户,这是一个异步执行的任务,其大致功能如下:

  4. 根据目标用户ID找出所有的服务端的会话列表。

  5. 为每一个会话转发一份消息 ```java package com.crazymakercircle.imServer.processer; //… public class ChatRedirectProcesser extends AbstractServerProcesser { @Override public ProtoMsg.HeadTypetype() {
    1. return ProtoMsg.HeadType.MESSAGE_REQUEST;
    } @Override public boolean action(ServerSessionfromSession,
    1. ProtoMsg.Message proto) {
    2. //聊天处理
    3. ProtoMsg.MessageRequest msg = proto.getMessageRequest();
    4. //获取接收方的chatID
    5. String to = msg.getTo();
    6. List<ServerSession> toSessions =
    7. SessionMap.inst().getSessionsBy(to);
    8. if (toSessions == null) {
    9. //接收方离线,这里一般会做离线消息处理
    10. Print.tcfo("[" + to + "] 不在线,发送失败!");
    11. } else {
    12. toSessions.forEach((session) -> {
    13. //将IM消息发送到每一个接收方的通道
    14. session.writeAndFlush(proto);
    15. });
    16. }
    17. return true;
    } } //由于一个用户可能有多个会话,因此需要通过调用SessionMap会话管理器的SessionMap.inst().getSessionsBy(uid)方法来取得这个用户的所有会话。 package com.crazymakercircle.imServer.server; //… @Slf4j @Data public final class SessionMap { //全部的会话映射 “uid->session” private ConcurrentHashMap map =
    1. new ConcurrentHashMap<String, ServerSession>();

//根据用户id获取会话集合 public ListgetSessionsBy(String userId) { List list = map.values() .stream() .filter(s ->s.getUser().getUid().equals(userId)) .collect(Collectors.toList()); return list; } //… }

  1. <a name="i6zBf"></a>
  2. ## 8.6.6 客户端的ChatMsgHandler聊天消息处理器
  3. 1. 对消息类型进行判断,判断是否为聊天请求Protobuf数据包。如果不是,通过super.channelRead(ctx, msg)将消息交给流水线的下一站。
  4. 1. 如果是聊天消息,就将聊天消息显示在控制台。
  5. ```java
  6. package com.crazymakercircle.imClient.handler;
  7. //…
  8. public class ChatMsgHandler extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) …{
  11. //判断类型
  12. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
  13. ProtoMsg.HeadType headType = pkg.getType();
  14. if (!headType.equals(ProtoMsg.HeadType.MESSAGE_REQUEST)) {
  15. super.channelRead(ctx, msg);
  16. return; //不是聊天消息
  17. }
  18. ProtoMsg.MessageRequest req = pkg.getMessageRequest();
  19. String content = req.getContent();
  20. String uid = req.getFrom();
  21. System.out.println(" 收到消息 from uid:" + uid + " -> " + content);
  22. }
  23. }

8.7 详解心跳检测

8.7.1 网络连接的假死现象

什么是连接假死呢?如果底层的TCP连接(socket连接)已经断开,但是服务端并没有正常关闭套接字,服务端认为这条TCP连接仍然是存在的,则该连接处于“假死”状态。连接假死的具体表现如下:

  1. 在服务端,会有一些处于TCP_ESTABLISHED状态的“正常”连接。
  2. 在客户端,TCP客户端显示连接已经断开。
  3. 虽然客户端可以进行断线重连操作,但是上一次的连接状态依然被服务端认为有效,并且服务端的资源得不到正确释放,包括套接字上下文以及接收/发送缓冲区。

连接假死通常是由以下多个原因造成的,例如:

  1. 应用程序出现线程堵塞,无法进行数据的读写。
  2. 网络相关的设备出现故障,例如网卡、机房故障。
  3. 网络丢包。公网环境非常容易出现丢包和网络抖动等现象。

8.7.2 服务端的空闲检测

空闲检测就是每隔一段时间检测子通道是否有数据读写,如果有,则子通道是正常的;如果没有,则子通道被判定为假死,关掉子通道。
服务端如何实现空闲检测呢?使用Netty自带的IdleStateHandler空闲状态处理器就可以实现这个功能。下面的示例程序继承自IdleStateHandler,定义一个假死处理类:

  1. package com.crazymakercircle.imServer.handler;
  2. //…
  3. public class HeartBeatServerHandler extends IdleStateHandler {
  4. private static final int READ_IDLE_GAP = 150; //最大空闲,单位秒
  5. public HeartBeatServerHandler() {
  6. super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
  7. }
  8. @Override
  9. protected void channelIdle(ChannelHandlerContext ctx,
  10. IdleStateEventevt) …{
  11. System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
  12. ServerSession.closeSession(ctx);
  13. }
  14. public void channelRead(ChannelHandlerContext ctx, Object msg){
  15. //…
  16. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
  17. //判断和处理心跳数据包
  18. ProtoMsg.HeadType headType = pkg.getType();
  19. if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {
  20. //异步处理,将心跳数据包直接回复给客户端
  21. FutureTaskScheduler.add(() -> {
  22. if (ctx.channel().isActive()) {
  23. ctx.writeAndFlush(msg);
  24. }
  25. });
  26. }
  27. super.channelRead(ctx, msg);
  28. }
  29. }
  1. HeartBeatServerHandler的构造函数中,调用了基类IdleStateHandler的构造函数,传递了四个参数:
  1. public HeartBeatServerHandler() {
  2. super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
  3. }
  1. 其中,第一个参数表示入站(Inbound)空闲时长,指的是一段时间内如果没有数据入站,就判定连接假死;第二个参数是出站(Outbound)空闲时长,指的是一段时间内如果没有数据出站,就判定连接假死;第三个参数是出/入站检测时长,表示在一段时间内如果没有出站或者入站,就判定连接假死;最后一个参数表示时间单位,TimeUnit.SECONDS表示秒。<br />假死被判定之后,IdleStateHandler类会回调自己的channelIdle()方法。在这个子类的重写版本中,重写了空闲回调方法,手动关闭连接。
  1. @Override
  2. protected void channelIdle(ChannelHandlerContext ctx,
  3. IdleStateEventevt) …{
  4. System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
  5. ServerSession.closeSession(ctx);
  6. }

8.7.3 客户端的心跳发送

与服务端的空闲检测相配合,客户端需要定期发送数据包到服务端,通常这个数据包称为心跳数据包。接下来,定义一个Handler业务处理器定期发送心跳数据包给服务端。

  1. package com.crazymakercircle.imClient.handler;
  2. //…
  3. public class HeartBeatClientHandler
  4. extends ChannelInboundHandlerAdapter {
  5. //心跳的时间间隔,单位为秒
  6. private static final int HEARTBEAT_INTERVAL = 50;
  7. //在Handler业务处理器被加入到流水线时,开始发送心跳数据包
  8. @Override
  9. public void handlerAdded(ChannelHandlerContext ctx) …{
  10. ClientSession session = ClientSession.getSession(ctx);
  11. User user = session.getUser();
  12. HeartBeatMsgBuilder builder =
  13. new HeartBeatMsgBuilder(user, session);
  14. ProtoMsg.Message message = builder.buildMsg();
  15. //发送心跳数据包
  16. heartBeat(ctx, message);
  17. }
  18. //使用定时器,定期发送心跳数据包
  19. public void heartBeat(ChannelHandlerContext ctx,
  20. ProtoMsg.MessageheartbeatMsg) {
  21. //提交一个一次性的定时任务
  22. ctx.executor().schedule(() -> {
  23. if (ctx.channel().isActive()) {
  24. log.info(" 发送HEART_BEAT 消息to server");
  25. ctx.writeAndFlush(heartbeatMsg);
  26. //递归调用:提交下一个一次性的定时任务,发送下一次的心跳
  27. heartBeat(ctx, heartbeatMsg);
  28. }
  29. }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
  30. }
  31. //接收到服务器的心跳回写
  32. @Override
  33. public void channelRead(ChannelHandlerContext ctx, Object msg){
  34. //判断类型
  35. ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
  36. ProtoMsg.HeadType headType = pkg.getType();
  37. if (headType.equals(ProtoMsg.HeadType.HEART_BEAT)) {
  38. log.info(" 收到回写的HEART_BEAT 消息 from server");
  39. return;
  40. } else {
  1. HeartBeatClientHandler实例并不是一开始就装配到了流水线中的,它装配的时机是在登录成功之后。登录处理器LoginResponseHandler的相关代码如下:
  1. package com.crazymakercircle.imClient.clientHandler;
  2. //…
  3. public class LoginResponseHandler
  4. extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg)…{
  7. //省略登录数据包的预处理
  8. if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
  9. //登录失败
  10. log.info(result.getDesc());
  11. } else {
  12. //登录成功
  13.     //省略其他处理
  14. //在编码器后面动态插入心跳处理器
  15. ChannelPipeline p=ctx.pipeline();
  16. p.addAfter("encoder","heartbeat",new HeartBeatClientHandler());
  17. }
  18.    }
  19. }