3.1 FLUME Transactions
Put transaction flow:
doPut: write the incoming batch into the temporary buffer putList
doCommit: check whether the channel's memory queue has room for all of putList's data
doRollback: if the channel's memory queue does not have enough space, roll the data back
Take transaction flow:
doTake: pull data into the temporary buffer takeList and send it on to HDFS
doCommit: if all of the data was sent successfully, clear the temporary buffer takeList
doRollback: if an exception occurs while sending, takeList hands its data back to the channel's memory queue. Because takeList may already have sent part of its batch before the whole batch is returned to the channel, duplicate data is possible.
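Both buffers operate inside a channel transaction whose size is capped by the channel's transactionCapacity. Below is a minimal sizing sketch; the property names are standard memory-channel and HDFS-sink settings, but the values are illustrative, not taken from this section:
# Each doPut/doTake transaction moves at most transactionCapacity events;
# the memory queue holds at most capacity events in total.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# A batch larger than transactionCapacity cannot fit into one transaction,
# so keep sink/source batch sizes <= transactionCapacity, e.g. for an HDFS sink:
a1.sinks.k1.hdfs.batchSize = 100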
3.2 FLUME Agent Internals
ChannelSelector
The ChannelSelector decides which Channel(s) an Event is sent to. There are two types: Replicating and Multiplexing.
The ReplicatingSelector sends the same Event to every Channel, while Multiplexing routes different Events to different Channels according to the configured rules.
SinkProcessor
There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor (load balancing) and FailoverSinkProcessor (failover).
DefaultSinkProcessor serves a single Sink; LoadBalancingSinkProcessor and FailoverSinkProcessor operate on a Sink Group. LoadBalancingSinkProcessor provides load balancing, and FailoverSinkProcessor provides failover for high availability.
- Source: receives data and wraps it into Events
① ChannelProcessor: processes the Event
② Interceptor: interceptor-chain processing
③ ChannelSelector:
replicating (replication: every channel gets the same data)
multiplexing (multiplexing: the Event's headers decide which channel it goes to; see the config sketch after the data-flow diagram below)
SinkProcessor: decides which sink(s) the Event goes to
① DefaultSinkProcessor supports only a single sink
② LoadBalancingSinkProcessor: load balancing; hands Events to sinks using a round-robin or random rule
③ FailoverSinkProcessor: failover, for high availability
Overall data flow:
Source -> ChannelProcessor -> [Interceptor -> Interceptor -> ...] -> ChannelSelector -> Channel -> SinkProcessor -> Sink
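The replicating selector is configured in 3.4.1 below; multiplexing is only described in prose, so here is a hedged sketch of a multiplexing configuration. The header name state, its values, and the channel names are assumptions made for illustration:
# Multiplexing: route each Event by the value of its "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
# An interceptor can stamp the header before the selector runs,
# e.g. the built-in static interceptor:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = state
a1.sources.r1.interceptors.i1.value = CZ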
3.3 FLUME Topologies
3.3.1 Simple Chaining
This mode chains multiple Flume agents in sequence, from the first source through to the storage system that the final sink writes to. Do not chain too many agents: a long chain hurts throughput, and if any one agent in the chain goes down, the whole pipeline stops.
3.3.2 Replicating and Multiplexing
Flume supports sending the event flow to one or more destinations. In this mode the same data can be replicated into multiple channels, or different data can be dispatched to different channels, and each sink can then deliver to a different destination.
3.3.3 Load Balancing and Failover
Flume can group multiple sinks into one logical sink group; paired with the right SinkProcessor, a sink group provides load balancing or failure recovery.
3.3.4 Aggregation
This is the most common and most practical mode. A typical web application is spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are painful to process. This Flume topology handles it well: each server runs a Flume agent that collects its logs and forwards them to one central log-collecting Flume agent, which then uploads to HDFS, Hive, HBase, etc. for analysis.
3.4 FLUME Enterprise Development
3.4.1 Replicating and Multiplexing
Edit the configuration files:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 0.0.0.0
a3.sources.r1.port = 44444
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a3.sinks.k1.hdfs.filePrefix = events-
a3.sinks.k1.hdfs.round = true
a3.sinks.k1.hdfs.roundValue = 10
a3.sinks.k1.hdfs.roundUnit = minute
a3.sinks.k1.hdfs.fileType = DataStream
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 03-flume-avro-hdfs.conf -n a3 -Dflume.root.logger=INFO,console
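To smoke-test a3 on its own, the bundled avro-client can push a file into its avro source; the test file path here is just an example:
flume-ng avro-client -c $FLUME_HOME/conf -H localhost -p 44444 -F /home/atguigu/flume/test.txt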
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 55555
# Describe the sink
a2.sinks.k1.type = file_roll
a2.sinks.k1.sink.directory = /home/atguigu/flume/file_roll
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 02-flume-avro-fill-roll.conf -n a2 -Dflume.root.logger=INFO,console
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicating Configuration
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/atguigu/flume/taildir/taildir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/atguigu/flume/taildir/.*log
a1.sources.r1.filegroups.f2 = /home/atguigu/flume/taildir/.*data
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume-ng agent -c $FLUME_HOME/conf -f 01-flume-taildir-replication.conf -n a1 -Dflume.root.logger=INFO,console
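Start the downstream agents first (a3, then a2), then a1. Once all three are up, appending to any file matched by the TAILDIR file groups should show up in both the HDFS sink and the file_roll sink; the file name below is an example:
echo "replicating test" >> /home/atguigu/flume/taildir/app.log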
3.4.2 Load Balancing and Failover
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = 0.0.0.0
a3.sources.r1.port = 44444
# Describe the sink
a3.sinks.k1.type = logger
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 03-flume-avro-logger.conf -n a3 -Dflume.root.logger=INFO,console
# example.conf: A single-node Flume configuration
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 55555
# Describe the sink
a2.sinks.k1.type = logger
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 02-flume-avro-logger.conf -n a2 -Dflume.root.logger=INFO,console
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Sink processor: k2's priority is higher than k1's, so after k2 crashes and restarts, it is chosen again for sending
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Alternative: load balancing. A sink group takes only one processor.type,
# so to switch modes, comment out the failover lines above and uncomment these:
# a1.sinkgroups.g1.processor.type = load_balance
# a1.sinkgroups.g1.processor.backoff = true
# a1.sinkgroups.g1.processor.selector = random
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 01-flume-netcat-failover.conf -n a1 -Dflume.root.logger=INFO,console
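To watch the failover, start a3 and a2, then a1, and feed the netcat source from the host running a1. Events should first appear in a2's logger output (k2 has the higher priority, 10); kill a2 and new events fail over to a3 (k1), and once a2 is restarted it takes over again:
nc localhost 6666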
3.4.3 FLUME Aggregation
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 55555
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 03-flume-netcat-avro.conf -n a1 -Dflume.root.logger=INFO,console
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# The conf file name (02-flume-exec-avro.conf) calls for an exec source, so the
# netcat source is corrected here; the tailed file path is an example.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/atguigu/flume/test.log
# Describe the sink (targets the aggregator's second avro source, r2 on port 44444)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 44444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f 02-flume-exec-avro.conf -n a1 -Dflume.root.logger=INFO,console
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1 r2
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 55555
a1.sources.r2.type = avro
a1.sources.r2.bind = 0.0.0.0
a1.sources.r2.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -n a1 -f 01-flume-avro-logger.conf -Dflume.root.logger=INFO,console
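Start the aggregator (the 01-flume-avro-logger.conf agent) on hadoop102 first, then the two upstream agents. Note that the 03 agent's netcat source also binds port 44444, so it is assumed to run on a machine other than hadoop102, where it would otherwise collide with the aggregator's r2 source. To exercise both paths (the log path matches the exec-source example above):
nc localhost 44444                                        # run on the 03 agent's host
echo "aggregation test" >> /home/atguigu/flume/test.log   # run on the 02 agent's host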