Flume概述

Flume定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单

Flume 入门 - 图1

Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS

Flume基础架构

Flume组成架构 :

Flume 入门 - 图2

Agent

Agent 是一个 JVM进程,它以事件的形式将数据从源头送至目的

Agent主要有3个部分组成

  • Source
  • Channel
  • Sink

Source

Source 是负责接收数据到 Flume Agent 的组件

Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy

Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义

Channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink运作在不同的速率上
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作

Flume自带两种Channel

  • Memory Channel
  • File Channel

Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失

File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

Event

传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地

Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

Flume 入门 - 图3

Flume入门

Flume安装部署

安装地址

安装部署

将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下

Flume 入门 - 图4

解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下

  1. tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

Flume 入门 - 图5

修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0

  1. mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0

Flume 入门 - 图6

将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3

  1. mv guava-11.0.2.jar guava-11.0.2.blk

Flume 入门 - 图7

配置环境变量

  1. vim /etc/profile.d/my_env.sh
  1. #FLUME_HOME
  2. export FLUME_HOME=/opt/module/flume-1.9.0
  3. export PATH=$PATH:$FLUME_HOME/bin

Flume 入门 - 图8

生效

  1. source /etc/profile

Flume 入门 - 图9

Flume入门案例

监控端口数据官方案例

案例需求:

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台

需求分析:

Flume 入门 - 图10

实现步骤:

  1. 安装netcat工具
  1. sudo yum install -y nc

Flume 入门 - 图11

  1. 判断 44444 端口是否被占用
  1. sudo netstat -nlp | grep 44444

Flume 入门 - 图12

  1. 创建 Flume Agent 配置文件 flume-netcat-logger.conf
  2. 在 flume 目录下创建 job 文件夹并进入 job 文件夹
  1. mkdir job
  1. cd job/

Flume 入门 - 图13

  1. 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
  1. vim flume-netcat-logger.conf
  1. 在 flume-netcat-logger.conf 文件中添加
  1. # Name the components on this agent
  2. # a1: agent 名
  3. # r1 : a1的 Source 名
  4. # k1: a1的 Sink 名
  5. # c1: a1的 Channel 名
  6. a1.sources = r1
  7. a1.sinks = k1
  8. a1.channels = c1
  9. # Describe/configure the source
  10. # a1的输入源类型为 netcat 端口类型
  11. # a1的监听的主机
  12. # a1的监听的端口号
  13. a1.sources.r1.type = netcat
  14. a1.sources.r1.bind = localhost
  15. a1.sources.r1.port = 44444
  16. # Describe the sink
  17. # a1 输出目的地是控制台 logger 类型
  18. a1.sinks.k1.type = logger
  19. # Use a channel which buffers events in memory
  20. # a1的channel 类型是 memory 内存型
  21. # al的channel 总容量 1000 个 event
  22. # al的channel 传输时收集到了100条 event 以后提交事务
  23. a1.channels.c1.type = memory
  24. a1.channels.c1.capacity = 1000
  25. a1.channels.c1.transactionCapacity = 100
  26. # Bind the source and sink to the channel
  27. # 将 r1 和 c1 连接起来
  28. # 将 k1 和 cl 连接起来
  29. a1.sources.r1.channels = c1
  30. a1.sinks.k1.channel = c1

Flume 入门 - 图14

注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html

  1. 先开启 Flume 监听端口
  1. bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
  1. bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

  • —conf/-c:表示配置文件存储在conf/目录
  • —name/-n:表示给agent起名为a1
  • —conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
  • -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括 : log、info、warn、error
  1. 使用 netcat 工具向本机的 44444 端口发送内容
  1. nc localhost 44444
  1. 在 Flume 监听页面观察接收数据情况

nc hadoop102 44444,flume能否接收到?

实时监控单个追加文件

案例需求 :

实时监控Hive日志,并上传到HDFS中

需求分析:

Flume 入门 - 图15

实现步骤:

  1. Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关jar包

检查 /etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确

创建 flume-file-hdfs.conf 文件

创建文件

  1. vim flume-file-hdfs.conf

注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件

添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://cpucode100:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注意:

对于所有与时间相关的转义序列,Event Header 中必须存在以 “ timestamp ” 的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

a3.sinks.k3.hdfs.useLocalTimeStamp = true
  1. 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
  1. 开启 Hadoop 和 Hive 并操作 Hive 产生日志
bin/hive
  1. 在HDFS上查看文件

实时监控目录下多个新文件

案例需求:

使用Flume监听整个目录的文件,并上传至HDFS

需求分析:

Flume 入门 - 图16

实现步骤:

  1. 创建配置文件 flume-dir-hdfs.conf

创建一个文件

vim flume-dir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  1. 启动监控文件夹命令

说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动

  1. 向upload文件夹中添加文件

在 /opt/module/flume 目录下创建 upload 文件夹

向 upload 文件夹中添加文件

查看HDFS上的数据

实时监控目录下的多个追加文件

Exec source : 适用于监控一个实时追加的文件,不能实现断点续传

Spooldir Source : 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

Taildir Source : 适合用于监听多个实时追加的文件,并且能够实现断点续传

案例需求 :

使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

需求分析:

Flume 入门 - 图17

实现步骤:

创建配置文件flume-taildir-hdfs.conf

创建一个文件

vim flume-taildir-hdfs.conf

添加如下内容

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://cpucode100:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
  1. 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
  1. 向files文件夹中追加内容

在 /opt/module/flume 目录下创建 files 文件夹

mkdir files

向 upload 文件夹中添加文件

echo hello >> file1.txt
echo atguigu >> file2.txt
  1. 查看HDFS上的数据

Taildir 说明:

Taildir Source 维护了一个 json 格式的 position File ,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传

Position File 的格式如下:

{
    "inode":2496272,
    "pos":12,
    "file":"/opt/module/flume-1.9.0/files/file1.txt"
}
{
    "inode":2496275,
    "pos":12,
    "file":"/opt/module/flume-1.9.0/files/file2.txt"
}

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用inode 号码来识别文件

Flume进阶

Flume事务

Flume 入门 - 图18

Put事务流程

  • doPut : 将批数据先写入临时缓冲区 putList
  • doCommne : 存队列是否足够合并
  • doRollback : channel不足, 回滚数据

Take事务

  • doTake : 将数据取到临时缓冲区takeList, 并将数据发送到HDFS
  • doCommit : 如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback : 数据发送过程中如果出现异常, rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列。

Flume Agent内部原理

Flume 入门 - 图19

ChannelSelector

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel

SinkProcessor

SinkProcessor共有三种类型 :

  • DefaultSinkProcessor
  • LoadBalancingSinkProcessor
  • FailoverSinkProcessor

DefaultSinkProcessor对应的是单个的Sink,

LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能

Flume拓扑结构

Flume 入门 - 图20

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

Flume 入门 - 图21

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

Flume 入门 - 图22

Flume 入门 - 图23

Flume企业开发案例

自定义Interceptor

自定义Source

自定义Sink

Flume数据流监控

Ganglia 的安装与部署

Ganglia 三部分组成 :

  • gmond
  • gmetad
  • gweb

gmond(Ganglia Monitoring Daemon): 一种轻量级服务,安装在每台需要收集指标数据的节点主机上。如 : CPU、内存、磁盘、网络和活跃进程的数据等

gmetad(Ganglia Meta Daemon): 整合所有信息,并以 RRD 格式存储至磁盘的服务

gweb(Ganglia Web)Ganglia可视化工具 : 利用浏览器显示 gmetad 所存储数据的 PHP 前端。以图表 展现集群的运行状态下收集的多种不同指标数据

操作 Flume 测试监控

启动Flume任务

bin/flume-ng agent \
-c conf/ \
-n a1 \
-f datas/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=cpu101:8649

发送数据观察 ganglia 监测图

nc localhost 44444
字段(图表名称) 字段含义
EventPutAttemptCount source 尝试写入 channel 的事件总数量
EventPutSuccessCount 成功写入 channel 且提交的事件总数量
EventTakeAttemptCount sink 尝试从 channel 拉取事件的总数量
EventTakeSuccessCount sink 成功读取的事件的总数量
StartTime channel 启动的时间(毫秒)
StopTime channel 停止的时间(毫秒)
ChannelSize 目前 channel 中事件的总数量
ChannelFillPercentage channel 占用百分比
ChannelCapacity channel 的容量