1、监控端口数据

flume:
用flume配置job\flume.xx.xx.conf文件
创建一个44444的客户端,并监视
netcat:
用netcat给44444端口发送信息

具体看文档:尚硅谷大数据技术之Flume.docx


2、实时监控目录下的多个追加文件

对比:

Exec source:适用于监控一个实时追加的文件,不能实现断点续传
spooldir source:适用于同步新文件,但不使用与对于实时追加日志文件进行
Taildir source : 适用于监听多个实时追加的文件(指定文件夹),并且能实现断点续传(保存成json文件)

*断点续传:
重新执行监控命令后会继续从之前读到的位置继续读取文件,而不会重新再读文件,
生产环境中,公司的数据一个月清理一次,如果不断点续传那么清理后重新读到的数据则是清理后不完整的数据了
记录偏移量(保存成json文件(tail_dir.json)):
Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置“pos”,因此能够实现断点续传。Position File的格式如下:
image.png
注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件

多文件夹的优劣:
优:每个磁盘的io是有限的,设置多文件,就是增加磁盘数量,提升io能力
劣:多文件夹有可能造成数据量太大,导致内存不足,可以改为filechannal


实现

1)在/opt/module/flume/下创建job目录,在job中创建配置文件

  1. vim flume-taildir-hdfs.conf//起名:source 方式- hdfs写入目标

特别指出:
# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest 这个参数
所有消费者第一次消费才有用!!!

  1. ====================指定===================
  2. a3.sources = r3
  3. a3.sinks = k3
  4. a3.channels = c3
  5. ====================source====================
  6. # Describe/configure the source
  7. a3.sources.r3.type = TAILDIR
  8. a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
  9. a3.sources.r3.filegroups = f1 f2
  10. a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
  11. a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*、
  12. // kafka source 可指定消费模式:latest(默认)
  13. # a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
  14. //所有消费者第一次消费才有用!!!
  15. a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
  16. ====================sink====================
  17. # Describe the sink
  18. a3.sinks.k3.type = hdfs
  19. a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
  20. #上传文件的前缀
  21. a3.sinks.k3.hdfs.filePrefix = upload-
  22. #是否按照时间滚动文件夹
  23. a3.sinks.k3.hdfs.round = true
  24. #多少时间单位创建一个新的文件夹
  25. a3.sinks.k3.hdfs.roundValue = 1
  26. #重新定义时间单位
  27. a3.sinks.k3.hdfs.roundUnit = hour
  28. #是否使用本地时间戳
  29. a3.sinks.k3.hdfs.useLocalTimeStamp = true
  30. #积攒多少个Event才flush到HDFS一次!!!!!!!!!!!!!!!
  31. a3.sinks.k3.hdfs.batchSize = 100
  32. #设置文件类型,可支持压缩
  33. a3.sinks.k3.hdfs.fileType = DataStream
  34. #多久生成一个新的文件
  35. a3.sinks.k3.hdfs.rollInterval = 60
  36. #设置每个文件的滚动大小大概是128M
  37. a3.sinks.k3.hdfs.rollSize = 134217700
  38. #文件的滚动与Event数量无关
  39. a3.sinks.k3.hdfs.rollCount = 0
  40. ====================channel====================
  41. # Use a channel which buffers events in memory
  42. //channels格式memory/file
  43. a3.channels.c3.type = memory
  44. //channel的容量
  45. a3.channels.c3.capacity = 1000
  46. //channel两边事务的容量!!!!!!!!!!
  47. a3.channels.c3.transactionCapacity = 100
  48. //file chanel 可指定多目录
  49. ====================绑定====================
  50. # Bind the source and sink to the channel
  51. a3.sources.r3.channels = c3
  52. a3.sinks.k3.channel = c3

2)根据:

  1. a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
  2. a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*

创建两个文件夹:files、files2在对应路径上

3)启动监控文件夹命令
第一种写法:

  1. [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

第二种写法:

  1. [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:
—conf/-c:表示配置文件存储在conf/目录
—name/-n:表示给agent起名为a1 (多个flume这个-n可以相同,只要和配置文件的相同即可)
—conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。
-Dflume.root.logger=INFO,console :
-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

4)查看hdfs对应位置:

  1. a3.sinks.k3.hdfs.path = hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H

5)后台运行,记得写绝对路径

  1. nohup /opt/flume/bin/flume-ng agent -c /opt/flume/conf/ -n a1 -f /opt/flume/job/flume-kafka-hdfs.conf -Dflume.root.logger=INFO,console &