Apache Hudi source code analysis - the small-file problem optimized at write time
Flink: 0.12 (the engine version matters little here)
hudi : 0.11.0-SNAPSHOT
Time: 2022/03/14
The Spark adaptation works the same way.
Overall flow
For each incoming record, Flink builds the record key together with its partition path.
Through the Hudi Metadata table, all qualifying small files (fileIds) under that partition path are fetched.
An AssignState is built for those small files: using the historical average record size, it computes how many more records each small file can still hold. The AssignState is cached keyed by partition path (a worked sketch of this calculation follows below).
The above is repeated for every record; if the partition path has already been cached, the AssignState is fetched directly and each small file's remaining capacity is updated.
If no small file has enough remaining capacity, a new fileId is created for the write.
The actual write-out happens when a checkpoint is triggered.
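To make the capacity calculation above concrete, here is a minimal sketch, assuming illustrative class and method names (they are not the actual Hudi classes; only the formula and the 1024-byte fallback mirror the source):

// Illustrative sketch of the small-file capacity math used when an AssignState is built.
// Names here are hypothetical; in Hudi the logic lives inside BucketAssigner's assign states.
public class SmallFileCapacityExample {

  // (max file size - current small file size) / average record size = records still assignable
  static long remainingRecords(long maxFileSizeBytes, long smallFileSizeBytes, long avgRecordSizeBytes) {
    if (avgRecordSizeBytes <= 0) {
      avgRecordSizeBytes = 1024L; // fallback when no commit history exists: 1024 bytes per record
    }
    return Math.max(0, (maxFileSizeBytes - smallFileSizeBytes) / avgRecordSizeBytes);
  }

  public static void main(String[] args) {
    long maxFileSize = 120L * 1024 * 1024; // hoodie.parquet.max.file.size, default ~120 MB
    long smallFile   = 30L * 1024 * 1024;  // an existing 30 MB base file
    long avgRecord   = 1024L;              // default estimate with no history
    // (120 MB - 30 MB) / 1 KB = 92160 more records can still be routed to this fileId
    System.out.println(remainingRecords(maxFileSize, smallFile, avgRecord));
  }
}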
Source code walkthrough
BucketAssignFunction.java
In Flink's processElement, Hudi derives the partitionPath from the recordKey computed for each record:
private HoodieRecordLocation getNewRecordLocation(String partitionPath) {
  // Use the partitionPath derived from the recordKey to look up the small-file info
  // of that partition (see the next code block)
  final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
  final HoodieRecordLocation location;
  switch (bucketInfo.getBucketType()) {
    case INSERT:
      // This is an insert bucket, use HoodieRecordLocation instant time as "I".
      // Downstream operators can then check the instant time to know whether
      // a record belongs to an insert bucket.
      location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
      break;
    case UPDATE:
      location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
      break;
    default:
      throw new AssertionError();
  }
  return location;
}
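The "I" / "U" instant-time marker is what downstream write operators use to tell insert buckets from update buckets. A minimal sketch of that check, assuming a hypothetical routeRecord method (only the HoodieRecord / HoodieRecordLocation accessors are Hudi API):

// Hypothetical downstream check; in Hudi the equivalent logic sits in the write function
// that buffers records per bucket.
void routeRecord(HoodieRecord<?> record) {
  HoodieRecordLocation location = record.getCurrentLocation();
  if ("I".equals(location.getInstantTime())) {
    // insert bucket: a brand-new fileId was allocated for this record
  } else {
    // "U": the record goes to an existing small file's fileId, written as a new version
  }
}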
BucketAssigner.java
public BucketInfo addInsert(String partitionPath) {
  // Look up the small-file assignment state for this partition
  SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);
  // assign() checks whether a small file can still take this record; the count
  // must not exceed the total assignable capacity computed at initialization
  if (smallFileAssign != null && smallFileAssign.assign()) {
    return new BucketInfo(BucketType.UPDATE, smallFileAssign.getFileId(), partitionPath);
  }
  // Below this point a new fileId is created for the write ...
}

private synchronized SmallFileAssign getSmallFileAssign(String partitionPath) {
  // Check whether the small-file info for this partition path is already cached
  if (smallFileAssignMap.containsKey(partitionPath)) {
    return smallFileAssignMap.get(partitionPath);
  }
  // writeProfile.getSmallFiles fetches the small files; see the next code block, then come back
  List<SmallFile> smallFiles = smallFilesOfThisTask(writeProfile.getSmallFiles(partitionPath));
  if (smallFiles.size() > 0) {
    LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
    // Key point:
    // Once the small files are returned, an assign state is built for each of them.
    // When a SmallFileAssignState is initialized, the historical average record size is
    // computed; if there is no history, 1024 bytes per record is used as the default.
    // Then (configured max file size - small file size) / average record size =
    // how many more records this small file can take (totalUnassigned).
    // Subsequent processElement calls for the same partition path keep consuming this
    // capacity through the SmallFileAssignState until it is exhausted.
    SmallFileAssignState[] states = smallFiles.stream()
        .map(smallFile -> new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, writeProfile.getAvgSize()))
        .toArray(SmallFileAssignState[]::new);
    SmallFileAssign assign = new SmallFileAssign(states);
    smallFileAssignMap.put(partitionPath, assign);
    return assign;
  }
  smallFileAssignMap.put(partitionPath, null);
  return null;
}
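For intuition, a simplified model of what a small-file assign state tracks (field and method names below are paraphrased from the description above, not copied from the Hudi source):

// Simplified model: each state knows how many records its small file can still take,
// and hands that capacity out one record at a time.
class SmallFileAssignStateSketch {
  final String fileId;
  final long totalUnassigned; // (maxFileSize - fileSize) / avgRecordSize, avgRecordSize assumed > 0
  long assigned = 0;

  SmallFileAssignStateSketch(String fileId, long maxFileSize, long fileSize, long avgRecordSize) {
    this.fileId = fileId;
    this.totalUnassigned = Math.max(0, (maxFileSize - fileSize) / avgRecordSize);
  }

  boolean canAssign() {
    return assigned < totalUnassigned;
  }

  void assignOne() {
    assigned++; // one more record routed to this fileId
  }
}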
WriteProfile.java
public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
  // lookup the cache first
  if (smallFilesMap.containsKey(partitionPath)) {
    return smallFilesMap.get(partitionPath);
  }
  List<SmallFile> smallFiles = new ArrayList<>();
  if (config.getParquetSmallFileLimit() <= 0) {
    this.smallFilesMap.put(partitionPath, smallFiles);
    return smallFiles;
  }
  // Fetch the small files via smallFilesProfile
  smallFiles = smallFilesProfile(partitionPath);
  this.smallFilesMap.put(partitionPath, smallFiles);
  return smallFiles;
}

// Non-MOR (COW) implementation; MOR tables use DeltaWriteProfile.smallFilesProfile instead
protected List<SmallFile> smallFilesProfile(String partitionPath) {
  // smallFiles only for partitionPath
  List<SmallFile> smallFileLocations = new ArrayList<>();
  HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
  if (!commitTimeline.empty()) { // if we have some commits
    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
    // List all base files under the partition (using the fsView built from the Metadata table)
    List<HoodieBaseFile> allFiles = fsView
        .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp())
        .collect(Collectors.toList());
    for (HoodieBaseFile file : allFiles) {
      // Filter for eligible files:
      // smaller than hoodie.parquet.small.file.limit (default 100 MB) and larger than 0
      if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
        String filename = file.getFileName();
        SmallFile sf = new SmallFile();
        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
        sf.sizeBytes = file.getFileSize();
        smallFileLocations.add(sf);
      }
    }
  }
  return smallFileLocations;
}
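The 100 MB threshold comes from hoodie.parquet.small.file.limit, and the fill target from hoodie.parquet.max.file.size. A hedged example of the related knobs (the keys are standard Hudi write configs; how they are passed, e.g. through the Flink SQL WITH clause or a HoodieWriteConfig builder, depends on the job, and the values are only illustrative):

import java.util.HashMap;
import java.util.Map;

class SmallFileTuningExample {
  // Returns an illustrative set of write options governing small-file handling.
  static Map<String, String> smallFileOptions() {
    Map<String, String> options = new HashMap<>();
    options.put("hoodie.parquet.small.file.limit", String.valueOf(100 * 1024 * 1024)); // below this a base file counts as "small"
    options.put("hoodie.parquet.max.file.size", String.valueOf(120 * 1024 * 1024));    // target size when topping small files up
    options.put("hoodie.copyonwrite.record.size.estimate", "1024");                    // fallback average record size in bytes
    return options;
  }
}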
For MOR tables, the implementation is DeltaWriteProfile.java:
@Override
protected List<SmallFile> smallFilesProfile(String partitionPath) {
  // smallFiles only for partitionPath
  List<SmallFile> smallFileLocations = new ArrayList<>();

  // Init here since this class (and member variables) might not have been initialized
  HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();

  // Find out all eligible small file slices
  if (!commitTimeline.empty()) {
    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
    // find the smallest file in partition and append to it
    List<FileSlice> allSmallFileSlices = new ArrayList<>();
    // If we can index log files, we can add more inserts to log files for fileIds including those under
    // pending compaction.
    // Get the file slices (base file + log files)
    List<FileSlice> allFileSlices = fsView
        .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
        .collect(Collectors.toList());
    for (FileSlice fileSlice : allFileSlices) {
      // Check whether the slice qualifies as a small file:
      // (baseFileSize + totalLogFileSize * ratio) < hoodie.parquet.max.file.size (default 120 MB),
      // where ratio is hoodie.logfile.to.parquet.compression.ratio, default 0.35
      if (isSmallFile(fileSlice)) {
        allSmallFileSlices.add(fileSlice);
      }
    }
    // Create SmallFiles from the eligible file slices
    for (FileSlice smallFileSlice : allSmallFileSlices) {
      SmallFile sf = new SmallFile();
      if (smallFileSlice.getBaseFile().isPresent()) {
        // TODO : Move logic of file name, file id, base commit time handling inside file slice
        String filename = smallFileSlice.getBaseFile().get().getFileName();
        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
        sf.sizeBytes = getTotalFileSize(smallFileSlice);
        smallFileLocations.add(sf);
      } else {
        smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
          // in case there is something error, and the file slice has no log file
          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
              FSUtils.getFileIdFromLogPath(logFile.getPath()));
          sf.sizeBytes = getTotalFileSize(smallFileSlice);
          smallFileLocations.add(sf);
        });
      }
    }
  }
  return smallFileLocations;
}
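To illustrate the size estimate behind isSmallFile, a small sketch assuming a hypothetical helper (the 0.35 ratio is hoodie.logfile.to.parquet.compression.ratio):

// Hypothetical helper mirroring the check described above: log bytes are discounted by the
// log-to-parquet compression ratio before comparing against the max parquet file size.
static boolean isSmallFileSlice(long baseFileSizeBytes, long totalLogFileSizeBytes,
                                long maxParquetSizeBytes, double logToParquetRatio) {
  long estimatedTotal = baseFileSizeBytes + (long) (totalLogFileSizeBytes * logToParquetRatio);
  return estimatedTotal < maxParquetSizeBytes;
}

// Example: a 40 MB base file plus 100 MB of logs * 0.35 gives an estimate of 75 MB, which is
// below 120 MB, so the slice still counts as a small file and can take more inserts.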
Notes
The small-file problem is not solved by appending to an existing file; instead, a new file version is written under the same fileId, so you may wonder why the number of files on storage does not drop. Configure a suitable number of retained file versions and the clean service so that historical versions are cleaned up automatically; a hedged config sketch follows.
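An example of the cleaning-related configuration implied here (keys are standard Hudi configs; the values are illustrative and should be tuned to your own retention and query needs):

import java.util.HashMap;
import java.util.Map;

class CleaningTuningExample {
  // Keep enough file versions for in-flight readers, and let the cleaner drop the rest.
  static Map<String, String> cleaningOptions() {
    Map<String, String> options = new HashMap<>();
    options.put("hoodie.clean.automatic", "true");        // run cleaning as part of the write pipeline
    options.put("hoodie.cleaner.commits.retained", "10"); // retain file versions needed by the last 10 commits
    options.put("hoodie.keep.min.commits", "20");         // archival bounds; keep these above commits retained
    options.put("hoodie.keep.max.commits", "30");
    return options;
  }
}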
