前提
已经安装了Kafka Flume Zookeeper
数据准备
事件日志: 以13位毫秒时间戳开头, 格式为 时间戳|{JSON}
启动日志: 整行是一条 JSON 数据, 其中包含 "en":"start"
/tmp/logs/app-2020-10-15.log
{"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start","entry":"4","extend1":"","g":"4GA40GN6@gmail.com","hw":"750*1134","l":"es","la":"-47.8","ln":"-52.3","loading_time":"0","md":"sumsung-14","mid":"934","nw":"4G","open_ad_type":"1","os":"8.2.0","sr":"I","sv":"V2.7.5","t":"1602681575013","uid":"934","vc":"18","vn":"1.0.3"}
1602753927636|{"cm":{"ln":"-119.3","sv":"V2.4.6","os":"8.1.7","g":"4S776BJJ@gmail.com","mid":"938","nw":"3G","l":"pt","vc":"17","hw":"640*960","ar":"MX","uid":"938","t":"1602735830539","la":"-4.2","md":"sumsung-4","vn":"1.3.4","ba":"Sumsung","sr":"K"},"ap":"app","et":[{"ett":"1602660297907","en":"newsdetail","kv":{"entry":"2","goodsid":"211","news_staytime":"15","loading_time":"0","action":"3","showtype":"5","category":"31","type1":"102"}},{"ett":"1602727431580","en":"loading","kv":{"extend2":"","loading_time":"0","action":"3","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602699168083","en":"active_foreground","kv":{"access":"","push_id":"1"}},{"ett":"1602669144748","en":"favorites","kv":{"course_id":1,"id":0,"add_time":"1602680968789","userid":5}}]}
1602753927636|{"cm":{"ln":"-87.5","sv":"V2.9.3","os":"8.2.9","g":"S77P900H@gmail.com","mid":"939","nw":"WIFI","l":"pt","vc":"2","hw":"750*1134","ar":"MX","uid":"939","t":"1602742306970","la":"-38.8","md":"Huawei-16","vn":"1.1.1","ba":"Huawei","sr":"K"},"ap":"app","et":[{"ett":"1602727456955","en":"loading","kv":{"extend2":"","loading_time":"21","action":"1","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602751097293","en":"notification","kv":{"ap_time":"1602727044268","action":"1","type":"3","content":""}},{"ett":"1602681925522","en":"active_background","kv":{"active_source":"2"}},{"ett":"1602735427209","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1602678882273","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1602713010918","userid":3}}]}
{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"2","extend1":"","g":"22265XL8@gmail.com","hw":"640*960","l":"es","la":"15.6","ln":"-66.8","loading_time":"14","md":"sumsung-0","mid":"940","nw":"3G","open_ad_type":"2","os":"8.2.1","sr":"N","sv":"V2.8.8","t":"1602672513312","uid":"940","vc":"4","vn":"1.0.0"}
编写Flume拦截器
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
MyInterceptor
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
/**
 * Flume interceptor that tags each app-log event with a {@code topic} header and drops
 * malformed lines.
 *
 * <p>The body is decoded as UTF-8. Bodies containing {@code "en":"start"} are treated as start
 * logs ({@code topic=topic_start}). Everything else is treated as an event log in the form
 * {@code timestamp|{json}} ({@code topic=topic_event}). Events that fail the matching
 * {@link ETLUtil} check are discarded. NOTE(review): the {@code topic} header is presumably
 * consumed by a Kafka channel/sink downstream; confirm against the agent configuration.
 *
 * @author 张俊杰 (2020-10-19)
 */
public class MyInterceptor implements Interceptor {

    /** A body containing this key/value pair anywhere (not only at the start) is a start log. */
    private static final String START_FLAG = "\"en\":\"start\"";

    @Override
    public void initialize() {
        // Nothing to set up.
    }

    /**
     * Tags a single event with its target topic and validates its body.
     *
     * @param event the incoming event; its headers are modified in place
     * @return the same event if its body is well-formed, otherwise {@code null} so it is dropped
     */
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        boolean valid;
        if (body.contains(START_FLAG)) {
            // Start log: a whole line of JSON.
            headers.put("topic", "topic_start");
            valid = ETLUtil.validStartLog(body);
        } else {
            // Event log: "timestamp|{json}".
            headers.put("topic", "topic_event");
            valid = ETLUtil.validEventLog(body);
        }
        return valid ? event : null;
    }

    /**
     * Applies {@link #intercept(Event)} to every event of a batch and keeps only the accepted ones.
     *
     * <p>A new list is built for each call. The returned list is therefore never shared with, or
     * cleared by, a later batch.
     *
     * @param events the incoming batch
     * @return the events that passed validation, in their original order
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> accepted = new ArrayList<>(events.size());
        for (Event event : events) {
            Event kept = intercept(event);
            // intercept(Event) returns null for events that must be dropped.
            if (kept != null) {
                accepted.add(kept);
            }
        }
        return accepted;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    /**
     * Builder that Flume instantiates (via the {@code interceptors.<name>.type} setting) to
     * create the interceptor.
     */
    public static class Builder implements Interceptor.Builder {

        /** Reads parameters from the Flume configuration; this interceptor takes none. */
        @Override
        public void configure(Context context) {
        }

        /** Creates a new interceptor instance. */
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
    }
}
ETLUtil
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
/**
 * Format checks used by the Flume interceptor to drop malformed log lines.
 *
 * @author 张俊杰 (2020-10-19)
 */
public class ETLUtil {

    /** Required number of characters in the event-log timestamp, e.g. {@code 1602753927636}. */
    private static final int TIMESTAMP_LENGTH = 13;

    /**
     * Checks whether a start log is well-formed.
     *
     * <p>After trimming surrounding whitespace, the line must begin with '{' and end with '}'.
     * This is a cheap completeness check, not a full JSON parse.
     *
     * @param source the raw log line, may be {@code null}
     * @return {@code true} if the line looks like a complete JSON object
     */
    public static boolean validStartLog(String source) {
        // A null, empty or blank line can never start with '{', so a null check is sufficient.
        if (source == null) {
            return false;
        }
        return looksLikeJsonObject(source.trim());
    }

    /**
     * Checks whether an event log is well-formed.
     *
     * <p>Expected format: {@code timestamp|{json}}. The requirements are:
     * <ul>
     *   <li>the timestamp is exactly 13 ASCII digits (0-9);</li>
     *   <li>the part after the first '|' begins with '{' and ends with '}'.</li>
     * </ul>
     *
     * <p>Only the first '|' separates the two parts, because JSON string values may themselves
     * contain '|'.
     *
     * @param source the raw log line, may be {@code null}
     * @return {@code true} if the line matches the format, otherwise {@code false}
     */
    public static boolean validEventLog(String source) {
        if (source == null) {
            return false;
        }
        String trimmed = source.trim();
        int separator = trimmed.indexOf('|');
        if (separator < 0) {
            return false;
        }
        String timestamp = trimmed.substring(0, separator);
        String json = trimmed.substring(separator + 1);
        return isValidTimestamp(timestamp) && looksLikeJsonObject(json);
    }

    /** Returns true if {@code ts} has exactly {@link #TIMESTAMP_LENGTH} ASCII digits. */
    private static boolean isValidTimestamp(String ts) {
        if (ts.length() != TIMESTAMP_LENGTH) {
            return false;
        }
        for (int i = 0; i < ts.length(); i++) {
            char c = ts.charAt(i);
            // Explicit range check: Character.isDigit would also accept non-ASCII digits.
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }

    /** Returns true if {@code s} begins with '{' and ends with '}'. */
    private static boolean looksLikeJsonObject(String s) {
        return s.startsWith("{") && s.endsWith("}");
    }
}
将编写好的拦截器打成 jar 包放到 Linux 上
上传到 apache-flume-1.7.0 的 lib 目录下面
[root@zjj101 lib]# pwd
/root/soft/apache-flume-1.7.0/lib
[root@zjj101 lib]# rz
rz waiting to receive.
开始 zmodem 传输。  按 Ctrl+C 取消。
Transferring ETLInterceptor-0.0.1-SNAPSHOT.jar...
  100%       4 KB       4 KB/s 00:00:01       0 错误
[root@zjj101 lib]#
编写Flume配置文件测试一下看看拦截器是否有效果
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#格式: agent名.组件类型.组件名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取 /tmp/logs/ 目录下文件名形如 app-yyyy-mm-dd.log 的文件。文件名部分是正则表达式: ^ 表示以……开头, $ 表示以……结尾, . 匹配任意单个字符
#+ 表示前一个元素重复一次或多次, 所以 ^app.+.log$ 表示以 app 开头、以 log 结尾的文件名(注意 log 前面的 . 没有转义, 同样匹配任意单个字符)
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#记录每个文件读取位置(inode、pos)的 JSON 文件的保存位置, 如果不配置, 默认是 ~/.flume/taildir_position.json
a1.sources.r1.positionFile=/root/soft/apache-flume-1.7.0/custdata/log_position.json
#定义拦截器: type 要写拦截器内部类 Builder 的全限定类名(包名.MyInterceptor$Builder)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder
#定义sink
a1.sinks.k1.type=logger
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
执行shell命令去验证一下拦截器是否好使
flume-ng agent -c conf/ -n a1 -f /root/soft/apache-flume-1.7.0/conf/test.conf -Dflume.root.logger=DEBUG,console
-c 是指定 Flume 的配置目录
-n 是指定agent 的名字
-f 是指定配置文件
-Dflume.root.logger=DEBUG,console 是在控制台打印日志
执行完了查看日志信息
2020-10-20 10:16:20,217 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:16:50,220 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:20,223 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:50,225 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:55,608 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /tmp/logs/app-2020-10-15.log, inode: 1870589, pos: 696353
2020-10-20 10:18:20,228 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
查看记录读取位置的 JSON 文件, 里面显示已经读取了 /tmp/logs/app-2020-10-15.log 日志, 读取位置(pos)为 696353
[root@zjj101 apache-flume-1.7.0]# cd custdata/
[root@zjj101 custdata]# ls
log_position.json
[root@zjj101 custdata]# cat log_position.json
[{"inode":1870589,"pos":696353,"file":"/tmp/logs/app-2020-10-15.log"}]
[root@zjj101 custdata]#