Dissecting the groupByKey and reduceByKey Source Code Through Spark's WordCount

1. Posing the Question

There are two ways to write wordcount:
1. The version from the official tutorial:

scala> sc.parallelize(List(List("who am i"),List("where you want to go","i will go with you")))
res21: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[17] at parallelize at <console>:25

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect
res22: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))

2. The version typically learned while studying Scala:

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect
res24: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))

Both approaches produce the same wordcount result, which made me wonder: which one is faster?

2. Comparison

2.1 The groupByKey function

scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[5] at union at <console>:28

scala> rdd3.groupByKey.collect
res1: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(2, 3)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))

The experiment shows that groupByKey directly groups the result set:
1. Create a combiner
2. Gather the values sharing the same key into a CompactBuffer, producing a collection whose elements are CompactBuffers
3. Call combineByKeyWithClassTag to perform the grouping

Looking at the source: groupByKey groups all the values for each key into one sequence.
The scaladoc adds a note: "If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance." That is, for aggregation and reduce operations, PairRDDFunctions.aggregateByKey and PairRDDFunctions.reduceByKey are the better choices.
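
For reference, this is roughly how groupByKey is implemented in Spark's PairRDDFunctions (paraphrased from the Spark 2.x source; details may vary across versions):

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}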

Note: in groupByKey's call to combineByKeyWithClassTag above, mapSideCombine = false, which means map-side combining is disabled. This is exactly why groupByKey is slower than reduceByKey.

2.2 reduceByKey


reduceByKey calls combineByKeyWithClassTag to implement the aggregation, merging the data from multiple partitions together.
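
Again paraphrasing Spark's PairRDDFunctions (exact code may differ by version), the whole implementation is a one-line delegation, where the reduce function serves as both mergeValue and mergeCombiners:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

Because mapSideCombine keeps its default value of true here, the summing already happens on the map side before the shuffle.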

2.3 aggregateByKey

aggregateByKey clearly does the following (paraphrased source below):
1. Take an initial (zero) value
2. Get the Spark environment (SparkEnv) and serialize the zero value
3. Call combineByKeyWithClassTag
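
Again as a paraphrase of Spark's PairRDDFunctions (version-dependent details aside):

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}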

2.4 All three functions above call combineByKeyWithClassTag

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope

Parameter descriptions (a usage sketch follows the list):

  • createCombiner: on the map side, turns a value of type V into the result type C; essentially an initialization step
  • mergeValue: the operation applied within each partition, similar to the map in MapReduce
  • mergeCombiners: the operation that merges the per-partition results, similar to the reduce in MapReduce
  • partitioner: the partitioner object
  • mapSideCombine: map-side combining, enabled by default
  • serializer: the serializer
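
To make the three functions concrete, here is a minimal sketch of wordcount expressed directly through combineByKey (a thin wrapper around combineByKeyWithClassTag); it assumes a SparkContext sc is already in scope, as in the spark-shell:

val counts = sc.parallelize(Seq("who", "am", "i", "i"))
  .map((_, 1))
  .combineByKey(
    (v: Int) => v,                  // createCombiner: the first value of a key seeds the combiner
    (c: Int, v: Int) => c + v,      // mergeValue: fold further values in within a partition
    (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merge per-partition combiners
counts.collect()  // Array((who,1), (am,1), (i,2))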

3. Analysis

3.1 reduceByKey( _ + _ ) vs. aggregateByKey(0)( _ + _ , _ + _ )

reduceByKey( _ + _ ) and aggregateByKey(0)( _ + _ , _ + _ ) are essentially the same:

scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> pairRDD.aggregateByKey(0)(_ + _, _ + _).collect
res19: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))              

scala> pairRDD.reduceByKey( _ + _).collect
res20: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))     

Both first sum within each partition on the map side (in parallel), and then reduce the per-partition results into the final sums.
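
One illustrative way to see the partition layout before the shuffle is glom, which turns each partition into an array. The comments below assume the 2-partition pairRDD above; the slice boundaries are an assumption about how parallelize splits the list:

pairRDD.glom().collect()
// Partition 0: (cat,2), (cat,5), (mouse,4)   -> map-side combine ships (cat,7), (mouse,4)
// Partition 1: (cat,12), (dog,12), (mouse,2) -> all keys distinct, shipped as-is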

3.2 reduceByKey( _ + _ ) vs. groupByKey()

Approach 1:

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect

Approach 2:

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect

Approach 1 is faster than approach 2 because reduceByKey first sums within each partition (in parallel) and only then reduces the per-partition results, whereas groupByKey.map(t=>(t._1,t._2.sum)) shuffles every raw pair across the cluster first and then sums all the values for a key inside a single partition. The root cause is that groupByKey disables mapSideCombine.
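
A rough way to inspect both plans is toDebugString; the lineages look alike (both end in a ShuffledRDD), and the difference lives in how many records cross that shuffle (a sketch, reusing res21 from above):

val byReduce = res21.flatMap(_.flatMap(_.split(" "))).map((_, 1)).reduceByKey(_ + _)
val byGroup  = res21.flatMap(_.flatMap(_.split(" "))).map((_, 1)).groupByKey().map(t => (t._1, t._2.sum))
println(byReduce.toDebugString)  // shuffle input was already combined within each partition
println(byGroup.toDebugString)   // shuffle input is every raw (word, 1) pair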
