|
这是第345篇不掺水的,想要了解更多,请戳下方卡片关注我们吧~前言众所周知,作为一个出色的分布式消息中间件,RocketMQ 在全球范围内获得了广泛的应用,那么作为一个分布式消息中间件,最重要的是什么?协议?持久化?消息分发实现?高可用?高可靠?好的协议可以保证通讯的稳定,持久化可以保证数据的存储,消息分发实现可以结合多场景加速业务,高可用可以保证业务大量运行,高可靠可以保证业务的持续运行。今天,我们想谈一谈 RocketMQ 的高可用机制常见的消息中间件集群实现方式主从复制方式(非对称):这种方式下,一主多从的架构被广泛应用。消息中间件的写入操作只能在主节点上进行,而读取操作在所有节点上都可以执行。主节点负责数据的同步复制到所有从节点上,当主节点出现故障时,一个备份从节点会自动地竞选为新的主节点,以保证系统的持续运行。这种方式下,主节点和从节点之间的延时可能会导致数据不一致或者消息丢失。对等网络方式(对称):这种方式下,每个节点都可以承担读写操作的任务,数据可以通过节点之间的同步来保持一致性。每个节点的能力相同,不存在主从的概念。当一个节点出现故障时,其他节点会自动接管它的任务,因此系统没有单点故障。这种方式下,网络的复杂性和节点间的通信有可能造成性能瓶颈。RocktMQ 高可用方式在4.5版本之前,RocketMQ 不支持节点的自动晋升,那么如果主节点挂了,未消费的数据会从从节点上被继续消费,但是这一组节点就失去了作用,无法再被写入,若集群中主节点数量较少,可能会引起故障,于是在4.5版本,升级支持 DLedger 模式完成自动选主。我们现在看一下 DLedger 如何实现自动选主。首先,我们要明白 DLedger 模式的本质便是使用了 Raft 算法,接着我们来看一下 RocketMQ 的代码实现。在启动 broke r时,会将 CommitLog 转换为 DLedgerCommitLog 类型,并添加 DLedgerRoleChangeHandler 处理器,那么我们来看看 CommitLog 与 DLedgerCommitLog 的区别commitLog 是 RocketMQ 最核心的数据存储。它是一个顺序写的文件,用于存储 Producer 发送的消息和 Consumer 消费的消息,也就是全部通过消息中间件传递的消息。每一个写入 commitLog 的消息都会被分配一个唯一的 offset (偏移量),用于标识该条消息在 commitLog 中的位置。commitLog 中消息的存储格式包括消息长度、消息属性(如是否压缩、是否顺序消费、是否是事务消息等)、消息体等信息。publicclassCommitLog{publicfinalstaticintMESSAGE_MAGIC_CODE=-626843481;protectedstaticfinalInternalLoggerlog=InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);//空文件结束标识protectedfinalstaticintBLANK_MAGIC_CODE=-875286124;//文件队列,用于存储在磁盘上的消息protectedfinalMappedFileQueuemappedFileQueue;//默认的消息存储对象protectedfinalDefaultMessageStoredefaultMessageStore;//用于刷盘和提交的服务privatefinalFlushCommitLogServiceflushCommitLogService;//如果启用了TransientStorePool,我们必须在固定的时间内将消息刷新到FileChannelprivatefinalFlushCommitLogServicecommitLogService;//消息发送的回调函数privatefinalAppendMessageCallbackappendMessageCallback;privatefinalThreadLocalbatchEncoderThreadLocal;//用于存储每个topic的队列protectedHashMaptopicQueueTable=newHashMap(1024);//确认偏移量protectedvolatilelongconfirmOffset=-1L;//加锁期间的起始时间privatevolatilelongbeginTimeInLock=0;//消息发送的锁,用于防止并发发送。protectedfinalPutMessageLockputMessageLock;}DLedgerCommitLog 是 RocketMQ 用作持久化存储的一种实现方式,它基于Apache DistributedLog (DLedger) 实现了高可靠、高性能的分布式日志存储,也正是它,使得 CommitLog 拥有了选举复制的能力。publicclassDLedgerCommitLogextendsCommitLog{//DLedger实例privatefinalDLedgerServerdLedgerServer;//DLedger的配置信息privatefinalDLedgerConfigdLedgerConfig;//用于存储mmap文件的存储类privatefinalDLedgerMmapFileStoredLedgerFileStore;//mmap文件列表privatefinalMmapFileListdLedgerFileList;//id标识代理角色,0表示主,其他表示从privatefinalintid;privatefinalMessageSerializermessageSerializer;//进入DLedger锁的开始时间privatevolatilelongbeginTimeInDledgerLock=0;//分隔旧的commitlog和DLedgerCommitlog的偏移量privatelongdividedCommitlogOffset=-1;//是否正在恢复旧的commitlogprivatebooleanisInrecoveringOldCommitlog=false;}了解完 CommitLog,我们回到 broker 的启动,核心类 BrokerController 的 initialize 方法,首先会根据配置生成一个 DefaultMessageStore,这里会判断当前是否支持 DLedgerCommitLog,如果支持,则会创建一个 DLedgerRoleChangeHandler 对象并注册为 leader 选举的回调方法。接着与老版本一致会创建一个 BrokerStats 对象和一个 MessageStorePluginContext 对象。最后,会将 CommitLogDispatcherCalcBitMap 对象添加到 MessageStore 的 DispatcherList 中。publicclassBrokerController{publicbooleaninitialize()throwsCloneNotSupportedException{……if(result){try{this.messageStore=newDefaultMessageStore(this.messageStoreConfig,this.brokerStatsManager,this.messageArrivingListener,this.brokerConfig);//如果支持DLegerCommitLogif(messageStoreConfig.isEnableDLegerCommitLog()){//创建DLedgerRoleChangeHandlerDLedgerRoleChangeHandlerroleChangeHandler=newDLedgerRoleChangeHandler(this,(DefaultMessageStore)messageStore);//将CommitLog转换为DLegerCommitLog,并添加DLedgerRoleChangeHandler处理器((DLedgerCommitLog)((DefaultMessageStore)messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats=newBrokerStats((DefaultMessageStore)this.messageStore);//加载插件MessageStorePluginContextcontext=newMessageStorePluginContext(messageStoreConfig,brokerStatsManager,messageArrivingListener,brokerConfig);this.messageStore=MessageStoreFactory.build(context,this.messageStore);this.messageStore.getDispatcherList().addFirst(newCommitLogDispatcherCalcBitMap(this.brokerConfig,this.consumerFilterManager));}……}}接着我们进入 DLedgerCommitLog 的代码,这里可以看到使用了 openmessaging 包下的 DLedgerServer 组件publicclassDLedgerCommitLogextendsCommitLog{@Overridepublicvoidstart(){//启动dLedgerServerdLedgerServer.startup();}}基于 dLedgerLeaderElector 做了 Leader 选举的操作publicclassDLedgerServerextendsAbstractDLedgerServer{publicsynchronizedvoidstartup(){if(!isStarted){this.dLedgerStore.startup();this.fsmCaller.ifPresent(x->{//启动状态机调用程序并加载现有快照以进行数据恢复x.start();x.getSnapshotManager().loadSnapshot();});if(RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)){this.dLedgerRpcService.startup();}this.dLedgerEntryPusher.startup();//进行leader选举this.dLedgerLeaderElector.startup();executorService.scheduleAtFixedRate(this::checkPreferredLeader,1000,1000,TimeUnit.MILLISECONDS);isStarted=true;}}}启动状态机publicclassDLedgerLeaderElector{publicvoidstartup(){//启动状态机stateMaintainer.start();for(RoleChangeHandlerroleChangeHandler:roleChangeHandlers){roleChangeHandler.startup();}}}状态机publicclassStateMaintainerextendsShutdownAbleThread{publicStateMaintainer(Stringname,Loggerlogger){super(name,logger);}@OverridepublicvoiddoWork(){try{//是否支持Leader选举if(DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()){DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);//状态机核心方法DLedgerLeaderElector.this.maintainState();}sleep(10);}catch(Throwablet){DLedgerLeaderElector.LOGGER.error("Errorinheartbeat",t);}}}状态机核心方法,Raft 中的三个角色,Leader、Follower、Candidate,这里对三个角色不做过多叙述,可以参见《In Search of an Understandable Consensus Algorithm》一文publicclassDLedgerLeaderElector{privatevoidmaintainState()throwsException{//Leader角色if(memberState.isLeader()){maintainAsLeader();}elseif(memberState.isFollower()){//Follower角色maintainAsFollower();}else{//Candidate角色maintainAsCandidate();}}}接下来我们来看 raft 的核心实现,首先看 Leader 角色的实现代码当上次发送心跳时间大于心跳包间隔时,会重新发送心跳publicclassDLedgerLeaderElector{privatevoidmaintainAsLeader()throwsException{//上次发送心跳时间是否大于心跳包间隔时间if(DLedgerUtils.elapsed(lastSendHeartBeatTime)>heartBeatTimeIntervalMs){//任期longterm;//主节点idStringleaderId;synchronized(memberState){if(!memberState.isLeader()){//stopsendingreturn;}term=memberState.currTerm();leaderId=memberState.getLeaderId();lastSendHeartBeatTime=System.currentTimeMillis();}//发送心跳sendHeartbeats(term,leaderId);}}}当我们看心跳代码前,首先回顾下 raft 理论中对于心跳返回的描述再让我们回到代码publicclassDLedgerLeaderElector{privatevoidsendHeartbeats(longterm,StringleaderId)throwsException{……for(Stringid:memberState.getPeerMap().keySet()){if(memberState.getSelfId().equals(id)){continue;}HeartBeatRequestheartBeatRequest=newHeartBeatRequest();heartBeatRequest.setGroup(memberState.getGroup());heartBeatRequest.setLocalId(memberState.getSelfId());heartBeatRequest.setRemoteId(id);//主节点idheartBeatRequest.setLeaderId(leaderId);//任期heartBeatRequest.setTerm(term);//异步发送心跳CompletableFuturefuture=dLedgerRpcService.heartBeat(heartBeatRequest);future.whenComplete((HeartBeatResponsex,Throwableex)->{try{if(ex!=null){memberState.getPeersLiveTable().put(id,Boolean.FALSE);throwex;}//获取心跳结果switch(DLedgerResponseCode.valueOf(x.getCode())){//成功caseSUCCESS:succNum.incrementAndGet();break;//主节点的Term小于从节点caseEXPIRED_TERM:maxTerm.set(x.getTerm());break;//从节点的主节点非当前节点caseINCONSISTENT_LEADER:inconsistLeader.compareAndSet(false,true);break;//从节点尚未准备完毕caseTERM_NOT_READY:notReadyNum.incrementAndGet();break;default:break;}……}catch(Throwablet){LOGGER.error("heartbeatresponsefailed",t);}finally{allNum.incrementAndGet();if(allNum.get()==memberState.peerSize()){beatLatch.countDown();}}});}longvoteResultWaitTime=10;beatLatch.await(heartBeatTimeIntervalMs-voteResultWaitTime,TimeUnit.MILLISECONDS);Thread.sleep(voteResultWaitTime);//当从节点返回的term大于自身时,直接退化为candidateif(maxTerm.get()>term){LOGGER.warn("[{}]currentTerm{}isnotthebiggest={},dealwithit",memberState.getSelfId(),term,maxTerm.get());changeRoleToCandidate(maxTerm.get());return;}//当半数以上正常返回心跳时,Leader状态正常,重置心跳时间if(memberState.isQuorum(succNum.get())){lastSuccHeartBeatTime=System.currentTimeMillis();}else{LOGGER.info("[{}]Parseheartbeatresponsesincost={}term={}allNum={}succNum={}notReadyNum={}inconsistLeader={}maxTerm={}peerSize={}lastSuccHeartBeatTime={}",memberState.getSelfId(),DLedgerUtils.elapsed(startHeartbeatTimeMs),term,allNum.get(),succNum.get(),notReadyNum.get(),inconsistLeader.get(),maxTerm.get(),memberState.peerSize(),newTimestamp(lastSuccHeartBeatTime));//当正常心跳+未准备的心跳大于半数时,立即发送心跳if(memberState.isQuorum(succNum.get()+notReadyNum.get())){lastSendHeartBeatTime=-1;//当从节点中有其他主节点时,直接退化为candidate}elseif(inconsistLeader.get()){changeRoleToCandidate(term);//如果上次心跳包时间大于3次心跳间隔时间,直接退化为candidate}elseif(DLedgerUtils.elapsed(lastSuccHeartBeatTime)>(long)maxHeartBeatLeak*heartBeatTimeIntervalMs){changeRoleToCandidate(term);}}}}从简单上来说,Leader 只做了一件事,那就是发送心跳,根据心跳结果判断服务是否正常及自己的地位。接着让我们看 Follower 做了什么,Follower 在选举中的流程比较简单publicclassDLedgerLeaderElector{privatevoidmaintainAsFollower(){//如果上次心跳时间大于2次心跳间隔if(DLedgerUtils.elapsed(lastLeaderHeartBeatTime)>2L*heartBeatTimeIntervalMs){synchronized(memberState){//如果当前角色是Follower,并且心跳大于3次心跳间隔,升级到candidateif(memberState.isFollower()&DLedgerUtils.elapsed(lastLeaderHeartBeatTime)>(long)maxHeartBeatLeak*heartBeatTimeIntervalMs){LOGGER.info("[{}][HeartBeatTimeOut]lastLeaderHeartBeatTime:{}heartBeatTimeIntervalMs:{}lastLeader={}",memberState.getSelfId(),newTimestamp(lastLeaderHeartBeatTime),heartBeatTimeIntervalMs,memberState.getLeaderId());changeRoleToCandidate(memberState.currTerm());}}}}}最后我们来看 Candidate 角色,这块的代码比较多,让我们逐行来进行分析publicclassDLedgerLeaderElector{privatevoidmaintainAsCandidate()throwsException{//如果当前时间小于下次发起投票时间或者不应该立即发起投票,返回if(System.currentTimeMillis()>quorumVoteResponses=voteForQuorumResponses(term,ledgerEndTerm,ledgerEndIndex);……for(CompletableFuturefuture:quorumVoteResponses){future.whenComplete((VoteResponsex,Throwableex)->{try{if(ex!=null){throwex;}LOGGER.info("[{}][GetVoteResponse]{}",memberState.getSelfId(),JSON.toJSONString(x));if(x.getVoteResult()!=VoteResponse.RESULT.UNKNOWN){validNum.incrementAndGet();}synchronized(knownMaxTermInGroup){switch(x.getVoteResult()){//赞成,成功数加一caseACCEPT:acceptedNum.incrementAndGet();break;//被已有Leader的节点拒绝caseREJECT_ALREADY_HAS_LEADER:alreadyHasLeader.compareAndSet(false,true);break;//任期小于其他选举人caseREJECT_TERM_SMALL_THAN_LEDGER:caseREJECT_EXPIRED_VOTE_TERM:if(x.getTerm()>knownMaxTermInGroup.get()){//维护最大任期knownMaxTermInGroup.set(x.getTerm());}break;//任期小于对方caseREJECT_EXPIRED_LEDGER_TERM://日志小于对方caseREJECT_SMALL_LEDGER_END_INDEX:biggerLedgerNum.incrementAndGet();break;//对方尚未准备完成caseREJECT_TERM_NOT_READY:notReadyTermNum.incrementAndGet();break;//已投票caseREJECT_ALREADY_VOTED://拒绝接受领导caseREJECT_TAKING_LEADERSHIP:default:break;}}//如果已经有leader或已接受的投票数量满足quorum或者已接受和未准备好的数量之和满足quorum,释放阻塞状态if(alreadyHasLeader.get()||memberState.isQuorum(acceptedNum.get())||memberState.isQuorum(acceptedNum.get()+notReadyTermNum.get())){voteLatch.countDown();}}catch(Throwablet){LOGGER.error("voteresponsefailed",t);}finally{allNum.incrementAndGet();//所有异步请求结束时,释放阻塞状态if(allNum.get()==memberState.peerSize()){voteLatch.countDown();}}});}try{//生成一个随机数的阻塞时间voteLatch.await(2000+random.nextInt(maxVoteIntervalMs),TimeUnit.MILLISECONDS);}catch(Throwableignore){}lastVoteCost=DLedgerUtils.elapsed(startVoteTimeMs);VoteResponse.ParseResultparseResult;if(knownMaxTermInGroup.get()>term){//已知的最大任期比当前任期要大,则返回WAIT_TO_VOTE_NEXT,并转变为CandidateparseResult=VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;nextTimeToRequestVote=getNextTimeToRequestVote();changeRoleToCandidate(knownMaxTermInGroup.get());}elseif(alreadyHasLeader.get()){//已经存在Leader,则返回WAIT_TO_VOTE_NEXTparseResult=VoteResponse.ParseResult.WAIT_TO_REVOTE;nextTimeToRequestVote=getNextTimeToRequestVote()+(long)heartBeatTimeIntervalMs*maxHeartBeatLeak;}elseif(!memberState.isQuorum(validNum.get())){//有效响应的数量无法满足quorum,则返回WAIT_TO_REVOTEparseResult=VoteResponse.ParseResult.WAIT_TO_REVOTE;nextTimeToRequestVote=getNextTimeToRequestVote();}elseif(!memberState.isQuorum(validNum.get()-biggerLedgerNum.get())){//有效响应的数量减去日志条目大于自身的数量无法满足quorum,则返回WAIT_TO_REVOTEparseResult=VoteResponse.ParseResult.WAIT_TO_REVOTE;nextTimeToRequestVote=getNextTimeToRequestVote()+maxVoteIntervalMs;}elseif(memberState.isQuorum(acceptedNum.get())){//接受的投票数量满足quorum,则本次投票通过parseResult=VoteResponse.ParseResult.PASSED;}elseif(memberState.isQuorum(acceptedNum.get()+notReadyTermNum.get())){//已接受和未准备好的数量之和满足quorum,则立即进行投票parseResult=VoteResponse.ParseResult.REVOTE_IMMEDIATELY;}else{parseResult=VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;nextTimeToRequestVote=getNextTimeToRequestVote();}lastParseResult=parseResult;LOGGER.info("[{}][PARSE_VOTE_RESULT]cost={}term={}memberNum={}allNum={}acceptedNum={}notReadyTermNum={}biggerLedgerNum={}alreadyHasLeader={}maxTerm={}result={}",memberState.getSelfId(),lastVoteCost,term,memberState.peerSize(),allNum,acceptedNum,notReadyTermNum,biggerLedgerNum,alreadyHasLeader,knownMaxTermInGroup.get(),parseResult);if(parseResult==VoteResponse.ParseResult.PASSED){LOGGER.info("[{}][VOTE_RESULT]hasbeenelectedtobetheleaderinterm{}",memberState.getSelfId(),term);//如果是通过,则转变为Leader对象changeRoleToLeader(term);}}}由于篇幅的问题,我们并没有一一将 Dledger 的核心代码在这里展现,在此仅展示了选主等基本流程,对于写入、复制、日志存储、消息传递等都不多加诉说。Dledger 算法的核心原理是 Raft 协议,当一个节点发起投票请求时,其他节点会收到请求并发送响应,响应的结果将根据投票数量判断是否达成共识。如果共识达成,则新的 leader 将被选举出来,同时新的日志将被追加到磁盘上。如果共识未达成,则需要等待一定时间后重新发起投票请求。DLedger 模式的弊端那么这写法有问题吗?看起来似乎非常完美,解决了选主的问题,但是同时给用户造成了很大的困扰,首先,Broker 的副本必须是三个及以上,副本的 ACK 必须遵循多数派协议,这一点造成了成本与性能损耗的上升,其次,这使得 RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。RocketMQ 5.0于是 RocketMQ 在 5.0 版本出了一个全新的模式,Controller 模式。一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件,支持独立部署,也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举。内嵌在 Nameserver 中独立部署DLedger Controller模式的优劣然而,即使是 5.0 中做了优化的 DLedger Controller,仍然存在一些问题,下面让我们看看这个模式带来的优缺点内嵌在 NameServer 中资源竞争:如果 DLedger Controller 占用了过多的资源,可能会影响 NameServer 的其他功能。稳定性问题:DLedger Controller 可能对 NameServer 的稳定性产生影响,例如当 DLedger Controller 出现故障或卡顿时,可能会影响整个 NameServer 运行的稳定性。难以分离:将 DLedger Controller 内嵌在 NameServer 中,可能会降低其可分离性,增加其耦合度。简化了部署:将 DLedger Controller 内嵌在 NameServer 中,可以减少需要部署的进程数量,降低了部署和维护的复杂度。减少网络通信:内嵌在 NameServer 中的 DLedger Controller 可以直接与 NameServer 进行通信,减少了网络通信开销,提高了性能和可靠性。整合了多个功能:内嵌在 NameServer 中的 DLedger Controller 可以整合 NameServer 的多个功能,如目录服务、路由服务、定时任务等,提高了系统集成的效率和灵活性。优点缺点独立部署复杂度高:DLedger Controller 的实现需要复杂的算法和数据结构,对开发人员的要求较高。成本高:为了保证高可用性和稳定性,需要配置较多的物理服务器和网络设备,成本较高。维护难度大:DLedger Controller 需要不断地进行监控和维护,一旦出现问题,可能需要进行复杂的排错和修复,增加了维护难度。高可用:DLedger Controller 可提供高可用的服务,即使其中一台服务器出现问题,仍能保证集群的正常运行。性能优秀:DLedger Controller 能快速处理大量的日志数据,使得 RocketMQ 能够高效地处理分布式事务。扩展性好:DLedger Controller 可以添加新的节点,以适应不断增长的数据量和用户需求。优点缺点总结本文简单介绍了 DLedger 基于 Raft 协议实现的 Leader 选举机制,让大家深入地理解分布式系统中的 Leader 选举过程。然而如何在实际场景下选取、优化和扩展分布式一致性算法,都是非常值得探讨的问题。参考文献Apache RocketMQIn Search of an Understandable Consensus Algorithm (Extended Version)看完两件事如果你觉得这篇内容对你挺有启发,我想邀请你帮我两件小事1.点个「在看」,让更多人也能看到这篇内容(点了「在看」,bug -1 )2.关注公众号「政采云技术」,持续为你推送精选好文招贤纳士政采云技术团队(Zero),Base 杭州,一个富有激情和技术匠心精神的成长型团队。规模 500 人左右,在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 zcy-tc@cai-inc.com
|
|