Netty对象重用:Recycler源码分析
Netty 作为一个高性能的网络 IO 框架,在代码层面做了大量的优化,为了减轻 GC 的压力,尽可能的使对象可以被重用,避免频繁的创建和销毁。
Recycler
抽象类是 Netty 实现的,基于线程本地变量 Stack 实现的一个轻量级的对象重用池。调用get()
方法时优先从对象池中获取可重用的对象,当池中没有对象可用时会自动触发newObject()
创建新对象。io.netty.util.internal.ObjectPool.Handle.recycle()
方法可以帮助对象进行回收,默认的实现类是DefaultHandle
,回收时它会将对象push
到线程绑定的 Stack 中。
1. 用法示例
如下是一个使用示例,完成对Person
的回收与重用:
public class RecyclerDemo {
static class Person {
private ObjectPool.Handle<Person> handle;
public Person(ObjectPool.Handle<Person> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
public static void main(String[] args) {
ObjectPool<Person> pool = ObjectPool.newPool(new ObjectPool.ObjectCreator<Person>() {
@Override
public Person newObject(ObjectPool.Handle<Person> handle) {
return new Person(handle);
}
});
Person person = pool.get();
person.recycle();
}
}
2. 源码分析
2.1 ObjectPool
首先看ObjectPool
类,它定义了一个轻量级的对象池,get()
方法可以从池中获取一个对象:
// 轻量级的对象池
public abstract class ObjectPool<T> {
ObjectPool() { }
// 从对象池中获取一个对象,对象可能通过 ObjectPool.ObjectCreator.newObject()创建
public abstract T get();
}
内部接口Handle
负责定义对象的回收:
// 对象回收的处理
public interface Handle<T> {
// 重用对象
void recycle(T self);
}
内部接口ObjectCreator
负责定义新对象的创建:
// 对象创建器
public interface ObjectCreator<T> {
// 基于Handle创建一个新对象,对象在可重用时会调用 handle.recycle()
T newObject(Handle<T> handle);
}
静态方法newPool
可以快速创建一个对象池实例:
public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {
return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));
}
Netty 提供了一个默认的实现类RecyclerObjectPool
,它的实现也非常简单,主要还是依赖Recycler
类,所以核心还是要把Recycler
搞懂。
// 可重用的对象池
private static final class RecyclerObjectPool<T> extends ObjectPool<T> {
private final Recycler<T> recycler;
RecyclerObjectPool(final ObjectCreator<T> creator) {
recycler = new Recycler<T>() {
// 当池中没有可用对象时,调用此方法创建新对象
@Override
protected T newObject(Handle<T> handle) {
return creator.newObject(handle);
}
};
}
@Override
public T get() {
return recycler.get();
}
}
2.2 Recycler
Recycler 是对象回收的核心,先来看它的属性。它提供了大量的静态常量来定义一些默认值,这些值可以通过设置 JVM 参数调整,如下:
// 不需要回收对象的 Handle
@SuppressWarnings("rawtypes")
private static final Handle NOOP_HANDLE = new Handle() {
@Override
public void recycle(Object object) {
// NOOP
}
};
// ID生成器,WeakOrderQueue和recycleId需要用到
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
// 对象被回收时,会将recycleId设为OWN_THREAD_ID,代表被回收了
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
// 线程本地变量Stack的最大容量 4096
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
// 线程本地变量Stack默认的最大容量
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
// Stack初始容量,256~4096之间扩容
private static final int INITIAL_CAPACITY;
//最大可共享的容量因子,默认为2,非主线程可回收的容量,默认是一半
private static final int MAX_SHARED_CAPACITY_FACTOR;
// 最大延迟队列的数量 默认CPU核心数*2
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
// Link节点的elements数组长度
private static final int LINK_CAPACITY;
// 对象回收的频率,默认每8个回收一个,避免WeakOrderQueue增长的过快,降低对快速回收的敏感程度
private static final int RATIO;
private static final int DELAYED_QUEUE_RATIO;
实例常量这里就不贴代码了,含义同静态常量,只是做简单的赋值操作。
使用 Recycler 的线程,会在线程的本地变量里存放一个Stack
栈来存放可重用的对象:
// 在线程的本地变量Stack中存放已回收的对象
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
interval, maxDelayedQueuesPerThread, delayedQueueInterval);
}
@Override
protected void onRemoval(Stack<T> value) {
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
2.2.1 get()获取对象
直接看核心流程吧,先看get()
获取对象的流程,它首先会判断maxCapacityPerThread
是否为 0,0 代表不回收对象,每次都创建新对象。否则从线程的本地变量中获取 Stack,调用pop()
方法试图从栈中弹出一个可用对象,如果没有对象可用,则调用newObject()
创建新对象。
// 获取一个对象,如果池中没有,则创建新对象
@SuppressWarnings("unchecked")
public final T get() {
if (maxCapacityPerThread == 0) {
// 线程本地变量Stack容量为0,代表不重用对象,NOOP_HANDLE不会回收
return newObject((Handle<T>) NOOP_HANDLE);
}
// 获取线程本地变量Stack
Stack<T> stack = threadLocal.get();
// 对象经过DefaultHandle包装,从栈中弹出一个
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
// 没有可用对象了,基于handle创建一个新对象
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
stack.pop()
方法非常重要,需要重点关注!!!
Stack 是 Recycler 的内部类,它使用数组实现了一个简单的栈结构,调用pop()
时,它首先会判断当前栈中是否有可用元素,如果有则直接弹出,否则会调用scavenge()
将 WeakOrderQueue 中的对象转移到栈中。
为啥还需要 WeakOrderQueue 队列呢???这是因为对象除了可以被主线程回收外,还可以被其他线程回收。当被主线程回收时,直接入栈,如果是非主线程回收,则将对象放入 Stack 绑定的 WeakOrderQueue 中,待
pop()
时再从 Queue 转移到 Stack。
直接看pop()
代码:
DefaultHandle<T> pop() {
// 可用的对象数量,recycle()时会增加
int size = this.size;
if (size == 0) {
// 没有可用对象,尝试清理
if (!scavenge()) {
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
// 存在可重用的对象,直接从数组中取
size--;
DefaultHandle ret = elements[size];
elements[size] = null;
this.size = size;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
// 对象取出后,重置recycleId和lastRecycledId
ret.recycleId = 0;
ret.lastRecycledId = 0;
return ret;
}
Stack 有可用对象时处理很简单,只是简单的弹出。无可用对象时,尝试将其他线程帮忙回收的对象从 WeakOrderQueue 转移到 Stack。
// 尝试将WeakOrderQueue中的对象转移到Stack
private boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// 没有对象可以转移,重置指针
prev = null;
cursor = head;
return false;
}
scavengeSome()
会遍历 WeakOrderQueue,将所有 Link 中的元素转移到 Stack。
WeakOrderQueue 是一个单向链表,通过一系列 Link 节点连接而成,每个 Link 节点内部存储了一个 elements 数组用来存储已回收的对象。
// 尝试将Queue中的对象转移到Stack
private boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {// 还没有回收过对象,无法清理
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do {
if (cursor.transfer(this)) {
success = true;
break;
}
// 单向链表结构,获取下一个节点
WeakOrderQueue next = cursor.getNext();
if (cursor.get() == null) {
// 使用了弱引用,检测到Queue绑定的线程销毁了,因此对象转移后需要断开此节点
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
cursor.reclaimAllSpaceAndUnlink();
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
io.netty.util.Recycler.WeakOrderQueue.transfer()
方法会将 Link 节点中的对象转移到 Stack 中,readIndex
代表转移的进度指针,达到LINK_CAPACITY
说明说明节点转移完毕,会继续转移 next 节点,代码这里就不贴了,感兴趣的同学自己去看下。
2.2.2 recycle()回收对象
另一个核心主流程是handle.recycle()
,它负责回收对象。对象的回收会转交给Handle
处理,默认的实现是io.netty.util.Recycler.DefaultHandle
。它首先会获取到 Stack 容器,做一些校验工作,然后将对象 push 到 Stack 中进行回收重用。
// 回收对象
@Override
public void recycle(Object object) {
if (object != value) {// 判空
throw new IllegalArgumentException("object does not belong to handle");
}
// 获取绑定的Stack
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
// 对象已经被回收
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
核心逻辑依然在stack.push()
中,这里它会判断回收的线程是否是 Stack 绑定的主线程,如果是则直接入栈,否则放入 Stack 绑定的 WeakOrderQueue 中,等待主线程自己去转移。
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// 回收线程是主线程,直接添加到数组
pushNow(item);
} else {
/*
回收线程非主线程,添加到WeakOrderQueue中,
后续pop()时会将Queue中对象转移到Stack
*/
pushLater(item, currentThread);
}
}
先看简单的pushNow()
,对于主线程回收,对象入栈即可。
// 主线程回收,对象入栈
private void pushNow(DefaultHandle<?> item) {
// 避免重复回收,第一次回收时 recycleId=0,于是会设置lastRecycleId=OWN_THREAD_ID
if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) {
throw new IllegalStateException("recycled already");
}
// 对象回收将 recycleId设为OWN_THREAD_ID
item.recycleId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
return;
}
if (size == elements.length) {
// 扩容
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
对于非主线程回收,会复杂一些,对象会被添加到 Stack 绑定的 WeakOrderQueue 中,如果 WeakOrderQueue 的数量达到上限,则不再回收对象。
// 非主线程回收对象
private void pushLater(DefaultHandle<?> item, Thread thread) {
if (maxDelayedQueues == 0) {
// 不支持线程之间的回收
return;
}
// 获取Stack和Queue的映射关系
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
// 当前Stack绑定的Queue
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {// 如果Queue是null
if (delayedRecycled.size() >= maxDelayedQueues) {
// 延迟队列的数量已达上限,put一个占位符
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// 创建一个和Stack绑定的新Queue
if ((queue = newWeakOrderQueue(thread)) == null) {
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// Queue的数量已达上限
return;
}
// 添加到Queue中
queue.add(item);
}
到这里对象回收的流程也就基本结束了。
3. 总结
Recycler 是 Netty 基于本地线程变量 Stack 实现的一套轻量级的重用对象池,它的目的是减轻 GC 的压力,避免对象的频繁创建和销毁,像 ByteBuf、Entry 等对象都使用它来进行回收和重用。
它利用 FastThreadLocal 在每个线程的本地变量中存放一个 Stack 栈结构来保存已回收的对象,哪个线程创建的对象只有该线程自己可以重用,因此 Stack 绑定了一个主线程。但是对象的回收可以是任意线程,如果是主线程回收则直接将对象入栈即可,对于非主线程的回收,Recycler 会将对象暂时放入 Stack 绑定的延迟队列 WeakOrderQueue 中,为了避免对象回收的速度过快导致 WeakOrderQueue 增长的过快,允许通过ratio
属性来设置回收的频率。回收对象被放入 WeakOrderQueue 后,主线程在取对象时如果栈中无可用对象了,会尝试将 WeakOrderQueue 中的对象迁移到 Stack 中,这就是 Recycler 回收的大致流程总结。