200字范文,内容丰富有趣,生活中的好帮手!
200字范文 > Netty 的 内存池 是如何实现的

Netty 的 内存池 是如何实现的

时间:2021-10-07 00:08:25

相关推荐

Netty 的 内存池 是如何实现的

对于今天的源码剖析,你可以带着下面这么几个问题:

1 PoolArena中的PoolSubpage数组和PoolChunk中的PoolSubpage数组有什么关联?

2 PoolThreadCache中的MemoryRegionCache数组与PoolSubpage是否有联系?

3 Netty内存池的整体结构是什么样的?

4 如果让你来介绍Netty内存池,你如何来描述?

内存池源码剖析

调试用例

上一节,我们说了Netty根据请求的大小将其分成四类:Tiny、Small、Normal、Huge,这四类请求的分界线分别为512B、8KB、16MB,针对这四类请求,Netty的处理逻辑并不一样,但是,本节,我们并不打算把这四种全部讲解一遍,我们只讲解其中的一个——Tiny。

另外,为了加快分配内存的速度,Netty还使用了线程缓存,而线程缓存实际上是在有回收内存的情况下才有效,所以,我们设计的调试用例里面还应该包括回收内存的部分,即ReferenceCountUtil.release(byteBuf)。

public class ByteBufTest {public static void main(String[] args) {// 1. 创建池化的分配器ByteBufAllocator allocator = new PooledByteBufAllocator(false);// 2. 分配一个40B的ByteBufByteBuf byteBuf = allocator.heapBuffer(40);// 3. 写入数据byteBuf.writeInt(4);// 4. 读取数据System.out.println(byteBuf.readInt());// 5. 回收内存ReferenceCountUtil.release(byteBuf);// 6. 分配一个30B的ByteBufByteBuf byteBuf2 = allocator.heapBuffer(30);// 7. 再次分配一个40B的ByteBufByteBuf byteBuf3 = allocator.heapBuffer(40);}}

我们先创建一个池化的分配器,使用其分配一个40B的ByteBuf,接着写入数据并读取数据,然后回收,理论上来说,这个40B的ByteBuf会进入线程缓存,接着再分配一个30B的ByteBuf,观察其是否会从线程缓存中获取,最后再分配一个40B的ByteBuf,观察其是否会从线程缓存中获取。

源码调试

创建池化的分配器让我们先在创建分配器那行打个断点,跟踪进去:

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,int tinyCacheSize, int smallCacheSize, int normalCacheSize,boolean useCacheForAllThreads, int directMemoryCacheAlignment) {super(preferDirect);// 这是一个ThreadLocal,用于存放线程缓存PoolThreadCache对象threadCache = new PoolThreadLocalCache(useCacheForAllThreads);// Tiny的缓存个数,默认512this.tinyCacheSize = tinyCacheSize;// Small的缓存个数,默认256this.smallCacheSize = smallCacheSize;// Normal的缓存个数,默认64this.normalCacheSize = normalCacheSize;// pageSize默认为8KB// maxOrder表示树的最大高度,默认为11// 计算每个Chunk的大小,默认为16MBchunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);// ...// 计算每页大小的左移位数// pageSize=8KB=10 0000 0000 0000=1<<13// pageShifts表示pageSize是1左移13位得来的// 所以pageShifts为13int pageShifts = validateAndCalculatePageShifts(pageSize);// 初始化堆内存池heapArenas// 默认为CPU核数*2,同时根据堆内存大小调整if (nHeapArena > 0) {heapArenas = newArenaArray(nHeapArena);List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);for (int i = 0; i < heapArenas.length; i ++) {// 创建HeapArena并加入到数组中PoolArena.HeapArena arena = new PoolArena.HeapArena(this,pageSize, maxOrder, pageShifts, chunkSize,directMemoryCacheAlignment);heapArenas[i] = arena;// 同时加入监控metrics.add(arena);}heapArenaMetrics = Collections.unmodifiableList(metrics);} else {heapArenas = null;heapArenaMetrics = Collections.emptyList();}// 初始化直接内存池directArenas// 默认为CPU核数*2,同时根据直接内存大小调整if (nDirectArena > 0) {directArenas = newArenaArray(nDirectArena);List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);for (int i = 0; i < directArenas.length; i ++) {// 创建DirectArena并加入到数组中PoolArena.DirectArena arena = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);directArenas[i] = arena;// 同时加入监控metrics.add(arena);}directArenaMetrics = Collections.unmodifiableList(metrics);} else {directArenas = null;directArenaMetrics = Collections.emptyList();}// 监控metric = new PooledByteBufAllocatorMetric(this);}

在PooledByteBufAllocator的构造方法中,主要是初始化一些属性,比如chunkSize、heapArenas和directArenas数组等,默认地,chunkSize为16M,heapArenas和directArenas数组大小为2倍的CPU核数,同时根据内存大小动态调整。

2. 分配一个40B的ByteBuf

// PooledByteBufAllocator#newHeapBuffer@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 先从ThreadLocal中获取一个PoolThreadCache线程缓存对象PoolThreadCache cache = threadCache.get();// 因为我们创建的是基于堆内存的ByteBuf,所以使用heapArena// 另外,在PoolThreadCache初始化的时候会绑定一个最少使用的heapArena// 所以这里是可以获取到的// 具体可参考PooledByteBufAllocator.PoolThreadLocalCache#initialValue()方法PoolArena<byte[]> heapArena = cache.heapArena;final ByteBuf buf;if (heapArena != null) {// 使用heapArena来分配buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {buf = PlatformDependent.hasUnsafe() ?new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}return toLeakAwareBuffer(buf);}

在这段代码中,首先从threadCache中获取了一个PoolThreadCache对象cache,再从这个cache中获取一个heapArena,这个heapArena是从allocator的heapArenas数组中取的一个最少使用的,在cache初始化的时候绑定进去的,为什么绑定的是最少使用的呢?

我们知道,heapArenas数组的大小是固定的,而线程是可能无限多的,每个线程都要绑定一个heapArena,那么,怎么绑定才能减少线程之间的竞争呢?答案很明显,绑定最少使用的那个heapArena。比如,heapArenas数组大小为16,而线程也正好为16,这样的话,线程之间完全没竞争关系,如果按照数组的顺序绑定,最后这16个线程都会绑定到同一个heapArena,竞争非常剧烈。

OK,我们继续跟踪到heapArena.allocate()中:

// PoolArena#allocate(PoolThreadCache, int, int)PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {// 使用对象池创建一个池化的ByteBuf,先不展开,跳过// PooledByteBuf中有一个memory字段用来存放数据// 从对象池中返回的这个buf,它的memory大小为0// 内存池的作用就是为了给这个memory分配空间PooledByteBuf<T> buf = newByteBuf(maxCapacity);// 继续深入allocate(cache, buf, reqCapacity);return buf;}// PoolArena#allocate(PoolThreadCache, PooledByteBuf<T>, int)private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {// 将请求的大小规范化,这里40B会被规范化到48Bfinal int normCapacity = normalizeCapacity(reqCapacity);// 判断是否小于8KBif (isTinyOrSmall(normCapacity)) {// capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;// 是否为Tiny,即小于512Bboolean tiny = isTiny(normCapacity);if (tiny) {// < 512// 先尝试从线程缓存中分配内存if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// 分配成功则返回return;}// 计算48B在tinySubpagePools数组中的索引// 16、32、48,所以这里的索引是3tableIdx = tinyIdx(normCapacity);// PoolArena的tinySubpagePools数组,大小为32table = tinySubpagePools;} else {// 如果为Small,从这里走if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {return;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}// 获取头节点,head中不存放任何数据,仅用来加锁使用final PoolSubpage<T> head = table[tableIdx];// 分段锁的用法,减少锁竞争synchronized (head) {// 因为还没有分配过任何内存,所以next为空,先跳过这一段final PoolSubpage<T> s = head.next;// ...// 分配成功会返回}// 按PoolArena加锁,又是分段锁的用法// 这里就体会到了上面按最少使用去绑定PoolArena的魅力了吧// 减少了锁竞争synchronized (this) {// 这个方法名有点歧义// 并不是说按Normal的请求去处理allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}// 如果是Normal请求if (normCapacity <= chunkSize) {// ...} else {// 如果是Huge请求allocateHuge(buf, reqCapacity);}}

这段代码中运用了大量的分段锁技巧:

1 PoolArena,每个线程绑定一个最少使用的PoolArena,充分减少锁竞争。

2 tinySubpagePools,按16B分成32段,只有分配相同大小(规范后的大小)的请求时且还是两个线程绑定到同一个PoolArena的情况下才有锁竞争。

结合ConcurrentHashMap中分段锁的用法,我们归纳出来一个规律:

分段锁一般都是通过数组来实现的

通过某种规则将多个线程的操作分离到数组中不同的位置上,

以便降低线程之间修改同一段数组的竞争,所以,有时候也叫作锁分离

比如,在ConcurrentHashMap中,是根据key值hash到不同的桶中进行处理。在PoolArena中,是通过线程绑定最小使用PoolArena来达到锁分离的目的。在tinySubpagePools中,是通过将请求分割成不同的段进行处理减少锁竞争。

OK,让我们继续跟踪到allocateNormal()内部:

// PoolArena#allocateNormalprivate void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {// 先尝试从已有的PoolChunk中分配内存// 此时是第一次请求,所以,没有任何可用的PoolChunkif (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) {return;}// key1, 创建一个新的PoolChunkPoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);// key2, 使用这个PoolChunk分配内存boolean success = c.allocate(buf, reqCapacity, normCapacity);assert success;// 这新的PoolChunk加入到qInit中qInit.add(c);}

这个方法中,有两个关键的地方:

1 创建PoolChunk,通过上一节的分析,我们知道,数据全部都是存储在PoolChunk的memory字段中的,那么,这个memory是如何创建的呢?

2 使用PoolChunk分配内存,如何分配?

// PoolArena.HeapArena#newChunk@Overrideprotected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);}// PoolArena.HeapArena#newByteArrayprivate static byte[] newByteArray(int size) {// 调用PlatformDependentreturn PlatformDependent.allocateUninitializedArray(size);} public static byte[] allocateUninitializedArray(int size) {// 默认地,UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD=-1// 创建了一个大小为16MB的byte[]数组,在堆内存中return UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD < 0 || UNINITIALIZED_ARRAY_ALLOCATION_THRESHOLD > size ?new byte[size] : PlatformDependent0.allocateUninitializedArray(size);}// PoolChunk的构造方法PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {unpooled = false;this.arena = arena;// memory就是上面创建的byte[]数组this.memory = memory;// 8KBthis.pageSize = pageSize;// 13this.pageShifts = pageShifts;// 11this.maxOrder = maxOrder;// 16MBthis.chunkSize = chunkSize;// 0this.offset = offset;// 12,表示如果满二叉树中的节点更新到了12,则表示此节点已完全分配了unusable = (byte) (maxOrder + 1);// 24,用于计算满二叉树中的节点占用的空间大小,比如2048占用8K,而1024占用16Klog2ChunkSize = log2(chunkSize);// -8192,表示subpage溢出的掩码subpageOverflowMask = ~(pageSize - 1);// 初始时可使用的内存等于PoolChunk的整个空间,即16MBfreeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;// 一个PoolChunk可以被分成多少页,1<<11=2048maxSubpageAllocs = 1 << maxOrder;// 创建memoryMap和depthMap,满二叉树,最后一层的节点数正好等于上面所有层的节点数// 所以它们的大小为2048*2=4096memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;// 初始化memoryMap和depthMap中的元素为每个节点的高度for (int d = 0; d <= maxOrder; ++ d) {int depth = 1 << d;for (int p = 0; p < depth; ++ p) {memoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}// 一个PoolChunk可以被分割成2048个Page// Page在Netty中同样使用PoolSubpage来表示subpages = newSubpageArray(maxSubpageAllocs);cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。