200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ源码 — 十一 RocketMQ事务消息

RocketMQ源码 — 十一 RocketMQ事务消息

时间:2024-03-13 01:21:18

相关推荐

RocketMQ源码 — 十一  RocketMQ事务消息

分布式事务是一个复杂的问题,rmq实现了事务的最终一致性,rmq保证本地事务成功消息一定会发送成功并被成功消费,如果本地事务失败了,消息不会被发送。

rmq事务消息的实现过程为:

producer发送half消息broker确认half消息,并通知producer,表示消息已经成功发送到broker(这个过程其实就是步骤1broker的返回)producer收到half确认消息之后,执行自己本地事务,并将事务结果(UNKNOW、commit、rollback)告诉broker(这是一个oneway消息,而且失败不重试)broker收到producer本地事务的结果后决定是否投递消息给consumer鉴于producer发送本地事务结果可能失败,broker会定时扫描集群中的事务消息,然后回查(apache4.2.0尚未实现,因为没有调用org.apache.rocketmq..Broker2Client#checkProducerTransactionState)

producer发送half消息

事务消息的发送过程和普通消息发送过程是不一样的,发送消息的方法是org.apache.rocketmq.client.producer.TransactionMQProducer#sendMessageInTransaction,入参有一个LocalTransactionExecuter,需要用户实现一个本地事务的executor,用户可以在executor中执行事务操作

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransactionpublic TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter tranExecuter, final Object arg)throws MQClientException {if (null == tranExecuter) {throw new MQClientException("tranExecutor is null", null);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 标记消息是half消息MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 发送half消息,该方法是同步发送,事务消息也必须是同步发送sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {// 只有在half消息发送成功的时候才会执行事务try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}// 执行本地事务localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != MIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {// 根据事务commit的情况来判断下一步操作this.endTransaction(sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}

为了保证本地事务和消息发送成功的原子性,producer会先发送一个half消息到broker

只有half消息发送成功了,事务才会被执行如果half消息发送失败了,事务不会被执行

half消息和普通的消息也不一样,half消息发送到broker后并不会被consumer消费掉。之所以不会被消费掉的原因如下:

broker在将消息写入CommitLog的时候会判断消息类型,如果是是prepare或者rollback消息,ConsumeQueue的offset(每个消息对应ConsumeQueue中的一个数据结构(包含topic、tag的hashCode、消息对应CommitLog的物理offset),offset表示数据结构是第几个)不会增加broker在构造ConsumeQueue的时候会判断是否是prepare或者rollback消息,如果是这两种中的一种则不会将该消息放入ConsumeQueue,cnosumer在拉取消息的时候也就不会拉取到prepare和rollback的消息。

相关代码如下:

// org.apache.mitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// The next update ConsumeQueue informationCommitLog.this.topicQueueTable.put(key, ++queueOffset);break;default:break;}// org.apache.rocketmq.mitLogDispatcherBuildConsumeQueue#dispatchpublic void dispatch(DispatchRequest request) {final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}

以上两点保证了prepare消息也就是half消息不会被消费。

producer结束事务

producer根据half消息发送结果和事务执行结果来处理事务——commit或者rollback。从上面发送消息的代码可以看到最后调用了endTransaction来处理事务执行结果,这个方法里面就是将事务执行的结果通过消息发送给broker,由broker决定消息是否投递。

public void endTransaction(final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;// 从broker返回的信息中获取half消息的offsetif (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();// 需要把transactionId和offset发送给broker,便于broker查找half消息requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:// 表明本地址事务成功commit,告诉broker可以提交事务requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:// 说明事物需要回滚,有可能是half消息发送失败,也有可能是本地事务执行失败requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:// 如果状态是UNKNOW,broker还会反查producer,也就是接口:org.apache.rocketmq.example.transaction.TransactionCheckListenerImpl#checkLocalTransactionState的作用,但是目前rmq4.2.0并没有向producer查询,也就是源码中都没有调用这个接口requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 这个发送消息是onway的,也就是不会等待返回this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout());}

从最一开始的图看出,producer将事务结果的消息发送给broker的时候可能会失败,如果失败了,broker就不知道本次事务是否应该commit,为了防止这种情况,rmq会向producer发送一个command查询处于prepare状态的事务的结果,上面也说了rmq4.2.0并没有发送这个command,也就是说当前rmq并不能保证producer将事务结果通知到broker。

broker决定消息是否可以投递

broker处理事务结果的消息的类是org.apache.rocketmq.broker.processor.EndTransactionProcessor

收到消息之后先检查是否是事务类型的消息,不是事务消息直接返回。根据header中的offset查询half消息,查不到直接返回,不作处理根据half消息构造新的消息,新构造的这个消息会被重新写入CommitLog,如果是rollback消息则body为空如果是rollback消息的话,该消息不会被投递(原因和half不会被投递的原因一样),commit消息broker才会投递给consumer

也就是说rmq对于commit和rollback都会新写一个消息到CommitLog,只是rollback的消息的body是空的,而且该消息和half消息一样不会被投递,直到CommitLog删除过期消息,会从磁盘中删除;但是commit的时候,rmq会重新封装half消息并“投递”给consumer消费。

consumer保证消费成功

关于事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,其实有可能存在consumer消费失败的情况,这种情况RocketMQ并不能解决,官方建议人工解决,这种情况出现的概率极低)。

总结

基于rmq的阿里云ons实现了事务最终一致性的所有功能,但是apache rmq没有实现消息回查的功能。所以rmq存在一定几率会让事务处于事务结果不明确的状态。

参考

收发事务消息

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。