Requirement
Flume1 monitors a Spring Boot application's log output and forwards everything it captures to both Flume2 and Flume3.
Flume2 and Flume3 can then each apply their own logic: for example, Flume2 might upload to HDFS while Flume3 writes to Kafka.
In this demo, Flume2 and Flume3 simply print the events to their own logs.
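The fan-out above relies on Flume's replicating channel selector, which copies every event from the source into each attached channel, so each downstream sink sees the same data. As a rough sketch of that behavior (plain Python for illustration, not Flume's actual implementation):

```python
# Minimal sketch of a replicating channel selector: every event from the
# source is appended to *all* attached channels, so each sink that drains
# its own channel receives an identical copy of the data.
def replicate(event, channels):
    """Append `event` to every channel (each channel is a list acting as a queue)."""
    for channel in channels:
        channel.append(event)

c1, c2 = [], []  # stand-ins for channels c1 and c2
for event in ["GET /hello 200", "GET /hello 200 again"]:
    replicate(event, [c1, c2])

print(c1 == c2)  # both channels hold identical copies -> True
```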
Find a Spring Boot project
Any Spring Boot project will do. Note: start the Spring Boot application before starting the agent on zjj101.
You can use mine:
https://blog.csdn.net/qq_41489540/article/details/109242639
Package it as a jar, put it under the /root/soft directory on the Linux machine, and start it:
nohup java -jar demo.jar >test.log 2>&1 &
Once it is running, a test.log file recording the application's log output is created in the same directory,
at /root/soft/test.log.
Write the agent1 configuration file
Place it on the zjj101 machine.
#a1 is the agent's name; a1 defines one source named r1. If there are several, separate them with spaces
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#<group name>.<property name>=<property value>
a1.sources.r1.type=exec
# Watch the real-time output of /root/soft/test.log
a1.sources.r1.command=tail -f /root/soft/test.log
#Declare r1's channel selector
a1.sources.r1.selector.type = replicating
#Define the channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
##Define the sinks
a1.sinks.k1.type=avro
# zjj102 is a hostname; I have configured a hosts mapping for it
a1.sinks.k1.hostname=zjj102
a1.sinks.k1.port=33333
a1.sinks.k2.type=avro
# zjj103 is a hostname; I have configured a hosts mapping for it
a1.sinks.k2.hostname=zjj103
a1.sinks.k2.port=33333
#Wire the components together: one source can feed multiple channels, but a sink can only pull data from one channel!
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
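One note on the memory channels: besides capacity, Flume's memory channel also has a transactionCapacity property (the maximum number of events per transaction, 100 by default), which must not exceed capacity. If you later raise a sink's batch size, set it explicitly, for example:

```properties
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# Optional: max events per put/take transaction (default 100, must be <= capacity)
a1.channels.c1.transactionCapacity=100
```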
Write the agent2 configuration file
Place it on the zjj102 machine; it receives what the zjj101 agent pushes over.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#<group name>.<property name>=<property value>
a1.sources.r1.type=avro
# zjj102 is a hostname; I have configured a hosts mapping for it
a1.sources.r1.bind=zjj102
a1.sources.r1.port=33333
#Define the sink
a1.sinks.k1.type=logger
#Define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#Wire the components together: one source can feed multiple channels, but a sink can only pull data from one channel!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Write the agent3 configuration file
Place it on the zjj103 machine.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#<group name>.<property name>=<property value>
a1.sources.r1.type=avro
# zjj103 is a hostname; I have configured a hosts mapping for it
a1.sources.r1.bind=zjj103
a1.sources.r1.port=33333
#Define the sink
a1.sinks.k1.type=logger
#Define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start everything
Start the agents on zjj102 and zjj103 first, because those two machines listen on the ports; only then can zjj101 start pushing.
Run this command on zjj103 to start its agent:
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console
If you see log lines like the following, the startup succeeded:
20/10/24 17:07:43 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/24 17:07:43 INFO node.Application: Starting Sink k1
20/10/24 17:07:43 INFO node.Application: Starting Source r1
# The source starts, listening on port 33333 of zjj103
20/10/24 17:07:44 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 33333 }...
20/10/24 17:07:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/24 17:07:45 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/24 17:07:45 INFO source.AvroSource: Avro source r1 started.
20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] OPEN
20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] BOUND: /172.16.10.103:33333
20/10/24 17:09:10 INFO ipc.NettyServer: [id: 0x46912a56, /172.16.10.101:56640 => /172.16.10.103:33333] CONNECTED: /172.16.10.101:56640
Run this command on zjj102 to start its agent:
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console
If you see log lines like the following, the startup succeeded:
20/10/24 17:08:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/24 17:08:27 INFO source.AvroSource: Avro source r1 started.
# ..... some log lines omitted
# The lines below (these happen to be from a later shutdown) confirm the source was bound to port 33333 on zjj102
20/10/24 17:12:21 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 10
20/10/24 17:12:21 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider stopping
20/10/24 17:12:21 INFO source.AvroSource: Avro source r1 stopping: Avro source r1: { bindAddress: zjj102, port: 33333 }
Run this command on zjj101 to start its agent:
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo2.conf" -Dflume.root.logger=DEBUG,console
If you see log lines like these, the startup succeeded:
20/10/24 17:09:09 INFO node.Application: Starting Sink k1
20/10/24 17:09:09 INFO node.Application: Starting Sink k2
20/10/24 17:09:09 INFO sink.AbstractRpcSink: Starting RpcSink k2 { host: zjj103, port: 33333 }...
20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k2 started
# Sink k2 connects to zjj103 on port 33333
20/10/24 17:09:09 INFO sink.AbstractRpcSink: Rpc sink k2: Building RpcClient with hostname: zjj103, port: 33333
20/10/24 17:09:09 INFO sink.AvroSink: Attempting to create Avro Rpc client.
20/10/24 17:09:09 INFO node.Application: Starting Source r1
20/10/24 17:09:09 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
# Sink k1 connects to zjj102 on port 33333
20/10/24 17:09:09 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj102, port: 33333 }...
20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/10/24 17:09:09 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/10/24 17:09:09 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj102, port: 33333
Startup complete; now test it
Test
Send a GET request to the Spring Boot application
// zjj101 is a hostname that resolves to the zjj101 machine's IP; I have configured a hosts mapping for it
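The original post does not show the exact request command. Assuming the demo application listens on the default port 8080 and exposes some mapped endpoint (the path /hello below is hypothetical; substitute whatever your controller actually maps), the request could look like:

```shell
# /hello and port 8080 are assumptions; use your controller's real path and port.
curl http://zjj101:8080/hello
```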
You will now see the log output on the zjj102 and zjj103 machines start moving, which means the Flume on zjj101 has passed the data on to the Flume agents on zjj102 and zjj103.
And both received the same data.
The zjj102 machine:
20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
The zjj103 machine:
20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
20/10/24 17:11:08 INFO sink.LoggerSink: Event: { headers:{} body: 31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E 17:11:00 [http-n }
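The logger sink prints each event body as raw hex followed by a truncated string preview (only the first few bytes are shown). You can confirm the hex is just the original log line by decoding the body from the output above, for example in Python:

```python
# Decode the hex body printed by Flume's LoggerSink back into text.
# The hex bytes are copied verbatim from the log output above.
hex_body = "31 37 3A 31 31 3A 30 30 20 5B 68 74 74 70 2D 6E"
text = bytes.fromhex(hex_body.replace(" ", "")).decode("utf-8")
print(text)  # -> 17:11:00 [http-n
```

The decoded text matches the string preview in the event, i.e. the start of a timestamped log line from the Spring Boot application's http-nio worker thread.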