vlambda博客
学习文章列表

实现一个简单的内存池

大家对于对象池,连接池等技术一定不陌生,有些资源创建和销毁都是非常耗时的事情,通常我们会把这部分资源池化管理来减少创建和销毁带来的性能损耗,在一些情况下申请和释放内存也是一件比较耗时的事情,因此我们可以通过池化内存来降低这部分损耗,但是内存池和对象池有些区别,对象池申请和释放的单位是,内存池申请和释放的单位则是一段内存,今天我们实现一个简单的内存池来理解内存池化概念。

1、ByteBuffer简介
在说内存池之前我们先简单了解下ByteBuffer(对ByteBuffer了解的同学可以直接跳过本章节),ByteBuffer是Java提供给我们直接访问内存的方式,我们通过它来实现对内存的读写,也就是说每个ByteBuffer实际上指向了一段内存,这段内存可以是堆内内存也可以是堆外内存,比如:

1// 堆内内存
2ByteBuffer heapMemory = ByteBuffer.allocate(64);
3// 堆外内存
4ByteBuffer directMemory = ByteBuffer.allocateDirect(64);

ByteBuffer是如何描述这么一段内存的呢?在ByteBuffer的父类中有这么几个字段(部分)

1// 用于描述当前操作的作为,比如读取位置,写入位置
2private int position = 0;
3// 用于描述内存可用大小,可读大小/可写大小
4private int limit;
5// 所申请的内存大小
6private int capacity;
7// 当申请的内存为堆外内存时,需要记录内存的地址。
8// (我们使用系统函数申请一段内存时,函数会返回一个地址回来,这个地址即为我们申请内存的起始地址)
9long address;

有了上面几个字段我们可以描述和读写某段内存了
ByteBuffer提供给我们一些操作的API,比如:

 1// 往内存写入数字100(int型)
2heapMemory.putInt(100);
3// 往内存写入数字200(long型)
4heapMemory.putLong(200);
5// 反转读写位置
6heapMemory.flip();
7// 从内存读取数字100(int型)
8int intValue = heapMemory.getInt();
9// 从内存读取数字200(long型)
10long longValue = heapMemory.getLong();

put和get比较好理解,我们看下flip方法:

1public final Buffer flip() {
2    limit = position;
3    position = 0;
4    mark = -1;
5    return this;
6}

1.将position赋值给limit,position是读写位置(一般为写入位置),表示限制容量为我最后一次读写的位置。
2.将position重置为0,表示从首部(一般是读取)开始读写这段内存。
3.重置mark为-1.

flip操作之后我们可以反向操作这段内存了,但是在一些经常需要传递ByteBuffer的场景可能会由于忘记flip或者重复flip导致程序出现bug,所以你会看到一些网络框架会对ByteBuffer进行一次封装,采用标记readIndex/writeIndex方式来操作ByteBuffer,这么做还有另一个原因,因为在部分场景下申请内存释放带来的overhead比较高,所以一些网络框架会将内存池化处理,今天我们来实现一个简单的内存池。

2、实现一个简单的内存池
在实现内存池之前我们先回顾下连接池的实现原理,首先我们需要有两张表,一张来记录所有的可用连接,一张来记录正在使用的连接,当连接释放后将连接从正在使用表更新到可用表,每次更新的单位是个/条等,内存池同样需要记录可用和已用,但是由于每次申请的内存大小不确定,我们不能单纯的用来作为申请和释放的单位,这里提供一个比较简单的实现思路,比如我们现在有8K内存可以作为内存池使用,我们把它等分成8份,这样每份就是1K字节,然后我们建两张表来描述这8份内存的使用情况:

Free表:长度为8,属性为bit,表示当前下标内存是否已使用(1表示free)
Block表:长度为8,属性为int,表示当内存属性为已用时,该已用内存的结束位置+1(即下一段内存的开始位置)

1// 将两张表初始化
20  1  2  3  4  5  6  7
31  1  1  1  1  1  1  1 // Free表
40  0  0  0  0  0  0  0 // Block表

申请一段长度为100字节的内存(A),因为100字节小于1Kb,所以申请的块大小为1,申请后的表结构为:

10  1  2  3  4  5  6  7
20  1  1  1  1  1  1  1 // Free表
31  0  0  0  0  0  0  0 // Block表

接着申请一段长度为1500字节的内存(B),通过计算可知申请的块大小为2,申请后的表结构为:

10  1  2  3  4  5  6  7
20  0  1  1  1  1  1  1 // Free表
31  3  0  0  0  0  0  0 // Block表

将内存A释放掉,注意释放仅修改Free表,不修改Block表,我们在申请内存时才去更新Block值,因为我们只看已申请内存块的Block值,释放后的表结构为

10  1  2  3  4  5  6  7
21  0  1  1  1  1  1  1 // Free表
31  3  0  0  0  0  0  0 // Block表

原理我们了解了,接下来用代码把它实现出来:
首先我们对ByteBuffer做下简单封装(为便于理解我们屏蔽掉与内存池不相关的代码):

 1public abstract class ByteBuf {
2
3    static final AtomicIntegerFieldUpdater<ByteBuf> refCntUpdater;
4
5    static {
6        refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(ByteBuf.class, "referenceCount");
7    }
8
9    volatile int referenceCount = 0;
10
11    int offset;
12    int capacity;
13    int unitOffset;
14
15    public final void release() {
16        int referenceCount = this.referenceCount;
17        if (referenceCount < 1) {
18            return;
19        }
20        if (refCntUpdater.compareAndSet(this, referenceCount, referenceCount - 1)) {
21            if (referenceCount == 1) {
22                release0();
23            }
24            return;
25        }
26        for (; ; ) {
27            referenceCount = this.referenceCount;
28            if (referenceCount < 1) {
29                return;
30            }
31            if (refCntUpdater.compareAndSet(this, referenceCount, referenceCount - 1)) {
32                if (referenceCount == 1) {
33                    allocator.release(this);
34                }
35                return;
36            } else {
37                if (BUF_THREAD_YIELD) {
38                    Thread.yield();
39                }
40            }
41        }
42    }
43
44    protected PooledHeapByteBuf produce(int unitOffset, int unitSize) {
45        int unit = allocator.getUnit();
46        this.offset = unitOffset * unit;
47        this.capacity = unitSize * unit;
48        this.unitOffset = unitOffset;
49        this.referenceCount = 1;
50        return this;
51    }
52
53}

接下来对每个属性及方法做下解释:

referenceCount :我们这里采用引用计数的方式管理内存的回收,新申请出来的内存引用计数为1,每次对该内存进行复制新增时引用计数加1,回收时减1,当引用计数为0时则将该内存回收至内存池。

refCntUpdater:大家在阅读源码的时候可能会经常看到AtomicIntegerFieldUpdater这个类型的字段,通常我们认为它和AtomicInteger是等价的,在一些频繁创建referenceCount的场景如果使用AtomicInteger会导致创建大量的该对象,一个是内存占用比较大,另一个是产生大量对象后给gc带来比较大的压力,所以一般我们推荐使用AtomicIntegerFieldUpdater

offset:该字段记录ByteBuf在内存池中的起始位置,比如我们前面申请的内存B,因为它从第1个内存块开始,所以它的offset值为1024

capacity:该字段记录ByteBuf的容量,比如内存B占用两个block,所以它的容量为2048

unitOffset:该字段记录ByteBuf的block值在内存中的起始位置,比如内存B的unitOffset值为1

release方法:该方法尝试对referenceCount进行原子性减1,如果referenceCount值为0则回收该内存

produce方法:申请内存时通过计算得到unitOffset和size,则成功申请一段内存,通过这两个值计算ByteBuf的位置和实际大小。

接下来我们看下Allocator的实现:

 1public final class PooledByteBufAllocator {
2
3    static final        int           ADDRESS_BITS_PER_WORD = 6;
4
5    private final    int[]            blockEnds;
6    private final    int              capacity;
7    private final    long[]           frees;
8    private final    ReentrantLock    lock    = new ReentrantLock();
9    private final    int              unit;
10    private          int              mark;
11    private volatile int              memoryBarrier;
12
13    public ByteBuf allocate(int limit) {
14        if (limit < 1) {
15            return ByteBuf.empty();
16        }
17        int           size = (limit + unit - 1) / unit;
18        int           blockStart;
19        ReentrantLock lock = this.lock;
20        lock.lock();
21        try {
22            if (!isRunning()) {
23                return unpooled.allocate(limit);
24            }
25            int mark = this.mark;
26            blockStart = allocate(mark, capacity, size);
27            if (blockStart == -1) {
28                blockStart = allocate(0, mark, size);
29                if (blockStart == -1) {
30                    return unpooled.allocate(limit);
31                }
32            }
33            return newByteBuf().produce(blockStart, size);
34        } finally {
35            lock.unlock();
36        }
37    }
38
39    private int allocate(int start, int end, int size) {
40        int[] blockEnds = this.blockEnds;
41        int   pos       = start;
42        int   limit     = pos + size;
43        if (limit >= end) {
44            return -1;
45        }
46        for (; ; ) {
47            if (!isFree(pos)) {
48                pos = blockEnds[pos];
49                limit = pos + size;
50                if (limit >= end) {
51                    return -1;
52                }
53                continue;
54            }
55            if (++pos == limit) {
56                int blockEnd   = pos + 1;
57                int blockStart = blockEnd - size;
58                clearFree(blockStart);
59                blockEnds[blockStart] = blockEnd;
60                this.mark = blockEnd;
61                return blockStart;
62            }
63        }
64    }
65
66    private static int wordIndex(int bitIndex) {
67        return bitIndex >> ADDRESS_BITS_PER_WORD;
68    }
69
70    private void setFree(int index) {
71        int wordIndex = wordIndex(index);
72        frees[wordIndex] |= (1L << index);
73    }
74
75    private void clearFree(int index) {
76        int wordIndex = wordIndex(index);
77        frees[wordIndex] &= ~(1L << index);
78    }
79
80    private boolean isFree(int index) {
81        int wordIndex = wordIndex(index);
82        return ((frees[wordIndex] & (1L << index)) != 0);
83    }
84
85    public void release(ByteBuf buf) {
86        ReentrantLock lock = this.lock;
87        lock.lock();
88        try {
89            memoryBarrier = 1;
90            setFree(buf.unitOffset());
91        } finally {
92            lock.unlock();
93        }
94    }
95
96}

接下来对每个属性及方法做下解释:

ADDRESS_BITS_PER_WORD:因为我们Free表用long[]表示的,每个long的大小为64bit,这里我们定义一个常量来表示每个long的位移位数。

blockEnds:Block表

capacity:内存池中block数量

frees:Free表

lock:内存池锁

unit:每个block字节数

mark:该字段用于记录最后一次申请内存的末尾位置,下次申请时从该位置往后申请

memoryBarrier:(更:不需要因lock里state)一般情况下我们的内存池是ThreadLocal的,但是在一些情况下可能需要从其他线程申请或释放内存,因此我们要求Free表和Block表对多核可见,所以我们在需要访问这两表时加上内存屏障确保这些数据是"最新的"。

allocate方法:我们从mark开始查找空闲内存,如果没找到则从0开始向mark查找,找到内存后标记mark。

release方法:注意下在ByteBuf第33行代码,在refCnt成功后会在这里对ByteBuf释放,释放时直接修改Free表。

对Free表操作的几个方法:这几个方法比较简单,做些set(index,0/1),isFree(index)等的操作,根据ADDRESS_BITS_PER_WORD找到对应的位置即可。

至此我们实现了一个非常简单的内存池,这个内存池在快速释放场景(非常驻内存)比较有效,我们也可以对它做些扩展,比如针对特小或者特大申请情况下的的处理,这里仅展示了部分代码,如果有兴趣查看完整代码请移步这里https://github.com/FireNio/firenio/tree/dev_wangkai/firenio-core/src/main/java/com/firenio/buffer


如果觉得对你有帮助的话欢迎点击在看,你的关注和转发是我最大的动力。