vlambda博客
学习文章列表

深入netty网络编程

一、netty简介

Netty是基于Java NIO 的异步事件驱动的网络应用框架,使用Netty可以快速开发网络应用,Netty提供了高层次的抽象来简化TCP和UDP服务器socket编程。同时保证高吞吐量、低延时、高可靠性。


二、netty vs nio编程

NIO开发的问题:

1、NIO类库和API复杂,使用麻烦。

2、需要具备Java多线程编程能力(涉及到Reactor模式)。

3、客户端断线重连、网络不稳定、半包读写、失败缓存、网络阻塞和异常码流等问题处理难度非常大。

4、存在部分BUG。

5、要写大量繁琐的代码,创建、注册,判断事件等,很多程序只关注数据的收发以及对数据的逻辑处理,NIO需要手动创建buffer缓冲区。


netty优势:

1、API使用简单,开发门槛低。

2、功能强大,预置了多种编解码功能,支持多种主流协议。

3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展。

4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优。

5、成熟、稳定,Netty修复了已经发现的NIO所有BUG。

6、社区活跃。

7、经历了很多商用项目的考验。

8、Netty不需要程序员创建bytebuf,直接读取数据到已有的缓冲区中。

9、使用主线程组负责连接,工作线程组负责其他的业务处理。

10、更加优雅的 Reactor 模式实现、灵活的线程模型、利用 EventLoop 等创新性的机制,可以非常高效地管理成百上千的 Channel。

11、充分利用了 Java 的 Zero-Copy 机制,并且从多种角度,“斤斤计较”般的降低内存分配和回收的开销。例如,使用池化的 Direct Buffer 等技术,在提高 IO 性能的同时,减少了对象的创建和销毁;

12、利用反射等技术直接操纵 SelectionKey,使用数组而不是 Java 容器等。

13、使用更多本地代码。例如,直接利用 JNI 调用 Open SSL 等方式,获得比 Java 内建 SSL 引擎更好的性能。

14、在通信协议、序列化等其他角度的优化,从网络协议的角度,Netty 除了支持传输层的 UDP、TCP、SCTP协议,也支持 HTTP(s)、WebSocket 等多种应用层协议,它并不是单一协议的 API。

15、总的来说,Netty 并没有 Java 核心类库那些强烈的通用性、跨平台等各种负担,针对性能等特定目标以及 Linux 等特定环境,采取了一些极致的优化手段。

16、异步非阻塞(线程池处理)、基于事件驱动、高性能、高可靠性和高可定制性。


三、组件

  • Channel

    netty中把一个端到端的通信定义为了通道。所谓的端包含但不限于硬件设备,文件。这是对通信的第一层抽象。通道也是一个连接在Java中(Socket类),基本的 I/O 操作(bind()、 connect()、 read()和 write())依赖于底层网络传输所提供的功能。Netty 的 Channel 接口所提 供的 API降低了直接使用 Soc ket 类的复杂性。Netty中也提供了使用多种方式连接的Channel: 

    - EmbeddedChannel 

    - LocalServerChannel

    - NioDatagramChannel 

    - NioSctpChannel 

    - NioSocketChannel


  • Selector

    选择器或多路复用器,Netty基于Selector对象实现(底层epoll实现)I/O多路复用,通过 Selector绑定有EventLoop一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel。


  • ByteBuf

    缓冲区,对字节数据的封装,由最小容量、最大容量、读指针、写指针组成。

    直接内存 vs 堆内存:

    - 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。

    - 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放。

    池化 vs 非池化,池化的最大意义在于可以重用 ByteBuf,优点有:

    -没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力。

    -有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率。

    -高并发时,池化功能更节约内存,减少内存溢出的可能。


  • Bootstrap、ServerBootstrap

    Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。


  • NioEventLoop

    NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:、

    -I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。

    -非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。


  • NioEventLoopGroup

    NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。


  • ChannelHandler

    ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其 ChannelPi peline(业务处理链)中的下一个处理程序。ChannelHandler 用来处理 Channe 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline。


  • ChannelHandlerContext

    保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。


  • ChannelPipline

    保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作(分包粘包、编码解码等处理),ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。


  • ChannelFuture

    这是 Netty 实现异步 IO 的基础之一,保证了同一个 Channel 操作的调用顺序。Netty 扩展了 Java 标准的 Future,提供了针对自己场景的特有Future定义。


四、启动流程

public class NettyServer { //创建两个线程组bossGroup和workGroup,含有的线程NioEventLoop的个数默认为cpu核数的两倍 //bossGroup主线程组只负责请求连接,真正和客户端业务处理交给workGroup完成 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(10); try { //创建服务端启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来配置参数 bootstrap.group(bossGroup, workGroup) //使用NioServerSocketChannel作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 .option(ChannelOption.SO_BACKLOG, 1024) //创建通道初始化对象,设置初始化参数 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //对workGroup的SocketChannel设置处理器 ch.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("netty server start success "); //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况 // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕 ChannelFuture channelFuture = bootstrap.bind(8000).sync(); //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成 channelFuture.channel().closeFuture().sync(); } catch (Exception ce) { ce.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}
  • NioEventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop;

  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;

  • 所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;

  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop;

  • 每一个 EventLoop 负责处理一个或多个 Channel;


五、netty线程模型:reactor

    基于事件驱动响应的模型,epoll也属于事件驱动,主线程组负责连接、工作线程组负责其他业务,一个selector相当于一个reactor对应一个线程,bossGroup和workerGroup属于多Reactor模型。就是reactor的多线程模型。分类有Reactor 单线程;Reactor 多线程;主从 Reactor 多线程。

    一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。

    


六、异步处理

    netty是异步非阻塞框架,通过线程池多线程模型来完成的。异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

    Netty 中的异步事件处理,Event被放入EventQueue即可返回,后续再从Queue消费处理 

Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
    当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
(1)通过 isDone 方法来判断当前操作是否完成;
(2)通过 isSuccess 方法来判断已完成的当前操作是否成功;
(3)通过 getCause 方法来获取已完成的当前操作失败的原因;
(4)通过 isCancelled 方法来判断已完成的当前操作是否被取消;
(5)通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。

    例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑:

serverBootstrap.bind(port).addListener(future -> { if(future.isSuccess()) { System.out.println(newDate() + ": 端口["+ port + "]绑定成功!"); } else{ System.err.println("端口["+ port + "]绑定失败!"); } });


七、零拷贝

(1)Netty 的接收和发送 ByteBuffer默认采用 DIRECT BUFFERS ,使用堆外直接内存进 行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存 ( HEAP BUFFERS)进行 Socket 读写, JVM 会将堆内存 Buffer 拷贝一份到直接内 存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓 冲区的内存拷贝。

(2)Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式 将几个 小 Buffer 合并成一个大的 Buffer 。

(3)Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel ,避免了传统通过循环 write 方式导致的内存拷贝问题。




八、源码分析


1、 EventLoopGroup bossGroup =new NioEventLoopGroup(1);   EventLoopGroup workGroup =new NioEventLoopGroup(); 

       这两行代码就是创建两个线程池组,第一个是boss线程组,第二个是工作线程组,netty底层的boos就是用来处理连接事件,然后将连接注册到工作线程上,其实在NioEventLoopoGroup中如果初始化的线程数是1,那么底层就是一个多路复用器,也就是说传入了几个线程,就会产生多少给多路复用器Selector。

public NioEventLoopGroup(int nThreads, Executor executor) { /** * 这个构造中第三个参数就是Nio的操作,nio中创建一个ServerSocket就是通过ServerSocket.open, * 其实底层执行的代码就是调用的SelectorProvider.provider().openServerSocketChannel(),所以这里 * 传入的是NIO的相关入口api,后续要使用 */ this(nThreads, executor, SelectorProvider.provider());}
/** * 这里是初始化Nio线程组,在Netty中是线程组,线程组使用的是MultithreadEventLoopGroup,而每个线程组 * 中都是一个NioEventLoop,对应的是SingleThreadEventLoop * @param nThreads * @param executor * @param selectorProvider * @param selectStrategyFactory */public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}//MultithreadEventLoopGroup是NioEventLoopGroup的父类protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { /** * 在构造线程组的时候如果没有传入线程的具体数字,也就是没有指定线程数的情况下,那么netty会默认 * 分配一个线程 数量:Math.max(1, SystemPropertyUtil.getInt( * "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)) *其实简单来说就是获取服务器的cpu核数 * 2,比如我的电脑是8核 ,那么线程数就是16 */ super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
//初始化线程池 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
/** * 这里就是初始化线程组的,比如说 NioEventLoopGroup初始化的是16,那么这里就会初始化一个线程组 * EventExecutor数组为16,这里面的每一个都是一个线程对象,这个线程对象是NioEventLoop,也是一个 * SingleThreadEventLoop,所以NioEventLoopGroup中包含的是MultitheadEventLoopGroup,而每个 * MultitheadEventLoopGroup中的线程是NioEventLoop(SingleThreadEventLoop),在里面维护了一个 * 线程池,所以NioEventLoop中有一个多路复用器Selector,简单来说就是初始化的NioEventLoopGroup有多个个线程 * 就有多少个多路多路复用器Selector和TaskQueue,而Selector和TaskQueue是存在于NioEventLoop中的 * */ children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建一个NioEventLoop,放入线程组,创建NioEventLoop的时候会创建Selector和taskQueue,并且维护了一个线程池 //线程池是共用的,就是NioEventLoopGroup初始化的线程池Executor是共用的 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { ....
io.netty.channel.nio.NioEventLoopGroup#newChild@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; //创建一个NioEventLoop,创建Selector和taskQueue return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);}NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { //这里创建了一个TaskQueue,然后调用父类的构造 super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); //调用NIO的api创建一个多路复用器Selector,是通过provider创建的 final SelectorTuple selectorTuple = openSelector(); //将创建的多路复用器Selector赋值给全局的selector this.selector = selectorTuple.selector; //unwrappedSelector和selector是相同的 this.unwrappedSelector = selectorTuple.unwrappedSelector;}//SingleThreadEventExecutor是NioEventLoop父类protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; //设置最大的task任务数量 Integer.MAX_VALUE this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}

上面的源码分析:

1.通过NioEventLoopGroup创建一个线程组,这个线程组是一个MultithreadEventExecutor Group,然后在MultithreadEventExecutorGroup中根据传入的线程数会创建很多个NioEve ntLoop,每个NioEventLoop中都会维护一个线程池Executor、一个Selector多路复用器、一个taskQueue,其中selector、taskQueue是NioEventLoop是每个NioEventLoop私有的,就是比如创建了16个NioEventLoop对象,那么每个事件循环线程组对象NioEventLoop中的selector、taskQueue都是私有的,而Executor是共有的,就是所有的对象都会共用Executor这个线程池。

2.然后创建的所有的事件线程组对象组成一个EventExecutor数组 ,EventExecutor的子类是NioEventLoop。

3.NioEventLoopGroup是一个线程组,这个线程组是通过MultithreadEventExecutorGroup创建的,所以NioEventLoopGroup对应的是MultithreadEventExecutorGroup,而NioEve ntLoop是通过MultithreadEventExecutorGroup创建的,创建的是一个SingleThreadEvent Loop线程组,就是单线程组。

4.每个SingleThreadEventLoop中会创建一个多路复用器selector和一个taskQueue,task Queue主要存放一个任务,然后通过executor去执行这个任务,其实对于boss来说,这个task其实就是注册事件。



2、ServerBootstrap启动类链式编程

    ServerBootstrap就是服务端启动的一个启动类,包含了netty启动的所有过程,它采用的是链式编程,比如绑定线程组、设置tcp的一些参数和处理器都是通过启动类来绑定的,前面声明了两个线程组,但是声明的线程组如果没有绑定到启动类,也就是主运行的类上是没有任何效果的。

绑定线程组

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } //将工作线程组的对象赋值给childGroup,没有其他过程 this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this;}public B group(EventLoopGroup group) { ObjectUtil.checkNotNull(group, "group"); if (this.group != null) { throw new IllegalStateException("group set already"); } //将boss的线程组绑定到group中,就是一个赋值的过程,没有其他的操作 this.group = group; return self();}

channel(NioServerSocketChannel.class):

public B channel(Class<? extends C> channelClass) { /** * 这里将传入的比如NioServerSocketChannel通过反射创建一个ReflectiveChannelFactory对象,然后 * 调用channelFactory将创建的ReflectiveChannelFactory对象赋值给ServerBootstrap 中channelFactory属性 * 所以这里记录下,在ServerBootstrap中有个属性为channelFactory * 这里的new ReflectiveChannelFactory其实就是得到了传入进来的channelClass一个对象,通过反射调用 * 构造NioServerSocketChannel的构造空构造方法 */ return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") ));}
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()/** * Create a new instance * 这里是一个空构造方法,在这个构造方法中做的事情有: * 1.创建一个ServerSocketChannel(NIO); * 2.将创建的ServerSocketChannel设置一些参数,比如设置这个ServerSocketChannel的事件为OP_ACCEPT,就是接受连接事件 * 3.设置ServerSocketChannel为非阻塞的模式。 */public NioServerSocketChannel() { //newSocket就是创建一个ServerSocketChannel,调用的就是NIO的创建逻辑 this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. * 调用NIO创建一个ServerSocketChannel */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); }}public NioServerSocketChannel(ServerSocketChannel channel) { //调用父类将这个ServerSocketChannel设置为非阻塞模式和将创建的ServerSocketChannel赋值到父类的属性ch中 super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}//AbstractNioChannel是NioServerSocketChannel的父类protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { //调用父类初始化的管道ChannelPipeline super(parent); //赋值channel this.ch = ch; //设置这个ServerSocketChannel的事件类型为readInterestOp(OP_ACCEPT) this.readInterestOp = readInterestOp; try { //设置这个ServerSocketChannel为非阻塞的模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially initialized socket.", e2); }
throw new ChannelException("Failed to enter non-blocking mode.", e); }}//AbstractChannel是AbstractNioChannel的父类protected AbstractChannel(Channel parent) { this.parent = parent; //创建一个ChannelId id = newId(); //创建AbstractUnsafe对象,后续要使用 unsafe = newUnsafe(); //初始化管道pipeline对象,这个对象非常重要,就是我们的管道的处理器就是放在里面的,所有的处理器都会加入到这个管道中 //这个管道就是一个双向链表,有head和tail,默认的管道实现类是DefaultChannelPipeline pipeline = newChannelPipeline();}
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline//channel就是刚刚NioServerSocketChannel,也是一个ServerScoketChannelprotected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true);
//这个管道pipeline中的尾部 tail = new TailContext(this); //这个管道pipeline中的头部 head = new HeadContext(this);
//建立双向链表关系 head.next = tail; tail.prev = head;}


上面的过程就是在绑定一个NioServerSocketChannel,其实简单来说就是

1.将NioServerSocketChannel通过反射调用构造方法封装成一个ReflectiveChannelFactory对象,然后设置这个对象到ServerBootstrap中的属性channelFactory中,ReflectiveChan nelFactory包含了NioServerSocketChannel中的构造方法 和通过反射得到实例对象的方法,其实就是对NioServerSocketChannel的一个包装类。

2.第一步是通过反射得到NioServerSocketChannel对象,所以NioServerSocketChannel中构造所做的事情就是调用Nio创建一个ServerSocketChannel,然后设置这个ServerSocket Channel的事件类型为OP_ACCETP,并且设置为阻塞。

3.在这个构造方法中还做一件很重要的事情就是初始化管道DefaultChannelPipeline,这个管道对象中还创建了一个双向链表的管道对象head和tail,分别对应AbstractChannelHandler Context head;final AbstractChannelHandlerContext tail。


ChannelFuture channelFuture = bootstrap.bind(8000).sync():

上面的一些操作都是Netty启动前需要做的一些准备工作,真正的启动流程就在最后一行代码 bind(8000),这行代码承载了Netty启动的最复杂的逻辑。

public ChannelFuture bind(int inetPort) { //绑定启动,将端口封装成了一个InetSocketAddress对象 return bind(new InetSocketAddress(inetPort));}

之前NIO编程创建一个ServerSocketChannel,然后绑定一个端口,最后调用epoll的select进行阻塞,当有事件过来的时候select就会解除阻塞,最后通过selectKeys来接受事件,Netty也是这样的流程。

/** * 这个方法就是Netty的启动过程的最核心的方法,前面的都只是为 下面这个方法做准备的 * 这个方法里面有两个最核心的方法, * initAndRegister:初始化和注册,注册的是将创建ServerSocketChannel注册到多路复用器selector上 * doBind0:端口绑定,启动监听 * 总结来说就是initAndRegister配置Netty和注册多路复用器 * doBind0:绑定端口,调用epoll阻塞,监听事件的发生 * @param localAddress * @return */private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册(太多 ) final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
/** * 下面的代码逻辑就是绑定端口,开启select阻塞 * 首先条件isDone第一次启动是不会进入这里,走的else的逻辑 * 在else逻辑中添加了一个监听器,最后由监听器出发了doBind0方法 * 所以不管怎么说,核心代码逻辑都是doBind0 */ if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //添加一个监听器,最后由监听器调用了doBind0方法 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
/** * 在DefaultChannelPromise中的notifyListeners进行调用的 */
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }}


/** * 多路复用器注册的核心方法 * @param task */@Overridepublic void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}/** * 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程 * @param task * @param immediate */private void execute(Runnable task, boolean immediate) { //这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false //因为task线程还没开始执行 boolean inEventLoop = inEventLoop(); //将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度 addTask(task); if (!inEventLoop) { //开始线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}


通道注册到多不复用器:

io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); //这里表示已经注册过了,不需要在注册 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }
//eventLoop 是从boss 线程组选择出来一个NioEventLoop AbstractChannel.this.eventLoop = eventLoop;
//下面这个if表示当前的线程是否和NioEventLoop是否在一个线程中的 //反正就是判断需要同步处理还是异步处理,Netty是一个异步非阻塞的通信框架,所以很多地方都是使用这种线程池的方案去处理的 //所以毫无疑问,register0是核心方法 if (eventLoop.inEventLoop()) { register0(promise); } else { try { /** * 这里不要被这个execute给迷糊了,看到这个以为是一个线程的启动方法,其实不是,其实就是简单的 * 调用了对象eventLoop中的execute方法,这个方法传入了一个线程对象,但是肯定不是在这里执行的这个线程的run方法 * 的,所以这里记住这个execute方法中传入了一个内部类线程,肯定要先看这个execute方法,而这里的 * eventLoop是NioEventLoop,也是SingleThreadEventExecutor */ eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }}
io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)/** * 多路复用器注册的核心方法 * @param task */@Overridepublic void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}/** * 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程 * @param task * @param immediate */private void execute(Runnable task, boolean immediate) { //这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false //因为task线程还没开始执行 boolean inEventLoop = inEventLoop(); //将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度 addTask(task); if (!inEventLoop) { //开始线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}


io.netty.channel.nio.NioEventLoop#run

这个run方法就是NioEventLoop的,channel注册多路复用器、select阻塞、事件的处理等都在这个run里面执行的,为什么说netty是异步非阻塞,就是因为它都是通过线程池来完成的;简单来说就是这个run方法将之前添加的taskQueue中的线程任务都拿出来要执行,但是需要将run的逻辑执行完成了 才会执行,所以当你看到eventLoop.execute(Runnable task)的时候你就要知道task的执行是execute去执行的,但是需要execute中的逻辑执行完成了才会去执行这个task,而Netty的这里的很多任务都是这样做的,就是当有eventLoop.execute (Runnable task)的时候,Netty首先将这个task添加到taskQueue中,然后执行了execute中的逻辑过后,然后将taskQueue中的task拿出来,然后调用task的run执行任务的核心逻辑,netty中大量的处理都是在taskQueue中去完成的,所以 对于eventLoop.execute(task)这个核心逻辑要明白才行。

/** * 这里的run方法才是真正的将NIOServerSocketChannel注册到多路复用器上 */@Overrideprotected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { /** *这里就是判断现在是那种事件,如果Netty在启动的时候,这里的case都不满足,但是如果启动完成了 *那么再来调用就会调用SELECT,所以根据不同的情况来的 */ strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: //这里就是调用的多路复用器的select方法,就是在nio中的selector.select()方法 /** * curDeadlineNanos这个是计算调用select的时候阻塞的时间的,因为netty这里 还有很多事情没有做 * 注册多路复用器都还没有做,所以这里要计算出一个时间,如果这个时间过了,还没有连接过来,那么也会唤醒, * 不会一直阻塞 */ long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { /** * 如果是第一次启动的注册多路复用器是不会进入这个 if的,因为 * hasTasks就是判断当前的多路复用器注册队列中是是否有任务,如果有任务, * 那么返回true,没有就返回 false的,netty的意思就是服务端刚启动要注册的是NioServerSocketChannel * 所以不阻塞,否则阻塞了没意义,因为多路复用器都还没有注册的有channel,根本没有办法监听 */ if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; }
/** * strategy在Netty启动的时候是0 */ selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { //调用select有事件发生的时候会来调用,这里面就是NIO的那个获取selectKeys然后处理的逻辑,netty封装了 processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { //调用select有事件发生的时候会来调用 processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //Netty启动的时候调用,这里才是真正的多路复用器的注册,就是调用最开始的那个execute(Runable task)中的那个task //就是从taskQueue中取出这个线程任务然后执行run方法 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } }}


io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //这里就是将NioServerSocketChannel注册到多路复用上Selector doRegister(); neverRegistered = false; registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. /** * 这个方法的调用就是处理管道的handler的,就是处理器, * 它的过程会将之前创建的ChannelInitializer那个管道中的处理器添加到管道中,然后删除 * 简单来说就是先将ChannelInitializer中的那个initChannel方法调用完成过后,然后删除这个ChannelInitializer那 * 内部类 */ pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); //这里的调用的注册,那么注册也要调用管道中的所有的有实现了注册的处理器,调用是从链表的head开始的 //当看到有调用fireChannelRegistered方法的, 那么证明是调用了下一个处理器的ChannelRegistered方法 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { //表示netty现在是激活的状态来调用的 if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}@Overrideprotected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //将NioServerSocketChannel 注册到selector多路复用器 上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}