200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ-初体验RocketMQ(09)-广播消息 延时消息 批量消息

RocketMQ-初体验RocketMQ(09)-广播消息 延时消息 批量消息

时间:2020-06-29 00:39:49

相关推荐

RocketMQ-初体验RocketMQ(09)-广播消息 延时消息 批量消息

文章目录

广播消息广播消息概述演示步骤延时消息概述使用场景延时机制实现原理示例批量消息批量消息概述示例代码

广播消息

广播消息概述

广播消息就是向所有用户发送消息。 如果我们希望所有订阅者都能收到有关某个主题的消息,可以使用广播消息。

举个例子 生产者发送10条消息,有2个订阅者,则这两个订阅者会分别收到10条消息, 而与广播模式相对应的集群模式这是 2个订阅者一共收到10条消息。

Rocketmq 消费者默认是集群的方式消费的,使用广播模式进行消费需要显示设置

核心:消费端设置消息模型consumer.setMessageModel(MessageModel.BROADCASTING);

演示步骤

启动2个或者2个以上的消费者启动生产者发送消息观察2个消费者的消息接收情况 :两个Consumer收到了同样的消息,OK.

生产者:

package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.mon.message.Message;import org.apache.mon.RemotingHelper;/*** @author 小工匠* @version v1.0* @create -11-10 19:22* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 4; i++){Message msg = new Message("TopicTest","TagA","OrderID188",("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}}

消费者:

package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.mon.consumer.ConsumeFromWhere;import org.mon.message.MessageExt;import org.mon.protocol.heartbeat.MessageModel;import java.util.List;/*** @author 小工匠* @version v1.0* @create -11-10 19:27* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//广播,全量消费consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt ext : msgs){System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");}}

测试结果:

生产者:

消费者1:

消费者2:

延时消息

概述

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

使用场景

举个例子: 电商系统,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

延时机制

org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

当前支持的延迟时间

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

分别对应级别

1 2 3....................

设置消息时延

Message message = new Message;message.setDelayTimeLevel(3)

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。

实现原理

延迟队列的核心思路: 【利用中间队列临时存储】—>所有的延迟消息由producer消息发憷之后,都会存放在一个topic下 (SHCEDULE_TOPIC_XXXX), 不同的延迟级别对应不同的队列序号,当延迟时间到了之后,由定时线程读取转换为普通的消息存到真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。

示例

生产者:

package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.mon.message.Message;import java.util.Date;/*** @author 小工匠* @version v1.0* @create -11-10 17:23* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();int totalMessagesToSend = 3;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());//延时消费 6-->2分钟message.setDelayTimeLevel(6);// Send the messageproducer.send(message);}System.out.printf("message send is completed .%n" + new Date());producer.shutdown();}}

消费者:

package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.mon.message.MessageExt;import java.util.Date;import java.util.List;/*** @author 小工匠* @version v1.0* @create -11-10 17:23* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("TestTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println(new Date() + "Receive message[msgId=" + message.getMsgId() + "] "+ "message content is :" + new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//System.out.printf("Consumer Started.%n");}}

设置的延迟level为6 ,对应的时间间隔是两分钟,OK。

批量消息

批量消息概述

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息

此外,这一批消息的总大小不应超过4MB。rocketmq建议每次批量消息大小大概在1MB。当消息大小超过4MB时,需要将消息进行分割

示例

生产者

package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.mon.message.Message;import java.util.ArrayList;import java.util.List;/*** @author 小工匠* @version v1.0* @create -11-10 21:27* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class BatchProducer {public static void main(String[] args) throws Exception {/*** rocketMq 支持消息批量发送* 同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时任务。* <strong> 同一批次消息建议大小不超过~1M </strong>,消息最大不能超过4M,需要* 对msg进行拆分*/DefaultMQProducer producer = new DefaultMQProducer("batch_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));ListSplitter splitter = new ListSplitter(messages);/*** 对批量消息进行拆分*/while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();}}producer.shutdown();}}

消息拆分

package com.artisan.rocketmq.batch;import org.mon.message.Message;import java.util.Iterator;import java.util.List;import java.util.Map;/*** @author 小工匠* @version v1.0* @create -11-10 21:35* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MBprivate final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;//遍历消息准备拆分for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {if (nextIndex - currIndex == 0) {nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}}

消费者

package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.mon.message.MessageExt;import java.util.List;/*** @author 小工匠* @version v1.0* @create -11-10 21:38* @motto show me the code ,change the word* @blog https://artisan./* @description**/public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("BatchTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}

代码

请移步: /yangshangwei/rocketmqMaster

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。