参考图

使用场景
比如说你收集的日志信息最终都需要写到HDFS上,但是HDFS都是公司内部来使用的,不会让外网来访问的,因此公司可能会有一台机器Flume3,只有Flume3这一台机器有权限来访问HDFS,
那么这个时候可能有Flume1和Flume2两个机器是收集外网的数据,比如这两个机器都各自开放了一个端口让别的程序来往这里写入,但是Flume1和Flume2是没权限往HDFS里面写入的.
这样解决办法就是让Flume1和Flume2把数据传给Flume3,让Flume3去访问HDFS.
准备
三台机器安装了Flume
zjj101 zjj102 zjj103
编写配置文件
配置文件名字都叫demo4.conf
都放到/root/soft/apache-flume-1.7.0/conf/job/路径下面
zjj101
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = exec# 监控日志追加写a1.sources.r1.command = tail -f /root/soft/test.loga1.sources.r1.shell = /bin/bash -c# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = zjj103a1.sinks.k1.port = 44444# Describe the channela1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
zjj102
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = exec# 监控日志追加写a1.sources.r1.command = tail -f /root/soft/test.loga1.sources.r1.shell = /bin/bash -c# Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = zjj103a1.sinks.k1.port = 44444# Describe the channela1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
zjj103
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔a1.sources = r1a1.sinks = k1a1.channels = c1# 绑定了ip为zjj103 ,端口为44444 ,别的agent可以通过这个ip端口推送内容,a1.sources.r1.type=avroa1.sources.r1.bind=zjj103a1.sources.r1.port=44444#定义sinka1.sinks.k1.type=logger#定义chanela1.channels.c1.type=memorya1.channels.c1.capacity=1000#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据a1.sources.r1.channels=c1a1.sinks.k1.channel=c1
启动项目并测试
先启动 zjj103 再启动 zjj101
启动zjj103
启动zjj103
[root@zjj103 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Df lume.root.logger=DEBUG,console
观察zjj103日志
# 显示已经开启了zjj103ip 44444端口的 source,20/10/26 14:00:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 44444 }...20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r 1: Successfully registered new MBean.20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started20/10/26 14:00:22 INFO source.AvroSource: Avro source r1 started.
启动zjj101
[root@zjj101 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Df lume.root.logger=DEBUG,console
观察 zjj101日志
# 启动了一个配置信息20/10/26 14:01:20 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner : { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.a pache.flume.sink.DefaultSinkProcessor@1a72854b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apach e.flume.channel.MemoryChannel{name: c1}} }# 启动了Channel c120/10/26 14:01:20 INFO node.Application: Starting Channel c120/10/26 14:01:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started# 启动了k1 Sink20/10/26 14:01:21 INFO node.Application: Starting Sink k1# 启动了r1Source20/10/26 14:01:21 INFO node.Application: Starting Source r1# 开启 RpcSink k1 连接地址是zjj103 端口4444420/10/26 14:01:21 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444# 开始执行命令tail -f /root/soft/test.log ,监控命令的结果20/10/26 14:01:21 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log20/10/26 14:01:21 INFO sink.AvroSink: Attempting to create Avro Rpc client.20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r 1: Successfully registered new MBean.20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started20/10/26 14:01:21 WARN api.NettyAvroRpcClient: Using default maxIOWorkers20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1 started.
观察zjj103日志
# 这里显示打开了一个绑定的链接20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] OPEN20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] BOUND: / 172.16.10.103:44444# 绑定地址是172.16.10.101 这个地址就是zjj101的地址20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, /172.16.10.101:58136 => /172.16.10.103:44444] CONNECTE D: /172.16.10.101:58136
测试
重新打开一个新的zjj101终端 , 用echo模拟输出日志到test.log
[root@zjj101 soft]# echo 11111111111111111 >> test.log[root@zjj101 soft]# pwd/root/soft
这时候你就能看到zjj103的日志显示有消息推送过来了.. 结果我就不粘贴过来了.
启动zjj102机器
启动方式和zjj101几乎差不多
[root@zjj102 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Df lume.root.logger=DEBUG,console
日志 ,其实日志很重要, 里面详细的记录了启动时候实例化什么东西,绑定了什么东西,等等.
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Creating channels20/10/26 14:04:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Created channel c120/10/26 14:04:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec20/10/26 14:04:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro20/10/26 14:04:06 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next ho p20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]20/10/26 14:04:06 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner : { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.a pache.flume.sink.DefaultSinkProcessor@38daefe5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apach e.flume.channel.MemoryChannel{name: c1}} }20/10/26 14:04:06 INFO node.Application: Starting Channel c120/10/26 14:04:06 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started20/10/26 14:04:06 INFO node.Application: Starting Sink k120/10/26 14:04:06 INFO node.Application: Starting Source r120/10/26 14:04:06 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started20/10/26 14:04:06 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 4444420/10/26 14:04:06 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log20/10/26 14:04:06 INFO sink.AvroSink: Attempting to create Avro Rpc client.20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r 1: Successfully registered new MBean.20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started20/10/26 14:04:06 WARN api.NettyAvroRpcClient: Using default maxIOWorkers20/10/26 14:04:07 INFO sink.AbstractRpcSink: Rpc sink k1 started.
测试
测试zjj102我就不粘贴了,,和测试zjj101差不多是一样的,
也是打开一个新的zjj102终端,然后用 echo 模拟往test.log日志输出东西..
此时不管你在zjj101还是在zjj102输出的日志信息都被传输到 zjj103的flume的agent里面了…
