Netty 核心源码解读 —— ServerBootstrap 篇
本文我们就开始对 ServerBootstrap 进行源码解读(4.1.51.Final-SNAPSHOT),为什么是 ServerBootstrap,记得在用 Netty 做第一个项目的时候,写的第一行 Code 就是 new ServerBootstrap(),ServerBootstrap 是 Netty Server 的启动类,所以从它开始了解 Netty 是最合适的。
ServerBootstrap
private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);public void init() throws Exception {// Server 服务启动ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup);bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ServerChannelInitializer(serverConfig));// 可选参数bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);// 绑定接口,同步等待成功ChannelFuture future = bootstrap.bind(port).sync();ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {}});}
这是我在做 TCP 网关时写的 Netty Server 的代码片段(https://github.com/SongranZhang/tcp-gateway/blob/master/src/main/java/com/linkedkeeper/tcp/connector/tcp/server/TcpServer.java),可以看到,Netty Server 的初始化首先是通过 ServerBootstrap 的无参构造函数创建一个对象,接着是这个对象的一串链式调用 bootstrap.group().channel().childHandler().childOption(),而服务启动的真正触发点是这段 bootstrap.bind(port).sync(),下面我们就逐一来分析下这里的每个方法。
首先是 group() 方法。
## ServerBootstrap.javaprivate volatile EventLoopGroup childGroup;public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
这里 workerGroup 赋值给了 ServerBootstrap 的 childGroup,bossGroup 赋值给了父类 AbstractBootstrap 的 group。
volatile EventLoopGroup group;public B group(EventLoopGroup group) {ObjectUtil.checkNotNull(group, "group");if (this.group != null) {throw new IllegalStateException("group set already");}this.group = group;return self();}
接下来是 channel() 方法。
## AbstractBootstrap.javaprivate volatile ChannelFactory extends C> channelFactory;public B channel(Class<? extends C> channelClass) {return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")));}public B channelFactory(ChannelFactory extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory");if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}this.channelFactory = channelFactory;return self();}
这里 NioServerSocketChannel.class 通过 ReflectiveChannelFactory 进行了实例化,然后赋值给了 AbstractBootstrap 的 channelFactory。
接下来是 childHandler() 方法。
## ServerBootstrap.javaprivate volatile ChannelHandler childHandler;public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");return this;}
这里是对 ServerBootstrap 的 childHandler 赋值。
最后是 childOption() 方法。
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {ObjectUtil.checkNotNull(childOption, "childOption");synchronized (childOptions) {if (value == null) {childOptions.remove(childOption);} else {childOptions.put(childOption, value);}}return this;}
这里 childOptions 维护了 TCP 的参数设置。
简言之 bootstrap.group().channel().childHandler().childOption() 就是在构建 Netty Server 的各种参数,下面再来看 bootstrap.bind(port).sync()。
首先是 bind() 方法。
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}public ChannelFuture bind(SocketAddress localAddress) {validate();return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));}public B validate() {if (group == null) {throw new IllegalStateException("group not set");}if (channelFactory == null) {throw new IllegalStateException("channel or channelFactory not set");}return self();}
这里的 validate() 方法对 AbstractBootstrap 的 group 和 channelFactory 进行非空校验,之后调用 doBind() 方法。
## AbstractBootstrap.javaprivate ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
首先看一下 initAndRegister() 方法。
final ChannelFuture initAndRegister() {Channel channel = null;try {channel = channelFactory.newChannel();init(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;}
这里 channelFactory.newChannel() 调用的是 ReflectiveChannelFactory 的 newChannel 方法。
## ReflectiveChannelFactoryprivate final Constructor<? extends T> constructor;public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");try {this.constructor = clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +" does not have a public non-arg constructor", e);}}public T newChannel() {try {return constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}}
这里 constructor.newInstance() 是 NioServerSocketChannel.class 的一个实例。得到 channel 后,调用 init(channel) 进行初始化,一是给 options 和 attrs 赋值,二是构建 pipeline。
## ServerBootstrap.javavoid init(Channel channel) {setChannelOptions(channel, newOptionsArray(), logger);setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption>, Object>[] currentChildOptions;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);}final Entry<AttributeKey>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
回到 initAndRegister() 方法中,init(channel) 之后是 register(channel),该方法在 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 中实现,我们在解读 NioEventLoop 源码时再分析。
public ChannelFuture register(Channel channel) {return next().register(channel);}
看完 initAndRegister(),再回到 doBind() 接着看 doBind0()。
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
这里 regFuture.isSuccess() 会执行 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);,否者执行 promise.setFailure(regFuture.cause());,这里的 promise 可以认为是一种特殊的 Future 对象。bind 是在 ChannelPipeline 里进行绑定的,我们在解读 ChannelPipeline 源码时再分析。
最后看一下 bootstrap.bind(serverPort).sync() 中的 sync(),bootstrap.bind(serverPort) 返回的是 ChannelFuture,所以 sync() 是调用 DefaultChannelPromise 的方法。
## DefaultChannelPromisepublic ChannelPromise sync() throws InterruptedException {super.sync();return this;}
这里 super.sync(); 调用了父类的方法。
## DefaultPromisepublic Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;}public Promise<V> await() throws InterruptedException {if (isDone()) {return this;}if (Thread.interrupted()) {throw new InterruptedException(toString());}checkDeadLock();synchronized (this) {while (!isDone()) {incWaiters();try {wait();} finally {decWaiters();}}}return this;}
这里 while(!isDone()) 会进入循环,调用 sync() 后线程会被阻塞住。
总结
本篇也是写了好久,本文介绍了 ServerBootstrap,它是构建 Netty Server 的主要实现类,ServerBootstrap 里主要是对各种属性进行赋值,并创建 Channel 和 ChannelPipeline,最后绑定本地端口开始监听 IO 事件。在后续的文章里,我会继续与大家讨论 Netty 的 EventLoop,还请大家多多关注我的个人博客或公账号。
