|
RocketMQ源码分析概要本文主要做了RocketMQ4.8版本的源码主流程分析。一二三四五节介绍文件存储部分。六七节介绍消息生产流程,包含client端和server端。八节介绍消息消费流程。九节介绍了传统的主从消息复制模式。最后一节介绍了相对较新的基于raft的dleger模式消息处理流程。一.存储设计RocketMQ主要的存储文件包括Comitlog文件、ConsumeQueue文件、IndexFile文件。RocketMQ将所有主题的消息存储在同一个文件中。但由于消息中间件一般是基于主题进行订阅,这样做不方便按照消息主题检索消息。为了方便消息消费,RocketMQ引入了ConsumeQueue消息队列文件,每个Topic包含多个ConsumeQueue,每一个ConsumeQueue有一个文件。该文件可以看成是Commitlog关于消息消费的“索引”。此外还有IndexFile索引文件,主要就是为了加速消息的检索,根据消息的属性快速从Commitlog文件中检索消息。CommitLog:消息存储文件,所有Topic的消息都存储在CommitLog文件中。ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQueue,供消费者消费。IndexFile:消息索引文件,主要存储消息Key与Offset(消息在CommitLog中的偏移)的对应关系。二. 存储文件组织RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumeQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就是该文件第一条消息对应的全局物理偏移量。以下为commitlog实际文件截图:存储目录RocketMQ使用MappedFile、MappedFileQueue来封装存储文件。MappedFile文件映射使用方式如下:MappedFileQueue是MappedFile的管理容器,也是对存储目录的封装,例如CommitLog文件的存储路径${ROCKET_HOME}/store/commitlog/,该目录下会存在多个内存映射文件(MappedFile)。逻辑关系数据结构三.存储文件格式3.1?Commitlog文件格式我们来看下按照这种格式来查找指定消息的逻辑。根据offset查找消息。根据offset定位到文件用offset与文件长度取余得到在文件内的偏移量,从该偏移量读取size长度的内容返回即可3.2 Consumequeue文件格式为了加速ConsumeQueue消息条目的检索,每一个Consumequeue条目不会存储消息的全量信息,其存储格式如图:ConsumeQueue即为Commitlog文件的索引文件,其构建机制是当消息存储到Commitlog文件后,由专门的线程转发消息,从而构建ConsumeQueue。如下图,将consumeQueue想象成一个元素大小为20字节的大数组的话,startIndex就是这个数组的索引,而真正在文件中查找的时候则需要换算成文件内的字节偏移量(直接乘20)。以下为根据消费者消费的逻辑偏移量查找consumeQueue条目的逻辑。四.消费队列文件的实时更新ConsumeQueue、IndexFile都是基于CommitLog文件构建的,当生产者提交的消息存储在Commitlog文件中时,ConsumeQueue文件需要及时更新,否则消息无法及时被消费。RocketMQ通过开启一个线程ReputMessageServcie来实时读取CommitLog文件新增内容,使用reputFromOffset来标记已经追踪到的位置。主要流程如下:从当前记录已追踪到的位置reputFromOffset开始读取commitLog数据将读取到的buffer解析成携带关键信息的DispatchRequest(包含commitLogOffset,msgSize,tagsCode等consumeQueue的必要信息,以用来构建consumeQueue)doDispatch方法最终会调用putMessagePositionInfo方法,将读取到的信息存入consumeQueue对应文件。五.文件刷盘机制RocketMQ的存储与读写基于内存映射机制(MappedByteBuffer),消息存储时首先将消息追加到内存,再根据配置的刷盘策略在一定时间进行刷写磁盘。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法。如果是异步刷盘,在消息追加到内存后立刻返回给发送端。RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。参数解析:flushIntervalCommitLog:FlushRealTimeService线程任务运行间隔(默认500ms)。flushPhysicQueueLeastPages:一次刷写任务至少包含的页数,如果脏页数量不足,小于该参数配置的值,将忽略本次刷写任务,默认4页。flushPhysicQueueThoroughInterval:两次真实刷写任务最大间隔,默认10s。注:每间隔10S会把flushPhysicQueueLeastPages强制设置为0,意为强制刷盘,无论脏页数量是否达到。5.1?异步刷盘主流程单线程死循环,每间隔500ms调用一次flush方法。flush方法的入参是一次刷写任务至少包含的页数, 如果脏页页数不足,则啥都不做。flush方法最终会走到 mappedByteBuffer.force()方法。注:force方法详解1.?jdk 源码中显示force方法最终会调用msync系统调用,并传入MS_SYNC标志位。2.?MS_SYNC标志位的含义为同步等待刷盘结果。六.消息生产 client端流程生产者消息发送流程主要的步骤:校验消息、查找路由、消息发送(包含异常处理机制)。6.1?消息生产总流程获取topic路由信息根据策略选择一个messageQueue进行消息发送,默认使用轮询策略调用sendKernelImpl方法发送消息,如果发送失败,默认最多重试三次sendKernelImpl 会调用到mqClient,使用netty发送请求给mq broker注:1. 路由信息既指定topic共有多少队列,这些队列分布在哪些broker节点上,方便发送消息时选择具体的broker节点发送rpc请求。2. 路由信息数据结构如下:6.2?消息发送路由选择详解获取路由信息如果本地缓存中包含指定topic的路由信息,则直接返回;如本地map中不包含指定topic的路由信息,则发送rpc请求到nameserver获取。消息队列选择查询到的路由信息(队列分布信息)结构如下图。选择策略默认使用轮询模式,依次来做消息发送的负载均衡。3. 消息发送,根据上步选好的队列决定出brokerAddr,发送rpc请求到这个地址。七.消息生产 broker端流程消息存储代码入口:org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage。检查消息合法性调用CommitLog类的putMessage方法展开CommitLog类的putMessage获取当前可以写入的Commitlog文件put之前先上独占锁调用MappedFile#appendMessage方法finally块中解锁4.?展开MappedFile#appendMessage(核心就是调用MappedByteBuffer.put)八.消费者消费消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。集群模式,Topic下的同一条消息只允许被其中一个消费者消费。广播模式,Topic下的同一条消息将被集群内的所有消费者消费一次。集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循的原则:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。8.1?客户端消息拉取RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。a.从pullRequestQueue中获取一个PullRequest(消息拉取任务),如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入。b.调用pullMessage方法处理拿到的PullRequest进行消息拉取,PullRequest包含将要消费的messageQueue信息。c.?调用pullKernelImpl,传入需要拉取的具体队列、offset、maxNums,拉取完成后需要执行的回调。d.?根据选择的队列得到具体的broker地址,然后调用mqClient发送rpc请求到broker。e.?收到响应,执行回调。将foundList(收到的消息列表)放入processQueue。调用submitConsumeRequest处理收到的消息体(实际上是将拉取到的消息丢入线程池,做异步化处理)。将pullRequest重新放回pullRequestQueue,启动下一次pull流程。注:下图312行,pullRequest的nextOffseet已更新,所以下次拉取的时候会从新的offset开始拉取。f.?submitConsumeRequest处理流程最终会走到ConsumeRequest的run方法调用业务方注册的listener在processConsumeResult中处理消息消费Result、消费失败重试等逻辑8.2?服务端处理消息拉取入口在PullMessageProcessor#processRequest,该方法核心是DefaultMessageStore#getMessage。展开DefaultMessageStore#getMessage方法,传入topic、queueId、offset、maxMsgNum。根据topic,queueId确定consumeQueue文件。从找到的consumeQueue文件指定offset位置找到第一条‘索引’,读出对应在commitLog中的offset以及size。用上述读取到的offset和size从commitLog中读到目标消息(此处有相对较多的随机读取)。依次循环,直到消息数量达到maxMsgNum。九.?主从同步(HA)机制为了提高消息消费的高可用性,避免Broker发生单点故障引起存储在Broker上的消息无法及时消费甚至丢失,RocketMQ引入了Broker主备机制。即消息消费到达主服务器后需要将消息同步到消息从服务器,如果主服务器Broker宕机后,消息消费者可以从从服务器拉取消息。9.1?RocketMQ HA的实现原理master启动,并在特定端口上监听slave的连接。slave主动连接master,master接收客户端的连接,并建立相关TCP连接。slave主动向master发送待拉取消息偏移量,master解析请求并返回消息给从服务器。slave保存消息并继续发送新的消息同步请求。具体实现(HAService)master启动,并在特定端口上监听从服务器的连接。slave启动主动连接master(connectMaster)slave默认固定每隔5s向master会报当前拉取到的消息偏移量(reportSlaveMaxOffset)处理master发过来的消息副本(processReadEvent)继续发送将要拉取的 消息偏移量,等到master响应(reportSlaveMaxOffsetPlus)部分步骤详解slave -> master请求详解内容很简单,只有将要拉取的offsetmaster 处理 slave请求详解读取要拉取的offset更新slaveAckOffset,用于主从同步复制消息时及时响应给slave如果是初次收到slave请求,更新slaveRequestOffsetmaster发送header给slave(调用transferData)(包含接下来将要发送的offset)master从本地commitLog中查询指定offset处的消息,填充进入selectMappedBufferResultmaster调用transferData写出刚才查到的mq消息给到slave节点由于上述流程更新了nextTransferFromWhere,之后就可以不断循环,发送消息给到slave。十.Dledger模式从4.5版本开始,rocket引入了一种新的高可用机制,即rocketMq的Dleger模式。Dleger模式遵从Raft协议http://thesecretlivesofdata.com/raft/ 我们以一个集群三台机器为例(包含一主两从)。10.1?dleger的不同之处引入dledger后,broker端处理消息发送的流程略有不同:commitLog存储实现类换成了DledgerCommitLog,核心方法也从mappedFile.appendMessage换成了dLedgerServer.handleAppend,也就是说核心方法不再是把消息put进mappedByteBuffer就ok了。10.2?dLedger消息存储主流程剖析下dLedgerServer.handleAppend:判断从节点是不是压力太大响应太慢,如果是则直接拒绝producer发过来的请求因为理论上三节点组成的cluster至少要有一个从节点响应成功消息才算正常落地。如果两个从节点都响应慢,则master也无法接口请求,只能报错。如果从节点正常,则存入master本地的mappedByteBuffer(dLedgerStore.appendAsLeader)。将刚存入本地的消息 交给dLedgerEntryPusher来负责发送给slave,并等待从节点的响应(waitAck)。详解下waitAck:核心就一条,把msg在commitLog中的索引以及对应的future放入一个map。注:方便slave响应回来之后,根据响应的index,调用对应index的future,以发送响应给到producer,表示消息发送成功。10.3?消息从master到 slave的复制流程master启动时同时启动了2个dispatcher,各自对应一个slave节点。在dispatcher的主方法中,循环调用doAppend方法,不断复制消息发送到slave。doAppend负责根据当前已经到达的位置writeIndex,查询并发送消息到从节点(doAppendInner),并更新writeIndex,循环往复。doAppendInner 先查询index位置的消息,再rpc push消息到从节点。在从节点ack回调中,更新对应从节点所达到的水位(updatePeerWaterMark)。注:水位即对应从节点已经追加到的index位置。更新水位时,peerid为从节点的id,三个节点的水位信息统一放在peerWaterMarksByTerm。master-> slave push消息,收到响应后更新水位,就是这段的总结。10.4?消息复制成功后及时响应producer消息复制成功后及时响应producer,以表示消息发送成功。master启动时同样启动了一个QuorumAckChecker线程。QuorumAckChecker主方法中进行水位(peerWaterMarksByTerm)的处理。checker计算水位map中的数据,算出当前多数节点(两个节点就够了)已经达到的水位(quorumIndex),标志着这些消息已经收到slave的ack,低于这个水位的消息全部都可以给producer响应了。注:举例子:master append到了index 5,slave 1到了 4, slave 2 到了 3,则quorumIndex计算为4。处理的内容很简单,从pendingResponse的map中找出低于共识水位(quorumIndex)的消息发送response给到producer(调用future.complete)。这个future正是上边消息发送流程写完master本地后放入pendingResponse这个map的。至此整个producer的请求,响应流程已经结束。关注得物技术,携手走向技术的云端文|zmm
|
|