本文转自阿里云,全文分三篇:

  1. 架构篇
  2. 模型篇
  3. 实现

架构篇

前言

IM全称是『Instant Messaging』,中文名是即时通讯。在这个高度信息化的移动互联网时代,生活中IM类产品已经成为必备品,比较有名的如钉钉、微信、QQ等以IM为核心功能的产品。当然目前微信已经成长为一个生态型产品,但其核心功能还是IM。还有一些非以IM系统为核心的应用,最典型的如一些在线游戏、社交应用,IM也是其重要的功能模块。可以说,IM系统已经是任何一个带有社交属性的应用需要具备的基础功能,网络上对于这类系统的设计与实现的讨论也越来越多。
IM系统在互联网初期即存在,其基础技术架构在这十几年的发展中更新迭代多次,从早期的CS、P2P架构,到现在后台已经演变为一个复杂的分布式系统,涉及移动端、网络通信、协议、安全、存储和搜索等技术的方方面面。IM系统中最核心的部分是消息系统,消息系统中最核心的功能是消息的同步、存储和检索:

  • 消息的同步:将消息完整的、快速的从发送方传递到接收方,就是消息的同步。消息同步系统最重要的衡量指标就是消息传递的实时性、完整性以及能支撑的消息规模。从功能上来说,一般至少要支持在线和离线推送,高级的IM系统还支持『多端同步』。
  • 消息的存储:消息存储即消息的持久化保存,传统消息系统通常只能支持消息在接收端的本地存储,数据基本不具备可靠性。现代消息系统能支持消息在服务端的在线存储,功能上对应的就是『消息漫游』,消息漫游的好处是可以实现账号在任意端登陆查看所有历史消息。
  • 消息的检索:消息一般是文本,所以支持全文检索也是必备的能力之一。传统消息系统通常来说也是只能支持消息的本地检索,基于本地存储的消息数据来构建。而现在消息系统在能支持消息的在线存储后,也具备了消息的『在线检索』能力。

本篇文章内容主要涉及IM系统中的消息系统架构,会介绍一种基于阿里云表格存储Tablestore的Timeline模型构建的消息系统。基于Tablestore Timeline构建的现代消息系统,能够同时支持消息系统的众多高级特性,包括『多端同步』、『消息漫游』和『在线检索』。在性能和规模上,能够做到全量消息云端存储和索引,百万TPS写入以及毫秒级延迟的消息同步和检索能力。
之后我们会继续发表两篇文章,来更详细介绍Tablestore Timeline模型概念及使用:

  • 模型篇:详细介绍Tablestore Timeline模型的基本概念和基础数据结构,并结合IM系统进行基本的建模。
  • 实现篇:会基于Tablestore Timeline实现一个具备『多端同步』、『消息漫游』和『在线检索』这些高级功能的简易IM系统,并共享我们的源代码。

    传统架构 vs 现代架构

    即时通讯IM架构 - 图1
    传统架构下,消息是先同步后存储。对于在线的用户,消息会直接实时同步到在线的接收方,消息同步成功后,并不会在服务端持久化。而对于离线的用户或者消息无法实时同步成功时,消息会持久化到离线库,当接收方重新连接后,会从离线库拉取所有未读消息。当离线库中的消息成功同步到接收方后,消息会从离线库中删除。传统的消息系统,服务端的主要工作是维护发送方和接收方的连接状态,并提供在线消息同步和离线消息缓存的能力,保证消息一定能够从发送方传递到接收方。服务端不会对消息进行持久化,所以也无法支持消息漫游。消息的持久化存储及索引同样只能在接收端本地实现,数据可靠性极低。
    现代架构下,消息是先存储后同步。先存储后同步的好处是,如果接收方确认接收到了消息,那这条消息一定是已经在云端保存了。并且消息会有两个库来保存,一个是消息存储库,用于全量保存所有会话的消息,主要用于支持消息漫游。另一个是消息同步库,主要用于接收方的多端同步。消息从发送方发出后,经过服务端转发,服务端会先将消息保存到消息存储库,后保存到消息同步库。完成消息的持久化保存后,对于在线的接收方,会直接选择在线推送。但在线推送并不是一个必须路径,只是一个更优的消息传递路径。对于在线推送失败或者离线的接收方,会有另外一个统一的消息同步方式。接收方会主动的向服务端拉取所有未同步消息,但接收方何时来同步以及会在哪些端来同步消息对服务端来说是未知的,所以要求服务端必须保存所有需要同步到接收方的消息,这是消息同步库的主要作用。对于新的同步设备,会有消息漫游的需求,这是消息存储库的主要作用,在消息存储库中,可以拉取任意会话的全量历史消息。消息检索的实现依赖于对消息存储库内消息的索引,通常是一个近实时(NRT,near real time)的索引构建过程,这个索引同样是在线的。
    以上就是传统架构和现代架构的一个简单的对比,现代架构上整个消息的同步、存储和索引流程,并没有变复杂太多。现代架构的实现本质上是把传统架构内本地存储和索引都搬到云上,最大挑战是需要集中管理全量消息的存储和索引,带来的好处是能实现多端同步消息漫游以及在线检索。可以看到现代架构中最核心的就是两个消息库『消息同步库』和『消息存储库』,以及对『消息存储库』的『消息索引』的实现,接下来我们逐步拆解这几个核心的设计和实现。

    基础模型

    在深入讲解消息系统的设计和实现之前,需要对消息系统内的几个基本概念和基础模型有一个理解。网上分析的很多的不同类型的消息系统实现,实现差异上主要在消息同步和存储的方案上,在消息的数据模型上其实有很大的共性。围绕数据同步模型的讨论主要在『读扩散』、『写扩散』和『混合模式』这三种方案,目前还没有更多的选择。而对于数据模型的抽象,还没有一个标准的定义。
    本章节会介绍下表格存储Tablestore提出的Timeline模型,这是一个对消息系统内消息模型的一个抽象,能简化和更好的让开发者理解消息系统内的消息同步和存储模型,基于此模型我们会再深入探讨消息的同步和存储的选择和实现。

    Timeline模型

    Timeline是一个对消息抽象的逻辑模型,该模型会帮助我们简化对消息同步和存储模型的理解,而消息同步库和存储库的设计和实现也是围绕Timeline的特性和需求来展开。
    即时通讯IM架构 - 图2
    如图是Timeline模型的一个抽象表述,Timeline可以简单理解为是一个消息队列,但这个消息队列有如下特性:

  • 每条消息对应一个顺序ID:每个消息拥有一个唯一的顺序ID(SequenceId),队列消息按SequenceId排序。

  • 新消息写入能自动分配递增的顺序ID,保证永远插入队尾:Timeline中是根据同步位点也就是顺序ID来同步消息,所以需要保证新写入的消息数据的顺序ID绝对不能比已同步的消息的顺序ID还小,否则会导致数据漏同步,所以需要支持对新写入的数据自动分配比当前已存储的所有消息的顺序ID更大的顺序ID。
  • 新消息写入也能自定义顺序ID,满足自定义排序需求:上面提到的自动分配顺序ID,主要是为了满足消息同步的需求,消息同步要求消息是根据『已同步』或是『已写入』的顺序来排序。而消息的存储,通常要求消息能根据会话顺序来排序,会话顺序通常由端的会话来决定,而不是服务端的同步顺序来定,这是两种顺序要求。
  • 支持根据顺序ID的随机定位:可根据SequenceId随机定位到Timeline中的某个位置,从这个位置开始正序或逆序的读取消息,也可支持读取指定顺序ID的某条消息。
  • 支持对消息的自定义索引:消息体内数据根据业务不同会包含不同的字段,Timeline需要支持对不同字段的自定义索引,来支持对消息内容的全文索引,或者是任意字段的灵活条件组合查询。

消息同步可以基于Timeline很简单的实现,图中的例子中,消息发送方是A,消息接收方是B,同时B存在多个接收端,分别是B1、B2和B3。A向B发送消息,消息需要同步到B的多个端,待同步的消息通过一个Timeline来进行交换。A向B发送的所有消息,都会保存在这个Timeline中,B的每个接收端都是独立的从这个Timeline中拉取消息。每个接收端同步完毕后,都会在本地记录下最新同步到的消息的SequenceId,即最新的一个位点,作为下次消息同步的起始位点。服务端不会保存各个端的同步状态,各个端均可以在任意时间从任意点开始拉取消息。
消息存储也是基于Timeline实现,和消息同步唯一的区别是,消息存储要求服务端能够对Timeline内的所有数据进行持久化,并且消息采用会话顺序来保存,需要自定义顺序ID。
消息检索基于Timeline提供的消息索引来实现,能支持比较灵活的多字段索引,根据业务的不同可有自由度较高的定制。

消息存储模型

即时通讯IM架构 - 图3
如图是基于Timeline的消息存储模型,消息存储要求每个会话都对应一个独立的Timeline。如图例子所示,A与B/C/D/E/F均发生了会话,每个会话对应一个独立的Timeline,每个Timeline内存有这个会话中的所有消息,消息根据会话顺序排序,服务端会对每个Timeline进行持久化存储,也就拥有了消息漫游的能力。

消息同步模型

消息同步模型会比消息存储模型稍复杂一些,消息的同步一般有读扩散(也叫拉模式)和写扩散(也叫推模式)两种不同的方式,分别对应不同的Timeline物理模型。
即时通讯IM架构 - 图4
如图是读扩散和写扩散两种不同同步模式下对应的不同的Timeline模型,按图中的示例,A作为消息接收者,其与B/C/D/E/F发生了会话,每个会话中的新的消息都需要同步到A的某个端,看下读扩散和写扩散两种模式下消息如何做同步。

  • 读扩散:消息存储模型中,每个会话的Timeline中保存了这个会话的全量消息。读扩散的消息同步模式下,每个会话中产生的新的消息,只需要写一次到其用于存储的Timeline中,接收端从这个Timeline中拉取新的消息。优点是消息只需要写一次,相比写扩散的模式,能够大大降低消息写入次数,特别是在群消息这种场景下。但其缺点也比较明显,接收端去同步消息的逻辑会相对复杂和低效。接收端需要对每个会话都拉取一次才能获取全部消息,读被大大的放大,并且会产生很多无效的读,因为并不是每个会话都会有新消息产生。
  • 写扩散:写扩散的消息同步模式,需要有一个额外的Timeline来专门用于消息同步,通常是每个接收端都会拥有一个独立的同步Timeline(或者叫收件箱),用于存放需要向这个接收端同步的所有消息。每个会话中的消息,会产生多次写,除了写入用于消息存储的会话Timeline,还需要写入需要同步到的接收端的同步Timeline。在个人与个人的会话中,消息会被额外写两次,除了写入这个会话的存储Timeline,还需要写入参与这个会话的两个接收者的同步Timeline。而在群这个场景下,写入会被更加的放大,如果这个群拥有N个参与者,那每条消息都需要额外的写N次。写扩散同步模式的优点是,在接收端消息同步逻辑会非常简单,只需要从其同步Timeline中读取一次即可,大大降低了消息同步所需的读的压力。其缺点就是消息写入会被放大,特别是针对群这种场景。
    Timeline模型不会对选择读扩散还是写扩散做约束,而是能同时支持两种模式,因为本质上两种模式的逻辑数据模型并无差别,只是消息数据是用一个Timeline来支持多端读还是复制到多个Timeline来支持多端读的问题。

针对IM这种应用场景,消息系统通常会选择写扩散这种消息同步模式。IM场景下,一条消息只会产生一次,但是会被读取多次,是典型的读多写少的场景,消息的读写比例大概是10:1。若使用读扩散同步模式,整个系统的读写比例会被放大到100:1。一个优化的好的系统,必须从设计上去平衡这种读写压力,避免读或写任意一维触碰到天花板。所以IM系统这类场景下,通常会应用写扩散这种同步模式,来平衡读和写,将100:1的读写比例平衡到30:30。当然写扩散这种同步模式,还需要处理一些极端场景,例如万人大群。针对这种极端写扩散的场景,会退化到使用读扩散。一个简单的IM系统,通常会在产品层面限制这种大群的存在,而对于一个高级的IM系统,会采用读写扩散混合的同步模式,来满足这类产品的需求。采用混合模式,会根据数据的不同类型和不同的读写负载,来决定用写扩散还是读扩散。

典型架构设计

即时通讯IM架构 - 图5
如图是一个典型的消息系统架构,架构中包含几个重要组件:

  • :作为消息的发送和接收端,通过连接消息服务器来发送和接收消息。
  • 消息服务器:一组无状态的服务器,可水平扩展,处理消息的发送和接收请求,连接后端消息系统。
  • 消息队列:新写入消息的缓冲队列,消息系统的前置消息存储,用于削峰填谷以及异步消费。
  • 消息处理:一组无状态的消费处理服务器,用于异步消费消息队列中的消息数据,处理消息的持久化和写扩散同步。
  • 消息存储和索引库:持久化存储消息,每个会话对应一个Timeline进行消息存储,存储的消息建立索引来实现消息检索。
  • 消息同步库:写扩散形式同步消息,每个用户的收件箱对应一个Timeline,同步库内消息不需要永久保存,通常对消息设定一个生命周期。
    新消息会由端发出,通常消息体中会携带消息ID(用于去重)、逻辑时间戳(用于排序)、消息类型(控制消息、图片消息或者文本消息等)、消息体等内容。消息会先写入消息队列,作为底层存储的一个临时缓冲区。消息队列中的消息会由消息处理服务器消费,可以允许乱序消费。消息处理服务器对消息先存储后同步,先写入发件箱Timeline(存储库),后写扩散至各个接收端的收件箱(同步库)。消息数据写入存储库后,会被近实时的构建索引,索引包括文本消息的全文索引以及多字段索引(发送方、消息类型等)。

对于在线的设备,可以由消息服务器主动推送至在线设备端。对于离线设备,登录后会主动向服务端同步消息。每个设备会在本地保留有最新一条消息的顺序ID,向服务端同步该顺序ID后的所有消息。

总结

本篇文章主要介绍了现代IM系统中消息系统所需要具备的能力,对比了传统架构和现代架构。为方便接下来的深入探讨,介绍了表格存储Tablestore推出的Timeline模型,以及在IM系统中消息存储和消息同步模型的基本概念和策略,最后介绍了一个典型的架构设计。


模型篇

前言

架构篇中我们介绍了现代IM消息系统的架构,介绍了Timeline的抽象模型以及基于Timeline模型构建的一个支持『消息漫游』、『多端同步』和『消息检索』多种高级功能的消息系统的典型架构。架构篇中为了简化读者对Tablestore Timeline模型的理解,概要性的对Timeline的基本逻辑模型做了介绍,以及对消息系统中消息的多种同步模式、存储和索引的基本概念做了一个科普。
本篇文章是对架构篇的一个补充,会对Tablestore的Timeline模型做一个非常详尽的解读,让读者能够深入到实现层面了解Timeline的基本功能以及核心组件。最后我们还是会基于IM消息系统这个场景,来看如何基于Tablestore Timeline实现IM场景下消息同步、存储和索引等基本功能。

Timeline模型

Timeline模型以『简单』为设计目标,核心模块构成比较清晰明了,主要包括:

  • Store:Timeline存储库,类似数据库的表的概念。
  • Identifier:用于区分Timeline的唯一标识。
  • Meta:用于描述Timeline的元数据,元数据描述采用free-schema结构,可自由包含任意列。
  • Queue:一个Timeline内所有Message存储在Queue内。
  • Message:Timeline内传递的消息体,也是一个free-schema的结构,可自由包含任意列。
  • Index:包含Meta Index和Message Index,可对Meta或Message内的任意列自定义索引,提供灵活的多条件组合查询和搜索。

    Timeline Store

    即时通讯IM架构 - 图6
    Timeline Store是Timeline的存储库,对应于数据库内表的概念。上图是Timeline Store的结构图,Store内会存储所有的Timeline数据。Timeline是一个面向海量消息的数据模型,同时用于消息存储库和同步库,需要满足多种要求:

  • 支撑海量数据存储:对于消息存储库来说,如果需要消息永久存储,则随着时间的积累,数据规模会越来越大,需要存储库能应对长时间积累的海量消息数据存储,需要能达到PB级容量。

  • 低存储成本:消息数据的冷热区分是很明显的,大部分查询都会集中在热数据,所以对于冷数据需要有一个比较低成本的存储方式,否则随着时间的积累数据量不断膨胀,存储成本会非常大。
  • 数据生命周期管理:不管是对于消息数据的存储还是同步,数据都需要定义生命周期。存储库是用于在线存储消息数据本身,通常需要设定一个较长周期的保存时间。而同步库是用于写扩散模式的在线或离线推送,通常设定一个较短的保存时间。
  • 极高的写入吞吐:各类场景下的消息系统,除了类似微博、头条这种类型的Feeds流系统,像绝大部分即时通讯或朋友圈这类消息场景,通常是采用写扩散的消息同步模式,写扩散要求底层存储具备极高的写入吞吐能力,以应对消息洪峰。
  • 低延迟的读:消息系统通常是应用在在线场景,所以对于查询要求低延迟。


    Tablestore Timeline的底层是基于LSM存储引擎的分布式数据库,LSM的最大优势就是对写入非常友好,天然适合消息写扩散的模式。同时对查询也做了极大优化,例如热数据进缓存、bloom filter等等。数据表采用Range Partition的分区模式,能提供水平扩展的服务能力,以及能自动探测并处理热点分区的负载均衡策略。为了满足同步库和存储库对存储的不同要求,也提供了一些灵活的自定义配置,主要包括:

  • Time to live(数据生命周期):可自定义数据生命周期,例如永久保存,或者保存N天。

  • Storage type(存储类型):自定义存储类型,对存储库来说,HDD是最好的选择,对同步库来说,SSD是最好的选择。

    Timeline Module

    即时通讯IM架构 - 图7
    Timeline Store内能存储海量的Timeline,单个Timeline的详细结构图如上,可以看到Timeline主要包含了三大部分:

  • Timeline Meta:元数据部分,用于描述Timeline,包括:

    • Identifier:用于唯一标识Timeline,可包含多个字段。
    • Meta:用于描述Timeline的元数据,可包含任意个数任意类型的字段。
    • Meta Index:元数据索引,可对元数据内任意属性列建索引,支持多字段条件组合查询和检索。
  • Timeline Queue:用于存储和同步消息的队列,队列中元素由两部分组成:
    • Sequence Id:顺序ID,队列中用于定位Message的位点信息,在队列中顺序ID保持递增。
    • Message:队列中承载消息的实体,包含了消息的完整内容。
  • Timeline Data:Timeline的数据部分就是Message,Message主要包含:
    • Message:消息实体,其内部也可以包含任意数量任意类型字段。
    • Message Index:消息数据索引,可对消息实体内任意列做索引,支持多字段条件组合查询和检索。

IM消息系统建模

即时通讯IM架构 - 图8
以一个简易版IM系统为例,来看如何基于Tablestore Timeline模型建模。按照上图中的例子,存在A、B、C三个用户,A与B发生单聊,A与C发生单聊,以及A、B、C组成一个群聊,来看下在这个场景下消息同步、存储以及读写流程分别如何基于Tablestore Timeline建模。

消息同步模型
消息同步选择写扩散模型,能完全利用Tablestore Timeline的优势,以及针对IM消息场景读多写少的特性,通过写扩散来平衡读写,均衡整个系统的资源。写扩散模型下,每个接收消息的个体均拥有一个收件箱,所有需要同步至该个体的消息需要投递到其收件箱内。图上例子中,A、B、C三个用户分别拥有收件箱,每个用户不同的设备端,均从同一个收件箱内拉取新消息。

消息同步库
收件箱存储在同步库内,同步库中每个收件箱对应一个Timeline。根据图上的例子,总共存在3个Timeline作为收件箱。每个消息接收端保存有本地最新拉取的消息的SequenceID,每次拉取新消息均是从该SequenceID开始拉取消息。对同步库的查询会比较频繁,通常是对最新消息的查询,所以要求热数据尽量缓存在内存中,能提供高并发低延迟的查询。所以对同步库的配置,一般是需要SSD存储。消息如果已经同步到了所有的终端,则代表收件箱内的该消息已经被消费完毕,理论上可以清理。但设计上来说不做主动清理,而是给数据定义一个较短的生命周期来自动过期,一般定义为一周或者两周。数据过期之后,如果仍要同步拉取新消息,则需要退化到读扩散的模式,从存储库中拉取消息。

消息存储库
消息存储库中保存有每个会话的消息,每个会话的发件箱对应一个Timeline。发件箱内的消息支持按会话维度拉取消息,例如浏览某个会话内的历史消息则通过读取发件箱完成。一般来说,新消息通过在线推送或者查询同步库可投递到各个接收端,所以对存储库的查询会相对来说较少。而存储库用于长期存储消息,例如永久存储,相对同步库来说数据量会较大。所以存储库的选择一般是HDD,数据生命周期根据消息需要保存的时间来定,通常是一个较长的时间。

消息索引库
消息索引库依附于存储库,使用了Timeline的Message Index,可以对存储库内的消息进行索引,例如对文本内容的全文索引、收件人、发件人以及发送时间的索引等,能支持全文检索等高级查询和搜索。

总结

本篇文章主要对Tablestore Timeline模型进行了详解,介绍了Timeline各模块包括Store、Meta、Queue、Data和Index等,最后以一个简单的IM场景举例如何基于Timeline来建模。在下一篇实现篇中,会直接基于Tablestore Timeline来实现一个简易版的支持单聊、群聊、元数据管理以及消息检索的IM系统,敬请期待。欢迎加入我们的技术交流群(钉钉:11789671),来与我们一起探讨。


实现篇

概述

生活中最常见的即时聊天类软件如:钉钉、微信等,都可以描述为:实现了即时通讯能力的聊天工具。其中聊天会话可分为两大类,分别是:单聊、群聊(公众号类似单聊)。这里我们以钉钉(Ding Talk)的功能为参照,详细说明相应的功能基于Tablestore的Timeline模型如何实现。如:新消息提醒,未读消息数统计,查看会话中更久的聊天内容,群名模糊检索,关键字查询历史记录,以及多客户端同步等。让用户在实现方案上有更清晰的认识,对模型的抽象概念、接口有更好的理解。
下面会按照聊天系统的功能模块分段,分别介绍每一部分的功能、方案介绍、表设计以及实现代码等。功能模块主要分为:消息存储、关系维护、即时感知、多端同步。

功能模块

消息存储

消息系统中,消息存储是最基本的功能。对于消息存储(提供消息的读、写、持久化),一方面需要持久化写入,保证消息数据的不丢失,另一方面,适合用户的快速、高效查询。在IM场景中,写入方式通常是单行、批量写入,而读取需要按照消息队列范围读取。有时用户还有对于历史消息的模糊查询需求,这时就需要使用多维检索、全文检索的能力。
消息的存储都是基于Timeline模型,具体模型见文章《Tablestore发布Timeline 2.0模型》。样例中,消息数据的表结构见下图:
表设计:im_timeline_store_table

即时通讯IM架构 - 图9

存储库

功能:会话窗口消息展示

存储库是聊天会话消息所对应的存储表,消息以会话分类存储,每个会话是一个消息队列。单个消息队列(TimelineQueue)通过timelineId唯一标识,所有消息基于sequenceId有序排列。消息体中含有发送人、消息id(消息去重)、消息发送时间、消息体内容、消息类型(类型包含图片、文件、普通文本,本文仅适用文本)等。

即时通讯IM架构 - 图10

图片来自公开交流群截图
如上图,当用户点击某一个会话时,窗口会展示相应会话的最新一页消息。图片里的消息都是从存储库拉取的,通过timelineId获取该会话的Queue实例,然后调用Queue的scan接口与ScanParam参数(sequenceId范围+倒序)拉取最新的一页消息。当用户向上滚动,展示完这一页消息后,客户端会基于第一次请求的最小sequencId发起第二次请求,获取第二页消息记录,单页消息数通常选择20-30条。会话的消息可以选择在客户端持久化,然后在感知到新消息之后更新本地消息,增加缓存减少网络IO。
核心代码

  1. public List<AppMessage> fetchConversationMessage(String timelineId, long sequenceId) {
  2. TimelineStore store = timelineV2.getTimelineStoreTableInstance();
  3. TimelineIdentifier identifier = new TimelineIdentifier.Builder()
  4. .addField("timeline_id", timelineId)
  5. .build();
  6. ScanParameter parameter = new ScanParameter()
  7. .scanBackward(sequenceId)
  8. .maxCount(30);
  9. Iterator<TimelineEntry> iterator = store.createTimelineQueue(identifier).scan(parameter);
  10. List<AppMessage> appMessages = new LinkedList<AppMessage>();
  11. while (iterator.hasNext() && counter++ <= 30) {
  12. TimelineEntry timelineEntry = iterator.next();
  13. AppMessage appMessage = new AppMessage(timelineId, timelineEntry);
  14. appMessages.add(appMessage);
  15. }
  16. return appMessages;
  17. }

即时通讯IM架构 - 图11
存储库的消息需要永久保存,是整个应用的全量消息存储。存储库数据过期时间(TTL)需要设为-1。

功能:多维组合、全文检索

全文检索能力就是对存储库的消息内容做模糊查询,因而需要对存储库的数据建立多元索引。具体索引字段,需要根据设计需求设计。如钉钉公开群的检索,需要对群ID、消息发送人、消息类型、消息内容、以及时间建立索引,其中消息内容需要使用分词字符串类型,从而提供模糊查询的能力。
即时通讯IM架构 - 图12
核心代码

  1. public List<AppMessage> fetchConversationMessage(String timelineId, long sequenceId) {
  2. TimelineStore store = timelineV2.getTimelineStoreTableInstance();
  3. TimelineIdentifier identifier = new TimelineIdentifier.Builder()
  4. .addField("timeline_id", timelineId)
  5. .build();
  6. ScanParameter parameter = new ScanParameter()
  7. .scanBackward(sequenceId)
  8. .maxCount(30);
  9. Iterator<TimelineEntry> iterator = store.createTimelineQueue(identifier).scan(parameter);
  10. List<AppMessage> appMessages = new LinkedList<AppMessage>();
  11. int counter = 0;
  12. while (iterator.hasNext() && counter++ <= 30) {
  13. TimelineEntry timelineEntry = iterator.next();
  14. AppMessage appMessage = new AppMessage(timelineId, timelineEntry);
  15. appMessages.add(appMessage);
  16. }
  17. return appMessages;
  18. }

即时通讯IM架构 - 图13
另外,为了做消息的权限管理,仅允许用户检索自己有权限查看的消息,可在消息体字段中扩展接收人ID数组,这样对所有群做检索时,需要增加接收人字段为自己的用户ID这一必要条件,即可实现消息内容的权限限制。 样例中没有实现这一功能,用户可根据需求自己增加、修改。

同步库

功能:新消息即时统计

当客户端在线时,应用的系统服务会维护客户端的长连接,因而可以感知客户端在线。当用户的同步库有新消息写入时(即有新消息),应用会发出信号通知客户端有新消息,然后客户端会基于同步库checkpoint点,拉取同步库中该sequenceId之后的所有新消息,统计各会话的新消息数,并更新checkpoint点。

即时通讯IM架构 - 图14
如上图,对于一个在线客户端,每个会话都会维护一个未读消息的计数(小红点),也会有一个总未读数的计数,这个数量一般会存储在客户端本地,或者通过redis持久化。这些未读消息,指的就是通过同步库拉取并统计过,但是还未被用户点开的消息数量。在拉取到新消息列表后,客户端(或应用层)会遍历所有新消息,然后将新消息所对应会话的未读计数累加1,这样实现了未读消息的即时感知与更新。只有当用户点开会话后,会话的未读计数才会清零。
在更新未读数的同时,会话列表中还会有最新消息的简短摘要信息以及最新消息的发送时间等。这些可以在遍历新消息列表时不断更新。这些统计、摘要都是依托同步库,而非存储库实现的。
核心代码

  1. public List<AppMessage> fetchSyncMessage(String userId, long lastSequenceId) {
  2. TimelineStore sync = timelineV2.getTimelineSyncTableInstance();
  3. TimelineIdentifier identifier = new TimelineIdentifier.Builder()
  4. .addField("timeline_id", userId)
  5. .build();
  6. ScanParameter parameter = new ScanParameter()
  7. .scanForward(lastSequenceId)
  8. .maxCount(30);
  9. Iterator<TimelineEntry> iterator = sync.createTimelineQueue(identifier).scan(parameter);
  10. List<AppMessage> appMessages = new LinkedList<AppMessage>();
  11. int counter = 0;
  12. while (iterator.hasNext() && counter++ <= 30) {
  13. AppMessage appMessage = new AppMessage(userId, iterator.next());
  14. appMessages.add(appMessage);
  15. }
  16. return appMessages;
  17. }

在统计到会话列表中不存在的会话时,客户端会做一次额外请求。通过timelineID获取会话的基本描述信息,如群头像或好友的头像、群名称等,并初始化未读数计时器0,然后累加新消息数、更新最新消息摘要等。
即时通讯IM架构 - 图15
同步库对于IM场景下的新消息即时感知统计这一核心功能,就是通过写入冗余的方式,提升新消息读取统计的效率与速度。对于IM场景没有收件箱的概念,因而同步库中冗余消息并没有永久保存的价值,提供7天过期时间已经足够保证功能正常。用户可以根据自身需求,调整同步库的数据过期时间(TTL)。

功能:异步写扩散

在本文的样例中,单聊会话的消息在写完存储库后同时写入了同步库,只有两行的写入开销很小。但是对于群会话,写完存储库后要获取群用户列表,然后依次写入相应用户的同步库。这种方式在群少、用户少时不会有问题,但随着用户体量、活跃度的增加,同步的写的方式就会面临性能问题,因此建议用户对群写扩散使用异步任务实现。
用户可以基于表格存储实现一个任务队列,将写扩散任务写入队列中后直接返回,然后由其他进程保证任务队列的执行。任务队列保存了群ID、消息的完整信息,消费进程不断轮询读取新任务,获取任务后,才会从群关系表中获取完整的群成员列表,并做相应的写扩散。
任务队列可以直接基于Tablestore实现,表设计为两列主键,第一列为topic,第二列为自增列,一个topic对应一个队列,任务会被有序写入单个队列中。当并发量持续膨胀后,可对任务做hash分桶,随机写入多个topic。这样可以增加消费者数量(消费并发量),提升写扩散效率。对应任务队列消费,用户只需要维护每个topic的checkpoint点。checkpoint点之前的为已完成任务,通过getRange的方式顺序获取checkpoint点之后未执行的新任务,保证任务的执行。失败的任务可以重新写入任务队列来提升容错,并增加重试计数。出现多次失败后放弃重写,然后将该任务写入特殊的问题队列,方便应用的开发者们查询、定位问题。

元数据管理

所谓元数据,就是描述数据的数据。在这里主要体现为两类:用户元数据、会话元数据。这里群的元数据信息:群ID(复用群的timelineId)、群名称、创建时间等信息,可以直接基于timelineMeta的管理表完成实现,所有Group类型的TimelineMeta可以映射为一个Group。但是用户的元数据却不能复用TimelineMeta,所以需要单独的表实现。

用户元数据

即用户的属性信息,通过用户ID识别特定用户。在上面提到的用户关系中,通过用户的标识ID确认用户身份,但用户的属性信息,如:性别、签名、头像等信息,还是需要单独维护。因此需要单独维护。
表设计:im_user_table

即时通讯IM架构 - 图16
用户元数据以user_id为标识,与同步库中的timeline_id一一对应。用户同步新消息时,只会拉取同步库中自己对应的单个消息队列(TimelineQueue)。因此,为了唯一ID的方便管理,我们可以选择user_id与用户同步库的timeline_id使用同一个值。这样一来,在消息写扩散时,只需知道群内用户的user_id列表回好友user_id,即可以完成写扩算。

功能:用户检索

对于用户,添加好友的需求有很多种,这里我们只需要维护用户表,并且创建多元索引,即可轻松实现。样例中没有实现,用户可以根据自己需求配置不同的索引字段设置,这里我们仅简单分析一下需求:

  • 通过用户ID:主键查询;
  • 二维码(含用户ID信息):主键查询;
  • 用户姓名:多元索引,用户名字段设置分词字符串;
  • 用户标签:多元索引,数组字符串索引提供签检索、嵌套索引提供多标签打分检索排序;
  • 附近的人:多元索引,GEO索引查询附近、特定地理围栏的人;

详细的多元索引功能,用户可参看官网文档:多元索引

会话元数据

即会话的属性信息,通过唯一会话ID识别特定会话,属性信息会包含:会话类别(群、单聊、公众号等)、群名称、公告、创建时间等。同时,通过群名称模糊查找群,也会是会话元数需要的重要能力。
在Timeline模型中,提供了Timeline Meta的管理能力,只需通过相应的接口便可实现会话meta的管理。

即时通讯IM架构 - 图17
存储库中管理的是会话的消息队列(TimelineQueue),这里与会话元数据中的行一一对应。客户端用户选中特定会话后,应用从相应的消息队列倒序批量拉取消息展示到客户端,群聊单聊的使用方式一样,因而并不做会话类型的区分。

功能:群检索

用户如果有加入群的需求,首先需要查询到特定的群。查询群的方式与用户查询方式类似,功能也可以做相同的实现。用户可以根据自己需求定制不同的索引字段设置,需求实现方式如下:

  • 群ID:主键查询;
  • 二维码(含用户ID信息):主键查询;
  • 群名:多元索引,用户名字段设置分词字符串;
  • 群标签:多元索引,数组字符串索引提供签检索、嵌套索引提供多标签打分检索排序;

注:会话元数据可以直接维护单聊会话与人的映射关系。对于单聊的meta增加一列users字段,存放两个用户ID,这样不用额外维护关系表(基于单聊关系表im_user_relation_table创建timeline_id为第一列主键的二级索引)。

关系维护

完成了元数据管理以及用户和群的检索,剩下的就是如何添加好友、加入群聊了。这里就涉及到IM体统中另一个重要的功能点。关系维护包含:人与人的关系、人与群的关系以及人与会话的。下面我们介绍如何基于Tablestore解决这一关系维护的需求。

单聊关系

功能:人与单聊会话的关系

单聊场景下,参与者仅有两个人,同时不考虑顺序。无论是我联系小明或是小明联系我,对应的会话必须有且仅有一个。如果使用表格存储维护这个关系,建议用如下的设计方式。
第一列为主用户ID、第二列为次用户ID,在两个人成为好友后,关系表中需要插入两行数据,分别以自己的用户ID为main_user,以好友的用户ID为sub_user,然后将共同的会话timline_id作为属性列,并且可以维护相互之间不同的昵称、显示。
表设计:im_user_relation_table

即时通讯IM架构 - 图18
基于该单聊关系表,还可以建立多元索引,方便用户好友列表的获取,同时支持加好友时间排序、昵称排序等功能。如果考虑到延时、费用等因素,即时使用多元索引,直接通过getRange接口也可以快速拉、高效的获取自己所有好友列表,实现好友关系的维护与查询。

功能:人与人的关系

借助以上表,人与人的关系可以很简单实现,比如我判断我与小明的好友关系,直接通过单行查询知道我们的好友关系是否存在,如果存在就不会展示加好友按钮。而如果非好友,这是完成好友添加后,写入两行不同主键顺序行,并生成一个唯一的timelineId即可。这个设计的好处在于用户可以直接通过自己的ID与好友的ID快速获取会话信息。只要用户在写入两行时做好一致性维护。
如果好友关系一旦解除,可以直接拼出关系表中两行主键对用户关系,通过做物理删除(删除行)或逻辑删除(属性列状态修改)结束两两个人的好友关系即可;
核心代码

  1. public void establishFriendship(String userA, String userB, String timelineId) {
  2. PrimaryKey primaryKeyA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  3. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
  4. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
  5. .build();
  6. RowPutChange rowPutChangeA = new RowPutChange(userRelationTable, primaryKeyA);
  7. rowPutChangeA.addColumn("timeline_id", ColumnValue.fromString(timelineId));
  8. PrimaryKey primaryKeyB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  9. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
  10. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
  11. .build();
  12. RowPutChange rowPutChangeB = new RowPutChange(userRelationTable, primaryKeyB);
  13. rowPutChangeB.addColumn("timeline_id", ColumnValue.fromString(timelineId));
  14. BatchWriteRowRequest request = new BatchWriteRowRequest();
  15. request.addRowChange(rowPutChangeA);
  16. request.addRowChange(rowPutChangeB);
  17. syncClient.batchWriteRow(request);
  18. }
  19. public void breakupFriendship(String userA, String userB) {
  20. PrimaryKey primaryKeyA = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  21. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userA))
  22. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userB))
  23. .build();
  24. RowDeleteChange rowPutChangeA = new RowDeleteChange(userRelationTable, primaryKeyA);
  25. PrimaryKey primaryKeyB = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  26. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userB))
  27. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.fromString(userA))
  28. .build();
  29. RowDeleteChange rowPutChangeB = new RowDeleteChange(userRelationTable, primaryKeyB);
  30. BatchWriteRowRequest request = new BatchWriteRowRequest();
  31. request.addRowChange(rowPutChangeA);
  32. request.addRowChange(rowPutChangeB);
  33. syncClient.batchWriteRow(request);
  34. }

即时通讯IM架构 - 图19

群聊关系

功能:群聊会话与人的关系

群聊时,主要的查询需求还是获取当前群内用户的列表。一方面方便群属性的展示,另一方面为应用做写扩散提供快速获取收件人列表的查询。因而在表设计上,我们会建议用户使用两列主键:第一列为群ID,第二列为用户ID。通过这样的设计,可以直接给予getRange接口拉取群所有用户的信息。
群聊关系表解决了群到用户的映射关系,但我们还需要用户到群的映射关系。如果为了查询用户所在群的列表而新键一张表,冗余成本、一致性维护成本就很高。这里可以使用两种索引来解决反向的映射关系。样例中,我们使用了二级索引,将用户ID字段作为索引主键,从而可以直接基于索引查询单用户的群列表。同步实时性更好,成本更低。
当然用户也可以使用多元索引:对群、用户、入群时间做索引,可以查询到某用户的所有在群列表,并且基于入群时间排序。
表设计:im_group_relation_table

即时通讯IM架构 - 图20
基于群关系表,可以直接基于关系主表通过getRange的方式获取单个群内所有的用户。在做写扩散时,可以直接获取群内用户ID列表,提升写扩散的效率。同时,也方便展示群内用户列表。
核心代码

  1. public List<Conversation> listMySingleConversations(String userId) {
  2. PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  3. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userId))
  4. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MIN)
  5. .build();
  6. PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  7. .addPrimaryKeyColumn("main_user", PrimaryKeyValue.fromString(userId))
  8. .addPrimaryKeyColumn("sub_user", PrimaryKeyValue.INF_MAX)
  9. .build();
  10. RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(userRelationTable);
  11. criteria.setInclusiveStartPrimaryKey(start);
  12. criteria.setExclusiveEndPrimaryKey(end);
  13. criteria.setMaxVersions(1);
  14. criteria.setLimit(100);
  15. criteria.setDirection(Direction.FORWARD);
  16. criteria.addColumnsToGet(new String[] {"timeline_id"});
  17. GetRangeRequest request = new GetRangeRequest(criteria);
  18. GetRangeResponse response = syncClient.getRange(request);
  19. List<Conversation> singleConversations = new ArrayList<Conversation>(response.getRows().size());
  20. for (Row row : response.getRows()) {
  21. String timelineId = row.getColumn("timeline_id").get(0).getValue().asString();
  22. String subUserId = row.getPrimaryKey().getPrimaryKeyColumn("sub_user").getValue().asString();
  23. User friend = describeUser(subUserId);
  24. Conversation conversation = new Conversation(timelineId, friend);
  25. singleConversations.add(conversation);
  26. }
  27. return singleConversations;
  28. }

即时通讯IM架构 - 图21

功能:人与群聊会话的关系

获取单用户所有加入群列表,可以基于主表创建二级索引,将用户字段设为索引的第一列主键。索引的数据结构见下图。这样基于二级索引,可以直接通过getRange的方式获取单用户加入的群的TimlineId列表。
二级索引:im_group_relation_global_index

即时通讯IM架构 - 图22
核心代码

  1. public List<Conversation> listMyGroupConversations(String userId) {
  2. PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  3. .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
  4. .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MIN)
  5. .build();
  6. PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
  7. .addPrimaryKeyColumn("user_id", PrimaryKeyValue.fromString(userId))
  8. .addPrimaryKeyColumn("group_id", PrimaryKeyValue.INF_MAX)
  9. .build();
  10. RangeRowQueryCriteria criteria = new RangeRowQueryCriteria(groupRelationGlobalIndex);
  11. criteria.setInclusiveStartPrimaryKey(start);
  12. criteria.setExclusiveEndPrimaryKey(end);
  13. criteria.setMaxVersions(1);
  14. criteria.setLimit(100);
  15. criteria.setDirection(Direction.FORWARD);
  16. criteria.addColumnsToGet(new String[] {"group_id"});
  17. GetRangeRequest request = new GetRangeRequest(criteria);
  18. GetRangeResponse response = syncClient.getRange(request);
  19. List<Conversation> groupConversations = new ArrayList<Conversation>(response.getRows().size());
  20. for (Row row : response.getRows()) {
  21. String timelineId = row.getPrimaryKey().getPrimaryKeyColumn("group_id").getValue().asString();
  22. Group group = describeGroup(timelineId);
  23. Conversation conversation = new Conversation(timelineId, group);
  24. groupConversations.add(conversation);
  25. }
  26. return groupConversations;
  27. }

即时通讯IM架构 - 图23

即时感知

让客户端即时感知消息的实现方案,可以参考《Feed 流设计总纲》一文中会话池的维护方式,这里作简要描述,不会在样例中实现。

会话池方案

即时感知新消息正是IM(Instant Message)场景下核心所在。让客户端及时感知到新信息的到来,然后客户端接收到通知后才会从同步库中拉取更新的消息,让用户更快速、更及时地提醒用户阅读新消息。可是,接受者如何才能快速感知到自己有了新消息呢?
让在线的客户端周期性的刷新拉取?这样的方式毫无疑问可以满足需求,但伴随而来的是大量无效的网络资源浪费。同时应用的压力也会随着用户量的不断增长变得更沉重。而当白天大量非活跃用户在线时,压力更为明显。面对这一问题,应用通常会维护一个推送会话池。会话池记录了在线客户端与用户信息,当在线用户有新的消息写入,通过推送池获取该用户的会话,然后通知客户端拉取同步库新消息。这样同步消息的压力只会随着真实消息量而增长,避免了大量不必要的同步库查询请求。
实现会话推送池的方案很多,可以使用内存型数据库,也可以直接使用表格存储,同时保证会话推送池的持久化。
在即时感知上,最直观的就是会话表中变动的未读消息数统计了。统计新消息的实现方式上,已在本文的【消息存储 > 第二类:同步库 > 新消息即时统计】部分做了详尽描述,不理解的可返回去重新看一下。持久化未读消息数是很必要的,否则在更换设备或重新登录后。未读消息数被清零,将会忽略很多新消息提醒,这是我们不能接受的。

其他

多端同步

实现了以上功能,IM系统的基本需求已经完成。但实现多端数据同步上,还有两个注意事项。
其一,我们对于单客户端情况下,用户同步库做了一个checkpoint点的持久化,对应的概念是:“已读最新消息的sequenceId”。此时,checkpoint点无客户端的区分,如果使用本地做持久化,多端同步时就会出现问题,不同客户端统计的未读消息数就会不一致。这是需要通过应用服务端维护checkpoint点,同时会话的未读消息数也需要在应用服务侧维护,这样才能保证多端统计数一致。同时,当有未读消息的会话被点击,会话未读数清0时,要让服务有感知,然后通知到其他在线端,维护实时一致性。
其二,多端情况下,自己在一个客户端发送了新消息,其他客户端在没有其他新消息时,是无法感知并刷新自己的发送消息,这在多端同步中也是要解决的小问题。这时,简单的解决方案就是将自己发送的消息,也写入自己的同步库。只要再统计未读信息时,对自己的信息不计数,但在最新消息摘要中需要做更新。这样,多端同步问题很容易实现。

添加好友、入群申请

添加好友或入群,不是主动发起请求就会直接完成的,这里需要主动方申请后,审核方完成统一才会真实完成。因而只有在审核方才会有权限发起关系的创建。
那如何让被添加用户或群主感知到申请?当然是借助同步库,作为一种新的消息类型或者特殊的会话,让用户即时感知到新申请,尽早完成审批。申请列表如果需要持久化,也可单独建表维护,只要保证用户新申请的即时感知即可。

样例实操

本位为了与用户一起梳理IM系统应用的功能点,基于Tablestore实现的样例简单功能,完整的样例代码已完成开源,代码地址:源码链接。用户可以结合文章、代码一起阅读。代码在本地运行,使用前请确保:

  • 开通服务、创建实例
  • 获取AK
  • 设置样例配置文件
  • 实例支持二级索引(需要主动申请);

    样例配置

    在home目录下创建tablestoreCong.json文件,填写相应参数如下:
    1. # mac 或 linux系统下:/home/userhome/tablestoreCong.json
    2. # windows系统下: C:\Documents and Settings\%用户名%\tablestoreCong.json
    3. {
    4. "endpoint": "http://instanceName.cn-hangzhou.ots.aliyuncs.com",
    5. "accessId": "***********",
    6. "accessKey": "***********************",
    7. "instanceName": "instanceName"
    8. }
    endpoint:实例的接入地址,控制台实例详情页获取;
    accessId:AK的ID,获取AK链接提供;
    accessKey:AK的密码,获取AK链接提供;
    instanceName:使用的实例名;

    样例入口

    样例中共有三个入口,用户需要根据先后顺序执行,使用后及时释放资源,避免不必要的费用浪费;
入口 入口类名 功能
初始化 InitChartRoomExample 创建所有需要的表,同时根据配置创建相应的多元索引与二级索引
模拟调用 ClientRequestExample 应用的接口使用,样例未做前后端联调调用,用户可通过接口返回数据的打印了解使用方式。
释放资源 ReleaseChartRoomExample 释放所有资源,先释放索引后删表

项目结构

即时通讯IM架构 - 图24