vlambda博客
学习文章列表

​Netty之Reactor线程模型概述

Netty之Reactor线程模型概述

前一篇中简单介绍了Reactor设计模式,并回顾了Doug Lea在Scalable IO in Java一文中介绍的几种Reactor模型与结合Java NIO的简单实现。今天在这个基础上来看一下,Netty是如何实现Reactor设计模式的。本文只整体的分析不涉及具体的源码,源码分析放在概述后。

Netty Reactor线程模型中主要角色

先来看看Netty4.1.47.Final源码中exapmle中EchoServer示例

 
   
   
 
  1. /**

  2. * Echoes back any received data from a client.

  3. */

  4. public final class EchoServer {

  5. static final boolean SSL = System.getProperty("ssl") != null;

  6. static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

  7. public static void main(String[] args) throws Exception {

  8. // Configure SSL.

  9. final SslContext sslCtx;

  10. if (SSL) {

  11. SelfSignedCertificate ssc = new SelfSignedCertificate();

  12. sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

  13. } else {

  14. sslCtx = null;

  15. }

  16. // Configure the server.

  17. EventLoopGroup bossGroup = new NioEventLoopGroup(1);

  18. EventLoopGroup workerGroup = new NioEventLoopGroup();

  19. final EchoServerHandler serverHandler = new EchoServerHandler();

  20. try {

  21. ServerBootstrap b = new ServerBootstrap();

  22. // 设置线程模型

  23. b.group(bossGroup, workerGroup)

  24. // 设置IO模型

  25. .channel(NioServerSocketChannel.class)

  26. // 设置服务端tcp连接最大的连接数

  27. .option(ChannelOption.SO_BACKLOG, 100)

  28. // 设置每一个连接的处理器

  29. .handler(new LoggingHandler(LogLevel.INFO))

  30. // 针对于已建立好的连接设置处理器

  31. .childHandler(new ChannelInitializer<SocketChannel>() {

  32. @Override

  33. public void initChannel(SocketChannel ch) throws Exception {

  34. ChannelPipeline p = ch.pipeline();

  35. if (sslCtx != null) {

  36. p.addLast(sslCtx.newHandler(ch.alloc()));

  37. }

  38. //p.addLast(new LoggingHandler(LogLevel.INFO));

  39. p.addLast(serverHandler);

  40. }

  41. });

  42. // Start the server.

  43. ChannelFuture f = b.bind(PORT).sync();

  44. // Wait until the server socket is closed.

  45. f.channel().closeFuture().sync();

  46. } finally {

  47. // Shut down all event loops to terminate all threads.

  48. bossGroup.shutdownGracefully();

  49. workerGroup.shutdownGracefully();

  50. }

  51. }

  52. }

通过ServerBootstrap类可以指定线程模型bossGroup与workGroup、IO模型channel、服务端属性option、处理每个连接的属性childOption、每一个连接的处理器handler、已建立的连接处理器childHandler。ServerBootstrap类能够用简短的代码构建出性能高效、扩展性极好的网络服务端程序。Netty内部为我们实现了复杂的Reactor线程模型。就上面的示例而言涉及与Reactor线程模型相关的核心类有

  • Channel 可以理解为网络连接的抽象,能处理请求连接与IO读写事件。处理TCP协议时可以分为NioServerSocketChannel、NioSocketChannel两类。NioServerSocketChannel内部封装了java NIO的ServerSocketChannel用于接收Client的连接,NioSocketChannel内封装了java NIO的SocketChannel用于处理已建立好的Client连接上的IO读写事件。

  • ChannelPipeline 可以理解为处理连接Channel上事件的处理器ChannelHandler构成的一条管道,每个Channel都有唯一的ChannelPipeline。ChannelPipeline里面包含多个ChannelHandlerContext,默认只有TailContext与HeadContext。在ChannelPipeline上可以动态地添加或删除ChannelHandler。

  • ChannelHandlerContext 事件处理器ChannelHandler对应的上下文,ChannelHandlerContext内包含了唯一的ChannelHandler。上下文其实就是ChannelPipeline和Channel。

  • ChannelHandler 连接Channel上的事件处理器,用于具体处理请求连接与IO读写事件。ChannelHandler处理完事件后,可以通过其对应的ChannelHandlerContext将事件传播给Channel上的其他ChannelHandler去处理。

  • ServerBootstrapAcceptor NioServerSocketChannel上的一个特殊ChannelHandler,用于处理请求连接事件。另外,ServerBootstrapAcceptor 会设置连接NioSocketChannel上面的选项、属性与ChannelHandler(这些是通过ServerBootstrap的child**方法指定的,如childOption、childAttr、childHandler),同时将NioSocketChannel注册到workerGroup上去,workerGroup内部会选择一个线程用于处理这条连接NioSocketChannel上的IO事件。

  • NioEventLoopGroup 事件循环组,其内部管理了若干NioEventLoop。负责根据一定的策略为Channel选择一个NioEventLoop,并将Channel注册到NioEventLoop内部的事件处理器Selector上。后继Channel上的连接或IO读写事件将由该NioEventLoop内部的线程去处理。

  • NioEventLoop 事件循环,内部维护了Selector用于处理已注册的Channel。NioEventLoop的run方法一方面会通过内部的Selector不断地监听Channel上的连接与IO读写事件并处理,另一方面还会不断地处理提交到其内部队列中的任务。所有的这些事件实际都是由NioEventLoop的超类SingleThreadEventExecutor中的线程thread完成的。每次向SingleThreadEventExecutor提交任务,任务都会被加入到队列,同时如果是第一次提交任务还会触发SingleThreadEventExecutor中thread线程的创建。这个thread线程是一个特殊的线程,实际由ThreadPerTaskExecutor创建启动。

Netty之Reactor线程模型

Netty内部采用的Reactor模型可以用下了一张图概括

结合示例代码先看图最左边的部分,示例代码中通过ServerBootstrap参数boosGroup指的NioEventLoopGroup主要负责accept客户端的连接。而真正执行是由boosGroup中的一个NioEventLoop去处理。这个NioEventLoop内部维护了用于循环执行事件与任务的thread、监听事件的多路复用器selector、存放待执行任务的taskQueue、存放定时执行任务的scheduledTaskQueue。NioEventLoopGroup中的thread在第一次有任务提交时会被创建与启动,然后循环做三件事件:select不断地select直到一定时间内有任务或事件要被处理、processSelectedKeys处理selector上监听获取的事件、runAllTask处理任务队列中的任务。就boosGroup而言,不管组里面指定了多少个NioEventLoop,实际都只会创建一个。这个NioEventLoop在调用ServerBootstrap#bind方法时,转由ServerBootstrap#initAndRegister方法创建。ServerBootstrap#initAndRegister主要做两件事init Channel 与 register Channel。

  • init Channel,ServerBootstrap会通过反射init参数channel指定的IO模型NioServerSocketChannel,在创建NioServerSocketChannel的同时会创建其对应的ChannelPiple,并将最重要的用于处理客户端连接的ServerBootstrapAcceptor(一个特殊的Handler)加到这个ChannelPiple上。

  • register Channel,ServerBootstrap的boosGroup根据一定的策略选择一个NioEventLoop并将上面创建的NioServerSocketChannel注册到NioEventLoop内部的selector上。在注册这一步时,会提交一个任务到到NioEventLoop的taskQueue中,从而触发NioEventLoop内部thread的创建与启动。后继NioEventLoop内部的thread会不断地进行select、processSelectedKeys、runAllTask这三件事件。

然后再看一下中间的部分,当调用ServerBootstrap#bind方法最终NioEventLoop内部的thread启动后。一旦有客户端向服务端发出连接请求后,NioEventLoop内部的thread#run方法在调用Selector#select方法时,就会监听到NioServerSocketChannel上的Accept事件,然后去创建对应的NioSocketChannel并将NioSocketChannel以消息的形式通过ChannelPipeline传递给其上面的ServerBootstrapAcceptor。ServerBootstrapAcceptor负责一方面负责根据ServerBootstrap指定的NioSocketChannel相关参数设置NioSocketChannel,另一方面负责调用workerGroup的register方法,由workerGroup根据一定的策略选择一个NioEventLoop,并完成将NioSocketChannel注册到这个NioEventLoop的selector上,同样这一步会触发NioEventLoop内部thread的创建与启动。

最后再看当Client上发消息给服务端,在上一步看到有Accpet事件达到后ServerBootstrapAcceptor实际会触发这个连接对应的NioSocketChannel绑定到workerGroup中的NioEventLoop。后继有Client上的IO事件达到时都会由这个NioEventLoop内部的thread处理。总结下来就是boosGroup中固定一个线程会不断地监听服务端某一个端口上Client端的Accpet事件,然后创建这个连接对应的NioSocketChannel并通过ChannelPipeline传递给ServerBootstrapAcceptor去处理。ServerBootstrapAcceptor接收到消息后,会让workGroup去选择内部的一个NioEventLoop去处理这个连接上后继的IO事件与任务。

Netty之Reactor线程模型优点