
PySpark + Apache Hudi in Action

1. Setup

Hudi supports Spark 2.x. Install Spark, then launch pyspark as follows:

 
   
   
 
```shell
# pyspark
export PYSPARK_PYTHON=$(which python3)
spark-2.4.4-bin-hadoop2.7/bin/pyspark \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
```

  • The spark-avro module must be specified explicitly via --packages

  • The spark-avro version must match the Spark version

  • Because this example depends on spark-avro_2.11, it uses the Scala 2.11 build of hudi-spark-bundle; if you use spark-avro_2.12, switch to hudi-spark-bundle_2.12 accordingly

Initialize a few variables up front:

 
   
   
 
```python
# pyspark
tableName = "hudi_trips_cow"
basePath = "file:///tmp/hudi_trips_cow"
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
```

DataGenerator can generate sample inserts and updates based on the trip schema.

2. Insert Data

Generate some new trip data, load it into a DataFrame, and write the DataFrame to the Hudi table.

 
   
   
 
```python
# pyspark
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
```

mode(Overwrite) overwrites and recreates the table. The example configures a record key (uuid in the schema), a partition field (region/country/city), and a precombine field (ts in the schema) to guarantee that trip records are unique within each partition.
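The precombine field decides which record wins when several records in a batch share the same key. A minimal, Spark-free sketch of that semantics (the toy records and values below are made up for illustration):

```python
# Toy model of Hudi's precombine semantics: among records sharing the
# same record key (uuid), keep the one with the largest precombine
# field (ts).
records = [
    {"uuid": "a", "ts": 1.0, "fare": 10.0},
    {"uuid": "a", "ts": 3.0, "fare": 25.0},
    {"uuid": "b", "ts": 2.0, "fare": 40.0},
]

latest = {}
for r in records:
    key = r["uuid"]
    if key not in latest or r["ts"] > latest[key]["ts"]:
        latest[key] = r

# Record "a" resolves to its ts=3.0 version; "b" is untouched.
assert latest["a"]["fare"] == 25.0
assert len(latest) == 2
```

This is why the two writes of the same uuid in the update section below do not produce duplicate rows.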

3. Query Data

Load the data into a DataFrame:

 
   
   
 
```python
# pyspark
tripsSnapshotDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
```

This query provides a read-optimized (snapshot) view of the table. Because our partition path has the format region/country/city, we load the data from the base path (basePath) with load(basePath + "/*/*/*/*").

4. Update Data

As with inserting new data, use DataGenerator to generate updates, then write the DataFrame to the Hudi table.

 
   
   
 
```python
# pyspark
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(basePath)
```

Note that the save mode is now append. In general, always use append mode unless you are creating the table for the first time. Each write operation produces a new commit, identified by a timestamp.
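In this Hudi release, commit instants are named with a second-granularity timestamp, so commit times sort lexicographically in chronological order. A small illustration; the yyyyMMddHHmmss format string is the assumption here (later Hudi versions add millisecond precision):

```python
from datetime import datetime

# Hudi commit instants look like "20200117010349": yyyyMMddHHmmss.
fmt = "%Y%m%d%H%M%S"
instant = datetime(2020, 1, 17, 1, 3, 49).strftime(fmt)
assert instant == "20200117010349"

# Lexicographic order on these strings matches chronological order,
# which is what makes begin/end instant-time comparisons work.
later = datetime(2020, 1, 17, 1, 5, 10).strftime(fmt)
assert instant < later
```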

5. Incremental Query

Hudi supports incremental pulls: you can fetch all changes made after a given commit time. If no end time is specified, the pull includes everything up to the latest commit.

 
   
   
 
```python
# pyspark
# reload data
spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*"). \
    createOrReplaceTempView("hudi_trips_snapshot")

commits = list(map(lambda row: row[0], spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").limit(50).collect()))
beginTime = commits[len(commits) - 2]  # commit time we are interested in

# incrementally query data
incremental_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.begin.instanttime': beginTime,
}

tripsIncrementalDF = spark.read.format("hudi"). \
    options(**incremental_read_options). \
    load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
```

This queries all changes committed after the begin time. The incremental-pull capability makes it possible to build streaming pipelines on top of batch data.
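One way to turn this into a polling pipeline is to persist the last commit time you processed and feed it back as `hoodie.datasource.read.begin.instanttime` on the next run. A minimal sketch of that bookkeeping; the checkpoint file and helper names are made up, and the Hudi read itself stays exactly as shown above:

```python
import os
import tempfile

def load_begin_instant(path, default="000"):
    """Return the last processed commit time, or `default` on the first run."""
    if os.path.exists(path):
        with open(path) as f:
            return f.read().strip() or default
    return default

def save_begin_instant(path, commit_time):
    """Persist the newest commit time seen in this pull."""
    with open(path, "w") as f:
        f.write(commit_time)

ckpt = os.path.join(tempfile.mkdtemp(), "hudi_begin_instant")

# First run: no checkpoint yet, so pull from the beginning ("000").
assert load_begin_instant(ckpt) == "000"

# After a pull, remember the max _hoodie_commit_time we saw.
save_begin_instant(ckpt, "20200117010349")
assert load_begin_instant(ckpt) == "20200117010349"
```

Each polling cycle then reads with `begin.instanttime` set to the checkpointed value and saves the newest `_hoodie_commit_time` it observed.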

6. Point-in-Time Query

To query the table as of a specific point in time, set the end time to a specific commit time and the begin time to "000" (representing the earliest possible commit time).

 
   
   
 
```python
# pyspark
beginTime = "000"  # Represents all commits > this time.
endTime = commits[len(commits) - 2]

# query point in time data
point_in_time_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.read.end.instanttime': endTime,
    'hoodie.datasource.read.begin.instanttime': beginTime
}

tripsPointInTimeDF = spark.read.format("hudi"). \
    options(**point_in_time_read_options). \
    load(basePath)

tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
```
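Why `commits[len(commits) - 2]`? Taking the second-to-last instant as the end time makes the query reproduce the table as it looked before the most recent write. A quick illustration with made-up commit times:

```python
# Hypothetical sorted commit times, as pulled from _hoodie_commit_time.
commits = ["20200117010349", "20200117010510", "20200117010627"]

# Second-to-last commit: the query will exclude the newest write.
endTime = commits[len(commits) - 2]
assert endTime == "20200117010510"

# beginTime "000" plus this endTime bounds the incremental scan to
# everything committed up to that instant.
beginTime = "000"
assert beginTime < endTime
```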

7. Delete Data

Delete the passed-in set of HoodieKeys. Note: the delete operation only supports append save mode.

 
   
   
 
```python
# pyspark
# fetch total records count
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
# fetch two records to be deleted
ds = spark.sql("select uuid, partitionPath from hudi_trips_snapshot").limit(2)

# issue deletes
hudi_delete_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'delete',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
# column names must match the tuple order (uuid, partitionPath) above
df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
df.write.format("hudi"). \
    options(**hudi_delete_options). \
    mode("append"). \
    save(basePath)

# run the same read query as above
roAfterDeleteViewDF = spark. \
    read. \
    format("hudi"). \
    load(basePath + "/*/*/*/*")
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
# fetch should return (total - 2) records
spark.sql("select uuid, partitionPath from hudi_trips_snapshot").count()
```

8. Summary

This post showed how to insert, update, query, and delete data in a Hudi table with pyspark. If you work with pyspark and Hudi, give it a try!
