四、高级特性

1. 拦截器

  • Flume支持在运行时对 event 进行修改丢弃,通过配置拦截器来实现;
  • Flume里面的拦截器是实现了 org.apache.flume.interceptor.Interceptor 接口的类;
  • Flume也支持链式的拦截器执行方式,在配置文件里面配置多个拦截器就可以了;
  • 拦截器的顺序取决于它们配置的顺序,Event 按照顺序经过每一个拦截器;

时间添加戳拦截器

  • 这个拦截器会向每个 event 的 header 中添加一个时间戳属性进去,key默认是“timestamp”(也可以通过下面表格中的header来自定义),value就是当前的毫秒值(其实就是用System.currentTimeMillis() 方法得到的)。如果event已经存在同名的属性,可以选择是否保留原始的值

    1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607067422305-7c25920e-bcc6-4c0e-81df-b2cc9c570ffa.png#align=left&display=inline&height=183&margin=%5Bobject%20Object%5D&name=image.png&originHeight=233&originWidth=953&size=30427&status=done&style=none&width=748)<br />**时间添加拦截器测试:**<br />**<br />**1、再次运行 入门案例 中的测试,观察 event header信息,可以看见 header 为空**

    ```shell $FLUME_HOME/bin/flume-ng agent —name a1 \ —conf-file $FLUME_HOME/conf/flume-netcat-logger.conf \ -Dflume.root.logger=INFO,console

telnet linux123 8888

输入 hello world

   ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607067508565-e397dbde-995d-49c6-9b38-e5453c15e35b.png#align=left&display=inline&height=34&margin=%5Bobject%20Object%5D&name=image.png&originHeight=40&originWidth=934&size=11577&status=done&style=shadow&width=793)<br />**2、在入门案例的基础上,在配置文件中增加时间拦截器的配置**

   - ` timestamp.conf`
```sql
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux123
a1.sources.r1.port = 8888

# ######这部分是新增 时间拦截器的 内容######
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# 是否保留Event header中已经存在的同名时间戳,缺省值false
a1.sources.r1.interceptors.i1.preserveExisting= false
# ################################################

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、启动Agent,启动 telnet 输入信息

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file $FLUME_HOME/conf/timestamp.conf \
-Dflume.root.logger=INFO,console

telnet linux3 8888
# 输入 hello world

image.png

Host添加拦截器

  • 这个拦截器会把当前Agent的 hostname 或者 IP 地址写入到Event的header中,key默认是“host”(也可以通过配置自定义key),value可以选择使用hostname或者IP地址。

image.png
host添加拦截器测试:
1、在时间拦截器案例的基础上,在配置文件中增加主机名拦截器的配置

  • hostname.conf ```sql

    Name the components on this agent

    a1.sources = r1 a1.sinks = k1 a1.channels = c1

configure the source

a1.sources.r1.type = netcat a1.sources.r1.bind = linux123 a1.sources.r1.port = 8888

################这部分是新增 时间拦截器 的内容

a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i1.preserveExisting= false

#

#################这部分是新增 主机名拦截器 的内容

a1.sources.r1.interceptors.i2.type = host

如果header中已经存在同名的属性是否保留

a1.sources.r1.interceptors.i2.preserveExisting= false

true:使用IP地址;false:使用hostname

a1.sources.r1.interceptors.i2.useIP = false

#

Describe the sink

a1.sinks.k1.type = logger

Use a channel which buffers events in memory

a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500

Bind the source and sink to the channel

a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

**2、启动Agent,启动 telnet 输入信息**

- 可以看见event headers 中 增加了主机名信息、时间戳信息

![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607068467913-69d26b24-de7c-4455-bfe4-d8c9ef8c62a4.png#align=left&display=inline&height=58&margin=%5Bobject%20Object%5D&name=image.png&originHeight=76&originWidth=941&size=18717&status=done&style=shadow&width=721)
```shell
$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file $FLUME_HOME/conf/hostname.conf \
-Dflume.root.logger=INFO,console

telnet linux123 8888
# 输入 hello world

正则表达式过滤拦截器

  • 这个拦截器会把Event的body当做字符串来处理,并用配置的正则表达式来匹配。可以配置指定被匹配到的Event丢弃还是没被匹配到的Event丢弃
  • 具体教程请看官网文档

2. 选择器

复制选择器

  • source 可以向多个 channel 同时写数据,所以也就产生了以何种方式向多个 channel 写的问题:
    • replication(复制,缺省)。数据完整地发送到每一个channel;
    • multiplexing(多路复用)。通过配置来按照一定的规则进行分发;

image.png

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
  • 上面这个例子中,c3配置成了可选的。向c3发送数据如果失败了会被忽略。c1和c2没有配置成可选的,向c1和c2写数据失败会导致事务失败回滚。


多路复用选择器

image.png

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1 #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3 #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4 #默认使用c4这个channel

自定义选择器

  • 自定义选择器就是开发一个 org.apache.flume.ChannelSelector 接口的实现类。实现类以及依赖的jar包在启动时候都必须放入Flume的classpath

image.png

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.liyifeng.flume.channel.MyChannelSelector

3. Sink组逻辑处理器

  • 可以把多个sink分成一个组, Sink组逻辑处理器可以对这同一个组里的几个sink进行负载均衡 或者 其中一个sink发生故障后将输出Event的任务转移到其他的sink上

  • N个sink将Event输出到对应的N个目的地的,通过 Sink组逻辑处理器 可以把这N个sink配置成负载均衡或者故障转移的工作方式:

    • 负载均衡是将channel里面的Event,按照配置的负载机制(比如轮询)分别发送到sink各自对应的目的地
    • 故障转移是这N个sink同一时间只有一个在工作,其余的作为备用,工作的sink挂掉之后备用的sink顶上

image.png

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
  • 默认
    • 默认的组逻辑处理器就是只有一个sink的情况,这种情况就没必要配置sink组了。前面的例子都是 source -channel - sink这种一对一,单个sink的

故障转移

  • 故障转移组逻辑处理器维护了一个发送Event失败的sink的列表,保证有一个sink是可用的来发送Event。

  • 故障转移机制的工作原理是将故障sink降级到一个池中,在池中为它们分配冷却期(超时时间),在重试之前随顺序故障而增加。 Sink成功发送事件后,它将恢复到实时池。sink具有与之相关的优先级,数值越大,优先级越高。如果在发送Event时Sink发生故障,会继续尝试下一个具有最高优先级的sink。 例如,在优先级为80的sink之前激活优先级为100的sink。如果未指定优先级,则根据配置中的顺序来选取。

  • 要使用故障转移选择器,不仅要设置sink组的选择器为failover,还有为每一个sink设置一个唯一的优先级数值。可以使用 maxpenalty 属性设置故障转移时间的上限(毫秒)。

image.png

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

负载均衡

  • 负载均衡Sink 选择器提供了在多个sink上进行负载均衡流量的功能。 它维护一个活动sink列表的索引来实现负载的分配。 支持轮询( round_robin )【默认值】和 随机( random )两种选择机制分配负载。

  • 工作时,此选择器使用其配置的选择机制选择下一个sink并调用它。 如果所选sink无法正常工作,则处理器通过其配置的选择机制选择下一个可用sink。 此实现不会将失败的Sink列入黑名单,而是继续乐观地尝试每个可用的Sink。

  • 如果所有sink调用都失败了,选择器会将故障抛给sink的运行器。

  • 如果 backoff 设置为true则启用了退避机制,失败的sink会被放入黑名单,达到一定的超时时间后会自动从黑名单移除。 如从黑名单出来后sink仍然失败,则再次进入黑名单而且超时时间会翻倍,以避免在无响应的sink上浪费过长时间。 如果没有启用退避机制,在禁用此功能的情况下,发生sink传输失败后,会将本次负载传给下一个sink继续尝试,因此这种情况下是不均衡的。

image.png

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

4. 事务机制与可靠性

  • 一提到事务,首先就想到的是关系型数据库中的事务,事务一个典型的特征就是将一批操作做成原子性的,要么都成功,要么都失败
  • 在Flume中一共有两个事务:

    • Put事务。在Source到Channel之间
    • Take事务。Channel到Sink之间
  • 从 Source 到 Channel 过程中,数据在 Flume 中会被封装成 Event 对象,也就是一批 Event ,把这批 Event 放到一个事务中,把这个事务也就是这批event一次性的放入Channel 中。同理,Take事务的时候,也是把这一批event组成的事务统一拿出来到sink放到HDFS上

image.png

Flume中的 Put 事务

  • 事务开始的时候会调用一个doPut 方法, doPut 方法将一批数据放在putList中;
    • putList在向 Channel 发送数据之前先检查 Channel 的容量能否放得下,如果放不下一个都不放,只能doRollback;
    • 数据批的大小取决于配置参数 batch size 的值;
    • putList的大小取决于配置 Channel 的参数transaction capacity 的大小,该参数大小就体现在putList上;(Channel的另一个参数capacity 指的是 Channel 的容量);
  • 数据顺利的放到putList之后,接下来可以调用doCommit 方法,把putList中所有的 Event 放到 Channel 中,成功放完之后就清空putList;

  • 在doCommit提交之后,事务在向 Channel 存放数据的过程中,事务容易出问题。如 Sink取数据慢,而 Source 放数据速度快,容易造成 Channel 中数据的积压,如果 putList 中的数据放不进去,会如何呢?

  • 此时会调用 doRollback 方法,doRollback方法会进行两项操作:将putList清空; 抛出 ChannelException异常。source会捕捉到doRollback抛出的异常,然后source就将刚才的一批数据重新采集,然后重新开始一个新的事务,这就是事务的回滚

    Flume中的 Take 事务

  • Take事务同样也有takeList,HDFS sink配置有一个 batch size,这个参数决定 Sink从 Channel 取数据的时候一次取多少个,所以该 batch size 得小于 takeList 的大小,而takeList的大小取决于transaction capacity 的大小,同样是channel 中的参数。

事务开始后:

  • doTake方法会将channel中的event剪切到takeList中。如果后面接的是HDFS Sink的话,在把Channel中的event剪切到takeList中的同时也往写入HDFS的IO缓冲流中放一份event(数据写入HDFS是先写入IO缓冲流然后flush到HDFS);
  • 当takeList中存放了batch size 数量的event之后,就会调用doCommit方法,doCommit方法会做两个操作:

    • 1、针对HDFS Sink,手动调用IO流的flush方法,将IO流缓冲区的数据写入到HDFS磁盘中;
    • 2、清空takeList中的数据
  • flush到HDFS的时候组容易出问题。flush到HDFS的时候,可能由于网络原因超时导致数据传输失败,这个时候调用doRollback方法来进行回滚,回滚的时候由于 takeList 中还有备份数据,所以将takeList中的数据原封不动地还给channel,这时候就完成了事务的回滚。

  • 但是,如果flush到HDFS的时候,数据flush了一半之后出问题了,这意味着已经有一半的数据已经发送到HDFS上面了,现在出了问题,同样需要调用doRollback方法来进行回滚,回滚并没有“一半”之说,它只会把整个takeList中的数据返回给channel,然后继续进行数据的读写。这样开启下一个事务的时候容易造成数据重复的问题。

  • Flume在数据进行采集传输的时候,有可能会造成数据的重复,但不会丢失数据。

  • Flume在数据传输的过程中是否可靠,还需要考虑具体使用Source、Channel、Sink的类型。

    • 分析Source
      • exec Source ,后面接 tail -f ,这个数据也是有可能丢的
      • TailDir Source ,这个是不会丢数据的,它可以保证数据不丢失
    • 分析sink
      • Hdfs Sink,数据有可能重复,但是不会丢失
    • 最后,分析channel。理论上说:要想数据不丢失的话,还是要用 File channel;memory channel 在 Flume挂掉的时候是有可能造成数据的丢失的。
    • 生产中常使用 **TailDir sourceHDFS sink**,虽然数据会重复但是不会丢失

      5. 高可用案例

  • 案例:实现Agent的故障转移

image.png
1、配置环境

  • 在linux121、linux122上部署Flume、修改环境变量 ```powershell

    在liunx123上执行

    /opt/lagou/servers scp -r flume-1.9.0/ linux121:$PWD scp -r flume-1.9.0/ linux122:$PWD

cd /etc scp profile linux121:$PWD scp profile linux122:$PWD

在linux121、linux122上分别执行

source /etc/profile

**2、conf 文件**

- linux123:`flume-taildir-avro.conf`
```sql
# agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/flume_log/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
a1.sources.r1.fileHeader = true

# interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Type
a1.sources.r1.interceptors.i1.value = LOGIN

# 在event header添加了时间戳
a1.sources.r1.interceptors.i2.type = timestamp

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500

# sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2

# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux121
a1.sinks.k1.port = 9999

# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux122
a1.sinks.k2.port = 9999

# set failover(分配sink1和sink2的权重)
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 100
a1.sinkgroups.g1.processor.priority.k2 = 60
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
  • linux121:flume-avro-hdfs.conf ```sql

    set Agent name

    a2.sources = r1 a2.channels = c1 a2.sinks = k1

Source

a2.sources.r1.type = avro a2.sources.r1.bind = linux121 a2.sources.r1.port = 9999

interceptor

a2.sources.r1.interceptors = i1 a2.sources.r1.interceptors.i1.type = static a2.sources.r1.interceptors.i1.key = Collector a2.sources.r1.interceptors.i1.value = linux121

set channel

a2.channels.c1.type = memory a2.channels.c1.capacity = 10000 a2.channels.c1.transactionCapacity = 500

HDFS Sink

a2.sinks.k1.type=hdfs a2.sinks.k1.hdfs.path=hdfs://linux121:9000/flume/failover/ a2.sinks.k1.hdfs.fileType=DataStream a2.sinks.k1.hdfs.writeFormat=TEXT a2.sinks.k1.hdfs.rollInterval=60 a2.sinks.k1.hdfs.filePrefix=%Y-%m-%d a2.sinks.k1.hdfs.minBlockReplicas=1 a2.sinks.k1.hdfs.rollSize=0 a2.sinks.k1.hdfs.rollCount=0 a2.sinks.k1.hdfs.idleTimeout=0 a2.sources.r1.channels = c1 a2.sinks.k1.channel=c1


- linux122:`flume-avro-hdfs.conf`
```sql
# set Agent name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux122
a3.sources.r1.port = 9999

# interceptor
a3.sources.r1.interceptors = i1
a3.sources.r1.interceptors.i1.type = static
a3.sources.r1.interceptors.i1.key = Collector
a3.sources.r1.interceptors.i1.value = linux122

# set channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 500

# HDFS Sink
a3.sinks.k1.type=hdfs
a3.sinks.k1.hdfs.path=hdfs://linux121:9000/flume/failover/
a3.sinks.k1.hdfs.fileType=DataStream
a3.sinks.k1.hdfs.writeFormat=TEXT
a3.sinks.k1.hdfs.rollInterval=60
a3.sinks.k1.hdfs.filePrefix=%Y-%m-%d
a3.sinks.k1.hdfs.minBlockReplicas=1
a3.sinks.k1.hdfs.rollSize=0
a3.sinks.k1.hdfs.rollCount=0
a3.sinks.k1.hdfs.idleTimeout=0
a3.sources.r1.channels = c1
a3.sinks.k1.channel=c1

3、分别在linux121、linux122、linux123上启动对应服务(**先启动下游的agent**)

# linux121
flume-ng agent --name a2 --conf-file ~/conf/flume-avro-hdfs.conf

# linux122
flume-ng agent --name a3 --conf-file ~/conf/flume-avro-hdfs.conf

# linux123
flume-ng agent --name a1 --conf-file ~/conf/flume-taildir-avro2.conf

4、先hive.log中写入数据,检查HDFS目录
5、杀掉一个Agent,看看另外Agent是否能启动