1 架构设计演变
(1) 1.0
缺点: 没有必要把app server和 im server分开, 然后强行通过rpc来通信
(2) 2.0
缺点: 未采用微服务架构, 不方便协同开发,
有其它服务需要服务端主动向客户端推送消息,为了减少重复开发,需要微服务。
(3) 3.0
缺点: 当用户量过多, 一台服务器支持不了这么多websocket长连接时, 无法进行水平扩展
(4) 4.0 (当前架构)
2 技术难点
(1) 如何保证消息不丢失?
在这个过程中,丢失消息有以下几种情况:
- 因为网络不通等原因导致用户A把消息发送到IM服务器失败;
- IM服务器存储消息失败;
- 用户A在超时时间内未收到IM服务器返回的结果;
- 由于IM服务器断电等原因导致消息未能成功推送给用户B
解决方案:
- 前三种情况(http),用户A将被提示消息发送失败;用户A可自行选择是否重发
- 后一种情况(ws),用户B未收到消息. websocket API可判断出消息发送是否成功, 未发送成功, 则断开ws连接, 视为用户B离线, 用户B下次建立连接后自行通过http拉取离线消息
(2) 如何保证消息不重复?
- 重复存储
在上述步骤3中, 由于网络原因导致 用户A没收到响应成功, 这时触发重传机制, 然后server端会再收到一次该消息导致重复存储
解决方案:
客户端发送消息时为每个消息生成一个uuid发给服务端, 服务端存储时若数据库报错uuid已存在, 则不再把消息放入消息队列, 从而不会重复存储
- 重复推送
server端推送给用户B消息时, 若推送失败, 这部分代码是我们自己写的, 可以不重复推送, 直接断开ws连接, 视为用户B离线,
而且就算断线重连后, 拉取离线消息可能重复, client端消息采用插入机制, 若时间戳相同, 且uuid相同, 则不插入重复收到的消息
(3) 如何保证消息的时序一致性?
什么是时序一致性?
对于点对点的聊天场景,时序一致性保证接收方的接收顺序和发送方的发出顺序一致; 对于群聊场景,时序一致性保证所有接收人看到的消息展现顺序一致。
时序一致性的难点?
多发送方、多接收方、服务端多线程并发处理情况下,无法保证时序一致性。 分布式环境下,多个机器的本地时钟不一致,没有“全局时钟”,不能用“本地时间”保证时序的一致性。
其实有”全局时钟”, unix时间戳就是全球统一的, 不随时区变化
- 单聊, 如何保证一致性?
发送方A在某群内依次发出了msg1,msg2,msg3, 如何保证接收方B的群内消息显示顺序也是1,2,3呢?
在A往B发出的消息中,加上发送方A本地的一个绝对时序,来表示接收方B的展现时序, 如果接收方B先收到msg3,msg3会先展现,后收到msg1和msg2后,会展现在msg3的前面。这意味着, 前端不能直接append, 而是从后往前找到合适的位置插入接收到的消息
- 群聊, 如何保证一致性?
N个群友在一个群里聊,怎么保证所有群友收到的消息显示时序一致?
不能再利用发送方的seq来保证时序,因为发送方不单点,时间也不一致
可以利用数据库的入库unix时间来统一时间
综上, 最终方案, 前端根据消息的入库时间的unix时间戳来对收到的消息进行排序
(4) 群聊的消息是读扩散, 还是写扩散?
- 读扩散: 群内消息只存一份, 订阅了该发布者的用户,从内容发布者处获取数据。当订阅者过多时,存在读瓶颈。适用于写多,读少场景。
- 写扩散:内容发布者会将每次发布的数据推送到每个接收者处,接收者只需要从自己处读取数据便可。当订阅者过多时,存在写瓶颈。适用于读多,写少场景。
群聊的消息 写少读多, 所以采用读扩散
聊天消息 在 kafka中作为一个topic, 分区只有一个, 但可以有多个副本, 副本数=kafka的broker数
每个用户上线后, message-api都要查看ta在哪些组里, 给它加入到这些组名对应的channel里, 后面收到消息, 直接取出channel里的消息广播给所有在线client即可
(5) 如何保证所有聊天信息的完整性?
- 用户先调用user_group_list, 获取自己的group_list
- 然后调用message 的 pull, 对每个group, 根据group_user中的last_msg_id, 获取到上次更新的消息id, 到message表里查找该group对应的消息, 且msg_id>last_msg_id的所有消息, 返回给用户
但是, 鉴于离线消息可能太多的情况, 每个群组的离线最多返回10条, 用户需要自己上拉刷新
(7) 用户A在server1上, 用户B在server2上, 用户A发给用户B的消息如何推送给用户B?
用户在获取到自己的群组后, 加入每个群对应的group, 方便websocket广播
用户A 上传到 某个群组 的消息先放到MQ中(生产者), message-api监听MQ, 取出消息广播给该room中的所有用户
(8) 如何实现消息漫游?
- QQ能够拉取到所有历史聊天消息,这个就是消息漫游。
- wechat目前只支持“多点登录”同时收发在线消息,没有实现“消息漫游”
- 本开源项目的设计是希望能够像qq一样实现消息漫游, 有两种方案
- 方案1: 用redis缓存要记录每个客户端(key=uid+平台) 拉取到的历史消息的最新时间戳, 每次上线后拉取离线消息都从这个时间后开始拉取
- 方案2(实际采用): 离线消息全部通过上拉获取, 每次上拉可获取10条, 这种方式最简单, 不用前端进行离线消息存储
(9) 数据量大之后如何分库分表?
数据量大之后, 消息表一定是最先突破2100w的,
可以按 消息表的主键id来分库分表
具体可以看看TiDB怎么做的(10) 用户量大后, ws长连接多了怎么处理?
msg-api水平扩容, 按每50w个同时在线用户 加一台机器每个tcp缓冲区消耗的内存大约是4KB, 在golang中, 每个tcp会分配一个协程, 最小2KB 平均一个websocket, 大约是6KB以上, 就按10KB来算吧 一台机器内存为8GB, 按msg-api最大能用5GB算吧 xx = 5GB / 10KB = 50w
3 技术栈
- Redis: 缓存, 存储在线用户uid, 和 与用户建立websocket长连接的接入层server的addr
- Mysql: 存储用户信息, 聊天信息, 群组信息
- kafka: 逻辑层 与 接入层的解耦, 异步, 传递在线消息
- OSS: 存储图片,视频等附件
4 User服务
(1) 数据表
| id | user表主键, 自增 | | —- | —- | | email | unique | | password | | | nick_name | | | gender | 1男, 2女 | | avator_url | | | create_time | timestamp |
(2) 协议
syntax = "proto3";
option go_package = "./proto";
// 注册
message RegisterRequest {
string email = 1;
string password = 2;
string nickName = 3;
int64 gender = 4;
}
message RegisterResponse {
}
// 登录
message LoginRequest {
string email = 1;
string password = 2;
}
message LoginResponse {
string accessToken = 1;
int64 accessExpire = 2;
}
// 个人信息
message PersonalInfoRequest {
int64 id = 1;
}
message PersonalInfoResponse {
string nickName = 1;
int64 gender = 2;
string email = 3;
}
service UserClient {
rpc Login(LoginRequest) returns(LoginResponse);
rpc Register(RegisterRequest) returns(RegisterResponse);
rpc PersonalInfo(PersonalInfoRequest) returns(PersonalInfoResponse);
}
5 Message服务
(1) 数据表
msg_chat聊天消息表, 持久化存放全部消息(无论在线离线)
id | 自增, uint64 |
---|---|
group_id | 分组id, 两人聊天: A_B, 多人聊天: A_timestamp |
msg_type | 枚举值(1系统, 2文本, 3文件, 4语音, 5链接) |
sender_id | 消息发送者的用户id |
content | 消息内容, 最多65535个字节 |
create_time | 创建时间 (服务器时间戳), 每次返回给前端的是Unix时间戳的毫秒 |
(2) 协议
syntax = "proto3";
option go_package = "./proto";
// 聊天消息结构
message ChatMsg {
int64 groupId=1; // 群id
int64 senderId=2; // 发送者uid
int64 type=3; // 消息类型 0文本, 1图片, 2视频, 3语音
string content=4; // 消息内容
string uuid=5; // 作用是去重
int64 createTime=6; // 创建时间
}
// 上传消息
message UploadRequest {
int64 groupId=1;
int64 senderId=2;
int64 type=3;
string content=4;
string uuid=5;
}
message UploadResponse {
string uuid=1;
int64 createTime=2; // 创建时间
}
// 拉取消息
message PullRequest {
string email=1; // 用户邮箱
string platform=2; // 设备信息
int64 groupId=3; // 要拉取哪个群的消息
}
message PullResponse {
repeated ChatMsg list=1;
}
service MessageClient {
rpc Upload(UploadRequest) returns(UploadResponse);
rpc Pull(PullRequest) returns(PullResponse);
}
6 Group服务
(1) 数据表
group | id | 字符串, 两人的为uid1_uid2, 多人的为uuid | | —- | —- | | name | 群名称(默认是 用户A昵称,用户B昵称) | | type | 群类型, 单聊1, 群聊2 | | status | 0表示无效(未同意), 1表示有效, 2表示无效(拉黑) | | config | 群配置, json
{
“notice”: “群公告”,
“owner”: 2,
“curNum”: 2,
} | | create_time | 创建时间 |group_user | id | 自增 | | —- | —- | | group_id | 群id (index) | | user_id | 用户id (index) | | alias_name | 用户对该群(或该用户)的备注名 | | create_time | 进群时间 |
redis缓存
“uid:platform:group_id”: last_msg_id
(2) 协议
syntax = "proto3";
option go_package = "./proto";
// 添加好友
message AddFriendRequest {
int64 fromUid=1;
int64 toUid=2;
}
message AddFriendResponse {
string groupId=1; // 返回创建的这个新群, 但是这个群还处于删除态, 不可发聊天消息
}
// 处理好友申请
message HandleFriendRequest {
string groupId=1;
bool isAgree=2;
}
message HandleFriendResponse {
string groupId=1;
}
// 获取群内的用户
message GroupUsersRequest {
string groupId=1;
}
message GroupUsersResponse {
repeated int64 list=1;
}
service GroupClient {
rpc AddFriend(AddFriendRequest) returns(AddFriendResponse);
rpc HandleFriend(HandleFriendRequest) returns(HandleFriendResponse);
rpc GroupUsers(GroupUsersRequest) returns(GroupUsersResponse);
}
3) invite
群内用户 可以 邀请用户进群
4) remove
群主 可以 把某用户移出群聊
5) exit
群内用户 可以自行退群, 群主会自动移交
7 前端技术难点
(1) 下拉刷新
### `pull_to_refresh` 下拉组件
lib/pages/category/widgets/news_page_list.dart
```dart
@override
Widget build(BuildContext context) {
super.build(context);
return GetX<CategoryController>(
init: controller,
builder: (controller) => SmartRefresher(
enablePullUp: true,
controller: controller.refreshController,
onRefresh: controller.onRefresh,
onLoading: controller.onLoading,
child: CustomScrollView(
slivers: [
SliverPadding(
padding: EdgeInsets.symmetric(
vertical: 0.w,
horizontal: 0.w,
),
sliver: SliverList(
delegate: SliverChildBuilderDelegate(
(content, index) {
var item = controller.state.newsList[index];
return newsListItem(item);
},
childCount: controller.state.newsList.length,
),
),
),
],
),
),
);
}
controller: controller.refreshController
上下拉控制器
onRefresh: controller.onRefresh
下拉刷新数据
onLoading: controller.onLoading
上拉载入数据
SliverChildBuilderDelegate
动态构建每一项, childCount
告诉组件一共有多少数据
controller
中写入业务
lib/pages/category/controller.dart
onRefresh
下拉刷新
void onRefresh() {
fetchNewsList(isRefresh: true).then((_) {
refreshController.refreshCompleted(resetFooterState: true);
}).catchError((_) {
refreshController.refreshFailed();
});
}
refreshController.refreshCompleted()
刷新完成
refreshController.refreshFailed()
刷新失败
onLoading
上拉载入
void onLoading() {
if (state.newsList.length < total) {
fetchNewsList().then((_) {
refreshController.loadComplete();
}).catchError((_) {
refreshController.loadFailed();
});
} else {
refreshController.loadNoData();
}
}
refreshController.loadComplete()
载入完成
refreshController.loadFailed()
载入失败
refreshController.loadNoData()
没有数据
fetch
所有数据
// 拉取数据
Future<void> fetchNewsList({bool isRefresh = false}) async {
var result = await NewsAPI.newsPageList(
params: NewsPageListRequestEntity(
categoryCode: categoryCode,
pageNum: curPage + 1,
pageSize: pageSize,
),
);
if (isRefresh == true) {
curPage = 1;
total = result.counts!;
state.newsList.clear();
} else {
curPage++;
}
state.newsList.addAll(result.items!);
}
state.newsList.addAll(result.items!);
合并 list
集合 RxList
封装的
dispose
记得释放
///dispose 释放内存
@override
void dispose() {
super.dispose();
// dispose 释放对象
refreshController.dispose();
}
refreshController.dispose()
这个业务中就是下拉控件了,还有视频播放器、文本框啥的控制器都要记得释放。
```
参考资料
后端:
- 极客时间专栏课《即时消息技术剖析与实战》
- Github (go-websocket)
- Github (micro-message-system)
- Github (Open-IM-Server)
- Github (gowebsocket)
- Github (gin-chat-demo)
- 哔哩哔哩-如何设计10亿流量的IM系统
- 飞书-从0到1再到N,探索亿级流量的IM架构演绎
- 52im-海量在线用户的移动端IM架构设计实践
- 消息“时序”与“一致性”为何这么难?
- 细聊分布式ID生成方法
- 如何设计一个亿级消息量的 IM 系统
前端:
- 仅界面: https://gitee.com/laogede/flutter_wechat(视频教程)
- 完整源码: https://github.com/CoderMikeHe/flutter_wechat (前端2)
- 仿微信录制音频开源库:https://github.com/yxwandroid/flutter_plugin_record
- 微信图库:https://github.com/fluttercandies/flutter_wechat_assets_picker
- youtube教程: https://www.youtube.com/watch?v=Da8vg2RIFxM&list=PL-BBzWII3MPEYXtI6HgqVHGaL6lb4sA6e&index=4