读书笔记《distributed-data-systems-with-azure-databricks》第四章三角洲湖上的蔚蓝数据库
Chapter 5: Introducing Delta Engine
Delta Engine 是 Delta Lake 的查询引擎,默认包含在 Azure Databricks 中。由于优化的布局和改进的数据索引,它的构建方式允许我们以多种方式优化 Delta Lake 中的数据处理。这些优化操作包括使用动态文件修剪 (DFP)、Z-Ordering、Auto Compaction、ad hoc processing 和更多的。这些优化操作的额外好处是,其中一些操作以自动方式发生,只需使用 Delta Lake。您将以多种方式使用 Delta 引擎优化。
在本章中,你将学习如何利用 Delta Lake 优化 Azure Databricks 中的 Delta Lake ETL。以下是我们将集中讨论的主题:
- 使用 Delta Engine 优化文件管理
- 使用 DFP 优化查询
- 使用布隆过滤器
- 优化连接性能
Delta Engine 是关于优化的,文件管理可能是我们的主要缺点之一。在本章中,您将学习如何使用 Delta Engine 的功能来提高 ETL 的性能。
Optimizing file management with Delta Engine
Delta Engine 允许改进对 Delta Lake 中文件的管理,从而提高查询速度,这要归功于存储数据布局的优化。 Delta Lake 通过使用两种类型的算法来做到这一点:bin-packing 和 Z-Ordering。第一个算法 在将小文件合并为大文件时很有用,并且在处理大文件时更有效。第二种算法借鉴了数学分析,并应用于数据的底层结构,以将多个维度映射到一个维度,同时保留数据点的局部性。
在本节中,我们将了解这些算法的工作原理,了解如何使用作用于我们数据的命令来实现它们,以及如何借助时间旅行功能处理数据快照。
需要记住的是,虽然我们在使用 Delta Lake 时会进行自动优化,但大多数这些优化不会自动发生,其中一些必须手动应用。我们将看到如何——以及多久——这样做。
Merging small files using bin-packing
我们在前面的章节中已经讨论了处理从不同来源到达我们数据湖的几个小文件可能会出现的困难。 Delta Engine 允许我们将小文件合并为更大的文件,从而为我们提供了解决此问题的方法。
Bin-packing 与 Z-Ordering 一样,是一种从数学分析中借用的算法,最初的构思是优化如何将不同体积的对象打包到有限数量的 bin 中,同时使用尽可能少的 bin。我们已经可以想到如何将其应用于将较小的文件打包成最少数量的较大、更高效的文件的问题。它们如下:
- We can apply this algorithm to a Delta table by using the
OPTIMIZE
command on a specific table path, as shown in the following code example:在表中运行此操作将在输出中显示受操作影响的表的路径,以及优化期间修改的文件数量的指标,如以下屏幕截图所示:
- As we are used to doing, we can also just pass the table name, like this:
该表很小,因此优化实际上不会更改包含表数据的文件的任何内容,如下面的屏幕截图所示:
- If we have many files but we want to focus on a certain number of them, we can optimize a portion of the data by explicitly passing a
WHERE
clause. This will cause theOPTIMIZE
command to just act on a subset of data, as illustrated in the following code snippet:当运行
OPTIMIZE
命令后,我们会看到一系列与已经发生的文件重组相关的文件统计信息,例如删除文件和添加文件,甚至还有一些Z -订购统计。
Delta Lake 将优化上述表,从而提高查询速度,这要归功于我们将扫描较少数量的文件来读取我们的表。需要记住的一点是,由 OPTIMIZE
命令触发的 bin-packing 优化是一个幂等动作,这是一个花哨的词来表示该命令的执行只有在数据集上第一次触发时才会生效。
当此操作发生在表上时,正在优化的表的读取器不会受到影响,这要归功于 Delta 表的 Snapshot Isolation 功能。即使底层文件结构被算法重新排序并相应修改事务日志,也能确保表的完整性,因此查询将在 OPTIMIZE
之前和之后产生相同的结果命令已在表上运行。与正在优化的表相关的任何数据流也不会受到此命令的影响。
该算法的目标是生成一系列大小均匀的文件,在存储中提供大小和数量之间的平衡。
那么,如果这种重组如此高效并且不会影响存储在表中的数据,那么为什么 OPTIMIZE
不是自动的呢?这个问题的答案在于,要将多个文件合并为一个较大的文件,您需要在目录中预先设置这些小文件,否则将没有任何东西可以合并;因此,OPTIMIZE
不能自动运行。另一个答案是,压缩在计算、存储和时间资源方面也是一项昂贵的操作,因此当正在优化的表是流的源时,这可能会影响 Azure Databricks 执行低延迟流的能力。
讨论的最后一点涉及应该多久优化一次增量表。当然,这是性能和用于运行优化的资源成本之间的权衡。如果您的优先级是性能,则此操作应更频繁或定期(例如每天)进行;否则,您可以优先考虑成本并减少运行频率。
Skipping data
随着新数据的到来,Delta Lake 将跟踪与输入/输出(I/O) 数据粒度。这些统计信息是自动获得的,在查询时存储有关最小值和最大值的信息。这样做的目的是识别具有一定相关性的列,相应地计划查询,并避免不必要的 I/O 操作。在查找查询期间,Azure Databricks 数据跳过功能将利用获得的统计信息来跳过在此操作中不需要读取的文件。
粒度可以调整,并且可以很好地与分区集成,即使这些是独立的操作。
请记住,数据跳过是一种索引概率技术,与 Bloom 过滤器(将在本章后面讨论)一样,它可能会导致误报。
没有 需要配置数据跳过,因为它默认激活并尽可能应用,这也意味着此操作的性能与数据的底层结构绑定。我们可以配置的是我们想要在统计期间跟踪多少列——默认情况下是前 32 列,可以使用 dataSkippingNumIndexedCols
选项修改该值。考虑到跟踪列是一项资源密集型操作,因此添加更多列或跟踪具有长字符串数据类型的列将对写入文件时的开销产生影响。
解决这些问题的方法是使用 ALTER TABLE
更改长字符串列的类型,或将其移出 dataSkippingNumIndexedCols< /code> 选项,在收集统计信息时被跳过。
借助数据跳过,我们可以减少每次查询中需要读取的数据量,从而提高 ETL 管道的性能并减少时间和成本。
在下一节中,我们将了解 Z-Ordering 以及如何利用它来进一步提高性能。
Using Z-order clustering
有时,数据 可能分散在文件中,这在我们对其运行查询时是不切实际的。这会导致元数据大小的增加,以及列出目录的问题和文件压缩问题。
为了处理这个问题,我们可以使用分区技术(正如我们将在下一节中看到的那样),试图使我们的信息分布更加均匀。反过来,这些技术的性能取决于信息如何分散在文件中,因此在运行它们之前,建议您应用 Z-Ordering。
Z-Ordering 是 Delta Lake 的一项功能,旨在将相关数据分配到同一位置。这可以提高我们对运行此命令的表的查询性能,因为 Azure Databricks 可以通过数据跳过方法利用它来更有效地读取和分区文件。我们可以使用 ZORDER BY
命令在 Azure Databricks 中使用 Z-Ordering:
- In the following code example, we will optimize the data table on the
event_type
column:通过在笔记本的 结构化查询语言 (SQL) 单元格中运行此操作,我们应该得到所做的修改根据指定的分区优化表。从下面的截图示例中,我们可以看到我们的表很小,我们并不需要优化表的底层文件结构:
- We can also reference Delta tables by path, as illustrated in the following code snippet:
通过在 Azure Databricks 笔记本的 SQL 单元格中运行上述代码,我们应该可以看到操作的结果指标,其方式与之前按名称指定表时相同,如以下屏幕截图所示:
Z-Ordering 也可以通过传递条件应用于数据子集。
- In the following code example, we will only optimize data in the
event_type
column between today and yesterday:在下面的屏幕截图中,我们可以看到通过在笔记本的 SQL 单元格中运行此操作,我们将看到表的路径和操作的指标,其方式与前面的示例相同:
具有许多唯一值的列,以及经常用于过滤的列,是最推荐通过运行 ZORDER
。我们还可以通过将作为参数分隔的列名传递给
ZORDER
命令来指定多个列。
多次运行 ZORDER
会导致多次执行,但这被认为是增量操作。在新数据到达后运行 ZORDER
不会影响已经优化的数据。
要记住的一件事是运行 Z-Ordering 与数据大小无关。该操作的目标是重新定位相关文件,而不是均匀分布数据,因为这也会导致数据偏斜,我们将在下一节中看到如何解决这个问题。
Managing data recency
正如上一章中提到的,当查询一个增量表时,除非另有说明,否则我们正在读取该表的最新状态,这要归功于增量表是自动更新的。这是一个自动运行的过程,但如果我们愿意,我们可以在不需要时修改此行为。这可能在执行历史分析时发生,因此我们不需要预先更新表。这样做会在查询时产生更低的延迟,简而言之,这意味着查询将运行得更快。
可以使用 spark.databricks.delta.stalenessLimit
选项设置此行为,该选项将 time
字符串值和将此应用于我们正在处理的当前会话,因此它不会影响其他用户 阅读该表。 time
参数以 毫秒 (ms) 表示,范围可以,例如,从 1 小时到 1 天。如果超过这个过期限制,表状态将被更新。
值得一提的是,该参数不会阻止更新表——它只是避免查询等待表更新。
Understanding checkpoints
我们已经在讨论表上的流式源时讨论了检查点,但我们还没有完全讨论它们是如何工作的。
Delta Lake 使用检查点来聚合 Delta 表的状态,以便在我们需要表的最新状态时将其作为计算的起点。检查点默认每 10 次提交写入一次,它们存储在表路径中的 _delta_log
目录中。
这种机制节省了大量时间,因为否则,在查询表时,要获取最新状态 Delta Lake 必须读取大量 JavaScript Object Notation (JSON) 文件,表示表的事务日志以获取最后一个状态。
简而言之,检查点保存与在当前状态之前对表所做的所有事务和修改相关的信息,检查已被最近的操作取消的先前操作 是否被删除从检查站。这些类型的操作称为无效操作,并使用称为规则的策略删除和解。这样做是为了减少日志的整体大小,并在重建快照时提高读取效率。正如我们所见,检查点对于快照隔离和访问先前版本的数据至关重要。
检查点也是存储有关列使用情况的统计信息以供在数据跳过期间使用的地方。
查看日志目录时,我们会看到它有一个命名约定,如下面的代码片段所示:
我们可以看到检查点的 名称由一个前缀组成,该前缀表示它所拥有的表的版本和表的文件格式。以更一般的方式,我们可以说具有 table_format
的表的 n 版本的检查点名称将是 n.checkpoint.table_format
。
鉴于这种格式和事务日志的性质,如果用户想要访问最新的检查点,则很难列出目录中的所有检查点(可能有数千个)。因此,Delta Lake 将最后一个检查点存储在 _delta_log/_last_checkpoint
文件中,以便为构建表的最新快照提供方便的访问。
如前所述,在使用结构化流时,如果启用了检查点,则流可以在失败的情况下重新启动,如下所示:
- To do this, we can set the
delta.checkpoint.writeStatsAsStruct
option totrue
in our streaming table, as follows:我们可以在笔记本的 SQL 单元中运行它,如果操作成功,它应该返回
OK
,如下例所示:
- 或者,当使用 Delta 表时,我们可以使用带有
delta
前缀的表的路径,如下所示代码片段: - 我们还可以通过运行以下 Python 命令在处理流数据帧时使用 Apache Spark
checkpointLocation
选项,该命令会写入一个名为data< 的流数据帧/code> 以 Parquet 格式设置并设置检查点位置路径。每个查询必须有一个唯一的检查点:
在我们的结构化流中设置检查点可以让我们在可能出现故障的情况下进行备份。如果 流重新启动,它将自动从其离开的位置继续。
Automatically optimizing files with Delta Engine
自动优化是 Delta Engine 的一个 功能,用于进一步优化 Delta 表。它会在大表中的数据操作语言 (DML)操作期间自动压缩小文件,合并文件并保持元数据清洁。我们有两个选项来应用自动优化:Optimized Writes 和 Auto Compaction。 Optimized Writes 尝试将表分区均匀地排列到代表每个分区的 128 个 兆字节 (MB) 大小的文件中,以改进写入。 Auto Compaction 是一个 OPTIMIZE
命令,压缩大小设置为 128 MB,在每次写入操作后运行。
自动优化使经常修改的表保持优化,这在处理低延迟数据流或频繁表合并时非常有用。
我们需要通过在特定表上启用 our_delta_table.autoOptimize.optimizeWrite
和 our_delta_table.autoOptimize.autoCompact
中的选项来启用自动优化:
- To enable this option on existing tables, we can use the
TBLPROPERTIES
SQL command to set those properties totrue
, as in the following example:我们可以在笔记本的 SQL 指定单元格中运行此命令,如以下屏幕截图所示:
必须手动启用这些选项的一个原因是,如果我们在同一张表上有许多来自不同用户的并发 DML 操作,这可能会导致事务日志中的冲突,这可能很危险,因为这不会导致任何失败或重试。
另外要记住的是,在大表上,在表上启用自动优化可以与 OPTIMIZE
结合使用,并且不能代替 ZORDER
。
让我们更详细地了解如何使用 Optimized Writes 和 Auto Compaction,如下所示:
- Optimized Writes:当在表上启用 Optimized Writes 时,Delta Engine 将尝试将表重新排序为每个表分区大约 128 MB 的文件,这可以 根据数据模式变化,以平衡并行写入的文件数量。此过程根据表的分区结构对文件进行重新排序。当数据流具有几分钟的延迟时,此选项很有用。
- Auto Compaction:正如前面提到的,Auto Compaction 在每次写入后运行,因此如果在表上启用它,则不会每次写操作后都需要调用
OPTIMIZE
。如果我们没有在工作台上运行定期安排的OPTIMIZE
作业,那么启用此选项是个好主意。
大多数时候 在 Azure Databricks 中优化数据处理时,很容易只关注转换。但我们应该永远记住,Databricks 使用分布式计算,需要在工作人员之间分发数据。确保数据井井有条并对其存储进行了优化,这将带来更好的性能指标,不仅适用于存储,还适用于数据处理。
Using caching to improve performance
缓存是一种操作,我们将数据存储在更靠近处理位置的位置,以提高性能。它可以在 Azure Databricks 中以两种不同的方式应用:增量缓存和 Apache Spark 缓存。根据我们处理的情况,我们可以利用每个选项的特定特性来提高表格的读取速度。
这样做的好处是无需选择使用哪个选项,因为它们可以同时使用。
让我们深入了解这两个选项的特征,并看看它们如何应用于提高性能。
Delta and Apache Spark caching
Delta 和 Apache Spark 有不同的特点,可以总结如下:
- Delta 缓存: Delta 缓存会自动将远程 Parquet 文件的副本复制到本地节点存储中以加速其处理。它可以访问的远程位置是 Databricks File System (DBFS)、Hadoop 分布式文件系统 (HDFS)、Azure Blob 存储、Azure Data Lake Storage Gen1 和 Azure Data Lake Storage Gen2。由于优化的解压缩和符合处理要求的输出格式,它可以比 Spark 缓存运行得更快。可以使用
CACHE
语句预加载数据。然后数据存储在本地磁盘上,并且可以通过 固态硬盘 (SSD) 快速读取。 - Spark 缓存:Spark 缓存可以存储来自子查询的结果并处理除 Parquet 之外的多种文件格式。必须手动指定要缓存的查询和表。数据存储在内存中,这会损害性能。
我们可以通过从集群配置中选择 Delta Cache Accelerated 工作器之一来启用 Delta 缓存,如下面的屏幕截图所示:
- 为了启用 Spark 缓存,我们需要在我们的数据帧上运行缓存选项——如以下 Python 命令所示——我们在其中缓存一个名为
data
的数据帧: - 缓存会调用persist方法,传入
MEMORY_DISK
作为参数,意思就是和下面的Python命令一样:
我们有几个参数作为存储级别传递,概述如下:
DISK_ONLY
:这只会将序列化的数据保存在磁盘上。MEMORY_ONLY
:这会将反序列化的数据仅保留在内存中,实现非常好的计算性能,但限制了可用于存储的内存量。MEMORY_AND_DISK
:这会将数据持久化在内存中,如果没有足够的内存可用,它将驱逐数据以存储在磁盘上。这是 Spark 中的默认模式,它将尽可能多的数据存储在浮动内存中,其余的则被驱逐到磁盘中。计算时间将比仅使用磁盘快,但比仅使用内存要慢。OFF_HEAP
:数据持久化在堆外内存中,即不受Java虚拟机监管的内存(JVM) 垃圾收集器。数据将被更快地读取,但应谨慎使用,因为必须由用户指定分配的内存。
对缓存数据在 worker 中的存储位置进行细粒度控制 允许我们对使用数据的算法的性能进行细粒度控制。存储在本地工作内存中的数据将比从磁盘读取更快,但能够保存更少量的数据。 OFF_HEAP
内存应该小心使用,因为它的数据也被 JVM 用于内部进程。
Caching a subset of the data
我们也可以显式地选择部分数据缓存在worker本地存储中,使用CACHE SELECT
命令以下方式:
同样,我们可以使用 delta
前缀和表路径来指定 Delta 表,如以下代码片段所示:
在以下屏幕截图示例中,我们在笔记本的 SQL 单元格中运行它,将条件指定为 COVID
数据集的特定省份:
这将只记录符合我们条件的指定列进行缓存,从而防止后续查询扫描远程文件。
在使用 Delta 表时不需要这样做,因为数据会在查询时被缓存,但当您需要提高查询性能时它会很有用。
或者,我们也可以对会话期间经常使用的部分数据执行此操作,如下所示:
接下来,我们将看到如何配置 Delta 缓存的行为。
Configuring the Delta cache
建议始终对工作实例使用Delta缓存加速集群,这些实例是运行SSD磁盘的优化实例。这些实例已默认配置为使用 Delta 缓存,但您可以在会话创建期间使用 Apache Spark 选项手动配置其参数:
- 要建立最大磁盘使用量,默认设置为工作器本地存储的一半,您可以使用
maxDiskUsage
选项,以字节为单位,如下例所示,我们将其设置为50g
: - 我们还可以使用
maxMetaDataCache
选项指定元数据的最大分配空间(以字节为单位),如下例所示,其中设置为1g :
- 我们也可以禁用压缩,默认是开启的,如下:
- 要禁用 Delta 缓存,我们可以使用以下 Scala 命令:
很高兴知道,当禁用 Delta 缓存时,我们不会丢失已经存储在 worker 本地存储中的数据,但我们会避免添加更多数据。
在下一个 部分中,您将了解 DFP,这是一项 Delta Lake 功能,可提高查询性能。
Optimizing queries using DFP
DFP 是 Delta Lake 功能,可自动跳过与查询无关的文件。它是 Azure Databricks 中的默认选项,通过收集有关 Delta Lake 中文件的数据 工作,无需明确声明应在查询中跳过文件,通过以下方式提高性能利用数据的粒度。
DFP 关于进程是否启用、表的最小大小以及触发进程所需的最小文件数的行为可以通过以下选项进行管理:
spark.databricks.optimizer.dynamicPartitionPruning
(默认为true
):是否启用 DFP。spark.databricks.optimizer.deltaTableSizeThreshold
(默认为10 GB
):激活 DFP 的 Delta 表的最小大小。spark.databricks.optimizer.deltaTableFilesThreshold
(默认为1000
):表示探针端Delta表的文件数触发 DFP 所需的联接。如果文件数量较少,则不值得触发该过程。
DFP 可用于非分区表、非选择性连接以及对它们执行 DML 操作时。
通过应用 DFP 获得的改进与数据的粒度级别相关。因此,在使用 DFP 时应用 Z 排序可能是一个好主意。
Using DFP
为了首先了解我们如何应用 DFP,让我们看一个示例 我们手动使用文字语句和另一个 Delta Lake 功能,称为 动态分区修剪 (民进党)。
在物理计划阶段,Delta Lake 为查询创建一个可执行计划。该计划将计算分布在许多工人的集群中。此计划已优化为仅扫描与查询相关的文件。
如果我们要运行查询,但我们想根据条件显式过滤结果。一个更具体的例子是选择 col1
、col2
和 col3
从我们的表中,并根据最大值在其中一列上应用过滤条件,如以下代码片段所示:
Delta Lake 在我们的列中保留有关最小值和最大值的信息,因此它会根据我们的过滤条件自动跳过与我们的查询无关的文件。
这是可能的,因为过滤包含文字条件(在我们的例子中是最大值条件),因此 Delta Lake 可以计划和优化文件扫描。如果过滤条件是联接的一部分,则 Delta Lake 无法计划要跳过哪些文件,因为过滤条件是另一个查询的结果。这称为 DPP,是 Delta Lake 的一个先前功能,与 DFP 一样,旨在提高查询的性能。
在使用基于星型模式的数据模型时,我们首先连接两个或多个表的以下类型的查询非常常见:
在这种情况下,条件作用于维度表而不是事实表。 Delta Lake 必须扫描事实表上的所有文件,扫描维度表中的所有文件,应用过滤条件,然后执行连接。
在这种情况下,DPP 将不起作用,但如果我们启用了 DFP,它将重新排序查询的执行计划,以便能够提前检查两个表上的条件,这提供了跳过两个表上的文件并增加的可能性查询的性能。
DFP 在 Azure Databricks 中默认启用,并根据以下规则应用于查询:
- 连接策略是
BROADCAST HASH JOIN
。 - 连接类型为
INNER
或LEFT-SEMI
。 - 内表是一个 Delta 表。
- 内表中的文件数量足以证明应用 DFP 是合理的。
Using Bloom filters
布隆过滤器 是一种根据条件有效过滤数据库中记录的方法。它们具有概率性质,用于测试集合中元素的成员资格。我们可能会遇到误报,但不会遇到误报。这些过滤器是作为一种数学结构开发的,当要扫描的数据量无法读取时应用,并且基于散列技术。
Delta Lake 为我们提供了对查询应用 Bloom 过滤器以进一步提高性能的能力。我们将看到它们如何在基本层面上工作,以及它们如何在 Delta Lake 中应用。
Understanding Bloom filters
正如在本节的介绍中提到的,布隆过滤器是用于测试元素是否属于某个类别的概率数据结构。此结构是使用散列函数填充的固定长度位数组,该函数将信息映射为 1 和 0。数组的长度取决于允许的误报数。动臂过滤器具有恒定的复杂性,并且相对于项目需要非常小的空间。
为了将数据映射到这个概率数据结构中,我们使用了允许我们获得数据指纹的散列函数,这意味着这个散列(几乎)是唯一的,可以用来比较或识别数据。这些散列函数是单向运算,并且必须始终为相同的数据提供相同的输出,并且如果可能的话,输出 值必须均匀且随机分布,以便相似的输入不会给出相同的结果。
这些哈希值可以在以后逐位比较以检查相似性。
在下面的截图示例中,我们比较了 W 元素是否属于 {x, y, z} 组。为此,我们将比较具有散列 W 的结果散列数组之一的位置与由该组的所有散列填充的组合位置。如果 W 散列中的所有 1 都在组的散列填充的位置,则 W 必须在集合中。如果 W 散列中的任何位落入 0,则它们不属于该集合:
我们可以看到,布隆过滤器是一种 快速提供答案的方法,如果一个元素属于一个集合,并且它们在数据工程中非常重要。接下来,我们将了解如何在 Delta Lake 中实现 Bloom 过滤器,以确保我们的查询执行良好。
Bloom filters in Azure Databricks
Azure Databricks Delta Lake 在运行查询以启用数据跳过时使用 Bloom 筛选器检查文件是否与特定筛选器匹配。它通过创建与每个文件关联的Bloom filter索引文件来做到这一点,最多五个级别,可以快速告诉Delta Lake文件包含所需数据的概率,或者是否根本不匹配。这些文件是存储在数据文件路径中的子目录中的单行 Parquet 文件。
如果给定文件不存在 Bloom 索引文件,则将始终读取该文件。当然,布隆过滤器索引文件的大小与选择的 误报率 (FPR) 成正比,因为较低的rate 将始终需要数组中的更多位进行比较,例如,对于给定的 10% FPR,这是 5 位。
_delta_index
包含索引文件,其命名格式与添加 index.v1.parquet
后缀。
我们可以通过将 spark.databricks.io.skipping.bloomFilter.enabled
会话配置设置为 true
来启用或禁用布隆过滤器或 false
,如以下屏幕截图所示。 Azure Databricks 中已默认启用此选项:
Creating a Bloom filter index
我们可以使用 CREATE BLOOMFILTER INDEX
命令创建布隆过滤器。这适用于表上的所有列或仅适用于其中的一个子集。
创建布隆过滤器的语法如以下代码片段所示:
以前面章节中使用的 COVID
数据集为例,我们可以在 patient_id
列中创建一个布隆过滤器,如图所示在以下屏幕截图中:
我们还可以使用 Delta 表的路径引用相同的命令,如以下代码片段所示:
在这种情况下,我们为 col1
和 col2
列创建 Bloom 过滤器,FPR 分别为 10% 和 20%。
我们还可以传递 numItems
选项,这是一个文件可以包含的不同项目的数量,以及 maxExpectedFpp
选项,它控制不将布隆过滤器写入磁盘的最大 FPR。也就是说,我们创建了一种方法来更快地查询写入内存的数据,我们只需计算一次。
我们还可以使用 DROP BLOOMFILTER INDEX
命令从表中删除所有 Bloom 过滤器。如果我们想删除我们最近创建的 Bloom 过滤器,我们可以运行以下代码:
这将删除 col1
和 col2
列中的所有 Bloom 过滤器。请记住,在表上运行 VACUUM
也会删除所有 Bloom 过滤器。
Optimizing join performance
对表执行连接可能是一项耗费资源的操作。为了提高此类操作的性能,我们可以选择数据子集或纠正可能存在的缺陷,例如我们的数据中文件 大小的分布不成比例。解决这些问题可以提高性能并导致更有效地使用分布式计算能力。
Azure Databricks Delta Lake 通过提供范围筛选和更正我们表中数据文件大小分布的偏度来优化连接操作。
Range join optimization
连接是经常使用的,因此优化这些操作可以大大提高我们的查询性能。范围连接优化是指定需要对范围给定的数据子集执行连接的过程。
当连接操作具有类型为数字或日期时间类型的过滤条件时,将应用范围连接优化,并且可以将其理解为间隔并具有 bin 大小的调整数。
可以在以下代码片段中看到间隔的示例:
要使用范围连接 优化,我们需要建立所需的 bin 大小。为此,我们使用提示或在 Spark 会话配置中进行设置,因为除非指定,否则它不会运行。 bin-size 数字在使用日期类型时以天为单位,在使用时间戳时以秒为单位。
Enabling range join optimization
我们通过显式传递带有与 bin 大小对应的参数的提示来启用 范围连接优化。通过在 SELECT
语句上使用特定的 RANGE_JOIN
语法调用提示,并接收目标表和 bin 大小作为参数。除了表,我们还可以指定视图和子查询,考虑到这可能会损害性能。
在下一个 示例中,范围连接是在 bin 大小为 20
的表上建立的:
您还可以在范围内将 bin 大小指定为数据的百分位数。在以下情况下,这是 10%:
最后,我们还可以使用 BETWEEN
命令指定时间戳和日期时间格式的范围连接,如前所述:
优化的有效性将取决于具有正确的 bin 大小。相对于区间而言太小的 bin 大小可能会导致区间之间的重叠。我们可以通过查看我们将用作过滤器的列中的百分位数来选择合适的 bin 大小。这可以使用 APPROX_PERCENTILE
命令来完成。
在下面的代码示例中,我们正在查看 .25
、.5
和
个百分位数,data_ranges
的 .75end
和 start
之间有间隔代码>栏:
如果我们的区间长度是统一的,我们可以将 bin 大小设置为最常见的区间长度。
bin 大小是一个高度依赖于数据分布的 参数,因此建议测试不同的方法来找到合适的 bin 大小,以利用所有并行处理计算优势.
Skew join optimization
当我们的数据分区比其他分区大得多时,我们会遇到 偏度问题。当我们在那个分区上执行我们的进程时,这可能会产生问题,因为在这个分区上工作需要更多的时间。这在 Azure Databricks 等分布式计算系统上尤其糟糕,因为许多任务是并行运行的,并且让一个任务比其他任务花费更长的时间会阻塞依赖于该任务的执行。
这种不均衡的工作负载问题主要影响连接操作,因为大表需要重新洗牌。如果某些执行时间比其他执行时间长,这个问题可能会影响我们的查询。
可以使用 Azure Databricks 中的倾斜连接优化提示来解决此问题,方法与我们在范围连接优化中看到的相同。
提示使用 /*
语法开始,/*
以 SELECT 结束
语句,传递 SKEW
命令,该命令将我们要优化的列作为参数。
下面的代码片段显示了一个示例:
Delta Lake 使用提示中的信息来优化我们查询的执行计划。我们还可以将其应用于子查询,如下面的代码示例所示,我们在其中运行子查询来过滤我们的数据:
我们还可以 使用 SKEW
命令指定关系和列。我们将在下一节中看到如何做到这一点。
Relationships and columns
倾斜连接 优化是一项计算密集型操作,必须仅在需要时运行。这就是为什么 SKEW
的参数是列名的原因。运行连接时,只需在我们用来执行此操作的列上运行 SKEW
很有用。
在以下代码示例中,我们将倾斜提示应用于单个列,即 user_id
:
我们还可以将提示应用于多个列,如下所示:
您还可以在提示中指定偏斜值。根据查询和数据,偏差值可能是已知的(例如,因为它们从不改变)或者可能很容易找出。这样做可以减少倾斜连接优化的开销。否则,Delta Lake 会自动检测到它们。
根据查询的类型,也可以在提示上指定值。传递给 SKEW
的参数越具体,优化任务的开销就越少。
在以下代码示例中,我们对指定单个倾斜值的单个列运行查询:
我们还可以传递几个 skew 值,如下所示:
最后,我们还可以 指定几个列,并分别为这些列指定几个值,如下所示:
我们可以看到,我们可以有选择地纠正数据中的偏度,仅在我们选择的符合规定条件的列中应用这种转换。这样,我们可以以更有效的方式进行优化。
Summary
在本章中,我们讨论了如何应用 Delta Engine 功能(例如 Z-Ordering、数据跳过和数据缓存等)来改进数据的布局结构,从而全面提高查询性能。我们可以利用这些工具将文件重组为更小、更紧凑的文件,其中数据根据访问频率分布。我们还可以在数据列上创建 Bloom 过滤器,以提高我们运行查询的速度。
在下一章中,我们将深入了解如何在 Delta 表中摄取数据流。