The HDFS Write Data Flow, Explained
Author | Wu Xie. Four years of big data experience, currently at an internet company in Guangzhou, responsible for in-house big data platform development and offline/real-time computing research.
This article runs 7,800+ characters, about 90% of which is code. It analyzes the source step by step; expect a reading time of roughly 15-20 minutes.
The client initiates a file-write request by calling create() on DistributedFileSystem (an implementation of FileSystem).
The client communicates with the NameNode over RPC. The NameNode first checks the user's permissions, whether the file already exists, and whether its parent directory exists, then returns a flag indicating whether the upload may proceed along with the data blocks to be written.
The client then writes through FSDataOutputStream, which splits the data into packets and places them on a data queue.
Using the list of writable DataNodes returned by the NameNode, the client writes the block to the nearest (first) DataNode; the DataNodes then replicate the data among themselves according to the HDFS replication policy.
After a DataNode finishes writing it returns an acknowledgment; once FSDataOutputStream has collected the acknowledgments from the DataNodes it clears the queue used to receive them, ready for the next round.
When the data is written, the client calls close() to close the stream.
Finally, the client notifies the NameNode that the write is complete.
Figure 1
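To make the seven steps above concrete, here is a minimal client-side sketch that drives the whole path through the public FileSystem API; the NameNode address and file path are placeholders for illustration:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // placeholder address; point this at your own NameNode
        conf.set("fs.defaultFS", "hdfs://namenode:8020");
        try (FileSystem fs = FileSystem.get(conf);
             // create() covers steps 1-2: the RPC to the NameNode, the permission
             // and existence checks, and construction of the output stream
             FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
            // write() feeds bytes into the chunk/packet machinery (steps 3-5)
            out.write("hello hdfs".getBytes("UTF-8"));
            // hflush() pushes buffered packets down the DataNode pipeline
            out.hflush();
        } // close() finishes the last block and the client notifies the NameNode (steps 6-7)
    }
}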
As the figure shows, FileSystem is the abstract class for interacting with HDFS-style file systems, and it has many concrete implementations underneath, including WebHdfsFileSystem, S3FileSystem, HttpFSFileSystem, AzureFileSystem, and so on. From the write-flow diagram we know the first step is to locate the create() method.
Find create() in the FileSystem class: it is a single line that returns an FSDataOutputStream, so the next step is to look for the implementation, which is a good habit to form when reading source code.
/**
* Create an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.
* @param f the file to create
*/
public FSDataOutputStream create(Path f) throws IOException {
//this is the method we need to trace into
return create(f, true);
}
/**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file to create
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an exception will be thrown.
*/
public FSDataOutputStream create(Path f, boolean overwrite)
throws IOException {
//create the file, overwriting it if it already exists, use the default 4 KB I/O buffer (io.file.buffer.size defaults to 4096 bytes), and pick up the default replication factor and block size
return create(f, overwrite,
getConf().getInt("io.file.buffer.size", 4096),
getDefaultReplication(f),
getDefaultBlockSize(f));
}
/**
* Create an FSDataOutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
*/
public FSDataOutputStream create(Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize
) throws IOException {
//just one line again, so that is where we go next
return create(f, overwrite, bufferSize, replication, blockSize, null);
}
Now find the create() method of DistributedFileSystem, the FileSystem implementation for HDFS.
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
//another one-liner, nested like matryoshka dolls, one layer inside the next
//when writing, decide whether to overwrite: if overwrite is true use CREATE together with OVERWRITE, otherwise CREATE only
return this.create(f, permission,
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
blockSize, progress, null);
}
public FSDataOutputStream create(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize, final Progressable progress,
final ChecksumOpt checksumOpt) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
public FSDataOutputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
//create a DFSOutputStream and initialize it
/**
* adds an INodeFile to the file directory tree
* registers the file lease
* starts the DataStreamer, the core service for writing data
*/
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication, blockSize, progress, bufferSize,
checksumOpt);
return dfs.createWrappedOutputStream(dfsos, statistics);
}
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission, cflags, bufferSize,
replication, blockSize, progress, checksumOpt);
}
}.resolve(this, absF);
}
Let's look at what dfs.create(...) does. First, a concept: the file lease. When HDFS writes a data file, it creates a lease for that file. Three steps matter: adding the lease, starting the lease, and renewing it, which correspond to addLease(), startLease(), and renewLease() in LeaseManager.
/**
* Adds (or re-adds) the lease for the specified file.
*/
synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
// if no lease exists yet, create one
lease = new Lease(holder);
//store the lease in data structures that keep it sorted
leases.put(holder, lease);
sortedLeases.add(lease);
} else {
//renew the existing lease
renewLease(lease);
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
return lease;
}
Figure 2: file lease management
/**
* Call
* {@link #create(String, FsPermission, EnumSet, boolean, short, long, Progressable, int, ChecksumOpt)}
* with <code>createParent</code> set to true.
*/
public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication,
long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException {
//a familiar-looking create() again
return create(src, permission, flag, true, replication, blockSize, progress, buffersize, checksumOpt, null);
}
public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt,
InetSocketAddress[] favoredNodes) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getFileDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
if (LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + masked);
}
//newStreamForCreate does three main things:
// 1. adds an INodeFile to the INodeDirectory file tree
// 2. adds the file lease
// 3. starts the DataStreamer, which communicates with the NameNode RPC server
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent,
replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes));
//start the lease
beginFileLease(result.getFileId(), result);
return result;
}
/** Get a lease and start automatic renewal */
//obtain a file lease and start automatic renewal
private void beginFileLease(final long inodeId, final DFSOutputStream out) throws IOException {
getLeaseRenewer().put(inodeId, out, this);
}
//synchronized: concurrent callers block here, so only one thread at a time registers a stream with the renewer
synchronized void put(final long inodeId, final DFSOutputStream out,
final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
//start a new daemon with a new id.
final int id = ++currentId;
//create a background daemon thread
daemon = new Daemon(new Runnable() {
public void run() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " started");
}
//LeaseRenewer is responsible for managing lease renewals
LeaseRenewer.this.run(id);
} catch(InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+ " is interrupted.", e);
}
} finally {
synchronized(LeaseRenewer.this) {
Factory.INSTANCE.remove(LeaseRenewer.this);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " exited");
}
}
}
public String toString() {
return String.valueOf(LeaseRenewer.this);
}
});
//start the daemon thread
daemon.start();
}
dfsc.putFileBeingWritten(inodeId, out);
emptyTime = Long.MAX_VALUE;
}
}
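Conceptually, the loop this daemon runs is simple: as long as the client has files open for writing, keep calling renewLease() on the NameNode at an interval comfortably below the lease soft limit (60 seconds by default). The following is a stripped-down sketch of that idea only, not Hadoop's LeaseRenewer; the class name, the openFiles set, and renewLeaseOnNameNode() are hypothetical stand-ins:
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SimpleLeaseRenewer implements Runnable {
    // files currently open for writing by this client (hypothetical bookkeeping)
    private final Set<Long> openFiles = ConcurrentHashMap.newKeySet();
    // renew well under the 60-second soft limit
    private final long renewIntervalMs = 30_000;

    void register(long inodeId) { openFiles.add(inodeId); }
    void unregister(long inodeId) { openFiles.remove(inodeId); }

    @Override
    public void run() {
        while (!openFiles.isEmpty()) {
            // one call renews all leases held by this client, roughly what the
            // real code does via the NameNode RPC renewLease(clientName)
            renewLeaseOnNameNode();
            try {
                Thread.sleep(renewIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void renewLeaseOnNameNode() {
        // placeholder for the NameNode RPC call
    }
}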
The above completes the preparation for writing data: lease management.
Starting the DataStreamer
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked,
EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope scope = dfsClient.getPathTraceScope("newStreamForCreate", src);
try {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
//keep retrying to make sure the file entry is created successfully
while (shouldRetry) {
shouldRetry = false;
try {
//create the file in HDFS; see the create() method on the NameNode RPC side for the details
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize,
SUPPORTED_CRYPTO_VERSIONS);
break;//on success break out of the loop; on failure an exception is thrown and we may retry
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(AccessControlException.class,
DSQuotaExceededException.class, FileAlreadyExistsException.class,
FileNotFoundException.class, ParentNotDirectoryException.class,
NSQuotaExceededException.class, RetryStartFileException.class, SafeModeException.class,
UnresolvedPathException.class, SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
//retry
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" + " zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
//initialize the DataStreamer, the core class of the write path; it is essentially a thread
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum,
favoredNodes);
//start the DataStreamer
out.start();
return out;
} finally {
scope.close();
}
}
Next, let's analyze the core classes of the write path: DFSOutputStream and DataStreamer.
/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag,
Progressable progress, DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
/**
* Directory -> File -> Block(128M) -> packet(64k)-> chunk(516byte)
*/
//compute the packet/chunk sizes; simple enough, step into it to see the details
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
//create the DataStreamer; this is the heart of the write path
streamer = new DataStreamer(stat, null);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
}
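Before stepping into DataStreamer, it helps to see roughly how the size arithmetic inside computePacketChunkSize() plays out. The sketch below uses the usual defaults (512-byte chunks, 4-byte CRC, 64 KB packets) and treats the packet header length as a 33-byte illustrative constant standing in for PacketHeader.PKT_MAX_HEADER_LEN:
// Rough illustration of how a 64 KB packet is carved into 516-byte chunks.
public class PacketMath {
    public static void main(String[] args) {
        int bytesPerChecksum = 512;       // dfs.bytes-per-checksum default
        int checksumSize = 4;             // a CRC is 4 bytes
        int writePacketSize = 64 * 1024;  // dfs.client-write-packet-size default
        int packetHeaderLen = 33;         // illustrative header length

        int chunkSize = bytesPerChecksum + checksumSize;          // 516 bytes on the wire
        int chunksPerPacket = Math.max(1,
                (writePacketSize - packetHeaderLen) / chunkSize); // about 126 with these numbers
        int packetSize = chunkSize * chunksPerPacket;             // the usable packet payload

        System.out.println("chunksPerPacket=" + chunksPerPacket + ", packetSize=" + packetSize);
    }
}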
Stepping into new DataStreamer() shows that the constructor does very little, which can leave you wondering where to go next.
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
Reading source code is rarely simple. The hint is tucked away in newStreamForCreate(...) above: DataStreamer is a thread, and since it has a start() method it must have a run() method, so run() is the answer we are after. We will come back to this key code when the data is actually written.
/*
* streamer thread is the only thread that opens streams to datanode, and closes
* them. Any error recovery is also done by this thread.
*/
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = NullScope.INSTANCE;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
response.close();
response.join();
response = null;
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = false;
if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
doSleep = processDatanodeError();
}
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
// right after the file is created, dataQueue.size() == 0
while ((!streamerClosed && !hasError && dfsClient.clientRunning && dataQueue.size() == 0
&& (stage != BlockConstructionStage.DATA_STREAMING
|| stage == BlockConstructionStage.DATA_STREAMING
&& now - lastPacket < dfsClient.getConf().socketTimeout / 2))
|| doSleep) {
long timeout = dfsClient.getConf().socketTimeout / 2 - (now - lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING) ? timeout : 1000;
try {
//if dataQueue is empty, the code blocks here and waits to be woken up
dataQueue.wait(timeout);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
assert one != null;
} else {
//if the queue is not empty, take the first packet from it
one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
if (parents.length > 0) {
scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
// TODO: use setParents API once it's available from HTrace 3.2
// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
// scope.getSpan().setParents(parents);
}
}
}
// get new block from namenode.
// set up the data pipeline and request a block from the NameNode
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Allocating new block");
}
//build the pipeline and record the block's storage info: locations, storage types, and ID
//nextBlockOutputStream() is important: it asks the NameNode for a block to write to, and the DataNode placement policy for the block is applied inside it; not expanded here for brevity
setPipeline(nextBlockOutputStream());
//initialize data streaming: start the ResponseProcessor, which listens for the status of sent packets
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Append to block " + block);
}
setupPipelineForAppendOrRecovery();
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > blockSize) {
throw new IOException("BlockSize " + blockSize + " is smaller than data size. "
+ " Offset of packet in block " + lastByteOffsetInBlock + " Aborting file " + src);
}
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
Span span = null;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
span = scope.detach();
one.setTraceSpan(span);
//remove the packet about to be sent from dataQueue
dataQueue.removeFirst();
//and append it to ackQueue
ackQueue.addLast(one);
//wake up threads waiting on dataQueue
dataQueue.notifyAll();
}
}
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DataStreamer block " + block + " sending packet " + one);
}
// write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try {
//this is where the packet is actually written out
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
//the primary DataNode is the first DataNode in the pipeline (errorIndex = 0)
tryMarkPrimaryDatanodeFailed();
//rethrow so the outer handler records the error
throw e;
} finally {
writeScope.close();
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!streamerClosed && !hasError && ackQueue.size() != 0 && dfsClient.clientRunning) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
endBlock();
}
if (progress != null) {
progress.progress();
}
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.warn("DataStreamer Exception", e);
}
if (e instanceof IOException) {
setLastException((IOException) e);
} else {
setLastException(new IOException("DataStreamer Exception: ", e));
}
//an exception was caught: flip the error flag
hasError = true;
if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
scope.close();
}
}
closeInternal();
}
/**
* Initialize for data streaming
*/
private void initDataStreaming() {
this.setName("DataStreamer for file " + src + " block " + block);
response = new ResponseProcessor(nodes);
//start the thread; notice how similar this is to DataStreamer
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}
ResponseProcessor(DatanodeInfo[] targets) {
this.targets = targets;
}
public void run() {
setName("ResponseProcessor for block " + block);
//a pipeline ack carrying the DataNodes' write results
PipelineAck ack = new PipelineAck();
TraceScope scope = NullScope.INSTANCE;
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
long begin = Time.monotonicNow();
//read the ack coming back from the downstream pipeline
ack.readFields(blockReplyStream);
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
DFSClient.LOG.warn("Slow ReadProcessor read fields took " + duration + "ms (threshold="
+ dfsclientSlowLogThresholdMs + "ms); ack: " + ack + ", targets: "
+ Arrays.asList(targets));
} else if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient " + ack);
}
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i));
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.monotonicNow();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
throw new IOException(message);
}
// node error
if (reply != SUCCESS) {
setErrorIndex(i); // first bad datanode
throw new IOException("Bad response " + reply + " for block " + block
+ " from datanode " + targets[i]);
}
}
assert seqno != PipelineAck.UNKOWN_SEQNO : "Ack for unknown seqno should be a failed ack: "
+ ack;
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
// a success ack for a data packet
DFSPacket one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.getSeqno() != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " + " for block " + block
+ one.getSeqno() + " but received " + seqno);
}
isLastPacketInBlock = one.isLastPacketInBlock();
// Fail the packet write for testing in order to force a
// pipeline recovery.
if (DFSClientFaultInjector.get().failPacket() && isLastPacketInBlock) {
failPacket = true;
throw new IOException("Failing the last packet for testing.");
}
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
synchronized (dataQueue) {
scope = Trace.continueSpan(one.getTraceSpan());
one.setTraceSpan(null);
lastAckedSeqno = seqno;
//on a successful ack, remove the packet from ackQueue
ackQueue.removeFirst();
dataQueue.notifyAll();
one.releaseBuffer(byteArrayManager);
}
} catch (Exception e) {
if (!responderClosed) {
if (e instanceof IOException) {
setLastException((IOException) e);
}
hasError = true;
// If no explicit error report was received, mark the primary
// node as failed.
tryMarkPrimaryDatanodeFailed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (restartingNodeIndex.get() == -1) {
DFSClient.LOG.warn(
"DFSOutputStream ResponseProcessor exception " + " for block " + block, e);
}
responderClosed = true;
}
} finally {
scope.close();
}
}
}
void close() {
responderClosed = true;
//interrupt the thread so it exits quickly
this.interrupt();
}
}
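Taken together, DataStreamer and ResponseProcessor form a two-queue producer-consumer pattern: when a packet is sent it moves from dataQueue to ackQueue, and a successful ack retires it from ackQueue and wakes up any writer waiting for room. The sketch below illustrates only that hand-off under simplified assumptions; the Packet class and method names are hypothetical, and the real code synchronizes on dataQueue rather than on this:
import java.util.LinkedList;

class TwoQueuePipeline {
    static class Packet { long seqno; }

    private final LinkedList<Packet> dataQueue = new LinkedList<>();
    private final LinkedList<Packet> ackQueue = new LinkedList<>();

    // client thread: enqueue a packet, blocking if too many are in flight
    synchronized void enqueue(Packet p, int maxInFlight) throws InterruptedException {
        while (dataQueue.size() + ackQueue.size() >= maxInFlight) {
            wait();                  // mirrors waitAndQueueCurrentPacket()
        }
        dataQueue.addLast(p);
        notifyAll();                 // wake the streamer thread
    }

    // streamer thread: take the next packet to send and park it on ackQueue
    synchronized Packet nextToSend() throws InterruptedException {
        while (dataQueue.isEmpty()) {
            wait();
        }
        Packet p = dataQueue.removeFirst();
        ackQueue.addLast(p);
        return p;                    // the caller writes it to the DataNode pipeline
    }

    // responder thread: a successful ack retires the oldest in-flight packet
    synchronized void ackReceived(long seqno) {
        if (!ackQueue.isEmpty() && ackQueue.getFirst().seqno == seqno) {
            ackQueue.removeFirst();
            notifyAll();             // let the client enqueue more packets
        }
    }
}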
After DistributedFileSystem's create(..) returns, the resulting DFSOutputStream is passed to createWrappedOutputStream(...) and wrapped once more, an application of the Decorator pattern.
/**
* Wraps the stream in a CryptoOutputStream if the underlying file is encrypted.
*/
//returns an HdfsDataOutputStream
public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos, FileSystem.Statistics statistics)
throws IOException {
return createWrappedOutputStream(dfsos, statistics, 0);
}
That completes the analysis of create(): it added an INodeFile to the file directory tree, set up lease management, started the DataStreamer, and initialized data streaming. Only now does the actual data write begin.
Writing the data
Following the write-flow diagram, we locate the write() method of FSDataOutputStream.
public void write(int b) throws IOException {
//out is the DFSOutputStream
out.write(b);
position++;
if (statistics != null) {
statistics.incrementBytesWritten(1);
}
}
Through DFSOutputStream's parent class FSOutputSummer we reach the write() method.
/** Write one byte */
public synchronized void write(int b) throws IOException {
buf[count++] = (byte)b;
if(count == buf.length) {
//flush the buffer to the underlying stream
flushBuffer();
}
}
/* Forces any buffered output bytes to be checksumed and written out to
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
flushBuffer(false, true);
}
protected synchronized int flushBuffer(boolean keep,
boolean flushPartial) throws IOException {
int bufLen = count;
int partialLen = bufLen % sum.getBytesPerChecksum();
int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
if (lenToFlush != 0) {
//the core code
//HDFS File -> Block (128 MB) -> packet (64 KB) -> chunk (512 bytes data + 4 bytes checksum)
writeChecksumChunks(buf, 0, lenToFlush);
if (!flushPartial || keep) {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0;
}
}
// total bytes left minus unflushed bytes left
return count - (bufLen - lenToFlush);
}
Next, compute the checksum for each chunk of data.
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
//compute the checksums for the chunks
sum.calculateChunkedSums(b, off, len, checksum, 0);
//walk through the data one chunk at a time
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
//write the chunk and its checksum
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}
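To make the chunk/checksum relationship concrete, here is a small standalone illustration that computes one CRC per 512-byte chunk. It uses java.util.zip.CRC32 purely for simplicity; HDFS itself defaults to CRC32C through DataChecksum:
import java.util.zip.CRC32;

public class ChunkChecksums {
    public static void main(String[] args) {
        byte[] data = new byte[1300];        // stand-in for the client's buffer
        int bytesPerChecksum = 512;          // chunk size

        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            int chunkLen = Math.min(bytesPerChecksum, data.length - off);
            CRC32 crc = new CRC32();
            crc.update(data, off, chunkLen); // one checksum per chunk
            // in HDFS the 4-byte checksum and the chunk are both written into the packet
            System.out.printf("chunk at %d (%d bytes) crc=%08x%n", off, chunkLen, crc.getValue());
        }
    }
}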
Continue to DFSOutputStream, the FSOutputSummer subclass, and locate the writeChunk(...) method.
// @see FSOutputSummer#writeChunk()
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen)
throws IOException {
TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
try {
//write the chunk
writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
} finally {
scope.close();
}
}
private synchronized void writeChunkImpl(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen)
throws IOException {
dfsClient.checkOpen();
checkClosed();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len + " is larger than supported bytesPerChecksum "
+ bytesPerChecksum);
}
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException(
"writeChunk() checksum size is supposed to be " + getChecksumSize() + " but found to be " + cklen);
}
if (currentPacket == null) {
//create a new packet
currentPacket = createPacket(packetSize, chunksPerPacket, bytesCurBlock, currentSeqno++, false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.getSeqno()
+ ", code-snippet__string">", packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket
+ ", bytesCurBlock=" + bytesCurBlock);
}
}
//write the chunk checksum into the packet
currentPacket.writeChecksum(checksum, ckoff, cklen);
//write one chunk of data into the packet
currentPacket.writeData(b, offset, len);
//count how many chunks have been written; once the packet holds its full complement of chunks (roughly 126-127 with default sizes) it is complete
currentPacket.incNumChunks();
//likewise, the written bytes accumulate toward a full block
bytesCurBlock += len;//counts toward the block size (128 MB by default)
// If packet is full, enqueue it for transmission
//when the packet is full or the block is full
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || bytesCurBlock == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.getSeqno() + ", code-snippet_outer"> + src + ", bytesCurBlock=" + bytesCurBlock + ", blockSize=" + blockSize + ", appendChunk="
+ appendChunk);
}
//a packet is full: add it to the queue; if the packet queue itself is full, this call blocks
waitAndQueueCurrentPacket();
// If the reopened file did not end at chunk boundary and the above
// write filled up its partial chunk. Tell the summer to generate full
// crc chunks from now on.
if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
appendChunk = false;
resetChecksumBufSize();
}
if (!appendChunk) {
int psize = Math.min((int) (blockSize - bytesCurBlock), dfsClient.getConf().writePacketSize);
computePacketChunkSize(psize, bytesPerChecksum);
}
//
// if encountering a block boundary, send an empty packet to
// indicate the end of block and reset bytesCurBlock.
// the bytes accumulated for the current block equal the block size (128 MB), so a full block has been written
if (bytesCurBlock == blockSize) {
//when a block is finished, the last packet is an empty one
currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
currentPacket.setSyncBlock(shouldSyncBlock);
//add the current currentPacket to dataQueue
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
}
}
}
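As a quick sanity check on the numbers in the comments above, the following back-of-the-envelope arithmetic shows how many chunks and packets make up one 128 MB block, assuming 512-byte chunks and 126 chunks per packet (the exact chunks-per-packet figure depends on the header math sketched earlier):
public class BlockMath {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // 134,217,728 bytes
        int bytesPerChecksum = 512;
        int chunksPerPacket = 126;            // assumption; see the earlier size calculation

        long chunksPerBlock = blockSize / bytesPerChecksum;              // 262,144 chunks
        long packetsPerBlock = (chunksPerBlock + chunksPerPacket - 1)
                / chunksPerPacket;                                       // about 2,081 packets
        System.out.println(chunksPerBlock + " chunks, ~" + packetsPerBlock + " packets per block");
    }
}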
This article has walked through the HDFS write path. Together with the flow diagram it breaks down into seven main steps, covering directory and file creation, the file lease mechanism, starting the core DataStreamer service and initializing data streaming, writing chunks and packets into the dataQueue, block allocation and pipeline construction, and fault tolerance during the write. Frankly, the HDFS write path is complex: there is a lot of material, the source is winding, and it is easy to get lost. To keep the article from growing too long, this is only an overview and many details are left out; if you are interested, pull the source and read it closely. I hope it gives you a useful starting point.