200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ 死信队列

RocketMQ 死信队列

时间:2022-11-02 15:17:40

相关推荐

RocketMQ 死信队列

RocketMQ 死信队列

死信队列

死信队列是什么?

死信队列指的是种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

在RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列。

可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是:

%RETRY%消费组名称(重试Topic)

%DLQ%消费组名称(死信Topic)

死信队列也可以被订阅和消费,并且也会过期RocketMQ 中

其中包括重试之后也无法消费的消息也会

死信队列应用场景

如我们平时下单后未在指定时间内付款,过来这个时间,我们的订单会被放入死信队列中。当我们再去付款时候,会发现订单已经被取消,此时我们只需要去死信队列中查该订单是否存在。

如当一些消息出现异常迟迟未被消费(或者最大重试次数后也未成功消费),这时候就会将消息存放到死信队列中。

示例

这里我们定义生产者

// 实例化生产者,并指定生产组名称DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_topic_name_dle_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName("topic_name_dle");// 指定nameserver的地址producer.setNamesrvAddr("localhost:9876");//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for (int i = 0; i <10 ; i++) {Message message = new Message("topic_name_dle", ("key=" + i).getBytes("utf-8"));// 1 同步发送 如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}

消费者

这里默认返回消息消费失败,指定消费者重试一次。

/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("consumer_group_topic_name_dle_01");// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr("localhost:9876");//defaultMQPushConsumer.subscribe("topic_name_dle", "*");/*** 推送消息 提高消费处理能力* 1 提高消费并行度* 2 以批量方式进行 消费* 3 检测延时情况,跳过非重要消息*///消费限流 只针对推送来设置,拉取消息自己控制// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//final MessageQueue messageQueue = context.getMessageQueue();for (MessageExt msg : msgs) {System.out.println(msg);try {System.out.println(new String(msg.getBody(), "utf-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}// 消息消费成功//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//null 也表示推送失败,会进行重试return null;// 消息消费失败//return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});

启动消费者和生产者之后,消费者可以看到,消息重发了一次,这里图没截全。

RocketMq 可视化工具:rocketmq-console下载地址:

/apache/rocketmq-externals/archive/rocketmq-console-1.0.0.zip

下载成功带入idea 将配置文件改成自己的地址,然后启动

可以从控制台中看到,没有被正常消费的消息被发送到死信队列中

这里与RocketMQ不同的是RabbitMQ需要自己定义一个队列与交换机绑定,没有被成功消费会将消息发送到自己创建的死信队列中去,而RocketMQ不需要我们自己去指定死信队列,会自己根据重试次数以及消息是否消费成功,将消息发送到死信队列(不需要我们去创建)。

死信队列

重试的队列以

可以看到队列名前会%RETRY%前缀 表示是重试队列

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。