200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ延迟消息的底层实现源码解析

RocketMQ延迟消息的底层实现源码解析

时间:2022-12-12 04:41:23

相关推荐

RocketMQ延迟消息的底层实现源码解析

1.延迟消息的使用

Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(1);SendResult sendResult = producer.send(msg);

只需要在创建消息的时候指定消息的延迟级别就可以了,默认有18个延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

2.延迟消息的实现原理

在RocketMQ中,发送一个消息我们都是需要指定消息投递到哪个topic,但是如果这个消息设置了消息的延迟级别,那么该消息投递的就不是目标topic的,而是一个叫SCHEDULE_TOPIC_XXXX的topic,然后会有一个定时任务定时地去检查该topic里面的消息对应的延时时间是否已经结束了,如果结束了就把该消息重新放回目标topic,那么消费者此时就可以消费到了

3.延迟消息源码实现

(1)commitlog写入延迟消息

org.apache.mitLog#asyncPutMessage

调用链就不贴出来了,最终消息都是需要调用CommitLog的写入方法

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// 条件成立:说明用户发送的是延时消息,设置了消息的延时级别if (msg.getDelayTimeLevel() > 0) {// 重置下延时级别,不能太大if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 延时消息投递的topic,SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 延时消息投递的queueId,delayLevel - 1queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 备份下消息原本要投递的topic以及queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置延时消息投递的topic以及queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}}

由于该方法太长,所以这里只贴出关于处理延时消息的部分。可以看到,如果消息设置了延迟级别(默认等于0),首先会把原始投递的topic和queueId保存到自身属性中,然后会把原始topic换成SCHEDULE_TOPIC_XXXX这个topic,并且queueId为延迟级别-1,然后后面就会写入commitlog文件中

(2)consumequeue写入延迟消息

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

消息写入了commitlog之后,后台线程会异步地把消息的索引信息写入到consumequeue文件中

DispatchRequest dispatchRequest =mitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);

org.apache.mitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)

......{// 获取用户设置的消息延迟级别String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);// 这里基本都会成立,当发送延迟消息的时候,会先写入到commitlog中,并且会把消息的topic改成SCHEDULE_TOPIC_XXXXif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);// 重置消息延迟级别if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}if (delayLevel > 0) {// 根据延迟级别计算出消息延迟结束时间,也就是说对于延迟消息来说,在延迟时间还没结束之前,ConsumeQueueData中的tagCode记录的是延时结束时间tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}......

所以可以看到对于延迟消息来说,在延迟队列对应的consumequeue中存储的条目数据其中tagCode这一块内容存储的并不是该消息的tagCode,而是该消息的延迟结束时间

(3)加载负责延迟消息组件---ScheduleMessageService

在broker启动的时候,会去加载各种文件中的数据到内存,其中需要把delayOffset.json文件加载到内存中,而这个工作就是ScheduleMessageService去做的

org.apache.rocketmq.store.schedule.ScheduleMessageService#load

public boolean load() {boolean result = super.load();// 初始化delayLevelTable表result = result && this.parseDelayLevel();return result;}

调用父类的load方法,在父类的load方法中读取到文件内容之后,就会去转换成java对象,看decode方法

public void decode(String jsonString) {if (jsonString != null) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);if (delayOffsetSerializeWrapper != null) {this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());}}}

public class DelayOffsetSerializeWrapper extends RemotingSerializable {private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =new ConcurrentHashMap<Integer, Long>(32);public ConcurrentMap<Integer, Long> getOffsetTable() {return offsetTable;}public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {this.offsetTable = offsetTable;}}

可以看到里面存的数据就是每一个延迟队列对应的消息最大偏移量。但是仔细看load方法,在调用完父类的load方法之后,ScheduleMessageService还会调用一个parseDelayLevel方法,代码如下

/*** 初始化delayLevelTable表* @return*/public boolean parseDelayLevel() {// 计算出每一个时间单位对应的毫秒数HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();timeUnitTable.put("s", 1000L);timeUnitTable.put("m", 1000L * 60);timeUnitTable.put("h", 1000L * 60 * 60);timeUnitTable.put("d", 1000L * 60 * 60 * 24);// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hString levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();try {String[] levelArray = levelString.split(" ");for (int i = 0; i < levelArray.length; i++) {String value = levelArray[i];// 获取到时间单位String ch = value.substring(value.length() - 1);// 获取该时间对应的毫秒数Long tu = timeUnitTable.get(ch);int level = i + 1;if (level > this.maxDelayLevel) {// 得到最大的延迟级别this.maxDelayLevel = level;}long num = Long.parseLong(value.substring(0, value.length() - 1));long delayTimeMillis = tu * num;this.delayLevelTable.put(level, delayTimeMillis);}} catch (Exception e) {log.error("parseDelayLevel exception", e);log.info("levelString String = {}", levelString);return false;}return true;}

这个方法中做的事情也很简单,就是把配置中的每一个延迟级别都转换成对应的时间毫秒数,然后初始化delayLevelTable表

(4)启动ScheduleMessageService

org.apache.rocketmq.store.schedule.ScheduleMessageService#start

/*** 在消息存储服务启动的时候会被调用*/public void start() {if (pareAndSet(false, true)) {this.timer = new Timer("ScheduleMessageTimerThread", true);for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}// 给每一个延迟级别都启动对应的TimeTask,延迟1s执行if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}// 每10s把每一个延迟队列的最大消息偏移量写入到磁盘中this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}

首先遍历delayLevelTable表,根据延迟级别从offsetTable表中找到该延迟队列中最大的消息偏移量,然后给每一个延迟级别都创建一个TimeTask,并且把对应的延迟级别和消息偏移量都传进TimeTask中,然后延迟1s执行。接着再创建一个定时任务每10s把每一个延迟队列的最大消息偏移量写入到delayOffset.json文件中

(5)执行每一个TimeTask

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));

根据延迟级别找到topic为SCHEDULE_TOPIC_XXXX的队列,然后根据offset得到对应的consumequeue文件,再返回该文件从起始偏移量到写入位点的所有ConsumeQueueData

// 消息的commitlog物理偏移量long offsetPy = bufferCQ.getByteBuffer().getLong();// 消息大小int sizePy = bufferCQ.getByteBuffer().getInt();// 延迟结束时间,在消息写入到commitlog之后会进行分发到consumequeue,而对于延迟消息来说,tagCode这个位置存储的是该消息的延迟到期时间long tagsCode = bufferCQ.getByteBuffer().getLong();

获取到ConsumeQueueData中存储的值

// 获取当前时间long now = System.currentTimeMillis();// 得到正确的到期时间long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 这里计算出的偏移位是当前遍历的前一个ConsumeQueueData的位点,作用是当延迟消息投递到原始的topic失败的时候会根据这个偏移位点去重新执行投递nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long countdown = deliverTimestamp - now;// 条件成立: 说明延迟时间已经结束了if (countdown <= 0) {// 根据commitlog物理偏移量找到msgMessageExt msgExt =ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);if (msgExt != null) {try {// cp创建一个新的msg对象,该msg对象的topic以及queueId都回到了原始的值MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",msgInner.getTopic(), msgInner);continue;}// 写入到commitlog中PutMessageResult putMessageResult =ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 写入成功,跳过该次循环判断下一条延迟消息是否达到到期时间if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}} catch (Exception e) {/** XXX: warn and notify me*/log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="+ offsetPy + ",sizePy=" + sizePy, e);}}}// 条件成立: 说明此时队列中的延迟时间还未结束else {// 重新提交一个TimeTask,并且设置的延迟执行时间为消息剩余的延时时间ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);// 更新延迟队列已消费的消息偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;}

然后就会去判断该消息的延迟时间是否已经结束了,如果结束了,那么就把消息重新投递到原来的topic的队列中(原始的topic和queueId存储在消息属性中),然后再去遍历下一个ConsumeQueueData,否则如果这个ConsumeQueueData中的延迟时间还未结束,就重新提交一个TimeTask,该TimeTask延迟执行的时间是这个ConsumeQueueData剩余的延时时间(当这个TimeTask执行的时候,这个ConsumeQueueData肯定延时时间肯定就已经结束了),然后再更新该延迟队列已消费的消息偏移量

if (putMessageResult != null&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {continue;} else {// XXX: warn and notify melog.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",msgExt.getTopic(), msgExt.getMsgId());ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,nextOffset), DELAY_FOR_A_PERIOD);ScheduleMessageService.this.updateOffset(this.delayLevel,nextOffset);return;}

但是如果写入延迟消息失败的话,就需要从上一个ConsumeQueue的偏移量开始,延迟0.1s重新执行TimeTask,并且把上一个ConsumeQueueData的偏移量更新到该延迟队列的最大已消费偏移量

// 如果已经遍历完了延迟队列中的所有消息了,那么就计算出此时最后一个消息的偏移位,然后根据这个偏移位再次发起一个TimeTasknextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);// 更新延迟队列已消费的消息偏移量ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);return;

如果消息都遍历完了,那么就延迟0.1s再发起一个TimeTask重新遍历该consumequeue文件,并且此时会更新延迟队列已消费的消息偏移量

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);

如果是一开始ScheduleMessageService启动的时候,此时延迟队列是还没有对应的consumequeue文件的,所以此时会延迟0.1s重新发起一个TimeTask

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。