Spark-分组TOPN算法
该数据集都为:“http://bigdata.edu360.cn/laozhou” 这个样子,需求是找到每个学科下最受欢迎的老师
方法一:
/**
* 数据放到scala 集合里面进行操作
*/
object GroupFavTeacher_1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FavTeacher").setMaster("local")
val sc = new SparkContext(conf)
//指定以后从哪里读取数据
val lines = sc.textFile(args(0))
//整理数据
val subject_teacherAndOne = lines.map(line => {
//val line = "http://bigdata.edu360.cn/laoyu"
val conSubject = line.split("/")(2)
val subject =conSubject.split("[.]")(0)
val teacher = line.split("/")(3)
((subject, teacher),1)
})
//聚合,将学科和老师联合当做key
val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
//分组排序(按学科进行分组)
val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
//经过分组后,一个分区内可能有多个学科的数据,一个学科就是一个迭代器
//将每一个组拿出来进行操作v
//为什么可以调用sacla的sortby方法呢?因为一个学科的数据已经在一个scala集合里面了
val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3))
val resulted = sorted.collect()
//收集
println(resulted.toBuffer)
sc.stop()
}
}
- 方法二:
/**
* 先过滤再统计计算
*/
object GroupFavTeacher_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GroupFavTeacher_2").setMaster("local")
val sc = new SparkContext(conf)
//val topN = args(1).toInt
val subjects = Array("bigdata", "javaee", "php")
//指定以后从哪里读取数据
val lines = sc.textFile(args(0))
//整理数据
val subject_teacherAndOne = lines.map(line => {
//val line = "http://bigdata.edu360.cn/laozhang"
val conSubject = line.split("/")(2)
val subject =conSubject.split("[.]")(0)
val teacher = line.split("/")(3)
((subject, teacher),1)
})
//聚合,将学科和老师联合当做key
val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
//分组排序(按学科进行分组)
val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
//scala的集合排序是在内存中进行的,但是内存有可能不够用
//可以调用RDD的sortby方法,内存+磁盘进行排序
for(sb <- subjects) {
//该RDD中对应的数据仅有一个学科的数据(因为过滤过了)
val filted = grouped.filter(_._1 == sb)
//现在调用的是RDD的sortBy方法,(take是一个action,会触发任务提交)
val filtedResulted = filted.sortBy(_._2, false).take(3)
println(filtedResulted.toBuffer)
}
sc.stop()
}
}
- 方法三:
/**
*自定义分区器(k,v)
*
/
object GroupFavTeacher3 {
def main(args: Array[String]): Unit = {
val topN = args(1).toInt
val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]")
val sc = new SparkContext(conf)
//指定以后从哪里读取数据
val lines: RDD[String] = sc.textFile(args(0))
//整理数据
val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
val index = line.lastIndexOf("/")
val teacher = line.substring(index + 1)
val httpHost = line.substring(0, index)
val subject = new URL(httpHost).getHost.split("[.]")(0)
((subject, teacher), 1)
})
//聚合,将学科和老师联合当做key
val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)
//计算有多少学科
val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
//自定义一个分区器,并且按照指定的分区器进行分区
val sbPatitioner = new SubjectParitioner(subjects);
//partitionBy按照指定的分区规则进行分区
//调用partitionBy时RDD的Key是(String, String)
val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)
//一次拿出一个分区(可以操作一个分区中的数据了)
val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
//将迭代器转换成list,然后排序,在转换成迭代器返回
it.toList.sortBy(_._2).reverse.take(topN).iterator
})
//
val r: Array[((String, String), Int)] = sorted.collect()
println(r.toBuffer)
sc.stop()
}
}
//自定义分区器
//思想就是把每一种给一个编号,每一个编号下的分区都是该学科的数据
class SubjectParitioner(sbs: Array[String]) extends Partitioner {
//相当于主构造器(new的时候会执行一次)
//用于存放规则的一个map
val rules = new mutable.HashMap[String, Int]()
var i = 0
for(sb <- sbs) {
//rules(sb) = i
rules.put(sb, i)
i += 1
}
//返回分区的数量(下一个RDD有多少分区)
override def numPartitions: Int = sbs.length
//根据传入的key计算分区标号
//key是一个元组(String, String)
override def getPartition(key: Any): Int = {
//获取学科名称
val subject = key.asInstanceOf[(String, String)]._1
//根据规则计算分区编号
rules(subject)
}
}