3.1 FLUME TRANSACTIONS

Put transaction flow:
doPut: write the incoming data into the temporary buffer putList
doCommit: check whether the channel's in-memory queue has enough room for all the data in putList
doRollback: if the channel's memory is insufficient, roll the data back
Take transaction flow:
doTake: move data into the temporary buffer takeList and send it on to HDFS
doCommit: if all the data was sent successfully, clear the temporary buffer takeList
doRollback: if an exception occurs during sending, takeList returns its data to the channel's in-memory queue. This means a takeList batch may be half-sent and then handed back to the channel, so duplicate data is possible.
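The two transaction flows above can be sketched with a toy in-memory model. This is an illustrative simplification only, not Flume's actual Java implementation; the class and method names below are invented for the sketch:

```python
# Toy model of Flume's put/take transaction semantics (illustrative sketch only;
# the real logic lives in Flume's Java channel implementations).

class MemoryChannel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = []  # the channel's main in-memory event queue

    # --- Put transaction (source side) ---
    def put_commit(self, put_list):
        # doCommit: succeeds only if the queue has room for the whole putList
        if len(self.queue) + len(put_list) > self.capacity:
            return False  # doRollback: channel is full, events are not enqueued
        self.queue.extend(put_list)
        return True

    # --- Take transaction (sink side) ---
    def do_take(self, n):
        # doTake: move up to n events into a temporary takeList
        take_list, self.queue = self.queue[:n], self.queue[n:]
        return take_list

    def take_rollback(self, take_list):
        # doRollback: takeList hands its events back to the channel queue.
        # Any events already delivered before the failure are delivered again
        # on retry, which is why duplicates are possible.
        self.queue = take_list + self.queue
```

For example, with `capacity=3`, committing a second putList that would overflow the queue fails (the rollback path), and rolling back a take restores the queue to its pre-take state while any half-sent events become duplicates on retry.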

3.2 FLUME AGENT INTERNALS

ChannelSelector
The ChannelSelector decides which Channel(s) an Event is sent to. It comes in two types: Replicating and Multiplexing.
The ReplicatingSelector sends the same Event to every Channel, while the Multiplexing selector routes different Events to different Channels according to the configured rules.
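The two selection behaviors can be sketched as plain functions. This is a simplified illustration, not Flume's API; the function names and event shape are assumptions:

```python
# Toy sketch of the two ChannelSelector behaviors (simplified; the real classes
# are ReplicatingChannelSelector and MultiplexingChannelSelector in Java).

def replicating_select(event, channels):
    # Replicating: every configured channel receives the same event
    return list(channels)

def multiplexing_select(event, header, mapping, default):
    # Multiplexing: route by the value of one event header;
    # unmatched or missing values fall through to the default channel list
    return mapping.get(event["headers"].get(header), default)
```

For example, an event with header `state=CZ` and mapping `{"CZ": ["c1"]}` goes to `c1`, while an event without that header goes to the default channel list.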

SinkProcessor
There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.
DefaultSinkProcessor serves a single Sink; LoadBalancingSinkProcessor and FailoverSinkProcessor serve a Sink Group. LoadBalancingSinkProcessor provides load balancing, and FailoverSinkProcessor provides failover for high availability.

  1. Source: receives data and wraps it into Events

① ChannelProcessor: processes the Event
② Interceptor: interceptor-chain processing
③ ChannelSelector:
replicating (copy: every channel receives the same data)
multiplexing (route: the event's headers decide which channel it goes to)
  2. SinkProcessor: decides which sink(s) receive the event

① DefaultSinkProcessor: supports only a single sink
② LoadBalancingSinkProcessor: load balancing; hands events to sinks by round-robin or random selection
③ FailoverSinkProcessor: failover, for high availability
Overall data flow:
Source -> ChannelProcessor -> [Interceptor -> Interceptor -> …] -> ChannelSelector -> Channel -> SinkProcessor -> Sink
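The selection logic of the failover and load-balancing sink processors can be sketched as follows. This is a simplified illustration of the behavior described above, not Flume's implementation; backoff timers and failure penalties are omitted:

```python
import itertools

# Toy sketch of sink-processor selection (simplified; the real
# FailoverSinkProcessor and LoadBalancingSinkProcessor also track
# backoff and failure penalties, which are omitted here).

def failover_pick(priorities, alive):
    # Failover: the live sink with the highest priority always wins,
    # so a recovered high-priority sink takes over again after a restart
    live = [s for s in priorities if s in alive]
    return max(live, key=lambda s: priorities[s]) if live else None

def round_robin(sinks):
    # Load balancing, round-robin policy: hand events to sinks in rotation
    return itertools.cycle(sinks)
```

With priorities `{"k1": 5, "k2": 10}`, `k2` is chosen while it is alive; if `k2` dies, `k1` takes over, and once `k2` recovers it is selected again.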

3.3 FLUME TOPOLOGIES

3.3.1 Simple Chain

In this mode multiple Flume agents are chained in sequence, from the initial source through to the storage system behind the final sink. Chaining too many agents is not recommended: an excessive number of agents not only hurts transfer speed, but also means that if any agent in the chain goes down, the whole pipeline is affected.

3.3.2 Replicating and Multiplexing


Flume supports fanning an event flow out to one or more destinations. In this mode the same data can be replicated into multiple channels, or different data can be routed to different channels, with each sink free to deliver to a different destination.

3.3.3 Load Balancing and Failover

Flume can group multiple sinks into one logical sink group; combined with the appropriate SinkProcessor, the group provides load balancing or failure recovery.

3.3.4 Aggregation
This is the most common and most practical mode. Everyday web applications are usually spread across hundreds of servers, sometimes thousands or even tens of thousands, and the logs they produce are painful to handle. This Flume composition solves the problem well: each server runs a Flume agent that collects its logs and forwards them to a central log-collecting Flume agent, which then uploads to HDFS, Hive, HBase, etc., for log analysis.

3.4 FLUME ENTERPRISE DEVELOPMENT

3.4.1 Replicating and Multiplexing

Edit the configuration files

# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 0.0.0.0
a3.sources.r1.port = 44444
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a3.sinks.k1.hdfs.filePrefix = events-
a3.sinks.k1.hdfs.round = true
a3.sinks.k1.hdfs.roundValue = 10
a3.sinks.k1.hdfs.roundUnit = minute
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 03-flume-avro-hdfs.conf -n a3 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 55555
# Describe the sink
a2.sinks.k1.type = file_roll
a2.sinks.k1.sink.directory = /home/atguigu/flume/file_roll
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 02-flume-avro-file-roll.conf -n a2 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicating selector configuration
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/atguigu/flume/taildir/taildir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/atguigu/flume/taildir/.*log
a1.sources.r1.filegroups.f2 = /home/atguigu/flume/taildir/.*data
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume-ng agent -c $FLUME_HOME/conf -f 01-flume-taildir-replication.conf -n a1 -Dflume.root.logger=INFO,console
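The example above covers only the replicating half of this section's title. For the multiplexing case, the selector block would look roughly like the following sketch; the header name `state` and the mapping values are assumptions chosen for illustration:

```properties
# Multiplexing variant (sketch): route by the value of an event header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
```

Events whose `state` header matches a mapping go to the mapped channel; everything else falls through to the default channel.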

3.4.2 Load Balancing and Failover

# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 0.0.0.0
a3.sources.r1.port = 44444
# Describe the sink
a3.sinks.k1.type = logger
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 03-flume-avro-logger.conf -n a3 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 55555
# Describe the sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 02-flume-avro-logger.conf -n a2 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Sink processor: k2 has a higher priority than k1, so after k2 goes down
# and comes back up, it is selected for delivery again
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Alternative: load balancing. Do not set both processor types at once --
# the last processor.type in the file would win, silently disabling failover.
# a1.sinkgroups.g1.processor.type = load_balance
# a1.sinkgroups.g1.processor.backoff = true
# a1.sinkgroups.g1.processor.selector = random
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 01-flume-netcat-failover.conf -n a1 -Dflume.root.logger=INFO,console

3.4.3 FLUME AGGREGATION

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 03-flume-netcat-avro.conf -n a1 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source (exec source, to match the launch command
# below; the tailed file path is a placeholder)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/atguigu/flume/test.log
# Describe the sink (delivers to the collector's second avro source, port 44444)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -f 02-flume-exec-avro.conf -n a1 -Dflume.root.logger=INFO,console

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# Describe/configure the sources
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555
a1.sources.r2.type = avro
a1.sources.r2.bind = 0.0.0.0
a1.sources.r2.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent -c $FLUME_HOME/conf -n a1 -f 01-flume-avro-logger.conf -Dflume.root.logger=INFO,console

3.5 Custom Interceptor
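A custom interceptor is a Java class implementing org.apache.flume.interceptor.Interceptor, with a nested Builder, packaged onto Flume's classpath and referenced by the Builder's class name. The agent-side wiring looks roughly like this sketch; `com.example.MyInterceptor` is a placeholder class name for your own implementation, while `timestamp` is one of the built-in interceptor aliases:

```properties
# Interceptor chain on source r1 (interceptors run in declared order)
a1.sources.r1.interceptors = i1 i2
# Built-in timestamp interceptor: adds a timestamp to each event's headers
a1.sources.r1.interceptors.i1.type = timestamp
# Custom interceptor: placeholder class name; must point at the $Builder
a1.sources.r1.interceptors.i2.type = com.example.MyInterceptor$Builder
```

A custom interceptor is typically paired with a multiplexing ChannelSelector: the interceptor sets a header value, and the selector's mapping routes on it.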

3.6 FLUME DATA FLOW MONITORING

Monitor how many put (write) and take (read) transactions Flume triggers and how many of them succeed.
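One built-in way to watch these counters is Flume's HTTP JSON reporter, enabled with two JVM properties at launch. The config file name and port below are arbitrary choices for illustration:

```shell
flume-ng agent -c $FLUME_HOME/conf -f example.conf -n a1 \
  -Dflume.monitoring.type=http \
  -Dflume.monitoring.port=34545
# Channel counters such as EventPutAttemptCount, EventPutSuccessCount,
# EventTakeAttemptCount, and EventTakeSuccessCount are then served as JSON
# at http://<host>:34545/metrics
```

Comparing the attempt counts against the success counts shows how often put and take transactions are rolling back.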