数据分析EPHS(15)-Spark如何处理Hive的集合类型?
上一篇中我们介绍了hive中的数据类型,其中一类比较重要的类型即集合类型,主要包括struct、map、array三种。那么我们在spark中处理这三种类型呢?本文就来介绍一下。
1、数据介绍
我们还是先来回顾一下上篇中介绍的数据:
创建表:
create table if not exists
datatype_test4(
id int,
info struct<name:string,weight:double>,
score array<Int>,
info_map map<string,string>)
row format delimited fields terminated by ','
COLLECTION ITEMS TERMINATED BY ';'
MAP KEYS TERMINATED BY ':';
可以看到,我们定义了三种不同的集合类型字段,并指定了集合类型的分隔符为";",即struct,array,以及map的不同kv之间用";"分割,同时定义了map的key和value之间用":"分割。
接下来,我们创建如下内容的txt文件:
1,文文;70,99;96;100,name:文文;country:china
2,毛毛;60,99;92;100,name:毛毛;country:koera
3,超超;65,99;96;100,name:超超;country:japan
倒入hive中并查看:
load data local inpath '/Users/meituan_sxw/Downloads/test4.txt' into table datatype_test4;
select * from datatype_test4;
结果如下:
2、struct类型
struct类型在spark中对应的类型为org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema,对应的处理代码如下:
val df = spark.sql(
"""
|select * from
|default.datatype_test4
""".stripMargin)
.map(row=>{
val id = row.getAs[Int]("id")
val info = row.getAs[GenericRowWithSchema]("info")
val name = info.getAs[String]("name")
val weight = info.getAs[Double]("weight")
(id,name,weight)
}).collect().foreach(println)
输出结果为:
(1,文文,70.0)
(2,毛毛,60.0)
(3,超超,65.0)
其实一开始我也不知道是什么类型的,这主要看报错了,假设我们按row.getAs[String]("info")去获取对应的信息,则会发现有如下报错:
所以根据报错,顺藤摸瓜就可以啦。
3、array类型
对于hive中array类型的数据,如果用的是scala语言的话,对应的类型是scala.collection.mutable.WrappedArray,处理方式如下:
val df = spark.sql(
"""
|select * from
|default.datatype_test4
""".stripMargin)
.map(row=>{
val id = row.getAs[Int]("id")
val score = row.getAs[WrappedArray[Int]]("score")
(id,score(0),score(1),score(2))
}).collect().foreach(println)
输出结果为:
(1,99,96,100)
(2,99,92,100)
(3,99,96,100)
4、Map类型
对于hive中map类型的数据,对应的也是Scala中的scala.collection.immutable.Map类型,处理代码如下:
val df = spark.sql(
"""
|select * from
|default.datatype_test4
""".stripMargin)
.map(row=>{
val id = row.getAs[Int]("id")
val info = row.getAs[Map[String,String]]("info_map")
val name = info.get("name")
val country = info.get("country")
(id,name,country)
}).collect().foreach(println)
对应的输出如下:
(1,Some(文文),Some(china))
(2,Some(毛毛),Some(koera))
(3,Some(超超),Some(japan))
看上去有点奇怪,这是因为scala中的get() 方法返回的是一个叫 Option[String] 的类别。Option 有两个子类别,一个是 Some,一个是 None,当回传 Some 的时候,代表成功地返回了一个 String,同时可以通过 get() 这个函式拿到那个 String,如果返回的是 None,则代表没有字符串可以给你。所以,我们正确的写法如下:
val df = spark.sql(
"""
|select * from
|default.datatype_test4
""".stripMargin)
.map(row=>{
val id = row.getAs[Int]("id")
val info = row.getAs[Map[String,String]]("info_map")
val name = info.get("name").get
val country = info.get("country").get
(id,name,country)
}).collect().foreach(println)
此时才是我们想要的结果:
(1,文文,china)
(2,毛毛,koera)
(3,超超,japan)
好了,本篇就到这里了!