Flume Basics
Name: declare the component names
a1.sources = r1    # name the source r1
a1.channels = c1   # name the channel c1
a1.sinks = k1      # name the sink k1
Source: where the data comes from
a1.sources.r1.type = netcat    # source type
Common source types:
* netcat reads from a network port
* exec runs a shell command (e.g. tail) and monitors its output
* spooldir watches a directory for new files
* TAILDIR tails multiple appending files under a directory
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
Agent
An Agent is a JVM process that moves data, in the form of events, from a source to a destination.
An Agent consists of three main components: Source, Channel, and Sink.
Source
The Source is the component responsible for receiving data into the Flume Agent. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http, and legacy.
Sink
The Sink continuously polls the Channel for events and removes them in batches, writing them to a storage or indexing system or forwarding them to another Flume Agent.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, HBase, solr, and custom sinks.
Channel
The Channel is a buffer sitting between the Source and the Sink, which allows the Source and the Sink to operate at different rates. Channels are thread-safe and can serve several Sources' writes and several Sinks' reads concurrently.
Flume ships with two Channels: Memory Channel and File Channel.
Memory Channel is an in-memory queue. It is suitable when data loss is acceptable; if data loss is a concern, Memory Channel should not be used, because a process crash, machine failure, or restart loses whatever it holds.
File Channel writes all events to disk, so no data is lost if the process exits or the machine goes down.
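For reference, a minimal File Channel configuration sketch; the type and property names are standard Flume, while the directory paths below are placeholders (both default to directories under ~/.flume when omitted):
a1.channels.c1.type = file
# Directory where the channel's checkpoint is stored
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
# Comma-separated list of directories where event data is written
a1.channels.c1.dataDirs = /opt/module/flume/data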
Event
The transfer unit: an Event is Flume's basic unit of data transfer, and data travels from source to destination as Events. An Event consists of a Header and a Body: the Header holds the event's attributes as key-value (K-V) pairs, and the Body holds the data itself as a byte array.
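As a small illustration of this structure, a sketch using Flume's SimpleEvent class (the same class the custom Source later in these notes uses; the demo class itself is hypothetical):
package com.atguigu.flume.demo;

import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;

import java.nio.charset.StandardCharsets;

public class EventDemo {
    public static void main(String[] args) {
        Event event = new SimpleEvent();
        // Body: the payload itself, stored as a byte array
        event.setBody("hello flume".getBytes(StandardCharsets.UTF_8));
        // Header: K-V attributes describing the event
        event.getHeaders().put("title", "demo");
        System.out.println(event.getHeaders() + " ::: "
                + new String(event.getBody(), StandardCharsets.UTF_8));
    }
}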
1. Getting-started case 1: monitor port data
Config file: netcat-flume-logger.conf
#Named (component names)
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source (data source)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink (consumer)
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
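After starting the agent, data can be sent to the listening port from another shell; a sketch, assuming the netcat tool (nc) is installed:
nc localhost 6666
Each line typed here should then show up in the agent's console output.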
2. Getting-started case 2.1: monitor a single appending file in real time and print its content to the console
Config file: exec-flume-logger.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source: the exec type runs a shell command (tail) to follow the file in real time
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/exec-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
The same start with short options:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
3. Getting-started case 2.2: monitor a single appending file in real time and upload the content to HDFS
Config file: exec-flume-hdfs.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
# Sink type: hdfs
a1.sinks.k1.type = hdfs
# Target path on HDFS (supports time escape sequences)
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to round the timestamp down when rolling directories
a1.sinks.k1.hdfs.round = true
# How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp for the time escapes (instead of a timestamp event header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are supported
a1.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a1.sinks.k1.hdfs.rollInterval = 60
# Roll the file at this size (~128 MB, kept just under the HDFS block size)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a1.sinks.k1.hdfs.rollCount = 0
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
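To drive some data through and check the result, a sketch (assumes the HDFS client is on the PATH):
echo hello >> /opt/module/flume-1.9.0/jobs/tail.txt
hdfs dfs -ls -R /flume
A directory named after the current date and hour should appear, containing files prefixed with logs-.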
4. Getting-started case 3: monitor a directory for new files and upload their content to HDFS
Config file: spooling-flume-hdfs.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/jobs/spooling
# Suffix appended to a file once it has been fully ingested
a1.sources.r1.fileSuffix = .COMPLETED
# Ignore files matching this regular expression
a1.sources.r1.ignorePattern = .*\.tmp
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to round the timestamp down when rolling directories
a1.sinks.k1.hdfs.round = true
# How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp for the time escapes (instead of a timestamp event header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are supported
a1.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a1.sinks.k1.hdfs.rollInterval = 60
# Roll the file at this size (~128 MB, kept just under the HDFS block size)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a1.sinks.k1.hdfs.rollCount = 0
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
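To test, drop a complete file into the watched directory, for example (a sketch):
cp /etc/profile /opt/module/flume/jobs/spooling/
Once ingested, the file is renamed with the .COMPLETED suffix. Files must be complete when placed in the directory; writing to a file after the source has picked it up causes an error.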
5. Getting-started case 4: monitor multiple appending files under a directory and upload the content to HDFS
Config file: taildir-flume-hdfs.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = TAILDIR
# Two file groups to monitor
a1.sources.r1.filegroups = f1 f2
# The files each group matches (regular expressions)
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
# File that records the read position of each tailed file (checkpointing, so collection resumes where it left off the next time Flume starts)
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# Prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# Whether to round the timestamp down when rolling directories
a1.sinks.k1.hdfs.round = true
# How many time units per new directory
a1.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp for the time escapes (instead of a timestamp event header)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are supported
a1.sinks.k1.hdfs.fileType = DataStream
# Roll to a new file after this many seconds
a1.sinks.k1.hdfs.rollInterval = 60
# Roll the file at this size (~128 MB, kept just under the HDFS block size)
a1.sinks.k1.hdfs.rollSize = 134217700
# 0 disables rolling by event count
a1.sinks.k1.hdfs.rollCount = 0
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
[{"inode":1317393,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":1317394,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"}]
[{"inode":1317393,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":1317394,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"},
{"inode":1317395,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log1.log"},
{"inode":1317396,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log2.log"}]
Flume Advanced
Replication
1. Replication case
Monitor a file for content changes and deliver each captured event to both Flume2 and Flume3; Flume2 writes the content to HDFS, while Flume3 writes it to the local filesystem (file_roll sink).
flume1.conf
#Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
# Channel capacity (maximum number of events held)
a1.channels.c1.capacity = 10000
# Maximum number of events per transaction
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/jobs/fileroll
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Load Balancing
2. Load balancing case
flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
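# The selector may be round_robin (the default) or random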
a1.sinkgroups.g1.processor.selector = random
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Failover
3. Failover case
flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666
#channel selector
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888
#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
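# The sink with the higher priority value is preferred: k2 (10) is the primary, and k1 (5) takes over only if k2 fails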
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Aggregation
4. Aggregation case: flume1 tails .txt files under a local directory and flume2 listens on a netcat port; both forward their events over avro to hadoop104:8888, where flume3 aggregates them and logs to the console.
flume1.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.logger=INFO,console
Multiplexing
Custom Interceptor
Write Java code that defines an interceptor, package it as a JAR, and put it in Flume's lib directory. Functionality: events whose body contains "atguigu" are sent to the flume2 pipeline, events containing "shangguigu" to the flume3 pipeline, and everything else to the flume4 pipeline.
package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * Custom interceptor: tags each event with a "title" header based on its body,
 * so that a multiplexing channel selector can route it.
 */
public class EventHeaderInterceptor implements Interceptor {

    @Override
    public void initialize() {}

    /**
     * Intercept a single event.
     * @param event the event currently being intercepted
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the event's headers
        Map<String, String> headers = event.getHeaders();
        // 2. Get the event's body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 3. Check whether the body contains "atguigu" or "shangguigu"
        if (body.contains("atguigu")) {
            headers.put("title", "at");
        } else if (body.contains("shangguigu")) {
            headers.put("title", "st");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {}

    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}
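To compile the interceptor against the Flume API before packaging the JAR, the project needs the flume-ng-core dependency; a Maven sketch (the version matches the flume-1.9.0 install used in these notes, and provided scope keeps Flume's own classes out of the JAR since the agent already has them on its classpath):
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>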
5. Multiplexing case
flume1.conf
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555
#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = title
a1.sources.r1.selector.mapping.at = c1
a1.sources.r1.selector.mapping.st = c2
a1.sources.r1.selector.default = c3
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.EventHeaderInterceptor$MyBuilder
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777
a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888
#Bind
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
flume2.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3.conf
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
flume4.conf
#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888
#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#Sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console
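To verify the routing, send test lines to the netcat source from another shell (a sketch; assumes nc is installed):
nc localhost 5555
A line containing atguigu should appear in flume2's log, a line containing shangguigu in flume3's log, and anything else in flume4's log.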
Custom Source
Write the Java code, export it as a JAR, and place it in Flume's lib directory.
Functionality: once per second, generate a random UUID string with a configurable prefix and hand it to the channel as an event (see the code below).
package com.atguigu.flume.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * A custom Source extends AbstractSource and implements the
 * Configurable and PollableSource interfaces.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;

    /**
     * The Source's core processing method.
     *
     * Flume calls this method in a loop.
     */
    @Override
    public Status process() throws EventDeliveryException {
        // Sleep for one second between polls
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Status status;
        try {
            // Receive new data: collect it and wrap it in an event object
            Event e = getSomeData();
            // Store the event into this Source's associated channel(s)
            // by handing it to the ChannelProcessor
            getChannelProcessor().processEvent(e);
            // Processed normally: return Status.READY
            status = Status.READY;
        } catch (Throwable t) {
            // Log the exception, handle individual exceptions as needed;
            // processing failed, so return Status.BACKOFF
            status = Status.BACKOFF;
        }
        return status;
    }

    /**
     * Generate a random string as the collected data.
     */
    private Event getSomeData() {
        String data = UUID.randomUUID().toString();
        String resultData = prefix + data;
        SimpleEvent event = new SimpleEvent();
        event.setBody(resultData.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("author", "wyh");
        return event;
    }

    /**
     * Increment added to the backoff time after each consecutive failure.
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }

    /**
     * Maximum backoff time.
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }

    /**
     * Reads this source's settings from the Flume configuration (xxx.conf).
     */
    @Override
    public void configure(Context context) {
        // Read the source's prefix property; defaults to "log-"
        prefix = context.getString("prefix", "log-");
    }
}
6. Custom Source case
Config file: mysource-flume-logger.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = logger
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
Custom Sink
Write Java code to receive the data, then package and deploy it the same way:
package com.atguigu.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * A custom Sink extends Flume's AbstractSink and implements the Configurable interface.
 */
public class MySink extends AbstractSink implements Configurable {

    Logger logger = LoggerFactory.getLogger(MySink.class);

    /**
     * The Sink's core processing method.
     *
     * Flume calls this method in a loop.
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        // Get the channel
        Channel ch = getChannel();
        // Get a transaction object
        Transaction txn = ch.getTransaction();
        // Start the transaction
        txn.begin();
        try {
            // Take an event from the channel; may be null if the channel is empty
            Event event = ch.take();
            if (event != null) {
                // Process the event
                storeSomeData(event);
                status = Status.READY;
            } else {
                // Nothing to consume: back off so Flume polls less aggressively
                status = Status.BACKOFF;
            }
            // Processing succeeded: commit the transaction
            txn.commit();
        } catch (Throwable t) {
            // Processing failed: roll the transaction back
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            // Close the transaction whether it succeeded or not
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) {
        String printData = event.getHeaders() + " ::: " + new String(event.getBody(), StandardCharsets.UTF_8);
        logger.info(printData);
    }

    @Override
    public void configure(Context context) {
    }
}
7. Custom Sink case
Config file: mysource-flume-mysink.conf
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#Source
a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = com.atguigu.flume.sink.MySink
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console
Ganglia Monitoring
8. Ganglia monitoring
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649
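If Ganglia is not available, Flume's built-in HTTP metrics server is a lighter-weight alternative; a sketch (the port number is arbitrary):
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/netcat-flume-logger.conf \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=34545
The same counters are then served as JSON at http://localhost:34545/metrics.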
