Apache Iceberg入门教程系列之小文件合并
本文是《Apache Iceberg 入门教程![1]》(点击文末阅读原文直达)专题的第 1 篇,共 9 篇。
在 这篇文章中我们分析了 Apache Iceberg 写数据的源码。如下是我们使用 Spark 写两次数据到 Iceberg 表的数据目录布局( 测试代码[2]):
因为我们每次写入的数据就几条,Iceberg 每个分区写文件的时候都是产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。如果我们是使用 Spark Streaming 的方式7*24小时不断地往 Apache Iceberg 里面写数据,这将产生大量的小文件。
使用 Iceberg 来压缩文件
值得高兴的是,Apache Iceberg 给我们提供了相关 Actions API 来合并这些小文件,具体如下:
运行完上面代码之后,可以将 Iceberg 的小文件进行合并,得到的新数据目录如下:
对比最新的结果可以得出:
•ts_year=2020/id_bucket=0 新增了名为 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet 的数据文件,这个其实就是把之前四个文件进行和合并得到的新文件;•ts_year=2020/id_bucket=1 新增了名为 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet 的数据文件,这个其实就是把之前两个文件进行和合并得到的新文件。
Iceberg 小文件合并原理
Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。小文件合并其实是通过 Spark 并行计算的,这也就是上面 DEMO 初始化了一个 SparkSession 的原因。我们可以通过 RewriteDataFilesAction 类的 targetSizeInBytes 方法来设置输出的合并文件大小。
注意:经过我多次测试,有以下结论:
•如果你设置的 targetSizeInBytes 值比底层文件的大小都小,其实原来大于这个尺寸的文件并没被拆成一个个大小最多为 targetSizeInBytes 的文件,而相当于把原来每个文件读出来,再写到新文件里面。•如果你设置的 targetSizeInBytes 值比底层文件的大小要大,这时候合并才真正生效。这个看起来像是把文件大小小于 targetSizeInBytes 的文件进行了合并,大于这个值的文件好像就是简单读出来,再写到新文件里面。
当我们调用了 execute() 方法,RewriteDataFilesAction 类会先创建出一个 org.apache.iceberg.DataTableScan,然后会把对应表的最新快照(Snapshot)拿出来,紧接着拿出这个快照对应的底层所有数据文件。然后按照分区 Key 进行分组(group),同一个分区的文件放到一起,并将这些信息放到 Map<StructLikeWrapper, Collection
由于分区里面可能存在一个文件,这时候就没必要去执行文件合并,这时候可以去掉这部分分区,得到了 Map<StructLikeWrapper, Collection
如果 filteredGroupedTasks 不为空,则对每个分区里面的文件进行 split 和 combine 操作,如下:4 combinedScanTasks 结构如下:5 combinedScanTasks 里面其实就是封装了 BaseCombinedScanTask 类,这个类里面的 task 就是标识哪些 Iceberg 的数据文件需要合并到新文件里面。得到 combinedScanTasks 之后会构造出一个 RDD:
JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
然后最终会调用 taskRDD 的 map 方法,遍历 combinedScanTasks 里面的 task,将 task 里面对应的 Iceberg 读出来,再写到新文件里面:
public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
return taskCommitRDD.collect().stream()
.flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
.collect(Collectors.toList());
rewriteDataForTask 的实现如下:
private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
TaskContext context = TaskContext.get();
int partitionId = context.partitionId();
long taskId = context.taskAttemptId();
RowDataReader dataReader = new RowDataReader(
task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
properties, schema, SparkSchemaUtil.convert(schema));
OutputFileFactory fileFactory = new OutputFileFactory(
spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
BaseWriter writer;
if (spec.fields().isEmpty()) {
writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
} else {
writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema);
}
try {
while (dataReader.next()) {
InternalRow row = dataReader.get();
writer.write(row);
}
dataReader.close();
dataReader = null;
return writer.complete();
} catch (Throwable originalThrowable) {
......
}
}
rewriteDataForTasks 执行完会返回新创建文件的路径,最后会写到新的快照里面。在快照里面会将新建的文件表示为 org.apache.iceberg.ManifestEntry.Status#ADDED,上一个快照里面的文件标记为 org.apache.iceberg.ManifestEntry.Status#DELETED。
好了,本文就分享到这里。
引用链接
[1]
Apache Iceberg 入门教程!: https://www.iteblog.com/archives/series/apache-iceberg-tutorial/
[2]
测试代码: https://www.iteblog.com/archives/9888.html#_Spark2_Apache_Iceberg
Java与大数据架构
7年老码农,10W+关注者。【Java与大数据架构】全面分享Java编程、Spark、Flink、Kafka、Elasticsearch、数据湖等干货。欢迎扫码关注!