Installation

  1. Download and extract
     tar zxvf apache-flume-1.8.0-bin.tar.gz -C /soft
  2. Create flume's environment script from the template
     cp flume-env.sh.template flume-env.sh
  3. Set the JAVA_HOME variable in flume-env.sh
     export JAVA_HOME=/soft/jdk
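
To confirm the installation worked (assuming the extracted directory has been linked to /soft/flume, as the paths below imply), print the version:

  $>cd /soft/flume
  $>bin/flume-ng version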

flume

  1. A framework for collecting, moving, and aggregating log data.
  2. Event-based.

agent

  source  //receives data; the producer
          //put()
          //NetcatSource
          //ExecSource: real-time collection, e.g. tail -F xxx.txt
          //SpoolDirectorySource (spooldir)
          //SequenceGeneratorSource (seq)
          //StressSource
          //AvroSource
  channel //stages data; a buffer between source and sink
          //non-durable: MemoryChannel
          //durable: FileChannel, on disk
          //SpillableMemoryChannel: memory up to a capacity, then spills to FileChannel
  sink    //outputs data; the consumer
          //take()s events from the channel and write()s them to the destination
          //HdfsSink
          //HBaseSink
          //AvroSink

1. Create a configuration file: hello.conf

  1. Create the configuration file
     [/soft/flume/conf/hello.conf]
     # declare the three components
     a1.sources = r1
     a1.channels = c1
     a1.sinks = k1
     # define the source
     a1.sources.r1.type=netcat
     a1.sources.r1.bind=localhost
     a1.sources.r1.port=8888
     # define the sink
     a1.sinks.k1.type=logger
     # define the channel
     a1.channels.c1.type=memory
     # bind them together
     a1.sources.r1.channels=c1
     a1.sinks.k1.channel=c1
  2. Run
     a) start the flume agent
        $>bin/flume-ng agent -f ../conf/hello.conf -n a1 -Dflume.root.logger=INFO,console
     b) start an nc client
        $>nc localhost 8888
        $nc>hello world
     c) "hello world" is printed in the flume terminal.

2. Collect logs in real time and print them to the console

  Real-time log collection with an exec source.
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  a1.sources.r1.type=exec
  # watch the file for new content and forward it through the channel to the sink
  a1.sources.r1.command=tail -F /home/centos/test.txt
  a1.sinks.k1.type=logger
  a1.channels.c1.type=memory
  a1.sources.r1.channels=c1
  a1.sinks.k1.channel=c1
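
To see it work, append lines to the watched file from another terminal while the agent above is running; each appended line shows up as an event in the flume console:

  $>echo "new log line" >> /home/centos/test.txt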

3. Batch collection (spooling directory)

  Monitors a directory of static files.
  Once a file has been fully collected, it is renamed with a .COMPLETED suffix.
  a) configuration file
     [spooldir_r.conf]
     a1.sources = r1
     a1.channels = c1
     a1.sinks = k1
     a1.sources.r1.type=spooldir
     a1.sources.r1.spoolDir=/home/centos/spool
     a1.sources.r1.fileHeader=true
     a1.sinks.k1.type=logger
     a1.channels.c1.type=memory
     a1.sources.r1.channels=c1
     a1.sinks.k1.channel=c1
  b) create the directory
     $>mkdir ~/spool
  c) start flume
     $>bin/flume-ng agent -f ../conf/spooldir_r.conf -n a1 -Dflume.root.logger=INFO,console
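
To test, drop a file into the directory and watch the console (data.txt is just a placeholder name):

  $>cp ~/data.txt ~/spool/
  $>ls ~/spool     # the file is renamed to data.txt.COMPLETED once collected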

4. flume + Kafka

  Data collected by flume is delivered to a Kafka message queue.

Configuration file

  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  a1.sources.r1.type=exec
  # tail -F alone starts from the last 10 lines; add -c +0 to collect from the
  # beginning of the file. -F keeps following new data, otherwise the process exits.
  a1.sources.r1.command=tail -F -c +0 /home/centos01/callLog/callLog.log
  a1.channels.c1.type=memory
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.k1.kafka.topic = calllog
  # kafka.bootstrap.servers is the current property name; brokerList is the older one
  #a1.sinks.k1.kafka.bootstrap.servers = sa0:9092,sa1:9092,sa2:9092
  a1.sinks.k1.brokerList = sa0:9092
  a1.sinks.k1.kafka.flumeBatchSize = 20
  a1.sinks.k1.kafka.producer.acks = 1
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
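
To check that events are arriving, create the topic and attach a console consumer (host sa0 comes from the broker list above; the zookeeper port 2181 is an assumption):

  $>kafka-topics.sh --create --zookeeper sa0:2181 --replication-factor 1 --partitions 1 --topic calllog
  $>kafka-console-consumer.sh --bootstrap-server sa0:9092 --topic calllog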

5. hbase + flume

Store the data collected by flume into hbase.

  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 8888
  a1.sinks.k1.type = hbase
  a1.sinks.k1.table = ns1:t12
  a1.sinks.k1.columnFamily = f1
  a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
  a1.channels.c1.type=memory
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
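
The target namespace and table must already exist before starting the agent; in the hbase shell:

  $hbase>create_namespace 'ns1'
  $hbase>create 'ns1:t12', 'f1'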

6. flume + hdfs

  1. hdfs
     a1.sources = r1
     a1.channels = c1
     a1.sinks = k1
     a1.sources.r1.type = netcat
     a1.sources.r1.bind = localhost
     a1.sources.r1.port = 8888
     a1.sinks.k1.type = hdfs
     a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/%M/%S
     a1.sinks.k1.hdfs.filePrefix = events-
     # whether to round the timestamp down when building the directory path;
     # here a new directory is created every 10 seconds, i.e. this controls
     # directory granularity:
     # 2017-12-12 --> 2017-12-12/%H%M%S
     a1.sinks.k1.hdfs.round = true
     a1.sinks.k1.hdfs.roundValue = 10
     a1.sinks.k1.hdfs.roundUnit = second
     a1.sinks.k1.hdfs.useLocalTimeStamp=true
     # when to roll to a new file: every 10 s, every 10 bytes, or every 3 events,
     # whichever comes first
     a1.sinks.k1.hdfs.rollInterval=10
     a1.sinks.k1.hdfs.rollSize=10
     a1.sinks.k1.hdfs.rollCount=3
     a1.channels.c1.type=memory
     a1.sources.r1.channels = c1
     a1.sinks.k1.channel = c1
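
After sending a few lines through nc, the rolled files can be listed on HDFS (the paths follow the pattern configured above):

  $>nc localhost 8888
  $>hdfs dfs -ls -R /flume/events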

flume + ftp
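
The FTP/FTPS/SFTP sources below come from the third-party keedio flume-ftp-source plugin, not from the Flume distribution, so its connector jar must be on flume's classpath before any of these configs will load (the jar name and version here are assumptions):

  $>cp flume-ftp-source-2.x.x.jar /soft/flume/lib/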

flume-ng-ftp-source-FTP.conf

  ### www.keedio.com
  # example file: protocol is ftp, processed by lines, sink is file_roll
  # (for testing purposes)
  ## Sources Definition for agent "agent"
  # ACTIVE LIST
  agent.sources = ftp1
  agent.sinks = k1
  agent.channels = ch1
  ##### SOURCE IS ftp server
  # Type of source for ftp sources
  agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
  agent.sources.ftp1.client.source = ftp
  # Connection properties for ftp server
  agent.sources.ftp1.name.server = 192.168.0.4
  agent.sources.ftp1.port = 21
  agent.sources.ftp1.user = mortadelo
  agent.sources.ftp1.password = secret
  # Process files in
  agent.sources.ftp1.working.directory = /subdirA/subdirAA
  # Process files matching (java regex for ftp-ftps)
  agent.sources.ftp1.filter.pattern = .+\\.csv
  # keep file-tracking status in this folder
  agent.sources.ftp1.folder = /var/log/flume-ftp
  # file-tracking status file name
  agent.sources.ftp1.file.name = ftp1-status-file.ser
  # Discover delay: the directory is explored every this many milliseconds
  agent.sources.ftp1.run.discover.delay=5000
  # Process by lines
  agent.sources.ftp1.flushlines = true
  # Discover and process files under user's home directory
  agent.sources.ftp1.search.recursive = true
  # Do not process a file while it is still being written
  agent.sources.ftp1.processInUse = false
  # If a file must not be processed while being written, wait this timeout
  agent.sources.ftp1.processInUseTimeout = 30
  agent.sinks.k1.type = file_roll
  agent.sinks.k1.sink.directory = /var/log/flume-ftp
  agent.sinks.k1.sink.rollInterval = 7200
  agent.channels.ch1.type = memory
  agent.channels.ch1.capacity = 10000
  agent.channels.ch1.transactionCapacity = 1000
  agent.sources.ftp1.channels = ch1
  agent.sinks.k1.channel = ch1
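
Note that the agent name in these keedio examples is "agent", not a1, so the launch command changes accordingly:

  $>bin/flume-ng agent -f conf/flume-ng-ftp-source-FTP.conf -n agent -Dflume.root.logger=INFO,console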

flume-ng-ftp-source-FTPS.conf

  # www.keedio.com
  # example configuration file for FTP SECURE
  ## Sources Definition for agent "agent"
  # ACTIVE LIST
  agent.sources = ftps1
  agent.sinks = k1
  agent.channels = ch1
  ##### SOURCE IS ftp server
  # Type of source for ftp sources
  agent.sources.ftps1.type = org.keedio.flume.source.ftp.source.Source
  agent.sources.ftps1.client.source = ftps
  # Connection properties for ftp server
  agent.sources.ftps1.name.server = 192.168.0.2
  agent.sources.ftps1.port = 21
  agent.sources.ftps1.user = mortadelo
  agent.sources.ftps1.password = secret
  agent.sources.ftps1.folder = /var/log/flume-ftp
  agent.sources.ftps1.file.name = ftps1-status-file.ser
  ## secure
  agent.sources.ftps1.security.enabled = true
  agent.sources.ftps1.security.cipher = TLS
  agent.sources.ftps1.security.certificate.enabled = false
  # Discover delay: the directory is explored every this many milliseconds
  agent.sources.ftps1.run.discover.delay=5000
  ## process by chunks of bytes
  agent.sources.ftps1.flushlines = false
  # source will write events into a file_roll sink (testing purposes)
  agent.sinks.k1.type = file_roll
  agent.sinks.k1.sink.directory = /var/log/flume-ftp
  agent.sinks.k1.sink.rollInterval = 7200
  agent.channels.ch1.type = memory
  agent.channels.ch1.capacity = 10000
  agent.channels.ch1.transactionCapacity = 1000
  # bind source and sink to the channel
  agent.sources.ftps1.channels = ch1
  agent.sinks.k1.channel = ch1

flume-ng-ftp-source-SFTP.conf

  # www.keedio.com
  # example configuration for SFTP
  ## Sources Definition for agent "agent"
  # ACTIVE LIST
  agent.sources = sftp1
  agent.sinks = k1
  agent.channels = ch1
  ##### SOURCE IS sftp server
  # Type of source for sftp sources
  agent.sources.sftp1.type = org.keedio.flume.source.ftp.source.Source
  agent.sources.sftp1.client.source = sftp
  # Connection properties for sftp server
  agent.sources.sftp1.name.server = 192.168.0.2
  agent.sources.sftp1.port = 22
  agent.sources.sftp1.user = filemon
  agent.sources.sftp1.password = secret
  # Process files in
  agent.sources.sftp1.working.directory = /home/filemon/subdirA
  # Process files matching (java regex for sftp)
  agent.sources.sftp1.filter.pattern = .+\\.csv
  # keep file-tracking status in this folder
  agent.sources.sftp1.folder = /var/log/flume-ftp
  # file-tracking status file name
  agent.sources.sftp1.file.name = sftp1-status-file.ser
  ## root is launching the flume binary
  agent.sources.sftp1.knownHosts = /root/.ssh/known_hosts
  ## for testing purposes only; default is yes
  agent.sources.sftp1.strictHostKeyChecking = no
  # Discover delay: the directory is explored every this many milliseconds
  agent.sources.sftp1.run.discover.delay=5000
  # process by lines
  agent.sources.sftp1.flushlines = true
  # Whether a recursive search should be conducted on the working directory
  agent.sources.sftp1.search.recursive = false
  # Whether files that are currently being written to should be skipped
  agent.sources.sftp1.search.processInUse = false
  # Seconds used to determine whether a file is still being written to
  agent.sources.sftp1.search.processInUseTimeout = 30
  # If source files are compressed, they can be decompressed on the fly;
  # the presence of this property implies that source files are compressed
  # (gzip: source files are GZIP compressed)
  agent.sources.sftp1.compressed = gzip
  agent.sinks.k1.type = file_roll
  agent.sinks.k1.sink.directory = /var/log/flume-ftp
  agent.sinks.k1.sink.rollInterval = 7200
  agent.channels.ch1.type = memory
  agent.channels.ch1.capacity = 10000
  agent.channels.ch1.transactionCapacity = 1000
  agent.sources.sftp1.channels = ch1
  agent.sinks.k1.channel = ch1