A Step-by-Step Source-Code Walkthrough of HDFS File Upload: the create Process
0) Add the following dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
The HDFS client sends the create request to the NN
(Note: the request is issued by the HDFS client, not by a DataNode; the code below all runs client-side.)
The user's own test code:
@Test
public void testPut2() throws IOException {
    // fs is a FileSystem field initialized in a @Before setup method
    FSDataOutputStream fos = fs.create(new Path("/input"));
    fos.write("helloworld".getBytes());
    fos.close();
}
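The test above assumes a JUnit fixture that initializes fs. For completeness, here is a self-contained sketch of the same write. The NameNode URI hdfs://hadoop102:8020 and the user name "atguigu" are placeholder assumptions for a typical test cluster; adjust them to your environment.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCreateDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // URI and user are placeholders; point them at your own cluster
        FileSystem fs = FileSystem.get(
            URI.create("hdfs://hadoop102:8020"), conf, "atguigu");
        try (FSDataOutputStream fos = fs.create(new Path("/input"))) {
            fos.write("helloworld".getBytes());
        } finally {
            fs.close();
        }
    }
}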
FileSystem.java
public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true);
}

public FSDataOutputStream create(Path f, boolean overwrite)
        throws IOException {
    return create(f, overwrite,
        getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
        getDefaultReplication(f),
        getDefaultBlockSize(f));
}

public FSDataOutputStream create(Path f,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize) throws IOException {
    return create(f, overwrite, bufferSize, replication, blockSize, null);
}

public FSDataOutputStream create(Path f,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException {
    return this.create(f, FsCreateModes.applyUMask(
        FsPermission.getFileDefault(), FsPermission.getUMask(getConf())),
        overwrite, bufferSize, replication, blockSize, progress);
}

public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;
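Reading the chain top to bottom, each overload fills in one more default (overwrite flag, buffer size, replication, block size, permissions) and delegates to the next, until the abstract method is reached. As a hedged illustration, the one-argument create(Path) resolves to roughly the following explicit call; the literal values are what a stock 3.1.x configuration would supply (the real values come from getConf() and the server defaults), and the helper name is ours, not Hadoop's.

// Hypothetical helper, continuing the HdfsCreateDemo class above.
static FSDataOutputStream createWithExplicitDefaults(FileSystem fs)
        throws IOException {
    return fs.create(
        new Path("/input"),
        true,                  // overwrite
        4096,                  // io.file.buffer.size default
        (short) 3,             // dfs.replication default
        128L * 1024 * 1024);   // dfs.blocksize default (128 MB)
}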
Select create and press Ctrl+H to open the type hierarchy; find the implementation class DistributedFileSystem.java and locate its create method.
DistributedFileSystem.java
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
    return this.create(f, permission,
        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                  : EnumSet.of(CreateFlag.CREATE),
        bufferSize, replication, blockSize, progress, null);
}

public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
    statistics.incrementWriteOps(1);
    storageStatistics.incrementOpCounter(OpType.CREATE);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
        public FSDataOutputStream doCall(final Path p) throws IOException {
            // Create a DFSOutputStream for the new file
            final DFSOutputStream dfsos = dfs.create(getPathName(p),
                permission, cflags, replication, blockSize, progress,
                bufferSize, checksumOpt);
            // Wrap the dfsos created above and return it
            return dfs.createWrappedOutputStream(dfsos, statistics);
        }
        public FSDataOutputStream next(final FileSystem fs, final Path p)
            throws IOException {
            return fs.create(p, permission, cflags, bufferSize, replication,
                blockSize, progress, checksumOpt);
        }
    }.resolve(this, absF);
}
Click create to step into DFSClient.java. Again each overload fills in a default and delegates; the last one sends the request to the NameNode via DFSOutputStream.newStreamForCreate and registers a file lease.
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, short replication, long blockSize,
    Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
    return create(src, permission, flag, true, replication, blockSize,
        progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
    return create(src, permission, flag, createParent, replication,
        blockSize, progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
    String ecPolicyName) throws IOException {
    checkOpen();
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName);
    // Register a lease so this client keeps exclusive write access to the file
    beginFileLease(result.getFileId(), result);
    return result;
}
Click newStreamForCreate to step into DFSOutputStream.java.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
    try (TraceScope ignored =
        dfsClient.newPathTraceScope("newStreamForCreate", src)) {
        HdfsFileStatus stat = null;
        // Retry the create if we get a RetryStartFileException up to a maximum
        // number of times
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
        while (shouldRetry) {
            shouldRetry = false;
            try {
                // The client sends the create request to the NN over RPC
                stat = dfsClient.namenode.create(src, masked,
                    dfsClient.clientName, new EnumSetWritable<>(flag),
                    createParent, replication, blockSize,
                    SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
                break;
            } catch (RemoteException re) {
                ... ...
            }
        }
        Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
        final DFSOutputStream out;
        if (stat.getErasureCodingPolicy() != null) {
            out = new DFSStripedOutputStream(dfsClient, src, stat, flag,
                progress, checksum, favoredNodes);
        } else {
            out = new DFSOutputStream(dfsClient, src, stat, flag, progress,
                checksum, favoredNodes, true);
        }
        // Start the worker thread: DataStreamer extends Daemon extends Thread
        out.start();
        return out;
    }
}
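The elided catch block implements a bounded retry: only a RetryStartFileException coming back from the NameNode (e.g. when an encryption key was just rotated) triggers another attempt, up to CREATE_RETRY_COUNT tries. A minimal sketch of that pattern, with an illustrative callNameNodeCreate() helper standing in for the RPC call above:

// Sketch only; callNameNodeCreate() is a hypothetical stand-in for
// dfsClient.namenode.create(...).
HdfsFileStatus stat = null;
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;        // 10 in Hadoop 3.1.x
while (shouldRetry) {
    shouldRetry = false;
    try {
        stat = callNameNodeCreate();
        break;
    } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(RetryStartFileException.class);
        if (e instanceof RetryStartFileException && retryCount > 0) {
            shouldRetry = true;             // the NN asked us to retry
            retryCount--;
        } else {
            throw e;
        }
    }
}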
How the NN handles the client's create request
1) Click create.
ClientProtocol.java
HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;
2) Press Ctrl+H to find the implementations of create; click NameNodeRpcServer and search for create in NameNodeRpcServer.java.
NameNodeRpcServer.java
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
    // Check that the NN has started
    checkNNStartup();
    ... ...
    HdfsFileStatus status = null;
    try {
        PermissionStatus perm = new PermissionStatus(
            getRemoteUser().getShortUserName(), null, masked);
        // The important call: create the file in the namespace
        status = namesystem.startFile(src, perm, clientName, clientMachine,
            flag.get(), createParent, replication, blockSize,
            supportedVersions, ecPolicyName, cacheEntry != null);
    } finally {
        RetryCache.setState(cacheEntry, status != null, status);
    }
    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
}
FSNamesystem.java
HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    boolean logRetryCache) throws IOException {
    HdfsFileStatus status;
    try {
        status = startFileInt(src, permissions, holder, clientMachine, flag,
            createParent, replication, blockSize, supportedVersions,
            ecPolicyName, logRetryCache);
    } catch (AccessControlException e) {
        logAuditEvent(false, "create", src);
        throw e;
    }
    logAuditEvent(true, "create", src, status);
    return status;
}

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {
    ... ...
    stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
        clientMachine, flag, createParent, replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
    ... ...
}

// FSDirWriteFileOp.java
static HdfsFileStatus startFile(... ...) throws IOException {
    ... ...
    FSDirectory fsd = fsn.getFSDirectory();
    // Check whether the file path already exists
    if (iip.getLastINode() != null) {
        if (overwrite) {
            List<INode> toRemoveINodes = new ChunkedArrayList<>();
            List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
            long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
                toRemoveINodes, toRemoveUCFiles, now());
            if (ret >= 0) {
                iip = INodesInPath.replace(iip, iip.length() - 1, null);
                FSDirDeleteOp.incrDeletedFileCount(ret);
                fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes,
                    true);
            }
        } else {
            // If the lease soft limit has expired, recover the lease
            fsn.recoverLeaseInternal(
                FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip, src, holder,
                clientMachine, false);
            throw new FileAlreadyExistsException(src + " for client " +
                clientMachine + " already exists");
        }
    }
    fsn.checkFsObjectLimit();
    INodeFile newNode = null;
    INodesInPath parent =
        FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
    if (parent != null) {
        // Add the file's metadata to the namespace
        iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
            replication, blockSize, holder, clientMachine, shouldReplicate,
            ecPolicyName);
        newNode = iip != null ? iip.getLastINode().asFile() : null;
    }
    ... ...
    setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
            src + " inode " + newNode.getId() + " " + holder);
    }
    return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}

private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName) throws IOException {
    Preconditions.checkNotNull(existing);
    long modTime = now();
    INodesInPath newiip;
    fsd.writeLock();
    try {
        ... ...
        newiip = fsd.addINode(existing, newNode,
            permissions.getPermission());
    } finally {
        fsd.writeUnlock();
    }
    ... ...
    return newiip;
}

// FSDirectory.java
INodesInPath addINode(INodesInPath existing, INode child,
    FsPermission modes)
    throws QuotaExceededException, UnresolvedLinkException {
    cacheName(child);
    writeLock();
    try {
        // Insert the new INode into the in-memory directory tree
        return addLastINode(existing, child, modes, true);
    } finally {
        writeUnlock();
    }
}
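To make "inserting the new INode into the directory tree" concrete, here is a deliberately simplified toy model (none of these types are Hadoop's): the namespace is an in-memory tree of nodes, and creating a file is just attaching a new leaf under its parent directory, which is essentially what addLastINode does before the operation is recorded in the edit log.

import java.util.ArrayList;
import java.util.List;

// Toy namespace tree, only to illustrate the idea behind addLastINode.
class ToyINode {
    final String name;
    final List<ToyINode> children = new ArrayList<>();

    ToyINode(String name) {
        this.name = name;
    }

    // Walk a path like /a/b/c from the root and attach the leaf under its
    // parent, assuming the ancestor directories already exist.
    static ToyINode addFile(ToyINode root, String path) {
        String[] parts = path.split("/");
        ToyINode cur = root;
        for (int i = 1; i < parts.length - 1; i++) {
            cur = cur.child(parts[i]);
        }
        ToyINode leaf = new ToyINode(parts[parts.length - 1]);
        cur.children.add(leaf);   // the "addLastINode" step
        return leaf;
    }

    ToyINode child(String name) {
        for (ToyINode c : children) {
            if (c.name.equals(name)) {
                return c;
            }
        }
        throw new IllegalStateException("missing ancestor: " + name);
    }
}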
The DataStreamer start-up flow
After the NN finishes handling the create request, execution returns to the client side, which starts the corresponding worker thread.
DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
    ... ...
    // The client sends the create request to the NN over RPC
    stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
        new EnumSetWritable<>(flag), createParent, replication, blockSize,
        SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
    ... ...
    // Create the output stream
    out = new DFSOutputStream(dfsClient, src, stat, flag, progress,
        checksum, favoredNodes, true);
    // Start the worker thread: DataStreamer extends Daemon extends Thread
    out.start();
    return out;
}
Click DFSOutputStream:
protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    this(dfsClient, src, flag, progress, stat, checksum);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
    // Directory => File => Block (128 MB) => packet (64 KB)
    //   => chunk (512-byte chunk + 4-byte checksum)
    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);
    if (createStreamer) {
        streamer = new DataStreamer(stat, null, dfsClient, src, progress,
            checksum, cachingStrategy, byteArrayManager, favoredNodes,
            addBlockFlags);
    }
}
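A rough sketch of the packet/chunk arithmetic behind computePacketChunkSize. The literal values are the usual defaults; the real implementation also reserves room for the packet header, which this ignores.

public class PacketMath {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // dfs.blocksize, 128 MB
        int packetSize = 64 * 1024;          // dfs.client-write-packet-size
        int bytesPerChecksum = 512;          // dfs.bytes-per-checksum
        int checksumSize = 4;                // one CRC32 checksum

        int chunkOnWire = bytesPerChecksum + checksumSize;  // 516 bytes
        int chunksPerPacket = packetSize / chunkOnWire;     // ~127 chunks
        long packetsPerBlock = blockSize / packetSize;      // 2048 packets

        System.out.println("chunks per packet = " + chunksPerPacket);
        System.out.println("packets per block = " + packetsPerBlock);
    }
}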
Click out.start() in the newStreamForCreate method; this takes you back into DFSOutputStream.java:
protected synchronized void start() {
    getStreamer().start();
}

protected DataStreamer getStreamer() {
    return streamer;
}
Click DataStreamer to open DataStreamer.java:

class DataStreamer extends Daemon {
    ... ...
}
Click Daemon to open Daemon.java:

public class Daemon extends Thread {
    ... ...
}
Note: out.start() actually starts a thread (Daemon's constructor marks it as a daemon thread via setDaemon(true)). Click into DataStreamer and search for the run method.
DataStreamer.java
public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
        // if the Responder encountered an error, shutdown Responder
        if (errorState.hasError()) {
            closeResponder();
        }
        DFSPacket one;
        try {
            // process datanode IO errors if any
            boolean doSleep = processDatanodeOrExternalError();
            final int halfSocketTimeout =
                dfsClient.getConf().getSocketTimeout() / 2;
            synchronized (dataQueue) {
                // wait for a packet to be sent.
                ... ...
                try {
                    // If dataQueue is empty, block here until data arrives
                    dataQueue.wait(timeout);
                } catch (InterruptedException e) {
                    LOG.warn("Caught exception", e);
                }
                doSleep = false;
                now = Time.monotonicNow();
            }
            ... ...
            // The queue is not empty: take the first packet from it
            one = dataQueue.getFirst();
            // regular data packet
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
                scope = dfsClient.getTracer().newScope("dataStreamer",
                    parents[0]);
                scope.getSpan().setParents(parents);
            }
        }
    }
    ... ...
}
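The wait() call above is the consumer half of a classic guarded queue: the writing side enqueues packets into dataQueue and notifies, while DataStreamer blocks as long as the queue is empty. A minimal standalone sketch of that hand-off (illustrative names, not the HDFS classes):

import java.util.ArrayDeque;
import java.util.Deque;

public class DataQueueSketch {
    private final Deque<byte[]> dataQueue = new ArrayDeque<>();

    // Producer side: roughly what the writer does with a full packet.
    public void enqueue(byte[] packet) {
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            dataQueue.notifyAll();        // wake the streamer thread
        }
    }

    // Consumer side: roughly the blocking section of DataStreamer.run().
    public byte[] takeOne() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait(1000);     // blocks here while there is no data
            }
            return dataQueue.removeFirst();
        }
    }
}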