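Introduction

A Source is the component that receives data into a Flume Agent. Source components can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. Flume already ships with many source types, but sometimes none of them fits the needs of a real project, and in that case we have to write a custom source.

The official documentation also describes the interface for custom sources:
https://flume.apache.org/FlumeDeveloperGuide.html#source
According to it, a custom MySource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces.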

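Requirements

Use Flume to receive data, add a prefix to each record, and print it to the console. The prefix can be set in the Flume configuration file.
[Figure 1: Flume custom Source]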

Writing the code

Create a Maven project

Dependency

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>

The code

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.PollableSource;
    import org.apache.flume.channel.ChannelProcessor;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;

    /*
     * Generates events, adds a prefix to each one, and hands them to the channel
     * so that the logger sink prints them to the console.
     * The prefix can be set in the Flume configuration file.
     */
    public class MySource extends AbstractSource implements Configurable, PollableSource {

        private String prefix;

        // Core method: process() creates events and puts them into the channel.
        // Status { READY, BACKOFF }
        // READY:   the source built the events and stored them in the channel
        // BACKOFF: the source could not build the events or store them in the channel
        // process() is called in a loop by the thread that runs the source.
        @Override
        public Status process() {
            Status status = Status.READY;

            // Build the events
            List<Event> datas = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                SimpleEvent e = new SimpleEvent();
                // Put the data into the event body, with an explicit charset
                e.setBody((prefix + "hello" + i).getBytes(StandardCharsets.UTF_8));
                datas.add(e);
            }

            // Put the events into the channel through this source's ChannelProcessor
            try {
                // Pause so that a batch is emitted roughly every 5 seconds
                Thread.sleep(5000);
                ChannelProcessor cp = getChannelProcessor();
                cp.processEventBatch(datas);
            } catch (InterruptedException e) {
                // Keep the interrupt flag so the runner thread can shut down
                Thread.currentThread().interrupt();
                status = Status.BACKOFF;
            } catch (Exception e) {
                status = Status.BACKOFF;
                e.printStackTrace();
            }
            return status;
        }

        // When process() returns BACKOFF, the source thread sleeps for a while.
        // The sleep time is this increment multiplied by the number of
        // consecutive backoffs, capped by getMaxBackOffSleepInterval().
        @Override
        public long getBackOffSleepIncrement() {
            return 2000;
        }

        @Override
        public long getMaxBackOffSleepInterval() {
            return 5000;
        }

        // Read settings from the configuration
        @Override
        public void configure(Context context) {
            // Read the property with key "prefix"; fall back to "default:" if it is not set
            prefix = context.getString("prefix", "default:");
        }
    }
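The backoff comment in the listing above can be made concrete with a little arithmetic. The following is a minimal stand-alone sketch, not Flume code: it assumes the runner sleeps for the increment multiplied by the number of consecutive BACKOFF results, capped at the maximum interval, which is my reading of how Flume's pollable source runner behaves. The class name BackoffSketch and the method sleepMillis are hypothetical names chosen for illustration.

```java
// Stand-alone model of the backoff arithmetic; this is not part of Flume.
final class BackoffSketch {

    // Sleep time after `consecutiveBackoffs` BACKOFF results in a row:
    // increment * count, capped at the maximum interval (assumed behaviour).
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        long increment = 2000; // value returned by getBackOffSleepIncrement() in MySource
        long max = 5000;       // value returned by getMaxBackOffSleepInterval() in MySource
        for (long n = 1; n <= 4; n++) {
            System.out.println(n + " consecutive backoff(s): sleep "
                    + sleepMillis(n, increment, max) + " ms");
        }
    }
}
```

With the values used in MySource, the sleep grows from 2000 ms to 4000 ms and then stays at the 5000 ms cap.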

Packaging with Maven

Run Maven clean, then package.

After Flume-0.0.1-SNAPSHOT.jar has been built, copy it into the apache-flume-1.7.0/lib directory.

Writing the configuration file

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = com.zjj.flume.custom.MySource
    # Note: MySource only reads the "prefix" key; "delay" is not read by the code above
    a1.sources.r1.delay = 1000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
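To see how this file relates to configure(Context), here is a rough stand-alone sketch using only java.util.Properties. It assumes that Flume hands the source the properties under a1.sources.r1. with that leading part stripped, so that a1.sources.r1.prefix would arrive as the key prefix. The class SourceContextSketch, its methods, and the sample value zjj: are hypothetical and only illustrate the lookup; they are not Flume APIs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Stand-alone model of the per-source key lookup; this is not Flume's Context class.
final class SourceContextSketch {

    // Collect the properties under "<agent>.sources.<source>." with that part stripped.
    static Map<String, String> subProperties(Properties props, String agent, String source) {
        String keyPrefix = agent + ".sources." + source + ".";
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(keyPrefix)) {
                result.put(name.substring(keyPrefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    // Parse configuration text and return the settings of source r1 on agent a1.
    static Map<String, String> parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return subProperties(props, "a1", "r1");
    }

    public static void main(String[] args) {
        String conf = "a1.sources.r1.type = com.zjj.flume.custom.MySource\n"
                + "a1.sources.r1.delay = 1000\n"
                + "a1.sources.r1.channels = c1\n";
        // No "prefix" key in the file, so the default applies
        System.out.println(parse(conf).getOrDefault("prefix", "default:"));
        // Hypothetical extra line that sets the prefix
        String withPrefix = conf + "a1.sources.r1.prefix = zjj:\n";
        System.out.println(parse(withPrefix).getOrDefault("prefix", "default:"));
    }
}
```

Under these assumptions the file as written leaves the prefix at its default, and adding a line such as a1.sources.r1.prefix would change it.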

Starting the agent

    [root@zjj101 job]# flume-ng agent -n a1 -c conf/ -f "/root/soft/apache-flume-1.7.0/conf/job/demo1.conf" -Dflume.root.logger=DEBUG,console

After a few seconds the console prints:

    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30 default:hello0 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 31 default:hello1 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 32 default:hello2 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 33 default:hello3 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 34 default:hello4 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 35 default:hello5 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 36 default:hello6 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 37 default:hello7 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 38 default:hello8 }
    20/10/26 15:24:19 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 39 default:hello9 }
    20/10/26 15:24:24 INFO sink.LoggerSink: Event: { headers:{} body: 64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30 default:hello0 }

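This shows that the custom source is loaded and running. Because the configuration file does not set a prefix property, each event carries the default prefix default:, and a new batch starts about 5 seconds after the previous one.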
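The logger sink prints each body twice: first as hexadecimal bytes, then as text. As a quick check that the two agree, the sketch below decodes the hex column of the first log line back into a string. HexBodySketch and decode are hypothetical helper names, and UTF-8 is assumed as the encoding.

```java
import java.nio.charset.StandardCharsets;

// Decodes a space-separated hex dump, as printed by the logger sink, into text.
final class HexBodySketch {

    static String decode(String hex) {
        String[] parts = hex.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            // Each token is one byte written as two hex digits
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hex column copied from the first log line
        System.out.println(decode("64 65 66 61 75 6C 74 3A 68 65 6C 6C 6F 30"));
        // prints "default:hello0"
    }
}
```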

Source code

https://gitee.com/crow1/flume-my-source