【大数据面试题】Spark-Core&Spark-SQL
1-spark架构组成
- Master 节点、Worker 节点、Driver驱动器,Executor 执行器、Task 计算任务
- Master 节点上常驻Master 进程,该进程负责管理所有的Worker 节点。(分配任务、收集运行信息、监控worker的存活状态)
- Worker 节点常驻Worker进程,该进程与Master 节点通信,还管理Spark 任务的执行。(启动Executor,监控任务运行状态)
- Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。
- Executor 执行器。Executor 是一个JVM 进程,是Spark 计算资源的单位。可以运行多个计算任务。
- Task Spark 应用会被拆分为多个计算任务,分配给Executor 执行。Task 以线程的方式运行在Executor 中。
2-Spark为什么会有自己的资源调度器
Hadoop的Yarn框架比Spark框架诞生的晚,所以Spark自己也设计了一套资源调度框架。
3-Spark运行模式
1)Local:运行在一台机器上。 测试用。
2)Standalone:是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。
3)Yarn:采用Hadoop的资源调度器。 国内大量使用。
4)Mesos:国内很少使用。
4-Spark的OOM原因
概括
1、map执行内存溢出
2、shuffle后内存溢出
3、driver内存溢出
map执行中内存溢出代表了所有map类型的操作。包括:flatMap,filter,mapPatitions等。shuffle后内存溢出的shuffle操作包括join,reduceByKey,repartition等操作
具体原因
1.map过程产生大量对象导致内存溢出
例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作下,每个对象产生了一千个对象,肯定容易造成内存溢出。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即便产生大量对象,也能装得下。具体做法,在map之前调用repartition,增加分区数
2.数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也可能导致性能问题,解决办法同上,调用repartition方法,重分区
3.coalesce调用导致内存溢出
HDFS不适合存储小文件,所以spark计算后如果产生的文件太小,会先调用coalesce合并文件,再存入HDFS。这会导致一个问题,假使调用之前有一百个文件,就需要一百个task,调用coalesce之后,产生10个文件,由于coalesce默认shuffle为false,窄依赖,reduce阶段的task数据量是map阶段的10倍,也就是说并不是先100个task执行,而是从头到尾都是10个task再执行,容易OOM,可以将coalesce的shuffle设置成true,这样就会有shuffle,数据中间落盘,不会OOM
4.shuffle后内存溢出
内存溢出可以说都是shuffle后,单个文件过大导致的,在spark中,join,reduceByKey这一类型的过程,都会有shuffle,shuffle使用需要传入一个partitioner,大部分Spark中的shuffle操作,默认是HashPartitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制,该参数只对HashPartitioner有效,如果是别的partitioner导致OOM就需要在partitioner代码中增加partitions的数量
5-简述Spark的架构
Application:用户编写的Spark应用程序。Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task。**RDD:**弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。**DAG:**有向无环图,反映RDD之间的依赖关系。**Task:**运行在Executor上的工作单元。**Job:**一个Job包含多个RDD及作用于相应RDD上的各种操作。**Stage:**是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表一组关联的,相互之间没有Shuffle依赖关系的任务组成的任务集。**Cluter Manager:**指的是在集群上获取资源的外部服务。目前有三种类型
- Standalon : spark原生的资源管理,由Master负责资源的分配
- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager 一个Application由一个Driver和若干个Job构成,一个Job由多个Stage构成,一个Stage由多个没有Shuffle关系的Task组成。
当执行一个Application时,Driver会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行Task,运行结束后,执行结果会返回给Driver,或者写到HDFS或者其它数据库中
6-spark作业提交流程
1.spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。
2.TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个Application 启动多个 Executer。
3.Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
4.每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
5.DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。
6.TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
7.Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task执行 RDD 里的一个 partition)
7-spark sql与mysql sql的区别
1、Mysql:适用于实时性的查询,一般使用场景都是通过走B+树索引,来让查询效率维持在毫秒级。但是缺点也很明显,举个例子查询的量过大,有百万级别,Mysql直接OOM了。存在性能的瓶颈。而hiveSQL和sparkSQL的查询不存在这种问题,计算完成后的数据都是分布式存储的。
2、Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了关于数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。
8-Spark提交作业参数
1)在提交任务时的几个重要参数
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M
2)边给一个提交任务的样式
spark-submit
--master local[5]
--driver-cores 2
--driver-memory 8g
--executor-cores 4
--num-executors 10
--executor-memory 8g
--class PackageName.ClassName XXXX.jar
--name "Spark Job Name"
InputPath
OutputPath
9-RDD定义特性
定义:
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
特性:
RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
1、分区
RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
2、只读
如下图所示,RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了
RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。下图是RDD所支持的操作算子列表。
3、依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
4、缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
5、CheckPoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
10-Spark的transformation算子
1)单Value
(1)map
(2)mapPartitions
(3)mapPartitionsWithIndex
(4)flatMap
(5)glom
(6)groupBy
(7)filter
(8)sample
(9)distinct
(10)coalesce
(11)repartition
(12)sortBy
(13)pipe
2)双vlaue
(1)intersection
(2)union
(3)subtract
(4)zip
3)Key-Value
(1)partitionBy
(2)reduceByKey
(3)groupByKey
(4)aggregateByKey
(5)foldByKey
(6)combineByKey
(7)sortByKey
(8)mapValues
(9)join
(10)cogroup
11-Spark的action算子
(1)reduce
(2)collect
(3)count
(4)first
(5)take
(6)takeOrdered
(7)aggregate
(8)fold
(9)countByKey
(10)save
(11)foreach
12- Spark相近算子的区别
map和mapPartitions区别
1)map:每次处理一条数据
2)mapPartitions:每次处理一个分区数据
Repartition和Coalesce区别
1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
reduceByKey与groupByKey的区别
reduceByKey:具有预聚合操作
groupByKey:没有预聚合
在不影响业务逻辑的前提下,优先采用reduceByKey。
reduceByKey、foldByKey、aggregateByKey、combineByKey区别
ReduceByKey 没有初始值 分区内和分区间逻辑相同
foldByKey 有初始值 分区内和分区间逻辑相同
aggregateByKey 有初始值 分区内和分区间逻辑可以不同
combineByKey 初始值可以变化结构 分区内和分区间逻辑不同
13-Spark 中 DAG 是如何形成的
- DAG 是有向无环图,其实就是RDD执行的流程。原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时(执行Action算子时),可以按照DAG的描述,执行真正的计算(数据被操作的一个过程)。一个Spark应用中可以有一到多个DAG,取决于触发了多少次Action。
- Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中。
14-Spark宽窄依赖&血缘Lineage
Lineage:
相比其他系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。因为这种粗颗粒的数据模型,限制了Spark的运用场合,所以Spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能的提升。
宽依赖和窄依赖。有Shuffle的是宽依赖。
窄依赖(Narrow Dependency):指父RDD的每个分区只被子RDD的一个分区所使用,例如map、filter等这些算子 一个RDD,对它的父RDD只有简单的一对一的关系,RDD的每个partition仅仅依赖于父RDD 中的一个partition,父RDD和子RDD的partition之间的对应关系,是一对一的。这其中又分两种情况:1个子RDD分区对应1个父RDD分区(如map、filter等算子),1个子RDD分区对应N个父RDD分区(如co-paritioned(协同划分)过的Join)。
窄依赖的算子:map、filter、union、join(hash-partitioned)、mapPartitions、mapValues
宽依赖(Shuffle Dependency):父RDD的每个分区都可能被子RDD的多个分区使用,例如groupByKey、 reduceByKey,sortBykey等算子,这些算子其实都会产生shuffle操作,每一个父RDD的partition中的数据都可能会传输一部分到下一个RDD的每个partition中。此时就会出现,父RDD和子RDD的partition之间,具有错综复杂的关系,这种情况就叫做两个RDD 之间是宽依赖,同时,他们之间会发生shuffle操作。
宽依赖的算子:groupByKey、partitionBy、join(非hash-partitioned);
本质理解:
对于宽依赖,Stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,通过重新计算恢复数据这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。
根据父RDD分区是对应1个还是多个子RDD分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)。如果对应多个,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。
窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点。
15-Spark任务的划分
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
16-spark的持久化&缓存机制
Spark 中一个很重要的能力是将数据持久化(或称为缓存),Spark 的缓存机制是一种空间换时间的方法,在多个操作间都可以访问这些持久化的数据。RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)
使用场景为:数据会被多次重复使用。数据量较小可以放在内存空间的情况下可以对RDD 进行缓存。
cache()调用的persist(),是使用默认存储级别的快捷设置方法
17-spark的缓存机制
DataFrame的cache默认采用 MEMORY_AND_DISK
RDD 的cache默认方式采用MEMORY_ONLY
缓存:(1)dataFrame.cache (2)sparkSession.catalog.cacheTable(“tableName”)
释放缓存:(1)dataFrame.unpersist (2)sparkSession.catalog.uncacheTable(“tableName”)
- MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
- MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
- MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
- MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
- DISK_ONLY : 只在磁盘上缓存 RDD。
- MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
- OFF_HEAP: 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。
选择方法
- 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度运行。
- 如果内存不能全部存储 RDD,那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
- 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
- 如果想快速还原故障,建议使用多副本存储级别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
18-Spark分区
1)默认采用Hash分区
缺点:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
2)Ranger分区
要求RDD中的KEY类型必须可以排序。
3)自定义分区
根据需求,自定义分区。
19-Spark累加器
常用作计数器, 累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。如果一个变量被声明为一个累加器,那么它将在被改变时会在driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,然后被改变后会传回Driver合并计算并能改变原始变量的值,分布式计数的功能。
注意事项:
1)累加器只能在Driver端定义和初始化,在Executor端更新,不能在Executor端定义,不能在Executor端(.value)获取值
2累加器要放在行动算子中,因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。
原理:
- sc.longAccumulator("xxx")方法会初始化AccumulatorMetadata,包括唯一id和name,并在AccumulatorContext进行accumulator的注册(注册的目的是task结束后,accumulator合并的时候可以找得到)。
- executor端的task执行到相关的rdd方法的时候,会触发闭包的传输,序列化机制中会将是否在driver端的标识改为false,并对当前的accumulator执行copyAndReset(),保证task收到的是一个干净的accumulator,并在taskContext中注册该accumulator。task就可以执行add()方法进行累加操作了。
- driver端的DagScheduler收到了task完成的事件后,会把收到的事件对象中的各个累加器合并到AccumulatorContext中注册好的主累加器中,这样用户在driver端调用longAccumulator.value()的时候,拿到的就是已经处理好后的累加值。
20-Spark广播变量
广播变量可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。
假如在spark程序里需要用到大对象,比如:字典等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会带来极大的系统开销,而且会大量消耗Executor服务器上的资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,这个Executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。广播变量允许将变量广播(提前广播)给各个Executor。该Executor上的各个Task再从所在节点的BlockManager获取变量,而不是从Driver获取变量,从而提升了效率。
一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本节点的BlockManager中获取相关数据。
使用:
1.调用SparkContext.broadcast方法创建一个Broadcast[T]对象。任何序列化的类型都可以这么实现。
2.通过value属性访问改对象的值(Java之中为value()方法)
3.变量只会被发送到各个节点一次,应作为只读值处理
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
注意事项:
1)变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改2)不能将一个RDD使用广播变量广播出去,因为RDD是不存储数据的。可以将RDD的结果广播出去。3)广播变量只能在Driver端定义,不能在Executor端定义。4)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。5)如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。6)如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本
21-SparkSQL中RDD、DataFrame、DataSet三者的转换
22-RDD、DataFrame和DataSet
共性:
1.RDD、 DataFrame、DataSet都是spark平台下的分布式数据集,为处理超大型数据提供便利;2.三者都有惰性机制,在进行创建、转换时,不会立即执行,只有在遇到行动算子的时候才会开始计算;3.在对DataFrame和DataSet进行操作时,许多操作都需要导入:import spark.implicits._ 包;
4.三者都会根据Spark的内存情况进行自动缓存计算,这样即使数据量很大,也不会担心内存溢出;
区别:
1.RDD1.RDD 一般和spark mllib(后面解释)同时使用2.RDD不支持sparksql操作
2.DataFrame1.DataFrame每一行的类型固定为Row,每一列的值无法直接访问,只有通过解析才能获取各个字段的值;2.DataFrame和DataSet一般不与spark mllib同时使用3.DataFrame和DataSet均支持sparksql操作
3.DataSet1.DataFrame和DataSet拥有完全相同的成员函数,区别只是每一行的数据类型不同,DataFrame其实就是DataSet的一个特例,type DataFrame = DataSet[Row]2.DataFrame也可以叫DataSet[Row],每一行的类型为Row,每一行究竟有哪些字段,各个字段的类型是什么无从得知;而DataSet每一行是什么类型是不一定的,自定义case class之后可以很自由的获得每一行中的信息。
23-引起Shuffle的算子
spark中会导致shuffle操作的有以下几种算子、1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等3、join类的操作:比如join、cogroup等
24-如何使用Spark实现TopN的获取
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
25-Streaming、Core、SQL区别联系
Spark Core :
Spark的基础,底层的最小数据单位是:RDD ; 主要是处理一些离线(可以通过结合Spark Streaming来处理实时的数据流)、非格式化数据。它与Hadoop的MapReduce的区别就是,spark core基于内存计算,在速度方面有优势,尤其是机器学习的迭代过程。
Spark SQL:
Spark SQL 底层的数据处理单位是:DataFrame(新版本为DataSet<Row>) ; 主要是通过执行标准 SQL 来处理一些离线(可以通过结合Spark Streaming来处理实时的数据流)、格式化数据。就是Spark生态系统中一个开源的数据仓库组件,可以认为是Hive在Spark的实现,用来存储历史数据,做OLAP、日志分析、数据挖掘、机器学习等等
Spark Streaming:
Spark Streaming底层的数据处理单位是:DStream ; 主要是处理流式数据(数据一直不停的在向Spark程序发送),这里可以结合 Spark Core 和 Spark SQL 来处理数据,如果来源数据是非结构化的数据,那么我们这里就可以结合 Spark Core 来处理,如果数据为结构化的数据,那么我们这里就可以结合Spark SQL 来进行处理。
联系:Spark SQL构建在Spark Core之上,专门用来处理结构化数据(不仅仅是SQL)。即Spark SQL是Spark Core封装而来的!Spark SQL在Spark Core的基础上针对结构化数据处理进行很多优化和改进,
26-持久化Cache与checkpoint的区别
- 目的不同:cache 是为了加速计算,也就是加速后续的job。checkpoint 则是为了在job 运行失败的时候能够快速恢复!
- 存储位置不同:cache 主要使用内存,偶尔使用磁盘存储。checkpoint 为了可靠读写主要采用HDFS 作为存储空间
- 对lineage 影响不同:cache 对lineage无影响。缓存的RDD 丢失后可以通过lineage 重新计算。如果对RDD 进行 checkpoint,HDFS 因为是可靠存储,所以不需要再保存lineage了
- 应用场景不同:cache 机制适用于会被多次读取,占用空间不是特别大的RDD。checkpoint 机制则是适用于数据依赖关系特别复杂,重新计算代价高的RDD,比如某RDD关联的数据过多、计算链过长、被多次重复使用。
27-控制Spark reduce缓存 调优shuffle
参数spark.sql.shuffle.partitions 决定 默认并行度200
spark.reducer.maxSizeInFilght 此参数为reduce task能够拉取多少数据量的一个参数默认48MB,当集群资源足够时,增大此参数可减少reduce拉取数据量的次数,从而达到优化shuffle的效果,一般调大为96MB,,资源够大可继续往上调。
spark.shuffle.file.buffer 此参数为每个shuffle文件输出流的内存缓冲区大小,调大此参数可以减少在创建shuffle文件时进行磁盘搜索和系统调用的次数,默认参数为32k 一般调大为64k。
28-Spark的checkpoint 机制
应用场景:
分布式计算中难免因为网络,存储等原因出现计算失败的情况,RDD中的lineage信息常用来在task失败后重计算使用,为了防止计算失败后从头开始计算造成的大量开销,RDD会checkpoint计算过程的信息,这样作业失败后从checkpoing点重新计算即可,提高效率。
checkPoint条件
- DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
- 在宽依赖上做Checkpoint获得的收益更大。
CheckPoint写流程
- 当RDD的action算子触发计算结束后会执行checkpoint。
- 在spark streaming中每生成一个batch的RDD也会触发checkpoint操作。
- 首先 driver 程序 需要使用 rdd.checkpoint() 去设定哪些 rdd 需要 checkpoint,设定后,该 rdd 就接受 RDDCheckpointData 管理。用户还要设定 checkpoint 的存储路径,一般在 HDFS 上。
- marked for checkpointing:初始化后,RDDCheckpointData 会将 rdd 标记为 MarkedForCheckpoint。
- checkpointing in progress:每个 job 运行结束后会调用 finalRdd.doCheckpoint(),finalRdd 会顺着 computing chain 回溯扫描,碰到要 checkpoint 的 RDD 就将其标记为 CheckpointingInProgress,然后将写磁盘(比如写 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 节点上的 blockManager。完成以后,启动一个 job 来完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。
- checkpointed:job 完成 checkpoint 后,将该 rdd 的 dependency 全部清掉,并设定该 rdd 状态为 checkpointed。然后,为该 rdd 强加一个依赖,设置该 rdd 的 parent rdd 为 CheckpointRDD,该 CheckpointRDD 负责以后读取在文件系统上的 checkpoint 文件,生成该 rdd 的 partition。
读CheckPoint数据
task计算失败的时候会从checkpoint读取数据进行计算
如果一个RDD被checkpoint了,那么这个 RDD 中对分区和依赖的处理都是使用的RDD内部的checkpointRDD变量,具体实现是 ReliableCheckpointRDD 类型。这个是在 checkpoint 写流程中创建的。依赖和获取分区方法中先判断是否已经checkpoint,如果已经checkpoint了,就斩断依赖,使用ReliableCheckpointRDD,来处理依赖和获取分区。
如果没有,才往前回溯依赖。依赖就是没有依赖,因为已经斩断了依赖,获取分区数据就是读取 checkpoint 到 hdfs目录中不同分区保存下来的文件。
整个 checkpoint 读流程就完了。
29-Spark的Join
影响JOIN操作的因素
(1)数据集的大小
参与JOIN的数据集的大小会直接影响Join操作的执行效率。同样,也会影响JOIN机制的选择和JOIN的执行效率。
(2)JOIN的条件
JOIN的条件会涉及字段之间的逻辑比较。根据JOIN的条件,JOIN可分为两大类:等值连接和非等值连接。等值连接会涉及一个或多个需要同时满足的相等条件。在两个输入数据集的属性之间应用每个等值条件。当使用其他运算符(运算连接符不为**=**)时,称之为非等值连接。
JOIN的类型
- 内连接(Inner Join):仅从输入数据集中输出匹配连接条件的记录。
- 外连接(Outer Join):又分为左外连接、右外链接和全外连接。
- 半连接(Semi Join):右表只用于过滤左表的数据而不出现在结果集中。
- 交叉连接(Cross Join):交叉联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。交叉联接也称作笛卡尔积,不需要关联条件
JOIN执行的5种策略
- Shuffle Hash Join
- Broadcast Hash Join
- Sort Merge Join
- Cartesian Join
- Broadcast Nested Loop Join
Shuffle Hash Join
当要JOIN的表数据量比较大时,可以选择Shuffle Hash Join。这样可以将大表进行按照JOIN的key进行重分区,保证每个相同的JOIN key都发送到同一个分区中。
Shuffle Hash Join的基本步骤主要有以下两点:
- 首先,对于两张参与JOIN的表,分别按照join key进行重分区,该过程会涉及Shuffle,其目的是将相同join key的数据发送到同一个分区,方便分区内进行join。
- 其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,然后根据join key与大表的分区数据记录进行匹配。
条件与特点:
- 仅支持等值连接,join key不需要排序
- 支持除了全外连接(full outer joins)之外的所有join类型
- 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM
- 将参数*spark.sql.join.prefersortmergeJoin (default true)*置为false
Broadcast Hash Join
也称之为Map端JOIN。当有一张表较小时,我们通常选择Broadcast Hash Join,这样可以避免Shuffle带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用Broadcast Hash Join将维表进行Broadcast。这样可以避免数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而提高JOIN的效率。在进行 Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,会造成 Driver 端出现 OOM。
- Broadcast阶段 :小表被缓存在executor中
- Hash Join阶段:在每个 executor中执行Hash Join
条件与特点:
- Broadcast Hash Join相比其他的JOIN机制而言,效率更高。但是,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,需要在Driver端缓存数据,所以当小表的数据量较大时,会出现OOM的情况
- 被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)
- 被广播表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala
Sort Merge Join
该JOIN机制是Spark默认的,可以通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用Sort Merge Join。一般在两张大表进行JOIN时,使用该方式。Sort Merge Join可以减少集群中的数据传输,该方式不会先加载所有数据的到内存,然后进行hashjoin,但是在JOIN之前需要对join key进行排序
- Shuffle Phase : 两张大表根据Join key进行Shuffle重分区
- Sort Phase: 每个分区内的数据进行排序
- Merge Phase: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连接具有相同Join key值的行来合并数据集
条件与特点
- 仅支持等值连接
- 支持所有join类型
- Join Keys是排序的
- 参数**spark.sql.join.prefersortmergeJoin (默认true)**设定为true
Cartesian Join
如果 Spark 中两张参与 Join 的表没指定join key(ON 条件)那么会产生 Cartesian product join,这个 Join 得到的结果其实就是两张行数的乘积。
- 仅支持内连接
- 支持等值和不等值连接
- 开启参数spark.sql.crossJoin.enabled=true
优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.
如何选择JOIN策略
有等值连接的情况
有join提示(hints)的情况,按照下面的顺序
- 1.Broadcast Hint:如果join类型支持,则选择broadcast hash join
- 2.Sort merge hint:如果join key是排序的,则选择 sort-merge join
- 3.shuffle hash hint:如果join类型支持, 选择 shuffle hash join
- 4.shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式
没有join提示(hints)的情况,则逐个对照下面的规则
- 1.如果join类型支持,并且其中一张表能够被广播(值,默认是10MB),则选择 broadcast hash join
- 2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
- 3.如果join keys 是排序的,则选择sort-merge join
- 4.如果是内连接,选择 cartesian join
- 5.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join
非等值连接情况
有join提示(hints),按照下面的顺序
- 1.broadcast hint:选择bradcast nested loop join.
- 2.shuffle replicate NL hint: 如果是内连接,则选择cartesian product join
没有join提示(hints),则逐个对照下面的规则
- 1.如果一张表足够小(可以被广播),则选择 broadcast nested loop join
- 2.如果是内连接,则选择cartesian product join
- 3.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join
30-spark定位数据倾斜
数据倾斜只会发生在Shuffle过程中,常见的并且可能会出发shuffle操作的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等.出现数据倾斜时,很可能激素是代码中使用的这些算子中的某一个导致的.
定位:
如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个stage;
如果是用yarn-cluster模式提交,则可以通过Spark Web UI(8088端口)来查看当前运行到了第几个stage。===Spark应用的运行时的端口号-4040端口
无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
通过 Web UI 看是否数据倾斜
数据倾斜只会发生在shuffle过程中。
通过观察spark UI的界面,定位数据倾斜发生在第几个stage中。
可以在Spark Web UI上看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
- 1段提交代码是1个Application。
- 1个action算子是1个job 。
- 1个job中,以宽依赖为分割线,划分成不同stage,stage编号从0开始 。
- 1个stage中,划分出参数指定数量的task,注意观察Locality Level和Duration列 。Duration 就是 task 的执行时间。
31-50台机器每个机器一个task
首先num-excutors 设置为50,spark.excutors.cores=1,每个Excutor给一个core,spark.task.cores设置为1,每个task可用core为1
32-如何确定task数量
task数量和数据的分区相同,一般就是HDFS上的分区的数量,shuffle后分区数的可以通过shuffle.partitons这个参数调整,调整后reduce task的数量就是这个参数配置的。
33-spark惰性计算的好处
在 Spark 的 RDD 算子中,Transformations 算子都属于惰性求值操作,仅参与 DAG 计算图的构建、指明计算逻辑,并不会被立即调度、执行。
惰性求值的特点是当且仅当数据需要被物化(Materialized)时才会触发计算的执行,RDD 的 Actions 算子提供各种数据物化操作,其主要职责在于触发整个 DAG 计算链条的执行。
当且仅当 Actions 算子触发计算时, DAG 从头至尾的所有算子(前面用于构建 DAG 的 Transformations 算子)才会按照依赖关系的先后顺序依次被调度、执行。
好处:
①不用每一个算子计算一下,减少系统开销
②在spark的内核中,可以按照DAG的逻辑顺序优化后再执行。
34-优化参数配置
spark.sql.shuffle.partitions
spark.default.parallelism只有在处理RDD时有效.spark.sql.shuffle.partitions则是只对SparkSQL有效.
spark.sql.shuffle.partitions: 设置的是 RDD1做shuffle处理后生成的结果RDD2的分区数.
shuffle partitions的数量默认为200,,这个参数是需要进行调整的而且Spark中并不会根据数据量进行动态的设置,需要我们基于数据规模进行调整。> 小数据集,200数值显然有些大,大部分时间都花费在调度,任务执行本身花费时间较小> 大数据集,200数值有些小,不能充分利用集群资源建议:建议设置初始分区的1.5-2倍之间
spark.sql.parquet.compression.codec
park.sql.parquet.compression.codec 默认是snappy。当写parquet文件的时候设置压缩格式。如果在option或者properties里配置了compression或者parquet.compression优先级依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置类型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安装ZstandardCodec,brotli需要安装BrotliCodec。
spark.reducer.maxReqsInFligh
此配置限制在任何给定点获取块的远程请求数。当集群中的主机数量增加时,可能会导致到一个或多个节点的大量入站连接,从而导致worker在负载下失败。通过允许它限制fetch请求的数量,可以缓解这种情况。
spark.dynamicAllocation.minExecutors
如果启用动态分配,在executor空闲spark.dynamicAllocation.executorIdleTimeout(默认60s)之后将被释放。
spark.dynamicAllocation.minExecutors和spark.dynamicAllocation.maxExecutors分别为分配的最小及最大值,spark.dynamicAllocation.initialExecutors为初始分配的值,默认取值为minExecutors。在--num-executors参数设置后,将使用此设置的值作为动态分配executor数的初始值。
spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
34-Spark容错方式
一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。
因此,Spark选择记录更新的方式。RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错记录下来,以便恢复丢失的分区。
原理:
对于宽依赖,Stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,通过重新计算恢复数据这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。
根据父RDD分区是对应1个还是多个子RDD分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)。如果对应多个,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。
窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点。
在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。
35-Spark的Shuffle过程
若下一个阶段需要依赖前面阶段的所有计算结果时,则需要对前面阶段的所有计算结果进行重新整合和分类,这就需要经历shuffle过程。
shuffle操作需要将数据进行重新聚合和划分,然后分配到集群的各个节点上进行下一个stage操作,这里会涉及集群不同节点间的大量数据交换。由于不同节点间的数据通过网络进行传输时需要先将数据写入磁盘,因此集群中每个节点均有大量的文件读写操作,从而导致shuffle操作十分耗时(相对于map操作)。Spark程序中的Shuffle操作是通过shuffleManage对象进行管理。Spark目前支持的ShuffleMange模式主要有两种:HashShuffleMagnage 和SortShuffleManageShuffle操作包含当前阶段的Shuffle Write(存盘)和下一阶段的Shuffle Read(fetch),两种模式的主要差异是在Shuffle Write阶段
1、HashShuffleManager
shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个 stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所 谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入buffer中,当buffer填满之后,才会溢写到磁盘文件中去。
那么每个执行shuffle write的task,下一个stage的 task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个stage总共有100个task, 那么当前stage的每个task都要创建100份磁盘文件。如果当前stage有50个task,总共有10个 Executor,每个Executor执行5个Task,那么每个Executor上总共就要创建500个磁盘文件
shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的 每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合 完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。
2、优化HashShuffleManager
spark.shuffle.consolidateFiles=true
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘 文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件 的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个 task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件 内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不 会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效 将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
3、SortShuffleManager
在该模式下,数据会先写入一个内存数据结构中,此 时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算 子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle 算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会 判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数 据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式 分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。 BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一 次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文 件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文 件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文 件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文 件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage 有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每 个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁 盘文件。
4.bypass机制
bypass运行机制的触发条件如下:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。默认200 不是聚合类的shuffle算子(比如reduceByKey)
此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash 值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢 写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的 磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经 优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行 排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
36-MapRedueceShuffle与SaprkShuffle的异同
- 功能上,MR的shuffle和Spark的shuffle是没啥区别的,都是对Map端的数据进行分区,要么聚合排序,要么不聚合排序,然后Reduce端或者下一个调度阶段进行拉取数据,完成map端到reduce端的数据传输功能。
- 方案上,有很大的区别,MR的shuffle是基于合并排序的思想,在数据进入reduce端之前,都会进行sort,为了方便后续的reduce端的全局排序,而Spark的shuffle是可选择的聚合,需要通过调用特定的算子才会触发排序聚合的功能。
- 流程上,MR的Map端和Reduce区分非常明显,两块涉及到操作也是各司其职,而Spark的RDD是内存级的数据转换,不落盘,所以没有明确的划分,只是区分不同的调度阶段,不同的算子模型。
- 数据拉取,MR的reduce是直接拉去Map端的分区数据,而Spark是根据索引读取,而且是在action触发的时候才会拉去数据。
- HashShuffle,虽然MR和shuffle读都会进行HashShuffle,但是如果在shuffle读没有combine操作的时候同时分区数少于设定的阈值bypass,则不会在HashMap的时候预先对分区中所有的健值对进行merge和sort,从而省下了排序过程。
37-Saprk的内存管理机制
executor内存分配
在执行Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能。
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
38-一个 Spark 任务某天突然变慢怎么解决
查看执行流程-查看执行计划-分析代码-得出优化策略
处理的两个方向大概就是:
看 Spark Web UI,观察每个 Stage 的执行时间和平常是否有区别,执行时间是否超过以前的平均执行时间。看 Spark 日志,看是否有 error,网络、磁盘空间等是否存在异常。第一种主要是看数据倾斜,第二种主要是看硬件是否有问题。我们今天主要讨论数据倾斜。因为磁盘等硬件有问题或者大数据组件有问题可以直接找基础设施团队的人来解决,不用数据开发来解决。
1.数据倾斜发生的现象可能有以下两种表现:
Spark作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM,反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正常运行。2.数据倾斜发生的原理在进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。
3.定位数据倾斜发生的代码
看 Spark Web UI 的 Jobs,看程序卡在哪里。一般会有好几个 job,分位 Completed Jobs 和 Active Jobs,Active Jobs 下面就是执行较慢的 job。点开正在执行的 job,观察 Stage 下面的 task,有一个 task 执行特别慢。比较每个 task 的数据量,通过 Spark Web UI,我们可以查询到每个任务处理的数据量大小和需要的执行时间。如果这个任务处理的数据量和需要的执行时间都明显多于其他任务,就说明很可能出现了数据倾斜。
39-Saprk的AQE
Spark SQL是Spark开发中使用最广泛的引擎,它使得我们通过简单的几条SQL语句就能完成海量数据(TB或PB级数据)的分析。
AQE(Adaptive Query Execution,自适应查询执行)的作用是对正在执行的查询任务进行优化。AQE使Spark计划器在运行过程中可以检测到在满足某种条件的情况下可以进行的动态自适应规划,自适应规划会基于运行时的统计数据对正在运行的任务进行优化,从而提升性能。
一条SQL语句在执行过程中会经历如下阶段:
通过解析器把SQL语句解析为语法树;通过分析器把语法树解析为分析后的逻辑计划;通过优化器对执行计划进行优化,得到优化后的逻辑计划;逻辑计划通过计划器被转换为物理计划;物理计划在通过查询成本模型评估后,最优的那个将被执行
AQE主要用于解决如下问题
(1)统计信息过期或缺失导致估计错误。
(2)收集统计信息的代价较大。
(3)因某些谓词使用自定义UDF导致无法预估。
(4)开发人员在SQL上手动指定hints跟不上数据的变化。
AQE的工作原理
当查询任务提交后,Spark就会根据Shuffle操作将任务划分为多个查询阶段。在执行过程中,上一个查询执行完之后,系统会将查询结果保存下来,这样下一个查询就可以基于上一个查询的结果继续进行计算了。血缘lineger
SQL语句“select x, avg(y)from t group by x order by avg(y)”的执行在两个Shuffle处被划分为两个查询阶段,第一个查询阶段包括扫描(scan)、聚合(aggregate)和Shuffle操作,第二个查询阶段包括聚合和Shuffle操作,最后对数据进行排序(sort)。
查询阶段的边界是进行运行时优化的最佳时机。在查询阶段的边界处,执行间歇、分区大小、数据大小等统计信息均已产生。Spark AQE主要就是通过这些统计信息对执行计划进行优化的,流程如下。
(1)运行没有依赖的查询阶段。
(2)根据新的统计信息优化剩余的查询阶段。
(3)执行其他已经过优化且满足依赖的查询阶段。
(4)重复步骤(2)和(3),不断执行,优化再执行,直到所有查询阶段执行完。
#数据人的面试交流地##阿里巴巴##秋招#