需求

Flume1监控SpringBoot的日志信息, 然后将监控到的内容传递给Flume2和Flume3 ,

Flume2 和Flume3 可以各自处理各自的逻辑,比如说Flume2 上传到HDFS上, Flume3 可以上传到Kafka里面..

这里演示Flume2和Flume3就直接通过打印日志的形式演示了.

找一个SpringBoot项目

随便找一个SpringBoot项目, 注意,要在启动zjj101的agent之前启动这个SpringBoot项目

可以用我这个:
https://blog.csdn.net/qq_41489540/article/details/109242639

打成一个jar包放到Linux里面的/root/soft 命令下面

  1. nohup java -jar demo.jar >test.log 2>&1 &

启动完了就会在同目录下生成一个 test.log文件去记录这个SpringBoot的项目的日志信息.

生成的test.log文件的位置在/root/soft/test.log位置

编写agent1配置文件

放到zjj101机器

  1. #a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
  2. a1.sources = r1
  3. a1.sinks = k1 k2
  4. a1.channels = c1 c2
  5. #组名名.属性名=属性值
  6. a1.sources.r1.type=exec
  7. # 监视/root/soft/test.log的实时输出
  8. a1.sources.r1.command=tail -f /root/soft/test.log
  9. #声明r1的channel选择器
  10. a1.sources.r1.selector.type = replicating
  11. #定义chanel
  12. a1.channels.c1.type=memory
  13. a1.channels.c1.capacity=1000
  14. a1.channels.c2.type=memory
  15. a1.channels.c2.capacity=1000
  16. ##定义sink
  17. a1.sinks.k1.type=avro
  18. # zjj102是ip,我已经配置了host映射了
  19. a1.sinks.k1.hostname=zjj102
  20. a1.sinks.k1.port=33333
  21. a1.sinks.k2.type=avro
  22. # zjj103是ip,我已经配置了host映射了.
  23. a1.sinks.k2.hostname=zjj103
  24. a1.sinks.k2.port=33333
  25. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  26. a1.sources.r1.channels=c1 c2
  27. a1.sinks.k1.channel=c1
  28. a1.sinks.k2.channel=c2

编写agent2配置文件

放到zjj102机器用来接收zjj101机器agent推送过来的内容

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. # zjj102 是一个ip,我已经配置了host映射了.
  7. a1.sources.r1.bind=zjj102
  8. a1.sources.r1.port=33333
  9. #定义sink
  10. a1.sinks.k1.type=logger
  11. #定义chanel
  12. a1.channels.c1.type=memory
  13. a1.channels.c1.capacity=1000
  14. #连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
  15. a1.sources.r1.channels=c1
  16. a1.sinks.k1.channel=c1

编写agent3配置文件

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. #组名名.属性名=属性值
  5. a1.sources.r1.type=avro
  6. # zjj103是一个ip,我已经配置了host映射
  7. a1.sources.r1.bind=zjj103
  8. a1.sources.r1.port=33333
  9. #定义sink
  10. a1.sinks.k1.type=logger
  11. #定义chanel
  12. a1.channels.c1.type=memory
  13. a1.channels.c1.capacity=1000
  14. a1.sources.r1.channels=c1
  15. a1.sinks.k1.channel=c1

开始启动

先启动 zjj102 zjj103机器, 因为这两个机器监听端口, 然后zjj101机器才能开始推送.

在zjj103执行命令启动agent

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console

出现下面日志说明启动成功

  1. 20/10/24 17:07:43 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
  2. 20/10/24 17:07:43 INFO node.Application: Starting Sink k1
  3. 20/10/24 17:07:43 INFO node.Application: Starting Source r1
  4. # 启动source 监听zjj103的33333端口
  5. 20/10/24 17:07:44 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 33333 }...
  6. 20/10/24 17:07:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: S uccessfully registered new MBean.
  7. 20/10/24 17:07:45 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
  8. 20/10/24 17:07:45 INFO source.AvroSource: Avro source r1 started.
  9. 20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] OPEN
  10. 20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] BOUND: /172. 16.10.103:33333
  11. 20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] CONNECTED: / 172.16.10.101:56640

在zjj102执行命令启动agent

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console

出现下面日志说明启动成功

  1. 20/10/24 17:08:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 start ed
  2. 20/10/24 17:08:27 INFO source.AvroSource: Avro source r1 started.
  3. # ..... 忽略了一些日志
  4. # 下面显示监听了zjj102的33333端口
  5. 20/10/24 17:12:21 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 10
  6. 20/10/24 17:12:21 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider stopping
  7. 20/10/24 17:12:21 INFO source.AvroSource: Avro source r1 stopping: Avro source r1: { bindAddress: zjj102, port: 33333 }

在zjj101执行命令启动agent

  1. flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console

出现这个日志说明启动成功

  1. 20/10/24 17:09:09 INFO node.Application: Starting Sink k1
  2. 20/10/24 17:09:09 INFO node.Application: Starting Sink k2
  3. 20/10/24 17:09:09 INFO sink.AbstractRpcSink: Starting RpcSink k2 { host: zjj103, port: 33333 }...
  4. 20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK , name: k2: Successfully registered new MBean.
  5. 20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
  6. # 绑定了zjj103的ip 33333端口
  7. 20/10/24 17:09:09 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: zjj103, port: 33333
  8. 20/10/24 17:09:09 INFO sink.AvroSink: Attempting to create Avro Rpc client.
  9. 20/10/24 17:09:09 INFO node.Application: Starting Source r1
  10. 20/10/24 17:09:09 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test. log
  11. # 绑定了zjj102的ip 33333 端口
  12. 20/10/24 17:09:09 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj102, port: 33333 }...
  13. 20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK , name: k1: Successfully registered new MBean.
  14. 20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
  15. 20/10/24 17:09:09 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj102, port: 33333

启动完毕开始测试

测试

发起get请求
// zjj101是ip地址,和zjj101机器ip地址是一样的, 我配置了host映射

http://zjj101:8080/index

此时就会发现 zjj102和zjj103机器日志信息开始动了,说明zjj101的Flume 已经将数据 传给zjj102和zjj103的Flume了.
而且是相同的数据.

zjj102机器

  1. 20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
  2. 20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }

zjj103机器

  1. 20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
  2. 20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }