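Approach

The usual approach is dimension decomposition (drill-down).

Description: decompose layer by layer, pruning on two signals: each node's contribution to the overall change, and the dispersion of the changes across the values of a dimension.

Contribution to the overall change: every node's contribution is computed against the change of the root node, not against the change of its parent node.

  • Select dimensions: before splitting on a dimension, look at the dispersion of the changes across its values to decide whether the dimension is worth splitting. If the dispersion is small, i.e. all values changed by about the same amount, the dimension does not need to be split.
  • Select dimension values: compute the contribution of each dimension value and prune the values whose contribution is below the threshold. Many refinements are possible here, such as keeping only nodes whose values account for a large share of the data, or keeping only the "outliers" found in step 1.
  • Repeat the two steps above.
  • Finally, rank the nodes to obtain the top results, usually by contribution. There are several ranking strategies: rank all nodes across all layers, or rank only the leaf nodes. (Current understanding: ranking only leaf nodes works better; otherwise overlapping paths appear, i.e. A-B and A-B-C both show up in the conclusions.)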

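Characteristics:

  • Pruning on demand rather than exhaustive decomposition.
  • In every layer a contribution can be greater than 1 or less than 0. There is also no ordering between a child's contribution and its parent's, because both are measured against the change of the root node.
  • Within one layer, the contributions of the nodes sum to 1 (before pruning, for an additive metric split on a single dimension); the nodes of all layers taken together do not sum to 1.
  • In the final conclusions, contributions may also exceed 1 and may visibly fail to add up to 1, which can be hard to interpret.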


Contribution calculation

For numeric (additive) metrics, contribution = change of the node / change of the root node.
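A minimal sketch of this formula, to make the "greater than 1 / less than 0" behaviour concrete. The class `ContributionSketch`, the "country" dimension, and the numbers are illustrative assumptions, not taken from the original code base; returning 0 when the root did not change is a convention chosen for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only. */
final class ContributionSketch {

    /** Contribution of a node = (node change) / (root change). */
    static double contribution(double oldValue, double newValue,
                               double oldRoot, double newRoot) {
        double rootDelta = newRoot - oldRoot;
        if (rootDelta == 0.0) {
            // The root did not change, so the ratio is undefined; report 0 by convention.
            return 0.0;
        }
        return (newValue - oldValue) / rootDelta;
    }

    public static void main(String[] args) {
        // Root metric: 100 -> 110 (change +10), split by a made-up "country" dimension.
        double oldRoot = 100, newRoot = 110;
        Map<String, double[]> byCountry = new LinkedHashMap<>();
        byCountry.put("US", new double[] {40, 55}); // change +15
        byCountry.put("ES", new double[] {35, 28}); // change -7
        byCountry.put("FR", new double[] {25, 27}); // change +2

        double sum = 0;
        for (Map.Entry<String, double[]> e : byCountry.entrySet()) {
            double c = contribution(e.getValue()[0], e.getValue()[1], oldRoot, newRoot);
            sum += c;
            System.out.println(e.getKey() + " contribution = " + c);
        }
        System.out.println("sum = " + sum);
    }
}
```

In this example the three contributions are 1.5, -0.7 and 0.2: one value contributes more than the whole change, one contributes negatively, and together they still sum to 1 because the split covers the root completely.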

For ratio metrics: TODO

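Pruning strategies

Sub-metric decomposition:
The approach above only decomposes by dimension and does not consider sub-metric decomposition. In practice the two can be combined, for example by first finding the sub-metrics with a large contribution and then decomposing those sub-metrics by dimension.

This may require judging how strongly the parent and child metrics are correlated. Linear correlation measures such as the Pearson coefficient can be used here.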
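As a rough illustration of the correlation check, the Pearson coefficient of two equally long series (for example daily values of a parent metric and one of its sub-metrics) can be computed as below. `PearsonSketch` is a hypothetical name, and returning NaN for a constant series is a choice made for this sketch.

```java
/** Hypothetical helper for illustration only. */
final class PearsonSketch {

    /**
     * Pearson correlation coefficient of two series of equal length.
     * Returns NaN when either series is constant (zero variance).
     */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("series must have equal length >= 2");
        }
        int n = x.length;
        double meanX = 0, meanY = 0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;

        double cov = 0, varX = 0, varY = 0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        double denom = Math.sqrt(varX * varY);
        return denom == 0.0 ? Double.NaN : cov / denom;
    }
}
```

A value close to 1 or -1 indicates a strong linear relationship; a value near 0 only rules out a linear one, so it should not be read as "unrelated".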

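Deciding whether to keep drilling down on a dimension
Method 1: as in the approach above, use the dispersion of the changes across the values of the dimension to decide whether the dimension can be split.

(1) Check whether the changes of all values under the dimension contain outliers. Common methods: standard deviation and variance. See https://cloud.tencent.com/developer/article/1463726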
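One possible form of such a check, assuming a simple "more than k standard deviations from the mean" rule. This is a sketch only: the implementation of `MathUtils.hasOutliers` used by the code further down is not shown on this page, so `OutlierSketch`, its signature, and the parameter `k` are assumptions.

```java
import java.util.List;

/** Hypothetical helper for illustration only. */
final class OutlierSketch {

    /**
     * Returns true if any value lies more than k population standard
     * deviations away from the mean of the list.
     */
    static boolean hasOutliers(List<Double> values, double k) {
        if (values == null || values.size() < 3) {
            // Too few points to say anything meaningful about dispersion.
            return false;
        }
        double mean = values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        double variance = values.stream()
            .mapToDouble(v -> (v - mean) * (v - mean))
            .average()
            .orElse(0.0);
        double std = Math.sqrt(variance);
        if (std == 0.0) {
            // All values are identical, so nothing stands out.
            return false;
        }
        return values.stream().anyMatch(v -> Math.abs(v - mean) > k * std);
    }
}
```

One caveat on choosing `k`: with the population standard deviation, a single value among n can be at most sqrt(n - 1) standard deviations from the mean, so a 3-sigma rule can never fire on a dimension with ten or fewer values. For dimensions with few values a smaller `k` (for example 2) is needed.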

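Method 2: ranking dimensions against each other

Measure the difference between the historical share and the share of the change. Assume the comparison operator is day-over-day. Yesterday the core metric had value $X$, and its values on dimension $D$ were $x_1, x_2, \dots, x_n$ (i.e. the dimension has $n$ values); today the core metric has value $Y$, and its values on dimension $D$ are $y_1, y_2, \dots, y_n$. The historical share is then:

$h_i = \frac{x_i}{X}$

The share of the change (which can be regarded as the change contribution):

$v_i = \frac{y_i - x_i}{Y - X}$

The two vectors have the same length. To measure how different they are, a distance metric is used; currently the Euclidean distance:

$d = \sqrt{\sum_i (h_i - v_i)^2}$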

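Then compute this difference for every dimension and sort the dimensions by it; the dimensions with the largest difference rank first and should be split first.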
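The ranking can be sketched as follows: for each dimension, build the historical-share vector and the change-share vector, take their Euclidean distance, and sort dimensions by that distance in descending order. `DimensionRankSketch`, the dimension names, and the input layout (`[0]` = yesterday's values, `[1]` = today's values) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only. */
final class DimensionRankSketch {

    /** Euclidean distance between historical shares and change shares of one dimension. */
    static double distance(double[] oldValues, double[] newValues) {
        if (oldValues.length != newValues.length) {
            throw new IllegalArgumentException("both days must cover the same dimension values");
        }
        double oldTotal = 0, newTotal = 0;
        for (int i = 0; i < oldValues.length; i++) {
            oldTotal += oldValues[i];
            newTotal += newValues[i];
        }
        double totalDelta = newTotal - oldTotal;
        if (oldTotal == 0.0 || totalDelta == 0.0) {
            throw new IllegalArgumentException("shares are undefined when a total or the change is zero");
        }
        double sumSq = 0;
        for (int i = 0; i < oldValues.length; i++) {
            double h = oldValues[i] / oldTotal;                   // historical share
            double v = (newValues[i] - oldValues[i]) / totalDelta; // share of the change
            sumSq += (h - v) * (h - v);
        }
        return Math.sqrt(sumSq);
    }

    /** Dimension names ordered by distance, largest first. */
    static List<String> rank(Map<String, double[][]> dims) {
        List<String> names = new ArrayList<>(dims.keySet());
        names.sort(Comparator.comparingDouble(
            (String name) -> distance(dims.get(name)[0], dims.get(name)[1])).reversed());
        return names;
    }
}
```

For instance, if "platform" goes from (20, 80) to (22, 88), every value grew in proportion to its share and the distance is 0; if "country" goes from (50, 50) to (60, 50), one value carries the entire change and the distance is sqrt(0.5), so "country" is ranked ahead of "platform".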

Sample similarity is usually measured with a distance; for typical methods see https://www.cnblogs.com/heaad/archive/2011/03/08/1977733.html

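Other pruning strategies:
There are many possible pruning strategies; the most common is a contribution threshold. To account for effects such as a dimension having many or few values, or a value covering a large or small share of the data, further strategies can be added (for example, prune a value whose share of the data is below a threshold).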

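Result ranking strategy

Results are usually ranked by contribution. There are several ranking strategies: rank all nodes across all layers, or rank only the leaf nodes. (Current understanding: ranking only leaf nodes works better; otherwise overlapping paths appear, i.e. A-B and A-B-C both show up in the conclusions.)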
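A small sketch of the leaf-only strategy: walk the pruned tree, collect the nodes without children, and keep the top N by contribution. The `RankSketch.Node` type here is a stripped-down stand-in invented for the example, not the `BaseNode` class from the listings below.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration only. */
final class RankSketch {

    /** Minimal tree node: a drill-down path, its contribution, and its children. */
    static final class Node {
        final String path;
        final double contribution;
        final List<Node> children = new ArrayList<>();

        Node(String path, double contribution) {
            this.path = path;
            this.contribution = contribution;
        }

        Node add(Node child) {
            children.add(child);
            return this;
        }
    }

    /** Depth-first collection of all nodes that have no children. */
    static void collectLeaves(Node node, List<Node> out) {
        if (node.children.isEmpty()) {
            out.add(node);
            return;
        }
        for (Node child : node.children) {
            collectLeaves(child, out);
        }
    }

    /** Paths of the n leaves with the highest contribution, best first. */
    static List<String> topLeaves(Node root, int n) {
        List<Node> leaves = new ArrayList<>();
        collectLeaves(root, leaves);
        return leaves.stream()
            .sorted(Comparator.comparingDouble((Node x) -> x.contribution).reversed())
            .limit(n)
            .map(x -> x.path)
            .collect(Collectors.toList());
    }
}
```

Because an inner node such as A=a1 is never emitted once it has surviving children, the result cannot contain both a path and one of its extensions. The trade-off is that a large inner node whose children are all individually small is represented only by those small leaves.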

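Open questions

Is pruning meant to improve performance, or to guarantee the accuracy of the results?
Pruning is a tuning process. Pruning too aggressively hurts accuracy, and the more pruning strategies are stacked, the more accuracy is affected.
The result ranking strategy affects accuracy as well.
This also raises the question of how to measure the accuracy of the results in the first place.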

Code implementation

mr

// File: FluctuationMrDriver.java
package com.aliexpress.diagnose.mr;

import com.alibaba.fastjson.JSON;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.data.TableInfo.TableInfoBuilder;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import lombok.extern.slf4j.Slf4j;

/**
 * Deploy the jar to the server:
 *   add jar /Users/antone/Documents/workspace/op-sycm-ae/sycm-diagnose/target/sycm-diagnose-1.0.0-SNAPSHOT.jar -f
 *
 * Remote execution command:
 *   jar -resources sycm-diagnose-1.0.0-SNAPSHOT.jar
 *   -classpath /Users/antone/Documents/workspace/op-sycm-ae/sycm-diagnose/target/sycm-diagnose-1.0.0-SNAPSHOT.jar
 *   com.aliexpress.diagnose.mr.FluctuationMrDriver
 *   -inputTable aebi.adi_ae_sycm_trd_diag_analysis_1d
 *   -outputTable adi_ae_sycm_trd_diag_analysis_1d_test_rst
 *   -partitionDate 20210318 -dateRage 1
 *
 * @date 2021/3/18
 */
@Slf4j
public class FluctuationMrDriver {

    public static void main(String[] args) throws OdpsException {
        Args arg = Args.parse(args);
        JobConf job = new JobConf();
        job.setMapperClass(CubeMapper.class);
        job.setReducerClass(FluctuationReducer.class);
        // Records are shuffled by seller id, so each reducer call sees one seller
        job.setMapOutputKeySchema(SchemaUtils.fromString("seller_admin_seq:bigint"));
        job.setMapOutputValueSchema(
            SchemaUtils.fromString("seller_admin_seq:bigint,cate_lv2_id:bigint,country_id:string,"
                + "platform:string,source_lv2_desc:string,"
                + "buyer_type:string,visit_type:string,item_id:bigint,"
                + "pay_amt_1d:bigint,uv_1d:bigint,"
                + "pay_buyer_cnt_1d:bigint,pay_amt_per_1d:bigint,"
                + "uv_per_1d:bigint,pay_buyer_cnt_per_1d:bigint"));
        InputUtils.addTable(getTableInfoBuilder(arg), job);
        OutputUtils.addTable(TableInfo.builder()
                .tableName(arg.getOutputTable())
                .partSpec("dt=" + arg.getPartitionDate() +
                    "/date_range_partition=" + arg.getDateRange())
                .build(),
            job);
        // Pass the parsed arguments on to the reducers through the job configuration
        job.set(Args.ARGS, JSON.toJSONString(arg));
        JobClient.runJob(job);
    }

    /** Builds the input table descriptor for the requested partition. */
    private static TableInfo getTableInfoBuilder(Args arg) {
        TableInfoBuilder inputTableBuilder = TableInfo.builder()
            .tableName(arg.getInputTable())
            .partSpec("ds=" + arg.getPartitionDate() +
                "/date_range=" + arg.getDateRange());
        return inputTableBuilder.build();
    }
}
// File: CubeMapper.java
package com.aliexpress.diagnose.mr;

import java.io.IOException;

import com.aliexpress.diagnose.indicator.model.DataKeys;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;

import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.BUYER_TYPE_KEY;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.PLATFORM_KEY;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.SELLER_ID_KEY;

/**
 * @date 2021/3/19
 */
@Slf4j
public class CubeMapper extends MapperBase {

    private Record key;

    @Override
    public void setup(TaskContext context) throws IOException {
        key = context.createMapOutputKeyRecord();
    }

    /**
     * Emits every accepted record keyed by seller id.
     *
     * @param recordNum sequence number of the record
     * @param record    input record
     * @param context   task context
     * @throws IOException if writing the record fails
     */
    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
        key.setBigint(SELLER_ID_KEY, record.getBigint(SELLER_ID_KEY));
        // Data filter: the source table contains cube rows for the platform and buyer_type
        // dimensions, which the business does not drill into, so only rows where both
        // dimensions equal -9999 (the "total" value) are kept.
        if (filter(record)) {
            context.write(key, record);
        }
    }

    /**
     * Filters the data.
     * The source table contains cube rows for the platform and buyer_type dimensions, which
     * the business does not drill into, so only rows where both dimensions equal -9999 pass.
     *
     * @param record input record
     * @return true if the record should be kept
     */
    private boolean filter(Record record) {
        return StringUtils.equals(record.getString(PLATFORM_KEY), DataKeys.TOTAL_DV)
            && StringUtils.equals(record.getString(BUYER_TYPE_KEY), DataKeys.TOTAL_DV);
    }
}
// File: FluctuationReducer.java
package com.aliexpress.diagnose.mr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import com.alibaba.fastjson.JSON;
import com.aliexpress.diagnose.FluctuationService;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.request.FluctuationAnalysisParam;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
import lombok.extern.slf4j.Slf4j;

import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.ALL_DIMENSIONS;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.SELLER_ID_KEY;

/**
 * @date 2021/3/19
 */
@Slf4j
public class FluctuationReducer extends ReducerBase {

    private Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
    }

    /**
     * All records of one seller id are aggregated in a single reduce call.
     * The core step is converting those records into the indicatorValues structure
     * of RecordRepositoryImpl.
     *
     * @param key     map output key (seller id)
     * @param values  all records of that seller
     * @param context task context
     * @throws IOException if writing the result fails
     */
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        Long sellerId = key.getBigint(SELLER_ID_KEY);
        RecordRepository repository = new MrDataRepositoryProvider(sellerId, values).create();
        FluctuationService fluctuationService = new FluctuationService(repository);
        Args args = JSON.parseObject(context.getJobConf().get(Args.ARGS), Args.class);
        FluctuationAnalysisParam param = FluctuationAnalysisParam.builder()
            .indexCode("pay_amt_1d")
            .statDate(null)
            .currentDimensionAndValue(new ArrayList<>())
            .drillDownDimensions(ALL_DIMENSIONS)
            .contributionLowerLimit(args.getContributionLowerLimit().doubleValue())
            .cutByDimValueChangeDistribution(args.isCutByDimValueChangeDistribution())
            .resultGenStrategy(args.getResultGenStrategy())
            .resultTopN(args.getResultTopN())
            .build();
        ResultData resultData = null;
        try {
            resultData = fluctuationService.fluctuationAnalysis(param);
            log.info("reducerResult {} {}", sellerId, JSON.toJSONString(resultData));
        } catch (Exception e) {
            // A failure for one seller is logged and skipped so the job can continue
            log.error("fluctuationAnalysisError {}", sellerId, e);
        }
        if (resultData != null) {
            result.setBigint(SELLER_ID_KEY, sellerId);
            result.setString("idx_name", "pay_amt_1d");
            result.setString("result", JSON.toJSONString(resultData));
            result.setString("date_range", args.getDateRange());
            context.write(result);
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
// File: MrDataRepositoryProvider.java
package com.aliexpress.diagnose.mr;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.aliexpress.diagnose.GmvDims;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.indicator.impl.RecordRepositoryImpl;
import com.aliexpress.diagnose.indicator.model.DataKeys;
import com.aliyun.odps.data.Record;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;

/**
 * @date 2021/3/18
 */
@Slf4j
public class MrDataRepositoryProvider {

    private Iterator<Record> records;
    private Long sellerId;

    public static final String SELLER_ID_KEY = "seller_admin_seq";
    // Platform dimension
    public static final String PLATFORM_KEY = "platform";
    // Buyer dimension
    public static final String BUYER_TYPE_KEY = "buyer_type";
    public static final List<String> ALL_DIMENSIONS = GmvDims.getAllDims();
    private static final List<String> indicators = Arrays.asList("pay_amt_per_1d", "pay_amt_1d");

    public MrDataRepositoryProvider(Long sellerId, Iterator<Record> records) {
        this.sellerId = sellerId;
        this.records = records;
    }

    public RecordRepository create() {
        Map<String, Map<String, Double>> indicatorValues = new HashMap<>(1000);
        Map<String, Set<String>> dimensionValues = Maps.newHashMapWithExpectedSize(ALL_DIMENSIONS.size());
        ALL_DIMENSIONS.forEach(d -> dimensionValues.put(d, new HashSet<>()));
        while (records.hasNext()) {
            Record record = records.next();
            // Collect the distinct values of each dimension, excluding -9999 (the "total" value)
            ALL_DIMENSIONS.forEach(d -> {
                String dv = String.valueOf(record.get(d));
                if (!DataKeys.TOTAL_DV.equals(dv)) {
                    dimensionValues.get(d).add(dv);
                }
            });
            // Build the map from dimension-combination key to indicator values
            Map<String, String> currentDV = Maps.newHashMapWithExpectedSize(ALL_DIMENSIONS.size());
            ALL_DIMENSIONS.forEach(d -> {
                String dv = String.valueOf(record.get(d));
                currentDV.put(d, dv);
            });
            String key = DataKeys.encode(currentDV, ALL_DIMENSIONS);
            Map<String, Double> currentV = Maps.newHashMapWithExpectedSize(indicators.size());
            for (String indicator : indicators) {
                currentV.put(indicator, Double.valueOf(String.valueOf(record.get(indicator))));
            }
            indicatorValues.put(key, currentV);
        }
        //log.info("repoDimensionValues {},{}", this.sellerId, JSON.toJSONString(dimensionValues));
        return RecordRepositoryImpl.builder()
            .indicatorValues(indicatorValues)
            .dimensionValues(dimensionValues)
            .build();
    }
}

task

// File: AnalysisJob.java
package com.aliexpress.diagnose.task;

import java.util.concurrent.ForkJoinPool;

import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.node.NodeFactory;
import com.aliexpress.diagnose.node.NodeService;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.request.AnalysisConfig;

/**
 * @author antone
 */
public class AnalysisJob {

    private RecordRepository recordRepository;
    private NodeService nodeService;
    private NodeFactory nodeFactory;

    public AnalysisJob(RecordRepository recordRepository) {
        this.recordRepository = recordRepository;
        this.nodeService = new NodeService(recordRepository);
        this.nodeFactory = new NodeFactory(recordRepository);
    }

    public BaseNode analysis(AnalysisConfig analysisConfig) {
        // 1. Build the root node
        BaseNode rootNode = nodeService.createRootNode(analysisConfig);
        if (rootNode.withNoFluctuation()) {
            return rootNode;
        }
        // 2. Start drilling down
        // TODO benchmark and tune the use of ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() + 1);
        try {
            DrillDownNodeTask drillDownNodeTask = new DrillDownNodeTask(rootNode, analysisConfig, nodeFactory);
            return forkJoinPool.invoke(drillDownNodeTask);
        } finally {
            // A new pool is created on every call, so release its worker threads here
            forkJoinPool.shutdown();
        }
    }
}
// File: DrillDownNodeTask.java
package com.aliexpress.diagnose.task;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

import com.aliexpress.diagnose.indicator.IndexRepository;
import com.aliexpress.diagnose.indicator.model.Index;
import com.aliexpress.diagnose.node.CutService;
import com.aliexpress.diagnose.node.NodeFactory;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.CreateDrillDownNodeParam;
import com.aliexpress.diagnose.request.AnalysisConfig;

/**
 * @author minglei.jml
 */
public class DrillDownNodeTask extends RecursiveTask<BaseNode> {

    /**
     * The node to drill down from (the root of this subtask)
     */
    private BaseNode node;
    private AnalysisConfig config;
    private NodeFactory nodeFactory;

    public DrillDownNodeTask(BaseNode node, AnalysisConfig config, NodeFactory nodeFactory) {
        this.node = node;
        this.config = config;
        this.nodeFactory = nodeFactory;
    }

    @Override
    protected BaseNode compute() {
        // Drill down by dimension
        scanDimensions();
        return node;
    }

    private void scanDimensions() {
        Index index = IndexRepository.getInstance().get(config.getIndexCode());
        // key: drill-down dimension; holds the next-level drill-down tasks of each dimension
        Map<String, List<DrillDownNodeTask>> subTaskMap = new HashMap<>(2);
        // key: drill-down dimension; holds the next-level child nodes of each dimension
        Map<String, List<BaseNode>> subNodeMap = new HashMap<>(2);
        if (node.getDrillDownDimensions() == null || node.getDrillDownDimensions().isEmpty()) {
            // No dimension left to drill into
            return;
        }
        // Drill down on each available dimension separately
        node.getDrillDownDimensions().forEach(
            dimension -> {
                CreateDrillDownNodeParam param = CreateDrillDownNodeParam.builder()
                    .fatherNode(node)
                    .indexType(index.getType())
                    .chosenDrillDownDimension(dimension)
                    .allDimensions(index.getDimensions())
                    .statDate(config.getStatDate())
                    .build();
                List<BaseNode> subNodes = nodeFactory.createDrillDownNode(param);
                // Prune
                if (subNodes != null) {
                    subNodes = CutService.getInstance().cutNode(subNodes, config, node, dimension);
                }
                // Create a drill-down task for each surviving child node
                if (subNodes != null && subNodes.size() != 0) {
                    List<DrillDownNodeTask> tasks = subNodes.stream().map(subNode -> {
                        DrillDownNodeTask task = new DrillDownNodeTask(subNode, config, nodeFactory);
                        // fork: schedule the task for asynchronous execution in the current
                        // ForkJoinPool; an idle worker thread may pick it up
                        task.fork();
                        return task;
                    }).collect(Collectors.toList());
                    subTaskMap.put(dimension, tasks);
                }
            }
        );
        // Each child task recurses on its own; collect and merge the child results
        // join: wait for the task to complete and return its result
        subTaskMap.forEach((key, value) -> subNodeMap.put(key,
            value.stream().map(v -> v.join()).collect(Collectors.toList())));
        node.setSubNodes(subNodeMap);
    }
}

node

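// File: Node.java
package com.aliexpress.diagnose.node.model;

import java.util.List;

/**
 * @author minglei.jml
 */
public interface Node {

    /**
     * Computes the contribution of the current node
     * @return contribution of current node
     */
    Double computeContribution();

    /**
     * Computes the change ratio of the current node
     * @return change ratio of the current node
     */
    Double computeFluctuationRatio();

    /**
     * Computes the change ratio of the root node
     * @return change ratio of the root node
     */
    Double computeRootNodeFluctuationRatio();

    /**
     * Returns all child nodes of the current node
     * @return child nodes of every drill-down dimension, flattened into one list
     */
    List<BaseNode> getAllSubNodes();

    /**
     * Copies the root node's total values into the total fields of the current node
     * @param node node to copy the total values from
     */
    void copyTotalValue(BaseNode node);

    /**
     * Assigns the current node's own values to its total fields; mainly used on the root node
     */
    void copyValueToTotal();
}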
// File: BaseNode.java
package com.aliexpress.diagnose.node.model;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import com.aliexpress.diagnose.util.DimUtils;
import com.aliexpress.diagnose.util.MathUtils;
import com.google.common.base.Joiner;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author minglei.jml
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BaseNode implements Node {

    private String indexCode;
    //private Map<String, String> currentDimensionAndValue;
    /**
     * Note that this is a list, so that the decomposition path can be reconstructed directly
     * from the node, e.g. for troubleshooting.
     * Example: if the current value is [B:b, C:c, A:a], the node was necessarily reached via
     * the path B->C->A and not via another path such as A-B-C.
     */
    private List<DimValue> currentDimensionAndValue;
    /**
     * Dimensions that can still be drilled into
     */
    private List<String> drillDownDimensions;
    /**
     * Child nodes
     * key: name of each drill-down dimension
     * value: child nodes under that dimension
     */
    private Map<String, List<BaseNode>> subNodes;
    /**
     * Contribution
     */
    private Double contribution;
    /**
     * Depth
     */
    private Integer depth;
    /**
     * Comparison (old) value of the node
     */
    private Double oldValue;
    /**
     * Current (new) value of the node
     */
    private Double newValue;
    /**
     * Comparison (old) value of the root node
     */
    private Double oldTotalValue;
    /**
     * Current (new) value of the root node
     */
    private Double newTotalValue;
    //private Integer cutNodesCount;

    BaseNode(String indexCode, List<DimValue> currentDimensionAndValue, List<String> drillDownDimensions,
             Integer depth, Double oldValue, Double newValue, Double oldTotalValue, Double newTotalValue) {
        this.indexCode = indexCode;
        this.currentDimensionAndValue = currentDimensionAndValue;
        this.drillDownDimensions = drillDownDimensions;
        this.depth = depth;
        this.oldValue = oldValue;
        this.newValue = newValue;
        this.oldTotalValue = oldTotalValue;
        this.newTotalValue = newTotalValue;
    }

    @Override
    public List<BaseNode> getAllSubNodes() {
        List<BaseNode> nodes = new ArrayList<>();
        if (subNodes == null || subNodes.isEmpty()) {
            return nodes;
        }
        subNodes.values().forEach(nodes::addAll);
        return nodes;
    }

    @Override
    public Double computeContribution() {
        // Subclasses (e.g. AddUpNode) provide the actual computation
        return null;
    }

    @Override
    public Double computeFluctuationRatio() {
        return MathUtils.ratio(getNewValue(), getOldValue());
    }

    @Override
    public Double computeRootNodeFluctuationRatio() {
        return MathUtils.ratio(getNewTotalValue(), getOldTotalValue());
    }

    @Override
    public void copyTotalValue(BaseNode node) {
    }

    @Override
    public void copyValueToTotal() {
    }

    /**
     * Checks whether the current node contains another node.
     *
     * Rules:
     * path A-B-C contains path A-B
     * path A-C-B also contains path A-B
     *
     * @param compareNode node to compare against
     * @return true if the current node contains compareNode
     */
    public boolean containsNode(BaseNode compareNode) {
        Map<String, String> currentDimValueMap = this.dimValueListToMap();
        Map<String, String> compareDimValueMap = compareNode.dimValueListToMap();
        // First check that the current dimension set fully covers the compared dimension set
        if (!currentDimValueMap.keySet().containsAll(compareDimValueMap.keySet())) {
            return false;
        }
        // Given that, currentNode contains compareNode only if every value of currentNode
        // contains the corresponding value of compareNode.
        // Note: for dimensions absent from compareNode the second argument is null.
        return currentDimValueMap.keySet().stream()
            .allMatch(dim -> DimUtils.dimValueContains(currentDimValueMap.get(dim),
                compareDimValueMap.get(dim)));
    }

    public Map<String, String> dimValueListToMap() {
        // Convert the list to a map
        return currentDimensionAndValue.stream().collect(
            Collectors.toMap(DimValue::getDim, DimValue::getValue));
    }

    /**
     * Converts the dimension values to a string of the form a=a1^b=b1...
     *
     * @return the joined dimension-value description
     */
    public String dimValueToString() {
        return Joiner.on("^").join(
            currentDimensionAndValue.stream()
                .map(DimValue::buildDesc)
                .collect(Collectors.toList()));
    }

    /**
     * Description of this node
     *
     * @return human-readable summary of the node
     */
    public String buildNodeDesc() {
        return String.format("%s(contribution: %s, current value: %s, compare value: %s, change ratio: %s)",
            dimValueToString(), contribution, getNewValue(), getOldValue(), computeFluctuationRatio());
    }

    /**
     * Checks whether the root node shows no fluctuation
     * @return true if the root values are missing or unchanged
     */
    public boolean withNoFluctuation() {
        if (this.getOldTotalValue() == null || this.getNewTotalValue() == null) {
            return true;
        }
        return this.getOldTotalValue().equals(this.getNewTotalValue());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) { return true; }
        if (o == null || getClass() != o.getClass()) { return false; }
        BaseNode baseNode = (BaseNode)o;
        return Objects.equals(indexCode, baseNode.indexCode) &&
            // Compare as maps so that the drill-down order does not matter
            Objects.equals(this.dimValueListToMap(), baseNode.dimValueListToMap());
    }

    @Override
    public int hashCode() {
        return Objects.hash(indexCode, this.dimValueListToMap());
    }
}
// File: AddUpNode.java
package com.aliexpress.diagnose.node.model;

import java.util.List;

import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

/**
 * Additive (sum-type) node
 *
 * @author minglei.jml
 */
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@Slf4j
public class AddUpNode extends BaseNode {

    @Builder(builderMethodName = "addUpNodeBuilder")
    public AddUpNode(String indexCode, List<DimValue> currentDimensionAndValue,
                     List<String> drillDownDimensions, Integer depth, Double oldValue, Double newValue,
                     Double oldTotalValue, Double newTotalValue) {
        super(indexCode, currentDimensionAndValue, drillDownDimensions, depth, oldValue, newValue, oldTotalValue,
            newTotalValue);
    }

    @Override
    public Double computeContribution() {
        if (!paramCheck()) {
            throw new RuntimeException("factor is blank, can't compute contribution. node is " +
                getCurrentDimensionAndValue().toString() + "; new value : " + getNewValue() +
                "; old value : " + getOldValue() + "; new total value : " + getNewTotalValue() +
                "; old total value : " + getOldTotalValue());
        }
        if (this.getOldTotalValue().equals(this.getNewTotalValue())) {
            log.error("denominator is zero, oldTotalValue = newTotalValue, can't compute contribution. node is {}",
                getCurrentDimensionAndValue().toString());
            // Fall back to a contribution of 0 for now
            this.setContribution(0d);
        } else {
            // contribution = change of this node / change of the root node
            this.setContribution((getNewValue() - getOldValue()) / (getNewTotalValue() - getOldTotalValue()));
        }
        return this.getContribution();
    }

    @Override
    public void copyTotalValue(BaseNode node) {
        if (!(node instanceof AddUpNode)) {
            throw new RuntimeException(
                "copy node type not match,can't copy total value. node is " + getCurrentDimensionAndValue().toString());
        }
        AddUpNode copyNode = (AddUpNode)node;
        this.setOldTotalValue(copyNode.getOldTotalValue());
        this.setNewTotalValue(copyNode.getNewTotalValue());
    }

    private boolean paramCheck() {
        return getNewValue() != null && getNewTotalValue() != null && getOldValue() != null
            && getOldTotalValue() != null;
    }

    @Override
    public void copyValueToTotal() {
        this.setOldTotalValue(this.getOldValue());
        this.setNewTotalValue(this.getNewValue());
    }
}
// File: AddUpNodeDataRepositoryImpl.java
/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2021 All Rights Reserved.
 */
package com.aliexpress.diagnose.node.repository;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import com.aliexpress.diagnose.indicator.Record;
import com.aliexpress.diagnose.indicator.RecordParam;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.indicator.model.IndexType;
import com.aliexpress.diagnose.indicator.model.Pair;
import com.aliexpress.diagnose.node.model.AddUpNode;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.DimValue;
import com.google.common.collect.Lists;

/**
 * @author antone
 */
public class AddUpNodeDataRepositoryImpl implements NodeDataRepository {

    private RecordRepository recordRepository;

    public AddUpNodeDataRepositoryImpl(RecordRepository recordRepository) {
        this.recordRepository = recordRepository;
    }

    @Override
    public BaseNode query(RecordParam param) {
        Optional<Record> record = recordRepository.query(param);
        if (record.isPresent()) {
            return buildAddUpNode(param, record.get());
        }
        return null;
    }

    @Override
    public List<BaseNode> query(RecordParam param, String drillDownDimension) {
        List<Record> records = recordRepository.query(param, drillDownDimension);
        if (records == null) {
            return Collections.emptyList();
        }
        return records.stream()
            .map(record -> buildAddUpNode(param, drillDownDimension, record))
            .collect(Collectors.toList());
    }

    private AddUpNode buildAddUpNode(RecordParam param, Record record) {
        // By convention a node data query fetches exactly one indicator
        Pair indicatorValue = record.getIndicatorValues().get(param.getIndicators().get(0));
        return AddUpNode.addUpNodeBuilder()
            .currentDimensionAndValue(param.getCurrentDimensionAndValue())
            .oldValue(indicatorValue.getCompare())
            .newValue(indicatorValue.getCurrent())
            .build();
    }

    private AddUpNode buildAddUpNode(RecordParam param, String drillDownDimension, Record record) {
        // By convention a node data query fetches exactly one indicator
        Pair indicatorValue = record.getIndicatorValues().get(param.getIndicators().get(0));
        // Important: copy the list, do not modify the original variable
        List<DimValue> dimValues = Lists.newArrayList(param.getCurrentDimensionAndValue());
        // Append the value of the drill-down dimension
        dimValues.add(DimValue.builder()
            .dim(drillDownDimension)
            .value(record.getDimensionValues().get(drillDownDimension))
            .build()
        );
        return AddUpNode.addUpNodeBuilder()
            .currentDimensionAndValue(dimValues)
            .oldValue(indicatorValue.getCompare())
            .newValue(indicatorValue.getCurrent())
            .build();
    }

    @Override
    public boolean accept(String indexType) {
        return IndexType.ADD_UP.getCode().equals(indexType);
    }
}
// File: CutService.java
/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2021 All Rights Reserved.
 */
package com.aliexpress.diagnose.node;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.request.AnalysisConfig;
import com.aliexpress.diagnose.util.MathUtils;
import lombok.extern.slf4j.Slf4j;

/**
 * Pruning
 *
 * @author antone
 */
@Slf4j
public class CutService {

    private static final CutService INSTANCE = new CutService();

    private CutService() {
    }

    public static CutService getInstance() {
        return INSTANCE;
    }

    /**
     * Prunes the child nodes of one drill-down dimension.
     *
     * @param originalNodes             nodes before pruning
     * @param config                    analysis configuration (thresholds and switches)
     * @param parentNode                parent of the nodes being pruned
     * @param currentDrillDownDimension dimension the nodes were split on
     * @return nodes after pruning
     */
    public List<BaseNode> cutNode(List<BaseNode> originalNodes, AnalysisConfig config, BaseNode parentNode,
                                  String currentDrillDownDimension) {
        List<BaseNode> ret = new ArrayList<>();
        if (config.isCutByDimValueChangeDistribution()) {
            // If the changes of all values are about the same, prune them all
            // (equivalent to not analysing this dimension)
            if (!MathUtils.hasOutliers(
                originalNodes.stream()
                    // Change of each node
                    .map(node -> node.getNewValue() - node.getOldValue())
                    .collect(Collectors.toList()))
            ) {
                log.info("[CutService.cutNode] value changes are all similar, pruning all; parent node: {}, "
                        + "drill-down dimension: {}",
                    parentNode.getCurrentDimensionAndValue(), currentDrillDownDimension);
                return ret;
            }
        }
        originalNodes.forEach(node -> {
            // Compute the contribution if it has not been computed yet
            if (node.getContribution() == null) {
                node.computeContribution();
            }
            // Prune when contribution <= threshold
            if (node.getContribution() > config.getContributionLowerLimit()) {
                ret.add(node);
            } else {
                log.info(
                    "contribution {} <= threshold {}, pruning; parent node: {}, drill-down node: {}, "
                        + "oldValue:{}, newValue:{}, oldTotalValue:{}, newTotalValue:{}",
                    node.getContribution(),
                    config.getContributionLowerLimit(), parentNode.getCurrentDimensionAndValue(),
                    node.getCurrentDimensionAndValue(), node.getOldValue(), node.getNewValue(), node.getOldTotalValue(),
                    node.getNewTotalValue());
            }
        });
        return ret;
    }
}
// File: NodeFactory.java
package com.aliexpress.diagnose.node;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.CreateBasicNodeParam;
import com.aliexpress.diagnose.node.model.CreateDrillDownNodeParam;
import com.aliexpress.diagnose.node.repository.NodeDataRepoExecutor;

/**
 * Node factory, responsible for creating nodes.
 * It is instantiated per RecordRepository (see AnalysisJob), so it is not a singleton.
 *
 * @author minglei.jml
 */
public class NodeFactory {

    private RecordRepository recordRepository;
    private NodeDataRepoExecutor nodeDataRepoExecutor;

    public NodeFactory(RecordRepository recordRepository) {
        this.recordRepository = recordRepository;
        // Initialise fields
        nodeDataRepoExecutor = new NodeDataRepoExecutor(recordRepository);
    }

    /**
     * Creates the drill-down nodes
     *
     * @param param drill-down parameters, including the father node and the chosen dimension
     * @return child nodes of the father node along the chosen dimension
     */
    public List<BaseNode> createDrillDownNode(CreateDrillDownNodeParam param) {
        Objects.requireNonNull(param, "[createDrillDownNode] param is null");
        Objects.requireNonNull(param.getFatherNode(), "[createDrillDownNode] param getFatherNode is null");
        return doCreate(param);
    }

    /**
     * Creates the root node
     *
     * @param param root node parameters
     * @return the root node
     */
    public BaseNode createRootNode(CreateBasicNodeParam param) {
        Objects.requireNonNull(param, "[createRootNode] param is null");
        Objects.requireNonNull(param.getIndexCode(), "[createRootNode] param indexCode is null");
        return doCreateRootNode(param);
    }

    private BaseNode doCreateRootNode(CreateBasicNodeParam param) {
        // Query the indicator
        // caution: the BaseNode returned here is already a concrete type, AddUpNode / RatioNode
        BaseNode resultNode = nodeDataRepoExecutor.query(param.getIndexType(), param.convert2RecordParam());
        resultNode.setIndexCode(param.getIndexCode());
        resultNode.setDrillDownDimensions(param.getDrillDownDimensions());
        resultNode.setCurrentDimensionAndValue(param.getCurrentDimensionAndValue());
        resultNode.setDepth(param.getDepth());
        // Note: copy the root node's own values into its total values
        resultNode.copyValueToTotal();
        resultNode.computeContribution();
        return resultNode;
    }

    private List<BaseNode> doCreate(CreateDrillDownNodeParam param) {
        // Query the indicator
        List<BaseNode> baseNodes = nodeDataRepoExecutor.query(param.getIndexType(), param.convert2RecordParam(),
            param.getChosenDrillDownDimension());
        List<String> drillDownDimensions = param.getFatherNode().getDrillDownDimensions();
        // The drill-down dimensions of the new nodes are the father's drill-down dimensions minus
        // the one being drilled now (i.e. all the other dimensions of the same layer)
        List<String> subDimensions = drillDownDimensions.stream().filter(
            dim -> !dim.equals(param.getChosenDrillDownDimension())).collect(Collectors.toList());
        baseNodes.forEach(baseNode -> {
            baseNode.setIndexCode(param.getFatherNode().getIndexCode());
            baseNode.setDrillDownDimensions(subDimensions);
            baseNode.setDepth(param.getFatherNode().getDepth() + 1);
            // Copy the root node's total values (carried by the father node)
            baseNode.copyTotalValue(param.getFatherNode());
            baseNode.computeContribution();
        });
        return baseNodes;
    }
}
    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.node;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map.Entry;
    import java.util.stream.Collectors;

    import com.aliexpress.diagnose.indicator.IndexRepository;
    import com.aliexpress.diagnose.indicator.RecordRepository;
    import com.aliexpress.diagnose.indicator.model.Index;
    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.node.model.CreateBasicNodeParam;
    import com.aliexpress.diagnose.request.AnalysisConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.lang.StringUtils;

    /**
     * @author antone
     */
    @Slf4j
    public class NodeService {
        private RecordRepository recordRepository;
        private NodeFactory nodeFactory;

        /**
         * No-arg constructor for callers that only need the tree utilities.
         * nodeFactory stays null, so createRootNode must not be called on such an instance.
         */
        public NodeService() {
        }

        public NodeService(RecordRepository recordRepository) {
            this.recordRepository = recordRepository;
            this.nodeFactory = new NodeFactory(recordRepository);
        }

        /**
         * Creates the initial root node.
         *
         * @param analysisConfig analysis configuration
         * @return the root node
         */
        public BaseNode createRootNode(AnalysisConfig analysisConfig) {
            Index index = IndexRepository.getInstance().get(analysisConfig.getIndexCode());
            CreateBasicNodeParam param = CreateBasicNodeParam.builder()
                .indexCode(analysisConfig.getIndexCode())
                .indexType(index.getType())
                .currentDimensionAndValue(analysisConfig.getCurrentDimensionAndValue())
                .drillDownDimensions(analysisConfig.getDrillDownDimensions())
                .allDimensions(index.getDimensions())
                // The depth of the root node is defined as 0
                .depth(0)
                .statDate(analysisConfig.getStatDate())
                .build();
            return nodeFactory.createRootNode(param);
        }

        /**
         * Flattens the tree into a list (pre-order, root first).
         *
         * @param rootNode root of the (sub)tree
         */
        public List<BaseNode> tree2List(BaseNode rootNode) {
            List<BaseNode> retList = new ArrayList<>();
            retList.add(rootNode);
            if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
                rootNode.getSubNodes().values().forEach(
                    v -> v.forEach(baseNode -> retList.addAll(tree2List(baseNode))));
            }
            return retList;
        }

        /**
         * Collects all leaf nodes.
         *
         * @param rootNode root of the (sub)tree
         * @return all leaves; the root itself when it has no children
         */
        public List<BaseNode> getLeafNodes(BaseNode rootNode) {
            List<BaseNode> retList = new ArrayList<>();
            if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
                rootNode.getSubNodes().values().forEach(
                    v -> v.forEach(baseNode -> retList.addAll(getLeafNodes(baseNode))));
            } else {
                retList.add(rootNode);
            }
            return retList;
        }

        /**
         * Sorts nodes by contribution in descending order. The list is sorted in place.
         *
         * @param nodes nodes to sort
         */
        public List<BaseNode> sortNodeByContribution(List<BaseNode> nodes) {
            Collections.sort(nodes, (BaseNode o1, BaseNode o2) ->
                o2.getContribution().compareTo(o1.getContribution())
            );
            return nodes;
        }

        /**
         * De-duplicates nodes.
         * A-B-C and B-A-C are the same path; this relies on BaseNode's equals/hashCode.
         *
         * @param nodes nodes to de-duplicate
         * @return a new list without duplicates
         */
        public List<BaseNode> distinct(List<BaseNode> nodes) {
            if (nodes == null || nodes.isEmpty()) {
                return new ArrayList<>();
            }
            return nodes.stream().distinct().collect(Collectors.toList());
        }

        public void printDiagnoseTree(BaseNode rootNode) {
            // 1. Collect all nodes
            List<BaseNode> allNodes = tree2List(rootNode);
            // 2. De-duplicate
            List<BaseNode> distinctedNodes = distinct(allNodes);
            //log.info("[printDiagnoseTree]distinctedNodes: {}", distinctedNodes);
            // 3. Rebuild the tree from the de-duplicated nodes: among all original nodes under rootNode,
            //    keep only those present in distinctedNodes
            removeDuplicateNode(rootNode, distinctedNodes);
            // 4. Print the tree
            displaytree(rootNode, 0);
        }

        /**
         * Among all original nodes under rootNode, keeps only those present in distinctedNodes.
         *
         * @param rootNode        root of the (sub)tree, modified in place
         * @param distinctedNodes nodes to keep
         */
        public void removeDuplicateNode(BaseNode rootNode, List<BaseNode> distinctedNodes) {
            if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
                Iterator<Entry<String, List<BaseNode>>> mapIterator = rootNode.getSubNodes().entrySet().iterator();
                while (mapIterator.hasNext()) {
                    Entry<String, List<BaseNode>> entry = mapIterator.next();
                    Iterator<BaseNode> iterator = entry.getValue().iterator();
                    while (iterator.hasNext()) {
                        BaseNode baseNode = iterator.next();
                        removeDuplicateNode(baseNode, distinctedNodes);
                        if (!nodeAbstractContains(distinctedNodes, baseNode)) {
                            iterator.remove();
                        }
                    }
                    if (CollectionUtils.isEmpty(entry.getValue())) {
                        mapIterator.remove();
                    }
                }
            }
        }

        /**
         * Checks whether nodesContainer strictly contains toCompareNode.
         * Strict containment means the dimension values are exactly equal, including the order of the dimensions.
         *
         * @param nodesContainer nodes to search
         * @param toCompareNode  node to look for
         * @return true if a node with the same dimension/value string exists
         */
        public boolean nodeAbstractContains(List<BaseNode> nodesContainer, BaseNode toCompareNode) {
            for (BaseNode baseNode : nodesContainer) {
                if (StringUtils.equals(baseNode.dimValueToString(), toCompareNode.dimValueToString())) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Prints the tree recursively, indenting each level by one space.
         *
         * @param f     node whose descendants are printed
         * @param level current depth, used for indentation
         */
        private static void displaytree(BaseNode f, int level) {
            StringBuilder preStr = new StringBuilder();
            for (int i = 0; i < level; i++) {
                preStr.append(" ");
            }
            if (f.getSubNodes() != null && !f.getSubNodes().isEmpty()) {
                List<BaseNode> allSubNodes = f.getSubNodes().values().stream().flatMap(list -> list.stream()).collect(
                    Collectors.toList());
                for (BaseNode baseNode : allSubNodes) {
                    log.info(preStr + baseNode.buildNodeDesc());
                    if (baseNode.getSubNodes() != null && !baseNode.getSubNodes().isEmpty()) {
                        displaytree(baseNode, level + 1);
                    }
                }
            }
        }
    }
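
The comment on distinct() says that A-B-C and B-A-C are the same path, but the equals/hashCode of BaseNode that makes this work is not shown here. A minimal sketch of one way to get order-insensitive de-duplication follows. PathKeys, canonicalKey and distinctPaths are hypothetical names, and a path is simplified to a list of "dim=value" strings rather than the real DimValue model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of the original code base. */
final class PathKeys {
    private PathKeys() {
    }

    /** Builds an order-insensitive key by sorting the "dim=value" pairs before joining them. */
    static String canonicalKey(List<String> dimValuePairs) {
        List<String> sorted = new ArrayList<>(dimValuePairs);
        Collections.sort(sorted);
        return String.join("|", sorted);
    }

    /** Keeps the first path seen for each canonical key, preserving input order. */
    static List<List<String>> distinctPaths(List<List<String>> paths) {
        Map<String, List<String>> seen = new LinkedHashMap<>();
        for (List<String> path : paths) {
            seen.putIfAbsent(canonicalKey(path), path);
        }
        return new ArrayList<>(seen.values());
    }
}
```

With this kind of key, two paths that drill the same dimension values in a different order collapse into one entry, which is the behaviour printDiagnoseTree expects from distinct().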

Entry point

    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose;

    import com.aliexpress.diagnose.indicator.RecordRepository;
    import com.aliexpress.diagnose.node.NodeService;
    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.request.AnalysisConfig;
    import com.aliexpress.diagnose.request.FluctuationAnalysisParam;
    import com.aliexpress.diagnose.result.ResultGenerateStrategyFactory;
    import com.aliexpress.diagnose.result.model.ResultData;
    import com.aliexpress.diagnose.task.AnalysisJob;
    import lombok.extern.slf4j.Slf4j;

    /**
     * Entry point for fluctuation analysis.
     *
     * @author antone
     */
    @Slf4j
    public class FluctuationService {
        private RecordRepository recordRepository;
        private AnalysisJob analysisJob;
        private NodeService nodeService;

        public FluctuationService(RecordRepository recordRepository) {
            this.recordRepository = recordRepository;
            this.analysisJob = new AnalysisJob(recordRepository);
            // Only the tree-printing utilities are used here, so the no-arg constructor is enough
            this.nodeService = new NodeService();
        }

        /**
         * Runs a fluctuation analysis.
         *
         * @param param raw request parameters
         * @return the analysis result
         */
        public ResultData fluctuationAnalysis(FluctuationAnalysisParam param) {
            // 1. Build the analysis configuration from the request
            AnalysisConfig analysisConfig = param.getAnalysisConfig();
            // 2. Run the analysis
            BaseNode rootNode = analysisJob.analysis(analysisConfig);
            // No fluctuation: return the totals directly
            if (rootNode.withNoFluctuation()) {
                return ResultData.builder()
                    .oldTotalValue(rootNode.getOldTotalValue())
                    .newTotalValue(rootNode.getNewTotalValue())
                    .build();
            }
            // Print the whole decomposition tree
            nodeService.printDiagnoseTree(rootNode);
            // 3. Build the result
            return ResultGenerateStrategyFactory.create(param.getResultGenStrategy())
                .generate(rootNode, param.getResultTopN());
        }
    }
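
The early return on withNoFluctuation() matters because, for numeric metrics, contribution is defined earlier in this document as the node's change divided by the root node's change, so an unchanged root would mean dividing by zero. The sketch below illustrates that formula only. ContributionSketch and its method are hypothetical, and the real computeContribution() of AddUpNode / RatioNode is not shown in this section.

```java
/** Hypothetical illustration of the numeric contribution formula; not the original implementation. */
final class ContributionSketch {
    private ContributionSketch() {
    }

    /**
     * Contribution of a node to the root's change:
     * (newValue - oldValue) / (newTotal - oldTotal).
     * Returns null when the root did not change, since the ratio is undefined.
     */
    static Double contribution(double oldValue, double newValue, double oldTotal, double newTotal) {
        double rootChange = newTotal - oldTotal;
        if (rootChange == 0.0) {
            return null;
        }
        return (newValue - oldValue) / rootChange;
    }
}
```

For example, if the root goes from 100 to 120 while one dimension value goes from 40 to 70 and its sibling from 60 to 50, the contributions are 1.5 and -0.5. A single contribution can therefore exceed 1 or be negative, while the contributions within one level still sum to 1.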

request

    package com.aliexpress.diagnose.request;

    import java.util.List;

    import com.aliexpress.diagnose.node.model.DimValue;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.NoArgsConstructor;

    /**
     * Analysis parameter model required by the diagnosis model.
     *
     * @author minglei.jml
     */
    @Data
    @Builder
    @EqualsAndHashCode
    @NoArgsConstructor
    @AllArgsConstructor
    public class AnalysisConfig {
        /**
         * Metric (index) code
         */
        private String indexCode;
        /**
         * Dimensions and dimension values of the current metric
         */
        private List<DimValue> currentDimensionAndValue;
        /**
         * Dimensions available for drill-down
         */
        private List<String> drillDownDimensions;
        //private Map<Integer, Double> cutSubNodeStrategy;
        /**
         * Analysis date
         */
        private String statDate;
        /**
         * Contribution threshold
         */
        private Double contributionLowerLimit;
        /**
         * Whether to prune based on the dispersion of the changes across a dimension's values
         */
        private boolean cutByDimValueChangeDistribution;
    }
    package com.aliexpress.diagnose.request;

    import java.util.List;

    import com.aliexpress.diagnose.node.model.DimValue;
    import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import lombok.NoArgsConstructor;

    /**
     * Raw input parameters for fluctuation analysis.
     *
     * @author minglei.jml
     */
    @Data
    @Builder
    @EqualsAndHashCode
    @NoArgsConstructor
    @AllArgsConstructor
    public class FluctuationAnalysisParam {
        /**
         * Metric (index) code. The physical column name is used directly as the code,
         * e.g. "pay_amt_1d" (payment amount over the last day).
         *
         * GMV diagnosis table reference: https://dmc.dw.alibaba-inc.com/dm/table/odps.aebi.adi_ae_sycm_trd_diag_analysis_1d/detail/col#/
         */
        private String indexCode;
        /**
         * Dimensions and values of the metric to analyze. For example, to analyze the GMV fluctuation for RU,
         * currentDimensionAndValue is "country":"RU".
         */
        private List<DimValue> currentDimensionAndValue;
        /**
         * Dimensions for multi-dimensional analysis, also given as physical column names. For example,
         * to analyze by platform and category, the list is ["platform","category"].
         */
        private List<String> drillDownDimensions;
        /**
         * Analysis date, format: yyyyMMdd
         */
        private String statDate;

        // ------------- Pruning strategy parameters: start -------------
        /**
         * Lower limit on contribution (nodes below the limit are not drilled down further)
         */
        private Double contributionLowerLimit;
        /**
         * Whether to prune based on the dispersion of the changes across a dimension's values
         */
        private boolean cutByDimValueChangeDistribution;
        // ------------- Pruning strategy parameters: end -------------

        // ------------- Result generation strategy parameters: start -------------
        /**
         * Result generation strategy; pass the code of a ResultGenStrategyEnum value.
         *
         * @see ResultGenStrategyEnum
         */
        private String resultGenStrategy;
        /**
         * Number of final results to keep; defaults to 3.
         * @Builder.Default is needed, otherwise Lombok's builder ignores the initializer and yields 0.
         */
        @Builder.Default
        private int resultTopN = 3;
        // ------------- Result generation strategy parameters: end -------------

        /**
         * Converts this request into the analysis configuration model.
         *
         * @return the analysis configuration
         */
        public AnalysisConfig getAnalysisConfig() {
            return AnalysisConfig.builder()
                .indexCode(indexCode)
                .drillDownDimensions(drillDownDimensions)
                .contributionLowerLimit(contributionLowerLimit)
                // Pass the dispersion-pruning flag through as well; it was previously dropped here
                .cutByDimValueChangeDistribution(cutByDimValueChangeDistribution)
                .currentDimensionAndValue(currentDimensionAndValue)
                .statDate(statDate)
                .build();
        }
    }
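
The two pruning parameters above, contributionLowerLimit and cutByDimValueChangeDistribution, correspond to the two rules described at the start of this document: prune values whose contribution is below a threshold, and skip a dimension whose values all changed by about the same amount. The code that applies them lives in AnalysisJob, which is not shown here, so the following is only a sketch under assumptions. PruningSketch, shouldDrill, hasOutlier and the multiplier k are hypothetical, and the outlier test is a plain standard-deviation rule.

```java
import java.util.List;

/** Hypothetical sketch of the two pruning rules; not the original AnalysisJob logic. */
final class PruningSketch {
    private PruningSketch() {
    }

    /** Threshold rule: keep drilling only when the contribution reaches the lower limit. */
    static boolean shouldDrill(Double contribution, double lowerLimit) {
        return contribution != null && contribution >= lowerLimit;
    }

    /**
     * Dispersion rule: a dimension is worth splitting when at least one value's change lies
     * more than k population standard deviations away from the mean change.
     */
    static boolean hasOutlier(List<Double> changes, double k) {
        if (changes.size() < 2) {
            return false;
        }
        double mean = 0.0;
        for (double c : changes) {
            mean += c;
        }
        mean /= changes.size();
        double variance = 0.0;
        for (double c : changes) {
            variance += (c - mean) * (c - mean);
        }
        variance /= changes.size();
        double std = Math.sqrt(variance);
        if (std == 0.0) {
            // All values changed by the same amount: nothing to single out
            return false;
        }
        for (double c : changes) {
            if (Math.abs(c - mean) > k * std) {
                return true;
            }
        }
        return false;
    }
}
```

Whether the threshold should be compared against the raw or the absolute contribution is a design choice the source does not settle. The sketch follows the literal wording (contribution below the threshold is pruned), which also discards negative contributions.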

result

    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.result.model;

    /**
     * Three strategies are implemented:
     * 1. Sort all nodes across all levels, without merging "containing paths" (ABC contains AB)
     * 2. Sort all nodes across all levels, merging "containing paths" (ABC contains AB)
     * 3. Sort leaf nodes only
     *
     * @author antone
     */
    public enum ResultGenStrategyEnum {
        ALL_NODES_WITHOUT_PATH_MERGE("ALL_NODES_WITHOUT_PATH_MERGE",
            "Sort all nodes across all levels without merging containing paths (A-B-C is considered to contain A-B)"),
        ALL_NODES_WITH_PATH_MERGE("ALL_NODES_WITH_PATH_MERGE",
            "Sort all nodes across all levels and merge containing paths (A-B-C is considered to contain A-B)"),
        LEAF_NODES_SORTED("LEAF_NODES_SORTED", "Sort leaf nodes only");

        private final String code;
        private final String desc;

        ResultGenStrategyEnum(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        public String getCode() {
            return code;
        }

        public String getDesc() {
            return desc;
        }
    }
    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.result.impl;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    import com.aliexpress.diagnose.node.NodeService;
    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.result.model.ResultData;
    import com.aliexpress.diagnose.result.model.ResultRecord;
    import com.aliexpress.diagnose.util.DimUtils;
    import org.apache.commons.collections4.CollectionUtils;

    /**
     * @author antone
     */
    public class BaseResultGenerateStrategy {
        protected NodeService nodeService;

        public BaseResultGenerateStrategy() {
            this.nodeService = new NodeService();
        }

        public String generateText(ResultRecord resultRecord) {
            String textUp = "Metric value for dimension %s rose by %s; contribution %s, current value %s, comparison value %s";
            String textDown = "Metric value for dimension %s fell by %s; contribution %s, current value %s, comparison value %s";
            String subNodeTextUp = "; pay particular attention to sub-dimension %s, which rose by %s; contribution %s, current value %s, comparison value %s";
            String subNodeTextDown = "; pay particular attention to sub-dimension %s, which fell by %s; contribution %s, current value %s, comparison value %s";
            // Choose the "rose" or "fell" wording; a missing ratio is treated as "rose", as before
            String textReal = isUp(resultRecord) ? textUp : textDown;
            String ret = String.format(textReal, resultRecord.buildDimValueDesc(),
                absRatio(resultRecord),
                resultRecord.getContribution(), resultRecord.getCurrentValue(), resultRecord.getCompareValue());
            // When a sub-dimension was diagnosed, append its text
            ResultRecord subRecord = resultRecord.getBiggestSubNode();
            if (subRecord != null) {
                // Use the sub-node's own direction: it may move opposite to its parent
                String subNodeTextReal = isUp(subRecord) ? subNodeTextUp : subNodeTextDown;
                ret = ret + String.format(subNodeTextReal, subRecord.buildDimValueDesc(),
                    absRatio(subRecord),
                    subRecord.getContribution(),
                    subRecord.getCurrentValue(),
                    subRecord.getCompareValue());
            }
            return ret;
        }

        private static boolean isUp(ResultRecord record) {
            return record.getFluctuationRatio() == null || record.getFluctuationRatio() > 0;
        }

        /** Absolute fluctuation ratio, or null when the ratio is missing (avoids an NPE on unboxing). */
        private static Object absRatio(ResultRecord record) {
            return record.getFluctuationRatio() == null ? null : Math.abs(record.getFluctuationRatio());
        }

        public ResultData genResultData(BaseNode rootNode, List<BaseNode> resultNodes) {
            List<ResultRecord> resultRecords = resultNodes.stream().map(
                baseNode -> {
                    if (DimUtils.isConvergent(DimUtils.getDims(baseNode.getCurrentDimensionAndValue()))
                        || CollectionUtils.isEmpty(baseNode.getAllSubNodes())) {
                        // If the current path is convergent, or this is a leaf node, build the result directly
                        return ResultRecord.create(
                            baseNode.getCurrentDimensionAndValue(),
                            baseNode.getContribution(),
                            baseNode.getNewValue(),
                            baseNode.getOldValue(),
                            baseNode.computeFluctuationRatio(),
                            null);
                    } else {
                        // Otherwise look further down and show the top-1 of all its sub-nodes
                        // (all dimensions and all levels sorted together)
                        return buildResultRecordWithBiggestSubNode(baseNode);
                    }
                }
            ).collect(Collectors.toList());
            // Text assembly lives here for the PoC; eventually the application layer should generate the text
            List<String> resultTexts = resultRecords.stream().map(resultRecord -> generateText(resultRecord))
                .collect(Collectors.toList());
            return ResultData.builder()
                .resultRecords(resultRecords)
                .resultTexts(resultTexts)
                .newTotalValue(rootNode.getNewTotalValue())
                .oldTotalValue(rootNode.getOldTotalValue())
                .rootNodeFluctuationRatio(rootNode.computeFluctuationRatio())
                .build();
        }

        private ResultRecord buildResultRecordWithBiggestSubNode(BaseNode baseNode) {
            // Find the biggest sub-node
            List<BaseNode> subNodes = baseNode.getAllSubNodes();
            // De-duplicate
            List<BaseNode> distinctNodes = nodeService.distinct(subNodes);
            // Sort
            nodeService.sortNodeByContribution(distinctNodes);
            // Take the top-1 (the caller guarantees that sub-nodes exist)
            BaseNode biggestSubNode = distinctNodes.get(0);
            ResultRecord biggestSubResultRecord = ResultRecord.create(
                biggestSubNode.getCurrentDimensionAndValue(),
                biggestSubNode.getContribution(),
                biggestSubNode.getNewValue(),
                biggestSubNode.getOldValue(),
                biggestSubNode.computeFluctuationRatio(),
                null);
            return ResultRecord.create(
                baseNode.getCurrentDimensionAndValue(),
                baseNode.getContribution(),
                baseNode.getNewValue(),
                baseNode.getOldValue(),
                baseNode.computeFluctuationRatio(),
                biggestSubResultRecord);
        }

        /**
         * Keeps paths in a depth-first manner:
         * A-B and A-B-C are considered duplicates, and A-B-C is kept.
         *
         * The output list is the "left" sequence and the input list the "right" sequence.
         * Initially the first element of the input list is added to the left sequence.
         *
         * Elements are then taken from the right sequence one by one and compared with the left elements:
         * - if a left element contains the right element, nothing is done;
         * - if the right element contains a left element, the left element is replaced;
         * - otherwise the right element overlaps with no left element and is appended to the left sequence.
         *
         * @param sortedNodes nodes already sorted by contribution
         * @param topN        number of results to keep
         * @return at most topN nodes; empty when there is no input or topN is not positive
         */
        public List<BaseNode> getTopResultDeepFirst(List<BaseNode> sortedNodes, int topN) {
            List<BaseNode> ret = new ArrayList<>();
            // Guard against empty input (e.g. a root without sub-nodes) and a non-positive topN
            if (CollectionUtils.isEmpty(sortedNodes) || topN <= 0) {
                return ret;
            }
            ret.add(sortedNodes.get(0));
            for (int i = 1; i < sortedNodes.size(); i++) {
                // Stop as soon as enough results are collected; this also covers topN == 1
                if (ret.size() >= topN) {
                    return ret;
                }
                BaseNode rightNode = sortedNodes.get(i);
                boolean leftContainsRight = false;
                boolean rightContainsLeft = false;
                for (int j = 0; j < ret.size(); j++) {
                    BaseNode leftNode = ret.get(j);
                    if (leftNode.containsNode(rightNode)) {
                        leftContainsRight = true;
                        break;
                    } else if (rightNode.containsNode(leftNode)) {
                        rightContainsLeft = true;
                        // Replace every left element contained by the right one; ret may now hold
                        // duplicates, which are removed below
                        ret.set(j, rightNode);
                    }
                }
                if (rightContainsLeft) {
                    // Right contains left: de-duplicate
                    ret = nodeService.distinct(ret);
                }
                // Neither contains the other: add to ret
                if (!leftContainsRight && !rightContainsLeft) {
                    ret.add(rightNode);
                }
            }
            return ret;
        }
    }
    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.result.impl;

    import java.util.List;

    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.result.ResultGenerateStrategy;
    import com.aliexpress.diagnose.result.model.ResultData;
    import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
    import org.apache.commons.lang3.StringUtils;

    /**
     * Sorts all nodes across all levels, without merging "containing paths" (ABC contains AB).
     *
     * @author antone
     */
    public class AllNodesWithoutPathMergeStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
        @Override
        public ResultData generate(BaseNode rootNode, int resultTopN) {
            // Sort all nodes by contribution and take the top ones
            List<BaseNode> nodes = nodeService.tree2List(rootNode);
            // Exclude the root node first (tree2List puts it at index 0)
            nodes = nodes.subList(1, nodes.size());
            // De-duplicate
            List<BaseNode> distinctNodes = nodeService.distinct(nodes);
            // Sort
            nodeService.sortNodeByContribution(distinctNodes);
            // Take the top N; bound by the de-duplicated size, not the original size,
            // otherwise subList can throw IndexOutOfBoundsException
            List<BaseNode> resultNodes = distinctNodes.subList(0, Math.min(resultTopN, distinctNodes.size()));
            return genResultData(rootNode, resultNodes);
        }

        @Override
        public boolean accept(String strategy) {
            return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.ALL_NODES_WITHOUT_PATH_MERGE.getCode());
        }
    }
    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.result.impl;

    import java.util.List;

    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.result.ResultGenerateStrategy;
    import com.aliexpress.diagnose.result.model.ResultData;
    import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
    import org.apache.commons.lang3.StringUtils;

    /**
     * Sorts all nodes across all levels and merges "containing paths" (ABC contains AB, only ABC is kept).
     *
     * @author antone
     */
    public class AllNodesWithPathMergeStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
        @Override
        public ResultData generate(BaseNode rootNode, int resultTopN) {
            List<BaseNode> nodes = nodeService.tree2List(rootNode);
            // Exclude the root node first (tree2List puts it at index 0)
            nodes = nodes.subList(1, nodes.size());
            // De-duplicate
            List<BaseNode> distinctNodes = nodeService.distinct(nodes);
            // Sort
            nodeService.sortNodeByContribution(distinctNodes);
            // Take the top N, merging "containing paths"
            List<BaseNode> resultNodes =
                getTopResultDeepFirst(distinctNodes, resultTopN);
            return genResultData(rootNode, resultNodes);
        }

        @Override
        public boolean accept(String strategy) {
            return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.ALL_NODES_WITH_PATH_MERGE.getCode());
        }
    }
    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2021 All Rights Reserved.
     */
    package com.aliexpress.diagnose.result.impl;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import com.aliexpress.diagnose.indicator.IndexRepository;
    import com.aliexpress.diagnose.node.model.BaseNode;
    import com.aliexpress.diagnose.result.ResultGenerateStrategy;
    import com.aliexpress.diagnose.result.model.ResultData;
    import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;

    /**
     * Sorts leaf nodes only.
     *
     * @author antone
     */
    @Slf4j
    public class LeafNodesSortedStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
        @Override
        public ResultData generate(BaseNode rootNode, int resultTopN) {
            // Sort the leaf nodes by contribution and take the top ones
            List<BaseNode> nodes = nodeService.getLeafNodes(rootNode);
            // De-duplicate
            List<BaseNode> distinctNodes = nodeService.distinct(nodes);
            // Order each node's dimensions by business semantics (this sorts the node's list in place)
            distinctNodes.forEach(node -> {
                Map<String, Integer> dimensionOrders = IndexRepository.getInstance().get(node.getIndexCode())
                    .getDimensionOrders();
                Collections.sort(node.getCurrentDimensionAndValue(), (o1, o2) ->
                    dimensionOrders.get(o1.getDim()).compareTo(dimensionOrders.get(o2.getDim())));
            });
            // Sort the nodes by contribution
            nodeService.sortNodeByContribution(distinctNodes);
            log.info("[LeafNodesSortedStrategy] all leaf node size after distinct is: {}", distinctNodes.size());
            // Take the top N, merging "containing paths" (because the dimensions were reordered by business
            // semantics above, parent/child containing paths such as A-B-C and A-C may appear here)
            List<BaseNode> resultNodes =
                getTopResultDeepFirst(distinctNodes, resultTopN);
            return genResultData(rootNode, resultNodes);
        }

        @Override
        public boolean accept(String strategy) {
            return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.LEAF_NODES_SORTED.getCode());
        }
    }
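
The path-merging selection used by the last two strategies (getTopResultDeepFirst) is easier to follow on plain data. The sketch below is a self-contained approximation, not the original method. PathMergeSketch is a hypothetical name, each path is modelled as a set of dimension labels, and "X contains Y" is assumed to mean that X's set is a superset of Y's, since BaseNode.containsNode is not shown in this section.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-alone version of the top-N selection with path merging. */
final class PathMergeSketch {
    private PathMergeSketch() {
    }

    /** sortedPaths must already be ordered by contribution, largest first. */
    static List<Set<String>> topWithPathMerge(List<Set<String>> sortedPaths, int topN) {
        List<Set<String>> ret = new ArrayList<>();
        if (sortedPaths.isEmpty() || topN <= 0) {
            return ret;
        }
        ret.add(sortedPaths.get(0));
        for (int i = 1; i < sortedPaths.size() && ret.size() < topN; i++) {
            Set<String> right = sortedPaths.get(i);
            boolean leftContainsRight = false;
            boolean rightContainsLeft = false;
            for (int j = 0; j < ret.size(); j++) {
                Set<String> left = ret.get(j);
                if (left.containsAll(right)) {
                    // A deeper (or equal) path is already kept: skip the candidate
                    leftContainsRight = true;
                    break;
                } else if (right.containsAll(left)) {
                    // The candidate is deeper: it replaces the shallower kept path
                    rightContainsLeft = true;
                    ret.set(j, right);
                }
            }
            if (rightContainsLeft) {
                // The candidate may have replaced several entries: drop the repeats, keep order
                ret = new ArrayList<>(new LinkedHashSet<>(ret));
            }
            if (!leftContainsRight && !rightContainsLeft) {
                ret.add(right);
            }
        }
        return ret;
    }
}
```

Given the sorted input [A-B, A-B-C, D, A, E] and topN = 3, A-B is replaced by the deeper A-B-C, D is added, A is skipped because A-B-C already contains it, and E fills the last slot.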