200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ消息ACK机制

RocketMQ消息ACK机制

时间:2019-11-22 01:37:54

相关推荐

RocketMQ消息ACK机制

1 RocketMQ是以consumer group+queue来确认消息消费进度,通过gruop+offset来标记一个goroup在queue上消费进度,消费成功之后都会返回一个ack消息告之broker更新offset,但是RocketMQ并不是按一条一条消息来做ack,而是根据一次拉取批量来做消息ack

如一次从broker拉去10条消息,就按照10条(不一定是10条,跟更新远程的消费进度定时任务执行时间有关系)消息整体做offset,为方便理解下面先按照10条来分析

如上一次的offset为101,本次拉取了10调消息,偏移量从101-110

每一条消息消费成功会按照当前消息最小的offset来更新本地的消费进度,怎么理解这句话,

例如:103消息先消费完成,但是101还没有消费完成(消费失败也算作消费完成),这时候更新还是按照101的偏移量来更新本地偏移量;直到所有的消息都消费完成,110这条消息消费完成的时候才会把偏移量更新为110,再通过定时任务将本地偏移量更新到broker(假设恰好更新偏移量等定时任务触发)。

RocketMQ按批次更新进度好处是不需要每一条消息都需要做ack操作,提升了效率,但是随之产生了2个问题:

问题1:

如果这一批消息中的101消息由于一些原因一直没有消费完成,即使其它的9条消息都消费完成了,broker的消费进度依然偏移到101,如果此时该consumer宕机或者实例被kill,该queue通过负载均衡策略会重新被分配给其它的consumer,这个时候从broker拉去的偏移量为101开始消费,但是实际102-109这9条消息已经消费完成,造成102-109这9条消息重复消费

在3.6版本之前RocketMQ没有给出解决方案,官方强调业务方需要自己实现消息幂等逻辑,但是为了避免大量的出现消息重复消费的问题,RocketMQ也做了一些限制,如果本地的消息量达到2000之后,不会在拉取新的消息,也就是即使出现上面的极端情况,也只会造成最多1999条消息重复消费。

在3.6之后的版本RocketMQ给出了一个解决方案(治标不治本),在消费端设置了一个消费超时时间

consumeTimeout = 15min

原理是,RocketMQ启动了一个定时任务来检查所有的消息的消费情况,在消费开始的时候会记录消息消费开始时间,每隔consumeTimeout时间去检查所有消息是不是消费完成了,如果还没有消费完成并且时间超过了consumeTimeout配置的时间,就当作消费失败处理(也算作消费完成),既然消费完成了,自然会把本地消费进度更新到上例中的110,再通过定时同步机制将本地进度同步到broker,达成本地和broker端一致的效果

public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {MessageExt msg = null;try {this.lockTreeMap.readLock().lockInterruptibly();try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}} finally {this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);}try {pushConsumer.sendMessageBack(msg, 3);

consumeTimeout支持业务自己配置,为什么说治标不治本,因为始终还是出现2*consumeTimeout时间(比如第一次任务在12点0分,101消息从12点1分开始消费,到12点30分才会发现超时,如果这个时候宕机)的消息会出现无法完成确认造成消息重复消费。

问题2:

既然是按批量来更新消费进度,但是那些虽然消费完成但是实际是失败的消息(主动返回RECONSUME_LATER和抛出异常的)的消息是如何处理的?

rocketmq在消息消费失败的消息会单独把该消息的msgid、偏移量等信息通过rpc调用通知给broker,那broker会把该消息做重新的投递,从而做到了消息的重置机制,消息的重试后面在分析

public void consumerSendMessageBack(final String addr,final MessageExt msg,final String consumerGroup,final int delayLevel,final long timeoutMillis,final int maxConsumeRetryTimes) throws RemotingException, MQBrokerException, InterruptedException {ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。