Flume 配置分析 针对本学习项目,在编写 Flume Agent配置文件之前,首先需要进行组件选型。

Source

本项目主要从一个实时写入数据的文件夹中读取数据,Source 可以选择 Spooling Directory Source. 、Exec Source 和 Taildir Source。Taildir Source 相比 Exec Source、Spooling Directory Source 具有很多优势。Taildir Source 可以实现断点续传、多目录监控配置。而在 Flume 1.6以前需要用户自定义Source,记录每次读取文件的位置,从而实现断点续传。Exec Source 可以实时搜集数据,但是在 Flume 不运行或者Shell命令出错的情况下,数据将会丢失,从而不能记录数据读取位置、实现断点续传。Spooling Directory Source 可以实现目录监控配置,但是不能实时采集数据。

Channel

由于采集日志层 Flume 在读取数据后主要将数据送往 Kafka 消息队列中,所以使用 Kafka Channel是很好的选择,同时选择Kafka Channel 可以不配置 Sink,提高了效率。

拦截器

本项目中主要部署两个拦截器,一个用来过滤格式不正确的非法数据,这在实际生产环境中也是必不可少的,另一个用来分辨日志类型,根据日志类型给 event添加 header 信息,可以帮助 Channel 选择器选择日志应该发往的Channel。

Channel 选择器

采集日志层 Flume 主要部署两个Kafka Channel,分别将数据发往不同的 Kafka Topic,两个 Topic 存储的数据不同,所以需要配置 Channel 选择器决定日志去向,并且配置选择器类型为 multiplexing,在该模式下,会将 event 发送至特定的 Channel,而不会发送至所有 Channel,实现了日志的分类分流。

Flume 的具体配置


在flume/conf 目录下创建 file-flume-kafka.conf文件。

  1. vi file-flume-kafka.conf
  2. #flume agent 组件声明
  3. #定义agent必须的组件名称,同时指定本配置文件的agent名称为a1
  4. a1.sources=r1
  5. a1.channels= c1 c2
  6. #定义source组件相关配置
  7. #配置Source 类型为Taildir
  8. a.sources.r1.type=TAILDIR
  9. #配置taildir source,保存断点位置文件目录
  10. a1.sources.r1.positionFile =/root/flume/test/log_position.json
  11. #配置监控目录组
  12. a1.sources.r1.filegroups = f1
  13. #配置监控目录组下的目录,可多个
  14. a1.sources.r1.filegroups.f1=/tmp/logs/app.+
  15. #配置source 发送数据的目标channel
  16. a1.sources.ri.channel = c1 c2
  17. #配置拦截器-自定义即可。
  18. #配置拦截器名称,需写明全类名
  19. a.sources.r1.interceptors = i1 i2
  20. a.sources.r1.interceptors.i1.type =
  21. com.bigdata.flume.interceptor.LogETLInterceptor$Builder
  22. a.sources.r1.interceptors.i2.type =
  23. com.bigdata.flume.interceptor.LogTypeInterceptor$Builder
  24. ##channel选择器配置
  25. #channel1类型
  26. a1.sources.r1.selector.type = multiplexing
  27. #配置选择器识别header中的key
  28. a1.sources.r1.selector.header = topic
  29. #配置不同的header信息,发往不同的channel
  30. a1.sources.r1.selector.mapping.topic_start= c1
  31. a1.sources.r1.selector.mapping.topic_event= c2
  32. #配置channel类型、kafka集群节点服务器列表
  33. a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
  34. a1.channels.c1.kafka.bootstrap.servers=
  35. hadoop001:9092,hadoop002:9092,hadoop003:9092
  36. #配置该channel发往kafka的topic,该topic需要在kafka中提前创建
  37. a1.channels.c1.kafka.topic=topic_start
  38. #配置不讲header信息解析为event内容
  39. a1.channels.c1.parseAsFlumeEvent = false
  40. #配置该kafka channel所属消费者组名,为实现multiplexing类型的channel选择器,应将2个kafka channel配置相同的消费者组
  41. a1.channels.c1.kafka.consumer.group.id= flume-consumer
  42. a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
  43. a1.channels.c2.kafka.bootstrap.servers=
  44. hadoop001:9092,hadoop002:9092,hadoop003:9092
  45. a1.channels.c2.kafka.topic=topic_event
  46. a1.channels.c2.parseAsFlumeEvent = false
  47. a1.channels.c2.kafka.consumer.group.id= flume-consumer

Flume的ETL拦截器和日志类型区分拦截器

拦截器编写

待补充