Spark Streaming Real-Time Stream Processing Project (3)
1. Generating Logs
Write a Python script that simulates the production of access-log data:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#@Time : 2019/3/3 21:01
#@Author: qianliu
#@File : __init__.py.py
import random
import time
url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]
ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]
http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}"
]
search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
]
status_codes = ["200","404","500"]
def sample_url():
    return random.sample(url_paths, 1)[0]

def sample_ip():
    # pick 4 distinct segments and join them into a dotted IP string
    parts = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in parts])

def sample_referer():
    # roughly 80% of records carry no referer ("-")
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_referers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])

def sample_status_code():
    return random.sample(status_codes, 1)[0]

def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # "w+" truncates the file, so each run overwrites the previous batch
    with open("/home/hadoop/access.log", "w+") as f:
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
                url=sample_url(), ip=sample_ip(), referer=sample_referer(),
                status_code=sample_status_code(), local_time=time_str)
            print(query_log)
            f.write(query_log + "\n")
            count = count - 1

if __name__ == '__main__':
    generate_log(100)
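Each run prints the generated records and writes them to /home/hadoop/access.log. The values below are illustrative (every run is random), but each line follows the tab-separated format built by generate_log:
46.143.30.87	2019-03-03 21:01:00	"GET /class/130.html HTTP/1.1"	404	-
132.55.168.29	2019-03-03 21:01:00	"GET /learn/821 HTTP/1.1"	200	http://www.baidu.com/s?wd=Hadoop基础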
Monitor the log with the tail command (print the last 200 lines and follow new output):
tail -200f access.log
Schedule it with a crontab expression:
crontab -e
# Add the following entry; cron will then run /home/hadoop/data/project/log_generator.sh once every minute. That shell script invokes the Python script above to generate access records.
*/1 * * * * /home/hadoop/data/project/log_generator.sh
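The contents of log_generator.sh are not shown above; a minimal sketch, assuming the Python generator was saved as /home/hadoop/data/project/generate_log.py (the filename and path are assumptions, adjust them to your setup), would be:
#!/bin/bash
# Hypothetical wrapper: runs the Python log generator once per cron invocation.
# The .py path is an assumption; point it at wherever you saved the script above.
python /home/hadoop/data/project/generate_log.py
Remember to make the wrapper executable (chmod u+x log_generator.sh), or cron will not be able to run it.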
This produces a fresh batch of access records every minute.