Spark Web UI and Task Optimization in Practice: Locating and Handling Data Skew
What is data skew?
- In a dataset processed in parallel, one part (e.g., a single Spark partition) holds significantly more data than the others, so that part becomes the bottleneck for the whole job.
- Data skew has many causes. A common scenario: a small number of keys in a table account for a very large share of the rows, so during a join all rows for one of those keys are processed by a single task, and that task runs far longer than the rest.
- Skew is not caused only by joins; any operation that repartitions data by key, such as group by, partition by, or count(distinct), can produce it.
Symptoms of data skew
- The job runs for hours or even tens of hours
- The job OOMs
Either symptom calls for pinpointing the skew further
How to locate data skew
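Beyond the Spark Web UI (look for a stage whose task durations and shuffle read sizes are wildly uneven across tasks), a quick frequency count on the suspect key usually confirms which values are skewed. A minimal sketch against the order table used throughout this article:
select sku_id,
count(1) as cnt
from y_life.dws_order_sku_df
where date='${date}'
group by sku_id
order by cnt desc
limit 100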
Solving join-induced skew
Invalid-key joins
- When the rows need not be kept: keys with no business meaning (null, 0, -1, -9999, ...) can simply be filtered out if they are not needed downstream, or when the join is an inner join
Original query (pseudocode):
select order_id,
order_number,
t1.sku_id,
user_id,
sku_name,
use_date,
sku_price
from (
select order_id,
order_number,
sku_id,
user_id
from y_life.dws_order_sku_df
where date='${date}'
) t1
left join
(
select sku_id,
sku_name,
use_date,
sku_price
from y_life.dim_product_sku_info
where date='${date}'
) t2
on t1.sku_id=t2.sku_id
A large share of sku_id values in y_life.dws_order_sku_df are -1 or null; the business does not need those rows, so they can be filtered out directly.
After optimization:
select order_id,
order_number,
t1.sku_id,
user_id,
sku_name,
use_date,
sku_price
from (
select order_id,
order_number,
sku_id,
user_id
from y_life.dws_order_sku_df
where date='${date}'
and nvl(sku_id,-1) > 0
) t1
left join
(
select sku_id,
sku_name,
use_date,
sku_price
from y_life.dim_product_sku_info
where date='${date}'
) t2
on t1.sku_id=t2.sku_id
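Alternatively, when unmatched rows may be dropped, an inner join achieves a similar effect: Spark's optimizer can usually infer an is-not-null filter on inner-join keys, so null keys are pruned before the shuffle. Non-null invalid keys such as -1 are still shuffled (they simply produce no output), so the explicit filter above remains the safer choice.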
select order_id,
order_number,
t1.sku_id,
user_id,
sku_name,
use_date,
sku_price
from (
select order_id,
order_number,
sku_id,
user_id
from y_life.dws_order_sku_df
where date='${date}'
) t1
inner join
(
select sku_id,
sku_name,
use_date,
sku_price
from y_life.dim_product_sku_info
where date='${date}'
) t2
on t1.sku_id=t2.sku_id
- When the rows must be kept: all left-table rows are needed and the join merely fetches attribute values by key; in this case the invalid keys can be scattered randomly instead of filtered:
select order_id,
order_number,
t1.sku_id,
user_id,
sku_name,
use_date,
sku_price
from (
select order_id,
order_number,
sku_id,
user_id
from y_life.dws_order_sku_df
where date='${date}'
) t1
left join
(
select sku_id,
sku_name,
use_date,
sku_price
from y_life.dim_product_sku_info
where date='${date}'
) t2
on if(t1.sku_id is null or t1.sku_id <= 0, order_id * -1, t1.sku_id) = t2.sku_id
Notes:
1. Make sure the scattered values cannot match real keys in the right table: if the right table's keys are positive integers, the scattered values must not be positive integers, to avoid collisions.
2. If the table has no reasonably well-distributed column to derive a scatter value from, avoid using rand() directly: when a Spark task fails and retries, rand() generates different values than before, which can produce incorrect results.
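A retry-safe alternative to rand() is to derive the scatter value from stable columns with hash(). The sketch below (the 100000-bucket width is an arbitrary choice) produces a strictly negative key that can never collide with a positive sku_id:
select order_id,
if(sku_id is null or sku_id <= 0,
0 - pmod(hash(order_id, order_number), 100000) - 1, -- deterministic and always negative, so retries reproduce it
sku_id) as join_key
from y_life.dws_order_sku_df
where date='${date}'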
Valid-key joins
- Large table joins a small table
Use a map join to bypass the shuffle: make data that would otherwise go through a shuffle join (SortMergeJoin, ShuffledHashJoin) go through a map join (BroadcastHashJoin) instead.
-- Set the broadcast threshold; tables of roughly 100 MB or less fit this approach.
-- If the broadcast table is too large, broadcasting it can take longer than the original job did.
set spark.sql.autoBroadcastJoinThreshold = 100M;
Use the hint syntax /*+ MAPJOIN(XX) */:
select
/*+MAPJOIN(t2)*/
order_id,
order_number,
sku_id,
user_id,
t1.account_id,
account_name,
account_status
from (
select order_id,
order_number,
sku_id,
user_id,
account_id
from y_life.dws_order_sku_df
where date='${date}'
) t1
left join
(
select account_id,
account_name,
account_status
from y_life.dim_account_info
where date='${date}'
) t2
on t1.account_id=t2.account_id
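In Spark SQL, MAPJOIN is accepted as an alias of the BROADCAST (also BROADCASTJOIN) hint, so the same intent can be expressed as below; this is just the previous query reduced to its essentials:
select /*+ BROADCAST(t2) */
t1.account_id,
t2.account_name
from y_life.dws_order_sku_df t1
left join (
select account_id,
account_name
from y_life.dim_account_info
where date='${date}'
) t2
on t1.account_id = t2.account_id
where t1.date='${date}'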
- Large table joins a large table, parameter tuning: raise the shuffle parallelism so keys spread across more partitions; this only makes the distribution slightly more even (see the settings sketch after this list)
- Multiplicative expansion (detailed further below)
- Hot/cold key separation: if only a few keys are skewed and they are easy to isolate, extract the hot keys and process them separately. Downside: tedious to develop and costly to maintain later. Upside: very effective against skew
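For the parallelism approach, a sketch of the relevant settings (the values are placeholders; the adaptive settings require Spark 3.0+, where AQE can also split skewed partitions automatically):
-- more shuffle partitions, so each key group lands in a smaller partition
set spark.sql.shuffle.partitions = 2000;
-- on Spark 3.0+, adaptive execution can detect and split skewed join partitions
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.skewJoin.enabled = true;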
Hot-merchant table generated from orders (top N):
select account_id
from (
select account_id,
row_number() over (
order by
ord_cnt desc
) as ord_rank
from (
select
-- 每个商家的订单量
account_id,
count(distinct order_id) as ord_cnt
from y_life.dws_order_sku_df
where date='${date}'
group by
account_id
) tmp1
) tmp2
where tmp2.ord_rank<=100
Hot/cold key split + union:
with hot_account as (
select account_id
from y_life.dim_hot_account_info
where date='${date}'
)
select order_id,
order_number,
sku_id,
user_id,
t1.account_id,
account_name,
account_status
from (
select order_id,
order_number,
sku_id,
user_id,
account_id
from y_life.dws_order_sku_df t1
inner join
hot_account t2
on t1.account_id=t2.account_id
where t1.date='${date}'
) t1
left join
(
select account_id,
account_name,
account_status
from y_life.dim_account_info
where date='${date}'
) t2
on t1.account_id=t2.account_id
union all
select order_id,
order_number,
sku_id,
user_id,
t1.account_id,
account_name,
account_status
from (
select order_id,
order_number,
sku_id,
user_id,
account_id
from y_life.dws_order_sku_df t1
left join
hot_account t2
on t1.account_id=t2.account_id
where t1.date='${date}'
and t2.account_id is null
) t1
left join
(
select account_id,
account_name,
account_status
from y_life.dim_account_info
where date='${date}'
) t2
on t1.account_id=t2.account_id
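The value of the split is that each branch can now be tuned independently: the hot branch involves only a handful of distinct keys, so its dimension side can be filtered to those keys and map-joined, while the cold branch shuffles with an even key distribution. Expansion could likewise be applied to the hot branch alone.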
- Multiplicative expansion: when many keys are skewed and hot/cold splitting is impractical, expand (replicate) the dimension table by a factor of N and salt the fact table with a matching key; the core idea is to raise the parallelism per key.
Expansion by a factor of 5:
select order_id,
order_number,
sku_id,
user_id,
t1.account_id,
account_name,
account_status
from (
select order_id,
order_number,
sku_id,
user_id,
account_id,
FLOOR(order_id % 5) as numkey
-- FLOOR(rand() * 1000 % 5) as numkey  -- avoid rand(): not retry-safe (see the note above)
from y_life.dws_order_sku_df
where date='${date}'
) t1
left join
(
select account_id,
account_name,
account_status,
CAST(num as bigint) as numkey
from y_life.dim_account_info
lateral view explode(split('0,1,2,3,4', ',')) lv as num
where date='${date}'
) t2
on t1.account_id=t2.account_id
and t1.numkey=t2.numkey
An equivalent formulation:
select order_id,
order_number,
sku_id,
user_id,
t1.account_id,
account_name,
account_status
from (
select order_id,
order_number,
sku_id,
user_id,
account_id,
pmod(order_id,5) numkey
from y_life.dws_order_sku_df
where date='${date}'
) t1
left join
(
select account_id,
account_name,
account_status,
pos as numkey
from y_life.dim_account_info
lateral view posexplode(split(space(4), ' ')) t as pos, val  -- space(4) splits into 5 empty strings, so pos runs 0..4, matching pmod(order_id, 5)
where date='${date}'
) t2
on t1.account_id=t2.account_id
and t1.numkey=t2.numkey
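The factor trades parallelism against blow-up: a factor of N multiplies the dimension table's rows (and its shuffle volume) by N while cutting the rows per (account_id, numkey) pair on the fact side to roughly 1/N, so start small and raise the factor only if the hot keys remain oversized.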
Solving row_number-induced skew
- Two-phase aggregation: aggregate locally first, then globally
Original code: find each merchant's first-order SKU ID and order time
select account_id,
sku_id,
create_time
from
(
select order_id,
order_number,
sku_id,
user_id,
account_id,
row_number() over (
partition by
account_id
order by
create_time asc
) as rn
from y_life.dws_order_sku_df
where date='${date}'
) tmp
where tmp.rn =1
- Optimization 1:
To make the per-group data in each partition as even as possible during the map stage, add a salt column and include it as an extra partition by key. Here the salt is derived deterministically from order_id rather than rand() (see the earlier retry-safety note).
select account_id,
sku_id,
create_time
from (
select account_id,
sku_id,
create_time,
row_number() over (
partition by
account_id
order by
create_time asc
) as rn_2
from (
select order_id,
order_number,
sku_id,
user_id,
account_id,
row_number() over (
partition by
account_id,
FLOOR(order_id%10)
order by
create_time asc
) as rn_1
from y_life.dws_order_sku_df
where date='${date}'
) tmp1
where tmp1.rn_1=1
) tmp2
where tmp2.rn_2=1
- Optimization 2: replace row_number with min/max + named_struct
select account_id,
min_account.sku_id as sku_id,
min_account.create_time as create_time
from (
select account_id,
min(
named_struct(
'create_time',
create_time,
'sku_id',
sku_id
)
) as min_account
from y_life.dws_order_sku_df
where date='${date}'
group by
account_id
) tmp1
- Sorting a struct compares its fields one by one, in declaration order. Put the field to sort by first and the fields you only want to retrieve after it: with create_time first, taking the minimum struct takes the record with the minimum create_time (see the sketch after this list).
- Because min() is an aggregate, a partial aggregation already runs on the map side, which greatly reduces both the shuffle volume and the data reaching the reducers.
- Suitable scenarios: groups with highly repetitive keys; only the first or last record per group is needed.
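A quick sanity check of the field-by-field struct ordering, as a standalone sketch using an inline VALUES table (supported by Spark SQL):
select min(named_struct('create_time', create_time, 'sku_id', sku_id)) as first_order
from values
('2024-01-02 10:00:00', 101),
('2024-01-01 09:30:00', 202)
as t(create_time, sku_id)
-- returns the struct from the second row: its create_time sorts first, and sku_id 202 rides along with it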
