从源码角度分析netty的Reactor反应器模式
回顾下经典的反应器模式:
其分为三步:注册,轮询,分发。
接下来看一下netty的反应器模式
bossLoopGroup线程组:
workerLoopGroup线程组:
不管是BossLoopGroup,还是WorkerLoopGroup。可以看出其实netty的反应器模式的实现就是通过NioEventLoop实现的。接下来分析一下NioEventLoop。
搞清NioEventLoop之前,需要弄清以下两个问题:
1.NioEventLoop和原生的Selector之前的关系?
从上图中的对比可以发现,netty的NioEventLoop在实现其反应器模式时,充当了原生的Selector角色。
从类的继承关系看:
可以看出其基类SingleThreadEventExecutor包含一个Thread,其自己包含一个原生的Selecor。所以他们之间的关系可以是包含于被包含之前的关系。“带有一个线程的Selector”。
2.netty的channel和原生channel之间的关系?
以netty其中的一种channel,NioSocketChannel为例,来看看其类的继承关系:
可以看出,netty的channel封装了原生的channel。
那么知道了其关系后,请试着回答一下以下问题:
netty的Reactor的反应器线程在哪?
netty的Reactor的反应器线程在什么时候启动的?
答案:
1.netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中包含了Thread。
2.在中可知,反应器线程会轮询Selector,其启动一般是channel注册在Selector之前。netty也一样,其时机是在SocketChannel注册到EventLoop的Selector之前。
下面将从源码角度分析netty的反应器模式,如果不是太明白反应器模式,建议先完全弄明白这种模式,可以参考上一篇: 。
一、从源码角度分析注册:
源码调用链:
使用的netty版本:4.1.39.final,其他版本源码可能不一样,但原理和大致流程是一致的。
其注册过程在第6步,AbstractNioChannel.doRegister()中:
public abstract class AbstractNioChannel extends AbstractChannel {.....//原生的Channel(通过构造函数进行初始化)private final SelectableChannel ch;//选择键volatile SelectionKey selectionKey;protected final int readInterestOp;//初始化protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//初始化原生的channelthis.ch = ch;this.readInterestOp = readInterestOp;.....}.....protected void doRegister() throws Exception {boolean selected = false;while(true) {try {//1.调用javaChanel()方法获取一个原生的channel//2.然后将channel注册到EventLoop关联的Selector上//3.将this与channel绑定,返回绑定好的选择键//this就绪的channel实例,一种附件//选择键可以理解为一种绑定关系,通过就绪的this实例,拿到对应的底层channelthis.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException var3) {if (selected) {throw var3;}this.eventLoop().selectNow();selected = true;}}}//获取原生的channelprotected SelectableChannel javaChannel() {return this.ch;}}
二、从源码角度分析轮询:
在上面的问题中,我们类比出了netty的反应器线程的执行时机是在:SockectChannel注册到EventLoop的Selector之前,也就是注册之前。
源码:4.AbstractChannel.AbstractUnsafe.register
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {//由调用方传入private volatile EventLoop eventLoop;.....public final void register(EventLoop eventLoop, final ChannelPromise promise) {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {//线程已启动,进行注册流程this.register0(promise);} else {//线程未启动try {eventLoop.execute(new Runnable() {//提交一个注册流程任务,让这个任务去完成注册流程public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}}
从上面的源码我们可以看到,注册之前的这一部分源码并没有实现Reactor反应器线程的启动,那么它是在哪启动的呢?轮询又是在哪完成的呢?在前面第一个问题时,我们知道netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中,也刚好是其上一步的调用中,这个eventLoop也就是NioEventLoop。
源码:3.SingleThreadEventLoop.register
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {.....}//其父类public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {//任务队列private final Queue<Runnable> taskQueue;//父类中维护了一个线程,由子类启动private volatile Thread thread;private final Executor executor;.....//启动线程执行,执行run()private void startThread() {if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {boolean success = false;try {this.doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, 2, 1);}}}}private void doStartThread() {assert this.thread == null;this.executor.execute(new Runnable() {public void run() {SingleThreadEventExecutor.this.thread = Thread.currentThread();if (SingleThreadEventExecutor.this.interrupted) {SingleThreadEventExecutor.this.thread.interrupt();}boolean success = false;SingleThreadEventExecutor.this.updateLastExecutionTime();boolean var112 = false;int oldState;label1907: {try {var112 = true;//启动线程池中的线程SingleThreadEventExecutor.this.run();success = true;var112 = false;break label1907;} catch (Throwable var119) {SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);var112 = false;} finally {.....}......});}//完成轮询工作protected abstract void run();}//实现类(由它完成轮询工作)public final class NioEventLoop extends SingleThreadEventLoop {private Selector selector;private Selector unwrappedSelector;private SelectedSelectionKeySet selectedKeys;protected void run() {//轮询while(true) {while(true) {try {....this.cancelledKeys = 0;this.needsToSelectAgain = false;//网络IO事件执行时间和task任务执行时间的比例,默认为50int ioRatio = this.ioRatio;if (ioRatio == 100) {try {//处理网络IO事件//SK选择中ready事件,如:accept,connect,read,write等this.processSelectedKeys();} finally {//处理task事件//添加到taskQueue中的任务this.runAllTasks();}} else {//重新计算ioRatio,轮询long ioStartTime = System.nanoTime();boolean var14 = false;try {var14 = true;this.processSelectedKeys();var14 = false;} finally {if (var14) {long ioTime = System.nanoTime() - ioStartTime;this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);}}long ioTime = System.nanoTime() - ioStartTime;this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);}} catch (Throwable var24) {handleLoopException(var24);}break;}//检测线程是否停,不然一直轮询try {if (this.isShuttingDown()) {this.closeAll();if (this.confirmShutdown()) {return;}}} catch (Throwable var20) {handleLoopException(var20);}}}}
上面启动Reactor反应器线程的线程池是SingleThreadEventExecutor,在调用的源码第二步:2.MultithreadEventLoopGroup.register 完成线程池的初始化
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {.....protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}.....}//其父类public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {private final EventExecutor[] children;private final Set<EventExecutor> readonlyChildren;private final AtomicInteger terminatedChildren;private final Promise<?> terminationFuture;private final EventExecutorChooser chooser;protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, (Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)), args);}.....}
主要注意的是其线程池用到的线程工厂用的ThreadPerTaskExecutor此工厂主要特点是执行一个线程,创建一个线程池。所以整体上netty的Reactor反应器线程是单线程的。
三、从源码角度分析分发:
回到轮询时的源码:
//实现类(由它完成轮询工作)public final class NioEventLoop extends SingleThreadEventLoop {private Selector selector;private Selector unwrappedSelector;//IO事件集合,分发时需要使用private SelectedSelectionKeySet selectedKeys;protected void run() {//轮询while(true) {while(true) {try {....this.cancelledKeys = 0;this.needsToSelectAgain = false;//网络IO事件执行时间和task任务执行时间的比例,默认为50int ioRatio = this.ioRatio;if (ioRatio == 100) {try {//1.处理网络IO事件//SK选择中ready事件,如:accept,connect,read,write等this.processSelectedKeys();} finally {//2.处理task事件//添加到taskQueue中的任务this.runAllTasks();}} else {......}} catch (Throwable var24) {handleLoopException(var24);}break;}......}}//1.处理网络io事件private void processSelectedKeys() {if (this.selectedKeys != null) {//处理优化后的SelectedKey,主要区别是遍历SelectedKey方式不同this.processSelectedKeysOptimized();} else {//处理原生的this.processSelectedKeysPlain(this.selector.selectedKeys());}}//处理原生SelectedKeysprivate void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (!selectedKeys.isEmpty()) {Iterator i = selectedKeys.iterator();while(true) {//遍历SKSelectionKey k = (SelectionKey)i.next();//获取绑定的就绪附件Object a = k.attachment();i.remove();if (a instanceof AbstractNioChannel) {//处理就绪的netty channelthis.processSelectedKey(k, (AbstractNioChannel)a);} else {NioTask<SelectableChannel> task = (NioTask)a;processSelectedKey(k, task);}if (!i.hasNext()) {break;}.....}}}//按照事件类型进行分发//K 通过该选择键分发任务//ch 对应的通道附件private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {NioEventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable var6) {return;}if (eventLoop == this && eventLoop != null) {unsafe.close(unsafe.voidPromise());}} else {try {int readyOps = k.readyOps();if ((readyOps & 8) != 0) {int ops = k.interestOps();ops &= -9;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & 4) != 0) {ch.unsafe().forceFlush();}if ((readyOps & 17) != 0 || readyOps == 0) {//处理read就绪事件unsafe.read();}} catch (CancelledKeyException var7) {unsafe.close(unsafe.voidPromise());}}}//2.处理taskQueue队列中的任务(这种处理方式放在下篇单独分享)protected boolean runAllTasks() {assert this.inEventLoop();boolean ranAtLeastOne = false;boolean fetchedAll;do {fetchedAll = this.fetchFromScheduledTaskQueue();if (this.runAllTasksFrom(this.taskQueue)) {ranAtLeastOne = true;}} while(!fetchedAll);if (ranAtLeastOne) {this.lastExecutionTime = ScheduledFutureTask.nanoTime();}this.afterRunningAllTasks();return ranAtLeastOne;}}
最后看下,netty分发IO事件后,对应的:unsafe.read();
public abstract class AbstractNioByteChannel extends AbstractNioChannel {protected class NioByteUnsafe extends AbstractNioUnsafe {protected NioByteUnsafe() {super(AbstractNioByteChannel.this);}public final void read() {//读取通道配置ChannelConfig config = AbstractNioByteChannel.this.config();if (AbstractNioByteChannel.this.shouldBreakReadReady(config)) {AbstractNioByteChannel.this.clearReadPending();} else {ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();ByteBufAllocator allocator = config.getAllocator();Handle allocHandle = this.recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {//分类缓存区大小byteBuf = allocHandle.allocate(allocator);//从通道中读取数据到byteBuf缓冲区allocHandle.lastBytesRead(AbstractNioByteChannel.this.doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {AbstractNioByteChannel.this.readPending = false;}break;}allocHandle.incMessagesRead(1);AbstractNioByteChannel.this.readPending = false;//通知pipeline流水线对应的业务处理器进行处理pipeline.fireChannelRead(byteBuf);byteBuf = null;//最后判断是否继续//不能超过最大读取次数//缓冲区每次都要装满} while(allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {this.closeOnRead(pipeline);}} catch (Throwable var11) {this.handleReadException(pipeline, byteBuf, var11, close, allocHandle);} finally {if (!AbstractNioByteChannel.this.readPending && !config.isAutoRead()) {this.removeReadOp();}}}}}}
以上就是netty的NioEventLoop的分发事件过程。
最后:
netty的一个Reactor模型,对应一个EventLoop线程,其内部维护了一个Selector处理IO事件,一个taskQueue处理内部任务(这种内部任务的处理方式单独放在下一篇分享),从源码可以看出,他们的处理方式是串行执行的,不存在安全问题,这种无锁化设计,大大的提高了其性能。
