网站流量统计项目
网站流量统计项目总结以及搭建流程
一:架构图
二: 项目搭建流程
- 搭建埋点服务器FluxAPPServer
- 在Eclipse中搭建DynamicWeb项目,添加前段代码,此处是用户行为数据的产生地,注意此处需要在前端代码中埋点。
- 埋点:指的是在服务器前端代码中添加JS代码块,用于收集用户行为数据,并将这些数据发送到FluxLogServer日志服务器中去。
- JS代码如下:
/**函数可对字符串进行编码,这样就可以在所有的计算机上读取该字符串。*/ function ar_encode(str) { //进行URL编码 return encodeURI(str); } /**屏幕分辨率*/ function ar_get_screen() { var c = ""; if (self.screen) { c = screen.width+"x"+screen.height; } return c; } /**颜色质量*/ function ar_get_color() { var c = ""; if (self.screen) { c = screen.colorDepth+"-bit"; } return c; } /**返回当前的浏览器语言*/ function ar_get_language() { var l = ""; var n = navigator; if (n.language) { l = n.language.toLowerCase(); } else if (n.browserLanguage) { l = n.browserLanguage.toLowerCase(); } return l; } /**返回浏览器类型IE,Firefox*/ function ar_get_agent() { var a = ""; var n = navigator; if (n.userAgent) { a = n.userAgent; } return a; } /**方法可返回一个布尔值,该值指示浏览器是否支持并启用了Java*/ function ar_get_jvm_enabled() { var j = ""; var n = navigator; j = n.javaEnabled() ? 1 : 0; return j; } /**返回浏览器是否支持(启用)cookie */ function ar_get_cookie_enabled() { var c = ""; var n = navigator; c = n.cookieEnabled ? 1 : 0; return c; } /**检测浏览器是否支持Flash或有Flash插件*/ function ar_get_flash_ver() { var f="",n=navigator; if (n.plugins && n.plugins.length) { for (var ii=0;ii<n.plugins.length;ii++) { if (n.plugins[ii].name.indexOf('Shockwave Flash')!=-1) { f=n.plugins[ii].description.split('Shockwave Flash ')[1]; break; } } } else if (window.ActiveXObject) { for (var ii=10;ii>=2;ii--) { try { var fl=eval("new ActiveXObject('ShockwaveFlash.ShockwaveFlash."+ii+"');"); if (fl) { f=ii + '.0'; break; } } catch(e) {} } } return f; } /**匹配顶级域名*/ function ar_c_ctry_top_domain(str) { var pattern = "/^aero$|^cat$|^coop$|^int$|^museum$|^pro$|^travel$|^xxx$|^com$|^net$|^gov$|^org$|^mil$|^edu$|^biz$|^info$|^name$|^ac$|^mil$|^co$|^ed$|^gv$|^nt$|^bj$|^hz$|^sh$|^tj$|^cq$|^he$|^nm$|^ln$|^jl$|^hl$|^js$|^zj$|^ah$|^hb$|^hn$|^gd$|^gx$|^hi$|^sc$|^gz$|^yn$|^xz$|^sn$|^gs$|^qh$|^nx$|^xj$|^tw$|^hk$|^mo$|^fj$|^ha$|^jx$|^sd$|^sx$/i"; if(str.match(pattern)){ return 1; } return 0; } /**处理域名地址*/ function ar_get_domain(host) { //如果存在则截去域名开头的 "www." var d=host.replace(/^www\./, ""); //剩余部分按照"."进行split操作,获取长度 var ss=d.split("."); var l=ss.length; //如果长度为3,则为xxx.yyy.zz格式 if(l == 3){ //如果yyy为顶级域名,zz为次级域名,保留所有 if(ar_c_ctry_top_domain(ss[1]) && ar_c_ctry_domain(ss[2])){ } //否则只保留后两节 else{ d = ss[1]+"."+ss[2]; } } //如果长度大于3 else if(l >= 3){ //如果host本身是个ip地址,则直接返回该ip地址为完整域名 var ip_pat = "^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$"; if(host.match(ip_pat)){ return d; } //如果host后两节为顶级域名及次级域名,则保留后三节 if(ar_c_ctry_top_domain(ss[l-2]) && ar_c_ctry_domain(ss[l-1])) { d = ss[l-3]+"."+ss[l-2]+"."+ss[l-1]; } //否则保留后两节 else{ d = ss[l-2]+"."+ss[l-1]; } } return d; } /**返回cookie信息*/ function ar_get_cookie(name) { //获取所有cookie信息 var co=document.cookie; //如果名字是个空 返回所有cookie信息 if (name == "") { return co; } //名字不为空 则在所有的cookie中查找这个名字的cookie var mn=name+"="; var b,e; b=co.indexOf(mn); //没有找到这个名字的cookie 则返回空 if (b < 0) { return ""; } //找到了这个名字的cookie 获取cookie的值返回 e=co.indexOf(";", b+name.length); if (e < 0) { return co.substring(b+name.length + 1); } else { return co.substring(b+name.length + 1, e); } } /** 设置cookie信息 操作符: 0 表示不设置超时时间 cookie是一个会话级别的cookie cookie信息保存在浏览器内存当中 浏览器关闭时cookie消失 1 表示设置超时时间为10年以后 cookie会一直保存在浏览器的临时文件夹里 直到超时时间到来 或用户手动清空cookie为止 2 表示设置超时时间为1个小时以后 cookie会一直保存在浏览器的临时文件夹里 直到超时时间到来 或用户手动清空cookie为止 * */ function ar_set_cookie(name, val, cotp) { var date=new Date; var year=date.getFullYear(); var hour=date.getHours(); var cookie=""; if (cotp == 0) { cookie=name+"="+val+";"; } else if (cotp == 1) { year=year+10; date.setYear(year); cookie=name+"="+val+";expires="+date.toGMTString()+";"; } else if (cotp == 2) { hour=hour+1; date.setHours(hour); cookie=name+"="+val+";expires="+date.toGMTString()+";"; } var d=ar_get_domain(document.domain); if(d != ""){ cookie +="domain="+d+";"; } cookie +="path="+"/;"; document.cookie=cookie; } /**返回客户端时间*/ function ar_get_stm() { return new Date().getTime(); } /**返回指定个数的随机数字串*/ function ar_get_random(n) { var str = ""; for (var i = 0; i < n; i ++) { str += String(parseInt(Math.random() * 10)); } return str; } /* main function */ function ar_main() { //收集完日志 指定数据提交到的路径(FluxLogServer日志处理服务器) var dest_path = "http://localhost:8080/FluxLogServer/servlet/LogServlet?"; var expire_time = 30 * 60 * 1000;//会话超时时长 //处理uv //--获取cookie ar_stat_uv的值 var uv_str = ar_get_cookie("ar_stat_uv"); var uv_id = ""; //--如果cookie ar_stat_uv的值为空 if (uv_str == ""){ //--为这个新uv配置id,为一个长度20的随机数字 uv_id = ar_get_random(20); //--设置cookie ar_stat_uv 保存时间为10年 ar_set_cookie("ar_stat_uv", uv_id, 1); } //--如果cookie ar_stat_uv的值不为空 else{ //--获取uv_id uv_id = uv_str; } //处理ss //--获取cookie ar_stat_ss var ss_str = ar_get_cookie("ar_stat_ss"); var ss_id = ""; //sessin id var ss_no = 0; //session有效期内访问页面的次数 //--如果cookie中不存在ar_stat_ss 说明是一次新的会话 if (ss_str == ""){ //--随机生成长度为10的session id ss_id = ar_get_random(10); //--session有效期内页面访问次数为0 ss_no = 0; //--拼接cookie ar_stat_ss 值 格式为 会话编号_会话期内访问次数_客户端时间_网站id value = ss_id+"_"+ss_no+"_"+ar_get_stm(); //--设置cookie ar_stat_ss ar_set_cookie("ar_stat_ss", value, 0); } //--如果cookie中存在ar_stat_ss else { //获取ss相关信息 var items = ss_str.split("_"); //--ss_id var cookie_ss_id = items[0]; //--ss_no var cookie_ss_no = parseInt(items[1]); //--ss_stm var cookie_ss_stm = items[2]; //如果当前时间-当前会话上一次访问页面的时间>30分钟,虽然cookie还存在,但是其实已经超时了!仍然需要重新生成cookie if (ar_get_stm() - cookie_ss_stm > expire_time) { //--重新生成会话id ss_id = ar_get_random(10); //--设置会话中的页面访问次数为0 ss_no = 0; } //--如果会话没有超时 else{ //--会话id不变 ss_id = cookie_ss_id; //--设置会话中的页面方位次数+1 ss_no = cookie_ss_no + 1; } //--重新拼接cookie ar_stat_ss的值 value = ss_id+"_"+ss_no+"_"+ar_get_stm(); ar_set_cookie("ar_stat_ss", value, 0); } //当前地址 var url = document.URL; url = ar_encode(String(url)); //当前资源名 var urlname = document.URL.substring(document.URL.lastIndexOf("/")+1); urlname = ar_encode(String(urlname)); //返回导航到当前网页的超链接所在网页的URL var ref = document.referrer; ref = ar_encode(String(ref)); //网页标题 var title = document.title; title = ar_encode(String(title)); //网页字符集 var charset = document.charset; charset = ar_encode(String(charset)); //屏幕信息 var screen = ar_get_screen(); screen = ar_encode(String(screen)); //颜色信息 var color =ar_get_color(); color =ar_encode(String(color)); //语言信息 var language = ar_get_language(); language = ar_encode(String(language)); //浏览器类型 var agent =ar_get_agent(); agent =ar_encode(String(agent)); //浏览器是否支持并启用了java var jvm_enabled =ar_get_jvm_enabled(); jvm_enabled =ar_encode(String(jvm_enabled)); //浏览器是否支持并启用了cookie var cookie_enabled =ar_get_cookie_enabled(); cookie_enabled =ar_encode(String(cookie_enabled)); //浏览器flash版本 var flash_ver = ar_get_flash_ver(); flash_ver = ar_encode(String(flash_ver)); //当前ss状态 格式为"会话id_会话次数_当前时间" var stat_ss = ss_id+"_"+ss_no+"_"+ar_get_stm(); //拼接访问地址 增加如上信息 dest=dest_path+"url="+url+"&urlname="+urlname+"&title="+title+"&chset="+charset+"&scr="+screen+"&col="+color+"&lg="+language+"&je="+jvm_enabled+"&ce="+cookie_enabled+"&fv="+flash_ver+"&cnv="+String(Math.random())+"&ref="+ref+"&uagent="+agent+"&stat_uv="+uv_id+"&stat_ss="+stat_ss; //数据封装到<img>标签中 通过图片将数据发送给后台 document.getElementsByTagName("body")[0].innerHTML += "<img src=\""+dest+"\" border=\"0\" width=\"1\" height=\"1\" />"; } //一旦打开页面就触发main方法 开始收集数据 window.onload = function(){ //触发main方法 ar_main(); }
2.搭建FluxLogServer服务器
- 同样是DynamicWeb项目,添加到Tomcat中
- 该项目主要是要实现接收从FluxAppServer发送过来的数据,并对数据做简单处理后打出日志,同时于此处整合Flume,通过Flume收集此处的日志信息
- 项目代码如下
import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; public class LogServlet extends HttpServlet { //此处引入log4j的实例 private Logger loger=Logger.getLogger(LogServlet.class); //处理Get请求 public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { //0.测试是否接受到了 FluxAppServer发送过来的请求消息 String info = URLDecoder.decode(request.getQueryString(),"UTF-8"); System.out.println(info); /** 接收到的请求为: url=http://localhost:8080/FluxAppServer/b.jsp&urlname=b.jsp&title=页面B&chset=UTF-8&scr=1920x1080&col=24-bit&lg=zh-cn&je=0&ce=1&fv=&cnv=0.9455173433464339&ref=&uagent=Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36&stat_uv=46611240728069970531&stat_ss=1209855703_16_1569036737318 */ //1.处理数据,返回的数据类型为: 去除属性名 只要属性值 属性值之间用|进行分割 //1.1当前接受到的数据格式:属性名=属性值&属性名=属性值 String[] kvs = info.split("&"); StringBuffer buffer = new StringBuffer(); //1.2处理每一个属性键值对 后保存到Buffer中 for (String kv : kvs) { //ref属性可能会有多个 会导致越界 //处理越界问题: 项目练习只取第一个ref String value = kv.split("=").length==2?kv.split("=")[1]:""; buffer.append(value + "|"); } //1.3拼接用户的IP地址 buffer.append(request.getRemoteAddr()); //1.4打日志:指定info记录级别(级别低->高: debug,info,warn,error,fatal)//*Log代码+logfj的配置文件 System.out.println("buffer " + buffer); /** * bufferhttp://localhost:8080/FluxAppServer/b.jsp|b.jsp|页面B|UTF-8|1920x1080|24-bit|zh-cn|0|1||0.9455173433464339||Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36|46611240728069970531|1209855703_16_1569036737318|0:0:0:0:0:0:0:1 */ loger.info(buffer); } public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }
- 添加log4j配置文件,配置日志和Flume相关信息
#设置日志输出级别为 log4j.rootLogger = info,stdout,flume #日志打印输出到控制台 log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target = System.out log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = %m%n #日志发送给Flume 配置Flume相关信息 log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname = hadoop01 log4j.appender.flume.Port = 44444 log4j.appender.flume.UnsafeMode = true
3.搭建Flume服务器
- 结构描述:扇出分流,一支流向HDFS以便日志文件的生成,一支流向Kafka以便数据实时处理
- 配置Flume的启动文件如下(data下的配置文件)
a1.sources=s1 a1.channels=c1 c2 a1.sinks=k1 k2 # 由于数据是从服务器JAVA程序过来的,此处描述source类型为avro a1.sources.s1.type=avro #此处的bind与port要与上述FluxLogServer项目中的log4j配置文件中的属性保持一致,以便数据交接 a1.sources.s1.bind=10.42.170.167 a1.sources.s1.port=44444 #为了后期再HDFS中生成的日志文件是以日期进行分类的,此处接收数据的时候应该添加时间戳 a1.sources.s1.interceptors=i1 a1.sources.s1.interceptors.i1.type=timestamp #描述channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=100 #描述流向HDFS的sink a1.sinks.k1.type=hdfs #指定日志文件在HDFS中生成的路径 时间是自动获取Linux系统的时间 a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/weblog/reporttime=%Y-%m-%d #文件生成的间隔时间:一小时生成一次新文件 (s) a1.sinks.k1.hdfs.rollInterval = 3600 #生成的文件大小: 文件超过多大会生成一次新文件 0KB 不按文件大小来生成 s1.sinks.k1.hdfs.rollSize = 0 #每写几条数据就生成文件:0表示不按数据写入条数来生成 s1.sinks.k1.hdfs.rollCount=0 #表示文件是普通文本类型 a1.sinks.k1.hdfs.fileType = DataStream #描述流向Kafka的sink a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink a1.sinks.k2.brokerList=hadoop01:9092 a1.sinks.k2.topic=weblog #绑定 a1.sources.s1.channels=c1 c2 a1.sinks.k1.channel=c1 a1.sinks.k2.channel=c2
- 注意要点:
- Source为AVRO
- 两个sink,一个链接Kafka以便数据后期给Spark实时处理,一个链接HDFS以便后期数据可以进行离线批处理
- HDFS相关配置的注意点:
- 日志文件输出的时候需要添加 linux自动生成的时间 以便日志文件最后在HDFS是以每天日期来分区的
- 配置Flume自动生成日志文件到HDFS
- 测试HDFS是否成功收集日志:
- 日志数据提供给HDFS:日志数据传输到HDFS后数据会文件的形式保存到HDFS中
- 功能测试:
- Flume配置文件配置好之后启动Flume
- 启动Hadoop
- 启动ZK集群和Kafka
- 访问目标页面产生用户行为数据后查看Flume是否有数据变动,查看Hadoop中是否有结果文件生成
4.Hive清理HDFS收集到的日志文件
1.Hive总表构建
- 总表的作用:接收HDFS中的FLume日志数据
- 总表构建:启动hive,创建数据库,在数据库中创建外部表
- 总表结构:
- 建表语句:
create external table flux (url string,urlname string,title string,chset string,scr string,col string,lg string,je string,ec string,fv string,cn string,ref string,uagent string,stat_uv string,stat_ss string,cip string) PARTITIONED BY (reporttime string) row format delimited fields terminated by '|' location '/weblog001';
- 注意:location指向了外部表保存在HDFS中的位置
- 由于之前FLume日志写出的位置为
此处意味着,Flume输出到HDFS中的日志文件所在文件夹就是HIVE外部表的位置
此时要将日志数据导入到外部表只需要添加Hive分区,分区为reporttime='当天时间'
- 在HIVE的bin下编写Shell脚本,脚本主要功能为:
- 自动为HIVE外部表Flux 总表添加分区,分区为系统当天时间;
- 添加Linux定时任务,使得HDFS当天日志文件内容能够插入到对应HIVE总表对应分区中;
脚本如下:
auto.sh
#!/bin/sh source /etc/profile; today=$(date +%Y-%m-%d) /home/presoftware/hive/bin/hive<<EOF use weblog001; alter table flux add partition(reporttime='$today');添加定时任务:(corntab工具)
0 0 * * * ./home/presoftware/hive/bin/auto.sh
Linux一到 0点执行自动任务,为hive的Flux表格添加分区 reportitme=XX,当天用户访问页面,然后当天日志数据就会由Flume保存到hdfs中的weblog001/reporttime=XX中,同时这些数据也会被hive外部表Flux引用插入到对应的表格分区 reporttime=XX 中;
2.Hive数据清洗(清洗表dataclear 业务细表tongji)
dataclear数据清洗表:
- 建表语句:
create table dataclear (reportTime string,url string,urlname string,uvid string,ssid string,sscount string,sstime string,cip string) row format delimited fields terminated by '|';
- 数据插入:
>insert overwrite table dataclear select reporttime,url,urlname,stat_uv,split(stat_ss,"_")[0],split(stat_ss,"_")[1],split(stat_ss,"_")[2],cip from flux;
- tongji业务表:
- 业务指标以及统计HQL:
1)pv:统计 页面点击数 hive>select count(*) as pv from dataclear where reportTime = '2019-09-23'; 2)uv uv - 独立访客数 - 一天之内所有的访客的数量 - 一天之内uvid去重后的总数 hive>select count(distinct uvid) as uv from dataclear where reportTime = '2019-09-23'; 3)vv vv - 独立会话数 - 一天之内所有的会话的数量 - 一天之内ssid去重后的总数 hive>select count(distinct ssid) as vv from dataclear where reportTime = '2019-09-23'; 4)br br - 跳出率 - 一天内跳出的会话总数/会话总数 hive> select round(br_taba.a/br_tabb.b,4)as br from (select count(*) as a from (select ssid from dataclear where reportTime='2019-09-23' group by ssid having count(ssid) = 1) as br_tab) as br_taba, (select count(distinct ssid) as b from dataclear where reportTime='2019-09-23') as br_tabb; 5)newip newip - 新增ip总数 - 一天内所有ip去重后在历史数据中从未出现过的数量 hive>select count(distinct dataclear.cip) from dataclear where dataclear.reportTime = '2019-09-23' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2019-09-23'); 6)newcust newcust - 新增客户数 - 一天内所有的uvid去重后在历史数据中从未出现过的总数 hive>select count(distinct dataclear.uvid) from dataclear where dataclear.reportTime='2019-09-23' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2019-09-23'); 7)avgtime avgtime - 平均访问时常 - 一天内所有会话的访问时常的平均值 注: 一个会话的时长 = 会话中所有访问的时间的最大值 - 会话中所有访问时间的最小值 hive>select avg(atTab.usetime) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2019-09-23' group by ssid) as atTab; 8)avgdeep avgdeep - 平均访问深度 - 一天内所有会话访问深度的平均值 一个会话的访问深度=一个会话访问的所有url去重后的个数 hive>select round(avg(adTab.deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2019-09-23' group by ssid) as adTab;
- 业务细表的构建:
hive> create table tongji(reportTime string,pv int,uv int,vv int, br double,newip int, newcust int, avgtime double,avgdeep double) row format delimited fields terminated by '|';
- 数据插入:
hive> insert overwrite table tongji select '2019-09-02' , tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from (select count(*) as pv from dataclear where reportTime = '2019-09-23') as tab1, (select count(distinct uvid) as uv from dataclear where reportTime = '2019-09-23') as tab2, (select count(distinct ssid) as vv from dataclear where reportTime = '2019-09-23') as tab3, (select round(br_taba.a/br_tabb.b,4)as br from (select count(*) as a from (select ssid from dataclear where reportTime='2019-09-23' group by ssid having count(ssid) = 1) as br_tab) as br_taba, (select count(distinct ssid) as b from dataclear where reportTime='2019-09-23') as br_tabb) as tab4, (select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '2019-09-23' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2019-09-23')) as tab5, (select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='2019-09-23' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2019-09-23')) as tab6, (select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2019-09-23' group by ssid) as atTab) as tab7, (select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2019-09-23' group by ssid) as adTab) as tab8;
5.清洗后的数据导入MySql--这一步也可以添加到定时任务中
- 在mysql中建表:
create table tongji(reportTime varchar(40),pv int,uv int,vv int,br double,newip int,newcust int,avgtime double,avgdeep double);
- 通过Sqoop将hive数据导出:
sh sqoop export --connect jdbc:mysql://192.168.150.137:3306/weblog001 --username root --password root --export-dir '/user/hive/warehouse/weblog.db/tongji' --table tongji -m 1 --fields-terminated-by '|'
6.离线数据的数据可视化
- 已经落地在MySQL了就可以通过Echart来实现数据可视化
7.Kafka消息中间件
- 作用:Flume日志文件数据产生较快,直接交给Spark处理,可能会出现数据来不及处理的情况,此时添加Kafka保证数据一定能够被计算处理。
- 数据生产:Flume配置文件中已经配置了数据发送给Kafka ,主题topic为weblog
- 测试:打开页面访问,此时flume和kafka都会有数据产生
8.SparkStream 数据实时处理,并将处理结果写入HBase中
- 新建Scala项目
- 导入Spark,Kafka,Spark-Kafka相关的包
- 启动ZK集群
- 启动Kafka,创建主题,启动生产者线程,便于后期测试发送数据
- 编写Spark代码实现对Kafka数据的获取和处理
- 测试代码如下:
- 访问页面,启动kafka消费者,此时实时日志数据会被Kafka接收,接收后数据会被SparkStream实时处理
//测试Kafka 与Stream的连接, //此时Kafka有数据生产后,Stream会立即处理 object Kafka_test { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("spark_streaming_Kafka_demo"); conf.setMaster("local[2]"); val sc = new SparkContext(conf); val ssc = new StreamingContext(sc,Seconds(5)); //指定消费者组名 val consumerGroup = "gp1" //指定消费主题 Key是kafka主题名,value是消费的线程数 //可以指定消费多个主题 多个键值对 val topics =Map("weblog001"->1) //连接kafka 消费数据 val kstream = KafkaUtils.createStream(ssc, "hadoop01:2181,hadoop02:2181,hadoop03:2181", consumerGroup, topics) kstream.print(); //真正有用的数据是后面的数据 kstream.map(_._2).print(); //启动streaming ssc.start(); //一直运行直到人为干预 ssc.awaitTermination(); } }
- 测试结果:Flume,Kafka,程序都会会有数据变化
- FLume:
- Kafka:
- 程序:
- 将数据插入HBASE对应表格中实现数据仓库的存储:
主程序:
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.netlib.util.Second import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.kafka.KafkaUtils import java.util.Calendar object demo { def main(args: Array[String]): Unit = { //准备SparkStreaming的环境 //在本地模式下, 启动至少2个线程,至少一个线程消费kafka队列数据,至少一个线程负责SparkStreaming val conf = new SparkConf().setMaster("local[5]").setAppName("kafkaStream") //1.1配置sc对象 val sc =new SparkContext(conf) //配置Stream对象 指定每隔5秒执行一次流处理 val ssc = new StreamingContext(sc,Seconds(5)) //指定ZK集群地址 val zkHosts = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //指定消费者组名 val consumerGroup = "gp1" //指定消费主题, Key是kafka主题名,value是消费的线程数 //可以指定消费多个主题 多个键值对 val topics =Map("weblog001"->1) //创建Stream val kafkaStream =KafkaUtils .createStream(ssc, zkHosts, consumerGroup, topics) //此处打印出来的数据格式 (null,XX目标数据) .map{x=>x._2} .foreachRDD{ //作用于DStream中每一个RDD rdd => val lines = rdd.toLocalIterator //获取一个Batch中的RDD数据,并转化为本地的迭代器类型 while(lines .hasNext){ //遍历每一个RDD 数据 val line = lines .next() //清洗RDD val info = line.split("\\|") println(info) // 提取业务字段 val url = info(0) val urlname = info(1) val uvid = info(13) val ssid =info(14).split("_")(0) val sscount =info(14).split("_")(1) val sstime =info(14).split("_")(2) val cip =info(15) //处理完这些数据封装到Bean中 val bean = LogBean(url,urlname,uvid,ssid,sscount,sstime,cip) //实现业务指标的查询处理 //1.pv:用户访问一次,PV+1 val pv =1 //--如果uvid在当天记录已存在,则uv=0 //--当天的范围如何定义:startTime=当天0:00的时间戳 //--endTime=当前记录中的sstime //--把范围定义之后,就可以去HBase做范围查询 val endtime = sstime.toLong val calendar = Calendar.getInstance //表示以endTime为基准 找到当天的 0:00 calendar.setTimeInMillis(endtime) calendar.set(Calendar.HOUR, 0) calendar.set(Calendar.MINUTE, 0) calendar.set(Calendar.SECOND, 0) calendar.set(Calendar.MILLISECOND, 0) //获取当天的0:00的时间戳 val starttime = calendar.getTimeInMillis //查询范围确定后,通过行键正则过滤器来匹配UVID的数据 val uvidRegex ="^\\d+_"+uvid+".*$" //执行HBASE表查询 通过行键正则过滤器匹配 val uvResult = HbaseUtils.queyByRowRegex(sc,starttime, endtime, uvidRegex) //如果uvResult.count==0表示此uvid在今天的记录中从没出现过 此时uv=1 val uv=if(uvResult.count()==0)1 else 0 //3.vv:独立会话数。如果在当天范围内是新的会话,则VV=1 反之VV=0 val ssidRegex= "^\\d+_\\d+_"+ssid+".*$" val vvResult =HbaseUtils.queyByRowRegex(sc, starttime, endtime, ssidRegex) val vv = if(vvResult.count()==0) 1 else 0 //4.newIp:新增IP数 如果当前记录中ip在历史数据中没有出现过 newip=1 否则newip=0 val newipRegex="^\\d+_\\d+_\\d_"+cip+".*$" val newipResult=HbaseUtils.queyByRowRegex(sc, starttime, endtime, newipRegex) val newip=if(newipResult.count()==0) 1 else 0 //4.newcust:新增用户数,判断当前的uvid在历史数据中,如果没出现过,则newcust=1 val newcustResult=HbaseUtils.queyByRowRegex(sc, starttime, endtime, uvidRegex) val newcust=if(newcustResult.count()==0) 1 else 0 //引入MYSQLBEAN val mysqlBean = MysqlBean(sstime.toLong,pv,uv,vv,newip,newcust) //将数据插入mysql数据库中 MysqlUtil.saveToMysql(mysqlBean) //调用工具类 将bean插入到HBASE中 HbaseUtils.saveToHbase(sc,bean) } } //启动Stream ssc.start() //使得程序不断进行 ssc.awaitTermination() } }
工具类HbaseUtils,代码如下:
import org.apache.spark.SparkContext import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.fs.shell.find.Result import org.apache.hadoop.hbase.client.Put import java.util.Random import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.filter.RowFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.spark.sql.catalyst.expressions.StringRegexExpression import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.Base64 object HbaseUtils { //将SparkRDD写入到Hbase中 def saveToHbase (sc:SparkContext,bean:LogBean){ sc.hadoopConfiguration.set("hbse.zookeeper.quorum", "hadoop01,hadoop02,hadoop03") sc.hadoopConfiguration.set("hbse.zookeeper.property.clientPort", "2181") sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"fluxtable") //底层基于MR val job = new Job(sc.hadoopConfiguration) // 定义输出的key类型:Writable hadoop的序列化机制 job.setOutputKeyClass(classOf[ImmutableBytesWritable]) // 定义输出的Value类型 job.setOutputValueClass(classOf[Result]) // 定义输出的表格类型 job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) //插入HBASE :RDD+元组 //此时数据是LogBean类型 需要转化为RDD val hbaseRdd = sc.makeRDD(List(bean)).map { bean => //创建HBASE的行对象,并指定行键 //行键设计思路:时间错+用户ID+会话ID+本地IP+随机数 //行键设计目的: //1.行键中包含时间戳 有利于按时间维度来做范围查询 //2.包含用户会话本地id 利于通过行键正则匹配查询相关数据 //3.行键后拼接随机数字是为了满足散列原则,避免数据集中到一个Hregion中 val rowKey = bean.sstime+"_"+bean.uvid+"_"+bean.ssid+"_"+bean.cip+new Random().nextInt(100) //新建put对象 将rowKey转化为字节数组传入 val put = new Put(rowKey.getBytes) put.add("cf1".getBytes, "url".getBytes, bean.url.getBytes) put.add("cf1".getBytes, "urlname".getBytes, bean.urlname.getBytes) put.add("cf1".getBytes, "uvid".getBytes, bean.uvid.getBytes) put.add("cf1".getBytes, "ssid".getBytes, bean.ssid.getBytes) put.add("cf1".getBytes, "sscount".getBytes, bean.sscount.getBytes) put.add("cf1".getBytes, "sstime".getBytes, bean.sstime.getBytes) put.add("cf1".getBytes, "cip".getBytes, bean.cip.getBytes) //最后要放回kv结构 (new ImmutableBytesWritable,put) } hbaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration) } //没有写等号 就相对于是返回Unit def queyByRowRegex(sc:SparkContext,starttime:Long, endtime:Long, regex:String)={ val hbaseConf = HBaseConfiguration.create sc.hadoopConfiguration.set("hbse.zookeeper.quorum", "hadoop01,hadoop02,hadoop03") sc.hadoopConfiguration.set("hbse.zookeeper.property.clientPort", "2181") sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE,"fluxtable") //范围查询 scan val scan = new Scan() //设定扫描范围 scan.setStartRow(starttime.toString().getBytes) scan.setStopRow(endtime.toString().getBytes) //使用行键正则过滤器: //比较规则 比较对象 val filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(regex)) //绑定过滤器到scan对象 使得扫描HBASE表格的时候过滤器会生效 scan.setFilter(filter) //设置scan对象 hbaseConf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray())) val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) resultRDD } }
实体类LogBean代码如下:
//样例类: //默认构造一个空构造器,public级别的,我们可以直接访问; //无需new //默认实现序列化 //默认实现toString //默认实现了equals 和hashCode //该对象用于封装日志数据 case class LogBean ( val url:String, val urlname:String, val uvid:String, val ssid:String, val sscount:String, val sstime:String, val cip:String){ }
MySQL工具类:
import java.sql.Connection import java.sql.PreparedStatement import java.sql.ResultSet import java.text.SimpleDateFormat import com.mchange.v2.c3p0.ComboPooledDataSource object MysqlUtil { //引入c3p0连接池对象 val dataSource=new ComboPooledDataSource def saveToMysql(mysqlBean: MysqlBean) = { var conn :Connection= null var ps1:PreparedStatement=null var rs1:ResultSet=null var ps2:PreparedStatement=null var ps3:PreparedStatement=null try { val sdf = new SimpleDateFormat("YYYY-MM-dd") //获取当天日期时间 val nowTime =sdf.format(mysqlBean.time) //获取连接数据 conn=dataSource.getConnection() //查询当天数据 ps1=conn.prepareStatement("select * from tonji2 where reporttime=?") ps1.setString(1,nowTime) //执行查询 返回结果集 rs1=ps1.executeQuery() if(rs1.next()){ //表示当天已经有了数据,更新各个指标,实现累加 ps3=conn.prepareStatement("update tongji2 set pv=pv+?,uv=uv+?,vv=vv+?,newip=newip+?,newcust=newcust+? where reporttime=?") ps3.setInt(1,mysqlBean.pv) ps3.setInt(2,mysqlBean.uv) ps3.setInt(3,mysqlBean.vv) ps3.setInt(4,mysqlBean.newip) ps3.setInt(5,mysqlBean.newcust) ps3.setString(6,nowTime) ps3.executeUpdate() }else{ //表示当天还有数据,则执行插入 ps2=conn.prepareStatement("insert into tongji2 values(?,?,?,?,?,?)") ps2.setString(1,nowTime) ps2.setInt(2, mysqlBean.pv) ps2.setInt(3, mysqlBean.uv) ps2.setInt(4, mysqlBean.vv) ps2.setInt(5, mysqlBean.newip) ps2.setInt(6, mysqlBean.newcust) //执行插入 ps2.executeUpdate() } } catch { case t: Throwable => t.printStackTrace() // TODO: handle error }finally{ //关闭流之前先判空 以免出现空指针异常 if(ps3!=null)ps3.close() if(ps2!=null)ps2.close() if(ps1!=null)ps1.close() if(rs1!=null)rs1.close() if(conn!=null)conn.close() } } }
MySQLBean代码:
//用于封装要插入MySQL的数据 case class MysqlBean (time:Long,pv:Int,uv:Int,vv:Int,newip:Int,newcust:Int) { }
注意:数据从kafka发送过来的时候,被SparkStream处理,处理后的数据首先会被分析统计为监控指标(MySQLBean),然后根据MySQLUtil落地到MySQL中,而日志数据(LogBean)会落地到Hbase中。