浅谈HDFS源码的启动与心跳流程
在我们读这篇文章知晓hadoop RPC原理,查看,接下来,我们可以针对Apache hadoop技术源码阅读技巧进行分享。
1、NameNode启动流程源码深度剖析
我们可以从上图了解到一些信息,我们可以了解一些相关的参数信息,对此我们在hdfs架构设计可以往NameDode几个重要的类进行读解,我们可以先从NameNode类去了解得知:
NameNode服务既管理了HDFS的集群的命名空间(目录树)和 "inode table"。一个HDFS集群里面只有一个NameNode.(除了HA方案,或者是联邦)Namenode管理了两张比较重要的表:
一张表管理了文件与block之间的关系。
另一张表管理了block文件块与 DataNode主机之间的关系。
说明:
第一张表非常珍贵,存储到了磁盘上面。(因为文件与block块之间的关系是不会发生变化的)
第二张表是每次NameNode重启的时候重新构建出来的
Namenode服务是由三个重要的类支撑的
(1)NameNode类
管理配置的参数hdfs-site.xml core-site.xml
(2)NameNode server
IPC Server:
NameNodeRPCServer:开放端口,等待别人调用.比如:8020/9000
HTTP Server:
NameNodeHttpServer:开放50070界面,我们可以通过这个界面了解HDFS的情况
(3)FSNameSystem
这个类非常重要,管理了HDFS的元数据(目录树的信息)
NameNode的启动流程无非服务端就是 RPCServer,它的默认端口号为 9000/8020, HttpServer的默认端口号为 50070,后面启动HTTPServer
if (NamenodeRole.NAMENODE == role) {
//TODO 启动HTTPServer
startHttpServer(conf);
}
启动HTTPServer完毕后,它会加载元数据,加载元数据这个事,目前对集群刚启动的时候,我们不做重点分析,在后面分析到管理元数据的时候,我们会回过头来在分析,而为什么现在不分析?
重要,这个就是Hadoop RPC,当启动一些公共的服务。NameNode RPC的服务就是在里面启动的
如:进行资源检查,检查是否有磁盘足够存储元数据、进入安全模式检查,检查是否可以退出安全模式。
startCommonServices(conf);
而上面提过元数据管理,它的逻辑代码如下:
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
在NameNode类里,我们在读源码会看到有个叫getHttpServerBindAddress方法,主要是设置了主机名和端口号
private void startHttpServer(final Configuration conf) throws IOException {
//TODO getHttpServerBindAddress 里面设置了主机名和端口号
httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
httpServer.start();
httpServer.setStartupProgress(startupProgress);
}
我们在分析源码时,需要留意的关键点 初始化的方法,它的代码逻辑如下:
initialize(conf);
try {
haContext.writeLock();
state.prepareToEnterState(haContext);
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stop();
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stop();
throw e;
}
this.started.set(true);
}
我们以往做常规操作把集群刚刚搭建起来,然后集群进行初始化,都会执行“ hdfs namenode -format”命令,HDFS集群的时候会传进参数,我们这时候需要了解它的业务代码逻辑:
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
switch (startOpt) {
case FORMAT: {
boolean aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
}
case GENCLUSTERID: {
System.err.println("Generating new cluster id:");
System.out.println(NNStorage.newClusterID());
terminate(0);
return null;
}
case FINALIZE: {
System.err.println("Use of the argument '" + StartupOption.FINALIZE +
"' is no longer supported. To finalize an upgrade, start the NN " +
" and then run `hdfs dfsadmin -finalizeUpgrade'");
terminate(1);
return null; // avoid javac warning
}
case ROLLBACK: {
boolean aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
}
case INITIALIZESHAREDEDITS: {
boolean aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
}
case BACKUP:
case CHECKPOINT: {
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
}
case RECOVER: {
NameNode.doRecovery(startOpt, conf);
return null;
}
case METADATAVERSION: {
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
}
case UPGRADEONLY: {
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
}
default: {
DefaultMetricsSystem.initialize("NameNode");
//因为我们现在分析的是启动namenode的代码,所以代码肯定是走到这儿了。
//TODO 关键代码
return new NameNode(conf);
}
}
}
启动NameNode的代码后,肯定是走到这一步
在HDFS场景驱动下,我们只关心NameNode是如何启动的?阅读源码过程中,发现kafka 这个代码就写得比较好,我们可以看得详细一点。
public static void main(String argv[]) throws Exception {
//解析参数
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
//参数异常退出
System.exit(0);
}
我们知道创建NameNode的核心代码,主要是RPC的服务端的,如下逻辑代码:
try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
//TODO 创建NameNode的核心代码
//RPC的服务端
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
接着我们在读源码有几个重要的类,分别是:
FSNamesystem
NameNodeResourceChecker
NameNodeResourcePolicy
CheckableNameNodeResource
NameNodeHttpServer、DFSConfigKeys
FSNamesystem类查看,发现NameNode启动的时候,主要是存储资源检查。其中,需要关注的文件 core-site.xml hdfs-site.xml,比如:
(1)fsimage目录磁盘的存储够不够(100M)
(2)editLog目录 磁盘的存储够不够(100M)
默认,这两个文件存储在同一个目录。
问题:NameNode如何做资源检查,定位元数据存在哪儿?
NameNode资源检查 通过core-site.xml hdfs-site.xml两个文件,就知道了元数据存在哪儿,NameNode的两个目录:存储fsiamge的目录,存储editlog的目录。
但是一般情况下,或者默认情况这两个使用的是同一个目录,加载了配置文件,配置文件里面有存储元数据的目录
在此,我们需要关注几个点,比如:检查是否有足够的磁盘存储元数据、HDFS的安全模式、启动重要服务的代码逻辑
nnResourceChecker = new NameNodeResourceChecker(conf);
//TODO 检查是否有足够的磁盘存储元数据
checkAvailableResources();
assert safeMode != null && !isPopulatingReplQueues();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
getCompleteBlocksTotal());
//TODO HDFS的安全模式
setBlockTotal();
//TODO 启动重要服务
blockManager.activate(conf);
} finally {
writeUnlock();
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
}
接着,我们在读源码过程中的NameNodeResourceChecker查看发现,比如:获取当前目录的空间大小,如果空间大小小于 100M 就返回false
@Override
public boolean isResourceAvailable() {
//TODO 获取当前目录的空间大小
long availableSpace = df.getAvailable();
if (LOG.isDebugEnabled()) {
LOG.debug("Space available on volume '" + volume + "' is "
+ availableSpace);
}
//TODO 如果空间大小小于 100M 就返回false
if (availableSpace < duReserved) {
//ERROR
LOG.warn("Space available on volume '" + volume + "' is "
+ availableSpace +
", which is below the configured reserved amount " + duReserved);
/**
* NameNode:有可能误操作,跑了一个任务,这个任务生成了很多数据。
*
*/
return false;
} else {
return true;
}
}
说明:NameNode:有可能误操作,跑了一个任务,这个任务生成了很多数据
问题:namenode服务器上面,肯定会有很多个目录?到底我们需要监控哪些目录呢?
(1)阈值
(2)添加需要监控的磁盘
localEditDirs -》 hdfs-site.xml core-site.xml
for (URI editsDirToCheck : localEditDirs) {
addDirToCheck(editsDirToCheck,
FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
editsDirToCheck));
}
比如:一个目录就是一个CheckedVolume、这个对象其实就是一个目录、volumes里面会有多个路径(目录)
//一个目录就是一个CheckedVolume
CheckedVolume newVolume = new CheckedVolume(dir, required);
//这个对象其实就是一个目录
CheckedVolume volume = volumes.get(newVolume.getVolume());
if (volume == null || !volume.isRequired()) {
//volumes里面会有多个路径(目录)
volumes.put(newVolume.getVolume(), newVolume);
}
关键调用的代码,volumes 里面存放的就是需要检查的目录,如果资源不够,返回值是false
public boolean hasAvailableDiskSpace() {
//关键调用的是这儿的代码
// //volumes 里面存放的就是需要检查的目录
//如果资源不够,返回值是false
return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
minimumRedundantVolumes);
}
继续读源码NameNodeResourcePolicy类查看,发现它是通过遍历每个目录来判断磁盘资源是否充足,它的代码逻辑如下:
//遍历每个目录
for (CheckableNameNodeResource resource : resources) {
if (!resource.isRequired()) {
redundantResourceCount++;
if (!resource.isResourceAvailable()) {
disabledRedundantResourceCount++;
}
} else {
requiredResourceCount++;
//TODO 判断磁盘资源是否充足
//resource.isResourceAvailable() = false
//!false = true
//如果空间不够那么就返回false
if (!resource.isResourceAvailable()) {
// Short circuit - a required resource is not available.
return false;
}
}
}
继续读源码CheckableNameNodeResource类查看,我们知道Hadoop喜欢自己封装东西。举个例子,本来就有RPC的服务。但是Hadoop自己封装了一个Hadoop RPC,这个地方也是类似,本来有HttpServer,Hadoop经过封装封装了,比如 HttpServer2服务,它的代码逻辑是这样:
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "hdfs",
DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
httpServer = builder.build();
if (policy.isHttpsEnabled()) {
// assume same ssl port for all datanodes
InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.getTrimmed(
DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":"
+ DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT));
httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY,
datanodeSslPort.getPort());
}
initWebHdfs(conf);
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
有个绑定一堆servlet,也就是说servlet越多,支持的功能就越多
setupServlets(httpServer, conf);
启动httpServer服务,对外开放50070端口
httpServer.start();
当然,在NameNodeHttpServer类里,发现还有一个上传元数据的请求,SecondaryNameNode/StandByNamenode合并出来的FSImage需要替换Active NameNode的fsimage,发送的就是http的请求,请求就会转发给这个servlet
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
ImageServlet.class, true);
我们可以在50070界面上浏览目录信息,就是因为这儿有这个servlet,如:"http://hadoop1:50070/listpahts/?path=/"
httpServer.addInternalServlet("listPaths", "/listPaths/*",
ListPathsServlet.class, false);
httpServer.addInternalServlet("data", "/data/*",
FileDataServlet.class, false);
httpServer.addInternalServlet("checksum", "/fileChecksum/*",
FileChecksumServlets.RedirectServlet.class, false);
httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
ContentSummaryServlet.class, false);
}
2、DataNode启动流程源码深度剖析
Data启动初始化
注册流程
DataNode与NameNode启动流程
(1)DataNode存储hdfs上block文件块。在一个文件系统里面可以有多个dataNode,每个DataNode周期性的跟NameNode进行通信,客户端也可以跟DataNode进行交互,或者DataNode之间也可以进行相互通信。
(2)DataNode存储一系列block,DataNode允许客户端去读写block。DataNode也会去响应NameNode,响应NameNode发送过来的一些指令,
比如:删除block,复制block等操作。
说明:NameNode不会直接去操作DataNode,如:datanode -> 发送心跳 Namenode
(3)datanode管理了一个重要的表:block -》 stream of bytes 一些元数据的信息。
(4)这个信息是存储在本地磁盘,DataNode启动的时候会把这些信息,汇报给NameNode,启动了以后也会再去不断的汇报。
(5)DataNode启动了以后会一直去问namenode自己需要干些什么?
NameNode是不能直接去操作DataNode的。DataNode启动了以后,会跟NameNode,进行心跳,NameNode接收到了心跳了以后,如果需要这个DataNode做什么事,就会给DataNode一个返回值(指令),DataNode接收到这些指令以后就知道NameNode想让他做什么事了
(6)DataNode开放了Socket服务,让客户端或者别的DataNode来进行读写数据。DataNode启动的时候会把自己的主机名和端口号汇报给NameNode,也就是说如果Client和DataNode想要去访问某个DataNode.首先要跟NameNode进行通信,从NameNode那儿获取到目标DataNode的主机名和端口号。这样才可以访问到对应的DataNode了。
总结:
一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据的。
DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报)
NameNode不能直接操作DataNode.而是通信心跳返回值指令的方式去操作的DataNode.
DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。
读源码关注这几个类
DataXceiverServer
DataStorage
Storage
InterDatanodeProtocol
BlockPoolManager
DatanodeProtocolClientSideTranslatorPB
在DataXceiverServer类里,发现这个服务启动起来以后是用来处理客户端或者其他DataNode发送过来的的读写block数据请求的,这个地方没有使用Hadoop的RPC,每发送过来一个block 都启动一个DataXceiver 去处理这个block,它的逻辑代码如下:
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
在BlockPoolManager类里,发现会遍历所有的联邦
synchronized void startAll() throws IOException {
try {
UserGroupInformation.getLoginUser().doAs(
new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
//TODO 遍历所有的BPOfferService 遍历所有的联邦
for (BPOfferService bpos : offerServices) {
//TODO 重要
bpos.start();
}
return null;
}
});
} catch (InterruptedException ex) {
IOException ioe = new IOException();
ioe.initCause(ex.getCause());
throw ioe;
}
}
重要代码
doRefreshNamenodes(newAddressMap);
比如:namenode服务器搭建好:namenode上的配置文件一模一样的复制到各个datanode节点。
通常情况下:HDFS集群的架构是HA架构,如果是联邦架构,里面就会有多个namenodeservice
hadoop1,hadoop2 -> 联邦1 service1
hadoop3,hadoop4 -> 联邦2 service2
toAdd里面有多少有的联邦:
一个联邦就是一个NameService,如果是2个联邦,那么这个地方就会有两个值,BPOfferService -》 一个联邦
重要的关系:
一个联邦对应一个BPOfferService
一个联邦里面的一个NameNode就是一个BPServiceActor,也就是正常来说一个BPOfferService对应两个BPServiceActor
BPOfferService bpos = createBPOS(addrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
最后,DataNode向NameNode进行注册和心跳
startAll();//术的层面 道的层
知其所以然、知其所以必然,知其然而不知其所以然;蒙惠者虽知其然,而未必知其所以然;也这是我们从学习实践中得出的深切体会!分享完毕,谢谢!