vlambda博客
学习文章列表

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

Chapter 5: Introducing Delta Engine

Delta Engine 是 Delta Lake 的查询引擎,默认包含在 Azure Databricks 中。由于优化的布局和改进的数据索引,它的构建方式允许我们以多种方式优化 Delta Lake 中的数据处理。这些优化操作包括使用动态文件修剪 (DFP)、Z-Ordering、Auto Compaction、ad hoc processing 和更多的。这些优化操作的额外好处是,其中一些操作以自动方式发生,只需使用 Delta Lake。您将以多种方式使用 Delta 引擎优化。

在本章中,你将学习如何利用 Delta Lake 优化 Azure Databricks 中的 Delta Lake ETL。以下是我们将集中讨论的主题:

  • 使用 Delta Engine 优化文件管理
  • 使用 DFP 优化查询
  • 使用布隆过滤器
  • 优化连接性能

Delta Engine 是关于优化的,文件管理可能是我们的主要缺点之一。在本章中,您将学习如何使用 Delta Engine 的功能来提高 ETL 的性能。

Technical requirements

为了能够在本章中使用 Delta Engine,您需要订阅 Azure Databricks。

Optimizing file management with Delta Engine

Delta Engine 允许改进对 Delta Lake 中文件的管理,从而提高查询速度,这要归功于存储数据布局的优化。 Delta Lake 通过使用两种类型的算法来做到这一点:bin-packing 和 Z-Ordering。第一个算法 在将小文件合并为大文件时很有用,并且在处理大文件时更有效。第二种算法借鉴了数学分析,并应用于数据的底层结构,以将多个维度映射到一个维度,同时保留数据点的局部性。

在本节中,我们将了解这些算法的工作原理,了解如何使用作用于我们数据的命令来实现它们,以及如何借助时间旅行功能处理数据快照。

需要记住的是,虽然我们在使用 Delta Lake 时会进行自动优化,但大多数这些优化不会自动发生,其中一些必须手动应用。我们将看到如何——以及多久——这样做。

Merging small files using bin-packing

我们在前面的章节中已经讨论了处理从不同来源到达我们数据湖的几个小文件可能会出现的困难。 Delta Engine 允许我们将小文件合并为更大的文件,从而为我们提供了解决此问题的方法。

Bin-packing 与 Z-Ordering 一样,是一种从数学分析中借用的算法,最初的构思是优化如何将不同体积的对象打包到有限数量的 bin 中,同时使用尽可能少的 bin。我们已经可以想到如何将其应用于将较小的文件打包成最少数量的较大、更高效的文件的问题。它们如下:

  • We can apply this algorithm to a Delta table by using the OPTIMIZE command on a specific table path, as shown in the following code example:
    OPTIMIZE delta.`/data/data_events`

    在表中运行此操作将在输出中显示受操作影响的表的路径,以及优化期间修改的文件数量的指标,如以下屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.1 – 指定要按路径优化的 Delta 表

  • As we are used to doing, we can also just pass the table name, like this:
    OPTIMIZE data_events

    该表很小,因此优化实际上不会更改包含表数据的文件的任何内容,如下面的屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.2 – 按名称指定要优化的 Delta 表

  • If we have many files but we want to focus on a certain number of them, we can optimize a portion of the data by explicitly passing a WHERE clause. This will cause the OPTIMIZE command to just act on a subset of data, as illustrated in the following code snippet:
    OPTIMIZE data_events WHERE insert_date >= '2020-06-15'

    当运行OPTIMIZE命令后,我们会看到一系列与已经发生的文件重组相关的文件统计信息,例如删除文件和添加文件,甚至还有一些Z -订购统计。

Delta Lake 将优化上述表,从而提高查询速度,这要归功于我们将扫描较少数量的文件来读取我们的表。需要记住的一点是,由 OPTIMIZE 命令触发的 bin-packing 优化是一个幂等动作,这是一个花哨的词来表示该命令的执行只有在数据集上第一次触发时才会生效。

当此操作发生在表上时,正在优化的表的读取器不会受到影响,这要归功于 Delta 表的 Snapshot Isolation 功能。即使底层文件结构被算法重新排序并相应修改事务日志,也能确保表的完整性,因此查询将在 OPTIMIZE 之前和之后产生相同的结果命令已在表上运行。与正在优化的表相关的任何数据流也不会受到此命令的影响。

该算法的目标是生成一系列大小均匀的文件,在存储中提供大小和数量之间的平衡。

那么,如果这种重组如此高效并且不会影响存储在表中的数据,那么为什么 OPTIMIZE 不是自动的呢?这个问题的答案在于,要将多个文件合并为一个较大的文件,您需要在目录中预先设置这些小文件,否则将没有任何东西可以合并;因此,OPTIMIZE 不能自动运行。另一个答案是,压缩在计算、存储和时间资源方面也是一项昂贵的操作,因此当正在优化的表是流的源时,这可能会影响 Azure Databricks 执行低延迟流的能力。

讨论的最后一点涉及应该多久优化一次增量表。当然,这是性能和用于运行优化的资源成本之间的权衡。如果您的优先级是性能,则此操作应更频繁或定期(例如每天)进行;否则,您可以优先考虑成本并减少运行频率。

Skipping data

随着新数据的到来,Delta Lake 将跟踪与输入/输出(I/O) 数据粒度。这些统计信息是自动获得的,在查询时存储有关最小值和最大值的信息。这样做的目的是识别具有一定相关性的列,相应地计划查询,并避免不必要的 I/O 操作。在查找查询期间,Azure Databricks 数据跳过功能将利用获得的统计信息来跳过在此操作中不需要读取的文件。

粒度可以调整,并且可以很好地与分区集成,即使这些是独立的操作。

请记住,数据跳过是一种索引概率技术,与 Bloom 过滤器(将在本章后面讨论)一样,它可能会导致误报。

没有 需要配置数据跳过,因为它默认激活并尽可能应用,这也意味着此操作的性能与数据的底层结构绑定。我们可以配置的是我们想要在统计期间跟踪多少列——默认情况下是前 32 列,可以使用 dataSkippingNumIndexedCols 选项修改该值。考虑到跟踪列是一项资源密集型操作,因此添加更多列或跟踪具有长字符串数据类型的列将对写入文件时的开销产生影响。

解决这些问题的方法是使用 ALTER TABLE 更改长字符串列的类型,或将其移出 dataSkippingNumIndexedCols< /code> 选项,在收集统计信息时被跳过。

借助数据跳过,我们可以减少每次查询中需要读取的数据量,从而提高 ETL 管道的性能并减少时间和成本。

在下一节中,我们将了解 Z-Ordering 以及如何利用它来进一步提高性能。

Using Z-order clustering

有时,数据 可能分散在文件中,这在我们对其运行查询时是不切实际的。这会导致元数据大小的增加,以及列出目录的问题和文件压缩问题。

为了处理这个问题,我们可以使用分区技术(正如我们将在下一节中看到的那样),试图使我们的信息分布更加均匀。反过来,这些技术的性能取决于信息如何分散在文件中,因此在运行它们之前,建议您应用 Z-Ordering。

Z-Ordering 是 Delta Lake 的一项功能,旨在将相关数据分配到同一位置。这可以提高我们对运行此命令的表的查询性能,因为 Azure Databricks 可以通过数据跳过方法利用它来更有效地读取和分区文件。我们可以使用 ZORDER BY 命令在 Azure Databricks 中使用 Z-Ordering:

  • In the following code example, we will optimize the data table on the event_type column:
    OPTIMIZE data ZORDER BY (event_type)

    通过在笔记本的 结构化查询语言 (SQL) 单元格中运行此操作,我们应该得到所做的修改根据指定的分区优化表。从下面的截图示例中,我们可以看到我们的表很小,我们并不需要优化表的底层文件结构:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.3 – 在 Delta 表上应用 ZORDER

  • We can also reference Delta tables by path, as illustrated in the following code snippet:
    OPTIMIZE delta.'path_to_data' ZORDER BY (event_type)

    通过在 Azure Databricks 笔记本的 SQL 单元格中运行上述代码,我们应该可以看到操作的结果指标,其方式与之前按名称指定表时相同,如以下屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.4 – 在 Delta 表上应用 ZORDER,通过路径指定

Z-Ordering 也可以通过传递条件应用于数据子集。

  • In the following code example, we will only optimize data in the event_type column between today and yesterday:
    OPTIMIZE data WHERE insert_date >= current_timestamp() - INTERVAL 1 day ZORDER BY (event_type)

    在下面的屏幕截图中,我们可以看到通过在笔记本的 SQL 单元格中运行此操作,我们将看到表的路径和操作的指标,其方式与前面的示例相同:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.5 – 将 ZORDER 应用于数据的特定部分

具有许多唯一值的列,以及经常用于过滤的列,是最推荐通过运行 ZORDER 。我们还可以通过将作为参数分隔的列名传递给 ZORDER 命令来指定多个列。

多次运行 ZORDER 会导致多次执行,但这被认为是增量操作。在新数据到达后运行 ZORDER 不会影响已经优化的数据。

要记住的一件事是运行 Z-Ordering 与数据大小无关。该操作的目标是重新定位相关文件,而不是均匀分布数据,因为这也会导致数据偏斜,我们将在下一节中看到如何解决这个问题。

Managing data recency

正如上一章中提到的,当查询一个增量表时,除非另有说明,否则我们正在读取该表的最新状态,这要归功于增量表是自动更新的。这是一个自动运行的过程,但如果我们愿意,我们可以在不需要时修改此行为。这可能在执行历史分析时发生,因此我们不需要预先更新表。这样做会在查询时产生更低的延迟,简而言之,这意味着查询将运行得更快。

可以使用 spark.databricks.delta.stalenessLimit 选项设置此行为,该选项将 time 字符串值和将此应用于我们正在处理的当前会话,因此它不会影响其他用户 阅读该表。 time 参数以 毫秒 (ms) 表示,范围可以,例如,从 1 小时到 1 天。如果超过这个过期限制,表状态将被更新。

值得一提的是,该参数不会阻止更新表——它只是避免查询等待表更新。

Understanding checkpoints

我们已经在讨论表上的流式源时讨论了检查点,但我们还没有完全讨论它们是如何工作的。

Delta Lake 使用检查点来聚合 Delta 表的状态,以便在我们需要表的最新状态时将其作为计算的起点。检查点默认每 10 次提交写入一次,它们存储在表路径中的 _delta_log 目录中。

这种机制节省了大量时间,因为否则,在查询表时,要获取最新状态 Delta Lake 必须读取大量 JavaScript Object Notation (JSON) 文件,表示表的事务日志以获取最后一个状态。

简而言之,检查点保存与在当前状态之前对表所做的所有事务和修改相关的信息,检查已被最近的操作取消的先前操作 是否被删除从检查站。这些类型的操作称为无效操作,并使用称为规则的策略删除和解。这样做是为了减少日志的整体大小,并在重建快照时提高读取效率。正如我们所见,检查点对于快照隔离和访问先前版本的数据至关重要。

检查点也是存储有关列使用情况的统计信息以供在数据跳过期间使用的地方。

查看日志目录时,我们会看到它有一个命名约定,如下面的代码片段所示:

00000000000000001025.checkpoint.parquet

我们可以看到检查点的 名称由一个前缀组成,该前缀表示它所拥有的表的版本和表的文件格式。以更一般的方式,我们可以说具有 table_format 的表的 n 版本的检查点名称将是 n.checkpoint.table_format

鉴于这种格式和事务日志的性质,如果用户想要访问最新的检查点,则很难列出目录中的所有检查点(可能有数千个)。因此,Delta Lake 将最后一个检查点存储在 _delta_log/_last_checkpoint 文件中,以便为构建表的最新快照提供方便的访问。

如前所述,在使用结构化流时,如果启用了检查点,则流可以在失败的情况下重新启动,如下所示:

  • To do this, we can set the delta.checkpoint.writeStatsAsStruct option to true in our streaming table, as follows:
    ALTER TABLE our_table SET TBLPROPERTIES ('delta.checkpoint.writeStatsAsStruct' = 'true')

    我们可以在笔记本的 SQL 单元中运行它,如果操作成功,它应该返回 OK,如下例所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.6 – 在 Delta 表上设置检查点设置

  • 或者,当使用 Delta 表时,我们可以使用带有 delta 前缀的表的路径,如下所示代码片段:
    ALTER TABLE delta.`path_to_our_table` SET TBLPROPERTIES ('delta.checkpoint.writeStatsAsStruct' = 'true')
  • 我们还可以通过运行以下 Python 命令在处理流数据帧时使用 Apache Spark checkpointLocation 选项,该命令会写入一个名为 data< 的流数据帧/code> 以 Parquet 格式设置并设置检查点位置路径。每个查询必须有一个唯一的检查点:
    data.writeStream.format("parquet")   .option("路径", output_path)   .option("checkpointLocation", checkpoint_path)   .start()

在我们的结构化流中设置检查点可以让我们在可能出现故障的情况下进行备份。如果 流重新启动,它将自动从其离开的位置继续。

Automatically optimizing files with Delta Engine

自动优化是 Delta Engine 的一个 功能,用于进一步优化 Delta 表。它会在大表中的数据操作语言 (DML)操作期间自动压缩小文件,合并文件并保持元数据清洁。我们有两个选项来应用自动优化:Optimized Writes 和 Auto Compaction。 Optimized Writes 尝试将表分区均匀地排列到代表每个分区的 128 个 兆字节 (MB) 大小的文件中,以改进写入。 Auto Compaction 是一个 OPTIMIZE 命令,压缩大小设置为 128 MB,在每次写入操作后运行。

自动优化使经常修改的表保持优化,这在处理低延迟数据流或频繁表合并时非常有用。

我们需要通过在特定表上启用 our_delta_table.autoOptimize.optimizeWriteour_delta_table.autoOptimize.autoCompact 中的选项来启用自动优化:

  • To enable this option on existing tables, we can use the TBLPROPERTIES SQL command to set those properties to true, as in the following example:
    ALTER TABLE our_table SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)

    我们可以在笔记本的 SQL 指定单元格中运行此命令,如以下屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.7 – 在 Delta 表上启用自动优化

  • 我们也可以 在所有新的 Delta 表上默认将这些选项定义为 true,使用以下 SQL 配置:
    设置 spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true。 设置 spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true。
  • 如果我们只想将其应用于我们的工作会话,我们可以使用以下代码来设置以下 Spark 配置:
    spark.databricks.delta.optimizeWrite.enabled spark.databricks.delta.autoCompact.enabled

必须手动启用这些选项的一个原因是,如果我们在同一张表上有许多来自不同用户的并发 DML 操作,这可能会导致事务日志中的冲突,这可能很危险,因为这不会导致任何失败或重试。

另外要记住的是,在大表上,在表上启用自动优化可以与 OPTIMIZE 结合使用,并且不能代替 ZORDER

让我们更详细地了解如何使用 Optimized Writes 和 Auto Compaction,如下所示:

  • Optimized Writes:当在表上启用 Optimized Writes 时,Delta Engine 将尝试将表重新排序为每个表分区大约 128 MB 的文件,这可以 根据数据模式变化,以平衡并行写入的文件数量。此过程根据表的分区结构对文件进行重新排序。当数据流具有几分钟的延迟时,此选项很有用。
  • Auto Compaction:正如前面提到的,Auto Compaction 在每次写入后运行,因此如果在表上启用它,则不会每次写操作后都需要调用 OPTIMIZE。如果我们没有在工作台上运行定期安排的 OPTIMIZE 作业,那么启用此选项是个好主意。

大多数时候 在 Azure Databricks 中优化数据处理时,很容易只关注转换。但我们应该永远记住,Databricks 使用分布式计算,需要在工作人员之间分发数据。确保数据井井有条并对其存储进行了优化,这将带来更好的性能指标,不仅适用于存储,还适用于数据处理。

Using caching to improve performance

缓存是一种操作,我们将数据存储在更靠近处理位置的位置,以提高性能。它可以在 Azure Databricks 中以两种不同的方式应用:增量缓存和 Apache Spark 缓存。根据我们处理的情况,我们可以利用每个选项的特定特性来提高表格的读取速度。

这样做的好处是无需选择使用哪个选项,因为它们可以同时使用。

让我们深入了解这两个选项的特征,并看看它们如何应用于提高性能。

Delta and Apache Spark caching

Delta 和 Apache Spark 有不同的特点,可以总结如下:

  • Delta 缓存: Delta 缓存会自动将远程 Parquet 文件的副本复制到本地节点存储中以加速其处理。它可以访问的远程位置是 Databricks File System (DBFS)、Hadoop 分布式文件系统 (HDFS)、Azure Blob 存储、Azure Data Lake Storage Gen1 和 Azure Data Lake Storage Gen2。由于优化的解压缩和符合处理要求的输出格式,它可以比 Spark 缓存运行得更快。可以使用 CACHE 语句预加载数据。然后数据存储在本地磁盘上,并且可以通过 固态硬盘 (SSD) 快速读取。
  • Spark 缓存:Spark 缓存可以存储来自子查询的结果并处理除 Parquet 之外的多种文件格式。必须手动指定要缓存的查询和表。数据存储在内存中,这会损害性能。

我们可以通过从集群配置中选择 Delta Cache Accelerated 工作器之一来启用 D​​elta 缓存,如下面的屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.8 – 选择 Delta Cache 加速存储

然后,我们继续如下:

  • 为了启用 Spark 缓存,我们需要在我们的数据帧上运行缓存选项——如以下 Python 命令所示——我们在其中缓存一个名为 data 的数据帧:
    data.cache()
  • 缓存会调用persist方法,传入MEMORY_DISK作为参数,意思就是和下面的Python命令一样:
    data.persist(StorageLevel.MEMORY_AND_DISK)

我们有几个参数作为存储级别传递,概述如下:

  • DISK_ONLY:这只会将序列化的数据保存在磁盘上。
  • MEMORY_ONLY:这会将反序列化的数据仅保留在内存中,实现非常好的计算性能,但限制了可用于存储的内存量。
  • MEMORY_AND_DISK:这会将数据持久化在内存中,如果没有足够的内存可用,它将驱逐数据以存储在磁盘上。这是 Spark 中的默认模式,它将尽可能多的数据存储在浮动内存中,其余的则被驱逐到磁盘中。计算时间将比仅使用磁盘快,但比仅使用内存要慢。
  • OFF_HEAP:数据持久化在堆外内存中,即不受Java虚拟机监管的内存(JVM) 垃圾收集器。数据将被更快地读取,但应谨慎使用,因为必须由用户指定分配的内存。

对缓存数据在 worker 中的存储位置进行细粒度控制 允许我们对使用数据的算法的性能进行细粒度控制。存储在本地工作内存中的数据将比从磁盘读取更快,但能够保存更少量的数据。 OFF_HEAP 内存应该小心使用,因为它的数据也被 JVM 用于内部进程。

Caching a subset of the data

我们也可以显式地选择部分数据缓存在worker本地存储中,使用CACHE SELECT命令以下方式:

CACHE SELECT col1, col2, col3 
FROM our_table
WHERE our_condition

同样,我们可以使用 delta 前缀和表路径来指定 Delta 表,如以下代码片段所示:

CACHE SELECT col1, col2, col3 
FROM delta. 'our_table'
WHERE our_condition

在以下屏幕截图示例中,我们在笔记本的 SQL 单元格中运行它,将条件指定为 COVID 数据集的特定省份:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.9 – 指定要缓存的 Delta 表的一部分

这将只记录符合我们条件的指定列进行缓存,从而防止后续查询扫描远程文件。

在使用 Delta 表时不需要这样做,因为数据会在查询时被缓存,但当您需要提高查询性能时它会很有用。

一个简单的 示例是缓存名为 data 的表,如下所示:

CACHE SELECT * FROM user_data

或者,我们也可以对会话期间经常使用的部分数据执行此操作,如下所示:

CACHE SELECT user_id, user_cohort FROM user_data WHERE user_cohort=10

接下来,我们将看到如何配置 Delta 缓存的行为。

Configuring the Delta cache

建议始终对工作实例使用Delta缓存加速集群,这些实例是运行SSD磁盘的优化实例。这些实例已默认配置为使用 Delta 缓存,但您可以在会话创建期间使用 Apache Spark 选项手动配置其参数:

  • 要建立最大磁盘使用量,默认设置为工作器本地存储的一半,您可以使用 maxDiskUsage 选项,以字节为单位,如下例所示,我们将其设置为 50g
    spark.databricks.io.cache.maxDiskUsage 50g
  • 我们还可以使用 maxMetaDataCache 选项指定元数据的最大分配空间(以字节为单位),如下例所示,其中设置为 1g
    spark.databricks.io.cache.maxMetaDataCache 1g
  • 我们也可以禁用压缩,默认是开启的,如下:
    spark.databricks.io.cache.compression.enabled false
  • 要禁用 Delta 缓存,我们可以使用以下 Scala 命令:
    spark.conf.set("spark.databricks.io.cache.enabled", "false")

很高兴知道,当禁用 Delta 缓存时,我们不会丢失已经存储在 worker 本地存储中的数据,但我们会避免添加更多数据。

在下一个 部分中,您将了解 DFP,这是一项 Delta Lake 功能,可提高查询性能。

Optimizing queries using DFP

DFP 是 Delta Lake 功能,可自动跳过与查询无关的文件。它是 Azure Databricks 中的默认选项,通过收集有关 Delta Lake 中文件的数据 工作,无需明确声明应在查询中跳过文件,通过以下方式提高性能利用数据的粒度。

DFP 关于进程是否启用、表的最小大小以及触发进程所需的最小文件数的行为可以通过以下选项进行管理:

  • spark.databricks.optimizer.dynamicPartitionPruning(默认为 true):是否启用 DFP。
  • spark.databricks.optimizer.deltaTableSizeThreshold(默认为 10 GB):激活 DFP 的 Delta 表的最小大小。
  • spark.databricks.optimizer.deltaTableFilesThreshold(默认为1000):表示探针端Delta表的文件数触发 DFP 所需的联接。如果文件数量较少,则不值得触发该过程。

DFP 可用于非分区表、非选择性连接以及对它们执行 DML 操作时。

通过应用 DFP 获得的改进与数据的粒度级别相关。因此,在使用 DFP 时应用 Z 排序可能是一个好主意。

Using DFP

为了首先了解我们如何应用 DFP,让我们看一个示例 我们手动使用文字语句和另一个 Delta Lake 功能,称为 动态分区修剪 民进党)。

在物理计划阶段,Delta Lake 为查询创建一个可执行计划。该计划将计算分布在许多工人的集群中。此计划已优化为仅扫描与查询相关的文件。

如果我们要运行查询,但我们想根据条件显式过滤结果。一个更具体的例子是选择 col1col2col3 从我们的表中,并根据最大值在其中一列上应用过滤条件,如以下代码片段所示:

SELECT col1,col2,col3
FROM our_table
WHERE col1> max_value

Delta Lake 在我们的列中保留有关最小值和最大值的信息,因此它会根据我们的过滤条件自动跳过与我们的查询无关的文件。

这是可能的,因为过滤包含文字条件(在我们的例子中是最大值条件),因此 Delta Lake 可以计划和优化文件扫描。如果过滤条件是联接的一部分,则 Delta Lake 无法计划要跳过哪些文件,因为过滤条件是另一个查询的结果。这称为 DPP,是 Delta Lake 的一个先前功能,与 DFP 一样,旨在提高查询的性能。

在使用基于星型模式的数据模型时,我们首先连接两个或多个表的以下类型的查询非常常见:

SELECT col1, col2, col3
FROM fact_table src
JOIN dim_table trg
ON src.id_col=trg.id_col
WHERE col1=our_condition

在这种情况下,条件作用于维度表而不是事实表。 Delta Lake 必须扫描事实表上的所有文件,扫描维度表中的所有文件,应用过滤条件,然后执行连接。

在这种情况下,DPP 将不起作用,但如果我们启用了 DFP,它将重新排序查询的执行计划,以便能够提前检查两个表上的条件,这提供了跳过两个表上的文件并增加的可能性查询的性能。

DFP 在 Azure Databricks 中默认启用,并根据以下规则应用于查询:

  • 连接策略是BROADCAST HASH JOIN
  • 连接类型为 INNERLEFT-SEMI
  • 内表是一个 Delta 表。
  • 内表中的文件数量足以证明应用 DFP 是合理的。

根据 选择的选项,DFP 将应用于连接操作。

Using Bloom filters

布隆过滤器 是一种根据条件有效过滤数据库中记录的方法。它们具有概率性质,用于测试集合中元素的成员资格。我们可能会遇到误报,但不会遇到误报。这些过滤器是作为一种数学结构开发的,当要扫描的数据量无法读取时应用,并且基于散列技术。

Delta Lake 为我们提供了对查询应用 Bloom 过滤器以进一步提高性能的能力。我们将看到它们如何在基本层面上工作,以及它们如何在 Delta Lake 中应用。

Understanding Bloom filters

正如在本节的介绍中提到的,布隆过滤器是用于测试元素是否属于某个类别的概率数据结构。此结构是使用散列函数填充的固定长度位数组,该函数将信息映射为 1 和 0。数组的长度取决于允许的误报数。动臂过滤器具有恒定的复杂性,并且相对于项目需要非常小的空间。

为了将数据映射到这个概率数据结构中,我们使用了允许我们获得数据指纹的散列函数,这意味着这个散列(几乎)是唯一的,可以用来比较或识别数据。这些散列函数是单向运算,并且必须始终为相同的数据提供相同的输出,并且如果可能的话,输出 值必须均匀且随机分布,以便相似的输入不会给出相同的结果。

这些哈希值可以在以后逐位比较以检查相似性。

在下面的截图示例中,我们比较了 W 元素是否属于 {x, y, z} 组。为此,我们将比较具有散列 W 的结果散列数组之一的位置与由该组的所有散列填充的组合位置。如果 W 散列中的所有 1 都在组的散列填充的位置,则 W 必须在集合中。如果 W 散列中的任何位落入 0,则它们不属于该集合:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.10 – 比较哈希以检查成员资格(图片取自 https:/ /en.wikipedia.org/wiki/Bloom_filter#/media/File:Bloom_filter.svg)

在这种情况下,W 元素不在集合 中,因为它散列到包含 0

我们可以看到,布隆过滤器是一种 快速提供答案的方法,如果一个元素属于一个集合,并且它们在数据工程中非常重要。接下来,我们将了解如何在 Delta Lake 中实现 Bloom 过滤器,以确保我们的查询执行良好。

Bloom filters in Azure Databricks

Azure Databricks Delta Lake 在运行查询以启用数据跳过时使用 Bloom 筛选器检查文件是否与特定筛选器匹配。它通过创建与每个文件关联的Bloom filter索引文件来做到这一点,最多五个级别,可以快速告诉Delta Lake文件包含所需数据的概率,或者是否根本不匹配。这些文件是存储在数据文件路径中的子目录中的单行 Parquet 文件。

如果给定文件不存在 Bloom 索引文件,则将始终读取该文件。当然,布隆过滤器索引文件的大​​小与选择的 误报率 (FPR) 成正比,因为较低的rate 将始终需要数组中的更多位进行比较,例如,对于给定的 10% FPR,这是 5 位。

_delta_index 包含索引文件,其命名格式与添加 index.v1.parquet 后缀。

我们可以通过将 spark.databricks.io.skipping.bloomFilter.enabled 会话配置设置为 true 来启用或禁用布隆过滤器或 false,如以下屏幕截图所示。 Azure Databricks 中已默认启用此选项:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.11 – 将 Bloom 过滤器设置为新 Delta 表的默认值

Creating a Bloom filter index

我们可以使用 CREATE BLOOMFILTER INDEX 命令创建布隆过滤器。这适用于表上的所有列或仅适用于其中的一个子集。

创建布隆过滤器的语法如以下代码片段所示:

CREATE BLOOMFILTER INDEX ON TABLE our_table
FOR COLUMNS(col1 OPTIONS(fpp=0.1), col2 OPTIONS(fpp=0.2))]

以前面章节中使用的 COVID 数据集为例,我们可以在 patient_id 列中创建一个布隆过滤器,如图所示在以下屏幕截图中:

读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库

图 5.12 – 为 Delta 表创建 Bloom 过滤器

我们还可以使用 Delta 表的路径引用相同的命令,如以下代码片段所示:

CREATE BLOOMFILTER INDEX ON TABLE delta.'path_to_table'
FOR COLUMNS(col1 OPTIONS(fpp=0.1), col2 OPTIONS(fpp=0.2))]

在这种情况下,我们为 col1col2 列创建 Bloom 过滤器,FPR 分别为 10% 和 20%。

我们还可以传递 numItems 选项,这是一个文件可以包含的不同项目的数量,以及 maxExpectedFpp 选项,它控制不将布隆过滤器写入磁盘的最大 FPR。也就是说,我们创建了一种方法来更快地查询写入内存的数据,我们只需计算一次。

我们还可以使用 DROP BLOOMFILTER INDEX 命令从表中删除所有 Bloom 过滤器。如果我们想删除我们最近创建的 Bloom 过滤器,我们可以运行以下代码:

DROP BLOOMFILTER INDEX ON TABLE our_table
FOR COLUMNS(col1 , col2)

这将删除 col1col2 列中的所有 Bloom 过滤器。请记住,在表上运行 VACUUM 也会删除所有 Bloom 过滤器。

Optimizing join performance

对表执行连接可能是一项耗费资源的操作。为了提高此类操作的性能,我们可以选择数据子集或纠正可能存在的缺陷,例如我们的数据中文件 大小的分布不成比例。解决这些问题可以提高性能并导致更有效地使用分布式计算能力。

Azure Databricks Delta Lake 通过提供范围筛选和更正我们表中数据文件大小分布的偏度来优化连接操作。

Range join optimization

连接是经常使用的,因此优化这些操作可以大大提高我们的查询性能。范围连接优化是指定需要对范围给定的数据子集执行连接的过程。

当连接操作具有类型为数字或日期时间类型的过滤条件时,将应用范围连接优化,并且可以将其理解为间隔并具有 bin 大小的调整数。

可以在以下代码片段中看到间隔的示例:

SELECT * FROM data_points JOIN ranges ON data_points.p BETWEEN ranges.start_range and ranges.end_range;

要使用范围连接 优化,我们需要建立所需的 bin 大小。为此,我们使用提示或在 Spark 会话配置中进行设置,因为除非指定,否则它不会运行。 bin-size 数字在使用日期类型时以天为单位,在使用时间戳时以秒为单位。

Enabling range join optimization

我们通过显式传递带有与 bin 大小对应的参数的提示来启用 范围连接优化。通过在 SELECT 语句上使用特定的 RANGE_JOIN 语法调用提示,并接收目标表和 bin 大小作为参数。除了表,我们还可以指定视图和子查询,考虑到这可能会损害性能。

在下一个 示例中,范围连接是在 bin 大小为 20 的表上建立的:

SELECT /*+ RANGE_JOIN(data_points, 20) */ *
FROM data_points JOIN ranges ON data_points.p >= ranges.start_range AND points.p < ranges.end_range;

您还可以在范围内将 bin 大小指定为数据的百分位数。在以下情况下,这是 10%:

SELECT /*+ RANGE_JOIN(r1, 0.1) */ *
FROM (SELECT * FROM ranges WHERE ranges.amount < 100) r1, ranges r2
WHERE r1.start_range < r2.start_range + 100 AND r2.start_range < r1.start_range + 100;

最后,我们还可以使用 BETWEEN 命令指定时间戳和日期时间格式的范围连接,如前所述:

SELECT /*+ RANGE_JOIN(data_points_c, 100) */ *
FROM data_points_a
  JOIN data_points_b ON (data_points.point_id = data_points_b.point_id)
  JOIN data_points_c ON (data_points_a.ts BETWEEN data_points_c.start_time AND data_points_c.end_time)

优化的有效性将取决于具有正确的 bin 大小。相对于区间而言太小的 bin 大小可能会导致区间之间的重叠。我们可以通过查看我们将用作过滤器的列中的百分位数来选择合适的 bin 大小。这可以使用 APPROX_PERCENTILE 命令来完成。

在下面的代码示例中,我们正在查看 .25.5 data_ranges 的 .75 个百分位数,endstart 之间有间隔代码>栏:

SELECT APPROX_PERCENTILE(CAST(end - start AS DOUBLE), ARRAY(.25, .5, .75)) FROM data_ranges

如果我们的区间长度是统一的,我们可以将 bin 大小设置为最常见的区间长度。

bin 大小是一个高度依赖于数据分布的 参数,因此建议测试不同的方法来找到合适的 bin 大小,以利用所有并行处理计算优势.

Skew join optimization

当我们的数据分区比其他分区大得多时,我们会遇到 偏度问题。当我们在那个分区上执行我们的进程时,这可能会产生问题,因为在这个分区上工作需要更多的时间。这在 Azure Databricks 等分布式计算系统上尤其糟糕,因为许多任务是并行运行的,并且让一个任务比其他任务花费更长的时间会阻塞依赖于该任务的执行。

这种不均衡的工作负载问题主要影响连接操作,因为大表需要重新洗牌。如果某些执行时间比其他执行时间长,这个问题可能会影响我们的查询。

可以使用 Azure Databricks 中的倾斜连接优化提示来解决此问题,方法与我们在范围连接优化中看到的相同。

提示使用 /* 语法开始,/*SELECT 结束 语句,传递 SKEW 命令,该命令将我们要优化的列作为参数。

下面的代码片段显示了一个示例:

SELECT /*+ SKEW('orders') */ * FROM data, users WHERE user_id = our_condition

Delta Lake 使用提示中的信息来优化我们查询的执行计划。我们还可以将其应用于子查询,如下面的代码示例所示,我们在其中运行子查询来过滤我们的数据:

SELECT /*+ SKEW('C1') */ * FROM (SELECT * FROM data WHERE user_id < 100) C1, orders
  WHERE C1.user_id = our_condition

我们还可以 使用 SKEW 命令指定关系和列。我们将在下一节中看到如何做到这一点。

Relationships and columns

倾斜连接 优化是一项计算密集型操作,必须仅在需要时运行。这就是为什么 SKEW 的参数是列名的原因。运行连接时,只需在我们用来执行此操作的列上运行 SKEW 很有用。

在以下代码示例中,我们将倾斜提示应用于单个列,即 user_id

SELECT /*+ SKEW('orders', user_id) */ *
  FROM orders, users
  WHERE user_id = our_condition

我们还可以将提示应用于多个列,如下所示:

SELECT /*+ SKEW('orders', ('user_id', 'o_region_id')) */ *
  FROM orders, users
  WHERE user_id = our_condition AND o_region_id= c_region_id

您还可以在提示中指定偏斜值。根据查询和数据,偏差值可能是已知的(例如,因为它们从不改变)或者可能很容易找出。这样做可以减少倾斜连接优化的开销。否则,Delta Lake 会自动检测到它们。

根据查询的类型,也可以在提示上指定值。传递给 SKEW 的参数越具体,优化任务的开销就越少。

在以下代码示例中,我们对指定单个倾斜值的单个列运行查询:

SELECT /*+ SKEW('orders', 'user_id', 0) */ *
 FROM orders, users
  WHERE user_id = our_condition 

我们还可以传递几个 skew 值,如下所示:

SELECT /*+ SKEW('orders', 'user_id', (0, 1, 2)) */ *
 FROM orders, users
  WHERE user_id = our_condition 

最后,我们还可以 指定几个列,并分别为这些列指定几个值,如下所示:

SELECT /*+ SKEW('orders', ('user_id', o_region_id), ((0, 1001), (1, 1002))) */ *
 FROM orders, users
  WHERE user_id = our_condition AND o_region_id= c_region_id

我们可以看到,我们可以有选择地纠正数据中的偏度,仅在我们选择的符合规定条件的列中应用这种转换。这样,我们可以以更有效的方式进行优化。

Summary

在本章中,我们讨论了如何应用 Delta Engine 功能(例如 Z-Ordering、数据跳过和数据缓存等)来改进数据的布局结构,从而全面提高查询性能。我们可以利用这些工具将文件重组为更小、更紧凑的文件,其中数据根据访问频率分布。我们还可以在数据列上创建 Bloom 过滤器,以提高我们运行查询的速度。

在下一章中,我们将深入了解如何在 Delta 表中摄取数据流。