Flume Plugins
1. Flume HTTPSource: custom HTTP source
Testing
curl -X POST -H "Content-Type:application/json;charset=UTF-8" -d '{"custom_ip":"192.168.1.1","host":"random_host.example.com"}' http://hostname:port
curl -X POST -H "Content-Type:application/json;charset=UTF-8" -d 'aaa,bbb,ccc' http://hostname:port
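Flume's built-in `JSONHandler` expects a JSON array of `{"headers": {...}, "body": "..."}` objects, so neither test payload above would most likely be accepted by it as-is; that is presumably why a custom handler (`com.dw.flume.source.http.HTTPCustomHandler`) is configured. Its source is not shown here. Below is a minimal, dependency-free sketch of one plausible approach, assuming the handler keeps the raw body verbatim and emits one event per non-empty line, copying the request's Content-Type into a header. `RawBodyHandlerSketch`, `SketchEvent`, `toEvents` and the `contentType` header name are all hypothetical; in a real plugin this logic would sit inside `getEvents(HttpServletRequest)` of a class implementing `org.apache.flume.source.http.HTTPSourceHandler` and would return `org.apache.flume.Event` objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch of a custom HTTP source handler's core logic.
// Hypothetical: a real plugin would do this in HTTPSourceHandler#getEvents
// and build org.apache.flume.Event objects instead of SketchEvent.
class RawBodyHandlerSketch {

    /** Minimal stand-in for a Flume event: string headers plus a byte[] body. */
    static final class SketchEvent {
        final Map<String, String> headers;
        final byte[] body;

        SketchEvent(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }

        String bodyAsString() {
            return new String(body, StandardCharsets.UTF_8);
        }
    }

    /**
     * Splits the raw request body into lines and emits one event per non-empty
     * line. Each line is kept verbatim (JSON object or comma-separated text),
     * and the request's Content-Type, if present, goes into a "contentType" header.
     */
    static List<SketchEvent> toEvents(String contentType, byte[] requestBody) {
        List<SketchEvent> events = new ArrayList<>();
        String text = new String(requestBody, StandardCharsets.UTF_8);
        for (String line : text.split("\\r?\\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines instead of emitting empty events
            }
            Map<String, String> headers = new HashMap<>();
            if (contentType != null) {
                headers.put("contentType", contentType);
            }
            events.add(new SketchEvent(Collections.unmodifiableMap(headers),
                    trimmed.getBytes(StandardCharsets.UTF_8)));
        }
        return events;
    }
}
```

Keeping the body verbatim means the same handler accepts both test payloads; any parsing of the JSON fields or comma-separated values is left to downstream consumers.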
Flume configuration example
# Declare the sources, channels and sinks to process
agentDw.sources = srcSafeRealtimeClickLogHttp
agentDw.channels = chSafeRealtimeClickLogHttp
agentDw.sinks = sinkSafeRealtimeClickLogHttp1 sinkSafeRealtimeClickLogHttp2 sinkSafeRealtimeClickLogHttp3
# Put all outgoing sinks behind a Load Balancing Sink Processor to avoid a single point of failure on the remote side
agentDw.sinkgroups = sinkGroupSafeRealtimeClickLogHttp

# --- SafeRealtimeClickLogHttp config Start --- #
# srcSafeRealtimeClickLogHttp source config
agentDw.sources.srcSafeRealtimeClickLogHttp.type = http
agentDw.sources.srcSafeRealtimeClickLogHttp.port = 10101
agentDw.sources.srcSafeRealtimeClickLogHttp.host = 0.0.0.0
agentDw.sources.srcSafeRealtimeClickLogHttp.handler = com.dw.flume.source.http.HTTPCustomHandler
agentDw.sources.srcSafeRealtimeClickLogHttp.threads = 6
agentDw.sources.srcSafeRealtimeClickLogHttp.channels = chSafeRealtimeClickLogHttp

# srcSafeRealtimeClickLogHttp timestamp interceptor config (disabled)
#agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors = in1
#agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors.in1.type = timestamp
#agentDw.sources.srcSafeRealtimeClickLogHttp.interceptors.in1.preserveExisting = true

# chSafeRealtimeClickLogHttp channel config
agentDw.channels.chSafeRealtimeClickLogHttp.type = file
agentDw.channels.chSafeRealtimeClickLogHttp.checkpointDir = /var/log/flume/SafeRealtimeClickLogHttp/checkpoint
agentDw.channels.chSafeRealtimeClickLogHttp.dataDirs = /var/log/flume/SafeRealtimeClickLogHttp/data
agentDw.channels.chSafeRealtimeClickLogHttp.capacity = 1000000
agentDw.channels.chSafeRealtimeClickLogHttp.maxFileSize = 2146435071
agentDw.channels.chSafeRealtimeClickLogHttp.threads = 6

# Kafka sink (disabled)
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.type = org.apache.flume.sink.kafka.KafkaSink
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.channel = chSafeRealtimeClickLogHttp
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.bootstrap.servers = node4:9092,node5:9092,node6:9092
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.topic = SafeRealtimeClickLog
# Kafka partition ID to use for every event in this channel; if unset, events are assigned by the Kafka producer's partitioner
# agentDw.sinks.sinkSafeRealtimeClickLogStreamKafka.defaultPartitionId
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.flumeBatchSize = 2000
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.producer.acks = -1
#agentDw.sinks.sinkSafeRealtimeClickLogHttp1.kafka.producer.linger.ms = 1

# sinkSafeRealtimeClickLogHttp1: HTTP sink config
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.channel = chSafeRealtimeClickLogHttp
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.type = http
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.endpoint = http://node1:10501
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.connectTimeout = 2000
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.requestTimeout = 2000
# HTTP header values (the sink adds the header names itself): application/json;charset=UTF-8 or text/plain;charset=UTF-8
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.contentTypeHeader = application/json;charset=UTF-8
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.acceptHeader = application/json;charset=UTF-8
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultBackoff = true
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultRollback = true
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.defaultIncrementMetrics = false
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.backoff.200 = false
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.rollback.200 = false
agentDw.sinks.sinkSafeRealtimeClickLogHttp1.incrementMetrics.200 = true
# --- SafeRealtimeClickLogHttp config End --- #
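With the HTTP sink settings above, a `200` response from `http://node1:10501` neither backs off nor rolls back the transaction and increments the success metrics, while any other status falls back to the defaults: back off and roll back (so the events stay in the file channel and are retried), without incrementing the metrics. According to the Flume User Guide, an override key may be an individual code such as `200` or a group such as `2XX`. The dependency-free sketch below models that lookup, assuming the precedence exact code, then group, then default; `StatusPolicySketch` and its methods are hypothetical and not part of Flume's API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone model of how an HTTP sink could resolve per-status
// settings such as backoff.200 or rollback.2XX against a default value.
class StatusPolicySketch {

    private final Map<String, Boolean> overrides = new HashMap<>();
    private final boolean defaultValue;

    StatusPolicySketch(boolean defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Registers an override keyed by an individual code ("200") or a group ("2XX"). */
    StatusPolicySketch override(String codeOrGroup, boolean value) {
        overrides.put(codeOrGroup.toUpperCase(Locale.ROOT), value);
        return this;
    }

    /** Lookup order (assumed): exact code, then its group (e.g. 503 -> "5XX"), then the default. */
    boolean resolve(int statusCode) {
        Boolean value = overrides.get(String.valueOf(statusCode));
        if (value == null) {
            value = overrides.get((statusCode / 100) + "XX");
        }
        return value != null ? value : defaultValue;
    }
}
```

One instance per setting family mirrors the config: `new StatusPolicySketch(true).override("200", false)` corresponds to `defaultBackoff = true` plus `backoff.200 = false`.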
2. Flume Interceptors: writing a custom interceptor
Usage:
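Method 1: pass the jar with --classpath
flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/flume.conf -n agentDw --classpath /usr/lib/flume-ng/lib/dw-flume-1.0.0.jar
Method 2: put the jar into one of the library or plugin directories (inside plugins.d, the jar goes under <plugin-name>/lib/)
/opt/cloudera/parcels/CDH/lib/flume-ng/lib
/usr/lib/flume-ng/lib
/usr/lib/flume-ng/plugins.d
$FLUME_HOME/plugins.d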
Flume configuration example
- Match a regex against a header field to extract additional headers
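The custom `RegexExtractorHeaderInterceptor` is not included in this document. As a rough illustration of the idea, here is a dependency-free sketch assuming it works like Flume's built-in `RegexExtractorInterceptor`, except that the regex is applied to a header value (`extractorHeaderKey`) instead of the event body, with capture group 1 mapped to the first serializer name, group 2 to the second, and so on. `RegexHeaderExtractorSketch` and its methods are hypothetical; a real plugin would implement `org.apache.flume.interceptor.Interceptor` (`initialize`, `intercept(Event)`, `intercept(List<Event>)`, `close`) and expose a nested `Builder`. For a made-up file header `/var/log/flume/browser_use/monitor/app1/2017-01-01/access.log`, the regex `browser_use/monitor/([A-Za-z0-9/._-]+)/` captures `app1/2017-01-01`: the greedy character class backtracks to the last `/` before the file name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Dependency-free sketch of a "regex on a header" interceptor. It mirrors the
// regex / extractorHeaderKey / serializers.*.name settings, but all names here
// are hypothetical and this is not the actual plugin code.
class RegexHeaderExtractorSketch {

    private final Pattern regex;
    private final String sourceHeaderKey;
    private final List<String> targetHeaderNames; // index 0 -> capture group 1, ...

    RegexHeaderExtractorSketch(String regex, String sourceHeaderKey, List<String> targetHeaderNames) {
        this.regex = Pattern.compile(regex);
        this.sourceHeaderKey = sourceHeaderKey;
        this.targetHeaderNames = targetHeaderNames;
    }

    /** Returns a copy of the headers with one new header per matched capture group. */
    Map<String, String> intercept(Map<String, String> headers) {
        Map<String, String> result = new HashMap<>(headers);
        String value = headers.get(sourceHeaderKey);
        if (value == null) {
            return result; // source header missing: nothing to match against
        }
        Matcher m = regex.matcher(value);
        if (m.find()) {
            int groups = Math.min(m.groupCount(), targetHeaderNames.size());
            for (int i = 1; i <= groups; i++) {
                String group = m.group(i);
                if (group != null) {
                    result.put(targetHeaderNames.get(i - 1), group);
                }
            }
        }
        return result;
    }
}
```

Files placed directly in the monitor directory do not match (there is no `/` after the captured part), so such events get no `log_path` header.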
# Declare the sources, channels and sinks to process
agentDw.sources = SrcBrowserUseLog
agentDw.channels = ChBrowserUseLog
agentDw.sinks = SinkBrowserUseLog

# --- BrowserUseLog config Start --- #
# SrcBrowserUseLog spooling-directory source config
agentDw.sources.SrcBrowserUseLog.type = spooldir
agentDw.sources.SrcBrowserUseLog.spoolDir = /var/log/flume/browser_use/monitor
# Delete files once fully ingested: immediate | never
agentDw.sources.SrcBrowserUseLog.deletePolicy = never
# Scan subdirectories recursively
agentDw.sources.SrcBrowserUseLog.recursiveDirectorySearch = true
# Add the absolute path of the ingested file as a header
agentDw.sources.SrcBrowserUseLog.fileHeader = true
agentDw.sources.SrcBrowserUseLog.fileHeaderKey = file
# Add the base name of the ingested file as a header
agentDw.sources.SrcBrowserUseLog.basenameHeader = true
agentDw.sources.SrcBrowserUseLog.basenameHeaderKey = basename
agentDw.sources.SrcBrowserUseLog.channels = ChBrowserUseLog

# SrcBrowserUseLog interceptor config
agentDw.sources.SrcBrowserUseLog.interceptors = in1
# Custom header interceptor
agentDw.sources.SrcBrowserUseLog.interceptors.in1.type = com.angejia.dw.flume.source.interceptors.RegexExtractorHeaderInterceptor$Builder
# Regex applied to the header value
agentDw.sources.SrcBrowserUseLog.interceptors.in1.regex = browser_use/monitor/([A-Za-z0-9/._-]+)/
agentDw.sources.SrcBrowserUseLog.interceptors.in1.extractorHeader = true
# Header to match against (set by the source)
agentDw.sources.SrcBrowserUseLog.interceptors.in1.extractorHeaderKey = file
# Map regex capture groups to new header names: group 1 (s1) -> log_path, and so on
agentDw.sources.SrcBrowserUseLog.interceptors.in1.serializers = s1
agentDw.sources.SrcBrowserUseLog.interceptors.in1.serializers.s1.name = log_path

# ChBrowserUseLog channel config
agentDw.channels.ChBrowserUseLog.type = file
agentDw.channels.ChBrowserUseLog.checkpointDir = /var/log/flume/browser_use/checkpoint
agentDw.channels.ChBrowserUseLog.dataDirs = /var/log/flume/browser_use/data
agentDw.channels.ChBrowserUseLog.threads = 2

# ChBrowserUseLog to HDFS
agentDw.sinks.SinkBrowserUseLog.type = hdfs
agentDw.sinks.SinkBrowserUseLog.channel = ChBrowserUseLog
# Output directory and file naming
agentDw.sinks.SinkBrowserUseLog.hdfs.path = hdfs://nameservice1/ods/browser_use/%{log_path}
agentDw.sinks.SinkBrowserUseLog.hdfs.filePrefix = log
agentDw.sinks.SinkBrowserUseLog.hdfs.fileSuffix = .log
# Prefix and suffix of files that are still being written
agentDw.sinks.SinkBrowserUseLog.hdfs.inUsePrefix = .
agentDw.sinks.SinkBrowserUseLog.hdfs.inUseSuffix = .tmp
# Time rounding for path escapes (disabled: round is commented out)
#agentDw.sinks.SinkBrowserUseLog.hdfs.round = true
agentDw.sinks.SinkBrowserUseLog.hdfs.roundValue = 10
agentDw.sinks.SinkBrowserUseLog.hdfs.roundUnit = minute
# Minimum replicas per HDFS block; 1 keeps under-replication from triggering extra file rolls
agentDw.sinks.SinkBrowserUseLog.hdfs.minBlockReplicas = 1
# Disable rolling by size, event count and time
agentDw.sinks.SinkBrowserUseLog.hdfs.rollSize = 0
agentDw.sinks.SinkBrowserUseLog.hdfs.rollCount = 0
agentDw.sinks.SinkBrowserUseLog.hdfs.rollInterval = 0
# Write format (applies to SequenceFile output)
agentDw.sinks.SinkBrowserUseLog.hdfs.writeFormat = Text
# File type: SequenceFile, DataStream (uncompressed output) or CompressedStream
agentDw.sinks.SinkBrowserUseLog.hdfs.fileType = DataStream
# Number of events written to a file before it is flushed to HDFS
agentDw.sinks.SinkBrowserUseLog.hdfs.batchSize = 100
# Timeout for HDFS open, write, flush and close operations, in milliseconds
agentDw.sinks.SinkBrowserUseLog.hdfs.callTimeout = 60000
# Close a file after this many seconds without writes; 0 disables
agentDw.sinks.SinkBrowserUseLog.hdfs.idleTimeout = 1
# Use local time instead of the event's timestamp header for time escapes
agentDw.sinks.SinkBrowserUseLog.hdfs.useLocalTimeStamp = true
# --- BrowserUseLog config End --- #
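With `hdfs.path = hdfs://nameservice1/ods/browser_use/%{log_path}`, each event lands in a directory named after the `log_path` header extracted by the interceptor, so the HDFS layout mirrors the spool-directory layout. The sketch below models that `%{header}` substitution and the file names implied by `filePrefix`, `fileSuffix`, `inUsePrefix` and `inUseSuffix`. `HdfsPathSketch` is hypothetical; time escapes (`%Y`, `%H`, ...) and rounding are not modeled, the empty-string fallback for a missing header is an assumption, and the numeric counter in the file name is assumed here to be a timestamp-based value picked by the sink. The leading `.` in-use prefix keeps open files hidden from tools that skip dot-files.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of how %{header} escapes in hdfs.path might be resolved,
// plus the in-progress vs final file names implied by the prefix/suffix settings.
// Time escapes such as %Y or %H, and rounding, are deliberately not handled.
class HdfsPathSketch {

    private static final Pattern HEADER_ESCAPE = Pattern.compile("%\\{([^}]+)\\}");

    /** Replaces every %{name} with the header value; missing headers become "" (assumption). */
    static String resolvePath(String template, Map<String, String> headers) {
        Matcher m = HEADER_ESCAPE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Final name: prefix + "." + counter + suffix, e.g. log.1700000000000.log */
    static String finalFileName(String prefix, long counter, String suffix) {
        return prefix + "." + counter + suffix;
    }

    /** Name while the file is still open, e.g. .log.1700000000000.log.tmp */
    static String inUseFileName(String inUsePrefix, String finalName, String inUseSuffix) {
        return inUsePrefix + finalName + inUseSuffix;
    }
}
```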
