【GRPC】学习GRPC线程模型(一)
最近有个朋友换了工作,不像大部分人那样,编造各种被迫离开的外部因素,来保住公司的颜面(比如:抠),与公司好聚好散。朋友直言不讳地在离职申请表上,坦诚地写出了公司在各方面的不足,如:食堂难吃。
工具人疑惑地问朋友,你们公司好歹也是业内头部公司,又是出了名的、以关爱员工自我标榜的家文化企业,待遇比不上互联网也就算了,连区区食堂质量也沦陷了吗?
朋友叹了口气说,互联网狼文化每周有家庭日,我们家文化每周只有学习强国。
工具人弱弱地问,那努力工作不平躺,学习强国做卷王,总要让你们每天吃饱,补充营养吧?
朋友呵呵一笑道,
打菜阿姨好手艺,
勺起肉落装空气,
人称食堂娃娃机。
做菜师傅好算计,
中午剩菜晚间卖,
东方不败铁公鸡。
那菜品咋样呢?
又有诗曰:
食堂名菜风干鸡,
硬如生铁狗不理。
生猛海鲜断头虾,
烂须软壳世间奇。
假牛肉,人工造,
大咸蛋,血压高。
反正员工没得挑,
我爱咋烧就咋烧。
工具人摇摇头,马上就要”十三五“了,让你们公司的人大代表给劳动法上个提案吧:午饭吃不饱吃不好的员工,可以准时下班。
言归正传,朋友能够顺利地找到工作,还是因为自身能力过硬,基础扎实。能不将就就绝不将就。所以榜样在前,我们今天就继续学习GRPC相关知识。
grpc的线程模型由多个线程池组成,其中Acceptor线程池和网络IO线程池是基于Netty的。今天我们就来看看这两个线程池是如何初始化的。Grpc服务端启动的入口是ServerImpl中的start函数:
ServerImpl.class
public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");
checkState(!shutdown, "Shutting down");
// Start and wait for any port to actually be bound.
// 启动NettyServer服务
transportServer.start(new ServerListenerImpl());
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
}
}
我们继续追踪transportServer.start(...);这里我们重点关注Netty的两个分组:bossGroup和workerGroup,以及ServerBootstrap.bind(address)函数。bossGroup其实就是Grpc的Acceptor线程的载体,而workerGroup则是IO线程池的载体。
NettyServer.class
public void
start(ServerListener serverListener) throws IOException
{
listener = checkNotNull(serverListener, "serverListener");
// 初始化EventLoopGroup
allocateSharedGroups();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(channelType);
.....
//新客户链接初始化
b.childHandler(new ChannelInitializer<Channel>() {
public void initChannel(Channel ch) throws Exception {
.....
}
});
// Bind and start to accept incoming connections.
ChannelFuture future = b.bind(address);
....
}
从源码中可以看到,通过allocateSharedGroups初始化的线程组中,bossGroup初始化了1个NioEventLoop,而workerGroup则会拥有CPU个数两倍的NioEventLoop。NioEventLoop是Netty的核心类之一,每一个NioEventLoop会开启一个线程,并使用Select选择器处理网络IO事件。NioEventLoop也可以执行非IO任务,通过execute直接执行,如channel的pipeline初始化等。通过追踪 b.bind(address),可以看到首先会对监听的socket(即channel) 初始化,在初始化完成后,将channel注册到Select中,并channel绑定了自己的执行线程。
AbstractBootstrap.class
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
//第一步,初始化channel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//将channel注册到Select中,同时channel也绑定了自己的执行线程
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
根据我们开篇的描述,Grpc或者说Netty会用boss线程处理接入请求,使用worker线程处理网络IO,那这个衔接过程是在哪里完成的呢?为此,我们追踪一下channel的初始化过程init(channel),这里的channel指的是服务端监听socket。
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
//这里的childGroup只是换了个马甲,其实就是开始初始化的wokerGroup
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
//给pipeline增加了ServerBootstrapAcceptor处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在服务端channel初始化时,channel执行了一个非IO任务,给自己的pipeline处理链增加了ServerBootstrapAcceptor处理器。并且把wokerGroup作为构造参数传了进去。当一个新的客户端发起建连,服务端channel所注册的Select会监听到一个OP_ACCEPT的IO事件,并由NioEventLoop线程进行处理。同样的,如果是其他网络IO,也会在NioEventLoop中进行处理,如SelectionKey为OP_WRITE或OP_READ 。
NioEventLoop.class
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//OP_ACCEPT:接收连接事件,表示服务端监听到了客户端连接,
// 并可以接收这个连接
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//处理事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
其读事件unsafe.read()处理过程如下:
该步骤会将表示客户端socket的channel,投递到server channel的处理链上,其接收者即为之前我们提到的ServerBootstrapAcceptor处理器,并触发channelRead事件,此时将代表客户端socket的channel对象,并选择workerGroup中的某一个NioEventLoop绑定执行线程,并注册到对应的select上。
ServerBootstrapAcceptor.calss
"unchecked") (
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//客户端channel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//将客户端channel注册到workerGroup的Select上。
childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
至此,我们将Grpc或者说Netty的服务建连,boss线程,worker线程池之间的关系和代码整体串联起来了。