Out了!原来HDFS中自带数据限流机制
前言
在大规模Hadoop生产集群中,DataNode IO负载很高,我们期望能有一种方式能够对DataNode IO进行限流,保证服务的正常运行。带着这样一种目的,本文基于Apache Hadoop3.3.0版本梳理一下HDFS内的已经实现的限流机制及其应用场景。
限流算法
限流的核心算法在DataTransferThrottler类,机制很简单:限制单位period内的数据byte数,如果发现传输的数据量过大,就wait到下一个period继续限流。这个类完全可以直接拎出来到我们自己的产品、项目中需要对数据限流的地方使用。我们直接看一下实现类:
public class DataTransferThrottler {private final long period; //限流的时间窗口,一般hard-coded为500msprivate final long periodExtension; // 对一次传输最长限流的时间,一般hard-coded为3个period周期private long bytesPerPeriod; //为按带宽大小,每个period内传输的数据量private long curPeriodStart; // 该轮throttle的起始时间private long curReserve; // 每个period内能传输的数据量,初始值bytesPerPeriodprivate long bytesAlreadyUsed;......public DataTransferThrottler(long period, long bandwidthPerSec) {this.curPeriodStart = monotonicNow(); //创建throttle时初始化this.period = period;this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;this.periodExtension = period*3;}//限流核心方法public synchronized void throttle(long numOfBytes, Canceler canceler) {if ( numOfBytes <= 0 ) {return;}//先扣除即将传输的byte数curReserve -= numOfBytes;//预加上bytesAlreadyUsed += numOfBytes;//reserve小于0则进入限流逻辑while (curReserve <= 0) {if (canceler != null && canceler.isCancelled()) {return;}long now = monotonicNow();long curPeriodEnd = curPeriodStart + period;//1)只有当前时间在本周期内才限流,执行waitif ( now < curPeriodEnd ) {// Wait for next period so that curReserve can be increased.try {//等待到下一个period start时间点wait( curPeriodEnd - now );} catch (InterruptedException e) {// Abort throttle and reset interrupted status to make sure other// interrupt handling higher in the call stack executes.Thread.currentThread().interrupt();break;}//2)now超过当前周期,但还在最大周期内,更新curPeriodStart和curReserve} else if ( now < (curPeriodStart + periodExtension)) {curPeriodStart = curPeriodEnd;//curReserve 加上一个周期的byte额度curReserve += bytesPerPeriod;} else {//3)此时时间已经超过最大period,表示已经多次没有throttle了,// 重置curPeriodStart, 进入下一个1+3 period轮回,并清零之前的bytesAlreadyUsed// discard the prev period. Throttler might not have// been used for a long time.curPeriodStart = now;curReserve = bytesPerPeriod - bytesAlreadyUsed;}}//传输成功结束,扣除已传数的byte数bytesAlreadyUsed -= numOfBytes;}}
希望这个图(参考)能够加深一点对算法的理解。
应用场景
介绍完限流的核心算法,我们看看源码里哪些地方用到这个限流。
大概有四个地方用到了这个类:
TransferFsImage
JournalNodeSyncer
VolumeScanner
DataXceiverServer
我们一一学习一下。
1、TransferFsImage
字面意思是传输FSImage。StandbyNamenode在做checkpoint完成后需要将最新的FSImage通过HTTP PUT方式传输给Active Namenode(非HA模式也有类似逻辑,此处忽略)。此时对FSImage传输限流,可以降低对Active NameNode的影响。限流默认开启,带宽参数:
dfs.image.transfer.bandwidthPerSec,默认值52428800。大体调用流程如下:
StandbyCheckpointer.doCheckpoint()--->TransferFsImage.uploadImageFromStorage()-->uploadImage()-->writeFileToPutRequest()-->copyFileToStream()
ImageServlet.java//静态构造throttler,根据配置的带宽决定是否开启 限流public static DataTransferThrottler getThrottler(Configuration conf) {long transferBandwidth = conf.getLongBytes(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);//默认52428800,说明该throttler默认开启了。DataTransferThrottler throttler = null;if (transferBandwidth > 0) {throttler = new DataTransferThrottler(transferBandwidth);}return throttler;}
TransferFsImage.javaprivate static void writeFileToPutRequest(Configuration conf,HttpURLConnection connection, File imageFile, Canceler canceler)throws IOException {connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream");connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary");OutputStream output = connection.getOutputStream();FileInputStream input = new FileInputStream(imageFile);try {copyFileToStream(output, imageFile, input,//获取throttler限流器ImageServlet.getThrottler(conf), canceler);} finally {IOUtils.closeStream(input);IOUtils.closeStream(output);}}private static void copyFileToStream(OutputStream out, File localfile,FileInputStream infile, DataTransferThrottler throttler,Canceler canceler) throws IOException {byte buf[] = new byte[IO_FILE_BUFFER_SIZE];long total = 0;int num = 1;IOException ioe = null;...try {while (num > 0) {...num = infile.read(buf);if (num <= 0) {break;}...out.write(buf, 0, num);total += num;if (throttler != null) {//对读入的byte数限流,canceler预留,表示是否跳过限流throttler.throttle(num, canceler);}}} catch (...) {...} finally {...}
2、JournalNodeSyncer
JournalNodeSyncer是JournalNode服务的一个线程服务,定期与其他JournalNode比较并同步editlog。当需要从别的JN下载Editlog Segment时用throttler来控制下载的速率。该限流默认关闭,带宽参数dfs.edit.log.transfer.bandwidthPerSec,默认值为0。具体流程如下:
JournalNode.startSyncer()--->JournalNodeSyncer.start()-->startSyncJournalsDaemon()-->syncJournals()-->syncWithJournalAtIndex()-->getMissingLogSegments()-->downloadMissingLogSegment()--->Util.doGetUrl(...throttler)-->receiveFile(...throttler)
3、VolumeScanner
VolumeScanner是datanode内部扫描单个磁盘的线程,扫描疑似坏block及其元数据。该处的throttler默认开启,默认带宽为1048576,参数配置dfs.block.scanner.volume.bytes.per.second,具体流程如下:
DataNode.startDataNode()--->BlockPoolManager.refreshNamenodes()-->doRefreshNamenodes() //创建BPOfferService并启动-->startAll();--->BPOfferService.start() //维护BPServiceActor列表--->BPServiceActor.start() //启动actor线程,是实际与NN心跳交互的线程-->connectToNNAndHandshake()--->BPOfferService.verifyAndSetNamespaceInfo()--->DataNode.initBlockPool(this);-->initStorage(nsInfo); //初始化存储,创建FsDatasetImpl--->FSDatasetImpl.addVolume() //加入磁盘--->FsVolumeList.activateVolume() // 每个盘创建一个VolumeScanner并启动--->BlockScanner.addVolumeScanner() //为每个盘创建一个VolumeScanner并启动--->VolumeScanner.start() //启动线程-->runLoop()-->scanBlock() //扫描可疑的block文件--->BlockSender.sendBlock(nullStream, null, throttler)//读取并发送到/dev/null-->doSendBlock()-->sendPacket()
4、DataXceiverServer
该类是是DataNode的核心服务,用来监听流式接口与请求(send\recieve block)、每当有Client通过Sender类发起流式接口请求时,就创建一个DataXceiver对象用于响应请求并执行操作,它没有采用Hadoop的IPC机制,而是直接参考Java Socket实现。这里面的限流机制主要用在三方面:
balanceThrottler
Balancer中用来限制拷贝block的速率,继承DataTransferThrottler,在其基础上加了Semaphore机制来控制最大线程数。带宽配置参数:dfs.datanode.balance.bandwidthPerSec,默认100 * 1024*1024
writeThrottler
datanode中写block到pipeline中的限流,带宽配置参数:dfs.datanode.data.transfer.bandwidthPerSec,默认0表示不开启限流。
transferThrottler
datanode复制block到指定DataNode的限流,(一般是在DataNode故障时,需要新的DataNode节点替换异常节点,DFSClient调用)。带宽配置参数:dfs.datanode.data.write.bandwidthPerSec,默认0表示不开启限流。
总结
1、在Hadoop 源码里DataTransferThrottler类很早就有,但设计之初只用对FSImage、EditLog、Balancer等非业务流程做限流,直到3.3.0版本才将DataNode的write和transfer block这两种核心操作纳入限流管理,之前只能通过HDFS Quota机制“曲线救国”。
2、readBlock操作目前还没有限流机制,传入的throttler是null。可能是考虑读请求对datanode网络产生的压力不是造成瓶颈的核心因素。
3、目前的限流只是集群层面统一的配置,不够灵活,缺乏分租户IO的隔离和控制。为此,我们曾自研了一套基于YARN队列的IO限速功能,将IO作为等同于内存、CPU一样的资源,可以按队列进行资源的隔离和限制。有机会的话下次介绍下。
4、在HDFS中,RPC限流跟数据限流同样重要,已实现的数据限流机制对实现RPC限流有一定的借鉴意义。
参考
《Hadoop 2.x HDFS源码剖析》
《深入剖析Hadoop HDFS》
