Flume 配置分析 针对本学习项目,在编写 Flume Agent配置文件之前,首先需要进行组件选型。
Source
本项目主要从一个实时写入数据的文件夹中读取数据,Source 可以选择 Spooling Directory Source. 、Exec Source 和 Taildir Source。Taildir Source 相比 Exec Source、Spooling Directory Source 具有很多优势。Taildir Source 可以实现断点续传、多目录监控配置。而在 Flume 1.6以前需要用户自定义Source,记录每次读取文件的位置,从而实现断点续传。Exec Source 可以实时搜集数据,但是在 Flume 不运行或者Shell命令出错的情况下,数据将会丢失,从而不能记录数据读取位置、实现断点续传。Spooling Directory Source 可以实现目录监控配置,但是不能实时采集数据。
Channel
由于采集日志层 Flume 在读取数据后主要将数据送往 Kafka 消息队列中,所以使用 Kafka Channel是很好的选择,同时选择Kafka Channel 可以不配置 Sink,提高了效率。
拦截器
本项目中主要部署两个拦截器,一个用来过滤格式不正确的非法数据,这在实际生产环境中也是必不可少的,另一个用来分辨日志类型,根据日志类型给 event添加 header 信息,可以帮助 Channel 选择器选择日志应该发往的Channel。
Channel 选择器
采集日志层 Flume 主要部署两个Kafka Channel,分别将数据发往不同的 Kafka Topic,两个 Topic 存储的数据不同,所以需要配置 Channel 选择器决定日志去向,并且配置选择器类型为 multiplexing,在该模式下,会将 event 发送至特定的 Channel,而不会发送至所有 Channel,实现了日志的分类分流。
Flume 的具体配置
在flume/conf 目录下创建 file-flume-kafka.conf文件。
vi file-flume-kafka.conf
#flume agent 组件声明
#定义agent必须的组件名称,同时指定本配置文件的agent名称为a1
a1.sources=r1
a1.channels= c1 c2
#定义source组件相关配置
#配置Source 类型为Taildir
a.sources.r1.type=TAILDIR
#配置taildir source,保存断点位置文件目录
a1.sources.r1.positionFile =/root/flume/test/log_position.json
#配置监控目录组
a1.sources.r1.filegroups = f1
#配置监控目录组下的目录,可多个
a1.sources.r1.filegroups.f1=/tmp/logs/app.+
#配置source 发送数据的目标channel
a1.sources.ri.channel = c1 c2
#配置拦截器-自定义即可。
#配置拦截器名称,需写明全类名
a.sources.r1.interceptors = i1 i2
a.sources.r1.interceptors.i1.type =
com.bigdata.flume.interceptor.LogETLInterceptor$Builder
a.sources.r1.interceptors.i2.type =
com.bigdata.flume.interceptor.LogTypeInterceptor$Builder
##channel选择器配置
#channel1类型
a1.sources.r1.selector.type = multiplexing
#配置选择器识别header中的key
a1.sources.r1.selector.header = topic
#配置不同的header信息,发往不同的channel
a1.sources.r1.selector.mapping.topic_start= c1
a1.sources.r1.selector.mapping.topic_event= c2
#配置channel类型、kafka集群节点服务器列表
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=
hadoop001:9092,hadoop002:9092,hadoop003:9092
#配置该channel发往kafka的topic,该topic需要在kafka中提前创建
a1.channels.c1.kafka.topic=topic_start
#配置不讲header信息解析为event内容
a1.channels.c1.parseAsFlumeEvent = false
#配置该kafka channel所属消费者组名,为实现multiplexing类型的channel选择器,应将2个kafka channel配置相同的消费者组
a1.channels.c1.kafka.consumer.group.id= flume-consumer
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers=
hadoop001:9092,hadoop002:9092,hadoop003:9092
a1.channels.c2.kafka.topic=topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id= flume-consumer
Flume的ETL拦截器和日志类型区分拦截器
拦截器编写
待补充