Flume概述
Flume定义
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单

Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS
Flume基础架构
Flume组成架构 :

Agent
Agent 是一个 JVM进程,它以事件的形式将数据从源头送至目的
Agent主要有3个部分组成
- Source
- Channel
- Sink
Source
Source 是负责接收数据到 Flume Agent 的组件
Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy
Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义
Channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink运作在不同的速率上
Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作
Flume自带两种Channel
- Memory Channel
- File Channel
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据
Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地
Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

Flume入门
Flume安装部署
安装地址
- Flume官网地址:http://flume.apache.org/
- 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
- 下载地址:http://archive.apache.org/dist/flume/
安装部署
将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下

解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

修改 apache-flume-1.9.0-bin 的名称为 flume-1.9.0
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0

将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
mv guava-11.0.2.jar guava-11.0.2.blk

配置环境变量
vim /etc/profile.d/my_env.sh
#FLUME_HOMEexport FLUME_HOME=/opt/module/flume-1.9.0export PATH=$PATH:$FLUME_HOME/bin

生效
source /etc/profile

Flume入门案例
监控端口数据官方案例
案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台
需求分析:

实现步骤:
- 安装netcat工具
sudo yum install -y nc

- 判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444

- 创建 Flume Agent 配置文件 flume-netcat-logger.conf
- 在 flume 目录下创建 job 文件夹并进入 job 文件夹
mkdir job
cd job/

- 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
vim flume-netcat-logger.conf
- 在 flume-netcat-logger.conf 文件中添加
# Name the components on this agent# a1: agent 名# r1 : a1的 Source 名# k1: a1的 Sink 名# c1: a1的 Channel 名a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the source# a1的输入源类型为 netcat 端口类型# a1的监听的主机# a1的监听的端口号a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe the sink# a1 输出目的地是控制台 logger 类型a1.sinks.k1.type = logger# Use a channel which buffers events in memory# a1的channel 类型是 memory 内存型# al的channel 总容量 1000 个 event# al的channel 传输时收集到了100条 event 以后提交事务a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel# 将 r1 和 c1 连接起来# 将 k1 和 cl 连接起来a1.sources.r1.channels = c1a1.sinks.k1.channel = c1

注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html
- 先开启 Flume 监听端口
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
参数说明:
- —conf/-c:表示配置文件存储在conf/目录
- —name/-n:表示给agent起名为a1
- —conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
- -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括 : log、info、warn、error
- 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
- 在 Flume 监听页面观察接收数据情况
nc hadoop102 44444,flume能否接收到?
实时监控单个追加文件
案例需求 :
实时监控Hive日志,并上传到HDFS中
需求分析:

实现步骤:
- Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关jar包
检查 /etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
创建 flume-file-hdfs.conf 文件
创建文件
vim flume-file-hdfs.conf
注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行 Linux 命令来读取文件
添加如下内容
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://cpucode100:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
注意:
对于所有与时间相关的转义序列,Event Header 中必须存在以 “ timestamp ” 的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)
a3.sinks.k3.hdfs.useLocalTimeStamp = true
- 运行Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
- 开启 Hadoop 和 Hive 并操作 Hive 产生日志
bin/hive
- 在HDFS上查看文件
实时监控目录下多个新文件
案例需求:
使用Flume监听整个目录的文件,并上传至HDFS
需求分析:

实现步骤:
- 创建配置文件 flume-dir-hdfs.conf
创建一个文件
vim flume-dir-hdfs.conf
添加如下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
- 启动监控文件夹命令
说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动
- 向upload文件夹中添加文件
在 /opt/module/flume 目录下创建 upload 文件夹
向 upload 文件夹中添加文件
查看HDFS上的数据
实时监控目录下的多个追加文件
Exec source : 适用于监控一个实时追加的文件,不能实现断点续传
Spooldir Source : 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步
Taildir Source : 适合用于监听多个实时追加的文件,并且能够实现断点续传
案例需求 :
使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
需求分析:

实现步骤:
创建配置文件flume-taildir-hdfs.conf
创建一个文件
vim flume-taildir-hdfs.conf
添加如下内容
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume-1.9.0/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://cpucode100:8020/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
- 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
- 向files文件夹中追加内容
在 /opt/module/flume 目录下创建 files 文件夹
mkdir files
向 upload 文件夹中添加文件
echo hello >> file1.txt
echo atguigu >> file2.txt
- 查看HDFS上的数据
Taildir 说明:
Taildir Source 维护了一个 json 格式的 position File ,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传
Position File 的格式如下:
{
"inode":2496272,
"pos":12,
"file":"/opt/module/flume-1.9.0/files/file1.txt"
}
{
"inode":2496275,
"pos":12,
"file":"/opt/module/flume-1.9.0/files/file2.txt"
}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用inode 号码来识别文件
Flume进阶
Flume事务

Put事务流程
- doPut : 将批数据先写入临时缓冲区 putList
- doCommne : 存队列是否足够合并
- doRollback : channel不足, 回滚数据
Take事务
- doTake : 将数据取到临时缓冲区takeList, 并将数据发送到HDFS
- doCommit : 如果数据全部发送成功,则清除临时缓冲区takeList
- doRollback : 数据发送过程中如果出现异常, rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列。
Flume Agent内部原理

ChannelSelector
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel
SinkProcessor
SinkProcessor共有三种类型 :
- DefaultSinkProcessor
- LoadBalancingSinkProcessor
- FailoverSinkProcessor
DefaultSinkProcessor对应的是单个的Sink,
LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能
Flume拓扑结构

这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。


Flume企业开发案例
自定义Interceptor
自定义Source
自定义Sink
Flume数据流监控
Ganglia 的安装与部署
Ganglia 三部分组成 :
- gmond
- gmetad
- gweb
gmond(Ganglia Monitoring Daemon): 一种轻量级服务,安装在每台需要收集指标数据的节点主机上。如 : CPU、内存、磁盘、网络和活跃进程的数据等
gmetad(Ganglia Meta Daemon): 整合所有信息,并以 RRD 格式存储至磁盘的服务
gweb(Ganglia Web)Ganglia可视化工具 : 利用浏览器显示 gmetad 所存储数据的 PHP 前端。以图表 展现集群的运行状态下收集的多种不同指标数据
操作 Flume 测试监控
启动Flume任务
bin/flume-ng agent \
-c conf/ \
-n a1 \
-f datas/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=cpu101:8649
发送数据观察 ganglia 监测图
nc localhost 44444
| 字段(图表名称) | 字段含义 |
|---|---|
| EventPutAttemptCount | source 尝试写入 channel 的事件总数量 |
| EventPutSuccessCount | 成功写入 channel 且提交的事件总数量 |
| EventTakeAttemptCount | sink 尝试从 channel 拉取事件的总数量 |
| EventTakeSuccessCount | sink 成功读取的事件的总数量 |
| StartTime | channel 启动的时间(毫秒) |
| StopTime | channel 停止的时间(毫秒) |
| ChannelSize | 目前 channel 中事件的总数量 |
| ChannelFillPercentage | channel 占用百分比 |
| ChannelCapacity | channel 的容量 |
