1 架构设计演变

(1) 1.0

image.png
缺点: 没有必要把app server和 im server分开, 然后强行通过rpc来通信

(2) 2.0

image.png
缺点: 未采用微服务架构, 不方便协同开发,
有其它服务需要服务端主动向客户端推送消息,为了减少重复开发,需要微服务。

(3) 3.0

image.png
缺点: 当用户量过多, 一台服务器支持不了这么多websocket长连接时, 无法进行水平扩展

(4) 4.0 (当前架构)

image.png

2 技术难点

(1) 如何保证消息不丢失?

image.png
在这个过程中,丢失消息有以下几种情况:

  1. 因为网络不通等原因导致用户A把消息发送到IM服务器失败;
  2. IM服务器存储消息失败;
  3. 用户A在超时时间内未收到IM服务器返回的结果;
  4. 由于IM服务器断电等原因导致消息未能成功推送给用户B

解决方案:

  1. 前三种情况(http),用户A将被提示消息发送失败;用户A可自行选择是否重发
  2. 后一种情况(ws),用户B未收到消息. websocket API可判断出消息发送是否成功, 未发送成功, 则断开ws连接, 视为用户B离线, 用户B下次建立连接后自行通过http拉取离线消息

    (2) 如何保证消息不重复?

  • 重复存储

在上述步骤3中, 由于网络原因导致 用户A没收到响应成功, 这时触发重传机制, 然后server端会再收到一次该消息导致重复存储
解决方案:
客户端发送消息时为每个消息生成一个uuid发给服务端, 服务端存储时若数据库报错uuid已存在, 则不再把消息放入消息队列, 从而不会重复存储

  • 重复推送

server端推送给用户B消息时, 若推送失败, 这部分代码是我们自己写的, 可以不重复推送, 直接断开ws连接, 视为用户B离线,
而且就算断线重连后, 拉取离线消息可能重复, client端消息采用插入机制, 若时间戳相同, 且uuid相同, 则不插入重复收到的消息

(3) 如何保证消息的时序一致性?

  • 什么是时序一致性?

    对于点对点的聊天场景,时序一致性保证接收方的接收顺序和发送方的发出顺序一致; 对于群聊场景,时序一致性保证所有接收人看到的消息展现顺序一致。

  • 时序一致性的难点?

    多发送方、多接收方、服务端多线程并发处理情况下,无法保证时序一致性。 分布式环境下,多个机器的本地时钟不一致,没有“全局时钟”,不能用“本地时间”保证时序的一致性。

其实有”全局时钟”, unix时间戳就是全球统一的, 不随时区变化

  • 单聊, 如何保证一致性?

    发送方A在某群内依次发出了msg1,msg2,msg3, 如何保证接收方B的群内消息显示顺序也是1,2,3呢?

在A往B发出的消息中,加上发送方A本地的一个绝对时序,来表示接收方B的展现时序, 如果接收方B先收到msg3,msg3会先展现,后收到msg1和msg2后,会展现在msg3的前面。这意味着, 前端不能直接append, 而是从后往前找到合适的位置插入接收到的消息

  • 群聊, 如何保证一致性?

    N个群友在一个群里聊,怎么保证所有群友收到的消息显示时序一致?

不能再利用发送方的seq来保证时序,因为发送方不单点,时间也不一致
可以利用数据库的入库unix时间来统一时间

综上, 最终方案, 前端根据消息的入库时间的unix时间戳来对收到的消息进行排序

(4) 群聊的消息是读扩散, 还是写扩散?

  • 读扩散: 群内消息只存一份, 订阅了该发布者的用户,从内容发布者处获取数据。当订阅者过多时,存在读瓶颈。适用于写多,读少场景。
  • 写扩散:内容发布者会将每次发布的数据推送到每个接收者处,接收者只需要从自己处读取数据便可。当订阅者过多时,存在写瓶颈。适用于读多,写少场景。

image.png
群聊的消息 写少读多, 所以采用读扩散

聊天消息 在 kafka中作为一个topic, 分区只有一个, 但可以有多个副本, 副本数=kafka的broker数

每个用户上线后, message-api都要查看ta在哪些组里, 给它加入到这些组名对应的channel里, 后面收到消息, 直接取出channel里的消息广播给所有在线client即可

(5) 如何保证所有聊天信息的完整性?

  • 用户上线时拉取各个群的离线消息
  • 之后的在线消息直接由服务端推送

    (6) 如何保证离线消息按需拉取?

  1. 用户先调用user_group_list, 获取自己的group_list
  2. 然后调用message 的 pull, 对每个group, 根据group_user中的last_msg_id, 获取到上次更新的消息id, 到message表里查找该group对应的消息, 且msg_id>last_msg_id的所有消息, 返回给用户

但是, 鉴于离线消息可能太多的情况, 每个群组的离线最多返回10条, 用户需要自己上拉刷新

(7) 用户A在server1上, 用户B在server2上, 用户A发给用户B的消息如何推送给用户B?

用户在获取到自己的群组后, 加入每个群对应的group, 方便websocket广播
用户A 上传到 某个群组 的消息先放到MQ中(生产者), message-api监听MQ, 取出消息广播给该room中的所有用户
image.png

(8) 如何实现消息漫游?

  • QQ能够拉取到所有历史聊天消息,这个就是消息漫游。
  • wechat目前只支持“多点登录”同时收发在线消息,没有实现“消息漫游”
  • 本开源项目的设计是希望能够像qq一样实现消息漫游, 有两种方案
    • 方案1: 用redis缓存要记录每个客户端(key=uid+平台) 拉取到的历史消息的最新时间戳, 每次上线后拉取离线消息都从这个时间后开始拉取
    • 方案2(实际采用): 离线消息全部通过上拉获取, 每次上拉可获取10条, 这种方式最简单, 不用前端进行离线消息存储

      (9) 数据量大之后如何分库分表?

      数据量大之后, 消息表一定是最先突破2100w的,
      可以按 消息表的主键id来分库分表
      具体可以看看TiDB怎么做的

      (10) 用户量大后, ws长连接多了怎么处理?

      msg-api水平扩容, 按每50w个同时在线用户 加一台机器

      每个tcp缓冲区消耗的内存大约是4KB, 在golang中, 每个tcp会分配一个协程, 最小2KB 平均一个websocket, 大约是6KB以上, 就按10KB来算吧 一台机器内存为8GB, 按msg-api最大能用5GB算吧 xx = 5GB / 10KB = 50w

3 技术栈

  • Redis: 缓存, 存储在线用户uid, 和 与用户建立websocket长连接的接入层server的addr
  • Mysql: 存储用户信息, 聊天信息, 群组信息
  • kafka: 逻辑层 与 接入层的解耦, 异步, 传递在线消息
  • OSS: 存储图片,视频等附件

    4 User服务

    (1) 数据表

    | id | user表主键, 自增 | | —- | —- | | email | unique | | password | | | nick_name | | | gender | 1男, 2女 | | avator_url | | | create_time | timestamp |

(2) 协议

  1. syntax = "proto3";
  2. option go_package = "./proto";
  3. // 注册
  4. message RegisterRequest {
  5. string email = 1;
  6. string password = 2;
  7. string nickName = 3;
  8. int64 gender = 4;
  9. }
  10. message RegisterResponse {
  11. }
  12. // 登录
  13. message LoginRequest {
  14. string email = 1;
  15. string password = 2;
  16. }
  17. message LoginResponse {
  18. string accessToken = 1;
  19. int64 accessExpire = 2;
  20. }
  21. // 个人信息
  22. message PersonalInfoRequest {
  23. int64 id = 1;
  24. }
  25. message PersonalInfoResponse {
  26. string nickName = 1;
  27. int64 gender = 2;
  28. string email = 3;
  29. }
  30. service UserClient {
  31. rpc Login(LoginRequest) returns(LoginResponse);
  32. rpc Register(RegisterRequest) returns(RegisterResponse);
  33. rpc PersonalInfo(PersonalInfoRequest) returns(PersonalInfoResponse);
  34. }

5 Message服务

(1) 数据表

msg_chat聊天消息表, 持久化存放全部消息(无论在线离线)

id 自增, uint64
group_id 分组id,
两人聊天: A_B,
多人聊天: A_timestamp
msg_type 枚举值(1系统, 2文本, 3文件, 4语音, 5链接)
sender_id 消息发送者的用户id
content 消息内容, 最多65535个字节
create_time 创建时间 (服务器时间戳), 每次返回给前端的是Unix时间戳的毫秒

(2) 协议

  1. syntax = "proto3";
  2. option go_package = "./proto";
  3. // 聊天消息结构
  4. message ChatMsg {
  5. int64 groupId=1; // 群id
  6. int64 senderId=2; // 发送者uid
  7. int64 type=3; // 消息类型 0文本, 1图片, 2视频, 3语音
  8. string content=4; // 消息内容
  9. string uuid=5; // 作用是去重
  10. int64 createTime=6; // 创建时间
  11. }
  12. // 上传消息
  13. message UploadRequest {
  14. int64 groupId=1;
  15. int64 senderId=2;
  16. int64 type=3;
  17. string content=4;
  18. string uuid=5;
  19. }
  20. message UploadResponse {
  21. string uuid=1;
  22. int64 createTime=2; // 创建时间
  23. }
  24. // 拉取消息
  25. message PullRequest {
  26. string email=1; // 用户邮箱
  27. string platform=2; // 设备信息
  28. int64 groupId=3; // 要拉取哪个群的消息
  29. }
  30. message PullResponse {
  31. repeated ChatMsg list=1;
  32. }
  33. service MessageClient {
  34. rpc Upload(UploadRequest) returns(UploadResponse);
  35. rpc Pull(PullRequest) returns(PullResponse);
  36. }

6 Group服务

(1) 数据表

  • group | id | 字符串, 两人的为uid1_uid2, 多人的为uuid | | —- | —- | | name | 群名称(默认是 用户A昵称,用户B昵称) | | type | 群类型, 单聊1, 群聊2 | | status | 0表示无效(未同意), 1表示有效, 2表示无效(拉黑) | | config | 群配置, json
    {
    “notice”: “群公告”,
    “owner”: 2,
    “curNum”: 2,
    } | | create_time | 创建时间 |

  • group_user | id | 自增 | | —- | —- | | group_id | 群id (index) | | user_id | 用户id (index) | | alias_name | 用户对该群(或该用户)的备注名 | | create_time | 进群时间 |

  • redis缓存

    “uid:platform:group_id”: last_msg_id

(2) 协议

  1. syntax = "proto3";
  2. option go_package = "./proto";
  3. // 添加好友
  4. message AddFriendRequest {
  5. int64 fromUid=1;
  6. int64 toUid=2;
  7. }
  8. message AddFriendResponse {
  9. string groupId=1; // 返回创建的这个新群, 但是这个群还处于删除态, 不可发聊天消息
  10. }
  11. // 处理好友申请
  12. message HandleFriendRequest {
  13. string groupId=1;
  14. bool isAgree=2;
  15. }
  16. message HandleFriendResponse {
  17. string groupId=1;
  18. }
  19. // 获取群内的用户
  20. message GroupUsersRequest {
  21. string groupId=1;
  22. }
  23. message GroupUsersResponse {
  24. repeated int64 list=1;
  25. }
  26. service GroupClient {
  27. rpc AddFriend(AddFriendRequest) returns(AddFriendResponse);
  28. rpc HandleFriend(HandleFriendRequest) returns(HandleFriendResponse);
  29. rpc GroupUsers(GroupUsersRequest) returns(GroupUsersResponse);
  30. }
  31. 3) invite
  32. 群内用户 可以 邀请用户进群
  33. 4) remove
  34. 群主 可以 把某用户移出群聊
  35. 5) exit
  36. 群内用户 可以自行退群, 群主会自动移交


7 前端技术难点

(1) 下拉刷新

  1. ### `pull_to_refresh` 下拉组件
  2. lib/pages/category/widgets/news_page_list.dart
  3. ```dart
  4. @override
  5. Widget build(BuildContext context) {
  6. super.build(context);
  7. return GetX<CategoryController>(
  8. init: controller,
  9. builder: (controller) => SmartRefresher(
  10. enablePullUp: true,
  11. controller: controller.refreshController,
  12. onRefresh: controller.onRefresh,
  13. onLoading: controller.onLoading,
  14. child: CustomScrollView(
  15. slivers: [
  16. SliverPadding(
  17. padding: EdgeInsets.symmetric(
  18. vertical: 0.w,
  19. horizontal: 0.w,
  20. ),
  21. sliver: SliverList(
  22. delegate: SliverChildBuilderDelegate(
  23. (content, index) {
  24. var item = controller.state.newsList[index];
  25. return newsListItem(item);
  26. },
  27. childCount: controller.state.newsList.length,
  28. ),
  29. ),
  30. ),
  31. ],
  32. ),
  33. ),
  34. );
  35. }

controller: controller.refreshController 上下拉控制器

onRefresh: controller.onRefresh 下拉刷新数据

onLoading: controller.onLoading 上拉载入数据

SliverChildBuilderDelegate 动态构建每一项, childCount 告诉组件一共有多少数据

controller 中写入业务

lib/pages/category/controller.dart

  • onRefresh 下拉刷新
  1. void onRefresh() {
  2. fetchNewsList(isRefresh: true).then((_) {
  3. refreshController.refreshCompleted(resetFooterState: true);
  4. }).catchError((_) {
  5. refreshController.refreshFailed();
  6. });
  7. }

refreshController.refreshCompleted() 刷新完成

refreshController.refreshFailed() 刷新失败

  • onLoading 上拉载入
  1. void onLoading() {
  2. if (state.newsList.length < total) {
  3. fetchNewsList().then((_) {
  4. refreshController.loadComplete();
  5. }).catchError((_) {
  6. refreshController.loadFailed();
  7. });
  8. } else {
  9. refreshController.loadNoData();
  10. }
  11. }

refreshController.loadComplete() 载入完成

refreshController.loadFailed() 载入失败

refreshController.loadNoData() 没有数据

  • fetch 所有数据
  1. // 拉取数据
  2. Future<void> fetchNewsList({bool isRefresh = false}) async {
  3. var result = await NewsAPI.newsPageList(
  4. params: NewsPageListRequestEntity(
  5. categoryCode: categoryCode,
  6. pageNum: curPage + 1,
  7. pageSize: pageSize,
  8. ),
  9. );
  10. if (isRefresh == true) {
  11. curPage = 1;
  12. total = result.counts!;
  13. state.newsList.clear();
  14. } else {
  15. curPage++;
  16. }
  17. state.newsList.addAll(result.items!);
  18. }

state.newsList.addAll(result.items!); 合并 list 集合 RxList 封装的

  • dispose 记得释放
  1. ///dispose 释放内存
  2. @override
  3. void dispose() {
  4. super.dispose();
  5. // dispose 释放对象
  6. refreshController.dispose();
  7. }

refreshController.dispose() 这个业务中就是下拉控件了,还有视频播放器、文本框啥的控制器都要记得释放。 ```

参考资料

后端:

前端: