基本概念
1. EventEvent 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 Event 由标题和正文组成:前者是键/值映射,后者是任意字节数组。
2. Source
数据收集组件,从外部数据源收集数据,并存储到 Channel 中。
3. ChannelChannel 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:
Memory Channel: 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);File Channel: 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。
4. SinkSink 的主要功能从 Channel 中读取 Event,并将其存入外部存储系统或将其转发到下一个 Source,成功后再从 Channel中移除 Event。
5. Agent
是一个独立的 (JVM) 进程,包含 Source、 Channel、 Sink 等组件。
组件种类
Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:
- Source 类型 :内置了几十种类型,如
Avro Source,Thrift Source,Kafka Source,JMS Source; - Sink 类型 :
HDFS Sink,Hive Sink,HBaseSinks,Avro Sink等; - Channel 类型 :
Memory Channel,JDBC Channel,Kafka Channel,File Channel等。
对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。在 Flume 官网 上对所有类型组件的配置参数均以表格的方式做了详尽的介绍,并附有配置样例;同时不同版本的参数可能略有所不同,所以使用时建议选取官网对应版本的 User Guide 作为主要参考资料。
配置文件
一个Agent中的Source、Sink 分别与Channel 相连。
- 定义各个组件的名字,配置Source、Sink 分别与Channel 相连
```
.sources = .sinks = .channels =
set channel for source
set channel for sink
2. 定义Source,Sink,Channel 各自属性
properties for channels
properties for sinks
<a name="v4rYx"></a># 拦截器拦截器是简单的插件式组件,设置在source 和channel 之间。source 接收到的时间,在写入channel 之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source 接收到的事件。可以自定义拦截器。<a name="aXWaV"></a>## 自定义拦截器```javapackage cn.com.bonc.interceptor;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;/*** 自定义拦截器,实现Interceptor接口,并且实现其抽象方法*/public class CustomInterceptor implements Interceptor {//打印日志,便于测试方法的执行顺序private static final Logger logger = LoggerFactory.getLogger(CustomLogger.class);//自定义拦截器参数,用来接收自定义拦截器flume配置参数private static String param = "";/*** 拦截器构造方法,在自定义拦截器静态内部类的build方法中调用,用来创建自定义拦截器对象。*/public CustomInterceptor() {logger.info("----------自定义拦截器构造方法执行");}/*** 该方法用来初始化拦截器,在拦截器的构造方法执行之后执行,也就是创建完拦截器对象之后执行*/@Overridepublic void initialize() {logger.info("----------自定义拦截器的initialize方法执行");}/*** 用来处理每一个event对象,该方法不会被系统自动调用,一般在 List<Event> intercept(List<Event> events) 方法内部调用。** @param event* @return*/@Overridepublic Event intercept(Event event) {logger.info("----------intercept(Event event)方法执行,处理单个event");logger.info("----------接收到的自定义拦截器参数值param值为:" + param);/*这里编写event的处理代码*/return event;}/*** 用来处理一批event对象集合,集合大小与flume启动配置有关,和transactionCapacity大小保持一致。一般直接调用 Event intercept(Event event) 处理每一个event数据。** @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {logger.info("----------intercept(List<Event> events)方法执行");/*这里编写对于event对象集合的处理代码,一般都是遍历event的对象集合,对于每一个event对象,调用 Event intercept(Event event) 方法,然后根据返回值是否为null,来将其添加到新的集合中。*/List<Event> results = new ArrayList<>();Event event;for (Event e : events) {event = intercept(e);if (event != null) {results.add(event);}}return results;}/*** 该方法主要用来销毁拦截器对象值执行,一般是一些释放资源的处理*/@Overridepublic void close() {logger.info("----------自定义拦截器close方法执行");}/*** 通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实现其抽象方法*/public static class Builder implements Interceptor.Builder {/*** 该方法主要用来返回创建的自定义类拦截器对象** @return*/@Overridepublic Interceptor build() {logger.info("----------build方法执行");return new CustomInterceptor();}/*** 用来接收flume配置自定义拦截器参数** @param context 通过该对象可以获取flume配置自定义拦截器的参数*/@Overridepublic void configure(Context context) {logger.info("----------configure方法执行");/*通过调用context对象的getString方法来获取flume配置自定义拦截器的参数,方法参数要和自定义拦截器配置中的参数保持一致+*/param = context.getString("param");}}}
总体架构
- 采取分层架构,agent 层采集数据,collector 层将agent 层的数据集中起来处理,传输到存储层存储。此处由于实时的路线只有一个节点所以没有加上collector 层,直接与kafka 对接。
- 此处标明的预处理通过在agent层自定义拦截器实现。
离线
架构
点击查看【processon】
实现
agent层与collector 层是通过collector 层的端口对接的,因此需要先启动collector 层的Flume,collector 层和agent 层的执行命令都大致相同,因采集配置文件而异
collector层
本人一般把采集的配置放到Flume 的 collect/ 目录下,此目录需手动创建
- 新建server-collector.conf
- Flume 根目录下运行,启动collector 层Agent
bin/flume-ng agent \--name agent \-c conf \-f collect/agent.conf \-Dflume.root.logger=INFO,console
agent:配置文件中定义的conf:Flume 根目录下的conf文件夹collecto/server-collector.conf:配置文件的位置-Dflume.root.logger=INFO,console:运行过程中的日志信息打印在控制台上agent层
- 自定义拦截器预处理数据,将每条日志拼接在一起
通过IDEA新建maven 的quick-start 项目,最后将项目打包成jar,放到agent层服务器上的Flume 的 lib 文件夹下
- pom.xml
- CustomInterceptor.java
- FlumeBuilder.java
- 新建
server-agent.conf,采集日志/var/log/messages - 启动agent层Agent
bin/flume-ng agent \--name agent \-c conf \-f collect/server-agent.conf \-Dflume.root.logger=INFO,console
实时
实时路线Flume 的sink 直接与Kafka 对接,采集交换机日志
配置交换机的远程日志服务器
交换机为华为。
创建kafka主题
bin/kafka-topics.sh --create \--zookeeper node01:2181 \--replication-factor 1 \--partitions 1 --topic kafka-switch
启动kafka主题
bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic kafka-switch
新建配置文件
switch-logs.conf``` a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type=exec a1.sources.s1.command=tail -F /var/log/switch01.log a1.sources.s1.channels=c1
设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
设置Kafka地址
a1.sinks.k1.brokerList=node01:9092
设置发送到Kafka上的主题
a1.sinks.k1.topic=kafka-switch
设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1
a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100
3. 启动Flume```shellbin/flume-ng agent \--name a1 \-c conf \-f collect/switch.conf \-Dflume.root.logger=INFO,console
