你敢说你真的懂Spark SQL吗?
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
http://spark.apache.org/sql/
为什么要学习Spark SQL?我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。
有人可能要说了,Spark SQL无非就是简单的SELECT, FROM ,WHERE, GROUP BY 之类,那么请你不要说你懂Spark SQL。
列行转化你用过吗
TITLE CONTENT
行转列(透视pivot)
SparkTable表结构如下:
多行转多列:下面有两种方式,第一种更灵活些
spark.sql("SELECT * FROM SparkTable").groupBy("product")
.pivot("country").sum("amount").show();
SELECT * FROM SparkTable
pivot
(
sum(amount) for
country in ('USA','JPN','IND','CHA')
)
结果:
多行转一列:
SELECT product,concat_ws(',', collect_set(country))
FROM SparkTable
GROUP BY product
列转行(逆透视unpivot)
多列转多行:
SELECT product,stack(4, 'USA',USA,'JPN',JPN,'IND',IND,'CHA',CHA)
AS (`country`, `amount` )
FROM PivotTable
运行结果如下:
一列转多行:
SELECT ifa, bundle, country AS geo
FROM tb1 e
lateral view explode(bundles) bund AS bundle
窗口函数你用过吗
TITLE CONTENT
first_value
按照时间降序排列获取第一个不为null的ua值赋值给每一条记录
SELECT ifa, first_value(ua, true) over (PARTITION BY ifa ORDER BY time DESC)
AS ua
FROM tb2
row_number & rank
按照时间获取每条记录的排序序号,row_number不重复排序,rank出现重复排序
下面一个是按照ifa 分组后排序,一个是总排序。
SELECT
ifa
row_number() over(PARTITION BY ifa ORDER BY time DESC ) AS rn,
rank() over(ORDER BY time DESC) AS rank
FROM tb2
other
还有其他的请自己test
count(...) over(partition by ... order by ...)--求分组后的总数。
sum(...) over(partition by ... order by ...)--求分组后的和。
max(...) over(partition by ... order by ...)--求分组后的最大值。
min(...) over(partition by ... order by ...)--求分组后的最小值。
avg(...) over(partition by ... order by ...)--求分组后的平均值。
dense_rank() over(partition by ... order by ...)--rank值是连续的。
last_value(...) over(partition by ... order by ...)--求分组内的最后一个值。
lag() over(partition by ... order by ...)--取出前n行数据。
lead() over(partition by ... order by ...)--取出后n行数据。
ratio_to_report() over(partition by ... order by ...)--Ratio_to_report() 括号中就是分子,over() 括号中就是分母。
percent_rank() over(partition by ... order by ...)--结果可以视作为rank()的结果,除以最大的编号
下面的函数你用过吗
TITLE CONTENT
flatten
数据join拉平
> SELECT flatten(array(array(1, 2), array(3, 4)));
[1,2,3,4]
filter
数组过滤
>SELECT filter(array(1, 2, 3), x -> x % 2 == 1);
[1,3]
array_distinct
数组排重
> SELECT array_distinct(array(1, 2, 3, null, 3));
[1,2,3,null]
array_intersect
数组交集
> SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
[1,3]
array_except
在数据一中但不在数组二中
> SELECT array_except(array(1, 2, 3), array(1, 3, 5));
[2]
get_json_object
获取json对象的值
> SELECT get_json_object('{"a":"b"}', '$.a');
b
when
> SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
1.0
xpath
返回xml 中匹配到的值,以数组返回
> SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()');
["b1","b2","b3"]
下面的参数你配置过吗
TITLE CONTENT
经过shuffle后的分区数量,默认200
set spark.sql.shuffle.partitions=400
默认是4096,意味着每次从parquet文件中读取一列的行数,这个值配大了容易出现OOM,比如一个string字段,假如一个string 1KB,那么读一批就需要4MB内存,加上并发,则需要更大的内存。因此在内存资源不足的场景可以适当减小
set spark.sql.parquet.columnarReaderBatchSize=512
设置忽略错误文件
set spark.sql.files.ignoreCorruptFiles=true
默认是false,如果是true,则会忽略那些空的splits,减小task的数量。
set spark.hadoopRDD.ignoreEmptySplits=true
参数是用于开启spark的自适应执行
set spark.sql.adaptive.enabled=true
set spark.sql.adaptive.join.enabled =true
参数默认是false,当设置为true的时候会在获得分区路径时对分区路径是否存在做一个校验,过滤掉不存在的分区路径,这样就会避免报错
set spark.sql.hive.verifyPartitionPath=true
1或者2,默认是1. MapReduce-4815 详细介绍了 fileoutputcommitter 的原理,实践中设置了 version=2 的比默认 version=1 的减少了70%以上的 commit 时间,但是1更健壮,能处理一些情况下的异常。
set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2