200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 大揭秘!RocketMQ如何管理消费进度

大揭秘!RocketMQ如何管理消费进度

时间:2020-12-12 01:21:57

相关推荐

大揭秘!RocketMQ如何管理消费进度

在企业实践RocketMQ时基本上80%都是不消费问题,而由于消费进度问题导致不消费的问题又是最难确认的和排查的。RocketMQ的消费进度分为本地消费进度管理和远程消费进度管理,分别对应的消费模式是广播消费和集群消费。

本文选自《RocketMQ分布式消息中间件:核心原理与最佳实践》一书,带你层层揭秘RocketMQ如何管理消费进度。

什么是消费进度

消费进度,也就是由Broker管理每一个消费者消费Topic的进度,包含正常提交消费进度和重置消费进度,如下:

上图表示一个消费者组A,部署了2个消费者实例consumer instance1和consumer instance2。

- consumer instance1消费queue1和queue2

- consumer instance2消费queue3和queue4

这里的消费进度是指consumer instance1分别消费到queue1和queue2第多少条消息,consumer instance2分别消费到queue3和queue4第多少条消息。

在集群消费时,消费进度由消费者主动“上报”给Broker,广播消费时由消费者自己本地保存。

为什么需要消费进度

消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。

在RocketMQ中,实现的消费语义叫“至少投递一次”,也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。

什么时候“上报”消费进度

消费者一般在两种情况下“上报”消费进度,消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。如下图2展示了,图3展示了:

消费成功后提交消费进度的过程

重置消费进度的过程

二者共同点:

• 都是由Broker统一管理消费者的消费进度

• 都需要由消费者“主动上报”最新的消费进度

二者的差异点:

• 正常消费时提交消费进度,一般消费进度是向前推进

• 重置消费进度时提交消费进度,消费进度可能向前推进,也可能向后回溯

消费进度管理代码分析

在RocketMQ中,将消费进度管理抽象为消费进度管理接口OffsetStore,该接口有两个实现类: RemoteBrokerOffsetStore和LocalFileOffsetStore,他们分别实现了集群消费、广播消费的消费进度管理。

下图描述了OffsetStore、RemoteBrokerOffsetStore和LocalFileOffsetStore三者的类图关系:

OffsetStore接口定义了消费进度管理的基本方法,具体方法列表如下(方法参数已省略):

load(): 加载全部消费者的消费进度信息

updateOffset(): 更新一个queue的消费进度

readOffset(): 读取一个queue的消费进度

persistAll(): 持久化全部消费进度

persist(): 持久化一个queue的消费进度

removeOffset(): 移除一个queue的消费进度

cloneOffsetTable(): 克隆一个topic的消费进度

updateConsumeOffsetToBroker(): 更新消费进度到Broker

RemoteBrokerOffsetStore的实现是将消费进度信息保存到Broker中;LocalFileOffsetStore的实现是将消费进度信息保存到本地文件中。

/ 彩蛋1 /

updateConsumeOffsetToBroker() 这个方法是将消费进度更新到Broker中,想必在LocalFileOffsetStore是没有实现该方法的。通过看源码,也印证了我们的猜想:

接来下以用Push的方式消费普通消息(非顺序消息)为例,具体讲解如何消费成功、重置消费位点整个过程是如何的。

▊消费成功,如何提交消费进度?

在RocketMQ中,消费者是一批一批的消费的,Push消费方式默认每批16条消息,消费完成后会调用ConsumeMessageConcurrentlyService。

processConsumeResult()方法处理消费结果,该方法会更新这批消息中对应Topic的queue的消费进度,具体核心代码片段如下:

1longoffset=consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());2if(offset>=0&&!consumeRequest.getProcessQueue().isDropped()){3this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(),offset,true);4}

以上代码主要涉及3个核心方法removeMessage()、isDropped()、updateOffset()。

removeMessage()方法是将成功消费的消息从本地缓queue中删除,并返回这个queue的消费位点。

isDropped()这个方法是判断这些消息所在的本地queue是否被drop了,如果被drop了消费进度就不更新。一般由于有消费者上线、下线、broker宕机等引发消费者负载均衡,导致这个queue已经分配给其他消费者。

updateOffset(): 更新本地内存中的消费位点。

实现代码如下:

1publicvoidupdateOffset(MessageQueuemq,longoffset,booleanincreaseOnly){2if(mq!=null){3AtomicLongoffsetOld=this.offsetTable.get(mq);4if(null==offsetOld){5offsetOld=this.offsetTable.putIfAbsent(mq,newAtomicLong(offset));6}7if(null!=offsetOld){8if(increaseOnly){9 pareAndIncreaseOnly(offsetOld, offset);10}else{11offsetOld.set(offset);12}13}14}15}

代码中this.offsetTable的类型是ConcurrentMap<MessageQueue,AtomicLong>,表示一个本地queue和其消费位点的对应关系,看到这里大家不禁心中会冒起疑问: 不是更新位点到Broker中嘛? 是的,确实不是。在RocketMQ的设计中,本地消费位点和Broker位点同步是异步的。大家如果顺着persistAll()方法找调用关系,会发现RocketMQ客户端在启动时会初始化一个定时任务调用persistAll()方法,将offsetTable中的本地位点信息更新到Broker中。

persistAll()方法主要是通过调用updateConsumeOffsetToBroker()方法将消费进度更新到Broker的,核心代码片段如下:

1publicvoidupdateConsumeOffsetToBroker(MessageQueuemq,longoffset,booleanisOneway)throwsRemotingException,2MQBrokerException,InterruptedException,MQClientException{3...4if(isOneway){5this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(6findBrokerResult.getBrokerAddr(),requestHeader,1000*5);7}else{8this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(9findBrokerResult.getBrokerAddr(),requestHeader,1000*5);10}11}12...13}

updateConsumeOffsetToBroker()方法将一个queue的消费进度信息封装为一个RPC请求的requestHeader,再加上请求代码RequestCode.UPDATE_CONSUMER_OFFSET一起封装成为一个RPC的请求命令RemotingCommand,最后调用网络层方法invokeOneway()将该RPC请求发送给Broker。

/ 彩蛋2 /

这里特别注意,RocketMQ默认是通过invokeOneway()方法将该请求发送出去的,也就是说客户端只管发请求。不管Broker的返回结果。如果网络不好或者Broker处理慢,可能发现一个现象: 消费者一直在正常消费,而Broker的消费进度信息更新很慢。

▊重置消费进度如何生效?

RocketMQ目前支持重置消费进度到某个具体时间,重置消费位点逻辑中客户端部分和正常消费一致,只是消费进度更新发起者是RocketMQ Console,具体过程如下图6所示:

第一步,用户可以在RocketMQ Console的Topic页面,重置一个Topic的某一个消费者组的消费进度到某个时刻。

第二步,当Broker收到Console发送的重置消费进度请求后,会根据重置时间查找该时间对应的每个queue的消费位点,然后将这些信息封装后发送给每一个消费者实例。

第三步,消费者收到Broker发送的重置位点请求后,更新本地消费进度。

/ 彩蛋3 /

这里有个坑,除了java客户端之外,如果是CPP/Python/Go等基于CPP客户端封装的多语言客户端会重置失败,原因时Broker在封装请求时,只是按照java协议封装了请求包,该包其他语言会解析失败,导致重置位点失败。目前笔者已经提PR(pr id=1930)处理了。

第四步,消费者本地的定时任务定时将本地位点信息同步到Broker。(逻辑和成功消费时一致)

通过我们大量的实践发现,何时提交消费进度、如何提交消费进度是排查问题的主要依据,在掌握了这两点后,问题基本迎刃而解。

想要了解更多关于RocketMQ的原理实现可以阅读《RocketMQ分布式消息中间件:核心原理与最佳实践》一书。

这是一本讲解RocketMQ最佳实践的系统化书籍,作者有在RocketMQ在线高可靠场景下的深度开发和运营经验,踩过很多坑,总结出宝贵的经验。内容清晰易懂,又结合了最佳实践的经验,可以当作RocketMQ初学的参考书,也可以当作在线深度大规模使用的工具书。

关于作者

Apache RocketMQ北京社区联合发起人,RocketMQ项目Commiter,RocketMQ社区Python客户端项目负责人。目前就职于北京某在线教育公司,担任高级大数据工程师,曾负责公司消息与数据流平台,目前主要负责OLAP团队,对分布式存储计算系统设计有丰富经验,热衷于知识分享和社区活动。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。