vlambda博客
学习文章列表

从源码角度分析netty的Reactor反应器模式

回顾下经典的反应器模式:


其分为三步:注册,轮询,分发。

接下来看一下netty的反应器模式


bossLoopGroup线程组:



从源码角度分析netty的Reactor反应器模式


workerLoopGroup线程组:


从源码角度分析netty的Reactor反应器模式

    不管是BossLoopGroup,还是WorkerLoopGroup。可以看出其实netty的反应器模式的实现就是通过NioEventLoop实现的。接下来分析一下NioEventLoop。


搞清NioEventLoop之前,需要弄清以下两个问题:


1.NioEventLoop和原生的Selector之前的关系?

  从上图中的对比可以发现,netty的NioEventLoop在实现其反应器模式时,充当了原生的Selector角色。


从类的继承关系看:


从源码角度分析netty的Reactor反应器模式

可以看出其基类SingleThreadEventExecutor包含一个Thread,其自己包含一个原生的Selecor。所以他们之间的关系可以是包含于被包含之前的关系。“带有一个线程的Selector”。


2.netty的channel和原生channel之间的关系?


以netty其中的一种channel,NioSocketChannel为例,来看看其类的继承关系:

从源码角度分析netty的Reactor反应器模式


可以看出,netty的channel封装了原生的channel。



那么知道了其关系后,请试着回答一下以下问题:


  1. netty的Reactor的反应器线程在哪?

  2. netty的Reactor的反应器线程在什么时候启动的?


答案:

   1.netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中包含了Thread。

  2.在中可知,反应器线程会轮询Selector,其启动一般是channel注册在Selector之前。netty也一样,其时机是在SocketChannel注册到EventLoop的Selector之前。


下面将从源码角度分析netty的反应器模式,如果不是太明白反应器模式,建议先完全弄明白这种模式,可以参考上一篇: 。



一、从源码角度分析注册:


源码调用链:


从源码角度分析netty的Reactor反应器模式


  使用的netty版本:4.1.39.final,其他版本源码可能不一样,但原理和大致流程是一致的。


其注册过程在第6步,AbstractNioChannel.doRegister()中:


public abstract class AbstractNioChannel extends AbstractChannel {    .....    //原生的Channel(通过构造函数进行初始化)    private final SelectableChannel ch;    //选择键 volatile SelectionKey selectionKey; protected final int readInterestOp;     //初始化 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent);        //初始化原生的channel this.ch = ch;        this.readInterestOp = readInterestOp;                ..... } ..... protected void doRegister() throws Exception { boolean selected = false; while(true) { try {                //1.调用javaChanel()方法获取一个原生的channel                //2.然后将channel注册到EventLoop关联的Selector上                //3.将this与channel绑定,返回绑定好的选择键                //this就绪的channel实例,一种附件                //选择键可以理解为一种绑定关系,通过就绪的this实例,拿到对应的底层channel this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException var3) { if (selected) { throw var3;                } this.eventLoop().selectNow(); selected = true; } }    }        //获取原生的channel    protected SelectableChannel javaChannel() { return this.ch; }    }


二、从源码角度分析轮询:


   在上面的问题中,我们类比出了netty的反应器线程的执行时机是在:SockectChannel注册到EventLoop的Selector之前,也就是注册之前。


源码:4.AbstractChannel.AbstractUnsafe.register

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     //由调用方传入 private volatile EventLoop eventLoop;     .....  public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) {                    //线程已启动,进行注册流程 this.register0(promise);                } else {                    //线程未启动 try { eventLoop.execute(new Runnable() {                            //提交一个注册流程任务,让这个任务去完成注册流程 public void run() { AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4); this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var4); } } }}


从上面的源码我们可以看到,注册之前的这一部分源码并没有实现Reactor反应器线程的启动,那么它是在哪启动的呢?轮询又是在哪完成的呢?在前面第一个问题时,我们知道netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中,也刚好是其上一步的调用中,这个eventLoop也就是NioEventLoop。


源码:3.SingleThreadEventLoop.register

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {    .....    }
//其父类public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //任务队列 private final Queue<Runnable> taskQueue;    //父类中维护了一个线程,由子类启动    private volatile Thread thread; private final Executor executor;    .....            //启动线程执行,执行run()    private void startThread() { if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { boolean success = false;
try { this.doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, 2, 1); }
} }
} private void doStartThread() {        assert this.thread == null; this.executor.execute(new Runnable() { public void run() { SingleThreadEventExecutor.this.thread = Thread.currentThread(); if (SingleThreadEventExecutor.this.interrupted) { SingleThreadEventExecutor.this.thread.interrupt(); }
boolean success = false; SingleThreadEventExecutor.this.updateLastExecutionTime(); boolean var112 = false;
int oldState; label1907: { try { var112 = true; //启动线程池中的线程 SingleThreadEventExecutor.this.run(); success = true; var112 = false; break label1907; } catch (Throwable var119) { SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119); var112 = false;                    } finally {                      ..... }                    ...... }); }        //完成轮询工作    protected abstract void run();          }
//实现类(由它完成轮询工作)public final class NioEventLoop extends SingleThreadEventLoop { private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys;        protected void run() {       //轮询     while(true) { while(true) { try {                        ....                         this.cancelledKeys = 0; this.needsToSelectAgain = false;                        //网络IO事件执行时间和task任务执行时间的比例,默认为50                        int ioRatio = this.ioRatio;                      if (ioRatio == 100) { try { //处理网络IO事件                                //SK选择中ready事件,如:accept,connect,read,write等 this.processSelectedKeys(); } finally { //处理task事件                                //添加到taskQueue中的任务 this.runAllTasks(); } } else {                            //重新计算ioRatio,轮询 long ioStartTime = System.nanoTime(); boolean var14 = false;
try { var14 = true; this.processSelectedKeys(); var14 = false; } finally { if (var14) { long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } }
long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } catch (Throwable var24) { handleLoopException(var24); } break; }                //检测线程是否停,不然一直轮询 try { if (this.isShuttingDown()) { this.closeAll(); if (this.confirmShutdown()) { return; } } } catch (Throwable var20) { handleLoopException(var20); } }    }  }


上面启动Reactor反应器线程的线程池是SingleThreadEventExecutor,在调用的源码第二步:2.MultithreadEventLoopGroup.register 完成线程池的初始化

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {    .....     protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ..... }  //其父类public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { private final EventExecutor[] children; private final Set<EventExecutor> readonlyChildren; private final AtomicInteger terminatedChildren; private final Promise<?> terminationFuture; private final EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { this(nThreads, (Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)), args); }     ..... }

主要注意的是其线程池用到的线程工厂用的ThreadPerTaskExecutor此工厂主要特点是执行一个线程,创建一个线程池。所以整体上netty的Reactor反应器线程是单线程的。



三、从源码角度分析分发:


回到轮询时的源码:

//实现类(由它完成轮询工作)public final class NioEventLoop extends SingleThreadEventLoop { private Selector selector; private Selector unwrappedSelector;    //IO事件集合,分发时需要使用 private SelectedSelectionKeySet selectedKeys;  protected void run() { //轮询 while(true) { while(true) { try { ....  this.cancelledKeys = 0; this.needsToSelectAgain = false; //网络IO事件执行时间和task任务执行时间的比例,默认为50 int ioRatio = this.ioRatio;  if (ioRatio == 100) { try { //1.处理网络IO事件 //SK选择中ready事件,如:accept,connect,read,write等 this.processSelectedKeys(); } finally { //2.处理task事件 //添加到taskQueue中的任务 this.runAllTasks(); } } else {                            ...... } } catch (Throwable var24) { handleLoopException(var24); } break; } ...... }    }             //1.处理网络io事件    private void processSelectedKeys() { if (this.selectedKeys != null) {            //处理优化后的SelectedKey,主要区别是遍历SelectedKey方式不同 this.processSelectedKeysOptimized(); } else {            //处理原生的 this.processSelectedKeysPlain(this.selector.selectedKeys()); }
}        //处理原生SelectedKeys private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (!selectedKeys.isEmpty()) { Iterator i = selectedKeys.iterator();
while(true) {                //遍历SK SelectionKey k = (SelectionKey)i.next();                //获取绑定的就绪附件 Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { //处理就绪的netty channel this.processSelectedKey(k, (AbstractNioChannel)a); } else { NioTask<SelectableChannel> task = (NioTask)a; processSelectedKey(k, task); }
if (!i.hasNext()) { break; }
                ..... }
} }        //按照事件类型进行分发    //K 通过该选择键分发任务    //ch 对应的通道附件 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { NioEventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable var6) { return; }
if (eventLoop == this && eventLoop != null) { unsafe.close(unsafe.voidPromise()); } } else { try { int readyOps = k.readyOps(); if ((readyOps & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); }
if ((readyOps & 4) != 0) { ch.unsafe().forceFlush(); }
if ((readyOps & 17) != 0 || readyOps == 0) {                    //处理read就绪事件 unsafe.read(); } } catch (CancelledKeyException var7) { unsafe.close(unsafe.voidPromise()); }
} }          //2.处理taskQueue队列中的任务(这种处理方式放在下篇单独分享) protected boolean runAllTasks() { assert this.inEventLoop();
boolean ranAtLeastOne = false;
boolean fetchedAll; do { fetchedAll = this.fetchFromScheduledTaskQueue(); if (this.runAllTasksFrom(this.taskQueue)) { ranAtLeastOne = true; } } while(!fetchedAll);
if (ranAtLeastOne) { this.lastExecutionTime = ScheduledFutureTask.nanoTime(); }
this.afterRunningAllTasks(); return ranAtLeastOne; }    }



最后看下,netty分发IO事件后,对应的:unsafe.read();

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe { protected NioByteUnsafe() { super(AbstractNioByteChannel.this); }         public final void read() {            //读取通道配置 ChannelConfig config = AbstractNioByteChannel.this.config(); if (AbstractNioByteChannel.this.shouldBreakReadReady(config)) { AbstractNioByteChannel.this.clearReadPending(); } else { ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline(); ByteBufAllocator allocator = config.getAllocator(); Handle allocHandle = this.recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false;
try { do {                        //分类缓存区大小 byteBuf = allocHandle.allocate(allocator);                        //从通道中读取数据到byteBuf缓冲区 allocHandle.lastBytesRead(AbstractNioByteChannel.this.doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { AbstractNioByteChannel.this.readPending = false; } break; }
allocHandle.incMessagesRead(1); AbstractNioByteChannel.this.readPending = false;                        //通知pipeline流水线对应的业务处理器进行处理 pipeline.fireChannelRead(byteBuf);                        byteBuf = null;                            //最后判断是否继续                        //不能超过最大读取次数                        //缓冲区每次都要装满                    } while(allocHandle.continueReading());
allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { this.closeOnRead(pipeline); } } catch (Throwable var11) { this.handleReadException(pipeline, byteBuf, var11, close, allocHandle); } finally { if (!AbstractNioByteChannel.this.readPending && !config.isAutoRead()) { this.removeReadOp(); }
}
}        }   } 
}


以上就是netty的NioEventLoop的分发事件过程。


最后:

   netty的一个Reactor模型,对应一个EventLoop线程,其内部维护了一个Selector处理IO事件,一个taskQueue处理内部任务(这种内部任务的处理方式单独放在下一篇分享),从源码可以看出,他们的处理方式是串行执行的,不存在安全问题,这种无锁化设计,大大的提高了其性能。



从源码角度分析netty的Reactor反应器模式
从源码角度分析netty的Reactor反应器模式


从源码角度分析netty的Reactor反应器模式 从源码角度分析netty的Reactor反应器模式


从源码角度分析netty的Reactor反应器模式