Spark Streaming Real-Time Stream Processing Project (6)
1. New requirement
Count the clicks whose referer comes from a search engine and write the statistics to HBase. First create the HBase table in the hbase shell:
create 'imooc_course_search_clickcount','info'
scan 'imooc_course_search_clickcount'
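The DAO below reads two fields from a CourseSearchClickCount domain object, which is not listed in this part. A minimal sketch, assuming the same layout as the CourseClickCount class from the earlier parts (the field names and types come straight from how the DAO uses them):
package com.qianliu.domain

/**
 * Click count of a hands-on course reached from a search engine.
 *
 * @param day_search_course rowkey in the form day_searchEngineHost_courseId, e.g. 20171111_www.baidu.com_8
 * @param click_count       clicks accumulated for that rowkey
 */
case class CourseSearchClickCount(day_search_course: String, click_count: Long)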
Write the data access layer (DAO) for the new table:
package com.qianliu.dao
import com.qianliu.domain.{CourseClickCount, CourseSearchClickCount}
import com.qianliu.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
/**
 * DAO for the click counts of hands-on courses reached from search engines.
 */
object CourseSearchClickCountDAO {
val tableName = "imooc_course_search_clickcount"
val cf = "info"
val qualifier = "click_count"
/**
 * Save the data to HBase.
 *
 * @param list collection of CourseSearchClickCount
 */
def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
val table = HBaseUtils.getInstance().getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifier),
ele.click_count)
}
}
/**
 * Query the accumulated count by rowkey.
 */
def count(day_search_course: String):Long = {
val table = HBaseUtils.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_search_course))
val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)
if(value == null) {
0L
}else{
Bytes.toLong(value)
}
}
def main(args: Array[String]): Unit = {
val list = new ListBuffer[CourseSearchClickCount]
list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
list.append(CourseSearchClickCount("20171111_cn.bing.com_9",9))
save(list)
println(count("20171111_www.baidu.com_8") + " : " + count("20171111_cn.bing.com_9"))
}
}
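HBaseUtils.getInstance().getTable(...) is the HBase helper built in an earlier part of this series and is not repeated here. For reference, a minimal Scala sketch of the interface the DAO expects, assuming the HBase 1.x client API; the ZooKeeper quorum and rootdir values (hadoop000) are placeholders for your own cluster:
package com.qianliu.utils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

/**
 * Sketch of the HBase access helper used by the DAOs.
 */
object HBaseUtils {
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "hadoop000:2181")       // placeholder
  conf.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase")   // placeholder

  // Kept as a method so the Java-style call HBaseUtils.getInstance() still compiles.
  def getInstance(): HBaseUtils.type = this

  def getTable(tableName: String): HTable = new HTable(conf, tableName)
}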
Add the new statistics to the Spark Streaming program:
package com.qianliu
import com.qianliu.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.qianliu.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.qianliu.utils.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer
/**
 * Spark Streaming consuming data from Kafka.
 */
object KafkaStreamingApp {
def main(args: Array[String]): Unit = {
// Check that the required arguments are supplied
if(args.length != 4) {
System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
// Destructure the input arguments
val Array(zkQuorum, group, topics, numThreads) = args
// Initialize the SparkConf
val sparkConf = new SparkConf().setAppName("KafkaStreamingApp")
.setMaster("local[2]")
sparkConf.set("spark.testing.memory", "512000000")
// Create the StreamingContext with a 60-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(60))
// Support multiple topics: map each topic name to the number of receiver threads
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// TODO... how Spark Streaming connects to Kafka (receiver-based createStream)
val messages = KafkaUtils.createStream(ssc, zkQuorum, group,topicMap)
// The receiver yields (key, value) pairs; only the message value (_._2) is needed
//messages.map(_._2).count().print()
// Step 1: data cleaning. Each cleaned record is (ip, time, courseId, HTTP status code, referer)
val logs = messages.map(_._2)
val cleanData = logs.map(line => {
val infos = line.split("\t")
// infos(2) = "GET /class/130.html HTTP/1.1"
// url = /class/130.html
val url = infos(2).split(" ")(1)
var courseId = 0
// Extract the course id of the hands-on course
if (url.startsWith("/class")) {
val courseIdHTML = url.split("/")(2)
courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
}
ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0)
// Print the cleaned logs (for debugging)
//cleanData.print()
// Step 2: map the cleaned data into the DAO's record type and
// count the hands-on course clicks accumulated so far today
cleanData.map(x => {
// HBase rowkey design: 20171111_88
(x.time.substring(0, 8) + "_" + x.courseId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
rdd.foreachPartition(partitionRecords => {
val list = new ListBuffer[CourseClickCount]
partitionRecords.foreach(pair => {
list.append(CourseClickCount(pair._1, pair._2))
})
CourseClickCountDAO.save(list)
})
})
// Step 3: count the hands-on course clicks that came from search engines, accumulated so far today
cleanData.map(x => {
/**
 * Referer example:
 * https://www.sogou.com/web?query=Spark SQL实战
 *   ==>  (after replacing "//" with "/")
 * https:/www.sogou.com/web?query=Spark SQL实战
 */
// Extract the search engine host, e.g. www.sogou.com
val referer = x.referer.replaceAll("//", "/")
val splits = referer.split("/")
var host = ""
if(splits.length > 2) {
host = splits(1)
}
(host, x.courseId, x.time)
}).filter(_._1 != "").map(x => {
(x._3.substring(0,8) + "_" + x._1 + "_" + x._2 , 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
rdd.foreachPartition(partitionRecords => {
val list = new ListBuffer[CourseSearchClickCount]
partitionRecords.foreach(pair => {
list.append(CourseSearchClickCount(pair._1, pair._2))
})
CourseSearchClickCountDAO.save(list)
})
})
// Start the streaming application
ssc.start()
ssc.awaitTermination()
}
}
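To run the job, supply the four arguments from the usage string: the ZooKeeper quorum, the consumer group id, a comma-separated topic list, and the number of receiver threads per topic. A sketch of a submit command; the host name hadoop000, group id test, topic streamingtopic, jar path, and the Kafka integration artifact version are placeholders for your own environment (use spark-streaming-kafka_2.11 for Spark 1.6.x, spark-streaming-kafka-0-8_2.11 for Spark 2.x):
spark-submit \
--master local[2] \
--name KafkaStreamingApp \
--class com.qianliu.KafkaStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar \
hadoop000:2181 test streamingtopic 1
After a batch runs, the new counts land in imooc_course_search_clickcount and can be checked with the scan command shown at the top of this post.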
Data collected successfully!