1、采集数据血缘数据
背景
构建数据血缘主要解决两个问题,查和用。
- 查:根据表中文注释、或表名进行模糊查询对应表;已知表查找上下游输入、输出
- 用:快速找到目标业务模型表、判断表是否有下游能否直接进行删除(odps数据地图中,一些任务没有配置依赖)
血缘基本组成
数据血缘基本组成及数据结构
- 表配置信息:表中英文名、创建者、创建时间、生命周期、是否分区表…
- 任务实例信息:节点ID、节点类型、状态、调度类型、实例父节点ID列表、实例子节点ID列表
- 表输入、输出信息:节点ID、节点类型、状态、输入表列表、输出表列表(odps_sql, odps_script、数据集成)
- 数据源信息:数据源名称、数据源类型、数据源地址、
血缘采集优化
优化要点
- 提高血缘采集效率,缩短采集时间(通过fileList 获取文件ID,本级版本号,更换原有URL 访问接口)
- 在全量采集基础上实现增量更新(基于文件修改的最新时间,完成增量更新)
- 统一对5个odps空间完成血缘采集(提取空间账户信息到配置文件)
获取文件列表:
https://ide.cloud.zj.gov.cn/rest/folder/module/list?projectId={0}&tenantId={1}&useType=0&labelLevels=2&pageNum=1&pageSize=100000'
提取fileId、cloudUuid,lastModifyTime(进行增量更新)
(1)获取所有节点
https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/list?projectId=32762&tenantId=10023&env=prod&sortOrder=&sortField=&includeRelation=false&pageNum=0&pageSize=20&forceUpdateId=0&expand=true&prgTypes=&modifyTime=&nodeType=&searchText=&owner=&solId=&bizId=&resgroupId=&baseLineId=
(2)获取节点上下游关系
父节点:
https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/getNodeListByDepth?projectId=32762&tenantId=10023&env=prod&relation=parent&nodeId={nodeId}&depth=1&detail=false
子节点:
https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/getNodeListByDepth?projectId=32762&tenantId=10023&env=prod&relation=child&nodeId={nodeId}&depth=1&detail=false
(3)获取节点和表关系【包括SQL/数据集成】
获取脚本内容,解析读表、写表
https://wkbench.cloud.zj.gov.cn/workbench/cwf/node/code?projectId=32762&tenantId=10023&env=prod&nodeId={nodeId}
替换后(根据文件ID、文件版本号提取):
https://ide.cloud.zj.gov.cn/rest/file/version?projectId={0}&tenantId={1}&fileId={2}&versions={3}
(4)获取数据源
https://di.cloud.zj.gov.cn/web/di/datasource/list?projectId=32762&tenantId=10023&searchType=&pageSize=1000&pageNum=1
血缘统一采集脚本执行操作
每个脚本完成表、任务、实例信息采集,可以给定一个空间名,或者不指定并行采集5个空间血缘信息
OdpsTableConfig.py ## 采集空间表信息,全量更新
SpiderOdpsFileInfo.py ## 采集空间表信息,全量更新
SpiderTableTaskRela.py ## 采集任务及表依赖信息,增量更新时需指定比较日期(默认前一天),否则标记初始化值为1
OdpsDatasourceConfig.py ## 采集空间数据源信息,全量更新
各个空间血缘信息除了更新到各个前置库表内,同时更新到图数据库对应表。
2、图谱建模
(1)实体模型
序号 | 节点类型 | 主键 |
---|---|---|
1 | ODPS开发人员 | name |
2 | ODPS任务 | taskId |
3 | ODPS表 | tableNameEn |
4 | ODPS数据源 | dsName |
5 | 应用 | appName |
6 | 应用表 | appTableName |
(2)关系模型
实体A | 实体A主键 | 实体B | 实体B主键 | 关系类型 |
---|---|---|---|---|
数据源 | dsName | 应用表 | tableName | 包含 |
数据源 | dsName | 应用 | appName | 属于 |
ODPS开发人员 | name | 任务 | taskId | 创建 |
ODPS开发人员 | name | ODPS表 | tableNameEn | 创建 |
任务 | taskId | 应用表 | appTableName | 输入 |
应用表 | appTableName | 任务 | taskId | 输入 |
任务 | taskId | ODPS表 | tableNameEn | 输入 |
ODPS表 | tableNameEn | 任务 | taskId | 输入 |
任务 | taskId | 任务 | taskId | 输入 |
3、图谱数据入库
(1) 搭建工程
(2)逻辑开发
(3)性能调优
1> 创建索引
2> 线程池及重试机制
3> 使用SSD
4、应用场景
Cypher 语法执行查询操作,参考文档,
https://neo4j.com/docs/cypher-manual/3.5/syntax/
https://neo4j.com/docs/cypher-manual/3.5/syntax/patterns/
(1)查询ODPS中无父节点节点
match (n:TASK) where not ((n:TASK)<-[:INPUT]-(:TASK)) return n
(2)查询任务血缘
match(n:TASK{taskId:"2174800"})-[r:INPUT]->(m:TASK) return n,r,m
(3)查询表血缘
match(n:ODPS_TABLE{tableNameEn:"dim_jgsx"})-[r:INPUT]->(m:TASK)-[r1:INPUT]->(h:ODPS_TABLE) return n,r,m,r1,h
(4)查看某个应用下用到的ODPS的表
整个链路关系:
match(n:APP{appName:"风险预警系统"})-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]-(h:APP_TABLE)<-[r2:INPUT]-(j:TASK)<-[r3:INPUT]-(g:ODPS_TABLE) return n.appName,m.dsName,h.appTableName,j.taskName,g.tableNameEn
去重:
match(n:APP{appName:"风险预警系统"})-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]-(h:APP_TABLE)<-[r2:INPUT]-(j:TASK)<-[r3:INPUT]-(g:ODPS_TABLE) return distinct g.tableNameEn,g.tableNameCn
(5)表被使用情况
match(n:ODPS_TABLE)-[r:INPUT]->(m:TASK) with n,count(r) as count_r where count_r >=2 return n.tableNameEn,count_r order by count_r desc
(6)复杂查询,查询推送到应用库名为监管对象系统的数据血缘信息,包括应用端表名,数仓模型表名,数源表名
MATCH (a:APP{appName:'监管对象系统'})<-[r:BELONG]-(m:DATA_SOURCE)-[r1:CONTAIN]->(b:APP_TABLE)-[r2:INPUT]-(c:TASK)-[r3:INPUT]-(d:JG_DATA_CENTER_TABLE)<-[:INPUT]-(d1:TASK)-[:INPUT]-(d2:JG_DATA_CENTER_TABLE)<-[*..10]-(e1:TASK)<-[:INPUT]-(e:PUB_DATA_PLATFORM_TABLE) where not m.dsName = 'hlwjg_jgdxfw_test' and e1.taskTypeName = '正常任务' RETURN distinct m.dsName,m.dsDesc,b.appTableName,c.taskName,c.taskTypeName,d.tableNameEn,d.tableNameCn,d1.taskName,d2.tableNameEn,d2.tableNameCn,e1.taskName,e.tableNameEn,e.tableNameCn
5、环境
Neo4j测试环境:
url : http://192.168.20.49:7474/browser/
username : neo4j
password : 12345678
git代码:
1、采集:
http://59.202.42.100/data-odps/pyodps_utils.git
wangzhifei/Other/互联网+监管
2、Neo4j