200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 分布式消息队列RocketMQ继承SpringBoot

分布式消息队列RocketMQ继承SpringBoot

时间:2020-08-11 16:51:26

相关推荐

分布式消息队列RocketMQ继承SpringBoot

一、介绍

Springboot 继承RocketMQ:

com.alibaba.cloud

spring-cloud-starter-stream-rocketmq

底层封装了 rocketmq-client

参数设置

application.yml

#rocketmq配置

rocketmq:

name-server: 127.0.0.1:9876

生产者配置

producer:

#生产者组名,规定在一个应用里面必须唯一

group: producerGroup

#消息发送的超时时间 默认3000ms

send-message-timeout: 3000

#消息达到4096字节的时候,消息就会被压缩。默认 4096

compress-message-body-threshold: 4096

#最大的消息限制,默认为128K

max-message-size: 4194304

#同步消息发送失败重试次数

retry-times-when-send-failed: 3

#在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效

retry-next-server: true

#异步消息发送失败重试的次数

retry-times-when-send-async-failed: 3

consumer:

isOnOff: on

# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识

groupName: sampleGroup

# mq的nameserver地址

namesrvAddr: ip:9876

# 消费者订阅的主题topic和tags(标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;

topic: topic

tag: ""

# 消费者线程数据量

consumeThreadMin: 5

consumeThreadMax: 32

# 设置一次消费信息的条数,默认1

consumeMessageBatchMaxSize: 1

二、关键类

1.RocketMQTemplate:提供了各种操作MQ的发放。

2.RocketMQLocalTransactionListener:本地事务监听器 。

3.RocketMQListener:消费信息监听器 。

4.MessageQueueSelector:消息队列选择策略。

5.DefaultMQProducer:默认的生产者。

三、生产者方法

同步(顺序、普通、延迟消息)消息:

//发送普通同步消息-Object

syncSend(String destination, Object payload)

//发送普通同步消息-Message

syncSend(String destination, Message message)

//发送普通同步消息-Object,并设置发送超时时间

syncSend(String destination, Object payload, long timeout)

//发送普通同步消息-Message,并设置发送超时时间

syncSend(String destination, Message message, long timeout)

//发送普通同步延迟消息,并设置超时,这个下文会演示

syncSend(String destination, Message message, long timeout, int delayLevel)

异步消息(普通、延迟消息):

//发送普通异步消息-Object

asyncSend(String destination, Object payload, SendCallback sendCallback)

//发送普通异步消息-Message

asyncSend(String destination, Message message, SendCallback sendCallback)

//发送普通异步消息-Object,并设置发送超时时间

asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)

//发送普通异步消息-Message,并设置发送超时时间

asyncSend(String destination, Message message, SendCallback sendCallback, long timeout)

单向(顺序、普通、延迟消息)信息:

//发送单向信息–Message

sendOneWay(String destination, Message message)

//发送单向信息–Object

sendOneWay(String destination, Object payload)

//发送单向顺序信息–Message

public void sendOneWayOrderly(String destination, Message message, String hashKey)

//发送单向顺序信息–Object

public void sendOneWayOrderly(String destination, Object payload, String hashKey)

事务消息:

//发送事务消息

public TransactionSendResult sendMessageInTransaction(String txProducerGroup, String destination, Message message, Object arg)

//取消事务消息

public void removeTransactionMQProducer(String txProducerGroup)

带标签tag消息:

rocketMQTemplate.syncSend(topic+“:”+tag, message)

SQL表达式过滤消息(SQL92过滤):

需要在broker.conf添加enablePropertyFilter=true 支持sql过滤

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。

RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。

数字比较,如>,>=,<,<=,BETWEEN,=;

字符比较,如:=,<>,IN;IS NULL or IS NOT NULL;

逻辑运算符:AND, OR, NOT;

常量类型:

数值,如:123, 3.1415;

字符, 如:‘abc’, 必须使用单引号;

NULL,特殊常量

Boolean, TRUE or FALSE;

String msg = “sql过滤”;

Message message = MessageBuilder.withPayload(msg).build() ;

Map headers = new HashMap<>() ;

headers.put(“i”, 5) ;

rocketMQTemplate.convertAndSend(“test-sql-topic”, message, headers);

注意:

消息可以指定过滤类型为tag,则 destination传入格式为:topicName:tagName

四、消费者方法

1.push模式

消息的生产者将消息发送到broker,然后broker将消息主动推送给订阅了该消息的消费者端。

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-group”, topic = “common_topic”, messageModel = MessageModel.CLUSTERING)

public class CommonConsumerListener implements RocketMQListener {

@Override

public void onMessage(Object o) {

System.out.println(“通用消费者-----------------”+o.toString());

//消费者处理时抛出异常时就会自动重试

throw new RuntimeException(“消费者处理时抛出异常时就会自动重试”);

}

}

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-sql-group”, topic = “common_topic”,selectorType = SelectorType.SQL92 ,selectorExpression = “type=‘user’ or a <7”, messageModel = MessageModel.CLUSTERING)

public class SqlConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt message) {

System.out.println(“消费消息:”+new String(message.getBody()));

System.out.println(“消费消息:”+message.getProperties());

}

}

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-tag-group”, topic = “common_topic”,selectorType = SelectorType.TAG ,selectorExpression = “tagA||tagB”, messageModel = MessageModel.CLUSTERING)

public class TagConsumerListener implements RocketMQListener {

@Override

public void onMessage(Object o) {

System.out.println(o.toString());

}

}

注意:

消费者监听消息,如果抛出异常,则开启重试。

selectorType:指定消息通过的tag的方式,默认为SelectorType.TAG

messageModel:指定消息的消费模式,默认为MessageModel.CLUSTERING模式每条消息只能由一个消费者消费,而MessageModel.BROADCASTING模式为广播模式,所有订阅者都能消费。

selectorExpression : 采用rocketMQ支持的表达式,如Tag消息能够被消费,多个采用||分割。SQL消息采用SQL表达式过滤消息。

consumerGroup: 消费组

topic:主题

2.pull模式

消息生产者将消息发送到broker上,然后由消费者自发的去broker去拉取消息。

五、事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。 本质是两阶段提交。

1.分布式事务

分布式事务通俗来说,一次操作由若干分支组成,这些分支操作分属于不同的应用,分布在不同的服务器上,分布式事务需要保证这些操作要么全部成功,要么全部失败,分支事务与普通事务一样,就是为了保证操作结果的一致性。

2.事务消息

要么同时成功,要么同时失败。

3.半事务消息

暂存投递的消息,生产者已经成功地将消息发送到了消息队列RocketMQ版服务端,但是消息队列RocketMQ版服务端未收到生产者对消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

4.消息回查

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。消息事务重试

5.本地事务状态

生产者回调操作执行的结果为本地事务状态。其会发送给事务协调者,而协调者会在发送给事务管理者,事务管理者会根据协调者发送过来的本地事务状态来决定全局事务确认指令。该句话:具体请查看阿里云 Seata 分布式事务。

RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。

MIT: 提交事务,它允许消费者消费此消息。

RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。

RocketMQLocalTransactionState.UNKNOWN: 未知状态,它代表需要检查消息队列来确定状态。调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息.

RocketMQ事务消息通过异步确保方式,保证事务的最终一致性。

事务消息发送步骤如下:

生产者将半事务消息发送至消息队列RocketMQ服务端。

消息队列RocketMQ版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。

生产者开始执行本地事务逻辑。

生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

事务消息回查步骤如下:

生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

回查间隔时间:系统默认每隔30秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。

第一次消息回查最快时间:该参数支持自定义设置。若指定消息未达到设置的最快回查时间前,系统默认每隔30秒一次的回查任务不会检查该消息。

备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。

六、案例展示

工具类:

package com.rocketmq.service.config;

import org.apache.rocketmq.client.producer.SendCallback;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.client.producer.TransactionSendResult;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.lang.Nullable;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessagingException;

import org.springframework.messaging.core.MessagePostProcessor;

import org.ponent;

import javax.annotation.PostConstruct;

import javax.annotation.PreDestroy;

import java.util.Collection;

import java.util.Map;

@Component

public class RocketMqBuilder {

private static final Logger LOG = LoggerFactory.getLogger(RocketMqBuilder.class);@Autowiredprivate RocketMQTemplate rocketMQTemplate;@PostConstructpublic void init() {LOG.info("---RocketMq助手初始化---");}//==================同步消息开始===========================//public SendResult syncSendMessage(String topic, Object message) {return rocketMQTemplate.syncSend(topic, message);}public SendResult syncSendMessage(String topic, Message<!--?--> message) {return rocketMQTemplate.syncSend(topic, message);}public SendResult syncSendMessageTimeOut(String topic, Object message, long timeout) {return rocketMQTemplate.syncSend(topic, message, timeout);}public SendResult syncSendMessageTimeOut(String topic, Message<!--?--> message, long timeout) {return rocketMQTemplate.syncSend(topic, message, timeout);}public SendResult syncSendMessageTimeOut(String topic, Message<!--?--> message, long timeout, int delayLevel) {return rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);}public SendResult syncBtachSendMessage(String topic, Collection<!--?--> messages) {return rocketMQTemplate.syncSend(topic, messages);}public SendResult syncBtachSendMessage(String topic, Collection<!--?--> messages, long timeout) {return rocketMQTemplate.syncSend(topic, messages, timeout);}public SendResult syncSendMessage(String topic, Object message, String hashKey) {return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);}public SendResult syncSendMessage(String topic, Object message, String hashKey, long timeout) {return rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}public SendResult syncSendMessage(String topic, Message<!--?--> message, String hashKey) {return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);}public SendResult syncSendMessage(String topic, Message<!--?--> message, String hashKey, long timeout) {return rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}//==================同步消息结束===========================////==================异步消息开始===========================//public void asyncSendMessage(String topic, Object message, SendCallback sendCallback) {rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack());}public void asyncSendMessage(String topic, Message<!--?--> message, SendCallback sendCallback) {rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack());}public void asyncSendMessage(String topic, Object message, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSend(topic, message, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);}public void asyncSendMessage(String topic, Message<!--?--> message, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);}public void asyncSendMessage(String topic, Message<!--?--> message, long timeout) {rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack(), timeout);}public void asyncSendMessage(String topic, Object message, String hashKey, SendCallback sendCallback) {rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack());}public void asyncSendMessage(String topic, Object message, String hashKey, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);}public void syncSendMessage(String topic, Message<!--?--> message, String hashKey, SendCallback sendCallback) {rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack());}public void syncSendMessage(String topic, Message<!--?--> message, String hashKey, SendCallback sendCallback, long timeout) {rocketMQTemplate.asyncSendOrderly(topic, message, hashKey, sendCallback != null ? sendCallback : getDefaultSendCallBack(), timeout);}//==================异步消息结束===========================////==================单向消息开始===========================//public void sendOneWay(String topic, Object message) {rocketMQTemplate.sendOneWay(topic, message);}public void sendOneWay(String topic, Message<!--?--> message) {rocketMQTemplate.sendOneWay(topic, message);}public void sendOneWay(String topic, Object message, String hashKey) {rocketMQTemplate.sendOneWayOrderly(topic, message, hashKey);}public void sendOneWay(String topic, Message<!--?--> message, String hashKey) {rocketMQTemplate.sendOneWayOrderly(topic, message, hashKey);}//==================单向消息结束===========================//public TransactionSendResult sendMessageInTransaction(String txProducerGroup, String topic, Message<!--?--> message, Object value) {return rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, value);}public void sendMessageInTransaction(String txProducerGroup) {rocketMQTemplate.removeTransactionMQProducer(txProducerGroup);}private SendCallback getDefaultSendCallBack() {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOG.info("---发送MQ成功---");}@Overridepublic void onException(Throwable throwable) {LOG.error("---发送MQ失败---" + throwable.getMessage(), throwable.getMessage());}};}@PreDestroypublic void destroy() {LOG.info("---RocketMq助手注销---");}public void convertAndSend(String topic, Message<!--?--> message) {rocketMQTemplate.convertAndSend(topic, message);}public void convertAndSend(String topic, Message<!--?--> message,Map<string, object=""> headMap) {rocketMQTemplate.convertAndSend(topic, message,headMap);}

}

</string,>

前提:

@Autowired

private RocketMqBuilder rocketMqUtils;

private static final String topic = “common_topic”;

private static final String asyncTopic = “common_async_topic”;

private static final String delayTopic = “common_delay_topic”;

private static final String topicOrder = “topic_orderly”;

private static final String syncTopicOrder = “sync_test_topic_orderly”;

private static final String oneTopicOrder = “one_test_topic_orderly”;

private static final String txTopic = “common-topic-tx”;

private static final String txGroup = “common-tx-group”;

1.普通消息

生产者:

@RequestMapping(“/sendBatchMessage”)

public SendStatus sendBatchMessage() {

List msgs = new ArrayList();

for (int i = 0; i < 10; i++) {

msgs.add(MessageBuilder.withPayload(“Hello RocketMQ Batch Msg#” + i).setHeader(RocketMQHeaders.KEYS, “KEY_” + i).build());

}

SendResult sendResult = rocketMqUtils.syncSendMessage(topic, msgs);

return sendResult.getSendStatus();

}

@RequestMapping(“/sendCommonMessage”)

public SendStatus sendCommonMessage(String message) {

SendResult sendResult = rocketMqUtils.syncSendMessage(topic, message);

return sendResult.getSendStatus();

}

@RequestMapping(“/sendCommonMessageOne”)

public SendStatus sendCommonMessageOne(String message) {

SendResult sendResult = rocketMqUtils.syncSendMessage(topic, MessageBuilder.withPayload(message).build());

return sendResult.getSendStatus();

}

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-group”, topic = “common_topic”, messageModel = MessageModel.CLUSTERING)

public class CommonConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“通用消费者-----------------”+messageExt.toString());

//消费者处理时抛出异常时就会自动重试

// throw new RuntimeException(“消费者处理时抛出异常时就会自动重试”);

}

}

2.异步消息

生产者:

@RequestMapping(“/sendAsyncCommonMessage”)

public String sendAsyncCommonMessage(String message) {

rocketMqUtils.asyncSendMessage(asyncTopic, message, new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

log.info(“异步消息发送成功:{}”, sendResult);

}

@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});return "ok";

}

@RequestMapping(“/sendAsyncCommonMessageOne”)

public String sendAsyncCommonMessageOne(String message) {

rocketMqUtils.asyncSendMessage(asyncTopic, MessageBuilder.withPayload(message).build(), new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

log.info(“异步消息发送成功:{}”, sendResult);

}

@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});return "ok";

}

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-async-group”, topic = “common_async_topic”, messageModel = MessageModel.CLUSTERING)

public class CommonConsumerAsyncMessageListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“异步消费-----------------”+messageExt.toString());

//消费者处理时抛出异常时就会自动重试

// throw new RuntimeException(“消费者处理时抛出异常时就会自动重试”);

}

}

3.顺序消息

生产者:

@RequestMapping(“/sendSyncOrderMessage”)

public String sendSyncOrderMessage() {

//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法

//参数二:消息内容

//参数三:hashKey 用来计算决定消息发送到哪个消息队列

String message = "orderly message: ";

for (int i = 0; i < 10; i++) {

// 模拟有序发送消息

rocketMqUtils.syncSendMessage(topicOrder, message + i, “select_queue_key”);

}

return “ok”;

}

@RequestMapping(“/sendASyncOrderMessage”)

public String sendASyncOrderMessage() {

//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法

//参数二:消息内容

//参数三:hashKey 用来计算决定消息发送到哪个消息队列, 一般是订单ID,产品ID等

rocketMqUtils.asyncSendMessage(syncTopicOrder, “111111创建”, “111111”, new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

log.info(“异步消息发送成功:{}”, sendResult);

}

@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});rocketMqUtils.asyncSendMessage(syncTopicOrder, "111111支付", "111111", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步消息发送成功:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});rocketMqUtils.asyncSendMessage(syncTopicOrder, "111111完成", "111111", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步消息发送成功:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222创建", "222222", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步消息发送成功:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222支付", "222222", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步消息发送成功:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});rocketMqUtils.asyncSendMessage(syncTopicOrder, "222222完成", "222222", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步消息发送成功:{}", sendResult);}@Overridepublic void onException(Throwable throwable) {log.info("异步消息发送失败:{}", throwable.getMessage());}});return "ok";

}

@RequestMapping(“/sendOneWayMessage”)

public String sendOneWayMessage() {

//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法

//参数二:消息内容

//参数三:hashKey 用来计算决定消息发送到哪个消息队列

String message = "one orderly message: ";

for (int i = 0; i < 10; i++) {

// 模拟有序发送消息

rocketMqUtils.sendOneWay(oneTopicOrder, message + i, “select_queue_key”);

}

return “ok”;

}

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.ConsumeMode;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “customer-orderly-group”, topic = “topic_orderly” ,consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)

public class CommonOrderConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“顺序信息的三种方式:同步-----------------”+messageExt.toString());

}

}

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.ConsumeMode;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-sync-orderly-group”, topic = “sync_test_topic_orderly” ,consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)

public class CommonSyncOrderConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“顺序信息的三种方式:异步-----------------”+messageExt.toString());

}

}

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.ConsumeMode;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-one-orderly-group”, topic = “one_test_topic_orderly”,consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)

public class CommonOneOrderConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“顺序信息的三种方式:单向-----------------”+messageExt.toString());

}

}

4.延迟消息

生产者:

@RequestMapping(“/sendSyncDelayMessage”)

public SendStatus sendSyncDelayMessage(String message) {

Message messageData = MessageBuilder.withPayload(message + new Date()).build();

SendResult sendResult = rocketMqUtils.syncSendMessageTimeOut(delayTopic, messageData, 3000, 3);return sendResult.getSendStatus();

}

@RequestMapping(“/sendASyncDelayMessage”)

public String sendASyncDelayMessage(String message) {

Message messageData = MessageBuilder.withPayload(message + new Date()).build();

SendCallback sc = new SendCallback() {

@Override

public void onSuccess(SendResult sendResult) {

log.info(“发送异步延时消息成功”);

}

@Overridepublic void onException(Throwable throwable) {log.info("发送异步延时消息失败:{}", throwable.getMessage());}};rocketMqUtils.asyncSendMessage(delayTopic, messageData, sc, 3000);return "ok";

}

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-delay-group”, topic = “common_delay_topic”, messageModel = MessageModel.CLUSTERING)

public class CommonDelayConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“延迟消息-----------------” + messageExt.toString());

//消费者处理时抛出异常时就会自动重试

// throw new RuntimeException(“消费者处理时抛出异常时就会自动重试”);

}

}

5.tag消息

生产者:

@RequestMapping(“/sendCommonMessageByTag”)

public SendStatus sendCommonMessage(String message, String tag) {

SendResult sendResult = rocketMqUtils.syncSendMessage(topic + “:” + tag, message);

return sendResult.getSendStatus();

}

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.annotation.SelectorType;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-tag-group”, topic = “common_topic”,selectorType = SelectorType.TAG ,selectorExpression = “tagA||tagB”, messageModel = MessageModel.CLUSTERING)

public class TagConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(messageExt.toString());

}

}

6.sql过滤消息

生产者:

@RequestMapping(“/sendCommonMessageBySql”)

public String sendCommonMessageBySql(String message) {

Map<string, object=“”> headers = new HashMap<>();

headers.put(“type”, “user”);

headers.put(“a”, 6);

rocketMqUtils.convertAndSend(topic, MessageBuilder.withPayload(message).build(), headers);

return “ok”;

}</string,>

消费者:

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.annotation.SelectorType;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-sql-group”, topic = “common_topic”,selectorType = SelectorType.SQL92 ,selectorExpression = “type=‘user’ or a <7”, messageModel = MessageModel.CLUSTERING)

public class SqlConsumerListener implements RocketMQListener {

@Override

public void onMessage(MessageExt message) {

System.out.println(“消费消息:”+new String(message.getBody()));

System.out.println(“消费消息:”+message.getProperties());

}

}

7.事务消息

生产者:

@RequestMapping(“/sendTxMessage”)

public String sendTxMessage() {

String[] tags = {“a”, “b”, “c”};

for (int i = 0; i < 3; i++) {

Message message = MessageBuilder.withPayload(“事务消息===>” + i).setHeader(“rocketmq_tags”, tags[i]).build();

//发送半事务消息

TransactionSendResult res = rocketMqUtils.sendMessageInTransaction(txGroup, txTopic + “:” + tags[i], message, i + 1);

if (res.getLocalTransactionState().equals(MIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) {

log.info(“事物消息发送成功”);

}

log.info(“事物消息发送结果:{}”, res);

}

return “ok”;

}

生产者监听器:

import com.alibaba.druid.util.StringUtils;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;

import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.messaging.Message;

import org.ponent;

@Component

@RocketMQTransactionListener(txProducerGroup = “common-tx-group”)

public class TransactionListener implements RocketMQLocalTransactionListener {

private static final Logger log = LoggerFactory.getLogger("TransactionListener");@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务String tag = String.valueOf(msg.getHeaders().get("rocketmq_tags"));if (StringUtils.equals("a", tag)){//这里只讲TAGA消息提交,状态为可执行return MIT;}else if (StringUtils.equals("b", tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals("c",tag)) {return RocketMQLocalTransactionState.UNKNOWN;}return RocketMQLocalTransactionState.UNKNOWN;}//mq回调检查本地事务执行情况@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {log.info("checkLocalTransaction===>{}",msg);return MIT;}

}

消费者:

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-tx”, topic = “common-topic-tx”,selectorExpression = “TAGA||TAGB||TAGC”,messageModel = MessageModel.CLUSTERING)

public class CommonTxConsumerListener implements RocketMQListener {

@Overridepublic void onMessage(String s) {System.out.println("消费消息 事务消息:" + s);}

}

其他调用方法,请研究工具类。

代码

七、相关问题 1.如何获取 Topic-Broker 的映射关系?

Producer 和 Consumer 启动时,也都需要指定 namesrvAddr 的地址,从 Namesrv 集群中选一台建立长连接。生产者每 30 秒从 Namesrv 获取 Topic 跟 Broker 的映射关系,更新到本地内存中。然后再跟 Topic 涉及的所有 Broker 建立长连接,每隔 30 秒发一次心跳。

2、消费者消费的几种模式?

RocketMQ 消费者有集群消费和广播消费两种消费模式。

集群消费

一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。

广播消费

消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

3.RocketMQ 为何这么快?

是因为使用了顺序存储、Page Cache 和异步刷盘。

1)在写入 commitLog 的时候是顺序写入的,这样比随机写入的性能有巨大提升。

2)写入 commitLog 的时候并不是直接写入磁盘,而是先写入操作系统的 PageCache。最后由操作系统异步将缓存中的数据刷到磁盘。

4.RocketMQ 的角色构成?

生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。

消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。

消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。

名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

5.RocketMQ 执行流程?

1)启动 Namesrv 后开始监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

2)Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。

3)收发消息前,先创建 Topic。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上,也可以在发送消息时自动创建 Topic。

4)Producer 向该 Topic 发送消息。

5)Consumer 消费该 Topic 的消息。

6.Broker 中的消息被消费后会立即删除吗?

不会,每条消息都会持久化到 CommitLog 中,每个 Consumer 连接到 Broker 后会维持消费进度信息,当有消息消费后只是当前 Consumer 的消费进度(CommitLog的offset)更新了。

7.Broker 如何保存消息数据?

RocketMQ 主要的存储文件包括 commitlog、consumequeue 以及 indexfile 三种文件。

Broker 在收到消息之后,会把消息保存到 commitlog 文件中,同时每个 Topic 对应的 messagequeue 下都会生成 consumequeue 文件用于保存 commitlog 的物理位置偏移量 offset,而 key 和 offset 的对应关系则使用 indexfile 保存。

8.为何使用 NameServer 而非 ZK?

NameServer 是专为 RocketMQ 设计的轻量级名称服务,为 producer 和 consumer 提供路由信息。具有简单、可集群横吐扩展、无状态,节点之间互不通信等特点。而 RocketMQ 的架构设计决定了只需要一个轻量级的元数据服务器就足够了,只需要保持最终一致,而不需要 Zookeeper 这样的强一致性解决方案,不需要再依赖另一个中间件,从而减少整体维护成本。

9.生产环境有多个nameserver该如何连接?

rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:a:9876;b:9876

10.发送的消息内容体是如何被序列化与反序列化的?

RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。

11.如何指定topic的tags?

RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称。

注意:

tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

12.如何指定消费者从哪开始消费消息,或开始消费的位置?

消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.mon.UtilAll;

import org.mon.consumer.ConsumeFromWhere;

import org.mon.message.MessageExt;

import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;

import org.springframework.stereotype.Service;

@Service

@RocketMQMessageListener(consumerGroup = “common-customer-group”, topic = “common_topic”, messageModel = MessageModel.CLUSTERING)

public class CommonConsumerListener implements RocketMQListener , RocketMQPushConsumerLifecycleListener {

@Override

public void onMessage(MessageExt messageExt) {

System.out.println(“通用消费者-----------------”+messageExt.toString());

//消费者处理时抛出异常时就会自动重试

// throw new RuntimeException(“消费者处理时抛出异常时就会自动重试”);

}

//自定义消息 开始消费的位置@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// set consumer consume message from nowconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));}

}

13.NameServer 如何保证最终一致性?

NameServer 作为一个名称服务,需要提供服务注册、服务剔除、服务发现这些基本功能,但是 NameServer 节点之间并不通信,在某个时刻各个节点数据可能不一致的情况下,下面分别从路由注册、路由剔除以及路由发现三个角度进行介绍 NameServer 如何保证最终一致性。

1)路由注册:Broker 节点在启动时轮训 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。 注册后每 30s 向 NameServer 发送心跳包。

2)路由剔除:正常情况下 Broker 退出后会被 Netty 通道监听器监听到,异常情况下,NameServer 有一个定时任务,每隔 10s 扫描一下 Broker 表,剔除心跳包更新时间超过 120s 的 Broker。

3)路由发现:由于 NameServer 不会主动推送 Broker 信息,所以 RocketMQ 客户端提供了定时拉取 Topic 最新路由信息的机制(默认是30秒)。

4)由于路由信息是定时拉取得,所以需要加上(生产者)重试机制。

14.RocketMQ 如何实现事务消息?

分布式系统中的事务可以使用 TCC(Try、Confirm、Cancel)。2pc 来解决分布式系统中的消息原子性。RocketMQ中提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。

Half Message:预处理消息,当 Broker 收到此类消息后,会存储到 RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中。检查事务状态:Broker 会开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker 会定时去回调在重新检查。超时:如果超过回查次数,默认回滚消息。事务消息实现也就是它并未真正进入 Topic 的队列中,而是用了临时队列来放所谓的half message,等提交事务后才会真正的将half message转移到 Topic 下的队列中。

15.RocketMQ 如何实现负载均衡?

RocketMQ 通过 Topic 在多 Broker 中分布式存储实现负载均衡,同时需要生产者、Broker 以及消费者多个不同角色共同完成。

Producer

发送端通过指定 queue 发送消息到相应的Broker 中来达到写入时的负载均衡。默认策略是随机选择,通过自增随机数对列表大小取余获取位置信息,自带容错策略。还可以通过 MessageQueueSelector 的 select 方法实现自定义。

Consumer

采用的是平均分配算法来进行负载均衡,支持一下几种负载均衡策略:

平均分配策略,也是默认策略;

环形分配策略;

手动配置分配策略;

机房分配策略;

一致性哈希分配策略:

靠近机房策略。

16.RocketMQ 如何保证消息不丢失?

RocketMQ 中间件的 Producer、Broker 以及 Consumer 三个组成部分都有可能导致消息的丢失。

Producer 如何保证消息不丢失

采取 send() 同步发送消息,发送结果是同步感知的。

发送失败后可以重试,设置重试次数,默认 3 次。producer.setRetryTimesWhenSendFailed(10);

集群部署,比如发送失败了的原因可能是当前 Broker 宕机了,重试的时候会发送到其它的 Broker 上。

Broker 如何保证消息不丢失

修改刷盘策略为同步刷盘,默认情况下是异步刷盘(即消息到内存后就返回确认信息)。

集群部署,默认异步复制到 slave,可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

Consumer 如何保证消息不丢失

消费过程需要注意返回消息状态,只有当业务逻辑真正执行成功后,才能返回 CONSUME_SUCCESS 的 ACK 确认。

17.高吞吐量下如何优化生产者和消费者?

1)同一个 Group 下,多机部署,并行消费;

2)单个 Consumer 提高消费线程个数;

3)批量消费:批量拉取拉去消息以及业务逻辑的批量处理。

文章转自:分布式消息队列RocketMQ继承SpringBoot_Java-答学网

作者:答学网,转载请注明原文链接:/

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。