尚硅谷大数据技术之Flume.docx
学习官网

Flume基础

Name 设置节点名字

  1. a1.sources = r1 设置数据来源名字为r1
  2. a1.channels = c1 设置管道名字为
  3. a1.sinks = k1 管道消费者名字

Source数据来源

  1. a1.sources.r1.type = netcat 数据来源类型
  2. * netcat为网络端口
  3. * exec 为监控数据
  4. * spooldir 为监控的目录下的新文件
  5. * TAILDIR 监控目录下多个追加文件
  6. a1.sources.r1.bind = localhost
  7. a1.sources.r1.port = 6666

Agent

Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。
Agent主要有3个部分组成,Source、Channel、Sink。

Source

Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。

Sink

Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

Event

传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由HeaderBody两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

  1. 1. 入门案例1: 监控端口数据
  2. 配置文件名: netcat-flume-logger.conf
  3. #Named 节点名字
  4. a1.sources = r1
  5. a1.channels = c1
  6. a1.sinks = k1
  7. #Source 数据来源
  8. a1.sources.r1.type = netcat
  9. a1.sources.r1.bind = localhost
  10. a1.sources.r1.port = 6666
  11. #Channel 管道设置
  12. a1.channels.c1.type = memory
  13. a1.channels.c1.capacity = 10000
  14. a1.channels.c1.transactionCapacity = 100
  15. #Sink 消费者
  16. a1.sinks.k1.type = logger
  17. #Bind
  18. a1.sources.r1.channels = c1
  19. a1.sinks.k1.channel = c1
  20. 启动: flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
  1. 2. 入门案例 2.1 实时监控单个追加文件,将内容打印到控制台
  2. 配置文件名字: exec-flume-logger.conf
  3. #Named
  4. a1.sources = r1
  5. a1.channels = c1
  6. a1.sinks = k1
  7. #Source 数据来源类型为exec 在控制台执行tail指令 实时监控获取
  8. a1.sources.r1.type = exec
  9. a1.sources.r1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt
  10. #Channel
  11. a1.channels.c1.type = memory
  12. a1.channels.c1.capacity = 10000
  13. a1.channels.c1.transactionCapacity = 100
  14. #Sink
  15. a1.sinks.k1.type = logger
  16. #Bind
  17. a1.sources.r1.channels = c1
  18. a1.sinks.k1.channel = c1
  19. 启动: flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/netcat-flume-logger.conf --name a1 -Dflume.root.logger=INFO,console
  20. 另一种启动方式
  21. 启动: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

3. 入门案例 2.2  实时监控单个追加文件,将内容上传到HDFS中
配置文件名字: exec-flume-hdfs.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink 
# 指定类型为hdfs格式
a1.sinks.k1.type = hdfs
# 指定命名路径
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

启动: flume-ng agent -c  $FLUME_HOME/conf  -f  $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
4. 入门案例 3  实时监控目录下的新文件,将内容上传到HDFS中
配置文件名字: spooling-flume-hdfs.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source 
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/jobs/spooling
# 监控的文件后缀名
a1.sources.r1.fileSuffix = .COMPLETED
# 忽略某些文件,使用正则表达式
a1.sources.r1.ignorePattern = .*\.tmp


#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

启动: flume-ng agent -c  $FLUME_HOME/conf  -f  $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console


5. 入门案例4   实时监控目录下多个追加文件,将内容上传到HDFS中
配置文件名字: taildir-flume-hdfs.conf

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = TAILDIR
# 监控两个组
a1.sources.r1.filegroups = f1 f2
# 每个组监控的文件
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
# 记录每一个文件读取的位置的文件(断点续传,方便再下次启动flume时继续采集)
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

启动: flume-ng agent -c  $FLUME_HOME/conf  -f  $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console



[{"inode":1317393,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":1317394,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"}]

[{"inode":1317393,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":1317394,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"},
{"inode":1317395,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log1.log"},
{"inode":1317396,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log2.log"}]

Flume高阶

复制案例

1. 复制案例
监控文件内容的变动,将监控到的内容分别给到Flume2和Flume3,Flume2将内容写到HDFS,Flume将文件写到HDFS

flume1.conf  
#Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
# channel容量
a1.channels.c1.capacity = 10000
# 事务的列表容量
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Bind
a1.sources.r1.channels = c1 c2  
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 


flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


flume3.conf 
#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/jobs/fileroll

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 


启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console

负载均衡

flume1.conf 
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random 


#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1




flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


flume3.conf 

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 


启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

故障转移

3. 故障转移案例
flume1.conf 
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10


#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1




flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


flume3.conf 

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 


启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console

聚合案例

4. 聚合案例
flume1.conf 
#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 



flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


flume3.conf 

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 


启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.logger=INFO,console

多路案例

自定义拦截器

编写Java代码定义拦截器,打成Jar包放入flume的lib目录里面,实现功能:将含有atguigu的字段发送给flume2管道,含有shangguigu的数据发送给flume3管道,其他发送给flume4管道

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * 自定义拦截器
 */
public class EventHeaderInterceptor implements Interceptor {
    @Override
    public void initialize() {}

    /**
     * 拦截方法
     * @param event 当前被拦截的event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        //1.获取event的headers
        Map<String, String> headers = event.getHeaders();
        //2. 获取event的body
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        //3.判断body中是否半酣“atguigu”,"shangguigu"
        if (body.contains("atguigu")){
            headers.put("title","at");
        }else if (body.contains("shangguigu")){
            headers.put("title","st");
        }
        return event;
    }


    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {}

    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
5. 多路案例
flume1.conf 
#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555

#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = title
a1.sources.r1.selector.mapping.at = c1
a1.sources.r1.selector.mapping.st = c2
a1.sources.r1.selector.default = c3

# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.EventHeaderInterceptor$MyBuilder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100


a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100


#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888

#Bind
a1.sources.r1.channels = c1 c2 c3   
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 
a1.sinks.k3.channel = c3 




flume2.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 


flume3.conf 

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 



flume4.conf 

#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1 

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1 
a4.sinks.k1.channel = c1 


启动:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console

自定义Source

编写java代码,导出为jar包,并放入flume的lib目录中
实现功能:

package com.atguigu.flume.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 自定义Source 需要继承AbstractSource,实现 Configurable ,PollableSource接口.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String prefix;

    /**
     * Source的核心处理方法,
     *
     * 该方法在flume的处理流程中是循环调用的。
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        //休眠一秒中
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Status status = null ;
        try {

            // Receive new data
            // 采集数据,封装成event对象
            Event e = getSomeData();

            // Store the Event into this Source's associated Channel(s)
            // 将event对象交给ChannelProcessor进行处理
            getChannelProcessor().processEvent(e);
            // 正常处理,返回Status.READY
            status = Status.READY;
        } catch (Throwable t) {
            // Log exception, handle individual exceptions as needed
            // 处理失败,返回 Status.BACKOFF
            status = Status.BACKOFF;
        }
        return status;
    }

    /**
     * 随机生成一个字符串作为采集的数据
     * @return
     */
    private Event getSomeData() {
        String data  = UUID.randomUUID().toString();
        String resultData = prefix + data ;
        SimpleEvent event = new SimpleEvent();
        event.setBody(resultData.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("author","wyh");
        return event ;
    }

    /**
     * 规避时间的增长步长
     * @return
     */
    @Override
    public long getBackOffSleepIncrement() {
        return 1;
    }

    /**
     * 最大的规避时间
     * @return
     */
    @Override
    public long getMaxBackOffSleepInterval() {
        return 10;
    }

    /**
     * 用于读取flume的配置信息 xxx.conf
     * @param context
     */
    @Override
    public void configure(Context context) {
        //读取source的prefix属性,默认为log-
        prefix = context.getString("prefix","log-");
    }
}
6. 自定义Source
flume4.conf 

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = logger

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console

自定义Sink

编写java代码用来接收数据

package com.atguigu.flume.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * 自定义Sink ,需要继承Flume提供的AbstractSink,实现Configurable接口
 */
public class MySink extends AbstractSink implements Configurable {

    Logger logger = LoggerFactory.getLogger(MySink.class);

    /**
     * 核心处理方法
     *
     * 该方法在flume的处理流程中是循环调用的.
     * @return
     * @throws EventDeliveryException
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        //获取Channel
        Channel ch = getChannel();
        //获取事务对象
        Transaction txn = ch.getTransaction();
        //开启事务
        txn.begin();
        try {
            // 从channel中获取event
            Event event = ch.take();

            // 处理event
            storeSomeData(event);
            // 处理成功,提交事务
            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            // 处理失败,回滚事务
            txn.rollback();

            status = Status.BACKOFF;

        } finally{
            //不论事务成功与否。都要关闭
            txn.close();
        }
        return status;
    }

    private void storeSomeData(Event event) {
        String printData  = event.getHeaders() + " ::: "+ new String(event.getBody(), StandardCharsets.UTF_8);
        logger.info(printData);
    }

    @Override
    public void configure(Context context) {

    }
}
7. 自定义Sink

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = com.atguigu.flume.source.MySource
a1.sources.r1.prefix = log--

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = com.atguigu.flume.sink.MySink

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/mysource-flume-mysink.conf -n a1 -Dflume.root.logger=INFO,console

ganglia监控

8. ganglia监控
flume-ng agent \
-c $FLUME_HOME/conf \
-n a1 \
-f $FLUME_HOME/jobs/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649