简介

反向 ETL 区别于 ETL：ETL 更多是为了构建数据仓库（把数据加载进数仓），而反向 ETL 的主要目的是把数据从数据仓库同步到下游业务系统。
反向 ETL 的目标是提供一套统一的标准声明，方便下游应用共享基础能力并进行扩展。
典型用途：主数据订阅、第三方数据订阅等。

核心组件

  • Easy Batch：批处理
  • XXL-JOB：分布式调度
  • PF4J：插件化开发
  • Dremio：数据湖（覆盖范围比数仓更广，并提供更强大的 OLAP 能力）

将 Easy Batch 批处理与 XXL-JOB 调度结合，即可实现简单可靠的数据分发；再配合数据湖和 PF4J 插件化开发，就能实现相当灵活的反向 ETL 与数据订阅功能。

  1. public interface IBatch<SOURCE, SINK> {
  2. JobExecutor jobExecutor();
  3. RecordReader reader(SOURCE i);
  4. RecordWriter writer(BatchResult result, SINK o);
  5. RecordMapper mapper();
  6. default CustomPipelineListener customPipelineListener(String jobId) {
  7. return new CustomPipelineListener(jobId);
  8. }
  9. default CustomWriterListener customWriterListener(String jobId) {
  10. return new CustomWriterListener(jobId);
  11. }
  12. default CustomReaderListener customReaderListener(String jobId) {
  13. return new CustomReaderListener(jobId);
  14. }
  15. // 可选
  16. default RecordProcessor processor() {
  17. return null;
  18. }
  19. default BatchResult execute(SOURCE source, SINK sink) {
  20. BatchResult result = new BatchResult(UUID.randomUUID().toString(), XxlJobHelper.getJobId());
  21. JobExecutor jobExecutor = jobExecutor();
  22. JobBuilder jobBuilder = new JobBuilder();
  23. jobBuilder.reader(reader(source));
  24. jobBuilder.writer(writer(result, sink));
  25. //jobBuilder.batchSize(100);
  26. RecordMapper mapper = mapper();
  27. if (mapper != null) {
  28. jobBuilder.mapper(mapper);
  29. }
  30. RecordProcessor processor = processor();
  31. if (processor != null) {
  32. jobBuilder.processor(processor);
  33. }
  34. jobBuilder.readerListener(customReaderListener(result.getJobId()));
  35. jobBuilder.writerListener(customWriterListener(result.getJobId()));
  36. jobBuilder.pipelineListener(customPipelineListener(result.getJobId()));
  37. Job job = jobBuilder.build();
  38. JobReport jobReport = jobExecutor.execute(job);
  39. result.setJobReport(jobReport.getStatus().toString());
  40. return result;
  41. }
  42. }
  1. @Data
  2. @AllArgsConstructor
  3. @Slf4j
  4. public class BatchResult {
  5. private String jobId;
  6. private long xxlJobId;
  7. /**
  8. * 总分发记录数
  9. */
  10. private Long totalNum;
  11. /**
  12. * 当前分发记录序号
  13. */
  14. private Long currentNum;
  15. /**
  16. * 单次写请求的数据量
  17. */
  18. private Integer batchSize;
  19. private Boolean success;
  20. private String msg;
  21. private String jobReport;
  22. private int successNum;
  23. private int errorNum;
  24. }