Netty9# Netty抽象内存分配器实现原理
前言
本文通过分析抽象内存分配器API梳理其基于堆内存、堆外内存分配的实现原理。最后走查了CompositeByteBuf这种类似数据库视图的实现原理。
堆外内存&堆内存
分配方式 | 优点 | 缺点 |
---|---|---|
堆内存 | JVM负责内存的分配与回收 | 数据过多会引起频繁GC和停顿; 多一次拷贝,在用户态分配、I/O通信需要数据拷贝到内核态 |
堆外内存 | I/O性能高,直接在内核态分配 降低GC频率和停顿 |
内存分配和收回比较慢、需要手动处理 |
内存分配器类图
字节缓存的分配出自ByteBufAllocator,其实现类AbstractByteBufAllocator(抽象类)、PooledByteBufAllocator(池化内存分配器)、UnpooledByteBufAllocator(非池化内存分配器)、PreferHeapByteBufAllocator(堆内存分配器)、PreferredDirectByteBufAllocator(堆外内存分配器)。
主要接口
接口 | 说明 |
---|---|
ByteBuf buffer() | 分配一块字节缓存,由其实现类决定堆外内存或者堆内存 |
ByteBuf ioBuffer() | 系统支持UNSAFE和CLEANER则优先分配堆外内存;否则分配堆内存。 |
ByteBuf heapBuffer() | 分配堆内存字节缓存区 |
ByteBuf directBuffer() | 分配堆外内存字节缓存区 |
CompositeByteBuf compositeBuffer() | 分配一个CompositeByteBuf(将多个buffers组合成一个buffer) 由实现类决定堆内存或者堆外内存 |
下面走查下抽象内存分配器AbstractByteBufAllocator的API。
构造函数
private final boolean directByDefault;
private final ByteBuf emptyBuf;
protected AbstractByteBufAllocator(boolean preferDirect) {
directByDefault = preferDirect && PlatformDependent.hasUnsafe(); // 注解@1
emptyBuf = new EmptyByteBuf(this);
}
注解@1 directByDefault是否使用堆外内存分配,满足两个条件。preferDirect布尔型用户传入;PlatformDependent.hasUnsafe() 系统是否支持UNSAFE(通过内存指针进行堆外内存分配);即:用户传入preferDirect=true并且系统支持UNSAFE则使用堆外内存。
buffer()
@Override
public ByteBuf buffer() { // 注解@2
if (directByDefault) {
return directBuffer();
}
return heapBuffer();
}
注解@2 directByDefault如果为true使用堆外内存分配DirectByteBuffer,底层使用unsafe.allocateMemory分配。
directByDefault如果为false使用堆内存分配 new byte[initialCapacity]。
ioBuffer
public ByteBuf ioBuffer() { // 注解@3
if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
return directBuffer(DEFAULT_INITIAL_CAPACITY);
}
return heapBuffer(DEFAULT_INITIAL_CAPACITY);
}
注解@3 如果系统支持UNSAFE或者使用池化内存,优先分配堆外内存,否则分配堆内存。
heapBuffer
@Override
public ByteBuf heapBuffer() { // 注解@4
return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
注解@4 分配堆外内存new一个byte数组(new byte[])。
directBuffer
@Override
public ByteBuf directBuffer() { // 注解@5
return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}
注解@5 分配堆外内存。
compositeBuffer
@Override
public CompositeByteBuf compositeBuffer() { // 注解@6
if (directByDefault) {
return compositeDirectBuffer();
}
return compositeHeapBuffer();
}
注解@6 跟上面一样的,只是分配的CompositeByteBuf。下面看下这种将多个个buffer组合成一个buffer是如何实现的。
@Override
public CompositeByteBuf compositeHeapBuffer() {
return compositeHeapBuffer(DEFAULT_MAX_COMPONENTS);
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return toLeakAwareBuffer(new CompositeByteBuf(this, false, maxNumComponents));
}
private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
if (maxNumComponents < 1) {
throw new IllegalArgumentException(
"maxNumComponents: " + maxNumComponents + " (expected: >= 1)");
}
this.direct = direct;
this.maxNumComponents = maxNumComponents;
components = newCompArray(initSize, maxNumComponents); // 注解@6.1
}
注解@6.1 在CompositeByteBuf的构造方法中初始化了一个components,这个默认initSize=0;maxNumComponents默认为16。
private static Component[] newCompArray(int initComponents, int maxNumComponents) {
int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
return new Component[Math.max(initComponents, capacityGuess)]; // 注解@6.2
}
注解@6.2 components是Component的对象数组,数组大小默认为16.
下面通过例子来体验一把CompositeByteBuf,先直观感受下。
示例代码
@Test
public void testCompositeByteBuf(){
String str1 = "瓜农";
String str2 = "老梁";
ByteBuf buf1 = Unpooled.buffer(10);
buf1.writeBytes(str1.getBytes(CharsetUtil.UTF_8));
System.out.println("buf1's readerIndex:" + buf1.readerIndex());
System.out.println("buf1's writeIndex" + buf1.writerIndex());
ByteBuf buf2 = Unpooled.buffer(10);
buf2.writeBytes(str2.getBytes(CharsetUtil.UTF_8));
System.out.println("buf2's readerIndex:" + buf2.readerIndex());
System.out.println("buf2's writeIndex" + buf2.writerIndex());
ByteBuf compositeByteBuf = Unpooled.wrappedBuffer(buf1,buf2);
System.out.println("compositeByteBuf's readerIndex:" + compositeByteBuf.readerIndex());
System.out.println("compositeByteBuf's writeIndex" + compositeByteBuf.writerIndex());
System.out.print(compositeByteBuf.toString(CharsetUtil.UTF_8));
}
结果输出
buf1's readerIndex:0
buf1's writeIndex6
buf2's readerIndex:0
buf2's writeIndex6
compositeByteBuf's readerIndex:0
compositeByteBuf's writeIndex12
瓜农老梁
小结 Unpooled.wrappedBuffer(buf1,buf2)将两个ByteBuf进行了合并一个ByteBuf;对外提供统一的读写指针供使用。
接下来看下他是如何合并的。
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
switch (buffers.length) {
case 0:
break;
case 1:
ByteBuf buffer = buffers[0];
if (buffer.isReadable()) {
return wrappedBuffer(buffer.order(BIG_ENDIAN));
} else {
buffer.release();
}
break;
default:
for (int i = 0; i < buffers.length; i++) {
ByteBuf buf = buffers[i];
if (buf.isReadable()) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i); // 注解@7
}
buf.release();
}
break;
}
return EMPTY_BUFFER;
}
注解@7 通过创建一个CompositeByteBuf,将ByteBuf数组传入构造函数。
CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents,
ByteBuf[] buffers, int offset) {
this(alloc, direct, maxNumComponents, buffers.length - offset);
addComponents0(false, 0, buffers, offset); // 注解@8
consolidateIfNeeded();
setIndex0(0, capacity()); // 注解@9
}
注解@8 填充Component[]数据,每个Component元素包含了传入的ByteBuf。
private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
final int cIndex, ByteBuf[] buffers, int arrOffset) {
// buffers数组的长度;本例中arrOffset=0;count=len
final int len = buffers.length, count = len - arrOffset;
int ci = Integer.MAX_VALUE;
try {
checkComponentIndex(cIndex); // 合法性校验
shiftComps(cIndex, count); // 注解@8.1
int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (ci = cIndex; arrOffset < len; arrOffset++, ci++) { // 注解@8.2
// 从数组中拿出传入的ByteBuf
ByteBuf b = buffers[arrOffset];
if (b == null) {
break;
}
// 构建Component
Component c = newComponent(ensureAccessible(b), nextOffset);
// 加入components数组
components[ci] = c;
// 递增endOffset
nextOffset = c.endOffset;
}
return this;
} finally {
// ...
}
}
注解@8.1 扩容Component数组,默认的数量为16个,当添加的buffer的数量超过16时就需要扩容了,下面看下其如何扩容的。
private void shiftComps(int i, int count) {
final int size = componentCount, newSize = size + count;
assert i >= 0 && i <= size && count > 0;
if (newSize > components.length) {
// grow the array
int newArrSize = Math.max(size + (size >> 1), newSize); // 注解@8.1.1
Component[] newArr;
// 注解@8.1.2
if (i == size) {
newArr = Arrays.copyOf(components, newArrSize, Component[].class);
} else {
newArr = new Component[newArrSize];
if (i > 0) {
System.arraycopy(components, 0, newArr, 0, i);
}
if (i < size) {
System.arraycopy(components, i, newArr, i + count, size - i);
}
}
components = newArr;
} else if (i < size) {
System.arraycopy(components, i, components, i + count, size - i);
}
componentCount = newSize;
}
注解@8.1.1 默认size=16,size >> 1 = 8 也就是扩容会以原大小一半的容量进行扩容。
注解@8.1.2 下面判断根据场景通过Arrays.copyOf、System.arraycopy将Component数组扩容;插入尾部、中部、头部等情况。本示例中没有超过16,所以不会扩容,componentCount=2。
注解@8.2 循环拿出传入的ByteBuf数组构建Component,并将其加入Component数组中;最后移动nextOffset。关于各个参数的含义,源码给出了注释。构造函数中
第一个参数:传入的ByteBuf
第二个参数:源ByteBuf的readerIndex
第三个参数:unwrapped的buffer
第四个参数:unwrappedIndex
第五个参数:offset = components[cIndex - 1].endOffset
第六个参数:len = buf.readableBytes() buf为源buffer
第七个参数:slice = null (示例)
以endOffset为例,等于插入数组中上一个Conponent的endOffset + 当前ByteBuf的可读长度,从而维护了其在整个CompositeByteBuf的写索引情况。
一个buffer对应一个Component,每个Component持有源buffer并维护了其在整个CompositeByteBuf的索引情况。
注解@9 设置整个CompositeByteBuf的读索引和写索引,读索引初始值为0;写索引为components[size - 1].endOffset,也就是整个Conponent数组中其每个元素维护的ByteBuf可读字节(writerIndex - readerIndex)大小的总和。
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
public int capacity() {
int size = componentCount;
return size > 0 ? components[size - 1].endOffset : 0;
}
小结: CompositeByteBuf通过将多个ByteBuf装入component数组中,对其统一维护读写索引,在外面看起来是一个统一的buffer;类似数据库中的视图。