Spark Web UI and Task Optimization in Practice: Locating and Handling Data Skew
What is data skew?
- In a dataset processed in parallel, some portion (e.g., a single Spark partition) holds significantly more data than the rest, so that portion's processing speed becomes the bottleneck for the whole dataset.
- Data skew has many causes. One common scenario is a table in which a small number of keys account for a very large share of the records: during a join, all rows for one such key are processed by a single task, and that task runs far longer than the others.
- Joins are not the only cause; skew can also arise from group by, partition by, count(distinct), and any other operation that repartitions the data by key.
Symptoms of data skew
- The job runs for hours, or even tens of hours
- The job OOMs (runs out of memory)
Either symptom calls for further pinpointing.
How to diagnose data skew
In the Spark Web UI, open the detail page of the slow stage and compare the per-task summary metrics (min / median / max of task duration, shuffle read size, and records): a max far above the median indicates one oversized partition, and the straggler task pins down the skewed stage.
Solutions for join-induced data skew
Joins on invalid keys
- When the rows need not be kept: keys with no business meaning, such as null, 0, -1, or -9999, can simply be filtered out if the rows are not needed, or if the join is an inner join.
Original query (pseudo-SQL):
select
    order_id,
    order_number,
    t1.sku_id,
    user_id,
    sku_name,
    use_date,
    sku_price
from (
    select order_id, order_number, sku_id, user_id
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select sku_id, sku_name, use_date, sku_price
    from y_life.dim_product_sku_info
    where date='${date}'
) t2
on t1.sku_id = t2.sku_id
In y_life.dws_order_sku_df a large number of rows have sku_id = -1 or null. The business does not need them, so they can be filtered out directly.
Optimized:
select
    order_id,
    order_number,
    t1.sku_id,
    user_id,
    sku_name,
    use_date,
    sku_price
from (
    select order_id, order_number, sku_id, user_id
    from y_life.dws_order_sku_df
    where date='${date}'
      and nvl(sku_id, -1) > 0   -- drop null, -1, and other non-positive invalid keys
) t1
left join (
    select sku_id, sku_name, use_date, sku_price
    from y_life.dim_product_sku_info
    where date='${date}'
) t2
on t1.sku_id = t2.sku_id
Equivalently, when the business logic allows an inner join, rows with invalid keys drop out of the result by themselves (null join keys never match, and Spark can infer an is-not-null filter on them):
select
    order_id,
    order_number,
    t1.sku_id,
    user_id,
    sku_name,
    use_date,
    sku_price
from (
    select order_id, order_number, sku_id, user_id
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
inner join (
    select sku_id, sku_name, use_date, sku_price
    from y_life.dim_product_sku_info
    where date='${date}'
) t2
on t1.sku_id = t2.sku_id
- When the rows must be kept: every row of the left table is needed, and the join key only serves to look up attribute values. In that case, scatter the invalid keys into distinct values:
select
    order_id,
    order_number,
    t1.sku_id,
    user_id,
    sku_name,
    use_date,
    sku_price
from (
    select order_id, order_number, sku_id, user_id
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select sku_id, sku_name, use_date, sku_price
    from y_life.dim_product_sku_info
    where date='${date}'
) t2
-- scatter invalid keys: order_id * -1 is negative, so it can never match a real sku_id
on if(t1.sku_id is null, t1.order_id * -1, t1.sku_id) = t2.sku_id
Caveats:
1. Make sure the scattered values can never match a key in the right table. For instance, if the right table's keys are positive integers, the left table must not scatter into positive integers, or collisions become possible.
2. If the table has no reasonably well-distributed column to derive the scatter value from, avoid calling rand() directly: when a Spark task fails and is retried, rand() generates different values than before, which can corrupt the results. A deterministic alternative is sketched below.
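For illustration, a deterministic salt can be derived by hashing existing columns instead of calling rand(). This is a sketch, not from the original text; it reuses the tables above and assumes only the built-in Spark SQL functions hash(), pmod(), and if():
select
    order_id,
    order_number,
    t1.sku_id,
    user_id,
    sku_name,
    use_date,
    sku_price
from (
    select order_id, order_number, sku_id, user_id
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select sku_id, sku_name, use_date, sku_price
    from y_life.dim_product_sku_info
    where date='${date}'
) t2
-- pmod(hash(...), 1000) is stable across task retries, unlike rand(); the +1 and
-- * -1 keep the salted key strictly negative so it never collides with a real sku_id
on if(t1.sku_id is null,
      -1 * (pmod(hash(t1.order_id, t1.order_number), 1000) + 1),
      t1.sku_id) = t2.sku_id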
Joins on valid keys
- Large table join small table
Use a map join to bypass the shuffle: data that would otherwise go through a shuffle join (SortMergeJoin, ShuffledHashJoin) is handled by a map join (BroadcastHashJoin) instead.
-- Raise the broadcast-table size threshold (104857600 bytes = 100 MB)
set spark.sql.autoBroadcastJoinThreshold = 104857600;
-- Use this when the small side is on the order of 100 MB. If the broadcast table is
-- too large, broadcasting it may take longer than the original shuffle join did.
Alternatively, use the hint syntax /*+ MAPJOIN(xx) */ (newer Spark versions also accept the equivalent /*+ BROADCAST(xx) */):
select /*+ MAPJOIN(t2) */
    order_id,
    order_number,
    sku_id,
    user_id,
    t1.account_id,
    account_name,
    account_status
from (
    select order_id, order_number, sku_id, user_id, account_id
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select account_id, account_name, account_status
    from y_life.dim_account_info
    where date='${date}'
) t2
on t1.account_id = t2.account_id
- Large table join large table, option 1, parameter tuning: raise shuffle parallelism so keys are spread over more partitions, making the distribution slightly more even; see the sketch below.
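A minimal sketch of the relevant settings (the values are illustrative and must be tuned per job; the adaptive options assume Spark 3.x):
-- More shuffle partitions (default 200), so each reduce task handles fewer keys
set spark.sql.shuffle.partitions = 2000;
-- On Spark 3.x, adaptive execution can additionally split oversized skewed join partitions
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.skewJoin.enabled = true;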
- Multiplicative expansion (detailed below)
- Hot/cold key separation: if only a few keys are skewed, they are easy to isolate, and the hot keys can be extracted and joined separately. Drawback: development is fiddly and later maintenance is costly. Advantage: very effective against skew.
Build a hot-merchant table (top N) from order counts:
select account_id
from (
    select
        account_id,
        row_number() over (order by ord_cnt desc) as ord_rank
    from (
        select
            account_id,
            count(distinct order_id) as ord_cnt   -- order volume per merchant
        from y_life.dws_order_sku_df
        where date='${date}'
        group by account_id
    ) tmp1
) tmp2
where tmp2.ord_rank <= 100
Split hot and cold keys, then union the two parts:
with hot_account as (
    select account_id
    from y_life.dim_hot_account_info
    where date='${date}'
)
-- hot branch: at most 100 accounts, so the matching dim rows are few;
-- a MAPJOIN hint on t2 can force a broadcast here
select
    order_id,
    order_number,
    sku_id,
    user_id,
    t1.account_id,
    account_name,
    account_status
from (
    select t1.order_id, t1.order_number, t1.sku_id, t1.user_id, t1.account_id
    from y_life.dws_order_sku_df t1
    inner join hot_account t2
    on t1.account_id = t2.account_id
    where t1.date='${date}'
) t1
left join (
    select account_id, account_name, account_status
    from y_life.dim_account_info
    where date='${date}'
) t2
on t1.account_id = t2.account_id

union all

-- cold branch: the remaining, evenly distributed keys join as usual
select
    order_id,
    order_number,
    sku_id,
    user_id,
    t1.account_id,
    account_name,
    account_status
from (
    select t1.order_id, t1.order_number, t1.sku_id, t1.user_id, t1.account_id
    from y_life.dws_order_sku_df t1
    left join hot_account t2
    on t1.account_id = t2.account_id
    where t1.date='${date}' and t2.account_id is null   -- anti-join: keep non-hot keys
) t1
left join (
    select account_id, account_name, account_status
    from y_life.dim_account_info
    where date='${date}'
) t2
on t1.account_id = t2.account_id
- Multiplicative expansion: when too many keys are skewed for hot/cold separation to be practical, expand the dimension table instead, i.e. replicate each dimension row N times with a bucket number 0..N-1 appended to the join key. The core idea is to increase parallelism: every key is spread across N tasks.
Expansion by a factor of 5:
select
    order_id,
    order_number,
    sku_id,
    user_id,
    t1.account_id,
    account_name,
    account_status
from (
    select
        order_id, order_number, sku_id, user_id, account_id,
        floor(order_id % 5) as numkey   -- deterministic bucket; avoid rand(), see the caveat above
        -- floor(rand() * 1000 % 5) as numkey
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select
        account_id, account_name, account_status,
        cast(num as bigint) as numkey
    from y_life.dim_account_info
    lateral view explode(split('0,1,2,3,4', ',')) t as num   -- 5 copies of every dim row
    where date='${date}'
) t2
on t1.account_id = t2.account_id and t1.numkey = t2.numkey
An equivalent formulation:
select
    order_id,
    order_number,
    sku_id,
    user_id,
    t1.account_id,
    account_name,
    account_status
from (
    select
        order_id, order_number, sku_id, user_id, account_id,
        pmod(order_id, 5) as numkey
    from y_life.dws_order_sku_df
    where date='${date}'
) t1
left join (
    select
        account_id, account_name, account_status,
        pos as numkey
    from y_life.dim_account_info
    -- split(space(4), ' ') yields 5 empty strings, so pos ranges over 0..4,
    -- matching pmod(order_id, 5) on the left side
    lateral view posexplode(split(space(4), ' ')) t as pos, val
    where date='${date}'
) t2
on t1.account_id = t2.account_id and t1.numkey = t2.numkey
Solutions for row_number data skew
- Two-stage aggregation: aggregate locally first, then aggregate globally.
Original code: find each merchant's first-order sku_id and its time:
select
    account_id,
    sku_id,
    create_time
from (
    -- all rows of one account_id land in a single task, so a hot merchant becomes a straggler
    select
        order_id, order_number, sku_id, user_id, account_id, create_time,
        row_number() over (partition by account_id order by create_time asc) as rn
    from y_life.dws_order_sku_df
    where date='${date}'
) tmp
where tmp.rn = 1
- Optimization 1:
To make the groups within each partition as even as possible, add a salt column and include it in the first-stage partition by clause.
select
    account_id,
    sku_id,
    create_time
from (
    -- stage 2: global top-1 per merchant, over at most 10 candidate rows each
    select
        account_id, sku_id, create_time,
        row_number() over (partition by account_id order by create_time asc) as rn_2
    from (
        -- stage 1: salted local top-1; each merchant's rows are split across 10 buckets
        select
            order_id, order_number, sku_id, user_id, account_id, create_time,
            row_number() over (
                partition by account_id, floor(order_id % 10)
                order by create_time asc
            ) as rn_1
        from y_life.dws_order_sku_df
        where date='${date}'
    ) tmp1
    where tmp1.rn_1 = 1
) tmp2
where tmp2.rn_2 = 1
- Optimization 2: replace row_number with min/max over a named_struct.
select
    account_id,
    min_account.sku_id as sku_id,
    min_account.create_time as create_time
from (
    select
        account_id,
        -- min over a struct orders by its first field (create_time) first
        min(named_struct('create_time', create_time, 'sku_id', sku_id)) as min_account
    from y_life.dws_order_sku_df
    where date='${date}'
    group by account_id
) tmp1
- Ordering on a struct compares its fields in declaration order, so place the sort field first and the fields you merely want to retrieve after it: with create_time first, taking the min of the struct returns the record with the smallest create_time.
- Because min() is an aggregate function, Spark already applies it on the map side as a partial aggregation, which greatly reduces the amount of data shuffled and processed by the reducers.
- When this approach fits: groups have a high duplication rate (many rows per key), and only the first or last record of each group is needed.
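To confirm the map-side partial aggregation, one can inspect the physical plan. An illustrative check (the operator names assume Spark's HashAggregate-based plans):
explain
select
    account_id,
    min(named_struct('create_time', create_time, 'sku_id', sku_id)) as min_account
from y_life.dws_order_sku_df
where date='${date}'
group by account_id;
-- Expected shape: HashAggregate(functions=[partial_min(...)]) below the Exchange
-- (map side), and HashAggregate(functions=[min(...)]) above it (reduce side).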