1.flume 实时监测服务器端文件

  1. 1.下载插件
  2. git clone https://github.com/keedio/flume-ftp-source.git
  3. 2.编译
  4. mvn clean package -DskipTests
  5. 3.将生成的jar拷贝到$FLUME_HOME/lib/
  6. cp flume-ftp-source-2.2.0.jar $FLUME_HOME/lib/
  7. 4.下载依赖jar
  8. wget https://repo1.maven.org/maven2/commons-net/commons-net/3.3/commons-net-3.3.jar
  9. wget https://repo1.maven.org/maven2/com/jcraft/jsch/0.1.54/jsch-0.1.54.jar
  10. 5. 将 commons-net-3.3.jar 和 jsch-0.1.54.jar 拷贝到$FLUME_HOME/lib/
  11. cp commons-net-3.3.jar $FLUME_HOME/lib/
  12. cp jsch-0.1.54.jar $FLUME_HOME/lib/
  13. 6. 修改conf文件
  14. touch flume-ng-ftp-source-FTP.conf
  15. 7. 添加修改的内容(flume收集文件后保存到文件中)
  16. ## Sources Definition for agent "agent"
  17. #ACTIVE LIST
  18. agent.sources = ftp1
  19. agent.sinks = k1
  20. agent.channels = ch1
  21. ##### SOURCE IS ftp server
  22. # Type of source for ftp sources
  23. agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
  24. agent.sources.ftp1.client.source = ftp
  25. # Connection properties for ftp server
  26. agent.sources.ftp1.name.server = 192.168.1.64
  27. agent.sources.ftp1.port = 21
  28. agent.sources.ftp1.user =test
  29. agent.sources.ftp1.password =12345
  30. # Process files in
  31. agent.sources.ftp1.working.directory = /
  32. # Process files matching this pattern (Java regex for ftp-ftps)
  33. agent.sources.ftp1.filter.pattern = .+\\.txt
  34. # keep file track status in folder
  35. agent.sources.ftp1.folder = /root/download
  36. # file track status name
  37. agent.sources.ftp1.file.name = ftp1-status-file.ser
  38. # Discover delay: the directory is explored once every configured number of milliseconds
  39. agent.sources.ftp1.run.discover.delay=5000
  40. # Process by lines
  41. agent.sources.ftp1.flushlines = true
  42. # Discover and process files under user's home directory
  43. agent.sources.ftp1.search.recursive = true
  44. # Do not process file while it is being written.
  45. agent.sources.ftp1.processInUse = false
  46. # If file must not be processed while it is being written, wait timeout.
  47. agent.sources.ftp1.processInUseTimeout = 30
  48. agent.sinks.k1.type = file_roll
  49. agent.sinks.k1.sink.directory = /var/log/flume-ftp
  50. agent.sinks.k1.sink.rollInterval = 7200
  51. agent.channels.ch1.type = memory
  52. agent.channels.ch1.capacity = 10000
  53. agent.channels.ch1.transactionCapacity = 1000
  54. agent.sources.ftp1.channels = ch1
  55. agent.sinks.k1.channel = ch1
  56. 8. 运行flume-ng
  57. ./bin/flume-ng agent -c conf --conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console

注意：如果使用cdh的flume，需要将commons-net-3.3.jar、jsch-0.1.54.jar、flume-ftp-source-2.2.0.jar复制到以下目录：

  1. cp ./* /opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/flume-ng/lib

如果要将flume采集后的数据发送到kafka中消费，可使用如下配置：

2. ftp-flume-kafka.conf

  1. ## Sources Definition for agent "agent"
  2. #ACTIVE LIST
  3. agent.sources = ftp1
  4. agent.sinks = k1
  5. agent.channels = ch1
  6. ##### SOURCE IS ftp server
  7. # Type of source for ftp sources
  8. agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
  9. agent.sources.ftp1.client.source = ftp
  10. # Connection properties for ftp server
  11. agent.sources.ftp1.name.server = 192.168.1.64
  12. agent.sources.ftp1.port = 21
  13. agent.sources.ftp1.user =test
  14. agent.sources.ftp1.password =12345
  15. # Process files in
  16. agent.sources.ftp1.working.directory = /
  17. # Process files matching this pattern (Java regex for ftp-ftps)
  18. agent.sources.ftp1.filter.pattern = .+\\.txt
  19. # keep file track status in folder
  20. agent.sources.ftp1.folder = /root/download
  21. # file track status name
  22. agent.sources.ftp1.file.name = ftp1-status-file.ser
  23. # Discover delay: the directory is explored once every configured number of milliseconds
  24. agent.sources.ftp1.run.discover.delay=5000
  25. # Process by lines
  26. agent.sources.ftp1.flushlines = true
  27. # Discover and process files under user's home directory
  28. agent.sources.ftp1.search.recursive = true
  29. # Do not process file while it is being written.
  30. agent.sources.ftp1.processInUse = false
  31. # If file must not be processed while it is being written, wait timeout.
  32. agent.sources.ftp1.processInUseTimeout = 30
  33. #agent.sinks.k1.type = file_roll
  34. agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  35. agent.sinks.k1.kafka.topic = test
  36. agent.sinks.k1.brokerList = 192.168.1.64:9092
  37. agent.sinks.k1.kafka.flumeBatchSize = 20
  38. agent.sinks.k1.kafka.producer.acks = 1
  39. #agent.sinks.k1.sink.directory = /var/log/flume-ftp
  40. #agent.sinks.k1.sink.rollInterval = 7200
  41. agent.channels.ch1.type = memory
  42. agent.channels.ch1.capacity = 10000
  43. agent.channels.ch1.transactionCapacity = 1000
  44. agent.sources.ftp1.channels = ch1
  45. agent.sinks.k1.channel = ch1