深入netty网络编程
一、netty简介
Netty是基于Java NIO 的异步事件驱动的网络应用框架,使用Netty可以快速开发网络应用,Netty提供了高层次的抽象来简化TCP和UDP服务器socket编程。同时保证高吞吐量、低延时、高可靠性。
二、netty vs nio编程
NIO开发的问题:
1、NIO类库和API复杂,使用麻烦。
2、需要具备Java多线程编程能力(涉及到Reactor模式)。
3、客户端断线重连、网络不稳定、半包读写、失败缓存、网络阻塞和异常码流等问题处理难度非常大。
4、存在部分BUG。
5、要写大量繁琐的代码,创建、注册,判断事件等,很多程序只关注数据的收发以及对数据的逻辑处理,NIO需要手动创建buffer缓冲区。
netty优势:
1、API使用简单,开发门槛低。
2、功能强大,预置了多种编解码功能,支持多种主流协议。
3、定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展。
4、性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优。
5、成熟、稳定,Netty修复了已经发现的NIO所有BUG。
6、社区活跃。
7、经历了很多商用项目的考验。
8、Netty不需要程序员创建bytebuf,直接读取数据到已有的缓冲区中。
9、使用主线程组负责连接,工作线程组负责其他的业务处理。
10、更加优雅的 Reactor 模式实现、灵活的线程模型、利用 EventLoop 等创新性的机制,可以非常高效地管理成百上千的 Channel。
11、充分利用了 Java 的 Zero-Copy 机制,并且从多种角度,“斤斤计较”般的降低内存分配和回收的开销。例如,使用池化的 Direct Buffer 等技术,在提高 IO 性能的同时,减少了对象的创建和销毁;
12、利用反射等技术直接操纵 SelectionKey,使用数组而不是 Java 容器等。
13、使用更多本地代码。例如,直接利用 JNI 调用 Open SSL 等方式,获得比 Java 内建 SSL 引擎更好的性能。
14、在通信协议、序列化等其他角度的优化,从网络协议的角度,Netty 除了支持传输层的 UDP、TCP、SCTP协议,也支持 HTTP(s)、WebSocket 等多种应用层协议,它并不是单一协议的 API。
15、总的来说,Netty 并没有 Java 核心类库那些强烈的通用性、跨平台等各种负担,针对性能等特定目标以及 Linux 等特定环境,采取了一些极致的优化手段。
16、异步非阻塞(线程池处理)、基于事件驱动、高性能、高可靠性和高可定制性。
三、组件
Channel
netty中把一个端到端的通信定义为了通道。所谓的端包含但不限于硬件设备,文件。这是对通信的第一层抽象。通道也是一个连接在Java中(Socket类),基本的 I/O 操作(bind()、 connect()、 read()和 write())依赖于底层网络传输所提供的功能。Netty 的 Channel 接口所提 供的 API降低了直接使用 Soc ket 类的复杂性。Netty中也提供了使用多种方式连接的Channel:
- EmbeddedChannel
- LocalServerChannel
- NioDatagramChannel
- NioSctpChannel
- NioSocketChannel
Selector
选择器或多路复用器,Netty基于Selector对象实现(底层epoll实现)I/O多路复用,通过 Selector绑定有EventLoop一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel。
ByteBuf
缓冲区,对字节数据的封装,由最小容量、最大容量、读指针、写指针组成。
直接内存 vs 堆内存:
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放。
池化 vs 非池化,池化的最大意义在于可以重用 ByteBuf,优点有:
-没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力。
-有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率。
-高并发时,池化功能更节约内存,减少内存溢出的可能。
Bootstrap、ServerBootstrap
Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。
NioEventLoop
NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:、
-I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
-非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
NioEventLoopGroup
NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。
ChannelHandler
ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其 ChannelPi peline(业务处理链)中的下一个处理程序。ChannelHandler 用来处理 Channe 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline。
ChannelHandlerContext
保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。
ChannelPipline
保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作(分包粘包、编码解码等处理),ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。
ChannelFuture
这是 Netty 实现异步 IO 的基础之一,保证了同一个 Channel 操作的调用顺序。Netty 扩展了 Java 标准的 Future,提供了针对自己场景的特有Future定义。
四、启动流程
public class NettyServer {
//创建两个线程组bossGroup和workGroup,含有的线程NioEventLoop的个数默认为cpu核数的两倍
//bossGroup主线程组只负责请求连接,真正和客户端业务处理交给workGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(10);
try {
//创建服务端启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来配置参数
bootstrap.group(bossGroup, workGroup)
//使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
//创建通道初始化对象,设置初始化参数
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workGroup的SocketChannel设置处理器
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start success ");
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture channelFuture = bootstrap.bind(8000).sync();
//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
channelFuture.channel().closeFuture().sync();
} catch (Exception ce) {
ce.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
NioEventLoopGroup 实际上就是个线程池,一个 EventLoopGroup 包含一个或者多个 EventLoop;
一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
所有有 EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
一个 Channel 在它的生命周期内只注册于一个 EventLoop;
每一个 EventLoop 负责处理一个或多个 Channel;
五、netty线程模型:reactor
基于事件驱动响应的模型,epoll也属于事件驱动,主线程组负责连接、工作线程组负责其他业务,一个selector相当于一个reactor对应一个线程,bossGroup和workerGroup属于多Reactor模型。就是reactor的多线程模型。分类有Reactor 单线程;Reactor 多线程;主从 Reactor 多线程。
一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。
六、异步处理
netty是异步非阻塞框架,通过线程池多线程模型来完成的。异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的异步事件处理,Event被放入EventQueue即可返回,后续再从Queue消费处理
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
(1)通过 isDone 方法来判断当前操作是否完成;
(2)通过 isSuccess 方法来判断已完成的当前操作是否成功;
(3)通过 getCause 方法来获取已完成的当前操作失败的原因;
(4)通过 isCancelled 方法来判断已完成的当前操作是否被取消;
(5)通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑:
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println("端口["+ port + "]绑定失败!");
}
});
七、零拷贝
(1)Netty 的接收和发送 ByteBuffer默认采用 DIRECT BUFFERS ,使用堆外直接内存进 行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存 ( HEAP BUFFERS)进行 Socket 读写, JVM 会将堆内存 Buffer 拷贝一份到直接内 存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓 冲区的内存拷贝。
(2)Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式 将几个 小 Buffer 合并成一个大的 Buffer 。
(3)Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel ,避免了传统通过循环 write 方式导致的内存拷贝问题。
八、源码分析
1、 EventLoopGroup bossGroup =new NioEventLoopGroup(1); EventLoopGroup workGroup =new NioEventLoopGroup();
这两行代码就是创建两个线程池组,第一个是boss线程组,第二个是工作线程组,netty底层的boos就是用来处理连接事件,然后将连接注册到工作线程上,其实在NioEventLoopoGroup中如果初始化的线程数是1,那么底层就是一个多路复用器,也就是说传入了几个线程,就会产生多少给多路复用器Selector。
public NioEventLoopGroup(int nThreads, Executor executor) {
/**
* 这个构造中第三个参数就是Nio的操作,nio中创建一个ServerSocket就是通过ServerSocket.open,
* 其实底层执行的代码就是调用的SelectorProvider.provider().openServerSocketChannel(),所以这里
* 传入的是NIO的相关入口api,后续要使用
*/
this(nThreads, executor, SelectorProvider.provider());
}
/**
* 这里是初始化Nio线程组,在Netty中是线程组,线程组使用的是MultithreadEventLoopGroup,而每个线程组
* 中都是一个NioEventLoop,对应的是SingleThreadEventLoop
* @param nThreads
* @param executor
* @param selectorProvider
* @param selectStrategyFactory
*/
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
//MultithreadEventLoopGroup是NioEventLoopGroup的父类
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
/**
* 在构造线程组的时候如果没有传入线程的具体数字,也就是没有指定线程数的情况下,那么netty会默认
* 分配一个线程 数量:Math.max(1, SystemPropertyUtil.getInt(
* "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2))
*其实简单来说就是获取服务器的cpu核数 * 2,比如我的电脑是8核 ,那么线程数就是16
*/
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//初始化线程池
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
/**
* 这里就是初始化线程组的,比如说 NioEventLoopGroup初始化的是16,那么这里就会初始化一个线程组
* EventExecutor数组为16,这里面的每一个都是一个线程对象,这个线程对象是NioEventLoop,也是一个
* SingleThreadEventLoop,所以NioEventLoopGroup中包含的是MultitheadEventLoopGroup,而每个
* MultitheadEventLoopGroup中的线程是NioEventLoop(SingleThreadEventLoop),在里面维护了一个
* 线程池,所以NioEventLoop中有一个多路复用器Selector,简单来说就是初始化的NioEventLoopGroup有多个个线程
* 就有多少个多路多路复用器Selector和TaskQueue,而Selector和TaskQueue是存在于NioEventLoop中的
*
*/
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//创建一个NioEventLoop,放入线程组,创建NioEventLoop的时候会创建Selector和taskQueue,并且维护了一个线程池
//线程池是共用的,就是NioEventLoopGroup初始化的线程池Executor是共用的
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
....
io.netty.channel.nio.NioEventLoopGroup#newChild
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
//创建一个NioEventLoop,创建Selector和taskQueue
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
//这里创建了一个TaskQueue,然后调用父类的构造
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
//调用NIO的api创建一个多路复用器Selector,是通过provider创建的
final SelectorTuple selectorTuple = openSelector();
//将创建的多路复用器Selector赋值给全局的selector
this.selector = selectorTuple.selector;
//unwrappedSelector和selector是相同的
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
//SingleThreadEventExecutor是NioEventLoop父类
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
//设置最大的task任务数量 Integer.MAX_VALUE
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
上面的源码分析:
1.通过NioEventLoopGroup创建一个线程组,这个线程组是一个MultithreadEventExecutor Group,然后在MultithreadEventExecutorGroup中根据传入的线程数会创建很多个NioEve ntLoop,每个NioEventLoop中都会维护一个线程池Executor、一个Selector多路复用器、一个taskQueue,其中selector、taskQueue是NioEventLoop是每个NioEventLoop私有的,就是比如创建了16个NioEventLoop对象,那么每个事件循环线程组对象NioEventLoop中的selector、taskQueue都是私有的,而Executor是共有的,就是所有的对象都会共用Executor这个线程池。
2.然后创建的所有的事件线程组对象组成一个EventExecutor数组 ,EventExecutor的子类是NioEventLoop。
3.NioEventLoopGroup是一个线程组,这个线程组是通过MultithreadEventExecutorGroup创建的,所以NioEventLoopGroup对应的是MultithreadEventExecutorGroup,而NioEve ntLoop是通过MultithreadEventExecutorGroup创建的,创建的是一个SingleThreadEvent Loop线程组,就是单线程组。
4.每个SingleThreadEventLoop中会创建一个多路复用器selector和一个taskQueue,task Queue主要存放一个任务,然后通过executor去执行这个任务,其实对于boss来说,这个task其实就是注册事件。
2、ServerBootstrap启动类链式编程
ServerBootstrap就是服务端启动的一个启动类,包含了netty启动的所有过程,它采用的是链式编程,比如绑定线程组、设置tcp的一些参数和处理器都是通过启动类来绑定的,前面声明了两个线程组,但是声明的线程组如果没有绑定到启动类,也就是主运行的类上是没有任何效果的。
绑定线程组:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
//将工作线程组的对象赋值给childGroup,没有其他过程
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
//将boss的线程组绑定到group中,就是一个赋值的过程,没有其他的操作
this.group = group;
return self();
}
channel(NioServerSocketChannel.class):
public B channel(Class<? extends C> channelClass) {
/**
* 这里将传入的比如NioServerSocketChannel通过反射创建一个ReflectiveChannelFactory对象,然后
* 调用channelFactory将创建的ReflectiveChannelFactory对象赋值给ServerBootstrap 中channelFactory属性
* 所以这里记录下,在ServerBootstrap中有个属性为channelFactory
* 这里的new ReflectiveChannelFactory其实就是得到了传入进来的channelClass一个对象,通过反射调用
* 构造NioServerSocketChannel的构造空构造方法
*/
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
/**
* Create a new instance
* 这里是一个空构造方法,在这个构造方法中做的事情有:
* 1.创建一个ServerSocketChannel(NIO);
* 2.将创建的ServerSocketChannel设置一些参数,比如设置这个ServerSocketChannel的事件为OP_ACCEPT,就是接受连接事件
* 3.设置ServerSocketChannel为非阻塞的模式。
*/
public NioServerSocketChannel() {
//newSocket就是创建一个ServerSocketChannel,调用的就是NIO的创建逻辑
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
* 调用NIO创建一个ServerSocketChannel
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
//调用父类将这个ServerSocketChannel设置为非阻塞模式和将创建的ServerSocketChannel赋值到父类的属性ch中
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//AbstractNioChannel是NioServerSocketChannel的父类
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
//调用父类初始化的管道ChannelPipeline
super(parent);
//赋值channel
this.ch = ch;
//设置这个ServerSocketChannel的事件类型为readInterestOp(OP_ACCEPT)
this.readInterestOp = readInterestOp;
try {
//设置这个ServerSocketChannel为非阻塞的模式
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//AbstractChannel是AbstractNioChannel的父类
protected AbstractChannel(Channel parent) {
this.parent = parent;
//创建一个ChannelId
id = newId();
//创建AbstractUnsafe对象,后续要使用
unsafe = newUnsafe();
//初始化管道pipeline对象,这个对象非常重要,就是我们的管道的处理器就是放在里面的,所有的处理器都会加入到这个管道中
//这个管道就是一个双向链表,有head和tail,默认的管道实现类是DefaultChannelPipeline
pipeline = newChannelPipeline();
}
io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
//channel就是刚刚NioServerSocketChannel,也是一个ServerScoketChannel
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//这个管道pipeline中的尾部
tail = new TailContext(this);
//这个管道pipeline中的头部
head = new HeadContext(this);
//建立双向链表关系
head.next = tail;
tail.prev = head;
}
上面的过程就是在绑定一个NioServerSocketChannel,其实简单来说就是
1.将NioServerSocketChannel通过反射调用构造方法封装成一个ReflectiveChannelFactory对象,然后设置这个对象到ServerBootstrap中的属性channelFactory中,ReflectiveChan nelFactory包含了NioServerSocketChannel中的构造方法 和通过反射得到实例对象的方法,其实就是对NioServerSocketChannel的一个包装类。
2.第一步是通过反射得到NioServerSocketChannel对象,所以NioServerSocketChannel中构造所做的事情就是调用Nio创建一个ServerSocketChannel,然后设置这个ServerSocket Channel的事件类型为OP_ACCETP,并且设置为阻塞。
3.在这个构造方法中还做一件很重要的事情就是初始化管道DefaultChannelPipeline,这个管道对象中还创建了一个双向链表的管道对象head和tail,分别对应AbstractChannelHandler Context head;final AbstractChannelHandlerContext tail。
ChannelFuture channelFuture = bootstrap.bind(8000).sync():
上面的一些操作都是Netty启动前需要做的一些准备工作,真正的启动流程就在最后一行代码 bind(8000),这行代码承载了Netty启动的最复杂的逻辑。
public ChannelFuture bind(int inetPort) {
//绑定启动,将端口封装成了一个InetSocketAddress对象
return bind(new InetSocketAddress(inetPort));
}
之前NIO编程创建一个ServerSocketChannel,然后绑定一个端口,最后调用epoll的select进行阻塞,当有事件过来的时候select就会解除阻塞,最后通过selectKeys来接受事件,Netty也是这样的流程。
/**
* 这个方法就是Netty的启动过程的最核心的方法,前面的都只是为 下面这个方法做准备的
* 这个方法里面有两个最核心的方法,
* initAndRegister:初始化和注册,注册的是将创建ServerSocketChannel注册到多路复用器selector上
* doBind0:端口绑定,启动监听
* 总结来说就是initAndRegister配置Netty和注册多路复用器
* doBind0:绑定端口,调用epoll阻塞,监听事件的发生
* @param localAddress
* @return
*/
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册(太多 )
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
/**
* 下面的代码逻辑就是绑定端口,开启select阻塞
* 首先条件isDone第一次启动是不会进入这里,走的else的逻辑
* 在else逻辑中添加了一个监听器,最后由监听器出发了doBind0方法
* 所以不管怎么说,核心代码逻辑都是doBind0
*/
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//添加一个监听器,最后由监听器调用了doBind0方法
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
/**
* 在DefaultChannelPromise中的notifyListeners进行调用的
*/
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
/**
* 多路复用器注册的核心方法
* @param task
*/
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
/**
* 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程
* @param task
* @param immediate
*/
private void execute(Runnable task, boolean immediate) {
//这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false
//因为task线程还没开始执行
boolean inEventLoop = inEventLoop();
//将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度
addTask(task);
if (!inEventLoop) {
//开始线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
通道注册到多不复用器:
io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
//这里表示已经注册过了,不需要在注册
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//eventLoop 是从boss 线程组选择出来一个NioEventLoop
AbstractChannel.this.eventLoop = eventLoop;
//下面这个if表示当前的线程是否和NioEventLoop是否在一个线程中的
//反正就是判断需要同步处理还是异步处理,Netty是一个异步非阻塞的通信框架,所以很多地方都是使用这种线程池的方案去处理的
//所以毫无疑问,register0是核心方法
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
/**
* 这里不要被这个execute给迷糊了,看到这个以为是一个线程的启动方法,其实不是,其实就是简单的
* 调用了对象eventLoop中的execute方法,这个方法传入了一个线程对象,但是肯定不是在这里执行的这个线程的run方法
* 的,所以这里记住这个execute方法中传入了一个内部类线程,肯定要先看这个execute方法,而这里的
* eventLoop是NioEventLoop,也是SingleThreadEventExecutor
*/
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
/**
* 多路复用器注册的核心方法
* @param task
*/
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
/**
* 多路复用器注册的核心方法,task就是最外层的那个register0所在的线程
* @param task
* @param immediate
*/
private void execute(Runnable task, boolean immediate) {
//这里就是和外层的那个判断一样,就是看当前线程是否是NioEventLoop的那个线程,代码到这里还在主线程中,所以这里返回的false
//因为task线程还没开始执行
boolean inEventLoop = inEventLoop();
//将注册的那个register0所在线程加入到task任务中,也就是加入到taskQueue.offer(task)队列中,等待调度
addTask(task);
if (!inEventLoop) {
//开始线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
io.netty.channel.nio.NioEventLoop#run
这个run方法就是NioEventLoop的,channel注册多路复用器、select阻塞、事件的处理等都在这个run里面执行的,为什么说netty是异步非阻塞,就是因为它都是通过线程池来完成的;简单来说就是这个run方法将之前添加的taskQueue中的线程任务都拿出来要执行,但是需要将run的逻辑执行完成了 才会执行,所以当你看到eventLoop.execute(Runnable task)的时候你就要知道task的执行是execute去执行的,但是需要execute中的逻辑执行完成了才会去执行这个task,而Netty的这里的很多任务都是这样做的,就是当有eventLoop.execute (Runnable task)的时候,Netty首先将这个task添加到taskQueue中,然后执行了execute中的逻辑过后,然后将taskQueue中的task拿出来,然后调用task的run执行任务的核心逻辑,netty中大量的处理都是在taskQueue中去完成的,所以 对于eventLoop.execute(task)这个核心逻辑要明白才行。
/**
* 这里的run方法才是真正的将NIOServerSocketChannel注册到多路复用器上
*/
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
/**
*这里就是判断现在是那种事件,如果Netty在启动的时候,这里的case都不满足,但是如果启动完成了
*那么再来调用就会调用SELECT,所以根据不同的情况来的
*/
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
//这里就是调用的多路复用器的select方法,就是在nio中的selector.select()方法
/**
* curDeadlineNanos这个是计算调用select的时候阻塞的时间的,因为netty这里 还有很多事情没有做
* 注册多路复用器都还没有做,所以这里要计算出一个时间,如果这个时间过了,还没有连接过来,那么也会唤醒,
* 不会一直阻塞
*/
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
/**
* 如果是第一次启动的注册多路复用器是不会进入这个 if的,因为
* hasTasks就是判断当前的多路复用器注册队列中是是否有任务,如果有任务,
* 那么返回true,没有就返回 false的,netty的意思就是服务端刚启动要注册的是NioServerSocketChannel
* 所以不阻塞,否则阻塞了没意义,因为多路复用器都还没有注册的有channel,根本没有办法监听
*/
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
/**
* strategy在Netty启动的时候是0
*/
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
//调用select有事件发生的时候会来调用,这里面就是NIO的那个获取selectKeys然后处理的逻辑,netty封装了
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
//调用select有事件发生的时候会来调用
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//Netty启动的时候调用,这里才是真正的多路复用器的注册,就是调用最开始的那个execute(Runable task)中的那个task
//就是从taskQueue中取出这个线程任务然后执行run方法
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//这里就是将NioServerSocketChannel注册到多路复用上Selector
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
/**
* 这个方法的调用就是处理管道的handler的,就是处理器,
* 它的过程会将之前创建的ChannelInitializer那个管道中的处理器添加到管道中,然后删除
* 简单来说就是先将ChannelInitializer中的那个initChannel方法调用完成过后,然后删除这个ChannelInitializer那
* 内部类
*/
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//这里的调用的注册,那么注册也要调用管道中的所有的有实现了注册的处理器,调用是从链表的head开始的
//当看到有调用fireChannelRegistered方法的, 那么证明是调用了下一个处理器的ChannelRegistered方法
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
//表示netty现在是激活的状态来调用的
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//将NioServerSocketChannel 注册到selector多路复用器 上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}