Reference diagram
Use case
Suppose the log data you collect ultimately needs to be written to HDFS, but HDFS is only used inside the company and is not reachable from the external network. The company therefore has one machine, Flume3, and only Flume3 has permission to access HDFS.
Meanwhile, two machines, Flume1 and Flume2, collect data from the external network: each of them opens a port that other programs write to, but neither Flume1 nor Flume2 has permission to write to HDFS.
The solution is to have Flume1 and Flume2 forward their data to Flume3, and let Flume3 write it to HDFS.
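In the demo below, zjj103 uses a logger sink so the result is easy to observe in the console. In the scenario described above, Flume3 would instead use an HDFS sink; a rough sketch of what that sink section could look like (the namenode address and target path are placeholders, not part of this setup):

```properties
# Hypothetical HDFS sink for the downstream agent (would replace the logger sink)
a1.sinks.k1.type = hdfs
# placeholder namenode URL and directory; %Y%m%d buckets files by day
a1.sinks.k1.hdfs.path = hdfs://namenode:9000/flume/logs/%Y%m%d
# write plain event bodies instead of SequenceFiles
a1.sinks.k1.hdfs.fileType = DataStream
# use the agent's local time for the %Y%m%d escape
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```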
Preparation
Flume is installed on three machines:
zjj101 zjj102 zjj103
Write the configuration files
On every machine the configuration file is named demo4.conf
and placed under /root/soft/apache-flume-1.7.0/conf/job/.
zjj101
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# Monitor appends to the log file
a1.sources.r1.command = tail -f /root/soft/test.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zjj103
a1.sinks.k1.port = 44444
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
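The memory channel above is fast, but any events buffered in it are lost if the agent crashes. If durability matters, a file channel is a drop-in alternative; a sketch, with placeholder checkpoint and data directories:

```properties
# Hypothetical durable alternative to the memory channel above
a1.channels.c1.type = file
# placeholder directories; must be writable by the Flume user
a1.channels.c1.checkpointDir = /root/soft/flume-data/checkpoint
a1.channels.c1.dataDirs = /root/soft/flume-data/data
```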
zjj102
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# Monitor appends to the log file
a1.sources.r1.command = tail -f /root/soft/test.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zjj103
a1.sinks.k1.port = 44444
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
zjj103
# a1 is the name of the agent; a1 defines one source named r1. If there are several, separate them with spaces.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Bind to host zjj103, port 44444; other agents can push events to this address
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj103
a1.sources.r1.port=44444
# Define the sink
a1.sinks.k1.type=logger
# Define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
# Wire the components: one source can feed multiple channels, but a sink can only pull data from one channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
Start and test
Start zjj103 first, then zjj101.
Start zjj103
[root@zjj103 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
Observe the zjj103 log
# The source is now listening on zjj103 port 44444
20/10/26 14:00:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 44444 }...
20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:00:22 INFO source.AvroSource: Avro source r1 started.
Start zjj101
[root@zjj101 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
Observe the zjj101 log
# The new configuration starts
20/10/26 14:01:20 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a72854b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
# Channel c1 starts
20/10/26 14:01:20 INFO node.Application: Starting Channel c1
20/10/26 14:01:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
# Sink k1 starts
20/10/26 14:01:21 INFO node.Application: Starting Sink k1
# Source r1 starts
20/10/26 14:01:21 INFO node.Application: Starting Source r1
# RpcSink k1 starts, connecting to zjj103 port 44444
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
# The exec source starts running tail -f /root/soft/test.log and watches its output
20/10/26 14:01:21 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
20/10/26 14:01:21 INFO sink.AvroSink: Attempting to create Avro Rpc client.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:01:21 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1 started.
Observe the zjj103 log again
# A connection is opened and bound
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] OPEN
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] BOUND: /172.16.10.103:44444
# The connecting address, 172.16.10.101, is zjj101
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] CONNECTED: /172.16.10.101:58136
Test
Open a new terminal on zjj101 and use echo to simulate appending log output to test.log:
[root@zjj101 soft]# echo 11111111111111111 >> test.log
[root@zjj101 soft]# pwd
/root/soft
At this point the zjj103 log shows that the message was pushed over; the output is omitted here.
Start zjj102
Starting it is almost the same as zjj101:
[root@zjj102 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
The log is worth reading: it records in detail what gets instantiated and bound at startup.
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Creating channels
20/10/26 14:04:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Created channel c1
20/10/26 14:04:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
20/10/26 14:04:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
20/10/26 14:04:06 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@38daefe5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
20/10/26 14:04:06 INFO node.Application: Starting Channel c1
20/10/26 14:04:06 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/26 14:04:06 INFO node.Application: Starting Sink k1
20/10/26 14:04:06 INFO node.Application: Starting Source r1
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
20/10/26 14:04:06 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
20/10/26 14:04:06 INFO sink.AvroSink: Attempting to create Avro Rpc client.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:04:06 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
20/10/26 14:04:07 INFO sink.AbstractRpcSink: Rpc sink k1 started.
Test
Testing zjj102 is essentially the same as testing zjj101, so it is not repeated here:
open a new terminal on zjj102 and use echo to append something to test.log.
At this point, log lines written on either zjj101 or zjj102 are delivered to the Flume agent on zjj103.