vlambda博客
学习文章列表

翻译:.NET 对Apache®Spark™开业大数据分析工具的支持

.NET for Apache Spark旨在使.NET开发人员可以访问Apache®Spark™ ,从而使激动人心的大数据分析世界成为现实 。 .NET for Spark可用于处理批量数据,实时流,机器学习和即席查询。

DataFrame是Spark编程中的核心数据结构之一。 DataFrame是组织为命名列的分布式数据集合。 在Spark应用程序中,我们通常首先从数据源读取输入数据,将其存储在DataFrame中,然后利用Spark SQL之类的功能来转换并从数据中获取见解。 用户定义函数或UDF是基于列的函数,可让我们处理存储在DataFrames中的数据。

在2019年12月, .NET团队宣布了Microsoft.Data.Analysis.DataFrame类型的预览版,以简化.NET中的数据探索。 现在,在2020年3月,我们为.NET for Spark代码库引入了便利性API,以将Microsoft.Data.Analysis.DataFrame对象与Spark中的UDF结合使用。 这些便捷的API使.NET for Spark中的UDF数据处理和分析更加便捷和简洁。

在此博客文章中,我们将探讨:

  • 实施目标和细节

  • 编码示例和DataFrame比较

  • 包起来

实施目标和细节

背景和目标

让我们从关于Spark UDF中数据共享的一些背景开始。 Apache Spark以Apache Arrow格式将数据流传输到基于Arrow的UDF。 Apache Arrow提供了一种标准化的,独立于语言的格式,用于处理内存中的数据。 它专为通过其列存储格式进行高性能,高效的分析而设计,并且它提供库和零拷贝消息传递,以实现跨进程的通信。

因为Spark将数据以Arrow格式流式传输到UDF,所以在使用UDF时,您需要了解Arrow格式,例如如何读取Arrow列,写入Arrow列以及解包RecordBatch,这是Arrow中的2D数据类型,包括列的等长列的集合。

本博客文章中描述的工作的主要目标是通过一组便捷的API改进.NET for Spark中的标量和矢量UDF。 您现在可以在.NET for Spark应用程序中使用Microsoft.Data.Analysis.DataFrame对象,这些对象将为您处理UDF背后的箭头格式数据。

细节

Spark UDF有几种:pickling,标量和矢量。 我们的便捷API特别适用于标量和矢量UDF。

PythonpicklingUDF是Spark UDF的旧版本。 他们利用Python的序列化pickling格式而不是Arrow来在JVM和.NET for Spark进程之间转换数据。 一旦指定了要处理的列,pickling的UDF就会占用其每一行,应用给定的功能,然后添加新的列,从而导致相当多的开销。

相比之下,标量UDF和矢量UDF利用Arrow序列化,使它们能够利用内存列式格式的好处,并使数据传输更加高效。 数据序列化为Arrow格式后,就可以直接在Spark流程中使用,并且不再需要序列化或反序列化了,这是对Pythonpickling格式的重大改进。 Arrow可以使用零复制方法跨数据块(一次同时显示多个行和列)而不是逐行创建DataFrame。

我们针对Apache Spark便捷API的新.NET特别适用于标量和矢量UDF,因为它们使用Arrow。

在使用这些便捷的API之前,您需要枚举Arrow RecordBatch才能在基于Arrow的UDF中使用列。 RecordBatches是基于箭头的对象,而不是标准的Spark对象,因此可能会破坏程序中的代码流或熟悉度。

我们的新API自动将数据包装在Microsoft.Data.Analysis.DataFrame中,而不是RecordBatch中。 包装不涉及复制数据,因此可以确保性能保持较高,因为我们的编码简便性也得到了提高。

您可以在程序中同时使用传统的Spark SQL和Microsoft.Data.Analysis.DataFrames。 传统的Spark DataFrame在您的Spark集群中分布数据。 它用于Spark驱动程序中的整个数据集。 创建UDF后,传统DataFrame中的数据将以Arrow格式流式传输到工作机上的UDF。 一旦进入UDF,您现在将使用Microsoft.Data.Analysis.DataFrame(而不是RecordBatches)-它将存储在单台计算机上。 Microsoft.Data.Analysis.DataFrame的概念类似于Python Pandas DataFrame 。

简单的例子

让我们从一个基本示例开始。 假设我们有一个向量UDF,该向量将2列相加并返回结果。 传统上,UDF将接收2个ArrowArray(例如DoubleArray)并返回一个新的ArrowArray。 不幸的是,您需要创建新的数组,然后遍历数据中的每一行。 除了循环外,您还需要处理Arrow库中的Builder对象来创建新数组,从而导致不必要的分配。

如果改为使用Microsoft.Data.Analysis.DataFrame,则可以按照以下方式编写一些内容: dataframe.ColumnA + dataframe.ColumnB 。 不太方便!

再举一个例子,我们经常创建返回一组列的UDF。 对于传统的Spark DataFrame,这些列必须作为Arrow RecordBatch返回。 但是有了新的便捷API,我们就可以返回一个DataFrame,其他所有事情都在内部处理!

详细的例子

矢量Udfs

让我们看一个更详细的具体示例。 VectorUdfs.cs是使用传统Spark DataFrame的程序。 它以人的姓名和年龄作为输入读取Json文件,并将数据存储在DataFrame中。 该程序利用Spark将相同年龄的记录分组,然后在每个年龄组上应用自定义UDF。

假设您有2个年龄相同的人:

21 | John

21 | Sally

UDF将计算所有具有相同年龄的名称中的字符数。 因此,在这种情况下,您将获得1行: 21 | 9 ,其中9是“ John” .Length +“ Sally” .Length的结果。

VectorDataFrameUdfs.cs是一个更新的程序,可以通过传统的DataFrame和Microsoft.Data.Analysis.DataFrame来完成相同的任务。

这两个程序都使用分组地图矢量UDF,并且非常类似地应用它。 在VectorUdfs.cs中,代码如下:

df.GroupBy("age")
.Apply(
new StructType(new[]
{
new StructField("age", new IntegerType()),
new StructField("nameCharCount", new IntegerType())
}),
r
=> CountCharacters(r))

CountCharacters在每个程序中的实现方式有所不同。 在VectorUdfs.cs中,定义为:

private static RecordBatch CountCharacters(RecordBatch records){
StringArray nameColumn = records.Column("name") as StringArray;

int characterCount = 0;

for (int i = 0; i < nameColumn.Length; ++i)
{
string current = nameColumn.GetString(i);
characterCount
+= current.Length;
}

int ageFieldIndex = records.Schema.GetFieldIndex("age");
Field ageField = records.Schema.GetFieldByIndex(ageFieldIndex);

// Return 1 record, if we were given any. 0, otherwise.
int returnLength = records.Length > 0 ? 1 : 0;

return new RecordBatch(
new Schema.Builder()
.Field(ageField)
.Field(f => f.Name("name_CharCount").DataType(Int32Type.Default))
.Build(),
new IArrowArray[]
{
records
.Column(ageFieldIndex),
new Int32Array.Builder().Append(characterCount).Build()
},
returnLength
);}

在VectorDataFrameUdfs.cs中,方法是:

private static FxDataFrame CountCharacters(FxDataFrame dataFrame){
int characterCount = 0;

var characterCountColumn = new PrimitiveDataFrameColumn<int>("nameCharCount");
var ageColumn = new PrimitiveDataFrameColumn</int><int>("age");
ArrowStringDataFrameColumn nameColumn = dataFrame["name"] as ArrowStringDataFrameColumn;
for (long i = 0; i < dataFrame.Rows.Count; ++i)
{
characterCount
+= nameColumn[i].Length;
}

if (dataFrame.Rows.Count > 0)
{
characterCountColumn
.Append(characterCount);
ageColumn
.Append((int?)dataFrame["age"][0]);
}

return new FxDataFrame(ageColumn, characterCountColumn);}</int>

请注意, FxDataFrame类型表示Microsoft.Data.Analysis.DataFrame,而DataFrame表示传统的Spark DataFrame。 在程序顶部的后一个示例中表明了这一点:

using DataFrame = Microsoft.Spark.Sql.DataFrame;

using FxDataFrame = Microsoft.Data.Analysis.DataFrame;

如您所见,后者的CountCharacters实现完全处理DataFrames而不是RecordBatches。 它也几乎是第一个实现长度的一半!

在一次比较中,我们可以看到这些新API在何处为我们的.NET for Spark应用程序增加了很多便利。 VectorUdfs.cs要求我们在RecordBatch类型之间来回转换,这需要许多额外的代码行。 相比之下,新的VectorDataFrameUdfs.cs示例可以立即进入我们的数据处理。

TPC-H向量函数

用于Apache Spark的.NET是为高性能而设计的,并且在TPC-H基准测试中表现良好。 TPC-H基准测试包括一组面向业务的临时查询和并发数据修改。 选择查询和填充数据库的数据具有广泛的行业关联性。

VectorFunctions.cs和VectorDataFrameFunctions.cs都执行相同的ComputeTotal TPC-H函数,其中价格是根据税费和折扣计算的。 但是,仅后者程序利用Microsoft.Data.Analysis.DataFrame。

VectorFunctions.cs中的ComputeTotal :

internal static DoubleArray ComputeTotal(DoubleArray price, DoubleArray discount, DoubleArray tax){
if ((price.Length != discount.Length) || (price.Length != tax.Length))
{
throw new ArgumentException("Arrays need to be the same length");
}

int length = price.Length;
var builder = new DoubleArray.Builder().Reserve(length);
ReadOnlySpan<double> prices = price.Values;
ReadOnlySpan</double><double> discounts = discount.Values;
ReadOnlySpan</double><double> taxes = tax.Values;
for (int i = 0; i < length; ++i)
{
builder
.Append(prices[i] * (1 - discounts[i]) * (1 + taxes[i]));
}

return builder.Build();}</double>


ComputeTotal中ComputeTotal中的逻辑(不包括初始数组长度检查)只有1行!

return price * (1 - discount) * (1 + tax);

现在,我们可以利用Apache Arrow的巨大优势,而不会产生额外的代码开销或混乱-太棒了!

结语

感谢Prashanth Govindarajan,Eric Erhardt,Terry Kim,以及Apache Spark团队的.NET和.NET的其他成员,感谢他们对这项杰出工作的贡献。

我们很乐意帮助您开始使用.NET for Apache Spark,并听听您的反馈。 您可以从我们的登录页面请求演示(https://dot.net/spark) ,并查看.NET for Spark GitHub存储库(https://github.com/dotnet/spark),以参与我们的工作,以使.NET成为构建大数据应用程序的出色技术堆栈。

出处:https://devblogs.microsoft.com/dotnet/net-for-apache-spark-in-memory-dataframe-support/

翻译:google翻译