安装
1. 下载并解压
tar zxvf apache-flume-1.8.0-bin.tar.gz -C /soft
2. 修改flume的环境变量
cp flume-env.sh.template flume-env.sh
3. 修改JAVA_HOME变量的值
export JAVA_HOME=/soft/jdk
flume
收集日志、移动、聚合框架。
基于事件。
agent
source //接收数据,生产者
//put()
//NetcatSource
//ExecSource,实时收集 tail -F xxx.txt
//spooldir
//seq
//Stress
//avroSource
channel //暂存数据,缓冲区,
//非永久性:MemoryChannel
//永久性 :FileChannel,磁盘.
//SpillableMemoryChannel :Mem + FileChannel.Capacity
sink //输出数据,消费者
//从channel提取take()数据,write()destination.
//HdfsSink
//HbaseSink
//avroSink
1.创建配置文件-hello.conf
[/soft/flume/conf/hello.conf]
#声明三种组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#定义source信息
a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=8888
#定义sink信息
a1.sinks.k1.type=logger
#定义channel信息
a1.channels.c1.type=memory
#绑定在一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
2.运行
a)启动flume agent
$>bin/flume-ng agent -f ../conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
b)启动nc的客户端
$>nc localhost 8888
$nc>hello world
c)在flume的终端输出hello world.
2.实时收集日志并将日志打印到console
实时日志收集,实时收集日志。
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=exec
# 实时检查该文件是否发生变化,将变化的文件发送到sink
a1.sources.r1.command=tail -F /home/centos/test.txt
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
3.批量收集
监控一个文件夹,静态文件。
收集完之后,会重命名文件成新文件(默认追加后缀 .COMPLETED)。
a)配置文件
[spooldir_r.conf]
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/centos/spool
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
b)创建目录
$>mkdir ~/spool
c)启动flume
$>bin/flume-ng agent -f ../conf/spooldir_r.conf -n a1 -Dflume.root.logger=INFO,console
4.flume + Kafka
flume收集的数据送往Kafka消息队列
配置文件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=exec
#-F 最后10行,如果从头开始收集 -c +0 -F:持续收集后续数据,否则进程停止。
a1.sources.r1.command=tail -F -c +0 /home/centos01/callLog/callLog.log
a1.channels.c1.type=memory
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = calllog
#broker 列表必须用逗号分隔
#a1.sinks.k1.kafka.bootstrap.servers = sa0:9092,sa1:9092,sa2:9092
#a1.sinks.k1.brokerList = sa0:9092,sa1:9092,sa2:9092
a1.sinks.k1.brokerList = sa0:9092
#a1.sinks.k1.kafka.bootstrap.servers = 192.168.0.121:9092,192.168.0.122:9092,192.168.0.123:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.hbase + flume
将flume收集的数据存储到hbase中
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hbase
a1.sinks.k1.table = ns1:t12
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6.flume + hdfs
1.hdfs
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
#是否产生新目录。按下面的 roundValue=10 / roundUnit=second 配置,每10秒产生一个新目录,一般用于控制目录粒度。
#2017-12-12 -->
#2017-12-12 -->%H%M%S
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#是否产生新文件。
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=10
a1.sinks.k1.hdfs.rollCount=3
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume + ftp
flume-ng-ftp-source-FTP.conf
### wwww.keedio.com
# example file, protocol is ftp, process by lines, and sink to file_roll
# for testing purposes.
## Sources Definition for agent "agent"
#ACTIVE LIST
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1
##### SOURCE IS ftp server
# Type of source for ftp sources
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
# Connection properties for ftp server
agent.sources.ftp1.name.server = 192.168.0.4
agent.sources.ftp1.port = 21
agent.sources.ftp1.user = mortadelo
agent.sources.ftp1.password = secret
# Process files in
agent.sources.ftp1.working.directory = /subdirA/subdirAA
# Process files matching (Java regex for ftp-ftps)
agent.sources.ftp1.filter.pattern = .+\\.csv
# keep file track status in folder
agent.sources.ftp1.folder = /var/log/flume-ftp
# file track status name
agent.sources.ftp1.file.name = ftp1-status-file.ser
# Discover delay: the directory is explored every configured number of milliseconds
agent.sources.ftp1.run.discover.delay=5000
# Process by lines
agent.sources.ftp1.flushlines = true
# Discover and process files under user's home directory
agent.sources.ftp1.search.recursive = true
# Do not process file while it is being written.
agent.sources.ftp1.processInUse = false
# If file must not be processed while it is being written, wait timeout.
agent.sources.ftp1.processInUseTimeout = 30
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
agent.sources.ftp1.channels = ch1
agent.sinks.k1.channel = ch1
flume-ng-ftp-source-FTPS.conf
# www.keedio.com
# example configuration file for FTP SECURE
## Sources Definition for agent "agent"
#ACTIVE LIST
agent.sources = ftps1
agent.sinks = k1
agent.channels = ch1
##### SOURCE IS ftp server
# Type of source for ftp sources
agent.sources.ftps1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftps1.client.source = ftps
#agent.sources.ftp1.type = org.apache.flume.source.FTPSource
# Connection properties for ftp server
agent.sources.ftps1.name.server = 192.168.0.2
agent.sources.ftps1.port = 21
agent.sources.ftps1.user = mortadelo
agent.sources.ftps1.password = secret
agent.sources.ftps1.folder = /var/log/flume-ftp
agent.sources.ftps1.file.name = ftps1-status-file.ser
##secure
agent.sources.ftps1.security.enabled = true
agent.sources.ftps1.security.cipher = TLS
agent.sources.ftps1.security.certificate.enabled =false
# Discover delay: the directory is explored every configured number of milliseconds
agent.sources.ftps1.run.discover.delay=5000
##process by chunks of bytes
agent.sources.ftps1.flushlines = false
# source will write events in sink file_roll (testing purposes).
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
# bind source and sink to the channel (missing bindings would prevent the agent from starting)
agent.sources.ftps1.channels = ch1
agent.sinks.k1.channel = ch1
flume-ng-ftp-source-SFTP.conf
# www.keedio.com
# example configuration for SFTP
## Sources Definition for agent "agent"
#ACTIVE LIST
agent.sources = sftp1
agent.sinks = k1
agent.channels = ch1
##### SOURCE IS sftp server
# Type of source for sftp sources
agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.sftp1.client.source = sftp
#agent.sources.sftp1.type = org.apache.flume.source.SFTPSource
# Connection properties for ftp server
agent.sources.sftp1.name.server = 192.168.0.2
agent.sources.sftp1.port = 22
agent.sources.sftp1.user = filemon
agent.sources.sftp1.password = secret
# Process files in
agent.sources.sftp1.working.directory = /home/filemon/subdirA
# Process files matching (Java regex for sftp)
agent.sources.sftp1.filter.pattern = .+\\.csv
# keep file track status in folder
agent.sources.sftp1.folder = /var/log/flume-ftp
# file track status name
agent.sources.sftp1.file.name = sftp1-status-file.ser
## root is launching flume binary.
agent.sources.sftp1.knownHosts = /root/.ssh/known_hosts
## for testing purposes only, default is yes
agent.sources.sftp1.strictHostKeyChecking = no
# Discover delay: the directory is explored every configured number of milliseconds
agent.sources.sftp1.run.discover.delay=5000
#process by lines.
agent.sources.sftp1.flushlines = true
# Whether a recursive search should be conducted on working directory
agent.sources.sftp1.search.recursive = false
# Whether files that are currently being written to should be skipped
agent.sources.sftp1.search.processInUse = false
# Seconds ago used to determine whether file is still being written to
# (a '#' after a value in a properties file is NOT a comment — it becomes part of the value)
agent.sources.sftp1.search.processInUseTimeout = 30
# If source files are compressed, they can be decompressed on the fly
# Specify compression format like this. The existence of this property implies that source files are compressed
# Source files are GZIP compressed
agent.sources.sftp1.compressed = gzip
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
agent.sources.sftp1.channels = ch1
agent.sinks.k1.channel = ch1