vlambda博客
学习文章列表

Netty 线程服务端启动流程

Netty

本文需要JDK NIO相关知识前提

类图:

主要的接口:

  • netty: EventLoopGroup, Eventloo, Channel

  • JDK: Executor, ExecutorService

线程模型

Netty的线程模型是典型的Reactor模式,关于Reactor模式的相关内容可以参考大神 Doug Lea的文章:《Scalable IO in Java》

Netty里面的线程模型,是主从Reactor模型,线程为: EvevtLoopGroup

  • 主线程:主要负责网络IO的监听、连接请求,分发handler

  • 从线程:主要负责用户内容处理

时序图

Netty 线程服务端启动流程


服务端启动的每一步调用的方法在图里都比较清晰,下面具体说下 一个典型的服务端启动程序:

 
   
   
 
  1. public static void main(String[] args) {

  2. EventLoopGroup bossGroup=new NioEventLoopGroup();

  3. EventLoopGroup workGroup=new NioEventLoopGroup();


  4. ServerBootstrap serverBootstrap=new ServerBootstrap();


  5. try {

  6. serverBootstrap.group(bossGroup,workGroup).

  7. channel(NioServerSocketChannel.class)

  8. .childHandler(

  9. new ChannelInitializer<SocketChannel>() {

  10. @Override

  11. protected void initChannel(

  12. SocketChannel ch) throws Exception {

  13. ChannelPipeline pipeline =

  14. ch.pipeline();

  15. pipeline.addLast(

  16. new TcpServerHandler());

  17. }

  18. });


  19. ChannelFuture channelFuture =

  20. serverBootstrap.bind(8866).sync();

  21. channelFuture.channel().closeFuture().sync();

  22. } catch (InterruptedException e) {

  23. e.printStackTrace();

  24. }finally {

  25. bossGroup.shutdownGracefully();

  26. workGroup.shutdownGracefully();

  27. }

  28. }

程序简单说明:

  • 首先创建2个EventLoopGroup,一个主:bossGroup,一个从:workGroup

  • 创建启动器:ServerBootstrap

  • 将主从Reactor放到启动器中,设置相关的参数并添加相应的handlder处理业务

  • 绑定端口启动监听服务


初始化
 
   
   
 
  1. EventLoopGroup bossGroup=new NioEventLoopGroup();

创建主Reactor,在这个里面主要初始化了一些属性,建立线程组

  • 初始化工作主要在其父类: MultithreadEventExecutorGroup

  • 初始化 children=newEventExecutor[nThreads]; 这个children是NioEventLoop实例。

    • 每个NioEventLoop是一个Executor的实例,其中execute实现为内部实线的线程池,每个NioEventLoop内部有一个selector实例、一个创建线程用的executor,这个executor可以理解为JDK自带的CacheThreadPool,每次提交都创建一个新线程并马上启动。

绑定
 
   
   
 
  1. ChannelFuture channelFuture = serverBootstrap.bind(8866).sync();

  1. serverBootstrap.bind(),调用父类( AbstractBootstrap)的 doBind()

 
   
   
 
  1. private ChannelFuture doBind(final SocketAddress localAddress) {

  2. //initAndRegister方法,做了2件事,一个初始化,一个将channel注册到selector上面

  3. final ChannelFuture regFuture = initAndRegister();

  4. final Channel channel = regFuture.channel();

  5. if (regFuture.cause() != null) {

  6. return regFuture;

  7. }


  8. //注册完后进行绑定,实际就是启动线程进行事件的监听

  9. if (regFuture.isDone()) {

  10. // At this point we know that the registration was complete and successful.

  11. ChannelPromise promise = channel.newPromise();

  12. doBind0(regFuture, channel, localAddress, promise);

  13. return promise;

  14. }

  15. //.....

  16. }

 
   
   
 
  1. final ChannelFuture initAndRegister() {

  2. Channel channel = null;

  3. try {

  4. channel = channelFactory.newChannel();

  5. init(channel);

  6. }

  7. //....省略


  8. ChannelFuture regFuture = config().group().register(channel);

  9. if (regFuture.cause() != null) {

  10. if (channel.isRegistered()) {

  11. channel.close();

  12. } else {

  13. channel.unsafe().closeForcibly();

  14. }

  15. }

  16. return regFuture;

  17. }

  • initAndRegister 对应上面时序图第3步,此时通过反射工场实例化出了 NioSeverSocketChannel这个对象,该对象里面创建了一个JDK ServerSocketChannel引用并设置为非阻塞( ch.configureBlocking(false);)。

  • 同时实例化了 NioSeverSocketChannel对象的 ChannelPipeline对象(对应第4步), ChannelPipeline这个对象又实例化了2个ChannelHandlerContext对象,一个是tail,一个是head,头尾相接形成一个链表.

  • 调用bootstrap的init() 第5步,往pipline里面添加了一个 ServerBootstrapAcceptorhandler(异步添加),对应第6步

  • 第7步 注册,调用EventLoopGroup里面的next,实际调用的是EventLoop里面的register,然后regisger又回过来往EventLoop中进行提交任务异步进行register操作(第10步)。

  • 第11步,EventLoop里面的线程池开始工作,这个线程池是Netty内部自己实现的,用了一个阻塞队列实现。添加完任务后线程开始工作,在最后的Run方法里面用一个无限循环进行seletor的轮询。

Netty 线程服务端启动流程

Netty 线程服务端启动流程

至此,register流程已完。

  1. 回到 AbstractBootstrap绑定,在channel注册完后,继续进行绑定,第14步, doBind0()

 
   
   
 
  1. if (regFuture.isDone()) {

  2. // At this point we know that the registration was complete and successful.

  3. ChannelPromise promise = channel.newPromise();

  4. doBind0(regFuture, channel, localAddress, promise);

  5. return promise;

  6. }

  7. //========

  8. private static void doBind0(

  9. final ChannelFuture regFuture, final Channel channel,

  10. final SocketAddress localAddress, final ChannelPromise promise) {


  11. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up

  12. // the pipeline in its channelRegistered() implementation.

  13. channel.eventLoop().execute(new Runnable() {

  14. @Override

  15. public void run() {

  16. if (regFuture.isSuccess()) {

  17. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

  18. } else {

  19. promise.setFailure(regFuture.cause());

  20. }

  21. }

  22. });

  23. }


  • 这里的 doBind0()里面的内容也是异步执行的,跟上面的注册一样,都是提交到eventloop里面去执行的,接着是在线程池里面执行 channel.bind(...) 方法,这里的channel实际是 NioServerSocketChannel(第15步)


  • 接着第16步,又调用pipeline里面的 bind(...), pipeline调用 tail.bind(),这个tail是 ChannelHandlerContext(第17步),


  • 第18步,执行 invokeBind(),从上面的第17步得知这里的 invokeBind()是 head 里面的,


  • 第19步,所以这里的bind()为head里面的,即:

 
   
   
 
  1. public void bind(

  2. ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

  3. throws Exception {

  4. unsafe.bind(localAddress, promise);

  5. }

  • 这里的 unsafe实际是 channel里面的绑定的 unsafe对象,即第20步。

  • 接着 channel里面的 bind()方法调用自己的 doBind()(NioServerSocketChannel.doBind()) ,即第21步。

 
   
   
 
  1. protected void doBind(SocketAddress localAddress) throws Exception {

  2. if (PlatformDependent.javaVersion() >= 7) {

  3. javaChannel().bind(localAddress, config.getBacklog());

  4. } else {

  5. javaChannel().socket().bind(localAddress, config.getBacklog());

  6. }

  7. }

  • 最后第22步,这里面可以看到用的javaChannel().bind()方法进行最终端口的绑定监听,这个javaChannel()实际就是JDK NIO原生的 ServerSocketChannel对象(在上面NioServerSocketChannel实例化的时候创建的)。

至此,由Netty转化为JDK Nio的启动过程已经完成了,最后JDK的bind()可以自己查看相关JDK文档,在此不述(本文主要分析Netty服务端启动流程)。


网络无处不在,Netty已经成为Java通讯框架的事实标准,许多相关网络模块都使用Netty作为通讯,如Dubbo,RocketMQ,ES,Lettuce等等,可见Netty的地位和重要性。说到底这些技术都是基于Reactor模型进行的,Redis,Nginx等也是这个模型,理解这个模型很重要!