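Approach
The general approach is dimension drill-down.
Description: drill down layer by layer, pruning on two signals: each node's contribution to the overall change, and the dispersion of the changes across a dimension's values.
Contribution to the overall change: every node's contribution is computed against the change of the root node, not against the change of its parent node.
- Select dimensions: before drilling into a dimension, look at how dispersed the changes of its values are. If the dispersion is small (all values changed by roughly the same amount), the dimension is not worth drilling into.
- Select dimension values: compute the contribution of each value and prune it if the contribution is below the threshold. (Many further optimizations are possible here, for example keeping only nodes whose value accounts for a large share of the data, or keeping only the "outliers" found in step 1.)
- Repeat the two steps above.
- Finally, sort the nodes to get the top results, usually by contribution. There are several ranking options: rank all nodes of all layers, or rank leaf nodes only. (Current understanding: ranking leaf nodes only works better; otherwise overlapping paths show up, i.e. A-B and A-B-C both appear in the conclusion.)
Characteristics:
- Prune on demand instead of drilling down exhaustively.
- Within any layer, a contribution can be greater than 1 or less than 0. There is also no ordering between a child's contribution and its parent's, because both are measured against the change of the root node.
- Before pruning, the contributions of the nodes in one layer (for a given drill-down dimension) sum to 1; the contributions across all layers do not.
- The contributions reported in the final conclusion can also exceed 1 and may visibly not sum to 1, which can be hard to interpret.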
Contribution calculation
For additive (numeric) metrics, contribution = change of the node / change of the root node.
For ratio metrics: TODO
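The additive formula can be sketched as follows. This is a minimal illustration, not the project's code: the class name `ContributionSketch` and the sample numbers are made up, and the zero-denominator fallback simply mirrors what `AddUpNode.computeContribution` does in the code section.

```java
final class ContributionSketch {

    /** Contribution of a node = (change of the node) / (change of the root). */
    static double contribution(double oldValue, double newValue,
                               double oldTotal, double newTotal) {
        double rootChange = newTotal - oldTotal;
        if (rootChange == 0.0) {
            // The root did not move: fall back to 0, as AddUpNode does.
            return 0.0;
        }
        return (newValue - oldValue) / rootChange;
    }

    public static void main(String[] args) {
        // Hypothetical data: the root goes from 100 to 120 (+20).
        // Split by country: US 60 -> 90 (+30), UK 40 -> 30 (-10).
        double us = contribution(60, 90, 100, 120);
        double uk = contribution(40, 30, 100, 120);
        System.out.println("US=" + us + ", UK=" + uk + ", sum=" + (us + uk));
    }
}
```

With these numbers the US contribution is 1.5 and the UK contribution is -0.5: one value exceeds 1, one is negative, and the layer still sums to 1.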
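Pruning strategy
Sub-metric decomposition:
The approach above covers dimension drill-down only and does not decompose the metric into sub-metrics. In practice the two can be combined: for example, first find the sub-metrics with a large contribution, then drill down by dimension on those sub-metrics.
This may require judging how strongly a parent metric and its sub-metrics are correlated. Linear correlation measures such as the Pearson correlation coefficient can be used here.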
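As a rough illustration of the correlation check, the Pearson coefficient between a parent metric series and a sub-metric series could be computed like this. The class name `PearsonSketch` and the choice to return `NaN` for a constant series are assumptions of this sketch; the notes do not say how the project computes it.

```java
final class PearsonSketch {

    /** Pearson correlation coefficient of two equally long series. */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("series must have the same length (>= 2)");
        }
        int n = x.length;
        double meanX = 0.0;
        double meanY = 0.0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;

        double cov = 0.0;
        double varX = 0.0;
        double varY = 0.0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        if (varX == 0.0 || varY == 0.0) {
            // A constant series has no defined correlation (assumption: report NaN).
            return Double.NaN;
        }
        return cov / Math.sqrt(varX * varY);
    }
}
```

A value close to 1 or -1 suggests the sub-metric moves linearly with the parent; a value near 0 suggests the sub-metric is a poor candidate for explaining the parent's change.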
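Deciding whether to keep drilling into a dimension
Method 1: as in the approach above, use the dispersion of the changes across the dimension's values to decide whether the dimension is worth drilling into.
(1) Check whether the changes of the dimension's values contain outliers. Common methods are based on the standard deviation and variance. See https://cloud.tencent.com/developer/article/1463726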
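One possible reading of the standard-deviation check is sketched below. The code section calls `MathUtils.hasOutliers` but its implementation is not shown, so everything here (the class name `OutlierSketch`, the multiplier `k`, the use of the population standard deviation) is an assumption, not the project's actual rule.

```java
import java.util.Arrays;
import java.util.List;

final class OutlierSketch {

    /**
     * Returns true if any change lies more than k population standard
     * deviations away from the mean of all changes.
     */
    static boolean hasOutliers(List<Double> changes, double k) {
        if (changes.size() < 3) {
            return false; // too few points to call anything an outlier
        }
        double mean = 0.0;
        for (double c : changes) {
            mean += c;
        }
        mean /= changes.size();

        double variance = 0.0;
        for (double c : changes) {
            variance += (c - mean) * (c - mean);
        }
        variance /= changes.size();
        double std = Math.sqrt(variance);
        if (std == 0.0) {
            return false; // every value changed by exactly the same amount
        }
        for (double c : changes) {
            if (Math.abs(c - mean) > k * std) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical changes of five dimension values; one moved far more than the rest.
        System.out.println(hasOutliers(Arrays.asList(1.0, 1.0, 1.0, 1.0, 10.0), 1.5));
    }
}
```

One caveat on choosing `k`: with the population standard deviation, no point among n values can deviate from the mean by more than sqrt(n - 1) standard deviations, so a fixed multiplier such as 3 can never fire for a dimension with only a handful of values.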
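Method 2: ranking across dimensions
Measure the difference between each value's historical share and its share of the change. Assume the comparison is day over day. Yesterday the core metric was $X$, and its values on the dimension's values were $x_1, \dots, x_n$ (the dimension has $n$ values); today the core metric is $Y$, with values $y_1, \dots, y_n$. The historical share is then
$$h_i = \frac{x_i}{X}$$
and the share of the change (which can be read as the contribution to the change) is
$$v_i = \frac{y_i - x_i}{Y - X}$$
The two vectors $h$ and $v$ have the same length. To measure how much they differ we use a distance metric, currently the Euclidean distance:
$$d = \sqrt{\sum_i (h_i - v_i)^2}$$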
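Compute this distance for every dimension and sort the dimensions by it; dimensions near the top should be drilled into first.
Similarity between samples is usually measured with a distance; for typical methods see https://www.cnblogs.com/heaad/archive/2011/03/08/1977733.html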
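The ranking in Method 2 can be sketched as below: for each dimension, compare the vector of historical shares with the vector of change shares using the Euclidean distance, then sort dimensions by that distance in descending order. The class name `DimensionRankSketch`, the `double[][]` layout ({old values, new values}) and the sample dimensions are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class DimensionRankSketch {

    /** Euclidean distance between historical shares and change shares of one dimension. */
    static double distance(double[] oldValues, double[] newValues) {
        double oldTotal = 0.0;
        double newTotal = 0.0;
        for (int i = 0; i < oldValues.length; i++) {
            oldTotal += oldValues[i];
            newTotal += newValues[i];
        }
        double totalChange = newTotal - oldTotal;
        if (oldTotal == 0.0 || totalChange == 0.0) {
            throw new IllegalArgumentException("shares are undefined when a total is zero");
        }
        double sumSquares = 0.0;
        for (int i = 0; i < oldValues.length; i++) {
            double historicalShare = oldValues[i] / oldTotal;
            double changeShare = (newValues[i] - oldValues[i]) / totalChange;
            double diff = historicalShare - changeShare;
            sumSquares += diff * diff;
        }
        return Math.sqrt(sumSquares);
    }

    /** Dimension names sorted by distance, largest first; each value is {oldValues, newValues}. */
    static List<String> rank(Map<String, double[][]> dimensions) {
        List<String> names = new ArrayList<>(dimensions.keySet());
        names.sort(Comparator.comparingDouble(
            (String d) -> distance(dimensions.get(d)[0], dimensions.get(d)[1])).reversed());
        return names;
    }
}
```

A dimension whose values all grew in proportion to their historical share gets distance 0 and sinks to the bottom; a dimension where one value absorbed most of the change rises to the top.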
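Other pruning strategies:
Many pruning strategies are possible; the most common one is a contribution threshold. To account for dimensions with very many or very few values, or for values that account for a large or small share of the data, additional rules can be layered on top (for example, prune a value whose share of the data is below a threshold).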
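Combining the contribution threshold with a data-share threshold might look like the sketch below. The names (`PruneSketch`, `Candidate`, `minShare`) are hypothetical, and measuring the share against the comparison-period total is an assumption; the notes only say "share of the data".

```java
import java.util.ArrayList;
import java.util.List;

final class PruneSketch {

    /** A candidate dimension value with its comparison-period and current values. */
    static final class Candidate {
        final String name;
        final double oldValue;
        final double newValue;

        Candidate(String name, double oldValue, double newValue) {
            this.name = name;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    /** Keeps nodes whose contribution exceeds minContribution and whose share is at least minShare. */
    static List<Candidate> prune(List<Candidate> nodes, double oldTotal, double newTotal,
                                 double minContribution, double minShare) {
        List<Candidate> kept = new ArrayList<>();
        double rootChange = newTotal - oldTotal;
        for (Candidate node : nodes) {
            double contribution = rootChange == 0.0
                ? 0.0
                : (node.newValue - node.oldValue) / rootChange;
            // Assumption: share is measured against the comparison-period total.
            double share = oldTotal == 0.0 ? 0.0 : node.oldValue / oldTotal;
            if (contribution > minContribution && share >= minShare) {
                kept.add(node);
            }
        }
        return kept;
    }
}
```

The share rule drops values that contribute a lot in relative terms but are too small to matter, at the cost of possibly hiding a genuinely new, fast-growing segment; whether that trade-off is acceptable depends on the business.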
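Result ranking strategy
Results are usually ranked by contribution. There are several ranking options: rank all nodes of all layers, or rank leaf nodes only. (Current understanding: ranking leaf nodes only works better; otherwise overlapping paths show up, i.e. A-B and A-B-C both appear in the conclusion.)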
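The difference between the two ranking options can be seen on a toy tree. This is only a sketch with made-up names (`RankSketch`, `TreeNode`) and made-up contributions; the project's own traversal lives in `NodeService.tree2List` and `NodeService.getLeafNodes`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class RankSketch {

    static final class TreeNode {
        final String path;
        final double contribution;
        final List<TreeNode> children = new ArrayList<>();

        TreeNode(String path, double contribution) {
            this.path = path;
            this.contribution = contribution;
        }

        TreeNode add(TreeNode child) {
            children.add(child);
            return this;
        }
    }

    /** Collects either every node of the subtree or only its leaves. */
    static void collect(TreeNode node, boolean leavesOnly, List<TreeNode> out) {
        if (!leavesOnly || node.children.isEmpty()) {
            out.add(node);
        }
        for (TreeNode child : node.children) {
            collect(child, leavesOnly, out);
        }
    }

    /** Top-n paths below the root, ranked by contribution in descending order. */
    static List<String> top(TreeNode root, boolean leavesOnly, int n) {
        List<TreeNode> nodes = new ArrayList<>();
        for (TreeNode child : root.children) {
            collect(child, leavesOnly, nodes); // the root itself is never a result
        }
        nodes.sort(Comparator.comparingDouble((TreeNode t) -> t.contribution).reversed());
        List<String> paths = new ArrayList<>();
        for (TreeNode t : nodes.subList(0, Math.min(n, nodes.size()))) {
            paths.add(t.path);
        }
        return paths;
    }
}
```

For a tree with A (0.8), its child A-B (0.7), and C (0.2), ranking all nodes returns both A and A-B, which describe overlapping paths, while ranking leaves only returns A-B and C.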
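Open questions
Is pruning a performance optimization, or is it there to keep the result accurate?
Pruning is a tuning process. Pruning too aggressively hurts accuracy, and every additional pruning strategy is another way to lose accuracy.
The result ranking strategy affects accuracy as well.
This also raises the question of how to measure the accuracy of the result in the first place.
Code implementation
mr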
package com.aliexpress.diagnose.mr;
import com.alibaba.fastjson.JSON;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.data.TableInfo.TableInfoBuilder;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import lombok.extern.slf4j.Slf4j;
/**
* deploy jar to server: add jar /Users/antone/Documents/workspace/op-sycm-ae/sycm-diagnose/target/sycm-diagnose-1
* .0.0-SNAPSHOT.jar -f
*
* Run remotely: jar -resources sycm-diagnose-1.0.0-SNAPSHOT.jar -classpath
* /Users/antone/Documents/workspace/op-sycm-ae/sycm-diagnose/target/sycm-diagnose-1.0.0-SNAPSHOT.jar com.aliexpress
* .diagnose.mr.FluctuationMrDriver -inputTable aebi.adi_ae_sycm_trd_diag_analysis_1d -outputTable
* adi_ae_sycm_trd_diag_analysis_1d_test_rst -partitionDate 20210318 -dateRage 1
*
* @date 2021/3/18
*/
@Slf4j
public class FluctuationMrDriver {
public static void main(String[] args) throws OdpsException {
Args arg = Args.parse(args);
JobConf job = new JobConf();
job.setMapperClass(CubeMapper.class);
job.setReducerClass(FluctuationReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("seller_admin_seq:bigint"));
job.setMapOutputValueSchema(
SchemaUtils.fromString("seller_admin_seq:bigint,cate_lv2_id:bigint,country_id:string,"
+ "platform:string,source_lv2_desc:string,"
+ "buyer_type:string,visit_type:string,item_id:bigint,"
+ "pay_amt_1d:bigint,uv_1d:bigint,"
+ "pay_buyer_cnt_1d:bigint,pay_amt_per_1d:bigint,"
+ "uv_per_1d:bigint,pay_buyer_cnt_per_1d:bigint"));
InputUtils.addTable(getTableInfoBuilder(arg), job);
OutputUtils.addTable(TableInfo.builder()
.tableName(arg.getOutputTable())
.partSpec("dt=" + arg.getPartitionDate() +
"/date_range_partition=" + arg.getDateRange())
.build(),
job);
job.set(Args.ARGS, JSON.toJSONString(arg));
JobClient.runJob(job);
}
private static TableInfo getTableInfoBuilder(Args arg) {
TableInfoBuilder inputTableBuilder = TableInfo.builder()
.tableName(arg.getInputTable())
.partSpec("ds=" + arg.getPartitionDate() +
"/date_range=" + arg.getDateRange());
return inputTableBuilder.build();
}
}
package com.aliexpress.diagnose.mr;
import java.io.IOException;
import com.aliexpress.diagnose.indicator.model.DataKeys;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.BUYER_TYPE_KEY;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.PLATFORM_KEY;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.SELLER_ID_KEY;
/**
* @date 2021/3/19
*/
@Slf4j
public class CubeMapper extends MapperBase {
private Record key;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
}
/**
* Process each record, keyed by seller ID
*
* @param recordNum
* @param record
* @param context
* @throws IOException
*/
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
key.setBigint(SELLER_ID_KEY, record.getBigint(SELLER_ID_KEY));
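// Filter: the source table contains cube rows broken down by platform and buyer_type. The business does not need
// drill-down on these two dimensions, so keep only the rows where both equal -9999 (the roll-up value)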
if (filter(record)) {
context.write(key, record);
}
}
/**
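* Filters the data.
* The source table contains cube rows broken down by platform and buyer_type. The business does not need
* drill-down on these two dimensions, so only rows where both equal -9999 (the roll-up value) are kept.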
*
* @param record
* @return
*/
private boolean filter(Record record) {
return StringUtils.equals(record.getString(PLATFORM_KEY), DataKeys.TOTAL_DV)
&& StringUtils.equals(record.getString(BUYER_TYPE_KEY), DataKeys.TOTAL_DV);
}
}
package com.aliexpress.diagnose.mr;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import com.alibaba.fastjson.JSON;
import com.aliexpress.diagnose.FluctuationService;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.request.FluctuationAnalysisParam;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
import lombok.extern.slf4j.Slf4j;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.ALL_DIMENSIONS;
import static com.aliexpress.diagnose.mr.MrDataRepositoryProvider.SELLER_ID_KEY;
/**
* @date 2021/3/19
*/
@Slf4j
public class FluctuationReducer extends ReducerBase {
private Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
/**
* All records of the same seller ID are aggregated on one reducer.
* The core step is converting those records into the indicatorValues structure of RecordRepositoryImpl.
*
* @param key
* @param values
* @param context
* @throws IOException
*/
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Long sellerId = key.getBigint(SELLER_ID_KEY);
RecordRepository repository = new MrDataRepositoryProvider(sellerId, values).create();
FluctuationService fluctuationService = new FluctuationService(repository);
Args args = JSON.parseObject(context.getJobConf().get(Args.ARGS), Args.class);
FluctuationAnalysisParam param = FluctuationAnalysisParam.builder()
.indexCode("pay_amt_1d")
.statDate(null)
.currentDimensionAndValue(new ArrayList<>())
.drillDownDimensions(ALL_DIMENSIONS)
.contributionLowerLimit(args.getContributionLowerLimit().doubleValue())
.cutByDimValueChangeDistribution(args.isCutByDimValueChangeDistribution())
.resultGenStrategy(args.getResultGenStrategy())
.resultTopN(args.getResultTopN())
.build();
ResultData resultData = null;
try {
resultData = fluctuationService.fluctuationAnalysis(param);
log.info("reducerResult {} {}", sellerId, JSON.toJSONString(resultData));
} catch (Exception e) {
log.error("fluctuationAnalysisError {}", sellerId, e);
}
if (resultData != null) {
result.setBigint(SELLER_ID_KEY, sellerId);
result.setString("idx_name", "pay_amt_1d");
result.setString("result", JSON.toJSONString(resultData));
result.setString("date_range", args.getDateRange());
context.write(result);
}
}
@Override
public void cleanup(TaskContext context) throws IOException {
}
}
package com.aliexpress.diagnose.mr;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.aliexpress.diagnose.GmvDims;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.indicator.impl.RecordRepositoryImpl;
import com.aliexpress.diagnose.indicator.model.DataKeys;
import com.aliyun.odps.data.Record;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
/**
* @date 2021/3/18
*/
@Slf4j
public class MrDataRepositoryProvider {
private Iterator<Record> records;
private Long sellerId;
public static final String SELLER_ID_KEY = "seller_admin_seq";
// Platform dimension
public static final String PLATFORM_KEY = "platform";
// Buyer type dimension
public static final String BUYER_TYPE_KEY = "buyer_type";
public static final List<String> ALL_DIMENSIONS = GmvDims.getAllDims();
private static final List<String> indicators = Arrays.asList("pay_amt_per_1d", "pay_amt_1d");
public MrDataRepositoryProvider(Long sellerId, Iterator<Record> records) {
this.sellerId = sellerId;
this.records = records;
}
public RecordRepository create() {
Map<String, Map<String, Double>> indicatorValues = new HashMap<>(1000);
Map<String, Set<String>> dimensionValues = Maps.newHashMapWithExpectedSize(ALL_DIMENSIONS.size());
ALL_DIMENSIONS.forEach(d -> dimensionValues.put(d, new HashSet<>()));
while (records.hasNext()) {
Record record = records.next();
/**
* Collect the distinct values of each dimension, excluding the roll-up value -9999
*/
ALL_DIMENSIONS.forEach(d -> {
String dv = String.valueOf(record.get(d));
if (!DataKeys.TOTAL_DV.equals(dv)) {
dimensionValues.get(d).add(dv);
}
});
/**
* Build the map from dimension-combination key to indicator values
*/
Map<String, String> currentDV = Maps.newHashMapWithExpectedSize(ALL_DIMENSIONS.size());
ALL_DIMENSIONS.forEach(d -> {
String dv = String.valueOf(record.get(d));
currentDV.put(d, dv);
});
String key = DataKeys.encode(currentDV, ALL_DIMENSIONS);
Map<String, Double> currentV = Maps.newHashMapWithExpectedSize(indicators.size());
for (String indicator : indicators) {
currentV.put(indicator, Double.valueOf(String.valueOf(record.get(indicator))));
}
indicatorValues.put(key, currentV);
}
//log.info("repoDimensionValues {},{}", this.sellerId, JSON.toJSONString(dimensionValues));
return RecordRepositoryImpl.builder()
.indicatorValues(indicatorValues)
.dimensionValues(dimensionValues)
.build();
}
}
task
package com.aliexpress.diagnose.task;
import java.util.concurrent.ForkJoinPool;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.node.NodeFactory;
import com.aliexpress.diagnose.node.NodeService;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.request.AnalysisConfig;
/**
* @author antone
*/
public class AnalysisJob {
private RecordRepository recordRepository;
private NodeService nodeService;
private NodeFactory nodeFactory;
public AnalysisJob(RecordRepository recordRepository) {
this.recordRepository = recordRepository;
this.nodeService = new NodeService(recordRepository);
this.nodeFactory = new NodeFactory(recordRepository);
}
public BaseNode analysis(AnalysisConfig analysisConfig) {
// 1. Build the root node
BaseNode rootNode = nodeService.createRootNode(analysisConfig);
if (rootNode.withNoFluctuation()) {
return rootNode;
}
// 2. Drill down
// TODO benchmark and tune the ForkJoinPool usage
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() + 1);
try {
DrillDownNodeTask drillDownNodeTask = new DrillDownNodeTask(rootNode, analysisConfig, nodeFactory);
return forkJoinPool.invoke(drillDownNodeTask);
} finally {
// A pool is created per analysis, so release its worker threads once the drill-down is done
forkJoinPool.shutdown();
}
}
}
package com.aliexpress.diagnose.task;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.indicator.IndexRepository;
import com.aliexpress.diagnose.indicator.model.Index;
import com.aliexpress.diagnose.node.CutService;
import com.aliexpress.diagnose.node.NodeFactory;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.CreateDrillDownNodeParam;
import com.aliexpress.diagnose.request.AnalysisConfig;
/**
* @author minglei.jml
*/
public class DrillDownNodeTask extends RecursiveTask<BaseNode> {
/**
* The node to drill down from (the root of the current sub-tree)
*/
private BaseNode node;
private AnalysisConfig config;
private NodeFactory nodeFactory;
public DrillDownNodeTask(BaseNode node, AnalysisConfig config, NodeFactory nodeFactory) {
this.node = node;
this.config = config;
this.nodeFactory = nodeFactory;
}
@Override
protected BaseNode compute() {
// Drill down by dimension
scanDimensions();
return node;
}
private void scanDimensions() {
Index index = IndexRepository.getInstance().get(config.getIndexCode());
// key: drill-down dimension; value: the next-level drill-down tasks for that dimension
Map<String, List<DrillDownNodeTask>> subTaskMap = new HashMap<>(2);
// key: drill-down dimension; value: the next-level child nodes for that dimension
Map<String, List<BaseNode>> subNodeMap = new HashMap<>(2);
if (node.getDrillDownDimensions() == null || node.getDrillDownDimensions().isEmpty()) {
// No dimension left to drill into
return;
}
// Drill into each available dimension separately
node.getDrillDownDimensions().forEach(
dimension -> {
CreateDrillDownNodeParam param = CreateDrillDownNodeParam.builder()
.fatherNode(node)
.indexType(index.getType())
.chosenDrillDownDimension(dimension)
.allDimensions(index.getDimensions())
.statDate(config.getStatDate())
.build();
List<BaseNode> subNodes = nodeFactory.createDrillDownNode(param);
// Prune
if (subNodes != null) {
subNodes = CutService.getInstance().cutNode(subNodes, config, node, dimension);
}
// Create a drill-down task for each surviving child node
if (subNodes != null && !subNodes.isEmpty()) {
List<DrillDownNodeTask> tasks = subNodes.stream().map(subNode -> {
DrillDownNodeTask task = new DrillDownNodeTask(subNode, config, nodeFactory);
// fork: schedule the task for asynchronous execution in the current ForkJoinPool
task.fork();
return task;
}).collect(Collectors.toList());
subTaskMap.put(dimension, tasks);
}
}
);
// Wait for every child task and merge the child results
// join: block until the task completes and return its result
subTaskMap.forEach((key, value) -> subNodeMap.put(key,
value.stream().map(DrillDownNodeTask::join).collect(Collectors.toList())));
node.setSubNodes(subNodeMap);
}
}
node
package com.aliexpress.diagnose.node.model;
import java.util.List;
/**
* @author minglei.jml
*/
public interface Node {
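/**
* Computes the contribution of the current node
* @return contribution of current node
*/
Double computeContribution();
/**
* Computes the change ratio of the current node
* @return change ratio of the current node
*/
Double computeFluctuationRatio();
/**
* Computes the change ratio of the root node
* @return change ratio of the root node
*/
Double computeRootNodeFluctuationRatio();
/**
* Returns all child nodes of the current node
* @return child nodes across all drill-down dimensions
*/
List<BaseNode> getAllSubNodes();
/**
* Copies the root's total values from the given node into the total fields of the current node
* @param node the node to copy the total values from
*/
void copyTotalValue(BaseNode node);
/**
* Assigns the current node's own values to its total fields; mainly used for the root node
*/
void copyValueToTotal();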
}
package com.aliexpress.diagnose.node.model;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.util.DimUtils;
import com.aliexpress.diagnose.util.MathUtils;
import com.google.common.base.Joiner;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author minglei.jml
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BaseNode implements Node {
private String indexCode;
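//private Map<String, String> currentDimensionAndValue;
/**
* Deliberately a list, so the drill-down path can be recovered directly from the node, e.g. for troubleshooting.
* Example: if the current dimension values are [B:b, C:c, A:a], the node was reached via the path B->C->A and not
* via any other path such as A-B-C.
*/
private List<DimValue> currentDimensionAndValue;
/**
* Dimensions that can still be drilled into
*/
private List<String> drillDownDimensions;
/**
* Child nodes
* key: name of the drill-down dimension
* value: child nodes under that dimension
*/
private Map<String, List<BaseNode>> subNodes;
/**
* Contribution
*/
private Double contribution;
/**
* Depth
*/
private Integer depth;
/**
* Comparison-period value of the node
*/
private Double oldValue;
/**
* Current value of the node
*/
private Double newValue;
/**
* Comparison-period value of the root node
*/
private Double oldTotalValue;
/**
* Current value of the root node
*/
private Double newTotalValue;
//private Integer cutNodesCount;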
BaseNode(String indexCode, List<DimValue> currentDimensionAndValue, List<String> drillDownDimensions,
Integer depth, Double oldValue, Double newValue, Double oldTotalValue, Double newTotalValue) {
this.indexCode = indexCode;
this.currentDimensionAndValue = currentDimensionAndValue;
this.drillDownDimensions = drillDownDimensions;
this.depth = depth;
this.oldValue = oldValue;
this.newValue = newValue;
this.oldTotalValue = oldTotalValue;
this.newTotalValue = newTotalValue;
}
@Override
public List<BaseNode> getAllSubNodes() {
List<BaseNode> nodes = new ArrayList<>();
if (subNodes == null || subNodes.isEmpty()) {
return nodes;
}
subNodes.values().forEach(nodes::addAll);
return nodes;
}
@Override
public Double computeContribution() {
return null;
}
@Override
public Double computeFluctuationRatio() {
return MathUtils.ratio(getNewValue(), getOldValue());
}
@Override
public Double computeRootNodeFluctuationRatio() {
return MathUtils.ratio(getNewTotalValue(), getOldTotalValue());
}
@Override
public void copyTotalValue(BaseNode node) {
}
@Override
public void copyValueToTotal() {
}
/**
* Checks whether this node contains another node.
*
* Rules:
* path A-B-C contains path A-B
* path A-C-B also contains path A-B
*
* @param compareNode the node to compare against
* @return true if this node contains compareNode
*/
public boolean containsNode(BaseNode compareNode) {
Map<String, String> currentDimValueMap = this.dimValueListToMap();
Map<String, String> compareDimValueMap = compareNode.dimValueListToMap();
// The dimensions of this node must be a superset of the dimensions of compareNode
if (!currentDimValueMap.keySet().containsAll(compareDimValueMap.keySet())) {
return false;
}
// For every dimension of compareNode, the value on this node must contain the value on compareNode.
// Only compareNode's dimensions are checked: dimensions that exist on this node alone have no counterpart
// (the lookup would return null) and must not affect the result.
return compareDimValueMap.keySet().stream()
.allMatch(dim -> DimUtils.dimValueContains(currentDimValueMap.get(dim),
compareDimValueMap.get(dim)));
}
public Map<String, String> dimValueListToMap() {
// Convert the list to a map
return currentDimensionAndValue.stream().collect(
Collectors.toMap(DimValue::getDim, DimValue::getValue));
}
/**
* Renders the dimension values as a string: a=a1^b=b1...
*
* @return
*/
public String dimValueToString() {
return Joiner.on("^").join(
currentDimensionAndValue.stream()
.map(dimValue -> dimValue.buildDesc())
.collect(Collectors.toList()))
;
}
/**
* Description of this node
*
* @return
*/
public String buildNodeDesc() {
return String.format("%s(contribution: %s, current: %s, compare: %s, change ratio: %s)", dimValueToString(), contribution, getNewValue(),
getOldValue(), computeFluctuationRatio());
}
/**
* Checks whether the root node shows no fluctuation
* @return true if either total is missing or both totals are equal
*/
public boolean withNoFluctuation() {
if (this.getOldTotalValue() == null || this.getNewTotalValue() == null) {
return true;
}
return this.getOldTotalValue().equals(this.getNewTotalValue());
}
@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
BaseNode baseNode = (BaseNode)o;
return Objects.equals(indexCode, baseNode.indexCode) &&
// Compare after converting to maps, so the order of the dimensions does not matter
Objects.equals(this.dimValueListToMap(), baseNode.dimValueListToMap());
}
@Override
public int hashCode() {
return Objects.hash(indexCode, this.dimValueListToMap());
}
}
package com.aliexpress.diagnose.node.model;
import java.util.List;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* Additive (sum-type) node
*
* @author minglei.jml
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@Slf4j
public class AddUpNode extends BaseNode {
@Builder(builderMethodName = "addUpNodeBuilder")
public AddUpNode(String indexCode, List<DimValue> currentDimensionAndValue,
List<String> drillDownDimensions, Integer depth, Double oldValue, Double newValue,
Double oldTotalValue, Double newTotalValue) {
super(indexCode, currentDimensionAndValue, drillDownDimensions, depth, oldValue, newValue, oldTotalValue,
newTotalValue);
}
@Override
public Double computeContribution() {
if (!paramCheck()) {
throw new RuntimeException("factor is blank, can't compute contribution. node is " +
getCurrentDimensionAndValue().toString() + "; new value : " + getNewValue() +
"; old value : " + getOldValue() + "; new total value : " + getNewTotalValue() +
"; old total value : " + getOldTotalValue());
}
if (this.getOldTotalValue().equals(this.getNewTotalValue())) {
log.error("denominator is zero, oldTotalValue = newTotalValue, can't compute contribution. node is {}",
getCurrentDimensionAndValue().toString());
// For now, fall back to a contribution of 0
this.setContribution(0d);
} else {
this.setContribution((getNewValue() - getOldValue()) / (getNewTotalValue() - getOldTotalValue()));
}
return this.getContribution();
}
@Override
public void copyTotalValue(BaseNode node) {
if (!(node instanceof AddUpNode)) {
throw new RuntimeException(
"copy node type not match,can't copy total value. node is " + getCurrentDimensionAndValue().toString());
}
AddUpNode copyNode = (AddUpNode)node;
this.setOldTotalValue(copyNode.getOldTotalValue());
this.setNewTotalValue(copyNode.getNewTotalValue());
}
private boolean paramCheck() {
return getNewValue() != null && getNewTotalValue() != null && getOldValue() != null
&& getOldTotalValue() != null;
}
@Override
public void copyValueToTotal() {
this.setOldTotalValue(this.getOldValue());
this.setNewTotalValue(this.getNewValue());
}
}
/**
* Alipay.com Inc.
* Copyright (c) 2004-2021 All Rights Reserved.
*/
package com.aliexpress.diagnose.node.repository;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.indicator.Record;
import com.aliexpress.diagnose.indicator.RecordParam;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.indicator.model.IndexType;
import com.aliexpress.diagnose.indicator.model.Pair;
import com.aliexpress.diagnose.node.model.AddUpNode;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.DimValue;
import com.google.common.collect.Lists;
/**
* @author antone
*/
public class AddUpNodeDataRepositoryImpl implements NodeDataRepository {
private RecordRepository recordRepository;
public AddUpNodeDataRepositoryImpl(RecordRepository recordRepository) {
this.recordRepository = recordRepository;
}
@Override
public BaseNode query(RecordParam param) {
Optional<Record> record = recordRepository.query(param);
if (record.isPresent()) {
return buildAddUpNode(param, record.get());
}
return null;
}
@Override
public List<BaseNode> query(RecordParam param, String drillDownDimension) {
List<Record> records = recordRepository.query(param, drillDownDimension);
if (records == null) {
return Collections.emptyList();
}
return records.stream()
.map(record -> buildAddUpNode(param, drillDownDimension, record))
.collect(Collectors.toList());
}
private AddUpNode buildAddUpNode(RecordParam param, Record record) {
// By convention a node query fetches exactly one indicator
Pair indicatorValue = record.getIndicatorValues().get(param.getIndicators().get(0));
return AddUpNode.addUpNodeBuilder()
.currentDimensionAndValue(param.getCurrentDimensionAndValue())
.oldValue(indicatorValue.getCompare())
.newValue(indicatorValue.getCurrent())
.build();
}
private AddUpNode buildAddUpNode(RecordParam param, String drillDownDimension, Record record) {
// By convention a node query fetches exactly one indicator
Pair indicatorValue = record.getIndicatorValues().get(param.getIndicators().get(0));
// Caution: copy the list, do not modify the original
List<DimValue> dimValues = Lists.newArrayList(param.getCurrentDimensionAndValue());
// Append the value of the drill-down dimension
dimValues.add(DimValue.builder()
.dim(drillDownDimension)
.value(record.getDimensionValues().get(drillDownDimension))
.build()
);
return AddUpNode.addUpNodeBuilder()
.currentDimensionAndValue(dimValues)
.oldValue(indicatorValue.getCompare())
.newValue(indicatorValue.getCurrent())
.build();
}
@Override
public boolean accept(String indexType) {
return IndexType.ADD_UP.getCode().equals(indexType);
}
}
package com.aliexpress.diagnose.node;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.request.AnalysisConfig;
import com.aliexpress.diagnose.util.MathUtils;
import lombok.extern.slf4j.Slf4j;
/**
* Pruning
*
* @author antone
*/
@Slf4j
public class CutService {
private static final CutService INSTANCE = new CutService();
private CutService() {
}
public static CutService getInstance() {
return INSTANCE;
}
/**
* Prunes the candidate nodes of one drill-down dimension.
*
* @param originalNodes nodes before pruning
* @param config analysis configuration
* @return nodes that survive pruning
*/
public List<BaseNode> cutNode(List<BaseNode> originalNodes, AnalysisConfig config, BaseNode parentNode,
String currentDrillDownDimension) {
List<BaseNode> ret = new ArrayList<>();
if (config.isCutByDimValueChangeDistribution()) {
// If the changes of all dimension values are about the same, prune them all (i.e. skip this dimension)
if (!MathUtils.hasOutliers(
originalNodes.stream()
// Change of each node
.map(node -> node.getNewValue() - node.getOldValue())
.collect(Collectors.toList()))
) {
log.info("[CutService.cutNode] changes of all dimension values are similar, pruning them all, "
+ "parent node: {}, drill-down dimension: {}",
parentNode.getCurrentDimensionAndValue(), currentDrillDownDimension);
return ret;
}
}
originalNodes.forEach(node -> {
// Compute the contribution if it has not been computed yet
if (node.getContribution() == null) {
node.computeContribution();
}
// Keep the node only if its contribution is above the threshold; otherwise prune it
if (node.getContribution() > config.getContributionLowerLimit()) {
ret.add(node);
} else {
log.info(
"contribution {} <= threshold {}, pruned, parent node: {}, drill-down node: {}, oldValue: {}, "
+ "newValue: {}, oldTotalValue: {}, newTotalValue: {}",
node.getContribution(),
config.getContributionLowerLimit(), parentNode.getCurrentDimensionAndValue(),
node.getCurrentDimensionAndValue(), node.getOldValue(), node.getNewValue(), node.getOldTotalValue(),
node.getNewTotalValue());
}
});
return ret;
}
}
package com.aliexpress.diagnose.node;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.CreateBasicNodeParam;
import com.aliexpress.diagnose.node.model.CreateDrillDownNodeParam;
import com.aliexpress.diagnose.node.repository.NodeDataRepoExecutor;
/**
* Node factory, responsible for creating nodes
*
* @author minglei.jml
*/
public class NodeFactory {
private RecordRepository recordRepository;
private NodeDataRepoExecutor nodeDataRepoExecutor;
public NodeFactory(RecordRepository recordRepository) {
this.recordRepository = recordRepository;
// Initialize fields
nodeDataRepoExecutor = new NodeDataRepoExecutor(recordRepository);
}
/**
* Creates the drill-down nodes
*
* @param param
* @return
*/
public List<BaseNode> createDrillDownNode(CreateDrillDownNodeParam param) {
Objects.requireNonNull(param, "[createDrillDownNode] param is null");
Objects.requireNonNull(param.getFatherNode(), "[createDrillDownNode] param getFatherNode is null");
return doCreate(param);
}
/**
* Creates the root node
*
* @param param
* @return
*/
public BaseNode createRootNode(CreateBasicNodeParam param) {
Objects.requireNonNull(param, "[createRootNode] param is null");
Objects.requireNonNull(param.getIndexCode(), "[createRootNode] param indexCode is null");
return doCreateRootNode(param);
}
private BaseNode doCreateRootNode(CreateBasicNodeParam param) {
// Query the indicator
// caution: the BaseNode returned here is already a concrete type, AddUpNode / RatioNode
BaseNode resultNode = nodeDataRepoExecutor.query(param.getIndexType(), param.convert2RecordParam());
resultNode.setIndexCode(param.getIndexCode());
resultNode.setDrillDownDimensions(param.getDrillDownDimensions());
resultNode.setCurrentDimensionAndValue(param.getCurrentDimensionAndValue());
resultNode.setDepth(param.getDepth());
// Note: copy the root node's own values into its total fields
resultNode.copyValueToTotal();
resultNode.computeContribution();
return resultNode;
}
private List<BaseNode> doCreate(CreateDrillDownNodeParam param) {
// Query the indicator
List<BaseNode> baseNodes = nodeDataRepoExecutor.query(param.getIndexType(), param.convert2RecordParam(),
param.getChosenDrillDownDimension());
List<String> drillDownDimensions = param.getFatherNode().getDrillDownDimensions();
// The child's drillable dimensions are the parent's drillable dimensions minus the dimension being drilled now
// (in other words, all the other dimensions of the same layer)
List<String> subDimensions = drillDownDimensions.stream().filter(
dim -> !dim.equals(param.getChosenDrillDownDimension())).collect(Collectors.toList());
baseNodes.forEach(baseNode -> {
baseNode.setIndexCode(param.getFatherNode().getIndexCode());
baseNode.setDrillDownDimensions(subDimensions);
baseNode.setDepth(param.getFatherNode().getDepth() + 1);
// Copy the root node's total values
baseNode.copyTotalValue(param.getFatherNode());
baseNode.computeContribution();
});
return baseNodes;
}
}
package com.aliexpress.diagnose.node;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.indicator.IndexRepository;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.indicator.model.Index;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.node.model.CreateBasicNodeParam;
import com.aliexpress.diagnose.request.AnalysisConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
/**
* @author antone
*/
@Slf4j
public class NodeService {
private RecordRepository recordRepository;
private NodeFactory nodeFactory;
public NodeService() {
}
public NodeService(RecordRepository recordRepository) {
this.recordRepository = recordRepository;
this.nodeFactory = new NodeFactory(recordRepository);
}
/**
* Creates the initial root node
*
* @param analysisConfig
* @return
*/
public BaseNode createRootNode(AnalysisConfig analysisConfig) {
Index index = IndexRepository.getInstance().get(analysisConfig.getIndexCode());
CreateBasicNodeParam param = CreateBasicNodeParam.builder()
.indexCode(analysisConfig.getIndexCode())
.indexType(index.getType())
.currentDimensionAndValue(analysisConfig.getCurrentDimensionAndValue())
.drillDownDimensions(analysisConfig.getDrillDownDimensions())
.allDimensions(index.getDimensions())
// The depth of the root node is defined as 0
.depth(0)
.statDate(analysisConfig.getStatDate())
.build();
BaseNode rootNode = nodeFactory.createRootNode(param);
return rootNode;
}
/**
* Flattens the tree into a List
*
* @param rootNode
*/
public List<BaseNode> tree2List(BaseNode rootNode) {
List<BaseNode> retList = new ArrayList<>();
retList.add(rootNode);
if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
rootNode.getSubNodes().values().stream().forEach(
v -> v.stream().forEach(baseNode -> retList.addAll(tree2List(baseNode))));
}
return retList;
}
/**
* Returns all leaf nodes
*
* @param rootNode
* @return
*/
public List<BaseNode> getLeafNodes(BaseNode rootNode) {
List<BaseNode> retList = new ArrayList<>();
if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
rootNode.getSubNodes().values().stream().forEach(
v -> v.stream().forEach(baseNode -> retList.addAll(getLeafNodes(baseNode))));
} else {
retList.add(rootNode);
}
return retList;
}
/**
* Sorts the nodes by contribution, in descending order
*
* @param nodes
*/
public List<BaseNode> sortNodeByContribution(List<BaseNode> nodes) {
Collections.sort(nodes, (BaseNode o1, BaseNode o2) ->
o2.getContribution().compareTo(o1.getContribution())
);
return nodes;
}
/**
* Deduplicates the nodes.
* A-B-C and B-A-C count as the same path.
*
* @param nodes
* @return
*/
public List<BaseNode> distinct(List<BaseNode> nodes) {
if (nodes == null || nodes.isEmpty()) {
return new ArrayList<>();
}
return nodes.stream().distinct().collect(Collectors.toList());
}
public void printDiagnoseTree(BaseNode rootNode) {
// 1. Collect all nodes
List<BaseNode> allNodes = tree2List(rootNode);
// 2. Deduplicate
List<BaseNode> distinctedNodes = distinct(allNodes);
//log.info("[printDiagnoseTree]distinctedNodes: {}", distinctedNodes);
// 3. Rebuild the tree from the deduplicated nodes: among all original nodes under rootNode, keep only distinctedNodes
removeDuplicateNode(rootNode, distinctedNodes);
// 4. Print the tree
displaytree(rootNode, 0);
}
/**
* Among all original nodes under rootNode, keeps only those in distinctedNodes
*
* @param rootNode
*/
public void removeDuplicateNode(BaseNode rootNode, List<BaseNode> distinctedNodes) {
if (rootNode.getSubNodes() != null && !rootNode.getSubNodes().isEmpty()) {
Iterator<Entry<String, List<BaseNode>>> mapIterator = rootNode.getSubNodes().entrySet().iterator();
while (mapIterator.hasNext()) {
Entry<String, List<BaseNode>> entry = mapIterator.next();
Iterator<BaseNode> iterator = entry.getValue().iterator();
while (iterator.hasNext()) {
BaseNode baseNode = iterator.next();
removeDuplicateNode(baseNode, distinctedNodes);
if (!nodeAbstractContains(distinctedNodes, baseNode)) {
iterator.remove();
}
}
if (CollectionUtils.isEmpty(entry.getValue())) {
mapIterator.remove();
}
}
}
}
/**
* Checks whether nodesContainer strictly contains toCompareNode.
* Strict containment means the dimValue strings are exactly equal, including the order of the dims.
*
* @param nodesContainer
* @param toCompareNode
* @return
*/
public boolean nodeAbstractContains(List<BaseNode> nodesContainer, BaseNode toCompareNode) {
for (BaseNode baseNode : nodesContainer) {
if (StringUtils.equals(baseNode.dimValueToString(), toCompareNode.dimValueToString())) {
return true;
}
}
return false;
}
/**
* Prints the tree
*
* @param f
* @param level
*/
private static void displaytree(BaseNode f, int level) { // prints the tree recursively
String preStr = "";
for (int i = 0; i < level; i++) {
preStr += " ";
}
if (f.getSubNodes() != null && !f.getSubNodes().isEmpty()) {
List<BaseNode> allSubNodes = f.getSubNodes().values().stream().flatMap(list -> list.stream()).collect(
Collectors.toList());
for (BaseNode baseNode : allSubNodes) {
log.info(preStr + baseNode.buildNodeDesc());
if (baseNode.getSubNodes() != null && !baseNode.getSubNodes().isEmpty()) {
displaytree(baseNode, level + 1);
}
}
}
}
}
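The leaf collection and contribution sort above can be tried out on a toy tree. The sketch below is only an illustration: `SketchNode`, `LeafRankingSketch` and the sample values are hypothetical stand-ins, not the project's `BaseNode` or `NodeService`. Contributions are taken relative to the root, so they may exceed 1 or be negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for BaseNode: a path label, a contribution, and child nodes.
class SketchNode {
    final String path;
    final double contribution;
    final List<SketchNode> children;

    SketchNode(String path, double contribution, SketchNode... children) {
        this.path = path;
        this.contribution = contribution;
        this.children = Arrays.asList(children);
    }
}

class LeafRankingSketch {
    // Collect the nodes that have no children, depth first (same idea as getLeafNodes).
    static List<SketchNode> leaves(SketchNode node) {
        List<SketchNode> out = new ArrayList<>();
        if (node.children.isEmpty()) {
            out.add(node);
        } else {
            for (SketchNode child : node.children) {
                out.addAll(leaves(child));
            }
        }
        return out;
    }

    // Paths of the leaves ordered by contribution, largest first.
    static List<String> rankedLeafPaths(SketchNode root) {
        List<SketchNode> result = leaves(root);
        result.sort(Comparator.comparingDouble((SketchNode n) -> n.contribution).reversed());
        List<String> paths = new ArrayList<>();
        for (SketchNode n : result) {
            paths.add(n.path);
        }
        return paths;
    }

    // Made-up sample: every contribution is measured against the root's change.
    static SketchNode sampleTree() {
        return new SketchNode("root", 1.0,
            new SketchNode("platform=app", 0.7,
                new SketchNode("platform=app/category=toys", 0.9),
                new SketchNode("platform=app/category=books", -0.2)),
            new SketchNode("platform=web", 0.3));
    }
}
```

Ranking only the leaves avoids reporting both `platform=app` and one of its children in the same result list.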
Entry point
/**
* Alipay.com Inc.
* Copyright (c) 2004-2021 All Rights Reserved.
*/
package com.aliexpress.diagnose;
import com.aliexpress.diagnose.indicator.RecordRepository;
import com.aliexpress.diagnose.node.NodeService;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.request.AnalysisConfig;
import com.aliexpress.diagnose.request.FluctuationAnalysisParam;
import com.aliexpress.diagnose.result.ResultGenerateStrategyFactory;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliexpress.diagnose.task.AnalysisJob;
import lombok.extern.slf4j.Slf4j;
/**
* Entry point for fluctuation analysis
*
* @author antone
*/
@Slf4j
public class FluctuationService {
private RecordRepository recordRepository;
private AnalysisJob analysisJob;
private NodeService nodeService;
public FluctuationService(RecordRepository recordRepository) {
this.recordRepository = recordRepository;
this.analysisJob = new AnalysisJob(recordRepository);
this.nodeService = new NodeService();
}
/**
* Entry point for fluctuation analysis
*
* @param param raw request parameters
* @return the analysis result
*/
public ResultData fluctuationAnalysis(FluctuationAnalysisParam param) {
// 1. Build the analysis config from the request
AnalysisConfig analysisConfig = param.getAnalysisConfig();
// 2. Run the fluctuation analysis
BaseNode rootNode = analysisJob.analysis(analysisConfig);
// No fluctuation: return the totals directly
if (rootNode.withNoFluctuation()) {
return ResultData.builder()
.oldTotalValue(rootNode.getOldTotalValue())
.newTotalValue(rootNode.getNewTotalValue())
.build();
}
// Print the whole decomposition tree
nodeService.printDiagnoseTree(rootNode);
// 3. Build the result
ResultData resultData = ResultGenerateStrategyFactory.create(param.getResultGenStrategy())
.generate(rootNode, param.getResultTopN());
return resultData;
}
}
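The early return on "no fluctuation" matters because, for numeric metrics, the notes define contribution as the node's change divided by the root's change, which is undefined when the root did not move. A minimal sketch of that formula follows; `ContributionSketch` is a hypothetical helper, and how `withNoFluctuation()` is actually implemented is not shown here.

```java
class ContributionSketch {
    // Contribution of a node to the root's change for an additive (numeric) metric:
    // (nodeNew - nodeOld) / (rootNew - rootOld).
    static double contribution(double nodeOld, double nodeNew, double rootOld, double rootNew) {
        double rootDelta = rootNew - rootOld;
        if (rootDelta == 0.0) {
            // Assumption: this is the case the no-fluctuation check is meant to catch.
            throw new IllegalArgumentException("root did not change; contribution is undefined");
        }
        return (nodeNew - nodeOld) / rootDelta;
    }
}
```

For example, if the root goes from 100 to 120 while one dimension value goes from 40 to 70 and another from 60 to 50, the contributions are 1.5 and -0.5: one exceeds 1, one is negative, and together they sum to 1 within that level.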
request
package com.aliexpress.diagnose.request;
import java.util.List;
import com.aliexpress.diagnose.node.model.DimValue;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* Analysis parameter model required by the diagnosis model
*
* @author minglei.jml
*/
@Data
@Builder
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisConfig {
/**
* Metric code
*/
private String indexCode;
/**
* Dimensions and dimension values of the current metric
*/
private List<DimValue> currentDimensionAndValue;
/**
* Dimensions available for drill-down
*/
private List<String> drillDownDimensions;
//private Map<Integer, Double> cutSubNodeStrategy;
/**
* Analysis date
*/
private String statDate;
/**
* Contribution threshold
*/
private Double contributionLowerLimit;
/**
* Whether to prune by the dispersion of value changes within a dimension
*/
private boolean cutByDimValueChangeDistribution;
}
package com.aliexpress.diagnose.request;
import java.util.List;
import com.aliexpress.diagnose.node.model.DimValue;
import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* Raw input parameters for fluctuation analysis
*
* @author minglei.jml
*/
@Data
@Builder
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public class FluctuationAnalysisParam {
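/**
* Metric code. The physical column name is used directly as the metric code, e.g. "pay_amt_1d" (payment amount over the last day)
*
* GMV diagnosis table reference: https://dmc.dw.alibaba-inc.com/dm/table/odps.aebi.adi_ae_sycm_trd_diag_analysis_1d/detail/col#/
*/
private String indexCode;
/**
* Dimensions and values of the metric to analyze. For example, to analyze GMV fluctuation for RU, currentDimensionAndValue is "country":"RU"
*/
private List<DimValue> currentDimensionAndValue;
/**
* Dimensions for multi-dimensional analysis, also given as physical column names. For example, to analyze platform and category, the list is ["platform","category"]
*/
private List<String> drillDownDimensions;
/**
* Analysis date, format: yyyyMMdd
*/
private String statDate;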
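/**------------- pruning strategy parameters: start ---------------------------- */
/**
* Lower bound on contribution (nodes below it are not drilled down further)
*/
private Double contributionLowerLimit;
/**
* Whether to prune by the dispersion of value changes within a dimension
*/
private boolean cutByDimValueChangeDistribution;
/**------------- pruning strategy parameters: end ---------------------------- */
/**------------- result generation strategy parameters: start ---------------------------- */
/**
* Result generation strategy; pass the code of a ResultGenStrategyEnum value
*
* @see ResultGenStrategyEnum
*/
private String resultGenStrategy;
/**
* Number of final results to keep; defaults to three.
* Marked with {@code @Builder.Default} because Lombok's builder otherwise ignores the field initializer.
*/
@Builder.Default
private int resultTopN = 3;
/**------------- result generation strategy parameters: end ---------------------------- */
/**
* Converts to the analysis model
*
* @return
*/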
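public AnalysisConfig getAnalysisConfig() {
return AnalysisConfig.builder()
.indexCode(indexCode)
.drillDownDimensions(drillDownDimensions)
.contributionLowerLimit(contributionLowerLimit)
// Carry the dispersion-pruning switch over as well; otherwise it is always false in AnalysisConfig
.cutByDimValueChangeDistribution(cutByDimValueChangeDistribution)
.currentDimensionAndValue(currentDimensionAndValue)
.statDate(statDate)
.build();
}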
}
result
package com.aliexpress.diagnose.result.model;
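/**
* Three strategies are implemented:
* 1. Rank all nodes at all levels, without merging "containing paths" (ABC contains AB)
* 2. Rank all nodes at all levels, merging "containing paths" (ABC contains AB)
* 3. Rank leaf nodes only
* @author antone
*/
public enum ResultGenStrategyEnum {
ALL_NODES_WITHOUT_PATH_MERGE("ALL_NODES_WITHOUT_PATH_MERGE", "Rank all nodes at all levels, without merging containing paths (A-B-C is considered to contain A-B)"),
ALL_NODES_WITH_PATH_MERGE("ALL_NODES_WITH_PATH_MERGE", "Rank all nodes at all levels, merging containing paths (A-B-C is considered to contain A-B)"),
LEAF_NODES_SORTED("LEAF_NODES_SORTED", "Rank leaf nodes only");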
private String code;
private String desc;
ResultGenStrategyEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
public String getDesc() {
return desc;
}
}
package com.aliexpress.diagnose.result.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.aliexpress.diagnose.node.NodeService;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliexpress.diagnose.result.model.ResultRecord;
import com.aliexpress.diagnose.util.DimUtils;
import org.apache.commons.collections4.CollectionUtils;
/**
* @author antone
*/
public class BaseResultGenerateStrategy {
protected NodeService nodeService;
public BaseResultGenerateStrategy() {
this.nodeService = new NodeService();
}
public String generateText(ResultRecord resultRecord) {
String textUp = "Metric for dimension %s rose by %s; contribution %s, current value %s, comparison value %s";
String textDown = "Metric for dimension %s fell by %s; contribution %s, current value %s, comparison value %s";
String subNodeTextUp = "; consider focusing on sub-dimension %s, which rose by %s; contribution %s, current value %s, comparison value %s";
String subNodeTextDown = "; consider focusing on sub-dimension %s, which fell by %s; contribution %s, current value %s, comparison value %s";
// Choose the "rose" or "fell" wording. A missing ratio uses the "rose" wording with 0, so Math.abs never unboxes null
Double fluctuationRatio = resultRecord.getFluctuationRatio();
String textReal;
String subNodeTextReal;
if (fluctuationRatio == null || fluctuationRatio > 0) {
textReal = textUp;
subNodeTextReal = subNodeTextUp;
} else {
textReal = textDown;
subNodeTextReal = subNodeTextDown;
}
String ret = String.format(textReal, resultRecord.buildDimValueDesc(),
fluctuationRatio == null ? 0D : Math.abs(fluctuationRatio),
resultRecord.getContribution(), resultRecord.getCurrentValue(), resultRecord.getCompareValue());
// When a sub-dimension diagnosis exists, append its text
if (resultRecord.getBiggestSubNode() != null) {
ret = ret + String.format(subNodeTextReal, resultRecord.getBiggestSubNode().buildDimValueDesc(),
Math.abs(resultRecord.getBiggestSubNode().getFluctuationRatio()),
resultRecord.getBiggestSubNode().getContribution(),
resultRecord.getBiggestSubNode().getCurrentValue(),
resultRecord.getBiggestSubNode().getCompareValue());
}
return ret;
}
public ResultData genResultData(BaseNode rootNode, List<BaseNode> resultNodes) {
List<ResultRecord> resultRecords = resultNodes.stream().map(
baseNode -> {
if (DimUtils.isConvergent(DimUtils.getDims(baseNode.getCurrentDimensionAndValue()))
|| CollectionUtils.isEmpty(baseNode.getAllSubNodes())) {
// If the current path is convergent, or this is a leaf node, build the result directly
return ResultRecord.create(
baseNode.getCurrentDimensionAndValue(),
baseNode.getContribution(),
baseNode.getNewValue(),
baseNode.getOldValue(),
baseNode.computeFluctuationRatio(),
null);
} else {
// Otherwise look further down and show the top-1 of all its sub-nodes (all dimensions and all levels ranked together)
return buildResultRecordWithBiggestSubNode(baseNode);
}
}
).collect(Collectors.toList());
// Assemble the text. In the PoC stage it lives here; eventually the application layer should generate the text
List<String> resultTexts = resultRecords.stream().map(resultRecord -> generateText(resultRecord))
.collect(Collectors.toList());
return ResultData.builder()
.resultRecords(resultRecords)
.resultTexts(resultTexts)
.newTotalValue(rootNode.getNewTotalValue())
.oldTotalValue(rootNode.getOldTotalValue())
.rootNodeFluctuationRatio(rootNode.computeFluctuationRatio())
.build();
}
private ResultRecord buildResultRecordWithBiggestSubNode(BaseNode baseNode) {
// Find the biggest sub-node
List<BaseNode> subNodes = baseNode.getAllSubNodes();
// Deduplicate
List<BaseNode> distinctNodes = nodeService.distinct(subNodes);
// Sort
nodeService.sortNodeByContribution(distinctNodes);
// Take the top-1 (the caller only gets here when the node has sub-nodes)
BaseNode biggestSubNode = distinctNodes.get(0);
ResultRecord biggestSubResultRecord = ResultRecord.create(
biggestSubNode.getCurrentDimensionAndValue(),
biggestSubNode.getContribution(),
biggestSubNode.getNewValue(),
biggestSubNode.getOldValue(),
biggestSubNode.computeFluctuationRatio(),
null);
return ResultRecord.create(
baseNode.getCurrentDimensionAndValue(),
baseNode.getContribution(),
baseNode.getNewValue(),
baseNode.getOldValue(),
baseNode.computeFluctuationRatio(),
biggestSubResultRecord);
}
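/**
* Keeps paths depth first: A-B and A-B-C count as duplicates, and A-B-C is kept.
*
* The output list is the "left" sequence and the input list is the "right" sequence.
* The left sequence starts with the first element of the input list.
*
* Each remaining right element is compared with the left elements:
*
* if a left element contains the right element, nothing is done;
* if the right element contains a left element, it replaces that left element;
* otherwise the right element overlaps with no left element and is appended to the left sequence.
*
* @param sortedNodes nodes already sorted by contribution
* @param topN number of results to keep
* @return at most topN nodes; empty if sortedNodes is empty or topN is not positive
*/
public List<BaseNode> getTopResultDeepFirst(List<BaseNode> sortedNodes, int topN) {
List<BaseNode> ret = new ArrayList<>();
// Nothing to rank
if (CollectionUtils.isEmpty(sortedNodes) || topN <= 0) {
return ret;
}
ret.add(sortedNodes.get(0));
// Stop as soon as topN results are held (this also covers topN == 1)
for (int i = 1; i < sortedNodes.size() && ret.size() < topN; i++) {
BaseNode rightNode = sortedNodes.get(i);
boolean leftContainsRight = false;
boolean rightContainsLeft = false;
for (int j = 0; j < ret.size(); j++) {
BaseNode leftNode = ret.get(j);
if (leftNode.containsNode(rightNode)) {
leftContainsRight = true;
break;
} else if (rightNode.containsNode(leftNode)) {
rightContainsLeft = true;
// Replace every left element contained by the right one; ret may now hold duplicates, removed below
ret.set(j, rightNode);
}
}
if (rightContainsLeft) {
// Right contains left: deduplicate
ret = nodeService.distinct(ret);
}
// Neither contains the other: append to ret
if (!leftContainsRight && !rightContainsLeft) {
ret.add(rightNode);
}
}
return ret;
}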
}
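The depth-first merge is easier to follow on plain data. In the sketch below a path is modeled as a set of `dim=value` strings and "contains" is taken to mean "is a superset of"; both are assumptions made for illustration, since `BaseNode.containsNode` is not shown here. `PathMergeSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PathMergeSketch {
    // Assumed containment rule: a deeper path contains a shallower one
    // when it holds every dim=value pair of the shallower path.
    static boolean contains(Set<String> deeper, Set<String> shallower) {
        return deeper.containsAll(shallower);
    }

    // sortedPaths is assumed to be ordered by contribution, largest first.
    static List<Set<String>> topMerged(List<Set<String>> sortedPaths, int topN) {
        List<Set<String>> kept = new ArrayList<>();
        for (Set<String> right : sortedPaths) {
            if (kept.size() >= topN) {
                break;
            }
            boolean covered = false;   // some kept path already contains `right`
            boolean replaced = false;  // `right` is deeper than some kept path
            for (int j = 0; j < kept.size(); j++) {
                Set<String> left = kept.get(j);
                if (contains(left, right)) {
                    covered = true;
                    break;
                } else if (contains(right, left)) {
                    kept.set(j, right);
                    replaced = true;
                }
            }
            if (replaced) {
                // The same deeper path may have replaced several entries: drop the copies.
                kept = new ArrayList<>(new LinkedHashSet<>(kept));
            } else if (!covered) {
                kept.add(right);
            }
        }
        return kept;
    }

    static Set<String> path(String... pairs) {
        return new LinkedHashSet<>(Arrays.asList(pairs));
    }
}
```

With the input `A`, `A-B`, `C`, `A-B-D`, `E` and `topN = 3`, the shallower `A` and `A-B` are absorbed into `A-B-D`, and the result is `A-B-D`, `C`, `E`. A replaced entry keeps the slot of the shallower path, so the output order follows the ranking of the first path seen on each branch.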
package com.aliexpress.diagnose.result.impl;
import java.util.List;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.result.ResultGenerateStrategy;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
import org.apache.commons.lang3.StringUtils;
/**
* Ranks all nodes at all levels, without merging "containing paths" (ABC contains AB)
* @author antone
*/
public class AllNodesWithoutPathMergeStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
@Override
public ResultData generate(BaseNode rootNode, int resultTopN) {
// Rank all nodes by contribution and take the top entries
List<BaseNode> nodes = nodeService.tree2List(rootNode);
// Drop the root node first
nodes = nodes.subList(1, nodes.size());
// Deduplicate
List<BaseNode> distinctNodes = nodeService.distinct(nodes);
// Sort
nodeService.sortNodeByContribution(distinctNodes);
// Take the top N, bounded by the deduplicated size, which can be smaller than nodes.size()
List<BaseNode> resultNodes = distinctNodes.subList(0, Math.min(resultTopN, distinctNodes.size()));
return genResultData(rootNode, resultNodes);
}
@Override
public boolean accept(String strategy) {
return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.ALL_NODES_WITHOUT_PATH_MERGE.getCode());
}
}
package com.aliexpress.diagnose.result.impl;
import java.util.List;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.result.ResultGenerateStrategy;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
import org.apache.commons.lang3.StringUtils;
/**
* Ranks all nodes at all levels, merging "containing paths" (ABC contains AB, so only ABC is kept)
*
* @author antone
*/
public class AllNodesWithPathMergeStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
@Override
public ResultData generate(BaseNode rootNode, int resultTopN) {
List<BaseNode> nodes = nodeService.tree2List(rootNode);
// Drop the root node first
nodes = nodes.subList(1, nodes.size());
// Deduplicate
List<BaseNode> distinctNodes = nodeService.distinct(nodes);
// Sort
nodeService.sortNodeByContribution(distinctNodes);
// Take the top N, merging "containing paths"
List<BaseNode> resultNodes =
getTopResultDeepFirst(distinctNodes, resultTopN);
return genResultData(rootNode, resultNodes);
}
@Override
public boolean accept(String strategy) {
return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.ALL_NODES_WITH_PATH_MERGE.getCode());
}
}
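Each strategy class exposes `accept(String)`, which suggests that `ResultGenerateStrategyFactory.create` picks the first strategy accepting the requested code. The factory itself is not shown in this section, so the following is only a guess at that dispatch: `SketchStrategy` and `StrategyLookupSketch` are hypothetical, and the real factory may behave differently (for example, fall back to a default strategy).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, trimmed-down strategy interface: only accept() and a name.
interface SketchStrategy {
    boolean accept(String code);

    String name();
}

class StrategyLookupSketch {
    // Build a strategy that accepts its own code, ignoring case.
    static SketchStrategy byCode(final String ownCode) {
        return new SketchStrategy() {
            @Override
            public boolean accept(String code) {
                return ownCode.equalsIgnoreCase(code);
            }

            @Override
            public String name() {
                return ownCode;
            }
        };
    }

    static final List<SketchStrategy> STRATEGIES = Arrays.asList(
        byCode("ALL_NODES_WITHOUT_PATH_MERGE"),
        byCode("ALL_NODES_WITH_PATH_MERGE"),
        byCode("LEAF_NODES_SORTED"));

    // Return the first strategy that accepts the code; fail loudly on unknown codes.
    static SketchStrategy create(String code) {
        for (SketchStrategy strategy : STRATEGIES) {
            if (strategy.accept(code)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("unknown result generation strategy: " + code);
    }
}
```

Failing on an unknown code is one possible choice; it surfaces a mistyped `resultGenStrategy` early instead of silently producing results under a different strategy.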
package com.aliexpress.diagnose.result.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.aliexpress.diagnose.indicator.IndexRepository;
import com.aliexpress.diagnose.node.model.BaseNode;
import com.aliexpress.diagnose.result.ResultGenerateStrategy;
import com.aliexpress.diagnose.result.model.ResultData;
import com.aliexpress.diagnose.result.model.ResultGenStrategyEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
* Ranks leaf nodes only
*
* @author antone
*/
@Slf4j
public class LeafNodesSortedStrategy extends BaseResultGenerateStrategy implements ResultGenerateStrategy {
@Override
public ResultData generate(BaseNode rootNode, int resultTopN) {
// Rank leaf nodes by contribution and take the top entries
List<BaseNode> nodes = nodeService.getLeafNodes(rootNode);
// Deduplicate
List<BaseNode> distinctNodes = nodeService.distinct(nodes);
// Order each node's dimensions by business semantics
distinctNodes.forEach(node -> {
Map<String, Integer> dimensionOrders = IndexRepository.getInstance().get(node.getIndexCode())
.getDimensionOrders();
Collections.sort(node.getCurrentDimensionAndValue(), (o1, o2) -> {
return dimensionOrders.get(o1.getDim()).compareTo(dimensionOrders.get(o2.getDim()));
});
});
// Sort the nodes by contribution
nodeService.sortNodeByContribution(distinctNodes);
log.info("[LeafNodesSortedStrategy] all leaf node size after distinct is: {}", distinctNodes.size());
// Take the top N and merge "containing paths" (the dimensions were reordered by business semantics above, so parent/child containing paths such as A-B-C and A-C can appear here)
List<BaseNode> resultNodes =
getTopResultDeepFirst(distinctNodes, resultTopN);
return genResultData(rootNode, resultNodes);
}
@Override
public boolean accept(String strategy) {
return StringUtils.equalsIgnoreCase(strategy, ResultGenStrategyEnum.LEAF_NODES_SORTED.getCode());
}
}
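Ordering each path's dimensions by a configured rank, as the leaf-node strategy does, can be sketched with plain strings. This version is an illustration under assumptions rather than the project's code: `DimensionOrderSketch` and the rank values are hypothetical, it sorts a copy instead of the node's own list, and it places dimensions missing from the rank map last, whereas the comparator above would throw a `NullPointerException` for them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DimensionOrderSketch {
    // Sort dimension names by a configured rank. Unknown dimensions go last,
    // and the input list is left untouched.
    static List<String> ordered(List<String> dims, Map<String, Integer> rank) {
        List<String> copy = new ArrayList<>(dims);
        copy.sort((a, b) -> Integer.compare(
            rank.getOrDefault(a, Integer.MAX_VALUE),
            rank.getOrDefault(b, Integer.MAX_VALUE)));
        return copy;
    }

    // Made-up business order: country before platform before category.
    static Map<String, Integer> sampleRank() {
        Map<String, Integer> rank = new HashMap<>();
        rank.put("country", 0);
        rank.put("platform", 1);
        rank.put("category", 2);
        return rank;
    }
}
```

Two paths drilled in different orders, such as `category, country, platform` and `platform, category, country`, come out as the same sequence `country, platform, category`, which makes the generated text read consistently.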