Spark
重要概念
架构
- Standalone模式下才有Master和Worker的叫法。
- Master:类似
YARN
中的ResourceManager
。作用:①监控Worker
,接收对Worker
的注册;②接收Client
提交的Application,调度等待的Application并向Worker提交。 - Worker:类似
YARN
中的NodeManager
,掌管着所在节点的资源信息。作用:①通过RegisterWorker
注册到Master
;②发送心跳信息;③根据Master发送的Application配置进程环境,启动ExecutorBackend
(执行Task所需的临时进程)。 - Executor:
SparkContext
对象一旦连接到集群管理器,就可以获取到集群中每个节点上的Executor。Executor是一个进程(进程名:ExecutorBackend,运行在Work节点上),用来执行计算和为应用程序存储数据,然后Spark会发送应用程序代码(比如Jar包)到每个Ececutor,最后SparkContext对象发送任务到执行器开始执行程序。 - Driver Program:运行应用程序的main()函数并创建SparkContext的线程。每个Spark应用程序都包括一个驱动程序,驱动程序负责把并行操作发布到集群中。Driver会将用户程序划分为不同的执行阶段,每个执行阶段由一组完全相同的Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后,Driver会向Executor发送Task。
重要类:
SparkContext
:负责连接到集群管理器。
RDD(弹性分布式数据集)
- RDD类中的特点
- 分区列表,即 一个RDD中有多个分区,每个分区中有多条数据,内部有方法
getPartitions: Array[Partition]
,默认等于核心数 - 切片
- 依赖列表,每个RDD都会依赖于其他RDD,内部有方法
getDependencies: Seq[Dependency[_]]
,窄依赖(NarrowDependency)分为OneToOneDependency
和RangeDependency
,宽依赖,没有对应的类,就只有ShuffleDependency
。并且宽依赖一定会有shuffle阶段。 - RDD是一个泛型类
- 分区列表,即 一个RDD中有多个分区,每个分区中有多条数据,内部有方法
- RDD类中的特点
弹性:
- 存储的弹性:内存和磁盘的自动切换
- 容错的弹性:数据丢失可以自动恢复
- 计算的弹性:计算出错重试机制,即出错时可能会切换Executor
- 分片的弹性:可根据需要重新分片
- 只读
- RDD是只读的
- RDD包括两类算子
- transformation:将RDD进行转化,构建RDD的血缘关系
- action:触发RDD的计算,得到结果或者保存文件系统,比如
count
、collect
、savaAsTextFile
- 依赖:RDDs通过操作算子开始进行转换,转换之后得到新的RDD包含从其他RDDs衍生所必须的信息,RDDs之间维护者这种血缘关系
- 隐式转换:RDD的伴生类中有一些隐式转换,
转换算子
序列
单值
map
和mapPartitions
的区别:map
是对每个分区内的每个元素都执行,mapPartitions
是对每个分区都执行一次flatMap
:一进多出,即flatMap
中的方法返回的应该是一个可遍历的集合glom
:将分区中的所有元素合并成一个Array
多值
zip
:将两个RDD中索引相同的值合并为一个二元组
键值对
单值
partitionBy
:重分区reduceByKey
:聚合算子,在shuffle之前聚合(即预聚合),并且分区内和分区间聚合逻辑相同groupByKey
:分组算子,foldByKey
:聚合算子,可以有0值,在分区内会先进行聚合,并且分区内和分区间聚合逻辑相同aggregateByKey
:聚合算子,三个参数,可以有0值,分区内调用func1,分区间调用func2combineByKey
:聚合算子,比aggregateByKey
更加灵活sortByKey
:对key进行排序mapValues
:对value进行map操作
多值
join
和leftOuterJoin
和rightOuterJoin
:
行动算子
- collect
- count
- take
- takeOrdered
- countByKey
- reduce:聚合,无0值,分区内和分区间相同
- fold:聚合,有0值,分区内和分区间相同
- aggregate:聚合,有0值,分区内和分区间不同
- foreach:这个遍历是在executor上完成的,
概念:
- Application:一个Driver Program一个Application。基于Spark 构建的用户程序。 由集群上的驱动程序和执行程序组成。
- Job:一个Application中,每个Job包含多个Stage,每个action会产生一个Job。由多个Task组成的并行计算,这些任务响应 Spark action而产生;你会在驱动程序的日志中看到这个术语。
- Stage:每个宽依赖会产生一个新stage,也和分区器有关系(如果分区器相同,不产生新的Stage)。每个Job被划分为更小的task集,称为stage,彼此依赖(类似于MapReduce中的map和reduce阶段);您将在驱动程序日志中看到这个术语。
- Task:一个Stage包含多个Task,Task是一个线程,是执行代码的最小单位,Task和分区数相等。将发送给一个Executor的工作单元去执行。
RDD缓存级别,聚合算子默认会进行缓存
划分DAG:
- 首先,根据依赖关系的不同将DAG划分为不同的阶段(Stage)。
- 对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个执行阶段;
- 对于宽依赖,由于Shuffle的存在,只能在parent RDD(s)Shuffle处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据,即Spark根据宽依赖将DAG划分为不同的Stage。
- 在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的。
- Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有parent Stage或者parent Stage都已经执行完成后,才可以执行。
- 举例
- 首先,根据依赖关系的不同将DAG划分为不同的阶段(Stage)。
分区算法:
代码
private object ParallelCollectionRDD { /** * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection * is an inclusive Range, we use inclusive range for the last slice. */ def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { if (numSlices < 1) { throw new IllegalArgumentException("Positive number of slices required") } def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } seq match { case r: Range => positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { new Range.Inclusive(r.start + start * r.step, r.end, r.step) } else { new Range(r.start + start * r.step, r.start + end * r.step, r.step) } }.toSeq.asInstanceOf[Seq[Seq[T]]] case nr: NumericRange[_] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) var r = nr for ((start, end) <- positions(nr.length, numSlices)) { val sliceSize = end - start slices += r.take(sliceSize).asInstanceOf[Seq[T]] r = r.drop(sliceSize) } slices // 如果是一个Array,会将Array分割为多个Seq case _ => val array = seq.toArray positions(array.length, numSlices).map { case (start, end) => array.slice(start, end).toSeq }.toSeq } } }