vlambda博客
学习文章列表

Spark使用explode展开嵌套的JSON数据

在使用Spark的人中,估计很多人都会使用DataFrame及SQLContext,而在众多的数据格式中,很可能会遇到JSON数据,此数据还可能包含嵌套关系,比如像如下的JSON数据:

{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}

此时,如果我们直接用DataFrame的show方法可以看到:

+---+--------------------+-------+|age| myScore| name|+---+--------------------+-------+| 25| [[23,19], [50,58]]|Michael|| 30|[[33,29], [52,38]...| Andy|| 19| [[43,39], [53,28]]| Justin|+---+--------------------+-------+
root |-- age: long (nullable = true) |-- myScore: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- score2: long (nullable = true) | | |-- score1: long (nullable = true) |-- name: string (nullable = true)

由于myScore是一个数组,所以,在上述show得到的表中,我们不能直接使用sql来查询或聚合,那么如何才能将myScore的数组类型展开呢?
我们可以考虑使用explode函数,如下

val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")df.show()df.printSchema()val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")dfScore.show()

此时,会得到如下结果,这个时候的表,就跟我们平时看到的关系型数据庫的表是一样的了,接下来,我们就可以执行相关的sql查询了

+-------+-----------------+------------------+| name| score1| score2|+-------+-----------------+------------------+|Michael| 19| 23||Michael| 58| 50|| Andy| 29| 33|| Andy| 38| 52|| Andy| 88| 71|| Justin| 39| 43|| Justin| 28| 53|+-------+-----------------+------------------+

完整的代码如下:

import org.apache.spark.SparkContextimport org.apache.spark.sql.{DataFrame, SQLContext}import org.apache.spark.sql.functions._import org.junit.{After, Before, Test}import org.junit.Assert.assertEquals
/** * Created by yang on 8/16/16. */class Test {
@transient var sc: SparkContext = _
@transient var sqlContext:SQLContext = _
@Before def init(): Unit ={ val conf = new SparkConf().setAppName("Test").setMaster("spark://master:7077") sc = new SparkContext(conf) sqlContext = new org.apache.spark.sql.SQLContext(sc) }
@Test def TestMapFun(): Unit ={ val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json") df.show() df.printSchema()
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore") val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2") dfMyScore.show()
dfMyScore.registerTempTable("table1") val result = sqlContext.sql("select name,avg(hwScore_Std),avg(exScore_Std) from table1") assertEquals(7,dfMyScore.count()) }}

以上代码需要一些包,我是用sbt构建的,内容如下:

name := "Test"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"