[toc]
Requirements
Logs: startup logs (done) and event logs
Data collection: log files => Flume => HDFS => ODS
Raw log data
2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart - {"app_active": {"name": "app_active","json": {"entry": "1","action": "1","error_code": "0"},"time": 1596111888529},"attr": {"area": "泰安","uid": "2F10092A9","app_v": "1.1.13","event_type": "common","device_id": "1FB872-9A1009","os_type": "4.7.3","channel": "DK","language": "chinese","brand": "iphone-9"}}
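Each raw line is a log4j-style prefix (date, time, thread, level, logger, a dash) followed by a JSON payload. A minimal sketch (sample line trimmed for brevity) of isolating the payload with the same whitespace split the interceptors below rely on:

```java
public class LogLineSplitDemo {
    public static void main(String[] args) {
        // trimmed version of the raw line shown above
        String line = "2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart"
                + " - {\"app_active\":{\"name\":\"app_active\",\"time\":1596111888529}}";
        // fields: 0=date 1=time 2=[thread] 3=level 4=logger 5="-" 6=JSON payload
        String[] fields = line.split("\\s+");
        System.out.println(fields[6]); // {"app_active":{"name":"app_active","time":1596111888529}}
    }
}
```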
Data collection workflow

Log collection tool: Flume 1.8+:
- Provides the very handy Taildir Source
- With this source, multiple directories can be watched and newly written data collected in real time
Taildir Source configuration
Characteristics
- Matches file names in a directory with regular expressions (see the regex check after this list)
- As soon as data is written to a watched file, Flume forwards it to the configured sink
- Highly reliable; no data is lost
- Leaves the tracked files untouched: it neither renames nor deletes them
- Reads text files line by line; does not support Windows and cannot read binary files
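Note that the file-name part of a filegroup path is a Java regular expression, not a shell glob: `.*log`, not `*.log`. A quick check of the pattern used in the configuration below:

```java
import java.util.regex.Pattern;

public class TaildirPatternCheck {
    public static void main(String[] args) {
        // taildir matches file names with regexes, so ".*log" rather than the glob "*.log"
        Pattern p = Pattern.compile(".*log");
        System.out.println(p.matcher("start0721.log").matches()); // true
        System.out.println(p.matcher("start0721.txt").matches()); // false
    }
}
```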
Source configuration
a1.sources.r1.type = TAILDIR
# Path of the checkpoint file; read positions are saved as JSON so collection can resume where it left off
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
# Watch files in several directories at once (multiple groups, separated by spaces)
a1.sources.r1.filegroups = f1
# Absolute path per file group; the file name may be a regular expression
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
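For reference, the position file typically records each tracked file's inode, read offset, and path, along the lines of `[{"inode":393319,"pos":166,"file":"/opt/module/data/flume-1.9.0/logs/start/start0721.log"}]` (values illustrative). Deleting it, as in the cleanup step at the end of this section, makes the agent collect the files from the beginning again.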
HDFS Sink configuration
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream
# roll by file size (default: 1024 bytes); 33554432 = 32 * 1024 * 1024 = 32 MB
a1.sinks.k1.hdfs.rollSize = 33554432
# roll by event count (default: 10 events); 0 disables it
a1.sinks.k1.hdfs.rollCount = 0
# roll by time (default: 30 seconds); 0 disables it
a1.sinks.k1.hdfs.rollInterval = 0
# roll by idle time; 0 disables it
a1.sinks.k1.hdfs.idleTimeout = 0
# (default: the HDFS replication factor) 1 keeps Flume unaware of HDFS block replication, so the roll settings above (time, size, event count) are not disturbed
a1.sinks.k1.hdfs.minBlockReplicas = 1
# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# take the timestamp from the local clock
a1.sinks.k1.hdfs.useLocalTimeStamp = true
Agent configuration
vim /opt/module/flume-1.9.0/conf/flume-log2hdfs1.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream
# roll files by size (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# take the timestamp from the local clock
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume tuning
Start the agent
flume-ng agent --conf-file /opt/module/flume-1.9.0/conf/flume-log2hdfs1.conf --name a1 -Dflume.root.logger=INFO,console
Putting log files into the /data/flume/logs/ directory raises the error:
java.lang.OutOfMemoryError: GC overhead limit exceeded
Check the settings of the running Flume process:
ps -ef | grep flume

Flume's JVM heap defaults to a maximum of only 20 MB, which is too small and needs to be raised.
Add the following to $FLUME_HOME/conf/flume-env.sh:

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

For the file to take effect, the configuration directory must be passed on the command line:
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/flume/conf/flume-log2hdfs1.conf --name a1 -Dflume.root.logger=INFO,console
Flume memory settings and tuning:
- Size the JVM heap to the volume of log data; 4 GB or more is typical
- Set -Xms and -Xmx to the same value to avoid the performance cost of heap resizing
Problem: with useLocalTimeStamp, Flume stamps events with the local clock and ignores the timestamp inside the log, so records collected after midnight drift into the wrong date partition.
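A minimal sketch of the drift (epoch values and time zone assumed for illustration): an event produced just before midnight but flushed by Flume a couple of minutes later is filed under the next day when the local clock decides the path.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimeDriftDemo {
    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai");        // assumed zone for the example
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");

        long eventTime = 1596124740000L;                 // "time" inside the log: 2020-07-30 23:59
        long collectTime = eventTime + 3 * 60 * 1000;    // written by Flume at 2020-07-31 00:02

        String byEvent = fmt.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(eventTime), zone));
        String byLocal = fmt.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(collectTime), zone));

        System.out.println(byEvent); // 2020-07-30 -> the partition the record belongs to
        System.out.println(byLocal); // 2020-07-31 -> where useLocalTimeStamp would put it
    }
}
```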
Custom interceptor
Because the agent configuration uses the local time, data may land in the wrong path; a custom interceptor solves this.
Agent for testing the custom interceptor:
- netcat source => logger sink
# a1 is the agent name
# the source, channel and sink are named r1, c1, k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = cpu101
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# wire the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
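For a quick test without telnet or nc, a few lines of Java can push a sample line into the netcat source (host and port taken from the config above; the sample line is trimmed and illustrative):

```java
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSender {
    public static void main(String[] args) throws Exception {
        // host/port of the netcat source configured above
        try (Socket socket = new Socket("cpu101", 9999)) {
            OutputStream out = socket.getOutputStream();
            // the netcat source reads newline-terminated lines
            String line = "2020-07-30 14:18:47.339 [main] INFO com.lagou.ecommerce.AppStart"
                    + " - {\"app_active\":{\"name\":\"app_active\",\"time\":1596111888529}}";
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}
```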
How the custom interceptor works:
- A custom interceptor implements Flume's Interceptor interface
- An event consists of a header and a body (the received string)
- Get the header and the body
- Extract "time":1596382570539 from the body and convert the timestamp to a "yyyy-MM-dd" string
- Put the converted string into the header
Implementing the custom interceptor:
- Get the event header
- Get the event body
- Parse the body to obtain the JSON string
- Parse the JSON string to obtain the timestamp
- Convert the timestamp to a "yyyy-MM-dd" string
- Put the converted string into the header
- Return the event
Maven
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Java
package com.cpucode.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CustomerInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    // process events one at a time
    @Override
    public Event intercept(Event event) {
        // get the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // get the event header
        Map<String, String> headersMap = event.getHeaders();

        // parse the body to obtain the JSON string (the 7th whitespace-separated field)
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];

            // parse the JSON string to obtain the timestamp
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");

            // convert the timestamp to a "yyyy-MM-dd" string
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);

            // put the converted string into the header
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e) {
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    // process events in batches
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void testJunit() {
        String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
        Map<String, String> map = new HashMap<>();

        // build an event
        Event event = new SimpleEvent();
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // run the event through the interceptor
        CustomerInterceptor customerInterceptor = new CustomerInterceptor();
        Event outEvent = customerInterceptor.intercept(event);

        // inspect the result; prints something like {"logtime":"2020-08-02"}
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}
Package the program and place the jar under flume/lib.
Start the agent to test:
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/flume/conf/flumetest1.conf --name a1 -Dflume.root.logger=INFO,console
Collecting startup logs (with the custom interceptor)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.CustomerInterceptor$Builder
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream
# roll files by size (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# local timestamp no longer used; the write path is driven by the logtime header
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Changes:
- Add the custom interceptor to the source
- Remove the local timestamp setting a1.sinks.k1.hdfs.useLocalTimeStamp = true
- Write files according to the logtime value in the header
Start the service:
flume-ng agent --conf /opt/apps/flume-1.9/conf --conf-file /data/flume/conf/flume-log2hdfs2.conf --name a1 -Dflume.root.logger=INFO,console
Copy in log files
Check the files on HDFS
Collecting startup logs and event logs
Two kinds of logs are collected: startup logs and event logs, each written to a different directory.
A single Flume agent watches both directories and picks up all the logs.

Overall approach
- taildir watches multiple directories
- The custom interceptor is extended to tag data from each source with a different marker
- The hdfs sink writes files according to the marker
Agent configuration
vim /opt/module/flume-1.9.0/job/flume-log2hdfs4.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/data/flume-1.9.0/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/module/data/flume-1.9.0/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.cpucode.flume.interceptor.LogTypeInterceptor$Builder
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/yx_test/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog
a1.sinks.k1.hdfs.fileType = DataStream
# roll files by size (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- filegroups: watch files in several directories at once (multiple groups, separated by spaces)
- headers.<filegroupName>.<headerKey>: adds a header key to every event; each filegroup can carry a different value
Custom interceptor
package com.cpucode.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.compress.utils.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author : cpucode
* @date : 2022/6/24 21:11
* @github : https://github.com/CPU-Code
* @csdn : https://cpucode.blog.csdn.net
*/
public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    /**
     * Process events one at a time.
     * @param event
     * @return
     */
    @Override
    public Event intercept(Event event) {
        // get the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // get the event header
        Map<String, String> headersMap = event.getHeaders();

        // parse the body to obtain the JSON string
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];

            // parse the JSON string to obtain the timestamp
            String timestampStr = "";
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            if (headersMap.getOrDefault("logtype", "").equals("start")) {
                // timestamp of a startup log
                timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            } else if (headersMap.getOrDefault("logtype", "").equals("event")) {
                // timestamp of the first record in an event log
                JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
                if (jsonArray.size() > 0) {
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            }

            // convert the timestamp to a "yyyy-MM-dd" string
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);

            // put the converted string into the header
            headersMap.put("logtime", date);
            event.setHeaders(headersMap);
        } catch (Exception e) {
            headersMap.put("logtime", "Unknown");
            event.setHeaders(headersMap);
        }
        return event;
    }

    // process events in batches
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void startJunit() {
        String str = "2020-08-02 18:19:32.959 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1596342840284},\"attr\":{\"area\":\"大庆\",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\"brand\":\"iphone-8\"}}";
        Map<String, String> map = new HashMap<>();

        // build an event tagged as a startup log
        Event event = new SimpleEvent();
        map.put("logtype", "start");
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // run the event through the interceptor
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);

        // inspect the result; prints something like {"logtype":"start","logtime":"2020-08-02"}
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }

    @Test
    public void eventJunit() {
        String str = "2020-08-02 18:20:11.877 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},{\"name\":\"loading\",\"json\":{\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":\"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":{\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},{\"name\":\"favorites\",\"json\":{\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":{\"area\":\"长治\",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\",\"brand\":\"xiaomi-0\"}}";
        Map<String, String> map = new HashMap<>();

        // build an event tagged as an event log
        Event event = new SimpleEvent();
        map.put("logtype", "event");
        event.setHeaders(map);
        event.setBody(str.getBytes(Charsets.UTF_8));

        // run the event through the interceptor
        LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
        Event outEvent = customerInterceptor.intercept(event);

        // inspect the result; prints something like {"logtype":"event","logtime":"2020-08-02"}
        Map<String, String> headersMap = outEvent.getHeaders();
        System.out.println(JSON.toJSONString(headersMap));
    }
}
When coding is complete, package the jar and upload it to the server, placing it under $FLUME_HOME/lib.


Testing
Start the agent, copy in log files, and check the files on HDFS.
Clean the environment:
rm -f /opt/module/flume-1.9.0/conf/startlog_position.json
rm -f /opt/module/data/flume-1.9.0/logs/start/*.log
rm -f /opt/module/data/flume-1.9.0/logs/event/*.log
Start the test agent:
/opt/module/flume-1.9.0/bin/flume-ng agent --conf /opt/module/flume-1.9.0/conf --conf-file /opt/module/flume-1.9.0/job/flume-log2hdfs4.conf --name a1 -Dflume.root.logger=INFO,console
Generate logs:
cd /opt/module/data/flume-1.9.0
cp eventlog0721.small.log /opt/module/data/flume-1.9.0/logs/event/
cp startlog0721.small.log /opt/module/data/flume-1.9.0/logs/start/

Check the files on HDFS:
hdfs dfs -ls /user/yx_test/data/logs/event
hdfs dfs -ls /user/yx_test/data/logs/start

Starting the agent in production:
nohup flume-ng agent --conf /opt/module/flume-1.9.0/conf --conf-file /opt/module/flume-1.9.0/job/flume-log2hdfs3.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &
- nohup: keeps the process running after the user logs out or the terminal is closed
- /dev/null: the Linux null device; everything written to it is discarded (the "black hole")
- Standard input 0 (/proc/self/fd/0): input from the keyboard
- Standard output 1 (/proc/self/fd/1): output to the screen (console)
- Standard error 2 (/proc/self/fd/2): output to the screen (console)
- > /dev/null: redirects standard output to /dev/null, so the normal output goes nowhere
- 2>&1: sends standard error to the same place as standard output
- > /dev/null 2>&1: nothing reaches the console and nothing is written to any file
Summary
- The taildir source watches the specified directories and can attach a different header to the logs from each one
- Within each directory, multiple files can be matched with a regular expression
- The custom interceptor's main job is to take the timestamp out of the JSON string and add it to the event header
- The hdfs sink uses the information in the event header to write data (controlling where files are written)
- HDFS file rolling strategies (by file size, by event count, by time)
- Tuning the Flume JVM heap allocation
