The HDFS Write Path, Explained
Author | Wu Xie. Four years of big data experience; currently working at an internet company in Guangzhou, responsible for in-house big data platform development and offline/real-time computing research.
This article runs 7,800+ characters, roughly 90% of which is code, and walks through the source step by step. Expect 15-20 minutes of reading time.
1. The client initiates a file-write request by calling create() on DistributedFileSystem (an implementation class of FileSystem).
2. It communicates with the NameNode over NameNode RPC. The NameNode first checks the user's permissions on the file, whether the file already exists, and whether its parent directory exists, then returns a flag saying whether the upload may proceed, along with the data blocks to write.
3. The client starts sending write requests to FSDataOutputStream, which splits the data into packets and places them in a data queue.
4. Using the list of writable DataNodes returned by the NameNode, the client writes each block to the nearest DataNode first; that DataNode then replicates the data to the other DataNodes in the pipeline according to HDFS's replication policy.
5. After a DataNode finishes writing, it returns an acknowledgment; once FSDataOutputStream has collected the DataNodes' responses, it clears the queue used to receive them, ready for the next round.
6. When all the data has been written, the client calls close() to shut down the stream.
7. Finally, the client notifies the NameNode that the write is complete.
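Before stepping into the source, the seven steps above can be exercised end to end with a few lines of client code. A minimal sketch, assuming a Hadoop client dependency on the classpath; the NameNode address and path are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative address; point fs.defaultFS at your own NameNode
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    try (FileSystem fs = FileSystem.get(conf);                            // resolves to DistributedFileSystem for hdfs://
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) { // step 1: create()
      out.write("hello hdfs".getBytes("UTF-8"));                          // steps 3-5: packets flow through the pipeline
    }                                                                     // step 6: close() flushes and completes the file
  }
}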
Figure 1: HDFS write data flow
As you can see, FileSystem is the abstract class for interacting with HDFS-style file systems, with many different implementations underneath, including WebHdfsFileSystem, S3FileSystem, HttpFSFileSystem, AzureFileSystem, and so on. From the write-flow diagram we know the first step is to locate the create() method.
Looking at create() in the FileSystem class, each overload is just one line that ultimately returns an FSDataOutputStream, so the next step is to find the implementation class — a good habit to build when reading source code.
/**
 * Create an FSDataOutputStream at the indicated Path.
 * Files are overwritten by default.
 * @param f the file to create
 */
public FSDataOutputStream create(Path f) throws IOException {
  // This is the method we are after
  return create(f, true);
}

/**
 * Create an FSDataOutputStream at the indicated Path.
 * @param f the file to create
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an exception will be thrown.
 */
public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
  // Create the given file (overwriting it if it exists), with a 4 KB
  // I/O buffer, the default replication factor, and the default block size
  return create(f, overwrite,
                getConf().getInt("io.file.buffer.size", 4096),
                getDefaultReplication(f),
                getDefaultBlockSize(f));
}

/**
 * Create an FSDataOutputStream at the indicated Path.
 * @param f the file name to open
 * @param overwrite if a file with this name already exists, then if true,
 *   the file will be overwritten, and if false an error will be thrown.
 * @param bufferSize the size of the buffer to be used.
 * @param replication required block replication for the file.
 */
public FSDataOutputStream create(Path f,
                                 boolean overwrite,
                                 int bufferSize,
                                 short replication,
                                 long blockSize) throws IOException {
  // Just one line again -- keep following the chain
  return create(f, overwrite, bufferSize, replication, blockSize, null);
}
Next, find create() in FileSystem's implementation class, DistributedFileSystem.
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  // One line yet again -- the calls nest like Russian dolls.
  // When writing, check whether the file exists and whether to overwrite:
  // if overwrite is false, use only CreateFlag.CREATE; if true, add
  // CreateFlag.OVERWRITE as well
  return this.create(f, permission,
      overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
      blockSize, progress, null);
}

public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      // Create and initialize a DFSOutputStream. This:
      //  1. adds an INodeFile to the file directory tree
      //  2. adds lease management for the file
      //  3. starts DataStreamer, the core service for writing data
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
Before analyzing what dfs.create(...) does, let's first introduce a concept: the file lease. When HDFS writes a data file, it creates a lease for that file. Three important steps are involved — adding the lease, starting the lease, and renewing the lease — corresponding to addLease(), startLease(), and renewLease() in LeaseManager.
/**
 * Adds (or re-adds) the lease for the specified file.
 */
synchronized Lease addLease(String holder, String src) {
  Lease lease = getLease(holder);
  if (lease == null) {
    // No existing lease for this holder, so create one
    lease = new Lease(holder);
    // Store it in data structures that keep leases sorted
    leases.put(holder, lease);
    sortedLeases.add(lease);
  } else {
    // Otherwise, renew the existing lease
    renewLease(lease);
  }
  sortedLeasesByPath.put(src, lease);
  lease.paths.add(src);
  return lease;
}
Figure 2: file lease management
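The renewal step is worth a quick look too. A simplified sketch of what renewLease() in LeaseManager amounts to — abbreviated, and the details vary across Hadoop versions: the lease is pulled out of the sorted set, its timestamp refreshed, and re-inserted so that expiry ordering stays correct.

synchronized void renewLease(Lease lease) {
  if (lease != null) {
    // Re-insert so the set stays sorted by last-renewal time,
    // which is what the lease expiry monitor iterates over
    sortedLeases.remove(lease);
    lease.renew();          // refresh the lease's last-update timestamp
    sortedLeases.add(lease);
  }
}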
/**
 * Call
 * {@link #create(String, FsPermission, EnumSet, boolean, short, long, Progressable, int, ChecksumOpt)}
 * with <code>createParent</code> set to true.
 */
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, short replication, long blockSize,
    Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
  // Another familiar-looking create()
  return create(src, permission, flag, true, replication, blockSize,
      progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  // newStreamForCreate does three main things:
  //  1. adds an INodeFile to the INodeDirectory file directory tree
  //  2. adds the file lease
  //  3. starts DataStreamer, which talks to the NameNodeRPCServer
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  // Start the lease
  beginFileLease(result.getFileId(), result);
  return result;
}

/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out)
    throws IOException {
  getLeaseRenewer().put(inodeId, out, this);
}

// The synchronized keyword guarantees that only one client can write
// at a time; the others block
synchronized void put(final long inodeId, final DFSOutputStream out,
    final DFSClient dfsc) {
  if (dfsc.isClientRunning()) {
    if (!isRunning() || isRenewerExpired()) {
      // start a new daemon with a new id.
      final int id = ++currentId;
      // Create a background thread
      daemon = new Daemon(new Runnable() {
        public void run() {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " started");
            }
            // LeaseRenewer -- responsible for lease renewal
            LeaseRenewer.this.run(id);
          } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
                  + " is interrupted.", e);
            }
          } finally {
            synchronized (LeaseRenewer.this) {
              Factory.INSTANCE.remove(LeaseRenewer.this);
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for " + clientsString()
                  + " with renew id " + id + " exited");
            }
          }
        }

        public String toString() {
          return String.valueOf(LeaseRenewer.this);
        }
      });
      // Start the background daemon thread
      daemon.start();
    }
    dfsc.putFileBeingWritten(inodeId, out);
    emptyTime = Long.MAX_VALUE;
  }
}
That completes the preparation for writing: lease management.
Starting DataStreamer
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  TraceScope scope = dfsClient.getPathTraceScope("newStreamForCreate", src);
  try {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    // Keep retrying to make sure the file gets created in the directory tree
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // Create the file in HDFS; see create() on the NameNodeRPC side
        // for the details
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break; // break out of the loop on success; on failure, unwrap and maybe retry
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class, FileAlreadyExistsException.class,
            FileNotFoundException.class, ParentNotDirectoryException.class,
            NSQuotaExceededException.class, RetryStartFileException.class,
            SafeModeException.class, UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          // Retry
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption"
                + " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    // Initialize DataStreamer, the core class of the write path.
    // DataStreamer is essentially a thread.
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    // Start DataStreamer
    out.start();
    return out;
  } finally {
    scope.close();
  }
}
Next, let's analyze the core write-path classes, DFSOutputStream and DataStreamer.
/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
    EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum,
    String[] favoredNodes) throws IOException {
  this(dfsClient, src, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
  /**
   * Directory -> File -> Block (128 MB) -> packet (64 KB) -> chunk (516 bytes)
   */
  // Compute the data unit sizes -- fairly simple, step in if you are curious
  computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
  // Create the DataStreamer -- this is the heart of the write path
  streamer = new DataStreamer(stat, null);
  if (favoredNodes != null && favoredNodes.length != 0) {
    streamer.setFavoredNodes(favoredNodes);
  }
}
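Since we don't step into computePacketChunkSize() here, a simplified sketch of what it boils down to — the real method also subtracts the packet header length from psize before dividing, and the exact fields vary by Hadoop version:

private void computePacketChunkSize(int psize, int csize) {
  // One on-the-wire chunk = csize data bytes (512 by default)
  // plus its checksum (4 bytes for CRC32), i.e. 516 bytes
  final int chunkSize = csize + getChecksumSize();
  // Fit as many whole chunks as possible into the ~64 KB packet budget
  chunksPerPacket = Math.max(psize / chunkSize, 1);
  packetSize = chunkSize * chunksPerPacket;
}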
Stepping into new DataStreamer(), there's not much to see — after reading it, it's hard to know where to go next, right?
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
  isAppend = false;
  isLazyPersistFile = isLazyPersist(stat);
  this.block = block;
  stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
This is why reading source code is not as easy as it looks. The hint was actually dropped quietly back in newStreamForCreate(...): DataStreamer is a thread. Since it has a start() method, it must have a run() method, so run() is the answer we're after — and we will come back to this key code when we look at the actual write.
/*
 * The streamer thread is the only thread that opens streams to datanodes
 * and closes them. Any error recovery is also done by this thread.
 */
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = NullScope.INSTANCE;
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        // When the file has just been created, dataQueue.size() == 0
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0
            && (stage != BlockConstructionStage.DATA_STREAMING
                || stage == BlockConstructionStage.DATA_STREAMING
                && now - lastPacket < dfsClient.getConf().socketTimeout / 2))
            || doSleep) {
          long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
          try {
            // If dataQueue is empty, block here until woken up
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
          assert one != null;
        } else {
          // If the queue is not empty, take a packet from it
          one = dataQueue.getFirst(); // regular data packet
          long parents[] = one.getTraceParents();
          if (parents.length > 0) {
            scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
            // TODO: use setParents API once it's available from HTrace 3.2
            // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
            // scope.getSpan().setParents(parents);
          }
        }
      }

      // get new block from namenode.
      // Set up the data pipeline and ask the NameNode for a block
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        // Build the pipeline and record where the block will live in it:
        // location, type and ID. nextBlockOutputStream() is important --
        // it asks the NameNode for a block to write to, and the policy for
        // choosing which DataNodes store the block also lives in that
        // method; it is not expanded here for reasons of space.
        setPipeline(nextBlockOutputStream());
        // Initialize data streaming: start ResponseProcessor,
        // which listens for the status of sent packets.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize
            + " is smaller than data size. "
            + " Offset of packet in block " + lastByteOffsetInBlock
            + " Aborting file " + src);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets to be successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError && ackQueue.size() != 0
              && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      Span span = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          span = scope.detach();
          one.setTraceSpan(span);
          // Remove the packet about to be sent from dataQueue ...
          dataQueue.removeFirst();
          // ... and add it to ackQueue
          ackQueue.addLast(one);
          // Wake up any threads waiting on the queue
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block + " sending packet " + one);
      }

      // write out data to remote datanode
      TraceScope writeScope = Trace.startSpan("writeTo", span);
      try {
        // This is where the data actually gets written
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        // "Primary datanode" means the first DataNode in the pipeline,
        // i.e. errorIndex = 0
        tryMarkPrimaryDatanodeFailed();
        // Rethrow the exception
        throw e;
      } finally {
        writeScope.close();
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet to be acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError && ackQueue.size() != 0
              && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }

      // This is used by unit tests to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex.get() == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      // An exception was caught: flip the error flag to true
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      scope.close();
    }
  }
  closeInternal();
}
/**
 * Initialize for data streaming
 */
private void initDataStreaming() {
  this.setName("DataStreamer for file " + src + " block " + block);
  response = new ResponseProcessor(nodes);
  // Start the thread -- notice how similar this is to DataStreamer
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}

ResponseProcessor(DatanodeInfo[] targets) {
  this.targets = targets;
}

public void run() {
  setName("ResponseProcessor for block " + block);
  // The ack carries the DataNodes' write results back to the client
  PipelineAck ack = new PipelineAck();
  TraceScope scope = NullScope.INSTANCE;
  while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
    // process responses from datanodes.
    try {
      // read an ack from the pipeline
      long begin = Time.monotonicNow();
      // Read the processing result from downstream
      ack.readFields(blockReplyStream);
      long duration = Time.monotonicNow() - begin;
      if (duration > dfsclientSlowLogThresholdMs
          && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
        DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration
            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
            + ack + ", targets: " + Arrays.asList(targets));
      } else if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient " + ack);
      }

      long seqno = ack.getSeqno();
      // processes response status from datanodes.
      for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
        final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
        // Restart will not be treated differently unless it is
        // the local node or the only one in the pipeline.
        if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout
              + Time.monotonicNow();
          setRestartingNodeIndex(i);
          String message = "A datanode is restarting: " + targets[i];
          DFSClient.LOG.info(message);
          throw new IOException(message);
        }
        // node error
        if (reply != SUCCESS) {
          setErrorIndex(i); // first bad datanode
          throw new IOException("Bad response " + reply + " for block " + block
              + " from datanode " + targets[i]);
        }
      }

      assert seqno != PipelineAck.UNKOWN_SEQNO
          : "Ack for unknown seqno should be a failed ack: " + ack;
      if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
        continue;
      }

      // a success ack for a data packet
      DFSPacket one;
      synchronized (dataQueue) {
        one = ackQueue.getFirst();
      }
      if (one.getSeqno() != seqno) {
        throw new IOException("ResponseProcessor: Expecting seqno "
            + " for block " + block + one.getSeqno() + " but received " + seqno);
      }
      isLastPacketInBlock = one.isLastPacketInBlock();

      // Fail the packet write for testing in order to force a
      // pipeline recovery.
      if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
        failPacket = true;
        throw new IOException("Failing the last packet for testing.");
      }

      // update bytesAcked
      block.setNumBytes(one.getLastByteOffsetBlock());

      synchronized (dataQueue) {
        scope = Trace.continueSpan(one.getTraceSpan());
        one.setTraceSpan(null);
        lastAckedSeqno = seqno;
        // On a successful ack, remove the packet from ackQueue
        ackQueue.removeFirst();
        dataQueue.notifyAll();
        one.releaseBuffer(byteArrayManager);
      }
    } catch (Exception e) {
      if (!responderClosed) {
        if (e instanceof IOException) {
          setLastException((IOException) e);
        }
        hasError = true;
        // If no explicit error report was received, mark the primary
        // node as failed.
        tryMarkPrimaryDatanodeFailed();
        synchronized (dataQueue) {
          dataQueue.notifyAll();
        }
        if (restartingNodeIndex.get() == -1) {
          DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
              + " for block " + block, e);
        }
        responderClosed = true;
      }
    } finally {
      scope.close();
    }
  }
}

void close() {
  responderClosed = true;
  // Interrupt the current thread so it exits quickly
  this.interrupt();
}
}
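The interplay between dataQueue and ackQueue is the crux of the write path: the writer thread enqueues packets, DataStreamer moves each sent packet from dataQueue to ackQueue, and ResponseProcessor removes it once the pipeline acks it (on failure, the real code pushes ackQueue packets back onto dataQueue for resend). A stripped-down, self-contained model of this pattern — illustrative only, the real classes carry far more state, and the in-flight limit of 80 is the default I'd expect from dfs.client.write.max-packets-in-flight:

import java.util.LinkedList;

// A minimal model of the dataQueue/ackQueue handoff (illustrative only)
class TwoQueueModel {
  private final LinkedList<String> dataQueue = new LinkedList<>();
  private final LinkedList<String> ackQueue = new LinkedList<>();

  // Writer thread: block while too many packets are in flight, then enqueue
  void enqueue(String packet) throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.size() + ackQueue.size() >= 80) {
        dataQueue.wait();
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll(); // wake the streamer
    }
  }

  // Streamer thread: take the next packet, parking it on ackQueue before sending
  String takeForSend() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait(1000);
      }
      String packet = dataQueue.removeFirst();
      ackQueue.addLast(packet);
      dataQueue.notifyAll();
      return packet;
    }
  }

  // Responder thread: on a successful ack, drop the packet for good
  void onAck() {
    synchronized (dataQueue) {
      ackQueue.removeFirst();
      dataQueue.notifyAll();
    }
  }
}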
After DistributedFileSystem's create(...) returns, the resulting DFSOutputStream object is passed to createWrappedOutputStream(...) to be wrapped once more — an application of the Decorator pattern.
/**
 * Wraps the stream in a CryptoOutputStream if the underlying file is encrypted.
 */
// Returns an HdfsDataOutputStream
public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
    FileSystem.Statistics statistics) throws IOException {
  return createWrappedOutputStream(dfsos, statistics, 0);
}
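For readers unfamiliar with the pattern: a decorator wraps an object sharing its interface and adds behavior around the delegated calls. A minimal, hypothetical sketch of the idea — the class below is illustrative, not an HDFS class:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical decorator: counts bytes while delegating the actual write,
// much as HdfsDataOutputStream layers statistics/crypto around DFSOutputStream
class CountingOutputStream extends FilterOutputStream {
  private long bytesWritten = 0;

  CountingOutputStream(OutputStream delegate) {
    super(delegate);
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);     // delegate the real work
    bytesWritten++;   // decorate with extra bookkeeping
  }

  long getBytesWritten() {
    return bytesWritten;
  }
}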
That completes the analysis of create(): we added an INodeFile to the file directory tree, set up lease management, started DataStreamer, and initialized data streaming. Now for the actual data write.
Writing the data
Following the write-flow diagram, we locate the write() method of FSDataOutputStream.
public void write(int b) throws IOException {
  // out is the DFSOutputStream
  out.write(b);
  position++;
  if (statistics != null) {
    statistics.incrementBytesWritten(1);
  }
}
Through DFSOutputStream's parent class FSOutputSummer, we reach its write() method.
/** Write one byte */
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte) b;
  if (count == buf.length) {
    // Flush the buffer out to the file
    flushBuffer();
  }
}

/* Forces any buffered output bytes to be checksummed and written out to
 * the underlying output stream.
 */
protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep, boolean flushPartial)
    throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
  if (lenToFlush != 0) {
    // The core call:
    // HDFS file -> Block (128 MB) -> packet (64 KB) -> chunk (512-byte data + 4-byte checksum)
    writeChecksumChunks(buf, 0, lenToFlush);
    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
Computing the checksum for each data unit
/** Generate checksums for the given data chunks and output chunks & checksums
 * to the underlying output stream.
 */
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  // Compute the checksums for the chunks
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  // Walk the data one chunk at a time
  for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
    int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
    int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
    // Write out the chunk
    writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
  }
}
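To make the chunk/checksum layout concrete, here is a small self-contained sketch that mirrors the idea using plain java.util.zip.CRC32 — HDFS actually uses its own DataChecksum implementations (CRC32/CRC32C), and the sizes below are the defaults:

import java.util.zip.CRC32;

public class ChunkChecksumDemo {
  static final int BYTES_PER_CHECKSUM = 512; // dfs.bytes-per-checksum default
  static final int CHECKSUM_SIZE = 4;        // a CRC32 value occupies 4 bytes

  public static void main(String[] args) {
    byte[] data = new byte[1300]; // pretend this is the client's buffer
    // 1300 bytes -> chunks of 512, 512 and 276 bytes, each with a 4-byte CRC
    for (int i = 0; i < data.length; i += BYTES_PER_CHECKSUM) {
      int chunkLen = Math.min(BYTES_PER_CHECKSUM, data.length - i);
      CRC32 crc = new CRC32();
      crc.update(data, i, chunkLen);
      System.out.printf("chunk @%4d len=%3d -> on-wire %d bytes (crc=%08x)%n",
          i, chunkLen, chunkLen + CHECKSUM_SIZE, crc.getValue());
    }
  }
}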
Moving on to FSOutputSummer's implementation class DFSOutputStream, we locate the writeChunk(...) method.
// @see FSOutputSummer#writeChunk()
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
  try {
    // Write the chunk
    writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
  } finally {
    scope.close();
  }
}

private synchronized void writeChunkImpl(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  dfsClient.checkOpen();
  checkClosed();

  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len
        + " is larger than supported bytesPerChecksum " + bytesPerChecksum);
  }
  if (cklen != 0 && cklen != getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be "
        + getChecksumSize() + " but found to be " + cklen);
  }

  if (currentPacket == null) {
    // Create a packet
    currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock,
        currentSeqno++, false);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno="
          + currentPacket.getSeqno() + ", src=" + src
          + ", packetSize=" + packetSize
          + ", chunksPerPacket=" + chunksPerPacket
          + ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  // Write the chunk's checksum into the packet ...
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  // ... then write the chunk itself into the packet
  currentPacket.writeData(b, offset, len);
  // Count how many chunks have been written; once the packet holds
  // 127 chunks, it is a complete packet
  currentPacket.incNumChunks();
  // Likewise, track how many bytes of the current block are written;
  // full packets eventually make up the 128 MB block
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  // (i.e. the packet is full or the block is full)
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks()
      || bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno="
          + currentPacket.getSeqno() + ", src=" + src
          + ", bytesCurBlock=" + bytesCurBlock
          + ", blockSize=" + blockSize
          + ", appendChunk=" + appendChunk);
    }
    // The packet is full: add it to the queue; if the packet queue
    // is full, block
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      int psize = Math.min((int) (blockSize - bytesCurBlock),
          dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    // bytesCurBlock reaching 128 MB means a whole block has been written
    if (bytesCurBlock == blockSize) {
      // The last packet of a block is an empty packet
      currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
      currentPacket.setSyncBlock(shouldSyncBlock);
      // Add currentPacket to the dataQueue
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
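Putting the numbers together: with the default 512-byte chunks, each carrying a 4-byte checksum, a 64 KB packet budget holds about 127 chunks, and roughly two thousand packets make up a 128 MB block. A quick back-of-the-envelope check — a rough sketch that ignores the packet header the real code subtracts from the budget first, so the actual chunk count can be one lower:

public class PacketMath {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;          // data bytes per chunk
    final int checksumSize = 4;                // CRC32 bytes per chunk
    final int writePacketSize = 64 * 1024;     // dfs.client-write-packet-size default
    final long blockSize = 128L * 1024 * 1024; // dfs.blocksize default

    int chunkSize = bytesPerChecksum + checksumSize;             // 516 bytes on the wire
    int chunksPerPacket = writePacketSize / chunkSize;           // 127 chunks
    int dataBytesPerPacket = chunksPerPacket * bytesPerChecksum; // 65,024 data bytes
    long packetsPerBlock = (blockSize + dataBytesPerPacket - 1) / dataBytesPerPacket;

    System.out.println("chunks per packet : " + chunksPerPacket);    // 127
    System.out.println("data per packet   : " + dataBytesPerPacket); // 65024
    System.out.println("packets per block : " + packetsPerBlock);    // ~2065
  }
}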
This article has dissected the HDFS write path. Following the flow diagram, it proceeds in seven broad steps, covering the creation of directories and files, the file lease mechanism, starting the core DataStreamer write service and initializing data streaming, queueing chunks and blocks onto the dataQueue, block allocation and pipeline construction, fault tolerance during the write, and more. Frankly, I find the HDFS write path very complex: there is a lot of material, and the source is winding enough to get lost in. This article only sketches the outline — to keep the length manageable, many of the finer details could not be shown. If you are interested, pull the source and dig in; I hope this gives you a head start.
