200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ(十)——Consumer消费进度(Offset)的管理

RocketMQ(十)——Consumer消费进度(Offset)的管理

时间:2020-08-19 08:27:02

相关推荐

RocketMQ(十)——Consumer消费进度(Offset)的管理

文章目录

Consumer消费进度(Offset)的管理Offset本地管理模式Offset远程管理模式offset用途重试队列offset的同步提交与异步提交

Consumer消费进度(Offset)的管理

消费进度offset是用来记录每个Queue的不同消费者组的消费进度的。根据消费进度记录器的不用,可以分为两种模式:本地模式和远程模式

Offset本地管理模式

当消费模式为广播消费时,offset使用本地模式存储。因为每条消息会被素有的消费者消费。每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。

consumer在广播模式下offset相关数据以JSON的形式持久化到consumer 本地磁盘文件中,默认文件路径为当前用户主目录下的.rocketmq_offsets/${当前消费者ID}/${消费者组名}/offsets.json

Offset远程管理模式

当消费者模式为集群消费时,offset使用远程模式管理,因为所有Consumer示例对消息采用的是均衡消费,所有Consumer共享Queue的消费进度。

Consumer在集群消费模式下offset相关数据以json的形式持久化到Broker磁盘文件中,文件路径为当前用户主目录下的store/config/consumerOffset.json

Broker启动时会加载这个文件,并写入到一个双层Map(ConsumerOffsetManager)。外层map的key为topic@group,value为内层map。内层map的key为queueId,value为offset。当发生Rebalance时,新的Consumer会从该Map中获取到相应的数据来继续消费。

集群模式下offset采用远程管理模式,主要是为了保证Rebalance机制。

offset用途

消费者是如何从最开始持续消费消息的?消费者要消费的第一条消息的起始位置是用户自己通过consumer.setConsumeFromWhere()方法指定的。

在Consumer启动后,其要消费的第一条消息的起始位置常用的有三种,这三种位置可以通过枚举类型常量设置。这个枚举类型为ConsumeFromWhere

当消费完一批消息后,Consumer会提交其消费进度offset给Broker,Broker在收到消费进度后会将其更新到双层MapConsumerOffsetManagerconsumerOffset.json文件中,然后向该Consumer进 行ACK,而ACK内容中包含三项数据:minOffset(最小offset)、maxOffset(最大offset)、nextBeginOffset(下次消费的起始offset)。

重试队列

当rocketMQ对消息的消费出现异常时,会将发生异常的消息的offset提交到Broker中的重试队列。系统在发生消息消费异常时会为当前的topic@group创建一个重试队列,该队列以%RETRY%开头,到达重试时间后进行消费重试

offset的同步提交与异步提交

集群消费模式下,Consumer消费完消息后会向Broker提交消费进度offset,其提交方式分为两种:

同步提交:消费者在消费完一批消息后会向broker提交这些消息的offset,然后等待broker的成功响应。若在等待超时之前收到了成功响应,则继续读取下一批消息进行消费(从ACK中获取nextBeginOffset)。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

异步提交:消费者在消费完一批消息后向broker提交offset,但无需等待Broker的成功响应,可以继续读取并消费下一批消息。这种方式增加了消费者的吞吐量。但需要注意,broker在收到提交的offset后,还是会向消费者进行响应的。可能还没有收到ACK,此时Consumer会从Broker中直接获取nextBeginOffset

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。