[toc]

需求

日志:启动日志(OK)、事件日志

数据采集:日志文件 => Flume => HDFS => ODS

原始日志数据

  1. 2020-07-30 14:18:47.339 [main] INFOcom.lagou.ecommerce.AppStart -
  2. {
  3. "app_active": {
  4. "name": "app_active",
  5. "json": {
  6. "entry": "1",
  7. "action": "1",
  8. "error_code": "0"
  9. },
  10. "time": 1596111888529
  11. },
  12. "attr": {
  13. "area": "泰安",
  14. "uid": "2F10092A9",
  15. "app_v": "1.1.13",
  16. "event_type": "common",
  17. "device_id": "1FB872-9A1009",
  18. "os_type": "4.7.3",
  19. "channel": "DK",
  20. "language": "chinese",
  21. "brand": "iphone-9"
  22. }
  23. }

数据采集的流程

日志采集 - 图1

采集日志工具 : Flume 1.8+ :

  • 提供了一个非常好用的 Taildir Source
  • 使用该 source,可以监控多个目录,对目录中新写入的数据进行实时采集

taildir source配置

特点

  • 使用正则表达式匹配目录中的文件名
  • 监控的文件中,一旦有数据写入,Flume 就会将信息写入到指定的 Sink
  • 高可靠,不会丢失数据
  • 不会对跟踪文件有任何处理,不会重命名也不会删除
  • 支持按行读取文本文件 , 不支持 Windows,不能读二进制文件

source 配置

  1. a1.sources.r1.type = TAILDIR
  2. # 配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题
  3. a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
  4. # 同时监控多个目录中的文件 ( 多个,以 空格 分隔 )
  5. a1.sources.r1.filegroups = f1
  6. # 每个文件绝对路径,文件名可以用正则表达式匹配
  7. a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log

hdfs sink配置

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式 (默认: 1024字节) 
a1.sinks.k1.hdfs.rollSize = 33554432
# 基于event数量 (默认: 10 个 event) 
a1.sinks.k1.hdfs.rollCount = 0
# 基于时间 (默认: 30 秒)
a1.sinks.k1.hdfs.rollInterval = 0
# 基于文件空闲时间 0 : 禁用
a1.sinks.k1.hdfs.idleTimeout = 0
#(默认: hdfs 副本数) 1 : 让 Flume 感知不到 hdfs 的块复制,从而不影响其他配置 ( 时间间隔、文件大小、events数量 )
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 100

# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true

Agent 的配置

vim /opt/module/flume-1.9.0/conf/flume-log2hdfs1.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 =  /opt/module/data/flume-1.9.0/logs/start/.*log

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume 的优化配置

启动 agent

flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/flumelog2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

/data/flume/logs/ 目录中放入日志文件,报错:

java.lang.OutOfMemoryError: GC overhead limit exceeded

查看运行 Flume 的配置文件

ps -ef | grep flume

日志采集 - 图2

Flume jvm堆默认最大分配 20m,该值太小,需要调整

$FLUME_HOME/conf/flume-env.sh 中增加内容

日志采集 - 图3

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

日志采集 - 图4

使配置文件生效,需要在命令行中指定配置文件目录

flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/flume/conf/flume-log2hdfs1.conf -name a1 -Dflume.roog.logger=INFO,console

Flume 内存参数设置及优化:

  • 根据日志数据量的大小,Jvm 堆一般设置 : 4G或更高
  • -Xms -Xmx 最好设置一致,避免内存抖动带来的性能影响

问题:Flume 放数据时,使用本地时间 , 不管日志的时间戳 : 会导致日志时间漂移

自定义拦截器

Flume Agent 配置使用了本地时间,可能会导致数据存放的路径不正确 , 为此问题需要使用自定义拦截器

agent 用于测试自定义拦截器 :

  • netcat source => logger sink
# a1是agent的名称
# source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = cpu101
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body(接收的字符串)
  • 获取 header 和 body
  • 从 body 中获取 "time":1596382570539 ,并将时间戳转换为字符串 “yyyy-MM-dd”
  • 将转换后的字符串放置 header 中

自定义拦截器的实现:

  1. 获取 event 的 header
  2. 获取 event 的 body
  3. 解析 body 获取 json 串
  4. 解析 json 串获取时间戳
  5. 将时间戳转换为字符串 “yyyy-MM-dd”
  6. 将转换后的字符串放置 header 中
  7. 返回 event

Mevan

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>provided</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

日志采集 - 图5

Java

package com.cpucode.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    // 逐条处理event
    @Override
    public Event intercept(Event event) {
        // 获取 event 的 body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // 获取 event 的 header
        Map<String, String> headersMap = event.getHeaders();
        // 解析body获取json串
        String[] bodyArr = eventBody.split("\\s+");

        try{
            String jsonStr = bodyArr[6];
            // 解析json串获取时间戳
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");

            // 将时间戳转换为字符串 "yyyy-MM-dd"
            // 将字符串转换为Long
            long timestamp = long.parselong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // 将转换后的字符串放置header中
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        }
        catch (Exception e){
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event: events){
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void testJunit(){
        String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
        Map<String, String> map = new HashMap<>();

        // new Event
        Event event = new SimpleEvent();
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // 调用interceptor处理event
        CustomerInterceptor customerInterceptor = new CustomerInterceptor();
        Event outEvent = customerInterceptor.intercept(event);

        // 处理结果
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}

将程序打包,放在 flume/lib 目录下

启动 Agent 测试

flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/flume/conf/flumetest1.conf -name a1 -Dflume.roog.logger=INFO,console

采集启动日志(使用自定义拦截器)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.CustomerInterceptor$Builder

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000

# 使用本地时间
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

修改:

  • 给 source 增加自定义拦截器
  • 去掉本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据 header 中的 logtime 写文件

启动服务

flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file/data/flume/conf/flume-log2hdfs2.conf -name a1 -Dflume.root.logger=INFO,console

拷贝日志

检查HDFS文件

采集启动日志和事件日志

采集两种日志:启动日志、事件日志,不同的日志放置在不同的目录下

利用一个 Flume 通过监控多个目录 , 拿到全部日志

日志采集 - 图6

总体思路

  • taildir 监控多个目录
  • 修改自定义拦截器,不同来源的数据加上不同标志
  • hdfs sink 根据标志写文件

Agent 配置

vim /opt/module/flume-1.9.0/job/flume-log2hdfs4.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/module/data/flume-1.9.0/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event

# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.LogTypeInterceptor$Builder

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream

# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1

# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • filegroups :同时监控多个目录中的文件 ( 多个,以 `` 分隔 )
  • headers.<filegroupName>.<headerKey> : 给 event 增加 header key。不同的 filegroup,可配不同的 value

自定义拦截器

package com.cpucode.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author : cpucode
 * @date : 2022/6/24 21:11
 * @github : https://github.com/CPU-Code
 * @csdn : https://cpucode.blog.csdn.net
 */
public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    /**
     * 逐条处理event
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        // 获取 event 的 body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // 获取 event 的 header
        Map<String, String> headersMap = event.getHeaders();
        // 解析 body 获取 json 串
        String[] bodyArr = eventBody.split("\\s+");

        try{
            String jsonStr = bodyArr[6];

            // 解析json串获取时间戳
            String timestampStr = "";
            JSONObject jsonObject = JSON.parseObject(jsonStr);

            if (headersMap.getOrDefault("logtype", "").equals("start")) {
                // 取启动日志的时间戳
                timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            } else if (headersMap.getOrDefault("logtype", "").equals("event")) {
                // 取事件日志第一条记录的时间戳
                JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
                if (jsonArray.size() > 0){
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            }

            // 将时间戳转换为字符串 "yyyy-MM-dd"
            // 将字符串转换为Long
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);

            // 将转换后的字符串放置header中
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e){
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event: list){
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void startJunit(){
        String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
        Map<String, String> map = new HashMap<>();

        // new Event
        Event event = new SimpleEvent();
        map.put("logtype", "start");
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // 调用interceptor处理event
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);
        // 处理结果
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }

    @Test
    public void eventJunit(){
        String str = "2020-08-02 18:20:11.877 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":{\"area\":\"长治\",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\",\"brand\":\"xiaomi-0\"}}";
        Map<String, String> map = new HashMap<>();

        // new Event
        Event event = new SimpleEvent();
        map.put("logtype", "event");
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // 调用interceptor处理event
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);

        // 处理结果
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}

编码完成后打包上传服务器,放置在 $FLUME_HOME/lib

日志采集 - 图7

日志采集 - 图8

测试

启动 Agent ,拷贝日志,检查 HDFS 文件

清理环境

rm -f /opt/module/flume-1.9.0/conf/startlog_position.json
rm -f /opt/module/data/flume-1.9.0/logs/start/*.log
rm -f /opt/module/data/flume-1.9.0/logs/event/*.log

启动测试 Agent

/opt/module/flume-1.9.0/bin/flume-ng agent --conf /opt/module/flume-1.9.0/conf --conf-file /opt/module/flume-1.9.0/job/flume-log2hdfs4.conf -name a1 -Dflume.root.logger=INFO,console

生成日志

cd /opt/module/data/flume-1.9.0

cp eventlog0721.small.log /opt/module/data/flume-1.9.0/logs/event/
cp eventlog0721.small.log /opt/module/data/flume-1.9.0/logs/start/

日志采集 - 图9

检查HDFS文件

hdfs dfs -ls /user/yx_test/data/logs/event
hdfs dfs -ls /user/yx_test/data/logs/start

日志采集 - 图10

生产环境下启动 Agent

nohup flume-ng agent --conf /opt/module/flume-1.9.0/conf --conffile /opt/module/flume-1.9.0/job/flume-log2hdfs3.conf -name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &
  • nohup : 允许用户退出帐户/关闭终端之后继续运行相应的进程
  • /dev/null : linux 的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞
  • 标准输入0 : 从键盘获得输入 /proc/self/fd/0
  • 标准输出1 : 输出到屏幕(控制台) /proc/self/fd/1
  • 错误输出2 : 输出到屏幕(控制台) /proc/self/fd/2
  • /dev/null : 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
  • 2>&1 : 错误输出将会和标准输出输出到同一个地方
  • >/dev/null 2>&1 : 不会输出任何信息到控制台,也不会有任何信息输出到文件中

总结

  • 使用 taildir source 监控指定的多个目录,可以给不同目录的日志加上不同 header
  • 在每个目录中可以使用正则匹配多个文件
  • 使用自定义拦截器,主要功能是从 json 串中获取时间戳,加到 event 的 header 中 hdfs sink 使用 event header 中的信息写数据(控制写文件的位置)
  • hdfs 文件的滚动方式(基于文件大小、基于 event 数量、基于时间)
  • 调节 flume jvm 内存的分配