200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > RocketMQ消息存储 刷盘 负载均衡

RocketMQ消息存储 刷盘 负载均衡

时间:2021-10-16 04:36:59

相关推荐

RocketMQ消息存储 刷盘 负载均衡

消息存储

消息存储是RocketMQ中最为复杂和最为重要的一部分。

消息存储总体架构

消息存储架构图:

minOffset:当前队列的最小消息偏移量,如果消费时指定从最早消费,就是从该偏移量消费。

maxOffset:当前队列消息的最大偏移量,就是最新的那个小心的偏移量。

consumerOffset:当前消费者在该队列的消费偏移量,如果是集群消费模式,那么代表的是当前消费者组在该队列的消费偏移量,如果是广播消费模式,那么就代表当前消费者在该队列的消费偏移量。

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

commitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;也就是这个文件是真正存储消息的文件。这个跟kafka有点不一样,kafka是按照topic-partition为文件夹来区分数据日志文件,而RocketMQ是直接所以topic消息一起存储。

ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

就是根据这个索引文件来索引commitLog文件的具体消息,提高效率。

ConsumeQueue文件按照topic + queue来区分。

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:KaTeX parse error: Undefined control sequence: \store at position 6: HOME \̲s̲t̲o̲r̲e̲\index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

页缓存:

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。(0拷贝)

刷盘

同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。但是会稍微降低消息可靠性。

什么时候删除消息物理文件

消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删

除消息文件(CommitLog):

. 消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直

到空间充足。

若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务

负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

由上面的消息存储可知,在同一个broker中,所有消息都是通过commitLog统一存储,然后通过topic-queue来索引消息。所以可以近似认为topic是逻辑主题,实际存储消息是queue(分区)。

Producer的负载均衡

Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。

选择broker->topic->queue

Consumer的负载均衡

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列—队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

消费端会通过RebalanceService线程,20秒钟做一次基于topic下的所有队列负载

消费端遍历自己的所有topic,依次调rebalanceByTopic。根据topic获取此topic下的所有queue。选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息),就是选择一台broker,获取某个groupId下的所有消费者。选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法

分配算法

(AllocateMessageQueueAveragely)平均分配算法(默认)(AllocateMessageQueueAveragelyByCircle)环状分配消息队列(AllocateMessageQueueByConfig)按照配置来分配队列: 根据用户指定的配置来进行负载(AllocateMessageQueueByMachineRoom)按照指定机房来配置队列(AllocateMachineRoomNearby)按照就近机房来配置队列:(AllocateMessageQueueConsistentHash)一致性hash,根据消费者的cid进行

重新负载触发时机:

消费者数量发送变化。每20秒会触发检查一次rebalance

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。这个跟kafka是一致的。

就是当(同一消费者组的消费者):

queue数量=consumer数量时,queue与consumer一对一指定。queue数量>consumer数量时,其中一些消费者会消费多个队列。queue数量<consumer数量,queue与consumer一对一指定,多出来的消费者空闲。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。