|
背景目前笔者所在团队正常研发一款流程编排引擎,其中有多个功能特性需要MQ的延迟消息/消费者重试等特性。经过多方面的考量,我们最终决定采用计算存储分离的架构,在分布式KV存储的基础上,研发一款定制化的MQ。目前,其具备了MQ的主要特性。本文所描述的是基于分布式KV的基础上研发MQ的核心思路。术语「Message:」 消息「Topic:」 消息的逻辑分类「Partition:」 分区,一个Topic中包含多个分区,每个消息最终发送到Topic的某个分区中「Partition Offset:」 每当消息被发送到某个Partition中,这个Partition的Offset+1「Producer:」 生产者,发送消息到某个Topic下的某个分区「Consumer Group:」 消费者组。一个Topic可以有多个不同的Consumer Group进行消费,每个Consumer Group可以消费到Topic中的全量消息。「Consumer:」 消费者,消费Topic中的消息。如果一个Consumer Group下只有一个Consumer,则其会消费到全部消息;如果有多个Consumer,则每个Consumer只消费部分消息。「Consumer Group Offset:」 记录消费者组,针对某个Topic下所有Partition当前消费到位置。针对每个Partition,不会超过Partition 最大Offset。「Broker:」 消息代理,Producer将消息发送给Broker,由broker发送到topic下某个分区。Consumer连接Broker,从Broker消费消息。「Broker Cluster:」 Broker集群,为了实现高可用,每个Broker集群下包含多个Broke实例「Rebalance:」 再均衡。传统的实现中,一个Consumer Group下的Consumer数量不能超过这个Topic下Partition的数量,一个Partition最多只能分配给一个消费者,超出Partition数量的消费者无法消费到消息。在本文实现中,不存在Rebalance概念。消费者数量,不受Partition限制。「Delay Message:」 延迟消息。通常情况下,一个消息被投递到Topic中,就会被立即消费。延迟消息的意思是,延迟指定时间后,才可以被消费。「Retry Message:」 重试消息。当一条消息消费失败后,需要被重试,即按照一定的重试策略,重新让消费者来消费这条消息。整体架构说明:「KV」 「存储」本文以KV Storage是Redis Cluster为例进行讲解,Broker Cluster中的所有Broker实例共享这个Redis Cluster,其他任意支持按key排序scan的KV存储均可。从持久化的角度来说,使用内存模式Redis并不合适,这里意图在于说明基于分布式KV实现的MQ的核心原理。事实上,在公司内部,我们使用的是基于RocksDB基础上研发的分布式KV,在网络通信上兼容redis协议。为了简化,本文中不讨论Redis Cluster扩/缩容,Slot迁移的情况。但足以掌握基于分布式KV研发一款消息中间件的核心原理。「网络通信」Producer和Consumer与Broker的通信,在笔者的项目中,使用的是Grpc。在开源的通信框架中,Grpc可以说是最流行的方案,Apacha RocketMQ 5.x版本也采用了Grpc。在本文中,并不会对Grpc进行介绍。详细设计Broker集群元数据每个Broker启动时,可以将自身信息注册到Redis中,以便producer/consumer进行服务发现。例如通过hash结构维护:Key[cluster]$cluster_nameValuefiledvaluebroker1ip:portbroker2ip:portTopic元数据Topic元数据主要是维护Topic下有多少Partition,这些Partition在Redis Cluster中是如何分布的。用户在创建Topic时,指定分区数量。Redis Cluster有16384个槽,每个Redis分片负责其中部分槽。当创建一个Topic时,例如指定10个分区,可以按照一定策略把这个10个分区映射到不同的槽上,相当于间接的把分区分配到了不同的redis分片上。当创建好一个Topic之后,将Topic下的分区分配给不同的Broker。例如10个分区,10个Broker,则每个Broker负责一个分区。如果只有5个分区,那么需要分配给其中5个broker。例如通过hash结构维护维护这个映射关系key[topic_metadata]$topic_namevaluefiledvaluepartition1broker1partition2broker2消息消息使用protobuf进行定义:messageMessage{google.protobuf.Structmetadata=1;//消息的元数据stringpartition=2;//消息所属的分区int64offset=3;//消息的offsetstringmsgId=4;//消息的唯一idstringtopic=5;//消息的topicstringkey=6;//消息key,用于路由bytesbody=7;//消息体google.protobuf.Timestampborn_time=8;//消息生成时间google.protobuf.TimestampexpireTime=9;//消息截止时间,用于延迟消息}生产者在发送消息时,最简单的情况下,只需要指定消息的topic、body。当有其他特殊需求时,可以指定以下字段:key:具有相同key的消息,经过hash算法,写入到相同的分区。partition:直接指定分区,不根据key计算。expireTime:延迟消息。消息不希望被立即消费,而是到指定时间后,才会被消费。消息发送「从Producer的角度来说:」「重试:」 发送一条消息到broker可能失败,所以需要重试。重试需要设置一定的次数和超时时间,在超时时间内进行重试。「分区选择:」 选择分区应该在producer端确定,确定分区后,消息发送到分区所属的broker。「聚合:」 为了减少网络io,应该聚合批次进行发送,注意聚合是按照分区进行聚合「broker选择:」 对于无序消息,选择broker可以有一定的策略,例如某个broker失败率比较高,或者延迟比较高,则应该优先选择其他的broker。「从broker角度来说:」接收到一条消息时,offset信息维护。每次发送,在确定消息需要发送到的分区后,broker需要将对应partition的offset+1。在笔者的项目中,使用了hash结构存储每个分区的最大的offset:key:[topic_offset]{$topic}valuefieldvaluepartition1offset1partition2offset2为了提升offset维护的效率,不需要每次都调用HINCRBY,而是在broker启动时,将自己维护的分区offset信息加载到内存中,之后发送消息时,内存中增加,定期保存到KV中。此外,需要有一个修正offset的逻辑,避免broker异常宕机的情况下,offset没有成功保存到redis中。在broker启动时,可以从当前维护的最大offset开始往后扫描,如果发现了新消息,则说明offset需要修正(参考如下消息存储部分)。消息存储当消息被写入到redis中,key满足以下格式:[topic]{$topic_$partition}$offset其中:[topic]:是固定前缀{topic_partition}:是名称,partition表示分区。这里利用了redis hash tag能力。$offset:表示当前这个分区的offset消息拉取拉取通过redis scan操作进行,将scan到的消息交由消费者处理。在拉取消息时,依赖于一个consumer offset,其维护了某个consumer group消费某个topic的进度信息。拉取时,从这个位置开始。这里可以考虑使用hash数据结构:key:[consumer_offset]$group_$topicvaluefieldvaluepartition1offset1partition2offset2当有消费者连接上某个broker时,broker查询到自己负责的分区的parititon offset,从这个位置开始拉取消息。延迟消息对于所有延迟消息,会首先发送到一个特殊的delay topic中,相当于暂存这个消息。消息到期后,投递到目标topic中。在发送到延迟topic之前,会记录消息的原始topic、partition到metadata的origin_topic 「、」 origin_partition字段中。之后发送到delay topic中。key格式与普通消息不同,以时间戳排序:[delay]$broker_id}$expireTime会有一个延迟消息转发器,不断的扫描abase,当发现有消息到期时,修改延迟消息中的目标topic为origin_topic、origin_partition字段,之后从发送逻辑,投递到目标topic中。同时,会记录当前扫描到的位置。消费重试当消费者消费一条消息失败时,默认也是会走延迟消息逻辑,到期后,投递给目标消费者,重新消费。重试消息,也是基于延迟消息的基础上开发的。在逻辑上不同的是:延迟消息直接投递给了目标topic而重试消息不能投递给你目标topic,因为一个topic有多个consumer group,如果只是某个consumer group失败需要重试,那么其他consumer group应该不受影响。因此每个consumer_group,应该有独立的重试topic。如:[topic]retry_$consumer_group这个Topic下应该也有包含分区,策略与之前所述Topic元数据维护相同。死信队列当消息重试到最大重试次数后,依然失败,可以放入死信队列。如:[topic]dead_$consumer_group消息TTL为了避免已经被消费的消息,占用大量的存储空间,消息会被清理。我们的策略是:对于普通消息:3天后会自动清理,意味着一条消息3天内没被消费,将会被删除。对于延迟消息:在截止时间的基础上+3天。总结本文的目的是介绍如何基于分布式KV研发一款MQ的核心思路,很多容灾/高可用/性能优化等方面的主题并没有讨论。仅仅是提供一种核心思路,如果希望在生产环境使用,需要进行大量的改进与优化。点击阅读原文,了解更多技术干货~
|
|