Tracing the wordcount source code to understand each RDD in Spark's execution

1. The RDDs involved in wordcount

Run wordcount:

# initialize the input data
scala> val rdd1 = sc.parallelize(List(List("2","3,4","8,7"),List("1")))
rdd1: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[1] at parallelize at <console>:24
# the wordcount code
scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(","))).map((_,1)).reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26
# output the result
scala> rdd2.collect
res0: Array[(String, Int)] = Array((4,1), (8,1), (2,1), (7,1), (3,1), (1,1))

View the RDD dependency chain:

scala> rdd2.toDebugString
res1: String =
(2) ShuffledRDD[4] at reduceByKey at <console>:26 []
 +-(2) MapPartitionsRDD[3] at map at <console>:26 []
    |  MapPartitionsRDD[2] at flatMap at <console>:26 []
    |  ParallelCollectionRDD[1] at parallelize at <console>:24 []

The wordcount run above produces four RDDs (the sketch after this list binds each one to its own name):
(1) The first is a ParallelCollectionRDD, which parallelizes the input collection.
(2) The second is the MapPartitionsRDD produced by flatMap.
(3) The third is the MapPartitionsRDD produced by map.
(4) The last is the ShuffledRDD produced by the map-shuffle-reduce step inside reduceByKey.
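
A minimal spark-shell sketch of the same pipeline, assuming the same sc, with each intermediate RDD bound to a val so the four RDDs above map one-to-one onto names:

val parallel = sc.parallelize(List(List("2","3,4","8,7"), List("1")))  // ParallelCollectionRDD
val words    = parallel.flatMap(_.flatMap(_.split(",")))               // MapPartitionsRDD from flatMap
val pairs    = words.map((_, 1))                                       // MapPartitionsRDD from map
val counts   = pairs.reduceByKey(_ + _)                                // ShuffledRDD
counts.collect()                                                       // same elements as res0 above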

As shown in the figure:

The +-(2) in the figure (and in the toDebugString output above) marks a shuffle boundary: this wordcount is split into two stages, and the number in parentheses is the partition count. All tasks of the first stage must finish before the second stage can start.

2. About RDDs

As an RDD is built, Spark records a lineage for it:

RDDs only support coarse-grained transformations, i.e. a single operation applied to a large number of records. The series of transformations used to create an RDD is recorded as its lineage so that lost partitions can be recovered. An RDD's lineage records its metadata and the transformations that produced it; when some of the RDD's partitions are lost, Spark can use this information to recompute and recover the lost partitions.

The rdd2.toDebugString call above prints exactly the information recorded by the lineage.
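
The same information is available programmatically through rdd.dependencies. A small spark-shell sketch (reusing rdd2 from section 1) that walks the dependency chain and labels each edge as narrow or shuffle:

def printLineage(rdd: org.apache.spark.rdd.RDD[_], indent: String = ""): Unit = {
  println(s"$indent${rdd.getClass.getSimpleName}[${rdd.id}]")
  rdd.dependencies.foreach { dep =>
    val kind = dep match {
      case _: org.apache.spark.ShuffleDependency[_, _, _] => "shuffle"
      case _                                              => "narrow"
    }
    println(s"$indent  ($kind dependency)")
    printLineage(dep.rdd, indent + "  ")
  }
}

printLineage(rdd2)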

3. Reading a file with textFile()

sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(identity,false).saveAsTextFile(args(1))
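
Wrapped in a standalone application, the one-liner above looks roughly like the following sketch (the object name and the spark-submit packaging are assumptions, not part of the original):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    sc.textFile(args(0))                 // args(0): input path
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(identity, false)
      .saveAsTextFile(args(1))           // args(1): output path
    sc.stop()
  }
}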

Tracing the textFile method:

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   * @param path path to the text file on a supported file system
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of lines of the text file
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

Parameters:

  • path: the path of the file to read
  • minPartitions: the suggested minimum number of partitions; it defaults to Spark's defaultMinPartitions but can be set explicitly. The actual number of splits follows the HDFS block size, e.g. with Hadoop's default 128 MB block size (64 MB before Hadoop 2.x), a 1 GB file is read as 8 partitions by default (see the sketch below).
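
A minimal usage sketch in the spark-shell (the HDFS path is a placeholder) comparing the default with an explicit minPartitions hint:

val linesDefault = sc.textFile("hdfs:///data/words.txt")       // uses defaultMinPartitions
val linesHinted  = sc.textFile("hdfs:///data/words.txt", 16)   // ask for at least 16 input splits
println(linesDefault.getNumPartitions)
println(linesHinted.getNumPartitions)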

Execution steps:
(1) assertNotStopped(): ensures this SparkContext has not been stopped.
(2) hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions): reads the file as (key, value) records.
(3) map: converts each (offset, line) record into the plain line string (pair => pair._2.toString).

3.1 Tracing the hadoopFile method

  /**
   * Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   * @param path directory to the input data files, the path can be comma separated paths
   * as a list of inputs
   * @param inputFormatClass storage format of the data to be read
   * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
   * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of tuples of key and corresponding value
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

Parameters passed in (a direct call is sketched after the list):

  • path: the path(s) of the file(s) to read
  • TextInputFormat: the input format used to read the file; supplying a different InputFormat lets you customize how the file is read
  • LongWritable: the key type of the <key, value> records (the byte offset of each line)
  • Text: the value type of the <key, value> records (the line contents)
  • minPartitions: the suggested minimum number of input splits/partitions
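
For comparison, textFile's body can be reproduced at user level by calling hadoopFile directly with the same classes (a sketch; the path is a placeholder), which makes the raw (LongWritable offset, Text line) records visible before the .map(pair => pair._2.toString) projection:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile("hdfs:///data/words.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val lines = raw.map { case (_, text) => text.toString }   // the projection textFile applies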

Execution steps:
(1) assertNotStopped(): ensures this SparkContext has not been stopped.
(2) FileSystem.getLocal(hadoopConfiguration): a hack that forces hdfs-site.xml to be loaded (see SPARK-11227 in the code comment).
(3) val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)): broadcasts the Hadoop configuration (roughly 10 KB) to every executor; a user-level sketch of the idea follows the argument list below.
(4) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path): defines a function that sets the input paths on the JobConf; the file itself is not read at this point.
(5) new HadoopRDD(this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions): constructs the HadoopRDD with the arguments below.

  • confBroadcast: the broadcast Hadoop configuration
  • setInputPathsFunc: the function that configures the job's input paths
  • inputFormatClass: the input format used to read the file
  • keyClass: the class of the key in <key, value>
  • valueClass: the class of the value in <key, value>
  • minPartitions: the suggested minimum number of partitions
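
Step (3) relies on Spark's generic broadcast mechanism: the configuration is shipped once per executor rather than once per task. SerializableConfiguration itself is private to Spark, so this user-level sketch uses a plain Map as a stand-in for the broadcast configuration:

val config   = Map("separator" -> ",")                // stand-in for the ~10 KB Hadoop conf
val configBc = sc.broadcast(config)
val rdd      = sc.parallelize(Seq("a,b", "c,d"))
val tokens   = rdd.flatMap(line => line.split(configBc.value("separator")))
tokens.collect()                                      // Array(a, b, c, d)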

3.2 Tracing the map method

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

Execution steps:
(1) sc.clean(f): cleans the closure f (stripping references to the enclosing scope that are not needed and checking that it is serializable) so it can be shipped to the tasks.
(2) creates a MapPartitionsRDD that applies the function to each partition's iterator via iter.map (see the sketch below).
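
In other words, map is just an iterator-level transform applied per partition, and the same result can be reproduced at user level with mapPartitions (a small spark-shell sketch):

val nums             = sc.parallelize(1 to 10, 2)
val viaMap           = nums.map(_ * 2)
val viaMapPartitions = nums.mapPartitions(iter => iter.map(_ * 2))
assert(viaMap.collect().sameElements(viaMapPartitions.collect()))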

4. flatMap()

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

The process is the same as for map, except that it calls Scala's flatMap on each partition's iterator.
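
The same kind of equivalence check as for map, now with the iterator's flatMap doing the flattening (spark-shell sketch):

val linesRdd      = sc.parallelize(Seq("a,b", "c"), 2)
val viaFlatMap    = linesRdd.flatMap(_.split(","))
val viaPartitions = linesRdd.mapPartitions(iter => iter.flatMap(_.split(",")))
assert(viaFlatMap.collect().sameElements(viaPartitions.collect()))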

5. The complete wordcount execution flow




Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD; operations such as map, filter, and union produce narrow dependencies (an "only child" relationship).

Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD; operations such as groupByKey, reduceByKey, and sortByKey produce wide dependencies (a "many children" relationship).
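
The two kinds of dependency can be checked directly in the spark-shell (a quick sketch):

val base    = sc.parallelize(Seq(("a", 1), ("b", 1)), 2)
val mapped  = base.map(identity)
val reduced = base.reduceByKey(_ + _)
println(mapped.dependencies.head.getClass.getSimpleName)    // OneToOneDependency (narrow)
println(reduced.dependencies.head.getClass.getSimpleName)   // ShuffleDependency (wide)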

Reference:
https://www.cnblogs.com/qingyunzong/p/8899715.html
