Flume

Flume是一个开箱即用的数据传输组件,可提供分布式、可用、可靠的数据收集、聚合、移动服务,具有伸缩的健壮性、容错性,failover及数据恢复机制。Flume使用Event数据模型在集群内多个Flume节点间进行数据流转,支持在线的数据分析应用。

Flume模型

Flume的实例称为agent,是集群内一个完整的节点,每个agent包含三个组件:Source,Channel,Sink,数据被封装成Event在多个agent间顺序流经Source、Channel、Sink,这便是Flume的模型结构。一个agent内,Source与Sink为单独线程,通过Channel独立的进行数据的写入和提取。

  • Source使用特定协议从上游数据源接收Event,流入agent,写进Channel;
  • Channel缓存Source流入的Event,被Sink处理。Channel是一个数据换从队列,有多种存储方案,能缓冲流出失败的数据(可靠性),使用磁盘或外部存储方案的Channel能使节点从故障中进行恢复(故障恢复);
  • Sink提取Channel中的Event,使Event流出agent,流向外部存储或下游数据节点,Sink使用事务进行Event递交,Event递交成功后才会从Channel中移除(可靠性)。

Flume模型允许使用者构建多节点的数据流,生成复杂的有向无环的拓扑结构。

Flume使用

Flume具有开箱即用的特性,以当前1.8为例,只需

  1. 具有8及以上版本jre环境
  2. 磁盘文件访问权限
  3. 足够的内存空间(尤其使用Memory Channel时)
  4. 足够的磁盘空间(尤其使用File Channel时)
  5. 有效的Flume配置文件 | flume-ng agent -n $agent_name -c conf -f $configfile | | :—- |

Flume配置文件

配置文件声明了agent内Source、Channel、Sink的配置信息,Event的流入流出方向,多个连通的agent的配置文件,描述了整体数据流的拓扑结构。
一个有效的Flume基本配置信息包含四个部分

组件声明

声明agent内各组件个数名称

a1.sources = r1 r2a1.sinks = k1a1.channels = c1

配置声明a1 agent中r1、r2两个Source,一个Sink k1,一个Channel c1。

Source配置

各Source的类型及其配置,使用的Source类型不同,配置参数也会不同

a1.sources.r1.type = thrifta1.sources.r1.channels = c1a1.sources.r1.bind = localhosta1.sources.r1.port = 44444a1.sources.r2.type = TAILDIRa1.sources.r2.channels = c1a1.sources.r2.positionFile = /var/log/flume/taildir_position.jsona1.sources.r2.filegroups = f1 f2a1.sources.r2.filegroups.f1 = /var/log/test1/example.loga1.sources.r2.headers.f1.headerKey1 = value1a1.sources.r2.filegroups.f2 = /var/log/test2/.log.a1.sources.r2.headers.f2.headerKey1 = value2a1.sources.r2.headers.f2.headerKey2 = value2-2a1.sources.r2.fileHeader = true

对组件声明内的两个Source进行配置,r1为ThriftSource,在44444端口监听thrift协议的数据流入,数据缓存入c1 Channel;r2为TaildirSource,以本机指定的文件为数据输入,并监听文件的写入,文件的读取进度行号保存在positionFile中,r2也以c1为Channel。
Flume预定义的Source包含Avro,Thrift,Exec,JMS,Spooling,Taildir,Twitter,Kafka,NetCat,Sequence Generator,Syslog,Http等类型。Avro Source与Avro Sink常用于连接多个agent。

Channel配置

Channel的类型配置,存储空间参数配置等

a1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 10000a1.channels.c1.byteCapacityBufferPercentage = 20a1.channels.c1.byteCapacity = 800000

对组件声明的c1进行配置,类型为MemoryChannel,及数据缓存在内存中,使用MemoryChannel时特别需要注意机器应具备足够的空闲内存,agent故障重启后,Channel中的数据会丢失,但是MemoryChannel是所有Channel中最高效的。capacity及byteCapacity确定了Channel使用的存储空间大小,transactionCapacity则指定Sink事务一次可提取的Event数量。
Flume预定义的Channel包含Memory,JDBC,Kafka,File,Spillable等类型。当一个Source被冗余写入多个Channel时,产生了数据流备份,而多个Sink写入一个Channel时,产生了数据流合并。

Sink配置

Sink的类型配置,Sink的流向配置等

a1.sinks.k1.type = avroa1.sinks.k1.channel = c1a1.sinks.k1.hostname = 10.10.10.10a1.sinks.k1.port = 4545

对组件声明的k1进行配置,类型为AvroSink,数据流出到下游10.10.10.10:4545端口。
Flume预定义的Sink包含HDFS,Hive,Logger,Avro,Thrift,IRC,File Roll, Null,HBase,ElasticSearch,Kafka,HTTP等类型。多个Sink从Channel取数据时,根据下游数据流出的配置,如果Sink流出到不同的下游,则产生了数据流的分发。

额外配置

为了适应多样的数据流,还有一些额外配置

  • Flume Channel Selector,指明Event写入Channel的策略,默认为replicating,即冗余写入配置的Channel,从而实现了数据流的备份,Flume预定义的Selector还有multiplexing,可根据header值选择写入的Channel,用于数据分流;
  • File Sink Processor,将多个Sink绑定到一起,指定Sink对Event的处理行为,默认Default Sink Processor只接受一个Sink,还可选择Failover Sink Processor,言即将失败的Event fail-over到同组其他Sink中,还可使用LoadBalance Sink Processor,来做分发的负载均衡;
  • Event Serializers,Event的序列化类;
  • Flume Interceptors,用于Source接受Event后对Event的处理,如数据清洗,打标签,去标签等。

    Flume调优

    分布式系统中的调优,有集群优化及节点性能优化多种手段。

    集群优化

    以数据存取的速度排序来看,跨机架的数据流动性能优于跨机房数据流动,同节点上数据流动性能优于跨机架数据流动,最好是在进程内流动,也即是说,在设计Flume数据流拓扑结构的时候,除非必要,应:

  • 尽量减少数据流动次数,避免数据反复进出机架或机房;

  • 增加单节点对数据的处理深度,以减少数据流层次;
  • 水平扩展节点,提高集群吞吐。

    节点优化

    节点优化关注在agent的吞吐能力上。具体来讲,有以下几个方面的优化手段,同时我们还需要考虑到稳定性及数据安全性:
  1. 事务批量提交的Event数量。前面提到过,Sink使用事务进行数据向下游的递交,单次事务中提交的数据数量,是影响节点吞吐量的最主要原因。事务一次提交的Event数量越多,吞吐量越大,但同时也意味着,数据向下游流出的延迟越大,当发生事务回滚时,下游可能产生的重复记录就越多。Sink的配置中batch-size用于配置一次提交的Event数量(不同的Sink可能会有略微区别)。
    • Channel中transactionCapability参数(Sink的事务每次能从此Channel中提取的Event的数量的最大值)对单个事务提交的Event数量具有限制所用,batch-size不得大于transactionCapability。
    • 在一个数据流的有向无环图中,下游的数据节点的batch-size之应不小于上游数据节点的batch-size之和,否则,下游节点可能成为数据流通道流量的瓶颈,最好是二者相等,这样即平衡了上下游的吞吐能力,又减少了下游数据的延迟。
  2. Channel选择。Source和Sink在进行事务内数据的写入或提取时,都需要先打开通道,并完成数据的写入或提取后才可以允许事务确认,在上下游节点未发生阻塞的情况下,Channel的速度能决定数据在节点中的驻留时间,由于内存读写远超磁盘读写速度,更远超网络读写速度,因此,MemoryChannel在数据流动性能上要优于FileChannel,后者又优于Kafka和JDBCChannel。
    • 从安全性上来讲,MemoryChannel并不是一个好的选择,毕竟,在集群环境中,节点宕机还是相对常见的事情,故障发生后,节点Channel中由上游节点提交而尚未提交到下游节点的数据就丢失了。
  3. 并联。除了集群层次水平扩展节点外,Flume也允许在节点内进行并发线路的调整,通过增加Channel或Sink(增加数据流动并发能力)来提高agent吞吐,从安全性上考虑,尽量使用较小的batch-size,吞吐量上的不足,可以通过节点或节点内并发线路的增加来弥补。

如果使用了Serializer或Interceptor,也可能对集群的效率产生影响。

Flume开发

除了Flume开箱即用的一些组件,使用者还可通过实现模块的接口,扩展Flume的功能,只需将实现的模块jar文件丢到$FLUME_CLASSPATH目录下就能在配置文件中配置使用用户自定义模块。但是还是建议一个规范的文件结构管理非官方发布的组件:将组件文件放在$FLUME_HOME/plugins.d目录中,flume-ng命令会启动脚本在此目录中查找符合规范的模块,并添加到java的classpath路径中。plugins.d目录内模块文件子目录规范如下:

  1. lib - 实现模块的jar文件
  2. libext - 模块的依赖jar文件
  3. native - 模块依赖的native库,如.so文件

如:

plugins.d/ plugins.d/custom-source-1/ plugins.d/custom-source-1/lib/my-source.jar plugins.d/custom-source-1/libext/spring-core-2.5.6.jar plugins.d/custom-source-2/ plugins.d/custom-source-2/lib/custom.jar plugins.d/custom-source-2/native/gettext.so

事务

image.png

Agent内部原理

image.png

具体可参见下面文档
尚硅谷大数据技术之Flume笔记.pdf

面试题

  1. 你是如何实现 Flume 数据传输的监控的

    使用第三方框架 Ganglia 实时监控 Flume。

  2. Flume 的 的 Source ,Sink ,Channel 的作用?你们 Source 是什么类型?
    作用
    (1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy,
    (2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中。
    (3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 HDFS、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。
    我公司采用的 Source 类型为
    (1)监控后台日志:exec
    (2)监控后台产生日志的端口:netcat Exec spooldir

  3. Flume 的 的 Channel Selectors

image.png
image.pngChannel Selectors,可以让不同的项目日志通过不同的Channel到不同的Sink中去。官方文档上Channel Selectors 有两种类型:Replicating Channel Selector (default)和Multiplexing Channel Selector
这两种Selector的区别是:Replicating 会将source过来的events发往所有channel,而Multiplexing可以选择该发往哪些Channel。

  1. Flume 参数调优

  2. Source
    增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。
    2. Channel
    type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每
    次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。
    3. Sink
    增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能。
    image.png
    image.png5. Flume 的事务机制
    Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那 么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传 递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持
    到 Channel 中,等待重新传递。

  3. Flume 采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复。
image.png