Netty Server端启动分析(一)EventLoopGroup和 EventLoop分析
本文将以 Java Netty 学习(七) 为例,分析下netty服务端启动分析。
EventLoopGroup
EventLoopGroup
可以理解为 EventLoop
的 Group
,可以理解为线程组。
默认线程数
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
即默认为cpu对应线程数*2。
构造方法
从默认的线程进,到:
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
SelectorProvider.provider()
主要为根据平台来创建一个对应的 SelectProvider
:
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
// 在一些权限操作中,获取
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 根据平台型,选取一个SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
根据平台型,选取一个 SelectorProvider
, windows
为 WindowsSelectorProvider
。
再传入默认的 SelectStrategyFactory
即默认的reactor线程的执行策略:
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
默认的拒绝策略 RejectedExecutionHandlers
:
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
最终传入的最全参数的构造方法为:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
父类 MultithreadEventExecutorGroup
构造方法,会将子类参数以多参数传入,并且会制定一个EventLoop的分发策略:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
分发策略:
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
有两种分发策略:PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
两种。
父类MultithreadEventExecutorGroup构造方法
父类 MultithreadEventExecutorGroup
构造方法中,主要是初始化 内部的 EventExecutor
数组,而数组大小就是前面传入的线程数。EventExecutor
是一个抽象接口,代表着一个事件执行器, NioEventLoop
就是其一个子类。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 单任务执行器,主要目的是启动reactor线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 初始化数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 初始化
children[i] = newChild(executor, args);
success = true; // 设置标志位
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
... 主要为失败处理
}
// 根据处理器长度,来决定用哪种处理工厂
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 设置失败处理监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 其他处理
}
NioEventLoop 构造方法
看具体 NioEventLoop
具体初始化方法,主要目的为一个Reactor线程,绑定一个 Selector
。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// 判空操作
provider = selectorProvider;
// 打开一个selector
final SelectorTuple selectorTuple = openSelector();
// 填充selector
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
NioEventLoop
父类为 SingleThreadEventExecutor
,可以理解为 单消费者,多任务的线程池。其父类包括任务队列初始化,线程池执行器制定等。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; // 最大任务数,默认为integer的最大值
this.executor = ThreadExecutorMap.apply(executor, this); // 任务执行器
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); // 任务队列
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); // 拒绝策略
}
在 openSelector();
时,有对nio原生使用 Set
进行优化,将其改成数组类型,因为总体为轮训类型,所以并不需要为 Set
类型。
总结
EventLoopGroup
作为线程组,具体EventLoop
则作为一个reactor线程,实际为一个单线程多任务的线程池。Netty
优化 原生的Nio中Set
,将其使用反射改为了数组进行操作。如果不指定线程数量,默认是cpu线程数*2。