前面做了很多铺垫(Netty源码之内存管理(一)),带着大家熟悉了与内存分配相关的类的定义和分配逻辑。但并没有真正落实到 jemalloc 思想在源码是如何体现的。本章就是对 PoolChunk 逐字解析,死扣细节。在分析源码之前我们需要对分配的内存级别有一个清晰的定位,当分配 Huge 级别对象,直接使用 PoolChunk 包装,并没有复杂的分配逻辑。而对于 tiny&small&normal 级别来说,进行精细化的内存管理十分有必要的。
开局一张图:
满二叉树结构示意图.png
PoolChunk 本质就是维护这一棵满二叉树,这棵树默认管理 16MB 内存(这个值是可以手动设置)。memoryMap[] 是可变的数组,Netty 在这个数组上逻辑构建一棵满二叉树(当然也可以用链表之类的数据结构,但是随机索引效率不高,我们可以根据数组索引快速定位到某一层的第一个节点。但链表是不能做到的),depthMap[] 表示每个节点对应的深度,这是不可变的。一个 Long 型被分成高、低两部分,高 32 位记录小内存分配信息,低 32 位记录节点下标值。当分配 normal 级别内存时,只有低 32 位信息有用,它的值表示节点序号(起始值为 1),当分配 tiny&normal 级别内存时,高、低两部分确定某个 page 下的某个 subpage
还有一个比较有意思的是任意节点所管理的内存大小都是 2 的次幂,因此 Netty 会对用户申请的内存大小进行规格化的原因就在这里。任意规格值(当然不能超过 PoolChunkSize)都能找到合适的节点,除非没有节点可满足当前内存申请,那只能新创建一个 PoolChunk。还有另外一个疑问就是如何更新 memoryMap[] 数组呢,请看大屏幕:
内存分配流程图.png
index 表示节点索引,value 表示对应 memoryMap[index] 。
用户第一次申请 4MB 大小内存,由于内存大小确定,因此所在的层的位置也可以通过 maxOrder - (log2(normCapacity) - pageShifts) 确定,4MB 对应层数(也可理解为深度)为 2。
内存分配过程如下,其实对应方法 allocateNode(int) 实现逻辑: 首先判断节点 1 的使用状态: memoryMap1 != unusable(12) 表示节点 1 可用,但由于层数不匹配,所以获取子树节点 2,同时判断使用状态,发现 != 12 且层数不匹配,那就继续获取子节点 4,发现 !=12 且层数匹配,所以节点 4 就用作此次内存分配的节点,并更新 memoryMap[4]=12 表示节点 4 已使用,变量 handle 的低 32 位记录子节点位置信息,同时循环更新父节点的 memoryMap,父节点的值是子节点的 memoryMap 的最小值,所以 memoryMap[2]=2,memoryMap[1] =1。这样,第一次申请 4MB 大小内存就算完成了。
第二次申请 4MB 大小内存,当在第 2 层判断节点 4 的 memoryMap 值等于 12,会判断兄弟节点 5 是否满足分配。大家好好体会。

PoolChunk 内存分配

PoolChunk 是 jemalloc3.x 算法思想的体现,里面以 allocate 开头的 API 就是内存分配算法的实现。入口方法是 allocate(PooledByteBuf, int, int)

allocate(PooledByteBuf, int, int)

这个方法做的事情有:

  • 根据申请内存大小选择合适的分配策略。具体为如果 >=pageSize,使用 allocateRun() 方法分配,否则使用 allocateSubpage() 分配,它们都会返回句柄值 handle。
  • 初始化 ByteBuf

源码如下:

  1. // io.netty.buffer.PoolChunk#allocate
  2. /**
  3. *
  4. * @param buf 「ByteBuf」对象,它是物理内存的承托
  5. * @param reqCapacity 用户所需内存大小
  6. * @param normCapacity 规格值
  7. * @return
  8. */
  9. boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
  10. // 低32位: 节点索引
  11. // 高32位: 位图索引
  12. final long handle;
  13. // 位操作判断大小
  14. if ((normCapacity & subpageOverflowMask) != 0) {
  15. // #1 申请>=8KB
  16. handle = allocateRun(normCapacity);
  17. } else {
  18. // #2 申请<8KB
  19. handle = allocateSubpage(normCapacity);
  20. }
  21. if (handle < 0) {
  22. return false;
  23. }
  24. // #3 如果「PoolChunk」存在缓存的「ByteBuffer」就复用
  25. ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
  26. // #4 初始化ByteBuf内存相关信息
  27. initBuf(buf, nioBuffer, handle, reqCapacity);
  28. return true;
  29. }

allocateRun(int)

方法 allocateRun(int) 做的事情也不多,主要有:

  • 根据规格值计算所对应的深度 d
  • 调用 allocateNode(d) 完成内存分配
  • 更新剩余空闲值 ```java // io.netty.buffer.PoolChunk#allocateRun /**

    • 申请大小为「norCapacity」的内存块 */ private long allocateRun(int normCapacity) { // #1 计算当前规格值所对应树的深度d // log2(normCapacity): 获取当前值所对应最高位1的序号, // pageShifts: 默认值为 13,也就是 pageSize=8192 最高位为1的序号,因为这里分配的是 >=8192, // 所以需要减去它的偏移量,即从0开始。 // maxOrder: 默认值为 11,maxOrder - 偏移量 = 确切(合适)的树高度 // 可以想象normCapacity 从 8192 不断向上增长,那树的高度也不断变小 int d = maxOrder - (log2(normCapacity) - pageShifts);

      // #2 △在深度d的节点中寻找空闲节点并分配内存 int id = allocateNode(d); if (id < 0) {

      return id;
      

      }

      // #3 更新剩余空闲值 freeBytes -= runLength(id);

      // #4 返回信息 return id; }

private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1; // 31 /**

  • 获取以2为底的对数值
  • 思路是数有多少个0 */ private static int log2(int val) { // compute the (0-based, with lsb = 0) position of highest set bit i.e, log2 // Integer.numberOfLeadingZeros(int): 返回无符号整型的最高非零位前面的0的个数(包括符号位在内) // 31-0位数量=非0位数量,比如 0000…1000 Integer.numberOfLeadingZeros(0000…1000) = 28, // 31-28=3,其实就是获取最高位1的序号(从右至左,起始序号为0) return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val); } ```

    allocateNode(int)

    终于到了内存分配的重头戏,它属于节点粒度的分配逻辑。整体思路并不难,前面也通过图解讲述过,但由于采用了太多位运算所以看起来有点头晕。所以我们先熟悉一下部分位运算公式,规定
  • id^=1: id 为奇数则 -1,id 为偶数则 +1。这里用来获取偶数的兄弟节点。比如 id=2,则其兄弟节点为 id^=1 = 3。
  • id<<=1: 相当于 id=id*2,目的是跳转到节点 id 的左子节点。比如 id = 2,它的左子节点值为 4。
  • 1<<d: 表示 1*2 。对在任意深度为d的节点,节点的索引值在 2 到 2-1 范围内。比如深度为 1,则索引值在 [2, 3],当深度为 2,索引值在 [4, 7] 范围内。
  • initial=-(1 << d): 对 2取反,目的是用来判断与目标深度值 d 是否匹配,可以把 initial 可以看成是掩码。当匹配目标深度,有 id & initial == initial,若当前深度<目标深度,有 id & initial ==0。比如目标深度为 2,那 initial=-4,当id=1时,id&initial=0,说明还没有到达目标深度,获取最左子节点(id=id*2)2,此时 2 & initial=0,说明还没有到达目标深度,继续获取最左子节点 4,此时 4&-4=4,此时就找到了目标深度。然后就可以从左到右找寻空闲节点并进行内存分配。

allocateNode(int depth) 目标是在深度 d 中找到空闲的节点并,如果存在按规则更新 memoryMap 相应节点的值并返回节点序号。思路也是比较清晰,从头结点开始判断,如果可分配但深度不匹配则获取左子节点,如果不可分配就返回 -1。继续判断左子节点是否可分配以及深度是否匹配,如果都不匹配,继续重复上面步骤。如果深度匹配但当前节点不可分配(val>d=true),那就获取兄弟节点继续重复上述步骤。如果深度匹配且当前节点可分配,则该节点就是此次申请的目标节点,并将它设置为 unusable 不可用状态,同时,按公式 memoryMap[父节点] = Min(子节点1,子节点2) 循环更新 memoryMap。
还有一个有意思的点需要注意,就是 memoryMap 存储的值,它是这棵树的核心。初始化的值以及后续更新也做得非常巧妙,我说不上来,大家慢慢休会吧。通过内存分配示意图再来休会一下上面的文字描述:
分配示意图_1.png
分配示意图_2.png
相关源码解析如下
PoolChunk_AllocateNode.png

// io.netty.buffer.PoolChunk#allocateNode
/**
 * 在深度「d」查找空闲节点
 * @param     d  
 * @return     空闲节点索引,如果返回-1表示当前「PoolChunk」无法满足此次内存分配
 */
private int allocateNode(int d) {
    int id = 1;
    // 用于判断节点是否匹配深度「d」
    int initial = - (1 << d);

    // #1 判断根节点是否可用
    byte val = value(id);
    if (val > d) {
        // 根节点不可用,返回「-1」
        return -1;
    }

    // #2 从上往下、从左往右遍历节点。直到找到合适的空闲节点并返回索引值。
    // id&initial=0: 表明节点id 匹配 深度d,否则不匹配
    // val<d: 表明节点id有空闲内存可分配,若val>d表明节点id无空闲内存可分配
    // 循环退出条件是 val>d 或 id&initial !=0,当匹配到空闲节点时,通常 val<d 且 id&initial !=0
    while (val < d || (id & initial) == 0) { 

        // id = id * 2;
        id <<= 1;
        // 获取momeryMap[id]的值
        val = value(id);

        // 如果val>d,表明节点「id」不满足此次分配
        // 这个判断十分巧妙,前面也出现过一次,用来判断根节点是否满足深度为d的内存申请,
        // 如果val>d则不满足,因为,父节点的val值是两个子节点的最小值,
        // 因此如果父节点的val>d那说明子节点无法满足当前深度d的内存申请
        if (val > d) {
            // 获取兄弟节点, 继续查找
            id ^= 1;
            val = value(id);
        }
    }
    byte value = value(id);
    assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                                                                  value, id & initial, d);
    // #3 将选中的节点标记为「unusable」
    setValue(id, unusable); 

    // #4 循环更新父节点「memoryMap」的值,其值取两个子节点最小的那个值
    updateParentsAlloc(id);

    // #5 Return
    return id;
}

private byte value(int id) {
    return memoryMap[id];
}

/**
 * 从节点id开始一直到根节点,更新对应的memoryMap的值
 */
private void updateParentsAlloc(int id) {
    while (id > 1) {
        int parentId = id >>> 1;
        byte val1 = value(id);
        byte val2 = value(id ^ 1);
        byte val = val1 < val2 ? val1 : val2;
        setValue(parentId, val);
        id = parentId;
    }
}

allocateNode(int depth) 是分配 Normal 级别的核心方法,本质是维护 memoryMap[] 数组,遍历树查找空闲内存满足本次内存申请。

allocateSubpage(int)

这个方法是申请 Tiny&Small 级别内存。上一章节讲过对该内存分配的思想: 简单一句话,将某个空闲的 page 拆分成若干个 subpage,使用对象 PoolSubpage 对这些若干个 subpage 进行管理。
PoolChunk#allocateSubpage.png

// io.netty.buffer.PoolChunk#allocateSubpage
/**
 * 申请「tiny&small」级别内存
 */
private long allocateSubpage(int normCapacity) {

    // #1 先去「PoolSubpagePools[]」数组中是否能找到合适的「PoolSubpage」
    // 如果存在,则使用现成的,否则得新建一个「PoolSubpage」对象后再放入「PoolSubpagePools[]」数组中
    // 「PoolSubpage[]」每个节点都会初始化一个head节点,所以这里返回一定不为空
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);

    // 子页直接可叶子节点层查找
    int d = maxOrder;
    // 对头结点上锁,通过数组减少锁的粒度,提高并发性能
    synchronized (head) {
        // #2 在深度d获取空闲子叶
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        // 获取成功,对该「page」进行拆分改造
        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;

        // #3 获取偏移量(相对于 maxSubpageAlloc)
        // 可以理解为 subpageIdx = id - maxSubpageAllocs
        int subpageIdx = subpageIdx(id);
        // 定位到「PoolChunk」内部的「PoolSubpage[]」
        PoolSubpage<T> subpage = subpages[subpageIdx];

        if (subpage == null) {
            // 如果不存在,则创建
            subpage = new PoolSubpage<T>(head,             // 这个头结点来自「PoolArena」
                                         this,             // 当前「PoolChunk」
                                         id,             // 节点「id」
                                         runOffset(id), // 节点「id」字节偏移量
                                         pageSize,        // 页大小
                                         normCapacity); // 每个「subpage」等份大小
            // 记录新创建的「PoolSubpage」
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }

        // #4 使用「PoolSubpage」分配内存
        return subpage.allocate();
    }
}

/**
 * 移除最高位,获得偏移量
 */
private int subpageIdx(int memoryMapIdx) {
    return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}

/**
 * 获取节点「id」字节偏移量。
 */
private int runOffset(int id) {
    // << 优先级高于 ^
    // depth(id): 获取节点「id」对应的深度
    // int index = 1 << depth(id) => 2^depth(id),获取节点「id」所在层的最左节点的索引值
    // id ^ index => |id-index|,节点id相对所在层的最左节点的偏移量
    int shift = id ^ 1 << depth(id);

    // runLength(id): 获取节点id的单位值(byte)比如,对于序号为4的节点,它所对应的值为 4194304,即 4MB
    // shift * runLength(id): 也就是前面已用的数
    // size=runLength(id): 节点id所分配内存大小
    // shift*size: 字节偏移量
    return shift * runLength(id);
}

/**
 * 获取节点「id」所分配的内存大小,单位:字节
 * 等价于 chunkSize/(2^maxOrder)
 */
private int runLength(int id) {
    // log2ChunkSize=log2chunkSize
    return 1 << (log2ChunkSize - depth(id));
}

这里对 allocateSubpage(int) 源码做个小总结: 这个方法主要的目的是创建一个 PoolSubpage 对象,然后委托这个对象完成 tiny&small 级别内存分配。PoolChunk 在内部使用 PoolSubage[] 数组保存 PoolSubpage 对象引用。PoolSubage[] 长度和二叉树的叶子节点个数相同,它们一一对应。PoolSubpage 对象是 page 的化身,它拥有管理 pageSize 大小内存的能力。PoolChunk 只需管理 page,两者分工明确。

PoolSubpage 内存分配

PoolSubpage 内部相关变量之前已经解释过,在这里解释的。它管理内存的逻辑是将 pageSize 大小的内存块等分成若干个子块,子块个数是根据本次申请内存大小所决定,比如申请 1KB 内存,那么会找到一个空闲的 page 并将其拆分成 8 等份(8KB/1KB=8)。并使用位图记录每份子块的使用状态,1 表示已使用,0 表示未使用。最多可分为 512 等份,底层使用 long[] 数组存储位图信息。64 位的句柄值的高 32 位存储位信息,低 32 位存储储节点索引值。因此,核心的问题是如何使用 long[] 数组记录使用情况呢?
源码之下无秘密:
PoolSubpage#allocate.png

// io.netty.buffer.PoolSubpage#allocate
/**
 * 「PoolSubpage」内存分配入口
 * @return 返回此次内存分配在bitmap的索引值
 */
long allocate() {
    if (elemSize == 0) {
        return toHandle(0);
    }

    // 无可用分片,返回-1
    if (numAvail == 0 || !doNotDestroy) {
        return -1;
    }

    // #1 获取下一个可用的分片索引(绝对值)
    // 第一个索引值为0
    final int bitmapIdx = getNextAvail();

    // #2 除以64,确定bitmap[]哪一个
    // 第一个bitmap索引值为0 
    int q = bitmapIdx >>> 6;

    // #3 &63: 确认64位长度long的哪一位
    // 除以64取余,获取当前绝对 id 的偏移量
    // 63: 0011 1111
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;

    // 更新第r位的值为1
    // << 优先级高于 |=
    bitmap[q] |= 1L << r;

    // 更新可用数量
    if (-- numAvail == 0) {
        // 如果可用数量为0,表示子页中再无可分配的空间
        // 需要从双向链表中移除
        removeFromPool();
    }

    // 将bitmapIdx 转换为long存储,long 高32位存储的是小内存位置索引
    return toHandle(bitmapIdx);
}

/**
 * 获取下一个可用的「分片内存块」
 */
private int getNextAvail() {
    int nextAvail = this.nextAvail;
    // nextAvail>=0,表明可以直接使用
    if (nextAvail >= 0) {
        this.nextAvail = -1;
        return nextAvail;
    }
    // nextAvaild<0,需要寻找下一个可用的「分片内存块」
    return findNextAvail();
}

/**
 * 获取下一个可用的「分片内存块」
 * 本质是搜索 bitmap[] 数组为0的索引值
 */
private int findNextAvail() {
    final long[] bitmap = this.bitmap;
    final int bitmapLength = this.bitmapLength;
    // 循环遍历
    for (int i = 0; i < bitmapLength; i ++) {
        long bits = bitmap[i];

        // #1 先判断整个bits是否有「0」位
        // 不可用时bits为「0XFFFFFFFFFFFFFFFF」,~bits=0
        // 可用时~bits !=0
        if (~bits != 0) {
            // #2 找寻可用的位
            return findNextAvail0(i, bits);
        }
    }
    return -1;
}

/**
 * 搜索下一个可用位
 * 
 */
private int findNextAvail0(int i, long bits) {
    final int maxNumElems = this.maxNumElems;

    // i << 6 => i * 2^6=i*64
    // 想象把long[]展开,baseVal就是基址
    final int baseVal = i << 6;

    for (int j = 0; j < 64; j ++) {
        // bits & 1: 判断最低位是否为0
        if ((bits & 1) == 0) {
            // 找到空闲子块,组装数据
            // baseVal|j => baseVal + j,基址+位的偏移值
            int val = baseVal | j;
            // 不能越界
            if (val < maxNumElems) {
                return val;
            } else {
                break;
            }
        }

        // 无符号右移1位
        bits >>>= 1;
    }
    return -1;
}

// io.netty.buffer.PoolSubpage#toHandle
/** 
 * 将bitmap索引信息写入高32位,memoryMapIdx信息写入低32位
 *
 * 0x4000000000000000L: 最高位为1,其他所有位为0。
 * 为什么使用0x4000000000000000L数值?
 * 是因为对于第一次小内存分配情况,如果高32位为0,则返回句柄值的高位为 0,
 * 低32位为 2048(第11层的第一个节点的索引值),但是这个返回值并不会当成子页来处理,从而影响后续的逻辑判断
 * 详见 https://blog.csdn.net/wangwei19871103/article/details/104356566
 * @param bitmapIdx  bitmap索引值
 * @return             句柄值
 */
private long toHandle(int bitmapIdx) {
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}

bitmap 填充示意图
bitmap填充图.png
小结上面的源码: PoolSubpage 使用 8 个 long 值存储子块的使用情况,句柄高 32 位存储位图索引值,低 32 位存储 节点索引值。通过大量的位运算提高了性能,通过源码阅读,也提升了位编程应用技巧。
这有一个关于 PoolSubpage 的是如何和 PoolArena 配合使用,因为我们知道,PoolArena 也存有 PoolSubpage[] 数组对象,这些数组对象是怎么被添加的呢?答案在在初始化 PoolSubpage 时就添加到对应的 PoolArena#poolsubpage[] 中了。源码如下:

// io.netty.buffer.PoolSubpage
/**
 * 「PoolSubpage」构造器
 * @param head           从「PoolArena」对象中获取「PoolSubpage」结点做头结点
 * @param chunk          当前「PoolSubpage」属性的「PoolChunk」对象
 * @param memoryMapIdx   所属的「Page」的节点值
 * @param runOffset      对存储容器为「byte[]」有用,表示偏移量
 * @param pageSize       页大小,默认值为: 8KB
 * @param elemSize       元素个数
 */
PoolSubpage(PoolSubpage<T> head, 
            PoolChunk<T> chunk, 
            int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
    this.chunk = chunk;
    this.memoryMapIdx = memoryMapIdx;
    this.runOffset = runOffset;
    this.pageSize = pageSize;
    bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64

    // 添加到「head」链表中
    init(head, elemSize);
}

// 初始化「PoolSubpage」
void init(PoolSubpage<T> head, int elemSize) {
    doNotDestroy = true;
    this.elemSize = elemSize;
    if (elemSize != 0) {
        // 初始化各类参数
        maxNumElems = numAvail = pageSize / elemSize;
        nextAvail = 0;
        // 根据元素个数确定所需要bitmap个数,即确认「bitmapLength」值
        bitmapLength = maxNumElems >>> 6;
        if ((maxNumElems & 63) != 0) {
            bitmapLength ++;
        }

        for (int i = 0; i < bitmapLength; i ++) {
            bitmap[i] = 0;
        }
    }
    // 添加至双向链表中,供后续分配使用
    addToPool(head);
}

private void addToPool(PoolSubpage<T> head) {
    assert prev == null && next == null;
    prev = head;
    next = head.next;
    next.prev = this;
    head.next = this;
}

以上,就是对 PoolSubpage 内存分析的源码解析,理清思路,功能拆解之后并不困难。

PoolChunk 如何回收内存

讲完了物理内存分配,还没有讲 PoolChunk 是如何回收内存。

// io.netty.buffer.PoolChunk#free
/**
 * 「PoolChunk」释放「handle」表示的内存块。
 * 本质是修改对应节点的「memoryMap」值。
 * 参数「nioByteBuf」如果不为空,则会放入「Deque」队列中缓存,减少GC
 */
void free(long handle, ByteBuffer nioBuffer) {

    // #1 获取句柄「handle」低32位数值,该值表示节点id
    int memoryMapIdx = memoryMapIdx(handle);

    // #2 获取句柄「handle」高32位数值,该值表示bitmap索引值
    int bitmapIdx = bitmapIdx(handle);

    // #3 如果bitmqpIdx不为0,说明当前属于subpage释放
    if (bitmapIdx != 0) { // free a subpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage != null && subpage.doNotDestroy;

        // #4 别忘记,「PoolArena」对象中也存有「PoolSubpage」的引用哦
        PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
        synchronized (head) {
            // 交给「PoolSubpage」专业人员释放吧
            // 0x3FFFFFFF: 0011 1111 1111 1111 1111 1111 1111 1111,
            // bitmapIdx & 0x3FFFFFFF: 保留低30位的值
            // 为什么要抹去最高2位呢?因为生成handle是通过 |0x4000000000000000L 操作
            if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                return;
            }
        }
    }

    // #5 更新空闲内存信息
    freeBytes += runLength(memoryMapIdx);

    // #6 更新memoryMap信息
    setValue(memoryMapIdx, depth(memoryMapIdx));

    // #6 循环更新父节点的值
    // 注意: 当更新父节点值时,有可能遇到两个兄弟节点的值都为初始值,
    // 此时,父节点的值也为初始化而非两者之中最小值
    updateParentsFree(memoryMapIdx);

    // #7 缓存「ByteBuffer」对象
    if (nioBuffer != null && cachedNioBuffers != null &&
        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
        cachedNioBuffers.offer(nioBuffer);
    }
}

// 无符号右移32位即可
private static int bitmapIdx(long handle) {
    return (int) (handle >>> Integer.SIZE);
}

从源码可看出,PoolChunk 回收一块内存十分简单。回收 PoolSubpage 稍微麻烦一点,因为还需要和 PoolArena 中的 PoolSubpage 保持同步。

PoolSubpage 如何回收内存

// io.netty.buffer.PoolSubpage#free
/**
 *「PoolSubpage」释放内存块
 * 目标是修改相应「bitmap」的值
 * 
 * @param head 来自「PooArena#PoolSubpage[]」数组的head节点
 * @param bitmapIdx bitmap索引值
 */
boolean free(PoolSubpage<T> head, int bitmapIdx) {
    if (elemSize == 0) {
        return true;
    }

    // 确认bitmap[] 数组索引值
    int q = bitmapIdx >>> 6;
    // 确认在64位中的哪一位
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) != 0;

    // 修改对应位为0
    bitmap[q] ^= 1L << r;

    // 设置可用位信息,待下次分配时直接使用
    setNextAvail(bitmapIdx);

    // 因为当numAvail=0时,表示无可用内存块,则会从「PoolArena#PoolSubpage[]」数组中移除
    // 这次添加后就可以从新回到「PoolArena[]」队列中
    if (numAvail ++ == 0) {
        // 添加队列
        addToPool(head);
        return true;
    }

    if (numAvail != maxNumElems) {
        // 还没有到达饱和,即完成这次分配后还有可用空闲,那直接返回
        return true;
    } else {
        // 达到饱和,无内存块可用
        if (prev == next) {
            // 如果当前链表只有这么一个PoolSubpage对象,就不移除了
            return true;
        }

        // 移除链表
        doNotDestroy = false;
        removeFromPool();
        return false;
    }
}

PoolSubpage 需要照顾到 PoolArena 的 PoolSubpage[] 变量,所以稍微代码量多一点。但逻辑十分清楚。看代码就十分明白,我就不强行总结了。

如何回收整个 PoolChunk

核心代码在 PoolArena,根据有无 Cleaner 释放内存 memory 对象即可。

// io.netty.buffer.PoolArena.DirectArena#destroyChunk
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {
        PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}

小结

Netty 的内存回收是庞大的,两篇文章从 ByteBuf 体系结构讲到源码级的内存分配实现,似乎还是没有讲全讲透。这里只不过把最核心的代码拧出来给大家口味,最终还是希望看到这些文章的各位 DEBUG 调试走一遍。我深知自己的知识能力水平有限,文章部分地方的表达能力欠缺,有些简单的地方描述过于复杂,而恰恰需要讲清楚的地方一笔带过,敬请读者斧正。

我的公众号

公众号封面.png