file-flume-kafka.conf
# Name the components of agent a1
a1.sources = r1
a1.channels = c1

# Describe the source: TAILDIR tails the app log files and records its
# read offsets in positionFile so it can resume after a restart
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Custom interceptor that checks each event body is complete JSON;
# optional, the agent works without it
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder

# Describe the channel: a KafkaChannel writes events straight into the
# topic, so this agent needs no sink
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel
a1.sources.r1.channels = c1
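The config only references the ETL interceptor by class name; the class itself is built in a separate Java project and its jar placed on Flume's classpath (typically the agent's lib directory). Below is a minimal sketch of such a JSON-validity interceptor, assuming Flume's Interceptor API and fastjson for parsing; the actual 尚硅谷 implementation may differ in detail.

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    // Drop the event if its body is not complete, parseable JSON.
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        try {
            JSON.parseObject(body);   // throws on malformed/truncated JSON
            return event;
        } catch (Exception e) {
            return null;              // returning null discards the event
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> it = events.iterator();
        while (it.hasNext()) {
            if (intercept(it.next()) == null) {
                it.remove();
            }
        }
        return events;
    }

    @Override
    public void close() { }

    // Flume instantiates interceptors through a nested Builder class,
    // which is why the config value ends in $Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}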
kafka-flume-hdfs.conf
## Components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## source1: read from Kafka in batches
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# kafka.topics takes a comma-separated list (e.g. topic1,topic2); to
# match several topics by pattern (e.g. ^topic[0-9]$) use the separate
# kafka.topics.regex property instead
a1.sources.r1.kafka.topics = topic_log
# Custom interceptor that overwrites the header timestamp with the
# event's own timestamp; unlike the ETL interceptor, this one is
# required. The Java code is in the 尚硅谷 docs. The value is the fully
# qualified class name followed by $Builder, a fixed Flume convention
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder

## channel1: file channel, durable across restarts
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1: write to HDFS, one directory per day
a1.sinks.k1.type = hdfs
# Output path; %Y-%m-%d is filled from the event's timestamp header
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
# Output file name prefix
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
# Roll a new file when it reaches 128 MB (rollSize) or when it has been
# open for rollInterval seconds; rollCount = 0 disables count-based
# rolling. 10 s is for this exercise; in production set rollInterval
# to 3600 (one hour)
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## Write lzop-compressed output; the resulting .lzo files carry no
## index yet, so one must be created manually afterwards
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
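For reference, a minimal sketch of the timestamp interceptor described above. It assumes each event body is JSON with a top-level ts field holding epoch milliseconds (the field name is an assumption; the authoritative code is in the 尚硅谷 docs). Writing that value into the timestamp header lets the HDFS sink's %Y-%m-%d escape bucket events by the time they were generated rather than the time they arrive at the sink, which matters for logs that cross midnight.

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {

    @Override
    public void initialize() { }

    // Copy the event-time field from the JSON body into the "timestamp"
    // header that the HDFS sink's path escapes read from.
    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject json = JSONObject.parseObject(body);
        String ts = json.getString("ts");   // assumed: epoch millis field
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) { }
    }
}

As with the ETL interceptor, the packaged jar must be on the agent's classpath before this agent starts.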