Netty 线程服务端启动流程
Netty
本文需要JDK NIO相关知识前提
类图:
主要的接口:
netty: EventLoopGroup, Eventloo, Channel
JDK: Executor, ExecutorService
线程模型
Netty的线程模型是典型的Reactor模式,关于Reactor模式的相关内容可以参考大神 Doug Lea的文章:《Scalable IO in Java》
Netty里面的线程模型,是主从Reactor模型,线程为: EvevtLoopGroup
主线程:主要负责网络IO的监听、连接请求,分发handler
从线程:主要负责用户内容处理
时序图
服务端启动的每一步调用的方法在图里都比较清晰,下面具体说下 一个典型的服务端启动程序:
public static void main(String[] args) {
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap serverBootstrap=new ServerBootstrap();
try {
serverBootstrap.group(bossGroup,workGroup).
channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(
SocketChannel ch) throws Exception {
ChannelPipeline pipeline =
ch.pipeline();
pipeline.addLast(
new TcpServerHandler());
}
});
ChannelFuture channelFuture =
serverBootstrap.bind(8866).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
程序简单说明:
首先创建2个EventLoopGroup,一个主:bossGroup,一个从:workGroup
创建启动器:ServerBootstrap
将主从Reactor放到启动器中,设置相关的参数并添加相应的handlder处理业务
绑定端口启动监听服务
初始化
EventLoopGroup bossGroup=new NioEventLoopGroup();
创建主Reactor,在这个里面主要初始化了一些属性,建立线程组
初始化工作主要在其父类:
MultithreadEventExecutorGroup
初始化
children=newEventExecutor[nThreads];
这个children是NioEventLoop实例。每个NioEventLoop是一个Executor的实例,其中execute实现为内部实线的线程池,每个NioEventLoop内部有一个selector实例、一个创建线程用的executor,这个executor可以理解为JDK自带的CacheThreadPool,每次提交都创建一个新线程并马上启动。
绑定
ChannelFuture channelFuture = serverBootstrap.bind(8866).sync();
serverBootstrap.bind()
,调用父类(AbstractBootstrap
)的doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
//initAndRegister方法,做了2件事,一个初始化,一个将channel注册到selector上面
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//注册完后进行绑定,实际就是启动线程进行事件的监听
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
//.....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
//....省略
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
initAndRegister
对应上面时序图第3步,此时通过反射工场实例化出了NioSeverSocketChannel
这个对象,该对象里面创建了一个JDKServerSocketChannel
引用并设置为非阻塞(ch.configureBlocking(false);
)。同时实例化了
NioSeverSocketChannel
对象的ChannelPipeline
对象(对应第4步),ChannelPipeline
这个对象又实例化了2个ChannelHandlerContext对象,一个是tail,一个是head,头尾相接形成一个链表.调用bootstrap的init() 第5步,往pipline里面添加了一个
ServerBootstrapAcceptor)
handler(异步添加),对应第6步第7步 注册,调用EventLoopGroup里面的next,实际调用的是EventLoop里面的register,然后regisger又回过来往EventLoop中进行提交任务异步进行register操作(第10步)。
第11步,EventLoop里面的线程池开始工作,这个线程池是Netty内部自己实现的,用了一个阻塞队列实现。添加完任务后线程开始工作,在最后的Run方法里面用一个无限循环进行seletor的轮询。
至此,register流程已完。
回到
AbstractBootstrap
绑定,在channel注册完后,继续进行绑定,第14步,doBind0()
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
//========
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这里的 doBind0()里面的内容也是异步执行的,跟上面的注册一样,都是提交到eventloop里面去执行的,接着是在线程池里面执行
channel.bind(...)
方法,这里的channel实际是NioServerSocketChannel
(第15步)
接着第16步,又调用pipeline里面的
bind(...)
, pipeline调用tail.bind()
,这个tail是ChannelHandlerContext
(第17步),
第18步,执行
invokeBind()
,从上面的第17步得知这里的invokeBind()
是 head 里面的,
第19步,所以这里的bind()为head里面的,即:
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
这里的
unsafe
实际是channel
里面的绑定的unsafe
对象,即第20步。接着
channel
里面的bind()
方法调用自己的doBind()
(NioServerSocketChannel.doBind()) ,即第21步。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
最后第22步,这里面可以看到用的javaChannel().bind()方法进行最终端口的绑定监听,这个javaChannel()实际就是JDK NIO原生的 ServerSocketChannel对象(在上面NioServerSocketChannel实例化的时候创建的)。
至此,由Netty转化为JDK Nio的启动过程已经完成了,最后JDK的bind()可以自己查看相关JDK文档,在此不述(本文主要分析Netty服务端启动流程)。
网络无处不在,Netty已经成为Java通讯框架的事实标准,许多相关网络模块都使用Netty作为通讯,如Dubbo,RocketMQ,ES,Lettuce等等,可见Netty的地位和重要性。说到底这些技术都是基于Reactor模型进行的,Redis,Nginx等也是这个模型,理解这个模型很重要!