Flume 插件

一. Flume HTTPSource 自定义 HTTP 源

测试

  1. curl -X POST -H "Content-Type:application/json;charset=UTF-8" -d '{"custom_ip":"192.168.1.1","host":"random_host.example.com"}' http://hostname:port
  2. curl -X POST -H "Content-Type:application/json;charset=UTF-8" -d 'aaa,bbb,ccc' http://hostname:port

Flume 中配置案例

  1. # 配置需要处理的 source channels sinks
  2. agentDw.sources = srcSafeRealtimeClickLogHttp
  3. agentDw.channels = chSafeRealtimeClickLogHttp
  4. agentDw.sinks = sinkSafeRealtimeClickLogHttp1 sinkSafeRealtimeClickLogHttp2 sinkSafeRealtimeClickLogHttp3
  5. # 对所有的出口 sink 做 Load balancing Sink Processor (负载均衡处理器) 配置, 防止远端单点故障
  6. agentDw.sinkgroups = sinkGroupSafeRealtimeClickLogHttp
  7. # --- SafeRealtimeClickLogHttp 配置 Start --- #
  8. # srcSafeRealtimeClickLogHttp source 配置
  9. agentDw.sources.srcSafeRealtimeClickLogHttp.type = http
  10. agentDw.sources.srcSafeRealtimeClickLogHttp.port = 10101
  11. agentDw.sources.srcSafeRealtimeClickLogHttp.host = 0.0.0.0
  12. agentDw.sources.srcSafeRealtimeClickLogHttp.handler = com.dw.flume.source.http.HTTPCustomHandler
  13. agentDw.sources.srcSafeRealtimeClickLogHttp.threads = 6
  14. agentDw.sources.srcSafeRealtimeClickLogHttp.channels = chSafeRealtimeClickLogHttp
  15. # srcSafeRealtimeClickLogHttp Timestamp Interceptor 配置
  16. #agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors = in1
  17. #agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors.in1.type = timestamp
  18. #agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors.in1.preserveExisting = true
  19. # chSafeRealtimeClickLogHttp channel 配置
  20. agentDw.channels.chSafeRealtimeClickLogHttp.type = file
  21. agentDw.channels.chSafeRealtimeClickLogHttp.checkpointDir = /var/log/flume/SafeRealtimeClickLogHttp/checkpoint
  22. agentDw.channels.chSafeRealtimeClickLogHttp.dataDirs = /var/log/flume/SafeRealtimeClickLogHttp/data
  23. agentDw.channels.chSafeRealtimeClickLogHttp.capacity = 1000000
  24. agentDw.channels.chSafeRealtimeClickLogHttp.maxFileSize = 2146435071
  25. agentDw.channels.chSafeRealtimeClickLogHttp.threads = 6
  26. # kafka
  27. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.type = org.apache.flume.sink.kafka.KafkaSink
  28. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.channel = chSafeRealtimeClickLogHttp
  29. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.bootstrap.servers = node4:9092,node5:9092,node6:9092
  30. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.topic = SafeRealtimeClickLog
  31. # 为该通道中的所有事件指定一个 Kafka 分区 ID; 默认情况下, 如果此属性未设置, 事件将由 Kafka 生产者的分区器 (partitioner) 分配
  32. # agentDw.sinks.sinkSafeRealtimeClickLogStreamKafka.defaultPartitionId
  33. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.flumeBatchSize = 2000
  34. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.producer.acks = -1
  35. #agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.producer.linger.ms = 1
  36. # sinkSafeRealtimeClickLogHttp1 To HTTP sink 配置
  37. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.channel = chSafeRealtimeClickLogHttp
  38. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.type = http
  39. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.endpoint = http://node1:10501
  40. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.connectTimeout = 2000
  41. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.requestTimeout = 2000
  42. # HTTP 报文头 Content-Type:application/json;charset=UTF-8 || Content-Type:text/plain;charset=UTF-8
  43. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.contentTypeHeader = Content-Type:application/json;charset=UTF-8
  44. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.acceptHeader = Content-Type:application/json;charset=UTF-8
  45. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultBackoff = true
  46. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultRollback = true
  47. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultIncrementMetrics = false
  48. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.backoff.200 = false
  49. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.rollback.200 = false
  50. agentDw.sinks.sinkSafeRealtimeClickLogHttp1.incrementMetrics.200 = true
  51. # --- SafeRealtimeClickLogHttp 配置 End --- #

二. Flume Interceptors 自定义拦截器编写

使用:

  1. 方法 1 指定 --classpath
  2. flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/flume.conf -n agentDw --classpath /usr/lib/flume-ng/lib/dw-flume-1.0.0.jar
  3. 方法 2 放到插件目录下
  4. /opt/cloudera/parcels/CDH/lib/flume-ng/lib
  5. /usr/lib/flume-ng/lib
  6. /usr/lib/flume-ng/plugins.d
  7. $FLUME_HOME/plugins.d

Flume 中配置案例

  • 对 header 中的字段进行正则匹配, 分离出更多 header
  1. # 配置需要处理的 source channels sinks
  2. agentDw.sources = SrcBrowserUseLog
  3. agentDw.channels = ChBrowserUseLog
  4. agentDw.sinks = SinkBrowserUseLog
  5. # --- BrowserUseLog 配置 Start --- #
  6. # SrcBrowserUseLog spooldir source 配置
  7. agentDw.sources.SrcBrowserUseLog.type = spooldir
  8. agentDw.sources.SrcBrowserUseLog.spoolDir = /var/log/flume/browser_use/monitor
  9. # 文件处理完成后的删除策略: immediate (立即删除) | never (不删除)
  10. agentDw.sources.SrcBrowserUseLog.deletePolicy = never
  11. # 递归检测目录
  12. agentDw.sources.SrcBrowserUseLog.recursiveDirectorySearch = true
  13. # 上传文件的绝对路径
  14. agentDw.sources.SrcBrowserUseLog.fileHeader = true
  15. agentDw.sources.SrcBrowserUseLog.fileHeaderKey = file
  16. # 上传的文件名
  17. agentDw.sources.SrcBrowserUseLog.basenameHeader = true
  18. agentDw.sources.SrcBrowserUseLog.basenameHeaderKey = basename
  19. agentDw.sources.SrcBrowserUseLog.channels = ChBrowserUseLog
  20. # SrcBrowserUseLog Interceptors 配置
  21. agentDw.sources.SrcBrowserUseLog.interceptors = in1
  22. # 自定义 header 拦截器
  23. agentDw.sources.SrcBrowserUseLog.interceptors.in1.type = com.angejia.dw.flume.source.interceptors.RegexExtractorHeaderInterceptor$Builder
  24. # 拦截的正则规则
  25. agentDw.sources.SrcBrowserUseLog.interceptors.in1.regex = browser_use/monitor/([A-Za-z0-9/._-]+)/
  26. agentDw.sources.SrcBrowserUseLog.interceptors.in1.extractorHeader = true
  27. # 拦截的 header key (来自 source)
  28. agentDw.sources.SrcBrowserUseLog.interceptors.in1.extractorHeaderKey = file
  29. # browser_use/monitor/([A-Za-z0-9/._-]+)/ 正则匹配后的映射 s1 -> log_path , 以此类推
  30. agentDw.sources.SrcBrowserUseLog.interceptors.in1.serializers = s1
  31. agentDw.sources.SrcBrowserUseLog.interceptors.in1.serializers.s1.name = log_path
  32. # ChBrowserUseLog channels 配置
  33. agentDw.channels.ChBrowserUseLog.type = file
  34. agentDw.channels.ChBrowserUseLog.checkpointDir = /var/log/flume/browser_use/checkpoint
  35. agentDw.channels.ChBrowserUseLog.dataDirs = /var/log/flume/browser_use/data
  36. agentDw.channels.ChBrowserUseLog.threads = 2
  37. # ChBrowserUseLog To HDFS
  38. agentDw.sinks.SinkBrowserUseLog.type = hdfs
  39. agentDw.sinks.SinkBrowserUseLog.channel = ChBrowserUseLog
  40. # 写入目录和文件规则,格式
  41. agentDw.sinks.SinkBrowserUseLog.hdfs.path = hdfs://nameservice1/ods/browser_use/%{log_path}
  42. agentDw.sinks.SinkBrowserUseLog.hdfs.filePrefix = log
  43. agentDw.sinks.SinkBrowserUseLog.hdfs.fileSuffix = .log
  44. # 正在写入的临时文件的前缀和后缀规则
  45. agentDw.sinks.SinkBrowserUseLog.hdfs.inUsePrefix = .
  46. agentDw.sinks.SinkBrowserUseLog.hdfs.inUseSuffix = .tmp
  47. # 时间戳向下取整 (按 10 分钟取整)
  48. agentDw.sinks.SinkBrowserUseLog.hdfs.round = true
  49. agentDw.sinks.SinkBrowserUseLog.hdfs.roundValue = 10
  50. agentDw.sinks.SinkBrowserUseLog.hdfs.roundUnit = minute
  51. # 每个 HDFS 块的最小副本数; 设为 1 可避免因块副本复制导致文件提前滚动, 使下面的滚动配置按预期生效
  52. agentDw.sinks.SinkBrowserUseLog.hdfs.minBlockReplicas=1
  53. agentDw.sinks.SinkBrowserUseLog.hdfs.rollSize = 0
  54. agentDw.sinks.SinkBrowserUseLog.hdfs.rollCount = 0
  55. agentDw.sinks.SinkBrowserUseLog.hdfs.rollInterval = 0
  56. # 写入格式
  57. agentDw.sinks.SinkBrowserUseLog.hdfs.writeFormat = Text
  58. # 文件格式 : SequenceFile, DataStream(数据不会压缩输出文件) or CompressedStream
  59. agentDw.sinks.SinkBrowserUseLog.hdfs.fileType = DataStream
  60. # 批处理达到这个上限, 写到 HDFS
  61. agentDw.sinks.SinkBrowserUseLog.hdfs.batchSize = 100
  62. # hdfs 打开、写、刷新、关闭的超时时间, 毫秒
  63. agentDw.sinks.SinkBrowserUseLog.hdfs.callTimeout = 60000
  64. # 多少秒没有写入就关闭这个文件, 0 不关闭
  65. agentDw.sinks.SinkBrowserUseLog.hdfs.idleTimeout = 1
  66. # 使用本地时间
  67. agentDw.sinks.SinkBrowserUseLog.hdfs.useLocalTimeStamp = true
  68. # --- BrowserUseLog 配置 End --- #