数据倾斜【数据仓库系列05】
在Map端读数据时,由于读人数据的文件大小分布不均匀,因此会导致有些Map Instance 读取并且处理的数据特别多,而有些Map Instance 处理的数据特别少,造成Map端长尾。
以下两种情况可能会导致Map端长尾:
上游表文件的大小特别不均匀,并且小文件特别多,导致当前表Map端读取的数据分布不均匀,引起长尾。
Map端做聚合时,由于某些Map Instance读取文件的某个值特别多而引起长尾,主要是指Count Distinct操作。
第一种情况导致的Map 端长尾,可通过对上游合并小文件,同时调节本节点的小文件的参数来进行优化。
第二种情况可以使用“ distribute by rand ()”来打乱数据分布,使数据尽可能分布均匀。
hive.map.aggr = true:
map端聚集操作;
hive.groupby.skewindata = true:
数据倾斜时负载均衡,启动两个MR,随机打散Key。
在写人磁盘之前,线程首先根据Reduce Instance 的个数划分分区,数据将会根据Key 值Hash 到不同的分区上,一个Reduce Instance 对应一个分区的数据。
Map 端也会做部分聚合操作,以减少输入Reduce 端的数据量。
由于数据是根据Hash 分配的,因此也会导致有些Reduce Instance 会分配到大量数据,而有些Reduce Instance 却分配到很少数据,甚至没有分配到数据。
Map 端长尾的根本原因是由于读人的文件块的数据分布不均匀,再加上UDF 函数性能、Join 、聚合操作等,导致读人数据量大的Map lnstance 耗时较长。
在开发过程中如果遇到Map 端长尾的情况,首先考虑如何让Map Instance 读取的数据量足够均匀,然后判断是哪些操作导致Map Instance 比较慢,最后考虑这些操作是否必须在Map 端完成,在其他阶段是否会做得更好。
SQL在Join 执行阶段会将Join Key相同的数据分发到同一个执行Instance上处理。
如果某个Key 上的数据量比较大,则会导致该Instance 执行时间较长。
其表现为:
在执行日志中该Join Task 的大部分Instance 都已执行完成,但少数几个Instance 一直处于执行中(这种现象称之为长尾)。
Join的某路输入比较小,可以采用Map Join,避免分发引起长尾(Map Join的原理是将Join操作提前到Map 端执行,将小表读人内存,顺序扫描大表完成Join,这样可以避免因为分发key不均匀导致数据倾斜,MapJoin 的使用方法非常简单,在代码中select 后加上“/*+mapjoin(a) */”即可,其中a 代表小表的别名)。
Join的每路输入都较大,且长尾是空值导致的,可以将空值处理成随机值,避免聚集。
Join的每路输入都较大,且长尾是热点值导致的,可以对热点值和非热点值分别进行处理,再合并数据。
Reduce端负责的是对Map端梳理后的有序key-value键值对进行聚合,即进行Count、Sum 、Avg等聚合操作,得到最终聚合的结果。
Distinct是MaxCompute SQL中支持的语法,用于对字段去重。
比如计算在某个时间段内支付买家数、访问UV等,都是需要用Distinct进行去重的。
MaxCompute中Distinct的执行原理是将需要去重的字段以及Group By字段联合作为key 将数据分发到Reduce端。
因为Distinct 操作,数据无法在Map 端的Shuffle阶段根据Group By先做一次聚合操作,以减少传输的数据量,而是将所有的数据都传输到Reduce端,当key的数据分发不均匀时,就会导致Reduce端长尾。
Reduce端产生长尾的主要原因就是key的数据分布不均匀。
比如有些Reduce任务Instance处理的数据记录多,有些处理的数据记录少,造成Reduce端长尾。
如下几种情况会造成Reduce 端长尾:
对同一个表按照维度对不同的列进行Count Distinct操作,造成Map端数据膨胀,从而使得下游的Join和Reduce出现链路上的长尾。
Map端直接做聚合时出现key值分布不均匀,造成Reduce端长尾。
动态分区数过多时可能造成小文件过多,从而引起Reduce端长尾。
多个Distinct同时出现在一段SQL代码中时,数据会被分发多次,不仅会造成数据膨胀N倍,还会把长尾现象放大N倍。
嵌套编写,先Group By再Count(*),如果group by维度过小,采用sum() group by的方式来替换count(distinct)完成计算;
可以对热点key进行单独处理,然后通过“ Union All ”合并;
可以把符合不同条件的数据放到不同的分区,避免通过多次“Insert Overwrite”,写人表中,特别是分区数比较多时,能够很好地简化代码;
在把不同指标Join 在一起之前, 一定要确保指标的粒度是原始表的数据粒度;
当代码比较臃肿时,也可以将上述子查询落到中间表里。
distinct会将b列所有的数据保存到内存中,形成一个类似hash的结构,速度是十分的块;
但是在大数据背景下,因为b列所有的值都会形成以key值,极有可能发生OOM
所以,可以考虑使用Group By 或者 ROW_NUMBER() OVER(PARTITION BY col)方式代替COUNT(DISTINCT col)
众所周知,小文件在HDFS中存储本身就会占用过多的内存空间,那么对于MR查询过程中过多的小文件又会造成启动过多的Mapper Task, 每个Mapper都是一个后台线程,会占用JVM的空间
在Hive中,动态分区会造成在插入数据过程中,生成过多零碎的小文件(请回忆昨天讲的动态分区的逻辑)
不合理的Reducer Task数量的设置也会造成小文件的生成,因为最终Reducer是将数据落地到HDFS中的
采用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件(常见于在流计算的时候采用Sequencefile格式进行存储)
慎重使用动态分区,最好在分区中指定分区字段的val值
最好数据的校验工作,比如通过脚本方式检测hive表的文件数量,并进行文件合并
在大数据量多字段的数据表中,如果使用 SELECT * 方式去查询数据,会造成很多无效数据的处理,会占用程序资源,造成资源的浪费
在查询数据表时,指定所需的待查字段名,而非使用 * 号
4. 减少数据集,不要在表关联后面加WHERE条件
采用谓词下推的技术,提早进行过滤有可能减少必须在数据库分区之间传递的数据量
所谓谓词下推就是通过嵌套的方式,将底层查询语句尽量推到数据底层去过滤,这样在上层应用中就可以使用更少的数据量来查询,这种SQL技巧被称为谓词下推(Predicate pushdown)
SELECT * FROM (SELECT * FROM stu WHERE age=18) as t
一个表内有许多空值时会导致MapReduce过程中,空成为一个key值,对应的会有大量的value值, 而一个key的value会一起到达reduce造成内存不足
1、在查询的时候,过滤掉所有为NULL的数据,比如:
(select * from res where id is not null ) n
left join org_tbl o on n.id = o.id;
2、查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
case when n.id is null then concat('hive', rand()) else n.id end = o.id;
通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。
不过,在共享集群中,需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8
过多的启动和初始化 reduce 也会消耗时间和资源
有多少个Reduer就会有多少个文件产生,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题
hive.exec.reducers.bytes.per.reducer=256000000
hive.exec.reducers.max=1009
set mapreduce.job.reduces=n
使用了Order by (Order By是会进行全局排序)
直接COUNT(1),没有加GROUP BY,比如:
SELECT COUNT(1) FROM tbl WHERE pt=’201909’
有笛卡尔积操作,原因是Hive对笛卡尔积支持较弱,因为找不到join key
避免使用全局排序,可以使用sort by进行局部排序
使用GROUP BY进行统计,不会进行全局排序,比如:
SELECT pt,COUNT(1) FROM tbl WHERE pt=’201909’;
有时候Hive处理的数据量非常小,那么在这种情况下,为查询出发执行任务的时间消耗可能会比实际job的执行时间要长,对于大多数这种情况,hive可以通过本地模式在单节点上处理所有任务,对于小数据量任务可以大大的缩短时间
hive.exec.mode.local.auto=true
Order by(全局排序,一个reducer) 效率比Sort by(不是全局排序,其在数据进入reducer前完成排序)低,窗口函数使用:
row_number()over(distribute BY uid sort by dt desc)
使用sort by 你可以指定执行的reduce 个数 (set mapred.reduce.tasks=<number>)
排序只能是升序排序(默认排序规则),不能指定排序规则为asc 或者desc。
标签: