file-flume-kafka.conf

  1. #为各组件命名
  2. a1.sources = r1
  3. a1.channels = c1
  4. #描述
  5. a1.sources.r1.type = TAILDIR
  6. a1.sources.r1.filegroups = f1
  7. a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
  8. a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
  9. #自定义拦截器 校验json数据是否完整,可不加这个拦截器
  10. a1.sources.r1.interceptors = i1
  11. a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder
  12. #描述
  13. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  14. a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
  15. a1.channels.c1.kafka.topic = topic_log
  16. a1.channels.c1.parseAsFlumeEvent = false
  17. #绑定 source 和 channel 以及 sink 和 channel 的关系
  18. a1.sources.r1.channels = c1

kafka-flume-hdfs.conf

  1. ## 组件
  2. a1.sources=r1
  3. a1.channels=c1
  4. a1.sinks=k1
  5. ## source1
  6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  7. a1.sources.r1.batchSize = 5000
  8. a1.sources.r1.batchDurationMillis = 2000
  9. a1.sources.r1.kafka.bootstrap.servers =hadoop102:9092,hadoop103:9092,hadoop104:9092
  10. #可以逗号分隔多个topics,也可以用正则匹配多个topics,例如:topic1,topic2 或 ^topic[0-9]$
  11. a1.sources.r1.kafka.topics=topic_log
  12. #自定义拦截器,修改header时间为数据时间,本拦截器必须添加 java代码详见尚硅谷文档 $Builder是固定写法,前边是全类名
  13. a1.sources.r1.interceptors = i1
  14. a1.sources.r1.interceptors.i1.type =com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
  15. ## channel1
  16. a1.channels.c1.type = file
  17. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
  18. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
  19. a1.channels.c1.maxFileSize = 2146435071
  20. a1.channels.c1.capacity = 1000000
  21. a1.channels.c1.keep-alive = 6
  22. ## sink1
  23. a1.sinks.k1.type = hdfs
  #输出路径按日期分目录;前缀为 log-(properties 不支持行尾注释,注释须单独成行,否则会被当作值的一部分)
  24. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
  25. a1.sinks.k1.hdfs.filePrefix = log-
  26. a1.sinks.k1.hdfs.round = false
  27. #文件在128M时生成新文件、文件在1小时无写入后生成新文件
  #学习时使用10,生产环境设置为3600(3600秒即一小时);注释须单独成行,行尾注释会被当作值的一部分
  28. a1.sinks.k1.hdfs.rollInterval = 10
  29. a1.sinks.k1.hdfs.rollSize = 134217728
  30. a1.sinks.k1.hdfs.rollCount = 0
  31. ## 控制输出文件是lzop文件,此时输出的lzo文件没有索引 还需要后续手动创建
  32. a1.sinks.k1.hdfs.fileType = CompressedStream
  33. a1.sinks.k1.hdfs.codeC = lzop
  34. ## 拼装
  35. a1.sources.r1.channels = c1
  36. a1.sinks.k1.channel= c1