找回密码
 会员注册
查看: 19|回复: 0

RocketMQDLedger初识

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
63700
发表于 2024-10-12 22:05:55 | 显示全部楼层 |阅读模式
RocketMQ DLedger初识 345 前言众所周知,作为一个出色的分布式消息中间件,RocketMQ 在全球范围内获得了广泛的应用,那么作为一个分布式消息中间件,最重要的是什么?协议?持久化?消息分发实现?高可用?高可靠?好的协议可以保证通讯的稳定,持久化可以保证数据的存储,消息分发实现可以结合多场景加速业务,高可用可以保证业务大量运行,高可靠可以保证业务的持续运行。今天,我们想谈一谈 RocketMQ 的高可用机制常见的消息中间件集群实现方式主从复制方式(非对称):这种方式下,一主多从的架构被广泛应用。消息中间件的写入操作只能在主节点上进行,而读取操作在所有节点上都可以执行。主节点负责数据的同步复制到所有从节点上,当主节点出现故障时,一个备份从节点会自动地竞选为新的主节点,以保证系统的持续运行。这种方式下,主节点和从节点之间的延时可能会导致数据不一致或者消息丢失。对等网络方式(对称):这种方式下,每个节点都可以承担读写操作的任务,数据可以通过节点之间的同步来保持一致性。每个节点的能力相同,不存在主从的概念。当一个节点出现故障时,其他节点会自动接管它的任务,因此系统没有单点故障。这种方式下,网络的复杂性和节点间的通信有可能造成性能瓶颈。RocktMQ 高可用方式在4.5版本之前,RocketMQ 不支持节点的自动晋升,那么如果主节点挂了,未消费的数据会从从节点上被继续消费,但是这一组节点就失去了作用,无法再被写入,若集群中主节点数量较少,可能会引起故障,于是在4.5版本,升级支持 DLedger 模式完成自动选主。我们现在看一下 DLedger 如何实现自动选主。首先,我们要明白 DLedger 模式的本质便是使用了 Raft 算法,接着我们来看一下 RocketMQ 的代码实现。在启动 broke r时,会将 CommitLog 转换为 DLedgerCommitLog 类型,并添加 DLedgerRoleChangeHandler 处理器,那么我们来看看 CommitLog 与 DLedgerCommitLog 的区别commitLog 是 RocketMQ 最核心的数据存储。它是一个顺序写的文件,用于存储 Producer 发送的消息和 Consumer 消费的消息,也就是全部通过消息中间件传递的消息。每一个写入 commitLog 的消息都会被分配一个唯一的 offset (偏移量),用于标识该条消息在 commitLog 中的位置。commitLog 中消息的存储格式包括消息长度、消息属性(如是否压缩、是否顺序消费、是否是事务消息等)、消息体等信息。public class CommitLog {    public final static int MESSAGE_MAGIC_CODE = -626843481;    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);    // 空文件结束标识    protected final static int BLANK_MAGIC_CODE = -875286124;   // 文件队列,用于存储在磁盘上的消息    protected final MappedFileQueue mappedFileQueue;    // 默认的消息存储对象    protected final DefaultMessageStore defaultMessageStore;    // 用于刷盘和提交的服务    private final FlushCommitLogService flushCommitLogService;    // 如果启用了TransientStorePool,我们必须在固定的时间内将消息刷新到FileChannel    private final FlushCommitLogService commitLogService;  // 消息发送的回调函数    private final AppendMessageCallback appendMessageCallback;    private final ThreadLocal batchEncoderThreadLocal;    // 用于存储每个 topic 的队列    protected HashMap topicQueueTable = new HashMap(1024);    // 确认偏移量    protected volatile long confirmOffset = -1L;  // 加锁期间的起始时间    private volatile long beginTimeInLock = 0;    // 消息发送的锁,用于防止并发发送。    protected final utMessageLock putMessageLock;}DLedgerCommitLog 是 RocketMQ 用作持久化存储的一种实现方式,它基于Apache DistributedLog (DLedger) 实现了高可靠、高性能的分布式日志存储,也正是它,使得 CommitLog 拥有了选举复制的能力。public class DLedgerCommitLog extends CommitLog {   // DLedger实例    private final DLedgerServer dLedgerServer;   // DLedger的配置信息    private final DLedgerConfig dLedgerConfig;   // 用于存储mmap文件的存储类    private final DLedgerMmapFileStore dLedgerFileStore;   // mmap文件列表    private final MmapFileList dLedgerFileList;    // id标识代理角色,0表示主,其他表示从    private final int id;    private final MessageSerializer messageSerializer;   // 进入DLedger锁的开始时间    private volatile long beginTimeInDledgerLock = 0;    // 分隔旧的commitlog和DLedgerCommitlog的偏移量    private long dividedCommitlogOffset = -1;  // 是否正在恢复旧的commitlog    private boolean isInrecoveringOldCommitlog = false;}了解完 CommitLog,我们回到 broker 的启动,核心类 BrokerController 的 initialize 方法,首先会根据配置生成一个 DefaultMessageStore,这里会判断当前是否支持 DLedgerCommitLog,如果支持,则会创建一个 DLedgerRoleChangeHandler 对象并注册为 leader 选举的回调方法。接着与老版本一致会创建一个 BrokerStats 对象和一个 MessageStorePluginContext 对象。最后,会将 CommitLogDispatcherCalcBitMap 对象添加到 MessageStore 的 DispatcherList 中。public class BrokerController {    public boolean initialize() throws CloneNotSupportedException {        ……        if (result) {            try {                this.messageStore =                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,                        this.brokerConfig);                // 如果支持DLegerCommitLog                if (messageStoreConfig.isEnableDLegerCommitLog()) {                    // 创建DLedgerRoleChangeHandler                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);                    // 将CommitLog转换为DLegerCommitLog,并添加DLedgerRoleChangeHandler处理器                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);                }                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);                // 加载插件                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);                this.messageStore = MessageStoreFactory.build(context, this.messageStore);                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));        }        ……    }}接着我们进入 DLedgerCommitLog 的代码,这里可以看到使用了 openmessaging 包下的 DLedgerServer 组件public class DLedgerCommitLog extends CommitLog {  @Override  public void start() {      // 启动dLedgerServer      dLedgerServer.startup();  }}基于 dLedgerLeaderElector 做了 Leader 选举的操作public class DLedgerServer extends AbstractDLedgerServer {    public synchronized void startup() {            if (!isStarted) {                this.dLedgerStore.startup();                this.fsmCaller.ifPresent(x -> {                    // 启动状态机调用程序并加载现有快照以进行数据恢复                    x.start();                    x.getSnapshotManager().loadSnapshot();                });                if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {                    this.dLedgerRpcService.startup();                }                this.dLedgerEntryPusher.startup();                // 进行leader选举                this.dLedgerLeaderElector.startup();                executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);                isStarted = true;            }        }}启动状态机public class DLedgerLeaderElector {    public void startup() {        // 启动状态机            stateMaintainer.start();            for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {                roleChangeHandler.startup();            }        }}状态机public class StateMaintainer extends ShutdownAbleThread {        public StateMaintainer(String name, Logger logger) {            super(name, logger);        }        @Override        public void doWork() {            try {              // 是否支持Leader选举                if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {                    DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);                    // 状态机核心方法                    DLedgerLeaderElector.this.maintainState();                }                sleep(10);            } catch (Throwable t) {                DLedgerLeaderElector.LOGGER.error("Error in heartbeat", t);            }        }}状态机核心方法,Raft 中的三个角色,Leader、Follower、Candidate,这里对三个角色不做过多叙述,可以参见《In Search of an Understandable Consensus Algorithm》 一文public class DLedgerLeaderElector {  private void maintainState() throws Exception {    // Leader角色      if (memberState.isLeader()) {          maintainAsLeader();      } else if (memberState.isFollower()) {          // Follower角色          maintainAsFollower();      } else {          // Candidate角色          maintainAsCandidate();      }  }}接下来我们来看 raft 的核心实现,首先看 Leader 角色的实现代码当上次发送心跳时间大于心跳包间隔时,会重新发送心跳public class DLedgerLeaderElector {  private void maintainAsLeader() throws Exception {    // 上次发送心跳时间是否大于心跳包间隔时间        if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {          // 任期            long term;            // 主节点id            String leaderId;            synchronized (memberState) {                if (!memberState.isLeader()) {                    //stop sending                    return;                }                term = memberState.currTerm();                leaderId = memberState.getLeaderId();                lastSendHeartBeatTime = System.currentTimeMillis();            }            // 发送心跳            sendHeartbeats(term, leaderId);        }    }}当我们看心跳代码前,首先回顾下 raft 理论中对于心跳返回的描述再让我们回到代码public class DLedgerLeaderElector {  private void sendHeartbeats(long term, String leaderId) throws Exception {        ……        for (String id : memberState.getPeerMap().keySet()) {            if (memberState.getSelfId().equals(id)) {                continue;            }            HeartBeatRequest heartBeatRequest = new HeartBeatRequest();            heartBeatRequest.setGroup(memberState.getGroup());            heartBeatRequest.setLocalId(memberState.getSelfId());            heartBeatRequest.setRemoteId(id);            // 主节点id            heartBeatRequest.setLeaderId(leaderId);            // 任期            heartBeatRequest.setTerm(term);            // 异步发送心跳            CompletableFuture future = dLedgerRpcService.heartBeat(heartBeatRequest);            future.whenComplete((HeartBeatResponse x, Throwable ex) -> {                try {                    if (ex != null) {                        memberState.getPeersLiveTable().put(id, Boolean.FALSE);                        throw ex;                    }                    // 获取心跳结果                    switch (DLedgerResponseCode.valueOf(x.getCode())) {                      // 成功                        case SUCCESS:                            succNum.incrementAndGet();                            break;                        // 主节点的Term小于从节点                        case EXPIRED_TERM:                            maxTerm.set(x.getTerm());                            break;                        // 从节点的主节点非当前节点                        case INCONSISTENT_LEADER:                            inconsistLeader.compareAndSet(false, true);                            break;                        // 从节点尚未准备完毕                        case TERM_NOT_READY:                            notReadyNum.incrementAndGet();                            break;                        default:                            break;                    }                    ……                } catch (Throwable t) {                    LOGGER.error("heartbeat response failed", t);                } finally {                    allNum.incrementAndGet();                    if (allNum.get() == memberState.peerSize()) {                        beatLatch.countDown();                    }                }            });        }        long voteResultWaitTime = 10;        beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS);        Thread.sleep(voteResultWaitTime);        // 当从节点返回的term大于自身时,直接退化为candidate        if (maxTerm.get() > term) {            LOGGER.warn("[{}] currentTerm{} is not the biggest={}, deal with it", memberState.getSelfId(), term, maxTerm.get());            changeRoleToCandidate(maxTerm.get());            return;        }    // 当半数以上正常返回心跳时,Leader状态正常,重置心跳时间        if (memberState.isQuorum(succNum.get())) {            lastSuccHeartBeatTime = System.currentTimeMillis();        } else {            LOGGER.info("[{}] arse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",                    memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));            // 当正常心跳 + 未准备的心跳大于半数时,立即发送心跳            if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {                lastSendHeartBeatTime = -1;            // 当从节点中有其他主节点时,直接退化为candidate            } else if (inconsistLeader.get()) {                changeRoleToCandidate(term);            // 如果上次心跳包时间大于3次心跳间隔时间,直接退化为candidate            } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {                changeRoleToCandidate(term);            }        }    }}从简单上来说,Leader 只做了一件事,那就是发送心跳,根据心跳结果判断服务是否正常及自己的地位。接着让我们看 Follower 做了什么,Follower 在选举中的流程比较简单public class DLedgerLeaderElector {  private void maintainAsFollower() {    // 如果上次心跳时间大于2次心跳间隔        if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2L * heartBeatTimeIntervalMs) {            synchronized (memberState) {              // 如果当前角色是Follower,并且心跳大于3次心跳间隔,升级到candidate                if (memberState.isFollower() & DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {                    LOGGER.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());                    changeRoleToCandidate(memberState.currTerm());                }            }        }    }}最后我们来看 Candidate 角色,这块的代码比较多,让我们逐行来进行分析public class DLedgerLeaderElector {  private void maintainAsCandidate() throws Exception {        // 如果当前时间小于下次发起投票时间或者不应该立即发起投票,返回        if (System.currentTimeMillis() > quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);                ……        for (CompletableFuture future : quorumVoteResponses) {            future.whenComplete((VoteResponse x, Throwable ex) -> {                try {                    if (ex != null) {                        throw ex;                    }                    LOGGER.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));                    if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {                        validNum.incrementAndGet();                    }                    synchronized (knownMaxTermInGroup) {                        switch (x.getVoteResult()) {                          // 赞成,成功数加一                            case ACCEPT:                                acceptedNum.incrementAndGet();                                break;                            // 被已有Leader的节点拒绝                            case REJECT_ALREADY_HAS_LEADER:                                alreadyHasLeader.compareAndSet(false, true);                                break;                            // 任期小于其他选举人                            case REJECT_TERM_SMALL_THAN_LEDGER:                            case REJECT_EXPIRED_VOTE_TERM:                                if (x.getTerm() > knownMaxTermInGroup.get()) {                                  // 维护最大任期                                    knownMaxTermInGroup.set(x.getTerm());                                }                                break;                            // 任期小于对方                            case REJECT_EXPIRED_LEDGER_TERM:                            // 日志小于对方                            case REJECT_SMALL_LEDGER_END_INDEX:                                biggerLedgerNum.incrementAndGet();                                break;                            // 对方尚未准备完成                            case REJECT_TERM_NOT_READY:                                notReadyTermNum.incrementAndGet();                                break;                            // 已投票                            case REJECT_ALREADY_VOTED:                            // 拒绝接受领导                            case REJECT_TAKING_LEADERSHIP:                            default:                                break;                        }                    }                    // 如果已经有leader或已接受的投票数量满足 quorum 或者已接受和未准备好的数量之和满足 quorum,释放阻塞状态                    if (alreadyHasLeader.get()                            || memberState.isQuorum(acceptedNum.get())                            || memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {                        voteLatch.countDown();                    }                } catch (Throwable t) {                    LOGGER.error("vote response failed", t);                } finally {                    allNum.incrementAndGet();                   // 所有异步请求结束时,释放阻塞状态                    if (allNum.get() == memberState.peerSize()) {                        voteLatch.countDown();                    }                }            });        }        try {            // 生成一个随机数的阻塞时间            voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);        } catch (Throwable ignore) {        }        lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);        VoteResponse.ParseResult parseResult;        if (knownMaxTermInGroup.get() > term) {            // 已知的最大任期比当前任期要大,则返回 WAIT_TO_VOTE_NEXT,并转变为Candidate            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;            nextTimeToRequestVote = getNextTimeToRequestVote();            changeRoleToCandidate(knownMaxTermInGroup.get());        } else if (alreadyHasLeader.get()) {           // 已经存在Leader,则返回 WAIT_TO_VOTE_NEXT            parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;            nextTimeToRequestVote = getNextTimeToRequestVote() + (long) heartBeatTimeIntervalMs * maxHeartBeatLeak;        } else if (!memberState.isQuorum(validNum.get())) {           // 有效响应的数量无法满足 quorum,则返回 WAIT_TO_REVOTE             parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;            nextTimeToRequestVote = getNextTimeToRequestVote();        } else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {           // 有效响应的数量减去日志条目大于自身的数量无法满足 quorum,则返回 WAIT_TO_REVOTE             parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;            nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;        } else if (memberState.isQuorum(acceptedNum.get())) {           // 接受的投票数量满足 quorum,则本次投票通过            parseResult = VoteResponse.ParseResult.PASSED;        } else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {            // 已接受和未准备好的数量之和满足 quorum,则立即进行投票            parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;        } else {            parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;            nextTimeToRequestVote = getNextTimeToRequestVote();        }        lastParseResult = parseResult;        LOGGER.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",                memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);        if (parseResult == VoteResponse.ParseResult.PASSED) {            LOGGER.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);           // 如果是通过,则转变为Leader对象            changeRoleToLeader(term);        }    }}由于篇幅的问题,我们并没有一一将 Dledger 的核心代码在这里展现,在此仅展示了选主等基本流程,对于写入、复制、日志存储、消息传递等都不多加诉说。Dledger 算法的核心原理是 Raft 协议,当一个节点发起投票请求时,其他节点会收到请求并发送响应,响应的结果将根据投票数量判断是否达成共识。如果共识达成,则新的 leader 将被选举出来,同时新的日志将被追加到磁盘上。如果共识未达成,则需要等待一定时间后重新发起投票请求。DLedger 模式的弊端那么这写法有问题吗?看起来似乎非常完美,解决了选主的问题,但是同时给用户造成了很大的困扰,首先,Broker 的副本必须是三个及以上,副本的 ACK 必须遵循多数派协议,这一点造成了成本与性能损耗的上升,其次,这使得 RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。RocketMQ 5.0于是 RocketMQ 在 5.0 版本出了一个全新的模式,Controller 模式。一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件,支持独立部署,也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举。内嵌在 Nameserver 中独立部署DLedger Controller模式的优劣然而,即使是 5.0 中做了优化的 DLedger Controller,仍然存在一些问题,下面让我们看看这个模式带来的优缺点内嵌在 NameServer 中资源竞争:如果 DLedger Controller 占用了过多的资源,可能会影响 NameServer 的其他功能。稳定性问题:DLedger Controller 可能对 NameServer 的稳定性产生影响,例如当 DLedger Controller 出现故障或卡顿时,可能会影响整个 NameServer 运行的稳定性。难以分离:将 DLedger Controller 内嵌在 NameServer 中,可能会降低其可分离性,增加其耦合度。简化了部署:将 DLedger Controller 内嵌在 NameServer 中,可以减少需要部署的进程数量,降低了部署和维护的复杂度。减少网络通信:内嵌在 NameServer 中的 DLedger Controller 可以直接与 NameServer 进行通信,减少了网络通信开销,提高了性能和可靠性。整合了多个功能:内嵌在 NameServer 中的 DLedger Controller 可以整合 NameServer 的多个功能,如目录服务、路由服务、定时任务等,提高了系统集成的效率和灵活性。优点缺点独立部署复杂度高:DLedger Controller 的实现需要复杂的算法和数据结构,对开发人员的要求较高。成本高:为了保证高可用性和稳定性,需要配置较多的物理服务器和网络设备,成本较高。维护难度大:DLedger Controller 需要不断地进行监控和维护,一旦出现问题,可能需要进行复杂的排错和修复,增加了维护难度。高可用:DLedger Controller 可提供高可用的服务,即使其中一台服务器出现问题,仍能保证集群的正常运行。性能优秀:DLedger Controller 能快速处理大量的日志数据,使得 RocketMQ 能够高效地处理分布式事务。扩展性好:DLedger Controller 可以添加新的节点,以适应不断增长的数据量和用户需求。优点缺点总结本文简单介绍了 DLedger 基于 Raft 协议实现的 Leader 选举机制,让大家深入地理解分布式系统中的 Leader 选举过程。然而如何在实际场景下选取、优化和扩展分布式一致性算法,都是非常值得探讨的问题。参考文献Apache RocketMQIn Search of an Understandable Consensus Algorithm (Extended Version)
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-25 13:54 , Processed in 0.305927 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表