vlambda博客
学习文章列表

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

Chapter 8: Databricks Runtime for Machine Learning

本章将深入探讨经典机器学习算法的开发,以基于表格数据训练和部署模型,同时探索库和算法。这些示例将重点介绍使用 Azure Databricks Runtime for Machine Learning (Databricks Runtime ML) 的特殊性和优势。

在本章中,我们将探讨以下概念,这些概念侧重于我们如何提取和改进数据中可用的特征,以训练我们的机器学习和深度学习模型。我们将讨论的主题在这里列出:

  • 加载数据中
  • 特征工程
  • 时间序列数据源
  • 处理缺失值
  • 从文本中提取特征
  • 在表格数据上训练机器学习模型

在接下来的部分中,我们将讨论执行所介绍的操作所需的必要库,并提供一些关于最佳实践和一些核心概念如何与之相关的上下文。

事不宜迟,让我们开始使用 Azure Databricks 中的机器和深度学习模型。

Loading data

逗号分隔 值(CSV)是最广泛使用的格式机器学习应用程序中的表格数据。正如名称所暗示的,它以行的形式存储数据,以逗号或制表符分隔。

部分介绍了有关加载专门用于机器学习和深度学习应用程序的数据的信息。尽管我们可以考虑前面章节中介绍的这些概念,但我们将加强有关如何将表格数据直接读入 Azure Databricks 以及执行此操作的最佳实践的概念。

Reading data from DBFS

在分布式计算环境(如 Azure Databricks)中训练机器学习算法时,共享存储的需求变得很重要,尤其是在使用分布式深度学习应用程序时。 Azure Databricks 文件 系统 (DBFS ) 允许使用 Spark 和本地文件应用程序 集群的数据>编程 接口(API):

在 Azure Databricks 中,我们可以选择提供高性能 Filesystem in 用户空间 (FUSE)挂载,这是一个虚拟文件系统,可以在< code class="literal">/dbfs 所有集群节点的位置。这允许在集群节点中运行的进程使用具有本地文件 API 的分布式存储系统读取到相同的位置。

  • 例如,当我们将文件写入 DBFS 位置时,如以下代码片段所示,我们正在使用跨所有集群节点的共享文件系统:
    with open("/dbfs/tmp/test_dbfs.txt", 'w') as f:   f.write("这是\n")   f.write("在共享中\n")   f.write("文件系统.\n")
  • 同样,我们可以使用本地文件 API 读取分布式文件系统上的文件,如下面的代码片段所示:
    with open("/dbfs/tmp/test_dbfs.txt", "r") as f_read:   对于 f_read 中的行:     打印(行)
  • 这样,我们可以将输出显示到控制台,如下图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.1 – 在共享文件系统中读取文件的输出

在使用 Azure Databricks Runtime ML 时,我们可以选择使用 dbfs:/ml 文件夹,这是一个经过优化以提供高性能的特殊文件夹 输入/输出 (I/O) 用于深度学习操作。作为解决支持文件小于2 千兆字节 (GB) 在 Azure Databricks 中,因此建议将数据保存在此文件夹中。

我们可以通过读取表或本地 CSV 文件来加载数据,用于机器和深度学习应用程序的 训练和推理,我们将在下一节中看到。加载数据后,我们有很多选项可以将其转换为 Spark DataFrames、pandas DataFrames 或 NumPy 数组。

在下一部分中,我们将再次介绍如何读取 CSV 文件以使用 Azure Databricks 中的分布式深度学习应用程序。

Reading CSV files

CSV 是机器学习数据最常见的格式之一,它经常用于保存表格数据。在这里,我们使用术语表格数据来描述结构化为表示观察的行的数据,而这些数据又使用列形式的变量或属性来描述。

正如我们在前几章中看到的,我们有多种方法可以将 CSV 文件加载到 Azure Databricks 中。第一个选项是使用 PySpark API 将数据导入 Spark DataFrame 以解析数据。以下代码会将存储在 DBFS 中指定 databricks 数据集文件夹中的文件读取到 Spark DataFrame 中,从而推断文件中的基础数据类型:

diamonds = spark.read.format('csv').options(header='true', inferSchema='true').load('/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv')

在 Azure Databricks 中,我们 还可以选择使用 Structured Query < strong class="bold">Language (SQL) 命令将数据从 CSV 文件导入到临时视图中,然后我们可以查询。我们可以通过在单元格中使用 %sql 魔法并将 CSV 文件中的数据加载到临时视图中来做到这一点,如下所示:

%sql
CREATE TEMPORARY VIEW diamonds
USING CSV
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header "true", mode "FAILFAST")

注意这里,我们通过FAILFAST模式来停止文件的解析,如果有错误会导致异常在其中找到行。

一旦数据被导入到视图中,我们可以使用SQL命令对临时视图进行查询,如下:

%sql 
SELECT * FROM diamonds

这将向我们展示我们刚刚创建的临时视图,如以下屏幕截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.2 – 使用 SQL 读取 CSV 文件

如果我们读取指定模式的 CSV 文件,如果指定的模式与我们指定的不同,我们可能会遇到一些问题——例如,我们可能在指定为整数数据类型的列中有一个字符串值。这可能导致结果与实际数据有很大差异。因此,验证数据的正确性始终是一种很好的做法。

我们可以通过选择解析器运行的可用模式之一来指定 CSV 解析器的行为。此处概述了此模式的可用选项:

  • PERMISSIVE:这是 默认模式。在这里,空值被插入到未正确解析的字段中。这使我们能够检查未正确解析的行。
  • DROPMALFORMED:此 模式会丢弃包含无法正确解析的值的行。
  • FAILFAST:如果发现任何格式错误的数据,此模式会中止读取文件。

要设置模式,请使用 模式选项,如以下代码片段所示:

drop_wrong_schema = sqlContext.read.format("csv").option("mode", "FAILFAST")

在下一个示例中,我们将读取一个 CSV 文件,指定一个已知的数据模式,并将解析器模式设置为在其类型未指定时删除格式错误的数据。如前所述,指定模式始终是一个好习惯。如果数据的模式是已知的,最好避免模式推断。在这里,我们将读取一个包含 ID、名称和姓氏列的文件,第一个为整数,其余为字符串:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
data_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("lastname", StringType())
])
(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(data_schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

现在我们已经了解了如何从 CSV 文件中读取表格数据,我们可以使用这些数据从中提取特征。这些功能将使我们能够准确地对数据进行建模,并拥有更有效的机器学习和深度学习模型。在下一节中,我们将学习如何在 Azure Databricks 中执行特征工程。

Feature engineering

机器学习模型使用输入数据进行训练,然后提供对未见数据的预测作为结果。此输入数据通常由通常以结构化列形式出现的特征组成。算法使用这些数据来推断可用于推断结果的模式。在这里,特征工程的需求产生于两个主要目标,如下所示:

  • 重构输入数据,使其与我们为任务选择的机器学习算法兼容。例如,如果我们为模型选择的算法不支持分类值,我们需要对这些值进行编码。
  • 根据我们为手头问题选择的性能指标改进模型产生的预测。

通过特征工程,我们从原始输入数据中提取相关特征,以便能够根据要解决的问题的建模方式准确地表示它,从而改进新数据的性能模型。请记住,所使用的功能对模型性能的影响比其他任何事情都大。

特征工程可以被认为是通过使关键特征或模式更加透明,将输入数据转换为更易于机器学习算法解释的内容的过程,尽管我们有时可以生成新功能使数据可视化更易于解释或模型算法的模式推断更清晰。

在特征工程中应用了多种 策略。其中一些列在这里:

  • 缺失数据插补和管理
  • 异常值处理
  • 装箱
  • 对数变换
  • 一键编码
  • 正常化
  • 分组和聚合操作
  • 特征分割
  • 缩放

其中一些策略仅适用于某些算法或数据集,而其他策略在所有情况下都可能是有益的。

人们普遍认为,您建模的特征越好,您获得的性能就越好。虽然在某些情况下确实如此,但它也可能具有误导性。

模型的性能有很多因素,具有不同程度的重要性。这些因素包括算法的选择、可用数据和提取的特征。您还可以将问题建模方式以及用于估计性能的客观指标添加到这些因素中。然而,需要强大的功能来有效地描述数据中固有的结构,并使算法的工作更容易。

处理大量数据时,Spark SQL 和 MLlib 等工具在用于特征工程时会非常有效。 Databricks Runtime ML 中还包含一些第三方库,例如 scikit-learn,它提供了从数据中提取特征的有用方法。

本部分不打算深入探讨每种算法的机制,但会加强一些概念以了解它们在 Azure Databricks 中的应用方式。

Tokenizer

标记化可以一般描述为一个过程,在该过程中,我们将输入字符的字符串转换为由构成它的各个部分形成的数组。这些部分称为标记,通常是单个单词,但可以是一定数量的字符,或者称为 n-gram 的单词组合。稍后我们将使用这些生成的标记进行其他形式的处理以提取或转换特征。从这个意义上说,标记化过程可以被认为是特征工程的任务。令牌是根据传递给解析器的特定模式来识别的。

在 PySpark 中,我们可以使用简单的 Tokenizer 类来标记字符串输入序列。以下代码示例显示了我们如何将句子拆分为单词序列:

from pyspark.ml.feature import Tokenizer
sentenceDataFrame = sqlContext.createDataFrame([
  (0, "Spark is great for Data Science"),
  (0, "Also for data engineering"),
  (1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
  print(words_label)

在这里,我们可以看到编码的令牌:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.3 – 词标记化的输出

我们还可以使用 更高级的RegexTokenizer 分词器来使用正则表达式指定分词边界。

Binarizer

二值化是 中的一个过程,您可以在该过程中为特征建立一个数值阈值,以便将它们转换为二值特征。如果您可以假设我们的数据具有概率正态分布,二值化对于预处理具有连续数值特征的输入数据非常有用。此过程使机器学习和深度学习中使用的算法更容易处理数据,因为它们将数据描述为更明确的结构。

在 PySpark 中,我们可以使用简单的 Binarizer 类,它允许我们对连续的数值特征进行二值化。除了 inputColoutputCol 的常用参数之外,简单的 Binarizer 类还有一个参数阈值,用于建立阈值以对连续数值特征进行二值化。大于该阈值的特征被二值化为 1.0,等于或小于该阈值的特征被二值化为 0.0。下面的代码示例展示了我们如何使用 Binarizer 类对数值特征进行二值化。首先,我们导入 Binarizer 类并创建一个带有 labelfeature 列:

from pyspark.ml.feature import Binarizer
continuousDataFrame = sqlContext.createDataFrame([
  (0, 0.345),
  (1, 0.826),
  (2, 0.142)
], ["label", "feature"])

接下来,我们实例化 Binarizer 类,指定阈值以及输入和输出列。然后,我们转换示例 DataFrame 并存储和显示结果。以下代码片段说明了执行此操作的代码:

binarizer = Binarizer(threshold=0.5, inputCol="feature", 
                      outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
  print(binarized_feature)

我们应该看到根据我们建立的阈值 打印了转换后的 DataFrame 的二值化特征。接下来,我们将学习如何应用多项式展开来在更高维空间中表达特征。

Polynomial expansion

在数学中,多项式展开是一种数学运算,用于将特征表示为其他更高级别特征的乘积。因此,它可以用来扩展传递的特征,将它们表示为更高维度的一组特征。

多项式展开可以被认为是将特征展开到多项式空间中的过程,以便尝试在展开的空间中找到模式,否则在低维空间中可能难以找到。这在处理做出简单假设的算法时很常见,例如线性回归,否则可能无法捕获相关模式。我们转换特征的这个多项式空间是由原始维度的 n 度组合形成的。 PySpark PolynomialExpansion 类为我们提供了这个功能。以下代码示例展示了如何使用它将特征扩展到 3 次多项式空间:

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)

在这里,我们可以看到在多项式展开中创建的变量:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.4 – 多项式展开的输出特征

将我们的特征 转换为更高维度的空间是在特征工程中使用的绝佳工具。它允许我们用不同的术语来描述数据,并提高机器学习算法对可能隐藏在数据中的更复杂模式的敏感性。

StringIndexer

PySpark StringIndexer 将标签的字符串列编码为标签索引的列。这些索引的范围从 0numLabels,按标签频率排序。因此,最频繁的标签将被索引为 0。如果输入列是数字,PySpark StringIndexer 类会将其转换为字符串并索引字符串值。

StringIndexer 类接受输入列名和输出列名。在这里,我们将展示如何使用它来索引名为 cluster 的列:

from pyspark.ml.feature import StringIndexer
df = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), 
     (4, "a"), (5, "c")],
    ["id", "cluster"])
indexer = StringIndexer(inputCol="cluster", 
                        outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

在以下屏幕截图中,我们 可以看到字符串已正确索引到变量的 DataFrame 中:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.5 – StringIndexer 类的输出转换

在此示例中,a 集群获得 index 0,因为它是最频繁,其次是索引 1c 和索引 b代码类="literal">2。

One-hot encoding

One-hot 编码 是一种在特征工程中用于创建一组 二进制值(通常称为虚拟变量)的转换,它们是主要用于将具有多个可能值的分类变量表示为只有一个高值 (1) 和所有其他低值 (0) 的变量。这些布尔特征中的每一个都代表原始分类变量中的单个可能值。

One-hot 编码有时用于可视化、模型效率或根据训练算法的要求准备数据。为此,我们通过对分类变量进行编码来构建不同的特征。我们将这一列拆分为每个级别的布尔特征,而不是具有多个级别的单个特征列,其中唯一接受的值是 1 或 0。

One-hot 编码是 一种在神经网络和深度学习中的其他算法中特别流行的技术,用于编码分类特征,允许期望连续特征的算法(如逻辑回归)使用这些分类特征。在 PySpark 和 Azure Databricks 中,我们可以使用 OneHotEncoder 类将标签索引列映射到二进制向量的编码列,最多具有一个可能的值。在这里,我们使用 OneHotEncoder 类将分类特征转换为数值特征:

from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["clusterV1", "clusterV2"])
encoder = OneHotEncoder(inputCols=["clusterV1", 
                                   "clusterV2"],
                        outputCols=["catV1", "vatV2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

在这里,我们看到了我们获得的编码变量:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.6 – one-hot 编码的输出

通过这种方式,我们可以使用one-hot编码将Spark DataFrame的一列或多列中的分类特征转换为数字特征,这些特征可以稍后转换或直接传递给我们的模型训练和推断。

VectorIndexer

在 Azure Databricks 中,我们可以 使用 VectorIndexer 类来索引分类特征,作为一次性编码的替代方法。此类用于包含 Vector 类型列的数据帧。它的工作原理是确定哪些特征是分类的,并将这些特征的原始值转换为分类索引。接下来概述了它遵循的步骤:

  1. 它将 Vector 类型的列和 maxCategories 参数作为输入,顾名思义,该参数指定单个变量的最大类别数的阈值。
  2. 它根据它们持有的不同值的数量来决定哪些特征是分类的。最多 maxCategories 的特征将被标记为分类。
  3. 它为声明为分类的每个特征计算基于 0 的类别索引。
  4. 最后,它通过将原始特征值转换为索引来索引分类特征。

这种分类特征的索引使我们能够克服某些算法(例如决策树和树集合)在处理分类特征以提高性能或简单地能够使用仅允许数值特征的算法方面的限制。

在下面的代码示例中,我们将读取标记点的示例数据集,并使用 PySpark VectorIndexer 类来决定构成数据集的哪些特征应被视为分类变量。我们稍后会将这些分类特征的值转换为索引:

from pyspark.ml.feature import VectorIndexer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
indexer = VectorIndexer(inputCol="features", 
                        outputCol="indexed", 
                        maxCategories=10)
indexerModel = indexer.fit(data)
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)

这个分类数据转换成连续的,然后可以传递给算法,如DecisionTreeRegressor,以便能够处理分类特征。

Normalizer

另一种常见的特征工程方法是将数据带入给定的区间。这样做的第一个原因是将计算限制在固定值范围内,以防止数值不准确,并限制在处理太大或太小的数字时可能出现的所需计算能力。归一化特征数值的第二个原因是,一些机器学习算法在数据归一化后会更好地处理数据。我们可以采取几种方法来规范化我们的数据。

这些不同的方法是由于一些机器学习算法需要不同的归一化策略才能有效执行。例如,对于 k-nearest neighbors (KNN) ,特定特征中的 值范围会影响模型中该特征的权重。因此,值越大,特征的重要性就越大。在神经网络的情况下,这种归一化本身可能不会影响最终的性能结果,但它会加快训练速度,并避免由于传播过大的值而导致的梯度利用和消失问题或者对于模型中的后续层太小。另一方面,基于决策树的机器学习算法既不会因归一化而受益也不会受到伤害。

正确的归一化策略将取决于问题和选择的算法,而不是一些一般的统计或计算考虑,并且正确的选择将始终与领域知识密切相关。

在 PySpark 中,Normalizer 是一个转换 Vector 行,对每个向量进行归一化以将向量的范数转换为一个单位。它将值 p 作为参数,它指定用于标准化的 p 范数。这个 p 值默认设置为 2,相当于说我们正在转换为欧几里得范数。

以下 代码示例演示了如何以 libsvm 格式加载数据集,然后对每一行进行规范化以具有单位范数:

from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
dataFrame = sqlContext.createDataFrame(data)
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", 
                        outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})

这种归一化过程广泛用于特征工程,将帮助我们标准化数据并改善学习算法的行为。

StandardScaler

在自然界的许多领域,事物往往受正态或高斯概率分布支配。归一化是我们在特征工程中所说的减去特征平均值的过程,它允许我们将特征集中在 0 附近。然后,我们将其除以标准差,这将告诉我们特征在周围的分布0. 最后,我们将得到一个以 0 为中心的变量,取值范围在 -1 和 1 之间。

在 Azure Databricks 中,我们可以使用 PySpark StandardScaler 类将 Vector 行的数据集转换为具有单位标准差和/或 0 均值的标准化特征。它采用以下参数:

  • withStd:该参数默认设置为true。它将数据缩放到单位标准差,这意味着我们除以标准差。
  • withMean:该参数默认设置为false。它将在缩放之前以均值为 0 的数据居中。它会创建密集的输出,因此这不适用于稀疏输入并且会引发异常。

PySpark StandardScaler 类是一种转换,可以适应数据集以生成 StandardScalerModel;后一种模型稍后可用于计算输入数据的汇总统计信息。然后可以将 StandardScalerModel 转换为数据集中具有单位标准差和/或 0 均值特征的列向量。

考虑到如果一个特征的 标准差已经是 0,StandardScalerModel 将默认返回 0.0 作为该特征的向量。

在下面的代码示例中,我们将展示如何加载 libsvm 格式的数据集,然后将每个特征归一化以具有单位标准差。

  1. 首先,我们将进行必要的导入并读取数据集,这是 mllib 文件夹中的示例数据集:
    从 pyspark.mllib.util 导入 MLUtils 从 pyspark.ml.feature 导入 StandardScaler 数据 = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") dataFrame = sqlContext.createDataFrame(数据) 缩放器 = StandardScaler(inputCol="features",                         outputCol= “缩放特征”,                         withStd= True, withMean=False)
  2. 然后,我们可以通过将 StandardScaler 类拟合到从输入数据创建的 DataFrame 来计算汇总统计信息,如下所示:
    scalerModel = scaler.fit(dataFrame)
  3. 最后,我们可以将每个特征归一化,使其具有单位标准差,如下所示:
    scaledData = scalerModel.transform(dataFrame)

规范化 广泛用于数据科学。它帮助我们将所有连续数值放在同一尺度上,并且可以帮助我们克服在我们的学习算法中使用的连续变量不受固定范围限制且未居中时出现的许多问题。

Bucketizer

PySpark Bucketizer 类将 一列连续特征转换为一列特征桶,其中这些桶由用户指定使用拆分参数:

splits 参数用于将连续特征映射到存储桶中。指定 n+1 个拆分将产生 n 个桶。由拆分 xy 定义的存储桶将保存 [x ,y) 除了最后一个桶外,它还将包含 y 范围内的值。这些分裂必须严格增加。请记住,如果另有指定,指定拆分之外的值将被视为错误。拆分示例可以在以下代码片段中看到:

splits = Array(Double.NegativeInfinity, 0.0, 1.0, 
               Double.PositiveInfinity)
splits = Array(0.0, 1.0, 2.0)

必须明确提供拆分值以涵盖所有 Double 值。

考虑到如果您不知道目标列的上限和下限,最好添加 Double.NegativeInfinityDouble.PositiveInfinity 作为拆分的边界,以防止由于“out of Bucketizer bounds”异常而可能出现的任何潜在错误。另请注意,这些拆分必须以严格的递增顺序指定。

在下面的示例中,我们将展示如何使用 PySpark BucketizerDouble 值分桶到另一个索引列中> 类:

  1. 首先,我们将进行必要的导入并定义要分桶的 DataFrame,如下所示:
    from pyspark.ml.feature import Bucketizer splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")] 数据 = [(-0.5,), (-0.3,), (0.0,), (0.2,)] dataFrame = sqlContext.createDataFrame(数据,                                         [“特征”]) bucketizer = Bucketizer(拆分=拆分,                         inputCol= “特征”,                         outputCol= "bucketedFeatures")
  2. 然后,我们可以将原始数据转化为它的桶索引,如下:
    bucketedData = bucketizer.transform(dataFrame) display(bucketedData)
  3. 这是得到的分桶数据:

    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

    图 8.7 – Bucketizer 输出特性

  4. 在 PySpark 中,我们 也可以在 Bucketizer 上使用 QuantileDiscretizer 类,其中 < code class="literal">QuantileDiscretizer 是一个能够处理 Not a Number (NaN) 值,这是两者之间的区别,因为 Bucketizer 类是一个转换器,如果输入数据包含 NaN 值,则会引发错误。以下代码片段提供了两个类产生相似输出的情况示例:
    from pyspark.ml.feature import QuantileDiscretizer 从 pyspark.ml.feature 导入 Bucketizer 数据 = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)] df = spark.createDataFrame(data, ["id", "hour"]) result_discretizer = QuantileDiscretizer(numBuckets=3, inputCol="小时", outputCol="result").fit(df).transform(df) result_discretizer.show() splits = [-float("inf"),3, 10, float("inf")] result_bucketizer = Bucketizer(splits=splits, inputCol="小时", outputCol="result").transform(df) result_bucketizer.show()

这里 QuantileDiscretizer 类将根据数据确定桶拆分,而 Bucketizer 类将根据数据将数据排列到桶中您通过拆分指定的

因此,当您已经知道您期望哪些桶时,最好使用 Bucketizer,而 QuantileDiscretizer 类将估计为你分裂。

在前面的示例中,由于输入数据和选择的拆分,两个进程的输出是相似的。否则,结果可能会在不同情况下有很大差异。

Element-wise product

逐元素 乘积是特征工程中在处理排列为矩阵的数据时非常常用的数学运算。 PySpark ElementwiseProduct 类将每个输入向量乘以先前指定的权重向量。为此,它将使用逐元素乘法,该乘法将通过定义为权重的标量乘数来缩放输入向量的每一列。

ElementwiseProduct类以scalingVec为主要参数,即本次操作的变换向量。

以下示例将举例说明我们如何使用 ElementwiseProduct 类和转换向量值来转换向量:

from pyspark.mllib.linalg.distributed import RowMatrix 
v1 = sc.parallelize([[2.0, 2.0, 2.0], [3.0, 3.0, 3.0]]) 
mat = RowMatrix(v1) 
v2 = Vectors.dense([0.0, 1.0, 2.0]) 
transformer = ElementwiseProduct(v2) 
transformedData = transformer.transform(mat.rows) 
print transformedData.collect()

那么这个操作可以理解为Hadamard乘积,也就是输入数组和权重向量的元素乘积,反过来会给出结果另一个向量。

Time-series data sources

在数据科学 和工程中,最常见的挑战之一是时间数据操作。包含地理空间或交易数据的数据集,主要位于应用程序的金融和经济领域,是由时间戳索引的数据的一些最常见示例。在金融、欺诈甚至社会经济时间数据等领域工作最终导致需要加入、聚合和可视化数据点。

这些时间数据通常以日期时间格式出现,这些格式不仅可能在格式本身上有所不同,而且在其包含的信息方面也可能有所不同。其中一个例子是 DD/MM/YYYY 和 MM/DD/YYYY 格式之间的差异。如果使用的格式不匹配,误解这些不同的日期时间格式可能会导致失败或格式错误的结果。此外,这些数据不是数字格式,正如我们在本章前几节中所看到的那样,可能会导致需要克服的几个障碍,其中一个是这些数据无法通过以下方式轻松解释。机器学习和深度学习中使用的大多数学习算法。

这就是特征工程发挥作用的地方,它为我们提供了从这些数据转换和创建新特征的工具。这方面的一个例子可能是重新组织数字特征中的数据,例如日、月和年,甚至使用动态时间扭曲技术来操作特征来比较不同长度的时间序列,例如比较不同长度的时间序列。二月和三月,分别有 28 天和 31 天。

在 Azure Databricks 中,我们有几个功能允许我们执行时间序列的连接、聚合和窗口化等操作,并具有并行执行此处理的额外好处。此外,Koalas API 允许我们使用类似于 Pandas 的语法来处理这些数据,这使得从实验到生产的过渡更容易处理。

在此示例中,我们将使用财务数据来说明我们如何在 Azure Databricks 中操作时态数据集。我们将使用的数据是一个基于股票市场的示例,其中包含正在交易的不同公司的交易信息。您可以在此链接中找到有关此类数据的更多信息:https://www.tickdata.com/产品/nbbo/

  1. We can get example data by running the following code in an Azure notebook cell. This will download the data that we will read afterward:
    %sh wget https://pages.databricks.com/rs/094-YMS-629/images/ASOF_Quotes.csv ; wget https://pages.databricks.com/rs/094-YMS-629/images/ASOF_Trades.csv ; 

    在这里,我们下载了两种数据集,一种用于交易,一种用于报价。我们将在时间列上合并这些数据,以比较在同一时间点进行的交易和报价。

  2. Before reading the data, we will define a schema for it because this information is already known to us. We will assign the PySpark TimestampType class to the columns that hold temporal data about the trade execution and quote time. We will also rename the trade and execution time column names to events_ts to finally convert this data into Delta format. The code to do this is shown in the following snippet:
    from pyspark.sql.types import * trade_schema = StructType([     StructField("symbol", StringType()),     StructField("event_ts", TimestampType()),     StructField("trade_dt", StringType()),     StructField("trade_pr", DoubleType()) ]) quote_schema = StructType([     StructField("symbol", StringType()),     StructField("event_ts", TimestampType()),     StructField("trade_dt", StringType()),     StructField("bid_pr", DoubleType()),     StructField("ask_pr", DoubleType()) ])

    一旦我们下载了数据并为我们的 Spark 数据框指定了所需的模式,我们就可以根据数据模式解析 CSV 文件并将它们存储为我们以后能够查询的 Delta 表。

  3. 我们已决定 使用 Delta 表以从优化的数据格式中受益,该格式使我们能够处理大型压缩平面文件并利用底层引擎的强大功能,使我们能够轻松扩展并根据可用数据量并行化该过程。代码如以下片段所示:
    spark.read.format("csv").schema(trade_schema).option("header", "true").option(" delimiter", ",").load("/tmp/finserv/ASOF_Trades.csv").write.mode('overwrite').format("delta").save('/tmp/finserv/delta/trades' ) spark.read.format("csv").schema(quote_schema).option("header", "true").option("delimiter", ",").load("/tmp/finserv/ASOF_Quotes.csv" ).write.mode('overwrite').format("delta").save('/tmp/finserv/delta/quotes')
  4. 在我们读取数据并将数据存储到 Delta 表中后,我们可以通过将生成的 Delta 表显示为 Spark Data Frame 来检查数据是否已正确解析,如下所示:
    display(spark.read.format("delta").load("/tmp/finserv/delta/trades"))

现在我们有了我们的交易数据,我们可以使用它来合并交易和报价,聚合交易,并对其执行窗口操作。

Joining time-series data

当使用 处理时间序列数据时,as-of 连接是一种合并技术,通常是指在时间戳的确切时刻获取给定事件的值。对于大部分时间数据,不同类型的日期时间序列将连接在一起。在我们的特定情况下,我们想知道数据集中的每家公司在我们可用的任何给定时间点的特定贸易状态。在这里,这些状态可以是 - 例如 - NBBO,它是交易和投资中 National Best Bid Offer 的首字母缩写词.

在以下示例中,我们将获取数据集中每个可用公司的 NBBO 状态。我们将对数据进行处理,以便为每个公司提供我们作为时间戳的每个数据点的最新投标和报价状态。一旦我们进行了计算,我们就可以可视化出价和出价之间的差异,以了解公司的流动性在哪些时间点可能会达到低点:

  1. 首先,我们将合并交易品种上的数据。我们假设我们已经将 Delta 表加载到名为 trades 和 offers 的两个 Spark 数据帧中,我们将在符号列。代码显示在以下代码段中:
    un= trades.filter(col("symbol") == "K").select('event_ts', 'price', ' symbol', 'bid', 'offer', 'ind_cd').union(quotes.filter(col("symbol") == "K").select('event_ts', 'price', 'symbol', ' bid', 'offer', 'ind_cd'))
  2. 执行完合并后,我们可以将结果可视化,如下所示:
    display(un)
  3. 现在我们的数据已经合并到一个 Spark DataFrame 中,我们可以执行窗口操作了。首先,我们需要定义我们将使用的窗口函数。为此,我们将使用 Window 类,它是 PySpark 中的内置方法 来执行窗口操作。在下面的代码片段中,我们将通过代表公司的符号列定义分区:
    from pyspark.sql.window 导入窗口 partition_spec = Window.partitionBy('symbol')
  4. 然后,我们需要指定用于对数据进行排序的机制。在这里,我们使用 ind_cd 列作为 sort 键,该列指定交易前报价的值:
    join_spec = partition_spec.orderBy('event_ts')。 rowsBetween(Window.unboundedPreceding,             Window.currentRow)
  5. 然后,我们将使用 SQL 命令查询数据并通过对最后一个出价运行 SELECT 操作来获取 lasted_bid 值,然后在join_spec上,如下:
    %sql select(last("bid", True).over(join_spec).alias("latest_bid"))

通过这种方式,我们展示了使用 PySpark API 在时态数据上运行联接、合并和聚合来操作 Azure Databricks 中的时间序列数据是多么容易。在下一节中,我们将学习如何利用类似于 Pandas 的语法的优势来简化从笔记本到生产的过渡。

Using the Koalas API

当使用时间序列时,执行与插补和删除重复相关的任务是相当常见的。当在数据集上以高频率插入多条记录时,往往会发生这些重复记录。我们可以使用 Koalas API 使用非常熟悉的类似 Pandas 的语法来执行这些操作,如前几章所述:

  1. 在以下代码示例中,我们将把我们的 Delta 表读入 Koalas DataFrame 并通过按 event_ts 列分组来执行重复数据删除,然后检索该时间点的最大值:
    import databricks.koalas as ks kdf_src = ks.read_delta("/tmp/finserv/delta/ofi_quotes2") grouped_kdf = kdf_src.groupby(['event_ts'],                                as_index=False).max() grouped_kdf.sort_values(by=['event_ts']) grouped_kdf.head()
  2. 现在我们已经从 DataFrame 中删除了重复项,我们可以在数据中执行转换以创建一个滞后的 Koalas Data Frame,这对于进行移动平均或其他统计趋势计算等计算非常有帮助。如以下代码片段所示,Koalas 通过使用 Koalas Data Frame 的 shift 方法,可以轻松获取窗口内的滞后或领先值:
    grouped_kdf.set_index('event_ts', inplace=True,                       drop=True) lag_grouped_kdf = grouped_kdf.shift(周期=1,                                      fill_value=0) lag_grouped_kdf.head()
  3. 现在我们已经计算了滞后值,我们将能够将此数据集与我们的原始 DataFrame 合并,以将所有数据合并到一个结构中,以简化统计趋势的计算。在下面的代码示例中,我们演示了将两个不同的数据帧合并为一个数据帧是多么简单:
    滞后 = grouped_kdf.merge(lag_grouped_kdf,                             left_index=真,                             right_index=真,                            后缀=['', '_lag']) laagged.head()

Koalas API 对我们的数据建模非常有帮助,并且可以轻松执行必要的转换,即使在处理时间序列数据时也是如此。在下一部分中,我们将了解在 Azure Databricks 中工作时如何处理数据框中可能存在的缺失值。

Handling missing values

现实生活中的数据远非完美,缺失值的情况非常普遍。数据变得不可用的机制对于提出一个好的插补策略非常重要。我们将插补称为处理数据中缺失值的过程,在大多数情况下,这些值表示为 NaN 值。其中最重要的方面之一是知道缺少哪些值:

  1. 在下面的代码示例中,我们将展示如何通过转换此布尔输出对 Spark isNull 方法的所有布尔输出求和来找出哪些列具有缺失值或空值为整数:
    from pyspark.sql.functions import col, sum df.select(*(sum(col(c).isNull().cast(" int")).alias(c) for c in df.columns)).show()< /上一个> 
  2. 另一种选择是使用 Spark 数据帧描述方法的输出来过滤掉每列中缺失值的计数,最后减去行数以获得实际缺失值的数量,如下所示:
    from pyspark.sql.functions import lit 行 = df.count() summary = df.describe().filter(col("summary") == "count") summary.select(*((lit(rows)-col(c)).alias(c) for c in df.columns )).show()
  3. 一旦我们 识别了空值,我们可以使用 na.drop() 删除所有包含空值的行Spark数据帧的方法,如下:
    null_df.na.drop().show()
  4. 我们还可以指定删除空值个数大于 2 的行,如下所示:
    null_df.na.drop(thresh=2).show()
  5. 其他可用选项之一是使用此方法的 how 参数。例如,我们可以将此参数指定为 any,表示删除具有任意数量的空值的行,如下面的代码片段所示:
    null_df.na.drop(how='any').show()
  6. 我们还可以使用接受列名列表的子集参数在单个列上删除空值,如以下代码片段所示:
    null_df.na.drop(subset=['Sales']).show()
  7. For example, we can specify to drop records that have null values in all of the specified subset of columns, as follows:
    null_df.na.drop(how='all',subset=['Name','Sales']).show()

    如前所述,处理缺失数据时可用的选项之一是对它们执行插补,这可以理解为根据我们知道不会改变数据实际分布的预定义策略填充这些空值。估算空值总是比删除数据更好。

  8. 例如,我们可以选择用有助于我们识别它们的字符串填充空值,如下所示:
    null_df.na.fill('NA').show()
  9. Or, we can also fill the missing values with an integer value, like this:
    null_df.na.fill(0).show()

    一种更常见的选择是用根据当前值计算的平均值或中值填充缺失值。这样,我们尽量保留数据的实际分布,并尽量不严重影响其他计算,例如列均值。然而,有必要记住,如果我们重新计算平均值,我们将得到另一个值。

  10. 在这里,我们可以用该特定列的平均值填充 一个数值列,如下所示:
    from pyspark.sql.functions 导入意思 mean_val=null_df.select(mean(null_df.Sales)).collect() print(type(mean_val)) #mean_val 是一个列表行对象 print('销售额的平均值', mean_val[0][0]) mean_sales=mean_val[0][0]
  11. 现在我们已经计算了平均值并将其存储为名为 men_sales 的变量,我们可以使用该值填充 sales 中的空值 列,如下:
    null_df.na.fill(mean_sales,subset=['Sales']).show()

通过这种方式,我们可以处理我们在数据中找到的所有可能的空值或缺失值,尽管用于处理它们的策略在很大程度上取决于创建缺失值的机制和问题的领域知识。

Extracting features from text

从文本中提取信息 依赖于能够捕获底层语言结构。这意味着我们打算捕捉标记之间的含义和关系以及它们试图在句子中传达的含义。这些与理解文本含义相关的操作和任务产生了跨学科领域的一个完整分支,称为自然语言处理(NLP)。在这里,我们将重点介绍一些与将文本转换为数字特征相关的示例,这些特征可以在以后使用 Azure Databricks 中的 PySpark API 用于机器学习和深度学习算法。

TF-IDF

术语 频率反向 文档 频率 (TF-IDF) 是一种非常常用的文本预处理操作,将句子转换为基于组成它们的标记的相对频率。术语频率逆用于创建一组数字特征,这些特征是根据单词与上下文的相关程度构建的,而不仅仅是与集合或语料库中的文档相关。

在 Azure Databricks ML 中,TF-IDF 是一个在两个独立部分中完成的操作,第一个是 TF (+hashing),然后是 IDF:

  • TF:HashingTF 是一个 PySpark Transformer 类,它接受一组术语并将它们转换为固定长度的特征向量。在特征工程中,当我们引用“一组术语”时,这可能意味着我们正在引用 bag of < strong class="bold">单词(弓)。 算法结合了 TF 计数和散列技巧(有时 称为 特征 < strong class="bold">hashing),以便将特征快速有效地向量化为向量或矩阵,以降低维度。
  • IDF:IDF 是一个 PySpark Estimator 类,它适用于数据集并因此产生 IDF 模型IDFModel 作为输入特征向量,在这种情况下来自 HashingTF 类,并缩放每个特征列。它将自动降低应用于列的权重,具体取决于它在语料库中出现的频率。

在下面的示例中,我们将一个句子数组作为输入。每个句子将使用 PySpark Tokenizer 类拆分为标记,这将为每个句子生成一个 BOW:

  1. 稍后——如前所述,在下面的代码块中——我们将应用 HashingTF 类来降低特征向量的维数:
    from pyspark.ml.feature import HashingTF, IDF, Tokenizer sentenceData = sqlContext.createDataFrame([   (0, "嗨,我听说了 Spark"),   (0, "我希望 Java 可以使用案例类"),   (1, “逻辑回归模型很整洁”) ], ["标签", "句子"]) tokenizer = Tokenizer(inputCol="sentence", outputCol="words") wordsData = tokenizer.transform(sentenceData) hashingTF = HashingTF(inputCol="words",                       outputCol="rawFeatures",                       numFeatures=20)
                       
                 
                   
                 
  2. 稍后,当我们在机器学习或深度学习算法中使用它时,我们将应用 IDF 类来重新缩放获得的特征,以避免梯度问题并普遍提高性能。代码如下所示:
    featurizedData = hashingTF.transform(wordsData) idf = IDF(inputCol="rawFeatures",outputCol="features") idfModel = idf.fit(featurizedData) rescaledData = idfModel.transform(featurizedData)
  3. 最后,我们可以展示得到的特征,如下:
    for features_label in rescaledData.select("features","label").take(3):   print(features_label)
  4. 我们可以看到生成变量的整个过程,如下截图所示:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.8 – TF-IDF 操作的输出特征

通过这种方式,我们从文本中提取我们的特征,并将其转换为可以输入任何学习算法的缩放数字特征。

Word2vec

Word2vec 是一种 嵌入技术,用于 NLP 的特征工程。它使用神经网络从大量文本语料库中学习单词之间的关联,并生成表示每个标记的向量。该向量具有允许我们仅通过使用(例如)余弦相似度测量表示它们的向量之间的距离来指示两个词之间的语义相似性的属性。这些 向量通常被称为词嵌入,它们通常有数百个维度来表示它们。

在 PySpark 中,Word2vec 是一个 Estimator 类,它将一个数组作为输入并训练一个 Word2VecModel,这是一个将每个单词映射到单个固定大小向量的模型。

在接下来显示的代码示例中,我们从一组文档开始,每个文档都表示为一个单词序列。对于每个文档,我们将其转换为特征向量。然后可以将此特征向量传递给学习算法:

  1. 在下面的代码示例中,我们将创建一个 Spark 数据帧,其中的句子使用简单的字符串拆分来拆分为它们的标记,这意味着每一行代表该句子的一个 BOW:
    从 pyspark.ml.feature 导入 Word2Vec documentDF = sqlContext.createDataFrame([   ("嗨,我听说了 Spark".split(" "), ),   ("我希望 Java 可以使用案例类".split(" "), ),   ("逻辑回归模型很整洁".split(" "), ) ], ["text"])
  2. 然后,我们可以将这些数组映射为向量,如下所示:
    word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
  3. 最后,我们可以将 Word2Vec 模型拟合到我们刚刚创建的数组中,并通过使用拟合模型转换输入 DataFrame 来获得结果特征,如下所示:
    model = word2Vec.fit(documentDF) 结果 = model.transform(documentDF) 对于 result.select("result").take(3) 中的功能:   print(feature)
  4. 我们可以在这里看到在 Azure Databricks 笔记本中的单个单元上执行的整个过程:
    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.9 – 使用 Word2Vec 的输出嵌入

通过这种方式,我们可以非常使用文本嵌入中最广泛使用的模型之一:Word2Vec,简单地创建从文本源获得的数字表示。

Training machine learning models on tabular data

在这个例子中,我们将使用数据科学中非常流行的数据集,即物理化学特性的葡萄酒数据集,来预测特定葡萄酒的质量。我们将使用 Azure Databricks Runtime ML,因此请确保将笔记本附加到运行此版本可用运行时的群集,如本章开头的要求中所述。

Engineering the variables

我们将开始使用 以下步骤:

  1. 我们的第一步是加载必要的数据来训练我们的模型。我们将加载数据集,这些数据集作为示例数据集存储在 DBFS 中,但您也可以从 UCI 机器学习存储库中获取它们。代码显示在以下代码段中:
    import pandas as pd white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";") red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")
  2. 接下来,我们将这两个 pandas Data Frames 合并为一个,并添加一个名为 is_red 的新二进制特征来区分它们,如下所示:
    red_wine['is_red'] = 1 白酒['is_red'] = 0 data = pd.concat([red_wine, white_wine], axis=0)
  3. 在此之后,我们将从列名中删除空格,以便能够将其保存为 Spark 数据框,如下所示:
    data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True) data.head()
  4. Here, we can see the created Pandas Data frame:

    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

    图 8.10 – 数据集的重命名列

  5. 现在我们有了统一的数据集,我们将为所有质量高于 7 的葡萄酒创建一个名为 high_quality 的新特征,如下所示:
    high_quality = (data.quality >= 7).astype(int) data.quality = high_quality
  6. 之后,我们可以将数据集拆分为训练数据集和测试数据集,稍后我们将使用这些数据集来训练我们的机器学习模型,推断质量列。在以下代码块中,我们导入 scikit-learn train_test_split 函数,以便将数据拆分为 X 定义的特征目标列定义为 y
    from sklearn.model_selection import train_test_split 火车,测试 = train_test_split(数据,random_state=123) X_train = train.drop(["质量"], axis=1) X_test = test.drop(["质量"], axis=1) y_train = train.quality y_test = test.quality

因此,我们将特征和目标列拆分为训练和测试数据集,可用于训练和基准测试模型。

Building the ML model

在这里,我们将使用 分类器,因为我们试图预测的是表格数据集上的二进制输出。所选算法将是 scikit-learn Python 库中可用的随机森林分类器。我们还将使用 MLflow 来跟踪模型性能,并保存模型以供以后使用:

  1. 我们将首先进行所有必要的导入,如下所示:
    导入mlflow 导入 mlflow.pyfunc 导入 mlflow.sklearn 将 numpy 导入为 np 从 sklearn.ensemble 导入 RandomForestClassifier 从 sklearn.metrics 导入 roc_auc_score from mlflow.models.signature import infer_signature
  2. 我们将在模型周围创建一个名为 SklearnModelWrapper 的包装类,继承自 mlflow.pyfunc.PythonModel 类。此类将返回实例属于确定类的概率。代码显示在以下代码段中:
    类 SklearnModelWrapper(mlflow.pyfunc.PythonModel):   def __init__(self, model):     self.model = 模型   def predict(self, context, model_input):     return self.model.predict_proba(model_input)[:,1] 
  3. 定义模型后,我们将使用 mlflow.start_run 创建一个新的 MLflow 运行,让我们跟上与赛道表现。我们还可以调用 mlflow.log_param 来显示哪些参数正在传递给模型。在以下代码示例中,mlflow.log_metric 将记录 性能我们选择的指标,例如 receiver operating characteristic (< strong class="bold">ROC) 在这种情况下:
    使用 mlflow.start_run(run_name='untuned_random_forest'):   n_estimators = 10   model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123))   model.fit(X_train, y_train)   predictions_test = model.predict_proba(X_test)[:,1]   auc_score = roc_auc_score(y_test, predictions_test)   mlflow.log_param('n_estimators', n_estimators)   mlflow.log_metric('auc', auc_score)   wrappedModel = SklearnModelWrapper(模型)   signature = infer_signature(X_train, WrappedModel.predict(None, X_train))   mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, signature=signature) 
  4. 在前面的代码片段中,predict_proba 函数将返回一个元组,其中包含属于两个类之一的概率。我们将只保留一个,因此我们使用 [:, 1 对输出进行切片。部署模型后,我们创建一个签名,稍后将用于验证输入。在这里,我们可以看到在 Azure Databricks 笔记本上运行的整个代码:

    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

    图 8.1 - 使用 MLflow start_run 装饰器1类图模型

  5. 在模型经过训练后,我们可以检查模型的学习特征重要性输出作为健全性检查,如下所示:
    feature_importances = pd.DataFrame(model.feature_importances_, index=X_train.columns.tolist(), columns=['importance']) feature_importances.sort_values('importance', ascending=False)
  6. 在这里,我们可以看到哪些 变量对模型最重要:
    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

    图 8.12 – 特征模型结构

  7. 它表明酒精和密度在预测质量方面都很重要。你可以点击右上角的Experiment按钮查看Experiment Runs strong> 侧边栏,如下图所示:
读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.13 – 跟踪的实验运行

在这个例子中,这个模型实现了一个区域 曲线 (AUC) 0.889。

Registering the model in the MLflow Model Registry

接下来的步骤将是 在 MLflow 模型注册表中注册我们刚刚训练的模型,这是一个集中的模型注册表,允许我们管理训练模型的整个生命周期并在 Azure Databricks 中的任何位置使用我们的模型。这个模型库不仅会存储模型,还会为我们提供对它们的版本控制,并允许我们添加描述和评论:

  1. 在下一个示例中,我们将展示如何以编程方式注册模型,但您也可以使用 用户 界面 (UI) 在 Azure Databricks 工作区中。首先,我们将存储我们刚刚训练的模型的最新运行的运行 ID。我们搜索运行 ID,使用 tags.mlflow.runName 参数按模型名称过滤,如下所示:
    run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"').iloc[0].run_id model_name = "葡萄酒品质" model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)
                      
                
                  
                 

    如果导航到Azure Databricks 工作区中的模型页面,您将看到我们刚刚注册的模型。

  2. 在我们的模型注册成功后,我们就可以设置这个模型的舞台了。我们将 将我们的模型移至生产环境并将其加载到笔记本中,如以下代码片段所示:
    from mlflow.tracking import MlflowClient 客户端 = MlflowClient() client.transition_model_version_stage( 名称=模型名称, 版本=model_version.version, 阶段="生产", )
  3. 我们可以在单元格输出中看到模型已正确注册:

    读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

    图 8.14 – 正确注册的模型

  4. 现在,我们可以通过使用 f"models:/{model_name}/production" 路径来引用我们的模型。然后可以将模型加载到 Spark 用户定义 函数 (UDF) 所以 可以应用到增量表以进行批量推理。以下代码将加载我们的模型并将其应用于 Delta 表:
    导入 mlflow.pyfunc 从 pyspark.sql.functions 导入结构 apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/production") new_data = spark.read.format("delta").load(table_path)
  5. 一旦加载了模型并从 Delta 表中读取了数据,我们就可以使用结构体将输入变量传递给注册为 UDF 的模型,如下所示:
    udf_inputs = struct(*(X_train.columns.tolist())) new_data = new_data.withColumn( “预言”, apply_model_udf(udf_inputs) ) display(new_data)

在这里,Delta 表中的每条记录现在都有一个关联的预测。

Model serving

我们可以使用我们的 模型进行低延迟预测,方法是使用 MLflow 模型为我们提供一个端点,以便我们可以使用 REST API 发出请求并获取预测作为响应。

您将需要 Databricks 令牌才能向端点发出请求。如前几章所述,您可以在 Azure Databricks 的 User Settings 页面中获取您的令牌。

在能够向我们的终结点发出请求之前,请记住在 Azure Databricks 工作区的模型页面上启用服务。您可以在以下屏幕截图中看到有关如何执行此操作的提醒:

读书笔记《distributed-data-systems-with-azure-databricks》第7章在Azure数据库中使用Python库

图 8.15 – 启用模型服务

  1. 最后,我们可以调用我们的模型。以下示例函数将发出一个 Pandas 数据帧,并将返回数据帧中每一行的推理:
    导入操作系统 导入请求 将熊猫导入为 pd def score_model(数据集:pd.DataFrame): url = 'https://YOUR_DATABRICKS_URL/model/wine_quality/Production/invocations' headers = {'Authorization': f'Bearer {your_databricks_token}' data_json = dataset.to_dict(orient='split') response = requests.request(method='POST', headers=headers, url=url, json=data_json) 如果 response.status_code != 200:   引发异常(f'请求失败,状态为 {response.status_code},{response.text}') return response.json()
  2. 我们可以通过传递X_test数据来比较在本地模型上得到的结果,如下代码片段所示。我们应该得到相同的结果:
    num_predictions = 5 serve_predictions = score_model(X_test[:num_predictions]) model_evaluations = model.predict(X_test[:num_predictions]) pd.DataFrame({ “模型预测”:model_evaluations, “服务模型预测”:served_predictions, })

通过这种方式,我们通过使用我们发布的模型作为发出请求的端点,对小批量数据启用了低延迟预测。

Summary

在本节中,我们介绍了许多与如何使用标记化、多项式扩展和 one-hot 编码等方法提取和改进数据中可用特征相关的示例。这些方法允许我们为模型的训练准备变量,并被视为特征工程的一部分。

接下来,我们深入研究了如何使用 TF-IDF 和 Word2Vec 从文本中提取特征,以及如何使用 PySpark API 处理 Azure Databricks 中的缺失数据。最后,我们完成了一个示例,说明如何训练深度学习模型并使其准备好在发布 REST API 请求时提供服务和获取预测。

在下一章中,我们将更多地关注使用 TFRecords 和 Petastorm 处理大量数据以进行深度学习,以及如何利用现有模型从 Azure Databricks 中的新数据中提取特征。