什么是数据倾斜
当数据分布不均匀,集中在少数几个 Key上,进行处理时,数据和运算会集中在少数上,造成数据热点,称之为数据倾斜。
当数据倾斜出现时,会出现任务进度长时间维持在99%(或100%)上,一直卡在少量 Reduce 任务上,甚至因为超时被杀掉。
如何判断数据倾斜
1. 通过任务时间判断
如果个别Reduce的时间比其它Reduce时间长的多,例如大部分Reduce任务都在5分钟之内完成,而个别Reduce任务超过30分钟还未完成,就可能存在数据倾斜的情况。
注意,如果Reduce任务执行很长,但每个Reduce任务执行时间都差不多,则有可能Reduce个数过少导致的,可以适当设置更多的Reduce个数。
另外,有时候某些节点有问题,也会导致分配在上面的Reduce 任务执行得很慢。当这种情况发生,MapReduce 的推车执行特性会重启一个任务。如果新任务很快完成,那可能是节点问题。如果新任务也运行很久,则有可能是数据倾斜问题。
2. 通过任务Counter判断
可以到任务的 AM 查看各个 Reduce 任务的输入记录数和输出字符数的Counter 。如果个别任务和其它任务的数量差别很大,则可能发生了数据倾斜问题。
如何定位数据倾斜
1. 定位对应大Key
Hive在执行join操作的时候,会打印join相关日志。我们可以通过日志查找数据量大的Key:
1) 在 AM 找到运行特别慢的任务,打开相应日志。
2) 搜索日志中出现的 “rows for joinkey”, 可以看到 Key 取值。
3) 找到时间跨度比较大的记录,对应 Key 值便是大 Key。
2 定位任务卡住的Stage
1)通过Job Name定位 Stage
Hive会默认在Job Name 带相应的Stage阶段。我们可以在AM找到这样运行慢的任务,然后根据Job Name找出相应的Stage。
2)从任务日志定位 Stage
如果Job Name是自定义的,可以通过任务日志找到相应的Stage。在AM找到这样运行慢的任务,打开日志,搜索 “CommonJoinOperator: JOIN struct”。
例如下图的任务日志,关键信息是struct<_col1:string,_col6:string>。
参考 SQL 执行计划,可以定位该阶段为Stage-1。
3. 定位SQL代码问题点
结合上述步骤找到的大 Key 和 Stage,我们可以推测出哪段代码执行时出现了数据倾斜。
如何解决数据倾斜
1. 过滤脏数据
如果大 Key 是无意义的数据,可以直接过滤掉,避免数据倾斜
2. 重写业务逻辑
结合具体场景重写 SQL。例如下面的SQL,日志表与用户表通过user_id关联,可能导致user_id为null的发生数据倾斜。
SELECT a.xx, b.yy FROM
log a
JOIN
users b ON a.user_id = b.user_id
此时可以改写 SQL,把日志表中user_id= null的数据单独处理,如下:
SELECT a.xx, b.yy FROM
log a
JOIN
users b ON a.user_id IS
NOT
NULL
AND a.user_id = b.user_id
UNION
ALL
SELECT a.xx, NULL
AS yy FROM
log a WHERE a.user_id IS
NULL;
3. Map 局部聚合
对于聚合操作(GROUP BY)引起的数据倾斜,可以设置:
// 控制在GROUP BY的时候是否Map局部聚合
set hive.map.aggr=true;
// 判断是否需要进行Map局部聚合。预先取100000条数据聚合,如果聚合后的条
// 数/100000>0.5,则不聚合
set hive.groupby.mapaggr.checkinterval=100000;
set hive.map.aggr.hash.min.reduction=0.5;
4. hive.groupby.skewindata
对于聚合查询(GROUP BY)引起的数据倾斜,可以设置:
set hive.groupby.skewindata=true;
该参数会启动两个MR Job,第一个Job先不按GROUP BY字段分发,而是随机分发做一次聚合.然后启动第二个Job,拿前面聚合过的数据按GROUP BY字段分发计算出最终结果。不过,对于多列去重操作,无法使用这个配置,会报错。因为对于去重操作,第一个 Job 的随机分发,需要根据去重列哈希,聚合后的去重结果,在第二个 Job 累加起来,才是正确的整体去重统计结果。如果多列去重,就无法随机分发。
5. hive.optimize.skewjoin
对于联合查询(JOIN)引起的数据倾斜,可以设置:
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;
记录超过hive.skewjoin.key(默认100000)阈值的key值先写入HDFS,然后再进行一个map join的job任务,最终和其他key值的结果合并为最终结果。不过,该参数对full outer join无效。