Reference diagram

(Figure 1: aggregating data from two Flume agents into a third Flume agent)

Use case

Suppose the logs you collect ultimately need to be written to HDFS, but HDFS is for internal company use only and is not reachable from the public network. The company may therefore have a single machine, Flume3, which alone has permission to access HDFS.
Meanwhile, two machines, Flume1 and Flume2, collect data from the outside world: each opens a port that external programs write to, but neither has permission to write to HDFS.
The solution is to have Flume1 and Flume2 forward their data to Flume3, and let Flume3 write to HDFS.
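A rough sketch of the topology, using the hostnames from this demo (note that in the demo zjj103 prints events with a logger sink for visibility, rather than writing to HDFS):

  zjj101 (Flume1): exec source -> memory channel -> avro sink \
                                                                >-> zjj103 (Flume3): avro source -> memory channel -> sink (logger here; HDFS in production)
  zjj102 (Flume2): exec source -> memory channel -> avro sink /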

Prerequisites

Flume is installed on three machines:
zjj101, zjj102, zjj103

Write the configuration files

On all three machines the configuration file is named demo4.conf
and placed under /root/soft/apache-flume-1.7.0/conf/job/.
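A minimal sketch of this preparation step, run on each of the three machines (using vim is an assumption; any editor works):

  # create the job directory if it does not already exist
  mkdir -p /root/soft/apache-flume-1.7.0/conf/job
  # create/edit the per-machine agent configuration
  vim /root/soft/apache-flume-1.7.0/conf/job/demo4.conf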

zjj101

  # Name the components on this agent
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = exec
  # monitor the log file for newly appended lines
  a1.sources.r1.command = tail -f /root/soft/test.log
  a1.sources.r1.shell = /bin/bash -c
  # Describe the sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = zjj103
  a1.sinks.k1.port = 44444
  # Describe the channel
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 1000
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

zjj102

  # Name the components on this agent
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = exec
  # monitor the log file for newly appended lines
  a1.sources.r1.command = tail -f /root/soft/test.log
  a1.sources.r1.shell = /bin/bash -c
  # Describe the sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = zjj103
  a1.sinks.k1.port = 44444
  # Describe the channel
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 1000
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

zjj103

  # a1 is the agent name; it defines one source named r1 (separate multiple names with spaces)
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # bind to host zjj103, port 44444; other agents can push events to this address
  a1.sources.r1.type=avro
  a1.sources.r1.bind=zjj103
  a1.sources.r1.port=44444
  # define the sink
  a1.sinks.k1.type=logger
  # define the channel
  a1.channels.c1.type=memory
  a1.channels.c1.capacity=1000
  # wire the components: one source can feed multiple channels, but a sink takes data from exactly one channel
  a1.sources.r1.channels=c1
  a1.sinks.k1.channel=c1
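In this demo zjj103 uses a logger sink so the aggregated events show up on its console. In the production scenario described at the top, k1 would be an HDFS sink instead. A minimal sketch, assuming a hypothetical NameNode address nn1:9000 and target path:

  # replace the logger sink above with an HDFS sink (NameNode address and path are assumptions)
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://nn1:9000/flume/%Y%m%d
  # write plain text instead of the default SequenceFile format
  a1.sinks.k1.hdfs.fileType = DataStream
  # needed because the path uses time escape sequences like %Y%m%d
  a1.sinks.k1.hdfs.useLocalTimeStamp = true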

Start the agents and test

Start zjj103 first, then zjj101: zjj103's avro source is the server side of the connection, so it should be up before the avro sinks on zjj101 and zjj102 try to connect.

Start zjj103

  [root@zjj103 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

Observe the zjj103 log

  # the avro source is now listening on zjj103:44444
  20/10/26 14:00:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 44444 }...
  20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  20/10/26 14:00:22 INFO source.AvroSource: Avro source r1 started.

Start zjj101

  [root@zjj101 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

Observe the zjj101 log

  # the new configuration is starting
  20/10/26 14:01:20 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a72854b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
  # channel c1 starts
  20/10/26 14:01:20 INFO node.Application: Starting Channel c1
  20/10/26 14:01:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
  20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
  20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
  # sink k1 starts
  20/10/26 14:01:21 INFO node.Application: Starting Sink k1
  # source r1 starts
  20/10/26 14:01:21 INFO node.Application: Starting Source r1
  # RpcSink k1 connects to zjj103:44444
  20/10/26 14:01:21 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
  20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
  20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
  20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
  # the exec source starts running tail -f /root/soft/test.log and watches its output
  20/10/26 14:01:21 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
  20/10/26 14:01:21 INFO sink.AvroSink: Attempting to create Avro Rpc client.
  20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  20/10/26 14:01:21 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
  20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1 started.

Observe the zjj103 log again

  # a new inbound connection is opened and bound
  20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] OPEN
  20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] BOUND: /172.16.10.103:44444
  # the remote address 172.16.10.101 is zjj101
  20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] CONNECTED: /172.16.10.101:58136

Test

Open a new terminal on zjj101 and use echo to simulate appending a log line to test.log:

  [root@zjj101 soft]# echo 11111111111111111 >> test.log
  [root@zjj101 soft]# pwd
  /root/soft

At this point the zjj103 console shows that a message has been pushed over (output omitted here).

Start zjj102

zjj102 is started in almost the same way as zjj101:

  [root@zjj102 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console

The log is worth reading: it records in detail what gets instantiated and wired together during startup.

  20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Creating channels
  20/10/26 14:04:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
  20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Created channel c1
  20/10/26 14:04:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
  20/10/26 14:04:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro
  20/10/26 14:04:06 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop
  20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
  20/10/26 14:04:06 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@38daefe5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
  20/10/26 14:04:06 INFO node.Application: Starting Channel c1
  20/10/26 14:04:06 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
  20/10/26 14:04:06 INFO node.Application: Starting Sink k1
  20/10/26 14:04:06 INFO node.Application: Starting Source r1
  20/10/26 14:04:06 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
  20/10/26 14:04:06 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
  20/10/26 14:04:06 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
  20/10/26 14:04:06 INFO sink.AvroSink: Attempting to create Avro Rpc client.
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
  20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  20/10/26 14:04:06 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
  20/10/26 14:04:07 INFO sink.AbstractRpcSink: Rpc sink k1 started.

Test

I won't paste the zjj102 test; it is essentially the same as the zjj101 test:
open a new terminal on zjj102 and use echo to append something to test.log.

Now, whether a log line is written on zjj101 or on zjj102, it is forwarded to the Flume agent on zjj103.