上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。
DefaultProducer.send
RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中
@Overridepublic SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs));}
它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数
@Overridepublic SendResult send(Collection<Message> msgs,long timeout) throws MQClientException, RemotingException,MQBrokerException, InterruptedException {return this.defaultMQProducerImpl.send(batch(msgs), timeout);}
可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑
DefaultProducer.batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
// 声明批量消息体MessageBatch msgBatch;try {
// 从Message的list生成批量消息体MessageBatchmsgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}// 设置topicmsgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}
从代码可以看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,我们看下MessageBatch的声明
public class MessageBatch extends Message implements Iterable<Message> {
private final List<Message> messages;
private MessageBatch(List<Message> messages) {this.messages = messages;}
可以看到MessageBatch继承自Message,持有List 引用。
我们接着看一下generateFromList方法
MessageBatch.generateFromList
public static MessageBatch generateFromList(Collection<Message> messages) {assert messages != null;assert messages.size() > 0;
// 首先实例化一个Message的listList<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;
// 对messages集合进行遍历for (Message message : messages) {
// 判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时if (message.getDelayTimeLevel() > 0) {throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");}
// 判断topic是否以 **"%RETRY%"** 开头,如果是,// 则抛出异常,原因为:批量发送消息不支持消息重试if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {throw new UnsupportedOperationException("Retry Group is not supported for batching");}
// 判断集合中的每个Message的topic与批量发送topic是否一致,// 如果不一致则抛出异常,原因为:// 批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。if (first == null) {first = message;} else {if (!first.getTopic().equals(message.getTopic())) {throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");}
// 判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,// 如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");}}
// 校验通过后,将message实体添加到messageList中messageList.add(message);}
// 将处理完成的messageList作为构造方法,// 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。MessageBatch messageBatch = new MessageBatch(messageList);
messageBatch.setTopic(first.getTopic());messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());return messageBatch;}
总结一下,generateFromList方法对调用方设置的Collection 集合进行遍历,经过前置校验之后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,我们接着看DefaultProducer.batch的逻辑。
到此,通过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。
DefaultProducer.batch
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
// 声明批量消息体MessageBatch msgBatch;try {// 从Message的list生成批量消息体MessageBatchmsgBatch = MessageBatch.generateFromList(msgs);for (Message message : msgBatch) {Validators.checkMessage(message, this);MessageClientIDSetter.setUniqID(message);message.setTopic(withNamespace(message.getTopic()));}// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());} catch (Exception e) {throw new MQClientException("Failed to initiate the MessageBatch", e);}// 设置topicmsgBatch.setTopic(withNamespace(msgBatch.getTopic()));return msgBatch;}
注意下面这行代码:
// 设置消息体,此时的消息体已经是处理过后的批量消息体msgBatch.setBody(msgBatch.encode());
这里对MessageBatch进行消息编码处理,通过调用MessageBatch的encode方法实现,代码逻辑如下:
public byte[] encode() {return MessageDecoder.encodeMessages(messages);}
可以看到是通过静态方法encodeMessages(List messages)实现的。
我们看一下encodeMessages方法的逻辑:
public static byte[] encodeMessages(List<Message> messages) {//TO DO refactor, accumulate in one buffer, avoid copiesList<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());int allSize = 0;for (Message message : messages) {
// 遍历messages集合,分别对每个Message实体进行编码操作,转换为byte[]byte[] tmp = encodeMessage(message);// 将转换后的单个Message的byte[]设置到encodedMessages中encodedMessages.add(tmp);// 批量消息的二进制数据长度随实际消息体递增allSize += tmp.length;}byte[] allBytes = new byte[allSize];int pos = 0;for (byte[] bytes : encodedMessages) {// 遍历encodedMessages,按序复制每个Message的二进制格式消息体System.arraycopy(bytes, 0, allBytes, pos, bytes.length);pos += bytes.length;}// 返回批量消息整体的消息体二进制数组return allBytes;}
encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每个Message实体的二进制数组格式消息体并返回。
我们可以继续看一下单个Message是如何进行编码的,调用了MessageDecoder.encodeMessage(message)方法,逻辑如下:
public static byte[] encodeMessage(Message message) {//only need flag, body, propertiesbyte[] body = message.getBody();int bodyLen = body.length;String properties = messageProperties2String(message.getProperties());byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);//note properties length must not more than Short.MAXshort propertiesLength = (short) propertiesBytes.length;int sysFlag = message.getFlag();int storeSize = 4 // 1 TOTALSIZE+ 4 // 2 MAGICCOD+ 4 // 3 BODYCRC+ 4 // 4 FLAG+ 4 + bodyLen // 4 BODY+ 2 + propertiesLength;ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);// 1 TOTALSIZEbyteBuffer.putInt(storeSize);
// 2 MAGICCODEbyteBuffer.putInt(0);
// 3 BODYCRCbyteBuffer.putInt(0);
// 4 FLAGint flag = message.getFlag();byteBuffer.putInt(flag);
// 5 BODYbyteBuffer.putInt(bodyLen);byteBuffer.put(body);
// 6 propertiesbyteBuffer.putShort(propertiesLength);byteBuffer.put(propertiesBytes);
return byteBuffer.array();}
这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:
消息总长度--- 4字节魔数--- 4字节bodyCRC校验码 --- 4字节flag标识 --- 4字节body长度 --- 4字节消息体 --- 消息体实际长度N字节属性长度 --- 2字节扩展属性 --- N字节
通过encodeMessage方法处理之后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。
其他
到此便是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于我们进行理解学习。
后续的发送流程这里就不再重复展开了,感兴趣的同学可以移步我们的上一篇文章查看
跟我学RocketMQ之消息发送源码解析
批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。