实现一个简单的内存池
大家对于对象池,连接池等技术一定不陌生,有些资源创建和销毁都是非常耗时的事情,通常我们会
把这部分资源池化管理来减少创建和销毁带来的性能损耗
,在一些情况下申请和释放内存也是一件比较耗时的事情,因此我们可以通过池化内存来降低这部分损耗,但是内存池和对象池有些区别,对象池申请和释放的单位是个
,内存池申请和释放的单位则是一段内存
,今天我们实现一个简单的内存池来理解内存池化概念。
1、ByteBuffer简介
在说内存池之前我们先简单了解下ByteBuffer(对ByteBuffer了解的同学可以直接跳过本章节),ByteBuffer是Java提供给我们直接访问内存的方式,我们通过它来实现对内存的读写,也就是说每个ByteBuffer实际上指向了一段内存,这段内存可以是堆内内存也可以是堆外内存,比如:
1// 堆内内存
2ByteBuffer heapMemory = ByteBuffer.allocate(64);
3// 堆外内存
4ByteBuffer directMemory = ByteBuffer.allocateDirect(64);
ByteBuffer是如何描述这么一段内存的呢?在ByteBuffer的父类中有这么几个字段(部分)
1// 用于描述当前操作的作为,比如读取位置,写入位置
2private int position = 0;
3// 用于描述内存可用大小,可读大小/可写大小
4private int limit;
5// 所申请的内存大小
6private int capacity;
7// 当申请的内存为堆外内存时,需要记录内存的地址。
8// (我们使用系统函数申请一段内存时,函数会返回一个地址回来,这个地址即为我们申请内存的起始地址)
9long address;
有了上面几个字段我们可以描述和读写某段内存了
ByteBuffer提供给我们一些操作的API,比如:
1// 往内存写入数字100(int型)
2heapMemory.putInt(100);
3// 往内存写入数字200(long型)
4heapMemory.putLong(200);
5// 反转读写位置
6heapMemory.flip();
7// 从内存读取数字100(int型)
8int intValue = heapMemory.getInt();
9// 从内存读取数字200(long型)
10long longValue = heapMemory.getLong();
put和get比较好理解,我们看下flip方法:
1public final Buffer flip() {
2 limit = position;
3 position = 0;
4 mark = -1;
5 return this;
6}
1.将position赋值给limit,position是读写位置(一般为写入位置),表示限制容量为我最后一次读写的位置。
2.将position重置为0,表示从首部(一般是读取)开始读写这段内存。
3.重置mark为-1.
flip操作之后我们可以反向操作这段内存了,但是在一些经常需要传递ByteBuffer的场景可能会由于忘记flip或者重复flip导致程序出现bug
,所以你会看到一些网络框架会对ByteBuffer进行一次封装,采用标记readIndex/writeIndex方式来操作ByteBuffer,这么做还有另一个原因,因为在部分场景下申请内存释放带来的overhead比较高,所以一些网络框架会将内存池化处理,今天我们来实现一个简单的内存池。
2、实现一个简单的内存池
在实现内存池之前我们先回顾下连接池的实现原理,首先我们需要有两张表,一张来记录所有的可用连接,一张来记录正在使用的连接,当连接释放后将连接从正在使用表更新到可用表,每次更新的单位是个/条
等,内存池同样需要记录可用和已用,但是由于每次申请的内存大小不确定,我们不能单纯的用个
来作为申请和释放的单位,这里提供一个比较简单的实现思路,比如我们现在有8K内存可以作为内存池使用,我们把它等分成8份,这样每份就是1K字节,然后我们建两张表来描述这8份内存的使用情况:
Free表:
长度为8,属性为bit,表示当前下标内存是否已使用(1表示free)Block表:
长度为8,属性为int,表示当内存属性为已用时,该已用内存的结束位置+1(即下一段内存的开始位置)
1// 将两张表初始化
20 1 2 3 4 5 6 7
31 1 1 1 1 1 1 1 // Free表
40 0 0 0 0 0 0 0 // Block表
申请一段长度为100字节的内存(A),因为100字节小于1Kb,所以申请的块大小为1,申请后的表结构为:
10 1 2 3 4 5 6 7
20 1 1 1 1 1 1 1 // Free表
31 0 0 0 0 0 0 0 // Block表
接着申请一段长度为1500字节的内存(B),通过计算可知申请的块大小为2,申请后的表结构为:
10 1 2 3 4 5 6 7
20 0 1 1 1 1 1 1 // Free表
31 3 0 0 0 0 0 0 // Block表
将内存A释放掉,注意释放仅修改Free表,不修改Block表,我们在申请内存时才去更新Block值,因为我们只看已申请内存块的Block值
,释放后的表结构为
10 1 2 3 4 5 6 7
21 0 1 1 1 1 1 1 // Free表
31 3 0 0 0 0 0 0 // Block表
原理我们了解了,接下来用代码把它实现出来:
首先我们对ByteBuffer做下简单封装(为便于理解我们屏蔽掉与内存池不相关的代码):
1public abstract class ByteBuf {
2
3 static final AtomicIntegerFieldUpdater<ByteBuf> refCntUpdater;
4
5 static {
6 refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(ByteBuf.class, "referenceCount");
7 }
8
9 volatile int referenceCount = 0;
10
11 int offset;
12 int capacity;
13 int unitOffset;
14
15 public final void release() {
16 int referenceCount = this.referenceCount;
17 if (referenceCount < 1) {
18 return;
19 }
20 if (refCntUpdater.compareAndSet(this, referenceCount, referenceCount - 1)) {
21 if (referenceCount == 1) {
22 release0();
23 }
24 return;
25 }
26 for (; ; ) {
27 referenceCount = this.referenceCount;
28 if (referenceCount < 1) {
29 return;
30 }
31 if (refCntUpdater.compareAndSet(this, referenceCount, referenceCount - 1)) {
32 if (referenceCount == 1) {
33 allocator.release(this);
34 }
35 return;
36 } else {
37 if (BUF_THREAD_YIELD) {
38 Thread.yield();
39 }
40 }
41 }
42 }
43
44 protected PooledHeapByteBuf produce(int unitOffset, int unitSize) {
45 int unit = allocator.getUnit();
46 this.offset = unitOffset * unit;
47 this.capacity = unitSize * unit;
48 this.unitOffset = unitOffset;
49 this.referenceCount = 1;
50 return this;
51 }
52
53}
接下来对每个属性及方法做下解释:
referenceCount :
我们这里采用引用计数的方式管理内存的回收,新申请出来的内存引用计数为1,每次对该内存进行复制新增时引用计数加1,回收时减1,当引用计数为0时则将该内存回收至内存池。
refCntUpdater:
大家在阅读源码的时候可能会经常看到AtomicIntegerFieldUpdater
这个类型的字段,通常我们认为它和AtomicInteger
是等价的,在一些频繁创建referenceCount
的场景如果使用AtomicInteger
会导致创建大量的该对象,一个是内存占用比较大,另一个是产生大量对象后给gc带来比较大的压力,所以一般我们推荐使用AtomicIntegerFieldUpdater
offset:
该字段记录ByteBuf在内存池中的起始位置,比如我们前面申请的内存B,因为它从第1个内存块开始,所以它的offset值为1024
capacity:
该字段记录ByteBuf的容量,比如内存B占用两个block,所以它的容量为2048
unitOffset:
该字段记录ByteBuf的block值在内存中的起始位置,比如内存B的unitOffset值为1
release方法:
该方法尝试对referenceCount
进行原子性减1,如果referenceCount
值为0则回收该内存
produce方法:
申请内存时通过计算得到unitOffset和size,则成功申请一段内存,通过这两个值计算ByteBuf的位置和实际大小。
接下来我们看下Allocator的实现:
1public final class PooledByteBufAllocator {
2
3 static final int ADDRESS_BITS_PER_WORD = 6;
4
5 private final int[] blockEnds;
6 private final int capacity;
7 private final long[] frees;
8 private final ReentrantLock lock = new ReentrantLock();
9 private final int unit;
10 private int mark;
11 private volatile int memoryBarrier;
12
13 public ByteBuf allocate(int limit) {
14 if (limit < 1) {
15 return ByteBuf.empty();
16 }
17 int size = (limit + unit - 1) / unit;
18 int blockStart;
19 ReentrantLock lock = this.lock;
20 lock.lock();
21 try {
22 if (!isRunning()) {
23 return unpooled.allocate(limit);
24 }
25 int mark = this.mark;
26 blockStart = allocate(mark, capacity, size);
27 if (blockStart == -1) {
28 blockStart = allocate(0, mark, size);
29 if (blockStart == -1) {
30 return unpooled.allocate(limit);
31 }
32 }
33 return newByteBuf().produce(blockStart, size);
34 } finally {
35 lock.unlock();
36 }
37 }
38
39 private int allocate(int start, int end, int size) {
40 int[] blockEnds = this.blockEnds;
41 int pos = start;
42 int limit = pos + size;
43 if (limit >= end) {
44 return -1;
45 }
46 for (; ; ) {
47 if (!isFree(pos)) {
48 pos = blockEnds[pos];
49 limit = pos + size;
50 if (limit >= end) {
51 return -1;
52 }
53 continue;
54 }
55 if (++pos == limit) {
56 int blockEnd = pos + 1;
57 int blockStart = blockEnd - size;
58 clearFree(blockStart);
59 blockEnds[blockStart] = blockEnd;
60 this.mark = blockEnd;
61 return blockStart;
62 }
63 }
64 }
65
66 private static int wordIndex(int bitIndex) {
67 return bitIndex >> ADDRESS_BITS_PER_WORD;
68 }
69
70 private void setFree(int index) {
71 int wordIndex = wordIndex(index);
72 frees[wordIndex] |= (1L << index);
73 }
74
75 private void clearFree(int index) {
76 int wordIndex = wordIndex(index);
77 frees[wordIndex] &= ~(1L << index);
78 }
79
80 private boolean isFree(int index) {
81 int wordIndex = wordIndex(index);
82 return ((frees[wordIndex] & (1L << index)) != 0);
83 }
84
85 public void release(ByteBuf buf) {
86 ReentrantLock lock = this.lock;
87 lock.lock();
88 try {
89 memoryBarrier = 1;
90 setFree(buf.unitOffset());
91 } finally {
92 lock.unlock();
93 }
94 }
95
96}
接下来对每个属性及方法做下解释:
ADDRESS_BITS_PER_WORD:
因为我们Free表用long[]
表示的,每个long的大小为64bit,这里我们定义一个常量来表示每个long的位移位数。
blockEnds:
Block表
capacity:
内存池中block数量
frees:
Free表
lock:
内存池锁
unit:
每个block字节数
mark:
该字段用于记录最后一次申请内存的末尾位置,下次申请时从该位置往后申请
memoryBarrier:
(更:不需要因lock里state)一般情况下我们的内存池是ThreadLocal
的,但是在一些情况下可能需要从其他线程申请或释放内存,因此我们要求Free表和Block表对多核可见,所以我们在需要访问这两表时加上内存屏障确保这些数据是"最新的"。
allocate方法:
我们从mark开始查找空闲内存,如果没找到则从0开始向mark查找,找到内存后标记mark。
release方法:
注意下在ByteBuf第33行代码,在refCnt成功后会在这里对ByteBuf释放,释放时直接修改Free表。
对Free表操作的几个方法:
这几个方法比较简单,做些set(index,0/1),isFree(index)等的操作,根据ADDRESS_BITS_PER_WORD
找到对应的位置即可。
至此我们实现了一个非常简单的内存池,这个内存池在快速释放场景(非常驻内存)比较有效,我们也可以对它做些扩展,比如针对特小或者特大申请情况下的的处理,这里仅展示了部分代码,如果有兴趣查看完整代码请移步这里https://github.com/FireNio/firenio/tree/dev_wangkai/firenio-core/src/main/java/com/firenio/buffer
如果觉得对你有帮助的话欢迎点击
在看
,你的关注和转发是我最大的动力。