1.Flume定义

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

1.2 Flume基础架构

image.png

1.2.1Agent

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。
Agent主要有3个部分组成,Source、Channel、Sink。

1.2.2Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

1.2.3Source

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

1.2.4Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

1.2.5Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由HeaderBody两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

1.2.6Interceptors

在Flume中允许使用拦截器对传输中的event进行拦截和处理!拦截器必须实现org.apache.flume.interceptor.Interceptor接口。拦截器可以根据开发者的设定修改甚至删除event!Flume同时支持拦截器链,即由多个拦截器组合而成!通过指定拦截器链中拦截器的顺序,event将按照顺序依次被拦截器进行处理!

1.2.7Channel Selectors

  1. Channel Selectors用于source组件将event传输给多个channel的场景。常用的有replicating(默认)和multiplexing两种类型。replicating负责将event复制到多个channel,而multiplexing则根据event的属性和配置的参数进行匹配,匹配成功则发送到指定的channel!

1.2.8Sink Processors

用户可以将多个sink组成一个整体(sink组),Sink Processors可用于提供组内的所有sink的负载平衡功能,或在时间故障的情况下实现从一个sink到另一个sink的故障转移。

2.Flume使用

  1. tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
  2. mv flume-env.sh.template flume-env.sh
  3. vi flume-env.sh
  4. export JAVA_HOME=/opt/module/jdk1.8.0_144
  5. 1.使用Flume监听一个端口,收集该端口数据,并打印到控制台
  6. sudo yum install -y nc
  7. sudo netstat -tunlp | grep 44444

2.1监控单个文件

  1. # Name the components on this agent
  2. a2.sources = r2
  3. a2.sinks = k2
  4. a2.channels = c2
  5. # Describe/configure the source
  6. a2.sources.r2.type = exec
  7. a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
  8. a2.sources.r2.shell = /bin/bash -c
  9. # Describe the sink
  10. a2.sinks.k2.type = hdfs
  11. a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
  12. #上传文件的前缀
  13. a2.sinks.k2.hdfs.filePrefix = logs-
  14. #是否按照时间滚动文件夹
  15. a2.sinks.k2.hdfs.round = true
  16. #多少时间单位创建一个新的文件夹
  17. a2.sinks.k2.hdfs.roundValue = 1
  18. #重新定义时间单位
  19. a2.sinks.k2.hdfs.roundUnit = hour
  20. #是否使用本地时间戳
  21. a2.sinks.k2.hdfs.useLocalTimeStamp = true
  22. #积攒多少个Event才flush到HDFS一次
  23. a2.sinks.k2.hdfs.batchSize = 100
  24. #多久生成一个新的文件
  25. a2.sinks.k2.hdfs.rollInterval = 60
  26. #设置每个文件的滚动大小
  27. a2.sinks.k2.hdfs.rollSize = 134217700
  28. #文件的滚动与Event数量无关
  29. a2.sinks.k2.hdfs.rollCount = 0
  30. # Use a channel which buffers events in memory
  31. a2.channels.c2.type = memory
  32. a2.channels.c2.capacity = 10000
  33. a2.channels.c2.transactionCapacity = 1000
  34. # Bind the source and sink to the channel
  35. a2.sources.r2.channels = c2
  36. a2.sinks.k2.channel = c2

参考地址:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html