Load balancing Sink Processor

负载均衡的sink processor! Load balancing Sink Processor维持了sink组中active状态的sink.
使用round_robin(轮询) 或 random(随机) 算法,来分散sink组中存活的sink之间的负载
假设当前sink组里面有三个sink, 这时候Load balancing Sink Processor 只会维护存活状态的sink,如果某个sink挂掉了就会从sink组里面去掉.
然后发送数据的时候就根据sink组里面两个存活的sink里面,找一个来发送数据,至于找哪个skin发送就得看是使用round_robin(轮询) 还是random(随机) 算法
必需配置:
processor.sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be load_balance
案例
使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用Load balancing Sink Processor实现负载均衡功能,让Flume2和FLume3轮询发送
开始编写配置文件
配置文件名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路径下面
zjj101
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔a1.sources = r1a1.sinks = k1 k2a1.channels = c1# 配置sink组a1.sinkgroups = g1a1.sinkgroups.g1.sinks = k1 k2# 配置使用负载均衡策略a1.sinkgroups.g1.processor.type = load_balance# k1 k2轮流干活儿a1.sinkgroups.g1.processor.sinks=k1 k2#组名名.属性名=属性值a1.sources.r1.type=execa1.sources.r1.command=tail -f /root/soft/test.log#声明r1的channel选择器a1.sources.r1.selector.type = replicating#定义chanela1.channels.c1.type=memorya1.channels.c1.capacity=1000##定义sinka1.sinks.k1.type=avroa1.sinks.k1.hostname=zjj102a1.sinks.k1.port=33333a1.sinks.k2.type=avroa1.sinks.k2.hostname=zjj103a1.sinks.k2.port=33333#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!a1.sources.r1.channels=c1a1.sinks.k1.channel=c1a1.sinks.k2.channel=c1
zjj102
a1.sources = r1a1.sinks = k1a1.channels = c1#组名名.属性名=属性值a1.sources.r1.type=avroa1.sources.r1.bind=zjj102a1.sources.r1.port=33333#定义sinka1.sinks.k1.type=logger#定义chanela1.channels.c1.type=memorya1.channels.c1.capacity=1000#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!a1.sources.r1.channels=c1a1.sinks.k1.channel=c1
zjj103
a1.sources = r1a1.sinks = k1a1.channels = c1#组名名.属性名=属性值a1.sources.r1.type=avroa1.sources.r1.bind=zjj103a1.sources.r1.port=33333#定义sinka1.sinks.k1.type=logger#定义chanela1.channels.c1.type=memorya1.channels.c1.capacity=1000#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!a1.sources.r1.channels=c1a1.sinks.k1.channel=c1
启动项目
启动顺序是 zjj101 必须要在 zjj102 zjj103 之后启动,因为zjj102 和zjj103开放监听了端口.
启动zjj102
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
启动zjj103
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
启动zjj101
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
测试
再开一个zjj101终端
往test.log里面输出内容
[root@zjj101 soft]# echo 13115464564561 >> test.log[root@zjj101 soft]# echo 13115464564561 >> test.log[root@zjj101 soft]# echo 1311546561 >> test.log[root@zjj101 soft]# echo 131151 >> test.log[root@zjj101 soft]# pwd/root/soft
此时观察zjj102 和zjj103日志发现,zjj102和zjj103机器是轮询接收消息的.就是你接收一条我接收一条消息.
然后我给zjj103关掉 , 杀死 zjj103的33333端口占用的进程,再发送内容
[root@zjj101 soft]# echo 51 >> test.log[root@zjj101 soft]# echo 51 >> test.log[root@zjj101 soft]# echo 51 >> test.log
然后你就会发现zjj102机器全部接收到了这三条消息的内容.
