Analyzing the Source of groupByKey and reduceByKey via Spark's WordCount
1. Posing the question
There are two ways to write WordCount:
1. The version from the official tutorial:
scala> sc.parallelize(List(List("who am i"),List("where you want to go","i will go with you")))
res21: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[17] at parallelize at <console>:25
scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect
res22: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))
2. The version commonly learned when studying Scala:
scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect
res24: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))
Both approaches produce the same WordCount result, so the natural question is: which one is faster?
2. Comparison
2.1 The groupByKey function
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[5] at union at <console>:28
scala> rdd3.groupByKey.collect
res1: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(2, 3)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
The experiment shows that groupByKey gathers the values for each key directly:
1. Create a combiner.
2. Collect all values with the same key into a CompactBuffer, so the result set's elements are CompactBuffers.
3. Call combineByKeyWithClassTag to do the actual grouping.
Looking at the source: groupByKey gathers all values for a key into a single sequence.
The scaladoc adds: "If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance." In other words, for aggregation- or reduce-style operations, PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey is the better choice.
Note: in groupByKey, combineByKeyWithClassTag is called with mapSideCombine = false, i.e. map-side combining is disabled. This is the reason groupByKey is slower than reduceByKey.
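For reference, this is roughly what groupByKey looks like in Spark's PairRDDFunctions (paraphrased; details vary by version). The three steps above map directly onto the three combiner functions, and mapSideCombine is explicitly set to false:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // createCombiner: put the first value for a key into a CompactBuffer
  val createCombiner = (v: V) => CompactBuffer(v)
  // mergeValue: append further values from the same partition to that buffer
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  // mergeCombiners: concatenate buffers coming from different partitions
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}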
2.2 reduceByKey
reduceByKey also calls combineByKeyWithClassTag to implement its aggregation, merging values inside each partition first and then merging the per-partition results.
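Paraphrased from PairRDDFunctions (again, the exact code depends on the Spark version), reduceByKey simply passes the reduce function as both mergeValue and mergeCombiners and leaves mapSideCombine at its default of true:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // identity createCombiner, and the same func for both the in-partition and
  // cross-partition merge steps
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}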
2.3 aggregateByKey
aggregateByKey clearly does three things:
1. Take a zero (initial) value.
2. Obtain the SparkEnv and serialize the zero value.
3. Call combineByKeyWithClassTag, as sketched below.
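Paraphrased from PairRDDFunctions (abbreviated, version-dependent): the zero value is serialized once through SparkEnv so that each key can get a fresh clone of it, and the real work is again delegated to combineByKeyWithClassTag with map-side combining left enabled:
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // serialize the zero value so that every key gets a fresh clone of it
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit())
  zeroBuffer.get(zeroArray)
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
  // fold each value into a deserialized zero, then merge per-partition results with combOp
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}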
2.4 All three functions call combineByKeyWithClassTag
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope
Parameter notes:
- createCombiner: on the map side, converts a value of type V into the combined type C; effectively an initialization step.
- mergeValue: the merge performed inside each partition, analogous to the map phase in MapReduce.
- mergeCombiners: the merge performed across partitions after the per-partition aggregation, analogous to the reduce phase in MapReduce.
- partitioner: the partitioner object.
- mapSideCombine: whether to combine on the map side; enabled by default.
- serializer: the serializer to use.
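To make these roles concrete, here is a small illustrative snippet (not from the original post) that sums values per key with combineByKey, which delegates to combineByKeyWithClassTag:
val pairs = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12)), 2)
val summed = pairs.combineByKey(
  (v: Int) => v,                  // createCombiner: first value seen for a key in a partition
  (c: Int, v: Int) => c + v,      // mergeValue: fold later values within the same partition
  (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merge the per-partition results
summed.collect // Array((cat,19), (mouse,4)), order may vary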
3. Analysis
3.1 reduceByKey(_ + _) vs aggregateByKey(0)(_ + _, _ + _)
reduceByKey(_ + _) and aggregateByKey(0)(_ + _, _ + _) are essentially the same:
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> pairRDD.aggregateByKey(0)(_ + _, _ + _).collect
res19: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.reduceByKey( _ + _).collect
res20: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
Both first sum within each partition (map-side combine, running in parallel), and then reduce the per-partition results into the final sums.
3.2 reduceByKey(_ + _) vs groupByKey()
Approach 1:
scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect
Approach 2:
scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect
Approach 1 is faster than approach 2 because reduceByKey first sums within each partition (in parallel) and only then merges the per-partition results, whereas groupByKey.map(t=>(t._1,t._2.sum)) first shuffles every (word, 1) pair to group them and only afterwards sums all the values for each key inside a single partition. The root cause is that groupByKey disables mapSideCombine.