Load balancing Sink Processor

Flume之负载均衡Load balancing Sink Processor和案例 - 图1

负载均衡的sink processor! Load balancing Sink Processor维持了sink组中active状态的sink.
使用round_robin(轮询) 或 random(随机) 算法,来分散sink组中存活的sink之间的负载

假设当前sink组里面有三个sink, 这时候Load balancing Sink Processor 只会维护存活状态的sink,如果某个sink挂掉了就会从sink组里面去掉.
然后发送数据的时候就根据sink组里面两个存活的sink里面,找一个来发送数据,至于找哪个skin发送就得看是使用round_robin(轮询) 还是random(随机) 算法

必需配置:
processor.sinks – Space-separated list of sinks that are participating in the group
processor.type default The component type name, needs to be load_balance

案例

使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用Load balancing Sink Processor实现负载均衡功能,让Flume2和FLume3轮询发送

开始编写配置文件

配置文件名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路径下面

zjj101

  1. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1
  5. # 配置sink组
  6. a1.sinkgroups = g1
  7. a1.sinkgroups.g1.sinks = k1 k2
  8. # 配置使用负载均衡策略
  9. a1.sinkgroups.g1.processor.type = load_balance
  10. # k1 k2轮流干活儿
  11. a1.sinkgroups.g1.processor.sinks=k1 k2
  12. #组名名.属性名=属性值
  13. a1.sources.r1.type=exec
  14. a1.sources.r1.command=tail -f /root/soft/test.log
  15. #声明r1的channel选择器
  16. a1.sources.r1.selector.type = replicating
  17. #定义chanel
  18. a1.channels.c1.type=memory
  19. a1.channels.c1.capacity=1000
  20. ##定义sink
  21. a1.sinks.k1.type=avro
  22. a1.sinks.k1.hostname=zjj102
  23. a1.sinks.k1.port=33333
  24. a1.sinks.k2.type=avro
  25. a1.sinks.k2.hostname=zjj103
  26. a1.sinks.k2.port=33333
  27. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  28. a1.sources.r1.channels=c1
  29. a1.sinks.k1.channel=c1
  30. a1.sinks.k2.channel=c1

zjj102

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. a1.sources.r1.bind=zjj102
  7. a1.sources.r1.port=33333
  8. #定义sink
  9. a1.sinks.k1.type=logger
  10. #定义chanel
  11. a1.channels.c1.type=memory
  12. a1.channels.c1.capacity=1000
  13. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  14. a1.sources.r1.channels=c1
  15. a1.sinks.k1.channel=c1

zjj103

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. a1.sources.r1.bind=zjj103
  7. a1.sources.r1.port=33333
  8. #定义sink
  9. a1.sinks.k1.type=logger
  10. #定义chanel
  11. a1.channels.c1.type=memory
  12. a1.channels.c1.capacity=1000
  13. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  14. a1.sources.r1.channels=c1
  15. a1.sinks.k1.channel=c1

启动项目

启动顺序是 zjj101 必须要在 zjj102 zjj103 之后启动,因为zjj102 和zjj103开放监听了端口.

启动zjj102

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

启动zjj103

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

启动zjj101

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

测试

再开一个zjj101终端
往test.log里面输出内容

  1. [root@zjj101 soft]# echo 13115464564561 >> test.log
  2. [root@zjj101 soft]# echo 13115464564561 >> test.log
  3. [root@zjj101 soft]# echo 1311546561 >> test.log
  4. [root@zjj101 soft]# echo 131151 >> test.log
  5. [root@zjj101 soft]# pwd
  6. /root/soft

此时观察zjj102 和zjj103日志发现,zjj102和zjj103机器是轮询接收消息的.就是你接收一条我接收一条消息.

然后我给zjj103关掉 , 杀死 zjj103的33333端口占用的进程,再发送内容

  1. [root@zjj101 soft]# echo 51 >> test.log
  2. [root@zjj101 soft]# echo 51 >> test.log
  3. [root@zjj101 soft]# echo 51 >> test.log

然后你就会发现zjj102机器全部接收到了这三条消息的内容.