vlambda博客
学习文章列表

浅谈HDFS源码的启动与心跳流程

在我们读这篇文章知晓hadoop RPC原理,查看,接下来,我们可以针对Apache hadoop技术源码阅读技巧进行分享。


1、NameNode启动流程源码深度剖析


我们可以从上图了解到一些信息,我们可以了解一些相关的参数信息,对此我们在hdfs架构设计可以往NameDode几个重要的类进行读解,我们可以先从NameNode类去了解得知:


NameNode服务既管理了HDFS的集群的命名空间(目录树)和 "inode table"。一个HDFS集群里面只有一个NameNode.(除了HA方案,或者是联邦)Namenode管理了两张比较重要的表:

  • 一张表管理了文件与block之间的关系。

  • 另一张表管理了block文件块与 DataNode主机之间的关系。


说明:

第一张表非常珍贵,存储到了磁盘上面。(因为文件与block块之间的关系是不会发生变化的)

第二张表是每次NameNode重启的时候重新构建出来的


Namenode服务是由三个重要的类支撑的

(1)NameNode类

  • 管理配置的参数hdfs-site.xml core-site.xml



(2)NameNode server

        IPC Server:

  • NameNodeRPCServer:开放端口,等待别人调用.比如:8020/9000

         HTTP Server:

  • NameNodeHttpServer:开放50070界面,我们可以通过这个界面了解HDFS的情况

 

(3)FSNameSystem

  •  这个类非常重要,管理了HDFS的元数据(目录树的信息)


NameNode的启动流程无非服务端就是 RPCServer,它的默认端口号为 9000/8020, HttpServer的默认端口号为 50070,后面启动HTTPServer

 if (NamenodeRole.NAMENODE == role) { //TODO 启动HTTPServer startHttpServer(conf); }


启动HTTPServer完毕后,它会加载元数据,加载元数据这个事,目前对集群刚启动的时候,我们不做重点分析,在后面分析到管理元数据的时候,我们会回过头来在分析,而为什么现在不分析?

重要,这个就是Hadoop RPC,当启动一些公共的服务。NameNode RPC的服务就是在里面启动的


如:进行资源检查,检查是否有磁盘足够存储元数据、进入安全模式检查,检查是否可以退出安全模式。

 startCommonServices(conf);

而上面提过元数据管理,它的逻辑代码如下:

namesystem.startCommonServices(conf, haContext); registerNNSMXBean(); if (NamenodeRole.NAMENODE != role) { startHttpServer(conf); httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); }


在NameNode类里,我们在读源码会看到有个叫getHttpServerBindAddress方法,主要是设置了主机名和端口号

 private void startHttpServer(final Configuration conf) throws IOException { //TODO getHttpServerBindAddress 里面设置了主机名和端口号 httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf)); httpServer.start(); httpServer.setStartupProgress(startupProgress); }

我们在分析源码时,需要留意的关键点 初始化的方法,它的代码逻辑如下:

 initialize(conf); try { haContext.writeLock(); state.prepareToEnterState(haContext); state.enterState(haContext); } finally { haContext.writeUnlock(); } } catch (IOException e) { this.stop(); throw e; } catch (HadoopIllegalArgumentException e) { this.stop(); throw e; } this.started.set(true); }

我们以往做常规操作把集群刚刚搭建起来,然后集群进行初始化,都会执行“ hdfs namenode -format”命令,HDFS集群的时候会传进参数,我们这时候需要了解它的业务代码逻辑:

 StartupOption startOpt = parseArguments(argv); if (startOpt == null) { printUsage(System.err); return null; } setStartupOption(conf, startOpt);
switch (startOpt) { case FORMAT: { boolean aborted = format(conf, startOpt.getForceFormat(), startOpt.getInteractiveFormat()); terminate(aborted ? 1 : 0); return null; // avoid javac warning } case GENCLUSTERID: { System.err.println("Generating new cluster id:"); System.out.println(NNStorage.newClusterID()); terminate(0); return null; } case FINALIZE: { System.err.println("Use of the argument '" + StartupOption.FINALIZE + "' is no longer supported. To finalize an upgrade, start the NN " + " and then run `hdfs dfsadmin -finalizeUpgrade'"); terminate(1); return null; // avoid javac warning } case ROLLBACK: { boolean aborted = doRollback(conf, true); terminate(aborted ? 1 : 0); return null; // avoid warning } case BOOTSTRAPSTANDBY: { String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length); int rc = BootstrapStandby.run(toolArgs, conf); terminate(rc); return null; // avoid warning } case INITIALIZESHAREDEDITS: { boolean aborted = initializeSharedEdits(conf, startOpt.getForceFormat(), startOpt.getInteractiveFormat()); terminate(aborted ? 1 : 0); return null; // avoid warning } case BACKUP: case CHECKPOINT: { NamenodeRole role = startOpt.toNodeRole(); DefaultMetricsSystem.initialize(role.toString().replace(" ", "")); return new BackupNode(conf, role); } case RECOVER: { NameNode.doRecovery(startOpt, conf); return null; } case METADATAVERSION: { printMetadataVersion(conf); terminate(0); return null; // avoid javac warning } case UPGRADEONLY: { DefaultMetricsSystem.initialize("NameNode"); new NameNode(conf); terminate(0); return null; }
default: { DefaultMetricsSystem.initialize("NameNode"); //因为我们现在分析的是启动namenode的代码,所以代码肯定是走到这儿了。 //TODO 关键代码 return new NameNode(conf); } } }

启动NameNode的代码后,肯定是走到这一步

浅谈HDFS源码的启动与心跳流程

在HDFS场景驱动下,我们只关心NameNode是如何启动的?阅读源码过程中,发现kafka 这个代码就写得比较好,我们可以看得详细一点。

public static void main(String argv[]) throws Exception { //解析参数 if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) { //参数异常退出 System.exit(0);    }

我们知道创建NameNode的核心代码,主要是RPC的服务端的,如下逻辑代码:

 try { StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); //TODO 创建NameNode的核心代码 //RPC的服务端 NameNode namenode = createNameNode(argv, null); if (namenode != null) {
namenode.join(); } } catch (Throwable e) { LOG.error("Failed to start namenode.", e); terminate(1, e); } }


接着我们在读源码有几个重要的类,分别是:

FSNamesystem

NameNodeResourceChecker

NameNodeResourcePolicy

CheckableNameNodeResource

NameNodeHttpServer、DFSConfigKeys


FSNamesystem类查看,发现NameNode启动的时候,主要是存储资源检查。其中,需要关注的文件 core-site.xml  hdfs-site.xml,比如:

(1)fsimage目录磁盘的存储够不够(100M)

(2)editLog目录  磁盘的存储够不够(100M)

默认,这两个文件存储在同一个目录。


问题:NameNode如何做资源检查,定位元数据存在哪儿?

NameNode资源检查 通过core-site.xml  hdfs-site.xml两个文件,就知道了元数据存在哪儿,NameNode的两个目录:存储fsiamge的目录,存储editlog的目录。


但是一般情况下,或者默认情况这两个使用的是同一个目录,加载了配置文件,配置文件里面有存储元数据的目录


在此,我们需要关注几个点,比如:检查是否有足够的磁盘存储元数据、HDFS的安全模式、启动重要服务的代码逻辑

 nnResourceChecker = new NameNodeResourceChecker(conf); //TODO 检查是否有足够的磁盘存储元数据 checkAvailableResources();
assert safeMode != null && !isPopulatingReplQueues(); StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAFEMODE); prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, getCompleteBlocksTotal()); //TODO HDFS的安全模式 setBlockTotal(); //TODO 启动重要服务 blockManager.activate(conf); } finally { writeUnlock(); }
registerMXBean(); DefaultMetricsSystem.instance().register(this); if (inodeAttributeProvider != null) { inodeAttributeProvider.start(); dir.setINodeAttributeProvider(inodeAttributeProvider); } snapshotManager.registerMXBean();  }

接着,我们在读源码过程中的NameNodeResourceChecker查看发现,比如:获取当前目录的空间大小,如果空间大小小于  100M  就返回false

 @Override public boolean isResourceAvailable() { //TODO 获取当前目录的空间大小 long availableSpace = df.getAvailable(); if (LOG.isDebugEnabled()) { LOG.debug("Space available on volume '" + volume + "' is " + availableSpace); } //TODO 如果空间大小小于 100M 就返回false if (availableSpace < duReserved) { //ERROR LOG.warn("Space available on volume '" + volume + "' is " + availableSpace + ", which is below the configured reserved amount " + duReserved);
/** * NameNode:有可能误操作,跑了一个任务,这个任务生成了很多数据。 * */
return false; } else { return true; } }

说明:NameNode:有可能误操作,跑了一个任务,这个任务生成了很多数据


问题:namenode服务器上面,肯定会有很多个目录?到底我们需要监控哪些目录呢?

(1)阈值

浅谈HDFS源码的启动与心跳流程(2)添加需要监控的磁盘

localEditDirs -》 hdfs-site.xml  core-site.xml

 for (URI editsDirToCheck : localEditDirs) { addDirToCheck(editsDirToCheck, FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains( editsDirToCheck)); }

比如:一个目录就是一个CheckedVolume、这个对象其实就是一个目录、volumes里面会有多个路径(目录)

 //一个目录就是一个CheckedVolume CheckedVolume newVolume = new CheckedVolume(dir, required); //这个对象其实就是一个目录 CheckedVolume volume = volumes.get(newVolume.getVolume()); if (volume == null || !volume.isRequired()) { //volumes里面会有多个路径(目录) volumes.put(newVolume.getVolume(), newVolume);    }


关键调用的代码,volumes 里面存放的就是需要检查的目录,如果资源不够,返回值是false

public boolean hasAvailableDiskSpace() { //关键调用的是这儿的代码 // //volumes 里面存放的就是需要检查的目录 //如果资源不够,返回值是false return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(), minimumRedundantVolumes); }

继续读源码NameNodeResourcePolicy类查看,发现它是通过遍历每个目录来判断磁盘资源是否充足,它的代码逻辑如下:

 //遍历每个目录 for (CheckableNameNodeResource resource : resources) { if (!resource.isRequired()) { redundantResourceCount++; if (!resource.isResourceAvailable()) { disabledRedundantResourceCount++; } } else { requiredResourceCount++; //TODO 判断磁盘资源是否充足 //resource.isResourceAvailable() = false //!false = true
//如果空间不够那么就返回false if (!resource.isResourceAvailable()) { // Short circuit - a required resource is not available. return false; } }    }

继续读源码CheckableNameNodeResource类查看,我们知道Hadoop喜欢自己封装东西。举个例子,本来就有RPC的服务。但是Hadoop自己封装了一个Hadoop RPC,这个地方也是类似,本来有HttpServer,Hadoop经过封装封装了,比如 HttpServer2服务,它的代码逻辑是这样:

 HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf, httpAddr, httpsAddr, "hdfs", DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
httpServer = builder.build();
if (policy.isHttpsEnabled()) { // assume same ssl port for all datanodes InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.getTrimmed( DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT)); httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, datanodeSslPort.getPort()); }
initWebHdfs(conf);
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);

有个绑定一堆servlet,也就是说servlet越多,支持的功能就越多

 setupServlets(httpServer, conf);

启动httpServer服务,对外开放50070端口

 httpServer.start();

当然,在NameNodeHttpServer类里,发现还有一个上传元数据的请求,SecondaryNameNode/StandByNamenode合并出来的FSImage需要替换Active NameNode的fsimage,发送的就是http的请求,请求就会转发给这个servlet

 httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, ImageServlet.class, true);

我们可以在50070界面上浏览目录信息,就是因为这儿有这个servlet,如:"http://hadoop1:50070/listpahts/?path=/"

 httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class, false); httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class, false); httpServer.addInternalServlet("checksum", "/fileChecksum/*", FileChecksumServlets.RedirectServlet.class, false); httpServer.addInternalServlet("contentSummary", "/contentSummary/*", ContentSummaryServlet.class, false); }




2、DataNode启动流程源码深度剖析


Data启动初始化

浅谈HDFS源码的启动与心跳流程


注册流程

浅谈HDFS源码的启动与心跳流程


DataNode与NameNode启动流程



(1)DataNode存储hdfs上block文件块。在一个文件系统里面可以有多个dataNode,每个DataNode周期性的跟NameNode进行通信,客户端也可以跟DataNode进行交互,或者DataNode之间也可以进行相互通信。


(2)DataNode存储一系列block,DataNode允许客户端去读写block。DataNode也会去响应NameNode,响应NameNode发送过来的一些指令,

比如:删除block,复制block等操作。

说明:NameNode不会直接去操作DataNode,如:datanode -> 发送心跳  Namenode


(3)datanode管理了一个重要的表:block -》 stream of bytes  一些元数据的信息。


(4)这个信息是存储在本地磁盘,DataNode启动的时候会把这些信息,汇报给NameNode,启动了以后也会再去不断的汇报。


(5)DataNode启动了以后会一直去问namenode自己需要干些什么?

NameNode是不能直接去操作DataNode的。DataNode启动了以后,会跟NameNode,进行心跳,NameNode接收到了心跳了以后,如果需要这个DataNode做什么事,就会给DataNode一个返回值(指令),DataNode接收到这些指令以后就知道NameNode想让他做什么事了


(6)DataNode开放了Socket服务,让客户端或者别的DataNode来进行读写数据。DataNode启动的时候会把自己的主机名和端口号汇报给NameNode,也就是说如果Client和DataNode想要去访问某个DataNode.首先要跟NameNode进行通信,从NameNode那儿获取到目标DataNode的主机名和端口号。这样才可以访问到对应的DataNode了。


总结:

  • 一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据的。

  • DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报)

  • NameNode不能直接操作DataNode.而是通信心跳返回值指令的方式去操作的DataNode.

  • DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。



读源码关注这几个类

DataXceiverServer

DataStorage

Storage

InterDatanodeProtocol

BlockPoolManager

DatanodeProtocolClientSideTranslatorPB

在DataXceiverServer类里,发现这个服务启动起来以后是用来处理客户端或者其他DataNode发送过来的的读写block数据请求的,这个地方没有使用Hadoop的RPC,每发送过来一个block 都启动一个DataXceiver 去处理这个block,它的逻辑代码如下:

 new Daemon(datanode.threadGroup, DataXceiver.create(peer, datanode, this)) .start();

在BlockPoolManager类里,发现会遍历所有的联邦

synchronized void startAll() throws IOException { try { UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { //TODO 遍历所有的BPOfferService 遍历所有的联邦 for (BPOfferService bpos : offerServices) { //TODO 重要 bpos.start(); } return null; } }); } catch (InterruptedException ex) { IOException ioe = new IOException(); ioe.initCause(ex.getCause()); throw ioe; }  }

 重要代码

 doRefreshNamenodes(newAddressMap);

比如:namenode服务器搭建好:namenode上的配置文件一模一样的复制到各个datanode节点。

通常情况下:HDFS集群的架构是HA架构,如果是联邦架构,里面就会有多个namenodeservice

hadoop1,hadoop2 -> 联邦1 service1hadoop3,hadoop4 -> 联邦2 service2

toAdd里面有多少有的联邦:

一个联邦就是一个NameService,如果是2个联邦,那么这个地方就会有两个值,BPOfferService -》 一个联邦


重要的关系:

  • 一个联邦对应一个BPOfferService

  • 一个联邦里面的一个NameNode就是一个BPServiceActor,也就是正常来说一个BPOfferService对应两个BPServiceActor

 BPOfferService bpos = createBPOS(addrs); bpByNameserviceId.put(nsToAdd, bpos); offerServices.add(bpos); }

最后,DataNode向NameNode进行注册和心跳

 startAll();//术的层面 道的层



知其所以然、知其所以必然,知其然而不知其所以然;蒙惠者虽知其然,而未必知其所以然;也这是我们从学习实践中得出的深切体会!分享完毕,谢谢!