- HDFS Sink
- Sink-HDFS
- Experiment 1: cat
- Experiment 2: tail
- Summary
- Experiment 3: cat + Timestamp Interceptor
- Experiment 4: tail -F + Timestamp Interceptor
# HDFS Sink
- Method 1: a logger sink, which prints the collected events to the console (a minimal sketch follows this list)
- Method 2: an hdfs sink, which stores the collected data on HDFS
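For reference, method 1 only needs the sink section changed; the rest of the agent can stay the same. A minimal sketch, reusing the component names (a1, r1, k1, c1) and the exec/cat source from this note:

```shell
# Minimal sketch of method 1: print collected events to the agent log/console.
# Component names and the exec source mirror the configs used in this note.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt

# Logger sink: events go to Flume's log output instead of HDFS
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```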
## Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
<a name="e28A7"></a>
## Startup script
`flume-ng agent --conf conf --conf-file sink/a.conf --name a1`
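While testing it can help to see the agent's own log output on the console; the standard `-Dflume.root.logger=INFO,console` override from the Flume documentation can be appended to the same command:

```shell
# Same agent as above, with Flume's root logger redirected to the console
flume-ng agent --conf conf --conf-file sink/a.conf --name a1 -Dflume.root.logger=INFO,console
```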
<a name="sA2fg"></a>
## Data verification
`hadoop dfs -cat /a/flume/test_01/FlumeData.1595929423024.tmp`<br />Generated file name: prefix.timestamp.suffix, e.g. FlumeData.1595929423024.tmp
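It can also be useful to list the output directory first to see which files have been rolled so far; the `.tmp` suffix marks a file the sink is still writing to and is dropped once the file is closed:

```shell
# List the HDFS output directory used by the sink above
hadoop dfs -ls /a/flume/test_01
```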
<a name="NePl3"></a>
# Sink-HDFS

- A file roll is triggered as soon as any one of the roll parameters is satisfied (the sketch after this list shows the three parameters)
- On a roll, the current file is closed for writing and a new file is opened (file rollover)
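The three triggers are the `hdfs.rollInterval`, `hdfs.rollSize`, and `hdfs.rollCount` settings used in the experiments below; whichever condition is met first causes the roll. The values here are the ones used throughout this note:

```shell
# Any one of these conditions closes the current file and opens a new one.
# Roll after 10 seconds:
a1.sinks.k1.hdfs.rollInterval = 10
# Roll once roughly 134 MB have been written:
a1.sinks.k1.hdfs.rollSize = 134200000
# Roll after 10000 events:
a1.sinks.k1.hdfs.rollCount = 10000
```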
<a name="pSXdu"></a>
## Experiment 1: cat
<a name="NrXFP"></a>
### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

### Startup script
`flume-ng agent --conf conf --conf-file b.conf --name a1`

Q: With `a1.sinks.k1.hdfs.rollInterval = 10` (roll a file every 10 seconds), will the file still roll after 10 s if no new data arrives?
A: No. When nothing is being collected, rolling would leave empty files on HDFS, and their metadata would put needless pressure on the NameNode.
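Related to this: per the Flume user guide, setting a roll parameter to 0 disables that particular trigger, so time-based rolling can be switched off entirely when only size or event count should drive the roll. A sketch:

```shell
# A roll parameter set to 0 disables that trigger (Flume user guide).
# Here only rollSize causes a roll; time- and count-based rolling are off.
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 0
```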
## Experiment 2: tail
### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/second.txt
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`
## Summary

- Batch collection: `cat` a file, or pull in a whole directory (see the spooling-directory sketch below)
- Real-time collection: `tail -F` a file
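For the "whole directory" case, Flume's Spooling Directory Source is the usual batch option; a minimal sketch (the spool path here is illustrative, not taken from this note):

```shell
# Batch-collect every file dropped into a directory (Spooling Directory Source).
# /home/hadoop/spool is an illustrative path, not one used in the experiments above.
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/spool
a1.sources.r1.channels = c1
```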
## Experiment 3: cat + Timestamp Interceptor
### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/order/%Y-%m-%d/%H-%M-%S
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
<a name="bsv2X"></a>
### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`
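The `%Y-%m-%d/%H-%M-%S` escapes in `hdfs.path` are resolved from the timestamp header that the interceptor adds to each event. The Flume user guide also provides `hdfs.useLocalTimeStamp` as an alternative: the sink uses the agent's local clock when filling in the escapes, so the interceptor can be dropped. A sketch of that variant:

```shell
# Alternative to the timestamp interceptor: stamp events with the agent's
# local time when resolving the %Y-%m-%d/%H-%M-%S escapes in hdfs.path.
a1.sinks.k1.hdfs.path = /a/flume/order/%Y-%m-%d/%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```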
<a name="WFU42"></a>
## Experiment 4: tail -F + Timestamp Interceptor
<a name="f21wz"></a>
### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/second.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/order/%Y-%m-%d/%H-%M-%S
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`
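Verification works as in the first example; since the path is now time-partitioned, listing the parent directory shows one subdirectory per resolved escape value (path as configured above):

```shell
# List the date-partitioned output directories produced by the time escapes
hadoop dfs -ls /a/flume/order
```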