Failover Sink Processor

Failover Sink Processor维护了一个多个sink的有优先级的列表,按照优先级保证,至少有一个sink是可以干活的(处理event的)
如果根据优先级发现,优先级高的sink故障了,故障的sink会被转移到一个故障的池中冷却.
在冷却时,故障的sink也会不管尝试发送event,一旦发送成功,此时会将故障的sink再移动到存活的池中.

必需配置:
sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority

案例

使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能。
Flume故障转移Failover Sink Processor和demo - 图1
案例是三个agent, 然后有一个channel ,两个sink ,两个sink都是从这一个channel里面拿数据的,如果你有多个sink需要对接一个channel的话,那么你就需要组成一个sink组.skin组挑哪个sink从channel里面拿数据?就看sinkProcessor处理器了.

FailoverSinkProcessor是从两个Sink里面挑选出一个优先级最高的sink去干活儿.假如说Sink1优先级最高,那么就对接Flume2的agent, 而Flume3是不干活儿的,因为skin2优先级低.

然后模拟一种特殊的情况,就是Flume2的agent挂掉了,那么flume2的agent绑定的端口就停了,那么skin1就发不出去了,那么就相当于Sink1故障了.
这个时候FailoverSinkProcessor就将sink1拿出去,让剩下的优先级最高的skin2去干活儿. sink2对接的是flume3,那么flume3 就可以干活儿了.

开始编写配置文件

配置文件名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路径下面

zjj101

  1. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1
  5. # 配置sink组
  6. a1.sinkgroups = g1
  7. a1.sinkgroups.g1.sinks = k1 k2
  8. a1.sinkgroups.g1.processor.type = failover
  9. # 配置sink的优先级
  10. a1.sinkgroups.g1.processor.priority.k1=100
  11. a1.sinkgroups.g1.processor.priority.k2=90
  12. a1.sinkgroups.g1.processor.sinks=k1 k2
  13. #组名名.属性名=属性值
  14. a1.sources.r1.type=exec
  15. a1.sources.r1.command=tail -f /root/soft/test.log
  16. #声明r1的channel选择器
  17. a1.sources.r1.selector.type = replicating
  18. #定义chanel
  19. a1.channels.c1.type=memory
  20. a1.channels.c1.capacity=1000
  21. ##定义sink
  22. a1.sinks.k1.type=avro
  23. a1.sinks.k1.hostname=zjj102
  24. a1.sinks.k1.port=33333
  25. a1.sinks.k2.type=avro
  26. a1.sinks.k2.hostname=zjj103
  27. a1.sinks.k2.port=33333
  28. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  29. a1.sources.r1.channels=c1
  30. a1.sinks.k1.channel=c1
  31. a1.sinks.k2.channel=c1

zjj102

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. a1.sources.r1.bind=zjj102
  7. a1.sources.r1.port=33333
  8. #定义sink
  9. a1.sinks.k1.type=logger
  10. #定义chanel
  11. a1.channels.c1.type=memory
  12. a1.channels.c1.capacity=1000
  13. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  14. a1.sources.r1.channels=c1
  15. a1.sinks.k1.channel=c1

zjj103

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. a1.sources.r1.bind=zjj103
  7. a1.sources.r1.port=33333
  8. #定义sink
  9. a1.sinks.k1.type=logger
  10. #定义chanel
  11. a1.channels.c1.type=memory
  12. a1.channels.c1.capacity=1000
  13. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  14. a1.sources.r1.channels=c1
  15. a1.sinks.k1.channel=c1

启动项目

启动顺序是 zjj101 必须要在 zjj102 zjj103 之后启动,因为zjj102 和zjj103开放监听了端口.

启动zjj102

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

启动zjj103

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

启动zjj101

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

测试

再开一个zjj101终端
往test.log里面输出内容

  1. [root@zjj101 soft]# echo 2020121111111111111 >> test.log
  2. [root@zjj101 soft]# pwd
  3. /root/soft

此时观察zjj102 和zjj103日志发现,只有zjj102日志有输出,说明接收到了数据,

  1. 20/10/26 12:09:44 INFO sink.LoggerSink: Event: { headers:{} body: 32 30 32 30 31 32 31 31 31 31 31 31 31 31 31 31 2020121111111111 }

我现在模拟发生故障,我给zjj102的agent关闭掉.. 杀死33333端口的程序.

此时再推送消息就只有zjj103收到了.

然后我给zjj102的agent启动起来,等待zjj101通讯心跳发现zjj102agent恢复正常了, 此时再推送内容,就又推送给zjj102了,而zjj103就没有了..