vlambda博客
学习文章列表

Out了!原来HDFS中自带数据限流机制

前言

       在大规模Hadoop生产集群中,DataNode IO负载很高,我们期望能有一种方式能够对DataNode IO进行限流,保证服务的正常运行。带着这样一种目的,本文基于Apache Hadoop3.3.0版本梳理一下HDFS内的已经实现的限流机制及其应用场景。

限流算法

      限流的核心算法在DataTransferThrottler类,机制很简单:限制单位period内的数据byte数,如果发现传输的数据量过大,就wait到下一个period继续限流。这个类完全可以直接拎出来到我们自己的产品、项目中需要对数据限流的地方使用。我们直接看一下实现类:


  public class DataTransferThrottler {  private final long period;  //限流的时间窗口,一般hard-coded为500ms   private final long periodExtension; // 对一次传输最长限流的时间,一般hard-coded为3个period周期  private long bytesPerPeriod;  //为按带宽大小,每个period内传输的数据量  private long curPeriodStart;  // 该轮throttle的起始时间  private long curReserve;      // 每个period内能传输的数据量,初始值bytesPerPeriod private long bytesAlreadyUsed;  ...... public DataTransferThrottler(long period, long bandwidthPerSec) {    this.curPeriodStart = monotonicNow(); //创建throttle时初始化 this.period = period; this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000; this.periodExtension = period*3; }//限流核心方法public synchronized void throttle(long numOfBytes, Canceler canceler) { if ( numOfBytes <= 0 ) { return; }  //先扣除即将传输的byte数 curReserve -= numOfBytes;    //预加上 bytesAlreadyUsed += numOfBytes;//reserve小于0则进入限流逻辑 while (curReserve <= 0) { if (canceler != null && canceler.isCancelled()) { return; } long now = monotonicNow(); long curPeriodEnd = curPeriodStart + period;     //1)只有当前时间在本周期内才限流,执行wait if ( now < curPeriodEnd ) { // Wait for next period so that curReserve can be increased. try {        //等待到下一个period start时间点 wait( curPeriodEnd - now ); } catch (InterruptedException e) { // Abort throttle and reset interrupted status to make sure other // interrupt handling higher in the call stack executes. Thread.currentThread().interrupt(); break; }        //2)now超过当前周期,但还在最大周期内,更新curPeriodStart和curReserve  } else if ( now < (curPeriodStart + periodExtension)) { curPeriodStart = curPeriodEnd;        //curReserve 加上一个周期的byte额度 curReserve += bytesPerPeriod; } else {         //3)此时时间已经超过最大period,表示已经多次没有throttle了,        // 重置curPeriodStart, 进入下一个1+3 period轮回,并清零之前的bytesAlreadyUsed // discard the prev period. Throttler might not have // been used for a long time. curPeriodStart = now; curReserve = bytesPerPeriod - bytesAlreadyUsed; } } //传输成功结束,扣除已传数的byte数 bytesAlreadyUsed -= numOfBytes; }}

希望这个图(参考)能够加深一点对算法的理解


应用场景

介绍完限流的核心算法,我们看看源码里哪些地方用到这个限流。

大概有四个地方用到了这个类:

  • TransferFsImage

  • JournalNodeSyncer

  • VolumeScanner

  • DataXceiverServer

我们一一学习一下。


1TransferFsImage

    字面意思是传输FSImage。StandbyNamenode在做checkpoint完成后需要将最新的FSImage通过HTTP PUT方式传输给Active Namenode(非HA模式也有类似逻辑,此处忽略)。此时对FSImage传输限流,可以降低对Active NameNode的影响。限流默认开启,带宽参数:

dfs.image.transfer.bandwidthPerSec,默认值52428800。大体调用流程如下:

StandbyCheckpointer.doCheckpoint() --->TransferFsImage.uploadImageFromStorage()    -->uploadImage()    -->writeFileToPutRequest()    -->copyFileToStream()
 ImageServlet.java //静态构造throttler,根据配置的带宽决定是否开启 限流 public static DataTransferThrottler getThrottler(Configuration conf) { long transferBandwidth = conf.getLongBytes( DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,        DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);//默认52428800,说明该throttler默认开启了。 DataTransferThrottler throttler = null; if (transferBandwidth > 0) { throttler = new DataTransferThrottler(transferBandwidth); } return throttler;  }
 TransferFsImage.java private static void writeFileToPutRequest(Configuration conf, HttpURLConnection connection, File imageFile, Canceler canceler) throws IOException { connection.setRequestProperty(Util.CONTENT_TYPE, "application/octet-stream"); connection.setRequestProperty(Util.CONTENT_TRANSFER_ENCODING, "binary"); OutputStream output = connection.getOutputStream(); FileInputStream input = new FileInputStream(imageFile); try {    copyFileToStream(output, imageFile, input, //获取throttler限流器 ImageServlet.getThrottler(conf), canceler); } finally { IOUtils.closeStream(input); IOUtils.closeStream(output); } }  private static void copyFileToStream(OutputStream out, File localfile, FileInputStream infile, DataTransferThrottler throttler, Canceler canceler) throws IOException { byte buf[] = new byte[IO_FILE_BUFFER_SIZE]; long total = 0; int num = 1; IOException ioe = null;... try { while (num > 0) { ... num = infile.read(buf); if (num <= 0) { break; } ... out.write(buf, 0, num); total += num; if (throttler != null) {        //对读入的byte数限流,canceler预留,表示是否跳过限流 throttler.throttle(num, canceler); } } } catch (...) { ... } finally { ... }

2JournalNodeSyncer

    JournalNodeSyncer是JournalNode服务的一个线程服务,定期与其他JournalNode比较并同步editlog。当需要从别的JN下载Editlog Segment时用throttler来控制下载的速率。该限流默认关闭,带宽参数dfs.edit.log.transfer.bandwidthPerSec,默认值为0。具体流程如下:

JournalNode.startSyncer()--->JournalNodeSyncer.start()    -->startSyncJournalsDaemon()    -->syncJournals()    -->syncWithJournalAtIndex()    -->getMissingLogSegments()    -->downloadMissingLogSegment()--->Util.doGetUrl(...throttler)    -->receiveFile(...throttler)

3VolumeScanner

       VolumeScanner是datanode内部扫描单个磁盘的线程,扫描疑似坏block及其元数据。该处的throttler默认开启,默认带宽为1048576,参数配置dfs.block.scanner.volume.bytes.per.second,具体流程如下:


DataNode.startDataNode()--->BlockPoolManager.refreshNamenodes()   -->doRefreshNamenodes() //创建BPOfferService并启动   -->startAll();--->BPOfferService.start() //维护BPServiceActor列表--->BPServiceActor.start() //启动actor线程,是实际与NN心跳交互的线程    -->connectToNNAndHandshake()--->BPOfferService.verifyAndSetNamespaceInfo()--->DataNode.initBlockPool(this);    -->initStorage(nsInfo); //初始化存储,创建FsDatasetImpl--->FSDatasetImpl.addVolume() //加入磁盘--->FsVolumeList.activateVolume() // 每个盘创建一个VolumeScanner并启动--->BlockScanner.addVolumeScanner() //为每个盘创建一个VolumeScanner并启动--->VolumeScanner.start() //启动线程    -->runLoop()     -->scanBlock() //扫描可疑的block文件--->BlockSender.sendBlock(nullStreamnullthrottler)//读取并发送到/dev/null -->doSendBlock() -->sendPacket()

4DataXceiverServer

    该类是是DataNode的核心服务,用来监听流式接口与请求(send\recieve block)、每当有Client通过Sender类发起流式接口请求时,就创建一个DataXceiver对象用于响应请求并执行操作,它没有采用Hadoop的IPC机制,而是直接参考Java Socket实现。这里面的限流机制主要用在三方面:

  • balanceThrottler

Balancer中用来限制拷贝block的速率,继承DataTransferThrottler,在其基础上加了Semaphore机制来控制最大线程数。带宽配置参数:dfs.datanode.balance.bandwidthPerSec,默认100 * 1024*1024

  • writeThrottler

datanode中写block到pipeline中的限流,带宽配置参数:dfs.datanode.data.transfer.bandwidthPerSec,默认0表示不开启限流。

  • transferThrottler

datanode复制block到指定DataNode的限流,(一般是在DataNode故障时,需要新的DataNode节点替换异常节点,DFSClient调用)。带宽配置参数:dfs.datanode.data.write.bandwidthPerSec,默认0表示不开启限流。


【总结也很重要!!!】

1、在Hadoop 源码里DataTransferThrottler类很早就有,但设计之初只用对FSImage、EditLog、Balancer等非业务流程做限流,直到3.3.0版本才将DataNode的write和transfer block这两种核心操作纳入限流管理,之前只能通过HDFS Quota机制“曲线救国”。

2、readBlock操作目前还没有限流机制,传入的throttler是null。可能是考虑读请求对datanode网络产生的压力不是造成瓶颈的核心因素。

3、目前的限流只是集群层面统一的配置,不够灵活,缺乏分租户IO的隔离和控制。为此,我们曾自研了一套基于YARN队列的IO限速功能,将IO作为等同于内存、CPU一样的资源,可以按队列进行资源的隔离和限制。有机会的话下次介绍下。

4、在HDFS中,RPC限流跟数据限流同样重要,已实现的数据限流机制对实现RPC限流有一定的借鉴意义。

参考

《Hadoop 2.x HDFS源码剖析》

《深入剖析Hadoop HDFS》