200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > rocketMQ-消息队列

rocketMQ-消息队列

时间:2021-08-12 20:11:23

相关推荐

rocketMQ-消息队列

RocketMQ4.X基础介绍

官网地址 /

参考资料 //01/12/rocketmq-quick-start-in-10-minutes/

参考资料 /p/453c6e7ff81c

Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件

特点

支持Broker和Consumer端消息过滤支持发布订阅模型,和点对点支持拉pull和推push两种消息模式单一队列百万消息、亿级消息堆积支持单master节点,多master节点,多master多slave节点任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式消息失败重试机制、支持特定level的定时消息新版本底层采用Netty4.3.x支持分布式事务适合金融类业务,高可用性跟踪和审计功能。

概念

Producer:消息生产者Producer Group:消息生产者组,发送同类消息的一个消息生产组Consumer:消费者Consumer Group:消费同类消息的多个实例Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息Topic:主题, 如订单类消息,queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,默认自动创建是4个,手动创建是8个Message:消息,每个message必须指定一个topicBroker:MQ程序,接收生产的消息,提供给消费者消费的程序Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现、路由、元数据信息,可以多个部署,互相独立(比zookeeper更轻量)Offset: 偏移量,可以理解为消息进度commit log: 消息存储会写在Commit log文件里面

RocketMQ集群模式分析

推荐主从(异步、同步双写)、双主双从-多主多从模式(异步复制),双主双从-多主多从模式(同步双写)

单节点 :

优点:本地开发测试,配置简单,同步刷盘消息一条都不会丢

缺点:不可靠,如果宕机,会导致服务不可用

主从(异步、同步双写) :

优点:同步双写消息不丢失, 异步复制存在少量丢失 ,主节点宕机,从节点可以对外提供消息的消费,但是不支持写入

缺点:主备有短暂消息延迟,毫秒级,目前不支持自动切换,需要脚本或者其他程序进行检测然后进行停止broker,重启让从节点成为主节点

双主:

优点:配置简单, 可以靠配置RAID磁盘阵列保证消息可靠,异步刷盘丢失少量消息

缺点: master机器宕机期间,未被消费的消息在机器恢复之前不可消费,实时性会受到影响

双主双从,多主多从模式(异步复制)

优点:磁盘损坏,消息丢失的非常少,消息实时性不会受影响,Master 宕机后,消费者仍然可以从Slave消费

缺点:主备有短暂消息延迟,毫秒级,如果Master宕机,磁盘损坏情况,会丢失少量消息

双主双从,多主多从模式(同步双写)

优点:同步双写方式,主备都写成功,向应用才返回成功,服务可用性与数据可用性都非常高

缺点:性能比异步复制模式略低,主宕机后,备机不能自动切换为主机

消息可靠性

同步、异步刷盘

内存+磁盘什么是异步刷盘(数据可能丢失,性能高):什么是同步刷盘:数据安全性高选择:各有优缺点,看业务需要

同步、异步复制

Master - Slave节点里面异步复制 : 数据可能丢失,性能高同步复制: 数据安全性高,性能低一点最终推荐这种方式:同步双写(即M-S同步复制),异步刷盘

消息队列RocketMQ4.X核心配置

compressMsgBodyOverHowmuch :消息超过默认字节4096后进行压缩

retryTimesWhenSendFailed : 失败重发次数

maxMessageSize : 最大消息配置,默认128k

topicQueueNums : 主题下面的队列数量,默认是4

autoCreateTopicEnable : 是否自动创建主题Topic, 开发建议为true,生产要为false

defaultTopicQueueNums : 自动创建服务器不存在的topic,默认创建的队列数

autoCreateSubscriptionGroup: 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭

brokerClusterName : 集群名称

brokerId : 0表示Master主节点 大于0表示从节点

brokerIP1 : Broker服务地址

brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE

deleteWhen : 每天执行删除过期文件的时间,默认每天凌晨4点

flushDiskType :刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘)

listenPort : Broker监听的端口号

mapedFileSizeCommitLog : 单个conmmitlog文件大小,默认是1GB

mapedFileSizeConsumeQueue:ConsumeQueue每个文件默认存30W条,可以根据项目调整

storePathRootDir : 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store

storePathCommitLog:commitlog存储目录默认为${storePathRootDir}/commitlog

storePathIndex: 消息索引存储路径

syncFlushTimeout : 同步刷盘超时时间

diskMaxUsedSpaceRatio : 检测可用的磁盘空间大小,超过后会写入报错

RocketMQ消息常见发送状态

消息发送有同步和异步Broker消息投递状态讲解

MQ消息类型

同步消息

应用场景:重要通知邮件、报名短信通知、营销短信系统等

异步消息

应用场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券

延时消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略

顺序消息

消息的生产和消费顺序一致电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费

示例

各类消息生产者

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.mon.message.Message;import org.mon.message.MessageQueue;import org.apache.rocketmq.remoting.exception.RemotingException;import org.ponent;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.List;/*** @author zhang3* 消息 生产者*/@Component@Slf4jpublic class DemoProducer {private String producerGroup = "broker-a";private DefaultMQProducer producer;public DefaultMQProducer getProducer(){return this.producer;}/*** 对象在使用之前必须要调用一次,只能初始化一次*/@PostConstructpublic void start(){producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(JmsConfig.NAME_SERVER);try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}/*** 一般在应用上下文,使用上下文监听器,进行关闭*/@PreDestroypublic void shutdown(){this.producer.shutdown();}/*** 同步 发送消息** @param message 消息* @return 发送结果*/public SendResult sendMsg(Message message) {try {SendResult sendResult = getProducer().send(message);log.info("MQ消息发送成功!");return sendResult;} catch (Exception e) {throw new RuntimeException("MQ消息发送失败!", e);}}/*** 异步 发送消息** @param message 消息*/public void sendAsyMsg(Message message) {try {getProducer().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("MQ消息发送成功!");}@Overridepublic void onException(Throwable throwable) {//TODO 补偿操作,根据具体业务定制throw new RuntimeException("MQ消息发送失败!", throwable);}});} catch (Exception e) {throw new RuntimeException("MQ消息发送失败!", e);}}/*** 延迟 发送消息** @param message 消息* @param delayTimeLevel 延迟级别* @return 发送结果*/public SendResult sendDelayMsg(Message message, int delayTimeLevel) {try {//xxx是级别,1表示配置里面的第一个级别,2表示第二个级别,延迟消费级别//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"message.setDelayTimeLevel(delayTimeLevel);SendResult sendResult = getProducer().send(message);log.info("MQ消息发送成功!");return sendResult;} catch (Exception e) {throw new RuntimeException("MQ消息发送失败!", e);}}/*** 延迟 发送消息** @param message 消息* @return 发送结果*/public SendResult sendDelayMsg(Message message) {return sendDelayMsg(message, 3);}/*** 选择队列 发送消息** @param message 消息* @param id 编号* @return 发送结果*/public SendResult sendQueueSelectorMsg(Message message, Integer id) {try {SendResult sendResult = getProducer().send(message, (queueLists, msg, o) -> {Integer num = (Integer) o;//消息队列(0-3),默认topic下queue数量是4Integer queueNum = num % queueLists.size();return queueLists.get(queueNum);}, id);log.info("MQ消息发送成功!");return sendResult;} catch (Exception e) {throw new RuntimeException("MQ消息发送失败!", e);}}}

并发消息 消费者示例

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.mon.consumer.ConsumeFromWhere;import org.mon.message.MessageExt;import org.ponent;import java.io.UnsupportedEncodingException;/*** 并发进行消费 消费者*/@Component@Slf4jpublic class DemoConsumer {private DefaultMQPushConsumer consumer;private String consumerGroup = "demo_consumer_group";public DemoConsumer() throws MQClientException {consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(JmsConfig.DEFAULT_TOPIC, "*");// consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// try {//Message msg = msgs.get(0);//System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));//String topic = msg.getTopic();//String body = new String(msg.getBody(), "utf-8");//String tags = msg.getTags();//String keys = msg.getKeys();//System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// } catch (UnsupportedEncodingException e) {//e.printStackTrace();//return ConsumeConcurrentlyStatus.RECONSUME_LATER;// }// });consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {MessageExt msg = msgs.get(0);int times = msg.getReconsumeTimes();if (times > 0) {log.info("重试次数="+times);}try {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));String topic = msg.getTopic();String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String keys = msg.getKeys();log.info("消费消息:" + "topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {log.error("消费异常" + e.getMessage());//如果重试2次不成功,则记录,人工介入if(times >= 2){log.error("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员");//TODO 记录数据库,发短信通知开发人员或者运营人员//告诉broker,消息成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();log.info("consumer start ...");}}

顺序消息 消费者示例

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.mon.consumer.ConsumeFromWhere;import org.mon.message.Message;import org.mon.message.MessageExt;import org.mon.protocol.heartbeat.MessageModel;import org.ponent;import java.io.UnsupportedEncodingException;/*** @author zhang3* description 有序的进行消费 消费者* * 需要配合有序的消息进行一起使用* {@link DemoProducer#sendQueueSelectorMsg(Message, Integer)}*/@Component@Slf4jpublic class DemoOrderlyConsumer {private DefaultMQPushConsumer consumer;private String consumerGroup = "demo_orderly_consumer_group";public DemoOrderlyConsumer() throws MQClientException {consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//默认是集群方式,可以更改为广播,但是广播方式不支持重试consumer.setMessageModel(MessageModel.CLUSTERING);consumer.subscribe(JmsConfig.DEFAULT_ORDERLY_TOPIC, "*");consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {MessageExt msg = msgs.get(0);int times = msg.getReconsumeTimes();if (times > 0) {log.info("重试次数="+times);}try {//System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));String topic = msg.getTopic();String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String keys = msg.getKeys();int queueId = msg.getQueueId();log.info("队列id:" +queueId + ",msg:" + body);//做业务逻辑操作 TODO//log.info("消费消息:" + "topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);return ConsumeOrderlyStatus.SUCCESS;} catch (UnsupportedEncodingException e) {log.error("消费异常" + e.getMessage());//如果重试2次不成功,则记录,人工介入if(times >= 2){log.error("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员");//TODO 记录数据库,发短信通知开发人员或者运营人员//告诉broker,消息成功return ConsumeOrderlyStatus.SUCCESS;}return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}});consumer.start();log.info("consumer start ...");}}

消费者 配置

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

集群和广播模式

Topic下队列的奇偶数会影响Customer个数里面的消费数量 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均,如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量 集群模式(默认): Consumer实例平均分摊消费生产者发送的消息例子:订单消息,一般是只被消费一次 广播模式: 广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用例子:群公告,每个人都需要消费这个消息 怎么切换模式:通过setMessageModel()

MQ消费模式分析

PushConsumer本质是长轮训

实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题

系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,在broker端可以通过longPollingEnable=true来开启长轮询消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback服务端代码:broker.longpolling虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性优雅关闭:主要是释放资源和保存Offset, 调用shutdown()即可 ,参考 @PostConstruct、@PreDestroy

PullConsumer需要自己维护Offset

消费者从Server端拉取消息,主动权在消费者端,可控性好;但 间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理

官方例子路径:org.apache.rocketmq.example.simple.PullConsumer获取MessageQueue遍历客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态灵活性高可控性强,但是编码复杂度会高优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候

长轮询

长轮询: Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer.没消息的话 超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息 也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控 否则会一堆连接

消息队列Offset和CommitLog

offset 分析

什么是offset

message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个MessageQueue里的位置,通过offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后处理

message queue中的maxOffset表示消息的最大offset, maxOffset并不是最新的那条消息的offset,而是最新消息的offset+1,minOffset则是现存在的最小offset。

fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。所以比minOffset还要小的那些消息已经不在broker上了,就无法被消费

类型(父类是OffsetStore):

本地文件类型 DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在本地 Broker代存储类型 DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore 阅读源码的正确姿势: 先有思路,明白大体流程再看接口再看实现类

有什么用

主要是记录消息的偏移量,有多个消费者进行消费集群模式下采用RemoteBrokerOffsetStore, broker控制offset的值广播模式下采用LocalFileOffsetStore, 消费端存储

建议采用pushConsumer,RocketMQ自动维护OffsetStore,如果用另外一种pullConsumer需要自己进行维护OffsetStore

CommitLog分析

消息存储是由ConsumeQueue和CommitLog配合完成

ConsumeQueue: 是逻辑队列, CommitLog是真正存储消息文件的,存储的是指向物理存储的地址

Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘

默认地址:store/consumequeue/{topicName}/{queueid}/fileName

什么是CommitLog:

消息文件的存储地址生成规则: 每个文件的默认1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824, 消息存储的时候会顺序写入文件,当文件满了则写入下一个文件 判断消息存储在哪个CommitLog上 例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。

Broker里面一个Topic

里面有多个MesssageQueue 每个MessageQueue对应一个ConsumeQueue ConsumeQueue里面记录的是消息在CommitLog里面的物理存储地址

ZeroCopy零拷贝技术分析

高效原因

CommitLog顺序写, 存储了MessagBody、message key、tag等信息

ConsumeQueue随机读 + 操作系统的PageCache + 零拷贝技术ZeroCopy

零拷贝技术

read(file, tmp_buf, len);write(socket, tmp_buf, len);

例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)

File文件的经历了4次copy 调用read,将文件拷贝到了kernel内核态CPU控制 kernel态的数据copy到用户态调用write时,user态下的内容会copy到内核态的socket的buffer中最后将内核态socket buffer的数据copy到网卡设备中传送 缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)

ZeroCopy:

请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升

对应零拷贝技术有mmap及sendfile

mmap:小文件传输快 RocketMQ 选择这种方式,mmap+write 方式,小块数据传输,效果会比 sendfile 更好 sendfile:大文件传输比mmap快

Java中的TransferTo()实现了Zero-Copy

应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术

RocketMQ4.x分布式事务消息

分布式事务消息介绍

什么是分布式事务 来源:单体应用—>拆分为分布式应用一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障, 常见解决方案 2PC : 两阶段提交, 基于XA协议TCC : Try、Confirm、Cancel 下单: 事务消息最终一致性:更多… 框架 GTS -> 开源 Fescar 地址:/alibaba/fescar LCN 地址:/codingapi/tx-lcn

MQ分布式事务消息的总体架构

RocketMQ事务消息:

RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

半消息Half Message:

暂不能投递的消息(暂不能消费),Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息

消息回查:

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

整体交互流程

Producer向broker端发送消息。

服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。

发送方开始执行本地事务逻辑。发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作

RocketMQ事务消息的状态

COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费UNKNOW:Broker需要回查确认消息的状态

关于事务消息的消费

事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)。

MQ分布式事务消息实战

TransactionMQProducer基础介绍和使用自定义线程池和消息生产者结合

//监听器 ,执行本地事务TransactionListener transactionListener = new TransactionListenerImpl();//创建事务消息发送者TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");//创建自定义线程池//@param corePoolSize 池中所保存的核心线程数//@param maximumPoolSize 池中允许的最大线程数//@param keepActiveTime 非核心线程空闲等待新任务的最长时间//@param timeunit keepActiveTime参数的时间单位//@param blockingqueue 任务队列ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});//设置producer基本属性 producer.setNamesrvAddr(JmsConfig.NAME_SERVER);producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();

TransactionListener使用

executeLocalTransaction 执行本地事务checkLocalTransaction 回查消息,要么commit 要么rollback,reconsumeTimes不生效

注意点:TransactionMQProducer 的groupName要唯一,不能和普通的producer一样

本地访问路径:http://localhost:8081/api/v1/pay_cb?tag=xdclass222&otherParam=2

消息队列常见问题总结

为什么使用消息队列

消息队列技术选择

如果保证消息队列高可用

如何保证消息传输的可靠性

如何消息的重复消费

如何保证消息的顺序消费

消息堆积怎么处理

为什么使用消息队列

异步 例子: 解耦: 例子: 削峰: 例子: 缺点: 系统可用性越低:外部依赖越多,依赖越多,出问题风险越大系统复杂性提高:需要考虑多种场景,比如消息重复消费,消息丢失需要更多的机器和人力: 消息队列一般集群部署,而且需要运维和监控,例如topic申请等

消息队列选择问题:Apache ActiveMQ、Kafka、RabbitMQ、RocketMQ

ActiveMQ:/

Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等,基于JMS Provider的实现

缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用

Kafka:/

是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者

缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala,二次开发难度大

RabbitMQ:/

是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错

缺点:使用Erlang开发,阅读和修改源码难度大

RocketMQ:/

阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域使用

缺点:部分实现不是按照标准JMS规范,有些系统要迁移或者引入队列需要修改代码

消息队列怎么避免重复消费

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重

接口幂等性保障 ,消费端处理业务消息要保持幂等性

Redis

setNX() , 做消息id去重 java版本目前不支持设置过期时间

//Redis中操作,判断是否已经操作过 TODOboolean flag = jedis.setNX(key);if(flag){//消费}else{//忽略,重复消费}

拓展(如果再用expire则不是原子操作,可以用下面方式实现分布式锁)

加锁String result = jedis.set(key, value, "NX", "PX", expireTime)解锁(Lua脚本,先检查key,匹配再释放锁,lua可以保证原子性)String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));备注:lockKey可以是商品id,requestId用于标示是同个客户端

Incr 原子操作:key自增,大于0 返回值大于0则说明消费过

int num = jedis.incr(key);if(num == 1){//消费}else{//忽略,重复消费}

上述两个方式都可以,但是不能用于分布式锁,考虑原子问题,但是排重可以不考虑原子问题,数据量多需要设置过期时间

数据库去重表

某个字段使用Message的key做唯一索引

MQ如何保证消息的可靠性传输

producer端

不采用oneway发送,使用同步或者异步方式发送,做好重试,但是重试的Message key必须唯一投递的日志需要保存,关键字段,投递时间、投递状态、重试次数、请求体、响应体

broker端

双主双从架构,NameServer需要多节点同步双写、异步刷盘 (同步刷盘则可靠性更高,但是性能差点,根据业务选择)

consumer端

消息消费务必保留日志,即消息的元数据和消息体消费端务必做好幂等性处理

投递到broker端后

机器断电重启:异步刷盘,消息丢失;同步刷盘消息不丢失

硬件故障:可能存在丢失,看队列架构

消息发生大量堆积应该怎么处理

线上故障了,怎么处理

消息堆积了10小时,有几千万条消息待处理,现在怎么办?修复consumer, 然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办?

正确的姿势

临时topic队列扩容,并提高消费者能力,但是如果增加Consumer数量,但是堆积的topic里面的message queue数量固定,过多的consumer不能分配到message queue编写临时处理分发程序,从旧topic快速读取到临时新topic中,新topic的queue数量扩容多倍,然后再启动更多consumer进行在临时新的topic里消费

RocketMQ高性能的原因分析

MQ架构配置

顺序写,随机读,零拷贝同步刷盘SYNC_FLUSH和异步刷盘ASYNC_FLUSH, 通过flushDiskType配置同步复制和异步复制,通过brokerRole配置,ASYNC_MASTER, SYNC_MASTER, SLAVE推荐同步复制(双写),异步刷盘

发送端高可用

双主双从架构:创建Topic对应的时候,MessageQueue创建在多个Broker上

即相同的Broker名称,不同的brokerid(即主从模式);当一个Master不可用时,组内其他的Master仍然可用。

但是机器资源不足的时候,需要手工把slave转成master,目前不支持自动转换,可用shell处理

消费高可用

主从架构:Broker角色,Master提供读写,Slave只支持读Consumer不用配置,当Master不可用或者繁忙的时候,Consumer会自动切换到Slave节点进行能读取

提高消息的消费能力

并行消费 增加多个节点增加单个Consumer的并行度,修改consumerThreadMin和consumerThreadMax批量消费,设置Consumer的consumerMessageBatchMaxSize, 默认是1,如果为N,则消息多的时候,每次收到的消息为N条 择 Linux Ext4 文件系统,Ext4 文件系统删除 1G 大小的文件通常耗时小于 50ms,而 Ext3文件系统耗时需要 1s,删除文件时磁盘IO 压力极大,会导致 IO 操作超时

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。
扩展阅读