200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ入门到入土(五)消息持久化存储源码解析

RocketMQ入门到入土(五)消息持久化存储源码解析

时间:2020-12-12 09:03:38

相关推荐

RocketMQ入门到入土(五)消息持久化存储源码解析

精彩推荐

一百期Java面试题汇总

SpringBoot内容聚合

IntelliJ IDEA内容聚合

Mybatis内容聚合

接上一篇:RocketMQ入门到入土(四)producer生产消息源码剖析

一、原理

1、消息存在哪了?

消息持久化的地方其实是磁盘上,在如下目录里的commitlog文件夹里。

/root/store/commitlog

源码如下:

//{@linkorg.apache.rocketmq.store.config.MessageStoreConfig}//数据存储根目录privateStringstorePathRootDir=System.getProperty("user.home")+File.separator+"store";//commitlog目录privateStringstorePathCommitLog=System.getProperty("user.home")+File.separator+"store"+File.separator+"commitlog";//每个commitlog文件大小为1GB,超过1GB则创建新的commitlog文件privateintmappedFileSizeCommitLog=1024*1024*1024;

比如验证下:

[root@iZ2ze84zygpzjw5bfcmh2hZcommitlog]#pwd/root/store/commitlog[root@iZ2ze84zygpzjw5bfcmh2hZcommitlog]#ll-htotal400K-rw-r--r--1rootroot1.0GJun3018:2100000000000000000000[root@iZ2ze84zygpzjw5bfcmh2hZcommitlog]#

可以清晰的看到文件大小是1.0G,超过1.0G再写入消息的话会自动创建新的commitlog文件。

2、关键类解释

2.1、MappedFile

对应的是commitlog文件,比如上面的00000000000000000000文件。

2.2、MappedFileQueue

MappedFile所在的文件夹,对MappedFile进行封装成文件队列。

2.3、CommitLog

针对MappedFileQueue的封装使用。

二、Broker接收消息

1、调用链

BrokerStartup.start()-》BrokerController.start()-》NettyRemotingServer.start()-》NettyRemotingServer.prepareSharableHandlers()-》newNettyServerHandler()-》NettyRemotingAbstract.processMessageReceived()-》NettyRemotingAbstract.processRequestCommand()-》SendMessageProcessor.processRequest()

2、processRequest

SendMessageProcessor.processRequest()

@OverridepublicRemotingCommandprocessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{RemotingCommandresponse=null;try{//调用asyncProcessRequestresponse=asyncProcessRequest(ctx,request).get();}catch(InterruptedException|ExecutionExceptione){log.error("processSendMessageerror,request:"+request.toString(),e);}returnresponse;}

3、asyncProcessRequest

publicCompletableFuture<RemotingCommand>asyncProcessRequest(ChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{finalSendMessageContextmqtraceContext;switch(request.getCode()){//表示消费者发送的消息,发送者消费失败会重新发回队列进行消息重试caseRequestCode.CONSUMER_SEND_MSG_BACK:returnthis.asyncConsumerSendMsgBack(ctx,request);default://解析header,也就是我们Producer发送过来的消息都在request里,给他解析到SendMessageRequestHeader对象里去。SendMessageRequestHeaderrequestHeader=parseRequestHeader(request);if(requestHeader==null){pletedFuture(null);}mqtraceContext=buildMsgContext(ctx,requestHeader);//将解析好的参数放到SendMessageContext对象里this.executeSendMessageHookBefore(ctx,request,mqtraceContext);if(requestHeader.isBatch()){//批处理消息用returnthis.asyncSendBatchMessage(ctx,request,mqtraceContext,requestHeader);}else{//非批处理,我们这里介绍的核心。returnthis.asyncSendMessage(ctx,request,mqtraceContext,requestHeader);}}}

4、asyncSendMessage

privateCompletableFuture<RemotingCommand>asyncSendMessage(ChannelHandlerContextctx,RemotingCommandrequest,SendMessageContextmqtraceContext,SendMessageRequestHeaderrequestHeader){finalbyte[]body=request.getBody();intqueueIdInt=requestHeader.getQueueId();TopicConfigtopicConfig=this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//拼凑message对象MessageExtBrokerInnermsgInner=newMessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner,MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes()==null?0:requestHeader.getReconsumeTimes());CompletableFuture<PutMessageResult>putMessageResult=null;Map<String,String>origProps=MessageDecoder.string2messageProperties(requestHeader.getProperties());//真正接收消息的方法putMessageResult=this.brokerController.getMessageStore().asyncPutMessage(msgInner);returnhandlePutMessageResultFuture(putMessageResult,response,request,msgInner,responseHeader,mqtraceContext,ctx,queueIdInt);}

至此我们的消息接收完成了,都封装到了MessageExtBrokerInner对象里。

三、Broker消息存储(持久化)

1、asyncPutMessage

接着上步骤的asyncSendMessage继续看

@OverridepublicCompletableFuture<PutMessageResult>asyncPutMessage(MessageExtBrokerInnermsg){CompletableFuture<PutMessageResult>putResultFuture=mitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result)->{......});returnputResultFuture;}

2、commitLog.asyncPutMessage

publicCompletableFuture<PutMessageResult>asyncPutMessage(finalMessageExtBrokerInnermsg){//获取最后一个文件,MappedFile就是commitlog目录下的那个0000000000文件MappedFilemappedFile=this.mappedFileQueue.getLastMappedFile();try{//追加数据到commitlogresult=mappedFile.appendMessage(msg,this.appendMessageCallback);switch(result.getStatus()){......}//将内存的数据持久化到磁盘CompletableFuture<PutMessageStatus>flushResultFuture=submitFlushRequest(result,putMessageResult,msg);}}

3、appendMessagesInner

publicAppendMessageResultappendMessagesInner(finalMessageExtmessageExt,finalAppendMessageCallbackcb){//将消息写到内存returncb.doAppend(this.getFileFromOffset(),byteBuffer,this.fileSize-currentPos,(MessageExtBrokerInner)messageExt);}

4、doAppend

@OverridepublicAppendMessageResultdoAppend(finallongfileFromOffset,finalByteBufferbyteBuffer,finalintmaxBlank,finalMessageExtBrokerInnermsgInner){//Initializationofstoragespacethis.resetByteBuffer(msgStoreItemMemory,msgLen);//1TOTALSIZEthis.msgStoreItemMemory.putInt(msgLen);//2MAGICCODEthis.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);//3BODYCRCthis.msgStoreItemMemory.putInt(msgInner.getBodyCRC());//4QUEUEIDthis.msgStoreItemMemory.putInt(msgInner.getQueueId());//5FLAGthis.msgStoreItemMemory.putInt(msgInner.getFlag());//6QUEUEOFFSETthis.msgStoreItemMemory.putLong(queueOffset);//7PHYSICALOFFSETthis.msgStoreItemMemory.putLong(fileFromOffset+byteBuffer.position());//8SYSFLAGthis.msgStoreItemMemory.putInt(msgInner.getSysFlag());//9BORNTIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());//10BORNHOSTthis.resetByteBuffer(bornHostHolder,bornHostLength);this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));//11STORETIMESTAMPthis.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());//12STOREHOSTADDRESSthis.resetByteBuffer(storeHostHolder,storeHostLength);this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));//13RECONSUMETIMESthis.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());//14PreparedTransactionOffsetthis.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());//15BODYthis.msgStoreItemMemory.putInt(bodyLength);if(bodyLength>0)this.msgStoreItemMemory.put(msgInner.getBody());//16TOPICthis.msgStoreItemMemory.put((byte)topicLength);this.msgStoreItemMemory.put(topicData);//17PROPERTIESthis.msgStoreItemMemory.putShort((short)propertiesLength);if(propertiesLength>0)this.msgStoreItemMemory.put(propertiesData);finallongbeginTimeMills=CommitLog.this.defaultMessageStore.now();//WritemessagestothequeuebufferbyteBuffer.put(this.msgStoreItemMemory.array(),0,msgLen);returnresult;}

这一步其实就已经把消息保存到缓冲区里了,也就是msgStoreItemMemory,这里采取的NIO。

privatefinalByteBuffermsgStoreItemMemory;

5、submitFlushRequest

再次回到【2、commitLog.asyncPutMessage】的submitFlushRequest方法,因为之前的方法是将数据已经写到ByteBuffer缓冲区里了,下一步也就是我们现在这一步就要刷盘了。

publicCompletableFuture<PutMessageStatus>submitFlushRequest(AppendMessageResultresult,PutMessageResultputMessageResult,MessageExtmessageExt){//同步刷盘if(FlushDiskType.SYNC_FLUSH==this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()){finalGroupCommitServiceservice=(GroupCommitService)this.flushCommitLogService;if(messageExt.isWaitStoreMsgOK()){GroupCommitRequestrequest=newGroupCommitRequest(result.getWroteOffset()+result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);returnrequest.future();}else{service.wakeup();pletedFuture(PutMessageStatus.PUT_OK);}}//异步刷盘else{if(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){flushCommitLogService.wakeup();}else{commitLogService.wakeup();}pletedFuture(PutMessageStatus.PUT_OK);}}

6、异步刷盘

classFlushRealTimeServiceextendsFlushCommitLogService{@Overridepublicvoidrun(){while(!this.isStopped()){try{//每隔500ms刷一次盘if(flushCommitLogTimed){Thread.sleep(500);}else{this.waitForRunning(500);}//调用mappedFileQueue的flush方法CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);}catch(Throwablee){}}}}

可看出默认是每隔500毫秒刷一次盘

7、mappedFileQueue.flush

publicbooleanflush(finalintflushLeastPages){MappedFilemappedFile=this.findMappedFileByOffset(this.flushedWhere,this.flushedWhere==0);if(mappedFile!=null){//真正的刷盘操作intoffset=mappedFile.flush(flushLeastPages);}}

8、mappedFile.flush

publicintflush(finalintflushLeastPages){if(this.isAbleToFlush(flushLeastPages)){try{if(writeBuffer!=null||this.fileChannel.position()!=0){//刷盘NIOthis.fileChannel.force(false);}else{//刷盘NIOthis.mappedByteBuffer.force();}}catch(Throwablee){log.error("Erroroccurredwhenforcedatatodisk.",e);}}returnthis.getFlushedPosition();}

至此已经全部结束。

四、总结

面试被问:Broker收到消息后怎么持久化的?

回答者:有两种方式:同步和异步。一般选择异步,同步效率低,但是更可靠。消息存储大致原理是:

核心类MappedFile对应的是每个commitlog文件,MappedFileQueue相当于文件夹,管理所有的文件,还有一个管理者CommitLog对象,他负责提供一些操作。具体的是Broker端拿到消息后先将消息、topic、queue等内容存到ByteBuffer里,然后去持久化到commitlog文件中。commitlog文件大小为1G,超出大小会新创建commitlog文件来存储,采取的nio方式。

五、补充:同步/异步刷盘

1、关键类

2、图解

3、同步刷盘

3.1、源码

//{@linkorg.apache.mitLog#submitFlushRequest()}//Synchronizationflushif(FlushDiskType.SYNC_FLUSH==this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()){//同步刷盘service->GroupCommitServicefinalGroupCommitServiceservice=(GroupCommitService)this.flushCommitLogService;if(messageExt.isWaitStoreMsgOK()){//数据准备GroupCommitRequestrequest=newGroupCommitRequest(result.getWroteOffset()+result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());//将数据对象放到requestsWrite里service.putRequest(request);returnrequest.future();}else{service.wakeup();pletedFuture(PutMessageStatus.PUT_OK);}}

putRequest

publicsynchronizedvoidputRequest(finalGroupCommitRequestrequest){synchronized(this.requestsWrite){this.requestsWrite.add(request);}//这里很关键!!!,给他设置成true。然后计数器-1。下面run方法的时候才会进行交换数据且returnif(pareAndSet(false,true)){waitPoint.countDown();//notify}}

run

publicvoidrun(){while(!this.isStopped()){try{//是同步还是异步的关键方法,也就是说组不阻塞全看这里。this.waitForRunning(10);//真正的刷盘逻辑this.doCommit();}catch(Exceptione){CommitLog.log.warn(this.getServiceName()+"servicehasexception.",e);}}}

waitForRunning

protectedvolatileAtomicBooleanhasNotified=newAtomicBoolean(false);//其实就是CountDownLatchprotectedfinalCountDownLatch2waitPoint=newCountDownLatch2(1);protectedvoidwaitForRunning(longinterval){//如果是true,且给他改成false成功的话,则onWaitEnd()且return,但是默认是false,也就是默认情况下这个if不会进。if(pareAndSet(true,false)){this.onWaitEnd();return;}//entrytowaitwaitPoint.reset();try{//等待,默认值是1,也就是waitPoint.countDown()一次后就会激活这里。waitPoint.await(interval,TimeUnit.MILLISECONDS);}catch(InterruptedExceptione){log.error("Interrupted",e);}finally{//给状态值设置成falsehasNotified.set(false);this.onWaitEnd();}}

3.2、总结

总结下同步刷盘的主要流程:

核心类是GroupCommitService,核心方法 是waitForRunning。

先调用putRequest方法将hasNotified变为true,且进行notify,也就是waitPoint.countDown()

其次是run方法里的waitForRunning()waitForRunning()判断hasNotified是不是true,是true则交换数据然后return掉,也就是不进行await阻塞,直接return。

最后上一步return了,没有阻塞,那么顺理成章的调用doCommit进行真正意义的刷盘。

4、异步刷盘

4.1、源码

核心类是:FlushRealTimeService

//{@linkorg.apache.mitLog#submitFlushRequest()}//Asynchronousflushif(!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()){flushCommitLogService.wakeup();}else{commitLogService.wakeup();}pletedFuture(PutMessageStatus.PUT_OK);

run

//{@linkorg.apache.mitLog.FlushRealTimeService#run()}classFlushRealTimeServiceextendsFlushCommitLogService{@Overridepublicvoidrun(){while(!this.isStopped()){try{//每隔500ms刷一次盘if(flushCommitLogTimed){Thread.sleep(500);}else{//根上面同步刷盘调用的是同一个方法,区别在于这里没有将hasNotified变为true,也就是还是默认的false,那么waitForRunning方法内部的第一个判断就不会走,就不会return掉,就会进行下面的await方法阻塞,默认阻塞时间是500毫秒。也就是默认500ms刷一次盘。this.waitForRunning(500);}//调用mappedFileQueue的flush方法CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);}catch(Throwablee){}}}}

4.2、总结

核心类#方法:FlushRealTimeService#run()

判断flushCommitLogTimed是不是true,默认false,是true则直接sleep(500ms)然后进行mappedFileQueue.flush()刷盘。

若是false,则进入waitForRunning(500),这里是和同步刷盘的区别关键所在,同步刷盘之前将hasNotified变为true了,所以直接一套小连招:return+doCommit了 ,异步这里直接调用的waitForRunning(500),在这之前没任何对hasNotified的操作,所以不会return,而是会继续走下面的waitPoint.await(500, TimeUnit.MILLISECONDS);进行阻塞500毫秒,500毫秒后自动唤醒然后进行flush刷盘。也就是异步刷盘的话默认500ms刷盘一次。

END

我知道你 “在看”

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。