大厂Spark八股文面经及参考答案(阿里京东唯品会多家面经汇总)
Spark on standalone 模型、YARN 架构模型是怎样的?YARN - cluster 涉及哪些参数?
Spark 支持多种集群管理模式,Standalone 和 YARN 是两种常用部署方式,其架构和交互逻辑差异显著。
Standalone 模型
Standalone 是 Spark 自带的集群管理器,采用 主从架构:
- Master 节点:负责资源调度和集群管理,跟踪所有 Worker 节点的状态。
- Worker 节点:运行 Executor 进程,执行具体的 Task。
- Driver 进程:可以运行在集群内部(Cluster 模式)或外部(Client 模式)。
工作流程:
- 用户提交应用后,Driver 向 Master 申请资源。
- Master 指示 Worker 启动 Executor。
- Executor 向 Driver 注册,接收并执行 Task。
YARN 架构模型
在 YARN 模式下,Spark 依赖 Hadoop YARN 管理资源,分为 YARN Client 和 YARN Cluster 两种模式:
- YARN Client:Driver 运行在提交任务的客户端,适用于交互式调试。
- YARN Cluster:Driver 运行在 YARN 的 ApplicationMaster 容器中,更适合生产环境。
YARN Cluster 核心组件:
- ResourceManager:全局资源管理器,分配 Container。
- ApplicationMaster:负责与 ResourceManager 协商资源,并启动 Executor。
- NodeManager:在节点上创建 Container 运行 Executor。
YARN Cluster 涉及的关键参数:
|
指定使用 YARN 集群模式 |
|
设置 Driver 运行在 YARN 集群内 |
|
定义每个 Executor 的内存大小 |
|
指定 Executor 数量 |
|
每个 Executor 使用的 CPU 核心数 |
|
指定 YARN 队列资源 |
|
指定 Spark 依赖的 Jar 包路径,避免重复上传 |
RDD 的宽依赖和窄依赖是什么?请举例说明相关算子。Spark SQL 的 GroupBy 会造成窄依赖吗?GroupBy 是行动算子吗?为什么要划分宽依赖和窄依赖?
宽窄依赖 是 RDD 的血统关系分类,决定了 Stage 的划分和容错机制。
定义与示例
- 窄依赖:父 RDD 的每个分区最多被子 RDD 的 一个分区 依赖。 算子:map、filter、union。示例:rdd.map(x => x*2) 的每个输入分区独立生成输出分区。
- 宽依赖:父 RDD 的每个分区被子 RDD 的 多个分区 依赖。 算子:groupByKey、reduceByKey、join(非广播场景)。示例:rdd.groupByKey() 需将相同 Key 的数据聚合到同一分区。
Spark SQL 的 GroupBy 依赖类型
- 在 Spark SQL 中,
GROUP BY
会触发 Shuffle,因此属于 宽依赖。
GroupBy 是行动算子吗?
- 不是。
GROUP BY
在 SQL 中属于转换操作(类似transform
),而触发计算的 Action 是collect()
、count()
等。
划分宽窄依赖的意义
- 任务并行优化:窄依赖允许 Task 并行执行,无需等待其他分区数据。
- 容错效率:窄依赖只需重新计算丢失分区的父分区,而宽依赖需回溯整个父 RDD。
- Stage 划分:宽依赖是 Stage 的边界,DAGScheduler 依此切分任务。
源码关键逻辑:
在 RDD.dependencies
方法中,通过判断依赖是否为 ShuffleDependency
确定宽窄。
Spark 提交 job 的流程是怎样的?
从用户执行 spark-submit
到任务完成,流程可分为 资源申请、任务分配、计算执行 三个阶段:
- 资源初始化用户通过 spark-submit 提交应用,指定 --master 和 --deploy-mode。集群管理器(如 YARN)启动 Driver 进程,并创建 SparkContext。SparkContext 向集群管理器申请 Executor 资源,集群管理器分配 Container 并启动 Executor。
- 任务规划与调度Driver 将用户代码转换为 RDD 操作链,并由 DAGScheduler 生成 DAG。DAG 根据宽依赖划分为多个 Stage,每个 Stage 生成对应的 TaskSet。TaskScheduler 将 Task 分配到 Executor,优先考虑数据本地性(如数据所在的节点)。
- 任务执行与反馈Executor 执行 Task,并将状态(成功/失败)汇报给 Driver。对于 Shuffle 操作,Executor 将中间数据写入本地磁盘,供下游 Task 读取。所有 Task 完成后,Driver 关闭 SparkContext,释放集群资源。
关键交互点:
- Executor 注册:Executor 启动后主动向 Driver 注册,形成可用资源池。
- 心跳机制:Executor 定期发送心跳,Driver 监控其存活状态。
- 容错处理:若 Task 失败,Driver 重新调度该 Task;若 Executor 宕机,集群管理器会重新分配资源。
Spark 的阶段是如何划分的?在源码中如何判断属于 Shuffle Map Stage 或 Result Stage?
Stage 的划分基于 RDD 的 依赖类型,尤其是宽依赖(Shuffle Dependency)。
划分规则:
- 窄依赖:父 RDD 的每个分区仅被子 RDD 的一个分区依赖(如
map
、filter
)。这类操作不会触发 Stage 划分。 - 宽依赖:父 RDD 的每个分区可能被子 RDD 的多个分区依赖(如
reduceByKey
、join
)。宽依赖是 Stage 的边界。
源码实现:
- DAGScheduler 通过
getParentStages
方法递归查找宽依赖,将连续的窄依赖合并为一个 Stage。 - ShuffleMapStage:中间 Stage,负责为 Shuffle 操作生成数据。其输出是 Shuffle 文件的写入。
- ResultStage:最终 Stage,对应 Action 操作(如
count()
),直接生成结果。
判断逻辑:
- 在
DAGScheduler.handleJobSubmitted
方法中,最后一个 Stage 会被标记为 ResultStage。 - ShuffleMapStage 的 Task 返回
MapStatus
(记录 Shuffle 数据位置),而 ResultStage 的 Task 直接返回计算结果。
源码关键代码段:
// 判断是否为 ResultStage if (stage.isInstanceOf[ResultStage]) { // 执行 Action 操作 } else { // 执行 ShuffleMapStage }
Spark 处理数据的具体流程是什么?
Spark 数据处理流程围绕 RDD 转换 和 任务执行 展开,具体步骤如下:
- 数据加载从 HDFS、本地文件系统或数据库读取数据,生成初始 RDD(如 sc.textFile("hdfs://path"))。数据默认按块(Block)分区,每个分区对应一个 Task。
- 转换操作应用 map、filter 等窄依赖转换,生成新的 RDD。此时仅记录血统(Lineage),不立即计算。遇到 reduceByKey 等宽依赖时,触发 Shuffle 操作,将数据重新分区。
- 任务执行当调用 Action(如 collect())时,Spark 根据 DAG 生成物理计划,划分 Stage 并提交 Task。Executor 并行执行 Task,处理各自分区的数据。Shuffle 数据写入本地磁盘,供下游 Stage 读取。
- 结果返回与持久化最终结果返回 Driver,或写入外部存储系统(如 HDFS)。若调用 persist(),数据会缓存在内存或磁盘,加速后续计算。
Shuffle 过程详解:
- Map 端:每个 Task 将输出按分区规则写入本地文件(如 HashPartitioner)。
- Reduce 端:Task 从多个 Map 任务的输出中拉取数据,进行聚合或排序。
Spark join 有哪些分类?Spark map join 的实现原理是什么?
Spark 支持多种 Join 策略,选择取决于数据大小和分布:
Join 分类:
Shuffle Hash Join |
两表均较大,但单分区可放入内存 |
对两表按 Join Key 分区,各自构建哈希表 |
Broadcast Hash Join |
小表可放入内存 |
广播小表全量数据,与大表分区本地 Join |
Sort-Merge Join |
大数据量且有序 |
先按 Key 排序,再合并相同 Key 的数据 |
Cartesian Join |
笛卡尔积 |
全量数据交叉匹配,慎用 |
Broadcast Join(Map Join)原理:
- 触发条件:小表大小低于
spark.sql.autoBroadcastJoinThreshold
(默认 10MB)。 - 实现步骤: Driver 将小表数据 全量收集 到内存,并序列化为广播变量。广播变量分发到所有 Executor,每个 Task 持有小表数据的副本。大表的分区数据直接与本地的小表副本进行 Join,避免 Shuffle。
优化意义:
- 消除 Shuffle 带来的网络和磁盘开销,显著提升性能。
- 适合 维表关联 场景(如用户 ID 映射用户信息)。
注意事项:
- 若小表实际大小超过阈值,可能导致 Driver 内存溢出。
- 可通过
hint
强制指定 Broadcast Join(如df1.join(broadcast(df2))
)。
什么是 Spark Shuffle?其优缺点是什么?在什么情况下会产生 Spark Shuffle?为什么要进行 Spark Shuffle?
Spark Shuffle 是 数据跨节点重新分配 的过程,通常由 宽依赖操作(如 reduceByKey
、join
)触发。其核心目的是将数据按规则重新组织,以满足后续计算需求。
优缺点分析
数据聚合 支持跨分区的统计、排序等操作 |
网络开销 大量数据跨节点传输 |
灵活性 支持复杂操作(如多表关联) |
磁盘 I/O Shuffle 数据需落盘保存 |
并行计算 通过分区提升处理效率 |
性能瓶颈 数据倾斜可能导致部分 Task 延迟 |
触发 Shuffle 的场景
- 聚合操作:
groupByKey
、reduceByKey
。 - 关联操作:
join
(除非小表可广播)。 - 排序操作:
sortByKey
、repartitionAndSortWithinPartitions
。 - 重分区:
repartition
、coalesce
(部分情况)。
为什么需要 Shuffle?
- 数据重组:例如,按 Key 聚合需要将相同 Key 的数据集中到同一分区。
- 并行计算:下游 Task 需基于特定分区规则并行处理数据。
Spark 为什么快?为什么适合迭代处理?
Spark 的高性能源于 内存计算、DAG 优化 和 执行引擎创新,而迭代处理的优势则来自 RDD 的缓存机制。
速度快的原因
- 内存优先:中间数据缓存到内存,避免重复磁盘读写(如多次读取同一份数据)。
- DAG 优化:通过逻辑优化合并操作,减少冗余计算。例如,
filter
后接map
可能被合并为单次遍历。 - 延迟执行:转换操作(如
map
)不会立即计算,而是构建执行计划,优化后再触发执行。 - 并行调度:Task 并行执行,且调度时优先考虑数据本地性。
适合迭代处理的优势
- RDD 缓存:在机器学习等迭代场景中,中间结果(如训练数据)可缓存到内存,加速后续迭代。
- 血统机制:即使缓存丢失,也能通过血统快速重建数据,避免重复计算。
对比传统 MapReduce:
- MapReduce 每次迭代需将数据写入 HDFS,而 Spark 直接在内存中传递数据,减少 90% 以上的 I/O 时间。
Spark 数据倾斜问题如何定位和解决?
数据倾斜指 部分分区数据量远大于其他分区,导致某些 Task 执行缓慢,是常见性能问题。
定位方法
- Spark UI 观察:检查 Stage 中各 Task 的处理时间,若个别 Task 耗时过长,可能存在倾斜。
- 数据采样:对 Key 进行随机采样,统计分布情况(如
sample
方法)。 - 日志分析:若 Task 失败重试频繁,可能因倾斜导致内存溢出。
解决方案
加盐处理 |
聚合类操作(如
) |
为倾斜 Key 添加随机前缀,分散数据到不同分区,最终二次聚合 |
调整并行度 |
分区不均 |
增加 Shuffle 分区数(如
) |
广播小表 |
大表关联小表 |
使用 Broadcast Join 避免 Shuffle |
过滤倾斜 Key |
倾斜 Key 可单独处理 |
将倾斜 Key 的数据单独提取处理,再合并结果 |
双重聚合 |
求和类操作 |
先局部聚合(加盐),再全局聚合 |
案例场景:
- Join 倾斜:将大表中与倾斜 Key 关联的数据单独提取,与小表数据广播后 Join,再合并非倾斜部分。
Spark 的内存模型是怎样的?
Spark 的内存管理分为 统一内存池,由 Execution 内存(计算)和 Storage 内存(缓存)动态共享。
内存分配结构
Execution 内存 |
Shuffle、Join 等计算过程的临时数据 |
默认占总内存的 50% |
Storage 内存 |
缓存 RDD 或 Broadcast 数据 |
默认占总内存的 50% |
用户内存 |
用户代码中的数据结构(如 HashMap) |
固定 25% 的堆内存 |
保留内存 |
系统预留 |
固定 300MB |
动态调整机制:
- 当 Execution 内存不足时,可借用 Storage 内存,反之亦然。
- 若 Storage 内存被借用,缓存的数据可能被淘汰(LRU 策略)。
优化建议:
- 避免用户代码中创建过大的对象,防止挤占 Execution/Storage 内存。
- 对于缓存重要数据,可通过
persist(StorageLevel.MEMORY_ONLY)
指定优先级。
什么是 Spark 中的 Transform 和 Action?为什么 Spark 要把操作分为这两类?请列举常用的算子并说明其原理。Spark 的哪些算子会有 shuffle 过程?
Spark 中的操作分为 转换(Transform) 和 动作(Action),这一分类是 Spark 延迟执行 和 执行优化 的核心设计。
- 转换(Transform)定义:对 RDD 进行数据处理的逻辑描述(如 map、filter),但不会立即触发计算。原理:仅记录操作的血统(Lineage),生成新的 RDD。示例:map(func):对每个元素应用函数,生成一对一映射。filter(func):过滤符合条件的元素,分区不变。flatMap(func):将每个元素映射为多个输出(如拆分句子为单词)。
- 动作(Action)定义:触发实际计算并返回结果或写入外部存储的操作(如 count、saveAsTextFile)。原理:执行前会生成完整的 DAG,提交任务到集群执行。示例:collect():将数据收集到 Driver 端,慎用大数据量场景。reduce(func):按函数聚合所有元素(如求和)。foreach(func):对每个元素应用函数(如写入数据库)。
分类原因:
- 优化执行计划:延迟执行允许 Spark 合并多个转换操作,减少计算步骤(如
filter
后接map
合并为单次遍历)。 - 资源管理:避免频繁触发计算导致资源浪费。
触发 Shuffle 的算子:
宽依赖操作会触发 Shuffle,例如:
reduceByKey
:按 Key 聚合,需跨分区收集相同 Key 的数据。groupByKey
:分组操作导致数据重分布。join
(非广播场景):两个表按 Key 关联,需 Shuffle 数据。distinct
:去重操作隐含 Shuffle。
Spark 有了 RDD,为什么还要有 DataFrame 和 DataSet?它们之间的区别是什么?
RDD 是 Spark 的底层抽象,而 DataFrame 和 DataSet 是更高层的 API,旨在解决 RDD 的 执行效率 和 易用性 问题。
引入原因:
- 执行优化:DataFrame/Dataset 通过 Catalyst 优化器 生成优化的物理执行计划(如谓词下推、列裁剪)。
- 结构化数据处理:支持 Schema 定义,可直接操作字段(如
df.select("name")
),无需处理复杂对象。 - 跨语言支持:DataFrame 提供统一的 Java/Scala/Python/R API。
- 类型安全:Dataset(仅 Scala/Java)在编译时检查类型错误。
三者区别:
数据类型 |
任意对象(JVM 类型) |
行对象(Row) |
强类型对象(如 Case Class) |
优化能力 |
无 |
Catalyst 优化器 |
Catalyst 优化器 + 编码器 |
序列化 |
Java 序列化(性能低) |
Tungsten 二进制编码(高效) |
编码器(类型感知高效序列化) |
API 类型 |
函数式操作(lambda) |
结构化 API(SQL 风格) |
强类型 API |
适用场景:
- RDD:需精细控制分区或处理非结构化数据。
- DataFrame:处理结构化数据(如 CSV、JSON),适合 SQL 类操作。
- DataSet:需类型安全且追求性能的场景(如 Scala 复杂业务逻辑)。
Spark 的 Job、Stage、Task 分别是什么?如何划分?Application、job、Stage、task 之间的关系是怎样的?Stage 内部逻辑是什么?
层级关系:
- Application:用户提交的完整 Spark 程序(如一个数据分析作业)。
- Job:由 Action 触发的计算任务。一个 Application 可包含多个 Job。
- Stage:Job 根据 宽依赖 划分的阶段,每个 Stage 包含一组可并行执行的 Task。
- Task:Stage 的最小执行单元,处理一个分区的数据。
划分规则:
- Job:每个 Action(如
count()
)生成一个 Job。 - Stage:从后向前回溯 RDD 依赖链,遇到宽依赖则划分 Stage。
- Task:Stage 的分区数决定 Task 的数量(如输入数据分 100 块,则生成 100 个 Task)。
关系示例:
Application → Job₁(Action1触发) ↓ Stage₁ → Stage₂(Shuffle 依赖) ↓ Task₁, Task₂, ..., Taskₙ
Stage 内部逻辑:
- 并行执行:同一 Stage 的 Task 无依赖,可并行运行。
- 数据本地性:Task 优先分配到存储输入数据的节点。
- Shuffle 边界:Stage 结束时可能需将数据写入磁盘(如 Shuffle Write)。
对 RDD、DAG 和 Task 如何理解?DAG 为什么适合 Spark?请介绍 Spark 的 DAG 及其生成过程。DAGScheduler 如何划分?主要做了哪些工作?
- RDD:弹性分布式数据集,是 Spark 的 数据抽象,包含分区、依赖关系和计算逻辑。
- DAG:有向无环图,表示 RDD 的转换流程,用于 优化执行路径。
- Task:实际执行单位,对应一个分区的数据和计算逻辑。
DAG 适合 Spark 的原因:
- 优化计算流程:合并窄依赖操作,减少数据传递(如将多个
map
合并)。 - 容错与回溯:通过 DAG 血统可快速恢复丢失的数据分区。
DAG 生成过程:
- 用户调用 Action 后,SparkContext 将 RDD 转换链提交给 DAGScheduler。
- DAGScheduler 根据宽依赖划分 Stage,生成 DAG。
- 每个 Stage 转换为 TaskSet,提交给 TaskScheduler 执行。
DAGScheduler 的核心工作:
- Stage 划分:通过递归查找宽依赖,将 Job 拆分为 Stage。
- 任务提交:按 Stage 依赖顺序提交 TaskSet(先父 Stage,后子 Stage)。
- 容错处理:若某个 Stage 失败,仅重新调度该 Stage 及其下游 Stage。
示例:
一个包含 map → reduceByKey → filter
的操作链中:
map
和filter
是窄依赖,合并为一个 Stage。reduceByKey
是宽依赖,触发 Stage 划分。
Spark 的容错机制是怎样的?RDD 如何容错?
Spark 的容错机制依赖 血统(Lineage) 和 检查点(Checkpoint),确保数据丢失后可恢复。
RDD 容错原理:
- 血统机制:记录 RDD 的生成过程(如
map
、filter
等操作)。若某个分区丢失,可根据血统重新计算。 - 检查点:将 RDD 持久化到可靠存储(如 HDFS),切断血统链,避免重算长链路依赖。
容错层级:
- Task 失败:Driver 重新调度该 Task 到其他 Executor。
- Executor 宕机:集群管理器重启 Executor,Driver 重新提交相关 Task。
- Driver 宕机:需借助外部框架(如 YARN 的 ApplicationMaster 重启机制)。
Checkpoint 使用场景:
- 迭代计算:如机器学习中迭代百次的场景,避免血统链过长。
- 关键中间结果:需确保数据安全时(如 Shuffle 后的数据)。
容错优化策略:
- 推测执行:对慢 Task 启动备份任务,防止个别节点拖慢整体作业。
- 黑名单机制:标记常故障的节点,避免重复调度。
案例说明:
假设一个 RDD 经过多次转换后丢失分区:
- Spark 查找该 RDD 的血统,找到其父 RDD。
- 重新计算父 RDD 的对应分区,并沿血统链重新生成丢失数据。
- 若该 RDD 设置了 Checkpoint,则直接从存储中读取,无需重新计算。
Executor 内存如何分配?
Executor 的内存分配是 Spark 性能优化的核心环节,直接影响任务执行的稳定性和效率。其内存划分为 堆内内存(On-Heap) 和 堆外内存(Off-Heap),并由多个子区域动态管理。
堆内内存结构
- Execution 内存:用于计算过程中的临时数据(如 Shuffle 的中间结果、排序缓冲区),默认占总堆内存的 50%。
- Storage 内存:用于缓存 RDD 或广播变量,默认占堆内存的 50%。
- 用户内存(User Memory):存储用户代码中的数据结构(如自定义对象),占总堆内存的 25%。
- 保留内存(Reserved Memory):系统预留空间(固定 300MB),防止内存溢出。
堆外内存
通过 spark.memory.offHeap.enabled
启用,用于存储序列化数据或直接内存操作,需配置 spark.memory.offHeap.size
。
动态调整机制
- 内存借用:Execution 和 Storage 内存可相互借用。例如,当 Execution 内存不足时,可占用未使用的 Storage 内存。
- 内存回收:若 Storage 内存被借用,缓存的数据可能被淘汰(基于 LRU 策略)。
关键参数
|
设置 Executor 的总堆内存(如
) |
|
调整 Execution + Storage 内存的占比(默认 0.6) |
|
指定堆外内存大小(默认
) |
优化建议
- 避免 OOM:合理分配 Execution 和 Storage 内存比例,根据任务类型调整(如缓存密集型任务增加 Storage 占比)。
- 监控工具:通过 Spark UI 观察各区域内存使用情况,调整参数避免溢出。
Spark 的 batchsize 是什么?如何解决小文件合并问题?
Batchsize 定义
在 Spark 中,batchsize 通常指 数据处理的批次大小,常见于流式计算(如 Spark Streaming 的微批处理)。但在批处理场景中,它可指代 每个 Task 处理的数据量,影响内存和并行度。
小文件合并问题
小文件(如 HDFS 上大量 KB 级文件)会导致 Task 数过多,增加调度开销,降低性能。
解决方案
输入阶段合并 |
读取数据时合并小文件,如
控制分区大小 |
输出阶段合并 |
使用
或
减少输出文件数 |
自定义合并逻辑 |
在写入前按业务规则合并数据(如按日期分区) |
Hadoop 合并工具 |
使用
或
预处理文件 |
参数调优示例
- 设置分区大小:
spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")
- 输出合并:
df.repartition(10).write.parquet("output_path")
注意事项
- 平衡并行度:合并文件需避免分区过大导致单个 Task 内存不足。
- 动态合并:流式场景可设置
maxOffsetsPerTrigger
控制每批数据量。
如何进行 Spark 参数(性能)调优?
Spark 调优需从 资源分配、数据分区、Shuffle 优化、序列化 等多方面入手。
核心调优策略
- 资源分配Executor 数量:根据集群资源调整 spark.executor.instances,避免过多导致资源争抢。Executor 配置:增大 spark.executor.memory 和 spark.executor.cores,提升单节点处理能力。
- 并行度优化调整 Shuffle 分区数:spark.sql.shuffle.partitions=200(默认 200)。输入数据分区:读取时指定分区大小(如 spark.read.parquet("path").repartition(100))。
- Shuffle 优化启用 Tungsten Sort:spark.shuffle.manager=sort(默认)。增大 Shuffle 缓冲区:spark.shuffle.file.buffer=64KB → 128KB。
- 序列化与压缩使用 Kryo 序列化:spark.serializer=org.apache.spark.serializer.KryoSerializer。压缩 Shuffle 数据:spark.shuffle.compress=true(默认启用)。
- 内存与缓存缓存频繁使用的 RDD:rdd.persist(StorageLevel.MEMORY_AND_DISK)。调整内存比例:spark.memory.fraction=0.8(高缓存需求场景)。
案例场景
- 数据倾斜:通过
salting
(加盐)分散 Key,或使用Broadcast Join
避免 Shuffle。 - 频繁 GC:增大 Executor 内存或减少缓存数据量。
Spark 是如何基于内存计算的?
Spark 的 内存计算 是其区别于 MapReduce 的核心特性,通过减少磁盘 I/O 大幅提升性能。
实现机制
- RDD 缓存将中间数据缓存到内存(如 persist()),后续计算直接读取内存数据,避免重复计算。支持多种存储级别:MEMORY_ONLY(纯内存)、MEMORY_AND_DISK(内存+磁盘)等。
- 内存优先策略计算过程优先使用内存存储中间结果(如 Shuffle 的 Map 输出),而非写入磁盘。仅在内存不足时溢出到磁盘(如 spark.memory.spillThreshold 控制阈值)。
- Tungsten 优化引擎堆外内存管理:直接操作二进制数据,减少 JVM 对象开销和 GC 压力。代码生成:将逻辑转换为高效字节码,加速计算(如聚合操作)。
对比 MapReduce
- 数据传递:MapReduce 每个阶段需写磁盘,而 Spark 多个阶段在内存中传递数据。
- 迭代计算:机器学习算法需多次迭代,Spark 内存缓存使迭代时间降低 10 倍以上。
局限与平衡
- 内存不足风险:大数据集可能引发 OOM,需结合磁盘缓存或调整分区策略。
什么是 RDD?RDD 有哪些特点?请介绍常见的 RDD 算子。RDD 的底层原理是什么?RDD 有哪些属性?RDD 的缓存级别有哪些?
RDD 定义
RDD(弹性分布式数据集)是 Spark 的 核心数据抽象,代表不可变、可分区的数据集合,支持并行计算和容错。
五大特点
- 弹性(Resilient):数据丢失时可从血统(Lineage)重建。
- 分布式(Distributed):数据跨节点分区存储。
- 不可变(Immutable):只能通过转换生成新 RDD。
- 分区(Partitioned):数据划分为多个分区并行处理。
- 类型无关(Typed):可存储任意数据类型(对象、元组等)。
常见算子
- 转换(Transform)map(func):一对一处理。filter(func):过滤数据。reduceByKey(func):按 Key 聚合(触发 Shuffle)。
- 动作(Action)collect():返回数据到 Driver。count():统计元素总数。
底层原理
- 依赖关系:窄依赖(父分区到子分区的一对一或多对一)和宽依赖(一对多,触发 Shuffle)。
- 任务调度:DAGScheduler 根据依赖划分 Stage,生成 Task 调度到 Executor。
RDD 五大属性
- 分区列表(Partitions)
- 依赖关系(Dependencies)
- 计算函数(Compute Function)
- 分区器(Partitioner,如 HashPartitioner)
- 数据本地性(Preferred Locations)
缓存级别
|
仅缓存到内存(默认) |
|
内存不足时溢写到磁盘 |
|
序列化后存内存(节省空间) |
|
仅存磁盘 |
|
堆外内存存储(需启用堆外内存配置) |
缓存管理
- 通过
persist()
指定级别,unpersist()
释放缓存。 - 缓存数据可被多个 Job 复用,避免重复计算。
Spark 广播变量的实现和原理是什么?
广播变量是 Spark 中 跨节点高效分发只读数据 的机制,主要用于优化 Shuffle 过程 和 减少数据传输开销。
实现原理
- 分发机制:Driver 将广播变量序列化后上传到集群的共享存储(如 HDFS 或分布式内存),Executor 按需拉取。
- 存储方式:每个 Executor 仅在首次使用时下载数据,并缓存在内存中供后续 Task 复用。
- 序列化优化:采用高效的二进制格式(如 Kyro 序列化)减少网络传输量。
核心优势
- 减少数据传输:避免在 Task 间重复传输相同数据(如小表关联大表时广播小表)。
- 内存共享:同一 Executor 上的多个 Task 共享广播变量副本,降低内存占用。
应用场景
- Map 端 Join:当小表足够小时,广播到所有 Executor 实现无 Shuffle 的 Join。
- 全局配置:分发机器学习模型参数或字典文件等静态数据。
注意事项
- 大小限制:广播变量不宜过大(通常建议小于 2GB),否则可能引发 OOM 或传输延迟。
- 不可变性:广播变量只读,修改需重新创建。
关键参数
spark.sql.autoBroadcastJoinThreshold
:设置自动广播的表大小阈值(默认 10MB)。spark.serializer
:选择序列化方式(如 Kyro 提升性能)。
reduceByKey 和 groupByKey 的区别和作用是什么?reduceByKey 和 reduce 的区别是什么?使用 reduceByKey 出现数据倾斜怎么办?
reduceByKey vs groupByKey
- 数据聚合阶段reduceByKey:在 Map 端预聚合(Combine),减少 Shuffle 数据量。groupByKey:直接传输所有数据到 Reduce 端,无预聚合。
- 性能差异reduceByKey 的 Shuffle 数据量更小,执行效率更高。groupByKey 可能因全量数据传输导致性能瓶颈。
reduceByKey vs reduce
- 操作对象reduceByKey:作用于键值对 RDD,按 Key 聚合。reduce:作用于整个 RDD,最终返回单个值(属于 Action 操作)。
数据倾斜解决方案
- 加盐处理:为倾斜 Key 添加随机前缀,分散到多个分区聚合后再二次聚合。
- 局部聚合:先对分区内数据聚合,再全局聚合(类似 MapReduce 的 Combiner)。
- 过滤倾斜 Key:单独处理倾斜 Key,与其他数据结果合并。
案例场景
假设 reduceByKey
处理用户点击日志时,某热门用户(Key)的日志量极大:
- 对 Key 添加随机后缀(如
userA_1
,userA_2
)。 - 执行
reduceByKey
得到局部聚合结果。 - 去掉后缀,再次聚合得到最终结果。
Spark SQL 的执行原理是什么?如何优化?
执行原理
Spark SQL 基于 Catalyst 优化器 和 Tungsten 执行引擎,流程分为四阶段:
- 逻辑计划生成:将 SQL 或 DataFrame 操作转换为抽象语法树(AST)。
- 逻辑优化:应用规则优化(如谓词下推、常量折叠)。
- 物理计划生成:转换为可执行的 RDD 操作(如选择 BroadcastHashJoin)。
- 代码生成:将物理计划编译为 Java 字节码,提升执行效率。
优化策略
数据分区 |
对常用过滤字段分区(如
)。 |
过滤下推 |
提前过滤数据(如将
条件推到数据源读取阶段)。 |
广播 Join |
小表广播避免 Shuffle(通过
控制)。 |
缓存复用 |
对频繁访问的 DataFrame 执行
。 |
统计信息收集 |
使用
收集表统计信息,帮助优化器选择 Join 策略。 |
Tungsten 引擎优化
- 堆外内存:直接操作二进制数据,减少 GC 开销。
- 列式存储:对 Parquet 等列式格式高效读取。
什么是 Spark checkpoint?
Checkpoint 是 Spark 中 将 RDD 或 DStream 持久化到可靠存储 的容错机制,用于 剪枝血统链 和 提升恢复效率。
与 Cache 的区别
存储位置 |
可靠存储(如 HDFS) |
内存或本地磁盘 |
血统处理 |
切断血统,不可恢复 |
保留血统,可重新计算 |
用途 |
容错和长链路依赖优化 |
临时加速计算 |
使用场景
- 迭代计算:如梯度下降算法迭代百次时,定期 Checkpoint 避免血统过长。
- 流处理:DStream 的 Checkpoint 保存元数据和计算状态。
配置方法
sc.setCheckpointDir("hdfs://path/to/dir") // 设置 Checkpoint 目录 rdd.checkpoint() // 触发 Checkpoint 操作
注意事项
- Checkpoint 是 惰性操作,需触发 Action 后才会执行。
- 频繁 Checkpoint 可能增加 I/O 开销,需权衡性能与可靠性。
Spark SQL 与 DataFrame 如何使用?如何创建 DataFrame?如何在 S
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
17年+码农经历了很多次面试,多次作为面试官面试别人,多次大数据面试和面试别人,深知哪些面试题是会被经常问到。 在多家企业从0到1开发过离线数仓实时数仓等多个大型项目,详细介绍项目架构等企业内部秘不外传的资料,介绍踩过的坑和开发干货,分享多个拿来即用的大数据ETL工具,让小白用户快速入门并精通,指导如何入职后快速上手。 计划更新内容100篇以上,包括一些企业内部秘不外宣的干货,欢迎订阅!