1.数据仓库元数据管理

元数据(MetaData)狭义的解释是用来描述数据的数据。广义的来看,除了业务逻辑直接读写处理的那些业务数据,所有其它用来维持整个系统运转所需的信息/数据都可以叫作元数据。
管理元数据的目的,是为了让用户能够更高效的使用数据,也是为了让平台管理人员能更加有效的做好数据的维护管理工作。
常见的元数据信息包括:

  • 表结构信息
  • 数据的空间存储,读写记录,权限归属和其它各类统计信息
  • 数据的血缘关系信息
  • 数据的业务属性信息

数据血缘关系(血缘信息,lineage血统信息):数据之间的上下游来源去向关系,数据从哪里来到哪里去。
数据的业务属性信息:一张数据表的统计口径信息,这张表干什么用的,各个字段的具体统计方式,业务描述,业务标签,脚本逻辑的历史变迁记录,变迁原因等

2.Atlas介绍

2015年由Hortonworks公司主导开发
Atlas是Hadoop平台元数据框架;
Atlas是一组可扩展的核心基础治理服务,使企业能够有效,高效地满足Hadoop中的合规性要求,并能与整个企业数据生态系统集成;
Apache Atlas为组织提供了开放的元数据管理和治理功能,以建立数据资产的目录,对这些资产进行分类和治理,并为IT团队、数据分析团队提供围绕这些数据资产的协作功能。
image.png
Atlas由元数据的收集,存储和查询展示三部分核心组件组成
Atlas包括以下组件:
Core。Atlas功能核心组件,提供元数据的获取与导出(Ingets/Export)、类型系统(Type System)、元数据存储索引查询等核心功能
Integration。Atlas对外集成模块。外部组件的元数据通过该模块将元数据交给Atlas管理
Metadata source。Atlas支持的元数据数据源,以插件形式提供。当前支持从以下来源提取和管理元数据:

  • Hive
  • HBase
  • Sqoop
  • Kafka
  • Storm

Applications。Atlas的上层应用,可以用来查询由Atlas管理的元数据类型和对象
Graph Engine(图计算引擎)。Altas使用图模型管理元数据对象。图数据库提供了极大的灵活性,并能有效处理元数据对象之间的关系。除了管理图对象之外,图计算引擎还为元数据对象创建适当的索引,以便进行高效的访问。在
Atlas 1.0 之前采用Titan作为图存储引擎,从1.0开始采用 JanusGraph 作为图存储引擎。JanusGraph 底层又分为两块:

  • Metadata Store。采用 HBase 存储 Atlas 管理的元数据;
  • Index Store。采用Solr存储元数据的索引,便于高效搜索;

    3.安装配置

    3.1安装依赖

    Maven 3.6.3(完成)
    HBase 1.1.2(不需要安装,需要软件包)
    Solr 5.5.1(不需要安装,需要软件包)
    atlas 1.2.0(需要编译)
    由于官方没有提供二进制的安装版本,所以Altas需要编译安装

    3.2安装步骤

    1.软件包
    apache-atlas-1.2.0-sources.tar.gz
    solr-5.5.1.tgz
    hbase-1.1.2.tar.gz
    2.解压缩源码,修改配置

    1. # 解压缩
    2. cd /opt/lagou/software
    3. tar zxvf apache-atlas-1.2.0-sources.tar.gz
    4. cd apache-atlas-sources-1.2.0/
    5. # 修改配置
    6. vi pom.xml
    7. # 修改
    8. 645 <npm-for-v2.version>3.10.8</npm-for-v2.version>
    9. 652 <hadoop.version>2.9.2</hadoop.version>

    3.将Hbase,Solr的包拷贝到对应的目录中

    1. cd /opt/lagou/software/apache-atlas-sources-1.2.0
    2. # 创建目录
    3. cd distro/
    4. mkdir solr
    5. mkdir hbase
    6. # 拷贝软件包
    7. cp /opt/lagou/software/solr-5.5.1.tgz ./solr/
    8. cp /opt/lagou/software/hbase-1.1.2.tar.gz ./hbase/

    4、maven设置阿里镜像

    1. cd $MAVEN_HOME/conf
    2. # 在配置文件中添加
    3. vi settings.xml
    4. # 加在 158 行后
    5. <mirror>
    6. <id>alimaven</id>
    7. <name>aliyun maven</name>
    8. <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    9. <mirrorOf>central</mirrorOf>
    10. </mirror>

    5.Atlas编译

    1. cd /opt/lagou/software/apache-atlas-sources-1.2.0
    2. export MAVEN_OPTS="-Xms2g -Xmx2g"
    3. mvn clean -DskipTests package -Pdist,embedded-hbase-solr

    编译完的软件位置:/opt/lagou/software/apache-atlas-sources-1.2.0/distro/target
    编译完的软件:apache-atlas-1.2.0-bin.tar.gz
    6.Atlas安装

    1. cd /opt/lagou/software/apache-atlas-sources-
    2. 1.2.0/distro/target
    3. # 解压缩
    4. tar zxvf apache-atlas-1.2.0-bin.tar.gz
    5. mv apache-atlas-1.2.0/ /opt/lagou/servers/atlas-1.2.0
    6. # 修改 /etc/profile,设置环境变量 ATLAS_HOME
    7. # 启动服务(第一次启动服务的时间比较长)
    8. cd $ATLAS_HOME/bin
    9. ./atlas_start.py
    10. # 检查后台进程 (1个atlas、2个HBase、1个solr后台进程)
    11. ps -ef | grep atlas
    12. /opt/lagou/servers/atlas-1.2.0/server/webapp/atlas
    13. hbase-daemon.sh
    14. org.apache.hadoop.hbase.master.HMaster
    15. /opt/lagou/servers/atlas-1.2.0/solr/server
    16. # 停止服务
    17. ./atlas_stop.py

    检查solr的状态:

    1. cd /opt/lagou/servers/atlas-1.2.0/solr/bin
    2. ./solr status
    3. Solr process 25038 running on port 9838
    4. {
    5. "solr_home":"/opt/lagou/servers/atlas-
    6. 1.2.0/solr/server/solr",
    7. "version":"5.5.1 c08f17bca0d9cbf516874d13d221ab100e5b7d58 -
    8. anshum - 2016-04-30 13:28:18",
    9. "startTime":"2020-07-31T11:58:45.638Z",
    10. "uptime":"0 days, 14 hours, 55 minutes, 11 seconds",
    11. "memory":"54.8 MB (%11.2) of 490.7 MB",
    12. "cloud":{
    13. "ZooKeeper":"localhost:2181",
    14. "liveNodes":"1",
    15. "collections":"3"}}

    检查zk状态:

    1. echo stat|nc localhost 2181

    Web服务:http://linux122:21000/login.jsp
    用户名/口令:admin/admin
    账号的信息存储在文件 conf/users-credentials.properties 中。其中 Password 通过如
    下方式产生sha256sum 摘要信息:

    1. echo -n 1 "admin" | sha256sum

    4.Hive血缘关系导入
    配置HIVE_HOME环境变量;将 $ATLAS_HOME/conf/atlasapplication.properties 拷贝到 $HIVE_HOME/conf 目录下

    1. ln -s $ATLAS_HOME/conf/atlas-application.properties
    2. $HIVE_HOME/conf/atlas-application.properties

    拷贝jar包

    # $ATLAS_HOME/server/webapp/atlas/WEB-INF/lib/ 目录下的3个jar,
    拷贝到 $ATLAS_HOME/hook/hive/atlas-hive-plugin-impl/ 目录下
    jackson-jaxrs-base-2.9.9.jar
    jackson-jaxrs-json-provider-2.9.9.jar
    jackson-module-jaxb-annotations-2.9.9.jar
    ln -s $ATLAS_HOME/server/webapp/atlas/WEB-INF/lib/jacksonjaxrs-
    base-2.9.9.jar $ATLAS_HOME/hook/hive/atlas-hive-pluginimpl/
    jackson-jaxrs-base-2.9.9.jar
    ln -s $ATLAS_HOME/server/webapp/atlas/WEB-INF/lib/jacksonjaxrs-
    json-provider-2.9.9.jar $ATLAS_HOME/hook/hive/atlashive-
    plugin-impl/jackson-jaxrs-json-provider-2.9.9.jar
    ln -s $ATLAS_HOME/server/webapp/atlas/WEB-INF/lib/jacksonmodule-
    jaxb-annotations-2.9.9.jar $ATLAS_HOME/hook/hive/atlashive-
    plugin-impl/jackson-module-jaxb-annotations-2.9.9.jar
    

    修改hive的配置
    hive-site.xml

    <property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
    </property>
    

    $HIVE_HOME/conf/hive-env.sh中添加HIVE_AUX_JARS_PATH变量

    export HIVE_AUX_JARS_PATH=/opt/lagou/servers/atlas-1.2.0/hook/hive
    

    批量导入hive数据
    Hive能正常启动;在执行的过程中需要用户名/口令:admin/admin

    import-hive.sh
    

    成功导出可以看见最后的提示信息:Hive Meta Data imported successfully!!!
    在浏览器中可以看见:Search 中的选项有变化
    Hive hook 可捕获以下操作:

  • create database

  • create table/view, create table as select
  • load, import, export
  • DMLs (insert)
  • alter database
  • alter table
  • alter view

    5.与电商业务集成

    开发(建库、建表) => 导入数据 => 执行Hive脚本=>导入Hive的血缘关系
    建表
    -- 创建DataBases;
    CREATE DATABASE ODS;
    CREATE DATABASE DIM;
    CREATE DATABASE DWD;
    CREATE DATABASE DWS;
    CREATE DATABASE ADS;
    -- 创建ODS表
    DROP TABLE IF EXISTS `ods.ods_trade_orders`;
    CREATE EXTERNAL TABLE `ods.ods_trade_orders`(
    `orderid` int,
    `orderno` string,
    `userid` bigint,
    `status` tinyint,
    `productmoney` decimal(10,0),
    `totalmoney` decimal(10,0),
    `paymethod` tinyint,
    `ispay` tinyint,
    `areaid` int,
    `tradesrc` tinyint,
    `tradetype` int,
    `isrefund` tinyint,
    `dataflag` tinyint,
    `createtime` string,
    `paytime` string,
    `modifiedtime` string)
    COMMENT '订单表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/orders/';
    DROP TABLE IF EXISTS `ods.ods_trade_order_product`;
    CREATE EXTERNAL TABLE `ods.ods_trade_order_product`(
    `id` string,
    `orderid` decimal(10,2),
    `productid` string,
    `productnum` string,
    `productprice` string,
    `money` string,
    `extra` string,
    `createtime` string)
    COMMENT '订单明细表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/order_product/';
    DROP TABLE IF EXISTS `ods.ods_trade_product_info`;
    CREATE EXTERNAL TABLE `ods.ods_trade_product_info`(
    `productid` bigint,
    `productname` string,
    `shopid` string,
    `price` decimal(10,0),
    `issale` tinyint,
    `status` tinyint,
    `categoryid` string,
    `createtime` string,
    `modifytime` string)
    COMMENT '产品信息表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/product_info/';
    DROP TABLE IF EXISTS `ods.ods_trade_product_category`;
    CREATE EXTERNAL TABLE `ods.ods_trade_product_category`(
    `catid` int,
    `parentid` int,
    `catname` string,
    `isshow` tinyint,
    `sortnum` int,
    `isdel` tinyint,
    `createtime` string,
    `level` tinyint)
    COMMENT '产品分类表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/product_category';
    DROP TABLE IF EXISTS `ods.ods_trade_shops`;
    CREATE EXTERNAL TABLE `ods.ods_trade_shops`(
    `shopid` int,
    `userid` int,
    `areaid` int,
    `shopname` string,
    `shoplevel` tinyint,
    `status` tinyint,
    `createtime` string,
    `modifytime` string)
    COMMENT '商家店铺表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/shops';
    DROP TABLE IF EXISTS `ods.ods_trade_shop_admin_org`;
    CREATE EXTERNAL TABLE `ods.ods_trade_shop_admin_org`(
    `id` int,
    `parentid` int,
    `orgname` string,
    `orglevel` tinyint,
    `isdelete` tinyint,
    `createtime` string,
    `updatetime` string,
    `isshow` tinyint,
    `orgType` tinyint)
    COMMENT '商家地域组织表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/shop_org/';
    DROP TABLE IF EXISTS `ods.ods_trade_payments`;
    CREATE EXTERNAL TABLE `ods.ods_trade_payments`(
    `id` string,
    `paymethod` string,
    `payname` string,
    `description` string,
    `payorder` int,
    `online` tinyint)
    COMMENT '支付方式表'
    PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ','
    location '/user/data/trade.db/payments/';
    -- 创建DIM表
    DROP TABLE IF EXISTS dim.dim_trade_product_cat;
    create table if not exists dim.dim_trade_product_cat(
    firstId int, -- 一级商品分类id
    firstName string, -- 一级商品分类名称
    secondId int, -- 二级商品分类Id
    secondName string, -- 二级商品分类名称
    thirdId int, -- 三级商品分类id
    thirdName string -- 三级商品分类名称
    )
    partitioned by (dt string)
    STORED AS PARQUET;
    drop table if exists dim.dim_trade_shops_org;
    create table dim.dim_trade_shops_org(
    shopid int,
    shopName string,
    cityId int,
    cityName string ,
    regionId int ,
    regionName string
    )
    partitioned by (dt string)
    STORED AS PARQUET;
    drop table if exists dim.dim_trade_payment;
    create table if not exists dim.dim_trade_payment(
    paymentId string, -- 支付方式id
    paymentName string -- 支付方式名称
    )
    partitioned by (dt string)
    STORED AS PARQUET;
    drop table if exists dim.dim_trade_product_info;
    create table dim.dim_trade_product_info(
    `productId` bigint,
    `productName` string,
    `shopId` string,
    `price` decimal,
    `isSale` tinyint,
    `status` tinyint,
    `categoryId` string,
    `createTime` string,
    `modifyTime` string,
    `start_dt` string,
    `end_dt` string
    ) COMMENT '产品表'
    STORED AS PARQUET;
    -- 创建DWD表
    -- 订单事实表(拉链表)
    DROP TABLE IF EXISTS dwd.dwd_trade_orders;
    create table dwd.dwd_trade_orders(
    `orderId` int,
    `orderNo` string,
    `userId` bigint,
    `status` tinyint,
    `productMoney` decimal,
    `totalMoney` decimal,
    `payMethod` tinyint,
    `isPay` tinyint,
    `areaId` int,
    `tradeSrc` tinyint,
    `tradeType` int,
    `isRefund` tinyint,
    `dataFlag` tinyint,
    `createTime` string,
    `payTime` string,
    `modifiedTime` string,
    `start_date` string,
    `end_date` string
    ) COMMENT '订单事实拉链表'
    partitioned by (dt string)
    STORED AS PARQUET;
    -- 创建DWS表
    DROP TABLE IF EXISTS dws.dws_trade_orders;
    create table if not exists dws.dws_trade_orders(
    orderid string, -- 订单id
    cat_3rd_id string, -- 商品三级分类id
    shopid string, -- 店铺id
    paymethod tinyint, -- 支付方式
    productsnum bigint, -- 商品数量
    paymoney double, -- 订单商品明细金额
    paytime string -- 订单时间
    )
    partitioned by (dt string)
    STORED AS PARQUET;
    -- 订单明细表宽表
    DROP TABLE IF EXISTS dws.dws_trade_orders_w;
    create table if not exists dws.dws_trade_orders_w(
    orderid string, -- 订单id
    cat_3rd_id string, -- 商品三级分类id
    thirdname string, -- 商品三级分类名称
    secondname string, -- 商品二级分类名称
    firstname string, -- 商品一级分类名称
    shopid string, -- 店铺id
    shopname string, -- 店铺名
    regionname string, -- 店铺所在大区
    cityname string, -- 店铺所在城市
    paymethod tinyint, -- 支付方式
    productsnum bigint, -- 商品数量
    paymoney double, -- 订单明细金额
    paytime string -- 订单时间
    )
    partitioned by (dt string)
    STORED AS PARQUET;
    -- 创建ADS表
    -- ADS层订单分析表
    DROP TABLE IF EXISTS ads.ads_trade_order_analysis;
    create table if not exists ads.ads_trade_order_analysis(
    areatype string, -- 区域范围:区域类型(全国、大
    区、城市)
    regionname string, -- 区域名称
    cityname string, -- 城市名称
    categorytype string, -- 商品分类类型(一级、二级)
    category1 string, -- 商品一级分类名称
    category2 string, -- 商品二级分类名称
    totalcount bigint, -- 订单数量
    total_productnum bigint, -- 商品数量
    totalmoney double -- 支付金额
    )
    partitioned by (dt string)
    row format delimited fields terminated by ',';
    
    使用Sqoop加载数据
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/orders/dt=2020-07-21/ \
    --table lagou_trade_orders \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/payments/dt=2020-07-21/ \
    --table lagou_payments \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/product_category/dt=2020-07-
    21/ \
    --table lagou_product_category \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/product_info/dt=2020-07-21/ \
    --table lagou_product_info \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/order_product/dt=2020-07-21/
    \
    --table lagou_order_product \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/shop_org/dt=2020-07-21/ \
    --table lagou_shop_admin_org \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    sqoop import \
    --connect jdbc:mysql://linux123:3306/ebiz \
    --username hive \
    --password 12345678 \
    --target-dir /user/data/trade.db/shops/dt=2020-07-21/ \
    --table lagou_shops \
    --delete-target-dir \
    --num-mappers 1 \
    --fields-terminated-by ','
    alter table ods.ods_trade_orders add partition(dt='2020-07-
    21');
    alter table ods.ods_trade_payments add partition(dt='2020-07-
    21');
    alter table ods.ods_trade_product_category add
    partition(dt='2020-07-21');
    alter table ods.ods_trade_product_info add partition(dt='2020-
    07-21');
    alter table ods.ods_trade_order_product add
    partition(dt='2020-07-21');
    alter table ods.ods_trade_shop_admin_org add
    partition(dt='2020-07-21');
    alter table ods.ods_trade_shops add partition(dt='2020-07-
    21');
    
    脚本
    # 加载DIM层数据
    sh /data/lagoudw/script/trade/dim_load_product_cat.sh 2020-07-
    21
    sh /data/lagoudw/script/trade/dim_load_shop_org.sh 2020-07-21
    sh /data/lagoudw/script/trade/dim_load_payment.sh 2020-07-21
    sh /data/lagoudw/script/trade/dim_load_product_info.sh 2020-
    07-21
    # 加载DWD层数据
    sh /data/lagoudw/script/trade/dwd_load_trade_orders.sh 2020-
    07-21
    # 加载DWS层数据
    sh /data/lagoudw/script/trade/dws_load_trade_orders.sh 2020-
    07-21
    # 加载ADS层数据
    sh /data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
    2020-07-21
    
    创建 Classfication:order_analysis
    创建Glossary:ODS层 => 电商业务
    查看血缘关系 ads_trade_order_analysis: