vlambda博客
学习文章列表

读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

Chapter 6: Introducing Structured Streaming

许多组织需要在日常流程中持续使用大量数据。因此,为了能够提取洞察和使用数据,我们需要能够在这些信息到达时对其进行处理,从而需要持续的数据摄取过程。这些持续的应用程序需要克服挑战,例如创建一个可靠的流程来确保数据的正确性,尽管可能会出现流量高峰、数据未及时到达、上游故障等可能的故障,这些在使用时很常见不断输入的数据或转换的数据,没有一致的文件格式,具有不同的结构级别或需要在使用前进行聚合。

处理这些问题的最传统方法是处理在周期性任务中执行的批量数据,这些数据处理原始流和数据并将它们存储为更有效的格式以允许对数据进行查询,这一过程可能会导致高延迟。

在本章中,我们将学习结构化流,它是 Apache Spark 应用程序编程接口 (API),它允许我们以与对静态数据进行批量计算相同的方式对流数据进行计算。 Structured Streaming 是一个容错引擎,它允许我们针对低延迟执行优化操作 Structured Query Language (SQL)-由于使用了 Spark SQL,就像对数据的查询一样,同时确保了数据的一致性。

我们可以在创建 ETL 管道时使用结构化流,这将允许我们过滤、转换和清理原始结构中的数据,将数据类型格式化为更有效的存储格式,基于相关列应用有效的分区策略等等.具体来说,在本章中,我们将研究以下主题:

  • 使用结构化流 API
  • 在处理连续数据流时使用 Azure Databricks 中可用的不同源
  • 从查询失败中恢复
  • 优化流式查询
  • 触发流式查询执行
  • 可视化流数据帧上的数据
  • 结构化流式传输示例

事不宜迟,我们将通过讨论结构化流模型以及我们如何利用它们的架构来提高处理数据流时的性能来开始我们的讨论。

Technical requirements

本章将要求您拥有可用于处理示例的 Azure Databricks 订阅,以及连接到正在运行的集群的笔记本。

让我们首先更详细地研究结构化流模型,以了解哪些替代方案可用于处理 Azure Databricks 中的数据流。

Structured Streaming model

结构化流模型基于一个简单但强大的前提:对数据执行的任何查询都将在给定时间产生与批处理作业相同的结果。该模型通过在数据到达数据湖、引擎内以及与外部系统一起工作时处理数据来确保一致性和可靠性。

如前几章所述,要使用结构化流,我们只需要使用 Spark 数据帧和 API,说明哪些是 I/O 位置。

结构化流模型通过将到达的所有新数据视为附加到未绑定表的新行来工作,从而使我们有机会对数据运行批处理作业,就好像所有输入都被保留了一样,而不必这样做。然后我们可以将流数据查询为静态表并将结果输出到数据接收器。

结构化流式处理能够做到这一点,这要归功于一个名为 Incrementalization 的功能,它会在每次 我们运行询问。这也有助于确定每次新数据到达时哪些状态需要保持更新。

当使用结构化流数据帧时,我们定义何时使用特定触发器更新 表。每次激活触发器时,Azure Databricks 都会通过为到达我们数据源的每个文件创建新行来增量更新表。

最后,要定义的最后一部分是 输出模式。这些模式允许我们确定如何增量写入外部系统,例如 Simple Storage Service (S3) 或三个数据库不同的方式,概述如下:

  • 追加模式:只追加新行。此模式不允许更改现有行。
  • 完整模式:每次有更新时,整个表重写到外部存储。
  • 更新模式:只有自上次触发后发生变化的行被写入外部存储,允许就地更新,例如作为 SQL 表。

结构化流输出表基于顺序流提供一致的结果,并且在使用输出接收器时也确保了容错性。例如,使用 Complete 输出模式,将通过更新 SQL 表中的记录来处理乱序数据。我们可以建立一个阈值来避免更新表的过旧数据。

结构化流式处理是一个容错系统,它允许查询以增量方式对流式数据运行,如以下示例中所述:

  • 例如,如果我们要读取连续到达 S3 位置的 JavaScript Object Notation (JSON) 文件,我们编写以下代码:
    input_df = spark.readStream.json("s3://datalogs")
  • 我们可以对 Spark 数据帧进行操作并进行基于时间的聚合,最终写入 SQL 数据库,如下所示:
    input_df.groupBy($"message", window("update_time",                                      "1小时")).count()        .writeStream.format("jdbc").start(jdbc_conn_string)
  • 如果我们将其作为批处理操作运行,唯一的变化将是我们如何写入和读取数据。例如,要只读取一次数据,请运行以下代码:
    input_df = spark.read.json("s3://datalogs")
  • 然后,我们可以使用标准的 Spark DataFrame API 进行操作并写入 SQL 数据库,如下面的代码片段所示:
    inputDF.groupBy("message", window("update_time",                                    "1小时")).count()        .writeStream.format("jdbc").save(jdbc_conn_string)

在下一节中,我们 将更详细地了解如何使用Structured Streaming API,包括如何执行交互式查询和窗口聚合,我们将讨论映射和过滤操作。

Using the Structured Streaming API

结构化流 集成到 PySpark API 中并嵌入到 Spark DataFrame API 中。它在处理流数据时提供了易用性,并且在大多数情况下,它需要非常小的更改才能从静态数据的计算迁移到流计算。它提供了执行窗口聚合和设置执行模型参数的功能。

正如我们在前几章中所讨论的,在 Azure Databricks 中,数据流表示为 Spark 数据帧。我们可以通过检查数据帧的 isStreaming 属性是否设置为 true 来验证数据帧是数据流.为了使用结构化流进行操作,我们可以将步骤总结为读取、处理和写入,如下所示:

  1. We can read streams of data that are being dumped in, for example, an S3 bucket. The following example code shows how we can use the readStream method, specifying that we are reading a comma-separated values (CSV) file stored in the S3 Uniform Resource Interface (URI) passed:
    input_df=spark.readStream.csv("s3://data_stream_uri")

    生成的 PySpark 数据帧 input_df 将作为输入表,随着新文件到达输入目录,不断扩展新行。我们假设这个表有两列——update_timemessage

  2. Now, you can use the usual data frame operations to transform the data as if it were static. For example, if we want to count action types of each hour, we can do this by grouping the data by message and 1-hour windows of time, as illustrated in the following code snippet:
    counts_df = input_df.groupBy($"message", window($"update_time ", "1 hour")).count()

    新的dataframe,counts_df,是我们的结果表,有message、window、counts列,查询开始时会不断更新。请注意,即使 input_df 是静态表,此操作也会产生每小时计数结果。该功能提供了一个安全的环境来测试静态数据集上的业务逻辑,并在不更改逻辑的情况下无缝迁移到流数据中。

  3. 最后,我们将此表写入接收器并通过使用 Java 数据库连接创建到 SQL 数据库的连接来启动流式计算> (JDBC) 连接字符串,如以下代码片段所示:
    final_query = counts_df.writeStream.format("jdbc").start("jdbc://...")< button class="copy-clipboard-button">复制

返回的查询是 StreamingQuery 查询,它是在后台运行的活动流 执行的句柄。以后可以使用它来管理和监视我们的流式查询的执行。除了这些基础知识之外,结构化流式处理中还可以完成更多操作。

Mapping, filtering, and running aggregations

结构化 流式处理允许使用映射、过滤、选择和其他方法来转换数据。在前面的示例中,我们使用了以下功能之一:通过 Spark API 对数据进行基于时间的聚合。正如我们在前面的示例中看到的,可以通过数据帧上的操作 将聚合等聚合表示为一个简单的组.这些操作的使用将在下一节中举例说明,但请记住,结构化流允许我们以与普通 PySpark 数据帧相同的方式执行映射、过滤和其他数据处理方法。

在下一节中,我们将看到一些如何使用结构化流执行这些操作的示例。

Windowed aggregations on event time

处理具有时间维度的数据时,计算操作通常需要窗口化到某些时间段,在滑动窗口等情况下会相互重叠。例如,我们可能需要在每 5 分钟向前滑动的 1 小时窗口内执行操作。

窗口是使用 PySpark 数据帧中的 window 函数指定的。例如,通过执行以下操作更改为滑动窗口方法(如之前在结构化流模型部分中给出的示例中所示):

input_df.groupBy("message", window("update_time", "1 hour", 
                                   "5 minutes")).count()

在我们的之前的例子中,结果信息被格式化为(hour, message, count);现在,它将具有 (window, message, count) 的形式。迟到的乱序数据将被处理并相应地更新结果,如果我们在 Complete 模式下使用外部数据接收器,数据也会在那里更新。

Merging streaming and static data

正如我们之前提到的 ,使用结构化流式处理意味着使用 Spark 数据帧,这很简单,允许我们结合使用流式处理和静态数据。例如,如果我们有一个名为 users 的表,并且我们想将它附加到流数据帧中,我们可以执行以下操作,我们引入一个名为 users 的表并附加它到我们的 input_df 数据框:

users_df = spark.table("users")
input_df.join(users_df, "user_id").groupBy("user_name", hour("time")).count()

我们还可以使用查询创建此静态数据帧,并在相同的批处理和流操作中运行它。

Interactive queries

在结构化流中,我们 可以使用 Spark JDBC 服务器将计算结果直接用于交互式查询。可以选择使用为少量数据设计的小型内存接收器,我们可以将结果写入 SQL 表,然后查询其中的数据。例如,我们可以创建一个名为 message_counts 的表,它可以用作我们以后可以查询的小型数据接收器。

我们通过创建一个内存表来做到这一点,如下所示:

message_counts.writeStream.format("memory")
  .queryName("user_counts")
  .outputMode("complete")
  .start()

然后,我们使用 SQL 语句对其进行查询,如下所示:

%sql
select sum(count) from message_counts where message='warning'"

在下一个 部分,我们将讨论我们可以从中读取流数据的所有不同来源。

Using different sources with continous streams

数据流可以来自多种来源。结构化流式处理支持从诸如 Delta 表、publish/subscribe (pub/sub) 系统(如 Azure)等源中提取数据事件中心等。我们将在接下来的部分中回顾其中的一些来源,以了解如何将这些数据流连接到在 Azure Databricks 中运行的作业中。

Using a Delta table as a stream source

如前一章 中所述,您可以通过 readStreamwriteStream Spark 方法,特别侧重于克服与处理和处理小文件、管理批处理作业和有效检测新文件相关的问题。

当 Delta 表用作数据流源时,对该表执行的所有查询都将处理该表上的信息以及自流启动以来到达的任何数据。

在下一个示例中,我们将路径和表都加载到数据框中,如下所示:

df = spark.readStream.format("delta").load("/mnt/delta/data_events")

或者,我们可以通过引用 Delta 表来做同样的事情,如下所示:

df = spark.readStream.format("delta").table('data_events')

Delta Lake 中 Structured Streaming 的特点之一是我们可以控制每次触发触发器时要考虑的最大新文件数。我们可以通过将 maxFilesPerTrigger 选项设置为要考虑的所需文件数来控制这一点。另一个要设置的选项是每个微批处理中处理多少数据的速率限制。这是通过使用 maxBytesPerTrigger 选项控制的,该选项控制每次激活触发器时处理的字节数。要处理的字节数将与此选项中指定的数量大致匹配,但可能会略微超过此限制。

值得一提的是,如果我们尝试追加列或修改用作源的 Delta 表的模式,结构化流将失败。如果需要引入更改,我们可以通过两种不同的方式来解决这个问题,概述如下:

  • 进行更改后,我们可以删除输出和检查点并从头开始重新启动流。
  • 我们可以使用 ignoreDeletes 选项来忽略导致数据删除的操作,或者使用 ignoreChanges 这样流就不会得到因源表中的删除或更新而中断。

例如,假设我们有一个名为 user_messages 的表,我们从中流出包含日期、user_email 和消息列的数据按日期划分。如果我们需要在分区边界删除数据,这意味着在分区列上使用 WHERE 子句进行删除,则文件已经被该列分段。因此,删除操作只会从元数据中删除这些文件。

因此,如果您只想在分区列上使用 WHERE 子句删除数据,则可以使用 ignoreDeletes 选项来避免让您的流中断,如以下代码片段所示:

data_events.readStream
  .format("delta").option("ignoreDeletes", "true")
  .load("/mnt/delta/user_messages")

但是,如果您必须根据 user_email删除数据,则需要使用以下代码:

data_events.readStream
  .format("delta").option("ignoreChanges", "true")
  .load("/mnt/delta/user_messages")

如果您使用 UPDATE 语句更新 user_email 变量,则包含该记录的文件将被重写。如果 ignoreChanges 选项设置为 true,则将再次处理此文件,将新记录和其他已处理的记录插入到同一文件中,从而在下游产生重复。因此,建议实现一些逻辑来处理这些传入的重复记录。

为了在不处理整个表的情况下指定 Delta Lake 流源的起点,我们可以使用以下选项:

  • startingVersion:Delta Lake 版本开始
  • startingTimestamp:开始的时间戳

我们可以在这两个选项之间进行选择,但不能同时使用这两个选项。此外,通过设置此选项所做的更改只会在新流启动后发生。如果流已经在运行并且写入了检查点,则这些选项将被忽略:

  • 例如,假设您有一个 user_messages 表。如果您想阅读版本 10 以来的更改,可以使用以下代码:
    data_events.readStream   .format("delta").option("startingVersion", "10")   .load("/mnt/delta/user_messages")
  • 如果您想阅读自 2020-10-15 以来的更改,可以使用以下代码:
    data_events.readStream   .format("delta")option("startingTimestamp", "2020-10-15")   .load("/mnt/delta/user_messages")

请记住,如果 您使用这些选项之一设置起点,则表的架构仍将是 Delta 表的最新架构,因此我们必须避免创建使架构不兼容的更改。否则,流式传输源将在读取数据时产生不正确的结果或失败。

使用结构化流允许我们写入 Delta 表,并且由于事务日志,Delta Lake 将保证在处理完成后准确无误,即使针对流表运行多个查询。

默认情况下,结构化流以 Append 模式运行,这会将新记录添加到表中。此选项由 outputMode 参数控制。

例如,我们可以将模式设置为 "append" 并使用 path 方法引用表格,如下所示:

data_events.writeStream.format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/data_events/_checkpoints/etl-from-json")
  .start("/delta/data_events")

或者,我们可以使用 table 方法,如下所示:

data_events.writeStream.format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/data_events/_checkpoints/etl-from-json")
  .table("data_events")

我们可以使用 Complete 模式来重新处理表格上的所有可用信息。例如,如果我们想在每次更新 表时通过 user_id 值聚合用户数,我们可以设置模式来完成,整个表将在每批中重新处理,如下面的代码片段所示:

spark.readStream.format("delta")
  .load("/mnt/delta/data_events")
  .groupBy("user_id").count()
  .writeStream.format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/messages_by_user/_checkpoints/streaming-agg")
  .start("/mnt/delta/messages_by_user")

使用一次性触发器可以更好地控制这种行为,使我们能够对应该触发表更新的操作进行细粒度控制。我们可以将它们用于聚合表等应用程序或需要每天处理的数据。

Azure Event Hubs

Azure 事件中心是一种遥测服务, 收集、转换和存储事件。我们可以使用它来从不同的遥测源获取数据并触发云中的流程。

建立连接的方法有很多。一种方法是使用 ConnectionStringBuilder 来制作连接字符串。为此,请按照下列步骤操作:

  1. The following Scala code shows how we can establish a connection using ConnectionStringBuilder by defining parameters such as the event hub name, shared access signature (SAS) key, and key name:
    import org.apache.spark.eventhubs.ConnectionStringBuilder val connections_string = ConnectionStringBuilder()   .setNamespaceName("your_namespace-name")   .setEventHubName("event_hub-name")   .setSasKeyName("your_key-name")   .setSasKey("your_key build

    为了创建此连接,我们需要在使用 Maven 坐标的 Azure Databricks 工作区中安装 Azure 事件中心库。此连接器会定期更新,因此要查看当前版本,请转到 Azure 事件中心 Spark 连接器项目中的 Latest Releases < code class="literal">README 文件。

  2. 建立 连接后,我们可以使用它开始将数据流式传输到 Spark 数据帧中。以下 Scala 代码显示了我们如何首先定义事件中心配置:
    val event_hub_conf = EventHubsConf(connections_string)   .setStartingPosition(EventPosition.fromEndOfStream)
  3. 最后,我们可以开始流式传输到我们的数据帧中,如下所示:
    var streaming_df =   spark.readStream     .format("eventhubs").options(event_hub_conf.toMap)     .load()
  4. 您还可以使用 Python 定义它并指定到 Azure 事件中心的连接字符串,然后开始直接读取它的流,如下所示:
    conf = {} conf["eventhubs.connectionString"] = "" stream_df = 火花     .readStream     .format("eventhubs")     .options(**conf)     .load()

在这里,我们创建了一个连接,并使用它直接从连接字符串中指定的 Azure 事件中心读取流。

在 Azure 事件中心之间建立 连接允许您将正在跟踪的不同源的遥测数据聚合到单个 Spark 数据帧中,该数据帧可以稍后由应用程序处理和使用或存储用于分析。

Auto Loader

自动加载程序是 Azure Databricks 功能,用于在数据到达 Azure Blob 存储、Azure Data Lake Storage Gen1 或 Azure Data Lake Storage Gen2 时增量处理数据。它提供了一个名为 cloudFiles 的结构化流式源,当给定云文件存储上的输入目录路径时,它会在文件到达时自动处理文件,并添加了还处理选项该目录中的现有文件。

Auto Loader 使用两种模式来检测到达存储点的文件,概述如下:

  • 目录列表:这会将所有文件列出到 输入目录中。当存储中的文件数量太大时,它可能会变得太慢。
  • 文件通知:这会创建 Azure 事件中心通知服务,以便在新文件到达输入目录时检测它们。对于大型输入目录,这是一种更具可扩展性的解决方案。

重新启动流时可以更改文件检测模式。例如,如果目录列表模式开始变慢,我们可以将其更改为文件通知。在这两种模式下,Auto Loader 都会跟踪文件以保证这些文件只处理一次。

cloudFiles 选项的设置方式与其他流媒体源的设置方式相同,如下所示:

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>).load(<input-path>)

更改 文件检测模式后,我们可以通过将其再次写回到不同的检查点路径来启动新流,如下所示:

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>).start(<output-path>)

接下来,我们将了解如何在结构化流中使用 Apache Kafka 作为其数据源之一。

Apache Kafka

Pub/sub 消息传递系统用于提供异步服务到服务通信。它们用于无服务器和微服务架构中,以构建事件驱动的架构。在这些系统中,发布的消息会立即被该主题的所有订阅者接收。

Apache Kafka 是一个分布式发布/订阅消息系统,它使用实时数据流并以并行和容错的方式将它们提供给下游利益相关者。在构建需要跨不同处理系统处理数据的可靠实时流数据管道时,它非常有用。

Apache Kafka 中的数据被组织成主题,这些主题又被分成分区。每个分区都是一个有序的记录序列,类似于提交日志。随着新数据到达 Apache Kafka 中的每个分区,每个记录都被分配一个称为 offset 的顺序 ID 号。这些主题中的数据会保留一定的时间(称为保留期),它是一个 可配置参数。

系统看似无限的性质意味着我们需要确定要开始读取数据的时间点。为此,我们有以下三种选择:

  • 最早:我们将从流的开头开始读取我们的数据,不包括从 Kafka 中删除的保留期之前的数据。
  • 最新:我们将只处理查询开始后到达的新数据。
  • 每个分区分配:这样,我们指定每个分区的精确偏移量,以控制处理应该开始的确切时间点。当我们想要准确地了解进程失败的位置时,这很有用。

startingOffsets 选项 仅接受这些选项之一,并且仅用于从新检查点开始的查询。从现有检查点重新启动的查询将始终从中断处恢复,除非该偏移处的数据早于保留期。

在结构化流中使用 Apache Kafka 的一个优点是,我们可以管理流第一次启动时要做什么,以及如果由于数据超过保留期而导致查询无法从中断处继续执行时该怎么做,分别使用 startingOffsetsfailOnDataLoss 选项。这是使用 auto.offset.reset 配置选项设置的。

在结构化流中,您可以表达复杂的转换,例如一次性聚合,并将结果输出到各种系统。正如我们所见,将结构化流与 Apache Kafka 结合使用允许您使用与处理批处理数据时相同的 API 转换从 Apache Kafka 读取的增强数据流,并集成从 Kafka 读取的数据,以及存储在其他系统中的数据,例如S3 或 Azure Blob 存储。

为了使用 Apache Kafka,建议将证书存储在 Azure Blob 存储或 Azure Data Lake Storage Gen2 中,以便以后使用装载点进行访问。一旦你的路径被挂载并且你已经存储了你的秘密,你可以通过运行以下代码连接到 Apache Kafka:

streaming_kafka_df = spark.readStream.format("kafka") 
  .option("kafka.bootstrap.servers", ...) 
  .option("kafka.ssl.truststore.location",<dbfs-truststore-location>) 
  .option("kafka.ssl.keystore.location", <dbfs-keystore-location>) 
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>)) 
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))

要写入 Apache Kafka,我们可以在任何包含名为 value 的列和可选的列的 PySpark 数据帧上使用 writeStream命名为 key。如果未指定键列,则将自动添加空值键列。请记住,空值键 列可能会导致 Kafka 中的数据分区不均匀,因此您应该注意这种行为。

我们可以将目标主题指定为 DataStreamWriter 的选项,也可以基于每条记录指定为数据框中名为主题的列。

在以下代码示例中,我们将数据帧中的键值数据写入指定的 Kafka 主题:

query = streaming_kafka_df
  .selectExpr("CAST(user_id AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", path_to_HDFS).start()

上述查询获取包含用户信息的数据帧并将其写入 Kafka。 user_id 是用作键的字符串。在此操作中,我们将数据帧的所有列转换为 JSON 字符串,将结果放入记录的值中。

在下一节中,我们将学习如何在结构化流中使用 Avro 数据。

Avro data

Apache Avro 是一个常用的数据 序列化系统。我们可以使用 Azure Databricks 中的 Avro 数据,使用 from_avroto_avro 函数来构建流管道,将列编码为二进制和从二进制编码Avro 数据。类似于 from_jsonto_json,您可以将这些函数用于任何二进制列,但您必须手动指定 Avro 架构。 from_avroto_avro 函数可以在流式查询中传递给 SQL 函数。

以下代码片段展示了我们如何使用 from_avroto_avro 函数以及流数据:

from pyspark.sql.avro.functions import from_avro, to_avro
streaming_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro("key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro("value", SchemaBuilder.builder().intType()).as("value"))

在这种情况下,当读取一个Kafka主题的key和value时,我们必须将二进制Avro数据解码为结构化数据,其架构如下:

我们还可以将 结构化数据从 string 和 int 转换为二进制,将其解释为键和值,然后将其保存到 Kafka 主题中。以下代码片段说明了执行此操作的代码:

streaming_df.select(
    to_avro("key").as("key"),
    to_avro("value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("article", "t")
  .save()

您还可以通过使用 JSON 字符串来定义字段及其数据类型(例如,如果 "/tmp/userdata.avsc" 为)来指定架构,如下所示:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": " filter_value", "type": ["string", "null"]}
  ]
}

我们可以创建一个 JSON 字符串并在 from_avro 函数中使用它,如下所示。首先,我们指定要读取的 JSON 模式,如下所示:

from pyspark.sql.avro.functions import from_avro, to_avro
json_format_schema = open("/tmp/ userdata.avsc", "r").read()

然后,我们可以在 from_avro 函数中使用模式。在下面的代码示例中,我们首先将 Avro 数据解码为 struct,通过 filter_value 列进行过滤,最后将 name 列编码为 Avro 格式:

output = straming_df.
  .select(from_avro("value", json_format_schema).alias("user"))\
  .where('user.filter_value == "value1"')\
  .select(to_avro("user.name").alias("value"))

Apache Avro 格式广泛用于 Hadoop 环境,是数据管道中的常用选项。 Azure Databricks 支持使用此文件格式引入和处理源。

下一节将深入探讨我们如何集成不同的数据接收器,以便在处理数据之前和之后将数据转储到那里。

Data sinks

有时,有必要将要写入的接收数据流聚合到另一个位置,该位置可以是数据接收器。数据接收器是我们可以调用存储数据的外部源的一种方式,例如,可以是 S3 存储桶,我们希望在其中保留聚合数据流的副本。在本节中,我们将完成将数据流写入任何外部数据接收器位置的步骤。

在结构化流 API 中,我们有两种方法可以将查询的输出写入没有现有流接收器的外部数据源。这些选项是 foreachBatchforeach 函数。

writeStreamforeachBatch 方法允许重用现有的批处理数据源。这是通过指定对流式查询的每个微批次的输出数据执行的函数来完成的。该函数有两个参数:第一个参数是包含微批次输出数据的数据帧,第二个参数是该微批次的唯一 ID。 foreachBatch 方法允许您执行以下操作:

  • 重用现有的批处理数据源。
  • 写入多个位置。
  • 应用额外的数据框操作。

在存储系统中可能没有可用的流式接收器但我们可以批量写入数据的情况下,我们可以使用 foreachBatch 来批量写入数据。

我们还可以将 流式查询的输出写入多个位置。我们可以通过简单地将输出数据帧写入这些多个位置来做到这一点,尽管这可能会导致每次写入不同位置时都会计算数据,并且可能不得不再次读取所有输入数据。为了避免这种情况,我们可以缓存输出数据帧,然后写入该位置,最后取消缓存。

在下面的代码示例中,我们看到了如何在 Scala 中使用 foreachBatch 写入不同的位置:

streaming_df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(format_location_1).save(your_location_1)  
  batchDF.write.format(format_location_2).save(your_location_2)  
  batchDF.unpersist()
}

foreachBatch 方法可以帮助我们克服某些限制,这些限制可用于静态 DataFrame,但在流式 DataFrame 中不受支持。 foreachBatch 选项允许我们将这些操作应用于每个微批量输出。

foreachBatch 方法保证数据只被处理一次,尽管您可以使用提供的 batchId 参数作为应用重复数据删除方法的手段.还需要注意的是,foreachBatchforeach 之间的主要区别之一是前者不适用于连续处理模式,因为它从根本上依赖于微批量执行,而后者提供了以连续模式写入数据的选项。

如果由于某种原因,foreachBatch 不是可用的选项,那么您可以使用 foreach 选项对自定义编写器进行编码并将数据写入逻辑分为openprocessclose 方法。

在 Python 中,我们可以在函数或对象中使用 foreach 方法。该函数提供了一种对处理逻辑进行编码的有效方法,但不允许在失败的情况下对输出数据进行重复数据删除,从而导致重新处理某些输入数据。在这种情况下,您应该在对象中指定处理逻辑。

以下自定义函数将一行作为输入并将该行写入存储:

def process_row (row):
    """
    Custom function to write row to storage.
    """
    pass

然后,我们可以在writeStreamforeach方法中使用这个函数,如下:

query = streaming_df.writeStream.foreach(process_row).start()

要构造对象,它需要有一个将行写入存储的过程方法。我们还可以添加可选的 openclose 方法来处理连接,如下面的代码片段所示:

class ForeachWriter:
  def open(self, partition_id, epoch_id):
  """
  Optional method to open the connection
  """
  def process(self, row):
     """
     Custom function to write row to storage. Required method.
     """
  def close(self, error):
    """
   Optional method to close the connection
    """

最后,我们可以使用 writeStreamforeach 方法在查询上调用这个对象,如下面的代码片段所示:

query = streaming_df.writeStream.foreach(ForeachWriter()).start()

通过这种方式,我们可以 非常轻松地定义一个类,该类的方法处理我们使用结构化流接收的数据流,然后写入将用作数据接收器的外部位置使用 writeStream 方法处理的数据流。

在下一节中,我们将通过指定检查点的使用来完成我们应该遵循的步骤,以保持数据的完整性,无论查询失败或流中断如何。

Recovering from query failures

失败的发生可能是因为 输入数据架构的更改、计算中使用的表的更改、文件丢失或许多其他根本原因。在开发使用数据流的应用程序时,有必要拥有强大的故障处理方法。结构化流式处理保证即使在失败的情况下结果也将保持有效。为此,它对输入源和输出接收器提出了两个要求,概述如下:

  • 输入源应该是可重放的,这意味着如果作业失败,可以再次读取最近的数据。
  • 输出接收器应该允许事务更新,这意味着这些更新是原子的(在更新运行时,系统应该保持运行)并且可以回滚。

之前 提到的结构化流式处理功能之一是检查点,它可以为流式查询启用。这还允许流通过在发生故障后简单地重新启动流来快速从故障中恢复,这将导致查询从中断处继续,同时保证数据的一致性。因此,始终建议配置 Enable Query Checkpointing 并配置 Databricks Jobs 以在失败后自动重新启动查询。

要启用检查点,我们可以在写入数据帧时将 checkpointLocation 选项设置为所需的检查点路径。下面的代码片段显示了一个示例:

streaming_df.writeStream
  .format("parquet")
  .option("path", output_path)
  .option("checkpointLocation", checkpoint_path)
  .start()

此检查点位置保留有关查询的所有信息。因此,所有查询都必须具有唯一的检查点路径。

我们面临在使用相同检查点路径的重新启动之间允许的流式查询中可能发生的变化的限制。

通过说允许操作,可以理解我们可以实现更改,但是语义或其效果是否符合预期将取决于查询和应用的具体更改。

not allowed 表示重新启动查询时更改可能会失败。

以下是我们可以进行的更改类型的摘要:

  • 我们不允许更改输入源的数量或类型。
  • 如果我们可以更改输入源的参数,这将取决于源和查询。
  • 允许对输出接收器的类型进行一些更改。
  • 如果我们可以更改输出接收器的参数,这将取决于接收器和查询的类型。
  • 允许对操作进行一些更改,例如映射和过滤操作。
  • 由于更新结果所需的状态数据模式发生变化,有状态操作的更改可能会导致失败。

正如我们所见,结构化 流式处理提供了多种功能,以便为生产级数据流处理提供容错能力。

在下一节中,我们将了解如何优化流式查询的性能。

Optimizing streaming queries

我们可以有一个 流式查询,它利用有状态的操作,例如流式聚合、流式 dropDuplicatesmapGroupsWithStateflatMapGroupsWithState,流式传输到流连接等。连续运行这些操作导致需要在每个执行器的内存中维护数百万个数据状态的键,从而导致瓶颈。

为了克服这个问题,Azure Databricks 提供了一个解决方案,即将数据的状态保存在 RocksDB 数据库中,而不是使用执行程序内存。 RocksDB 是一个提供持久键值存储的库,可以有效地管理内存和本地 SSD 中的数据状态。因此,这些解决方案与结构化流检查点结合使用,可确保防止出现故障。

在开始流式查询之前,您可以通过在 SparkSession 中设置以下配置来启用基于 RocksDB 的状态管理:

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

RocksDB 可以维护比标准执行器内存多 100 倍的状态键。我们还可以使用计算优化的实例(例如 Azure Standard_F16s 实例作为工作人员,并将随机分区的数量设置为最多为内核数量的两倍)来提高查询的性能。集群。

Triggering streaming query executions

触发器是中的一种方式,我们定义了将导致对部分数据执行操作的事件,因此它们处理流数据处理的时间。这些触发器由系统检查新数据是否到达的时间间隔定义。如果这个时间间隔太小会导致不必要的资源使用,所以它应该始终是根据您的具体流程定制的时间量。

流式查询的触发器参数将定义该查询是作为固定批处理间隔上的微批处理查询还是作为连续处理查询执行。

Different kinds of triggers

Azure Databricks 中提供了不同类型的触发器,我们可以使用它们来定义何时执行流式查询。此处概述了可用的选项:

  • Unspecified trigger:这是默认选项,表示除非另有说明,否则查询将以微批处理模式执行,其中,当处理先前的微批处理已经成功执行时,会生成微批处理。
  • 基于固定间隔微批处理的触发:查询将以微批处理模式执行,微批处理将在指定时间开始执行 间隔。如果前一个微批处理执行在小于指定的时间间隔内完成,Azure Databricks 将等到该时间间隔结束,然后再启动下一个微批处理。如果微批处理需要更长的指定时间间隔才能完成,那么下一个批处理将在上一个执行完成后立即执行。

    如果新数据可用,则不会执行微批处理。

  • 一次性微批量触发器:查询将只运行一次以处理所有可用数据。当集群启动和关闭以节省资源时,这是一个很好的选项,因此我们可以处理自集群启动以来所有可用的新数据。
  • Continuous trigger with fix checkpoint interval:查询将以连续的低延迟处理模式启动。

接下来,我们将看到一些示例,说明在使用结构化流处理流数据时如何应用这些不同类型的触发器。

Trigger examples

以下代码片段 提供了结构化流中默认触发器的示例,如前所述,它会尽快运行微批处理:

streaming_df.writeStream \
  .format("console") \
  .start()

我们还可以定义一个特定的时间间隔,使用 processingTime 触发选项。在下面的代码示例中,我们设置了 10 秒的微批处理时间间隔:

streaming_df.writeStream \
  .format("console") \
  .trigger(processingTime='10 seconds').start()

如前所述,我们可以设置一次性触发器来一次性处理我们所有的新数据,如下面的代码片段所示:

streaming_df.writeStream \
  .format("console") \
  .trigger(once=True).start()

最后,我们可以设置一个连续触发器,该触发器将有 2 秒的检查点时间间隔来保存数据,以从最终的故障中恢复。以下代码片段说明了此代码:

streaming_df.writeStream
  .format("console")
  .trigger(continuous='1 second').start()

结构化流式处理中的触发器允许我们对在处理在定义的时间间隔内到达的新数据时查询的行为方式进行细粒度控制。

在本章的最后一节中,我们将深入探讨 Azure Databricks 中可以使用哪些工具来可视化数据。

Visualizing data on streaming data frames

在处理结构化流数据帧中的 数据流时,我们可以使用 display 函数可视化实时数据。此函数不同于其他可视化函数,因为它允许我们指定诸如 processingTimecheckpointLocation 等选项,因为它是实时的。数据的性质。设置这些选项是为了管理我们正在可视化的确切时间点,并且应该始终在生产中设置,以便准确了解我们所看到的数据的状态。

在下面的代码 示例中,我们首先定义了一个Structured Streaming 数据帧,然后我们使用display 函数来显示状态在特定检查点位置每 5 秒处理一次的数据:

streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), processingTime = "5 seconds", checkpointLocation = "<checkpoint-path>")

具体来说,这些是此函数的可用参数:

  • streamName 参数是我们想要可视化的特定流查询。
  • processingTime 参数定义了流式查询执行频率的时间间隔。如果未指定,Azure Databricks 将在前一个过程完成时检查新数据的可用性——这种行为可能会导致不希望的成本增加。因此,这是一个建议在可能的情况下明确设置的选项。

checkpointLocation 参数是写入检查点数据的路径。如果未指定,则该位置将是临时的。建议设置此参数,以便在发生任何故障时可以从其离开的位置重新启动流。

Azure Databricks display 函数支持多种绘图类型。我们可以通过从执行函数的单元格左上角的下拉菜单中选择所需的绘图类型来选择不同类型的绘图,如下面的屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

图 6.1 – 显示功能的可用选项

我们可以选择创建 不同类型的图,例如条形图、散点图和线图,并具有能够为这些可视化设置选项的额外功能。如果我们点击 Plot Options 菜单,我们将被提示一个窗口,我们可以在其中根据所选类型自定义绘图,如下面的屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

图 6.2 – 自定义绘图的可用选项

display 选项在可视化一般数据和特别是流式查询时提供了易用性。提供的选项允许我们轻松拖放创建每种类型的绘图所需的变量,选择执行的聚合类型并修改其设计的几个方面。  

下一节将 包含一个示例,在该示例中我们将模拟数据流并使用结构化流式处理来运行流式查询。

Example on Structured Streaming

在此示例中,我们 将研究如何利用我们在前面几节中获得的结构化流式处理知识。我们将使用其中一个示例数据集来模拟传入的数据流,其中我们有一些小的 JSON 文件,在实际场景中,这些文件可能是我们想要处理的传入数据流。我们将使用这些文件来计算指标,例如时间戳操作流的计数和窗口计数。我们来看一下structured-streaming示例数据集的内容,如下:

%fs ls /databricks-datasets/structured-streaming/events/

您会发现 目录中有大约 50 个 JSON 文件。您可以在以下屏幕截图中看到其中一些:

读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

图 6.3 – 结构化流数据集的 JSON 文件

我们可以使用 fs head 选项查看其中一个 JSON 文件包含的内容,如下所示:

%fs head /databricks-datasets/structured-streaming/events/file-0.json

我们可以看到文件中的每一行都有两个字段,分别是 timeaction,所以我们下一步将尝试使用结构化流以交互方式分析这些文件,如下所示:

  1. 我们将采取的第一步是定义数据的模式并将其存储在 JSON 文件中,我们稍后可以使用该文件创建流数据帧。可以在以下代码段中看到此代码:
    from pyspark.sql.types import * input_path = "/databricks-datasets/structured-streaming/events/"
  2. 由于我们已经知道数据的结构,我们不需要推断模式,因此我们将使用 PySpark SQL 类型来定义我们未来流数据帧的模式,如下面的代码片段所示:
    json_schema = StructType([StructField("time",                                        TimestampType(),                                        真),                            StructField("动作",                                        StringType(),                                        True)])
  3. 现在,我们可以使用之前定义的模式构建一个静态数据框,并将数据加载到 JSON 文件中,如下所示:
    static_df = (   火花     .阅读     .schema(json_schema)     .json(input_path) )
  4. 最后,我们可以使用display函数可视化数据框,如上一节所述,如下:
    display(static_df)

    输出如下:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图6.4 - 流数据帧

  5. 我们现在准备运行计算,以便通过基于 1 小时的窗口聚合对数据进行分组来了解打开和关闭订单的数量。我们将使用星号 (*) 从 PySpark SQL 函数中导入所有内容,以获取 window 函数等,如图所示在以下代码片段中:
    from pyspark.sql.functions import *       static_count_df = (   static_df     .groupBy(        static_df.action,        window(static_df.time, "1 小时"))         .count() ) static_count_df.cache()
  6. 接下来,我们将数据帧注册为一个名为 data_counts 的临时视图,如以下代码片段所示:
    static_count_df.createOrReplaceTempView("data_counts")
  7. 最后,我们可以直接使用 SQL 命令查询表。例如,要计算所有小时的总计数,我们可以运行以下 SQL 命令:
    %sql select action, sum(count) as total_count from data_counts group by action

    输出如下:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图6.5 - 流式数据帧上的SQL查询

  8. 我们还可以使用 窗口计数来创建按timeaction,如下:
    %sql 选择动作,date_format(window.end, "MMM-dd HH:mm") 作为时间,从 data_counts 按时间顺序计数,动作

    输出如下:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图6.6 - 流数据帧的计数窗口

  9. 在下面的代码示例中,我们将使用与此处使用的定义类似的定义,但使用 readStream 选项而不是 read方法。此外,为了模拟文件流,我们将使用 maxFilesPerTrigger 选项一次选择一个文件并将其设置为 1
    from pyspark.sql.functions import * streaming_df = (   火花     .readStream                     ;       .schema(json_schema)                    .option("maxFilesPerTrigger", 1)       .json(input_path) )
  10. 然后,我们可以进行之前应用于静态数据帧的相同查询,如下所示:
    streaming_counts_df = (               ;      streaming_df     .groupBy(       streaming_df.action,       window(streaming_df.time, "1 小时"))     .count() )
  11. We can check that the data frame is actually a streaming source of data by running the following code:
    streaming_df.isStreaming

    输出如下:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图 6.7 – 检查数据帧是否是流源

    可以看到,streamingCountsDF.isStreaming的输出是True,因此streaming_df 是一个流数据帧。

  12. 下一个选项是 set,以保持较小的随机播放大小,如以下代码片段所示:
    spark.conf.set("spark.sql.shuffle.partitions", "2")  
  13. We can then store the results of our query, passing as format "memory" to store the result in an in-memory table. The queryName option will set the name of this table to data_counts. Finally, we set the output mode to complete so that all counts are on the table. The code can be seen in the following snippet:
    query =   streaming_counts_df     .writeStream     .format("memory")             .queryName("data_counts")          .outputMode("complete")       .start()

    query 对象是一个在后台运行的句柄,不断寻找新文件并更新窗口化结果聚合计数。

    注意前面代码片段中查询的状态。单元格中的进度条显示查询的状态,在我们的例子中是活动的。

  14. 我们将人为地等待一些文件处理完毕,然后对内存中的 data_counts 表运行查询。首先,我们通过运行以下代码引入 5 秒的睡眠:
    从时间导入睡眠 sleep(5)  
  15. Then, we can run a query in another cell as a SQL command, using SQL magic, as follows:
    %sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from data_counts order by time, action

    以下屏幕截图 向我们展示了通过在 Databricks 笔记本代码单元之一中运行此查询创建的结果图:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图 6.8 – 等待前的计数

    我们可以看到窗口计数的时间线越来越大。运行这些查询将始终导致最新更新的计数,流式查询正在后台更新。

  16. 如果我们再等几秒钟,我们将在结果表中计算出新数据,如下面的代码片段所示:
    sleep(5) 
  17. And then, we run the query again, as follows:
    %sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from data_counts order by time, action

    我们可以在 中看到通过运行查询创建的图表的以下屏幕截图,该图表右侧的结果随着更多数据的摄取而发生变化:

    读书笔记《distributed-data-systems-with-azure-databricks》第5章Delta引擎简介

    图 6.9 – 等待后的计数

  18. We can also see the resulting number of "opens" and "closes" by running the following SQL query:
    %sql select action, sum(count) as total_count from data_counts group by action order by action

    由于数据集中存在少量文件,此示例有其局限性。将它们全部消耗完后,表将不再有更新。

  19. 最后,我们可以停止 在后台运行的查询,方法是单击查询单元格中的取消链接,或者通过执行 query.stop 函数,如以下代码片段所示:
    query.stop()

无论哪种方式,运行查询的单元格的状态都将更新为 TERMINATED

Summary

在本章中,我们回顾了结构化流的不同特性,并研究了在处理来自不同来源的数据流时如何在 Azure Databricks 中利用它们。

这些源可以是来自 Azure 事件中心的数据,也可以是使用 Delta 表作为流式源、使用 Auto Loader 管理文件检测、从 Apache Kafka 读取、使用 Avro 格式文件以及通过处理数据接收器派生的数据。我们还描述了结构化流在处理数据流时如何提供容错,并研究了如何使用显示功能可视化这些流。最后,我们以一个示例结束,其中我们模拟了 JSON 文件到达存储。

在下一章中,我们将更深入地探讨如何使用 PySpark API 来操作数据,如何在 Azure Databricks 中使用 Python 流行的库,以及在分布式系统上安装它们的细微差别,如何轻松地从 Pandas 迁移使用 Koalas API 进入大数据,以及如何使用流行的 Python 库可视化数据。