# HDFS Sink


- Option 1: the `logger` sink, which prints the collected log events to the console
- Option 2: the `hdfs` sink, which stores the collected data on HDFS

### Configuration file

```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

## Startup script
`flume-ng agent --conf conf --conf-file sink/a.conf --name a1`

## Data verification
`hadoop dfs -cat /a/flume/test_01/FlumeData.1595929423024.tmp`
The generated file is named prefix.timestamp.suffix, e.g. `FlumeData.1595929423024.tmp`.
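The middle component of that name is the epoch-millisecond counter taken when the sink opened the file. A quick sketch of picking the name apart with plain shell parameter expansion (file name taken from the example above):

```shell
# Pick apart a sink output file name of the form prefix.timestamp.suffix
name="FlumeData.1595929423024.tmp"
stamp=${name#*.}     # drop the prefix: 1595929423024.tmp
stamp=${stamp%.*}    # drop the suffix: 1595929423024
echo "$((stamp / 1000))"   # epoch seconds at file-open time: 1595929423
```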
# Sink-HDFS

![image.png](https://cdn.nlark.com/yuque/0/2020/png/1964088/1595900687568-491d79d7-741d-4217-b5fe-c7fd6a3832cc.png#align=left&display=inline&height=71&margin=%5Bobject%20Object%5D&name=image.png&originHeight=71&originWidth=1181&size=25291&status=done&style=none&width=1181)

- A file is rolled as soon as any one of the roll parameters is satisfied.
- The old file is closed for writing and a new file is opened (file rolling).

## Experiment 1: cat

### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

### Startup script
`flume-ng agent --conf conf --conf-file b.conf --name a1`

Q: With a 10-second roll interval set (`a1.sinks.k1.hdfs.rollInterval = 10`), will a new file be rolled every 10 s even when no data arrives?
A: No. When no data is being collected, rolling would fill HDFS with empty files and their metadata, putting pressure on the NameNode.
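The roll behavior described here ("roll as soon as any one threshold is met, but never roll a file that received no events") can be sketched as a small decision function. `should_roll` and its arguments are hypothetical names for illustration, not a Flume API:

```shell
# Hypothetical helper mirroring the sink's roll decision; not a Flume API.
# Arguments: elapsed seconds, bytes written, event count.
should_roll() {
  elapsed=$1; bytes=$2; events=$3
  interval=10; size=134200000; count=10000   # rollInterval/rollSize/rollCount above
  [ "$events" -eq 0 ] && return 1            # no data collected: never roll an empty file
  [ "$elapsed" -ge "$interval" ] && return 0 # any one threshold met: roll
  [ "$bytes" -ge "$size" ] && return 0
  [ "$events" -ge "$count" ] && return 0
  return 1
}

should_roll 11 100 5 && echo roll || echo keep   # 11 s elapsed: roll
should_roll 5 100 5 && echo roll || echo keep    # no threshold met: keep
```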

## Experiment 2: tail

### Configuration file

```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/second.txt

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/test_01
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`

## Summary

- Batch collection: `cat` a file (or a directory)
- Real-time collection: `tail -F` a file
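The difference can be observed locally without Flume. A minimal sketch, assuming GNU coreutils and throwaway paths under `/tmp` (both hypothetical):

```shell
# Batch: cat reads whatever is in the file right now, then exits.
printf 'a\nb\n' > /tmp/second_demo.txt
cat /tmp/second_demo.txt

# Real-time: tail -F keeps following the file (and re-opens it after rotation).
# Start it in the background, append a line, and see it get picked up.
tail -n 0 -F /tmp/second_demo.txt > /tmp/tail_demo.out 2>/dev/null &
TAIL_PID=$!
sleep 1
echo c >> /tmp/second_demo.txt
sleep 1
kill "$TAIL_PID"
cat /tmp/tail_demo.out   # the appended line: c
```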

## Experiment 3: cat with a Timestamp Interceptor

### Configuration file

```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/hadoop/second.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/order/%Y-%m-%d/%H-%M-%S
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
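The timestamp interceptor stamps each event with an epoch-millisecond `timestamp` header, which the HDFS sink uses to expand the `%Y-%m-%d/%H-%M-%S` escapes in `hdfs.path` into time-bucketed directories. A sketch of that expansion with GNU `date`, reusing the millisecond stamp from the `FlumeData.1595929423024.tmp` example (the sink formats in the agent's local time zone; UTC is assumed here):

```shell
# Epoch-millisecond stamp as the timestamp interceptor would attach it,
# truncated to whole seconds.
secs=$((1595929423024 / 1000))

# Expand the same escapes used in hdfs.path (%Y-%m-%d/%H-%M-%S); GNU date, UTC.
date -u -d "@$secs" '+%Y-%m-%d/%H-%M-%S'   # 2020-07-28/09-43-43
```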

### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`

## Experiment 4: tail -F with a Timestamp Interceptor

### Configuration file
```shell
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/second.txt
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /a/flume/order/%Y-%m-%d/%H-%M-%S
a1.sinks.k1.hdfs.filePrefix = pkq
a1.sinks.k1.hdfs.fileSuffix = .123
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134200000
a1.sinks.k1.hdfs.rollCount = 10000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

```

### Startup script
`flume-ng agent --conf conf --conf-file c.conf --name a1`