Failover Sink Processor
Failover Sink Processor维护了一个多个sink的有优先级的列表,按照优先级保证,至少有一个sink是可以干活的(处理event的)
如果根据优先级发现,优先级高的sink故障了,故障的sink会被转移到一个故障的池中冷却.
在冷却时,故障的sink也会不管尝试发送event,一旦发送成功,此时会将故障的sink再移动到存活的池中.
必需配置:
sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be failover
processor.priority. – Priority value. must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
案例
使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能。
案例是三个agent, 然后有一个channel ,两个sink ,两个sink都是从这一个channel里面拿数据的,如果你有多个sink需要对接一个channel的话,那么你就需要组成一个sink组.skin组挑哪个sink从channel里面拿数据?就看sinkProcessor处理器了.
FailoverSinkProcessor是从两个Sink里面挑选出一个优先级最高的sink去干活儿.假如说Sink1优先级最高,那么就对接Flume2的agent, 而Flume3是不干活儿的,因为skin2优先级低.
然后模拟一种特殊的情况,就是Flume2的agent挂掉了,那么flume2的agent绑定的端口就停了,那么skin1就发不出去了,那么就相当于Sink1故障了.
这个时候FailoverSinkProcessor就将sink1拿出去,让剩下的优先级最高的skin2去干活儿. sink2对接的是flume3,那么flume3 就可以干活儿了.
开始编写配置文件
配置文件名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路径下面
zjj101
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# 配置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# 配置sink的优先级
a1.sinkgroups.g1.processor.priority.k1=100
a1.sinkgroups.g1.processor.priority.k2=90
a1.sinkgroups.g1.processor.sinks=k1 k2
#组名名.属性名=属性值
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/soft/test.log
#声明r1的channel选择器
a1.sources.r1.selector.type = replicating
#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
##定义sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=zjj102
a1.sinks.k1.port=33333
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=zjj103
a1.sinks.k2.port=33333
#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
zjj102
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj102
a1.sources.r1.port=33333
#定义sink
a1.sinks.k1.type=logger
#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
zjj103
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj103
a1.sources.r1.port=33333
#定义sink
a1.sinks.k1.type=logger
#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动项目
启动顺序是 zjj101 必须要在 zjj102 zjj103 之后启动,因为zjj102 和zjj103开放监听了端口.
启动zjj102
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
启动zjj103
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
启动zjj101
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
测试
再开一个zjj101终端
往test.log里面输出内容
[root@zjj101 soft]# echo 2020121111111111111 >> test.log
[root@zjj101 soft]# pwd
/root/soft
此时观察zjj102 和zjj103日志发现,只有zjj102日志有输出,说明接收到了数据,
20/10/26 12:09:44 INFO sink.LoggerSink: Event: { headers:{} body: 32 30 32 30 31 32 31 31 31 31 31 31 31 31 31 31 2020121111111111 }
我现在模拟发生故障,我给zjj102的agent关闭掉.. 杀死33333端口的程序.
此时再推送消息就只有zjj103收到了.
然后我给zjj102的agent启动起来,等待zjj101通讯心跳发现zjj102agent恢复正常了, 此时再推送内容,就又推送给zjj102了,而zjj103就没有了..