默认
打赏 发表评论 29
想开发IM:买成品怕坑?租第3方怕贵?找开源自已撸?尽量别走弯路了... 找站长给点建议
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践
阅读(278114) | 评论(29 收藏30 淘帖2 3
微信扫一扫关注!

本文原题为“一套高可用群聊消息系统实现”,由作者“于雨氏”授权即时通讯网整理和发布,内容有些许改动,作者博客地址:alexstocks.github.io。应作者要求,如需转载,请联系作者获得授权。


一、引言


要实现一整套能用于大用户量、高并发场景下的IM群聊,技术难度远超IM系统中的其它功能,原因在于:IM群聊消息的实时写扩散特性带来了一系列技术难题。

举个例子:如一个2000人群里,一条普通消息的发出问题,将瞬间写扩散为2000条消息的接收问题,如何保证这些消息的及时、有序、高效地送达,涉及到的技术问题点实在太多,更别说个别场景下万人大群里的炸群消息难题了更别说个别场景下万人大群里的炸群消息难题了。

这也是为什么一般中大型IM系统中,都会将群聊单独拎出来考虑架构的设计,单独有针对性地进行架构优化,从而降低整个系统的设计难度。

本文将分享的是一套生产环境下的IM群聊消息系统的高可用、易伸缩、高并发架构设计实践,属于原创第一手资料,内容较专业,适合有一定IM架构经验的后端程序员阅读。

推荐:如有兴趣,本文作者的另一篇《一套原创分布式即时通讯(IM)系统理论架构方案》,也适合正在进行IM系统架构设计研究的同学阅读。

另外,以下几篇有关IM实际动手开发的文章也值得一读,有兴趣可以看看:

二、群聊技术文章


IM群聊消息究竟是存1份(即扩散读)还是存多份(即扩散写)?
IM群聊消息的已读回执功能该怎么实现?
关于IM即时通讯群聊消息的乱序问题讨论
现代IM系统中聊天消息的同步和存储方案探讨
移动端IM中大规模群消息的推送如何保证效率、实时性?
微信后台团队:微信后台异步消息队列的优化升级实践分享
IM群聊消息如此复杂,如何保证不丢不重?
IM单聊和群聊中的在线状态同步应该用“推”还是“拉”?
如何保证IM实时消息的“时序性”与“一致性”?
快速裂变:见证微信强大后台架构从0到1的演进历程(一)

三、万事开头难:初始的极简实现


所谓的群聊消息系统,就是一种多对多群体聊天方式,譬如直播房间内的聊天室对应的服务器端就是一个群聊消息系统。

2017年9月初,我们初步实现了一套极简的群聊消息系统,其大致架构如下:
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_1.gif

系统名词解释:

  • 1)Client : 消息发布者【或者叫做服务端群聊消息系统调用者】,publisher;
  • 2)Proxy : 系统代理,对外统一接口,收集Client发来的消息转发给Broker;
  • 3)Broker :系统消息转发Server,Broker 会根据 Gateway Message 组织一个 RoomGatewayList【key为RoomID,value为 Gateway IP:Port 地址列表】,然后把 Proxy 发来的消息转发到 Room 中所有成员登录的所有 Gateway;
  • 4)Router :用户登录消息转发者,把Gateway转发来的用户登入登出消息转发给所有的Broker;
  • 5)Gateway :所有服务端的入口,接收合法客户端的连接,并把客户端的登录登出消息通过Router转发给所有的Broker;
  • 6)Room Message : Room聊天消息;
  • 7)Gateway Message : Room内某成员 登录 或者 登出 某Gateway消息,包含用户UIN/RoomID/Gateway地址{IP:Port}等消息。

当一个 Room 中多个 Client 连接一个 Gateway 的时候,Broker只会根据 RoomID 把房间内的消息转发一次给这个Gateway,由Gateway再把消息复制多份分别发送给连接这个 Gateway 的 Room 中的所有用户的客户端。

这套系统有如下特点:

  • 1)系统只转发房间内的聊天消息,每个节点收到后立即转发出去,不存储任何房间内的聊天消息,不考虑消息丢失以及消息重复的问题;
  • 2)系统固定地由一个Proxy、三个Broker和一个Router构成;
  • 3)Proxy接收后端发送来的房间消息,然后按照一定的负载均衡算法把消息发往某个Broker,Broker则把消息发送到所有与Room有关系的接口机Gateway;
  • 4)Router接收Gateway转发来的某个Room内某成员在这个Gateway的登出或者登录消息,然后把消息发送到所有Broker;
  • 5)Broker收到Router转发来的Gateway消息后,更新(添加或者删除)与某Room相关的Gateway集合记录;
  • 6)整个系统的通信链路采用UDP通信方式。

从以上特点,整个消息系统足够简单,没有考虑扩缩容问题,当系统负载到达极限的时候,就重新再部署一套系统以应对后端client的消息压力。

这种处理方式本质是把系统的扩容能力甩锅给了后端Client以及前端Gateway:每次扩容一个系统,所有Client需要在本地配置文件中添加一个Proxy地址然后全部重启,所有Gateway则需要再本地配置文件添加一个Router地址然后全部重启

这种“幸福我一人,辛苦千万家”的扩容应对方式,必然导致公司内部这套系统的使用者怨声载道,下一阶段的升级就是必然的了。

四、进一步重点设计:“可扩展性”


4.1、基本思路


大道之行也,天下为公,不同的系统有不同的构架,相同的系统总有类似的实现。类似于数据库的分库分表【关于分库分表,目前看到的最好的文章是《一种支持自由规划无须数据迁移和修改路由代码的Replicaing扩容方案》】,其扩展实现核心思想是分Partition分Replica,但各Replica之间还区分leader(leader-follower,只有leader可接受写请求)和non-leader(所有replica均可接收写请求)两种机制。

从数据角度来看,这套系统接收两种消息:Room Message(房间聊天消息)和Gateway Message(用户登录消息)。两种消息的交汇之地就是Broker,所以应对扩展的紧要地方就是Broker,Broker的每个Partition采用non-leader机制,各replica均可接收Gateway Message消息写请求和Room Message转发请求。

首先,当Room Message量加大时可以对Proxy进行水平扩展,多部署Proxy即可因应Room Message的流量。

其次,当Gateway Message量加大时可以对Router进行水平扩展,多部署Router即可因应Gateway Message的流量。

最后,两种消息的交汇之地Broker如何扩展呢?可以把若干Broker Replica组成一个Partition,因为Gateway Message是在一个Partition内广播的,所有Broker Replica都会有相同的RoomGatewayList 数据,因此当Gateway Message增加时扩容Partition即可。当Room Message量增加时,水平扩容Partition内的Broker Replica即可,因为Room Message只会发送到Partition内某个Replica上。

从个人经验来看,Room ID的增长以及Room内成员的增加量在一段时间内可以认为是直线增加,而Room Message可能会以指数级增长,所以若设计得当则Partition扩容的概率很小,而Partition内Replica水平增长的概率几乎是100%。

不管是Partition级别的水平扩容还是Partition Replica级别的水平扩容,不可能像系统极简版本那样每次扩容后都需要Client或者Gateway去更新配置文件然后重启,因应之道就是可用zookeeper充当角色的Registriy。通过这个zookeeper注册中心,相关角色扩容的时候在Registry注册后,与之相关的其他模块得到通知即可获取其地址等信息。采用zookeeper作为Registry的时候,所以程序实现的时候采用实时watch和定时轮询的策略保证数据可靠性,因为一旦网络有任何的抖动,zk就会认为客户端已经宕机把链接关闭。

分析完毕,与之相对的架构图如下:
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_2.gif

以下各分章节将描述各个模块详细流程。

4.2、Client


Client详细流程如下:

  • 1)从配置文件加载Registry地址;
  • 2)从Registy上Proxy注册路径/pubsub/proxy下获取所有的Proxy,依据各个Proxy ID大小顺序递增组成一个ProxyArray;
  • 3)启动一个线程实时关注Registry路径/pubsub/proxy,以获取Proxy的动态变化,及时更新ProxyArray;
  • 4)启动一个线程定时轮询获取Registry路径/pubsub/proxy下各个Proxy实例,作为关注策略的补充,以期本地ProxyArray内各个Proxy成员与Registry上的各个Proxy保持一致;定时给各个Proxy发送心跳,异步获取心跳回包;定时清除ProxyArray中心跳超时的Proxy成员;
  • 5)发送消息的时候采用snowflake算法给每个消息分配一个MessageID,然后采用相关负载均衡算法把消息转发给某个Proxy。

4.3、Proxy


Proxy详细流程如下:

  • 1)读取配置文件,获取Registry地址;
  • 2)把自身信息注册到Registry路径/pubsub/proxy下,把Registry返回的ReplicaID作为自身ID;
  • 3)从Registry路径/pubsub/broker/partition(x)下获取每个Broker Partition的各个replica;
  • 4)从Registry路径/pubsub/broker/partition_num获取当前有效的Broker Partition Number;

  • 5)启动一个线程关注Registry上的Broker路径/pubsub/broker,以实时获取以下信息:
  •     {Broker Partition Number}
  •      - 新的Broker Partition(此时发生了扩容);
  •      - Broker Partition内新的broker replica(Partition内发生了replica扩容);
  •      - Broker Parition内某replica挂掉的信息;

  • 6)定时向各个Broker Partition replica发送心跳,异步等待Broker返回的心跳响应包,以探测其活性,以保证不向超时的replica转发Room Message;
  • 7)启动一个线程定时读取Registry上的Broker路径/pubsub/broker下各个子节点的值,以定时轮询的策略观察Broker Partition Number变动,以及各Partition的变动情况,作为实时策略的补充;同时定时检查心跳包超时的Broker,从有效的BrokerList中删除;
  • 8)依据规则【BrokerPartitionID = RoomID % BrokerPartitionNum, BrokerReplicaID = RoomID % BrokerPartitionReplicaNum】向某个Partition的replica转发Room Message,收到Client的Heatbeat包时要及时给予响应。

之所以把Room Message和Heartbeat Message放在一个线程处理,是为了防止进程假死这种情况。

当/pubsub/broker/partition_num的值发生改变的时候(譬如值改为4),意味着Router Partition进行了扩展,Proxy要及时获取新Partition路径(如/pubsub/broker/Partition2和/pubsub/broker/Partition3)下的实例,并关注这些路径,获取新Partition下的实例。

之所以Proxy在获取Registry下所有当前的Broker实例信息后再注册自身信息,是因为此时它才具有转发消息的资格。

Proxy转发某个Room消息时候,只发送给处于Running状态的Broker。为Broker Partition内所有replica依据Registry给其分配的replicaID进行递增排序,组成一个Broker Partition Replica Array,规则中BrokerPartitionReplicaNum为Array的size,而BrokerReplicaID为replica在Array中的下标。

4.4、Pipeline


收到的 Room Message 需要做三部工作:收取 Room Message、消息协议转换和向 Broker 发送消息。

初始系统这三步流程如果均放在一个线程内处理,proxy 的整体吞吐率只有 50 000 Msg/s。

最后的实现方式是按照消息处理的三个步骤以 pipeline 方式做如下流程处理:

  • 1)启动 1 个消息接收线程和 N【N == Broker Parition 数目】个多写一读形式的无锁队列【称之为消息协议转换队列】,消息接收线程分别启动一个 epoll 循环流程收取消息,然后把消息以相应的 hash 算法【队列ID = UIN % N】写入对应的消息协议转换队列;
  • 2)启动 N 个线程 和 N * 3 个一写一读的无锁队列【称之为消息发送队列】,每个消息协议专家线程从消息协议转换队列接收到消息并进行协议转换后,根据相应的 hash 算法【队列ID = UIN % 3N】写入消息发送队列;
  • 3)启动 3N 个消息发送线程,分别创建与之对应的 Broker 的连接,每个线程单独从对应的某个消息发送队列接收消息然后发送出去。

经过以上流水线改造后,Proxy 的整体吞吐率可达 200 000 Msg/s。

关于 pipeline 自身的解释,本文不做详述,可以参考下图:
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_1111.jpg

4.5、大房间消息处理


每个 Room 的人数不均,最简便的解决方法就是给不同人数量级的 Room 各搭建一套消息系统,不用修改任何代码。

然所谓需求推动架构改进,在系统迭代升级过程中遇到了这样一个需求:业务方有一个全国 Room,用于给所有在线用户进行消息推送。针对这个需求,不可能为了一个这样的 Room 单独搭建一套系统,况且这个 Room 的消息量很少。

如果把这个 Room 的消息直接发送给现有系统,它有可能影响其他 Room 的消息发送:消息系统是一个写放大的系统,全国 Room 内有系统所有的在线用户,每次发送都会卡顿其他 Room 的消息发送。

最终的解决方案是:使用类似于分区的方法,把这样的大 Room 映射为 64 个虚拟 Room【称之为 VRoom】。在 Room 号段分配业务线的配合下,给消息系统专门保留了一个号段,用于这种大 Room 的切分,在 Proxy 层依据一个 hash 方法 【 VRoomID = UserID % 64】 把每个 User 分配到相应的 VRoom,其他模块代码不用修改即完成了大 Room 消息的路由。

4.6、Broker


Broker详细流程如下:

  • 1)Broker加载配置,获取自身所在Partition的ID(假设为3);
  • 2)向Registry路径/pubsub/broker/partition3注册,设置其状态为Init,注册中心返回的ID作为自身的ID(replicaID);
  • 3)接收Router转发来的Gateway Message,放入GatewayMessageQueue;
  • 4)从Database加载数据,把自身所在的Broker Partition所应该负责的 RoomGatewayList 数据加载进来;
  • 5)异步处理GatewayMessageQueue内的Gateway Message,只处理满足规则【PartitionID == RoomID % PartitionNum】的消息,把数据存入本地路由信息缓存;
  • 6)修改Registry路径/pubsub/broker/partition3下自身节点的状态为Running;
  • 7)启动线程实时关注Registry路径/pubsub/broker/partition_num的值;
  • 8)启动线程定时查询Registry路径/pubsub/broker/partition_num的值;
  • 9)当Registry路径/pubsub/broker/partition_num的值发生改变的时候,依据规则【PartitionID == RoomID % PartitionNum】清洗本地路由信息缓存中每条数据;
  • 10)接收Proxy发来的Room Message,依据RoomID从路由信息缓存中查找Room有成员登陆的所有Gateway,把消息转发给这些Gateway。

注意Broker之所以先注册然后再加载Database中的数据,是为了在加载数据的时候同时接收Router转发来的Gateway Message,但是在数据加载完前这些受到的数据先被缓存起来,待所有 RoomGatewayList 数据加载完后就把这些数据重放一遍;

Broker之所以区分状态,是为了在加载完毕 RoomGatewayList 数据前不对Proxy提供转发消息的服务,同时也方便Broker Partition应对的消息量增大时进行水平扩展。

当Broker发生Partition扩展的时候,新的Partition个数必须是2的幂,只有新Partition内所有Broker Replica都加载实例完毕,再更改/pubsub/broker/partition_num的值。

老的Broker也要watch路径/pubsub/broker/partition_num的值,当这个值增加的时候,它也需要清洗本地的路由信息缓存。

Broker的扩容过程犹如细胞分裂,形成中的两个细胞有着完全相同的数据,分裂完成后【Registry路径/pubsub/broker/partition_num的值翻倍】则需要清洗垃圾信息。这种方法称为翻倍法。

4.7、Router


Router详细流程如下:

  • 1)Router加载配置,Registry地址;
  • 2)把自身信息注册到Registry路径/pubsub/router下,把Registry返回的ReplicaID作为自身ID;
  • 3)从Registry路径/pubsub/broker/partition(x)下获取每个Broker Partition的各个replica;
  • 4)从Registry路径/pubsub/broker/partition_num获取当前有效的Broker Partition Number;

  • 5)启动一个线程关注Registry上的Broker路径/pubsub/broker,以实时获取以下信息:
  •     {Broker Partition Number}
  •     - 新的Broker Partition(此时发生了扩容);
  •     - Broker Partition内新的broker replica(Partition内发生了replica扩容);
  •     - Broker Parition内某replica挂掉的信息;

  • 6)定时向各个Broker Partition replica发送心跳,异步等待Broker返回的心跳响应包,以探测其活性,以保证不向超时的replica转发Gateway Message;
  • 7)启动一个线程定时读取Registry上的Broker路径/pubsub/broker下各个子节点的值,以定时轮询的策略观察Broker Partition Number变动,以及各Partition的变动情况,作为实时策略的补充;同时定时检查心跳包超时的Broker,从有效的BrokerList中删除;
  • 8)从Database全量加载路由 RoomGatewayList 数据放入本地缓存;
  • 9)收取Gateway发来的心跳消息,及时返回ack包;
  • 10)收取Gateway转发来的Gateway Message,按照一定规则【BrokerPartitionID % BrokerPartitionNum = RoomID % BrokerPartitionNum】转发给某个Broker Partition下所有Broker Replica,保证Partition下所有replica拥有同样的路由 RoomGatewayList 数据,再把Message内数据存入本地缓存,当检测到数据不重复的时候把数据异步写入Database。

4.8、Gateway


Gateway详细流程如下:

  • 1)读取配置文件,加载Registry地址;
  • 2)从Registry路径/pubsub/router/下获取所有router replica,依据各Replica的ID递增排序组成replica数组RouterArray;
  • 3)启动一个线程实时关注Registry路径/pubsub/router,以获取Router的动态变化,及时更新RouterArray;
  • 4)启动一个线程定时轮询获取Registry路径/pubsub/router下各个Router实例,作为关注策略的补充,以期本地RouterArray及时更新;定时给各个Router发送心跳,异步获取心跳回包;定时清除RouterArray中心跳超时的Router成员;
  • 5)当有Room内某成员客户端连接上来或者Room内所有成员都不连接当前Gateway节点时,依据规则【RouterArrayIndex = RoomID % RouterNum】向某个Router发送Gateway Message;
  • 6)收到Broker转发来的Room Message时,根据MessageID进行去重,如果不重复则把消息发送到连接到当前Gateway的Room内所有客户端,同时把MessageID缓存起来以用于去重判断。

Gateway本地有一个基于共享内存的LRU Cache,存储最近一段时间发送的消息的MessageID。

五、接下来迫切要解决的:系统稳定性


系统具有了可扩展性仅仅是系统可用的初步,整个系统要保证最低粒度的SLA(0.99),就必须在两个维度对系统的可靠性就行感知:消息延迟和系统内部组件的高可用。

5.1、消息延迟


准确的消息延迟的统计,通用的做法可以基于日志系统对系统所有消息或者以一定概率抽样后进行统计,但限于人力目前没有这样做。

目前使用了一个方法:通过一种构造一组伪用户ID,定时地把消息发送给proxy,每条消息经过一层就把在这层的进入时间和发出时间以及组件自身的一些信息填入消息,这组伪用户的消息最终会被发送到一个伪Gateway端,伪Gateway对这些消息的信息进行归并统计后,即可计算出当前系统的平均消息延迟时间。

通过所有消息的平均延迟可以评估系统的整体性能。同时,因为系统消息路由的哈希方式已知,当固定时间内伪Gateway没有收到消息时,就把消息当做发送失败,当某条链路失败一定次数后就可以产生告警了。

5.2、高可用


上面的方法同时能够检测某个链路是否出问题,但是链路具体出问题的点无法判断,且实时性无法保证。

为了保证各个组件的高可用,系统引入了另一种评估方法:每个层次都给后端组件发送心跳包,通过心跳包的延迟和成功率判断其下一级组件的当前的可用状态。

譬如proxy定时给每个Partition内每个broker发送心跳,可以依据心跳的成功率来快速判断broker是否处于“假死”状态(最近业务就遇到过broker进程还活着,但是对任何收到的消息都不处理的情况)。

同时依靠心跳包的延迟还可以判断broker的处理能力,基于此延迟值可在同一Partition内多broker端进行负载均衡。

六、进一步优化:消息可靠性


公司内部内部原有一个走tcp通道的群聊消息系统,但是经过元旦一次大事故(几乎全线崩溃)后,相关业务的一些重要消息改走这套基于UDP的群聊消息系统了。这些消息如服务端下达给客户端的游戏动作指令,是不允许丢失的,但其特点是相对于聊天消息来说量非常小(单人1秒最多一个),所以需要在目前UDP链路传递消息的基础之上再构建一个可靠消息链路。

国内某IM大厂的消息系统也是以UDP链路为基础的(见《为什么QQ用的是UDP协议而不是TCP协议?》),他们的做法是消息重试加ack构建了可靠消息稳定传输链路。但是这种做法会降低系统的吞吐率,所以需要独辟蹊径。

UDP通信的本质就是伪装的IP通信,TCP自身的稳定性无非是重传、去重和ack,所以不考虑消息顺序性的情况下可以通过重传与去重来保证消息的可靠性。

基于目前系统的可靠消息传输流程如下:

  • 1)Client给每个命令消息依据snowflake算法配置一个ID,复制三份,立即发送给不同的Proxy;
  • 2)Proxy收到命令消息以后随机发送给一个Broker;
  • 3)Broker收到后传输给Gateway;
  • 4)Gateway接收到命令消息后根据消息ID进行重复判断,如果重复则丢弃,否则就发送给APP,并缓存之。

正常的消息在群聊消息系统中传输时,Proxy会根据消息的Room ID传递给固定的Broker,以保证消息的有序性。

七、Router需要进一步强化


7.1、简述


当线上需要部署多套群聊消息系统的时候,Gateway需要把同样的Room Message复制多份转发给多套群聊消息系统,会增大Gateway压力,可以把Router单独独立部署,然后把Room Message向所有的群聊消息系统转发。

Router系统原有流程是:Gateway按照Room ID把消息转发给某个Router,然后Router把消息转发给下游Broker实例。新部署一套群聊消息系统的时候,新系统Broker的schema需要通过一套约定机制通知Router,使得Router自身逻辑过于复杂。

一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_3.gif

重构后的Router架构参照上图,也采用分Partition分Replica设计,Partition内部各Replica之间采用non-leader机制;各Router Replica不会主动把Gateway Message内容push给各Broker,而是各Broker主动通过心跳包形式向Router Partition内某个Replica注册,而后此Replica才会把消息转发到这个Broker上。

类似于Broker,Router Partition也以2倍扩容方式进行Partition水平扩展,并通过一定机制保证扩容或者Partition内部各个实例停止运行或者新启动时,尽力保证数据的一致性。

Router Replica收到Gateway Message后,replica先把Gateway Message转发给Partition内各个peer replica,然后再转发给各个订阅者。Router转发消息的同时异步把消息数据写入Database。

独立Router架构下,下面小节将分别详述Gateway、Router和Broker三个相关模块的详细流程。

7.2、Gateway


Gateway详细流程如下:

  • 1)从Registry路径/pubsub/router/partition(x)下获取每个Partition的各个replica;
  • 2)从Registry路径/pubsub/router/partition_num获取当前有效的Router Partition Number;
  • 3)启动一个线程关注Registry上的Router路径/pubsub/router,以实时获取以下信息:{Router Partition Number} -> 新的Router Partition(此时发生了扩容);  Partition内新的replica(Partition内发生了replica扩容);  Parition内某replica挂掉的信息;
  • 4)定时向各个Partition replica发送心跳,异步等待Router返回的心跳响应包,以探测其活性,以保证不向超时的replica转发Gateway Message;
  • 5)启动一个线程定时读取Registry上的Router路径/pubsub/router下各个子节点的值,以定时轮询的策略观察Router Partition Number变动,以及各Partition的变动情况,作为实时策略的补充;同时定时检查心跳包超时的Router,从有效的BrokerList中删除;
  • 6 依据规则向某个Partition的replica转发Gateway Message。

第六步的规则决定了Gateway Message的目的Partition和replica,规则内容有:

如果某Router Partition ID满足condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber),则把消息转发到此Partition;

这里之所以不采用直接hash方式(RouterPartitionID = RoomID % RouterPartitionNumber)获取Router Partition,是考虑到当Router进行2倍扩容的时候当所有新的Partition的所有Replica都启动完毕且数据一致时才会修改Registry路径/pubsub/router/partitionnum的值,按照规则的计算公式才能保证新Partition的各个Replica在启动过程中就可以得到Gateway Message,也即此时每个Gateway Message会被发送到两个Router Partition。 当Router扩容完毕,修改Registry路径/pubsub/router/partitionnum的值后,此时新集群进入稳定期,每个Gateway Message只会被发送固定的一个Partition,condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber)等效于condition(RouterPartitionID = RoomID % RouterPartitionNumber)。

如果Router Partition内某replia满足condition(replicaPartitionID = RoomID % RouterPartitionReplicaNumber),则把消息转发到此replica。

replica向Registry注册的时候得到的ID称之为replicaID,Router Parition内所有replica按照replicaID递增排序组成replica数组RouterPartitionReplicaArray,replicaPartitionID即为replica在数组中的下标。



Gateway Message数据一致性:

Gateway向Router发送的Router Message内容有两种:某user在当前Gateway上进入某Room 和 某user在当前Gateway上退出某Room,数据项分别是UIN(用户ID)、Room ID、Gateway Addr和User Action(Login or Logout。

由于所有消息都是走UDP链路进行转发,则这些消息的顺序就有可能乱序。Gateway可以统一给其发出的所有消息分配一个全局递增的ID【下文称为GatewayMsgID,Gateway Message ID】以保证消息的唯一性和全局有序性。

Gateway向Registry注册临时有序节点时,Registry会给Gateway分配一个ID,Gateway可以用这个ID作为自身的Instance ID【假设这个ID上限是65535】。

GatewayMsgID字长是64bit,其格式如下:
// 63 -------------------------- 48 47 -------------- 38 37 ------------ 0
// |  16bit Gateway Instance ID    |   10bit Reserve    |    38bit自增码  |

7.3、Router


Router系统部署之前,先设置Registry路径/pubsub/router/partition_num的值为1。

Router详细流程如下:

  • 1)Router加载配置,获取自身所在Partition的ID(假设为3);
  • 2)向Registry路径/pubsub/router/partition3注册,设置其状态为Init,注册中心返回的ID作为自身的ID(replicaID);
  • 3)注册完毕会收到Gateway发来的Gateway Message以及Broker发来的心跳消息(HeartBeat Message),先缓存到消息队列MessageQueue;
  • 4)从Registry路径/pubsub/router/partition3下获取自身所在的Partition内的各个replica;
  • 5)从Registry路径/pubsub/router/partition_num获取当前有效的Router Partition Number;
  • 6)启动一个线程关注Registry路径/pubsub/router,以实时获取以下信息:{Router Partition Number}  -> Partition内新的replica(Partition内发生了replica扩容);  Parition内某replica挂掉的信息;
  • 7)从Database加载数据;
  • 8)启动一个线程异步处理MessageQueue内的Gateway Message,把Gateway Message转发给同Partition内其他peer replica,然后依据规则【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】转发给BrokerList内每个Broker;处理Broker发来的心跳包,把Broker的信息存入本地BrokerList,然后给Broker发送回包;
  • 9)修改Registry路径/pubsub/router/partition3下节点的状态为Running;
  • 10)启动一个线程定时读取Registry路径/pubsub/router下各个子路径的值,以定时轮询的策略观察Router各Partition的变动情况,作为实时策略的补充;检查超时的Broker,把其从BrokerList中剔除;
  • 11)当RouterPartitionNum倍增时,Router依据规则【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】清洗自身路由信息缓存中数据;
  • 12)Router本地存储每个Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丢弃不处理,否则就更新GatewayMsgID并根据上面逻辑进行处理。

之所以把Gateway Message和Heartbeat Message放在一个线程处理,是为了防止进程假死这种情况。

Broker也采用了分Partition分Replica机制,所以向Broker转发Gateway Message时候路由规则,与Gateway向Router转发消息的路由规则相同。

另外启动一个工具,当水平扩展后新启动的Partition内所有Replica的状态都是Running的时候,修改Registry路径/pubsub/router/partition_num的值为所有Partition的数目。

7.4、Broker


Broker详细流程如下:

  • 1)Broker加载配置,获取自身所在Partition的ID(假设为3);
  • 2)向Registry路径/pubsub/broker/partition3注册,设置其状态为Init,注册中心返回的ID作为自身的ID(replicaID);
  • 3)从Registry路径/pubsub/router/partition_num获取当前有效的Router Partition Number;
  • 4)从Registry路径/pubsub/router/partition(x)下获取每个Router Partition的各个replica;
  • 5)启动一个线程关注Registry路径/pubsub/router,以实时获取以下信息:{Router Partition Number} -> 新的Router Partition(此时发生了扩容);  Partition内新的replica(Partition内发生了replica扩容);  Parition内某replica挂掉的信息;
  • 6)依据规则【RouterPartitionID % BrokerPartitionNum == BrokerPartitionID % BrokerPartitionNum,RouterReplicaID = BrokerReplicaID % BrokerPartitionNum】选定目标Router Partition下某个Router replica,向其发送心跳消息,包含BrokerPartitionNum、BrokerPartitionID、BrokerHostAddr和精确到秒级的Timestamp,并异步等待所有Router replica的回复,所有Router转发来的Gateway Message放入GatewayMessageQueue;
  • 7)依据规则【BrokerPartitionID == RoomID % BrokerParitionNum】从Database加载数据;
  • 8)依据规则【BrokerPartitionID % BrokerParitionNum == RoomID % BrokerParitionNum】异步处理GatewayMessageQueue内的Gateway Message,只留下合乎规则的消息的数据;
  • 9)修改Registry路径/pubsub/broker/partition3下自身节点的状态为Running;
  • 10)启动一个线程定时读取Registry路径/pubsub/router下各个子路径的值,以定时轮询的策略观察Router各Partition的变动情况,作为实时策略的补充;定时检查超时的Router,某Router超时后更换其所在的Partition内其他Router替换之,定时发送心跳包;
  • 11)当Registry路径/pubsub/broker/partition_num的值BrokerPartitionNum发生改变的时候,依据规则【PartitionID == RoomID % PartitionNum】清洗本地路由信息缓存中每条数据;
  • 12)接收Proxy发来的Room Message,依据RoomID从路由信息缓存中查找Room有成员登陆的所有Gateway,把消息转发给这些Gateway;
  • 13)Broker本地存储每个Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丢弃不处理,否则更新GatewayMsgID并根据上面逻辑进行处理。

BrokerPartitionNumber可以小于或者等于或者大于RouterPartitionNumber,两个数应该均是2的幂,两个集群可以分别进行扩展,互不影响。譬如BrokerPartitionNumber=4而RouterPartitionNumber=2,则Broker Partition 3只需要向Router Partition 1的某个follower发送心跳消息即可;若BrokerPartitionNumber=4而RouterPartitionNumber=8,则Broker Partition 3需要向Router Partition 3的某个follower发送心跳消息的同时,还需要向Router Partition 7的某个follower发送心跳,以获取全量的Gateway Message。

Broker需要关注/pubsub/router/partitionnum和/pubsub/broker/partitionnum的值的变化,当router或者broker进行parition水平扩展的时候,Broker需要及时重新构建与Router之间的对应关系,及时变动发送心跳的Router Replica对象【RouterPartitionID = BrokerReplicaID % RouterPartitionNum,RouterPartitionID为Router Replica在PartitionRouterReplicaArray数组的下标】。

当Router Partition内replica死掉或者发送心跳包的replica对象死掉(无论是注册中心通知还是心跳包超时),broker要及时变动发送心跳的Router replica对象。

另外,Gateway使用UDP通信方式向Router发送Gateway Message,如若这个Message丢失则此Gateway上该Room内所有成员一段时间内(当有新的成员在当前Gateway上加入Room 时会产生新的Gateway Message)都无法再接收消息,为了保证消息的可靠性,可以使用这样一个约束解决问题:在此Gateway上登录的某Room内的人数少于3时,Gateway会把Gateway Message复制两份非连续(如以10ms为时间间隔)重复发送给某个Partition leader。因Gateway Message消息处理的幂等性,重复Gateway Message并不会导致Room Message发送错误,只在极少概率的情况下会导致Gateway收到消息的时候Room内已经没有成员在此Gateway登录,此时Gateway会把消息丢弃不作处理。

传递实时消息群聊消息系统的Broker向特定Gateway转发Room Message的时候,会带上Room内在此Gateway上登录的用户列表,Gateway根据这个用户列表下发消息时如果检测到此用户已经下线,在放弃向此用户转发消息的同时,还应该把此用户已经下线的消息发送给Router,当Router把这个消息转发给Broker后,Broker把此用户从用户列表中剔除。通过这种负反馈机制保证用户状态更新的及时性

八、离线消息的处理


8.1、简述


前期的系统只考虑了用户在线情况下实时消息的传递,当用户离线时其消息便无法获取。

若系统考虑用户离线消息传递,需要考虑如下因素:

  • 1)消息固化:保证用户上线时收到其离线期间的消息;
  • 2)消息有序:离线消息和在线消息都在一个消息系统传递,给每个消息分配一个ID以区分消息先后顺序,消息顺序越靠后则ID愈大。

离线消息的存储和传输,需要考虑用户的状态以及每条消息的发送状态,整个消息核心链路流程会有大的重构。

新消息架构如下图:
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_4.gif

系统名词解释:

  • 1)Pi : 消息ID存储模块,存储每个人未发送的消息ID有序递增集合;
  • 2)Xiu : 消息存储KV模块,存储每个人的消息,给每个消息分配ID,以ID为key,以消息内为value;
  • 3)Gateway Message(HB) : 用户登录登出消息,包括APP保活定时心跳(Hearbeat)消息。

系统内部代号貔貅(貔貅者,雄貔雌貅),源自上面两个新模块。

这个版本架构流程的核心思想为“消息ID与消息内容分离,消息与用户状态分离”。消息发送流程涉及到模块 Client/Proxy/Pi/Xiu,消息推送流程则涉及到模块 Pi/Xiu/Broker/Router/Gateway。

下面小节先细述Pi和Xiu的接口,然后再详述发送和推送流程。

8.2、Xiu


Xiu模块功能名称是Message Storage,用户缓存和固化消息,并给消息分配ID。Xiu 集群采用分 Partition 分 Replica 机制,Partition 初始数目须是2的倍数,集群扩容时采用翻倍法。

8.2.1存储消息


存储消息请求的参数列表为{SnowflakeID,UIN, Message},其流程如下:

  • 1)接收客户端发来的消息,获取消息接收人ID(UIN)和客户端给消息分配的 SnowflakeID;
  • 2)检查 UIN % Xiu_Partition_Num == Xiu_Partition_ID % Xiu_Partition_Num 添加是否成立【即接收人的消息是否应当由当前Xiu负责】,不成立则返回错误并退出;
  • 3)检查 SnowflakeID 对应的消息是否已经被存储过,若已经存储过则返回其对应的消息ID然后退出;
  • 4)给消息分配一个 MsgID:
    每个Xiu有自己唯一的 Xiu_Partition_ID,以及一个初始值为 0 的 Partition_Msg_ID。MsgID = 1B[ Xiu_Partition_ID ] + 1B[ Message Type ] + 6B[ ++ Partition_Msg_ID ]。每次分配的时候 Partition_Msg_ID 都自增加一。
  • 5)以 MsgID 为 key 把消息存入基于共享内存的 Hashtable,并存入消息的 CRC32 hash值和插入时间,把 MsgID 存入一个 LRU list 中:
    LRU List 自身并不存入共享内存中,当进程重启时,可以根据Hashtable中的数据重构出这个List。把消息存入 Hashtable 中时,如果 Hashtable full,则依据 LRU List 对Hashtable 中的消息进行淘汰。
  • 6)把MsgID返回给客户端;
  • 7)把MsgID异步通知给消息固化线程,消息固化线程根据MsgID从Hashtable中读取消息并根据CRC32 hash值判断消息内容是否完整,完整则把消息存入本地RocksDB中。

8.2.2读取消息


读取消息请求的参数列表为{UIN, MsgIDList},其流程为:

  • 1)获取请求的 MsgIDList,判断每个MsgID MsgID{Xiu_Partition_ID} == Xiu_Partition_ID 条件是否成立,不成立则返回错误并退出;
  • 2)从 Hashtable 中获取每个 MsgID 对应的消息;
  • 3)如果 Hashtable 中不存在,则从 RocksDB 中读取 MsgID 对应的消息;
  • 4)读取完毕则把所有获取的消息返回给客户端。

8.2.3主从数据同步


目前从简,暂定Xiu的副本只有一个。

Xiu节点启动的时候根据自身配置文件中分配的 Xiu_Partition_ID 到Registry路径 /pubsub/xiu/partition_id 下进行注册一个临时有序节点,注册成功则Registry会返回Xiu的节点 ID。

Xiu节点获取 /pubsub/xiu/partition_id 下的所有节点的ID和地址信息,依据 节点ID最小者为leader 的原则,即可判定自己的角色。只有leader可接受读写数据请求。

数据同步流程如下:

  • 1)follower定时向leader发送心跳信息,心跳信息包含本地最新消息的ID;
  • 2)leader启动一个数据同步线程处理follower的心跳信息,leader的数据同步线程从LRU list中查找 follower_latest_msg_id 之后的N条消息的ID,若获取到则读取消息并同步给follower,获取不到则回复其与leader之间消息差距太大;
  • 3)follower从leader获取到最新一批消息,则存储之;
  • 4)follower若获取leader的消息差距太大响应,则请求leader的agent把RocksDB的固化数据全量同步过来,整理完毕后再次启动与leader之间的数据同步流程。

follower会关注Registry路径 /pubsub/xiu/partition_id 下所有所有节点的变化情况,如果leader挂掉则及时转换身份并接受客户端请求。如果follower 与 leader 之间的心跳超时,则follower删掉 leader 的 Registry 路径节点,及时进行身份转换处理客户端请求。

当leader重启或者follower转换为leader的时候,需要把 Partition_Msg_ID 进行一个大数值增值(譬如增加1000)以防止可能的消息ID乱序情况。

8.2.4集群扩容


Xiu 集群扩容采用翻倍法,扩容时新 Partition 的节点启动后工作流程如下:

  • 1)向Registry的路径 /pubsub/xiu/partition_id 下自己的 node 的 state 为 running,同时注册自己的对外服务地址信息;
  • 2)另外启动一个工具,当水平扩展后所有新启动的 Partition 内所有 Replica 的状态都是 Running 的时候,修改 Registry 路径 /pubsub/xiu/partition_num 的值为扩容后 Partition 的数目。按照开头的例子,即由2升级为4。

之所以 Xiu 不用像 Broker 和 Router 那样启动的时候向老的 Partition 同步数据,是因为每个 Xiu 分配的 MsgID 中已经带有 Xiu 的 PartitionID 信息,即使集群扩容这个 ID 也不变,根据这个ID也可以定位到其所在的Partition,而不是借助 hash 方法。

8.3、Pi


Pi 模块功能名称是 Message ID Storage,存储每个用户的 MsgID List。Xiu 集群也采用分 Partition 分 Replica 机制,Partition 初始数目须是2的倍数,集群扩容时采用翻倍法。

8.3.1存储消息ID


MsgID 存储的请求参数列表为{UIN,MsgID},Pi 工作流程如下:

  • 1)判断条件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立则返回error退出;
  • 2)把 MsgID 插入UIN的 MsgIDList 中,保持 MsgIDList 中所有 MsgID 不重复有序递增,把请求内容写入本地log,给请求者返回成功响应。

Pi有专门的日志记录线程,给每个日志操作分配一个 LogID,每个 Log 文件记录一定量的写操作,当文件 size 超过配置的上限后删除之。

8.3.2读取消息ID列表


读取请求参数列表为{UIN, StartMsgID, MsgIDNum, ExpireFlag},其意义为获取用户 UIN 自起始ID为 StartMsgID 起(不包括 StartMsgID )的数目为 MsgIDNum 的消息ID列表,ExpireFlag意思是 所有小于等于 StartMsgID 的消息ID是否删除。

流程如下:

  • 1)判断条件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立则返回error退出;
  • 2)获取 (StartID, StartMsgID + MsgIDNum] 范围内的所有 MsgID,把结果返回给客户端;
  • 3)如果 ExpireFlag 有效,则删除MsgIDList内所有在 [0, StartMsgID] 范围内的MsgID,把请求内容写入本地log。

8.3.3主从数据同步


同 Xiu 模块,暂定 Pi 的同 Parition 副本只有一个。

Pi 节点启动的时候根据自身配置文件中分配的 Pi_Partition_ID 到Registry路径 /pubsub/pi/partition_id 下进行注册一个临时有序节点,注册成功则 Registry 会返回 Pi 的节点 ID。

Pi 节点获取 /pubsub/pi/partition_id 下的所有节点的ID和地址信息,依据 节点ID最小者为leader 的原则,即可判定自己的角色。只有 leader 可接受读写数据请求。

数据同步流程如下:

  • 1)follower 定时向 leader 发送心跳信息,心跳信息包含本地最新 LogID;
  • 2)leader 启动一个数据同步线程处理 follower 的心跳信息,根据 follower 汇报的 logID 把此 LogID;
  • 3)follower 从 leader 获取到最新一批 Log,先存储然后重放。

follower 会关注Registry路径 /pubsub/pi/partition_id 下所有节点的变化情况,如果 leader 挂掉则及时转换身份并接受客户端请求。如果follower 与 leader 之间的心跳超时,则follower删掉 leader 的 Registry 路径节点,及时进行身份转换处理客户端请求。

8.3.4集群扩容


Pi 集群扩容采用翻倍法。则节点启动后工作流程如下:

  • 1)向 Registry 注册,获取 Registry 路径 /pubsub/xiu/partition_num 的值 PartitionNumber;
  • 2)如果发现自己 PartitionID 满足条件 PartitionID >= PartitionNumber 时,则意味着当前 Partition 是扩容后的新集群,更新 Registry 中自己状态为start;
  • 3)读取 Registry 路径 /pubsub/xiu 下所有 Parition 的 leader,根据条件 自身PartitionID % PartitionNumber == PartitionID % PartitionNumber 寻找对应的老 Partition 的 leader,称之为 parent_leader;
  • 4)缓存收到 Proxy 转发来的用户请求;
  • 5)向 parent_leader 获取log;
  • 6)向 parent_leader 同步内存数据;
  • 7)重放 parent_leader 的log;
  • 8)更新 Registry 中自己的状态为 Running;
  • 9)重放用户请求;
  • 10)当 Registry 路径 /pubsub/xiu/partition_num 的值 PartitionNumber 满足条件 PartitionID >= PartitionNumber 时,意味着扩容完成,处理用户请求时要给用户返回响应。

Proxy 会把读写请求参照条件 UIN % Pi\_Partition\_Num == Pi\_Partition\_ID % Pi\_Partition\_Num 向相关 partition 的 leader 转发用户请求。假设原来 PartitionNumber 值为2,扩容后值为4,则原来转发给 partition0 的写请求现在需同时转发给 partition0 和 partition2,原来转发给 partition1 的写请求现在需同时转发给 partition1 和 partition3。

另外启动一个工具,当水平扩展后所有新启动的 Partition 内所有 Replica 的状态都是 Running 的时候,修改Registry路径/pubsub/xiu/partition_num的值为扩容后 Partition 的数目。

8.4、数据发送流程


消息自 PiXiu 的外部客户端(Client,服务端所有使用 PiXiu 提供的服务者统称为客户端)按照一定负载均衡规则发送到 Proxy,然后存入 Xiu 中,把 MsgID 存入 Pi 中。

其详细流程如下:

  • 1)Client 依据 snowflake 算法给消息分配 SnowflakeID,依据 ProxyID = UIN % ProxyNum 规则把消息发往某个 Proxy;
  • 2)Proxy 收到消息后转发到 Xiu;
  • 3)Proxy 收到 Xiu 返回的响应后,把响应转发给 Client;
  • 4)如果 Proxy 收到 Xiu 返回的响应带有 MsgID,则发起 Pi 写流程,把 MsgID 同步到 Pi 中;
  • 5)如果 Proxy 收到 Xiu 返回的响应带有 MsgID,则给 Broker 发送一个 Notify,告知其某 UIN 的最新 MsgID。

8.5、数据转发流程


转发消息的主体是Broker,原来的在线消息转发流程是它收到 Proxy 转发来的 Message,然后根据用户是否在线然后转发给 Gateway。

PiXiu架构下 Broker 会收到以下类型消息:

  • 1)用户登录消息;
  • 2)用户心跳消息;
  • 3)用户登出消息;
  • 4)Notify 消息;
  • 5)Ack 消息。

Broker流程受这五种消息驱动,下面分别详述其收到这五种消息时的处理流程。

用户登录消息流程如下:

  • 1)检查用户的当前状态,若为 OffLine 则把其状态值为在线 OnLine;
  • 2)检查用户的待发送消息队列是否为空,不为空则退出;
  • 3)向 Pi 模块发送获取 N 条消息 ID 的请求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},设置用户状态为 GettingMsgIDList 并等待回应;
  • 4)根据 Pi 返回的消息 ID 队列,向 Xiu 发起获取消息请求 {UIN: uin, MsgIDList: msg ID List},设置用户状态为 GettingMsgList 并等待回应;
  • 5)Xiu 返回消息列表后,设置状态为 SendingMsg,并向 Gateway 转发消息。

可以把用户心跳消息当做用户登录消息处理。

Gateway的用户登出消息产生有三种情况:

  • 1)用户主动退出;
  • 2)用户心跳超时;
  • 3)给用户转发消息时发生网络错误。

用户登出消息处理流程如下:

  • 1)检查用户状态,如果为 OffLine,则退出;
  • 2)用户状态不为 OffLine 且检查用户已经发送出去的消息列表的最后一条消息的 ID(LastMsgID),向 Pi 发送获取 MsgID 请求{UIN: uin, StartMsgID: LastMsgID, MsgIDNum: 0, ExpireFlag: True},待 Pi 返回响应后退出。

处理 Proxy 发来的 Notify 消息处理流程如下:

  • 1)如果用户状态为 OffLine,则退出;
  • 2)更新用户的最新消息 ID(LatestMsgID),如果用户发送消息队列不为空则退出;
  • 3)向 Pi 模块发送获取 N 条消息 ID 的请求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},设置用户状态为 GettingMsgIDList 并等待回应;
  • 4)根据 Pi 返回的消息 ID 队列,向 Xiu 发起获取消息请求 {UIN: uin, MsgIDList: msg ID List},设置用户状态为 GettingMsgList 并等待回应;
  • 5)Xiu 返回消息列表后,设置状态为 SendingMsg,并向 Gateway 转发消息。

所谓 Ack 消息,就是 Broker 经 Gateway 把消息转发给 App 后,App 给Broker的消息回复,告知Broker其最近成功收到消息的 MsgID。

Ack 消息处理流程如下:

  • 1)如果用户状态为 OffLine,则退出;
  • 2)更新 LatestAckMsgID 的值;
  • 3)如果用户发送消息队列不为空,则发送下一个消息后退出;
  • 4)如果 LatestAckMsgID >= LatestMsgID,则退出;
  • 5)向 Pi 模块发送获取 N 条消息 ID 的请求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},设置用户状态为 GettingMsgIDList 并等待回应;
  • 6)根据 Pi 返回的消息 ID 队列,向 Xiu 发起获取消息请求 {UIN: uin, MsgIDList: msg ID List},设置用户状态为 GettingMsgList 并等待回应;
  • 7)Xiu 返回消息列表后,设置状态为 SendingMsg,并向 Gateway 转发消息。

总体上,PiXiu 转发消息流程采用拉取(pull)转发模型,以上面五种消息为驱动进行状态转换,并作出相应的动作行为。

九、单聊消息


前几章所述之系统架构均基于群聊这一实时通信场景,此群聊消息系统在公司内部稳定运行近一年半时间,期间经历了6个大版本的迭代演进,赢得了各个业务线同事的信任,承担了从群聊、聊天室、语音传递、到游戏信令传递等等各种场景的群聊实时通信业务。

2017 年 9 月份刚接手系统时,已有业务方提出把对单人下发的消息【以下称之为 Chat Message】也接入这套系统,即消息系统不仅是一个群聊消息下发通道,也应该是一个单聊消息下发通道,但是考虑到当时处于系统实现初期,个人基于系统小作的考量直接拒绝了。随着这一年半其接入的业务范围的扩展,目下公司正考虑拆分 Gateway,这套消息系统需要被重构以处理单聊消息就是势然了。

公司的 Gateway 大概是我见过的最复杂的接口层系统,各种本应该放在逻辑层处理的子系统都被糅合进了接口层 Gateway 这一单个模块之内,除了公司几个创始人外很少有人清楚其它内部逻辑,据说陆续有四个高薪招聘的高手在接手这套系统后三个月内跑路了。Gateway 内部与聊天场景有关的状态数据主要有两种:用户所在的群信息【以下简称 Router Data】以及用户自身的状态信息【包括在线状态以及客户端地址信息,以下简称 Relay Data】。

Router Data 已经被群聊系统的 Router 模块替代掉,当下的 Gateway 已经不需要存储 Router Data。若扩展此系统使其能够处理单聊消息,能够存储用户的在线状态以及用户接入的 Gateway 信息,则 Gateway 就可以不用存储任何有关用户的状态数据了。

9.1、实时消息系统架构


Relay Data 迥异于 Router Data:Relay Data 的 key 是 UIN,其数据传递依赖于 Relay Message;而 Router Data 的 key 是 GroupID,其数据传递依赖于 Gateway Message。此二者差异决定了这两种数据的处理不可能在一个单一的模块之内。

新的实时通信系统添加了一个 Relay 模块用于处理 Relay Message 并存储 Relay Data。

添加了单聊消息处理能力的实时消息系统架构如下:
一套高可用、易伸缩、高并发的IM群聊、单聊架构方案设计实践_aaa.gif

下面详述新架构下各个模块的功能和消息处理流程,至于系统注册、系统扩容以及注册通知等相关流程与以往处理机制雷同,下面不再详述。

注意:新架构图中把原来的专门存储 Router Data 的 Database 模块改称为 Router DB,并添加了一个 Relay DB,以专门存储 Relay Data。

9.2、Gateway


Gateway 不再存储 Router Data 和 Relay Data,几乎成了 APP 的透明代理。

其功能如下:

  • 1)接收 APP 的连接请求,并把用户连接消息以 Relay Message 形式发送给 Relay;
  • 2)APP 与 Gateway 连接断开时以 Relay Message 形式发送给 Relay;
  • 3)把用户登录登出某 Room 的消息转发给 Router;
  • 4)接收 Relay 转发来的 Room Message 和 Chat Message,并下发给 APP。

9.3、Relay


Relay 是一个新模块,但其组织方式类似于 Router,亦是分 Partition 分 Replica,处理 Relay Message。

Relay 模块依据用户的 UIN 进行把不同用户的 Relay Data 放入不同的 Parition,同 Partition 内的 所有 Relay Replica 数据一致。

Relay 功能列表如下:

  • 1)接收由 Gateway 转发来的 Relay Message,存储为 Relay Data,并把 Relay Data 异步存入 Relay DB;
  • 2)起始时先从 Relay DB 获取其 Partiton 内所有用户的 Relay Data,以与 Partiton 内其他 Replica 保持数据一致性;
  • 3)接收由 Broker 转发来的 Room Message,依据 Relay Data 记录的用户所在的 Gateway 把消息复制转发给 Gateway;
  • 4)接收由 Proxy 转发来的 Chat Message,依据 Relay Data 记录的用户所在的 Gateway 把消息复制转发给 Gateway;
  • 5)前面提及所有处理 Room Message 系统都是一个写放大的系统,究其原因是一条 Room Message 需要被复制下发给 Room 内每个人,所以 Relay 处理 Room Message 时需要根据 Room Message 内的 Room UIN List 对消息进行复制后下发给每个 User APP 所连接的 Gateway。

当然,如果用户不在线,Relay 会对 Room Message 和 Chat Message 都作丢弃处理,因为此系统只处理实时消息。

Relay 承担了群聊消息与单聊消息的下发。

9.4、Broker


新架构下 Broker 模块不再直接把 Room Message 转发给 Gateway,而是转发给 Relay。有了 Relay,Broker 自身只需要存储每个 Group 的 UIN List 即可,大大减轻了自身的任务流程,Broker 可以把 Relay 视作 pseudo-gateway。

Broker 功能列表如下:

  • 1)接收 Router 转发来的 Gateway Message,存储为 Router Data,并把 Router Data 异步存入 Router DB;
  • 2)起始时先从 Router DB 获取其 Partiton 内所有用户的 Router Data,以与 Partiton 内其他 Replica 保持数据一致性;
  • 3)接收由 Proxy 转发来的 Room Message,依据 Router Data 记录的 Group 内的 UIN List 把消息复制转发给各个 Relay;
  • 4)Broker 仅仅承担了群聊消息下发。

9.5、Proxy


Proxy 接收 Client 发来的消息时,需要区分消息是 Room Message 还是 Chat Message。

新架构下其功能列表如下:

  • 1)关注 Registry Broker Path,以实时获取正确的 Broker List;
  • 2)关注 Registry Relay Path,以实时获取正确的 Relya List;
  • 3)接收 Client 发来的 Room Message,依据 Group 把消息转发给 Broker;
  • 4)接收 Client 发来的 Chat Message,依据 UIN 把消息转发给 Relay;
  • 5)Proxy 对 Group Message 的处理仅仅是转发了事,并不需要复制,所以不存在写放大的问题。

其实还可以把 Proxy 区分为 Group Message Proxy 和 Chat Message Proxy,以期收逻辑清晰职责明了之效。

十、本文总结


这套群聊消息系统尚有以下task list需完善:

  • 1)消息以UDP链路传递,不可靠【2018/01/29解决之】;
  • 2)目前的负载均衡算法采用了极简的RoundRobin算法,可以根据成功率和延迟添加基于权重的负载均衡算法实现;
  • 3)只考虑传递,没有考虑消息的去重,可以根据消息ID实现这个功能【2018/01/29解决之】;
  • 4)各个模块之间没有考虑心跳方案,整个系统的稳定性依赖于Registry【2018/01/17解决之】;
  • 5)离线消息处理【2018/03/03解决之】;
  • 6)区分消息优先级。

此记。

参考文档:一种支持自由规划无须数据迁移和修改路由代码的Replicaing扩容方案

十一、本文成文历程


  • 于雨氏,2017/12/31,初作此文于丰台金箱堂。
  • 于雨氏,2018/01/16,于海淀添加“系统稳定性”一节。
  • 于雨氏,2018/01/29,于海淀添加“消息可靠性”一节。
  • 于雨氏,2018/02/11,于海淀添加“Router”一节,并重新格式化全文。
  • 于雨氏,2018/03/05,于海淀添加“PiXiu”一节。
  • 于雨氏,2018/03/14,于海淀添加负反馈机制、根据Gateway Message ID保证Gateway Message数据一致性 和 Gateway用户退出消息产生机制 等三个细节。
  • 于雨氏,2018/08/05,于海淀添加 “pipeline” 一节。
  • 于雨氏,2018/08/28,于海淀添加 “大房间消息处理” 一节。
  • 于雨氏,2019/01/19,于丰台添加 “#7 单聊消息” 一章节。

附录:更多IM架构设计文章


浅谈IM系统的架构设计
简述移动端IM开发的那些坑:架构设计、通信协议和客户端
一套海量在线用户的移动端IM架构设计实践分享(含详细图文)
一套原创分布式即时通讯(IM)系统理论架构方案
从零到卓越:京东客服即时通讯系统的技术架构演进历程
蘑菇街即时通讯/IM服务器开发之架构选择
腾讯QQ1.4亿在线用户的技术挑战和架构演进之路PPT
微信后台基于时间序的海量数据冷热分级架构设计实践
微信技术总监谈架构:微信之道——大道至简(演讲全文)
如何解读《微信技术总监谈架构:微信之道——大道至简》
快速裂变:见证微信强大后台架构从0到1的演进历程(一)
17年的实践:腾讯海量产品的技术方法论
移动端IM中大规模群消息的推送如何保证效率、实时性?
现代IM系统中聊天消息的同步和存储方案探讨
IM开发基础知识补课(二):如何设计大量图片文件的服务端存储架构?
IM开发基础知识补课(三):快速理解服务端数据库读写分离原理及实践建议
IM开发基础知识补课(四):正确理解HTTP短连接中的Cookie、Session和Token
WhatsApp技术实践分享:32人工程团队创造的技术神话
微信朋友圈千亿访问量背后的技术挑战和实践总结
王者荣耀2亿用户量的背后:产品定位、技术架构、网络方案等
IM系统的MQ消息中间件选型:Kafka还是RabbitMQ?
腾讯资深架构师干货总结:一文读懂大型分布式系统设计的方方面面
以微博类应用场景为例,总结海量社交系统的架构设计步骤
快速理解高性能HTTP服务端的负载均衡技术原理
子弹短信光鲜的背后:网易云信首席架构师分享亿级IM平台的技术实践
知乎技术分享:从单机到2000万QPS并发的Redis高性能缓存实践之路
IM开发基础知识补课(五):通俗易懂,正确理解并用好MQ消息队列
微信技术分享:微信的海量IM聊天消息序列号生成实践(算法原理篇)
微信技术分享:微信的海量IM聊天消息序列号生成实践(容灾方案篇)
新手入门:零基础理解大型分布式架构的演进历史、技术原理、最佳实践
一套高可用、易伸缩、高并发的IM群聊架构方案设计实践
>> 更多同类文章 ……

即时通讯网 - 即时通讯开发者社区! 来源: - 即时通讯开发者社区!

上一篇:IM消息ID技术专题(二):微信的海量IM聊天消息序列号生成实践(容灾方案篇)下一篇:请教RabitMQ用于IM中1:1和1:N消息收发的技术可行性

本帖已收录至以下技术专辑

推荐方案
评论 29
文章写的太详细了,读我的都心衰。。。
好详细啊,向大佬致敬
引用:jueze 发表于 2018-10-23 16:54
好详细啊,向大佬致敬

。。。好复杂
引用:605682883 发表于 2018-10-24 18:51
。。。好复杂

是的,中大型im架构是很复杂的
好文章,写的很深呀!
有几个问题请教一下。
1:你说贵公司tcp的系统瘫痪换udp了。请问是为什么呢。tcp显然不会比udp差或者不稳定。据我所知当年qq用udp也是无奈之举,业务当时的系统单机很难承载很多tcp长连接。如果要是放到现在单机很容易几十万的长连接,就会用tcp了。不知道你们选择udp的理由是什么呢?

2:你的架构,client端连接的是proxy。如果是基于tcp的。那么这个proxy是不是能和gateway合并呢?因为gateway和client是一个长连接。如果client发送一条消息。是不是就会先到gateway。或者说是client直接在通过短连接直接连接proxy。而不用这个长连接呢?还是gateway转给proxy。或者说gateway和proxy合并呢?

3:最后的router扩容方案没怎么理解。router本来的作用仅仅是转发gateway过来的登录登出消息。为什么
room message复制多份需要扩容router?room message复制多份 仅仅是相当于room message增加。room message增加不是仅仅需要扩容proxy 和  broker吗?

4:proxy router gateway broker 这些组件 ,他们内部是如何通信的?

引用:65725738 发表于 2018-11-03 10:53
好文章,写的很深呀!
有几个问题请教一下。
1:你说贵公司tcp的系统瘫痪换udp了。请问是为什么呢。tcp显 ...

逐个回答你的问题。

1 关于到底用tcp还是udp,这个话题跟你们公司相关基础设施建设有关。
2 文中 Client 一词你得根据上下文理解,所有图中的 Client 指的是服务端的消息发布者,并不是 APP 类型的客户端,所以不存着 gateway和proxy合并的事情。

3 你得理解“room message复制多份 仅仅是相当于room message增加。room message增加不是仅仅需要扩容proxy 和  broker吗?”非常且完全正确,Router扩容是登录消息量增大的时候才进行扩容,如果文中有相关字句造成你理解为“room message复制多份需要扩容router”,请指出,我可以修改。至于Router扩容方案你没怎么理解,具体哪一步不理解,可以在Jack Jiang的群里私下找我沟通。

4 至于各个组件怎么通信,我个人觉得blog已经详细说明了,有错误地方或者不明晰的地方可以直接在Jack群里找我沟通。
引用:yuyu 发表于 2018-11-04 00:46
逐个回答你的问题。

1 关于到底用tcp还是udp,这个话题跟你们公司相关基础设施建设有关。

感谢本文原作者的回复
引用:yuyu 发表于 2018-11-04 00:46
逐个回答你的问题。

1 关于到底用tcp还是udp,这个话题跟你们公司相关基础设施建设有关。

不好意思 。谢谢解答。但是还是没太明白。  你说 所有图中的 Client 指的是服务端的消息发布者,并不是 APP 类型的客户端。  这个没问题。   我的意思是如果是使用tcp长连接的话。   gateway就是 app客户端连接的服务端。那么它本身不就是你的 图中的 Client 吗?

至于我第4个问题。我意思组件之间是用什么技术通信的。 http,tcp长连接,tcp短连接。rmi 还是什么?

大佬你在一群,我现在加群是四群。所以只能在这里问你了。
引用:yuyu 发表于 2018-11-04 00:46
逐个回答你的问题。

1 关于到底用tcp还是udp,这个话题跟你们公司相关基础设施建设有关。

七、Router需要进一步强化

7.1、简述

当线上需要部署多套群聊消息系统的时候,Gateway需要把同样的Room Message复制多份转发给多套群聊消息系统,会增大Gateway压力,可以把Router单独独立部署,然后把Room Message向所有的群聊消息系统转发。

这里面没理解。gateway 为什么需要复制多份Room Message。然后为什么要把Router单独独立部?
请教几个问题。
1. 用户发的消息怎么进入系统,我看里面的client并不是指用户?  用户与gateway保持长连接,这个连接只用于用户接收消息吗?
2. broker会转发消息给跟roomID有关系的gateway。若一个gateway跟所有roomID有关系,是不是从client发出的所有消息都会转发到这个gateway,这样gateway要出处理这个系统所有消息,会不会有瓶颈?
有一点不是很明白:
1.是不是写扩散点逻辑在最后的方案中是放在【Relay】中。
2.【Route】和【Relay】的区别是什么,介绍中感觉【Roter】只是负载维护用户和群的映射关系的管理工作。按道理【Route】与【Relay】应该是有交互的,图中没有体现。那【Relay】如何获取写扩散的映射关系,用户和群的关系。
3.【Broker】是不是最后只负责抽取【PI】【XIU】中存储的消息进行数据转发给对应的【Relay】
引用:coding_im 发表于 2018-12-07 18:56
请教几个问题。
1. 用户发的消息怎么进入系统,我看里面的client并不是指用户?  用户与gateway保持长连 ...

这个消息系统仅仅是一个消息下发通道,可类比为一种pubsub型的mq,如果下发消息是群消息,则这个系统根据群ID就能把消息准确的扩散复制后下发给群里每个相关成员的客户端,如果消息消息是单聊消息,则这个系统根据消息中的UID就能准确地把消息下发给这个UID的客户端。

下面逐个回答你的问题:
1 这里的client其实是服务器其他模块对这个系统的调用者,套用mq中的术语,其为producer。用户与gateway之间确实保持长连接,这个连接其实是用于上传客户端发送出的消息,也用于接收别的客户端发送来的消息,但是这套系统重点讲解的是如何对消息进行下发,所以本文只讲了如何对消息进行扩散、寻路(路由)和下发。

2 如果一个 RoomID 的所有客户端都连接一个 Gateway,则你说的这个问题确实存在。但是,这里的问题是如何避免这种情况发生,而不是把问题抛给消息系统,把压力传递给消息系统让消息系统去解决这个问题。譬如,我们在Gateway前再加一个 httpDNS 系统,当客户端寻找要连接的 Gateway 的时候,由 httpDNS 系统确保所有的客户端连接均匀地被打散到各个 Gateway,同时限制每个 Gateway 的连接数目不会超过最大 Room 内成员额定数【譬如最大 QQ 群是2000人,则确保每个 Gateway 的连接数低于 2000】,这个问题估计就不是个问题了?

上面是个人的愚见,还有需要指教的地方,可以继续回帖。

点评

JackJiang  说:
yuyu是本文《一套高可用、易伸缩、高并发的IM群聊架构方案设计实践》的作者,专门请他来回复的,以他的回答为准,大家可以多讨论  (5 年前)
引用:一夕 发表于 2019-03-01 16:06
有一点不是很明白:
1.是不是写扩散点逻辑在最后的方案中是放在【Relay】中。
2.【Route】和【Relay】的 ...

逐个回复你的问题。
1 写扩散点最后确实是在 Relay。
2 以系统最后一个版本为例,Router 是向系统的 Broker  转发群内每个在线用户的在线信息【Gateway Message】,向 Relay 转发每个在线用户所在的 Gateway信息【Relay Message】。Router向Relay发送消息,Relay不会向Router发送消息,只会向 Gateway 发送群聊消息【Room Message】和单聊消息【Chat Message】。

或许举例一个实际场景能够更让人清晰明了。譬如有如下场景:

A: Room1 【RoomID: 100】内有用户 U1【UID:1001】、U2【UID:1002】和 U3【UID:1003】;
B:U1 登录了 Gateway1【G1】,Gateway1 的地址为 10.0.0.1:10000;
C:U2 登录了 Gateway2【G2】,Gateway2 的地址为 10.0.0.2:10000;
D:U3 不在线。

则 一条Gateway Message 包含的数据关系就是:map{key: Room1-100,value: User List[U1-1001,U2-1002]};

Relay Message有两条,其数据关系分别是 map{key: U1-1001, value: G1-10.0.0.1:10000} 和  map{key: U2-1002, value: G2-10.0.0.2:10000} ;

Room Message 就是一条包含 Room1-100 的消息;

Chat Message 就是两条分别包含 U1-1001 和 U2-1002的消息。

3 本系统其实讲了两个模型的系统:第九章内容讲解的最新版本的实时在线消息下发系统以及第八章内容讲述的异步离线消息下发系统,前者是使用推技术把消息【推送】给客户端,后者则是依赖客户端自身的状态把消息【拉取】下来。

离线消息系统中并没有 Relay 模块,只实现了群聊消息的异步下发,所以请明晰概念后再进一步讨论。
引用:yuyu 发表于 2019-03-02 17:56
逐个回复你的问题。
1 写扩散点最后确实是在 Relay。
2 以系统最后一个版本为例,Router 是向系统的 Br ...

感谢作者的详细回复!
看了一下文章,确实很有深度,但是搞不明白的一点是,消息从gateway发出,client如何获取到呢?这块没衔接起来,望大佬指点一下。
引用:登至必极 发表于 2020-04-27 21:48
看了一下文章,确实很有深度,但是搞不明白的一点是,消息从gateway发出,client如何获取到呢?这块没衔接 ...

client就是通过长连接连到gateway的
还有一个问题就是,服务的可用性怎么保证?也就是一旦某种类型的某个服务比如borker中的某个replica挂断,怎么进行故障恢复对终端无感知呢?谢谢大佬解答
引用:登至必极 发表于 2020-04-28 14:57
还有一个问题就是,服务的可用性怎么保证?也就是一旦某种类型的某个服务比如borker中的某个replica挂断, ...

集群的另一个优势就是高可用,这个服务不可用了,客户在重连逻辑下就连到别的实例了。
打赏楼主 ×
使用微信打赏! 使用支付宝打赏!

返回顶部