200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > 解析RocketMQ的消息索引文件consumequeue

解析RocketMQ的消息索引文件consumequeue

时间:2019-01-31 15:15:40

相关推荐

解析RocketMQ的消息索引文件consumequeue

CommitLog的文件结构

下图展示了CommitLog的文件结构,可以看到,包含了topic、queueId、消息体等核心信息。

同Kafka一样,消息是变长的,顺序写入。

如下图所示:

ConsumeQueue的文件结构

ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQueue其实是CommitLog的一个索引文件。

如下图所示:

ConsumeQueue是定长的结构,每1条记录固定的20个字节。很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容。

解析代码

import java.io.DataInputStream;import java.io.File;import java.io.FileInputStream;import java.io.IOException;public class TestCQ {public static void main(String[] args) throws IOException {decodeCQ(new File("D:\\consumeQueue\\0\\00000000000000000000"));}static void decodeCQ(File consumeQueue) throws IOException {FileInputStream fis = new FileInputStream(consumeQueue);DataInputStream dis = new DataInputStream(fis);long preTag = 0;long count = 1;while (true) {long offset = dis.readLong();int size = dis.readInt();long tag = dis.readLong();if (size == 0) {break;}if ((tag - preTag) != 1) {// System.err.printf("%d: %d %d %d\n", count++, tag, size,// offset);System.out.printf("[ERROR]%d: %d %d %d\n", count++, tag, size, offset);}preTag = tag;System.out.printf("%d: %d %d %d\n", count++, tag, size, offset);}fis.close();}}

结果如下:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。