After the collection-layer Flume sends the logs to the Kafka cluster, the next step is to have Flume persist the log data to HDFS.
A Flume agent is deployed on hadoop003, so that hadoop001 and hadoop002 handle log production and collection, while hadoop003 handles log consumption and storage.
Consumer-layer Flume cluster plan
| hadoop001 | hadoop002 | hadoop003 |
| --- | --- | --- |
|  |  | Flume (consumes Kafka) |
Consumer Flume configuration
Configuration analysis
The consumer-layer Flume reads messages from Kafka, so a Kafka Source is used.
A File Channel is chosen for the channel, because it minimizes the risk of data loss.
An HDFS Sink is chosen for the sink, so the logs can be written directly to HDFS.
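Putting these three choices together, the agent runs two parallel flows, one per Kafka topic; a rough sketch of the wiring defined in the properties file below:

```
topic_start --> Kafka Source r1 --> File Channel c1 --> HDFS Sink k1 --> /origin_data/gmall/log/topic_start/
topic_event --> Kafka Source r2 --> File Channel c2 --> HDFS Sink k2 --> /origin_data/gmall/log/topic_event/
```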
Flume configuration
Create the configuration file
```bash
touch /root/flume/conf/kafka-flume-hdfs.conf
vi kafka-flume-hdfs.conf
```

The file content is as follows:

```properties
# Flume agent component declarations
a1.sources = r1 r2
a1.channels = c1 c2
a1.sinks = k1 k2

# Source 1: Kafka Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Number of events pulled from the Kafka topic per batch
a1.sources.r1.batchSize = 5000
# Maximum interval between batches, in milliseconds
a1.sources.r1.batchDurationMillis = 2000
# Kafka cluster address
a1.sources.r1.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
# Kafka topic this source consumes
a1.sources.r1.kafka.topics = topic_start

# Source 2: same as source 1, but consumes a different topic
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.sources.r2.kafka.topics = topic_event

# Channel 1: File Channel
a1.channels.c1.type = file
# Directory for the File Channel's checkpoint metadata
a1.channels.c1.checkpointDir = /root/flume/checkpoint/behavior1
# Directory where the File Channel stores its data files
a1.channels.c1.dataDirs = /root/flume/data/behavior1/
# Maximum size of a single File Channel data file, in bytes
a1.channels.c1.maxFileSize = 2146435071
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 1000000
# Timeout (seconds) for a put transaction when the channel is full
a1.channels.c1.keep-alive = 6

# Channel 2: same as channel 1, but with different directories
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /root/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /root/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

# Sink 1: HDFS Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

# Sink 2: same as sink 1, but for the event topic
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-

# Roll settings that avoid producing large numbers of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

# Write compressed output files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
```
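Before wrapping the agent in the start/stop script below, it can be worth running it once in the foreground on hadoop003 to confirm the configuration parses and events flow; a minimal sketch, assuming Flume is installed under /root/flume as above:

```bash
# Run the consumer agent in the foreground on hadoop003; press Ctrl+C to stop.
# Paths assume the /root/flume layout used in this write-up.
/root/flume/bin/flume-ng agent \
  --conf /root/flume/conf \
  --conf-file /root/flume/conf/kafka-flume-hdfs.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console
```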
Consumer Flume start/stop script
```bash
vi f2.sh
```

The script content is as follows:

```bash
#!/bin/bash
case $1 in
"start"){
    for i in hadoop003
    do
        echo "------- Starting consumer Flume on $i ---------"
        ssh $i "source /etc/profile; nohup /root/flume/bin/flume-ng agent --conf-file /root/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/root/flume/log.txt 2>&1 &"
    done
};;
"stop"){
    for i in hadoop003
    do
        echo "------- Stopping consumer Flume on $i ---------"
        ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print \$2}' | xargs kill -9"
    done
};;
esac
```

Save and exit with `:wq`.
Note: if the pipeline in front of `xargs kill` produces no PIDs (for example, when the agent is not running), `kill` is invoked without arguments and reports an error asking for arguments.
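One way to avoid that error is the `-r` (`--no-run-if-empty`) flag of GNU xargs, which skips running `kill` when the pipeline yields no PIDs; a hedged variant of the stop command:

```bash
# With GNU xargs, -r skips kill entirely when no matching process is found,
# so stopping an already-stopped agent no longer prints a kill usage error.
ps -ef | grep kafka-flume-hdfs | grep -v grep | awk '{print $2}' | xargs -r kill -9
```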
```bash
chmod 777 f2.sh    # grant execute permission
./f2.sh start
./f2.sh stop
```
The results are as follows.
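To confirm that files are actually landing, list the date-partitioned output paths configured for the two HDFS sinks; a minimal check (the exact date subdirectory depends on when the agent runs):

```bash
# List the output directories written by the two HDFS sinks.
hadoop fs -ls /origin_data/gmall/log/topic_start/
hadoop fs -ls /origin_data/gmall/log/topic_event/
```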
