zjj101 zjj102 zjj103
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# 监控日志追加写
a1.sources.r1.command = tail -f /root/soft/test.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zjj103
a1.sinks.k1.port = 44444
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# 监控日志追加写
a1.sources.r1.command = tail -f /root/soft/test.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = zjj103
a1.sinks.k1.port = 44444
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 绑定了ip为zjj103、端口为44444,别的agent可以通过这个ip和端口推送内容。
# 连接组件:同一个source可以对接多个channel,一个sink只能从一个channel拿数据
先启动 zjj103 再启动 zjj101
[root@zjj103 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
# 显示已经开启了zjj103ip 44444端口的 source,
20/10/26 14:00:21 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 44444 }...
20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:00:22 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:00:22 INFO source.AvroSource: Avro source r1 started.
[root@zjj101 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
观察 zjj101日志
# 启动了一个配置信息
20/10/26 14:01:20 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a72854b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
# 启动了Channel c1
20/10/26 14:01:20 INFO node.Application: Starting Channel c1
20/10/26 14:01:20 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 14:01:20 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
# 启动了k1 Sink
20/10/26 14:01:21 INFO node.Application: Starting Sink k1
# 启动了r1Source
20/10/26 14:01:21 INFO node.Application: Starting Source r1
# 开启 RpcSink k1 连接地址是zjj103 端口44444
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
# 开始执行命令tail -f /root/soft/test.log ,监控命令的结果
20/10/26 14:01:21 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
20/10/26 14:01:21 INFO sink.AvroSink: Attempting to create Avro Rpc client.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:01:21 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:01:21 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
20/10/26 14:01:21 INFO sink.AbstractRpcSink: Rpc sink k1 started.
# 这里显示打开了一个绑定的链接
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, / => /] OPEN
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, / => /] BOUND: /
# 绑定地址是172.16.10.101 这个地址就是zjj101的地址
20/10/26 14:01:21 INFO ipc.NettyServer: [id: 0xdc7fd12a, / => /] CONNECTED: /
重新打开一个新的zjj101终端 , 用echo模拟输出日志到test.log
[root@zjj101 soft]# echo 11111111111111111 >> test.log
[root@zjj101 soft]# pwd
这时候你就能看到zjj103的日志显示有消息推送过来了.. 结果我就不粘贴过来了.
[root@zjj102 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo4.conf" -Dflume.root.logger=DEBUG,console
日志 ,其实日志很重要, 里面详细的记录了启动时候实例化什么东西,绑定了什么东西,等等.
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Creating channels
20/10/26 14:04:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Created channel c1
20/10/26 14:04:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
20/10/26 14:04:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop
20/10/26 14:04:06 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
20/10/26 14:04:06 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@38daefe5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
20/10/26 14:04:06 INFO node.Application: Starting Channel c1
20/10/26 14:04:06 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/26 14:04:06 INFO node.Application: Starting Sink k1
20/10/26 14:04:06 INFO node.Application: Starting Source r1
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: zjj103, port: 44444 }...
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
20/10/26 14:04:06 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: zjj103, port: 44444
20/10/26 14:04:06 INFO source.ExecSource: Exec source starting with command:tail -f /root/soft/test.log
20/10/26 14:04:06 INFO sink.AvroSink: Attempting to create Avro Rpc client.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 14:04:06 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 14:04:06 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
20/10/26 14:04:07 INFO sink.AbstractRpcSink: Rpc sink k1 started.
也是打开一个新的zjj102终端,然后用 echo 模拟往test.log日志输出东西..
此时不管你在zjj101还是在zjj102输出的日志信息都被传输到 zjj103的flume的agent里面了…