1、采集数据血缘数据

背景

构建数据血缘主要解决两个问题,查和用。

  • 查:根据表中文注释、或表名进行模糊查询对应表;已知表查找上下游输入、输出
  • 用:快速找到目标业务模型表、判断表是否有下游能否直接进行删除(odps数据地图中,一些任务没有配置依赖)

血缘基本组成

数据血缘基本组成及数据结构

  • 表配置信息:表中英文名、创建者、创建时间、生命周期、是否分区表…
  • 任务实例信息:节点ID、节点类型、状态、调度类型、实例父节点ID列表、实例子节点ID列表
  • 表输入、输出信息:节点ID、节点类型、状态、输入表列表、输出表列表(odps_sql, odps_script、数据集成)
  • 数据源信息:数据源名称、数据源类型、数据源地址、

血缘采集优化

优化要点

  • 提高血缘采集效率,缩短采集时间(通过fileList 获取文件ID,本级版本号,更换原有URL 访问接口)
  • 在全量采集基础上实现增量更新(基于文件修改的最新时间,完成增量更新)
  • 统一对5个odps空间完成血缘采集(提取空间账户信息到配置文件)

获取文件列表:

  1. https://ide.cloud.zj.gov.cn/rest/folder/module/list?projectId={0}&tenantId={1}&useType=0&labelLevels=2&pageNum=1&pageSize=100000'

提取fileId、cloudUuid,lastModifyTime(进行增量更新)

(1)获取所有节点

  1. https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/list?projectId=32762&tenantId=10023&env=prod&sortOrder=&sortField=&includeRelation=false&pageNum=0&pageSize=20&forceUpdateId=0&expand=true&prgTypes=&modifyTime=&nodeType=&searchText=&owner=&solId=&bizId=&resgroupId=&baseLineId=

(2)获取节点上下游关系

父节点:

  1. https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/getNodeListByDepth?projectId=32762&tenantId=10023&env=prod&relation=parent&nodeId={nodeId}&depth=1&detail=false

子节点:

  1. https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/getNodeListByDepth?projectId=32762&tenantId=10023&env=prod&relation=child&nodeId={nodeId}&depth=1&detail=false

(3)获取节点和表关系【包括SQL/数据集成】

获取脚本内容,解析读表、写表

  1. https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/code?projectId=32762&tenantId=10023&env=prod&nodeId={nodeId}

替换后(根据文件ID、文件版本号提取):

  1. https://ide.cloud.zj.gov.cn/rest/file/version?projectId={0}&tenantId={1}&fileId={2}&versions={3}

(4)获取数据源

  1. https://di.cloud.zj.gov.cn/web/di/datasource/list?projectId=32762&tenantId=10023&searchType=&pageSize=1000&pageNum=1

血缘统一采集脚本执行操作

每个脚本完成表、任务、实例信息采集,可以给定一个空间名,或者不指定并行采集5个空间血缘信息

  1. OdpsTableConfig.py ## 采集空间表信息,全量更新
  2. SpiderOdpsFileInfo.py ## 采集空间表信息,全量更新
  3. SpiderTableTaskRela.py ## 采集任务及表依赖信息,增量更新时需指定比较日期(默认前一天),否则标记初始化值为1
  4. OdpsDatasourceConfig.py ## 采集空间数据源信息,全量更新

各个空间血缘信息除了更新到各个前置库表内,同时更新到图数据库对应表。

2、图谱建模

(1)实体模型

序号 节点类型 主键
1 ODPS开发人员 name
2 ODPS任务 taskId
3 ODPS表 tableNameEn
4 ODPS数据源 dsName
5 应用 appName
6 应用表 appTableName

(2)关系模型

实体A 实体A主键 实体B 实体B主键 关系类型
数据源 dsName 应用表 tableName 包含
数据源 dsName 应用 appName 属于
ODPS开发人员 name 任务 taskId 创建
ODPS开发人员 name ODPS表 tableNameEn 创建
任务 taskId 应用表 appTableName 输入
应用表 appTableName 任务 taskId 输入
任务 taskId ODPS表 tableNameEn 输入
ODPS表 tableNameEn 任务 taskId 输入
任务 taskId 任务 taskId 输入
ODPS表 tableNameEn ODPS表 tableNameEn 输入

3、图谱数据入库

(1) 搭建工程

(2)逻辑开发

(3)性能调优

  1. 1> 创建索引
  2. 2> 线程池及重试机制
  3. 3> 使用SSD

4、应用场景

Cypher 语法执行查询操作,参考文档,
https://neo4j.com/docs/cypher-manual/3.5/syntax/
https://neo4j.com/docs/cypher-manual/3.5/syntax/patterns/

(1)查询ODPS中无父节点节点

match (n:TASK) where not ((n:TASK)<-[:INPUT]-(:TASK)) return n

(2)查询任务血缘

match(n:TASK{taskId:"2174800"})-[r:INPUT]->(m:TASK) return n,r,m

(3)查询表血缘

match(n:ODPS_TABLE{tableNameEn:"dim_jgsx"})-[r:INPUT]->(m:TASK)-[r1:INPUT]->(h:ODPS_TABLE) return n,r,m,r1,h

(4)查看某个应用下用到的ODPS的表

整个链路关系:
match(n:APP{appName:"风险预警系统"})-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]-(h:APP_TABLE)<-[r2:INPUT]-(j:TASK)<-[r3:INPUT]-(g:ODPS_TABLE) return n.appName,m.dsName,h.appTableName,j.taskName,g.tableNameEn
去重:
match(n:APP{appName:"风险预警系统"})-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]-(h:APP_TABLE)<-[r2:INPUT]-(j:TASK)<-[r3:INPUT]-(g:ODPS_TABLE) return distinct g.tableNameEn,g.tableNameCn

(5)表被使用情况

match(n:ODPS_TABLE)-[r:INPUT]->(m:TASK) with n,count(r) as count_r where count_r >=2 return n.tableNameEn,count_r order by count_r desc

(6)复杂查询,查询推送到应用库名为监管对象系统的数据血缘信息,包括应用端表名,数仓模型表名,数源表名

MATCH  (a:APP{appName:'监管对象系统'})<-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]->(b:APP_TABLE)-[r2:INPUT]-(c:TASK)-[r3:INPUT]-(d:JG_DATA_CENTER_TABLE)<-[:INPUT]-(d1:TASK)-[:INPUT]-(d2:JG_DATA_CENTER_TABLE)<-[*..10]-(e1:TASK)<-[:INPUT]-(e:PUB_DATA_PLATFORM_TABLE) where not m.dsName = 'hlwjg_jgdxfw_test' and e1.taskTypeName = '正常任务'   RETURN distinct m.dsName,m.dsDesc,b.appTableName,c.taskName,c.taskTypeName,d.tableNameEn,d.tableNameCn,d1.taskName,d2.tableNameEn,d2.tableNameCn,e1.taskName,e.tableNameEn,e.tableNameCn

5、环境

Neo4j测试环境:

url : http://192.168.20.49:7474/browser/

username : neo4j

password : 12345678

git代码:

1、采集:

http://59.202.42.100/data-odps/pyodps_utils.git

wangzhifei/Other/互联网+监管

2、Neo4j

http://59.202.42.100/data-odps/datarelatedneo4j.git