手把手带你源码解析HDFS文件上传之create创建过程
0)在pom.xml中增加如下依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
DN向NN发起创建请求
用户自己写的代码
public void testPut2() throws IOException {
FSDataOutputStream fos = fs.create(newPath("/input"));
fos.write("helloworld".getBytes());
}
FileSystem.java
public FSDataOutputStream create(Path f) throws IOException {
return create(f, true);
}
public FSDataOutputStream create(Path f, booleanoverwrite)
throws IOException {
return create(f, overwrite,
getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT),
getDefaultReplication(f),
getDefaultBlockSize(f));
}
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize) throws IOException {
return create(f, overwrite, bufferSize, replication, blockSize,null);
}
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress
) throws IOException {
return this.create(f, FsCreateModes.applyUMask(
FsPermission.getFileDefault(),FsPermission.getUMask(getConf())),
overwrite, bufferSize, replication,blockSize, progress);
}
public abstract FSDataOutputStream create(Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throwsIOException;
选中create,点击ctrl+h,找到实现类DistributedFileSystem.java,查找create方法。
DistributedFileSystem.java
public FSDataOutputStream create(Path f,FsPermission permission,
boolean overwrite, int bufferSize, shortreplication, long blockSize,
Progressable progress) throws IOException {
return this.create(f, permission,
overwrite ? EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE),bufferSize, replication,
blockSize, progress, null);
}
public FSDataOutputStreamcreate(final Path f, finalFsPermission permission,
final EnumSet<CreateFlag> cflags, finalint bufferSize,
final short replication, final longblockSize,
final Progressable progress, finalChecksumOpt checksumOpt)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f);
return newFileSystemLinkResolver<FSDataOutputStream>() {
public FSDataOutputStream doCall(final Path p) throws IOException {
// 创建获取了一个输出流对象
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication,blockSize, progress, bufferSize,
checksumOpt);
// 这里将上面创建的dfsos进行包装并返回
return dfs.createWrappedOutputStream(dfsos,statistics);
}
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission,cflags, bufferSize,
replication, blockSize,progress, checksumOpt);
}
}.resolve(this, absF);
}
点击create,进入DFSClient.java
public DFSOutputStream create(String src,FsPermission permission,
EnumSet<CreateFlag> flag, short replication, long blockSize,
Progressable progress, int buffersize, ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress,buffersize, checksumOpt, null);
}
public DFSOutputStream create(String src,FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[]favoredNodes)
throws IOException {
return create(src, permission, flag, createParent, replication,blockSize,
progress, buffersize, checksumOpt,favoredNodes, null);
}
public DFSOutputStream create(String src,FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[]favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
final FsPermission masked =applyUMask(permission);
LOG.debug("{}: masked={}", src,masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent,replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes),ecPolicyName);
beginFileLease(result.getFileId(),result);
return result;
}
点击newStreamForCreate,进入DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClientdfsClient, String src,
FsPermission masked,EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,Progressable progress,
DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
throws IOException {
try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate",src)) {
HdfsFileStatusstat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
// DN将创建请求发送给NN(RPC)
stat = dfsClient.namenode.create(src, masked,dfsClient.clientName,
new EnumSetWritable<>(flag),createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS,ecPolicyName);
break;
} catch (RemoteException re) {
… ….
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not benull!");
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = newDFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum,favoredNodes);
}else {
out = newDFSOutputStream(dfsClient, src, stat,
flag, progress, checksum,favoredNodes, true);
}
//开启线程run,DataStreamer extends Daemon extends Thread
out.start();
return out;
}
}
NN处理DN的创建请求
1)点击create
ClientProtocol.java
HdfsFileStatus create(String src, FsPermissionmasked,
String clientName,EnumSetWritable<CreateFlag> flag,
boolean createParent, shortreplication, long blockSize,
CryptoProtocolVersion[]supportedVersions, String ecPolicyName)
throws IOException;
2)Ctrl + h查找create实现类,点击NameNodeRpcServer,在NameNodeRpcServer.java中搜索create
NameNodeRpcServer.java
public HdfsFileStatuscreate(String src, FsPermissionmasked,
String clientName,EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication,long blockSize,
CryptoProtocolVersion[] supportedVersions,String ecPolicyName)
throws IOException {
// 检查NN启动
checkNNStartup();
... ...
HdfsFileStatus status = null;
try {
PermissionStatus perm = newPermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
// 重要
status = namesystem.startFile(src, perm, clientName,clientMachine,
flag.get(), createParent, replication,blockSize, supportedVersions,
ecPolicyName, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, status !=null, status);
}
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return status;
}
FSNamesystem.java
HdfsFileStatus startFile(String src, PermissionStatuspermissions,
String holder, String clientMachine,EnumSet<CreateFlag> flag,
boolean createParent, short replication,long blockSize,
CryptoProtocolVersion[] supportedVersions,String ecPolicyName,
boolean logRetryCache) throws IOException {
HdfsFileStatus status;
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize,supportedVersions, ecPolicyName,
logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "create",src);
throw e;
}
logAuditEvent(true, "create", src,status);
return status;
}
private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, Stringholder, String clientMachine,
EnumSet<CreateFlag> flag, booleancreateParent, short replication,
long blockSize, CryptoProtocolVersion[]supportedVersions,
String ecPolicyName, boolean logRetryCache)throws IOException {
... ...
stat = FSDirWriteFileOp.startFile(this, iip, permissions,holder,
clientMachine, flag, createParent,replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate,ecPolicyName, logRetryCache);
... ...
}
static HdfsFileStatus startFile(
... ...)
throws IOException {
... ...
FSDirectory fsd = fsn.getFSDirectory();
// 文件路径是否存在校验
if (iip.getLastINode() != null) {
if (overwrite) {
List<INode> toRemoveINodes = newChunkedArrayList<>();
List<Long> toRemoveUCFiles = newChunkedArrayList<>();
long ret = FSDirDeleteOp.delete(fsd, iip,toRemoveBlocks,
toRemoveINodes, toRemoveUCFiles, now());
if (ret >= 0) {
iip = INodesInPath.replace(iip,iip.length() - 1, null);
FSDirDeleteOp.incrDeletedFileCount(ret);
fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
}
} else {
// If lease soft limit time is expired,recover the lease
fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
src, holder,clientMachine, false);
throw new FileAlreadyExistsException(src + " forclient " +
clientMachine + " already exists");
}
}
fsn.checkFsObjectLimit();
INodeFile newNode = null;
INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd,iip, permissions);
if (parent != null) {
// 添加文件元数据信息
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder,clientMachine, shouldReplicate,
ecPolicyName);
newNode = iip != null ?iip.getLastINode().asFile() : null;
}
... ...
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode,overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()){
NameNode.stateChangeLog.debug("DIR*NameSystem.startFile: added " +
src + " inode " +newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd,iip, false, false);
}
private static INodesInPath addFile(
FSDirectory fsd, INodesInPath existing,byte[] localName,
PermissionStatus permissions, shortreplication, long preferredBlockSize,
String clientName, String clientMachine,boolean shouldReplicate,
String ecPolicyName) throws IOException {
Preconditions.checkNotNull(existing);
long modTime = now();
INodesInPath newiip;
fsd.writeLock();
try {
… …
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
} finally {
fsd.writeUnlock();
}
... ...
return newiip;
}
INodesInPath addINode(INodesInPath existing, INodechild,
FsPermission modes)
throws QuotaExceededException,UnresolvedLinkException {
cacheName(child);
writeLock();
try {
// 将数据写入到INode的目录树中
return addLastINode(existing, child, modes, true);
} finally {
writeUnlock();
}
}
DataStreamer启动流程
NN处理完DN请求后,再次回到DN端,启动对应的线程
DFSOutputStream.java
static DFSOutputStream newStreamForCreate(DFSClientdfsClient, String src,
FsPermission masked,EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,Progressable progress,
DataChecksum checksum, String[] favoredNodes,String ecPolicyName)
throws IOException {
... ...
// DN将创建请求发送给NN(RPC)
stat = dfsClient.namenode.create(src,masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
... ...
// 创建输出流
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum,favoredNodes, true);
// 开启线程run,DataStreamer extends Daemon extends Thread
out.start();
return out;
}
点击DFSOutputStream
protected DFSOutputStream(DFSClient dfsClient, String src,
HdfsFileStatus stat,EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[]favoredNodes, boolean createStreamer) {
this(dfsClient, src, flag, progress, stat,checksum);
this.shouldSyncBlock =flag.contains(CreateFlag.SYNC_BLOCK);
// Directory => File => Block(128M)=> packet(64K) => chunk(chunk 512byte +chunksum 4byte)
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
if (createStreamer) {
streamer = new DataStreamer(stat, null,dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager,favoredNodes,
addBlockFlags);
}
}
点击newStreamForCreate方法中的out.start(),进入DFSOutputStream.java
protected synchronized void start() {
getStreamer().start();
}
protected DataStreamer getStreamer() {
return streamer;
}
点击DataStreamer,进入DataStreamer.java
class DataStreamer extends Daemon {
。。。。。。
}
点击Daemon,进入Daemon.java
public class Daemon extends Thread {
。。。。。。
}
说明:out.start();实际是开启线程,点击DataStreamer,搜索run方法
DataStreamer.java
public void run(){
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed &&dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (errorState.hasError()) {
closeResponder();
}
DFSPacketone;
try {
// process datanode IO errors ifany
boolean doSleep =processDatanodeOrExternalError();
final int halfSocketTimeout =dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
… …
try {
// 如果dataQueue里面没有数据,代码会阻塞在这儿
dataQueue.wait(timeout);
} catch(InterruptedException e) {
LOG.warn("Caught exception", e);
}
doSleep = false;
now = Time.monotonicNow();
}
… …
// 队列不为空,从队列中取出packet
one = dataQueue.getFirst();// regular data packet
SpanId[] parents =one.getTraceParents();
if (parents.length > 0){
scope = dfsClient.getTracer().
newScope("dataStreamer",parents[0]);
scope.getSpan().setParents(parents);
}
}
}
… …
}
B站|大数据那些事
想获取更多更全资料
扫码加好友入群
欢迎各位大佬加入开源共享
共同面对大数据领域疑难问题
来稿请投邮箱:[email protected]