常见算子的数据倾斜解决方案

推荐阅读文章列表

大数据开发面经汇总【持续更新...】

我的大数据学习之路

大数据开发面试笔记V6.0

前言

数据倾斜既是工作中最常见的一个问题,也是大数据求职面试中最常见的一个话题。

想必大家一定听说过数据倾斜,也知道很多的解决方案,但是你是否知道哪些算子最容易发生数据倾斜,对应的解决方案又有哪些呢?比如最常见的热门算子:join、group by、count(distinct)、row_number【数据倾斜出现频率从高至低】

本文将会从 数据倾斜定义、如何分析数据倾斜、解决方案 三个方面来剖析数据倾斜问题

数据倾斜定义

定义:通俗来讲,一张表中某个或某些特定值出现的频次远大于其他数值,这样就会导致某个或某些task处理的数据量远超过其他task,因此发生数据倾斜。

举例:下图是计算所有大学生每门技术的学习人数,可以发现学习hadoop的人数远大于spark,那么处理hadoop的task的压力就会更大,导致数据倾斜。

alt

如何分析数据倾斜

  1. 确诊问题(判断问题是不是数据倾斜)
  2. 定位问题(判断具体是哪段代码发生数据倾斜)
  3. 分析问题(分析造成数据倾斜的原因是什么)

alt

具体来讲:

  1. 打开sparkui界面,查看所有stage的执行时长,是否存在某个或某几个stage执行时长大大超过了其余stage的平均时长
  2. 获取执行时长最长的stage的id,到DAG图中进行搜索,找到对应的执行代码
  3. 通过Python/SQL分析代码涉及到所有表的shuffle key的分布,观察是否存在某个key分布较集中

解决方案

Join

两种情况,一种是大小表join,一种是大大表join

  • 大小表join:开启mapjoin即可
-- b是小表(MB级及以下) a是大表(GB及以上)
select /*+mapjoin(b)*/
    a.*
from a
left join b
on a.id = b.id
  • 大大表join:

【存在热点值】:拆分热点和非热点

-- 1.识别表a的热点key,并保存在tmp表中
-- 2.热点key和表b进行join
-- 3.非热点key和表b进行join
-- 4.将2和3的结果合并一起
select /*+mapjoin(a)*/a.id, a.name, b.score
from (select * from a where a.id in (select id from tmp)) a
join b
on a.id = b.id
union all
select a.id, a.name, b.score
from (select * from a where a.id not in (select id from tmp)) a
join b
on a.id = b.id

【不存在热点值】:采用分桶join

  create table t (
      a string,
      b string
  ) 
  partitioned by (dt string)
  clustered by (b) into 2048 buckets;

Group By

  • 加盐打散
-- 优化前sql
select id, count(*)
from t 
group by id
;
-- 优化后sql
select 
    t.id,
    sum(t.cnt)
from (
    select 
        id, 
        case when id in ('1001','1002') then cast(rand() * 100 as bigint), 
        count(*) as cnt
    from t
    group by 
        id, 
        case when id in ('1001','1002') then cast(rand() * 100 as bigint)
) t
group by t.id
;

Count Distinct

-- 优化前sql
select dt, count(distinct user_id) as cnt
from t
group by dt 
;
  • 方式1:两阶段聚合+加盐打散
select 
    split(rand_dt, '_')[1] as dt,
    count(*) as cnt
from (
    select 
        concat(cast(rand() * 10 as bigint), '_', dt) as rand_dt
        user_id
    from t
    group by 
        concat(cast(rand() * 10 as bigint), '_', dt)
        user_id
) t
group by split(rand_dt, '_')[1]
;
  • 方式2:构建bitmap
select 
    dt, 
    getcardinality(idbits) as cnt  -- 2. 计算基数
from (
    -- 1.转换为bit
    select 
        dt, 
        id2bit(user_id) as idbits
    from t
) t
group by dt

Row_number

  • 加盐打散
-- 需求:计算每个用户的成绩排名(假设大量用户id进入到同一个reduce进行计算)
-- 优化前sql
select 
    uid, score, 
    row_number() over(partition by uid order by score desc) rk
from t

-- 优化后sql
select 
    uid, score, 
    row_number() over(partition by uid, bucket order by score desc) rk
from (
    select 
        uid, score, cast(rand() * 10 as bigint) as bucket
    from t
)t

下期预告

关于数据治理,我有些话要说

#数据人的面试交流地##校招过来人的经验分享##2025届提前批#
全部评论
佬问一下最后row_number那里加盐打散partition by uid,bucket后结果与上一段partition by uid为什么相同呢
点赞 回复 分享
发布于 06-30 17:21 河南

相关推荐

1. C++中的构造函数和析构函数的作用是什么?2. 什么是C++中的命名空间?如何使用?3. C++中的虚析构函数有什么作用?4. C++中如何实现抽象类和接口?5. 什么是多态的静态绑定和动态绑定?6. C++中的默认参数是什么?如何使用?7. 什么是C++中的强制类型转换?8. C++中如何使用std::vector和std::list的区别?9. 什么是C++中的std::map和std::set?10. C++中的异常安全性分为哪几种级别?11. 什么是C++中的内存对齐?12. C++中如何使用std::pair和std::tuple?13. C++中的friend类和friend函数有什么区别?14. C++中如何实现模板类?15. 什么是C++中的类型推导(decltype)?16. C++中的智能指针如何防止内存泄漏?17. C++中如何使用std::shared_ptr和std::weak_ptr?18. C++中的std::mutex和std::lock_guard有什么区别?19. 什么是C++中的线程安全容器?20. C++中如何实现条件变量的使用?21. 什么是C++中的移动语义?22. C++中的std::function和函数指针有什么区别?23. C++中如何使用std::algorithm库?24. C++中的std::initializer_list是什么?25. C++中如何使用模板元编程?26. 什么是C++中的类型特征(type traits)?27. C++中如何实现自定义的迭代器?28. C++中的std::unique_ptr和std::shared_ptr的使用场景是什么?29. C++中如何处理字符串和字符数组的区别?30. C++中如何使用std::string和C风格字符串?31. 什么是C++中的析构函数的虚函数?32. C++中如何实现运算符重载的友元函数?33. C++中的std::array和C风格数组有什么区别?34. C++中如何使用范围for循环遍历容器?35. C++中的std::optional是什么,如何使用?嵌入式C++面经推荐大佬面经  链接在下边  c++/嵌入式面经专栏-牛客网 https://www.nowcoder.com/creation/manager/columnDetail/MJNwoM
点赞 评论 收藏
分享
10-29 11:50
东北大学 C++
分布式缓存,你的分布式key是怎么分桶的?分桶规则是什么?节点的增减会不会影响你的数据分布?有没有主节点?没有主节点来管理整个数据的一个分布规则,如果有一个节点挂了,客户端怎么感知的?假设正在请求中,down了一个节点,比如原来5个down之后变成4个,那这个时候你请求的有部分数据,它的数据分布就变了,你的客户端怎么知道这个事情?怎么会路由到其他节点上去呢?你这个数据可能正在迁移,你数据正在迁移,假设你的第五个节点宕机了,那么你的第五个节点的数据从哪里取出来?你这个节点宕机了,其他节点怎么拿到他这个宕机的数据?相当于你存的节点的数据在数据库都有?缓存有副本吗?平时都建议把C++模板类的实现,放到头文件中,基于什么考虑?C++一个虚函数可以定义成模板函数吗?如果我在一个头文件中定义一个函数,这里会有什么问题吗?一个类的大小有什么有关?如果一个类本身有一个虚函数,再加一个虚函数它的大小怎么变化?unordered_map和map的区别有unordered_map查找时间复杂度是O(1),既然有基于哈希表的map,为什么还要有基于红黑树的map?解决哈希冲突的一些方法C++三种智能指针unique_ptr怎么做到的单独拥有一块内存,而不是和别人共享一块内存?手撕:字符串相乘,没撕出来,挂
腾讯一面1582人在聊 查看17道真题和解析
点赞 评论 收藏
分享
7 21 评论
分享
牛客网
牛客企业服务