After the collection-layer Flume ships the logs to the Kafka cluster, the next step is to have Flume consume that data from Kafka and persist it to HDFS.
A Flume agent is deployed on hadoop003, so that hadoop001 and hadoop002 handle log generation and collection, while hadoop003 handles log consumption and storage.
Consumer-layer Flume cluster plan
| | hadoop001 | hadoop002 | hadoop003 |
|---|---|---|---|
| Flume (consumes Kafka) | | | flume |
Consumer-layer Flume configuration
Configuration analysis
The consumer-layer Flume reads messages from Kafka, so Kafka Source is used as the source.
File Channel is used as the channel, because it persists events to disk and minimizes the risk of data loss.
HDFS Sink is used as the sink, so that the logs are written directly to HDFS.
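Before wiring the Kafka Source to the topics, it is worth confirming that topic_start and topic_event were actually created by the collection layer. A minimal check, assuming Kafka is installed under /root/kafka (the path is an assumption) and that the installed version supports --bootstrap-server (older releases use --zookeeper instead):
# List the cluster's topics; topic_start and topic_event should both appear
/root/kafka/bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list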
Flume configuration
Create the configuration file
touch /root/flume/conf/kafka-flume-hdfs.conf
vi /root/flume/conf/kafka-flume-hdfs.conf
# The file contents are as follows
# Flume agent component declarations
a1.sources=r1 r2
a1.channels= c1 c2
a1.sinks=k1 k2
# Source1 properties
# Source type: Kafka Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Maximum number of events pulled from the Kafka topic per batch
a1.sources.r1.batchSize = 5000
# Maximum wait time per batch, in milliseconds
a1.sources.r1.batchDurationMillis = 2000
# Kafka cluster address
a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
# Kafka topic this source consumes
a1.sources.r1.kafka.topics = topic_start
## Source2 properties: same as Source1 except for the topic it consumes
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r2.kafka.topics = topic_event
## Channel1 properties
# Channel1 type: File Channel
a1.channels.c1.type = file
# Directory where the File Channel stores its checkpoint information
a1.channels.c1.checkpointDir = /root/flume/checkpoint/behavior1
# Directory where the File Channel stores the event data
a1.channels.c1.dataDirs = /root/flume/data/behavior1/
# Maximum size, in bytes, of a single File Channel data file
a1.channels.c1.maxFileSize = 2146435071
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000000
# Timeout, in seconds, for a put transaction when the channel is full
a1.channels.c1.keep-alive = 6
# Channel2 properties are the same as Channel1, except for the directories
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /root/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /root/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
# Sink1 properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
# Sink2 properties are the same as Sink1, except for the HDFS path and file prefix
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
# Roll settings that avoid producing a large number of small files on HDFS
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
# Write the output files as compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
# Bind the sources and sinks to their channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
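Before starting the agent on hadoop003, two quick checks can save time: the File Channel directories referenced above should be writable (Flume can normally create them itself, but pre-creating them makes permission problems obvious), and the lzop codec requires an LZO codec jar (hadoop-lzo) on Flume's classpath. A rough check, assuming the jar has already been copied into Flume's lib directory:
# Pre-create the File Channel checkpoint and data directories used in the config
mkdir -p /root/flume/checkpoint/behavior1 /root/flume/checkpoint/behavior2
mkdir -p /root/flume/data/behavior1 /root/flume/data/behavior2
# Confirm an LZO codec jar is visible to Flume (required for codeC = lzop)
ls /root/flume/lib | grep -i lzo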
Consumer-layer Flume start/stop script
vi f2.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop003
do
echo "-------启动 $i 消费flume ---------"
ssh $i "source /etc/profile;nohup /root/flume/bin/fulme-ng agent --conf-file /root/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/flume/log.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop003
do
echo "-------停止 $i 消费flume ---------"
ssh $i " ps -ef|grep kafka-flume-hdfs |grep -v grep |awk '{print$2}' |xargs kill 9"
done
};;
esac
:wq
Note: if nothing is piped into xargs kill (for example, the Flume process is not running), kill is invoked without any arguments and reports an error asking for one.
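If GNU xargs is available on hadoop003, its -r (--no-run-if-empty) flag avoids that error by skipping kill entirely when grep finds nothing. A possible variant of the stop pipeline, run directly on hadoop003:
# Skip kill when no kafka-flume-hdfs process is found
ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print $2}' | xargs -r kill -9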
chmod 777 f2.sh # grant execute permission
./f2.sh start
./f2.sh stop
The result is as follows.
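Once the consumer agent is running and the upstream collection layer is producing data, the sink output can be verified directly on HDFS. A quick check, assuming the hdfs command is on the PATH of the node it is run from (date +%F matches the %Y-%m-%d pattern in the sink paths):
# List today's partition directories created by the HDFS Sinks
hdfs dfs -ls /origin_data/gmall/log/topic_start/$(date +%F)
hdfs dfs -ls /origin_data/gmall/log/topic_event/$(date +%F)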