找回密码
 会员注册
查看: 13|回复: 0

深入剖析RocketMQ源码-负载均衡机制

[复制链接]

4

主题

0

回帖

13

积分

新手上路

积分
13
发表于 2024-10-5 09:49:36 | 显示全部楼层 |阅读模式
Wang Zhi一、引言RocketMQ是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。RocketMQ主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr负责存储元数据,各组件的主要功能如下:消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。RocketMQ整体消息处理逻辑上以Topic维度进行生产消费、物理上会存储到具体的Broker上的某个MessageQueue当中,正因为一个Topic会存在多个Broker节点上的多个MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。本篇文章分析的核心在于介绍RocketMQ的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。二、RocketMQ的整体架构(图片来自于Apache RocketMQ)RocketMQ架构上主要分为四部分,如上图所示:Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。NameServer:NameServer是一个非常简单的Topic路由注册中心,支持分布式集群方式部署,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。RocketMQ的Topic的物理分布如上图所示:Topic作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的Broker当中。Broker中的Queue是Topic对应消息的物理存储单元。在RocketMQ的整体设计理念当中,消息的生产消费以Topic维度进行,每个Topic会在RocketMQ的集群中的Broker节点创建对应的MessageQueue。producer生产消息的过程本质上就是选择Topic在Broker的所有的MessageQueue并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。consumer消费消息的过程本质上就是一个订阅同一个Topic的consumerGroup下的每个consumer按照一定的规则负责Topic下一部分MessageQueue进行消费。在RocketMQ整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到Broker选择的负载均衡,消息的消费过程主要涉及多consumer和多Broker之间的负责均衡。三、producer消息生产过程producer消息生产过程:producer首先访问namesvr获取路由信息,namesvr存储Topic维度的所有路由信息(包括每个topic在每个Broker的队列分布情况)。producer解析路由信息生成本地的路由信息,解析Topic在Broker队列信息并转化为本地的消息生产的路由信息。producer根据本地路由信息向Broker发送消息,选择本地路由中具体的Broker进行消息发送。3.1 路由同步过程public class MQClientInstance { public boolean updateTopicRouteInfoFromNameServer(final String topic) { return updateTopicRouteInfoFromNameServer(topic, false, null); } public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault & defaultMQProducer != null) { // 省略对应的代码 } else { // 1、负责查询指定的Topic对应的路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { // 2、比较路由数据topicRouteData是否发生变更 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息 if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // 生成生产者对应的Topic信息 { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // 保存到本地生产者路由表当中 this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } } finally { this.lockNamesrv.unlock(); } } else { } } catch (InterruptedException e) { } return false; }}路由同步过程:路由同步过程是消息生产者发送消息的前置条件,没有路由的同步就无法感知具体发往那个Broker节点。路由同步主要负责查询指定的Topic对应的路由信息,比较路由数据topicRouteData是否发生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; // 按照broker维度保存的Queue信息 private List queueDatas; // 按照broker维度保存的broker信息 private List brokerDatas; private HashMap/* Filter Server */> filterServerTable;} public class QueueData implements Comparable { // broker的名称 private String brokerName; // 读队列大小 private int readQueueNums; // 写队列大小 private int writeQueueNums; // 读写权限 private int perm; private int topicSynFlag;} public class BrokerData implements Comparable { // broker所属集群信息 private String cluster; // broker的名称 private String brokerName; // broker对应的ip地址信息 private HashMap brokerAddrs; private final Random random = new Random();} -------------------------------------------------------------------------------------------------- public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; // 最细粒度的队列信息 private List messageQueueList = new ArrayList(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData;} public class MessageQueue implements Comparable, Serializable { private static final long serialVersionUID = 6191200464116433425L; // Topic信息 private String topic; // 所属的brokerName信息 private String brokerName; // Topic下的队列信息Id private int queueId;}路由解析过程:TopicRouteData核心变量QueueData保存每个Broker的队列信息,BrokerData保存Broker的地址信息。TopicPublishInfo核心变量MessageQueue保存最细粒度的队列信息。producer负责将从namesvr获取的TopicRouteData转化为producer本地的TopicPublishInfo。public class MQClientInstance { public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { TopicPublishInfo info = new TopicPublishInfo(); info.setTopicRouteData(route); if (route.getOrderTopicConf() != null & route.getOrderTopicConf().length() > 0) { // 省略相关代码 } else { List qds = route.getQueueDatas(); // 按照brokerName进行排序 Collections.sort(qds); // 遍历所有broker生成队列维度信息 for (QueueData qd : qds) { // 具备写能力的QueueData能够用于队列生成 if (PermName.isWriteable(qd.getPerm())) { // 遍历获得指定brokerData进行异常条件过滤 BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } if (null == brokerData) { continue; } if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } // 遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfo for (int i = 0; i topicList = new HashSet(); // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息 { Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { Set subList = impl.subscriptions(); if (subList != null) { for (SubscriptionData subData : subList) { topicList.add(subData.getTopic()); } } } } } for (String topic : topicList) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault & defaultMQProducer != null) { // 省略代码 } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } if (changed) { TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // 构建consumer侧的路由信息 { Set subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } } finally { this.lockNamesrv.unlock(); } } } catch (InterruptedException e) { } return false; }}路由同步过程:路由同步过程是消息消费者消费消息的前置条件,没有路由的同步就无法感知具体待消费的消息的Broker节点。consumer节点通过定时任务定期从namesvr同步该消费节点订阅的topic的路由信息。consumer通过updateTopicSubscribeInfo将同步的路由信息构建成本地的路由信息并用以后续的负责均衡。4.2负载均衡过程public class RebalanceService extends ServiceThread { private static long waitInterval = Long.parseLong(System.getProperty( "rocketmq.client.rebalance.waitInterval", "20000")); private final MQClientInstance mqClientFactory; public RebalanceService(MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; } @Override public void run() { while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } }}负载均衡过程:consumer通过RebalanceService来定期进行重新负载均衡。RebalanceService的核心在于完成MessageQueue和consumer的分配关系。public abstract class RebalanceImpl { private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { // 省略相关代码 break; } case CLUSTERING: { // 集群模式下的负载均衡 // 1、获取topic下所有的MessageQueue Set mqSet = this.topicSubscribeInfoTable.get(topic); // 2、获取topic下该consumerGroup下所有的consumer对象 List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // 3、开始重新分配进行rebalance if (mqSet != null & cidAll != null) { List mqAll = new ArrayList(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { // 4、通过分配策略重新进行分配 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { return; } Set allocateResultSet = new HashSet(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 5、根据分配结果执行真正的rebalance动作 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }重新分配流程:获取topic下所有的MessageQueue。获取topic下该consumerGroup下所有的consumer的cid(如192.168.0.8@15958)。针对mqAll和cidAll进行排序,mqAll排序顺序按照先BrokerName后BrokerId,cidAll排序按照字符串排序。通过分配策略AllocateMessageQueueStrategy重新分配。根据分配结果执行真正的rebalance动作。public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { private final InternalLogger log = ClientLogger.getLog(); @Override public List allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { List result = new ArrayList(); // 核心逻辑计算开始 // 计算当前cid的下标 int index = cidAll.indexOf(currentCID); // 计算多余的模值 int mod = mqAll.size() % cidAll.size(); // 计算平均大小 int averageSize = mqAll.size() 0 & index 0 & index allocate(String consumerGroup, String currentCID, List mqAll, List cidAll) { List result = new ArrayList(); // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费 if (!cidAll.contains(currentCID)) { return result; } int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() 0 & index 0 & index
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-11 20:59 , Processed in 0.598236 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表