vlambda博客
学习文章列表

【GRPC】学习GRPC线程模型(一)

最近有个朋友换了工作,不像大部分人那样,编造各种被迫离开的外部因素,来保住公司的颜面(比如:抠),与公司好聚好散。朋友直言不讳地在离职申请表上,坦诚地写出了公司在各方面的不足,如:食堂难吃。


工具人疑惑地问朋友,你们公司好歹也是业内头部公司,又是出了名的、以关爱员工自我标榜的家文化企业,待遇比不上互联网也就算了,连区区食堂质量也沦陷了吗?


朋友叹了口气说,互联网狼文化每周有家庭日,我们家文化每周只有学习强国。


工具人弱弱地问,那努力工作不平躺,学习强国做卷王,总要让你们每天吃饱,补充营养吧?


朋友呵呵一笑道,


打菜阿姨好手艺,

勺起肉落装空气,

人称食堂娃娃机。

做菜师傅好算计,

中午剩菜晚间卖,

东方不败铁公鸡。


那菜品咋样呢?

有诗曰:


食堂名菜风干鸡,

硬如生铁狗不理。

生猛海鲜断头虾,

烂须软壳世间奇。

假牛肉,人工造,

大咸蛋,血压高。

反正员工没得挑,

我爱咋烧就咋烧。


工具人摇摇头,马上就要”十三五“了,让你们公司的人大代表给劳动法上个提案吧:午饭吃不饱吃不好的员工,可以准时下班。


言归正传,朋友能够顺利地找到工作,还是因为自身能力过硬,基础扎实。能不将就就绝不将就。所以榜样在前,我们今天就继续学习GRPC相关知识。

grpc的线程模型由多个线程池组成,其中Acceptor线程池和网络IO线程池是基于Netty的。今天我们就来看看这两个线程池是如何初始化的。Grpc服务端启动的入口是ServerImpl中的start函数:

ServerImpl.class

@Override public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); // Start and wait for any port to actually be bound. // 启动NettyServer服务 transportServer.start(new ServerListenerImpl()); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; return this; } }

我们继续追踪transportServer.start(...);这里我们重点关注Netty的两个分组:bossGroup和workerGroup,以及ServerBootstrap.bind(address)函数。bossGroup其实就是Grpc的Acceptor线程的载体,而workerGroup则是IO线程池的载体。

NettyServer.class

 @Override public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); // 初始化EventLoopGroup allocateSharedGroups(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(channelType); ..... //新客户链接初始化 b.childHandler(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ..... } }); // Bind and start to accept incoming connections. ChannelFuture future = b.bind(address); ....}

从源码中可以看到,通过allocateSharedGroups初始化的线程组中,bossGroup初始化了1个NioEventLoop,而workerGroup则会拥有CPU个数两倍的NioEventLoop。NioEventLoop是Netty的核心类之一,每一个NioEventLoop会开启一个线程,并使用Select选择器处理网络IO事件。NioEventLoop也可以执行非IO任务,通过execute直接执行,如channel的pipeline初始化等。通过追踪 b.bind(address),可以看到首先会对监听的socket(即channel) 初始化,在初始化完成后,将channel注册到Select中,并channel绑定了自己的执行线程。

AbstractBootstrap.class

final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); //第一步,初始化channel  init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);    } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //将channel注册到Select中,同时channel也绑定了自己的执行线程 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;}

根据我们开篇的描述,Grpc或者说Netty会用boss线程处理接入请求,使用worker线程处理网络IO,那这个衔接过程是在哪里完成的呢?为此,我们追踪一下channel的初始化过程init(channel),这里的channel指的是服务端监听socket。

@Overridevoid init(Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, newAttributesArray()); ChannelPipeline p = channel.pipeline(); //这里的childGroup只是换了个马甲,其实就是开始初始化的wokerGroup final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { //给pipeline增加了ServerBootstrapAcceptor处理器 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}

在服务端channel初始化时,channel执行了一个非IO任务,给自己的pipeline处理链增加了ServerBootstrapAcceptor处理器。并且把wokerGroup作为构造参数传了进去。当一个新的客户端发起建连,服务端channel所注册的Select会监听到一个OP_ACCEPT的IO事件,并由NioEventLoop线程进行处理。同样的,如果是其他网络IO,也会在NioEventLoop中进行处理,如SelectionKey为OP_WRITE或OP_READ 。

NioEventLoop.class

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ....... try { int readyOps = k.readyOps();  if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //OP_ACCEPT:接收连接事件,表示服务端监听到了客户端连接, // 并可以接收这个连接 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //处理事件 unsafe.read();    } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

其读事件unsafe.read()处理过程如下:

该步骤会将表示客户端socket的channel,投递到server channel的处理链上,其接收者即为之前我们提到的ServerBootstrapAcceptor处理器,并触发channelRead事件,此时将代表客户端socket的channel对象,并选择workerGroup中的某一个NioEventLoop绑定执行线程,并注册到对应的select上。

ServerBootstrapAcceptor.calss

@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { //客户端channel final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { //将客户端channel注册到workerGroup的Select上。 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }

至此,我们将Grpc或者说Netty的服务建连,boss线程,worker线程池之间的关系和代码整体串联起来了。