网站流量统计项目

网站流量统计项目总结以及搭建流程

一:架构图



二: 项目搭建流程

  1. 搭建埋点服务器FluxAPPServer

  • 在Eclipse中搭建DynamicWeb项目,添加前段代码,此处是用户行为数据的产生地,注意此处需要在前端代码中埋点。
  • 埋点:指的是在服务器前端代码中添加JS代码块,用于收集用户行为数据,并将这些数据发送到FluxLogServer日志服务器中去。
  • JS代码如下:
/**函数可对字符串进行编码,这样就可以在所有的计算机上读取该字符串。*/
function ar_encode(str)
{
	//进行URL编码
	return encodeURI(str);
}


/**屏幕分辨率*/
function ar_get_screen()
{
	var c = "";

	if (self.screen) {
		c = screen.width+"x"+screen.height;
	}

	return c;
}

/**颜色质量*/
function ar_get_color()
{
	var c = ""; 

	if (self.screen) {
		c = screen.colorDepth+"-bit";
	}

	return c;
}

/**返回当前的浏览器语言*/
function ar_get_language()
{
	var l = "";
	var n = navigator;

	if (n.language) {
		l = n.language.toLowerCase();
	}
	else
	if (n.browserLanguage) {
		l = n.browserLanguage.toLowerCase();
	}

	return l;
}

/**返回浏览器类型IE,Firefox*/
function ar_get_agent()
{
	var a = "";
	var n = navigator;

	if (n.userAgent) {
		a = n.userAgent;
	}

	return a;
}

/**方法可返回一个布尔值,该值指示浏览器是否支持并启用了Java*/
function ar_get_jvm_enabled()
{
	var j = "";
	var n = navigator;

	j = n.javaEnabled() ? 1 : 0;

	return j;
}

/**返回浏览器是否支持(启用)cookie */
function ar_get_cookie_enabled()
{
	var c = "";
	var n = navigator;
	c = n.cookieEnabled ? 1 : 0;

	return c;
}

/**检测浏览器是否支持Flash或有Flash插件*/
function ar_get_flash_ver()
{
	var f="",n=navigator;

	if (n.plugins && n.plugins.length) {
		for (var ii=0;ii<n.plugins.length;ii++) {
			if (n.plugins[ii].name.indexOf('Shockwave Flash')!=-1) {
				f=n.plugins[ii].description.split('Shockwave Flash ')[1];
				break;
			}
		}
	}
	else
	if (window.ActiveXObject) {
		for (var ii=10;ii>=2;ii--) {
			try {
				var fl=eval("new ActiveXObject('ShockwaveFlash.ShockwaveFlash."+ii+"');");
				if (fl) {
					f=ii + '.0';
					break;
				}
			}
			 catch(e) {}
		}
	}
	return f;
} 

 
/**匹配顶级域名*/
function ar_c_ctry_top_domain(str)
{
	var pattern = "/^aero$|^cat$|^coop$|^int$|^museum$|^pro$|^travel$|^xxx$|^com$|^net$|^gov$|^org$|^mil$|^edu$|^biz$|^info$|^name$|^ac$|^mil$|^co$|^ed$|^gv$|^nt$|^bj$|^hz$|^sh$|^tj$|^cq$|^he$|^nm$|^ln$|^jl$|^hl$|^js$|^zj$|^ah$|^hb$|^hn$|^gd$|^gx$|^hi$|^sc$|^gz$|^yn$|^xz$|^sn$|^gs$|^qh$|^nx$|^xj$|^tw$|^hk$|^mo$|^fj$|^ha$|^jx$|^sd$|^sx$/i";

	if(str.match(pattern)){ return 1; }

	return 0;
}

/**处理域名地址*/
function ar_get_domain(host)
{
	//如果存在则截去域名开头的 "www."
	var d=host.replace(/^www\./, "");

	//剩余部分按照"."进行split操作,获取长度
	var ss=d.split(".");
	var l=ss.length;

	//如果长度为3,则为xxx.yyy.zz格式
	if(l == 3){
		//如果yyy为顶级域名,zz为次级域名,保留所有
		if(ar_c_ctry_top_domain(ss[1]) && ar_c_ctry_domain(ss[2])){
		}
		//否则只保留后两节
		else{
			d = ss[1]+"."+ss[2];
		}
	}
	//如果长度大于3
	else if(l >= 3){

		//如果host本身是个ip地址,则直接返回该ip地址为完整域名
		var ip_pat = "^[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*$";
		if(host.match(ip_pat)){
			return d;
		}
		//如果host后两节为顶级域名及次级域名,则保留后三节
		if(ar_c_ctry_top_domain(ss[l-2]) && ar_c_ctry_domain(ss[l-1])) {
			d = ss[l-3]+"."+ss[l-2]+"."+ss[l-1];
		}
		//否则保留后两节
		else{
			d = ss[l-2]+"."+ss[l-1];
		}
	}
		
	return d;
}


/**返回cookie信息*/
function ar_get_cookie(name)
{
	//获取所有cookie信息
	var co=document.cookie;
	
	//如果名字是个空 返回所有cookie信息
	if (name == "") {
		return co;
	}
	
	//名字不为空 则在所有的cookie中查找这个名字的cookie
	var mn=name+"=";
	var b,e;
	b=co.indexOf(mn);

	//没有找到这个名字的cookie 则返回空
	if (b < 0) {
		return "";
	}

	//找到了这个名字的cookie 获取cookie的值返回
	e=co.indexOf(";", b+name.length);
	if (e < 0) {
		return co.substring(b+name.length + 1);
	}
	else {
		return co.substring(b+name.length + 1, e);
	}
}

/**
 	设置cookie信息
	操作符:
		0 表示不设置超时时间 cookie是一个会话级别的cookie  cookie信息保存在浏览器内存当中 浏览器关闭时cookie消失
		1 表示设置超时时间为10年以后 cookie会一直保存在浏览器的临时文件夹里 直到超时时间到来 或用户手动清空cookie为止
		2 表示设置超时时间为1个小时以后 cookie会一直保存在浏览器的临时文件夹里 直到超时时间到来 或用户手动清空cookie为止
 * */
function ar_set_cookie(name, val, cotp) 
{ 
	var date=new Date; 
	var year=date.getFullYear(); 
	var hour=date.getHours(); 

	var cookie="";

	if (cotp == 0) { 
		cookie=name+"="+val+";"; 
	} 
	else if (cotp == 1) { 
		year=year+10; 
		date.setYear(year); 
		cookie=name+"="+val+";expires="+date.toGMTString()+";"; 
	} 
	else if (cotp == 2) { 
		hour=hour+1; 
		date.setHours(hour); 
		cookie=name+"="+val+";expires="+date.toGMTString()+";"; 
	} 

	var d=ar_get_domain(document.domain);
	if(d != ""){
		cookie +="domain="+d+";";
	}
	cookie +="path="+"/;";

	document.cookie=cookie;
}



/**返回客户端时间*/
function ar_get_stm() 
{ 
	return new Date().getTime();
} 


/**返回指定个数的随机数字串*/
function ar_get_random(n) {
	var str = "";
	for (var i = 0; i < n; i ++) {
		str += String(parseInt(Math.random() * 10));
	}
	return str;
}

/* main function */
function ar_main() {
	
	//收集完日志 指定数据提交到的路径(FluxLogServer日志处理服务器)
	var dest_path   = "http://localhost:8080/FluxLogServer/servlet/LogServlet?"; 
	var expire_time = 30 * 60 * 1000;//会话超时时长

	//处理uv
	//--获取cookie ar_stat_uv的值
	var uv_str = ar_get_cookie("ar_stat_uv");
	var uv_id = "";
	//--如果cookie ar_stat_uv的值为空
	if (uv_str == ""){
		//--为这个新uv配置id,为一个长度20的随机数字
		uv_id = ar_get_random(20);
		//--设置cookie ar_stat_uv 保存时间为10年
		ar_set_cookie("ar_stat_uv", uv_id, 1);
	}
	//--如果cookie ar_stat_uv的值不为空
	else{
		//--获取uv_id
		uv_id  = uv_str;
	}

	//处理ss
	//--获取cookie ar_stat_ss
	var ss_str = ar_get_cookie("ar_stat_ss"); 
	var ss_id = "";  //sessin id
	var ss_no = 0;   //session有效期内访问页面的次数

	//--如果cookie中不存在ar_stat_ss 说明是一次新的会话
	if (ss_str == ""){
		//--随机生成长度为10的session id
		ss_id = ar_get_random(10);
		//--session有效期内页面访问次数为0
		ss_no = 0;
		//--拼接cookie ar_stat_ss 值 格式为 会话编号_会话期内访问次数_客户端时间_网站id
		value = ss_id+"_"+ss_no+"_"+ar_get_stm();
		//--设置cookie ar_stat_ss
		ar_set_cookie("ar_stat_ss", value, 0); 
	} 
	//--如果cookie中存在ar_stat_ss
	else { 
		//获取ss相关信息
		var items = ss_str.split("_");
		//--ss_id
		var cookie_ss_id  = items[0];
		//--ss_no
		var cookie_ss_no  = parseInt(items[1]);
		//--ss_stm
		var cookie_ss_stm = items[2];

		//如果当前时间-当前会话上一次访问页面的时间>30分钟,虽然cookie还存在,但是其实已经超时了!仍然需要重新生成cookie
		if (ar_get_stm() - cookie_ss_stm > expire_time) { 
			//--重新生成会话id
			ss_id = ar_get_random(10);
			//--设置会话中的页面访问次数为0
			ss_no = 0;
		} 
		//--如果会话没有超时
		else{
			//--会话id不变
			ss_id = cookie_ss_id;
			//--设置会话中的页面方位次数+1
			ss_no = cookie_ss_no + 1;
		}

		//--重新拼接cookie ar_stat_ss的值 
		value = ss_id+"_"+ss_no+"_"+ar_get_stm();
		ar_set_cookie("ar_stat_ss", value, 0); 
	}

	//当前地址
	var url = document.URL; 
	url = ar_encode(String(url)); 
	
	//当前资源名
	var urlname = document.URL.substring(document.URL.lastIndexOf("/")+1);
	urlname = ar_encode(String(urlname)); 
	
    //返回导航到当前网页的超链接所在网页的URL
	var ref = document.referrer; 
	ref = ar_encode(String(ref)); 


	//网页标题
	var title = document.title;
	title = ar_encode(String(title)); 

	//网页字符集
	var charset = document.charset;
	charset = ar_encode(String(charset)); 

	//屏幕信息
	var screen = ar_get_screen(); 
	screen = ar_encode(String(screen)); 

	//颜色信息
	var color =ar_get_color(); 
	color =ar_encode(String(color)); 

	//语言信息
	var language = ar_get_language(); 
	language = ar_encode(String(language));

 	//浏览器类型
	var agent =ar_get_agent(); 
	agent =ar_encode(String(agent));

	//浏览器是否支持并启用了java
	var jvm_enabled =ar_get_jvm_enabled(); 
	jvm_enabled =ar_encode(String(jvm_enabled)); 

	//浏览器是否支持并启用了cookie
	var cookie_enabled =ar_get_cookie_enabled(); 
	cookie_enabled =ar_encode(String(cookie_enabled)); 

	//浏览器flash版本
	var flash_ver = ar_get_flash_ver();
	flash_ver = ar_encode(String(flash_ver)); 

	
	//当前ss状态 格式为"会话id_会话次数_当前时间"
	var stat_ss = ss_id+"_"+ss_no+"_"+ar_get_stm();

	//拼接访问地址 增加如上信息
	dest=dest_path+"url="+url+"&urlname="+urlname+"&title="+title+"&chset="+charset+"&scr="+screen+"&col="+color+"&lg="+language+"&je="+jvm_enabled+"&ce="+cookie_enabled+"&fv="+flash_ver+"&cnv="+String(Math.random())+"&ref="+ref+"&uagent="+agent+"&stat_uv="+uv_id+"&stat_ss="+stat_ss;


    //数据封装到<img>标签中  通过图片将数据发送给后台
    document.getElementsByTagName("body")[0].innerHTML += "<img src=\""+dest+"\" border=\"0\" width=\"1\" height=\"1\" />";
    
}

//一旦打开页面就触发main方法 开始收集数据
window.onload = function(){
	//触发main方法
	ar_main();
}


2.搭建FluxLogServer服务器

  • 同样是DynamicWeb项目,添加到Tomcat中
  • 该项目主要是要实现接收从FluxAppServer发送过来的数据,并对数据做简单处理后打出日志,同时于此处整合Flume,通过Flume收集此处的日志信息
  • 项目代码如下
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.log4j.Logger;


public class LogServlet extends HttpServlet {  //此处引入log4j的实例  private Logger loger=Logger.getLogger(LogServlet.class);    //处理Get请求  public void doGet(HttpServletRequest request, HttpServletResponse response)  throws ServletException, IOException {    //0.测试是否接受到了 FluxAppServer发送过来的请求消息  String info = URLDecoder.decode(request.getQueryString(),"UTF-8");  System.out.println(info);  /**  接收到的请求为:  url=http://localhost:8080/FluxAppServer/b.jsp&urlname=b.jsp&title=页面B&chset=UTF-8&scr=1920x1080&col=24-bit&lg=zh-cn&je=0&ce=1&fv=&cnv=0.9455173433464339&ref=&uagent=Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36&stat_uv=46611240728069970531&stat_ss=1209855703_16_1569036737318  */      //1.处理数据,返回的数据类型为: 去除属性名 只要属性值 属性值之间用|进行分割  //1.1当前接受到的数据格式:属性名=属性值&属性名=属性值  String[] kvs = info.split("&");  StringBuffer buffer = new StringBuffer();  //1.2处理每一个属性键值对 后保存到Buffer中  for (String kv : kvs) {  //ref属性可能会有多个 会导致越界  //处理越界问题: 项目练习只取第一个ref  String value = kv.split("=").length==2?kv.split("=")[1]:"";  buffer.append(value + "|");  }  //1.3拼接用户的IP地址  buffer.append(request.getRemoteAddr());    //1.4打日志:指定info记录级别(级别低->高: debug,info,warn,error,fatal)//*Log代码+logfj的配置文件  System.out.println("buffer  " + buffer);  /**   * bufferhttp://localhost:8080/FluxAppServer/b.jsp|b.jsp|页面B|UTF-8|1920x1080|24-bit|zh-cn|0|1||0.9455173433464339||Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36|46611240728069970531|1209855703_16_1569036737318|0:0:0:0:0:0:0:1   */      loger.info(buffer);    }  public void doPost(HttpServletRequest request, HttpServletResponse response)  throws ServletException, IOException {  doGet(request, response);  }

}
		
  • 添加log4j配置文件,配置日志和Flume相关信息
#设置日志输出级别为
log4j.rootLogger = info,stdout,flume
#日志打印输出到控制台
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %m%n

#日志发送给Flume 配置Flume相关信息
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = hadoop01
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true

3.搭建Flume服务器

  • 结构描述:扇出分流,一支流向HDFS以便日志文件的生成,一支流向Kafka以便数据实时处理
  • 配置Flume的启动文件如下(data下的配置文件)

a1.sources=s1
a1.channels=c1 c2
a1.sinks=k1 k2

# 由于数据是从服务器JAVA程序过来的,此处描述source类型为avro
a1.sources.s1.type=avro

#此处的bind与port要与上述FluxLogServer项目中的log4j配置文件中的属性保持一致,以便数据交接
a1.sources.s1.bind=10.42.170.167
a1.sources.s1.port=44444

#为了后期再HDFS中生成的日志文件是以日期进行分类的,此处接收数据的时候应该添加时间戳
a1.sources.s1.interceptors=i1
a1.sources.s1.interceptors.i1.type=timestamp

#描述channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

#描述流向HDFS的sink
a1.sinks.k1.type=hdfs

#指定日志文件在HDFS中生成的路径 时间是自动获取Linux系统的时间
a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/weblog/reporttime=%Y-%m-%d

#文件生成的间隔时间:一小时生成一次新文件 (s)
a1.sinks.k1.hdfs.rollInterval = 3600

#生成的文件大小: 文件超过多大会生成一次新文件  0KB 不按文件大小来生成
s1.sinks.k1.hdfs.rollSize = 0

#每写几条数据就生成文件:0表示不按数据写入条数来生成
s1.sinks.k1.hdfs.rollCount=0

#表示文件是普通文本类型 
a1.sinks.k1.hdfs.fileType = DataStream

#描述流向Kafka的sink
a1.sinks.k2.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.brokerList=hadoop01:9092
a1.sinks.k2.topic=weblog

#绑定
a1.sources.s1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
  • 注意要点:
      1. Source为AVRO
      2. 两个sink,一个链接Kafka以便数据后期给Spark实时处理,一个链接HDFS以便后期数据可以进行离线批处理
      3. HDFS相关配置的注意点:
        1. 日志文件输出的时候需要添加 linux自动生成的时间 以便日志文件最后在HDFS是以每天日期来分区的
        2. 配置Flume自动生成日志文件到HDFS
  • 测试HDFS是否成功收集日志:
      1. 日志数据提供给HDFS:日志数据传输到HDFS后数据会文件的形式保存到HDFS中
      2. 功能测试:
          1. Flume配置文件配置好之后启动Flume
          2. 启动Hadoop
          3. 启动ZK集群和Kafka
          4. 访问目标页面产生用户行为数据后查看Flume是否有数据变动,查看Hadoop中是否有结果文件生成


4.Hive清理HDFS收集到的日志文件

1.Hive总表构建

  • 总表的作用:接收HDFS中的FLume日志数据
  • 总表构建:启动hive,创建数据库,在数据库中创建外部表
  • 总表结构:

  • 建表语句:
create external table flux 
(url string,urlname string,title string,chset string,scr string,col string,lg string,je string,ec string,fv string,cn string,ref string,uagent string,stat_uv string,stat_ss string,cip string) 
PARTITIONED BY (reporttime string) row format 
delimited fields terminated by '|' 
location '/weblog001';


  • 注意:location指向了外部表保存在HDFS中的位置

  • 由于之前FLume日志写出的位置为

此处意味着,Flume输出到HDFS中的日志文件所在文件夹就是HIVE外部表的位置
此时要将日志数据导入到外部表只需要添加Hive分区,分区为reporttime='当天时间'

  • 在HIVE的bin下编写Shell脚本,脚本主要功能为:
  1. 自动为HIVE外部表Flux 总表添加分区,分区为系统当天时间;
  2. 添加Linux定时任务,使得HDFS当天日志文件内容能够插入到对应HIVE总表对应分区中;

脚本如下:
auto.sh
#!/bin/sh          
source /etc/profile;
today=$(date +%Y-%m-%d)
/home/presoftware/hive/bin/hive<<EOF
use weblog001;
alter table flux add partition(reporttime='$today');
添加定时任务:(corntab工具
0 0 * * * ./home/presoftware/hive/bin/auto.sh 

Linux一到 0点执行自动任务,为hive的Flux表格添加分区 reportitme=XX,当天用户访问页面,然后当天日志数据就会由Flume保存到hdfs中的weblog001/reporttime=XX中,同时这些数据也会被hive外部表Flux引用插入到对应的表格分区 reporttime=XX 中;


2.Hive数据清洗(清洗表dataclear  业务细表tongji)

dataclear数据清洗表:

  • 建表语句:
create table dataclear
(reportTime string,url string,urlname string,uvid string,ssid string,sscount string,sstime string,cip string)
row format delimited fields terminated by '|';



  • 数据插入:

>insert overwrite table dataclear
select reporttime,url,urlname,stat_uv,split(stat_ss,"_")[0],split(stat_ss,"_")[1],split(stat_ss,"_")[2],cip from flux;


  • tongji业务表:

  • 业务指标以及统计HQL:
1)pv:统计 页面点击数
hive>select count(*) as pv from dataclear where reportTime = '2019-09-23';



2)uv
uv - 独立访客数 - 一天之内所有的访客的数量 - 一天之内uvid去重后的总数
hive>select count(distinct uvid) as uv from dataclear where reportTime = '2019-09-23';


3)vv
vv - 独立会话数 - 一天之内所有的会话的数量 - 一天之内ssid去重后的总数
hive>select count(distinct ssid) as vv from dataclear where reportTime = '2019-09-23';   4)br
br - 跳出率 - 一天内跳出的会话总数/会话总数 
hive>
select round(br_taba.a/br_tabb.b,4)as br from 
(select count(*) as a from (select ssid from dataclear where reportTime='2019-09-23' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
(select count(distinct ssid) as b from dataclear where reportTime='2019-09-23') as br_tabb;



5)newip
newip - 新增ip总数 - 一天内所有ip去重后在历史数据中从未出现过的数量
hive>select count(distinct dataclear.cip) from dataclear where dataclear.reportTime = '2019-09-23' 
and  cip not in
(select dc2.cip from dataclear as dc2 where dc2.reportTime < '2019-09-23');



6)newcust
newcust - 新增客户数 - 一天内所有的uvid去重后在历史数据中从未出现过的总数
hive>select count(distinct dataclear.uvid) from dataclear where dataclear.reportTime='2019-09-23'
and uvid not in 
(select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2019-09-23');


7)avgtime
avgtime - 平均访问时常 - 一天内所有会话的访问时常的平均值 
注: 一个会话的时长 = 会话中所有访问的时间的最大值 - 会话中所有访问时间的最小值
hive>select avg(atTab.usetime) as avgtime from 
(select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2019-09-23' group by ssid) as atTab;   8)avgdeep
avgdeep - 平均访问深度 - 一天内所有会话访问深度的平均值
一个会话的访问深度=一个会话访问的所有url去重后的个数

hive>select round(avg(adTab.deep),4) as avgdeep from
(select count(distinct urlname) as deep from dataclear where reportTime='2019-09-23' group by ssid) as adTab;



  • 业务细表的构建:
hive> create table tongji(reportTime string,pv int,uv int,vv int, br double,newip int, newcust int, avgtime double,avgdeep double) row format delimited fields terminated by '|';


  • 数据插入:
 hive>
insert overwrite table tongji select '2019-09-02' , tab1.pv,tab2.uv,tab3.vv,tab4.br,tab5.newip,tab6.newcust,tab7.avgtime,tab8.avgdeep from
(select count(*) as pv from dataclear where reportTime = '2019-09-23') as tab1,
(select count(distinct uvid) as uv from dataclear where reportTime = '2019-09-23') as tab2,
(select count(distinct ssid) as vv from dataclear where reportTime = '2019-09-23') as tab3,
(select round(br_taba.a/br_tabb.b,4)as br from (select count(*) as a from (select ssid from dataclear where reportTime='2019-09-23' group by ssid having count(ssid) = 1) as br_tab) as br_taba,
(select count(distinct ssid) as b from dataclear where reportTime='2019-09-23') as br_tabb) as tab4,
(select count(distinct dataclear.cip) as newip from dataclear where dataclear.reportTime = '2019-09-23' and cip not in (select dc2.cip from dataclear as dc2 where dc2.reportTime < '2019-09-23')) as tab5,
(select count(distinct dataclear.uvid) as newcust from dataclear where dataclear.reportTime='2019-09-23' and uvid not in (select dc2.uvid from dataclear as dc2 where dc2.reportTime < '2019-09-23')) as tab6,
(select round(avg(atTab.usetime),4) as avgtime from (select max(sstime) - min(sstime) as usetime from dataclear where reportTime='2019-09-23' group by ssid) as atTab) as tab7,
(select round(avg(deep),4) as avgdeep from (select count(distinct urlname) as deep from dataclear where reportTime='2019-09-23' group by ssid) as adTab) as tab8;

注意:此时无论是数据清洗表还是业务细表都是属于hive内部表。保存在HDFS的user/hive中

5.清洗后的数据导入MySql--这一步也可以添加到定时任务中

  • 在mysql中建表:
create table tongji(reportTime varchar(40),pv int,uv int,vv int,br double,newip int,newcust int,avgtime double,avgdeep double);

  • 通过Sqoop将hive数据导出:
sh sqoop export --connect jdbc:mysql://192.168.150.137:3306/weblog001 --username root --password root --export-dir '/user/hive/warehouse/weblog.db/tongji' --table tongji -m 1 --fields-terminated-by '|'

6.离线数据的数据可视化

  • 已经落地在MySQL了就可以通过Echart来实现数据可视化


7.Kafka消息中间件


  • 作用:Flume日志文件数据产生较快,直接交给Spark处理,可能会出现数据来不及处理的情况,此时添加Kafka保证数据一定能够被计算处理。
  • 数据生产:Flume配置文件中已经配置了数据发送给Kafka ,主题topic为weblog
  • 测试:打开页面访问,此时flume和kafka都会有数据产生

8.SparkStream 数据实时处理,并将处理结果写入HBase中

  1. 新建Scala项目
  2. 导入Spark,Kafka,Spark-Kafka相关的包
  3. 启动ZK集群
  4. 启动Kafka,创建主题,启动生产者线程,便于后期测试发送数据
  5. 编写Spark代码实现对Kafka数据的获取和处理

  • 测试代码如下:
  • 访问页面,启动kafka消费者,此时实时日志数据会被Kafka接收,接收后数据会被SparkStream实时处理
//测试Kafka 与Stream的连接,
//此时Kafka有数据生产后,Stream会立即处理
object Kafka_test  {
  
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setAppName("spark_streaming_Kafka_demo");
    conf.setMaster("local[2]");
    val sc = new SparkContext(conf);
    

    val ssc = new StreamingContext(sc,Seconds(5));
     
    //指定消费者组名
    val consumerGroup = "gp1"
    
    //指定消费主题 Key是kafka主题名,value是消费的线程数 //可以指定消费多个主题 多个键值对
    val topics =Map("weblog001"->1)
    
    //连接kafka 消费数据
    val kstream = KafkaUtils.createStream(ssc, "hadoop01:2181,hadoop02:2181,hadoop03:2181", consumerGroup, topics)
    
        kstream.print();

    //真正有用的数据是后面的数据
    kstream.map(_._2).print();

    //启动streaming
    ssc.start();
        //一直运行直到人为干预
    ssc.awaitTermination();
   
  }

  
}

  • 测试结果:Flume,Kafka,程序都会会有数据变化
  • FLume:

  • Kafka:

  • 程序:


  • 将数据插入HBASE对应表格中实现数据仓库的存储:
Hbase建表,表结构为:




主程序:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.netlib.util.Second
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import java.util.Calendar

object demo {

	def main(args: Array[String]): Unit = {

			//准备SparkStreaming的环境
			//在本地模式下, 启动至少2个线程,至少一个线程消费kafka队列数据,至少一个线程负责SparkStreaming
			val conf = new SparkConf().setMaster("local[5]").setAppName("kafkaStream")
					//1.1配置sc对象
					val sc =new SparkContext(conf)
			//配置Stream对象 指定每隔5秒执行一次流处理
			val ssc = new StreamingContext(sc,Seconds(5))

			//指定ZK集群地址
			val zkHosts = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
			//指定消费者组名
			val consumerGroup = "gp1"
			//指定消费主题, Key是kafka主题名,value是消费的线程数 //可以指定消费多个主题 多个键值对
			val topics =Map("weblog001"->1)

			//创建Stream
			val kafkaStream =KafkaUtils
			.createStream(ssc, zkHosts, consumerGroup, topics) //此处打印出来的数据格式 (null,XX目标数据)
			.map{x=>x._2}   
			.foreachRDD{    //作用于DStream中每一个RDD
				rdd => 

				val lines = rdd.toLocalIterator //获取一个Batch中的RDD数据,并转化为本地的迭代器类型

				while(lines .hasNext){
					//遍历每一个RDD 数据
					val line = lines .next()
							//清洗RDD
							val info = line.split("\\|")

							println(info)

							//  提取业务字段
							val url = info(0)
							val urlname = info(1)
							val uvid = info(13)
							val ssid =info(14).split("_")(0)                          
							val sscount =info(14).split("_")(1)                          
							val sstime =info(14).split("_")(2)                          
							val cip =info(15)  

							//处理完这些数据封装到Bean中
							val bean = LogBean(url,urlname,uvid,ssid,sscount,sstime,cip)

		

							//实现业务指标的查询处理

							//1.pv:用户访问一次,PV+1 
							val pv =1

							//--如果uvid在当天记录已存在,则uv=0
							//--当天的范围如何定义:startTime=当天0:00的时间戳
							//--endTime=当前记录中的sstime
							//--把范围定义之后,就可以去HBase做范围查询
							
							val endtime = sstime.toLong
							val calendar = Calendar.getInstance

							//表示以endTime为基准 找到当天的 0:00
							calendar.setTimeInMillis(endtime)
							calendar.set(Calendar.HOUR, 0)
							calendar.set(Calendar.MINUTE, 0)
							calendar.set(Calendar.SECOND, 0)
							calendar.set(Calendar.MILLISECOND, 0)

							//获取当天的0:00的时间戳
							val  starttime = calendar.getTimeInMillis
							//查询范围确定后,通过行键正则过滤器来匹配UVID的数据
							val uvidRegex ="^\\d+_"+uvid+".*$"
							
							//执行HBASE表查询 通过行键正则过滤器匹配
							val uvResult = HbaseUtils.queyByRowRegex(sc,starttime, endtime, uvidRegex)

							//如果uvResult.count==0表示此uvid在今天的记录中从没出现过 此时uv=1
							val uv=if(uvResult.count()==0)1 else 0

							//3.vv:独立会话数。如果在当天范围内是新的会话,则VV=1 反之VV=0
							val ssidRegex= "^\\d+_\\d+_"+ssid+".*$"
							val vvResult =HbaseUtils.queyByRowRegex(sc, starttime, endtime, ssidRegex)
							val vv = if(vvResult.count()==0) 1 else 0

							//4.newIp:新增IP数 如果当前记录中ip在历史数据中没有出现过  newip=1 否则newip=0
							val newipRegex="^\\d+_\\d+_\\d_"+cip+".*$"
							val newipResult=HbaseUtils.queyByRowRegex(sc, starttime, endtime, newipRegex)
							val newip=if(newipResult.count()==0) 1 else 0

							//4.newcust:新增用户数,判断当前的uvid在历史数据中,如果没出现过,则newcust=1
							val newcustResult=HbaseUtils.queyByRowRegex(sc, starttime, endtime, uvidRegex)
							val newcust=if(newcustResult.count()==0) 1 else 0

							//引入MYSQLBEAN
							val mysqlBean = MysqlBean(sstime.toLong,pv,uv,vv,newip,newcust)

							//将数据插入mysql数据库中
							
							MysqlUtil.saveToMysql(mysqlBean)
							
							//调用工具类 将bean插入到HBASE中
							HbaseUtils.saveToHbase(sc,bean)

				}
			}

			
			//启动Stream
			ssc.start()
			//使得程序不断进行
			ssc.awaitTermination()

	}


}


工具类HbaseUtils,代码如下:
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.fs.shell.find.Result
import org.apache.hadoop.hbase.client.Put
import java.util.Random
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.spark.sql.catalyst.expressions.StringRegexExpression
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

object HbaseUtils {
  //将SparkRDD写入到Hbase中
  def saveToHbase (sc:SparkContext,bean:LogBean){
    
    sc.hadoopConfiguration.set("hbse.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbse.zookeeper.property.clientPort", "2181")
    
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"fluxtable")
    
    //底层基于MR
     val job = new Job(sc.hadoopConfiguration)
      
      // 定义输出的key类型:Writable hadoop的序列化机制
     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
       // 定义输出的Value类型
     job.setOutputValueClass(classOf[Result])
       // 定义输出的表格类型
     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  
     
     //插入HBASE :RDD+元组
     //此时数据是LogBean类型 需要转化为RDD
     val hbaseRdd = sc.makeRDD(List(bean)).map { bean =>  
       
     //创建HBASE的行对象,并指定行键
       //行键设计思路:时间错+用户ID+会话ID+本地IP+随机数
       //行键设计目的:
       //1.行键中包含时间戳 有利于按时间维度来做范围查询
       //2.包含用户会话本地id 利于通过行键正则匹配查询相关数据
       //3.行键后拼接随机数字是为了满足散列原则,避免数据集中到一个Hregion中
       
       val rowKey = bean.sstime+"_"+bean.uvid+"_"+bean.ssid+"_"+bean.cip+new Random().nextInt(100)  
       
       //新建put对象 将rowKey转化为字节数组传入
       val put = new Put(rowKey.getBytes)
       
       put.add("cf1".getBytes, "url".getBytes, bean.url.getBytes)
       put.add("cf1".getBytes, "urlname".getBytes, bean.urlname.getBytes)
       put.add("cf1".getBytes, "uvid".getBytes, bean.uvid.getBytes)
       put.add("cf1".getBytes, "ssid".getBytes, bean.ssid.getBytes)
       put.add("cf1".getBytes, "sscount".getBytes, bean.sscount.getBytes)
       put.add("cf1".getBytes, "sstime".getBytes, bean.sstime.getBytes)
       put.add("cf1".getBytes, "cip".getBytes, bean.cip.getBytes)

       //最后要放回kv结构 
       (new ImmutableBytesWritable,put)
       
    }
          
    hbaseRdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

    
    
}
  
  //没有写等号 就相对于是返回Unit
  def queyByRowRegex(sc:SparkContext,starttime:Long, endtime:Long, regex:String)={
    
    val hbaseConf = HBaseConfiguration.create
    sc.hadoopConfiguration.set("hbse.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbse.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE,"fluxtable")
    
    //范围查询 scan
    val scan = new Scan()
    //设定扫描范围
    scan.setStartRow(starttime.toString().getBytes)    
    scan.setStopRow(endtime.toString().getBytes)    
    
    //使用行键正则过滤器:
    //比较规则  比较对象
    val filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(regex))
    
    //绑定过滤器到scan对象 使得扫描HBASE表格的时候过滤器会生效
    scan.setFilter(filter)

    //设置scan对象
    hbaseConf.set(TableInputFormat.SCAN,Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
          
    val resultRDD = 
        sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

    resultRDD
        
  }

}

实体类LogBean代码如下:

//样例类:
//默认构造一个空构造器,public级别的,我们可以直接访问;
//无需new
//默认实现序列化
//默认实现toString
//默认实现了equals 和hashCode

//该对象用于封装日志数据
case class LogBean (
    val url:String,
    val urlname:String,
    val uvid:String,
    val ssid:String,
    val sscount:String,
    val sstime:String,
    val cip:String){
  
}


MySQL工具类:
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.text.SimpleDateFormat
import com.mchange.v2.c3p0.ComboPooledDataSource

object MysqlUtil {
  //引入c3p0连接池对象
  val dataSource=new ComboPooledDataSource
  
  def saveToMysql(mysqlBean: MysqlBean) = {
    var conn :Connection= null
    var ps1:PreparedStatement=null
    var rs1:ResultSet=null
    var ps2:PreparedStatement=null
    var ps3:PreparedStatement=null
    
    try {
      
      val sdf = new SimpleDateFormat("YYYY-MM-dd")
      //获取当天日期时间
      val nowTime =sdf.format(mysqlBean.time)
      
      //获取连接数据
      conn=dataSource.getConnection()
      //查询当天数据
      ps1=conn.prepareStatement("select * from tonji2 where reporttime=?")
      ps1.setString(1,nowTime)
      //执行查询  返回结果集 
      rs1=ps1.executeQuery()
      if(rs1.next()){
        //表示当天已经有了数据,更新各个指标,实现累加
       ps3=conn.prepareStatement("update tongji2 set pv=pv+?,uv=uv+?,vv=vv+?,newip=newip+?,newcust=newcust+? where reporttime=?")
       ps3.setInt(1,mysqlBean.pv)
       ps3.setInt(2,mysqlBean.uv)
       ps3.setInt(3,mysqlBean.vv)
       ps3.setInt(4,mysqlBean.newip)
       ps3.setInt(5,mysqlBean.newcust)
       ps3.setString(6,nowTime)
       
       ps3.executeUpdate()
      }else{
        //表示当天还有数据,则执行插入
         ps2=conn.prepareStatement("insert into tongji2 values(?,?,?,?,?,?)")
         ps2.setString(1,nowTime)
         ps2.setInt(2, mysqlBean.pv)
         ps2.setInt(3, mysqlBean.uv)
         ps2.setInt(4, mysqlBean.vv)
         ps2.setInt(5, mysqlBean.newip)
         ps2.setInt(6, mysqlBean.newcust)
         //执行插入
         ps2.executeUpdate()
      }    
      
    } catch {
      case t: Throwable => t.printStackTrace() // TODO: handle error
    }finally{
      //关闭流之前先判空   以免出现空指针异常
       if(ps3!=null)ps3.close()
       if(ps2!=null)ps2.close()
       if(ps1!=null)ps1.close()
       if(rs1!=null)rs1.close()
       if(conn!=null)conn.close()
    }
    
  }
}

MySQLBean代码:
//用于封装要插入MySQL的数据 
case class MysqlBean (time:Long,pv:Int,uv:Int,vv:Int,newip:Int,newcust:Int) {
  
}

注意:数据从kafka发送过来的时候,被SparkStream处理,处理后的数据首先会被分析统计为监控指标(MySQLBean),然后根据MySQLUtil落地到MySQL中,而日志数据(LogBean)会落地到Hbase中。





全部评论

相关推荐

02-26 16:52
门头沟学院 Java
Lunarloop:董事长亲自到ssob来要IM项目的技术方案来了
点赞 评论 收藏
分享
神哥不得了:神哥来啦~自我评价和校园经历的话可以直接删了,从大厂暑期的话应该没有什么太多问题,应该是能拿到很多大厂面试机会的,就是在面试的时候表示的好一点就行,可以在面试前先把高频top 50的八股多巩固几遍,千万不要看那些假高频八股,这两个项目的话问题不是很大,应该能够帮你找到大厂实习的,算法的话一定要刷起来,因为大厂有些还是比较看重算法的
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务