vlambda博客
学习文章列表

你敢说你真的懂Spark SQL吗?

      Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

http://spark.apache.org/sql/


   为什么要学习Spark SQL?我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。


      有人可能要说了,Spark SQL无非就是简单的SELECT, FROM ,WHERE, GROUP BY 之类,那么请你不要说你懂Spark SQL




列行转化你用过吗

TITLE CONTENT





行转列(透视pivot)

SparkTable表结构如下:


你敢说你真的懂Spark SQL吗?


多行转多列:下面有两种方式,第一种更灵活些


spark.sql("SELECT * FROM SparkTable").groupBy("product")
.pivot("country").sum("amount").show();


SELECT *  FROM SparkTable
pivot
(
sum(amount) for
country in ('USA','JPN','IND','CHA')
)


结果:

你敢说你真的懂Spark SQL吗?


多行转一列:


SELECT product,concat_ws(',', collect_set(country)) 

FROM SparkTable 

GROUP BY product



列转行(逆透视unpivot)



多列转多行:

SELECT product,stack(4, 'USA',USA,'JPN',JPN,'IND',IND,'CHA',CHA)

AS (`country`, `amount` )

FROM PivotTable


运行结果如下:

你敢说你真的懂Spark SQL吗?


一列转多行:

SELECT ifa, bundle, country AS geo
FROM tb1 e
lateral view explode(bundles) bund AS bundle




 


窗口函数你用过吗

TITLE CONTENT



你敢说你真的懂Spark SQL吗?

first_value

按照时间降序排列获取第一个不为null的ua值赋值给每一条记录


SELECT ifa, first_value(ua, true) over (PARTITION BY ifa ORDER BY time DESC) 

AS ua

FROM tb2


row_number & rank

按照时间获取每条记录的排序序号,row_number不重复排序,rank出现重复排序

下面一个是按照ifa 分组后排序,一个是总排序。


SELECT
ifa
row_number() over(PARTITION BY ifa ORDER BY time DESC ) AS rn,
rank() over(ORDER BY time DESC) AS rank
FROM tb2


other

还有其他的请自己test

count(...) over(partition by ... order by ...)--求分组后的总数。
sum(...) over(partition by ... order by ...)--求分组后的和。
max(...) over(partition by ... order by ...)--求分组后的最大值。
min(...) over(partition by ... order by ...)--求分组后的最小值。
avg(...) over(partition by ... order by ...)--求分组后的平均值。
dense_rank() over(partition by ... order by ...)--rank值是连续的。
last_value(...) over(partition by ... order by ...)--求分组内的最后一个值。
lag() over(partition by ... order by ...)--取出前n行数据。
lead() over(partition by ... order by ...)--取出后n行数据。
ratio_to_report() over(partition by ... order by ...)--Ratio_to_report() 括号中就是分子,over() 括号中就是分母。
percent_rank() over(partition by ... order by ...)--结果可以视作为rank()的结果,除以最大的编号






下面的函数你用过吗

TITLE CONTENT



flatten

数据join拉平


> SELECT flatten(array(array(1, 2), array(3, 4)));
[1,2,3,4]

filter

数组过滤

>SELECT filter(array(1, 2, 3), x -> x % 2 == 1);
[1,3]

array_distinct

数组排重

> SELECT array_distinct(array(1, 2, 3, null, 3));
[1,2,3,null]

array_intersect

数组交集

> SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
[1,3]

array_except

在数据一中但不在数组二中


> SELECT array_except(array(1, 2, 3), array(1, 3, 5));
[2]

get_json_object

获取json对象的值

> SELECT get_json_object('{"a":"b"}', '$.a');
b

when

> SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
1.0


xpath

返回xml 中匹配到的值,以数组返回

> SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()');
["b1","b2","b3"]





 下面的参数你配置过吗

TITLE CONTENT




经过shuffle后的分区数量,默认200


set spark.sql.shuffle.partitions=400


默认是4096,意味着每次从parquet文件中读取一列的行数,这个值配大了容易出现OOM,比如一个string字段,假如一个string 1KB,那么读一批就需要4MB内存,加上并发,则需要更大的内存。因此在内存资源不足的场景可以适当减小


set spark.sql.parquet.columnarReaderBatchSize=512


设置忽略错误文件

set spark.sql.files.ignoreCorruptFiles=true



默认是false,如果是true,则会忽略那些空的splits,减小task的数量。

set spark.hadoopRDD.ignoreEmptySplits=true


参数是用于开启spark的自适应执行

set spark.sql.adaptive.enabled=true
set spark.sql.adaptive.join.enabled =true



参数默认是false,当设置为true的时候会在获得分区路径时对分区路径是否存在做一个校验,过滤掉不存在的分区路径,这样就会避免报错

set spark.sql.hive.verifyPartitionPath=true



1或者2,默认是1. MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。

set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2




猜您喜欢
往期精选▼