基本概念

1. Event
Event 是 Flume NG 数据传输的基本单元。类似于 JMS 和消息系统中的消息。一个 Event 由标题和正文组成:前者是键/值映射,后者是任意字节数组。
2. Source
数据收集组件,从外部数据源收集数据,并存储到 Channel 中。
3. Channel
Channel 是源和接收器之间的管道,用于临时存储数据。可以是内存或持久化的文件系统:

  • Memory Channel : 使用内存,优点是速度快,但数据可能会丢失 (如突然宕机);
  • File Channel : 使用持久化的文件系统,优点是能保证数据不丢失,但是速度慢。

4. Sink
Sink 的主要功能从 Channel 中读取 Event,并将其存入外部存储系统或将其转发到下一个 Source,成功后再从 Channel中移除 Event
5. Agent
是一个独立的 (JVM) 进程,包含 SourceChannelSink 等组件。

组件种类

Flume 中的每一个组件都提供了丰富的类型,适用于不同场景:

  • Source 类型 :内置了几十种类型,如 Avro SourceThrift SourceKafka SourceJMS Source
  • Sink 类型 :HDFS SinkHive SinkHBaseSinksAvro Sink 等;
  • Channel 类型 :Memory ChannelJDBC ChannelKafka ChannelFile Channel 等。

对于 Flume 的使用,除非有特别的需求,否则通过组合内置的各种类型的 Source,Sink 和 Channel 就能满足大多数的需求。在 Flume 官网 上对所有类型组件的配置参数均以表格的方式做了详尽的介绍,并附有配置样例;同时不同版本的参数可能略有所不同,所以使用时建议选取官网对应版本的 User Guide 作为主要参考资料。

配置文件

一个Agent中的Source、Sink 分别与Channel 相连。

  1. 定义各个组件的名字,配置Source、Sink 分别与Channel 相连 ``` .sources = .sinks = .channels =

set channel for source

.sources..channels =

set channel for sink

.sinks..channel =

  1. 2. 定义SourceSinkChannel 各自属性

.sources.. =

properties for channels

.channel.. =

properties for sinks

.sources.. =

  1. <a name="v4rYx"></a>
  2. # 拦截器
  3. 拦截器是简单的插件式组件,设置在source 和channel 之间。source 接收到的时间,在写入channel 之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source 接收到的事件。可以自定义拦截器。
  4. <a name="aXWaV"></a>
  5. ## 自定义拦截器
  6. ```java
  7. package cn.com.bonc.interceptor;
  8. import org.apache.flume.Context;
  9. import org.apache.flume.Event;
  10. import org.apache.flume.interceptor.Interceptor;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. /**
  16. * 自定义拦截器,实现Interceptor接口,并且实现其抽象方法
  17. */
  18. public class CustomInterceptor implements Interceptor {
  19. //打印日志,便于测试方法的执行顺序
  20. private static final Logger logger = LoggerFactory.getLogger(CustomLogger.class);
  21. //自定义拦截器参数,用来接收自定义拦截器flume配置参数
  22. private static String param = "";
  23. /**
  24. * 拦截器构造方法,在自定义拦截器静态内部类的build方法中调用,用来创建自定义拦截器对象。
  25. */
  26. public CustomInterceptor() {
  27. logger.info("----------自定义拦截器构造方法执行");
  28. }
  29. /**
  30. * 该方法用来初始化拦截器,在拦截器的构造方法执行之后执行,也就是创建完拦截器对象之后执行
  31. */
  32. @Override
  33. public void initialize() {
  34. logger.info("----------自定义拦截器的initialize方法执行");
  35. }
  36. /**
  37. * 用来处理每一个event对象,该方法不会被系统自动调用,一般在 List<Event> intercept(List<Event> events) 方法内部调用。
  38. *
  39. * @param event
  40. * @return
  41. */
  42. @Override
  43. public Event intercept(Event event) {
  44. logger.info("----------intercept(Event event)方法执行,处理单个event");
  45. logger.info("----------接收到的自定义拦截器参数值param值为:" + param);
  46. /*
  47. 这里编写event的处理代码
  48. */
  49. return event;
  50. }
  51. /**
  52. * 用来处理一批event对象集合,集合大小与flume启动配置有关,和transactionCapacity大小保持一致。一般直接调用 Event intercept(Event event) 处理每一个event数据。
  53. *
  54. * @param events
  55. * @return
  56. */
  57. @Override
  58. public List<Event> intercept(List<Event> events) {
  59. logger.info("----------intercept(List<Event> events)方法执行");
  60. /*
  61. 这里编写对于event对象集合的处理代码,一般都是遍历event的对象集合,对于每一个event对象,调用 Event intercept(Event event) 方法,然后根据返回值是否为null,
  62. 来将其添加到新的集合中。
  63. */
  64. List<Event> results = new ArrayList<>();
  65. Event event;
  66. for (Event e : events) {
  67. event = intercept(e);
  68. if (event != null) {
  69. results.add(event);
  70. }
  71. }
  72. return results;
  73. }
  74. /**
  75. * 该方法主要用来销毁拦截器对象值执行,一般是一些释放资源的处理
  76. */
  77. @Override
  78. public void close() {
  79. logger.info("----------自定义拦截器close方法执行");
  80. }
  81. /**
  82. * 通过该静态内部类来创建自定义对象供flume使用,实现Interceptor.Builder接口,并实现其抽象方法
  83. */
  84. public static class Builder implements Interceptor.Builder {
  85. /**
  86. * 该方法主要用来返回创建的自定义类拦截器对象
  87. *
  88. * @return
  89. */
  90. @Override
  91. public Interceptor build() {
  92. logger.info("----------build方法执行");
  93. return new CustomInterceptor();
  94. }
  95. /**
  96. * 用来接收flume配置自定义拦截器参数
  97. *
  98. * @param context 通过该对象可以获取flume配置自定义拦截器的参数
  99. */
  100. @Override
  101. public void configure(Context context) {
  102. logger.info("----------configure方法执行");
  103. /*
  104. 通过调用context对象的getString方法来获取flume配置自定义拦截器的参数,方法参数要和自定义拦截器配置中的参数保持一致+
  105. */
  106. param = context.getString("param");
  107. }
  108. }
  109. }

总体架构

点击查看【processon】

  • 采取分层架构,agent 层采集数据,collector 层将agent 层的数据集中起来处理,传输到存储层存储。此处由于实时的路线只有一个节点所以没有加上collector 层,直接与kafka 对接。
  • 此处标明的预处理通过在agent层自定义拦截器实现。

    离线

    架构

    点击查看【processon】

实现

agent层与collector 层是通过collector 层的端口对接的,因此需要先启动collector 层的Flume,collector 层和agent 层的执行命令都大致相同,因采集配置文件而异

collector层

本人一般把采集的配置放到Flume 的 collect/ 目录下,此目录需手动创建

  1. 新建server-collector.conf
  2. Flume 根目录下运行,启动collector 层Agent
    1. bin/flume-ng agent \
    2. --name agent \
    3. -c conf \
    4. -f collect/agent.conf \
    5. -Dflume.root.logger=INFO,console
  • agent :配置文件中定义的
  • conf :Flume 根目录下的 conf 文件夹
  • collecto/server-collector.conf :配置文件的位置
  • -Dflume.root.logger=INFO,console :运行过程中的日志信息打印在控制台上

    agent层

  1. 自定义拦截器预处理数据,将每条日志拼接在一起

通过IDEA新建maven 的quick-start 项目,最后将项目打包成jar,放到agent层服务器上的Flume 的 lib 文件夹下

  • pom.xml
  • CustomInterceptor.java
  • FlumeBuilder.java
  1. 新建 server-agent.conf ,采集日志 /var/log/messages
  2. 启动agent层Agent
    1. bin/flume-ng agent \
    2. --name agent \
    3. -c conf \
    4. -f collect/server-agent.conf \
    5. -Dflume.root.logger=INFO,console

实时

实时路线Flume 的sink 直接与Kafka 对接,采集交换机日志

配置交换机的远程日志服务器

交换机为华为。

  1. 创建kafka主题

    1. bin/kafka-topics.sh --create \
    2. --zookeeper node01:2181 \
    3. --replication-factor 1 \
    4. --partitions 1 --topic kafka-switch
  2. 启动kafka主题

    1. bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic kafka-switch
  3. 新建配置文件 switch-logs.conf ``` a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type=exec a1.sources.s1.command=tail -F /var/log/switch01.log a1.sources.s1.channels=c1

设置Kafka接收器

a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink

设置Kafka地址

a1.sinks.k1.brokerList=node01:9092

设置发送到Kafka上的主题

a1.sinks.k1.topic=kafka-switch

设置序列化方式

a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder a1.sinks.k1.channel=c1

a1.channels.c1.type=memory a1.channels.c1.capacity=10000 a1.channels.c1.transactionCapacity=100

  1. 3. 启动Flume
  2. ```shell
  3. bin/flume-ng agent \
  4. --name a1 \
  5. -c conf \
  6. -f collect/switch.conf \
  7. -Dflume.root.logger=INFO,console

参考

  1. Flume使用:将A服务器上的日志实时采集到B服务器上(avro source + avro sink)
  2. Flume [Bug]: org.apache.commons.cli.MissingOptionException: Missing required option: n
  3. flume 启动:No appenders could be found for logger
  4. https://github.com/heibaiying/BigData-Notes/blob/master/notes/Flume简介及基本使用.md
  5. 自定义flume拦截器