200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

时间:2024-07-23 14:20:45

相关推荐

RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析

文章目录

前言流程解析总结

前言

在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的。

PullMessageService#run

//在拉取消息请求队列中拉取消息请求PullRequest pullRequest = this.pullRequestQueue.take();//处理请求this.pullMessage(pullRequest);

但是pullRequestQueue中的PullRequest是从哪来的呢?是什么时候由谁进行填充的呢?

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

流程解析

通过pullRequestQueue中的PullRequest添加操作这个线索一步步跟踪下去,最后得出了pullRequestQueue的调用链:

RebalanceService#run↓MQClientInstance#doRebalance↓DefaultMQPullConsumerImpl#doRebalance↓RebalanceImpl#doRebalance↓RebalanceImpl#rebalanceByTopic↓RebalanceImpl#updateProcessQueueTableInRebalance↓RebalancePushImpl#dispatchPullRequest↓DefaultMQPushConsumerImpl#executePullRequestImmediately↓PullMessageService#executePullRequestImmediately

由上面的调用链我们可以看到,向PullMessageService中的LinkedBlockingQueue<PullRequest>添加拉取消息请求的是RebalanceService#run,接下来我们对这个源头RebalanceService进行解析。

RebalanceService

public class RebalanceService extends ServiceThread {//等待时间private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));private final InternalLogger log = ClientLogger.getLog();//消息客户端private final MQClientInstance mqClientFactory;public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory = mqClientFactory;}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);//进入mqClientFactorythis.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}}

RebalanceService是一个服务线程,其run方法主要是调用MQClientInstance#doRebalance进行重新负载。

MQClientInstance

private final RebalanceService rebalanceService;public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;...// Start rebalance service//负载均衡服务启动this.rebalanceService.start();}}}

MQClientInstance持有一个RebalanceService线程,在start方法中开启该线程。

MQClientInstance#doRebalance

//循环遍历每个消费者组中的MQConsumerInner(即DefaultMQPush<Pull>ConsumerImpl)并调用其doRebalancefor (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}

DefaultMQPushConsumerImpl#doRebalance

@Overridepublic void doRebalance() {if (!this.pause) {this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}}

经过多层的对象委托,终于来到实现消息负载分发的核心。

RebalanceImpl

//消息处理队列protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);//Topic的队列信息protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =new ConcurrentHashMap<String, Set<MessageQueue>>();//Topic订阅信息protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =new ConcurrentHashMap<String, SubscriptionData>();//消费者组protected String consumerGroup;//消费模式protected MessageModel messageModel;//队列分配策略protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;//MQ客户端protected MQClientInstance mQClientFactory;

RebalanceImpl#doRebalance

/*** 遍历订阅消息对每个主题的订阅的队列进行重新负载* @param isOrder是否是顺序消息*/public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {//根据Topic来对队列进行重新负载this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}//如果消息队列的Topic不在订阅的主题中-删除该消息队列this.truncateMessageQueueNotMyTopic();}

RebalanceImpl#rebalanceByTopic

//从主题订阅消息缓存表中获取主题的队列信息Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//查找该主题订阅组所有的消费者IDList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);//消费模式switch (messageModel) {case BROADCASTING: {... break;}case CLUSTERING: {if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}//对主题的消息队列和消费者ID进行排序if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);//获取当前负载均衡策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//根据策略对消息队列进行重新分配allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//重新负载后-对消息消费队列进行更新-返回消息队列负载是否发生变化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}

Rebalance#updateProcessQueueTableInRebalance

boolean changed = false;//消息队列负载是否发生变化Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();//遍历<消息队列,处理队列>缓存表while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();//如果消息队列不在该Topic处理范围内if (mq.getTopic().equals(topic)) {//消息队列已经被分配到其他消费者去消费了-不包含在当前主题的Set<MessageQueue>中if (!mqSet.contains(mq)) {//private volatile boolean dropped;//设置当前处理队列为被丢弃-及时阻止继续向该消息处理队列进行消息拉取pq.setDropped(true);//判断是否需要移除if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();//发生变化changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}

RebalancePushImpl#removeUnnecessaryMessageQueue

//丢弃消息队列之前先将消息队列进行持久化//保存在本地(LocalFileOffsetStore)/消息服务器Broker(RemoteBrokerOffsetStore)this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);//顺序消费进入的分支if (this.defaultMQPushConsumerImpl.isConsumeOrderly()&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {try {if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {try {return this.unlockDelay(mq, pq);} finally {pq.getConsumeLock().unlock();}} else {log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",mq,pq.getTryUnlockTimes());pq.incTryUnlockTimes();}} catch (Exception e) {log.error("removeUnnecessaryMessageQueue Exception", e);}return false;}//暂时只看非顺序消息-返回truereturn true;

RebalanceImpl#updateProcessQueueTableInRebalance

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();//遍历消息队列for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {//根据不同的消息消费策略获取下一次消费的偏移量//CONSUME_FROM_LAST_OFFSET/CONSUME_FROM_FIRST_OFFSET/CONSUME_FROM_TIMESTAMPnextOffset = putePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);//消息队列已经存在if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//消息队列不存在-新添加log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);//封装拉取请求PullRequestPullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);//放入拉取请求列表pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}//分发消息拉取请求this.dispatchPullRequest(pullRequestList);return changed;

RebalancePushImpl#dispatchPullRequest

@Overridepublic void dispatchPullRequest(List<PullRequest> pullRequestList) {//遍历请求列表for (PullRequest pullRequest : pullRequestList) {//立刻拉取消息this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);}}

DefaultMQPushConsumerImpl#executePullRequestImmediately

public void executePullRequestImmediately(final PullRequest pullRequest) {//将请求丢入PullMessageService线程中this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);}

PullMessageService

public void executePullRequestImmediately(final PullRequest pullRequest) {try {//放入消息拉取请求队列中this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);}}

总结

本文主要解析了消息消费端的负载机制,首先RebalanceService线程启动,为消息消费者分发消息队列,每一个MessageQueue消息队列都回构建一个PullRequest,通过将这个PullRequest放入PullMessageService中的pullRequestQueue,进而唤醒PullMessageService#run,在pullRequestQueue中获得拉取消息请求并进行处理。从上一篇的的消息拉取分析中我们可以得知,接下来执行DefaultMQPushConsumerImpl#pullMessage,通过网络远程调用从Broker中拉取消息,一次最多拉取消息数量默认为32条,然后Broker将拉取的消息进行过滤并封装后返回。返回之后再回到消息消费端,将消费任务提交到消费者的ConsumerMessageService执行消息的消费。

本文仅作为个人学习使用,如有不足或错误请指正!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。