浅谈HDFS写数据流程的核心架构设计(上)
读源码至今发现,HDFS的源码写得不好,在学习HDFS过程中,所以我们在需要理解写数据流程的架构设计,另为什么要建立数据管理,这是我们需要去思考的问题,从上一节我们梳理就不再概述。
接下来我们继续深入去阅读源码,在读源代码之前,可以先了解写数据流程的核心架构如下图所示:
思考
假如多个客户端同时要并发的写Hadoop HDFS上的一个文件,这个事儿能成吗?明显不可以接受啊,因为HDFS上的文件是不允许并发写的,比如并发的追加一些数据什么。所以HDFS里有一个机制,叫做文件契约机制。
也就是说,同一时间只能有一个客户端获取NameNode上面一个文件的契约,然后才可以向获取契约的文件写入数据。
此时如果其他客户端尝试获取文件契约的时候,就获取不到,只能干等着。
通过这个机制,可以保证同一时间只有一个客户端在写一个文件。
在获取到了文件契约之后,在写文件的过程期间,那个客户端需要开启一个线程,不停的发送请求给NameNode进行文件续约,告诉NameNode:
NameNode大哥,我还在写文件啊,你给我一直保留那个契约好吗?而NameNode内部有一个专门的后台线程,负责监控各个契约的续约时间。
如果某个契约很长时间没续约了,此时就自动过期掉这个契约,让别的客户端来写。
接下来我们可以围绕这几个问题去阅读源码,比如:
创建文件
创建契约
启动了DataStramer线程
开启了续约
契约的检查
创建packet
申请Block
建立数据管道
ResponseProcessor线程
PacketResponder线程
如果在写数据的过程出问题了?怎么对应的处理方式是什么呢?
在读源码,我们可以围绕这几个源码类去阅读查看,比如:
EditLogTailer、FSImage、FSImageLoader、FSDirAclOp
EditLogFileInputStream、GetJournalEditServlet
StandbyCheckpointer、TransferFsImage、FsckServlet
DistributedFileSystem、DFSOutputStream
FSNamesystem、LeaseManager
DFSOutputStream、LeaseRenewer、HdfsDataInputStream
FSOutputSummer、LocatedBlock、Sender
DataXceiver、DataXceiverServer、BlockReader、Receiver
比如,我们去阅读EditLogTailer源代码类,我们需要了解它是什么类,做了什么事情?
其实,EditLogTailer是一个后台线程,启动了以后会周期性的去journalnode集群上面去读取元数据日志,然后再把这些元数据日志应用到自己的元数据里面(内存+磁盘)
它还会加载当前自己的元数据日志,另通过StandByNamenoe 获取当前的元数据日志的最后一条日志的事务ID是多少
其实还有一个重要的代码,比如:需要去journlanode上面去读取元数据,由于现在的事务id是1000,所以需要到journlanode上面去读取
注意:读日志的时候,只需要去读取 1001后面的日志就可以。
另外还有一个参数是用来获取Journalnode获取日志的流:
streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false)
当它去Journalnode加载日志的代码逻辑用这样写的
editsLoaded = image.loadEdits(streams, namesystem);
还有一点需要说明一下,每隔60秒 StandByNameNode 会去Journalnode获取一下日志
我们继续阅读源代码EditLogFileInputStream类,它的log是URLLog,通过getInpustream()方法找到的。它这个源码代码类也用到设计模式叫做装饰者模式
它读取日志通过reader读取的
它还创建了HttpURLConnetcion,如果我们这儿发送的是HTTP的请求,读取的Journalndoe那儿的日志,说明journalndoe启动起来的时候肯定会有一个JournalnodeHttpServer
NameNode |
DataNode |
JournalNode |
NameNodeRpcServer |
RpcServer |
JournalnodeRpcServer |
NameNodeHttperServer |
Httpserver |
JournalnodeRpcServer |
public InputStream getInputStream() throws IOException {
return SecurityUtil.doAsCurrentUser(
new PrivilegedExceptionAction<InputStream>() {
public InputStream run() throws IOException {
HttpURLConnection connection;
try {
//NameNode: NameNodeRpcServer NameNodeHttperServer
//DataNode: RpcServer Httpserver
//JournalNode: JournalnodeRpcServer JournalnodeHttpserver
//TODO 真相大白,我们创建了一个HttpURLConnection对象
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
} catch (AuthenticationException e) {
throw new IOException(e);
}
它是通过对象来获取输入流
我们继续读源代码GetJournalEditServlet类,是通过journalndoe来读取数据流。
还有一个流对拷(editFileIn),这个输入流读取的是journalnode这儿的日志
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
editFileIn, throttler);
我们继续阅读源代码StandbyCheckpointer类,需要理解StandbyCheckpointer 是一个运行在standBynamenode上的一个线程。他会周期性的对命名空间做checkpoint的操作(说白了就是把内存里面目录树的信息持久化到磁盘上面)并且会把这个份数据上传到active namenode(用来替换 active namednoe上面的fsimage)
Checkpointer的流程是这样的,我们会看到doCheckpoint(),点击进去查看
会把内存的数据写到磁盘上面,它的写方式使用的是异步线程
//开启了一个异步的线程
ExecutorService executor =
Executors.newSingleThreadExecutor(uploadThreadFactory);
Future<Void> upload = executor.submit(new Callable<Void>() {
public Void call() throws IOException {
//这个操作就要把刚刚从内存里面的元数据持持久化到磁盘上面的 那个份数据
//上传到 active的namenode上面去。
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
namesystem.getFSImage().getStorage(), imageType, txid, canceler);
return null;
}
});
说明:有个uploadImageFromStorage方法,它会替换全量元数据,数据量很大,可能是几十g
由于是线程,所以肯定有run方法,另外它会做判断每隔60检查以下是否需要做checkpoint
我们还需要了解满足checkpoint两个条件。
第一,比如如:数量 10000,我们上一次checkpoint 现在最新的数据差了多少数据?或者说大概的意思就是说我们现在有多少条日志没有checkpoint了。
第二,当前时间 - 上一次checkpoint的时间。说白了这个变量代表的意思就是 已经有多久没有做checkpoint了。
final long now = monotonicNow();
//TODO checkpoint条件一 数量 10000
final long uncheckpointed = countUncheckpointedTxns();
//TODO checkpoint条件二
//当前时间 - 上一次checkpoint的时间。
final long secsSinceLast = (now - lastCheckpointTime) / 1000;
假如:
在条件一,如果距离上一次做checkpoint超过100万条日志没有做checkpoint,那么就需要做一次。
条件二:如果超过一个小时没有做checkpoint了,那么也需要做一次
当然,满足条件,它也会执行checkpoint
我们在读源码,发现一个问题:在HDFS驱动场景里,比如数据量比较大:
他就用httphttpserver,它是通过standBynamenode到journalnode做同步的日志操作;当然也可以通过standBYnamenode到activenamenode同步的日志操作,但是,这样可能有好几个g
在HDFS驱动场景里,比如数据量比较小,他就用rpcserver,它是通过datanode到namenode带过去的数据信息很小;还有另一种方式,可以通过namenode到journalnode。
阅读源代码TransferFsImage类,它是把输入流写到数输出流,发现
做一个上传image的操作,需要传参有6个参数:
uploadImage(url, conf, storage, nnf, txid, canceler);
它是通过http方式获取的流,不断读自己的数据
OutputStream output = connection.getOutputStream();
//输入流肯定是自己这儿的,不断读自己的数据
FileInputStream input = new FileInputStream(imageFile);
我们还需要了解流对烤,就是通过数据网output 输出流里面去写
try {
//一个流对烤把数据网output 输出流里面去写。
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf), canceler);
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
知其所以然、知其所以必然,知其然而不知其所以然;蒙惠者虽知其然,而未必知其所以然;也这是我们从学习实践中得出的深切体会!分享完毕,谢谢!