vlambda博客
学习文章列表

【精】彻底熟悉Hadoop RPC框架

学习HDFS绕不开的一个重要的知识点就是Hadoop RPC框架。但是能将RPC框架从客户端->服务端,再从服务端到客户端这一套通信脉络顺着讲明白的资料很少。不少都是给出各个类的定义啊,调用了什么方法啊,很难让读者直观的理解Hadoop RPC框架。因此决定写这样一篇文章,我觉得只要认真读这篇文章,同时跟着本文的流程走一遍,掌握Hadoop RPC框架绝对是不在话下。

本文通过流程图+文字详细的介绍了Hadoop RPC框架。本文首先从DFSClient出发,追踪一个RPC请求的传播路径,以此对Hadoop RPC框架有个初步的了解。然后力图以先整体再局部的方式详细介绍Hadoop RPC框架。

本文涉及到的预备知识有:
静态内部类的用法、动态代理设计模式

RPC调用链路

当我们使用HDFS  API进行编程或者HDFS CLI命令行输入命令执行时,内部会通过调用DFSClient的相关方法来实现。以HDFS API的使用为例:

①使用HDFS文件系统API前,先得到一个FileSystem对象。
②调用FileSystem对象中的方法。

  // ①使用HDFS文件系统API前,先得到一个FileSystem对象。
  public static FileSystem getFileSystem(String coreSitePath, String hdfsSitePath) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new File(coreSitePath).toURI().toURL());
    conf.addResource(new File(hdfsSitePath).toURI().toURL());
    conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
    conf.setBoolean("fs.hdfs.impl.disable.cache"true);
    conf.setClassLoader(BaseUtil.class.getClassLoader());
    return FileSystem.get(conf);
  }

// ②调用FileSystem对象中的方法。
FileSystem fileSystem = getFileSystem(coreSitePath, hdfsSitePath);
ContentSummary contentSummary = fileSystem.getContentSummary(new Path(hdfsFilePath));

代码里的FileSystem的实际类型其实是DistributedFileSystem,观察DistributedFileSystem类的源码,里面持有了一个DFSClient对象dfs,如下图:

当调用FileSystem类的方法时,其实是调用了DFSClient的方法,如下图:

【精】彻底熟悉Hadoop RPC框架

那我们就从DFSClient类开始,描述一个RPC的传播探险过程,如下图所示。
淡蓝色大长方形代表类,深蓝色小长方形代表类里的字段。箭头指向字段的真实类型。

【精】彻底熟悉Hadoop RPC框架

用文字来描述一下:
DFSClient#namenode (实际类型:ClientNamenodeProtocolTranslatorPB)
|
|
ClientNamenodeProtocolPB#rpcProxy(实际类型:通过JDK动态代理生成的ClientNamenodeProtocolPB的代理对象)
|
通过动态代理Invoker类的invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立Socket连接
|
ClientNamenodeProtocolServerSideTranslatorPB#server (实际类型:NameNodeRpcServer 或者 RouterRpcServer)

已经非常容易看懂了!!

那其实这里面涉及到三个问题:
①DFSClient的namenode成员变量怎么初始化成ClientNamenodeProtocolTranslatorPB的?

②ClientNamenodeProtocolPB的rpcProxy成员变量怎么赋值为动态代理对象的,以及代理对象怎么通过invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立了Socket连接?

③ClientNamenodeProtocolServerSideTranslatorPB的server成员变量怎么初始化成NameNodeRpcServer或者RouterRpcServer的?以及Server端怎么解析RPC请求的?

我们按顺序来回答这三个问题:

首先第一个问题:①DFSClient的namenode成员变量怎么初始化成ClientNamenodeProtocolTranslatorPB的?

通过Debug HDFS Client,可以发现最终是调用到了org.apache.hadoop.hdfs.NameNodeProxiesClient#createNonHAProxyWithClientProtocol方法。

调用栈如下:

【精】彻底熟悉Hadoop RPC框架

关注一下调用栈栈顶的NameNodeProxiesClient#createNonHAProxyWithClientProtocol方法:

【精】彻底熟悉Hadoop RPC框架

OK,第一个问题回答完了。

接着回答第二个问题:
②ClientNamenodeProtocolPB的rpcProxy成员变量怎么赋值为动态代理对象的,以及代理对象怎么通过invoke方法与ClientNamenodeProtocolServerSideTranslatorPB建立了Socket连接?

刚刚在上面的图里,NameNodeProxiesClient#createNonHAProxyWithClientProtocol中通过RPC.getProtocolProxy返回了一个ClientNamenodeProtocolPB类型的变量proxy。这个对象就是ClientNamenodeProtocolTranslatorPB中的rpcProxy成员。(因为通过构造函数传进去了)

我们看ClientNamenodeProtocolPB类型,这是一个接口。里面定义了Protocol Buffer根据我们提供的.proto文件生成的类。

同时我们也看到了proxy是这样生成的:

ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth).getProxy();

追踪 RPC.getProtocolProxy方法:

   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy,
                                AtomicBoolean fallbackToSimpleAuth)

       throws IOException 
{
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
        fallbackToSimpleAuth);
  }

可以发现最终是由RpcEngine接口定义的getProxy()方法来设置的proxy,继续往下追,看这个getProxy方法的具体实现,实际中Hadoop一般是基于Protocol Buffer作为序列化框架的,因此我们看ProtobufRpcEngine中的实现,如下图:

【精】彻底熟悉Hadoop RPC框架

OK,我们知道了ClientNamenodeProtocolTranslatorPB#rpcProxy这个对象实际上就是在ProtobufRpcEngine#getProxy中生成的代理对象之后(第②个问题的第一小问),来看看代理对象的invoke方法是如何与Server端建立的Socket连接吧(第②个问题的第二小问)!

我们直接来到ProtobufRpcEngine#Invoker内部类的invoke方法。(动态代理就是这么设计的,就是要看invoke方法)

invoke方法很长,我截取了其中主要框架代码,忽略了一些不影响我们理解Hadoop RPC框架的细节(比如参数验证啊什么的),代码如下,核心关注一行client.call(xxx)。相当于在这里最终调用到了Client类的call方法。

    @Override
    public Message invoke(Object proxy, final Method method, Object[] args)
        throws ServiceException 
{
      // 构造RPC请求header
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      // RPC请求参数
      final Message theRequest = (Message) args[1];
      final RpcWritable.Buffer val;
      try {
        // 核心方法!
        val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
            fallbackToSimpleAuth);

      } catch (Throwable e) {        
        throw new ServiceException(e);
      } finally {
        if (traceScope != null) traceScope.close();
      }
      // 是异步模式? 可以先忽略
      if (Client.isAsynchronousMode()) {
        final AsyncGet<RpcWritable.Buffer, IOException> arr
            = Client.getAsyncRpcResponse();
        final AsyncGet<Message, Exception> asyncGet
            = new AsyncGet<Message, Exception>() {
          @Override
          public Message get(long timeout, TimeUnit unit) throws Exception {
            return getReturnMessage(method, arr.get(timeout, unit));
          }

          @Override
          public boolean isDone() {
            return arr.isDone();
          }
        };
        ASYNC_RETURN_MESSAGE.set(asyncGet);
        return null;
      } else {
        // 根据服务端响应构造返回结果的Messge对象
        return getReturnMessage(method, val);
      }
    }

经过追踪,建立Socket的整体流程是:Client#call -> Client#getConnection-> Client#Connection#setupIOstreams ->Client#Connection#setupConnection -> NetUtils.connect。整个调用链路如下图所示:

【精】彻底熟悉Hadoop RPC框架

OK前两个问题回答完毕,接着来看第三个问题:
③ClientNamenodeProtocolServerSideTranslatorPB的server成员变量怎么初始化成NameNodeRpcServer或者RouterRpcServer的?

回顾第二步,Hadoop通过JDK动态代理拿到了一个ClientNamenodeProtocolPB类型的代理对象:rpcProxy,然后把方法的调用委托给代理对象。代理对象会根据我们在hdfs-site.xml中提供的Namenode RPC的Socket标识符(IP或hostname + 端口号)与这个Socket标识符建立连接,进行RPC请求和获取响应。

因此在Namenode RPC Server端会有启动这个监听端口的行为。我们来看一看。观察到在NamenodeRpcServer的构造函数中,new了ClientNamenodeProtocolServerSideTranslatorPB对象。这个对象看类的名字有个ServerSide,意思是Server一侧的TranslatePB,对应了之前讲的客户端一侧的TranslatePB。

【精】彻底熟悉Hadoop RPC框架

OK,我们接下来从NameNode类出发,进而推理到NamenodeRpcServer以及ClientNamenodeProtocolServerSideTranslatorPB。

首先给出一个整体的流程图:

【精】彻底熟悉Hadoop RPC框架

在Namenode初始化的时候,会根据各种配置项构造一个NamenodeRpcServer对象rpcServer,然后在启动服务函数里启动这个rpcServer。如下图所示:

【精】彻底熟悉Hadoop RPC框架

那我们主要关注start方法。NameNodeRpcServer中负责处理RPC的server对象都是RPC#Server类型的,它继承了org.apache.hadoop.ipc.Server类。如下代码所示,所以start方法跳转到了org.apache.hadoop.ipc.Server类的start方法:

// NameNodeRpcServer.java
protected final RPC.Server clientRpcServer;

// RPC.java
public abstract static class Server extends org.apache.hadoop.ipc.Server {
}

Server类的start方法如下图:

【精】彻底熟悉Hadoop RPC框架

主要涉及到三个类:Responder、Listener、Handler。
这三个类本质上都是线程类。

Listener用于监听客户端发过来的请求,通过Reader进行对请求进行反序列化,然后将请求放到callQueue里,等待Handler从callQueue中take请求进行处理。Responder用来进行请求的响应。

到这里我们只剩下最后一个问题需要搞明白,Server类是怎么通过Handler把RPC调用最终委派给ClientNamenodeProtocolServerSideTranslatorPB的?

Handler是个线程类,它的run方法不断地从callQueue中take Call对象,然后执行Call.run()方法进行rpc请求的处理,其中又会调用到下图所示的call方法获得返回值:

【精】彻底熟悉Hadoop RPC框架

追到call方法里面:到了ProtobufRpcEngine.Server.ProtoBufRpcInvoker#call方法(只给出关键部分,省略try catch和其他无关代码):

      public Writable call(RPC.Server server, String connectionProtocolName,
          Writable writableRequest, long receiveTime)
 throws Exception 
{

        // 从注册的Protocol中获取protocolImpl
        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, 
                              declaringClassProtoName, clientVersion);
        // 进而获取Blocking service对象
        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
        MethodDescriptor methodDescriptor = service.getDescriptorForType()
            .findMethodByName(methodName);

        // 调用service对象上的方法,这个service对象其实就是在NameNodeRpcServer中通过
        // ClientNamenodeProtocolServerSideTranslatorPB构造的对象。
        result = service.callBlockingMethod(methodDescriptor, null, param);
    }

第一个参数是RPC.Server类型,对应了前面NameNodeRpcServer中的三个变量:

然后我们来弄清楚BlockingService service对象的真身(跟ClientNamenodeProtocolServerSideTranslatorPB关联起来!)。

NameNodeRpcServer构造函数中,有如下代码:

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException 
{

    ClientNamenodeProtocolServerSideTranslatorPB 
       clientProtocolServerTranslator = 
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    //记住这个setInstance,传入了上面的BlockingService变量。下一步我们会用到。
    clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(rpcAddr.getPort())
        .setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();

}

在构造一个RPC Server的过程中,setInstance方法传入了一个BlockingService对象。在build的过程中,会进行判断,如果Instance不为空,那么就在new Server的时候注册起来,这样我们在刚才的call方法中拿到的protocolImpl其实就是从注册的Map中拿到这个BlockingService,也就和ClientNamenodeProtocolServerSideTranslatorPB关联了起来!

如下图:

到这里整个Hadoop RPC框架的整体过程已经讲完了。

当然里面有很多细节,比如Retry机制、Failover、异常处理、异步RPC请求、线程安全等等问题留待以后再深挖。

参考
Hadoop-3.1版本源码