Spark Web UI and Job Optimization in Practice: Locating and Handling Data Skew

What Is Data Skew?

  • In a dataset processed in parallel, one portion (such as a single Spark partition) contains significantly more data than the others, so that portion becomes the bottleneck for processing the whole dataset.
  • Data skew has many causes. A common scenario: a small number of keys in one table account for an outsized share of rows, so during a join all rows for such a key are handled by a single task, and that task runs far longer than the rest.
  • Skew is not caused only by joins; any operation that repartitions data, such as group by, partition by, or count(distinct), can cause it.

Symptoms of Data Skew

  • The job runs for hours, or even tens of hours
  • The job fails with OOM

Either symptom calls for further investigation to pinpoint the cause.

How to Troubleshoot Data Skew

In the Spark Web UI, open the skewed stage's task list and compare the max task duration and shuffle read size against the median: a few tasks far above the median point to skewed keys. The skewed key itself can then be confirmed in SQL, as in the sketch below.
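A minimal sketch for confirming which keys dominate (assuming the join key is sku_id; table and date conventions follow the examples in the next section):

select  sku_id,
        count(1) as cnt
from    y_life.dws_order_sku_df
where   date='${date}'
group by
        sku_id
order by
        cnt desc
limit   20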

How to Fix Join Data Skew

Joins on Invalid Keys
  • When such rows don't need to be kept: values with no business meaning, such as null, 0, -1, or -9999, can simply be filtered out, either because the business doesn't need them or because the join is an inner join (see the inner-join variant after the optimized query)

Original query:

select  order_id,
        order_number,
        t1.sku_id,
        user_id,
        sku_name,
        use_date,
        sku_price
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) t1
left join
        (
            select  sku_id,
                    sku_name,
                    use_date,
                    sku_price
            from    y_life.dim_product_sku_info
            where   date='${date}'
        ) t2
on      t1.sku_id=t2.sku_id

In y_life.dws_order_sku_df, a large number of rows have sku_id equal to -1 or null. The business does not need them, so they can be filtered out directly.

Optimized:

select  order_id,
        order_number,
        t1.sku_id,
        user_id,
        sku_name,
        use_date,
        sku_price
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id
            from    y_life.dws_order_sku_df
            where   date='${date}'
            and nvl(sku_id,-1) > 0 
        ) t1
left join
        (
            select  sku_id,
                    sku_name,
                    use_date,
                    sku_price
            from    y_life.dim_product_sku_info
            where   date='${date}'
        ) t2
on      t1.sku_id=t2.sku_id

When the join is an inner join, the same filter is safe to add up front, because rows whose keys match nothing (null, -1, and so on) would be dropped by the join anyway:

select  order_id,
        order_number,
        t1.sku_id,
        user_id,
        sku_name,
        use_date,
        sku_price
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id
            from    y_life.dws_order_sku_df
            where   date='${date}'
            and nvl(sku_id,-1) > 0
        ) t1
inner join
        (
            select  sku_id,
                    sku_name,
                    use_date,
                    sku_price
            from    y_life.dim_product_sku_info
            where   date='${date}'
        ) t2
on      t1.sku_id=t2.sku_id
  • When such rows must be kept: all rows of the left table are needed and the join only fetches attributes by key. In this case, scatter the invalid keys into values that cannot collide with real keys:
select  order_id,
        order_number,
        t1.sku_id,
        user_id,
        sku_name,
        use_date,
        sku_price
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) t1
left join
        (
            select  sku_id,
                    sku_name,
                    use_date,
                    sku_price
            from    y_life.dim_product_sku_info
            where   date='${date}'
        ) t2
on      if(t1.sku_id is null, order_id * -1, t1.sku_id) = t2.sku_id

Notes:

1. Make sure the scattered values can never match the right table. For example, if the right table's keys are positive integers, the scattered values on the left must not be positive integers, so no accidental collision occurs.

2. If the table has no reasonably well-distributed column to derive the scatter values from, avoid using rand() directly: when a Spark task fails and is retried, rand() generates different values than the first attempt, which can yield incorrect results.
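A retry-safe alternative is to derive the salt from a stable column (a minimal sketch, assuming order_id is a stable numeric column; hash and pmod are built-in Spark SQL functions):

-- Deterministic salt: the same row maps to the same bucket even after a task retry
select  pmod(hash(order_id), 10) as salt_key
from    y_life.dws_order_sku_df
where   date='${date}';

-- Risky: a retried task may recompute different values
-- select floor(rand() * 10) as salt_key ...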

Joins on Valid Keys
  • Large table join small table

Use a map join to bypass the shuffle: a join that would otherwise run as a shuffle join (SortMergeJoin, ShuffledHashJoin) runs instead as a map join (BroadcastHashJoin).

--Set the broadcast size threshold
set spark.sql.autoBroadcastJoinThreshold = 100M;
--Use this when the smaller side is around 100 MB or less; if the broadcast table is too large, broadcasting it can take longer than the job did before

Use the hint syntax /*+ MAPJOIN(XX) */:

select  /*+ MAPJOIN(t2) */
        order_id,
        order_number,
        sku_id,
        user_id,
        t1.account_id,
        account_name,
        account_status
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) t1
left join
        (
            select  account_id,
                    account_name,
                    account_status
            from    y_life.dim_account_info
            where   date='${date}'
        ) t2
on      t1.account_id=t2.account_id
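To confirm the hint took effect, inspect the physical plan with EXPLAIN (a minimal sketch; date filters omitted for brevity):

explain
select  /*+ MAPJOIN(t2) */
        t1.account_id,
        account_name
from    y_life.dws_order_sku_df t1
left join
        y_life.dim_account_info t2
on      t1.account_id=t2.account_id;
-- a BroadcastHashJoin node (instead of SortMergeJoin) means the broadcast applied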
Large table join large table

  • Increase shuffle parallelism: tune the parallelism parameters so keys are spread across more partitions, which makes the distribution somewhat more even (a parameter sketch follows this list)
  • Hot/cold key separation: if only a few keys are skewed and they are easy to isolate, extract the hot keys and process them separately. Downside: development is fiddly and maintenance cost is higher. Upside: very effective against skew (see the two queries below)
  • Multiplier expansion: covered in its own subsection further down
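A minimal parameter sketch (the values are illustrative and workload-dependent; the adaptive settings assume Spark 3.x):

-- Raise shuffle parallelism so each key bucket gets smaller
set spark.sql.shuffle.partitions = 2000;
-- On Spark 3.x, adaptive execution can also detect and split skewed partitions automatically
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.skewJoin.enabled = true;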

    Hot-merchant table (top N) generated from order counts:

select  account_id
from    (
            select  account_id,
                    row_number() over (
                        order by
                                ord_cnt desc
                    ) as ord_rank
            from    (
                        select 
                                -- order count per merchant
                                account_id,
                                count(distinct order_id) as ord_cnt
                        from    y_life.dws_order_sku_df
                        where   date='${date}'
                        group by
                                account_id
                    ) tmp1
        ) tmp2
where   tmp2.ord_rank<=100

    Hot/cold key split + union (since the hot-key list is small, the hot branch's dimension subset is usually also a good candidate for a map join):

with hot_account as (
    select  account_id
    from    y_life.dim_hot_account_info
    where   date='${date}'
)
select  order_id,
        order_number,
        sku_id,
        user_id,
        t1.account_id,
        account_name,
        account_status
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id
            from    y_life.dws_order_sku_df t1
            inner   join
                    hot_account t2
            on      t1.account_id=t2.account_id
            where   t1.date='${date}'
        ) t1
left join
        (
            select  account_id,
                    account_name,
                    account_status
            from    y_life.dim_account_info
            where   date='${date}'
        ) t2
on      t1.account_id=t2.account_id
union all
select  order_id,
        order_number,
        sku_id,
        user_id,
        t1.account_id,
        account_name,
        account_status
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id
            from    y_life.dws_order_sku_df t1
            left join
                    hot_account t2
            on      t1.account_id=t2.account_id
            where   t1.date='${date}'
            and     t2.account_id is null
        ) t1
left join
        (
            select  account_id,
                    account_name,
                    account_status
            from    y_life.dim_account_info
            where   date='${date}'
        ) t2
on      t1.account_id=t2.account_id
  • Multiplier expansion: when many keys are skewed and hot/cold splitting is impractical, expand the dimension table by a fixed multiple (inflating its row count) and salt the fact table to match; the core idea is to increase the join's parallelism

Multiplier expansion:

select  order_id,
        order_number,
        sku_id,
        user_id,
        t1.account_id,
        account_name,
        account_status
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id,
                    FLOOR(order_id % 5) as numkey
                    --FLOOR(rand() * 1000 % 5) as numkey  -- avoid rand(): not retry-safe (see notes above)
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) t1
left join
        (
            select  account_id,
                    account_name,
                    account_status,
                    CAST(num as bigint) as numkey
            from    y_life.dim_account_info 
            lateral view explode(split('0,1,2,3,4', ',')) t as num
            where   date='${date}'
        ) t2
on      t1.account_id=t2.account_id
and     t1.numkey=t2.numkey

An equivalent way to write it:

select  order_id,
        order_number,
        sku_id,
        user_id,
        t1.account_id,
        account_name,
        account_status
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id,
                    pmod(order_id,5) numkey
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) t1
left join
        (
            select  account_id,
                    account_name,
                    account_status,
                    pos as numkey
            from    y_life.dim_account_info 
                    -- space(4) yields 5 elements after split, so pos = 0..4, matching pmod(order_id, 5)
                    lateral view posexplode(split(space(4), ' ')) t as pos, val
            where   date='${date}'
        ) t2
on      t1.account_id=t2.account_id
and     t1.numkey=t2.numkey

How to Fix row_number Data Skew

  • Two-phase aggregation: do a local (partial) aggregation first, then a global one

Original code: get each merchant's first-order SKU ID and time:

select  account_id,
        sku_id,
        create_time
from    (
            select  order_id,
                    order_number,
                    sku_id,
                    user_id,
                    account_id,
                    create_time,
                    row_number() over (
                        partition by
                                account_id
                        order by
                                create_time asc
                    ) as rn
            from    y_life.dws_order_sku_df
            where   date='${date}'
        ) tmp
where   tmp.rn=1
  • Optimization 1:

To make the groups within each partition as even as possible in the map stage, add a salt column and include it in the partition by clause. The first row_number runs over (account_id, salt), splitting each merchant's rows into 10 smaller groups; the second row_number then picks the true first row per merchant from those 10 group winners.

select  account_id,
        sku_id,
        create_time
from    (
            select  account_id,
                    sku_id,
                    create_time,
                    row_number() over (
                        partition by
                                account_id
                        order by
                                create_time asc
                    ) as rn_2
            from    (
                        select  order_id,
                                order_number,
                                sku_id,
                                user_id,
                                account_id,
                                create_time,
                                row_number() over (
                                    partition by
                                            account_id,
                                            FLOOR(order_id%10)
                                    order by
                                            create_time asc
                                ) as rn_1
                        from    y_life.dws_order_sku_df
                        where   date='${date}'
                    ) tmp1
            where   tmp1.rn_1=1
        ) tmp2
where   tmp2.rn_2=1
  • Optimization 2: replace row_number with min/max + named_struct:
select  account_id,
        min_account.sku_id as sku_id,
        min_account.create_time as create_time
from    (
            select  account_id,
                    min(
                        named_struct(
                            'create_time',
                            create_time,
                            'sku_id',
                            sku_id
                        )
                    ) as min_account
            from    y_life.dws_order_sku_df
            where   date='${date}'
            group by
                    account_id
        ) tmp1
  • Sorting a struct compares its fields in order. Put the field to sort by first and the other fields you need after it: with create_time first, taking the struct's minimum is effectively taking the minimum create_time.
  • Because min() is an ordinary aggregate, a partial min is computed on the map side, which greatly reduces the data volume shuffled and handled on the reduce side.
  • When this fits: groups have a high duplication rate, and only the first or last record per group is needed (for the last record, see the sketch below).
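For the symmetric last-record case, swap min for max (a minimal sketch under the same schema assumptions):

select  account_id,
        max_account.sku_id as sku_id,
        max_account.create_time as create_time
from    (
            select  account_id,
                    -- map-side partial aggregation applies to max() just as to min()
                    max(
                        named_struct(
                            'create_time',
                            create_time,
                            'sku_id',
                            sku_id
                        )
                    ) as max_account
            from    y_life.dws_order_sku_df
            where   date='${date}'
            group by
                    account_id
        ) tmp1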
