
HDFS Write Data Flow, Explained


Author | Wu Xie. Four years of big data experience; currently at an internet company in Guangzhou, working on the in-house big data platform and on offline and real-time computing.


This article runs to 7,800+ words, roughly 90% of which is code. It analyzes the source step by step; expect about 15-20 minutes of reading time.





The previous articles covered the initialization of the HDFS NameNode and DataNode and the metadata management flow. At the functional level we have now touched on most of the major features, but the most important job of HDFS is storing data: how data is written and read is its core capability. In this article we analyze the HDFS write path.


HDFS Write Data Flow
Figure 1: HDFS write data flow

 

  1. The client initiates a file-write request by calling create() on DistributedFileSystem (an implementation of FileSystem).

  2. DistributedFileSystem communicates with the NameNode over RPC. The NameNode first checks the user's permissions, whether the file already exists, and whether its parent directory exists, then returns a flag indicating whether the upload may proceed along with information about the blocks to be written.

  3. The client starts writing through FSDataOutputStream, which splits the data into packets and places them in a data queue.

  4. Using the list of writable DataNodes returned by the NameNode, the client writes each block to the nearest DataNode first; that DataNode then replicates the data to the other DataNodes according to the HDFS replication policy.

  5. After a DataNode finishes writing, it returns an acknowledgement. Once FSDataOutputStream has collected the DataNodes' acknowledgements, it clears the acknowledged packets from the response queue, ready for the next round.

  6. When all data has been written, the client calls close() to close the stream.

  7. Finally, the client notifies the NameNode that the write is complete.

 

That is the overall HDFS write flow; next we analyze it at the source-code level.
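To make the flow concrete before diving into the internals, here is a minimal client-side sketch of the seven steps above. The NameNode URI and the file path are made up for illustration; only the FileSystem / FSDataOutputStream calls themselves come from the Hadoop client API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed address; replace with your own NameNode URI.
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        try (FileSystem fs = FileSystem.get(conf);
             // Steps 1-2: create() asks the NameNode to create the file entry.
             FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true)) {
            // Steps 3-5: write() buffers bytes into chunks/packets, which the
            // DataStreamer ships to the DataNode pipeline in the background.
            out.write("hello hdfs".getBytes("UTF-8"));
        } // Steps 6-7: close() flushes the remaining packets and completes the file.
    }
}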
 
Writing Data


Figure 2: FileSystem and its implementation classes



As the diagram shows, FileSystem is the abstract class for interacting with HDFS-style file systems, and it has many concrete implementations, including WebHdfsFileSystem, S3FileSystem, HttpFSFileSystem, AzureFileSystem and so on. From the write flow diagram we know the first step is to locate the create() method.


In the FileSystem class, create(Path) is a single line of code that returns an FSDataOutputStream, so the next step is to follow the call into its overloads and implementation classes. Tracing the call chain like this is a good habit when reading source code.


/**
 * Create an FSDataOutputStream at the indicated Path.
 * Files are overwritten by default.
 * @param f the file to create
 */
public FSDataOutputStream create(Path f) throws IOException {
  // this is the method we are after
  return create(f, true);
}

/**
 * Create an FSDataOutputStream at the indicated Path.
 * @param f the file to create
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an exception will be thrown.
 */
public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
  // Create the file (overwriting it if it exists), use the configured I/O buffer
  // size (default 4096 bytes, i.e. 4 KB), and pick up the default replication
  // factor and block size.
  return create(f, overwrite,
                getConf().getInt("io.file.buffer.size", 4096),
                getDefaultReplication(f),
                getDefaultBlockSize(f));
}

/**
 * Create an FSDataOutputStream at the indicated Path.
 * @param f the file name to open
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an error will be thrown.
 * @param bufferSize the size of the buffer to be used.
 * @param replication required block replication for the file.
 */
public FSDataOutputStream create(Path f,
                                 boolean overwrite,
                                 int bufferSize,
                                 short replication,
                                 long blockSize) throws IOException {
  // again a single line -- keep following the chain
  return create(f, overwrite, bufferSize, replication, blockSize, null);
}




A side note on the HDFS code base: most methods are kept short. Instead of a single method running to hundreds of lines, which is tiring to read, HDFS extracts each layer of logic into its own method, a bit like nesting dolls, which keeps the code quite readable.

Now find the create() method in DistributedFileSystem, the FileSystem implementation we care about.


@Override
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  // Another one-liner -- nesting dolls again, one layer inside the next.
  // The flags decide whether an existing file is overwritten: if overwrite is
  // true the flags are CREATE | OVERWRITE, otherwise just CREATE.
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                : EnumSet.of(CreateFlag.CREATE),
      bufferSize, replication, blockSize, progress, null);
}

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      // Creates and initializes a DFSOutputStream. dfs.create():
      //  1. adds an INodeFile to the file directory tree
      //  2. adds a file lease
      //  3. starts the DataStreamer, the core service that writes the data
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize, checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}

Before analyzing what dfs.create(...) does, a concept worth introducing: the file lease. When HDFS writes a data file it creates a lease for that file. Three steps matter: adding the lease, starting the lease, and renewing it, corresponding to addLease(), startLease() and renewLease() in LeaseManager.


/**
 * Adds (or re-adds) the lease for the specified file.
 */
synchronized Lease addLease(String holder, String src) {
  Lease lease = getLease(holder);
  if (lease == null) {
    // no lease for this holder yet -- create one
    lease = new Lease(holder);
    // store it in the sorted data structures
    leases.put(holder, lease);
    sortedLeases.add(lease);
  } else {
    // the holder already has a lease -- renew it
    renewLease(lease);
  }
  sortedLeasesByPath.put(src, lease);
  lease.paths.add(src);
  return lease;
}


So what is a lease? As the name suggests, it is a constraint: a rule that the participants in an operation must follow. HDFS has a file lease mechanism: at any moment only one client may hold the lease for a given file on the NameNode, and only the lease holder may write to that file. If another client tries to acquire the lease at the same time, it fails and has to wait. This guarantees that only one client writes a given file at a time. After acquiring the lease, the writing client starts a thread that keeps sending renewal requests to the NameNode, effectively telling it: "I'm still writing this file, please keep my lease alive." Meanwhile the NameNode runs a dedicated background thread that monitors the renewal time of every lease; if a lease has not been renewed for a long time, meaning the client has stopped writing, the lease is automatically expired so that other clients can write the file.

Figure 3: File lease management

 
We will not go deeper into the lease mechanism here; interested readers can dig into it on their own. Back to the main topic, the HDFS write flow.
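Still, to make the single-writer idea concrete, here is a toy sketch. The class and method names are invented and the logic is a drastic simplification of what LeaseManager and the renewal daemon actually do.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A toy lease table: one holder per path; expired leases can be taken over.
class ToyLeaseTable {
    private static final long SOFT_LIMIT_MS = 60_000; // assumed expiry window
    private final Map<String, String> holderByPath = new ConcurrentHashMap<>();
    private final Map<String, Long> lastRenewedByPath = new ConcurrentHashMap<>();

    // Returns true if 'client' now holds the lease for 'path'.
    synchronized boolean tryAcquire(String path, String client) {
        String holder = holderByPath.get(path);
        long now = System.currentTimeMillis();
        boolean expired = holder != null
                && now - lastRenewedByPath.getOrDefault(path, 0L) > SOFT_LIMIT_MS;
        if (holder == null || holder.equals(client) || expired) {
            holderByPath.put(path, client);
            lastRenewedByPath.put(path, now);
            return true;
        }
        return false; // someone else is writing this file
    }

    // The writing client calls this periodically to keep its lease alive.
    synchronized void renew(String path, String client) {
        if (client.equals(holderByPath.get(path))) {
            lastRenewedByPath.put(path, System.currentTimeMillis());
        }
    }
}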

/**
 * Call
 * {@link #create(String, FsPermission, EnumSet, boolean, short, long, Progressable, int, ChecksumOpt)}
 * with <code>createParent</code> set to true.
 */
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, short replication, long blockSize,
    Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
  // another familiar-looking create()
  return create(src, permission, flag, true,
      replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  // newStreamForCreate does three things:
  //  1. adds an INodeFile to the INodeDirectory file tree
  //  2. adds a file lease
  //  3. starts the DataStreamer, which talks to the NameNodeRPCServer
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  // start the lease for this file
  beginFileLease(result.getFileId(), result);
  return result;
}

/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out)
    throws IOException {
  getLeaseRenewer().put(inodeId, out, this);
}

// synchronized: only one thread at a time may register a stream with the renewer
synchronized void put(final long inodeId, final DFSOutputStream out,
    final DFSClient dfsc) {
  if (dfsc.isClientRunning()) {
    if (!isRunning() || isRenewerExpired()) {
      // start a new daemon with a new id.
      final int id = ++currentId;
      // create a background daemon thread
      daemon = new Daemon(new Runnable() {
        @Override
        public void run() {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " started");
            }
            // LeaseRenewer is responsible for keeping leases renewed
            LeaseRenewer.this.run(id);
          } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                  + " is interrupted.", e);
            }
          } finally {
            synchronized (LeaseRenewer.this) {
              Factory.INSTANCE.remove(LeaseRenewer.this);
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " exited");
            }
          }
        }

        @Override
        public String toString() {
          return String.valueOf(LeaseRenewer.this);
        }
      });
      // start the daemon thread
      daemon.start();
    }
    dfsc.putFileBeingWritten(inodeId, out);
    emptyTime = Long.MAX_VALUE;
  }
}

That completes the preparation for writing data: lease management is in place.


Starting the DataStreamer


static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  TraceScope scope = dfsClient.getPathTraceScope("newStreamForCreate", src);
  try {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    // keep retrying until the file entry is created
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // create the file; see the create() method on the NameNode RPC side
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break; // success -- leave the loop; failures throw and may be retried
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          // retry
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption"
                + " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");

    // DataStreamer is the core class of the write path; it is in fact a thread,
    // and the DFSOutputStream constructor creates it.
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    // start the DataStreamer
    out.start();
    return out;
  } finally {
    scope.close();
  }
}

Next we analyze the core classes of the write path, DFSOutputStream and DataStreamer.


/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
    EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum,
    String[] favoredNodes) throws IOException {
  this(dfsClient, src, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

  // Data is organised as:
  //   Directory -> File -> Block (128 MB) -> packet (64 KB)
  //     -> chunk (512 bytes of data + 4 bytes of checksum)
  // Compute the packet/chunk sizes; the method itself is straightforward.
  computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);

  // Create the DataStreamer -- this is where the real work happens.
  streamer = new DataStreamer(stat, null);
  if (favoredNodes != null && favoredNodes.length != 0) {
    streamer.setFavoredNodes(favoredNodes);
  }
}
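To get a feel for the numbers in the comment above, here is a small sketch of the packet/chunk arithmetic, assuming the common defaults (64 KB packets, 512-byte chunks, 4-byte checksums, 128 MB blocks). The real computePacketChunkSize is slightly more involved; the values printed here are only estimates.

public class PacketChunkMath {
    public static void main(String[] args) {
        int writePacketSize = 64 * 1024;      // dfs.client-write-packet-size default
        int bytesPerChecksum = 512;           // dfs.bytes-per-checksum default
        int checksumSize = 4;                 // a CRC32/CRC32C checksum is 4 bytes
        int blockSize = 128 * 1024 * 1024;    // dfs.blocksize default

        int chunkSize = bytesPerChecksum + checksumSize;                 // 516 bytes on the wire
        int chunksPerPacket = Math.max(1, writePacketSize / chunkSize);  // ~127
        int packetDataSize = chunksPerPacket * bytesPerChecksum;         // ~65,024 bytes of payload
        long packetsPerBlock = (long) blockSize / packetDataSize + 1;

        System.out.println("chunk size on the wire  : " + chunkSize);
        System.out.println("chunks per packet       : " + chunksPerPacket);
        System.out.println("payload per packet      : " + packetDataSize);
        System.out.println("packets per 128MB block : ~" + packetsPerBlock);
    }
}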

Step into new DataStreamer() and you will find there is very little in it; at first glance it is hard to know where to go next.


private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
  isAppend = false;
  isLazyPersistFile = isLazyPersist(stat);
  this.block = block;
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}

Which is to say, reading source code is not always easy. The clue was actually hidden in newStreamForCreate(...) above: DataStreamer is a thread. It has a start() method, so it must have a run() method, and run() is exactly the answer we are looking for. We will come back to this key piece of code when the data is actually written.




/*
 * streamer thread is the only thread that opens streams to datanode, and closes
 * them. Any error recovery is also done by this thread.
 */
@Override
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = NullScope.INSTANCE;
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        // when the file has just been created, dataQueue.size() == 0
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                || stage == BlockConstructionStage.DATA_STREAMING
                && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
          long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING)
              ? timeout : 1000;
          try {
            // if dataQueue is empty, the code blocks here until it is woken up
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
          assert one != null;
        } else {
          // the queue is not empty -- take the next packet from it
          one = dataQueue.getFirst(); // regular data packet
          long parents[] = one.getTraceParents();
          if (parents.length > 0) {
            scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
            // TODO: use setParents API once it's available from HTrace 3.2
            // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
            // scope.getSpan().setParents(parents);
          }
        }
      }

      // get new block from namenode.
      // Build the data pipeline and ask the NameNode for a block.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        // Set up the pipeline and record where the block will live (location,
        // storage type and ID). nextBlockOutputStream() is important: it asks
        // the NameNode for a block to write to, and the DataNode placement
        // policy is applied inside it (not expanded here for brevity).
        setPipeline(nextBlockOutputStream());
        // Initialize data streaming: starts the ResponseProcessor, which
        // listens for acknowledgements of the packets that were sent.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize
            + " is smaller than data size. "
            + " Offset of packet in block " + lastByteOffsetInBlock
            + " Aborting file " + src);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError
              && ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      Span span = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          span = scope.detach();
          one.setTraceSpan(span);
          // remove the packet about to be sent from dataQueue
          dataQueue.removeFirst();
          // add it to ackQueue until its ack comes back
          ackQueue.addLast(one);
          // wake up threads waiting on dataQueue
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block
            + " sending packet " + one);
      }

      // write out data to remote datanode
      TraceScope writeScope = Trace.startSpan("writeTo", span);
      try {
        // this is where the data is actually written out
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        // The primary DataNode is the first DataNode in the pipeline
        // (errorIndex = 0).
        tryMarkPrimaryDatanodeFailed();
        // rethrow the exception
        throw e;
      } finally {
        writeScope.close();
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError
              && ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex.get() == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      // an exception was caught -- flag the error
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      scope.close();
    }
  }
  closeInternal();
}
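The dataQueue/ackQueue handoff above is a classic wait/notify producer-consumer arrangement. The toy sketch below (class and field names are invented; this is not HDFS code) shows the same idea: the writing thread enqueues packets and blocks when too many are in flight, the streamer thread moves each packet from the data queue to the ack queue and "sends" it, and the ack thread removes it once the pipeline confirms the write.

import java.util.LinkedList;

class ToyStreamer {
    private final LinkedList<String> dataQueue = new LinkedList<>();
    private final LinkedList<String> ackQueue = new LinkedList<>();
    private static final int MAX_IN_FLIGHT = 80; // assumed limit on queued packets

    // Called by the writing thread (analogous to waitAndQueueCurrentPacket).
    void queuePacket(String packet) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.size() + ackQueue.size() >= MAX_IN_FLIGHT) {
                dataQueue.wait();           // back-pressure: the writer blocks
            }
            dataQueue.addLast(packet);
            dataQueue.notifyAll();          // wake the streamer thread
        }
    }

    // Called in a loop by the streamer thread (analogous to DataStreamer.run).
    void sendOnePacket() throws InterruptedException {
        String one;
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait(1000);       // nothing to send yet
            }
            one = dataQueue.removeFirst();  // move the packet from dataQueue ...
            ackQueue.addLast(one);          // ... to ackQueue until it is acked
            dataQueue.notifyAll();
        }
        System.out.println("sending " + one); // stand-in for one.writeTo(blockStream)
    }

    // Called by the ack-processing thread (analogous to ResponseProcessor).
    void ackOnePacket() {
        synchronized (dataQueue) {
            if (!ackQueue.isEmpty()) {
                ackQueue.removeFirst();     // ack received -- packet is safe downstream
                dataQueue.notifyAll();      // release back-pressure on the writer
            }
        }
    }
}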


Initializing DataStreaming

/**
 * Initialize for data streaming
 */
private void initDataStreaming() {
  this.setName("DataStreamer for file " + src + " block " + block);
  response = new ResponseProcessor(nodes);
  // start the thread -- notice how similar this is to DataStreamer
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}
ResponseProcessor(DatanodeInfo[] targets) {
  this.targets = targets;
}

@Override
public void run() {
  setName("ResponseProcessor for block " + block);
  // the ack carries the result of the write from the DataNode pipeline
  PipelineAck ack = new PipelineAck();
  TraceScope scope = NullScope.INSTANCE;
  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      // read the result from the downstream DataNode
      ack.readFields(blockReplyStream);
      long duration = Time.monotonicNow() - begin;
      if (duration > dfsclientSlowLogThresholdMs
          && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
        DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
            + ack + ", targets: " + Arrays.asList(targets));
      } else if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient " + ack);
      }

      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout
              + Time.monotonicNow();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply + " for block " + block
              + " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO
          : "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }

      // a success ack for a data packet
      DFSPacket one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      if (one.getSeqno() != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno "
            + " for block " + block + one.getSeqno()
            + " but received " + seqno);
      }
      isLastPacketInBlock = one.isLastPacketInBlock();

      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
        failPacket = true;
        throw new IOException("Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      synchronized (dataQueue) {
        scope = Trace.continueSpan(one.getTraceSpan());
        one.setTraceSpan(null);
        lastAckedSeqno = seqno;
        // the ack succeeded, so remove the packet from ackQueue
        ackQueue.removeFirst();
        dataQueue.notifyAll();
        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException) e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary
        // node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex.get() == -1) {
          DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
              + " for block " + block, e);
        }
        responderClosed = true;
      }
    } finally {
      scope.close();
    }
  }
}

void close() {
  responderClosed = true;
  // interrupt the thread so it exits quickly
  this.interrupt();
}


After DistributedFileSystem.create(...) returns, the DFSOutputStream is passed to createWrappedOutputStream(...) and wrapped once more. This is the decorator pattern from the classic design patterns.


/**
 * Wraps the stream in a CryptoOutputStream if the underlying file is encrypted.
 */
// returns an HdfsDataOutputStream
public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
    FileSystem.Statistics statistics) throws IOException {
  return createWrappedOutputStream(dfsos, statistics, 0);
}
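As a refresher, here is a minimal, self-contained sketch of the decorator pattern using plain JDK classes (not HDFS code): a wrapper that exposes the same OutputStream interface while adding behaviour on top of the stream it wraps, which is the same structural idea as wrapping a DFSOutputStream in another output stream.

import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// A decorator: same OutputStream interface, extra behaviour (byte counting) on top.
class CountingOutputStream extends FilterOutputStream {
    private long bytesWritten = 0;

    CountingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);      // delegate to the wrapped stream
        bytesWritten++;    // decorate it with extra bookkeeping
    }

    long getBytesWritten() {
        return bytesWritten;
    }

    public static void main(String[] args) throws IOException {
        try (CountingOutputStream cos =
                 new CountingOutputStream(new ByteArrayOutputStream())) {
            cos.write('h');
            cos.write('i');
            System.out.println("bytes written: " + cos.getBytesWritten()); // 2
        }
    }
}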


That completes the analysis of create(): an INodeFile was added to the file directory tree, the file lease was set up, the DataStreamer was started and data streaming was initialized. Only now does the actual data write begin.

 

Starting to Write Data


Following the write flow diagram, we locate the write() method of FSDataOutputStream.


public void write(int b) throws IOException {
  // out is the underlying DFSOutputStream
  out.write(b);
  position++;
  if (statistics != null) {
    statistics.incrementBytesWritten(1);
  }
}



Through FSOutputSummer, the parent class of DFSOutputStream, we reach the write() method.


/** Write one byte */
@Override
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte) b;
  if (count == buf.length) {
    // the buffer is full -- flush it out
    flushBuffer();
  }
}

/* Forces any buffered output bytes to be checksumed and written out to
 * the underlying output stream.
 */
protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep, boolean flushPartial)
    throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
  if (lenToFlush != 0) {
    // The core of the write path:
    // HDFS file -> Block (128 MB) -> packet (64 KB)
    //   -> chunk (512 bytes of data + 4 bytes of checksum)
    writeChecksumChunks(buf, 0, lenToFlush);
    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
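A quick worked example of the partial-chunk arithmetic in flushBuffer, with the usual 512-byte chunk size and a buffer length that is made up purely for illustration:

public class FlushBufferMath {
    public static void main(String[] args) {
        int bytesPerChecksum = 512;   // chunk payload size
        int bufLen = 9200;            // bytes currently buffered (arbitrary example)

        int partialLen = bufLen % bytesPerChecksum;             // 9200 % 512 = 496
        // flushPartial == false: only whole chunks are flushed,
        // the trailing partial chunk stays in the buffer.
        int lenToFlushWholeChunksOnly = bufLen - partialLen;    // 8704 = 17 full chunks
        // flushPartial == true: everything is flushed, including the partial chunk.
        int lenToFlushEverything = bufLen;                      // 9200

        System.out.println("full chunks flushed : " + lenToFlushWholeChunksOnly / bytesPerChecksum);
        System.out.println("bytes kept buffered : " + partialLen);
        System.out.println("flush-all length    : " + lenToFlushEverything);
    }
}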



Computing the checksum for each data chunk


/** Generate checksums for the given data chunks and output chunks & checksums
 * to the underlying output stream.
 */
private void writeChecksumChunks(byte b[], int off, int len) throws IOException {
  // compute the checksums of the chunks
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  // walk over the data one chunk at a time
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    // write one chunk (data plus checksum)
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}
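Each 512-byte chunk gets its own 4-byte checksum. The sketch below uses java.util.zip.CRC32 from the JDK to illustrate the per-chunk checksumming idea; it is only an illustration of the concept, not the DataChecksum implementation (HDFS defaults to CRC32C in newer versions).

import java.util.zip.CRC32;

public class ChunkChecksumDemo {
    public static void main(String[] args) {
        byte[] data = new byte[1300];          // pretend this is the buffered user data
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }

        int bytesPerChecksum = 512;            // chunk payload size
        CRC32 crc = new CRC32();

        // One checksum per chunk, computed over exactly one chunk at a time.
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            int chunkLen = Math.min(bytesPerChecksum, data.length - off);
            crc.reset();
            crc.update(data, off, chunkLen);
            System.out.printf("chunk at offset %4d, len %3d, crc=0x%08x%n",
                    off, chunkLen, crc.getValue());
        }
    }
}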



Continue into DFSOutputStream, the FSOutputSummer subclass we care about, and locate the writeChunk(...) method.


// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  TraceScope scope =
      dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
  try {
    // write the chunk
    writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
  } finally {
    scope.close();
  }
}

private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();

  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len
        + " is larger than supported bytesPerChecksum " + bytesPerChecksum);
  }
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be "
        + getChecksumSize() + " but found to be " + cklen);
  }

  if (currentPacket == null) {
    // create a new packet
    currentPacket = createPacket(packetSize, chunksPerPacket,
        bytesCurBlock, currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno="
          + currentPacket.getSeqno()
          + ", src=" + src
          + ", packetSize=" + packetSize
          + ", chunksPerPacket=" + chunksPerPacket
          + ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  // write the chunk checksum into the packet
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // write the chunk data into the packet
  currentPacket.writeData(b, offset, len);
  // count how many chunks have been written; a packet is full at 127 chunks
  currentPacket.incNumChunks();
  // likewise, full packets accumulate into a block
  bytesCurBlock += len; // up to 128 MB

  // If packet is full, enqueue it for transmission
  // (either the packet is full or the block is full)
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks()
      || bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno="
          + currentPacket.getSeqno()
          + ", src=" + src
          + ", bytesCurBlock=" + bytesCurBlock
          + ", blockSize=" + blockSize
          + ", appendChunk=" + appendChunk);
    }
    // A packet is complete: add it to the queue; if the queue is full, this blocks.
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      int psize = Math.min((int) (blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    // bytesCurBlock == 128 MB means a whole block has been written.
    if (bytesCurBlock == blockSize) {
      // the last packet of a block is an empty packet
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      // add the current packet to dataQueue
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}



This article has walked through the HDFS write path against the flow diagram: the seven main steps, creating the directory entry and the file, the file lease mechanism, starting the core DataStreamer service and initializing data streaming, writing chunks and packets into the data queue, block allocation and pipeline setup, and fault handling during the write. Frankly, the HDFS write path is complex; there is a lot of material and the source code is easy to get lost in. This article only sketches the big picture, and to keep it from getting even longer many code details had to be left out. If you are interested, pull the source code and study it in detail. I hope this gives you a useful starting point.