1. 使用flume实时监测FTP服务器端文件
1.下载插件
git clone https://github.com/keedio/flume-ftp-source.git
2.编译
mvn clean package -DskipTests
3.将编译生成的jar(位于target/目录下)拷贝到$FLUME_HOME/lib/
cp target/flume-ftp-source-2.2.0.jar $FLUME_HOME/lib/
4.下载依赖jar
wget https://repo1.maven.org/maven2/commons-net/commons-net/3.3/commons-net-3.3.jar
wget https://repo1.maven.org/maven2/com/jcraft/jsch/0.1.54/jsch-0.1.54.jar
5. 将commons-net-3.3.jar 和 jsch-0.1.54.jar 拷贝到$FLUME_HOME/lib/
cp commons-net-3.3.jar $FLUME_HOME/lib/
cp jsch-0.1.54.jar $FLUME_HOME/lib/
6. 在$FLUME_HOME/conf/下创建conf文件(第8步的启动命令从conf/目录读取该文件)
touch $FLUME_HOME/conf/flume-ng-ftp-source-FTP.conf
7. 在该conf文件中添加以下内容(flume收集FTP文件后保存到本地文件中)
## Sources Definition for agent "agent"
#ACTIVE LIST
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1
##### SOURCE IS ftp server
# Type of source for ftp sources
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
# Connection properties for ftp server
agent.sources.ftp1.name.server = 192.168.1.64
agent.sources.ftp1.port = 21
agent.sources.ftp1.user =test
agent.sources.ftp1.password =12345
# Process files in
agent.sources.ftp1.working.directory = /
# Process only files whose names match this pattern (Java regex, for ftp/ftps)
agent.sources.ftp1.filter.pattern = .+\\.txt
# keep file track status in folder
agent.sources.ftp1.folder = /root/download
# file track status name
agent.sources.ftp1.file.name = ftp1-status-file.ser
# Discover delay: the directory is explored again every configured number of milliseconds
agent.sources.ftp1.run.discover.delay=5000
# Process by lines
agent.sources.ftp1.flushlines = true
# Recursively discover and process files in subdirectories of working.directory
agent.sources.ftp1.search.recursive = true
# Do not process file while it is being written.
agent.sources.ftp1.processInUse = false
# If file must not be processed while it is being written, wait timeout.
agent.sources.ftp1.processInUseTimeout = 30
agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-ftp
agent.sinks.k1.sink.rollInterval = 7200
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
agent.sources.ftp1.channels = ch1
agent.sinks.k1.channel = ch1
8. 在$FLUME_HOME目录下运行flume-ng
./bin/flume-ng agent --conf conf --conf-file conf/flume-ng-ftp-source-FTP.conf --name agent -Dflume.root.logger=INFO,console
注意：如果使用CDH的flume，需要将commons-net-3.3.jar、jsch-0.1.54.jar、flume-ftp-source-2.2.0.jar复制到以下目录(路径中的CDH版本号请按实际安装调整)
cp commons-net-3.3.jar jsch-0.1.54.jar flume-ftp-source-2.2.0.jar /opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/flume-ng/lib/
如果需要将flume采集的数据发送到kafka中供消费，可使用以下配置(运行命令同第8步，只需将--conf-file换成该文件)：
2. ftp-flume-kafka.conf
## Sources Definition for agent "agent"
#ACTIVE LIST
agent.sources = ftp1
agent.sinks = k1
agent.channels = ch1
##### SOURCE IS ftp server
# Type of source for ftp sources
agent.sources.ftp1.type = org.keedio.flume.source.ftp.source.Source
agent.sources.ftp1.client.source = ftp
# Connection properties for ftp server
agent.sources.ftp1.name.server = 192.168.1.64
agent.sources.ftp1.port = 21
agent.sources.ftp1.user =test
agent.sources.ftp1.password =12345
# Process files in
agent.sources.ftp1.working.directory = /
# Process only files whose names match this pattern (Java regex, for ftp/ftps)
agent.sources.ftp1.filter.pattern = .+\\.txt
# keep file track status in folder
agent.sources.ftp1.folder = /root/download
# file track status name
agent.sources.ftp1.file.name = ftp1-status-file.ser
# Discover delay: the directory is explored again every configured number of milliseconds
agent.sources.ftp1.run.discover.delay=5000
# Process by lines
agent.sources.ftp1.flushlines = true
# Recursively discover and process files in subdirectories of working.directory
agent.sources.ftp1.search.recursive = true
# Do not process file while it is being written.
agent.sources.ftp1.processInUse = false
# If file must not be processed while it is being written, wait timeout.
agent.sources.ftp1.processInUseTimeout = 30
#agent.sinks.k1.type = file_roll
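agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# All sink properties must use the agent name "agent" and the sink name "k1";
# lines with another prefix (e.g. "a1." or "agent.sinks.kafka.") are silently ignored.
# Property names below are for Flume 1.7+; older Flume uses topic / brokerList / batchSize / requiredAcks.
agent.sinks.k1.kafka.topic = test
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.64:9092
agent.sinks.k1.flumeBatchSize = 20
agent.sinks.k1.kafka.producer.acks = 1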
#agent.sinks.k1.sink.directory = /var/log/flume-ftp
#agent.sinks.k1.sink.rollInterval = 7200
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000
agent.sources.ftp1.channels = ch1
agent.sinks.k1.channel = ch1