vlambda博客
学习文章列表

Netty核心源码分析(一)

前言:分析Netty服务端的核心源码,主要从下面三个角度来解析。
1.服务端的启动流程。
2.接受请求流程。
3.监听读事件。
(源码是从下载git上netty源码分支4.1,启动类为example/echo/EchoServer)
由于上面三点篇幅较长,所以拆成两篇,本文主要解析服务端的启动流程。

本篇Netty服务端的启动流程,目录如下:
1.介绍主从Reactor多线程模型。
2.创建NioEventLoopGroup。
3.ServerBootstrap初始化配置阶段。
4.启动服务。

介绍Netty的线程模型

开始介绍Netty的源码前,首先来了解下其线程模型,以便更好的来解读源码。

Netty是在基于主从Reactor多线程模型做的一定的改进,比如主从Reactor都可以是多个。
什么是主从Reactor多线程模型呢?
我们知道最简单的nio,单线程,开启一个selector多路复用器,负责监听accept事件&IO读写事件以及业务处理,只要任意一步卡住,都会影响其他的动作,吞吐量完全不行。
那么将上面的三快都拆分到不同的线程来处理,比如Reactor主线程(独立的selector)只负责监听accept,创建通道后直接丢给Reactor子线程,这样即使read&write阻塞,也不会影响Reactor主线程的接收请求。
Reactor子线程负责监听通道的read&write事件,业务处理丢给业务线程池来处理,即使某个业务特别耗时,也不影响Reactor子线程来读写通道数据。
同时Reactor主线程和Reactor子线程都可以是多个,横向扩展其接受请求和读写的能力。

接下来,我们来看服务端的启动源码。

创建NioEventLoopGroup

1EventLoopGroup bossGroup = new NioEventLoopGroup(1);
2EventLoopGroup workerGroup = new NioEventLoopGroup();

首先来分析这两行代码:看结构图

Netty核心源码分析(一)

boosGroup即为Reactor主线程组,workerGroup为Reactor子线程组;每个NioEventLoopGroup中都由NioEventLoop组成(数量可指定,不指定时是核数*2)。
MultithreadEventLoopGroup中指定NioEventLoop的数量。🔽

 1    static {
2        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
3                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
4
5        if (logger.isDebugEnabled()) {
6            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
7        }
8    }
9
10    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
11        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
12    }

MultithreadEventExecutorGroup中按照指定的线程数创建NioEventLoop对象。🔽
下面这段代码主要来看三个组件:executor,chooser,eventExecutor。

 1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
2                                            EventExecutorChooserFactory chooserFactory, Object... args)
 
{
3        if (nThreads <= 0) {
4            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
5        }
6
7        if (executor == null) {// 👇①
8            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9        }
10
11        children = new EventExecutor[nThreads];
12
13        for (int i = 0; i < nThreads; i ++) {
14            boolean success = false;
15            try {// 👇② 创建NioEventLoop
16                children[i] = newChild(executor, args);
17                success = true;
18            } catch (Exception e) {
19                // TODO: Think about if this is a good exception type
20                throw new IllegalStateException("failed to create a child event loop", e);
21            } finally {
22               // 省略部分代码
23            }
24        }
25        // 👇③
26        chooser = chooserFactory.newChooser(children);
27
28    }

👉①exector(ThreadPerTaskExecutor)是每个NioEventLoop开启线程的执行器,通过持有的线程工厂创建线程来执行任务。

1    if (executor == null) {
2        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
3    }
4    @Override
5    public void execute(Runnable command) {
6        threadFactory.newThread(command).start();
7    }

👉③chooser选择器,NioEventLoopGroup中有多个NioEventLoop,那么任务是通过哪个NioEventLoop来处理的呢,就是通过chooser选择器来决定的,目前的方案就是数组下标加1来轮询。
👉②children为EventExecutor数组。每个children[i]都是一个NioEventLoop对象。来看下EventExecutor的结构。

Netty核心源码分析(一)

创建NioEventLoop的源码如下(在构造NioEventLoop就已经通过原生JDK提供的方法开启了多路选择器selector):

 1    @Override
2    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
3        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
4            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
5    }
6    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
7                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
8        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
9        if (selectorProvider == null) {
10            throw new NullPointerException("selectorProvider");
11        }
12        if (strategy == null) {
13            throw new NullPointerException("selectStrategy");
14        }
15        provider = selectorProvider;
16        final SelectorTuple selectorTuple = openSelector();
17        selector = selectorTuple.selector;
18        unwrappedSelector = selectorTuple.unwrappedSelector;
19        selectStrategy = strategy;
20    }    

总结,到目前,代码做了哪些事情:

Netty核心源码分析(一)

创建两个NioEventLoopGroup,一个为bossGroup,一个为workerGroup。
每个NioEventLoopGroup中都有一个executor执行器(用于给其中的NioEventLoop创建线程执行用),chooser用于决定使用哪个来NioEventLoop来执行。
假设我们的机器是4核的,那么bossGroup中的NioEventLoop为一个,workerGroup中的NioEventLoop为8个。
每个NioEventLoop中都会开启一个多路选择器selector。

ServerBootstrap初始化配置阶段

 1final EchoServerHandler serverHandler = new EchoServerHandler();
2        try {
3            ServerBootstrap b = new ServerBootstrap();
4            b.group(bossGroup, workerGroup)
5             .channel(NioServerSocketChannel.class)
6             .option(ChannelOption.SO_BACKLOG, 100)
7             .handler(new LoggingHandler(LogLevel.INFO))
8             .childHandler(new ChannelInitializer<SocketChannel>() {
9                 @Override
10                 public void initChannel(SocketChannel ch) throws Exception {
11                     ChannelPipeline p = ch.pipeline();
12                     if (sslCtx != null) {
13                         p.addLast(sslCtx.newHandler(ch.alloc()));
14                     }
15                     //p.addLast(new LoggingHandler(LogLevel.INFO));
16                     p.addLast(serverHandler);
17                 }
18             });

ServerBootstrap为服务端的启动类(客户端使用Bootstrap),链式设置参数如下:

  • .group()指定bossGroup为group,workerGroup为childGroup。

  • .channnel()指定channelClass为NioServerSocketChannel。(点进去,设置了channelFactory为指定类的ReflectiveChannelFactory,最终是用于在将JDK原生的通道ServerSocketChannel封装成指定类型)。

  • .option()指定网络参数(原生JDK的socket网络连接相关参数)。

  • .handler()给boosGroup设置的ChannelHandler处理器。(后续在构建NioServerSocketChannel通道时,会加入到pipeline中双向链表中)。

  • .childHandler()给childGroup的ChannelHandler处理器。(后续在构建NioSocketChannel通道时,会加入到pipeline中的双向链表中)。

启动服务

1ChannelFuture f = b.bind(PORT).sync();

上面绑定的代码点进去,主要分为两个步骤:
1.初始化注册通道(final ChannelFuture regFuture = initAndRegister();)
2.进行绑定(doBind0 (regFuture, channel, localAddress, promise);)

1.初始化注册通道阶段-initAndRegister

分为三个步骤:

1.1 channelFactory.newChannel();

ServerBootstrap初始化配置阶段,已经指定了channelFactory为ReflectiveChannelFactory。所以这里会通过反射构造NioServerSocketChannel对象。

 1public NioServerSocketChannel() {
2        // DEFAULT_SELECTOR_PROVIDER为JDK提供的selector的生成器
3        // 通过原生JDK创建ServerSocketChannel
4        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
5    }
6
7public NioServerSocketChannel(ServerSocketChannel channel{
8        // 指定了感兴趣选择器感兴趣的监听事件accept
9        super(null, channel, SelectionKey.OP_ACCEPT);
10        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
11    }
12    protected AbstractChannel(Channel parent{
13        this.parent = parent;
14        // 生成id
15        id = newId();
16        // 此对象用于通道就绪后读通道时候的处理;
17        // 对于NioServerSocketChannel而言,为NioMessageUnsafe
18        // 对于NioSocketChannel而言,为NioSocketChannelUnsafe
19        unsafe = newUnsafe();
20        // 生成默认的DefaultChannelPipeline
21        pipeline = newChannelPipeline();
22    }
23    // 默认的pipeline,里面有一个双向链表,初期就两个元素 tailContext-->headConext
24    protected DefaultChannelPipeline(Channel channel{
25        this.channel = ObjectUtil.checkNotNull(channel, "channel");
26        succeededFuture = new SucceededChannelFuture(channel, null);
27        voidPromise =  new VoidChannelPromise(channel, true);
28
29        tail = new TailContext(this);
30        head = new HeadContext(this);
31
32        head.next = tail;
33        tail.prev = head;
34    }

源码解读:

  • 通过原生JDK的SelectorProvider开启服务端通道ServerSocketChannel。

  • 将原生的服务端通道对象封装成NioServerSocketChannel。

  • 创建用于处理通道就绪的对象unsafe。

  • 创建默认的pipeline,初期pipeline中有双向链表tailContext-->headContext。

由上可知,pipeline和channel是一一对应的关系,通过channel可以获取到pipeline,从pipeline中也有获取到channel;pipeline中构建了一个双向链表,目前还看不出干什么用的。

1.2 初始化(init(channel);)
 1void init(Channel channel) throws Exception {
2    // 省略部分代码是(将ServerBootstrap设置的options参数设置到channel上)
3    p.addLast(new ChannelInitializer<Channel>() {
4        @Override
5        public void initChannel(final Channel ch) throws Exception {
6            final ChannelPipeline pipeline = ch.pipeline();
7            ChannelHandler handler = config.handler();
8            if (handler != null) {
9                pipeline.addLast(handler);
10            }
11
12            ch.eventLoop().execute(new Runnable() {
13                @Override
14                public void run() {
15                    pipeline.addLast(new ServerBootstrapAcceptor(
16                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
17                }
18            });
19        }
20    });
21}

源码解读:
主要来看addLast方法做了什么事情,点进去。
1.发现将ChannelInitializer构造成DefaultChannelHandlerContext上下文对象,然后将其加入到pipeline的双向链表中,注意总是从tail.pre的节点开始向后添加。那么现在pipeline的双向链表中的元素是:
【headContext <--> ChannelInitializer实现类上下文 <--> tailContext】
2.同时初始化了PendingHandlerAddedTask任务,其中的内容就是👆的ChannelInitializer实现类上下文。

1.3 config().group().register(channel);
 1public ChannelFuture register(Channel channel) {
2    return next().register(channel);
3}
4
5@Override
6public EventExecutor next() 
{
7    return chooser.next();
8}
9private void register0(ChannelPromise promise) {
10    try {
11        // 👇
12        doRegister();
13        neverRegistered = false;
14        registered = true;
15        // 👇
16        pipeline.invokeHandlerAddedIfNeeded();
17
18        safeSetSuccess(promise);
19        // 👇 触发pipeline中的双向链表的handler的channelRegistered
20        // 从head开始向后轮询,只抽出满足条件(必须是ChannelInboundHandler的实现类)的handler
21        pipeline.fireChannelRegistered();
22        // 省略部分代码。。。
23    } catch (Throwable t) {
24        // Close the channel directly to avoid FD leak.
25        closeForcibly();
26        closeFuture.setClosed();
27        safeSetFailure(promise, t);
28    }
29}

源码解读:
问:通道注册到哪个NioEventLoop上?
答:我们知道group指的是NioEventLoopGroup,是NioEventLoop(每个nioEventLoop在创建的时候都开启了多路选择器selector)的集合,那么通道到底注册到哪个selector呢?
还记得(创建NioEventLoopGroup阶段)构建的chooser选择器吗?就是通过选择器来选定一个NioEventLoop的,基本的策略就是通过下标+1轮询一个。

问:doRegister()中做了什么?
答:点进去发现,是将NioServerSocketChannel中原生的通道注册到选定的NioEventLoop上原生的选择器selector上。

:pipeline.invokeHandlerAddedIfNeeded做了啥是?
答:主要是执行PendingHandlerAddedTask任务,也就是其中chanelHandlerContext上下文的通道处理器的handlerAdded方法,也就是(ChannelInitializer)中的handlerAdded方法。🔽

 1    @Override
2    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
3        if (ctx.channel().isRegistered()) {
4            initChannel(ctx);
5        }
6    }
7
8    @SuppressWarnings("unchecked")
9    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
10        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
11            try {
12                initChannel((C) ctx.channel());
13            } catch (Throwable cause) {
14                exceptionCaught(ctx, cause);
15            } finally {
16                remove(ctx);
17            }
18            return true;
19        }
20        return false;
21    }

执行ChannelInitializer的initChannel方法,执行完毕后,将自身从pipeline中删除。回顾initChannel中做了什么(1.将ServerBootstrap中配置的handler追击到pipeline中,2.将ServerBootstrapAcceptor追加到pipeline中)。
此时pipeline中的双向链表中的内容变成了:
【tailContext <--> LoggingHandler ==> ServerBootstrapAcceptor <--> tailContext】

:( pipeline.fireChannelRegistered();)做了什么?
答:从pipeline中的双向链表中的head节点开始,轮询满足条件的handler,执行其channelRegistered方法。条件:必须是ChannelInboundHandler的实现类,即必须是inbound的ChannelHandler。

现在来回顾下initAndRegister的整体流程:

2.进行绑定阶段-doBind0
 1private static void doBind0(
2        final ChannelFuture regFuture, final Channel channel,
3        final SocketAddress localAddress, final ChannelPromise promise)
 
{
4
5    channel.eventLoop().execute(new Runnable() {
6        @Override
7        public void run() {
8            if (regFuture.isSuccess()) {
9                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
10            } else {
11                promise.setFailure(regFuture.cause());
12            }
13        }
14    });
15}

源码解读:
将绑定的任务交给了channel注册的selector所属的NioEventLoop。
execute里面做了什么事情呢?
如果NioEventLoop线程还没有开启,则开启线程,然后将任务加入到任务队列中。

:任务做了啥?

 1@Override
2public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
3
4    boolean wasActive = isActive();
5    try {
6        // 👇NioServerSocketChannel的doBind方法
7        doBind(localAddress);
8    } catch (Throwable t) {
9        safeSetFailure(promise, t);
10        closeIfClosed();
11        return;
12    }
13
14    if (!wasActive && isActive()) {
15        invokeLater(new Runnable() {
16            @Override
17            public void run() {
18                // 👇 最终会到head的抽出inbound的channelHandler,触发其channelActive方法,
19                pipeline.fireChannelActive();
20            }
21        });
22    }
23
24    safeSetSuccess(promise);
25}
26    @Override
27    protected void doBind(SocketAddress localAddress) throws Exception {
28        // 将监听地址绑定到通道上
29        if (PlatformDependent.javaVersion() >= 7) {
30            javaChannel().bind(localAddress, config.getBacklog());
31        } else {
32            javaChannel().socket().bind(localAddress, config.getBacklog());
33        }
34    }
 1    @Override
2    protected void doBeginRead() throws Exception {
3        // Channel.read() or ChannelHandlerContext.read() was called
4        final SelectionKey selectionKey = this.selectionKey;
5        if (!selectionKey.isValid()) {
6            return;
7        }
8
9        readPending = true;
10
11        final int interestOps = selectionKey.interestOps();
12        if ((interestOps & readInterestOp) == 0) {
13            selectionKey.interestOps(interestOps | readInterestOp);
14        }
15    }

:如何开启线程?
答:开启线程:在(创建NioEventLoopGroup)阶段,创建了executor,通过默认线程工厂创建一个线程。

:开启的线程里面做了哪些事情?
答:线程里面做了啥事?看NioEventLoop的run方法。
1.等待Nio事件 -- select()
2.处理Nio事件 -- processSelectedKeys()⭐️关系到后续要说明的接收请求和监听读事件的处理
3.处理NioEventLoop的任务队列中的任务 -- runAllTasks

现在,来回顾下绑定的整体流程。

至此,服务端的启动的流程已经走完。

以上。