【精】彻底熟悉Hadoop RPC框架
学习HDFS绕不开的一个重要的知识点就是Hadoop RPC框架。但是能将RPC框架从客户端->服务端,再从服务端到客户端这一套通信脉络顺着讲明白的资料很少。不少都是给出各个类的定义啊,调用了什么方法啊,很难让读者直观的理解Hadoop RPC框架。因此决定写这样一篇文章,我觉得只要认真读这篇文章,同时跟着本文的流程走一遍,掌握Hadoop RPC框架绝对是不在话下。
本文通过流程图+文字详细的介绍了Hadoop RPC框架。本文首先从DFSClient出发,追踪一个RPC请求的传播路径,以此对Hadoop RPC框架有个初步的了解。然后力图以先整体再局部的方式详细介绍Hadoop RPC框架。
本文涉及到的预备知识有:
静态内部类的用法、动态代理设计模式
RPC调用链路
当我们使用HDFS API进行编程或者HDFS CLI命令行输入命令执行时,内部会通过调用DFSClient的相关方法来实现。以HDFS API的使用为例:
①使用HDFS文件系统API前,先得到一个FileSystem对象。
②调用FileSystem对象中的方法。
// ①使用HDFS文件系统API前,先得到一个FileSystem对象。
public static FileSystem getFileSystem(String coreSitePath, String hdfsSitePath) throws IOException {
Configuration conf = new Configuration();
conf.addResource(new File(coreSitePath).toURI().toURL());
conf.addResource(new File(hdfsSitePath).toURI().toURL());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.setBoolean("fs.hdfs.impl.disable.cache", true);
conf.setClassLoader(BaseUtil.class.getClassLoader());
return FileSystem.get(conf);
}
// ②调用FileSystem对象中的方法。
FileSystem fileSystem = getFileSystem(coreSitePath, hdfsSitePath);
ContentSummary contentSummary = fileSystem.getContentSummary(new Path(hdfsFilePath));
代码里的FileSystem的实际类型其实是DistributedFileSystem,观察DistributedFileSystem类的源码,里面持有了一个DFSClient对象dfs,如下图:
当调用FileSystem类的方法时,其实是调用了DFSClient的方法,如下图:
那我们就从DFSClient类开始,描述一个RPC的传播探险过程,如下图所示。
淡蓝色大长方形代表类,深蓝色小长方形代表类里的字段。箭头指向字段的真实类型。
用文字来描述一下:
DFSClient#namenode (实际类型:ClientNamenodeProtocolTranslatorPB)
|
|
ClientNamenodeProtocolPB#rpcProxy(实际类型:通过JDK动态代理生成的ClientNamenodeProtocolPB的代理对象)
|
通过动态代理Invoker类的invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立Socket连接
|
ClientNamenodeProtocolServerSideTranslatorPB#server (实际类型:NameNodeRpcServer 或者 RouterRpcServer)
已经非常容易看懂了!!
那其实这里面涉及到三个问题:
①DFSClient的namenode成员变量怎么初始化成ClientNamenodeProtocolTranslatorPB的?
②ClientNamenodeProtocolPB的rpcProxy成员变量怎么赋值为动态代理对象的,以及代理对象怎么通过invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立了Socket连接?
③ClientNamenodeProtocolServerSideTranslatorPB的server成员变量怎么初始化成NameNodeRpcServer或者RouterRpcServer的?以及Server端怎么解析RPC请求的?
我们按顺序来回答这三个问题:
首先第一个问题:①DFSClient的namenode成员变量怎么初始化成ClientNamenodeProtocolTranslatorPB的?
通过Debug HDFS Client,可以发现最终是调用到了org.apache.hadoop.hdfs.NameNodeProxiesClient#createNonHAProxyWithClientProtocol方法。
调用栈如下:
关注一下调用栈栈顶的NameNodeProxiesClient#createNonHAProxyWithClientProtocol方法:
OK,第一个问题回答完了。
接着回答第二个问题:
②ClientNamenodeProtocolPB的rpcProxy成员变量怎么赋值为动态代理对象的,以及代理对象怎么通过invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立了Socket连接?
刚刚在上面的图里,NameNodeProxiesClient#createNonHAProxyWithClientProtocol中通过RPC.getProtocolProxy返回了一个ClientNamenodeProtocolPB类型的变量proxy。这个对象就是ClientNamenodeProtocolTranslatorPB中的rpcProxy成员。(因为通过构造函数传进去了)
我们看ClientNamenodeProtocolPB类型,这是一个接口。里面定义了Protocol Buffer根据我们提供的.proto文件生成的类。
同时我们也看到了proxy是这样生成的:
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
追踪 RPC.getProtocolProxy方法:
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr,
UserGroupInformation ticket,
Configuration conf,
SocketFactory factory,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth)
throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
}
可以发现最终是由RpcEngine接口定义的getProxy()方法来设置的proxy,继续往下追,看这个getProxy方法的具体实现,实际中Hadoop一般是基于Protocol Buffer作为序列化框架的,因此我们看ProtobufRpcEngine中的实现,如下图:
OK,我们知道了ClientNamenodeProtocolTranslatorPB#rpcProxy这个对象实际上就是在ProtobufRpcEngine#getProxy中生成的代理对象之后(第②个问题的第一小问),来看看代理对象的invoke方法是如何与Server端建立的Socket连接吧(第②个问题的第二小问)!
我们直接来到ProtobufRpcEngine#Invoker内部类的invoke方法。(动态代理就是这么设计的,就是要看invoke方法)
invoke方法很长,我截取了其中主要框架代码,忽略了一些不影响我们理解Hadoop RPC框架的细节(比如参数验证啊什么的),代码如下,核心关注一行client.call(xxx)。相当于在这里最终调用到了Client类的call方法。
@Override
public Message invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
// 构造RPC请求header
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
// RPC请求参数
final Message theRequest = (Message) args[1];
final RpcWritable.Buffer val;
try {
// 核心方法!
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
throw new ServiceException(e);
} finally {
if (traceScope != null) traceScope.close();
}
// 是异步模式? 可以先忽略
if (Client.isAsynchronousMode()) {
final AsyncGet<RpcWritable.Buffer, IOException> arr
= Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
return getReturnMessage(method, arr.get(timeout, unit));
}
@Override
public boolean isDone() {
return arr.isDone();
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
return null;
} else {
// 根据服务端响应构造返回结果的Messge对象
return getReturnMessage(method, val);
}
}
经过追踪,建立Socket的整体流程是:Client#call -> Client#getConnection-> Client#Connection#setupIOstreams ->Client#Connection#setupConnection -> NetUtils.connect。整个调用链路如下图所示:
OK前两个问题回答完毕,接着来看第三个问题:
③ClientNamenodeProtocolServerSideTranslatorPB的server成员变量怎么初始化成NameNodeRpcServer或者RouterRpcServer的?
回顾第二步,Hadoop通过JDK动态代理拿到了一个ClientNamenodeProtocolPB类型的代理对象:rpcProxy,然后把方法的调用委托给代理对象。代理对象会根据我们在hdfs-site.xml中提供的Namenode RPC的Socket标识符(IP或hostname + 端口号)与这个Socket标识符建立连接,进行RPC请求和获取响应。
因此在Namenode RPC Server端会有启动这个监听端口的行为。我们来看一看。观察到在NamenodeRpcServer的构造函数中,new了ClientNamenodeProtocolServerSideTranslatorPB对象。这个对象看类的名字有个ServerSide,意思是Server一侧的TranslatePB,对应了之前讲的客户端一侧的TranslatePB。
OK,我们接下来从NameNode类出发,进而推理到NamenodeRpcServer以及ClientNamenodeProtocolServerSideTranslatorPB。
首先给出一个整体的流程图:
在Namenode初始化的时候,会根据各种配置项构造一个NamenodeRpcServer对象rpcServer,然后在启动服务函数里启动这个rpcServer。如下图所示:
那我们主要关注start方法。NameNodeRpcServer中负责处理RPC的server对象都是RPC#Server类型的,它继承了org.apache.hadoop.ipc.Server类。如下代码所示,所以start方法跳转到了org.apache.hadoop.ipc.Server类的start方法:
// NameNodeRpcServer.java
protected final RPC.Server clientRpcServer;
// RPC.java
public abstract static class Server extends org.apache.hadoop.ipc.Server {
}
Server类的start方法如下图:
主要涉及到三个类:Responder、Listener、Handler。
这三个类本质上都是线程类。
Listener用于监听客户端发过来的请求,通过Reader进行对请求进行反序列化,然后将请求放到callQueue里,等待Handler从callQueue中take请求进行处理。Responder用来进行请求的响应。
到这里我们只剩下最后一个问题需要搞明白,Server类是怎么通过Handler把RPC调用最终委派给ClientNamenodeProtocolServerSideTranslatorPB的?
Handler是个线程类,它的run方法不断地从callQueue中take Call对象,然后执行Call.run()方法进行rpc请求的处理,其中又会调用到下图所示的call方法获得返回值:
追到call方法里面:到了ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call方法(只给出关键部分,省略try catch和其他无关代码):
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
// 从注册的Protocol中获取protocolImpl
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
// 进而获取Blocking service对象
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
// 调用service对象上的方法,这个service对象其实就是在NameNodeRpcServer中通过
// ClientNamenodeProtocolServerSideTranslatorPB构造的对象。
result = service.callBlockingMethod(methodDescriptor, null, param);
}
第一个参数是RPC.Server类型,对应了前面NameNodeRpcServer中的三个变量:
然后我们来弄清楚BlockingService service对象的真身(跟ClientNamenodeProtocolServerSideTranslatorPB关联起来!)。
NameNodeRpcServer构造函数中,有如下代码:
public NameNodeRpcServer(Configuration conf, NameNode nn)
throws IOException {
ClientNamenodeProtocolServerSideTranslatorPB
clientProtocolServerTranslator =
new ClientNamenodeProtocolServerSideTranslatorPB(this);
BlockingService clientNNPbService = ClientNamenodeProtocol.
newReflectiveBlockingService(clientProtocolServerTranslator);
//记住这个setInstance,传入了上面的BlockingService变量。下一步我们会用到。
clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(rpcAddr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
}
在构造一个RPC Server的过程中,setInstance方法传入了一个BlockingService对象。在build的过程中,会进行判断,如果Instance不为空,那么就在new Server的时候注册起来,这样我们在刚才的call方法中拿到的protocolImpl其实就是从注册的Map中拿到这个BlockingService,也就和ClientNamenodeProtocolServerSideTranslatorPB关联了起来!
如下图:
到这里整个Hadoop RPC框架的整体过程已经讲完了。
当然里面有很多细节,比如Retry机制、Failover、异常处理、异步RPC请求、线程安全等等问题留待以后再深挖。
参考
Hadoop-3.1版本源码