Out了!原来HDFS中自带数据限流机制
前言
在大规模Hadoop生产集群中,DataNode IO负载很高,我们期望能有一种方式能够对DataNode IO进行限流,保证服务的正常运行。带着这样一种目的,本文基于Apache Hadoop3.3.0版本梳理一下HDFS内的已经实现的限流机制及其应用场景。
限流算法
限流的核心算法在DataTransferThrottler类,机制很简单:限制单位period内的数据byte数,如果发现传输的数据量过大,就wait到下一个period继续限流。这个类完全可以直接拎出来到我们自己的产品、项目中需要对数据限流的地方使用。我们直接看一下实现类:
public class DataTransferThrottler {
private final long period; //限流的时间窗口,一般hard-coded为500ms
private final long periodExtension; // 对一次传输最长限流的时间,一般hard-coded为3个period周期
private long bytesPerPeriod; //为按带宽大小,每个period内传输的数据量
private long curPeriodStart; // 该轮throttle的起始时间
private long curReserve; // 每个period内能传输的数据量,初始值bytesPerPeriod
private long bytesAlreadyUsed;
......
public DataTransferThrottler(long period, long bandwidthPerSec) {
this.curPeriodStart = monotonicNow(); //创建throttle时初始化
this.period = period;
this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
this.periodExtension = period*3;
}
//限流核心方法
public synchronized void throttle(long numOfBytes, Canceler canceler) {
if ( numOfBytes <= 0 ) {
return;
}
//先扣除即将传输的byte数
curReserve -= numOfBytes;
//预加上
bytesAlreadyUsed += numOfBytes;
//reserve小于0则进入限流逻辑
while (curReserve <= 0) {
if (canceler != null && canceler.isCancelled()) {
return;
}
long now = monotonicNow();
long curPeriodEnd = curPeriodStart + period;
//1)只有当前时间在本周期内才限流,执行wait
if ( now < curPeriodEnd ) {
// Wait for next period so that curReserve can be increased.
try {
//等待到下一个period start时间点
wait( curPeriodEnd - now );
} catch (InterruptedException e) {
// Abort throttle and reset interrupted status to make sure other
// interrupt handling higher in the call stack executes.
Thread.currentThread().interrupt();
break;
}
//2)now超过当前周期,但还在最大周期内,更新curPeriodStart和curReserve
} else if ( now < (curPeriodStart + periodExtension)) {
curPeriodStart = curPeriodEnd;
//curReserve 加上一个周期的byte额度
curReserve += bytesPerPeriod;
} else {
//3)此时时间已经超过最大period,表示已经多次没有throttle了,
// 重置curPeriodStart, 进入下一个1+3 period轮回,并清零之前的bytesAlreadyUsed
// discard the prev period. Throttler might not have
// been used for a long time.
curPeriodStart = now;
curReserve = bytesPerPeriod - bytesAlreadyUsed;
}
}
//传输成功结束,扣除已传数的byte数
bytesAlreadyUsed -= numOfBytes;
}
}
希望这个图(参考)能够加深一点对算法的理解。
应用场景
介绍完限流的核心算法,我们看看源码里哪些地方用到这个限流。
大概有四个地方用到了这个类:
TransferFsImage
JournalNodeSyncer
VolumeScanner
DataXceiverServer
我们一一学习一下。
1、TransferFsImage
字面意思是传输FSImage。StandbyNamenode在做checkpoint完成后需要将最新的FSImage通过HTTP PUT方式传输给Active Namenode(非HA模式也有类似逻辑,此处忽略)。此时对FSImage传输限流,可以降低对Active NameNode的影响。限流默认开启,带宽参数:
dfs.image.transfer.bandwidthPerSec,默认值52428800。大体调用流程如下:
StandbyCheckpointer.doCheckpoint()
--->TransferFsImage.uploadImageFromStorage()
-->uploadImage()
-->writeFileToPutRequest()
-->copyFileToStream()
ImageServlet.java
//静态构造throttler,根据配置的带宽决定是否开启 限流
public static DataTransferThrottler getThrottler(Configuration conf) {
long transferBandwidth = conf.getLongBytes(
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);//默认52428800,说明该throttler默认开启了。
DataTransferThrottler throttler = null;
if (transferBandwidth > 0) {
throttler = new DataTransferThrottler(transferBandwidth);
}
return throttler;
}
TransferFsImage.java
private static void writeFileToPutRequest(Configuration conf,
HttpURLConnection connection, File imageFile, Canceler canceler)
throws IOException {
connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream");
connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");
OutputStream output = connection.getOutputStream();
FileInputStream input = new FileInputStream(imageFile);
try {
copyFileToStream(output, imageFile, input,
//获取throttler限流器
ImageServlet.getThrottler(conf), canceler);
} finally {
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
}
private static void copyFileToStream(OutputStream out, File localfile,
FileInputStream infile, DataTransferThrottler throttler,
Canceler canceler) throws IOException {
byte buf[] = new byte[IO_FILE_BUFFER_SIZE];
long total = 0;
int num = 1;
IOException ioe = null;
...
try {
while (num > 0) {
...
num = infile.read(buf);
if (num <= 0) {
break;
}
...
out.write(buf, 0, num);
total += num;
if (throttler != null) {
//对读入的byte数限流,canceler预留,表示是否跳过限流
throttler.throttle(num, canceler);
}
}
} catch (...) {
...
} finally {
...
}
2、JournalNodeSyncer
JournalNodeSyncer是JournalNode服务的一个线程服务,定期与其他JournalNode比较并同步editlog。当需要从别的JN下载Editlog Segment时用throttler来控制下载的速率。该限流默认关闭,带宽参数dfs.edit.log.transfer.bandwidthPerSec,默认值为0。具体流程如下:
JournalNode.startSyncer()
--->JournalNodeSyncer.start()
-->startSyncJournalsDaemon()
-->syncJournals()
-->syncWithJournalAtIndex()
-->getMissingLogSegments()
-->downloadMissingLogSegment()
--->Util.doGetUrl(...throttler)
-->receiveFile(...throttler)
3、VolumeScanner
VolumeScanner是datanode内部扫描单个磁盘的线程,扫描疑似坏block及其元数据。该处的throttler默认开启,默认带宽为1048576,参数配置dfs.block.scanner.volume.bytes.per.second,具体流程如下:
DataNode.startDataNode()
--->BlockPoolManager.refreshNamenodes()
-->doRefreshNamenodes() //创建BPOfferService并启动
-->startAll();
--->BPOfferService.start() //维护BPServiceActor列表
--->BPServiceActor.start() //启动actor线程,是实际与NN心跳交互的线程
-->connectToNNAndHandshake()
--->BPOfferService.verifyAndSetNamespaceInfo()
--->DataNode.initBlockPool(this);
-->initStorage(nsInfo); //初始化存储,创建FsDatasetImpl
--->FSDatasetImpl.addVolume() //加入磁盘
--->FsVolumeList.activateVolume() // 每个盘创建一个VolumeScanner并启动
--->BlockScanner.addVolumeScanner() //为每个盘创建一个VolumeScanner并启动
--->VolumeScanner.start() //启动线程
-->runLoop()
-->scanBlock() //扫描可疑的block文件
--->BlockSender.sendBlock(nullStream, null, throttler)//读取并发送到/dev/null
-->doSendBlock()
-->sendPacket()
4、DataXceiverServer
该类是是DataNode的核心服务,用来监听流式接口与请求(send\recieve block)、每当有Client通过Sender类发起流式接口请求时,就创建一个DataXceiver对象用于响应请求并执行操作,它没有采用Hadoop的IPC机制,而是直接参考Java Socket实现。这里面的限流机制主要用在三方面:
balanceThrottler
Balancer中用来限制拷贝block的速率,继承DataTransferThrottler,在其基础上加了Semaphore机制来控制最大线程数。带宽配置参数:dfs.datanode.balance.bandwidthPerSec,默认100 * 1024*1024
writeThrottler
datanode中写block到pipeline中的限流,带宽配置参数:dfs.datanode.data.transfer.bandwidthPerSec,默认0表示不开启限流。
transferThrottler
datanode复制block到指定DataNode的限流,(一般是在DataNode故障时,需要新的DataNode节点替换异常节点,DFSClient调用)。带宽配置参数:dfs.datanode.data.write.bandwidthPerSec,默认0表示不开启限流。
总结
1、在Hadoop 源码里DataTransferThrottler类很早就有,但设计之初只用对FSImage、EditLog、Balancer等非业务流程做限流,直到3.3.0版本才将DataNode的write和transfer block这两种核心操作纳入限流管理,之前只能通过HDFS Quota机制“曲线救国”。
2、readBlock操作目前还没有限流机制,传入的throttler是null。可能是考虑读请求对datanode网络产生的压力不是造成瓶颈的核心因素。
3、目前的限流只是集群层面统一的配置,不够灵活,缺乏分租户IO的隔离和控制。为此,我们曾自研了一套基于YARN队列的IO限速功能,将IO作为等同于内存、CPU一样的资源,可以按队列进行资源的隔离和限制。有机会的话下次介绍下。
4、在HDFS中,RPC限流跟数据限流同样重要,已实现的数据限流机制对实现RPC限流有一定的借鉴意义。
参考
《Hadoop 2.x HDFS源码剖析》
《深入剖析Hadoop HDFS》