前提

https://blog.csdn.net/qq_41489540/article/details/109175329

编写Flume配置文件

f1.conf

  1. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  2. a1.sources = r1
  3. a1.channels = c1 c2
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=TAILDIR
  6. a1.sources.r1.filegroups=f1
  7. # 一批写多少个
  8. a1.sources.r1.batchSize=1000
  9. #读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
  10. #+代表匹配任意位置
  11. a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
  12. #JSON文件的保存位置
  13. a1.sources.r1.positionFile=/root/soft/apache-flume-1.7.0/custdata/log_position.json
  14. #定义拦截器
  15. a1.sources.r1.interceptors = i1
  16. a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder
  17. #定义ChannelSelector
  18. a1.sources.r1.selector.type = multiplexing
  19. a1.sources.r1.selector.header = topic
  20. # 要和你自己写的拦截器一样才行.
  21. a1.sources.r1.selector.mapping.topic_start = c1
  22. a1.sources.r1.selector.mapping.topic_event = c2
  23. #定义chanel为 KafkaChannel
  24. a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
  25. # Kafka地址
  26. a1.channels.c1.kafka.bootstrap.servers=zjj101:9092,zjj102:9092,zjj103:9092
  27. #定义Kafka主题地址.如果不自己创建的话会自动创建,建议自己创建,自己创建可以指定分区和副本.
  28. a1.channels.c1.kafka.topic=topic_start
  29. # 不希望存header信息
  30. a1.channels.c1.parseAsFlumeEvent=false
  31. a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
  32. a1.channels.c2.kafka.bootstrap.servers=zjj101:9092,zjj102:9092,zjj103:9092
  33. a1.channels.c2.kafka.topic=topic_event
  34. a1.channels.c2.parseAsFlumeEvent=false
  35. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  36. a1.sources.r1.channels=c1 c2

创建两个topic

创建 topic_start 和 topic_event ,都是三个分区两个副本数
Flume将Tomcat日志收集到Kafka里面 - 图1

启动Flume收集日志

  1. [root@zjj101 conf]# flume-ng agent -c conf/ -n a1 -f /root/soft/apache-flume-1.7.0/conf/f1.conf -Dflume.root.logger=DEBUG,console

查看Flume记录日志的json文件

发现已经读了app-2020-10-15.log 文件.

  1. [root@zjj101 custdata]# cat log_position.json
  2. [{"inode":1870589,"pos":696353,"file":"/tmp/logs/app-2020-10-15.log"}]
  3. [root@zjj101 custdata]#

用kafka 工具查看

Flume将Tomcat日志收集到Kafka里面 - 图2
发现已经有数据了