
A Hands-On Source Code Walkthrough of HDFS File Upload: the create Step


The HDFS write data flow is shown in the figure below:

[Figure: HDFS write data flow]

The call chain of the HDFS upload source code is shown in the figure below:

[Figure: HDFS upload (create) source-code call chain]


0) Add the following dependencies to pom.xml

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
  </dependency>
</dependencies>
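
With slf4j-log4j12 on the classpath, log4j 1.x also expects a log4j.properties file on the classpath (for example under src/main/resources); without one the client prints "no appenders" warnings. A minimal console configuration, offered here as an optional convenience rather than something from the original post:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n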

1

The client sends a create request to the NN


The user's own code

@Test
public void testPut2() throws IOException {
  FSDataOutputStream fos = fs.create(new Path("/input"));
  fos.write("hello world".getBytes());
  // Close the stream so the lease is released and the file is completed
  fos.close();
}
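
The test above uses an fs field that the excerpt never initializes. A minimal setup sketch, assuming a NameNode at hdfs://localhost:8020 and a user name of "hadoop" (both placeholders, not from the original article):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Before;

public class HdfsCreateTest {

  private FileSystem fs;

  @Before
  public void init() throws Exception {
    // NameNode address and user are placeholders; adjust to your cluster
    fs = FileSystem.get(new URI("hdfs://localhost:8020"),
        new Configuration(), "hadoop");
  }

  @After
  public void close() throws IOException {
    fs.close();
  }
}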


FileSystem.java

public FSDataOutputStream create(Path f) throws IOException {
  return create(f, true);
}

public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
  return create(f, overwrite,
      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
      getDefaultReplication(f),
      getDefaultBlockSize(f));
}

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize) throws IOException {
  return create(f, overwrite, bufferSize, replication, blockSize, null);
}

public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress)
    throws IOException {
  return this.create(f, FsCreateModes.applyUMask(
      FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
      overwrite, bufferSize, replication, blockSize, progress);
}

public abstract FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException;
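
For orientation: each overload just fills in defaults and delegates downward, so the one-argument call in testPut2 ends up equivalent to something like the following. The concrete numbers are merely typical values of io.file.buffer.size, dfs.replication and dfs.blocksize; the real ones are read from the Configuration at runtime:

// Hypothetical expansion of fs.create(new Path("/input"));
// the values are illustrative defaults, not hard-coded constants
FSDataOutputStream fos = fs.create(new Path("/input"),
    true,                  // overwrite
    4096,                  // bufferSize  (io.file.buffer.size)
    (short) 3,             // replication (dfs.replication)
    128 * 1024 * 1024L);   // blockSize   (dfs.blocksize)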


Select create and press Ctrl+H to find the implementation class DistributedFileSystem.java, then locate its create method.

DistributedFileSystem.java

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                : EnumSet.of(CreateFlag.CREATE),
      bufferSize, replication, blockSize, progress, null);
}

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {
      // Create and obtain an output stream object
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize, checksumOpt);
      // Wrap the dfsos created above and return it
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }

    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize, replication,
          blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}


Click create to step into DFSClient.java.

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, short replication, long blockSize,
    Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
  return create(src, permission, flag, true, replication, blockSize,
      progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  return create(src, permission, flag, createParent, replication, blockSize,
      progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
    String ecPolicyName) throws IOException {
  checkOpen();
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes), ecPolicyName);
  // Register a lease for the new file so the NN knows this client owns it
  beginFileLease(result.getFileId(), result);
  return result;
}


Click newStreamForCreate to step into DFSOutputStream.java.

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
  try (TraceScope ignored =
      dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // The client sends the create request to the NN over RPC
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
        break;
      } catch (RemoteException re) {
        ... ...
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;
    if (stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress,
          checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat, flag, progress,
          checksum, favoredNodes, true);
    }
    // Start the streamer thread; DataStreamer extends Daemon extends Thread
    out.start();
    return out;
  }
}

2

The NN handles the client's create request


1) Click create

ClientProtocol.java

HdfsFileStatus create(String src, FsPermission masked, String clientName,
    EnumSetWritable<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;


2) Press Ctrl+H to find the implementation classes of create, click NameNodeRpcServer, and search for create in NameNodeRpcServer.java.

NameNodeRpcServer.java

public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
  // Check that the NN has started
  checkNNStartup();
  ... ...
  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(
        getRemoteUser().getShortUserName(), null, masked);
    // Important: the real work happens in FSNamesystem.startFile
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize,
        supportedVersions, ecPolicyName, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status != null, status);
  }
  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}


FSNamesystem.java

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    boolean logRetryCache) throws IOException {
  HdfsFileStatus status;
  try {
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions,
        ecPolicyName, logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create", src);
    throw e;
  }
  logAuditEvent(true, "create", src, status);
  return status;
}

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {
  ... ...
  stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
      clientMachine, flag, createParent, replication, blockSize, feInfo,
      toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
  ... ...
}

static HdfsFileStatus startFile( ... ... ) throws IOException {
  ... ...
  FSDirectory fsd = fsn.getFSDirectory();

  // Check whether the file path already exists
  if (iip.getLastINode() != null) {
    if (overwrite) {
      List<INode> toRemoveINodes = new ChunkedArrayList<>();
      List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
      long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
          toRemoveINodes, toRemoveUCFiles, now());
      if (ret >= 0) {
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        FSDirDeleteOp.incrDeletedFileCount(ret);
        fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
      }
    } else {
      // If the lease soft limit time has expired, recover the lease
      fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE,
          iip, src, holder, clientMachine, false);
      throw new FileAlreadyExistsException(src + " for client "
          + clientMachine + " already exists");
    }
  }
  fsn.checkFsObjectLimit();
  INodeFile newNode = null;
  INodesInPath parent =
      FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
  if (parent != null) {
    // Add the file's metadata to the namespace
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName);
    newNode = iip != null ? iip.getLastINode().asFile() : null;
  }
  ... ...
  setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
  // Record the operation in the edit log
  fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + src
        + " inode " + newNode.getId() + " " + holder);
  }
  return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}

private static INodesInPath addFile(FSDirectory fsd, INodesInPath existing,
    byte[] localName, PermissionStatus permissions, short replication,
    long preferredBlockSize, String clientName, String clientMachine,
    boolean shouldReplicate, String ecPolicyName) throws IOException {
  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    ... ...
    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  } finally {
    fsd.writeUnlock();
  }
  ... ...
  return newiip;
}

INodesInPath addINode(INodesInPath existing, INode child,
    FsPermission modes) throws QuotaExceededException,
    UnresolvedLinkException {
  cacheName(child);
  writeLock();
  try {
    // Write the new entry into the INode directory tree
    return addLastINode(existing, child, modes, true);
  } finally {
    writeUnlock();
  }
}

3

DataStreamer startup flow


After the NN has handled the create request, execution returns to the client side, which starts the corresponding thread.

DFSOutputStream.java

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
  ... ...
  // The client sends the create request to the NN over RPC
  stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
      new EnumSetWritable<>(flag), createParent, replication, blockSize,
      SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
  ... ...
  // Create the output stream
  out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,
      favoredNodes, true);
  // Start the thread; DataStreamer extends Daemon extends Thread
  out.start();
  return out;
}


Click DFSOutputStream.

protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
  this(dfsClient, src, flag, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

  // Directory => File => Block (128 MB) => packet (64 KB)
  //   => chunk (512-byte chunk + 4-byte checksum)
  computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
      bytesPerChecksum);

  if (createStreamer) {
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
  }
}
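
To make the size hierarchy in that comment concrete, here is a small stand-alone sketch of the arithmetic. The 128 MB / 64 KB / 512 B / 4 B figures are the defaults named in the comment; the actual values come from the cluster configuration:

public class PacketChunkMath {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024; // dfs.blocksize, 128 MB default
    int packetSize = 64 * 1024;          // write packet size, 64 KB default
    int chunkData = 512;                 // data bytes per chunk
    int chunkChecksum = 4;               // checksum bytes per chunk

    // Each chunk occupies 512 data bytes + 4 checksum bytes on the wire
    int chunksPerPacket = packetSize / (chunkData + chunkChecksum); // 127
    long packetsPerBlock = blockSize / packetSize;                  // 2048

    System.out.println("chunks per packet: " + chunksPerPacket);
    System.out.println("packets per block: " + packetsPerBlock);
  }
}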


Click out.start() in the newStreamForCreate method to step into DFSOutputStream.java.

protected synchronized void start() {
  getStreamer().start();
}

protected DataStreamer getStreamer() {
  return streamer;
}


Click DataStreamer to step into DataStreamer.java.

class DataStreamer extends Daemon {
  ... ...
}


Click Daemon to step into Daemon.java.

public class Daemon extends Thread {
  ... ...
}
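
For context, the point of Daemon is that every instance is marked as a JVM daemon thread. A minimal sketch of the idea (the real org.apache.hadoop.util.Daemon has several constructors beyond this):

public class Daemon extends Thread {
  {
    // Instance initializer: every Daemon is a daemon thread, so it will
    // not keep the JVM alive once all user threads have exited
    setDaemon(true);
  }
}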


Note: out.start() actually starts a thread. Click DataStreamer and search for the run method.

DataStreamer.java

@Override
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
      closeResponder();
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = processDatanodeOrExternalError();

      final int halfSocketTimeout =
          dfsClient.getConf().getSocketTimeout() / 2;
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        ... ...
        try {
          // If dataQueue is empty, the thread blocks here
          dataQueue.wait(timeout);
        } catch (InterruptedException e) {
          LOG.warn("Caught exception", e);
        }
        doSleep = false;
        now = Time.monotonicNow();
      }
      ... ...
      // The queue is not empty: take a packet from the head of the queue
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
    } // ... ... (rest of the try block and its catch elided)
  }
  ... ...
}
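
The run() loop above is the consumer half of a classic producer-consumer handoff: write() calls on the output stream chop data into DFSPacket objects and enqueue them into dataQueue, while DataStreamer dequeues them and ships them down the pipeline. A minimal stand-alone sketch of that wait/notify pattern (the String "packet" and the queue type are illustrative, not the Hadoop classes):

import java.util.ArrayDeque;
import java.util.Deque;

public class QueueHandoffSketch {
  private final Deque<String> dataQueue = new ArrayDeque<>();

  // Producer side: roughly what DFSOutputStream.write() does with packets
  public void enqueue(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll(); // wake the streamer thread
    }
  }

  // Consumer side: roughly what DataStreamer.run() does
  public String dequeue() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait(1000); // blocks here while the queue is empty
      }
      return dataQueue.removeFirst();
    }
  }
}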

