200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ重试机制(ACK确认机制)

RocketMQ重试机制(ACK确认机制)

时间:2018-10-11 14:35:30

相关推荐

RocketMQ重试机制(ACK确认机制)

大家好,我是一个喜欢诗词的java研发赛亚人,感谢您的关注~ ┗( ▔, ▔ )┛。微信搜索【程序猿卡卡罗特】,后续有更多硬核文章哦~

今日诗词:少年恃险若平地,独倚长剑凌清秋。 – [唐·顾况]《行路难三首》

今天我们来扒一扒RocketMQ重试机制的底裤,内容比较硬核,建议一键三联。哦不,走错片场了,建议点赞 + 收藏。

好嘞,咱们这就上车~

以下只设计Consumer的重试机制,Producer比较简单,只是单纯的重发(当然还有故障转移机制啦),暂不讨论…

设计知识点

ACK 重试机制原理死信队列(DLQ队列)

几个问题

消息重试是什么意思?Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?消息重试的策略是什么?消息重试的延迟时间规则?什么叫死信队列?有什么特点?Msg加入死信队列的条件是什么?

知识背景

我们知道Consumer拉取消息、消费消息时分开的,分别由两个类去实现:

拉取消息:PullMessageService消费消息:ConsumeMessageConcurrentlyService

消息消费流程

下面只展示关键代码

1、假设我们拉取到消息,准备提交到 ConsumeMessageConcurrentlyService 中进行消费,会调如下代码:

// ConsumeMessageConcurrentlyService public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 假设未分页if (msgs.size() <= consumeBatchSize) {// 消息封装到里面ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {// 丢线程池消费this.consumeExecutor.submit(consumeRequest);}}}

2、ConsumeRequest 内部代码

@Overridepublic void run() {// 1、Consumer 中设计的回调方法MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {// 2、回调 Consumer 中的监听回调方法status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {hasException = true;}// 3、如果status 返回null,设置为 RECONSUME_LATER 类型if (null == status) {status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}// 4、对返回的 status 结果进行处理ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);}

什么?Consumer 中的监听回调方法是什么意思?

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_topic");// .... 省略部分代码// 1、设置监听回调方法consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {System.out.println(result);// 2、返回成功表示消费成功,不会进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {e.printStackTrace();// 3、返回 RECONSUME_LATER 表示消息需要重试(返回NULL也是一样)// RECONSUME_LATER:通过单词我们知道是 稍后重新消费的意思,即重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});

回调方法就是上面你写的那个匿名类嘛。我猜您肯定知道的啦,真谦虚 (ー̀дー́)

3、根据返回的 status 判断是否需要重试

public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();switch (status) {// 1、消费成功case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}break;// 2、消费延迟case RECONSUME_LATER:ackIndex = -1;break;default:break;}// 3、针对不同的消息模式做不同的处理switch (this.defaultMQPushConsumer.getMessageModel()) {// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 5、集群模式case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)boolean result = this.sendMessageBack(msg, context);// 8、ACK 可能会失败,需要记录失败的ACKif (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}

根据上面源码我们可以得出以下结论:

1、由第4步我们可知:广播模式 就算消费者消费失败,也不会进行重试,只是打印警告日志。

2、只有消费失败(没有返回 CONSUME_SUCCESS 都成为失败)的消息才需要发送ACK重试

3、如果ACK失败,(总感觉这里ACK叫起来怪怪的,《RocketMQ技术内幕》中成为ACK失败),我们叫重试失败吧。

​ 如果重试失败,就会继续被延迟5s重新消费(又会回调到Consumer中的回调方法)

4、消息被消费成功、失败,都会更新Consumer 的偏移量

4、ConsumeMessageConcurrentlyService.sendMessageBack:准备请求Broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别int delayLevel = context.getDelayLevelWhenNextConsume();try {// 2、发送给Brokerthis.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());return true;} catch (Exception e) {log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);}return false;}

什么?你不知道RocketMQ延迟消息的 延迟级别是啥意思? T_T"

RocketMQ官网延迟例子

我们知道RocketMQ延迟级别分为18级,delayLevel从1-18,每个数字对应一个延迟的时间。

延迟时间如下:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

比如:delayLevel=1,表示延迟1s

​ 那 delayLevel=4,就是延迟30s的意思? 呀,你还学会了抢答,就是这个意思。你真聪明。 (o゚▽゚)o

Broker端对重试的处理

以下代码设计到Broker的源码,读者需要下载RocketMQ源码才看得到。

这个方法就是处理Consumer的重试请求的代码,方法中代码比较长。主要做了以下几个事:

更消息的 Topic 为"%RETRY%"+ group,计算queueId(重试队列,队列数为1)如果消息重试 >= 16次(默认)。继续更改消息的Topic 为死信队列的Topic:"%DLQ%" + group,消费队列为1(死信队列只有一个消费队列)如果没有变成死信,计算消息的延迟级别复制原来Msg,重新生成一个Msg,将新Msg丢给BrokerController中,然后存到CommitLog中进行存储(什么?你不知道什么是CommitLog? 下期写一篇RocketMQ内部存储结构) 新的Msg 会有新的messageId非死信:该消息以新的Topic名:"%RETRY%"+ group存到CommitLog中作为延迟消息死信:以"%DLQ%" + group为Topic名,存到CommitLog中:存到死信队列中的消息不会被Consumer消费了

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request){// 1、新的Topic名:"%RETRY%"+ groupString newTopic = MixAll.getRetryTopic(requestHeader.getGroup());// 重试队列数为1int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();// 2、都是为0int delayLevel = requestHeader.getDelayLevel();// 3、消息重试次数:重试几次这里存的就是低几次int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();// 4、如果超过最大重试次数(默认为16)if (msgExt.getReconsumeTimes() >= maxReconsumeTimes|| delayLevel < 0) {// 5、更改Topic 名为死信队列名:"%DLQ%" + groupnewTopic = MixAll.getDLQTopic(requestHeader.getGroup());// 6、默认死信队列数为1queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;} else {// 7、delayLevel 其实都为0,所以这里就相当于是重试次数 +3if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}// 8、新建消息,准备存到CommitLog中作为新消息MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setQueueId(queueIdInt);// 8-1、重试次数+1。新消息被消费者消费时就会传上来,到第4步进行比较msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 9、作为新消息存到CommitLog中PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);}

什么是死信队列(DLQ队列)?

参考博客,就不造轮子了

总结就是:

Broker中单独的一个队列(DLQ),该队列存储了Consumer端重试16次后都没成功消费的消息该队列:只有写权限,没有读权限。所以是不能被Consumer重新消费的,只能进行人工干预,重新投递(Rocket-MQ-Console 中可以操作)DLQ队列中,该消息的TOPIC 重新被命名为:"%DLQ%"+groupNameDLQ队列其实就是(consumequeue文件夹的"%DLQ%"+groupName命名的Topic文件夹下的队列)

什么?consumequeue 文件夹是什么鬼?等我…,马上就写一起RocketMQ消息存储结构你就清楚了

重试消息延时机制

我们说重试消息发到Broker后,被作为一个新的延迟消息存到了CommitLog中,当该消息到了消费时间点是会被Consumer重新消费的。

消息重试16次才会被丢到死信队列中,才不会被消费了。

那其余15次消息每次延迟是延迟多久呢?

我们在上面的源码其实可以看得出:消息的延迟级别是受重试次数(reconsumeTimes)影响的。重试次数越大,延迟越久。

delayLevel = 3 + msgExt.getReconsumeTimes();

具体的重试延迟时间如下:图片来自阿里云

总结

我们回到我们刚开始的几个问题:

消息重试是什么意思?Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?消息重试的策略是什么?消息重试的延迟时间规则?什么叫死信队列?有什么特点?Msg加入死信队列的条件是什么?

消息重试是什么意思?

RocketMQ为了保证高可用,如果Consumer消费消息失败(回调函数没有返回CONSUME_SUCCESS)就需要重新让消费者消费该条消息。

Consumer 消费消息分为集群模式(Cluster)、广播模式(Broadcast),两种模式都会进行消息重试吗?

广播模式只会以警告日志的形式记录消费失败的消息,并不会重试

集群模式才会执行消息的重试机制。

消息重试的策略是什么?

Broker 端采用延迟消息的方式,供Consumer再次消费。

消息重试的延迟时间规则?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xRNGr03y-1628575923634)(imgs\消息重试机制\msg重试时间.png)]

Msg加入死信队列的条件是什么?

消息重试16次后,Consumer 还未消费成功。

最后

若有错误,欢迎各位爷在评论区不吝赐教。后续会继续更新RocektMQ相关文章,欢迎大家在评论区留言呐~

后续最新的文章会先更新到微信上,欢迎来骚扰鸭~ (ノ゚▽゚)ノ

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。