常见算子的数据倾斜解决方案

推荐阅读文章列表

大数据开发面经汇总【持续更新...】

我的大数据学习之路

大数据开发面试笔记V6.0

前言

数据倾斜既是工作中最常见的一个问题,也是大数据求职面试中最常见的一个话题。

想必大家一定听说过数据倾斜,也知道很多的解决方案,但是你是否知道哪些算子最容易发生数据倾斜,对应的解决方案又有哪些呢?比如最常见的热门算子:join、group by、count(distinct)、row_number【数据倾斜出现频率从高至低】

本文将会从 数据倾斜定义、如何分析数据倾斜、解决方案 三个方面来剖析数据倾斜问题

数据倾斜定义

定义:通俗来讲,一张表中某个或某些特定值出现的频次远大于其他数值,这样就会导致某个或某些task处理的数据量远超过其他task,因此发生数据倾斜。

举例:下图是计算所有大学生每门技术的学习人数,可以发现学习hadoop的人数远大于spark,那么处理hadoop的task的压力就会更大,导致数据倾斜。

alt

如何分析数据倾斜

  1. 确诊问题(判断问题是不是数据倾斜)
  2. 定位问题(判断具体是哪段代码发生数据倾斜)
  3. 分析问题(分析造成数据倾斜的原因是什么)

alt

具体来讲:

  1. 打开sparkui界面,查看所有stage的执行时长,是否存在某个或某几个stage执行时长大大超过了其余stage的平均时长
  2. 获取执行时长最长的stage的id,到DAG图中进行搜索,找到对应的执行代码
  3. 通过Python/SQL分析代码涉及到所有表的shuffle key的分布,观察是否存在某个key分布较集中

解决方案

Join

两种情况,一种是大小表join,一种是大大表join

  • 大小表join:开启mapjoin即可
-- b是小表(MB级及以下) a是大表(GB及以上)
select /*+mapjoin(b)*/
    a.*
from a
left join b
on a.id = b.id
  • 大大表join:

【存在热点值】:拆分热点和非热点

-- 1.识别表a的热点key,并保存在tmp表中
-- 2.热点key和表b进行join
-- 3.非热点key和表b进行join
-- 4.将2和3的结果合并一起
select /*+mapjoin(a)*/a.id, a.name, b.score
from (select * from a where a.id in (select id from tmp)) a
join b
on a.id = b.id
union all
select a.id, a.name, b.score
from (select * from a where a.id not in (select id from tmp)) a
join b
on a.id = b.id

【不存在热点值】:采用分桶join

  create table t (
      a string,
      b string
  ) 
  partitioned by (dt string)
  clustered by (b) into 2048 buckets;

Group By

  • 加盐打散
-- 优化前sql
select id, count(*)
from t 
group by id
;
-- 优化后sql
select 
    t.id,
    sum(t.cnt)
from (
    select 
        id, 
        case when id in ('1001','1002') then cast(rand() * 100 as bigint), 
        count(*) as cnt
    from t
    group by 
        id, 
        case when id in ('1001','1002') then cast(rand() * 100 as bigint)
) t
group by t.id
;

Count Distinct

-- 优化前sql
select dt, count(distinct user_id) as cnt
from t
group by dt 
;
  • 方式1:两阶段聚合+加盐打散
select 
    split(rand_dt, '_')[1] as dt,
    count(*) as cnt
from (
    select 
        concat(cast(rand() * 10 as bigint), '_', dt) as rand_dt
        user_id
    from t
    group by 
        concat(cast(rand() * 10 as bigint), '_', dt)
        user_id
) t
group by split(rand_dt, '_')[1]
;
  • 方式2:构建bitmap
select 
    dt, 
    getcardinality(idbits) as cnt  -- 2. 计算基数
from (
    -- 1.转换为bit
    select 
        dt, 
        id2bit(user_id) as idbits
    from t
) t
group by dt

Row_number

  • 加盐打散
-- 需求:计算每个用户的成绩排名(假设大量用户id进入到同一个reduce进行计算)
-- 优化前sql
select 
    uid, score, 
    row_number() over(partition by uid order by score desc) rk
from t

-- 优化后sql
select 
    uid, score, 
    row_number() over(partition by uid, bucket order by score desc) rk
from (
    select 
        uid, score, cast(rand() * 10 as bigint) as bucket
    from t
)t

下期预告

关于数据治理,我有些话要说

#数据人的面试交流地##校招过来人的经验分享##2025届提前批#
全部评论
佬问一下最后row_number那里加盐打散partition by uid,bucket后结果与上一段partition by uid为什么相同呢
点赞
送花
回复 分享
发布于 06-30 17:21 河南

相关推荐

4 6 评论
分享
牛客网
牛客企业服务