概述Flume之两个Agent串联接收消息 - 图2

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

两套agent可能是两个机器,一旦发生两个agent串联,头一个agent的sink必须选AvroSink,然后第二个agent的Source要选AvroSource.

案例

在zjj101机器上agent1 使用架构 netcatsource—-memorychannel—arvosink
在zjj102机器上agent2使用架构 avrosource——memorychannel—loggersink
agent1给输出结果输出到agent2上.

编写agent配置文件

zjj101机器

demo1.conf

  1. #agent1
  2. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. #组名名.属性名=属性值
  7. a1.sources.r1.type=netcat
  8. a1.sources.r1.bind=zjj101
  9. a1.sources.r1.port=44444
  10. #定义sink
  11. a1.sinks.k1.type=avro
  12. a1.sinks.k1.hostname=zjj102
  13. a1.sinks.k1.port=33333
  14. #定义chanel
  15. a1.channels.c1.type=memory
  16. a1.channels.c1.capacity=1000
  17. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  18. a1.sources.r1.channels=c1
  19. a1.sinks.k1.channel=c1

zjj102机器

demo1.conf

  1. #agent2
  2. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. #组名名.属性名=属性值
  7. a1.sources.r1.type=avro
  8. a1.sources.r1.bind=zjj102
  9. a1.sources.r1.port=33333
  10. #定义sink
  11. a1.sinks.k1.type=logger
  12. #定义chanel
  13. a1.channels.c1.type=memory
  14. a1.channels.c1.capacity=1000
  15. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  16. a1.sources.r1.channels=c1
  17. a1.sinks.k1.channel=c1

启动Agent

启动的时候先启动zjj102机器的的agent,因为zjj102机器的的agent绑定了ip端口号啥的 ,zjj101机器才能进行推送,所有先启zjj102的.

因为两台机器的文件位置是一样的,所以命令看起来差不多也是一样的.

启动zjj102的agent

linux命令

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo1.conf" -Dflume.root.logger=DEBUG,console

启动zjj101的agent

linux命令

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo1.conf" -Dflume.root.logger=DEBUG,console

测试

由于上面两个agent是在前台启动的,所有就干脆打开一个Linux终端往 44444端口推送内容

第二个zjj101终端

  1. # zjj101是配置了hosts.如果你没配置hosts的话,就把zjj101替换你要连接的服务器的ip
  2. [root@zjj101 ~]# nc zjj101 44444
  3. sdad
  4. OK
  5. 你好,我是张俊杰
  6. OK
  7. helloflume
  8. OK

查看结果

zjj102机器

可以看到zjj102机器已经获取到结果了,

这个结果是 第二个zjj101终端 往第一个zjj101终端 发送内容, 然后 第一个zjj101终端把内容传给了zjj102机器的agent上.

你也可以理解为第一个zjj101终端和第二个zjj101终端是两台机器,.

  1. 20/10/23 17:56:51 INFO source.AvroSource: Avro source r1 started.
  2. 20/10/23 17:57:01 INFO ipc.NettyServer: [id: 0x1655defa, /172.16.10.101:59964 => /172.16.10.102:33333] OPEN
  3. 20/10/23 17:57:01 INFO ipc.NettyServer: [id: 0x1655defa, /172.16.10.101:59964 => /172.16.10.102:33333] BOUND: /172.16.10.102:33333
  4. 20/10/23 17:57:01 INFO ipc.NettyServer: [id: 0x1655defa, /172.16.10.101:59964 => /172.16.10.102:33333] CONNECTED: /172.16.10.101:59964
  5. 20/10/23 17:59:13 INFO sink.LoggerSink: Event: { headers:{} body: 73 64 61 64 sdad }
  6. 20/10/23 17:59:17 INFO sink.LoggerSink: Event: { headers:{} body: E4 BD A0 E5 A5 BD 2C E6 88 91 E6 98 AF E5 BC A0 ......,......... }
  7. 20/10/23 17:59:47 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 66 6C 75 6D 65 helloflum