从源码角度分析netty的Reactor反应器模式
回顾下经典的反应器模式:
其分为三步:注册,轮询,分发。
接下来看一下netty的反应器模式
bossLoopGroup线程组:
workerLoopGroup线程组:
不管是BossLoopGroup,还是WorkerLoopGroup。可以看出其实netty的反应器模式的实现就是通过NioEventLoop实现的。接下来分析一下NioEventLoop。
搞清NioEventLoop之前,需要弄清以下两个问题:
1.NioEventLoop和原生的Selector之前的关系?
从上图中的对比可以发现,netty的NioEventLoop在实现其反应器模式时,充当了原生的Selector角色。
从类的继承关系看:
可以看出其基类SingleThreadEventExecutor包含一个Thread,其自己包含一个原生的Selecor。所以他们之间的关系可以是包含于被包含之前的关系。“带有一个线程的Selector”。
2.netty的channel和原生channel之间的关系?
以netty其中的一种channel,NioSocketChannel为例,来看看其类的继承关系:
可以看出,netty的channel封装了原生的channel。
那么知道了其关系后,请试着回答一下以下问题:
netty的Reactor的反应器线程在哪?
netty的Reactor的反应器线程在什么时候启动的?
答案:
1.netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中包含了Thread。
2.在中可知,反应器线程会轮询Selector,其启动一般是channel注册在Selector之前。netty也一样,其时机是在SocketChannel注册到EventLoop的Selector之前。
下面将从源码角度分析netty的反应器模式,如果不是太明白反应器模式,建议先完全弄明白这种模式,可以参考上一篇: 。
一、从源码角度分析注册:
源码调用链:
使用的netty版本:4.1.39.final,其他版本源码可能不一样,但原理和大致流程是一致的。
其注册过程在第6步,AbstractNioChannel.doRegister()中:
public abstract class AbstractNioChannel extends AbstractChannel {
.....
//原生的Channel(通过构造函数进行初始化)
private final SelectableChannel ch;
//选择键
volatile SelectionKey selectionKey;
protected final int readInterestOp;
//初始化
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
//初始化原生的channel
this.ch = ch;
this.readInterestOp = readInterestOp;
.....
}
.....
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
//1.调用javaChanel()方法获取一个原生的channel
//2.然后将channel注册到EventLoop关联的Selector上
//3.将this与channel绑定,返回绑定好的选择键
//this就绪的channel实例,一种附件
//选择键可以理解为一种绑定关系,通过就绪的this实例,拿到对应的底层channel
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
//获取原生的channel
protected SelectableChannel javaChannel() {
return this.ch;
}
}
二、从源码角度分析轮询:
在上面的问题中,我们类比出了netty的反应器线程的执行时机是在:SockectChannel注册到EventLoop的Selector之前,也就是注册之前。
源码:4.AbstractChannel.AbstractUnsafe.register
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//由调用方传入
private volatile EventLoop eventLoop;
.....
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
//线程已启动,进行注册流程
this.register0(promise);
} else {
//线程未启动
try {
eventLoop.execute(new Runnable() {
//提交一个注册流程任务,让这个任务去完成注册流程
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var4);
}
}
}
}
从上面的源码我们可以看到,注册之前的这一部分源码并没有实现Reactor反应器线程的启动,那么它是在哪启动的呢?轮询又是在哪完成的呢?在前面第一个问题时,我们知道netty的反应器线程在NioEventLoop的基类SingleThreadEventExecutor中,也刚好是其上一步的调用中,这个eventLoop也就是NioEventLoop。
源码:3.SingleThreadEventLoop.register
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
.....
}
//其父类
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
//任务队列
private final Queue<Runnable> taskQueue;
//父类中维护了一个线程,由子类启动
private volatile Thread thread;
private final Executor executor;
.....
//启动线程执行,执行run()
private void startThread() {
if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
boolean success = false;
try {
this.doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, 2, 1);
}
}
}
}
private void doStartThread() {
assert this.thread == null;
this.executor.execute(new Runnable() {
public void run() {
SingleThreadEventExecutor.this.thread = Thread.currentThread();
if (SingleThreadEventExecutor.this.interrupted) {
SingleThreadEventExecutor.this.thread.interrupt();
}
boolean success = false;
SingleThreadEventExecutor.this.updateLastExecutionTime();
boolean var112 = false;
int oldState;
label1907: {
try {
var112 = true;
//启动线程池中的线程
SingleThreadEventExecutor.this.run();
success = true;
var112 = false;
break label1907;
} catch (Throwable var119) {
SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);
var112 = false;
} finally {
.....
}
......
});
}
//完成轮询工作
protected abstract void run();
}
//实现类(由它完成轮询工作)
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
protected void run() {
//轮询
while(true) {
while(true) {
try {
....
this.cancelledKeys = 0;
this.needsToSelectAgain = false;
//网络IO事件执行时间和task任务执行时间的比例,默认为50
int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//处理网络IO事件
//SK选择中ready事件,如:accept,connect,read,write等
this.processSelectedKeys();
} finally {
//处理task事件
//添加到taskQueue中的任务
this.runAllTasks();
}
} else {
//重新计算ioRatio,轮询
long ioStartTime = System.nanoTime();
boolean var14 = false;
try {
var14 = true;
this.processSelectedKeys();
var14 = false;
} finally {
if (var14) {
long ioTime = System.nanoTime() - ioStartTime;
this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
}
}
long ioTime = System.nanoTime() - ioStartTime;
this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
}
} catch (Throwable var24) {
handleLoopException(var24);
}
break;
}
//检测线程是否停,不然一直轮询
try {
if (this.isShuttingDown()) {
this.closeAll();
if (this.confirmShutdown()) {
return;
}
}
} catch (Throwable var20) {
handleLoopException(var20);
}
}
}
}
上面启动Reactor反应器线程的线程池是SingleThreadEventExecutor,在调用的源码第二步:2.MultithreadEventLoopGroup.register 完成线程池的初始化
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
.....
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
.....
}
//其父类
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren;
private final Promise<?> terminationFuture;
private final EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, (Executor)(threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory)), args);
}
.....
}
主要注意的是其线程池用到的线程工厂用的ThreadPerTaskExecutor此工厂主要特点是执行一个线程,创建一个线程池。所以整体上netty的Reactor反应器线程是单线程的。
三、从源码角度分析分发:
回到轮询时的源码:
//实现类(由它完成轮询工作)
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector selector;
private Selector unwrappedSelector;
//IO事件集合,分发时需要使用
private SelectedSelectionKeySet selectedKeys;
protected void run() {
//轮询
while(true) {
while(true) {
try {
....
this.cancelledKeys = 0;
this.needsToSelectAgain = false;
//网络IO事件执行时间和task任务执行时间的比例,默认为50
int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//1.处理网络IO事件
//SK选择中ready事件,如:accept,connect,read,write等
this.processSelectedKeys();
} finally {
//2.处理task事件
//添加到taskQueue中的任务
this.runAllTasks();
}
} else {
......
}
} catch (Throwable var24) {
handleLoopException(var24);
}
break;
}
......
}
}
//1.处理网络io事件
private void processSelectedKeys() {
if (this.selectedKeys != null) {
//处理优化后的SelectedKey,主要区别是遍历SelectedKey方式不同
this.processSelectedKeysOptimized();
} else {
//处理原生的
this.processSelectedKeysPlain(this.selector.selectedKeys());
}
}
//处理原生SelectedKeys
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (!selectedKeys.isEmpty()) {
Iterator i = selectedKeys.iterator();
while(true) {
//遍历SK
SelectionKey k = (SelectionKey)i.next();
//获取绑定的就绪附件
Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
//处理就绪的netty channel
this.processSelectedKey(k, (AbstractNioChannel)a);
} else {
NioTask<SelectableChannel> task = (NioTask)a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
.....
}
}
}
//按照事件类型进行分发
//K 通过该选择键分发任务
//ch 对应的通道附件
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
NioEventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable var6) {
return;
}
if (eventLoop == this && eventLoop != null) {
unsafe.close(unsafe.voidPromise());
}
} else {
try {
int readyOps = k.readyOps();
if ((readyOps & 8) != 0) {
int ops = k.interestOps();
ops &= -9;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & 4) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & 17) != 0 || readyOps == 0) {
//处理read就绪事件
unsafe.read();
}
} catch (CancelledKeyException var7) {
unsafe.close(unsafe.voidPromise());
}
}
}
//2.处理taskQueue队列中的任务(这种处理方式放在下篇单独分享)
protected boolean runAllTasks() {
assert this.inEventLoop();
boolean ranAtLeastOne = false;
boolean fetchedAll;
do {
fetchedAll = this.fetchFromScheduledTaskQueue();
if (this.runAllTasksFrom(this.taskQueue)) {
ranAtLeastOne = true;
}
} while(!fetchedAll);
if (ranAtLeastOne) {
this.lastExecutionTime = ScheduledFutureTask.nanoTime();
}
this.afterRunningAllTasks();
return ranAtLeastOne;
}
}
最后看下,netty分发IO事件后,对应的:unsafe.read();
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected class NioByteUnsafe extends AbstractNioUnsafe {
protected NioByteUnsafe() {
super(AbstractNioByteChannel.this);
}
public final void read() {
//读取通道配置
ChannelConfig config = AbstractNioByteChannel.this.config();
if (AbstractNioByteChannel.this.shouldBreakReadReady(config)) {
AbstractNioByteChannel.this.clearReadPending();
} else {
ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();
ByteBufAllocator allocator = config.getAllocator();
Handle allocHandle = this.recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分类缓存区大小
byteBuf = allocHandle.allocate(allocator);
//从通道中读取数据到byteBuf缓冲区
allocHandle.lastBytesRead(AbstractNioByteChannel.this.doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
AbstractNioByteChannel.this.readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
AbstractNioByteChannel.this.readPending = false;
//通知pipeline流水线对应的业务处理器进行处理
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//最后判断是否继续
//不能超过最大读取次数
//缓冲区每次都要装满
} while(allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
this.closeOnRead(pipeline);
}
} catch (Throwable var11) {
this.handleReadException(pipeline, byteBuf, var11, close, allocHandle);
} finally {
if (!AbstractNioByteChannel.this.readPending && !config.isAutoRead()) {
this.removeReadOp();
}
}
}
}
}
}
以上就是netty的NioEventLoop的分发事件过程。
最后:
netty的一个Reactor模型,对应一个EventLoop线程,其内部维护了一个Selector处理IO事件,一个taskQueue处理内部任务(这种内部任务的处理方式单独放在下一篇分享),从源码可以看出,他们的处理方式是串行执行的,不存在安全问题,这种无锁化设计,大大的提高了其性能。