Spark Streaming Real-Time Stream Processing Project (Part 2)
1. Collecting log4j logs into Flume
Configure log4j.properties as follows, so that log output goes to the console and is also sent to Flume:
#root logger
log4j.rootLogger=INFO,stdout,flume
#stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
#flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.48.138
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
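The Log4jAppender configured above is not part of log4j itself; it ships with Flume's client libraries, so the application producing the logs needs the flume-ng-log4jappender jar on its classpath. A minimal sbt sketch (the version is an assumption; match it to your Flume release):
// build.sbt (sketch; match the version to your Flume installation)
libraryDependencies += "org.apache.flume.flume-ng-clients" % "flume-ng-log4jappender" % "1.6.0"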
streaming.conf receives the log4j events over Avro and, for now, sinks them to the logger (console) so the pipeline can be verified; the Kafka sink is added later in streaming2.conf:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
Start Flume:
flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming.conf -Dflume.root.logger=INFO,console
Run the application to generate log events.
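A minimal sketch of such a log generator (the class name LoggerGenerator is hypothetical; it only needs log4j and the log4j.properties above on its classpath):
// LoggerGenerator.scala - hypothetical producer that emits one INFO line per second
package com.qianliu
import org.apache.log4j.Logger
object LoggerGenerator {
  private val logger = Logger.getLogger(getClass.getName)
  def main(args: Array[String]): Unit = {
    var index = 0
    while (true) {
      Thread.sleep(1000)
      // goes to stdout and, through the Log4jAppender, to the Flume avro source
      logger.info("current value is: " + index)
      index += 1
    }
  }
}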
Flume prints the collected events to its console:
2. Sinking the Flume-collected data to Kafka
Start ZooKeeper and Kafka:
zkServer.sh start
kafka-server-start.sh ../config/server.properties
Write a Flume configuration, streaming2.conf, that sinks the data into Kafka. Contents of streaming2.conf:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = hadoop000:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
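Start the Flume agent again, this time pointing at streaming2.conf:
flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming2.conf -Dflume.root.logger=INFO,console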
Create a topic in Kafka:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic streamingtopic
List the topics:
kafka-topics.sh --list --zookeeper hadoop000:2181
Start a Kafka console consumer to verify that the data arrives:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic streamingtopic --from-beginning
If output like the following appears, the pipeline works:
3. Spark Streaming consumes the Kafka data
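The import org.apache.spark.streaming.kafka.KafkaUtils used below comes from the receiver-based Kafka connector, which is a separate artifact from spark-streaming. A hedged build.sbt sketch (versions are assumptions; pick the 0-8 connector that matches your Spark release):
// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
)
The consumer application, com.qianliu.KafkaStreamingApp: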
package com.qianliu
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Spark Streaming consuming data from Kafka. */
object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    // quiet Spark's logging (helper in the same package, see the sketch after this listing)
    LoggerLevels.setStreamingLogLevels()
    // validate the number of arguments
    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    // unpack the arguments
    val Array(zkQuorum, group, topics, numThreads) = args
    // initialize the SparkConf
    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    sparkConf.set("spark.testing.memory", "512000000")
    // StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // allow multiple comma-separated topics, each consumed with numThreads threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // receiver-based stream: a DStream of (key, message) pairs
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // the message body is the second element of each pair, hence _._2
    messages.map(_._2).count().print()
    // start the streaming job and block until it is terminated
    ssc.start()
    ssc.awaitTermination()
  }
}
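LoggerLevels is a small helper, not shown in this post, that only lowers Spark's log verbosity; a minimal stand-in, assuming it lives in the same package, could be:
package com.qianliu
import org.apache.log4j.{Level, Logger}
// hypothetical stand-in for the LoggerLevels helper referenced above
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
  }
}
Run the job with four program arguments, for example (the consumer group name is an assumption; use your own ZooKeeper address): hadoop000:2181 test-group streamingtopic 1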
After a short while a batch of output is printed; in the intervals where the generator produces no data, there is nothing to count: