1.Flume

1.1 Flume 介绍

  1. Flumecloudera开发的后来贡献给了Apache的一套用分布式,高可靠的,高可用的海量分布式日志采集、聚合和传输的系统,将大量日志数据从许多不同的源移动到一个集中的数据存储。<br /> Apache Flume的使用不仅仅局限于日志数据聚合。由于数据源是可定制的,Flume可以用于传输大量事件数据,包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息和几乎所有可能的数据源。

Flume的特点:
(1)可以和任意集中式存储进行集成(HDFS,HBASE)
(2)输入的数据速率大于写入存储目的地速率,flume会进行缓冲
(3)flume提供上下文路由(数据流路线)
(4) Flume中的事物基于channel,使用了两个事物模型(sender+receiver),确保消息被可靠发送
(5) flume是 可靠的,容错的,可扩展的。
(6) flume易实现只需要简单的配置数据源以及存储端的相关信息即可,不需要写复杂的代码!!!

1.2 Flume组件

  1. Flume 实现日志文件的采集,其主要是不断地去监听日志文件的变化,最后将日志文件中变化的数据流采集到HDFS中。如下图所示,Flume主要由3个部分组成:
  • Event:消息的基本单位,有header和body组成
  • Agent:JVM进程,负责将一端外部来源产生的消息转 发到另一端外部的目的地
    • Source:从外部来源读入event,并写入channel
    • Channel:event暂存组件,source写入后,event将会 一直保存,
    • Sink:从channel读入event,并写入目的地

Flume-日志采集工具 - 图1

  1. Flume到底是如何工作的呢?主要还是通过上面原型图将源源不断地数据流存储到外部,对于一条数据也就是事件流如下图所示实现采集

Flume-日志采集工具 - 图2
对于数据流,则是将不断产生的事件流最后进行汇总即可,如下所示:

Flume-日志采集工具 - 图3

2. Flume实践

2.1. 配置文件

  1. Flume的实现则是通过配置conf文件即可完成数据的采集。如下所示,是一个flume配置文件。

Flume采集到HDFS

  1. # a2 则为 这次 Flume 任务agent的名称
  2. # r2 则为 source 名称
  3. # k2 - sink 名称
  4. # c2 - channel 名称
  5. a2.sources = r2
  6. a2.sinks = k2
  7. a2.channels = c2
  8. # 配置source的信息
  9. # type - source 类型
  10. # command - 监听命令 监听的文件 XXX/XXX.log
  11. a2.sources.r2.type = exec
  12. a2.sources.r2.command = tail -F XXX/XXX.log
  13. # 配置sink信息
  14. # type - 流出存储地
  15. # hdfs.path - hdfs 地址 数据按时间分区ds进行存放
  16. # rollInterval 按时间生成 HDFS 文件
  17. # rollCount:按写入的 event 的个数触发 roll,生成新的 HDFS 文件
  18. # fileType:指定保存到 HDFS 文件系统上的文件类型
  19. a2.sinks.k2.type = hdfs
  20. a2.sinks.k2.hdfs.path = hdfs://XXXXhdfsIP和端口/数据存储地址
  21. a2.sinks.k2.hdfs.rollInterval = 0
  22. a2.sinks.k2.hdfs.tollSize = 1024000
  23. a2.sinks.k2.hdfs.rollCount = 0
  24. a2.sinks.k2.hdfs.fileType = DataStream
  25. # 时间拦截
  26. a2.sinks.k2.hdfs.useLocalTimeStamp = true
  27. a2.sinks.k2.hdfs.callTimeout = 60000
  28. # 设置channel type 以及 存储信息
  29. a2.channels.c2.type = memory
  30. a2.channels.c2.capacity = 1000
  31. a2.channels.c2.transactionCapacity = 100
  32. # 将sink和source通过channel 连接起来
  33. a2.sources.r2.channels = c2
  34. a2.sinks.k2.channel = c2

2.1.1 Source 参数说明

对接各种外部数据源,将收集到的事件发送到Channel中,一个source可以向多个channel发送event,Flume内置非常丰富的Source,同时用户可以自定义Source。针对不同的Source来不同的参数配置,下面给出了一些常用参数。

Source类型 Type 用途
Avro Source avro 启动一个Avro Server,可与上一级Agent连接
Exec Source exec 执行unix command,获取标准输出,如tail -f
Taildir Source TAILDIR 监听目录或文件
Spooling Directory Source spooldir 监听目录下的新增文件
Kafka Source org.apache.flume.sourc e.kafka.KafkaSource 读取Kafka数据
JMS Source jms 从JMS源读取数据

NetCat Source

  • NetCat Source可以使用TCP和UDP两种协议方式,使用方法基本相同,通过监听指定的IP和端口来传输数据,它会将监听到的每一行数据转化成一个Event写入到Channel中。

    1. Property Name Default Description
    2. channels@
    3. type@ 类型指定为:netcat
    4. bind@ 绑定机器名或IP地址
    5. port@ 端口号
    6. max-line-length 512 一行的最大字节数
    7. ack-every-event true 对成功接受的Event返回OK
    8. selector.type replicating 选择器类型replicating or multiplexing
    9. selector.* 选择器相关参数
    10. interceptors 拦截器列表,多个以空格分隔
    11. interceptors.* 拦截器相关参数

    Avro Source

  • 不同主机上的Agent通过网络传输数据可使用的Source,一般是接受Avro client的数据,或和是上一级Agent的Avro Sink成对存在。

    1. Property Name Default Description
    2. channels@
    3. type@ 类型指定为:avro
    4. bind@ 监听的主机名或IP地址
    5. port@ 端口号
    6. threads 传输可使用的最大线程数
    7. selector.type
    8. selector.*
    9. interceptors 拦截器列表
    10. interceptors.*
    11. compression-type none 可设置为“none deflate”. 压缩类型需要和AvroSource匹配

    Exec Source
    Exec source通过执行给定的Unix命令的传输结果数据,如,cat,tail -F等,实时性比较高,但是一旦Agent进程出现问题,可能会导致数据的丢失。

    1. Property Name Default Description
    2. channels@
    3. type@ 类型指定为:exec
    4. command@ 需要去执行的命令
    5. shell 运行命令的shell脚本文件
    6. restartThrottle 10000 尝试重启的超时时间
    7. restart false 如果命令执行失败,是否重启
    8. logStdErr false 是否记录错误日志
    9. batchSize 20 批次写入channel的最大日志数量
    10. batchTimeout 3000 批次写入数据的最大等待时间(毫秒)
    11. selector.type replicating 选择器类型replicating or multiplexing
    12. selector.* 选择器其他参数
    13. interceptors 拦截器列表,多个空格分隔
    14. interceptors.*

    Spooling Directory Source

  • 通过监控一个文件夹将新增文件内容转换成Event传输数据,特点是不会丢失数据,使用Spooling Directory Source需要注意的两点是,1)不能对被监控的文件夹下的新增的文件做出任何更改,2)新增到监控文件夹的文件名称必须是唯一的。由于是对整个新增文件的监控,Spooling Directory Source的实时性相对较低,不过可以采用对文件高粒度分割达到近似实时。

    1. Property Name Default Description
    2. channels@
    3. type@ 类型指定:spooldir.
    4. spoolDir@ 被监控的文件夹目录
    5. fileSuffix .COMPLETED 完成数据传输的文件后缀标志
    6. deletePolicy never 删除已经完成数据传输的文件时间:never or immediate
    7. fileHeader false 是否在header中添加文件的完整路径信息
    8. fileHeaderKey file 如果header中添加文件的完整路径信息时key的名称
    9. basenameHeader false 是否在header中添加文件的基本名称信息
    10. basenameHeaderKey basename 如果header中添加文件的基本名称信息时key的名称
    11. includePattern ^.*$ 使用正则来匹配新增文件需要被传输数据的文件
    12. ignorePattern ^$ 使用正则来忽略新增的文件
    13. trackerDir .flumespool 存储元数据信息目录
    14. consumeOrder oldest 文件消费顺序:oldest, youngest and random.
    15. maxBackoff 4000 如果channel容量不足,尝试写入的超时时间,如果仍然不能写入,则会抛出ChannelException
    16. batchSize 100 批次处理粒度
    17. inputCharset UTF-8 输入码表格式
    18. decodeErrorPolicy FAIL 遇到不可解码字符后的处理方式:FAILREPLACEIGNORE
    19. selector.type replicating 选择器类型:replicating or multiplexing
    20. selector.* 选择器其他参数
    21. interceptors 拦截器列表,空格分隔
    22. interceptors.*

    Taildir Source

  • 可以实时的监控指定一个或多个文件中的新增内容,由于该方式将数据的偏移量保存在一个指定的json文件中,即使在Agent挂掉或被kill也不会有数据的丢失,需要注意的是,该Source不能在Windows上使用。

    1. Property Name Default Description
    2. channels@
    3. type@ 指定类型:TAILDIR.
    4. filegroups@ 文件组的名称,多个空格分隔
    5. filegroups.<filegroupName>@ 被监控文件的绝对路径
    6. positionFile ~/.flume/taildir_position.json 存储数据偏移量路径
    7. headers.<filegroupName>.<headerKey> Header key的名称
    8. byteOffsetHeader false 是否添加字节偏移量到key为‘byteoffset’值中
    9. skipToEnd false 当偏移量不能写入到文件时是否跳到文件结尾
    10. idleTimeout 120000 关闭没有新增内容的文件超时时间(毫秒)
    11. writePosInterval 3000 positionfile 写入每一个文件lastposition的时间间隔
    12. batchSize 100 批次处理行数
    13. fileHeader false 是否添加header存储文件绝对路径
    14. fileHeaderKey file fileHeader启用时,使用的key

    2.1.2 Channels参数说明

    Memory Channel

  • Memory Channel是使用内存来存储Event,使用内存的意味着数据传输速率会很快,但是当Agent挂掉后,存储在Channel中的数据将会丢失。

    1. Property Name Default Description
    2. type@ 类型指定为:memory
    3. capacity 100 存储在channel中的最大容量
    4. transactionCapacity 100 从一个source中去或者给一个sink,每个事务中最大的事件数
    5. keep-alive 3 对于添加或者删除一个事件的超时的秒钟
    6. byteCapacityBufferPercentage 20 定义缓存百分比
    7. byteCapacity see description Channel中允许存储的最大字节总数

    File Channel

  • File Channel使用磁盘来存储Event,速率相对于Memory Channel较慢,但数据不会丢失

    1. Property Name Default Description
    2. type@ 类型指定:file.
    3. checkpointDir ~/.flume/file-channel/checkpoint checkpoint目录
    4. useDualCheckpoints false 备份checkpoint,为TruebackupCheckpointDir必须设置
    5. backupCheckpointDir 备份checkpoint目录
    6. dataDirs ~/.flume/file-channel/data 数据存储所在的目录设置
    7. transactionCapacity 10000 Event存储最大值
    8. checkpointInterval 30000 checkpoint间隔时间
    9. maxFileSize 2146435071 单一日志最大设置字节数
    10. minimumRequiredSpace 524288000 最小的请求闲置空间(以字节为单位)
    11. capacity 1000000 Channel最大容量
    12. keep-alive 3 一个存放操作的等待时间值(秒)
    13. use-log-replay-v1 false Expert: 使用老的回复逻辑
    14. use-fast-replay false Expert: 回复不需要队列
    15. checkpointOnClose true

    2.1.3 Sink参数说明

    Flume常用Sinks有Log Sink,HDFS Sink,Avro Sink,Kafka Sink,当然也可以自定义Sink。
    Logger Sink

  • Logger Sink以INFO 级别的日志记录到log日志中,这种方式通常用于测试。

    1. Property Name Default Description
    2. channel@
    3. type 类型指定:logger
    4. maxBytesToLog 16 能够记录的最大Event Body字节数

    HDFS Sink

  • Sink数据到HDFS,目前支持text 和 sequence files两种文件格式,支持压缩,并可以对数据进行分区,分桶存储。 ```shell Name Default Description channel@ –
    type@ – 指定类型:hdfs hdfs.path@ – HDFS的路径,eg hdfs://namenode/flume/webdata/ hdfs.filePrefix FlumeData 保存数据文件的前缀名 hdfs.fileSuffix – 保存数据文件的后缀名 hdfs.inUsePrefix – 临时写入的文件前缀名 hdfs.inUseSuffix .tmp 临时写入的文件后缀名 hdfs.rollInterval 30 间隔多长将临时文件滚动成最终目标文件,单位:秒,

    1. 如果设置成0,则表示不根据时间来滚动文件

    hdfs.rollSize 1024 当临时文件达到多少(单位:bytes)时,滚动成目标文件,

    1. 如果设置成0,则表示不根据临时文件大小来滚动文件

    hdfs.rollCount 10 当 events 数据达到该数量时候,将临时文件滚动成目标文件,

    1. 如果设置成0,则表示不根据events数据来滚动文件

    hdfs.idleTimeout 0 当目前被打开的临时文件在该参数指定的时间(秒)内,

    1. 没有任何数据写入,则将该临时文件关闭并重命名成目标文件

    hdfs.batchSize 100 每个批次刷新到 HDFS 上的 events 数量 hdfs.codeC – 文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy hdfs.fileType SequenceFile 文件格式,包括:SequenceFile, DataStream,CompressedStre,

    1. 当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
    2. 当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;

    hdfs.maxOpenFiles 5000 最大允许打开的HDFS文件数,当打开的文件数达到该值,

    1. 最早打开的文件将会被关闭

    hdfs.minBlockReplicas – HDFS副本数,写入 HDFS 文件块的最小副本数。

    1. 该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件

    hdfs.writeFormat Writable 写 sequence 文件的格式。包含:Text, Writable(默认) hdfs.callTimeout 10000 执行HDFS操作的超时时间(单位:毫秒) hdfs.threadsPoolSize 10 hdfs sink 启动的操作HDFS的线程数 hdfs.rollTimerPoolSize 1 hdfs sink 启动的根据时间滚动文件的线程数 hdfs.kerberosPrincipal – HDFS安全认证kerberos配置 hdfs.kerberosKeytab – HDFS安全认证kerberos配置 hdfs.proxyUser 代理用户 hdfs.round false 是否启用时间上的”舍弃” hdfs.roundValue 1 时间上进行“舍弃”的值 hdfs.roundUnit second 时间上进行”舍弃”的单位,包含:second,minute,hour hdfs.timeZone Local Time 时区。 hdfs.useLocalTimeStamp false 是否使用当地时间 hdfs.closeTries 0 Number hdfs sink 关闭文件的尝试次数;

    1. 如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,
    2. 这个未关闭的文件将会一直留在那,并且是打开状态;
    3. 设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功

    hdfs.retryInterval 180 hdfs sink 尝试关闭文件的时间间隔,

    1. 如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1

    serializer TEXT 序列化类型 serializer.*

  1. **Avro Sink**
  2. ```shell
  3. Property Name Default Description
  4. channel@ –
  5. type@ – 指定类型:avro.
  6. hostname@ – 主机名或IP
  7. port@ – 端口号
  8. batch-size 100 批次处理Event数
  9. connect-timeout 20000 连接超时时间
  10. request-timeout 20000 请求超时时间
  11. compression-type none 压缩类型,“none” or “deflate”.
  12. compression-level 6 压缩级别,0表示不压缩,1-9数字越大,压缩比越高
  13. ssl false 使用ssl加密

Kafka Sink

  • 传输数据到Kafka中,需要注意的是Flume版本和Kafka版本的兼容性

    1. Property Name Default Description
    2. type 指定类型:org.apache.flume.sink.kafka.KafkaSink
    3. kafka.bootstrap.servers kafka服务地址
    4. kafka.topic default-flume-topic kafka Topic
    5. flumeBatchSize 100 批次写入kafka Event
    6. kafka.producer.acks 1 多少个副本确认后才能确定消息传递成功,0表示不需要确认
    7. 1表示只需要首要的副本得到确认,-1表示等待所有确认。

    2.2 运行Flume

    1. flume-ng agent --conf flume-conf 配置路径 --conf-file 采集任务配置文件 -name agent -Dflume.root.logger=INFO,console

    如下图所示,说明Flume开始了采集任务,
    image.png
    如下图,是通过jar包产生的一组测试数据,当flume开始工作的时候,会不断输出如下所示数据流信息,同时将采集到数据存储到hdfs路径下
    对于flume任务来说,在设定时间限制的情况下,不需要专门的去停止flume任务,否则利用ctrl+z强制停止,然后再使用kill 命令去kill掉进程,ps -ef | grep XXX.conf 利用该命令去查询相应的进程 然后kill即可。

    主要是第一次使用Flume工具,因此,有很多的不懂,整个过程虽然简单,但是还是磕磕绊绊吧。下面篇我们利用Java来产生大量日志数据(继而调用相应的jar包 并通过flume监听 采集到HDFS中)。