Expanding Nested JSON Data with explode in Spark
If you use Spark, chances are you work with DataFrame and SQLContext. Among the many data formats you may encounter is JSON, and the data may contain nested structures, for example:
{"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
{"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
{"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
If we call the DataFrame's show and printSchema methods directly on this data, we see:
+---+--------------------+-------+
|age|             myScore|   name|
+---+--------------------+-------+
| 25|  [[23,19], [50,58]]|Michael|
| 30|[[33,29], [52,38]...|   Andy|
| 19|  [[43,39], [53,28]]| Justin|
+---+--------------------+-------+
root
|-- age: long (nullable = true)
|-- myScore: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- score2: long (nullable = true)
| | |-- score1: long (nullable = true)
|-- name: string (nullable = true)
Because myScore is an array, we cannot query or aggregate its contents directly with SQL against the table shown above. So how can we expand the myScore array into rows?
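To see what this expansion means, here is a minimal sketch on plain Scala collections (names and scores copied from the sample JSON): each row carrying an array fans out into one row per array element, which is exactly what explode does to a DataFrame row.

```scala
// One row per person, each carrying an array of (score1, score2) pairs,
// mirroring the nested myScore field in the sample JSON
val rows = Seq(
  ("Michael", Seq((19, 23), (58, 50))),
  ("Andy",    Seq((29, 33), (38, 52), (88, 71)))
)

// flatMap plays the role of explode: one output row per array element
val exploded = rows.flatMap { case (name, scores) =>
  scores.map { case (s1, s2) => (name, s1, s2) }
}

exploded.foreach(println)
// first line printed: (Michael,19,23)
```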
We can use the explode function, as follows:
val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
df.show()
df.printSchema()
val dfScore = df.select(df("name"),explode(df("myScore"))).toDF("name","myScore")
val dfMyScore = dfScore.select("name","myScore.score1", "myScore.score2")
dfMyScore.show()
This produces the result below. The table now looks just like an ordinary relational table, so we can run SQL queries against it:
+-------+------+------+
|   name|score1|score2|
+-------+------+------+
|Michael|    19|    23|
|Michael|    58|    50|
|   Andy|    29|    33|
|   Andy|    38|    52|
|   Andy|    88|    71|
| Justin|    39|    43|
| Justin|    28|    53|
+-------+------+------+
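With the data flattened, aggregation becomes straightforward. As a sketch of what an `avg(...) group by name` query computes, here is the same calculation on plain Scala collections, using the values from the table above:

```scala
// Flattened rows exactly as in the table above: (name, score1, score2)
val flat = Seq(
  ("Michael", 19, 23), ("Michael", 58, 50),
  ("Andy", 29, 33), ("Andy", 38, 52), ("Andy", 88, 71),
  ("Justin", 39, 43), ("Justin", 28, 53)
)

// Equivalent of: select name, avg(score1) from table1 group by name
val avgScore1 = flat
  .groupBy(_._1)
  .mapValues(rows => rows.map(_._2).sum.toDouble / rows.size)

println(avgScore1("Michael"))  // 38.5
```

In Spark itself this corresponds to registering a temp table and running the SQL through sqlContext.sql, as the complete code below does.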
The complete code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals

/**
 * Created by yang on 8/16/16.
 */
class Test {
  var sc: SparkContext = _
  var sqlContext: SQLContext = _

  @Before
  def init(): Unit = {
    val conf = new SparkConf().setAppName("Test").setMaster("spark://master:7077")
    sc = new SparkContext(conf)
    sqlContext = new org.apache.spark.sql.SQLContext(sc)
  }

  @Test
  def TestMapFun(): Unit = {
    val df = sqlContext.read.json("hdfs://master:9000/test/people_Array.json")
    df.show()
    df.printSchema()
    // explode turns each element of the myScore array into its own row
    val dfScore = df.select(df("name"), explode(df("myScore"))).toDF("name", "myScore")
    // pull the struct fields up into plain columns
    val dfMyScore = dfScore.select("name", "myScore.score1", "myScore.score2")
    dfMyScore.show()
    dfMyScore.registerTempTable("table1")
    val result = sqlContext.sql("select name, avg(score1), avg(score2) from table1 group by name")
    result.show()
    assertEquals(7L, dfMyScore.count())
  }
}
The code above needs a few dependencies; I built the project with sbt, using the following build definition:
name := "Test"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "junit" % "junit" % "4.12" % "test"
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0"