vlambda博客
学习文章列表

【数仓】Hadoop、Hive 和 Spark 中数据倾斜的处理

  数据倾斜几乎是大数据开发的必考题。今天通过一篇文章来学习数据倾斜及其处理方法。

1.什么是数据倾斜

  对于分布式系统,大量的数据集中到一台或几台服务器上,称为数据倾斜。数据倾斜现象有两种,一是数据频率倾斜,某一区域的数据量远远大于其他区域;二是数据大小倾斜,部分记录的大小远远大于平均值。

  开发中的常见情况是出现了热点 key(重复的 key 大量出现)。默认情况下,Map 阶段同一个 key 的数据发给同一个 Reduce 处理,导致某一个 Reduce 程序消耗的资源和运行时间远大于其他 Reduce 程序。

  以 Spark 和 Hive 为例,在做数据运算的时候会涉及到,count distinct、group by、join on等操作,这些都会触发Shuffle动作。一旦触发Shuffle,所有相同key的值就会被拉到一个或几个Reducer节点上,容易发生单点计算问题,导致数据倾斜。

  除了 key 分布不均,建表时考虑不周也会导致数据倾斜。比如两张表要关联的字段,一个表中将空值定为 null,而另一个表却定义为 0。关联时就会卡死。另外业务激增也可能导致某几个 key 的数据量增加。

2.Hadoop 中的数据倾斜

2.1 Hadoop 中数据倾斜的表现
  • 有一个多几个Reduce卡住,卡在99.99%,一直不能结束。
  • 各种container报错OOM
  • 异常的Reducer读写的数据量极大,至少远远超过其它正常的Reducer
  • 伴随着数据倾斜,会出现任务被kill等各种诡异的表现。
2.1 Hadoop 中数据倾斜的处理
  1. 抽样和范围分区:对原始数据抽样得到的结果集来预设分区边界值。
  2. 自定义分区:对于热点 key 发送到一部分 Reduce 实例,其他的发送给剩余的 Reduce 实例。
  3. Combiner:可以聚合精简数据,大量减少数据倾斜。适合于Sum()求和,并不适合Average()求平均数。
  4. 局部聚合加全局聚合:第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。第二次 MapReduce,去掉 key 的随机前缀,进行全局聚合。

3.Hive 中的数据倾斜

3.1 Hive 中数据倾斜的表现

一般都发生在Sql中group by和join on上,而且和数据逻辑绑定比较深。

3.2 Hive 中数据倾斜的处理
  1. 先 group by 再 count 代替 count(distinct)。COUNT DISTINCT 操作只有一个 Reduce Task,数据量大时会导致整个 Job 很难完成。
  2. 采用 Map Join。
  3. 开启数据倾斜时的负载均衡:set hive.groupby.skewindata=true。当选项设定为true,生成的查询计划会有两个MR Job:
    • 第一个MR Job 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
    • 第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的原始 Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
  4. 空 KEY 过滤:对于异常值如果不需要的话,最好是提前在 where 条件里过滤掉,这样可以使计算量大大减少。
  5. 控制空值分布:将为空的 key 转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分不到多个 Reducer。

4.Spark 中的数据倾斜

4.1 Spark 中数据倾斜的表现
  • Executor lost,OOM,Shuffle 过程出错;
  • Driver OOM;
  • 单个 Executor 执行时间特别久,整体任务卡在某个阶段不能结束;
  • 正常运行的任务突然失败;
4.2 Spark 中数据倾斜的处理
  1. 使用 Hive 预处理:适用于导致数据倾斜的是Hive表,Hive 表中的数据本身不均匀。
  2. 过滤导致倾斜的 key:适用于只有少数几个热点 key,并且过滤之后对结果没影响。
  3. 提高 shuffle 操作的并行度。
  4. 两阶段聚合:局部 + 全局。
  5. 将reduce join转为map join:适用于一个 RDD 或表的数据量比较小,即一个大表和一个小表的情况。
  6. 采样倾斜 key 并分拆 join 操作。适用于两个表都很大,但其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀。采用计算出最多的几个 key,将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀。
  7. 使用随机前缀和扩容 RDD 进行 join:适用于 RDD 中有大量的key导致数据倾斜。

5.总结

  解决数据倾斜的思路有业务逻辑层面、程序层面、调参层面,但都可以采用平台无关的方式,比如更好的数据预处理,异常值的过滤等。因此,解决数据倾斜的重点在于对数据设计和业务的理解。