200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 跟我学RocketMQ之批量消息发送源码解析

跟我学RocketMQ之批量消息发送源码解析

时间:2020-07-01 19:10:31

相关推荐

跟我学RocketMQ之批量消息发送源码解析

上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

@Overridepublic SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs));}

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

@Overridepublic SendResult send(Collection<Message> msgs,long timeout) throws MQClientException, RemotingException,MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs), timeout);}

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

// 声明批量消息体MessageBatch msgBatch;try {

// 从Message的list生成批量消息体MessageBatchmsgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}// 设置topicmsgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}

从代码可以看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,我们看下MessageBatch的声明

public class MessageBatch extends Message implements Iterable<Message> {

private final List<Message> messages;

private MessageBatch(List<Message> messages) {this.messages = messages;}

可以看到MessageBatch继承自Message,持有List 引用。

我们接着看一下generateFromList方法

MessageBatch.generateFromList

public static MessageBatch generateFromList(Collection<Message> messages) {assert messages != null;assert messages.size() > 0;

// 首先实例化一个Message的listList<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;

// 对messages集合进行遍历for (Message message : messages) {

// 判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时if (message.getDelayTimeLevel() > 0) {throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");}

// 判断topic是否以 **"%RETRY%"** 开头,如果是,// 则抛出异常,原因为:批量发送消息不支持消息重试if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {throw new UnsupportedOperationException("Retry Group is not supported for batching");}

// 判断集合中的每个Message的topic与批量发送topic是否一致,// 如果不一致则抛出异常,原因为:// 批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。if (first == null) {first = message;} else {if (!first.getTopic().equals(message.getTopic())) {throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");}

// 判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,// 如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");}}

// 校验通过后,将message实体添加到messageList中messageList.add(message);}

// 将处理完成的messageList作为构造方法,// 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。MessageBatch messageBatch = new MessageBatch(messageList);

messageBatch.setTopic(first.getTopic());messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());return messageBatch;}

总结一下,generateFromList方法对调用方设置的Collection 集合进行遍历,经过前置校验之后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,我们接着看DefaultProducer.batch的逻辑。

到此,通过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

// 声明批量消息体MessageBatch msgBatch;try {// 从Message的list生成批量消息体MessageBatchmsgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}// 设置topicmsgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}

注意下面这行代码:

// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());

这里对MessageBatch进行消息编码处理,通过调用MessageBatch的encode方法实现,代码逻辑如下:

public byte[] encode() {return MessageDecoder.encodeMessages(messages);}

可以看到是通过静态方法encodeMessages(List messages)实现的。

我们看一下encodeMessages方法的逻辑:

public static byte[] encodeMessages(List<Message> messages) {//TO DO refactor, accumulate in one buffer, avoid copiesList<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {

// 遍历messages集合,分别对每个Message实体进行编码操作,转换为byte[]byte[] tmp = encodeMessage(message);// 将转换后的单个Message的byte[]设置到encodedMessages中encodedMessages.add(tmp);// 批量消息的二进制数据长度随实际消息体递增allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {// 遍历encodedMessages,按序复制每个Message的二进制格式消息体System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}// 返回批量消息整体的消息体二进制数组return allBytes;}

encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每个Message实体的二进制数组格式消息体并返回。

我们可以继续看一下单个Message是如何进行编码的,调用了MessageDecoder.encodeMessage(message)方法,逻辑如下:

public static byte[] encodeMessage(Message message) {//only need flag, body, propertiesbyte[] body = message.getBody();int bodyLen = body.length;String properties = messageProperties2String(message.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAXshort propertiesLength = (short) propertiesBytes.length;int sysFlag = message.getFlag();int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);

// 2 MAGICCODEbyteBuffer.putInt(0);

// 3 BODYCRCbyteBuffer.putInt(0);

// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);

// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);

// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);

return byteBuffer.array();}

这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:

消息总长度--- 4字节魔数--- 4字节bodyCRC校验码 --- 4字节flag标识 --- 4字节body长度 --- 4字节消息体 --- 消息体实际长度N字节属性长度 --- 2字节扩展属性 --- N字节

通过encodeMessage方法处理之后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。

其他

到此便是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于我们进行理解学习。

后续的发送流程这里就不再重复展开了,感兴趣的同学可以移步我们的上一篇文章查看

跟我学RocketMQ之消息发送源码解析

批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。