深入netty网络编程
一、netty简介
Netty是基于Java NIO 的异步事件驱动的网络应用框架,使用Netty可以快速开发网络应用,Netty提供了高层次的抽象来简化TCP和UDP服务器socket编程。同时保证高吞吐量、低延时、高可靠性。
二、netty vs nio编程
NIO开发的问题:
1、NIO类库和API复杂,使用麻烦。
2、需要具备Java多线程编程能力(涉及到Reactor模式)。
3、客户端断线重连、网络不稳定、半包读写、失败缓存、网络阻塞和异常码流等问题处理难度非常大。
4、存在部分BUG。
5、要写大量繁琐的代码,创建、注册,判断事件等,很多程序只关注数据的收发以及对数据的逻辑处理,NIO需要手动创建buffer缓冲区。
netty优势:
1、API使用简单,开发门槛低。
2、功能强大,预置了多种编解码功能,支持多种主流协议。
3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展。
4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优。
5、成熟、稳定,Netty修复了已经发现的NIO所有BUG。
6、社区活跃。
7、经历了很多商用项目的考验。
8、Netty不需要程序员创建bytebuf,直接读取数据到已有的缓冲区中。
9、使用主线程组负责连接,工作线程组负责其他的业务处理。
10、更加优雅的 Reactor 模式实现、灵活的线程模型、利用 EventLoop 等创新性的机制,可以非常高效地管理成百上千的 Channel。
11、充分利用了 Java 的 Zero-Copy 机制,并且从多种角度,“斤斤计较”般的降低内存分配和回收的开销。例如,使用池化的 Direct Buffer 等技术,在提高 IO 性能的同时,减少了对象的创建和销毁;
12、利用反射等技术直接操纵 SelectionKey,使用数组而不是 Java 容器等。
13、使用更多本地代码。例如,直接利用 JNI 调用 Open SSL 等方式,获得比 Java 内建 SSL 引擎更好的性能。
14、在通信协议、序列化等其他角度的优化,从网络协议的角度,Netty 除了支持传输层的 UDP、TCP、SCTP协议,也支持 HTTP(s)、WebSocket 等多种应用层协议,它并不是单一协议的 API。
15、总的来说,Netty 并没有 Java 核心类库那些强烈的通用性、跨平台等各种负担,针对性能等特定目标以及 Linux 等特定环境,采取了一些极致的优化手段。
16、异步非阻塞(线程池处理)、基于事件驱动、高性能、高可靠性和高可定制性。
三、组件
Channel
netty中把一个端到端的通信定义为了通道。所谓的端包含但不限于硬件设备,文件。这是对通信的第一层抽象。通道也是一个连接在Java中(Socket类),基本的 I/O 操作(bind()、 connect()、 read()和 write())依赖于底层网络传输所提供的功能。Netty 的 Channel 接口所提 供的 API降低了直接使用 Soc ket 类的复杂性。Netty中也提供了使用多种方式连接的Channel:
- EmbeddedChannel
- LocalServerChannel
- NioDatagramChannel
- NioSctpChannel
- NioSocketChannel
Selector
选择器或多路复用器,Netty基于Selector对象实现(底层epoll实现)I/O多路复用,通过 Selector绑定有EventLoop一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel。
ByteBuf
缓冲区,对字节数据的封装,由最小容量、最大容量、读指针、写指针组成。
直接内存 vs 堆内存:
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放。
池化 vs 非池化,池化的最大意义在于可以重用 ByteBuf,优点有:
-没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力。
-有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率。
-高并发时,池化功能更节约内存,减少内存溢出的可能。
Bootstrap、ServerBootstrap
Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。
NioEventLoop
NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:、
-I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
-非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
NioEventLoopGroup
NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。
ChannelHandler
ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其 ChannelPi peline(业务处理链)中的下一个处理程序。ChannelHandler 用来处理 Channe 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline。
ChannelHandlerContext
保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。
ChannelPipline
保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作(分包粘包、编码解码等处理),ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。
ChannelFuture
这是 Netty 实现异步 IO 的基础之一,保证了同一个 Channel 操作的调用顺序。Netty 扩展了 Java 标准的 Future,提供了针对自己场景的特有Future定义。
四、启动流程
public class NettyServer {//创建两个线程组bossGroup和workGroup,含有的线程NioEventLoop的个数默认为cpu核数的两倍//bossGroup主线程组只负责请求连接,真正和客户端业务处理交给workGroup完成EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup(10);try {//创建服务端启动对象ServerBootstrap bootstrap = new ServerBootstrap();//使用链式编程来配置参数bootstrap.group(bossGroup, workGroup)//使用NioServerSocketChannel作为服务器的通道实现.channel(NioServerSocketChannel.class)// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理.option(ChannelOption.SO_BACKLOG, 1024)//创建通道初始化对象,设置初始化参数.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println("netty server start success ");//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕ChannelFuture channelFuture = bootstrap.bind(8000).sync();//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成channelFuture.channel().closeFuture().sync();} catch (Exception ce) {ce.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}
NioEventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop;
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
一个 Channel 在它的生命周期内只注册于一个 EventLoop;
每一个 EventLoop 负责处理一个或多个 Channel;
五、netty线程模型:reactor
基于事件驱动响应的模型,epoll也属于事件驱动,主线程组负责连接、工作线程组负责其他业务,一个selector相当于一个reactor对应一个线程,bossGroup和workerGroup属于多Reactor模型。就是reactor的多线程模型。分类有Reactor 单线程;Reactor 多线程;主从 Reactor 多线程。
一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。
六、异步处理
netty是异步非阻塞框架,通过线程池多线程模型来完成的。异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的异步事件处理,Event被放入EventQueue即可返回,后续再从Queue消费处理
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
(1)通过 isDone 方法来判断当前操作是否完成;
(2)通过 isSuccess 方法来判断已完成的当前操作是否成功;
(3)通过 getCause 方法来获取已完成的当前操作失败的原因;
(4)通过 isCancelled 方法来判断已完成的当前操作是否被取消;
(5)通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑:
serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");} else{System.err.println("端口["+ port + "]绑定失败!");}});
七、零拷贝
(1)Netty 的接收和发送 ByteBuffer默认采用 DIRECT BUFFERS ,使用堆外直接内存进 行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存 ( HEAP BUFFERS)进行 Socket 读写, JVM 会将堆内存 Buffer 拷贝一份到直接内 存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓 冲区的内存拷贝。
(2)Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式 将几个 小 Buffer 合并成一个大的 Buffer 。
(3)Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel ,避免了传统通过循环 write 方式导致的内存拷贝问题。
八、源码分析
1、 EventLoopGroup bossGroup =new NioEventLoopGroup(1); EventLoopGroup workGroup =new NioEventLoopGroup();
这两行代码就是创建两个线程池组,第一个是boss线程组,第二个是工作线程组,netty底层的boos就是用来处理连接事件,然后将连接注册到工作线程上,其实在NioEventLoopoGroup中如果初始化的线程数是1,那么底层就是一个多路复用器,也就是说传入了几个线程,就会产生多少给多路复用器Selector。
public NioEventLoopGroup(int nThreads, Executor executor) {/*** 这个构造中第三个参数就是Nio的操作,nio中创建一个ServerSocket就是通过ServerSocket.open,* 其实底层执行的代码就是调用的SelectorProvider.provider().openServerSocketChannel(),所以这里* 传入的是NIO的相关入口api,后续要使用*/this(nThreads, executor, SelectorProvider.provider());}/*** 这里是初始化Nio线程组,在Netty中是线程组,线程组使用的是MultithreadEventLoopGroup,而每个线程组* 中都是一个NioEventLoop,对应的是SingleThreadEventLoop* @param nThreads* @param executor* @param selectorProvider* @param selectStrategyFactory*/public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}//MultithreadEventLoopGroup是NioEventLoopGroup的父类protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {/*** 在构造线程组的时候如果没有传入线程的具体数字,也就是没有指定线程数的情况下,那么netty会默认* 分配一个线程 数量:Math.max(1, SystemPropertyUtil.getInt(* "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))*其实简单来说就是获取服务器的cpu核数 * 2,比如我的电脑是8核 ,那么线程数就是16*/super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}//初始化线程池if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}/*** 这里就是初始化线程组的,比如说 NioEventLoopGroup初始化的是16,那么这里就会初始化一个线程组* EventExecutor数组为16,这里面的每一个都是一个线程对象,这个线程对象是NioEventLoop,也是一个* SingleThreadEventLoop,所以NioEventLoopGroup中包含的是MultitheadEventLoopGroup,而每个* MultitheadEventLoopGroup中的线程是NioEventLoop(SingleThreadEventLoop),在里面维护了一个* 线程池,所以NioEventLoop中有一个多路复用器Selector,简单来说就是初始化的NioEventLoopGroup有多个个线程* 就有多少个多路多路复用器Selector和TaskQueue,而Selector和TaskQueue是存在于NioEventLoop中的**/children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//创建一个NioEventLoop,放入线程组,创建NioEventLoop的时候会创建Selector和taskQueue,并且维护了一个线程池//线程池是共用的,就是NioEventLoopGroup初始化的线程池Executor是共用的children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {....io.netty.channel.nio.NioEventLoopGroup#newChild@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;//创建一个NioEventLoop,创建Selector和taskQueuereturn new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);}NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {//这里创建了一个TaskQueue,然后调用父类的构造super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");//调用NIO的api创建一个多路复用器Selector,是通过provider创建的final SelectorTuple selectorTuple = openSelector();//将创建的多路复用器Selector赋值给全局的selectorthis.selector = selectorTuple.selector;//unwrappedSelector和selector是相同的this.unwrappedSelector = selectorTuple.unwrappedSelector;}//SingleThreadEventExecutor是NioEventLoop父类protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, Queue<Runnable> taskQueue,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;//设置最大的task任务数量 Integer.MAX_VALUEthis.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;this.executor = ThreadExecutorMap.apply(executor, this);this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}
上面的源码分析:
1.通过NioEventLoopGroup创建一个线程组,这个线程组是一个MultithreadEventExecutor Group,然后在MultithreadEventExecutorGroup中根据传入的线程数会创建很多个NioEve ntLoop,每个NioEventLoop中都会维护一个线程池Executor、一个Selector多路复用器、一个taskQueue,其中selector、taskQueue是NioEventLoop是每个NioEventLoop私有的,就是比如创建了16个NioEventLoop对象,那么每个事件循环线程组对象NioEventLoop中的selector、taskQueue都是私有的,而Executor是共有的,就是所有的对象都会共用Executor这个线程池。
2.然后创建的所有的事件线程组对象组成一个EventExecutor数组 ,EventExecutor的子类是NioEventLoop。
3.NioEventLoopGroup是一个线程组,这个线程组是通过MultithreadEventExecutorGroup创建的,所以NioEventLoopGroup对应的是MultithreadEventExecutorGroup,而NioEve ntLoop是通过MultithreadEventExecutorGroup创建的,创建的是一个SingleThreadEvent Loop线程组,就是单线程组。
4.每个SingleThreadEventLoop中会创建一个多路复用器selector和一个taskQueue,task Queue主要存放一个任务,然后通过executor去执行这个任务,其实对于boss来说,这个task其实就是注册事件。
2、ServerBootstrap启动类链式编程
ServerBootstrap就是服务端启动的一个启动类,包含了netty启动的所有过程,它采用的是链式编程,比如绑定线程组、设置tcp的一些参数和处理器都是通过启动类来绑定的,前面声明了两个线程组,但是声明的线程组如果没有绑定到启动类,也就是主运行的类上是没有任何效果的。
绑定线程组:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}//将工作线程组的对象赋值给childGroup,没有其他过程this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}public B group(EventLoopGroup group) {ObjectUtil.checkNotNull(group, "group");if (this.group != null) {throw new IllegalStateException("group set already");}//将boss的线程组绑定到group中,就是一个赋值的过程,没有其他的操作this.group = group;return self();}
channel(NioServerSocketChannel.class):
public B channel(Class<? extends C> channelClass) {/*** 这里将传入的比如NioServerSocketChannel通过反射创建一个ReflectiveChannelFactory对象,然后* 调用channelFactory将创建的ReflectiveChannelFactory对象赋值给ServerBootstrap 中channelFactory属性* 所以这里记录下,在ServerBootstrap中有个属性为channelFactory* 这里的new ReflectiveChannelFactory其实就是得到了传入进来的channelClass一个对象,通过反射调用* 构造NioServerSocketChannel的构造空构造方法*/return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()/*** Create a new instance* 这里是一个空构造方法,在这个构造方法中做的事情有:* 1.创建一个ServerSocketChannel(NIO);* 2.将创建的ServerSocketChannel设置一些参数,比如设置这个ServerSocketChannel的事件为OP_ACCEPT,就是接受连接事件* 3.设置ServerSocketChannel为非阻塞的模式。*/public NioServerSocketChannel() {//newSocket就是创建一个ServerSocketChannel,调用的就是NIO的创建逻辑this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {/*** Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.** See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.* 调用NIO创建一个ServerSocketChannel*/return provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}}public NioServerSocketChannel(ServerSocketChannel channel) {//调用父类将这个ServerSocketChannel设置为非阻塞模式和将创建的ServerSocketChannel赋值到父类的属性ch中super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}//AbstractNioChannel是NioServerSocketChannel的父类protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {//调用父类初始化的管道ChannelPipelinesuper(parent);//赋值channelthis.ch = ch;//设置这个ServerSocketChannel的事件类型为readInterestOp(OP_ACCEPT)this.readInterestOp = readInterestOp;try {//设置这个ServerSocketChannel为非阻塞的模式ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {logger.warn("Failed to close a partially initialized socket.", e2);}throw new ChannelException("Failed to enter non-blocking mode.", e);}}//AbstractChannel是AbstractNioChannel的父类protected AbstractChannel(Channel parent) {this.parent = parent;//创建一个ChannelIdid = newId();//创建AbstractUnsafe对象,后续要使用unsafe = newUnsafe();//初始化管道pipeline对象,这个对象非常重要,就是我们的管道的处理器就是放在里面的,所有的处理器都会加入到这个管道中//这个管道就是一个双向链表,有head和tail,默认的管道实现类是DefaultChannelPipelinepipeline = newChannelPipeline();}io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline//channel就是刚刚NioServerSocketChannel,也是一个ServerScoketChannelprotected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);//这个管道pipeline中的尾部tail = new TailContext(this);//这个管道pipeline中的头部head = new HeadContext(this);//建立双向链表关系head.next = tail;tail.prev = head;}
上面的过程就是在绑定一个NioServerSocketChannel,其实简单来说就是
1.将NioServerSocketChannel通过反射调用构造方法封装成一个ReflectiveChannelFactory对象,然后设置这个对象到ServerBootstrap中的属性channelFactory中,ReflectiveChan nelFactory包含了NioServerSocketChannel中的构造方法 和通过反射得到实例对象的方法,其实就是对NioServerSocketChannel的一个包装类。
2.第一步是通过反射得到NioServerSocketChannel对象,所以NioServerSocketChannel中构造所做的事情就是调用Nio创建一个ServerSocketChannel,然后设置这个ServerSocket Channel的事件类型为OP_ACCETP,并且设置为阻塞。
3.在这个构造方法中还做一件很重要的事情就是初始化管道DefaultChannelPipeline,这个管道对象中还创建了一个双向链表的管道对象head和tail,分别对应AbstractChannelHandler Context head;final AbstractChannelHandlerContext tail。
ChannelFuture channelFuture = bootstrap.bind(8000).sync():
上面的一些操作都是Netty启动前需要做的一些准备工作,真正的启动流程就在最后一行代码 bind(8000),这行代码承载了Netty启动的最复杂的逻辑。
public ChannelFuture bind(int inetPort) {//绑定启动,将端口封装成了一个InetSocketAddress对象return bind(new InetSocketAddress(inetPort));}
之前NIO编程创建一个ServerSocketChannel,然后绑定一个端口,最后调用epoll的select进行阻塞,当有事件过来的时候select就会解除阻塞,最后通过selectKeys来接受事件,Netty也是这样的流程。
/*** 这个方法就是Netty的启动过程的最核心的方法,前面的都只是为 下面这个方法做准备的* 这个方法里面有两个最核心的方法,* initAndRegister:初始化和注册,注册的是将创建ServerSocketChannel注册到多路复用器selector上* doBind0:端口绑定,启动监听* 总结来说就是initAndRegister配置Netty和注册多路复用器* doBind0:绑定端口,调用epoll阻塞,监听事件的发生* @param localAddress* @return*/private ChannelFuture doBind(final SocketAddress localAddress) {//初始化和注册(太多 )final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}/*** 下面的代码逻辑就是绑定端口,开启select阻塞* 首先条件isDone第一次启动是不会进入这里,走的else的逻辑* 在else逻辑中添加了一个监听器,最后由监听器出发了doBind0方法* 所以不管怎么说,核心代码逻辑都是doBind0*/if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);//添加一个监听器,最后由监听器调用了doBind0方法regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();/*** 在DefaultChannelPromise中的notifyListeners进行调用的*/doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
/*** 多路复用器注册的核心方法* @param task*/@Overridepublic void execute(Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}/*** 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程* @param task* @param immediate*/private void execute(Runnable task, boolean immediate) {//这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false//因为task线程还没开始执行boolean inEventLoop = inEventLoop();//将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度addTask(task);if (!inEventLoop) {//开始线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
通道注册到多不复用器:
io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");//这里表示已经注册过了,不需要在注册if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}//eventLoop 是从boss 线程组选择出来一个NioEventLoopAbstractChannel.this.eventLoop = eventLoop;//下面这个if表示当前的线程是否和NioEventLoop是否在一个线程中的//反正就是判断需要同步处理还是异步处理,Netty是一个异步非阻塞的通信框架,所以很多地方都是使用这种线程池的方案去处理的//所以毫无疑问,register0是核心方法if (eventLoop.inEventLoop()) {register0(promise);} else {try {/*** 这里不要被这个execute给迷糊了,看到这个以为是一个线程的启动方法,其实不是,其实就是简单的* 调用了对象eventLoop中的execute方法,这个方法传入了一个线程对象,但是肯定不是在这里执行的这个线程的run方法* 的,所以这里记住这个execute方法中传入了一个内部类线程,肯定要先看这个execute方法,而这里的* eventLoop是NioEventLoop,也是SingleThreadEventExecutor*/eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)/*** 多路复用器注册的核心方法* @param task*/@Overridepublic void execute(Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}/*** 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程* @param task* @param immediate*/private void execute(Runnable task, boolean immediate) {//这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false//因为task线程还没开始执行boolean inEventLoop = inEventLoop();//将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度addTask(task);if (!inEventLoop) {//开始线程startThread();if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}}private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
io.netty.channel.nio.NioEventLoop#run
这个run方法就是NioEventLoop的,channel注册多路复用器、select阻塞、事件的处理等都在这个run里面执行的,为什么说netty是异步非阻塞,就是因为它都是通过线程池来完成的;简单来说就是这个run方法将之前添加的taskQueue中的线程任务都拿出来要执行,但是需要将run的逻辑执行完成了 才会执行,所以当你看到eventLoop.execute(Runnable task)的时候你就要知道task的执行是execute去执行的,但是需要execute中的逻辑执行完成了才会去执行这个task,而Netty的这里的很多任务都是这样做的,就是当有eventLoop.execute (Runnable task)的时候,Netty首先将这个task添加到taskQueue中,然后执行了execute中的逻辑过后,然后将taskQueue中的task拿出来,然后调用task的run执行任务的核心逻辑,netty中大量的处理都是在taskQueue中去完成的,所以 对于eventLoop.execute(task)这个核心逻辑要明白才行。
/*** 这里的run方法才是真正的将NIOServerSocketChannel注册到多路复用器上*/@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {/***这里就是判断现在是那种事件,如果Netty在启动的时候,这里的case都不满足,但是如果启动完成了*那么再来调用就会调用SELECT,所以根据不同的情况来的*/strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT://这里就是调用的多路复用器的select方法,就是在nio中的selector.select()方法/*** curDeadlineNanos这个是计算调用select的时候阻塞的时间的,因为netty这里 还有很多事情没有做* 注册多路复用器都还没有做,所以这里要计算出一个时间,如果这个时间过了,还没有连接过来,那么也会唤醒,* 不会一直阻塞*/long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {/*** 如果是第一次启动的注册多路复用器是不会进入这个 if的,因为* hasTasks就是判断当前的多路复用器注册队列中是是否有任务,如果有任务,* 那么返回true,没有就返回 false的,netty的意思就是服务端刚启动要注册的是NioServerSocketChannel* 所以不阻塞,否则阻塞了没意义,因为多路复用器都还没有注册的有channel,根本没有办法监听*/if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}/*** strategy在Netty启动的时候是0*/selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {//调用select有事件发生的时候会来调用,这里面就是NIO的那个获取selectKeys然后处理的逻辑,netty封装了processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {//调用select有事件发生的时候会来调用processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {//Netty启动的时候调用,这里才是真正的多路复用器的注册,就是调用最开始的那个execute(Runable task)中的那个task//就是从taskQueue中取出这个线程任务然后执行run方法ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);} finally {// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Error e) {throw (Error) e;} catch (Throwable t) {handleLoopException(t);}}}}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration = neverRegistered;//这里就是将NioServerSocketChannel注册到多路复用上SelectordoRegister();neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener./*** 这个方法的调用就是处理管道的handler的,就是处理器,* 它的过程会将之前创建的ChannelInitializer那个管道中的处理器添加到管道中,然后删除* 简单来说就是先将ChannelInitializer中的那个initChannel方法调用完成过后,然后删除这个ChannelInitializer那* 内部类*/pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);//这里的调用的注册,那么注册也要调用管道中的所有的有实现了注册的处理器,调用是从链表的head开始的//当看到有调用fireChannelRegistered方法的, 那么证明是调用了下一个处理器的ChannelRegistered方法pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {//表示netty现在是激活的状态来调用的if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}@Overrideprotected void doRegister() throws Exception {boolean selected = false;for (;;) {try {//将NioServerSocketChannel 注册到selector多路复用器 上selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}}
