Tracing the wordcount source code to understand how each Spark RDD is executed
1. The RDDs involved in wordcount
Run wordcount:
# initialize the input data
scala> val rdd1 = sc.parallelize(List(List("2","3,4","8,7"),List("1")))
rdd1: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[1] at parallelize at <console>:24
# the wordcount code
scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(","))).map((_,1)).reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26
# collect the result
scala> rdd2.collect
res0: Array[(String, Int)] = Array((4,1), (8,1), (2,1), (7,1), (3,1), (1,1))
Inspect the RDD dependencies:
scala> rdd2.toDebugString
res1: String =
(2) ShuffledRDD[4] at reduceByKey at <console>:26 []
+-(2) MapPartitionsRDD[3] at map at <console>:26 []
| MapPartitionsRDD[2] at flatMap at <console>:26 []
| ParallelCollectionRDD[1] at parallelize at <console>:24 []
As the output shows, this wordcount run produced four RDDs:
(1) The first is a ParallelCollectionRDD, which parallelizes the input data.
(2) The second is the MapPartitionsRDD produced by flatMap.
(3) The third is the MapPartitionsRDD produced by map.
(4) The last is the ShuffledRDD produced by reduceByKey during its map-shuffle-reduce phase.
(Figure: the RDD dependency graph and stage division of this wordcount job.)
In the lineage output, the `+-` marks a shuffle boundary, so this wordcount job is divided into two stages; the `(2)` in front of each RDD is its number of partitions, not the stage count. The second stage cannot start until every task of the first stage has finished. A quick check of the partition count is sketched below.
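A minimal spark-shell sketch (reusing rdd1 from above) to confirm that the leading number is the partition count; the value 4 passed to reduceByKey is just an illustrative choice:

// reduceByKey(func, numPartitions) lets us pick the number of reduce-side partitions explicitly
val rdd3 = rdd1.flatMap(_.flatMap(_.split(","))).map((_, 1)).reduceByKey(_ + _, 4)
println(rdd3.partitions.length)   // 4
println(rdd3.toDebugString)       // the ShuffledRDD line now starts with "(4)"; its parents still show "(2)"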
2. About RDDs
While executing, an RDD records its Lineage:
RDDs only support coarse-grained transformations, i.e. a single operation applied to a large number of records. The sequence of transformations that created an RDD is recorded as its Lineage so that lost partitions can be recovered: the Lineage stores the RDD's metadata and the transformations applied to it, and when some of the RDD's partitions are lost, Spark can recompute and recover just those partitions from this information.
The rdd2.toDebugString call above prints exactly the information recorded by the Lineage.
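A small sketch of working with lineage: toDebugString prints it, and checkpointing materializes the RDD so the lineage no longer has to reach back to the original data. The checkpoint directory below is a placeholder path chosen for illustration.

sc.setCheckpointDir("/tmp/spark-checkpoint")             // placeholder directory
val r = sc.parallelize(1 to 4).map(_ + 1).filter(_ % 2 == 0)
println(r.toDebugString)                                 // lineage reaches back to the ParallelCollectionRDD
r.checkpoint()
r.count()                                                // an action triggers the checkpoint write
println(r.toDebugString)                                 // lineage is now rooted at the checkpointed data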
3. Reading a file with textFile()
sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(identity,false).saveAsTextFile(args(1))
Tracing the textFile method:
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 * @param path path to the text file on a supported file system
 * @param minPartitions suggested minimum number of partitions for the resulting RDD
 * @return RDD of lines of the text file
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
Parameters:
- path: the path of the file to read.
- minPartitions: the suggested minimum number of partitions; it has a default value but can be set explicitly. For example, HDFS's default block size is 128 MB (64 MB in Hadoop 1.x), so a 1 GB input file is split into 8 partitions by default (see the sketch after this list).
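A quick sketch of overriding minPartitions (the path is a placeholder; the final partition count is still decided by the InputFormat's splits, so the value is a minimum rather than an exact count):

val lines = sc.textFile("hdfs:///data/input.txt", 16)    // placeholder path, ask for at least 16 partitions
println(lines.partitions.length)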
Execution steps:
(1) assertNotStopped(): makes sure this SparkContext has not been stopped.
(2) hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions): reads the file as <LongWritable, Text> records.
(3) map: keeps only the value of each record (the text of the line) and converts it to a String; the whole expansion is sketched below.
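Putting the three steps together, sc.textFile(path) is roughly equivalent to the following sketch, built directly from the source above (the path is a placeholder):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// read <byte offset, line text> records, then keep only the line text
val lines = sc.hadoopFile("hdfs:///data/input.txt",      // placeholder path
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)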
3.1 Tracing the hadoopFile method:
/**
 * Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 * @param path directory to the input data files, the path can be comma separated paths
 * as a list of inputs
 * @param inputFormatClass storage format of the data to be read
 * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
 * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
 * @param minPartitions suggested minimum number of partitions for the resulting RDD
 * @return RDD of tuples of key and corresponding value
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
The parameters passed in:
- path: the path of the file(s) to read
- TextInputFormat: the InputFormat used to read the file; plugging in a different InputFormat here customizes how the file is read (see the sketch after this list)
- LongWritable: the key type of the <key, value> records produced while reading
- Text: the value type of the <key, value> records produced while reading
- minPartitions: the suggested minimum number of partitions (roughly, the number of input splits/blocks)
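As noted for the TextInputFormat parameter, a different InputFormat can be plugged in. A sketch using Hadoop's KeyValueTextInputFormat, which splits each line on its first tab into a key and a value (the path is a placeholder):

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat

val kv = sc.hadoopFile("hdfs:///data/kv.txt",            // placeholder path
    classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text], 4)
  .map { case (k, v) => (k.toString, v.toString) }       // copy out of the reused Writable objects, as the @note above advises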
Execution steps:
(1) assertNotStopped(): makes sure this SparkContext has not been stopped.
(2) FileSystem.getLocal(hadoopConfiguration): forces the Hadoop configuration (including hdfs-site.xml) to be loaded; see the SPARK-11227 comment in the source.
(3) val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)): broadcasts the Hadoop configuration to every Spark executor.
(4) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path): defines a function that sets the input path(s) on the JobConf; it is applied later, when the HadoopRDD builds its JobConf, rather than reading the file here.
(5) new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions)
Its arguments:
- confBroadcast: the broadcast Hadoop configuration (see the broadcast sketch after this list)
- setInputPathsFunc: the function that configures the input paths on the JobConf
- inputFormatClass: the format used to read the file
- keyClass: the class of the key in each <key, value> record
- valueClass: the class of the value in each <key, value> record
- minPartitions: the number of partitions
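The broadcast of the Hadoop configuration in step (3) uses the same broadcast mechanism that is available to user code; a minimal sketch of the idea (the lookup table is made up for illustration):

// ship one read-only copy of a value to every executor instead of serializing it with every task
val lookup = Map("spark" -> 1, "rdd" -> 2)
val bc = sc.broadcast(lookup)
val tagged = sc.parallelize(Seq("spark", "rdd", "hadoop"))
  .map(word => (word, bc.value.getOrElse(word, 0)))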
3.2 Tracing the map method
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
Execution steps:
(1) sc.clean(f) cleans the closure f (removing references to enclosing objects it does not need) and checks that it is serializable, so that it can be shipped to the tasks.
(2) It then creates a MapPartitionsRDD that applies the function to every partition by calling iter.map on that partition's iterator, as sketched below.
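A sketch showing that RDD.map is simply iter.map applied per partition; the two RDDs below compute the same result:

val nums = sc.parallelize(1 to 10, 2)
val viaMap           = nums.map(_ * 2)
val viaMapPartitions = nums.mapPartitions(iter => iter.map(_ * 2))   // what MapPartitionsRDD does internally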
4. flatMap()
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
The process is the same as for map, except that it calls Scala's flatMap on each partition's iterator, as sketched below.
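The same equivalence, sketched for flatMap:

val docs = sc.parallelize(Seq("a b", "c d"), 2)
val viaFlatMap       = docs.flatMap(_.split(" "))
val viaMapPartitions = docs.mapPartitions(iter => iter.flatMap(_.split(" ")))   // what MapPartitionsRDD does internally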
5. The complete wordcount execution flow
Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD; operations such as map, filter and union produce narrow dependencies (an "only child" relationship).
Wide dependency: a single partition of the parent RDD is used by multiple partitions of the child RDD; operations such as groupByKey, reduceByKey and sortByKey produce wide dependencies (a "one parent, many children" relationship).
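A sketch that makes the two dependency types visible through RDD.dependencies (the printed names are Spark's internal dependency classes):

val base   = sc.parallelize(1 to 8, 2)
val narrow = base.map(x => (x % 2, x))        // narrow: OneToOneDependency
val wide   = narrow.reduceByKey(_ + _)        // wide:   ShuffleDependency
narrow.dependencies.foreach(d => println(d.getClass.getSimpleName))
wide.dependencies.foreach(d => println(d.getClass.getSimpleName))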