Spark Streaming Real-Time Stream Processing Project (Part 6)

1. New requirement

Some clicks arrive via a search engine; we now also want to count those per day, per search engine, and per course, and write the results to HBase. Create the table in the HBase shell first:

create 'imooc_course_search_clickcount','info'
scan 'imooc_course_search_clickcount'
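
The DAO below depends on the HBaseUtils helper written in an earlier installment. As a reminder of what it has to provide, here is a minimal sketch; the ZooKeeper quorum and rootdir values are placeholders that must match your own cluster:

package com.qianliu.utils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

// Minimal sketch of the helper from an earlier installment.
class HBaseUtils private {
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "hadoop000:2181")     // placeholder
  conf.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase") // placeholder

  // Return a handle to the named table
  def getTable(tableName: String): HTable = new HTable(conf, tableName)
}

object HBaseUtils {
  private lazy val instance = new HBaseUtils
  def getInstance(): HBaseUtils = instance
}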

Then write the data access layer:

package com.qianliu.dao

import com.qianliu.domain.CourseSearchClickCount
import com.qianliu.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

/**
  * DAO for click counts of courses reached via a search engine.
  */
object CourseSearchClickCountDAO {

  val tableName = "imooc_course_search_clickcount"
  val cf = "info"
  val qualifier = "click_count"


  /**
    * Save the given counts to HBase.
    *
    * @param list a collection of CourseSearchClickCount
    */
  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {

    val table = HBaseUtils.getInstance().getTable(tableName)

    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifier),
        ele.click_count)
    }

  }


  /** Look up the click count for the given rowkey. */
  def count(day_search_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_search_course))
    val value = table.get(get).getValue(cf.getBytes, qualifier.getBytes)

    if(value == null) {
      0L
    } else {
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {


    val list = new ListBuffer[CourseSearchClickCount]
    list.append(CourseSearchClickCount("20171111_www.baidu.com_8",8))
    list.append(CourseSearchClickCount("20171111_cn.bing.com_9",9))

    save(list)

    println(count("20171111_www.baidu.com_8") + " : " + count("20171111_cn.bing.com_9"))
  }

}
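
save() and count() also assume the CourseSearchClickCount domain class introduced alongside CourseClickCount in the earlier installments; a minimal sketch:

package com.qianliu.domain

/**
  * One row of the search-engine click-count table.
  *
  * @param day_search_course rowkey of the form day_searchEngine_courseId,
  *                          e.g. 20171111_www.sogou.com_130
  * @param click_count       click count for that rowkey
  */
case class CourseSearchClickCount(day_search_course: String, click_count: Long)

Note that save() writes with incrementColumnValue, so click_count is a delta that HBase adds to whatever is already stored under the rowkey; this is what lets the streaming job below accumulate running totals batch after batch.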

Now extend the Spark Streaming program with the new functionality:

package com.qianliu

import com.qianliu.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.qianliu.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.qianliu.utils.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

/** Spark Streaming integration with Kafka. */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    // Validate the number of command-line arguments
    if(args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // Destructure the arguments
    val Array(zkQuorum, group, topics, numThreads) = args

    // Initialize the SparkConf
    val sparkConf = new SparkConf().setAppName("KafkaStreamingApp")
      .setMaster("local[2]")
    sparkConf.set("spark.testing.memory", "512000000") // work around the local-mode memory check
    // Create the StreamingContext with a 60-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(60))
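    // Each 60-second batch produces only that batch's partial counts; because the
    // DAOs write with incrementColumnValue, HBase adds them to the stored values,
    // turning the per-batch counts into running totals for the day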

    // Each comma-separated topic is consumed with numThreads threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based Kafka integration: the stream yields (key, message) pairs
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group,topicMap)

    // Only the message body, the second element of each pair, is needed
    //messages.map(_._2).count().print()

    // Step 1: data cleansing, producing (ip, time, courseId, HTTP status code, referer)
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
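      // A hypothetical input line (fields separated by "\t"), as produced by
      // the log generator from the earlier installments:
      //   132.168.89.224  2017-11-11 16:10:01  GET /class/130.html HTTP/1.1  404  https://www.sogou.com/web?query=Spark SQL实战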

      // infos(2) = "GET /class/130.html HTTP/1.1"
      // url = /class/130.html
      val url = infos(2).split(" ")(1)
      var courseId = 0

      // Extract the course id from course-page URLs (/class/<id>.html)
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    // Print the cleansed logs (for debugging)
    //cleanData.print()

    // Step 2: map the cleansed data into the table's row format to count
    // today's cumulative clicks on course pages

    cleanData.map(x => {

      // HBase rowkey design: day_courseId, e.g. 20171111_88

      (x.time.substring(0, 8) + "_" + x.courseId, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
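        // Buffer the partition's records so that each partition issues a single
        // DAO call (one HBase table handle) instead of one per record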
        val list = new ListBuffer[CourseClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseClickCount(pair._1, pair._2))
        })

        CourseClickCountDAO.save(list)
      })
    })

    // Step 3: count today's course-page clicks that arrived via a search engine

    cleanData.map(x => {

      /**
        * The referer looks like https://www.sogou.com/web?query=Spark SQL实战.
        * Collapsing "//" into "/" yields https:/www.sogou.com/web?query=Spark SQL实战,
        * so after splitting on "/" the host (www.sogou.com) sits at index 1.
        */
      val referer = x.referer.replaceAll("//", "/")
      val splits = referer.split("/")
      var host = ""
      if(splits.length > 2) {
        host = splits(1)
      }

      (host, x.courseId, x.time)
    }).filter(_._1 != "").map(x => {
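      // HBase rowkey design: day_host_courseId, e.g. 20171111_www.sogou.com_130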
      (x._3.substring(0,8) + "_" + x._1 + "_" + x._2 , 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseSearchClickCount]

        partitionRecords.foreach(pair => {
          list.append(CourseSearchClickCount(pair._1, pair._2))
        })

        CourseSearchClickCountDAO.save(list)
      })
    })


    // Start the streaming application
    ssc.start()
    ssc.awaitTermination()
  }
}
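
The program still takes the same four arguments as before: the ZooKeeper quorum, the Kafka consumer group, a comma-separated list of topics, and the number of receiver threads. For a local test run the arguments might look like localhost:2181 test-group streamingtopic 1 (illustrative values; use whatever matches your own Kafka setup).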

Data collected into HBase successfully!
