前提

已经安装好 Kafka、Flume 和 ZooKeeper。

数据准备

事件日志的格式为“13 位毫秒时间戳|JSON”,即每行以时间戳开头,用竖线与 JSON 分隔;
启动日志则是一整行 JSON 数据(其中包含 "en":"start")。

示例日志文件 /tmp/logs/app-2020-10-15.log(下面第 1、4 行是启动日志,第 2、3 行是事件日志):

  1. {"action":"1","ar":"MX","ba":"Sumsung","detail":"","en":"start","entry":"4","extend1":"","g":"4GA40GN6@gmail.com","hw":"750*1134","l":"es","la":"-47.8","ln":"-52.3","loading_time":"0","md":"sumsung-14","mid":"934","nw":"4G","open_ad_type":"1","os":"8.2.0","sr":"I","sv":"V2.7.5","t":"1602681575013","uid":"934","vc":"18","vn":"1.0.3"}
  2. 1602753927636|{"cm":{"ln":"-119.3","sv":"V2.4.6","os":"8.1.7","g":"4S776BJJ@gmail.com","mid":"938","nw":"3G","l":"pt","vc":"17","hw":"640*960","ar":"MX","uid":"938","t":"1602735830539","la":"-4.2","md":"sumsung-4","vn":"1.3.4","ba":"Sumsung","sr":"K"},"ap":"app","et":[{"ett":"1602660297907","en":"newsdetail","kv":{"entry":"2","goodsid":"211","news_staytime":"15","loading_time":"0","action":"3","showtype":"5","category":"31","type1":"102"}},{"ett":"1602727431580","en":"loading","kv":{"extend2":"","loading_time":"0","action":"3","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602699168083","en":"active_foreground","kv":{"access":"","push_id":"1"}},{"ett":"1602669144748","en":"favorites","kv":{"course_id":1,"id":0,"add_time":"1602680968789","userid":5}}]}
  3. 1602753927636|{"cm":{"ln":"-87.5","sv":"V2.9.3","os":"8.2.9","g":"S77P900H@gmail.com","mid":"939","nw":"WIFI","l":"pt","vc":"2","hw":"750*1134","ar":"MX","uid":"939","t":"1602742306970","la":"-38.8","md":"Huawei-16","vn":"1.1.1","ba":"Huawei","sr":"K"},"ap":"app","et":[{"ett":"1602727456955","en":"loading","kv":{"extend2":"","loading_time":"21","action":"1","extend1":"","type":"3","type1":"","loading_way":"2"}},{"ett":"1602751097293","en":"notification","kv":{"ap_time":"1602727044268","action":"1","type":"3","content":""}},{"ett":"1602681925522","en":"active_background","kv":{"active_source":"2"}},{"ett":"1602735427209","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1602678882273","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1602713010918","userid":3}}]}
  4. {"action":"1","ar":"MX","ba":"Sumsung","detail":"201","en":"start","entry":"2","extend1":"","g":"22265XL8@gmail.com","hw":"640*960","l":"es","la":"15.6","ln":"-66.8","loading_time":"14","md":"sumsung-0","mid":"940","nw":"3G","open_ad_type":"2","os":"8.2.1","sr":"N","sv":"V2.8.8","t":"1602672513312","uid":"940","vc":"4","vn":"1.0.0"}

编写Flume拦截器

  <!-- Flume core API (Interceptor, Event, Context). Scope "provided": the jar built here is
       copied into the Flume installation's lib/ directory, which already supplies flume-ng-core,
       so Flume's own classes must not be bundled into the interceptor jar. -->
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
    <scope>provided</scope>
  </dependency>

MyInterceptor

  1. import java.nio.charset.Charset;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import org.apache.flume.Context;
  6. import org.apache.flume.Event;
  7. import org.apache.flume.interceptor.Interceptor;
  8. public class MyInterceptor implements Interceptor{
  9. //创建一个放置复合要求数据的集合
  10. private List<Event> results=new ArrayList<>();
  11. private String startFlag="\"en\":\"start\""; // 启动日志特征是 这个字符串开头的
  12. @Override
  13. public void initialize() {
  14. }
  15. /**
  16. * @功能描述: 核心方法,拦截Event
  17. *张俊杰 2020年10月19日 18:48
  18. */
  19. @Override
  20. public Event intercept(Event event) {
  21. byte[] body = event.getBody();
  22. //在header中添加key
  23. Map<String, String> headers = event.getHeaders();
  24. String bodyStr = new String(body, Charset.forName("utf-8"));
  25. boolean flag=true;
  26. //符合启动日志特征
  27. if (bodyStr.contains(startFlag)) { // 启动日志
  28. headers.put("topic", "topic_start");
  29. flag=ETLUtil.validStartLog(bodyStr);
  30. }else {
  31. //事件日志
  32. headers.put("topic", "topic_event");
  33. flag=ETLUtil.validEventLog(bodyStr);
  34. }
  35. //如果验证结果是false
  36. if (!flag) {
  37. return null;
  38. }
  39. return event;
  40. }
  41. /**
  42. * @功能描述: 一次拦截一组.
  43. *张俊杰 2020年10月19日 18:49
  44. */
  45. @Override
  46. public List<Event> intercept(List<Event> events) {
  47. //先清空results
  48. results.clear(); // 这个方法是多次调用的,所有需要每次使用的时候先清空一下
  49. for (Event event : events) {
  50. Event result = intercept(event);
  51. //有可能intercept(event),event不符合要求,会拦截掉,返回null
  52. if (result !=null) {
  53. //放入合法的数据集合中
  54. results.add(result);
  55. }
  56. }
  57. return results;
  58. }
  59. @Override
  60. public void close() {
  61. }
  62. /**
  63. * @类说明: 构建拦截器对象
  64. * 张俊杰 2020年10月19日 18:48
  65. */
  66. public static class Builder implements Interceptor.Builder{
  67. //从flume的配置文件中读取参数
  68. @Override
  69. public void configure(Context context) {
  70. }
  71. //创建一个拦截器对象
  72. @Override
  73. public Interceptor build() {
  74. return new MyInterceptor();
  75. }
  76. }
  77. }

ETLUtil

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import org.apache.commons.lang.StringUtils;
  4. import org.apache.commons.lang.math.NumberUtils;
  /**
   * Shape checks for the two kinds of app log lines handled by the Flume interceptor.
   *
   * <p>These are cheap structural checks only; the JSON itself is not parsed.
   *
   * @author 张俊杰 (2020-10-19)
   */
  public class ETLUtil {

    /** Required length of the timestamp that prefixes an event log line. */
    private static final int TIMESTAMP_LENGTH = 13;

    /**
     * Checks whether a start log line looks like a single JSON object.
     *
     * @param source raw log line; may be null
     * @return true if, after trimming, the line starts with '{' and ends with '}'
     */
    public static boolean validStartLog(String source) {
      if (source == null) {
        return false;
      }
      // A blank line trims to something that cannot start with '{', so no separate blank check.
      return isJsonObjectShaped(source.trim());
    }

    /**
     * Checks whether an event log line has the form {@code timestamp|{...}}.
     *
     * <p>The timestamp must be exactly 13 ASCII digits (0-9). The part after the first '|' must
     * start with '{' and end with '}'.
     *
     * @param source raw log line; may be null
     * @return true if the line matches the expected event-log shape, false otherwise
     */
    public static boolean validEventLog(String source) {
      if (source == null) {
        return false;
      }
      // limit 2: split only at the first '|', so a '|' inside the JSON payload
      // (for example in an error message) does not invalidate the line.
      String[] parts = source.trim().split("\\|", 2);
      if (parts.length != 2) {
        return false;
      }
      return isAsciiTimestamp(parts[0]) && isJsonObjectShaped(parts[1]);
    }

    /** Returns true if {@code s} is exactly TIMESTAMP_LENGTH characters, all in '0'..'9'. */
    private static boolean isAsciiTimestamp(String s) {
      if (s.length() != TIMESTAMP_LENGTH) {
        return false;
      }
      for (int i = 0; i < s.length(); i++) {
        char c = s.charAt(i);
        // Character.isDigit would also accept non-ASCII digits such as U+0661.
        if (c < '0' || c > '9') {
          return false;
        }
      }
      return true;
    }

    /** Returns true if {@code s} starts with '{' and ends with '}'. */
    private static boolean isJsonObjectShaped(String s) {
      return s.startsWith("{") && s.endsWith("}");
    }
  }

将编写的拦截器打成jar包放到Linux上

上传到 Flume 安装目录下的 lib 目录(即 apache-flume-1.7.0/lib)中。

  1. [root@zjj101 lib]# pwd
  2. /root/soft/apache-flume-1.7.0/lib
  3. [root@zjj101 lib]# rz
  4. rz waiting to receive.
  5. 开始 zmodem 传输。 按 Ctrl+C 取消。
  6. Transferring ETLInterceptor-0.0.1-SNAPSHOT.jar...
  7. 100% 4 KB 4 KB/s 00:00:01 0 错误
  8. [root@zjj101 lib]#

编写 Flume 配置文件,测试拦截器是否生效。

  # Agent "a1" declares one source (r1), one sink (k1) and one channel (c1);
  # several components of the same kind would be separated by spaces.
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Syntax: <agent>.<component kind>.<component name>.<property> = <value>
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1
  # Tail files in /tmp/logs whose names match the regex ^app.+\.log$ (app-yyyy-MM-dd.log).
  # In the regex: ^ and $ anchor the start and end, "." matches any single character,
  # "+" means "one or more of the preceding element", and "\." is a literal dot.
  # The backslash is doubled because Java properties loading drops a lone backslash
  # in front of an ordinary character.
  a1.sources.r1.filegroups.f1 = /tmp/logs/^app.+\\.log$
  # JSON file recording the inode and read offset of every tailed file.
  # If unset, TAILDIR uses ~/.flume/taildir_position.json.
  a1.sources.r1.positionFile = /root/soft/apache-flume-1.7.0/custdata/log_position.json
  # Interceptor: set its type to the fully qualified name of the Builder class.
  # NOTE: assumes MyInterceptor is declared in package com.atguigu.dw.flume; the class
  # listing shows no package line, so adjust this to the real package of the class.
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder
  # Sink: print events to the log, which is enough to check the interceptor's output.
  a1.sinks.k1.type = logger
  # Channel: in-memory buffer holding at most 1000 events.
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  # Wiring: a source may feed several channels, but a sink drains exactly one channel.
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

执行下面的 shell 命令启动 agent,验证拦截器是否生效:

  # Use an absolute -c (config dir) path so the command works from any working directory, like -f.
  flume-ng agent -c /root/soft/apache-flume-1.7.0/conf -n a1 -f /root/soft/apache-flume-1.7.0/conf/test.conf -Dflume.root.logger=DEBUG,console

-c 指定 Flume 的配置目录(conf,其中包含 flume-env.sh、log4j.properties 等)
-n 指定 agent 的名字(必须与配置文件中的 a1 一致)
-f 指定本次使用的配置文件
-Dflume.root.logger=DEBUG,console 表示以 DEBUG 级别把日志打印到控制台

执行完了查看日志信息

  1. 2020-10-20 10:16:20,217 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
  2. 2020-10-20 10:16:50,220 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
  3. 2020-10-20 10:17:20,223 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
  4. 2020-10-20 10:17:50,225 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes
  5. 2020-10-20 10:17:55,608 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /tmp/logs/app-2020-10-15.log, inode: 1870589, pos: 696353
  6. 2020-10-20 10:18:20,228 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:/root/soft/apache-flume-1.7.0/conf/test.conf for changes

查看记录读取位置的 JSON 文件(positionFile):其中显示已读取 /tmp/logs/app-2020-10-15.log,当前读取位置(字节偏移量)为 696353,与上面日志中的 pos 一致。

  1. [root@zjj101 apache-flume-1.7.0]# cd custdata/
  2. [root@zjj101 custdata]# ls
  3. log_position.json
  4. [root@zjj101 custdata]# cat log_position.json
  5. [{"inode":1870589,"pos":696353,"file":"/tmp/logs/app-2020-10-15.log"}]
  6. [root@zjj101 custdata]#