Preparation
Three machines (zjj101, zjj102, zjj103), each with Flume installed.
Introduction
Multiplexing Channel Selector
- The Multiplexing Channel Selector reads an attribute from the event header and, using the mapping the user configures, sends the event to the specified channel(s).
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
# If state is CZ, send the event to c1
a1.sources.r1.selector.mapping.CZ = c1
# If state is US, send the event to c2 and c3
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Each event from r1 is routed by the value of its header with key state: events with state=CZ go to c1, events with state=US go to c2 and c3, and events with any other state go to c4.
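The routing rule above fits in a few lines of Python. This is a hypothetical sketch for illustration only: `select_channels`, `HEADER`, `MAPPING` and `DEFAULT` are made-up names, not Flume's Java API, and it assumes that an event with no state header also falls back to the default channel.

```python
from typing import Dict, List

# Hypothetical model of the a1.sources.r1.selector.* properties above.
HEADER = "state"
MAPPING: Dict[str, List[str]] = {"CZ": ["c1"], "US": ["c2", "c3"]}
DEFAULT: List[str] = ["c4"]


def select_channels(headers: Dict[str, str]) -> List[str]:
    """Return the channels an event with these headers is written to."""
    value = headers.get(HEADER)
    # A mapped value selects its channels; an unmapped or missing value
    # falls back to the default channels (missing header: assumed behaviour).
    return MAPPING.get(value, DEFAULT)


if __name__ == "__main__":
    for headers in ({"state": "CZ"}, {"state": "US"}, {"state": "DE"}, {}):
        print(headers, "->", select_channels(headers))
```

A single event can land in several channels (US goes to both c2 and c3), which is what lets one source fan out to multiple sinks.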
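Write the configuration files
All configuration files are named demo3.conf
and placed in /root/soft/apache-flume-1.7.0/conf/job/.
zjj101 machine
#a1 is the agent name; a1 defines one source named r1 (separate multiple sources with spaces)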
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#<component name>.<property>=<value>
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /root/soft/test.log
#declare the channel selector for r1
a1.sources.r1.selector.type = multiplexing
# If the state header is CZ, send the event to channel c1
# If state is US, send the event to channel c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
#use an interceptor to add a header to every event
# this is only for the demo: it supplies the state header that the selector above routes on
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
# every event gets a header with key state and value CZ
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ
#define the channels
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
##define the sinks
# send events to port 33333 on zjj102
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=zjj102
a1.sinks.k1.port=33333
# send events to port 33333 on zjj103
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=zjj103
a1.sinks.k2.port=33333
#wire the components: one source can feed several channels, but a sink can take data from only one channel!
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2
zjj102 machine
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#<component name>.<property>=<value>
# listen on port 33333 of zjj102
a1.sources.r1.type=avro
a1.sources.r1.bind=zjj102
a1.sources.r1.port=33333
#define the sink
a1.sinks.k1.type=logger
#define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#wire the components: one source can feed several channels, but a sink can take data from only one channel!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
zjj103 machine
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#<component name>.<property>=<value>
a1.sources.r1.type=avro
# listen on port 33333 of zjj103
a1.sources.r1.bind=zjj103
a1.sources.r1.port=33333
#define the sink
a1.sinks.k1.type=logger
#define the channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
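Start the agents
Start order: zjj102, zjj103, then zjj101.
zjj102 and zjj103 run Avro sources that listen on port 33333, so they must be started first; the Avro sinks on zjj101 connect to them.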
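If the agents are started from a script, one way to respect this order is to wait until zjj102:33333 and zjj103:33333 accept TCP connections before launching zjj101. This is a hypothetical helper (`wait_for_port` is not part of Flume); it only checks that something is listening on the port, and the probe connection may show up briefly in the receiving agent's log.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0,
                  interval: float = 0.5) -> bool:
    """Poll host:port until a TCP connection succeeds or timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means a listener (e.g. an Avro source) is up.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused, unreachable or unresolvable: retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, only start the zjj101 agent once `wait_for_port("zjj102", 33333) and wait_for_port("zjj103", 33333)` returns True.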
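Start zjj102
Note: make sure -n is right; it must be the agent name from the configuration file (a1). Also make sure the -c (configuration directory) path is correct.
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console
Log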
20/10/26 10:17:53 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
20/10/26 10:17:53 INFO conf.FlumeConfiguration: Processing:k1
20/10/26 10:17:53 INFO conf.FlumeConfiguration: Processing:k1
20/10/26 10:17:53 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Creating channels
20/10/26 10:17:53 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Created channel c1
20/10/26 10:17:53 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
20/10/26 10:17:53 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
20/10/26 10:17:53 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: zjj102, port: 33333 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@663a5d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
20/10/26 10:17:53 INFO node.Application: Starting Channel c1
20/10/26 10:17:53 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/26 10:17:53 INFO node.Application: Starting Sink k1
20/10/26 10:17:53 INFO node.Application: Starting Source r1
20/10/26 10:17:53 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj102, port: 33333 }...
20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 10:17:54 INFO source.AvroSource: Avro source r1 started.
Start zjj103
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console
Log
20/10/26 09:46:25 INFO node.Application: Starting Channel c1
20/10/26 09:46:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
20/10/26 09:46:26 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
20/10/26 09:46:26 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/26 09:46:26 INFO node.Application: Starting Sink k1
20/10/26 09:46:26 INFO node.Application: Starting Source r1
# started
20/10/26 09:46:26 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 33333 }...
20/10/26 09:46:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 09:46:28 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 09:46:28 INFO source.AvroSource: Avro source r1 started.
20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] OPEN
20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] BOUND: /172.16.10.103:33333
20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] CONNECTED: /172.16.10.101:56644
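Start zjj101
flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console
Once zjj101 is up, the zjj102 log shows the following; the OPEN/BOUND/CONNECTED lines mean zjj101 connected successfully.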
20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
20/10/26 10:17:53 INFO node.Application: Starting Sink k1
20/10/26 10:17:53 INFO node.Application: Starting Source r1
20/10/26 10:17:53 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj102, port: 33333 }...
20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
20/10/26 10:17:54 INFO source.AvroSource: Avro source r1 started.
20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] OPEN
# bound to 172.16.10.102:33333, i.e. the zjj102 machine
20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] BOUND: /172.16.10.102:33333
20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] CONNECTED: /172.16.10.101:59982
Testing
Open another terminal on zjj101
Simulate writing a log entry to test.log
[root@zjj101 soft]# echo 12306 >> test.log
zjj102 log
The following line on zjj102 shows that it received the new content of test.log
20/10/26 10:20:13 INFO sink.LoggerSink: Event: { headers:{state=CZ} body: 31 32 33 30 36 12306 }
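In the line above, the logger sink prints the body as hex bytes followed by the text, so `31 32 33 30 36` is just the ASCII encoding of `12306`. A quick Python check:

```python
# Reproduce the hex part of the LoggerSink line for the body "12306".
body = "12306".encode("ascii")
hex_dump = " ".join(f"{b:02x}" for b in body)
print(hex_dump)  # 31 32 33 30 36
```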
zjj103 log
The log shows that nothing was received, so I won't paste it here.
For an event to be sent to zjj103, its state header must be US.
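To see why, the zjj101 pipeline can be modeled end to end. This is a hypothetical sketch, not Flume code: `static_interceptor`, `route`, `MAPPING` and `CHANNEL_TO_HOST` are made-up names. It assumes the exec source adds no headers of its own, and it treats a state value with no mapping as reaching no host, since demo3.conf on zjj101 sets no selector.default (an assumption about that case).

```python
from typing import Dict, List

# Hypothetical model of the zjj101 agent in demo3.conf.
MAPPING: Dict[str, List[str]] = {"CZ": ["c1"], "US": ["c2"]}
# Sink k1 drains c1 to zjj102; sink k2 drains c2 to zjj103.
CHANNEL_TO_HOST: Dict[str, str] = {"c1": "zjj102:33333", "c2": "zjj103:33333"}


def static_interceptor(headers: Dict[str, str], key: str = "state",
                       value: str = "CZ") -> Dict[str, str]:
    """Add key=value, keeping an existing value (preserveExisting=true)."""
    out = dict(headers)
    out.setdefault(key, value)
    return out


def route(state_value: str = "CZ") -> List[str]:
    """Return the hosts that receive a line tailed from test.log."""
    # Assumed: the exec source emits events with empty headers.
    headers = static_interceptor({}, value=state_value)
    # Unmapped values reach no channel here (assumption: no default set).
    channels = MAPPING.get(headers["state"], [])
    return [CHANNEL_TO_HOST[c] for c in channels]
```

With the value in demo3.conf (CZ), `route()` gives only `["zjj102:33333"]`; by the same logic, setting `a1.sources.r1.interceptors.i1.value = US` would send every line to zjj103 instead (`route("US")`).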