Preparation

Three machines with Flume installed.

Background

Multiplexing Channel Selector

The Multiplexing Channel Selector reads an attribute from the event header and, using the mapping you configure, sends the event to the specified channel(s).

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    # If state is CZ, send to c1
    a1.sources.r1.selector.mapping.CZ = c1
    # If state is US, send to c2 and c3
    a1.sources.r1.selector.mapping.US = c2 c3
    # Any other value goes to c4
    a1.sources.r1.selector.default = c4

Each event from r1 is routed by the value of the state key in its header: events with state=CZ go to c1, events with state=US go to both c2 and c3, and events with any other value go to c4.
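
The routing rule described above can be mimicked with a few lines of shell. This is only an illustrative sketch of the decision the selector makes, not Flume code; route_channels is a hypothetical helper name, and the channel names are the ones from the example mapping.

```shell
# Hypothetical helper: given the value of the "state" header,
# print the channel(s) the example mapping would select.
route_channels() {
    case "$1" in
        CZ) echo "c1" ;;      # mapping.CZ = c1
        US) echo "c2 c3" ;;   # mapping.US = c2 c3
        *)  echo "c4" ;;      # default = c4
    esac
}
```

Calling route_channels US prints both c2 and c3, which mirrors the fact that a single mapping entry may fan an event out to several channels.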

Writing the configuration files

On every machine the configuration file is named demo3.conf
and is placed under /root/soft/apache-flume-1.7.0/conf/job/

Machine zjj101

    # a1 is the agent name; a1 defines one source named r1 (separate multiple names with spaces)
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Format: component name.property name = property value
    a1.sources.r1.type=exec
    a1.sources.r1.command=tail -f /root/soft/test.log
    # Declare the channel selector for r1
    a1.sources.r1.selector.type = multiplexing
    # Route on the state header:
    # CZ goes to channel c1, US goes to channel c2
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2
    # Use an interceptor to add a header to every event.
    # This is for demonstration only: it gives the selector a state header to route on.
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    # Every event gets the header state=CZ
    a1.sources.r1.interceptors.i1.key = state
    a1.sources.r1.interceptors.i1.value = CZ
    # Define the channels
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    a1.channels.c2.type=memory
    a1.channels.c2.capacity=1000
    # Define the sinks
    # k1 sends to port 33333 on zjj102
    a1.sinks.k1.type=avro
    a1.sinks.k1.hostname=zjj102
    a1.sinks.k1.port=33333
    # k2 sends to port 33333 on zjj103
    a1.sinks.k2.type=avro
    a1.sinks.k2.hostname=zjj103
    a1.sinks.k2.port=33333
    # Wire the components: one source can feed several channels, but a sink reads from exactly one channel
    a1.sources.r1.channels=c1 c2
    a1.sinks.k1.channel=c1
    a1.sinks.k2.channel=c2

Machine zjj102

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Format: component name.property name = property value
    # Listen on port 33333 on zjj102
    a1.sources.r1.type=avro
    a1.sources.r1.bind=zjj102
    a1.sources.r1.port=33333
    # Define the sink
    a1.sinks.k1.type=logger
    # Define the channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    # Wire the components: one source can feed several channels, but a sink reads from exactly one channel
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

Machine zjj103

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Format: component name.property name = property value
    a1.sources.r1.type=avro
    # Listen on port 33333 on zjj103
    a1.sources.r1.bind=zjj103
    a1.sources.r1.port=33333
    # Define the sink
    a1.sinks.k1.type=logger
    # Define the channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=1000
    # Wire the components
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

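Starting the agents

Start order: zjj102, zjj103, then zjj101.

zjj102 and zjj103 run the Avro sources that listen on port 33333, so they are started first; zjj101 connects to them.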
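
Before starting zjj101 it can be handy to confirm that the two listeners are actually up. The following is a minimal sketch, assuming a netcat binary that supports the -z (scan only) option is installed; wait_for_port is a hypothetical helper name and the retry count is arbitrary.

```shell
# Hypothetical helper: poll host:port once per second until it accepts
# a TCP connection, or give up after the given number of tries.
# Assumes an "nc" that supports -z (connect without sending data).
wait_for_port() {
    host="$1"
    port="$2"
    tries="${3:-10}"
    while [ "$tries" -gt 0 ]; do
        if nc -z "$host" "$port" 2>/dev/null; then
            return 0    # something is listening
        fi
        tries=$((tries - 1))
        sleep 1
    done
    return 1            # nothing answered in time
}
```

For example, running wait_for_port zjj102 33333 && wait_for_port zjj103 33333 on zjj101 succeeds only once both Avro sources accept connections.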

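Starting zjj102

Make sure -n is correct: it must be the agent name used in the configuration file (a1 here). Also check the -c path: conf/ is relative, so the command is expected to be run from the Flume installation directory.

    flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console

Log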

    20/10/26 10:17:53 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
    20/10/26 10:17:53 INFO conf.FlumeConfiguration: Processing:k1
    20/10/26 10:17:53 INFO conf.FlumeConfiguration: Processing:k1
    20/10/26 10:17:53 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
    20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Creating channels
    20/10/26 10:17:53 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
    20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Created channel c1
    20/10/26 10:17:53 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
    20/10/26 10:17:53 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
    20/10/26 10:17:53 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
    20/10/26 10:17:53 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: zjj102, port: 33333 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@663a5d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
    20/10/26 10:17:53 INFO node.Application: Starting Channel c1
    20/10/26 10:17:53 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
    20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
    20/10/26 10:17:53 INFO node.Application: Starting Sink k1
    20/10/26 10:17:53 INFO node.Application: Starting Source r1
    20/10/26 10:17:53 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj102, port: 33333 }...
    20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
    20/10/26 10:17:54 INFO source.AvroSource: Avro source r1 started.

Starting zjj103

    flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console

Log

    20/10/26 09:46:25 INFO node.Application: Starting Channel c1
    20/10/26 09:46:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
    20/10/26 09:46:26 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    20/10/26 09:46:26 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
    20/10/26 09:46:26 INFO node.Application: Starting Sink k1
    20/10/26 09:46:26 INFO node.Application: Starting Source r1
    # The Avro source is starting
    20/10/26 09:46:26 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj103, port: 33333 }...
    20/10/26 09:46:28 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    20/10/26 09:46:28 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
    20/10/26 09:46:28 INFO source.AvroSource: Avro source r1 started.
    20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] OPEN
    20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] BOUND: /172.16.10.103:33333
    20/10/26 09:47:33 INFO ipc.NettyServer: [id: 0xe193f3e9, /172.16.10.101:56644 => /172.16.10.103:33333] CONNECTED: /172.16.10.101:56644

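Starting zjj101

    flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo3.conf" -Dflume.root.logger=DEBUG,console

Log. Once zjj101 is up, lines like the following appear on the zjj102 console (note bindAddress: zjj102); the OPEN, BOUND and CONNECTED entries show that zjj101 has connected successfully.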

    20/10/26 10:17:53 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
    20/10/26 10:17:53 INFO node.Application: Starting Sink k1
    20/10/26 10:17:53 INFO node.Application: Starting Source r1
    20/10/26 10:17:53 INFO source.AvroSource: Starting Avro source r1: { bindAddress: zjj102, port: 33333 }...
    20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    20/10/26 10:17:54 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
    20/10/26 10:17:54 INFO source.AvroSource: Avro source r1 started.
    20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] OPEN
    # Bound to 172.16.10.102:33333, that is, machine zjj102
    20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] BOUND: /172.16.10.102:33333
    20/10/26 10:18:21 INFO ipc.NettyServer: [id: 0xa51a2f41, /172.16.10.101:59982 => /172.16.10.102:33333] CONNECTED: /172.16.10.101:59982

Testing

Open a new terminal on zjj101.

Simulate a log write by appending a line to test.log:

    [root@zjj101 soft]# echo 12306 >> test.log

zjj102 log

The following line appears on zjj102, which shows that it received the new content of test.log:

    20/10/26 10:20:13 INFO sink.LoggerSink: Event: { headers:{state=CZ} body: 31 32 33 30 36 12306 }
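
In that line the logger sink prints the event body twice: first as hex bytes (31 32 33 30 36) and then as text (12306). A small sketch for checking that the two agree; hex_to_text is a hypothetical helper name, not a Flume tool.

```shell
# Hypothetical helper: turn space-separated hex bytes, as printed by the
# logger sink, back into text.
hex_to_text() {
    for byte in $1; do
        # Convert the hex byte to a three-digit octal escape, which
        # POSIX printf can emit as a single character.
        printf "\\$(printf '%03o' "0x$byte")"
    done
    printf '\n'
}

hex_to_text "31 32 33 30 36"   # prints 12306
```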

zjj103 log

The zjj103 log shows that nothing was received, so it is not pasted here.
For an event to reach zjj103, the state attribute in its header must be US.
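
To watch an event arrive on zjj103 instead, one option is to change the static interceptor value from CZ to US in the zjj101 copy of demo3.conf and restart that agent. A minimal sketch, assuming the file contains the a1.sources.r1.interceptors.i1.value line from the zjj101 configuration; set_static_state is a hypothetical helper name.

```shell
# Hypothetical helper: rewrite the static interceptor value in a Flume
# config file. Usage: set_static_state <conf-file> <new-state>
set_static_state() {
    conf="$1"
    state="$2"
    # Keep everything up to and including "=" (plus spaces), replace the value.
    # A temporary file is used instead of "sed -i" for portability.
    sed "s/^\(a1\.sources\.r1\.interceptors\.i1\.value[[:space:]]*=[[:space:]]*\).*/\1${state}/" \
        "$conf" > "$conf.tmp" && mv "$conf.tmp" "$conf"
}
```

For example, set_static_state /root/soft/apache-flume-1.7.0/conf/job/demo3.conf US on zjj101, followed by a restart of the zjj101 agent and another echo into test.log, should make the event show up in the zjj103 log rather than on zjj102.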