200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ高并发项目——分布式事务之消息最终一致性事务(消息支付)

RocketMQ高并发项目——分布式事务之消息最终一致性事务(消息支付)

时间:2023-04-20 19:19:30

相关推荐

RocketMQ高并发项目——分布式事务之消息最终一致性事务(消息支付)

一. 分布式事务概念

说到分布式事务,就会谈到那个经典的”账户转账”问题:2个账户,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢?还是先发送消息,后update DB?假设先update DB成功,发送消息网络失败,重发又失败,怎么办?假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论:只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢??为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤:

(1) 发送Prepared消息

(2) update DB

(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

有了上述的概念,我们详细解释一下事务消息交互的过程

首先,MQ发送方向MQ服务(即RocketMQ的Broker)发送半消息。MQ服务端会将消息做持久化处理,并发送ACK确认消息已经发送成功。MQ发送方执行本地事务MQ发送方根据本地事务执行的结果向MQ服务提交二次确认:如果本地事务执行成功,则提交消息状态为Commit,否则为Rollback。MQ服务端收到Commit状态的消息将消息标记为可投递状态,订阅方最终会收到该条消息。如果收到的是Rollback,最终MQ服务端会删除该条半消息,订阅方不会接收到这条消息。如果出现网络闪断、应用重启等情况,4阶段替提交的二次确认最终并未能到达MQ服务端,一定时间之后,MQ服务端会对此消息发起回查操作,确认发送方本地事务的执行状态。发送方需要实现服务回查逻辑供MQ服务端进行回调。当发送方收到回查后,需要检查对应消息的本地事务执行的最终结果,此处也需要根据本地事务的成功或失败返回Commit或者Rollback,即再次提交消息状态的二次确认,MQ服务端仍会按照步骤4对该半消息进行操作。

注意1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。

二. RocketMQ分布式事务Demo

MQ发送方代码:

package com.rocketmq.yy.producer.transaction;import com.rocketmq.yy.constants.Const;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.client.producer.TransactionMQProducer;import org.mon.message.Message;import org.apache.mon.RemotingHelper;import org.omg.CORBA.PUBLIC_MEMBER;import java.io.UnsupportedEncodingException;import java.util.concurrent.*;public class TransactionProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");//线程池ExecutorService executorService = new ThreadPoolExecutor(2,5,100,TimeUnit.SECONDS,new ArrayBlockingQueue<>(2000),new ThreadFactory(){@Overridepublic Thread newThread(Runnable r){Thread thread = new Thread(r);thread.setName("text_tx_producer_group_name"+"check-thread"); //一般线程都要起名字,方便排错日志查询等return thread;}});producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE); //RocketMQ的ip地址,放在Const类中了,本文没贴出来producer.setExecutorService(executorService);// 这个对象主要做两个事情,1:一步执行本地事务;2:做回查TransactionListener transactionListener = new TransactionListenerImpl();producer.setTransactionListener(transactionListener);producer.start();;Message message = new Message("test_tx_topic","TagA", "Key",("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET)/*实际的消息体*/);producer.sendMessageInTransaction(message,"我是回调的参数");Thread.sleep(Integer.MAX_VALUE);producer.shutdown();}}

TransactionListener(本地事务与回查函数)

package com.rocketmq.yy.producer.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.mon.message.Message;import org.mon.message.MessageExt;public class TransactionListenerImpl implements TransactionListener {//执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object arg) {String callArg = (String)arg;System.out.println("callArg:" + callArg);System.out.println("message:" + message);//tx.begin// 数据库落库操作// mitreturn MIT_MESSAGE;}//check回调@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("-------回调消息检查-------" + msg);return MIT_MESSAGE;}}

MQ接收方代码

package com.rocketmq.yy.producer.transaction;import com.rocketmq.yy.constants.Const;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.mon.consumer.ConsumeFromWhere;import org.mon.message.MessageExt;import org.apache.mon.RemotingHelper;import org.springframework.aop.support.DefaultPointcutAdvisor;import java.io.UnsupportedEncodingException;import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tx_consumer_group_name");consumer.setConsumeThreadMin(10);consumer.setConsumeThreadMax(20);consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("test_tx_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {MessageExt me = msgs.get(0);try{String topic = me.getTopic();String tags = me.getTags();String keys = me.getKeys();String body = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.println("收到事务消息,topic:" + topic + ",tags:" + tags + ", keys:" + keys + ",body" + body);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}});consumer.start();System.out.println("tx consumer started...");}}

然后我们开启RocketMQ的四个主从节点,启动消费者

然后开启发送者:

我们现在再看消费者,就可以发现消费者接收到信息了、

如果我们将TransactionListener类中的执行本地事务方法的返回值改为

启动producer,执行完本地方法后,producer发给MQ是个unknown状态,所以MQ并不会把此条消息发给consumer,而是向producer进行轮询,producer会启动TransactionListener类中的回调方法,回调方法返回给MQ的是commit,则MQ接收到信息后就会把消息发给consumer。

下图是produce的控制台图,第一幅是produce执行本地事务输出的,第二幅是一分钟后MQ对producer进行轮询,producer执行回调函数输出的。

三 项目应用

我们先来看看业务流程图,红色框的部分就是我们要编写的业务逻辑

这里我们只填service层的业务代码,主要弄清业务逻辑和RocketMQ的基本编写

PayServiceImpl:业务层,调用各种封装类方法完成整个逻辑TransactionProducer:封装类,发送消息TransactionListenerImpl:实现TransactionListener类,负责A账户的扣款逻辑业务,和发送消息给支付BCallbackService:回调给订单的业务逻辑类SyncProducer:分装类,同步发送消息

PayServiceImpl

package com.yy.paya.service.impl;//jar包省略public class PayServiceImpl implements PayService{public static final String TX_PAY_TOPIC = "tx_pay_topic";public static final String TX_PAY_TAGS = "pay";@Autowiredprivate CustomerAccountMapper customerAccountMapper;@Autowiredprivate TransactionProducer transactionProducer;@Autowiredprivate CallbackService callbackService;@Overridepublic String payment(String userId, String orderId, String accountId, double money) {String paymentRet = "";try {//最开始有一步 token验证操作(重复提单问题--比如老公老婆用同一账户在同一时间下单)BigDecimal payMoney = new BigDecimal(money);// 加锁开始(获取)每个acoount账号都有一个锁,所以不会引起并发问题// 这里加锁是为了保证获取版本号和获取余额操作的原子性,这样可以保证余额不足的不会发消息给MQCustomerAccount old = customerAccountMapper.selectByPrimaryKey(accountId);BigDecimal currentBalance = old.getCurrentBalance();int currentVersion = old.getVersion();//要对大概率事件进行提前预判(小概率事件我们做放过,但是最后保障数据的一致性即可)// 业务出发://1 当前一个用户账户 只允许一个线程(一个应用端访问)// 技术出发://1 redis去重 分布式锁//2 数据库乐观锁去重// 做扣款操作的时候:获得分布式锁,看一下能否获得BigDecimal newBalance = currentBalance.subtract(payMoney);//加锁结束(释放)if(newBalance.doubleValue() > 0 ) {//或者一种情况获取锁失败//1.组装消息// 1.执行本地事务String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();Map<String, Object> params = new HashMap<>();params.put("userId", userId);params.put("orderId", orderId);params.put("accountId", accountId);params.put("money", money);//100Message message = new Message(TX_PAY_TOPIC, TX_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());//可能需要用到的参数params.put("payMoney", payMoney);params.put("newBalance", newBalance);params.put("currentVersion", currentVersion);//同步阻塞CountDownLatch countDownLatch = new CountDownLatch(1);params.put("currentCountDown", countDownLatch);//本地的事务执行+消息发送 TransactionSendResult sendResult = transactionProducer.sendMessage(message, params);countDownLatch.await();// 当消息发送成功,本地事务也执行成功时if(sendResult.getSendStatus() == SendStatus.SEND_OK&& sendResult.getLocalTransactionState() == MIT_MESSAGE) {//回调order通知支付成功消息callbackService.sendOKMessage(orderId, userId);paymentRet = "支付成功!";} else {paymentRet = "支付失败!";}} else {paymentRet = "余额不足!";}} catch (Exception e) {e.printStackTrace();paymentRet = "支付失败!";}return paymentRet;}}

package com.yy.paya.service.producer;//jar包省略/*** 一个注入的封装类*/@Componentpublic class TransactionProducer implements InitializingBean {private TransactionMQProducer producer;private ExecutorService executorService; //回调check检查时用@Autowiredprivate TransactionListenerImpl transactionListenerImpl;private static final String NAMESERVER = "172.20.10.7:9876";private static final String PRODUCER_GROUP_NAME = "tx_pay_producer_group_name";//构造函数private TransactionProducer() {this.producer = new TransactionMQProducer(PRODUCER_GROUP_NAME);this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName(PRODUCER_GROUP_NAME + "-check-thread");return thread;}});this.producer.setExecutorService(executorService);this.producer.setNamesrvAddr(NAMESERVER);}//继承了InitializingBean接口实现的方法,当我们的类TransactionProducer初始化完毕后再将transactionListenerImpl注入进来@Overridepublic void afterPropertiesSet() throws Exception {this.producer.setTransactionListener(transactionListenerImpl);start(); //启动}private void start() {try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public void shutdown() {this.producer.shutdown();}//真正发消息的方法public TransactionSendResult sendMessage(Message message, Object argument) {TransactionSendResult sendResult = null;try {sendResult = this.producer.sendMessageInTransaction(message, argument);} catch (Exception e) {e.printStackTrace();}return sendResult;}}

TransactionListenerImpl :

package com.yy.paya.service.producer;//jar包省略@Componentpublic class TransactionListenerImpl implements TransactionListener {@Autowiredprivate CustomerAccountMapper customerAccountMapper;@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.err.println("执行本地事务单元------------");CountDownLatch currentCountDown = null;try {Map<String, Object> params = (Map<String, Object>) arg;String userId = (String)params.get("userId");String accountId = (String)params.get("accountId");String orderId = (String)params.get("orderId");BigDecimal payMoney = (BigDecimal)params.get("payMoney");//当前的支付款BigDecimal newBalance = (BigDecimal)params.get("newBalance");//前置扣款成功的余额int currentVersion = (int)params.get("currentVersion");currentCountDown = (CountDownLatch)params.get("currentCountDown");//updateBalance 传递当前的支付款 数据库操作: Date currentTime = new Date();int count = this.customerAccountMapper.updateBalance(accountId, newBalance, currentVersion, currentTime);if(count == 1) {currentCountDown.countDown();return MIT_MESSAGE;} else {currentCountDown.countDown();return LocalTransactionState.ROLLBACK_MESSAGE;}} catch (Exception e) {e.printStackTrace();currentCountDown.countDown();return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// TODO Auto-generated method stubreturn null;}}

CallbackService :

package com.yy.paya.service.producer;//jar包省略/*** 发给order系统,更改订单状态(已经付款)*/@Servicepublic class CallbackService {public static final String CALLBACK_PAY_TOPIC = "callback_pay_topic";public static final String CALLBACK_PAY_TAGS = "callback_pay";public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";@Autowiredprivate SyncProducer syncProducer;public void sendOKMessage(String orderId, String userId) {Map<String, Object> params = new HashMap<>();params.put("userId", userId);params.put("orderId", orderId);params.put("status", "2");//okString keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();Message message = new Message(CALLBACK_PAY_TOPIC, CALLBACK_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());SendResult ret = syncProducer.sendMessage(message);}}

SyncProducer:

package com.yy.paya.service.producer;//jar包省略@Componentpublic class SyncProducer {private DefaultMQProducer producer;private static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";private static final String PRODUCER_GROUP_NAME = "callback_pay_producer_group_name";private SyncProducer() {this.producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);this.producer.setNamesrvAddr(NAMESERVER);this.producer.setRetryTimesWhenSendFailed(3); //失败后可以重发3次start();}public void start() {try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public SendResult sendMessage(Message message) {SendResult sendResult = null;try {sendResult = this.producer.send(message);} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return sendResult;}public void shutdown() {this.producer.shutdown();}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。