200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ源码分析之延迟消息

RocketMQ源码分析之延迟消息

时间:2019-10-15 19:49:49

相关推荐

RocketMQ源码分析之延迟消息

文章目录

前言一、延迟消息1.特点2.使用场景3.demo二、发送延迟消息三、broker端存储延迟消息四、总结1.延迟消息工作原理2.延迟消息在消费者消费重试中的应用

前言

本篇文章将会分析延迟消息的工作原理以及其在consumer端消息重试场景中的应用。


一、延迟消息

1.特点

(1)与普通消息相比,延迟消息需要设置延迟级别,注意:延迟级别从1开始,如果延迟级别等于0则表示该消息不是延迟消息

(2)延迟消息发送到broker后不会立刻被消费,而是需要等待特定时间后才被投递到真正的topic中

(3)RocketMQ不支持任意时间延迟,broker端配置文件中可以配置延迟队列等级,默认值是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,目前RocketMQ中支持的延迟时间的单位有4种:s(秒)、m(分钟)、h(小时)、d(天)

2.使用场景

(1)在电商购物场景中,如果用户在下单后没有立刻付款此时界面上就会提示:如果15分钟后没有支付那么订单将会被取消

(2)通过消息触发定时任务,例如在某一固定时间点向用户发送提醒消息

3.demo

示例源于官网。

(1)producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.mon.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// 启动生产者producer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息producer.send(message);}// 关闭生产者producer.shutdown();}}

(2)consumer

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.mon.message.MessageExt;import java.util.List;public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");// 订阅Topicsconsumer.subscribe("TestTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}}


二、发送延迟消息

producer发送延迟消息与普通消息的流程是一致的,唯一需要注意的是:需要在producer端调用setDelayTimeLevel(int level)方法为消息设置延迟等级,设置延迟等级实际上是在消息的properties属性中添加<DELAY, level>键值对。延迟消息的发送流程可以参考笔者之前的笔记:RocketMQ源码分析之普通消息发送


public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));}void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<String, String>();}this.properties.put(name, value);}

三、broker端存储延迟消息

broker端存储延迟消息时会发生两次消息写入操作,一次是将消息写入“SCHEDULE_TOPIC_XXXX”的“delayLevel - 1”消息队列中,一次是将消息写入实际的topic和queueId中。接下来我们看看延迟消息在broker端存储的工作原理。

1.在broker端存储时会判断消息的properties属性中DELAY的值是否大于0,如果大于0则表示该消息是延迟消息,对延迟消息的处理逻辑如下:

首先会对延迟级别进行判断,判断其是否超过了broker端设置的最大延迟级别,如果大于则将其重置为broker端的最大延迟级别将消息的原始topic和queueId存储在在properties的REAL_TOPIC和REAL_QID属性中将消息的topic和queueId别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1

这一步的处理对应于Commitlog.java中的asyncPutMessage(final MessageExtBrokerInner msg)方法,具体如下:

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() > 0) {//如果设置的延迟级别大于broker端配置最大延迟级别则将该消息的延迟级别重置为broker端配置的最大延迟级别if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//将消息的topic和queueId分别重置为SCHEDULE_TOPIC_XXXX和delayLevel - 1,而消息原始的topic和queueId记录在properties的REAL_TOPIC和REAL_QID属性中msg.setTopic(topic);msg.setQueueId(queueId);}}public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;}

注意:在将消息存入“SCHEDULE_TOPIC_XXXX”时,MessageQueue的queueId与delayLevel的对应关系是:queueId = delayLevel - 1。接着就是将消息写入到commitlog。

2.当commitlog中新添加消息后就会调用reputMessageService服务来构建DispatchRequest,后续会根据DispatchRequest来构建consumequeue和indexFile。在构建DispatchRequest时会调用checkMessageAndReturnSize方法,该方法中有关于延迟消息的处理需要注意,具体如下:调用computeDeliverTimestamp方法计算延迟消息的投递时间,并将投递时间放在consumequeue的tag字段,也就是此时构建的consumequeue中tag中存储的是“storeTimestamp+延迟级别对应的时间”

{String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {Long time = this.delayLevelTable.get(delayLevel);if (time != null) {return time + storeTimestamp;}return storeTimestamp + 1000;}

在将延迟消息写入commitlog(topic:SCHEDULE_TOPIC_XXXX)以及构建完其对应的consumequeue后,后续都是由ScheduleMessageService服务来处理,这里我们先介绍下有关ScheduleMessageService服务的基础信息,然后再接着上面详细分析broker后续如何处理延迟消息。

3.ScheduleMessageService

关于ScheduleMessageService我们需要了解以下信息:

其初始化及启动都是在broker启动的过程中完成的,其实现原理是Timer+TimerTask在其初始化完成后会执行load函数,主要完成两个任务,一个是将文件${Rocket_HOME}/store/config/delayOffset.json加载到内存中的offsetTable,一个是获取broker端配置的messageDelayLevel并将其解析到delayLevelTable,其数据结构是<delayLevel, delay timeMillis>,在解析的过程中会确定好maxDelayLevel

public boolean load() {boolean result = super.load();result = result && this.parseDelayLevel();return result;}public boolean parseDelayLevel() {HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();//从这里可以看到目前支持4种时间单位timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];String ch = value.substring(value.length() - 1);Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}

在启动ScheduleMessageService时会完成两个任务,一个是遍历delayLevelTable为每个延迟级别的队列创建一个DeliverDelayedMessageTimerTask,一个是创建定时任务将offsetTable持久化

public void start() {if (pareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();//获取延迟级别对应的消息队列拉取进展,offsetTable中存储的是<delayLevel, consumequeue拉取进展>Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}

有了前面ScheduleMessageService的介绍,我们接着分析broker后续的处理,前面分析到构建“SCHEDULE_TOPIC_XXXX”的consumequeue,那么“SCHEDULE_TOPIC_XXXX”是由谁来消费呢?其实是有ScheduleMessageService为每个延迟队列构建的DeliverDelayedMessageTimerTask来消费。DeliverDelayedMessageTimerTask继承了TimeTask,也就是说它的本质就是一个TimeTask,其核心实现是在executeOnTimeup方法中

,我们来看下它都完成哪些操作:

根据topic的名称“SCHEDULE_TOPIC_XXXX”以及delayLevel对应的queueId来查询其对应的consumequeue

根据当前consumequeue的拉取进展来获取consumequeue中待读取的数据

解析consumequeue中的数据:延迟消息在commitlog中的物理偏移量、消息大小以及消息tag的hashcode

判断当前是否已经到了延迟消息投递时间,方法是计算投递时间与当前时间的差值countdown,如果countdown小于等于0表示已经到了消息投递时间,如果countdown大于0则表示还没有到延迟消息投递时间

如果到达延迟消息投递时间则会根据该消息在commitlog中的物理偏移量以及消息大小来获取延迟消息msgExt,接着会调用messageTimeup方法,它会根据延迟消息构建一个新的消息,这里比较关键的操作有三个:一个是根据消息的tag来设置新消息的tagsCode,一个是将消息properties中key为“DELAY”的键值对删除了,最后一个是新消息的topic和queueId是原来消息中properties中REAL_TOPIC和REAL_QID对应的值,也就是说这一步是还原了最初的延迟消息,接着就是调用了putMessage方法将还原后的消息写入commitlog,如果写入失败则会在日志中打印失败的消息同时会在10秒后再次调度该DeliverDelayedMessageTimerTask任务

如果没有到达延迟消息投递的时间则会在countdown时间之后再次调度该DeliverDelayedMessageTimerTask任务

当该延迟队列中没有新的消息可以消费时,则会以0.1秒为周期调度DeliverDelayedMessageTimerTask任务

public void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}public void executeOnTimeup() {ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);if (bufferCQ != null) {try {long nextOffset = offset;int i = 0;ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferCQ.getByteBuffer().getLong();int sizePy = bufferCQ.getByteBuffer().getInt();long tagsCode = bufferCQ.getByteBuffer().getLong();if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();} else {//can't find ext content.So re compute tags code.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",tagsCode, offsetPy, sizePy);long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);}}long now = System.currentTimeMillis();long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;if (countdown <= 0) {MessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}} else {ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}} // end of fornextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;} finally {bufferCQ.release();}} // end of if (bufferCQ != null)else {long cqMinOffset = cq.getMinOffsetInQueue();if (offset < cqMinOffset) {failScheduleOffset = cqMinOffset;log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="+ cqMinOffset + ", queueId=" + cq.getQueueId());}}} // end of if (cq != null)ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}

当消息成功写入commitlog后,reputMessageService会构建DispatchRequest来构建consumequeue和indexFile,这样消费者就可以正常消费消息了。


四、总结

1.延迟消息工作原理

这里我们用一张图来总结延迟消息的工作原理:

延迟消息的整个流程可以概括为以下步骤:

(1)producer端发送延迟消息

(2)broker将延迟消息的topic和queueId分别替换为SCHEDULE_TOPIC_XXXX和delayLevel-1,将其存储在commitlog中并构建对应的consumequeue,此时该消息的consumequeue的tagsCode值为storeTimestamp+延迟级别对应的时间

(3)延迟队列对应的DeliverDelayedMessageTimerTask根据offsetTable中的拉取进展从consumequeue获取延迟消息在commitlog中的物理偏移量等信息

(4)从commitlog读取延迟消息并还原延迟消息的topic和queueId

(5)将还原后的消息再次写入commitlog中

(6)构建消息的consumequeue

(7)消费者正常消费消息

从上图我们可以看到整个过程中有两次消息写入,所以此时的tps会翻倍

2.延迟消息在消费者消费重试中的应用

在RocketMQ中延迟消息被用在了consumer端消息重试的场景中,现在来分析下具体是如何应用的。

(1)首先,当前consumer端消息完一条消息返回的状态是ConsumeConcurrentlyStatus.RECONSUME_LATER时,consumer端会向broker发送RequestCode.CONSUMER_SEND_MSG_BACK请求,broker在对该请求处理时有以下两点需要注意:

broker端存储的消息的topic名称是%RETRY%+consumerGroup,这里需要注意一点,consumer在启动的过程中除了会订阅消息本省的topic外还会订阅重试topic

private void copySubscription() throws MQClientException {try {Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {for (final Map.Entry<String, String> entry : sub.entrySet()) {final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:break;case CLUSTERING://订阅重试topicfinal String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {throw new MQClientException("subscription exception", e);}}

broker端会计算delayLevel,计算方法是delayLevel = 3 + msgExt.getReconsumeTimes()broker端会将消息的reconsumeTimes值加1

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();...if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {newTopic = MixAll.getDLQTopic(requestHeader.getGroup());queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE, 0);if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("topic[" + newTopic + "] not exist");return pletedFuture(response);}} else {if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(newTopic);msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));msgInner.setQueueId(queueIdInt);msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);String originMsgId = MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);

(2)在往broker写(1)中的消息时,由于消息的delayTimeLevel大于0,所以会对消息进行本文第三部分的处理,这里就和延迟消息的工作原理衔接上了。虽然消息重试次数的增加,消息被延迟处理的时间也会越长。

最后用一张图来总结下一条消费重试的消息在broker端的流转过程:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。