Spark Streaming real-time stream processing project (Part 1)
1. Project workflow
2. Files on CentOS
All files are under /home/hadoop.
3. Environment configuration
cat ~/.bash_profile
# .bash_profile
#Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
#User specific environment and startup programs
export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
export ZK_HOME=/home/hadoop/app/zookeeper-3.4.5-cdh5.7.0
export PATH=$ZK_HOME/bin:$PATH
export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.9.0.0
export PATH=$KAFKA_HOME/bin:$PATH
export SCALA_HOME=/home/hadoop/app/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH
export MAVEN_HOME=/home/hadoop/app/apache-maven-3.3.9
export PATH=$MAVEN_HOME/bin:$PATH
export HADOOP_HOME=/home/hadoop/app/hadoop-2.6.0-cdh5.7.0
export PATH=$HADOOP_HOME/bin:$PATH
export HBASE_HOME=/home/hadoop/app/hbase-1.2.0-cdh5.7.0
export PATH=$HBASE_HOME/bin:$PATH
export SPARK_HOME=/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0
export PATH=$SPARK_HOME/bin:$PATH
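After editing the profile, run `source ~/.bash_profile` so the variables take effect in the current shell. As an optional sanity check, the hypothetical helper `check_homes` below (a POSIX sh sketch, not part of the original setup) reports whether each `*_HOME` variable points at an existing directory.

```shell
# check_homes: hypothetical helper, not part of the original tutorial.
# For each variable name given, print OK if its value is an existing
# directory and MISSING otherwise; return 1 if anything is missing.
check_homes() {
  status=0
  for name in "$@"; do
    # Indirect lookup of the variable's value, written with eval so it
    # also works in plain POSIX sh (bash could use ${!name} instead).
    eval "dir=\${$name:-}"
    if [ -n "$dir" ] && [ -d "$dir" ]; then
      printf 'OK      %s=%s\n' "$name" "$dir"
    else
      printf 'MISSING %s=%s\n' "$name" "${dir:-<unset>}"
      status=1
    fi
  done
  return "$status"
}

# Usage:
# source ~/.bash_profile
# check_homes JAVA_HOME FLUME_HOME ZK_HOME KAFKA_HOME SCALA_HOME MAVEN_HOME HADOOP_HOME HBASE_HOME SPARK_HOME
```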
4. Flume
4.1 Testing Flume
In Flume's conf directory, create the agent configuration:
vi example.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
# Listen on host hadoop000, port 44444
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444
# Describe the sink: events are written to the Flume log
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
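Flume reads the agent configuration with `java.util.Properties`, which treats `#` as a comment only when it is the first non-blank character of a line; a trailing `# ...` after a value becomes part of the value (for example, a port of `44444 #...` no longer parses as a number). The hypothetical helper below is a rough grep-based sketch that flags such lines before you start the agent; it may also flag legitimate values that happen to contain ` #`.

```shell
# find_inline_comments: hypothetical lint helper, not part of Flume.
# Prints "lineno:line" for every non-comment line that contains
# whitespace followed by '#', i.e. a probable trailing comment that
# java.util.Properties would keep as part of the value.
# Exit status follows grep: 0 if something was found, 1 if the file is clean.
find_inline_comments() {
  grep -nE '^[[:space:]]*[^#[:space:]].*[[:space:]]#' "$1"
}

# Usage:
# find_inline_comments "$FLUME_HOME/conf/example.conf"
```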
Start Flume:
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console
Open another terminal window and run:
sudo telnet hadoop000 44444
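Type a few lines of text into the telnet session.
Flume prints each line as an event in its console log.
This means the test succeeded.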
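4.2 Collecting data from server A to server B with Flume
The general idea: on machine A, an exec source monitors a file and an avro sink sends the data out; on machine B, an avro source receives the data and a logger sink prints it. The avro sink on A must therefore point at the host and port on which B's avro source listens (hadoop000:44444 in the configs below).
On machine A: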
vi exec-memory-avro.conf
# Add the following:
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
On machine B:
vi avro-memory-logger.conf
# Add the following:
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
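To run this pipeline, a reasonable order is to start B's agent first, so the avro source is already listening when A's avro sink tries to connect, and then start A's agent. The commands in the comments below follow the pattern from 4.1 and assume both .conf files live in `$FLUME_HOME/conf`; the `--name` value must match the property prefix used in each file. Because A's exec source runs `tail -F /home/hadoop/data/data.log`, any line appended to that file becomes an event; `gen_log` is a hypothetical helper for appending timestamped test lines.

```shell
# Start order (assumes both .conf files are in $FLUME_HOME/conf):
#   On B: flume-ng agent --name avro-memory-logger --conf $FLUME_HOME/conf \
#           --conf-file $FLUME_HOME/conf/avro-memory-logger.conf -Dflume.root.logger=INFO,console
#   On A: flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf \
#           --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger=INFO,console
#
# gen_log: hypothetical test-data generator, not part of Flume.
# Appends COUNT timestamped lines to LOG_FILE, pausing INTERVAL seconds
# between lines, so that `tail -F` in A's exec source emits them as events.
gen_log() {
  log_file=$1
  count=${2:-10}
  interval=${3:-1}
  i=1
  while [ "$i" -le "$count" ]; do
    printf '%s line %d\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$i" >> "$log_file"
    i=$((i + 1))
    sleep "$interval"
  done
}

# Usage on machine A (B's logger sink should print one event per line):
# gen_log /home/hadoop/data/data.log 20 1
```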
5. Kafka
5.1 Testing basic Kafka commands
Start Kafka:
# Start ZooKeeper first
zkServer.sh start
# Then start the Kafka broker
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Create a topic:
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 3 --topic hello_topic
Produce data:
kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic
Consume data:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
5.2 Code development
Error encountered while debugging:
flume kafka failed to send producer request with correlation id 2 to broker 0 with
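Flume was deployed on Windows and Kafka on Linux, and every attempt to send events from Flume to Kafka failed with this error. After a long search online, the problem was finally solved: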
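Edit Kafka's config/server.properties and find the line
#advertised.host.name=
Remove the comment marker and set it to the IP address of the Linux machine running Kafka:
advertised.host.name=192.168.10.10
Then restart Kafka.
The likely reason: when neither advertised.host.name nor host.name is set, a Kafka 0.9 broker publishes the hostname returned by java.net.InetAddress.getCanonicalHostName() to ZooKeeper, and producers connect to whatever address the broker advertises. If the Windows machine running Flume cannot resolve that Linux hostname, every send fails; advertising an IP address avoids the lookup.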
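The configuration change above can also be scripted. `set_advertised_host` below is a hypothetical sed-based sketch: it replaces a commented or uncommented `advertised.host.name=` line with the given address (keeping a `.bak` backup), or appends the setting if no such line exists. Run it on the Kafka host against `$KAFKA_HOME/config/server.properties`, then restart the broker.

```shell
# set_advertised_host: hypothetical helper, not part of Kafka.
# Usage: set_advertised_host PROPERTIES_FILE ADDRESS
set_advertised_host() {
  props=$1
  addr=$2
  if grep -qE '^[[:space:]]*#?[[:space:]]*advertised\.host\.name=' "$props"; then
    # Rewrite the (possibly commented-out) line in place; the original
    # file is kept as "$props.bak". -i.bak and -E work in GNU and BSD sed.
    sed -i.bak -E "s|^[[:space:]]*#?[[:space:]]*advertised\.host\.name=.*|advertised.host.name=$addr|" "$props"
  else
    # No such line yet: append the setting.
    printf 'advertised.host.name=%s\n' "$addr" >> "$props"
  fi
}

# Usage on the Kafka host, followed by a broker restart:
# set_advertised_host "$KAFKA_HOME/config/server.properties" 192.168.10.10
```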