Prerequisites
Kafka, Flume and ZooKeeper are already installed.
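Data preparation
Event logs carry a timestamp prefix, in the form timestamp|{...}.
Start logs are plain JSON.
Sample data in /tmp/logs/app-2020-10-15.log (one record per line):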
```
{"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start","entry":"4","extend1":"","g":"4GA40GN6@gmail.com","hw":"750*1134","l":"es","la":"-47.8","ln":"-52.3","loading_time":"0","md":"sumsung-14","mid":"934","nw":"4G","open_ad_type":"1","os":"8.2.0","sr":"I","sv":"V2.7.5","t":"1602681575013","uid":"934","vc":"18","vn":"1.0.3"}
1602753927636|{"cm":{"ln":"-119.3","sv":"V2.4.6","os":"8.1.7","g":"4S776BJJ@gmail.com","mid":"938","nw":"3G","l":"pt","vc":"17","hw":"640*960","ar":"MX","uid":"938","t":"1602735830539","la":"-4.2","md":"sumsung-4","vn":"1.3.4","ba":"Sumsung","sr":"K"},"ap":"app","et":[{"ett":"1602660297907","en":"newsdetail","kv":{"entry":"2","goodsid":"211","news_staytime":"15","loading_time":"0","action":"3","showtype":"5","category":"31","type1":"102"}},{"ett":"1602727431580","en":"loading","kv":{"extend2":"","loading_time":"0","action":"3","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602699168083","en":"active_foreground","kv":{"access":"","push_id":"1"}},{"ett":"1602669144748","en":"favorites","kv":{"course_id":1,"id":0,"add_time":"1602680968789","userid":5}}]}
1602753927636|{"cm":{"ln":"-87.5","sv":"V2.9.3","os":"8.2.9","g":"S77P900H@gmail.com","mid":"939","nw":"WIFI","l":"pt","vc":"2","hw":"750*1134","ar":"MX","uid":"939","t":"1602742306970","la":"-38.8","md":"Huawei-16","vn":"1.1.1","ba":"Huawei","sr":"K"},"ap":"app","et":[{"ett":"1602727456955","en":"loading","kv":{"extend2":"","loading_time":"21","action":"1","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602751097293","en":"notification","kv":{"ap_time":"1602727044268","action":"1","type":"3","content":""}},{"ett":"1602681925522","en":"active_background","kv":{"active_source":"2"}},{"ett":"1602735427209","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1602678882273","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1602713010918","userid":3}}]}
{"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"2","extend1":"","g":"22265XL8@gmail.com","hw":"640*960","l":"es","la":"15.6","ln":"-66.8","loading_time":"14","md":"sumsung-0","mid":"940","nw":"3G","open_ad_type":"2","os":"8.2.1","sr":"N","sv":"V2.8.8","t":"1602672513312","uid":"940","vc":"4","vn":"1.0.0"}
```
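The two record shapes can be told apart with plain string checks. Below is a minimal, stdlib-only sketch. The class `LogLineInspector` and its methods are hypothetical, not part of the original project.

It treats a line containing `"en":"start"` as a start log. It splits an event line into its 13-digit timestamp and JSON body, and converts the timestamp with `java.time.Instant`. It does not parse the JSON itself.

```java
import java.time.Instant;

// Hypothetical helper, not part of the original project: inspects one raw log line.
final class LogLineInspector {

    private LogLineInspector() {}

    // Start logs are bare JSON objects containing "en":"start".
    static boolean isStartLog(String line) {
        return line.contains("\"en\":\"start\"");
    }

    // Event logs look like "<13-digit epoch millis>|{...}"; returns the timestamp part.
    static Instant eventTimestamp(String line) {
        int bar = line.indexOf('|');
        if (bar != 13) {
            throw new IllegalArgumentException("not an event log line: " + line);
        }
        // Long.parseLong throws NumberFormatException if the prefix is not numeric.
        return Instant.ofEpochMilli(Long.parseLong(line.substring(0, bar)));
    }

    // Returns the JSON body that follows the first '|'.
    static String eventBody(String line) {
        return line.substring(line.indexOf('|') + 1);
    }

    public static void main(String[] args) {
        String event = "1602753927636|{\"cm\":{\"mid\":\"938\"},\"ap\":\"app\",\"et\":[]}";
        System.out.println(isStartLog(event));     // false
        System.out.println(eventTimestamp(event)); // 2020-10-15T09:25:27.636Z
        System.out.println(eventBody(event));
    }
}
```

The timestamp prefix of the sample event lines decodes to 2020-10-15 (UTC), which matches the date in the log file name.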
Writing the Flume interceptor
Maven dependency (pom.xml):
```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
```
MyInterceptor
```java
package com.atguigu.dw.flume;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MyInterceptor implements Interceptor {

    // Holds the events that pass validation
    private final List<Event> results = new ArrayList<>();

    // A start log is recognized by containing this string
    private static final String START_FLAG = "\"en\":\"start\"";

    @Override
    public void initialize() {
    }

    /**
     * Core method: intercepts a single Event.
     * 张俊杰 2020-10-19 18:48
     */
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // The target topic is written into the event headers
        Map<String, String> headers = event.getHeaders();
        String bodyStr = new String(body, StandardCharsets.UTF_8);
        boolean valid;
        if (bodyStr.contains(START_FLAG)) {
            // Start log
            headers.put("topic", "topic_start");
            valid = ETLUtil.validStartLog(bodyStr);
        } else {
            // Event log
            headers.put("topic", "topic_event");
            valid = ETLUtil.validEventLog(bodyStr);
        }
        // Returning null drops the event
        return valid ? event : null;
    }

    /**
     * Intercepts a batch of events.
     * 张俊杰 2020-10-19 18:49
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        // This method is called repeatedly, so clear the previous batch first
        results.clear();
        for (Event event : events) {
            Event result = intercept(event);
            // intercept(event) returns null for events that fail validation
            if (result != null) {
                results.add(result);
            }
        }
        return results;
    }

    @Override
    public void close() {
    }

    /**
     * Builds the interceptor instance.
     * 张俊杰 2020-10-19 18:48
     */
    public static class Builder implements Interceptor.Builder {

        // Reads parameters from the Flume configuration file (none are used here)
        @Override
        public void configure(Context context) {
        }

        // Creates the interceptor
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
    }
}
```
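To see the routing logic in isolation, here is a stdlib-only sketch that mirrors `intercept(List<Event>)`. It makes three simplifications:

- Flume's `Event` is replaced by a hypothetical `SimpleEvent` class, so it runs without Flume on the classpath.
- The validity checks are inlined and simplified instead of calling `ETLUtil`.
- It allocates a fresh result list per batch instead of reusing a field.

The header key `topic` and the values `topic_start`/`topic_event` are the ones used above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Event: a header map plus a byte[] body.
final class SimpleEvent {
    final Map<String, String> headers = new HashMap<>();
    final byte[] body;

    SimpleEvent(String body) {
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }
}

// Sketch of MyInterceptor's routing: tag each event with a "topic" header, drop invalid ones.
final class TopicRouterSketch {

    private TopicRouterSketch() {}

    static SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.body, StandardCharsets.UTF_8).trim();
        boolean valid;
        if (body.contains("\"en\":\"start\"")) {
            event.headers.put("topic", "topic_start");
            valid = body.startsWith("{") && body.endsWith("}");
        } else {
            event.headers.put("topic", "topic_event");
            String[] parts = body.split("\\|");
            valid = parts.length == 2
                    && parts[0].matches("\\d{13}")
                    && parts[1].startsWith("{") && parts[1].endsWith("}");
        }
        return valid ? event : null; // null means "drop this event"
    }

    static List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>(); // fresh list per batch in this sketch
        for (SimpleEvent e : events) {
            SimpleEvent result = intercept(e);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }
}
```

Feeding it three events gives these results:

- `{"en":"start"}` is kept and tagged `topic_start`.
- `1602753927636|{"et":[]}` is kept and tagged `topic_event`.
- The malformed `123|{}` is dropped.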
ETLUtil
```java
package com.atguigu.dw.flume;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;

public class ETLUtil {

    /**
     * Checks whether a start log is well-formed: the body must be non-blank
     * and look like a complete JSON object (starts with { and ends with }).
     */
    public static boolean validStartLog(String source) {
        // Reject an empty body
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // Strip leading and trailing whitespace
        String trimStr = source.trim();
        // Check JSON completeness: must start with { and end with }
        return trimStr.startsWith("{") && trimStr.endsWith("}");
    }

    /**
     * Checks whether an event log is well-formed.
     *
     * Event log format: timestamp|{...}
     * The timestamp must be valid:
     *   a) exactly 13 characters long
     *   b) digits only
     * The JSON part must start with { and end with }.
     * Returns true if all checks pass, false otherwise.
     * 张俊杰 2020-10-19 18:55
     */
    public static boolean validEventLog(String source) {
        // Reject an empty body
        if (StringUtils.isBlank(source)) {
            return false;
        }
        // Strip leading and trailing whitespace
        String trimStr = source.trim();
        String[] words = trimStr.split("\\|");
        if (words.length != 2) {
            return false;
        }
        // Validate the timestamp.
        // isNumber() accepts any valid Java number literal, e.g. 123L or 0x1F;
        // isDigits() accepts only digit characters.
        if (words[0].length() != 13 || !NumberUtils.isDigits(words[0])) {
            return false;
        }
        // Check JSON completeness: must start with { and end with }
        return words[1].startsWith("{") && words[1].endsWith("}");
    }
}
```
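To check the validation rules on edge cases without pulling in commons-lang, here is a hypothetical stdlib-only port of `ETLUtil`. It is an approximation, not the author's code. `isBlank` uses `String.trim()`, and `isDigits` loops over `Character.isDigit`. Both only approximate the commons-lang methods of the same names.

```java
// Hypothetical stdlib-only port of ETLUtil; not the author's code.
final class EtlCheck {

    private EtlCheck() {}

    // Approximates StringUtils.isBlank: null, empty, or whitespace only.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Approximates NumberUtils.isDigits: non-empty and every char is a digit.
    static boolean isDigits(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isDigit(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    // The same "starts with { and ends with }" completeness check used above.
    static boolean isJsonObjectFramed(String s) {
        return s.startsWith("{") && s.endsWith("}");
    }

    static boolean validStartLog(String source) {
        return !isBlank(source) && isJsonObjectFramed(source.trim());
    }

    static boolean validEventLog(String source) {
        if (isBlank(source)) {
            return false;
        }
        String[] words = source.trim().split("\\|");
        return words.length == 2
                && words[0].length() == 13
                && isDigits(words[0])
                && isJsonObjectFramed(words[1]);
    }
}
```

With these rules:

- `1602753927636|{...}` passes.
- A 12-digit timestamp fails.
- A signed value such as `-602753927636` fails.
- An event whose JSON body itself contains a `|` character also fails, because `split("\\|")` then yields more than two parts.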
Package the interceptor as a jar and copy it to the Linux server.
Upload it into the apache-flume-1.7.0/lib directory so that Flume can load the class:
```
[root@zjj101 lib]# pwd
/root/soft/apache-flume-1.7.0/lib
[root@zjj101 lib]# rz
rz waiting to receive.
Starting zmodem transfer.  Press Ctrl+C to cancel.
Transferring ETLInterceptor-0.0.1-SNAPSHOT.jar...
  100%       4 KB       4 KB/s 00:00:01       0 Errors
[root@zjj101 lib]#
```
Write a Flume configuration file (here /root/soft/apache-flume-1.7.0/conf/test.conf) to test whether the interceptor works:
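```properties
# a1 is the agent name; a1 defines a source named r1 (separate multiple names with spaces)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# component.property=value
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
# Read /tmp/logs/app-yyyy-mm-dd.log. The file-name part is a regular expression:
# ^ anchors the start, $ anchors the end, . matches any single character,
# and + means "one or more of the preceding", so ^app.+.log$ matches
# names that start with "app" and end with "log".
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
# Where the JSON position file is stored; if unset, it defaults to ~/.flume/taildir_position.json
a1.sources.r1.positionFile=/root/soft/apache-flume-1.7.0/custdata/log_position.json

# Interceptor (reference the interceptor's Builder class)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder

# Sink
a1.sinks.k1.type=logger

# Channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

# Wire the components: one source can feed several channels,
# but a sink reads from exactly one channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
```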
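TAILDIR allows a regular expression only in the file-name part of the path. The directory part must be literal.

The stdlib-only sketch below checks some made-up file names against `^app.+.log$` using `java.util.regex`. It assumes the pattern is matched against the whole file name. It also shows that the unescaped `.` before `log` matches any character, so `^app.+\.log$` is the stricter choice if only a literal `.log` suffix should match. The class name is hypothetical.

```java
import java.util.regex.Pattern;

// Illustrative check of the TAILDIR file-name regex; the file names are made up.
final class TaildirPatternCheck {

    private TaildirPatternCheck() {}

    static final Pattern LOOSE = Pattern.compile("^app.+.log$");    // as written in the config
    static final Pattern STRICT = Pattern.compile("^app.+\\.log$"); // dot before "log" escaped

    // Whole-string match against the file name only (no directory part).
    static boolean matches(Pattern p, String fileName) {
        return p.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String[] names = {"app-2020-10-15.log", "app-2020-10-15_log", "app.log", "access.log"};
        for (String n : names) {
            System.out.printf("%-20s loose=%b strict=%b%n",
                    n, matches(LOOSE, n), matches(STRICT, n));
        }
    }
}
```

The two patterns differ only on the name with `_log`: the loose one accepts it and the strict one does not. Neither pattern matches `app.log`, because `.+` needs at least one character between `app` and the suffix.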
Start the agent from the shell to verify that the interceptor works:
flume-ng agent -c conf/ -n a1 -f /root/soft/apache-flume-1.7.0/conf/test.conf -Dflume.root.logger=DEBUG,console
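-c sets the Flume configuration directory (conf/)
-n sets the agent name (a1, matching the name used in the configuration file)
-f sets the configuration file to load
-Dflume.root.logger=DEBUG,console prints log output to the console at DEBUG level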
Once the agent is running, check the log output:
```
2020-10-20 10:16:20,217 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:16:50,220 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:20,223 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:50,225 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
2020-10-20 10:17:55,608 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /tmp/logs/app-2020-10-15.log, inode: 1870589, pos: 696353
2020-10-20 10:18:20,228 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
```
Then check the JSON position file that records the read offset. It shows that /tmp/logs/app-2020-10-15.log has been read up to byte offset 696353:
```
[root@zjj101 apache-flume-1.7.0]# cd custdata/
[root@zjj101 custdata]# ls
log_position.json
[root@zjj101 custdata]# cat log_position.json
[{"inode":1870589,"pos":696353,"file":"/tmp/logs/app-2020-10-15.log"}]
[root@zjj101 custdata]#
```
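The position file is a JSON array with one `{inode, pos, file}` object per tailed file. Here `pos` is the offset already read. A quick sanity check is to compare `pos` with the file's current size: if they are equal, TAILDIR has consumed the whole file.

The sketch below pulls `pos` out with a regular expression. This is for illustration only; a real tool should use a JSON library. It assumes the key order and spacing shown in the output above. The class and method names are hypothetical.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads the "pos" recorded for one file in a TAILDIR position file.
final class PositionFileCheck {

    private PositionFileCheck() {}

    // Assumes entries written exactly like {"inode":1870589,"pos":696353,"file":"/tmp/logs/..."}.
    private static final Pattern ENTRY =
            Pattern.compile("\\{\"inode\":(\\d+),\"pos\":(\\d+),\"file\":\"([^\"]+)\"\\}");

    // Returns the recorded offset for the given file, or empty if the file is not listed.
    static OptionalLong offsetFor(String positionJson, String file) {
        Matcher m = ENTRY.matcher(positionJson);
        while (m.find()) {
            if (m.group(3).equals(file)) {
                return OptionalLong.of(Long.parseLong(m.group(2)));
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        String json = "[{\"inode\":1870589,\"pos\":696353,\"file\":\"/tmp/logs/app-2020-10-15.log\"}]";
        long pos = offsetFor(json, "/tmp/logs/app-2020-10-15.log").orElse(-1);
        System.out.println(pos); // 696353
        // On the Flume host, compare this with the current file size, e.g.
        // java.nio.file.Files.size(java.nio.file.Paths.get("/tmp/logs/app-2020-10-15.log"))
    }
}
```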
