200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 【RocketMQ】延迟消息(延迟队列)

【RocketMQ】延迟消息(延迟队列)

时间:2018-09-12 09:21:56

相关推荐

【RocketMQ】延迟消息(延迟队列)

文章目录

1. 什么是延迟消息1.1 延时消息的使用场景2. 示例3. 原理参考

1. 什么是延迟消息

发送消息后,消费者要等待一定的时间才能消费到该消息。

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。

预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

对于开源版是这样,而对于专业版,是可以自定义时间的。

1.1 延时消息的使用场景

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

2. 示例

对于生产者,设定一个延迟时间,而对于消费者,没有什么不同:

生产者:

package com.xin.rocketmq.demo.testrun;import com.xin.rocketmq.demo.config.JmsConfig;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.mon.message.Message;public class ProducerDelay {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("192.168.10.11:9876");producer.start();Message msg1 = new Message(JmsConfig.TOPIC,"订单001".getBytes());msg1.setDelayTimeLevel(2);//延迟5秒Message msg2 = new Message(JmsConfig.TOPIC,"订单001".getBytes());msg2.setDelayTimeLevel(4);//延迟30秒SendResult sendResult1 = producer.send(msg1);SendResult sendResult2 = producer.send(msg2);System.out.println("Product1-同步发送-Product信息={}" + sendResult1);System.out.println("Product2-同步发送-Product信息={}" + sendResult2);producer.shutdown();}}

msg2.setDelayTimeLevel(4);设置延迟时间。

消费者:

package com.xin.rocketmq.demo.testrun;import com.xin.rocketmq.demo.config.JmsConfig;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.mon.message.MessageExt;import java.util.List;public class ConsumerDelay {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// 设置NameServer的地址consumer.setNamesrvAddr("192.168.10.11:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(JmsConfig.TOPIC, "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}}

3. 原理

开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。

延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。

可以看到这里并不支持秒级精度,按照《rocketmq developer guide》中的说法,是为了避免在broker对消息进行排序,造成性能影响。不过笔者考虑,之所以不支持,更多应该是商业上的考虑。

参考

深入理解RocketMQ延迟消息

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。