Netty核心源码分析(一)
前言:分析Netty服务端的核心源码,主要从下面三个角度来解析。
1.服务端的启动流程。
2.接受请求流程。
3.监听读事件。
(源码是从下载git上netty源码分支4.1,启动类为example/echo/EchoServer)
由于上面三点篇幅较长,所以拆成两篇,本文主要解析服务端的启动流程。
本篇Netty服务端的启动流程,目录如下:
1.介绍主从Reactor多线程模型。
2.创建NioEventLoopGroup。
3.ServerBootstrap初始化配置阶段。
4.启动服务。
介绍Netty的线程模型
开始介绍Netty的源码前,首先来了解下其线程模型,以便更好的来解读源码。
Netty是在基于主从Reactor多线程模型做的一定的改进,比如主从Reactor都可以是多个。
什么是主从Reactor多线程模型呢?
我们知道最简单的nio,单线程,开启一个selector多路复用器,负责监听accept事件&IO读写事件以及业务处理,只要任意一步卡住,都会影响其他的动作,吞吐量完全不行。
那么将上面的三快都拆分到不同的线程来处理,比如Reactor主线程(独立的selector)只负责监听accept,创建通道后直接丢给Reactor子线程,这样即使read&write阻塞,也不会影响Reactor主线程的接收请求。
Reactor子线程负责监听通道的read&write事件,业务处理丢给业务线程池来处理,即使某个业务特别耗时,也不影响Reactor子线程来读写通道数据。
同时Reactor主线程和Reactor子线程都可以是多个,横向扩展其接受请求和读写的能力。
接下来,我们来看服务端的启动源码。
创建NioEventLoopGroup
1EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2EventLoopGroup workerGroup = new NioEventLoopGroup();
首先来分析这两行代码:看结构图
boosGroup即为Reactor主线程组,workerGroup为Reactor子线程组;每个NioEventLoopGroup中都由NioEventLoop组成(数量可指定,不指定时是核数*2)。
MultithreadEventLoopGroup中指定NioEventLoop的数量。🔽
1 static {
2 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
3 "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
4
5 if (logger.isDebugEnabled()) {
6 logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
7 }
8 }
9
10 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
11 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
12 }
MultithreadEventExecutorGroup中按照指定的线程数创建NioEventLoop对象。🔽
下面这段代码主要来看三个组件:executor,chooser,eventExecutor。
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2 EventExecutorChooserFactory chooserFactory, Object... args) {
3 if (nThreads <= 0) {
4 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
5 }
6
7 if (executor == null) {// 👇①
8 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9 }
10
11 children = new EventExecutor[nThreads];
12
13 for (int i = 0; i < nThreads; i ++) {
14 boolean success = false;
15 try {// 👇② 创建NioEventLoop
16 children[i] = newChild(executor, args);
17 success = true;
18 } catch (Exception e) {
19 // TODO: Think about if this is a good exception type
20 throw new IllegalStateException("failed to create a child event loop", e);
21 } finally {
22 // 省略部分代码
23 }
24 }
25 // 👇③
26 chooser = chooserFactory.newChooser(children);
27
28 }
👉①exector(ThreadPerTaskExecutor)是每个NioEventLoop开启线程的执行器,通过持有的线程工厂创建线程来执行任务。
1 if (executor == null) {
2 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
3 }
4 @Override
5 public void execute(Runnable command) {
6 threadFactory.newThread(command).start();
7 }
👉③chooser选择器,NioEventLoopGroup中有多个NioEventLoop,那么任务是通过哪个NioEventLoop来处理的呢,就是通过chooser选择器来决定的,目前的方案就是数组下标加1来轮询。
👉②children为EventExecutor数组。每个children[i]都是一个NioEventLoop对象。来看下EventExecutor的结构。
创建NioEventLoop的源码如下(在构造NioEventLoop就已经通过原生JDK提供的方法开启了多路选择器selector):
1 @Override
2 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
3 return new NioEventLoop(this, executor, (SelectorProvider) args[0],
4 ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
5 }
6 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
7 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
8 super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
9 if (selectorProvider == null) {
10 throw new NullPointerException("selectorProvider");
11 }
12 if (strategy == null) {
13 throw new NullPointerException("selectStrategy");
14 }
15 provider = selectorProvider;
16 final SelectorTuple selectorTuple = openSelector();
17 selector = selectorTuple.selector;
18 unwrappedSelector = selectorTuple.unwrappedSelector;
19 selectStrategy = strategy;
20 }
总结,到目前,代码做了哪些事情:
创建两个NioEventLoopGroup,一个为bossGroup,一个为workerGroup。
每个NioEventLoopGroup中都有一个executor执行器(用于给其中的NioEventLoop创建线程执行用),chooser用于决定使用哪个来NioEventLoop来执行。
假设我们的机器是4核的,那么bossGroup中的NioEventLoop为一个,workerGroup中的NioEventLoop为8个。
每个NioEventLoop中都会开启一个多路选择器selector。
ServerBootstrap初始化配置阶段
1final EchoServerHandler serverHandler = new EchoServerHandler();
2 try {
3 ServerBootstrap b = new ServerBootstrap();
4 b.group(bossGroup, workerGroup)
5 .channel(NioServerSocketChannel.class)
6 .option(ChannelOption.SO_BACKLOG, 100)
7 .handler(new LoggingHandler(LogLevel.INFO))
8 .childHandler(new ChannelInitializer<SocketChannel>() {
9 @Override
10 public void initChannel(SocketChannel ch) throws Exception {
11 ChannelPipeline p = ch.pipeline();
12 if (sslCtx != null) {
13 p.addLast(sslCtx.newHandler(ch.alloc()));
14 }
15 //p.addLast(new LoggingHandler(LogLevel.INFO));
16 p.addLast(serverHandler);
17 }
18 });
ServerBootstrap为服务端的启动类(客户端使用Bootstrap),链式设置参数如下:
.group()指定bossGroup为group,workerGroup为childGroup。
.channnel()指定channelClass为NioServerSocketChannel。(点进去,设置了channelFactory为指定类的ReflectiveChannelFactory,最终是用于在将JDK原生的通道ServerSocketChannel封装成指定类型)。
.option()指定网络参数(原生JDK的socket网络连接相关参数)。
.handler()给boosGroup设置的ChannelHandler处理器。(后续在构建NioServerSocketChannel通道时,会加入到pipeline中双向链表中)。
.childHandler()给childGroup的ChannelHandler处理器。(后续在构建NioSocketChannel通道时,会加入到pipeline中的双向链表中)。
启动服务
1ChannelFuture f = b.bind(PORT).sync();
上面绑定的代码点进去,主要分为两个步骤:
1.初始化注册通道(final ChannelFuture regFuture = initAndRegister();)
2.进行绑定(doBind0 (regFuture, channel, localAddress, promise);)
1.初始化注册通道阶段-initAndRegister
分为三个步骤:
1.1 channelFactory.newChannel();
ServerBootstrap初始化配置阶段,已经指定了channelFactory为ReflectiveChannelFactory。所以这里会通过反射构造NioServerSocketChannel对象。
1public NioServerSocketChannel() {
2 // DEFAULT_SELECTOR_PROVIDER为JDK提供的selector的生成器
3 // 通过原生JDK创建ServerSocketChannel
4 this(newSocket(DEFAULT_SELECTOR_PROVIDER));
5 }
6
7public NioServerSocketChannel(ServerSocketChannel channel) {
8 // 指定了感兴趣选择器感兴趣的监听事件accept
9 super(null, channel, SelectionKey.OP_ACCEPT);
10 config = new NioServerSocketChannelConfig(this, javaChannel().socket());
11 }
12 protected AbstractChannel(Channel parent) {
13 this.parent = parent;
14 // 生成id
15 id = newId();
16 // 此对象用于通道就绪后读通道时候的处理;
17 // 对于NioServerSocketChannel而言,为NioMessageUnsafe
18 // 对于NioSocketChannel而言,为NioSocketChannelUnsafe
19 unsafe = newUnsafe();
20 // 生成默认的DefaultChannelPipeline
21 pipeline = newChannelPipeline();
22 }
23 // 默认的pipeline,里面有一个双向链表,初期就两个元素 tailContext-->headConext
24 protected DefaultChannelPipeline(Channel channel) {
25 this.channel = ObjectUtil.checkNotNull(channel, "channel");
26 succeededFuture = new SucceededChannelFuture(channel, null);
27 voidPromise = new VoidChannelPromise(channel, true);
28
29 tail = new TailContext(this);
30 head = new HeadContext(this);
31
32 head.next = tail;
33 tail.prev = head;
34 }
源码解读:
通过原生JDK的SelectorProvider开启服务端通道ServerSocketChannel。
将原生的服务端通道对象封装成NioServerSocketChannel。
创建用于处理通道就绪的对象unsafe。
创建默认的pipeline,初期pipeline中有双向链表tailContext-->headContext。
由上可知,pipeline和channel是一一对应的关系,通过channel可以获取到pipeline,从pipeline中也有获取到channel;pipeline中构建了一个双向链表,目前还看不出干什么用的。
1.2 初始化(init(channel);)
1void init(Channel channel) throws Exception {
2 // 省略部分代码是(将ServerBootstrap设置的options参数设置到channel上)
3 p.addLast(new ChannelInitializer<Channel>() {
4 @Override
5 public void initChannel(final Channel ch) throws Exception {
6 final ChannelPipeline pipeline = ch.pipeline();
7 ChannelHandler handler = config.handler();
8 if (handler != null) {
9 pipeline.addLast(handler);
10 }
11
12 ch.eventLoop().execute(new Runnable() {
13 @Override
14 public void run() {
15 pipeline.addLast(new ServerBootstrapAcceptor(
16 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
17 }
18 });
19 }
20 });
21}
源码解读:
主要来看addLast方法做了什么事情,点进去。
1.发现将ChannelInitializer构造成DefaultChannelHandlerContext上下文对象,然后将其加入到pipeline的双向链表中,注意总是从tail.pre的节点开始向后添加。那么现在pipeline的双向链表中的元素是:
【headContext <--> ChannelInitializer实现类上下文 <--> tailContext】
2.同时初始化了PendingHandlerAddedTask任务,其中的内容就是👆的ChannelInitializer实现类上下文。
1.3 config().group().register(channel);
1public ChannelFuture register(Channel channel) {
2 return next().register(channel);
3}
4
5@Override
6public EventExecutor next() {
7 return chooser.next();
8}
9private void register0(ChannelPromise promise) {
10 try {
11 // 👇
12 doRegister();
13 neverRegistered = false;
14 registered = true;
15 // 👇
16 pipeline.invokeHandlerAddedIfNeeded();
17
18 safeSetSuccess(promise);
19 // 👇 触发pipeline中的双向链表的handler的channelRegistered
20 // 从head开始向后轮询,只抽出满足条件(必须是ChannelInboundHandler的实现类)的handler
21 pipeline.fireChannelRegistered();
22 // 省略部分代码。。。
23 } catch (Throwable t) {
24 // Close the channel directly to avoid FD leak.
25 closeForcibly();
26 closeFuture.setClosed();
27 safeSetFailure(promise, t);
28 }
29}
源码解读:
问:通道注册到哪个NioEventLoop上?
答:我们知道group指的是NioEventLoopGroup,是NioEventLoop(每个nioEventLoop在创建的时候都开启了多路选择器selector)的集合,那么通道到底注册到哪个selector呢?
还记得(创建NioEventLoopGroup阶段)构建的chooser选择器吗?就是通过选择器来选定一个NioEventLoop的,基本的策略就是通过下标+1轮询一个。
问:doRegister()中做了什么?
答:点进去发现,是将NioServerSocketChannel中原生的通道注册到选定的NioEventLoop上原生的选择器selector上。
问:pipeline.invokeHandlerAddedIfNeeded做了啥是?
答:主要是执行PendingHandlerAddedTask任务,也就是其中chanelHandlerContext上下文的通道处理器的handlerAdded方法,也就是(ChannelInitializer)中的handlerAdded方法。🔽
1 @Override
2 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
3 if (ctx.channel().isRegistered()) {
4 initChannel(ctx);
5 }
6 }
7
8 @SuppressWarnings("unchecked")
9 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
10 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
11 try {
12 initChannel((C) ctx.channel());
13 } catch (Throwable cause) {
14 exceptionCaught(ctx, cause);
15 } finally {
16 remove(ctx);
17 }
18 return true;
19 }
20 return false;
21 }
执行ChannelInitializer的initChannel方法,执行完毕后,将自身从pipeline中删除。回顾initChannel中做了什么(1.将ServerBootstrap中配置的handler追击到pipeline中,2.将ServerBootstrapAcceptor追加到pipeline中)。
此时pipeline中的双向链表中的内容变成了:
【tailContext <--> LoggingHandler ==> ServerBootstrapAcceptor <--> tailContext】
问:( pipeline.fireChannelRegistered();)做了什么?
答:从pipeline中的双向链表中的head节点开始,轮询满足条件的handler,执行其channelRegistered方法。条件:必须是ChannelInboundHandler的实现类,即必须是inbound的ChannelHandler。
现在来回顾下initAndRegister的整体流程:
2.进行绑定阶段-doBind0
1private static void doBind0(
2 final ChannelFuture regFuture, final Channel channel,
3 final SocketAddress localAddress, final ChannelPromise promise) {
4
5 channel.eventLoop().execute(new Runnable() {
6 @Override
7 public void run() {
8 if (regFuture.isSuccess()) {
9 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
10 } else {
11 promise.setFailure(regFuture.cause());
12 }
13 }
14 });
15}
源码解读:
将绑定的任务交给了channel注册的selector所属的NioEventLoop。
execute里面做了什么事情呢?
如果NioEventLoop线程还没有开启,则开启线程,然后将任务加入到任务队列中。
问:任务做了啥?
1@Override
2public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
3
4 boolean wasActive = isActive();
5 try {
6 // 👇NioServerSocketChannel的doBind方法
7 doBind(localAddress);
8 } catch (Throwable t) {
9 safeSetFailure(promise, t);
10 closeIfClosed();
11 return;
12 }
13
14 if (!wasActive && isActive()) {
15 invokeLater(new Runnable() {
16 @Override
17 public void run() {
18 // 👇 最终会到head的抽出inbound的channelHandler,触发其channelActive方法,
19 pipeline.fireChannelActive();
20 }
21 });
22 }
23
24 safeSetSuccess(promise);
25}
26 @Override
27 protected void doBind(SocketAddress localAddress) throws Exception {
28 // 将监听地址绑定到通道上
29 if (PlatformDependent.javaVersion() >= 7) {
30 javaChannel().bind(localAddress, config.getBacklog());
31 } else {
32 javaChannel().socket().bind(localAddress, config.getBacklog());
33 }
34 }
1 @Override
2 protected void doBeginRead() throws Exception {
3 // Channel.read() or ChannelHandlerContext.read() was called
4 final SelectionKey selectionKey = this.selectionKey;
5 if (!selectionKey.isValid()) {
6 return;
7 }
8
9 readPending = true;
10
11 final int interestOps = selectionKey.interestOps();
12 if ((interestOps & readInterestOp) == 0) {
13 selectionKey.interestOps(interestOps | readInterestOp);
14 }
15 }
问:如何开启线程?
答:开启线程:在(创建NioEventLoopGroup)阶段,创建了executor,通过默认线程工厂创建一个线程。
问:开启的线程里面做了哪些事情?
答:线程里面做了啥事?看NioEventLoop的run方法。
1.等待Nio事件 -- select()
2.处理Nio事件 -- processSelectedKeys()⭐️关系到后续要说明的接收请求和监听读事件的处理
3.处理NioEventLoop的任务队列中的任务 -- runAllTasks
现在,来回顾下绑定的整体流程。
至此,服务端的启动的流程已经走完。
以上。