vlambda博客
学习文章列表

gRPC服务端启动流程走查

目录
1、服务端启动示例
2、构建监听地址SocketAddress
2.1 SPI加载NettyServerProvider
2.2 根据指定端口创建监听地址
3、将service注册到缓存
4、Server构建
5、服务端启动
6、小结

1、服务端启动示例

server = ServerBuilder.forPort(port) // @1
.addService(new GreeterImpl()) // @2
.build() // @3
.start(); // @4

小结:服务端启动只有一行代码,设计简洁。

2、构建监听地址SocketAddress

2.1 SPI加载NettyServerProvider

代码坐标:io.grpc.ServerProvider

ServerBuilder.forPort(port)

public static ServerBuilder<?> forPort(int port) {
return ServerProvider.provider().builderForPort(port);
}

private static final ServerProvider provider = ServiceProviders.load(
ServerProvider.class,
Collections.<Class<?>>emptyList(),
ServerProvider.class.getClassLoader(),
new PriorityAccessor<ServerProvider>() {
@Override
public boolean isAvailable(ServerProvider provider) {
return provider.isAvailable();
}

@Override
public int getPriority(ServerProvider provider) {
return provider.priority();
}
}); // @1

@1 通过SPI对实例化NettyServerProvider
String PREFIX = "META-INF/services/";
String fullName = PREFIX + service.getName();
c = Class.forName(cn, false, loader);
具体目录为:grpc-netty工程 META-INF/services/io.grpc.ServerProvider配置文件提供的配置io.grpc.netty.NettyServerProvider

2.2 根据指定端口创建监听地址

代码坐标1:io.grpc.netty.NettyServerProvider
代码坐标2:io.grpc.netty.NettyServerBuilder

protected NettyServerBuilder builderForPort(int port) {
return NettyServerBuilder.forPort(port);
}

private final List<SocketAddress> listenAddresses = new ArrayList<>();

private NettyServerBuilder(int port) {
this.listenAddresses.add(new InetSocketAddress(port)); // @1
}
3、将service注册到缓存

代码坐标:io.grpc.internal.AbstractServerImplBuilder

.addService(new GreeterImpl()) // @1

public final T addService(BindableService bindableService) {
...
return addService(checkNotNull(bindableService, "bindableService").bindService()); // @2
}

Builder addService(ServerServiceDefinition service) {
services.put(service.getServiceDescriptor().getName(), service); // @3
return this;
}

@1 GreeterImpl自定义提供的服务实现类需继承插件生产的抽象类GreeterGrpc.GreeterImplBase同时实现了BindableService接口
@2 bindService()不需要自己实现,插件自动生成代码时会自动生成其实现类
@3 将服务注册到缓存services中(LinkedHashMap)

小结:将自定义的服务提供实现类注册到缓存中。

4、Server构建
.build()
public final Server build() {
ServerImpl server = new ServerImpl(
this,
buildTransportServers(getTracerFactories()), // @1
Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
notifyTarget.notifyOnBuild(server);
}
return server;
}

protected List<NettyServer> buildTransportServers(){
...
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(...) // @2
}
}

@1 创建NettyServer
@2 为注册的每个SocketAddress创建NettyServer

4.1 NettyServer构造函数参数

NettyServer(
SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory, // @1
Map<ChannelOption<?>, ?> channelOptions, // @2
ObjectPool<? extends EventLoopGroup/**一组EventLoop**/> bossGroupPool, // @3
ObjectPool<? extends EventLoopGroup> workerGroupPool, // @4
ProtocolNegotiator protocolNegotiator, // @5
List<? extends ServerStreamTracer.Factory> streamTracerFactories, // @6
TransportTracer.Factory transportTracerFactory, // @7
int maxStreamsPerConnection, // @8
int flowControlWindow, // @9
int maxMessageSize, // @10
int maxHeaderListSize, // @11
long keepAliveTimeInNanos, // @12
long keepAliveTimeoutInNanos, // @13
long maxConnectionIdleInNanos, // @14
long maxConnectionAgeInNanos, // @15
long maxConnectionAgeGraceInNanos,// @16
boolean permitKeepAliveWithoutCalls, // @17
long permitKeepAliveTimeInNanos, // @18
InternalChannelz channelz
)

@1 Channel工厂类创建新的通道
@2 ChannelOption设置项
@3 用于accept客户端链接的线程池转发给workerGroupPool
@4 初始化客户端连接的线程池
@5 遵循HTTP/2规范的通信协商
@6 用于创建ServerStreamTracer
@7 创建TransportTracer工厂类用于统计通信流量
@8 每个连接允许的最大Streams
@9 HTTP/2流控窗口大小
@10 服务端允许的最大消息体大小该属性已废弃使用maxInboundMessageSize代替
@11 接受最大Header大小已废弃使用maxInboundMetadataSize代替
@12 存活时间单位纳秒在存活时间内发送下一次keepAlive ping
@13 keepalive ping requests的超时时间
@14 连接最大空闲时间超过后将优雅关闭
@15 连接最大存活时间超过后将优雅关闭
@16 当达到最大连接时RPCs有优雅关闭时间,在优雅时间内RPC请求未完成将被取消
@17 是否允许在没有RPC调用的情况下客户端发送keep-alive HTTP/2 PINGs 默认false
@18 允许客户端保持连接的最大时间 默认5分钟

5、服务端启动

代码坐标:ServerImpl.start

.start()

public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");// @1
checkState(!shutdown, "Shutting down");// @2
ServerListenerImpl listener = new ServerListenerImpl(); // @3
for (InternalServer ts : transportServers) {
ts.start(listener); // @4
activeTransportServers++; // @5
}
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); // @6
started = true; // @7
return this;
}

@1 检查服务是否启动
@2 检查服务是否正在关闭
@3 transport创建监听器
@4 server启动
@5 活动server统计
@6 从守护线程池中获取一个线程
@7 服务启动标记

代码坐标:NettyServer.start

public void start(ServerListener serverListener) {
ServerBootstrap b = new ServerBootstrap();
...
b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) {
NettyServerTransport transport =
new NettyServerTransport(...); // @1
}
transport.start(transportListener);
...
});
}

代码坐标:NettyServerTransport.start

...
grpcHandler = createHandler(listener, channelUnused); // @2
...

代码坐标:NettyServerHandler.newHandler

 static NettyServerHandler newHandler(...){ // @3
final Http2Connection connection = new DefaultHttp2Connection(true);
WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
DefaultHttp2RemoteFlowController controller =
new DefaultHttp2RemoteFlowController(connection, dist);
connection.remote().flowController(controller);
final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);

// Create the local flow controller configured to auto-refill the connection window.
connection.local().flowController(
new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);

Http2Settings settings = new Http2Settings();
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);

return new NettyServerHandler(...)
}

@1 创建NettyServerTransport
@2 基于Netty HTTP/2构建gRPC服务端
@3 具体的Netty HTTP/2实现,具体在分析HTTP/2时再回头分析

6、小结

7.系列文章




「瓜农老梁  学习同行