Netty源码深度解析-EventLoop(1)EventLoop的构造
导读
原创文章,转载请注明出处。
本文所使用的netty版本4.1.6.Final:带注释的netty源码
EventLoop在netty中发挥着驱动引擎的作用,本文我们以NioEventLoopGroup和NioEventLoop为例着重分析一下EventLoopGroup和EventLoop的创建、一些重要的数据结构和netty的一些优化。
1 NioEventLoopGroup
咱们以NioEventLoopGroup为例进行分析。NioEventLoopGroup有很多构造方法,咱们不再一一贴出,只贴出2个关键的构造方法。
NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)该构造方法给出了SelectStrategyFactory的默认值为DefaultSelectStrategyFactory.INSTANCE。EventLoop在每次循环时需要调用该类的calculateStrategy方法来决定循环的策略,具体咱们后边讲。
我们通过new NioEventLoopGroup()或者new NioEventLoopGroup(32)创建EventLoopGroup时最终都会调用到这个构造方法。接着又调用了另一个构造方法,咱们跟下去。
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
这里调用了父类 MultithreadEventLoopGroup的构造方法,咱们继续跟下去。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
MultithreadEventLoopGroup的构造方法如下,这里给出了nThreads的默认值,为MultithreadEventLoopGroup的静态属性DEFAULT_EVENT_LOOP_THREADS。接着调用父类MultithreadEventExecutorGroup的构造方法。
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
DEFAULT_EVENT_LOOP_THREADS的赋值在MultithreadEventLoopGroup的静态代码块中,我们看到该值默认为cpu核数×2。
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
咱们接着跟到MultithreadEventExecutorGroup的构造方法,这里继续调用本类的构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),并且为executor赋默认值。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
executor的默认值为ThreadPerTaskExecutor,顾名思义就是为每一个任务创建一个线程,我们看一下它的实现,代码如下,execute方法中为每一个任务创建了一个线程。
public final class ThreadPerTaskExecutor implements Executor {
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我们继续跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args),这里接着调用另一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)并且为chooserFacotry赋默认值,chooserFactory咱们在“服务端的启动流程”和“客户端的启动流程”中均有涉及,作用是在为Channel绑定EventLoop时轮询地从EventLoopGroup里选择EventLoop,这里不再赘述。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
接着看下一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args),这里就是创建EventLoopGroup的核心逻辑了。
首先创建了一个EventExecutor数组,并赋值给children属性,数据长度和nThreads相等,很容易想到,这里应该就是保存EventLoop的数组了。
紧接着循环调用newChild方法为数组的每一个元素赋值。
最后调用chooserFactory.newChooser(children)为chooser赋值。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
} finally {
}
}
chooser = chooserFactory.newChooser(children);
}
我们跟着看一下newChild方法,该方法为抽象的,这里newChild方法的实现在NioEventLoopGroup中。好了,到这里咱们看到了NioEventLoop的构造方法,继续跟进去。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
2 NioEventLoop
来看NioEventLoop的构造方法,这里NioEventLoop保存了3个属性。
-
provider:selector的工厂类,从provider可以得到selector。 -
selector: 调用provider.openSelector()得到的selector,这里netty做了优化,咱们后边讲。 -
selectStrategy:这个的calculateStrategy方法返回值,决定了EventLoop的下一步动作,咱们也是后边讲。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
继续跟到父类SingleThreadEventLoop的构造方法,这里初始化了一个属性tailTasks,具体有什么用,咱们一会儿说,先记住这里有一个任务队列叫tailTasks。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
继续跟到父类SingleThreadEventExecutor的构造方法,这里有几个属性赋值。
-
addTaskWakesUp:这个是用来唤醒阻塞在taskQueue上的EventLoop线程的,在NioEventLoop中EventLoop不会阻塞在taskQueue上,这里用处不是很大,可以先不研究它。 -
maxPendingTasks:后面紧接着就用到了,任务队列的最大长度。 -
executor:真正生成线程的类,默认是ThreadPerTaskExecutor。 -
taskQueue: 任务队列,还记得上边已经有一个tailTask队列了吗,这里是另外一个队列。 -
rejectedExecutionHandler:拒绝策略。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
继续跟到父类AbstractScheduledEventExecutor的构造方法,这里什么也没干,又继续调用父类的构造方法,但是我们注意到该类中也有一个队列scheduledTaskQueue,顾名思义是定时任务队列,只是该队列没有在构造方法中初始化,等到有定时任务加入时才初始化。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
}
继续跟到父类AbstractEventExecutor的构造方法,这里为parent赋值,即该EventLoop所属的EventLoopGroup。
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
3 netty的一些优化
3.1 对selector的优化
我们回到NioEventLoop类中的openSelector方法,这个方法在干什么呢,它在通过反射替换sun.nio.ch.SelectorImpl类的两个属性selectedKeys和publicSelectedKeys,将这两个属性都替换成了SelectedSelectionKeySet类的实例。为什么要这么换呢,咱们接着往下看。
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
} catch (SecurityException e) {
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
} catch (IllegalAccessException e) {
} catch (RuntimeException e) {
}
}
});
if (maybeException instanceof Exception) {
} else {
//将selectedKeySet保存到selectedKeys属性中
selectedKeys = selectedKeySet;
}
return selector;
}
sun.nio.ch.SelectorImpl类中的selectedKeys和publicSelectedKeys的作用是保存有兴趣事件发生的Selectionkey,其中publicSelectedKeys是selectedKeys的包装类,Util.ungrowableSet(this.selectedKeys),看方法名ungrowableSet表示包装之后这个set就不能添加元素了,但是可以删除元素。也就是说jdk内部使用selectedKeys(注意这是个protected字段)进行添加和删除操作,而暴露给用户的是publicSelectedKeys只能进行删除操作(看selectedKeys方法)。
默认实现用的是HashSet,
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
public Set<SelectionKey> selectedKeys() {
if (!this.isOpen() && !Util.atBugLevel("1.4")) {
throw new ClosedSelectorException();
} else {
return this.publicSelectedKeys;
}
}
}
再来看看SelectedSelectionKeySet,这里用的数据结构是数组,很显然netty这里的优化是因为数组的元素添加和遍历操作比HashSet更快。有同学对这里有两个数组的存在表示疑问,大家不用操心这个问题了,因为我也没看懂为什么会有两个数组,还要交替使用。netty在后来的版本中做了优化,只用一个数组就实现了。所以这里只要知道netty用数组进行优化就可以了。
那么又一个问题来了,为什么jdk里边不用数组或者List,而用HashSet呢,那是因为selector的select方法每次调用都会把已经准备好的SelectionKey放入selectedKeys中,如果用户在第一次调用select方法之后没有处理相应Channel的事件的,也没有删除selectedKeys中的元素,那么再次调用select之后,相同的SelectionKey会再次加入selectedKeys中,如果selectedKeys使用数组或者List实现将起不到去重的效果。
另一方面,如果使用List或者数组,删除成本也比较高。
而netty的实现中保证不会在连续调用两次select方法之间不删除selectedKeys中的元素,而且netty直接将selectedKeys暴露出来,在删除的时候可以直接将数据中对应索引的元素设置为null。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
}
3.2 对任务队列的优化
在SingleThreadEventExecutor的构造方法中taskQueue属性通过调用newTaskQueue方法产生,newTaskQueue方法在NioEventLoop中被覆盖了,我们看一下。
这里被优化成了MpscQueue,全称是MultiProducerSingleConsumerQueue,即多生产者单消费者队列,感兴趣的同学自己去看一下,既然单独做一个这样的队列出来,在当前场景下自然是比BlockingQueue性能更好了。
为什么可以做这样的优化呢,很显然,这里的队列消费者只有一个那就是EventLoop,而生产者会有多个。并且,这里有一行注释This event loop never calls takeTask(),这就说明这个MultiProducerSingleConsumerQueue并没有阻塞的take方法,而正好NioEventLoop也不需要调用阻塞的take方法,正好适合。
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
同样的,tailTask这个队列也被优化成了MpscQueue。我们看一下scheduledTaskQueue是什么队列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue方法中,我们看到scheduledTaskQueue仅仅是一个普通的优先级队列,甚至都不是一个线程安全的阻塞队列。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
为什么呢,为什么tailTasks和taskQueue是MpscQueue,而scheduledTaskQueue是非线程安全的队列呢?可能是因为没有的带优先级功能的MpsQueue吧。
scheduledTaskQueue是非线程安全的队列,会不会有多线程安全问题呢,答案是不会。我们看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)方法,这里在添加定时任务时,如果是非EventLoop线程调用,则会发起一个异步调用,最终往scheduledTaskQueue添加定时任务的还是EventLoop线程。所以呢,这里又有另一个优化,那就是可以延迟初始化scheduledTaskQueue。
4 总结
画重点来了,本文的重点就这两个。
-
每个 EventLoop中有3个队列,分别是tailTasks、taskQueue和scheduledTaskQueue。而且tailTasks和taskQueue队列在NioEventLoop中的被优化为MultiProducerSingleConsumerQueue。 -
netty对 selector中的selectedKeys做了优化,从HashSet替换为SelectedSelectionKeySet,SelectedSelectionKeySet是用数组实现的Set,添加元素和遍历效率更高。
关于作者
王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。
