前面做了很多铺垫(Netty源码之内存管理(一)),带着大家熟悉了与内存分配相关的类的定义和分配逻辑。但并没有真正落实到 jemalloc 思想在源码是如何体现的。本章就是对 PoolChunk
逐字解析,死扣细节。在分析源码之前我们需要对分配的内存级别有一个清晰的定位,当分配 Huge
级别对象,直接使用 PoolChunk
包装,并没有复杂的分配逻辑。而对于 tiny&small&normal
级别来说,进行精细化的内存管理十分有必要的。
开局一张图:PoolChunk
本质就是维护这一棵满二叉树,这棵树默认管理 16MB
内存(这个值是可以手动设置)。memoryMap[]
是可变的数组,Netty 在这个数组上逻辑构建一棵满二叉树(当然也可以用链表之类的数据结构,但是随机索引效率不高,我们可以根据数组索引快速定位到某一层的第一个节点。但链表是不能做到的),depthMap[]
表示每个节点对应的深度,这是不可变的。一个 Long
型被分成高、低两部分,高 32 位记录小内存分配信息,低 32 位记录节点下标值。当分配 normal
级别内存时,只有低 32 位信息有用,它的值表示节点序号(起始值为 1),当分配 tiny&normal
级别内存时,高、低两部分确定某个 page
下的某个 subpage
。
还有一个比较有意思的是任意节点所管理的内存大小都是 2 的次幂,因此 Netty 会对用户申请的内存大小进行规格化的原因就在这里。任意规格值(当然不能超过 PoolChunkSize
)都能找到合适的节点,除非没有节点可满足当前内存申请,那只能新创建一个 PoolChunk
。还有另外一个疑问就是如何更新 memoryMap[]
数组呢,请看大屏幕:
index 表示节点索引,value 表示对应 memoryMap[index] 。
用户第一次申请 4MB 大小内存,由于内存大小确定,因此所在的层的位置也可以通过 maxOrder - (log2(normCapacity) - pageShifts)
确定,4MB 对应层数(也可理解为深度)为 2。
内存分配过程如下,其实对应方法 allocateNode(int)
实现逻辑: 首先判断节点 1 的使用状态: memoryMap1 != unusable(12) 表示节点 1 可用,但由于层数不匹配,所以获取子树节点 2,同时判断使用状态,发现 != 12 且层数不匹配,那就继续获取子节点 4,发现 !=12 且层数匹配,所以节点 4 就用作此次内存分配的节点,并更新 memoryMap[4]=12 表示节点 4 已使用,变量 handle
的低 32 位记录子节点位置信息,同时循环更新父节点的 memoryMap,父节点的值是子节点的 memoryMap 的最小值,所以 memoryMap[2]=2,memoryMap[1] =1。这样,第一次申请 4MB 大小内存就算完成了。
第二次申请 4MB 大小内存,当在第 2 层判断节点 4 的 memoryMap 值等于 12,会判断兄弟节点 5 是否满足分配。大家好好体会。
PoolChunk 内存分配
PoolChunk 是 jemalloc3.x 算法思想的体现,里面以 allocate
开头的 API 就是内存分配算法的实现。入口方法是 allocate(PooledByteBuf, int, int)
。
allocate(PooledByteBuf, int, int)
这个方法做的事情有:
- 根据申请内存大小选择合适的分配策略。具体为如果 >=pageSize,使用
allocateRun()
方法分配,否则使用allocateSubpage()
分配,它们都会返回句柄值 handle。 - 初始化
ByteBuf
。
源码如下:
// io.netty.buffer.PoolChunk#allocate
/**
*
* @param buf 「ByteBuf」对象,它是物理内存的承托
* @param reqCapacity 用户所需内存大小
* @param normCapacity 规格值
* @return
*/
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 低32位: 节点索引
// 高32位: 位图索引
final long handle;
// 位操作判断大小
if ((normCapacity & subpageOverflowMask) != 0) {
// #1 申请>=8KB
handle = allocateRun(normCapacity);
} else {
// #2 申请<8KB
handle = allocateSubpage(normCapacity);
}
if (handle < 0) {
return false;
}
// #3 如果「PoolChunk」存在缓存的「ByteBuffer」就复用
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
// #4 初始化ByteBuf内存相关信息
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
allocateRun(int)
方法 allocateRun(int) 做的事情也不多,主要有:
- 根据规格值计算所对应的深度
d
- 调用
allocateNode(d)
完成内存分配 更新剩余空闲值 ```java // io.netty.buffer.PoolChunk#allocateRun /**
申请大小为「norCapacity」的内存块 */ private long allocateRun(int normCapacity) { // #1 计算当前规格值所对应树的深度d // log2(normCapacity): 获取当前值所对应最高位1的序号, // pageShifts: 默认值为 13,也就是 pageSize=8192 最高位为1的序号,因为这里分配的是 >=8192, // 所以需要减去它的偏移量,即从0开始。 // maxOrder: 默认值为 11,maxOrder - 偏移量 = 确切(合适)的树高度 // 可以想象normCapacity 从 8192 不断向上增长,那树的高度也不断变小 int d = maxOrder - (log2(normCapacity) - pageShifts);
// #2 △在深度d的节点中寻找空闲节点并分配内存 int id = allocateNode(d); if (id < 0) {
return id;
}
// #3 更新剩余空闲值 freeBytes -= runLength(id);
// #4 返回信息 return id; }
private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; // 31 /**
- 获取以2为底的对数值
- 思路是数有多少个0
*/
private static int log2(int val) {
// compute the (0-based, with lsb = 0) position of highest set bit i.e, log2
// Integer.numberOfLeadingZeros(int): 返回无符号整型的最高非零位前面的0的个数(包括符号位在内)
// 31-0位数量=非0位数量,比如 0000…1000 Integer.numberOfLeadingZeros(0000…1000) = 28,
// 31-28=3,其实就是获取最高位1的序号(从右至左,起始序号为0)
return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
}
```
allocateNode(int)
终于到了内存分配的重头戏,它属于节点粒度的分配逻辑。整体思路并不难,前面也通过图解讲述过,但由于采用了太多位运算所以看起来有点头晕。所以我们先熟悉一下部分位运算公式,规定
id^=1
: id 为奇数则 -1,id 为偶数则 +1。这里用来获取偶数的兄弟节点。比如 id=2,则其兄弟节点为 id^=1 = 3。id<<=1
: 相当于id=id*2
,目的是跳转到节点 id 的左子节点。比如 id = 2,它的左子节点值为 4。1<<d
: 表示 1*2 。对在任意深度为d的节点,节点的索引值在 2 到 2-1 范围内。比如深度为 1,则索引值在 [2, 3],当深度为 2,索引值在 [4, 7] 范围内。initial=-(1 << d)
: 对 2取反,目的是用来判断与目标深度值 d 是否匹配,可以把 initial 可以看成是掩码。当匹配目标深度,有 id & initial == initial,若当前深度<目标深度,有 id & initial ==0。比如目标深度为 2,那 initial=-4,当id=1时,id&initial=0,说明还没有到达目标深度,获取最左子节点(id=id*2)2,此时 2 & initial=0,说明还没有到达目标深度,继续获取最左子节点 4,此时 4&-4=4,此时就找到了目标深度。然后就可以从左到右找寻空闲节点并进行内存分配。
allocateNode(int depth)
目标是在深度 d 中找到空闲的节点并,如果存在按规则更新 memoryMap 相应节点的值并返回节点序号。思路也是比较清晰,从头结点开始判断,如果可分配但深度不匹配则获取左子节点,如果不可分配就返回 -1。继续判断左子节点是否可分配以及深度是否匹配,如果都不匹配,继续重复上面步骤。如果深度匹配但当前节点不可分配(val>d=true),那就获取兄弟节点继续重复上述步骤。如果深度匹配且当前节点可分配,则该节点就是此次申请的目标节点,并将它设置为 unusable 不可用状态,同时,按公式 memoryMap[父节点] = Min(子节点1,子节点2) 循环更新 memoryMap。
还有一个有意思的点需要注意,就是 memoryMap 存储的值,它是这棵树的核心。初始化的值以及后续更新也做得非常巧妙,我说不上来,大家慢慢休会吧。通过内存分配示意图再来休会一下上面的文字描述:
相关源码解析如下
// io.netty.buffer.PoolChunk#allocateNode
/**
* 在深度「d」查找空闲节点
* @param d
* @return 空闲节点索引,如果返回-1表示当前「PoolChunk」无法满足此次内存分配
*/
private int allocateNode(int d) {
int id = 1;
// 用于判断节点是否匹配深度「d」
int initial = - (1 << d);
// #1 判断根节点是否可用
byte val = value(id);
if (val > d) {
// 根节点不可用,返回「-1」
return -1;
}
// #2 从上往下、从左往右遍历节点。直到找到合适的空闲节点并返回索引值。
// id&initial=0: 表明节点id 匹配 深度d,否则不匹配
// val<d: 表明节点id有空闲内存可分配,若val>d表明节点id无空闲内存可分配
// 循环退出条件是 val>d 或 id&initial !=0,当匹配到空闲节点时,通常 val<d 且 id&initial !=0
while (val < d || (id & initial) == 0) {
// id = id * 2;
id <<= 1;
// 获取momeryMap[id]的值
val = value(id);
// 如果val>d,表明节点「id」不满足此次分配
// 这个判断十分巧妙,前面也出现过一次,用来判断根节点是否满足深度为d的内存申请,
// 如果val>d则不满足,因为,父节点的val值是两个子节点的最小值,
// 因此如果父节点的val>d那说明子节点无法满足当前深度d的内存申请
if (val > d) {
// 获取兄弟节点, 继续查找
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// #3 将选中的节点标记为「unusable」
setValue(id, unusable);
// #4 循环更新父节点「memoryMap」的值,其值取两个子节点最小的那个值
updateParentsAlloc(id);
// #5 Return
return id;
}
private byte value(int id) {
return memoryMap[id];
}
/**
* 从节点id开始一直到根节点,更新对应的memoryMap的值
*/
private void updateParentsAlloc(int id) {
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}
allocateNode(int depth)
是分配 Normal
级别的核心方法,本质是维护 memoryMap[]
数组,遍历树查找空闲内存满足本次内存申请。
allocateSubpage(int)
这个方法是申请 Tiny&Small
级别内存。上一章节讲过对该内存分配的思想: 简单一句话,将某个空闲的 page 拆分成若干个 subpage,使用对象 PoolSubpage
对这些若干个 subpage 进行管理。
// io.netty.buffer.PoolChunk#allocateSubpage
/**
* 申请「tiny&small」级别内存
*/
private long allocateSubpage(int normCapacity) {
// #1 先去「PoolSubpagePools[]」数组中是否能找到合适的「PoolSubpage」
// 如果存在,则使用现成的,否则得新建一个「PoolSubpage」对象后再放入「PoolSubpagePools[]」数组中
// 「PoolSubpage[]」每个节点都会初始化一个head节点,所以这里返回一定不为空
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
// 子页直接可叶子节点层查找
int d = maxOrder;
// 对头结点上锁,通过数组减少锁的粒度,提高并发性能
synchronized (head) {
// #2 在深度d获取空闲子叶
int id = allocateNode(d);
if (id < 0) {
return id;
}
// 获取成功,对该「page」进行拆分改造
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
// #3 获取偏移量(相对于 maxSubpageAlloc)
// 可以理解为 subpageIdx = id - maxSubpageAllocs
int subpageIdx = subpageIdx(id);
// 定位到「PoolChunk」内部的「PoolSubpage[]」
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 如果不存在,则创建
subpage = new PoolSubpage<T>(head, // 这个头结点来自「PoolArena」
this, // 当前「PoolChunk」
id, // 节点「id」
runOffset(id), // 节点「id」字节偏移量
pageSize, // 页大小
normCapacity); // 每个「subpage」等份大小
// 记录新创建的「PoolSubpage」
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
// #4 使用「PoolSubpage」分配内存
return subpage.allocate();
}
}
/**
* 移除最高位,获得偏移量
*/
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}
/**
* 获取节点「id」字节偏移量。
*/
private int runOffset(int id) {
// << 优先级高于 ^
// depth(id): 获取节点「id」对应的深度
// int index = 1 << depth(id) => 2^depth(id),获取节点「id」所在层的最左节点的索引值
// id ^ index => |id-index|,节点id相对所在层的最左节点的偏移量
int shift = id ^ 1 << depth(id);
// runLength(id): 获取节点id的单位值(byte)比如,对于序号为4的节点,它所对应的值为 4194304,即 4MB
// shift * runLength(id): 也就是前面已用的数
// size=runLength(id): 节点id所分配内存大小
// shift*size: 字节偏移量
return shift * runLength(id);
}
/**
* 获取节点「id」所分配的内存大小,单位:字节
* 等价于 chunkSize/(2^maxOrder)
*/
private int runLength(int id) {
// log2ChunkSize=log2chunkSize
return 1 << (log2ChunkSize - depth(id));
}
这里对 allocateSubpage(int)
源码做个小总结: 这个方法主要的目的是创建一个 PoolSubpage
对象,然后委托这个对象完成 tiny&small
级别内存分配。PoolChunk 在内部使用 PoolSubage
PoolSubpage 内存分配
PoolSubpage 内部相关变量之前已经解释过,在这里解释的。它管理内存的逻辑是将 pageSize 大小的内存块等分成若干个子块,子块个数是根据本次申请内存大小所决定,比如申请 1KB 内存,那么会找到一个空闲的 page 并将其拆分成 8 等份(8KB/1KB=8)。并使用位图记录每份子块的使用状态,1 表示已使用,0 表示未使用。最多可分为 512 等份,底层使用 long[] 数组存储位图信息。64 位的句柄值的高 32 位存储位信息,低 32 位存储储节点索引值。因此,核心的问题是如何使用 long[] 数组记录使用情况呢?
源码之下无秘密:
// io.netty.buffer.PoolSubpage#allocate
/**
* 「PoolSubpage」内存分配入口
* @return 返回此次内存分配在bitmap的索引值
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
// 无可用分片,返回-1
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
// #1 获取下一个可用的分片索引(绝对值)
// 第一个索引值为0
final int bitmapIdx = getNextAvail();
// #2 除以64,确定bitmap[]哪一个
// 第一个bitmap索引值为0
int q = bitmapIdx >>> 6;
// #3 &63: 确认64位长度long的哪一位
// 除以64取余,获取当前绝对 id 的偏移量
// 63: 0011 1111
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
// 更新第r位的值为1
// << 优先级高于 |=
bitmap[q] |= 1L << r;
// 更新可用数量
if (-- numAvail == 0) {
// 如果可用数量为0,表示子页中再无可分配的空间
// 需要从双向链表中移除
removeFromPool();
}
// 将bitmapIdx 转换为long存储,long 高32位存储的是小内存位置索引
return toHandle(bitmapIdx);
}
/**
* 获取下一个可用的「分片内存块」
*/
private int getNextAvail() {
int nextAvail = this.nextAvail;
// nextAvail>=0,表明可以直接使用
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
// nextAvaild<0,需要寻找下一个可用的「分片内存块」
return findNextAvail();
}
/**
* 获取下一个可用的「分片内存块」
* 本质是搜索 bitmap[] 数组为0的索引值
*/
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
// 循环遍历
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
// #1 先判断整个bits是否有「0」位
// 不可用时bits为「0XFFFFFFFFFFFFFFFF」,~bits=0
// 可用时~bits !=0
if (~bits != 0) {
// #2 找寻可用的位
return findNextAvail0(i, bits);
}
}
return -1;
}
/**
* 搜索下一个可用位
*
*/
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
// i << 6 => i * 2^6=i*64
// 想象把long[]展开,baseVal就是基址
final int baseVal = i << 6;
for (int j = 0; j < 64; j ++) {
// bits & 1: 判断最低位是否为0
if ((bits & 1) == 0) {
// 找到空闲子块,组装数据
// baseVal|j => baseVal + j,基址+位的偏移值
int val = baseVal | j;
// 不能越界
if (val < maxNumElems) {
return val;
} else {
break;
}
}
// 无符号右移1位
bits >>>= 1;
}
return -1;
}
// io.netty.buffer.PoolSubpage#toHandle
/**
* 将bitmap索引信息写入高32位,memoryMapIdx信息写入低32位
*
* 0x4000000000000000L: 最高位为1,其他所有位为0。
* 为什么使用0x4000000000000000L数值?
* 是因为对于第一次小内存分配情况,如果高32位为0,则返回句柄值的高位为 0,
* 低32位为 2048(第11层的第一个节点的索引值),但是这个返回值并不会当成子页来处理,从而影响后续的逻辑判断
* 详见 https://blog.csdn.net/wangwei19871103/article/details/104356566
* @param bitmapIdx bitmap索引值
* @return 句柄值
*/
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
bitmap 填充示意图
小结上面的源码: PoolSubpage 使用 8 个 long 值存储子块的使用情况,句柄高 32 位存储位图索引值,低 32 位存储 节点索引值。通过大量的位运算提高了性能,通过源码阅读,也提升了位编程应用技巧。
这有一个关于 PoolSubpage 的是如何和 PoolArena 配合使用,因为我们知道,PoolArena 也存有 PoolSubpage[] 数组对象,这些数组对象是怎么被添加的呢?答案在在初始化 PoolSubpage 时就添加到对应的 PoolArena#poolsubpage[] 中了。源码如下:
// io.netty.buffer.PoolSubpage
/**
* 「PoolSubpage」构造器
* @param head 从「PoolArena」对象中获取「PoolSubpage」结点做头结点
* @param chunk 当前「PoolSubpage」属性的「PoolChunk」对象
* @param memoryMapIdx 所属的「Page」的节点值
* @param runOffset 对存储容器为「byte[]」有用,表示偏移量
* @param pageSize 页大小,默认值为: 8KB
* @param elemSize 元素个数
*/
PoolSubpage(PoolSubpage<T> head,
PoolChunk<T> chunk,
int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
// 添加到「head」链表中
init(head, elemSize);
}
// 初始化「PoolSubpage」
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
// 初始化各类参数
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
// 根据元素个数确定所需要bitmap个数,即确认「bitmapLength」值
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
// 添加至双向链表中,供后续分配使用
addToPool(head);
}
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}
以上,就是对 PoolSubpage 内存分析的源码解析,理清思路,功能拆解之后并不困难。
PoolChunk 如何回收内存
讲完了物理内存分配,还没有讲 PoolChunk 是如何回收内存。
// io.netty.buffer.PoolChunk#free
/**
* 「PoolChunk」释放「handle」表示的内存块。
* 本质是修改对应节点的「memoryMap」值。
* 参数「nioByteBuf」如果不为空,则会放入「Deque」队列中缓存,减少GC
*/
void free(long handle, ByteBuffer nioBuffer) {
// #1 获取句柄「handle」低32位数值,该值表示节点id
int memoryMapIdx = memoryMapIdx(handle);
// #2 获取句柄「handle」高32位数值,该值表示bitmap索引值
int bitmapIdx = bitmapIdx(handle);
// #3 如果bitmqpIdx不为0,说明当前属于subpage释放
if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
// #4 别忘记,「PoolArena」对象中也存有「PoolSubpage」的引用哦
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
// 交给「PoolSubpage」专业人员释放吧
// 0x3FFFFFFF: 0011 1111 1111 1111 1111 1111 1111 1111,
// bitmapIdx & 0x3FFFFFFF: 保留低30位的值
// 为什么要抹去最高2位呢?因为生成handle是通过 |0x4000000000000000L 操作
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
}
// #5 更新空闲内存信息
freeBytes += runLength(memoryMapIdx);
// #6 更新memoryMap信息
setValue(memoryMapIdx, depth(memoryMapIdx));
// #6 循环更新父节点的值
// 注意: 当更新父节点值时,有可能遇到两个兄弟节点的值都为初始值,
// 此时,父节点的值也为初始化而非两者之中最小值
updateParentsFree(memoryMapIdx);
// #7 缓存「ByteBuffer」对象
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
// 无符号右移32位即可
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}
从源码可看出,PoolChunk 回收一块内存十分简单。回收 PoolSubpage 稍微麻烦一点,因为还需要和 PoolArena 中的 PoolSubpage 保持同步。
PoolSubpage 如何回收内存
// io.netty.buffer.PoolSubpage#free
/**
*「PoolSubpage」释放内存块
* 目标是修改相应「bitmap」的值
*
* @param head 来自「PooArena#PoolSubpage[]」数组的head节点
* @param bitmapIdx bitmap索引值
*/
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
// 确认bitmap[] 数组索引值
int q = bitmapIdx >>> 6;
// 确认在64位中的哪一位
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
// 修改对应位为0
bitmap[q] ^= 1L << r;
// 设置可用位信息,待下次分配时直接使用
setNextAvail(bitmapIdx);
// 因为当numAvail=0时,表示无可用内存块,则会从「PoolArena#PoolSubpage[]」数组中移除
// 这次添加后就可以从新回到「PoolArena[]」队列中
if (numAvail ++ == 0) {
// 添加队列
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
// 还没有到达饱和,即完成这次分配后还有可用空闲,那直接返回
return true;
} else {
// 达到饱和,无内存块可用
if (prev == next) {
// 如果当前链表只有这么一个PoolSubpage对象,就不移除了
return true;
}
// 移除链表
doNotDestroy = false;
removeFromPool();
return false;
}
}
PoolSubpage 需要照顾到 PoolArena 的 PoolSubpage[] 变量,所以稍微代码量多一点。但逻辑十分清楚。看代码就十分明白,我就不强行总结了。
如何回收整个 PoolChunk
核心代码在 PoolArena
,根据有无 Cleaner
释放内存 memory
对象即可。
// io.netty.buffer.PoolArena.DirectArena#destroyChunk
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
}
小结
Netty 的内存回收是庞大的,两篇文章从 ByteBuf 体系结构讲到源码级的内存分配实现,似乎还是没有讲全讲透。这里只不过把最核心的代码拧出来给大家口味,最终还是希望看到这些文章的各位 DEBUG 调试走一遍。我深知自己的知识能力水平有限,文章部分地方的表达能力欠缺,有些简单的地方描述过于复杂,而恰恰需要讲清楚的地方一笔带过,敬请读者斧正。