vlambda博客
学习文章列表

Netty Server端启动分析(一)EventLoopGroup和 EventLoop分析

本文将以 Java Netty 学习(七) 为例,分析下netty服务端启动分析。

EventLoopGroup

EventLoopGroup 可以理解为 EventLoopGroup,可以理解为线程组。

默认线程数

 
   
   
 
  1. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

  2. "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

即默认为cpu对应线程数*2。

构造方法

从默认的线程进,到:

 
   
   
 
  1. public NioEventLoopGroup(int nThreads, Executor executor) {

  2. this(nThreads, executor, SelectorProvider.provider());

  3. }

SelectorProvider.provider() 主要为根据平台来创建一个对应的 SelectProvider

 
   
   
 
  1. public static SelectorProvider provider() {

  2. synchronized (lock) {

  3. if (provider != null)

  4. return provider;

  5. return AccessController.doPrivileged(

  6. // 在一些权限操作中,获取

  7. new PrivilegedAction<SelectorProvider>() {

  8. public SelectorProvider run() {

  9. if (loadProviderFromProperty())

  10. return provider;

  11. if (loadProviderAsService())

  12. return provider;

  13. // 根据平台型,选取一个SelectorProvider

  14. provider = sun.nio.ch.DefaultSelectorProvider.create();

  15. return provider;

  16. }

  17. });

  18. }

  19. }

根据平台型,选取一个 SelectorProviderwindowsWindowsSelectorProvider

再传入默认的 SelectStrategyFactory 即默认的reactor线程的执行策略:

 
   
   
 
  1. @Override

  2. public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {

  3. return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;

  4. }

默认的拒绝策略 RejectedExecutionHandlers

 
   
   
 
  1. private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {

  2. @Override

  3. public void rejected(Runnable task, SingleThreadEventExecutor executor) {

  4. throw new RejectedExecutionException();

  5. }

  6. };

最终传入的最全参数的构造方法为:

 
   
   
 
  1. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,

  2. final SelectStrategyFactory selectStrategyFactory) {

  3. super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());

  4. }

父类 MultithreadEventExecutorGroup 构造方法,会将子类参数以多参数传入,并且会制定一个EventLoop的分发策略:

 
   
   
 
  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {

  2. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);

  3. }

分发策略:

 
   
   
 
  1. public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();


  2. private DefaultEventExecutorChooserFactory() { }


  3. @SuppressWarnings("unchecked")

  4. @Override

  5. public EventExecutorChooser newChooser(EventExecutor[] executors) {

  6. if (isPowerOfTwo(executors.length)) {

  7. return new PowerOfTwoEventExecutorChooser(executors);

  8. } else {

  9. return new GenericEventExecutorChooser(executors);

  10. }

  11. }

有两种分发策略:PowerOfTwoEventExecutorChooserGenericEventExecutorChooser 两种。

父类MultithreadEventExecutorGroup构造方法

父类 MultithreadEventExecutorGroup 构造方法中,主要是初始化 内部的 EventExecutor 数组,而数组大小就是前面传入的线程数。EventExecutor 是一个抽象接口,代表着一个事件执行器, NioEventLoop 就是其一个子类。

 
   
   
 
  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

  2. EventExecutorChooserFactory chooserFactory, Object... args) {

  3. if (nThreads <= 0) {

  4. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

  5. }

  6. if (executor == null) {

  7. // 单任务执行器,主要目的是启动reactor线程

  8. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

  9. }

  10. // 初始化数组

  11. children = new EventExecutor[nThreads];

  12. for (int i = 0; i < nThreads; i ++) {

  13. boolean success = false;

  14. try {

  15. // 初始化

  16. children[i] = newChild(executor, args);

  17. success = true; // 设置标志位

  18. } catch (Exception e) {

  19. // TODO: Think about if this is a good exception type

  20. throw new IllegalStateException("failed to create a child event loop", e);

  21. } finally {

  22. ... 主要为失败处理

  23. }

  24. // 根据处理器长度,来决定用哪种处理工厂

  25. chooser = chooserFactory.newChooser(children);

  26. final FutureListener<Object> terminationListener = new FutureListener<Object>() {

  27. @Override

  28. public void operationComplete(Future<Object> future) throws Exception {

  29. if (terminatedChildren.incrementAndGet() == children.length) {

  30. terminationFuture.setSuccess(null);

  31. }

  32. }

  33. };

  34. // 设置失败处理监听器

  35. for (EventExecutor e: children) {

  36. e.terminationFuture().addListener(terminationListener);

  37. }

  38. // 其他处理

  39. }

NioEventLoop 构造方法

看具体 NioEventLoop 具体初始化方法,主要目的为一个Reactor线程,绑定一个 Selector

 
   
   
 
  1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,

  2. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,

  3. EventLoopTaskQueueFactory queueFactory) {

  4. super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),

  5. rejectedExecutionHandler);

  6. // 判空操作

  7. provider = selectorProvider;

  8. // 打开一个selector

  9. final SelectorTuple selectorTuple = openSelector();

  10. // 填充selector

  11. selector = selectorTuple.selector;

  12. unwrappedSelector = selectorTuple.unwrappedSelector;

  13. selectStrategy = strategy;

  14. }

NioEventLoop 父类为 SingleThreadEventExecutor ,可以理解为 单消费者,多任务的线程池。其父类包括任务队列初始化,线程池执行器制定等。

 
   
   
 
  1. protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,

  2. boolean addTaskWakesUp, Queue<Runnable> taskQueue,

  3. RejectedExecutionHandler rejectedHandler) {

  4. super(parent);

  5. this.addTaskWakesUp = addTaskWakesUp;

  6. this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; // 最大任务数,默认为integer的最大值

  7. this.executor = ThreadExecutorMap.apply(executor, this); // 任务执行器

  8. this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); // 任务队列

  9. rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); // 拒绝策略

  10. }

openSelector(); 时,有对nio原生使用 Set进行优化,将其改成数组类型,因为总体为轮训类型,所以并不需要为 Set类型。

总结

  1. EventLoopGroup 作为线程组,具体 EventLoop 则作为一个reactor线程,实际为一个单线程多任务的线程池。

  2. Netty优化 原生的Nio中 Set,将其使用反射改为了数组进行操作。

  3. 如果不指定线程数量,默认是cpu线程数*2。