- 三、基础应用
- a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
- source
- channel
- sink
- 指定 source、channel、sink 之间的关系
- Name the components on this agent
- Describe/configure the source
- Use a channel which buffers events in memory
- Describe the sink(9000是NameNode的地址,也可能是8020,具体看HDFS的配置文件)
- 上传文件的前缀
- 是否使用本地时间戳
- 积攒500个Event才flush到HDFS一次
- 设置文件类型,支持压缩。DataStream不启用压缩
- 1分钟滚动一次
- 128M滚动一次
- 文件的滚动与Event数量无关
- 最小冗余数(建议值:1)
- Bind the source and sink to the channel
- Name the components on this agent
- Describe/configure the source
- Flume Source监控的文件夹目录,该目录下的文件会被Flume收集
- 忽略以.tmp结尾的文件,不上传
- Use a channel which buffers events in memory
- Describe the sink
- 上传文件的前缀
- 是否使用本地时间戳
- 积攒500个Event,flush到HDFS一次
- 设置文件类型
- 60秒滚动一次
- 128M滚动一次
- 文件滚动与event数量无关
- 最小冗余数
- Bind the source and sink to the channel
- Name the components on this agent
- 将数据流复制给所有channel
- source
- 记录每个文件最新消费位置
- 备注:.log 是正则表达式;这里写成 .log 是错误的
- sink
- channel
- Bind the source and sink to the channel
- Name the components on this agent
- Describe/configure the source
- Describe the sink
- 目录需要提前创建好
- Describe the channel
- Bind the source and sink to the channel
三、基础应用
- Flume 支持的数据源种类有很多,可以来自directory、http、kafka等。Flume提供了Source组件用来采集数据源常见的 Source 有:
- avro source:监听 Avro 端口来接收外部 avro 客户端的事件流。avro-source接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。如果是avro source的话,源数据必须是经过avro序列化后的数据。利用Avro source可以实现多级流动、扇出流、扇入流等效果。接收通过flume提供的avro客户端发送的日志信息
- Avro是Hadoop的一个数据序列化系统,由Hadoop的创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)开发,设计用于支持大批量数据交换的应用。它的主要特点有:
- 支持二进制序列化方式,可以便捷,快速地处理大量数据
- 动态语言友好,Avro提供的机制使动态语言可以方便地处理Avro数据
- Avro是Hadoop的一个数据序列化系统,由Hadoop的创始人Doug Cutting(也是Lucene,Nutch等项目的创始人)开发,设计用于支持大批量数据交换的应用。它的主要特点有:
- exec source:可以将命令产生的输出作为source。如ping 192.168.234.163、tail -f hive.log
- netcat source:一个NetCat Source用来监听一个指定端口,并接收监听到的数据。
- spooling directory source:将指定的文件加入到“自动搜集”目录中。flume会持续监听这个目录,把文件当做source来处理(可断点续传)。
- 注意:一旦文件被放到目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件
- Taildir Source(1.7.0开始出现):监控指定的多个文件,一旦文件内有新写入的数据,就会将其写入到指定的sink内,本来源可靠性高,不会丢失数据(支持断点续传)。
- 优点:其不会对于跟踪的文件有任何处理,不会重命名也不会删除,不会做任何修改
- 缺点:目前不支持Windows系统(没关系,用Linux),不支持读取二进制文件(没关系,读取的日志一般是文本文件),支持一行一行的读取文本文件
- avro source:监听 Avro 端口来接收外部 avro 客户端的事件流。avro-source接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。如果是avro source的话,源数据必须是经过avro序列化后的数据。利用Avro source可以实现多级流动、扇出流、扇入流等效果。接收通过flume提供的avro客户端发送的日志信息
采集到的日志需要进行缓存,Flume提供了Channel组件用来缓存数据。常见的 Channel 有:
- (1)memory channel:缓存到内存中(最常用)
- (2)file channel:缓存到文件中(磁盘)
- (3)JDBC channel:通过JDBC缓存到关系型数据库中
- (4)kafka channel:缓存到kafka中
缓存的数据最终需要进行保存,Flume提供了Sink组件用来保存数据。常见的 Sink 有:
- (1)logger sink:将信息显示在标准输出上,主要用于测试
- (2)avro sink:Flume events发送到sink,转换为Avro events,并发送到配置好的hostname/port。从配置好的channel按照配置好的批量大小批量获取events
- (3)null sink:将接收到events全部丢弃
- (4)HDFS sink:将 events 写进HDFS。支持创建文本和序列文件,支持两种文件类型压缩。文件可以基于数据的经过时间、大小、事件的数量周期性地滚动
- (5)Hive sink:该sink streams 将包含分割文本或者JSON数据的events直接传送到Hive表或分区中。使用Hive事务写events。当一系列events提交到Hive时,它们马上可以被Hive查询到
- (6)HBase sink:保存到HBase中
- (7)kafka sink:保存到kafka中
日志采集就是根据业务需求选择合适的Source、Channel、Sink,并将其组合在一起
1. 入门案例
中文flume帮助文档
业务需求:监听本机 8888 端口,Flume将监听的数据实时显示在控制台
需求分析**:**
- 使用 telnet 工具可以向 8888 端口发送数据
- 监听端口数据,选择 netcat source
- channel 选择 memory channel
- 数据实时显示,选择 logger sink
实现步骤:
1、安装 telnet 工具
yum install telnet
2、检查 8888 端口是否被占用
- 如果该端口被占用,可以选择使用其他端口完成任务
lsof -i:8888
3、创建 Flume Agent 配置文件
flume-netcat-logger.conf
- 路径:
$FLUME_HOME/conf/
```sqla1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1 a1.channels = c1 a1.sinks = k1
source
a1.sources.r1.type = netcat a1.sources.r1.bind = linux123 a1.sources.r1.port = 8888
channel
a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 100
sink
a1.sinks.k1.type = logger
指定 source、channel、sink 之间的关系
a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 注:**Memory Channel **是使用内存缓冲Event的Channel实现。速度比较快速,容量会受到 jvm 内存大小的限制,可靠性不够高。**适用于允许丢失数据,但对性能要求较高的日志采集业务**。
**4、启动Flume Agent**
```shell
# 符号 \ 代表:在linux中还没输完命令,换下一行再继续输入
$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file $FLUME_HOME/conf/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console
# 或者:
cd $FLUME_HOME
./bin/flume-ng agent --name a1 \
--conf-file conf/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console
5、使用 telnet 向本机的 8888 端口发送消息
telnet linux123 8888
6、在 Flume 监听页面查看数据接收情况
INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D
hello world. }
INFO sink.LoggerSink: Event: { headers:{} body: 41 72 65 20 79 6F 75 20 6F 6B 3F 0D
Are you ok?. }
2. 监控日志文件信息到HDFS
业务需求:监控本地日志文件,收集内容实时上传到HDFS
需求分析:
- 使用 tail -F 命令即可找到本地日志文件产生的信息
- source 选择 exec。exec 监听一个指定的命令,获取命令的结果作为数据源。source组件从这个命令的结果中取数据。当agent进程挂掉重启后,可能存在数据丢失;
- channel 选择 memory channel
- sink 选择 HDFS ```shell tail -f 等同于 —follow=descriptor,根据文件描述符进行追踪,当文件改名或被删除,追踪停止
tail -F 等同于 —follow=name —retry,根据文件名进行追踪,并保持重试,即该文件被删除或改名后,如果再次创建相同的文件名,会继续追踪
**实现步骤:**<br />**<br />**1、环境准备**
- Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包。将以下文件拷贝至 `$FLUME_HOME/lib` 文件夹下
- 在 `$HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib` 有这些文件
- `commons-configuration-1.6.jar`
- `hadoop-auth-2.9.2.jar`
- `hadoop-common-2.9.2.jar`
- `hadoop-hdfs-2.9.2.jar`
- `commons-io-2.4.jar`
- `htrace-core4-4.1.0-incubating.jar`
```shell
# 在 $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib 有这些文件
cd $HADOOP_HOME/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib
cp commons-configuration-1.6.jar $FLUME_HOME/lib/
cp hadoop-auth-2.9.2.jar $FLUME_HOME/lib/
cp hadoop-common-2.9.2.jar $FLUME_HOME/lib/
cp hadoop-hdfs-2.9.2.jar $FLUME_HOME/lib/
cp commons-io-2.4.jar $FLUME_HOME/lib/
cp htrace-core4-4.1.0-incubating.jar $FLUME_HOME/lib/
2、创建配置文件
flume-exec-hdfs.conf
```sqlName the components on this agent
a2.sources = r2 a2.sinks = k2 a2.channels = c2
Describe/configure the source
a2.sources.r2.type = exec a2.sources.r2.command = tail -F /tmp/root/hive.log
Use a channel which buffers events in memory
a2.channels.c2.type = memory a2.channels.c2.capacity = 10000 a2.channels.c2.transactionCapacity = 500
Describe the sink(9000是NameNode的地址,也可能是8020,具体看HDFS的配置文件)
a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://linux121:9000/flume/%Y%m%d/%H%M
上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
积攒500个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 500
设置文件类型,支持压缩。DataStream不启用压缩
a2.sinks.k2.hdfs.fileType = DataStream
1分钟滚动一次
a2.sinks.k2.hdfs.rollInterval = 60
128M滚动一次
a2.sinks.k2.hdfs.rollSize = 134217700
文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
最小冗余数(建议值:1)
a2.sinks.k2.hdfs.minBlockReplicas = 1
Bind the source and sink to the channel
a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
**3、启动Agent**
- 注:因为 sink 设置的是hdfs,如果此时hdfs没有打开,那么启动界面会周期性的报错:拒绝连接(因为没有打开hdfs),打开hdfs后此错误信息将不再出现
```shell
$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file $FLUME_HOME/conf/flume-exec-hdfs.conf \
-Dflume.root.logger=INFO,console
4、启动Hadoop和Hive,操作Hive产生日志
start-dfs.sh
start-yarn.sh
- 在命令行多次执行,以产生更多日志记录
hive -e "show databases"
5、在HDFS上查看文件**
- hsfs上文件按照日期来分开保存
hdfs://linux121:9000/flume/%Y%m%d/%H%M
3. 监控目录采集信息到HDFS
业务需求:
- 监控指定目录,收集信息实时上传到HDFS
需求分析:
- source 选择 spooldir。spooldir 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控
- channel 选择 memory
- sink 选择 HDFS
spooldir Source 监听一个指定的目录,即只要向指定目录添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,写入到channel。sink处理完之后,标记该文件已完成处理,文件名添加
.completed
后缀。虽然是自动监控整个目录,但是只能监控文件,如果以追加的方式向已被处理的文件中添加内容,source并不能识别。需要注意的是:- 拷贝到spooldir目录下的文件不可以再打开编辑
- 无法监控子目录的文件夹变动
- 被监控文件夹每500毫秒扫描一次文件变动
- 适合**用于同步新文件,但不适合**对实时追加日志的文件进行监听并同步
1、创建配置文件
flume-spooldir-hdfs.conf
- 注:监视的目录要存在,不然会报错(增加该目录后,报错会消失)
```sql
Name the components on this agent
a3.sources = r3 a3.channels = c3 a3.sinks = k3
Describe/configure the source
Flume Source监控的文件夹目录,该目录下的文件会被Flume收集
a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /root/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true
忽略以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)
Use a channel which buffers events in memory
a3.channels.c3.type = memory a3.channels.c3.capacity = 10000 a3.channels.c3.transactionCapacity = 500
Describe the sink
a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://linux121:9000/flume/upload/%Y%m%d/%H%M
上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
积攒500个Event,flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 500
设置文件类型
a3.sinks.k3.hdfs.fileType = DataStream
60秒滚动一次
a3.sinks.k3.hdfs.rollInterval = 60
128M滚动一次
a3.sinks.k3.hdfs.rollSize = 134217700
文件滚动与event数量无关
a3.sinks.k3.hdfs.rollCount = 0
最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1
Bind the source and sink to the channel
a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
**HDFS Sink的配置:**<br />一般使用 HDFS Sink 都会采用滚动生成文件的方式,滚动生成文件的策略有:
- 基于时间
- hdfs.rollInterval
- 缺省值:30,单位秒
- 0:表示禁用
- 基于文件大小
- hdfs.rollSize
- 缺省值:1024字节,采用缺省值会产生很多小文件,不推荐使用缺省值
- 0:表示禁用
- 基于event数量
- hdfs.rollCount
- 10
- 0:表示禁用
- 基于文件空闲时间
- hdfs.idleTimeout
- 缺省值:0,表示禁用
- 基于HDFS文件副本数
- hdfs.minBlockReplicas
- 默认:与HDFS的副本数一致
- **要将该参数设置为1,**否则HFDS文件所在块的复制会引起文件滚动
- **如果要避免 HDFS Sink 产生****小文件****,参考如下参数设置:**
```sql
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://linux121:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.rollInterval=3600
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
其他重要配置:
- hdfs.useLocalTimeStamp
- 使用本地时间:true,而不是 event header 的时间戳
- 默认值:false
- hdfs.round
- 时间戳是否四舍五入
- 默认值false
- 如果为true,会影响所有的时间,除了t%
- hdfs.roundValue
- 四舍五入的最高倍数(单位配置在hdfs.roundUnit),但是要小于当前时间
- 默认值:1
- hdfs.roundUnit
- 可选值为:second、minute、hour
- 默认值:second
2、启动Agent
$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file $FLUME_HOME/conf/flume-spooldir-hdfs.conf \
-Dflume.root.logger=INFO,console
3、向upload文件夹中添加文件
- 因为配置文件中 指定了监视的目录如下
/root/upload
4、查看HDFS上的数据
hdfs://linux121:9000/flume/upload/%Y%m%d/%H%M
4. 监控日志文件采集数据到HDFS、本地文件系统
业务需求:
- 监控日志文件,收集信息上传到HDFS(hdfs) 和 本地文件系统(file_roll)
需求分析:
- 需要多个Agent级联实现
- source 选择 taildir
- channel 选择 memory channel
- 最终的 sink 分别选择 hdfs、file_roll
- taildir Source。Flume 1.7.0加入的新Source,相当于 spooldir source + exec source。可以监控多个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有数据丢失的问题。
- 目前不适用于Windows系统;
- 其不会对于跟踪的文件有任何处理,不会重命名也不会删除,不会做任何修改(优点)。
不支持读取二进制文件,支持一行一行的读取文本文件。
![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607053606722-573f1d6f-0b7e-4ad1-b4ce-c75121815a7e.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_14%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=544&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1088&originWidth=2225&size=247790&status=done&style=shadow&width=1112.5)
实现步骤(核心是上图):
1、创建第一个配置文件
flume-taildir-avro.conf
配置文件包括:
将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
source
a1.sources.r1.type = taildir
记录每个文件最新消费位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json a1.sources.r1.filegroups = f1
备注:.log 是正则表达式;这里写成 .log 是错误的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
sink
a1.sinks.k1.type = avro a1.sinks.k1.hostname = linux123 a1.sinks.k1.port = 9091 a1.sinks.k2.type = avro a1.sinks.k2.hostname = linux123 a1.sinks.k2.port = 9092
channel
a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 500 a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 500
Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
**2、创建第二个配置文件**
- `flume-avro-hdfs.conf` 配置文件包括:
- 1个 `avro source`
- 1个 `memory channel`
- 1个 `hdfs sink`
```sql
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux121:9000/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 500个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 500
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 60秒生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 0
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3、创建第三个配置文件
flume-avro-file.conf
配置文件包括:
Describe/configure the source
a3.sources.r1.type = avro a3.sources.r1.bind = linux123 a3.sources.r1.port = 9092
Describe the sink
a3.sinks.k1.type = file_roll
目录需要提前创建好
a3.sinks.k1.sink.directory = /root/flume/output
Describe the channel
a3.channels.c2.type = memory a3.channels.c2.capacity = 10000 a3.channels.c2.transactionCapacity = 500
Bind the source and sink to the channel
a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2
**4、分别启动3个Agent**
```sql
# 符号 & 表示程序在后台运行
$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file $FLUME_HOME/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &
$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file $FLUME_HOME/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &
$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file $FLUME_HOME/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &
5、执行hive命令产生日志
hive -e "show databases"
6、分别检查HDFS文件、本地文件、以及消费位置文件
- hdfs文件:
hdfs://linux121:9000/flume2/%Y%m%d/%H
- 本地文件:
/root/flume/output
- 消费位置文件:
/root/flume/taildir_position.json
**
总结:三种监控日志文件Source的对比
exec Source:
- 适用于监控一个实时追加的文件,但不能保证数据不丢失
spooldir Source:
- 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控
taildir Source:
- 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控