从saprk的wordcount解析groupBykey与RudeceByKey的源码

1.引出问题

有两个wordcount的书写方式:
1.这是个官方教程的书写wordcount

scala> sc.parallelize(List(List("who am i"),List("where you want to go","i will go with you")))
res21: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[17] at parallelize at <console>:25

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect
res22: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))

2.下面一种是学习scala的时候学习的一种wordcount书写方式

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect
res24: Array[(String, Int)] = Array((am,1), (with,1), (will,1), (who,1), (go,2), (want,1), (you,2), (i,2), (to,1), (where,1))

我发现两种方式都能够得到wordcount的结果,于是我就想哪一种速率比较快?

2.进行比较

2.1 groupByKey函数

scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[5] at union at <console>:28

scala> rdd3.groupByKey.collect
res1: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(2, 3)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))

实验发现,groupByKey函数会直接聚合结果集:
1.创建聚合器
2.将k相同的v编程compactBuffer聚合成为一个元素为compactBuffer的集合
3.调用combineByKeyWithClassTag聚合起来

查看源码:根据key聚合所有的value在一个sequence中
下面补充说明了:If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.表示聚合操作和reduce操作,使用PairRDDFunctions.aggregateByKey和PairRDDFunctions.aggregateByKey更好!

注意:可以看到groupByKey中的combineByKeyWithClassTag中的mapSideCombine = false,表示在map端聚合的操作被禁止了,这就是groupByKey比 reduceByKey慢的原因

2.2 reduceByKey


reduce调用了combineByKeyWithClassTag函数实现聚合操作,将多个partition中的数据聚合在一起。

2.3 aggregateByKey

很明显aggregateByKey的操作就是:
1.赋初值
2.获取saprk环境,并序列化
3.调用combineByKeyWithClassTag

2.4 上面三个函数都调用了combineByKeyWithClassTag

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope

参数说明:

  • createCombiner:map端,改变 v 的返回值类型为c,类似于初始化操作
  • mergeValue:在每个分区中的操作,类似于mapreduce中的map
  • mergeCombiners:分区聚合后的操作,类似于mapeduce中的reduce
  • partitioner:分区对象
  • mapSideCombine:默认开启map端的聚合操作
  • serializer:序列化

3.分析

3.1 reduceByKey( _ + _ )和aggregateByKey( _ + _ , _ +_ )

reduceByKey( _ + _ )和aggregateByKey( _ + _ , _ + _ )的本质是相同的:

scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> pairRDD.aggregateByKey(0)(_ + _, _ + _).collect
res19: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))              

scala> pairRDD.reduceByKey( _ + _).collect
res20: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))     

他们会先在每个patiition中map求和,然后再将每个partition的结果reduce求和。

3.2 reduceByKey( _ + _ )和groupByKey()

方式1:

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).reduceByKey(_+_).collect

方式2:

scala> res21.flatMap(_.flatMap(_.split(" "))).map((_,1)).groupByKey.map(t=>(t._1,t._2.sum)).collect

方式1之所以比方式2快是因为,reduce会先在每个partition中求和(并行),然后在对每个partition中的结果求和,而groupByKey.map(t=>(t._1,t._2.sum))会先聚合所有的pairRDD,然后在一个patition中对所有数据求和。原因在于groupByKey禁止了mapSideCombine。

全部评论

相关推荐

2025-12-12 19:01
南京航空航天大学 C++
秋招没咋投,准备&nbsp;wxg&nbsp;转正之后摆烂了。结果不堪字节&nbsp;HR&nbsp;的骚扰还是面了一下字节。之前想去字节的时候怎么面都挂。现在想着随便面一下结果三面技术面都意外顺利还有加面。十月中旬字节发了意向,wxg&nbsp;转正结果无响应。十月底字节拉了保温群,wxg&nbsp;口头通过,系统显示考核中。十一月初和字节&nbsp;ld&nbsp;交流之后得知&nbsp;base&nbsp;居然能选海外,甚至能小&nbsp;wlb&nbsp;一下,wxg&nbsp;无响应无人联系。十一月中旬把字节&nbsp;base&nbsp;转到了海外,wxg&nbsp;流程灰了,一问超时忘处理了,过两天又变考核中了。十一月下旬字节换了海外&nbsp;HR&nbsp;对接,问了期望薪资,wxg&nbsp;考核终于显示通过,无&nbsp;HR&nbsp;保温,无其他保温。十一月底给字节报了个天价,想吓吓他们,同时告诉微信字节要开了,微信无响应。同样十一月底字节&nbsp;HR&nbsp;告诉我确实给不到那么高,但是能拿期权补上,问能不能接受。微信无响应。同样十一月底字节&nbsp;HR&nbsp;告知了具体方案,符合预期。&nbsp;微信无响应。十二月上旬催&nbsp;wxg&nbsp;不开我就盲拒了,wxg&nbsp;HR&nbsp;火急火燎的打电话问情况,问期望。我给了一个不算夸张的总包数字,因为今年市场在涨,过了三天还不联系我,我再催,约时间下午打电话,非得在我给出的数字上压下去几万,微信又不差这点,为什么不能满足我,让我没有拒绝的理由呢?一番纠结抗争,求稳还是追求挑战,最终选择接受迎接新的挑战,因为堂吉诃德永远不会停下脚步!回想起来,在&nbsp;wxg&nbsp;谈薪的阶段,我认为并没有给予我一定的重视,即使&nbsp;HR&nbsp;表示我在实习期间的表现和之前的面评都很靠前。也没有感觉到想要争取我,虽然我表示拒了&nbsp;offer&nbsp;之后要给我加面委定&nbsp;t6&nbsp;再涨,但我三个月没面试让我面面委那就是白给,还是算了。有缘再见了我亲爱的&nbsp;wxg,再见了曾经的梦中情厂,再见亲爱的&nbsp;mt,再见亲爱的朋友们。也再见,北京的一切。我想润了。秋招结束,卸载牛客,下一个三年,下一个五年,下一个十年后再来看看。
面试中的大熊猫爱吃薯...:我嫉妒得狗眼通红
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务