vlambda博客
学习文章列表

数据倾斜【数据仓库系列05】

MapReduce数据倾斜:

(一)Map端

在Map端读数据时,由于读人数据的文件大小分布不均匀,因此会导致有些Map Instance 读取并且处理的数据特别多,而有些Map Instance 处理的数据特别少,造成Map端长尾。 以下两种情况可能会导致Map端长尾:
上游表文件的大小特别不均匀,并且小文件特别多,导致当前表Map端读取的数据分布不均匀,引起长尾。
Map端做聚合时,由于某些Map Instance读取文件的某个值特别多而引起长尾,主要是指Count Distinct操作。
解决方案:
第一种情况导致的Map 端长尾,可通过对上游合并小文件,同时调节本节点的小文件的参数来进行优化。
第二种情况可以使用“ distribute by rand ()”来打乱数据分布,使数据尽可能分布均匀。
Map端主要通过设置以下两个参数来避免数据倾斜:
hive.map.aggr = true: map端聚集操作;
hive.groupby.skewindata = true: 数据倾斜时负载均衡,启动两个MR,随机打散Key。
在写人磁盘之前,线程首先根据Reduce Instance 的个数划分分区,数据将会根据Key 值Hash 到不同的分区上,一个Reduce Instance 对应一个分区的数据。 Map 端也会做部分聚合操作,以减少输入Reduce 端的数据量。 由于数据是根据Hash 分配的,因此也会导致有些Reduce Instance 会分配到大量数据,而有些Reduce Instance 却分配到很少数据,甚至没有分配到数据。
Map 端长尾的根本原因是由于读人的文件块的数据分布不均匀,再加上UDF 函数性能、Join 、聚合操作等,导致读人数据量大的Map lnstance 耗时较长。 在开发过程中如果遇到Map 端长尾的情况,首先考虑如何让Map Instance 读取的数据量足够均匀,然后判断是哪些操作导致Map Instance 比较慢,最后考虑这些操作是否必须在Map 端完成,在其他阶段是否会做得更好。

(二)Join端

SQL在Join 执行阶段会将Join Key相同的数据分发到同一个执行Instance上处理。 如果某个Key 上的数据量比较大,则会导致该Instance 执行时间较长。 其表现为: 在执行日志中该Join Task 的大部分Instance 都已执行完成,但少数几个Instance 一直处于执行中(这种现象称之为长尾)。
这里主要讲述三种常见的倾斜场景:
Join的某路输入比较小,可以采用Map Join,避免分发引起长尾(Map Join的原理是将Join操作提前到Map 端执行,将小表读人内存,顺序扫描大表完成Join,这样可以避免因为分发key不均匀导致数据倾斜,MapJoin 的使用方法非常简单,在代码中select 后加上“/*+mapjoin(a) */”即可,其中a 代表小表的别名)。
Join的每路输入都较大,且长尾是空值导致的,可以将空值处理成随机值,避免聚集。
Join的每路输入都较大,且长尾是热点值导致的,可以对热点值和非热点值分别进行处理,再合并数据。

(三)Reduce端

Reduce端负责的是对Map端梳理后的有序key-value键值对进行聚合,即进行Count、Sum 、Avg等聚合操作,得到最终聚合的结果。 Distinct是MaxCompute SQL中支持的语法,用于对字段去重。 比如计算在某个时间段内支付买家数、访问UV等,都是需要用Distinct进行去重的。 MaxCompute中Distinct的执行原理是将需要去重的字段以及Group By字段联合作为key 将数据分发到Reduce端。
因为Distinct 操作,数据无法在Map 端的Shuffle阶段根据Group By先做一次聚合操作,以减少传输的数据量,而是将所有的数据都传输到Reduce端,当key的数据分发不均匀时,就会导致Reduce端长尾。 Reduce端产生长尾的主要原因就是key的数据分布不均匀。 比如有些Reduce任务Instance处理的数据记录多,有些处理的数据记录少,造成Reduce端长尾。 如下几种情况会造成Reduce 端长尾:
对同一个表按照维度对不同的列进行Count Distinct操作,造成Map端数据膨胀,从而使得下游的Join和Reduce出现链路上的长尾。
Map端直接做聚合时出现key值分布不均匀,造成Reduce端长尾。
动态分区数过多时可能造成小文件过多,从而引起Reduce端长尾。
多个Distinct同时出现在一段SQL代码中时,数据会被分发多次,不仅会造成数据膨胀N倍,还会把长尾现象放大N倍。
解决方案:
嵌套编写,先Group By再Count(*),如果group by维度过小,采用sum() group by的方式来替换count(distinct)完成计算;
可以对热点key进行单独处理,然后通过“ Union All ”合并;
可以把符合不同条件的数据放到不同的分区,避免通过多次“Insert Overwrite”,写人表中,特别是分区数比较多时,能够很好地简化代码;
在把不同指标Join 在一起之前, 一定要确保指标的粒度是原始表的数据粒度; 当代码比较臃肿时,也可以将上述子查询落到中间表里。

Hive数据倾斜:

1. 慎重使用COUNT DISTINCT

原因如下:
distinct会将b列所有的数据保存到内存中,形成一个类似hash的结构,速度是十分的块; 但是在大数据背景下,因为b列所有的值都会形成以key值,极有可能发生OOM
解决方案:
所以,可以考虑使用Group By 或者 ROW_NUMBER() OVER(PARTITION BY col)方式代替COUNT(DISTINCT col)

2. 小文件会造成资源的过度占用以及影响查询效率

原因如下:
众所周知,小文件在HDFS中存储本身就会占用过多的内存空间,那么对于MR查询过程中过多的小文件又会造成启动过多的Mapper Task, 每个Mapper都是一个后台线程,会占用JVM的空间
在Hive中,动态分区会造成在插入数据过程中,生成过多零碎的小文件(请回忆昨天讲的动态分区的逻辑)
不合理的Reducer Task数量的设置也会造成小文件的生成,因为最终Reducer是将数据落地到HDFS中的
解决方案:
在数据源头HDFS中控制小文件产生的个数,比如
采用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件(常见于在流计算的时候采用Sequencefile格式进行存储)
减少reduce的数量(可以使用参数进行控制)
慎重使用动态分区,最好在分区中指定分区字段的val值
最好数据的校验工作,比如通过脚本方式检测hive表的文件数量,并进行文件合并
合并多个文件数据到一个文件中,重新构建表

3. 慎重使用SELECT *

原因如下:
在大数据量多字段的数据表中,如果使用 SELECT * 方式去查询数据,会造成很多无效数据的处理,会占用程序资源,造成资源的浪费
解决方案:
在查询数据表时,指定所需的待查字段名,而非使用 * 号

4. 减少数据集,不要在表关联后面加WHERE条件

原因如下:
比如以下语句:
SELECT * FROM stu as t
LEFT JOIN course as t1
ON t.id=t2.stu_id
WHERE t.age=18;
请思考上面语句是否具有优化的空间? 如何优化?
解决方案:
采用谓词下推的技术,提早进行过滤有可能减少必须在数据库分区之间传递的数据量
谓词下推的解释:
所谓谓词下推就是通过嵌套的方式,将底层查询语句尽量推到数据底层去过滤,这样在上层应用中就可以使用更少的数据量来查询,这种SQL技巧被称为谓词下推(Predicate pushdown)
那么上面语句就可以采用这种方式来处理:
SELECT * FROM (SELECT * FROM stu WHERE age=18) as t
LEFT JOIN course AS t1
on t.id=t1.stu_id

5. 处理掉字段中带有空值的数据

原因如下:
一个表内有许多空值时会导致MapReduce过程中,空成为一个key值,对应的会有大量的value值, 而一个key的value会一起到达reduce造成内存不足
解决方式:
1、在查询的时候,过滤掉所有为NULL的数据,比如:
create table res_tbl as
select n.* from
(select * from res where id is not null ) n
left join org_tbl o on n.id = o.id;
2、查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
create table res_tbl as
select n.* from res n
full join org_tbl o on
case when n.id is null then concat('hive', rand()) else n.id end = o.id;

6. 设置并行执行任务数

通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。 不过,在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8

7. 设置合理的Reducer个数

原因如下:
过多的启动和初始化 reduce 也会消耗时间和资源
有多少个Reduer就会有多少个文件产生,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题
解决方案:
Reducer设置的原则:
每个Reduce处理的数据默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000
每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
计算reduce数的公式
N=min(参数2,总输入数据量/参数1)
设置Reducer的数量
set mapreduce.job.reduces=n

8. 解决一个reduce问题

Hive一个Reduce原因如下:
使用了Order by (Order By是会进行全局排序)
直接COUNT(1),没有加GROUP BY,比如: SELECT COUNT(1) FROM tbl WHERE pt=’201909’
有笛卡尔积操作,原因是Hive对笛卡尔积支持较弱,因为找不到join key
解决方案:
避免使用全局排序,可以使用sort by进行局部排序
使用GROUP BY进行统计,不会进行全局排序,比如:
SELECT pt,COUNT(1) FROM tbl WHERE pt=’201909’;

9. 选择使用本地模式

有时候Hive处理的数据量非常小,那么在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要长,对于大多数这种情况,hive可以通过本地模式在单节点上处理所有任务,对于小数据量任务可以大大的缩短时间
可以通过
hive.exec.mode.local.auto=true

10. Sort by替代Order by

Order by(全局排序,一个reducer) 效率比Sort by(不是全局排序,其在数据进入reducer前完成排序)低,窗口函数使用: row_number()over(distribute BY uid sort by dt desc)
使用sort by 你可以指定执行的reduce 个数 (set mapred.reduce.tasks=<number>)
对输出的数据再执行归并排序,即可以得到全部结果

11. cluster by问题

排序只能是升序排序(默认排序规则),不能指定排序规则为asc 或者desc。

12. 建设中间表

13. 调整Job优先级

14. 解决笛卡尔积