第一部分 数据仓库理论

1数据仓库

1.1什么是数据仓库

1988年,IBM公司第一次提出信息仓库(Information Warehouse)的概念
1991年,Bill Inmon(比尔恩门)出版《Buliding the Data Warehouse》标志数据仓库概念的确立。数据仓库(Data Warehouse)是一个面向主题的(Subject Oriented)、集成的(Integrated)、相对稳定的(Non-Volatile)、反映历史变化的(Time Variant)数据集合,用于支持管理决策(Decision-Making Support)。Bill Inmon被称为数据仓库之父

1.2数据仓库四大特征

面向主题
与传统数据库面向应用进行数据组织的特点相对应,数据仓库中的数据是面向主题进行组织的。
主题是一个抽象的概念,是较高层次上企业信息系统中的数据综合、归类并进行分析利用的抽象。
面向主题的数据组织方式,就是在较高层次上对分析对象的数据的一个完整、一致的描述,能完整、统一地刻划各个分析对象所涉及的企业的各项数据,以及数据之间的联系。
集成的
数据仓库中的数据会从多个数据源中获取,这些数据源包括多种类型数据库、文件系统以及Internet网上数据等,它们通过数据集成而形成数据仓库中的数据。
稳定的
数据仓库的用户对数据的操作大多是数据查询或比较复杂的挖掘,一旦数据进入数据仓库以后,一般情况下被较长时间保留。数据经加工和集成进入数据仓库后是极少更新的,通常只需要定期的加载和更新。
反映历史变化的
数据仓库包含各种粒度的历史数据。虽然数据仓库不会修改数据,但并不是说数据仓库的数据是永远不变的。
数据仓库的数据随时间的变化表现在:

  • 数据仓库的数据时限一般要远远长于操作型数据的数据时限
  • 业务系统存储的是当前数据,而数据仓库中的数据是历史数据
  • 数据仓库中的数据是按照时间顺序追加的,都带有时间属性

    1.3 数据仓库作用

    整合企业业务数据,建立统一的数据中心;
    产生业务报表,了解企业的经营状况;
    为企业运营、决策提供数据支持;
    可以作为各个业务的数据源,形成业务数据互相反馈的良性循环;
    分析用户行为数据,通过数据挖掘来降低投入成本,提高投入效果;
    开发数据产品,直接或间接地为企业盈利;

    1.4数据仓库与数据库的区别

    OLTP(On-Line Transaction Processing 联机事务处理),主要针对具体业务在数据库系统的日常操作,通常对少数记录进行查询、修改。
    OLAP(On-Line Analytical Processing 联机分析处理),一般针对某些主题的历史数据进行分析,支持管理决策。
    数据仓库的出现并不是要取代数据库

  • 数据库是面向事务的设计,数据仓库是面向主题设计的

  • 数据库存储有限期限的业务数据,数据仓库存储的是企业历史数据
  • 数据库设计尽量避免冗余,数据存储设计满足第三范式,但是便于进行数据分析。数据仓库在设计时有意引入冗余,依照分析需求,分析维度、分析指标进行设计
  • 数据库是为捕获数据而设计,数据仓库是为分析数据而设计

image.png

1.5数据集市

数据仓库(DW)是一种反映主题的全局性数据组织。但全局性数据仓库往往太大,在实际应用中将它们按部门或业务分别建立反映各个子主题的局部性数据组织,即数据集市(DataMart),有时也称它为部门数据仓库。数据集市,用于支持部门级的数据分析与决策
数据孤岛,数据间缺乏关联性,彼此无法兼容

2数据仓库建模方法

数据模型就是数据组织和存储方法,它强调从业务、数据存取和使用角度合理存储数据。

2.1ER模型

Bill Inmon提出从全企业的高度设计的3NF模型,用实体关系ER模型描述企业业务。
特点:

  • 需要全面了解整个企业业务和数据
  • 实施周期非常长
  • 对建模人员能力要求高

建模阶段:

  • 高层模型:一个高度抽象的模型,描述主要的主题以及主题间的关系,用于描述企业的业务总体概况
  • 中层模型:在高层模型的基础上,细化主题的数据项
  • 物理模型(也叫底层模型):在中层模型的基础上,考虑物理存储,同时基于性能和平台特点进行物理属性的设计,也可能做一 些表的合并、分区的设计等

    2.2维度模型

    Ralph Kimball大师倡导,他的《数据仓库工具箱》是数据仓库领域最流行的数据仓库建模金典
    维度建模从分析决策的需求出发构建模型,为分析需求服务,重点关注用户如何更快速地完成需求分析,同时具有较好的大规模复杂查询的响应性能。
    其设计分为以下几个步骤:

  • 选择需要进行分析决策的业务过程。

  • 选择数据的粒度。
  • 识别维表。
  • 选择事实。确定分析需要衡量的指标

维度模型特点:
对技术要求不高,快速上手,敏捷迭代,快速交付;更快速完成分析需求,较好的大规模复杂查询的响应性能。

3数据仓库分层

分层的主要原因是在管理数据的时候,能对数据有一个更加清晰的掌控。从以下方面来说:

  • 清晰的数据结构

每一个数据分层都有它的作用域,在使用表的时候能更方便地定位和理解。

  • 将复杂的问题简单化

将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的问题,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的地方开始修复。

  • 减少重复开发

规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。

  • 屏蔽原始数据的异常

屏蔽业务的影响,不必改一次业务就需要重新接入数据。

  • 数据血缘的追踪

最终给业务呈现的是一个能直接使用业务表,但是它的来源很多,如果有一张来源表出问题了,借助血缘最终能够快速准确地定位到问题,并清楚它的危害范围。
常见分为三层:数据操作层,数据仓库层应用数据层(数据集市层)
image.png

ODS(Operation Data Store 数据准备区)。数据仓库源头系统的数据表通常会原封不动的存储一份,这称为ODS层,也称为准备区。它们是后续数据仓库层加工数据的来源。ODS层数据的主要来源包括:

  • 业务数据库:可使用DataX、Sqoop等工具来抽取,每天定时抽取一次;
  • 埋点数据:线上系统会打入各种日志,这些日志一般以文件的形式保存,可以用 Flume 定时抽取;
  • 其他数据源:从第三方购买的数据、或是网络爬虫抓取的数据;

DW(Data Warehouse 数据仓库层)。包含DWD、DWS、DIM层,由ODS层数据加工而成。主要完成数据加工与整合,建立一致性的维度,构建可复用的面向分析和统计的明细事实表,以及汇总公共粒度的指标。

  • DWD(Data Warehouse Detail 细节数据层),是业务层与数据仓库的隔离层。以业务过程作为建模驱动,基于每个具体的业务过程特点,构建细粒度的明细层事实表。可以结合企业的数据使用特点,将明细事实表的某些重要维度属性字段做适当冗余,也即宽表化处理;
  • DWS(Data Warehouse Service 服务数据层),基于DWD的基础数据,整合汇总成分析某一个主题域的服务数据。以分析的主题为建模驱动,基于上层的应用和产品的指标需求,构建公共粒度的汇总指标事实表;
  • 公共维度层(DIM):基于维度建模理念思想,建立一致性维度;
  • TMP层 :临时层,存放计算过程中临时产生的数据;

ADS(Application Data Store 应用数据层)。基于DW数据,整合汇总成主题域的服务数据,用于提供后续的业务查询等。

4数据仓库模型

4.1事实表与维度表

在数据仓库中,保存度量值的详细值或事实的表称为事实表。
常见事实表:订单事实表
事实表的特点:表多(各种各样的事实表);数据量大
事实表根据数据的粒度可以分为:事务事实表、周期快照事实表、累计快照事实表
维度表(维表)可以看作是用来分析数据的角度,纬度表中包含事实数据表中事实记录的特性。这些特性为分析者提供有用的信息。
常见维度表:时间维度、地域维度、商品维度

  • 事实表是关注的内容(如:销售额、销售量)
  • 维表是观察事务的角度

    4.2事实表分类

    1.事务事实表
    事务事实表记录的事务层面的事实,保存的是最原子的数据,也称“原子事实表”。
    一旦事务被提交,事实表数据被插入,数据就不再进行更改,其更新方式为增量更新。
    事务事实表的日期维度记录的是事务发生的日期,它记录的事实是事务活动的内容。
    2.周期快照事实表
    周期快照事实表以具有规律性的、可预见的时间间隔来记录事实,时间间隔如每天、每月、每年等等。。它统计的是间隔周期内的度量统计,如历史至今、自然年至今、季度至今等等。周期快照事实表的粒度是每个时间段一条记录,通常比事务事实表的粒度要粗,是在事务事实表之上建立的聚集表。
    3.累计快照事实表
    和周期快照事实表都是存储的事务数据的快照信息,周期快照事实表记录的确定的周期的数据,而累积快照事实表记录的不确定的周期的数据
    累积快照事实表代表的是完全覆盖一个事务或产品的生命周期的时间跨度,它通常具有多个日期字段,用来记录整个生命周期中的关键时间点。
    image.png

    4.3星型模型

    是一种多维的数据关系,由一个事实表和一组维表组成;
    事实表在中心,周围围绕的连接着维表;
    事实表:包含大量数据,没有冗余
    维表:逆规范化,包含一定的冗余数据
    image.png

    4.4雪花模型

    雪花模型中的维表是规范化的,没有冗余数据
    image.png
    星型模型存在数据冗余,所以在查询统计时只需要做少量的表连接,查询效率高;
    星型模型不考虑维表正规化的因素,设计、实现容易;
    在数据冗余可接受的情况下,实际上使用星型模型比较多;

    4.5事实星座

    由多个主题构成,包含多个事实表,维表公用,可以共享,星型模式的汇聚,称为星座模式或事实星座模式
    特点:公用维表
    image.png

    5.元数据

    元数据(Metadata)是关于数据的数据。元数据打通了源数据、数据仓库、数据应用,记录了数据从产生到消费的全过程。
    在大数据平台中,元数据贯穿大数据平台数据流动的全过程,主要包括数据源元数据、数据加工处理过程元数据、数据主题库专题库元数据、服务层元数据、应用层元数据等。
    image.png

    第二部分 电商离线数仓设计

    1.需求分析

    数据仓库项目主要分析以下数据:
    日志数据:启动日志,点击日志(广告点击日志)
    业务数据库的交易数据:用户下单、提交订单、支付、退款等核心交易数据分析
    数据仓库项目分析任务:

  • 会员活跃度分析主题

每日新增会员数;每日、周、月活跃会员数;留存会员数、留存会员率

  • 广告业务分析主题

广告点击次数、广告点击购买率、广告曝光次数

  • 核心交易分析主题

订单数、成交商品数、支付金额

2.数据埋点

数据埋点,将用户的浏览、点击事件采集上报的一套数据采集的方法。
跟踪应用使用情况,包括访问数,访客数,停留时长,浏览数,跳出率。
这些信息分为两类:页面统计,操作行为统计
埋点为数据分析提供基础数据,工作流程分为:
根据埋点需求完成开发(前端开发工程师 js)
App或网页采用用户数据
数据上报服务器
数据的清洗、加工、存储(大数据工程师)
进行数据分析等到相应的指标(大数据工程师)
埋点方法:
手动埋点:开发需要手动写代码,需要公司自主研发一套埋点框架
优点:埋点数据更加精准
缺点:工作量大,容易出错
无痕埋点:不用开发写代码实现,主要使用第三方统计工具,如友盟,百度移动,魔法等
优点:简单便捷
缺点:埋点数据统一,不够个性化和精准

3.数据指标体系

指标:对数据的统计值。如:会员数、活跃会员数、会员留存数;广告点击量;订单金额、订单数都是指标;
指标体系:将各种指标系统的组织起来,按照业务模型、标准对指标进行分类和分层;
建立指标体系实际上是与需求方达成一致。能有效遏制不靠谱的需求,让需求变得有条例和体系化;
指标体系是指导数据仓库建设的基石。稳定而且体系化的需求,有利于数据仓库方案的优化,和效率提升;
建议指标体系需要注重的三个原则:准确,可解释,结构性,主要分为四个步骤:理清业务阶段和需求、确定核心指标、对指标进行维度的拆解、指标的落地
指标的构成:
基础指标+【修饰词】+时间段
修饰词是可选的;基础指标和时间段是必须的
基础指标是不可拆分的指标,如:交易额、支付金额、下单数
修饰词多是某种场景的表现,如:通过搜索带来的交易等
时间段即为一个时间周期,如:双十一期间,618活动期间等
三者叠加形成业务上的常用指标,这些指标是派生指标,工作中遇到的指标多为派生性指标

4.总体架构设计

4.1技术方案选型

框架选型
Apache社区版/第三方发行版(CDH/HDP/Fusion Insight)
Cloudera的CDH:最成型的发行版本,拥有最多的部署案例。提供强大的部署、管理和监控工具。国内使用最多的版本;拥有强大的社区支持,当遇到问题时,能够通过社区、论坛等网络资源快速获取解决方法;
Hortonworks的CDH:100%开源,可以进行二次开发,但没有CDH稳定。国内使用相对较少;
华为的Fusion Insight:华为基于hadoop2.7.2版开发的,坚持分层,解耦,开放的原则,得益于高可靠性,在全国各地政府、运营商、金融系统有较多案例。
软件选型
数据采集:DataX、Flume、Sqoop、Logstash、Kafka
数据存储:HDFS、HBase
数据计算:Hive、MapReduceTez、Spark、Flink
调度系统:Airflow、azkaban、Oozie
元数据管理:Atlas
数据质量管理:Griffin
即席查询:Impala、Kylin、ClickHouse、Presto、Druid
其他:MySQL
软件版本
image.png

4.2系统逻辑框架

image.png

4.3开发物理环境

5台物理机;500G数据盘;32G内存;8个core
image.png
image.png

4.4数据仓库命名规范

1 数据库命名
命名规则:数仓对应分层
命名示例:ods / dwd / dws/ dim / temp / ads
2 数仓各层对应数据库
ods层 -> ods{业务线|业务项目}
dw层 -> dwd
{业务线|业务项目} + dws{业务线|业务项目}
dim层 -> dim
维表
ads层 -> ads{业务线|业务项目} (统计指标等)
临时数据 -> temp
{业务线|业务项目}
备注:本项目未采用
3 表命名(数据库表命名规则)
ODS层:
命名规则:ods{业务线|业务项目}[数据来源类型]_{业务}
DWD层:
命名规则:dwd{业务线|业务项目}{主题域}{子业务}
* DWS层:
命名规则:dws
{业务线|业务项目}{主题域}{汇总相关粒度}{汇总时间周期}
* ADS层:
命名规则:ads
{业务线|业务项目}{统计业务}{报表form|热门排序topN}
* DIM层:
命名规则:dim{业务线|业务项目|pub公共}{维度}
创建数据库:

  1. create database if not exists ods;
  2. create database if not exists dwd;
  3. create database if not exists dws;
  4. create database if not exists ads;
  5. create database if not exists dim;
  6. create database if not exists tmp;
  7. create database if not exists test;

第三部分 电商分析之-会员活跃度

1.需求分析

计算指标:
新增会员:每日新增会员数
活跃会员:每日,每周,每月的活跃会员数
会员留存:1日,2日,3日会员留存数、1日,2日,3日会员留存率
指标口径业务逻辑:
会员:以设备为判断标准,每个独立设备认为是一个会员。Android系统通常根据IMEI号,IOS系统通常根据OpenUDID 来标识一个独立会员,每部移动设备是一个会员;
活跃会员:打开应用的会员即为活跃会员,暂不考虑用户的实际使用情况。一台设备每天多次打开计算为一个活跃会员。
会员活跃率:一天内活跃会员数与总会员数的比率是日活跃率;还有周活跃率(自然周)、月活跃率(自然月);
新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。新增用户包括日新增会员、周(自然周)新增会员、月(自然月)新增会员;
留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。

2.日志数据采集

数据采集:日志文件 => Flume => HDFS => ODS
数据采集流程:
image.png
选择Flume作为采集日志数据的工具:
Flume 1.8+:提供了一个非常好用的 Taildir Source使用该source,可以监控多个目录,对目录中新写入的数据进行实时采集

2.1 taildir source配置

taildir source的特点:
使用正则表达式匹配目录中的文件名
监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink
高可靠,不会丢失数据
不会对跟踪文件有任何处理,不会重命名也不会删除
不支持Windows,不能读二进制文件。支持按行读取文本文件
taildir source配置:

  1. a1.sources.r1.type = TAILDIR
  2. a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
  3. a1.sources.r1.filegroups = f1
  4. a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log

positionFile
配置检查点文件的路径,检查点文件会以 json 格式保存已经读取文件的位置,解决断点续传的问题
filegroups
指定filegroups,可以有多个,以空格分隔(taildir source可同时监控多个目录中的文件)
filegroups.
配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配

2.2 hdfs sink配置

  1. a1.sinks.k1.type = hdfs
  2. a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
  3. a1.sinks.k1.hdfs.filePrefix = startlog.
  4. a1.sinks.k1.hdfs.fileType = DataStream
  5. # 配置文件滚动方式(文件大小32M
  6. a1.sinks.k1.hdfs.rollSize = 33554432
  7. a1.sinks.k1.hdfs.rollCount = 0
  8. a1.sinks.k1.hdfs.rollInterval = 0
  9. a1.sinks.k1.hdfs.idleTimeout = 0
  10. a1.sinks.k1.hdfs.minBlockReplicas = 1
  11. # hdfs上刷新的event的个数
  12. a1.sinks.k1.hdfs.batchSize = 100
  13. # 使用本地时间
  14. a1.sinks.k1.hdfs.useLocalTimeStamp = true

滚动生成文件的策略:
基于时间。hdfs.rollInterval 30秒
基于文件大小。hdfs.rollSize 1024字节
基于event数量。hdfs.rollCount 10个event
基于文件空闲时间。hdfs.idleTimeout 0(0,禁用)
minBlockReplicas。默认值与 hdfs 副本数一致。设为1是为了让 Flume 感知不到hdfs的块复制,此时其他的滚动方式配置(时间间隔、文件大小、events数量)才不会受影响

2.3Agent的配置

/data/lagoudw/conf/flume-log2hdfs1.conf

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. # taildir source
  5. a1.sources.r1.type = TAILDIR
  6. a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
  7. a1.sources.r1.filegroups = f1
  8. a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
  9. # memorychannel
  10. a1.channels.c1.type = memory
  11. a1.channels.c1.capacity = 100000
  12. a1.channels.c1.transactionCapacity = 2000
  13. # hdfs sink
  14. a1.sinks.k1.type = hdfs
  15. a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
  16. a1.sinks.k1.hdfs.filePrefix = startlog.
  17. a1.sinks.k1.hdfs.fileType = DataStream
  18. # 配置文件滚动方式(文件大小32M
  19. a1.sinks.k1.hdfs.rollSize = 33554432
  20. a1.sinks.k1.hdfs.rollCount = 0
  21. a1.sinks.k1.hdfs.rollInterval = 0
  22. a1.sinks.k1.hdfs.idleTimeout = 0
  23. a1.sinks.k1.hdfs.minBlockReplicas = 1
  24. # hdfs上刷新的event的个数
  25. a1.sinks.k1.hdfs.batchSize = 1000
  26. # 使用本地时间
  27. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  28. # Bind the source and sink to the channel
  29. a1.sources.r1.channels = c1
  30. a1.sinks.k1.channel = c1

2.4 Flume的优化配置

1.启动agent

  1. flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs1.conf -name a1
  2. -Dflume.root.logger=INFO,console

2.启动报错
java.lang.OutOfMemoryError: GC overhead limit exceeded
Flume jvm堆内存分配过小,应在$FLUME_HOME/conf/flume-env.sh中修改JAVA_OPTS的值

  1. export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

要想使配置文件生效,还要在命令行启动指令中指定配置文件目录

  1. flume-ng agent --conf /opt/lagou/servers/flume-1.9/conf --conf-file
  2. /data/lagoudw/conf/flume-log2hdfs1.conf
  3. -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:
根据日志数据量的大小,Jvm堆一般要设置为4G或更高-Xms -Xmx 最好设置一致,减少内存抖动带来的性能影响。
存在的问题:Flume放数据时,使用本地时间;不理会日志的时间戳

2.7采集启动日志和事件日志(使用自定义拦截器)

启动日志和事件日志放在不同的目录下。想要一次拿到全部日志需要监控多个目录
image.png
思路:
1.taildir监控多个目录
2.修改自定义拦截器,不同来源的数据加上不同标记
3.hdfs sink根据标记写文件
Agent配置
/data/lagoudw/conf/flume-log2hdfs3.conf

  1. a1.sources = r1
  2. a1.sinks = k1
  3. a1.channels = c1
  4. # taildir source
  5. a1.sources.r1.type = TAILDIR
  6. a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
  7. a1.sources.r1.filegroups = f1 f2
  8. a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/start/.*log
  9. a1.sources.r1.headers.f1.logtype = start
  10. a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/event/.*log
  11. a1.sources.r1.headers.f2.logtype = event
  12. # 自定义拦截器
  13. a1.sources.r1.interceptors = i1
  14. a1.sources.r1.interceptors.i1.type =
  15. cn.clzy.dw.flume.interceptor.LogTypeInterceptor$Builder
  16. # memorychannel
  17. a1.channels.c1.type = memory
  18. a1.channels.c1.capacity = 100000
  19. a1.channels.c1.transactionCapacity = 2000
  20. # hdfs sink
  21. a1.sinks.k1.type = hdfs
  22. a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
  23. a1.sinks.k1.hdfs.filePrefix = startlog.
  24. a1.sinks.k1.hdfs.fileType = DataStream
  25. # 配置文件滚动方式(文件大小32M
  26. a1.sinks.k1.hdfs.rollSize = 33554432
  27. a1.sinks.k1.hdfs.rollCount = 0
  28. a1.sinks.k1.hdfs.rollInterval = 0
  29. a1.sinks.k1.hdfs.idleTimeout = 0
  30. a1.sinks.k1.hdfs.minBlockReplicas = 1
  31. # hdfs上刷新的event的个数
  32. a1.sinks.k1.hdfs.batchSize = 1000
  33. # 使用本地时间
  34. # a1.sinks.k1.hdfs.useLocalTimeStamp = true
  35. # Bind the source and sink to the channel
  36. a1.sources.r1.channels = c1
  37. a1.sinks.k1.channel = c1

headers..
给event增加header key。不同的filegroup,可配置不同的value
自定义拦截器
编码完成后打包上传服务器,放在$FLUME_HOME/lib下

  1. package cn.clzy.dw.flume.interceptor;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import org.apache.commons.compress.utils.Charsets;
  6. import org.apache.flume.Context;
  7. import org.apache.flume.Event;
  8. import org.apache.flume.event.SimpleEvent;
  9. import org.apache.flume.interceptor.Interceptor;
  10. import org.junit.Test;
  11. import java.time.Instant;
  12. import java.time.LocalDateTime;
  13. import java.time.ZoneId;
  14. import java.time.format.DateTimeFormatter;
  15. import java.util.ArrayList;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. import java.util.Map;
  19. public class LogTypeInterceptor implements Interceptor {
  20. @Override
  21. public void initialize() {
  22. }
  23. @Override
  24. // 逐条处理event
  25. public Event intercept(Event event) {
  26. // 获取 event 的 body
  27. String eventBody = new String(event.getBody(), Charsets.UTF_8);
  28. // 获取 event 的 header
  29. Map<String, String> headersMap = event.getHeaders();
  30. // 解析body获取json串
  31. String[] bodyArr = eventBody.split("\\s+");
  32. try{
  33. String jsonStr = bodyArr[6];
  34. // 解析json串获取时间戳
  35. String timestampStr = "";
  36. JSONObject jsonObject = JSON.parseObject(jsonStr);
  37. if (headersMap.getOrDefault("logtype", "").equals("start")){
  38. // 取启动日志的时间戳
  39. timestampStr =jsonObject.getJSONObject("app_active").getString("time");
  40. } else if (headersMap.getOrDefault("logtype","").equals("event")) {
  41. // 取事件日志第一条记录的时间戳
  42. JSONArray jsonArray =jsonObject.getJSONArray("lagou_event");
  43. if (jsonArray.size() > 0){
  44. timestampStr =jsonArray.getJSONObject(0).getString("time");
  45. }
  46. }
  47. // 将时间戳转换为字符串 "yyyy-MM-dd"
  48. // 将字符串转换为Long
  49. long timestamp = Long.parseLong(timestampStr);
  50. DateTimeFormatter formatter =DateTimeFormatter.ofPattern("yyyy-MM-dd");
  51. Instant instant = Instant.ofEpochMilli(timestamp);
  52. LocalDateTime localDateTime = LocalDateTime.ofInstant(instant,ZoneId.systemDefault());
  53. String date = formatter.format(localDateTime);
  54. // 将转换后的字符串放置header中
  55. headersMap.put("logtime", date);
  56. event.setHeaders(headersMap);
  57. }catch (Exception e){
  58. headersMap.put("logtime", "Unknown");
  59. event.setHeaders(headersMap);
  60. }
  61. return event;
  62. }
  63. @Override
  64. public List<Event> intercept(List<Event> events) {
  65. List<Event> lstEvent = new ArrayList<>();
  66. for (Event event: events){
  67. Event outEvent = intercept(event);
  68. if (outEvent != null) {
  69. lstEvent.add(outEvent);
  70. }
  71. }
  72. return lstEvent;
  73. }
  74. @Override
  75. public void close() {
  76. }
  77. public static class Builder implements Interceptor.Builder {
  78. @Override
  79. public Interceptor build() {
  80. return new LogTypeInterceptor();
  81. }
  82. @Override
  83. public void configure(Context context) {
  84. }
  85. }
  86. @Test
  87. public void startJunit(){
  88. String str = "2020-08-02 18:19:32.959 [main] INFO
  89. com.lagou.ecommerce.AppStart - {\"app_active\":
  90. {\"name\":\"app_active\",\"json\":
  91. {\"entry\":\"1\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":159634284
  92. 0284},\"attr\":{\"area\":\"大庆
  93. \",\"uid\":\"2F10092A2\",\"app_v\":\"1.1.15\",\"event_type\":\"common\",\"
  94. device_id\":\"1FB872-
  95. 9A1002\",\"os_type\":\"2.8\",\"channel\":\"TB\",\"language\":\"chinese\",\
  96. "brand\":\"iphone-8\"}}";
  97. Map<String, String> map = new HashMap<>();
  98. // new Event
  99. Event event = new SimpleEvent();
  100. map.put("logtype", "start");
  101. event.setHeaders(map);
  102. event.setBody(str.getBytes(Charsets.UTF_8));
  103. // 调用interceptor处理event
  104. LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
  105. Event outEvent = customerInterceptor.intercept(event);
  106. // 处理结果
  107. Map<String, String> headersMap = outEvent.getHeaders();
  108. System.out.println(JSON.toJSONString(headersMap));
  109. }
  110. @Test
  111. public void eventJunit(){
  112. String str = "2020-08-02 18:20:11.877 [main] INFO
  113. com.lagou.ecommerce.AppEvent - {\"lagou_event\":
  114. [{\"name\":\"goods_detail_loading\",\"json\":
  115. {\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"action\":\"3\
  116. ",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":1596343881690},
  117. {\"name\":\"loading\",\"json\":
  118. {\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",\"type\":
  119. \"1\"},\"time\":1596356988428},{\"name\":\"notification\",\"json\":
  120. {\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},
  121. {\"name\":\"favorites\",\"json\":
  122. {\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}],\"attr\":
  123. {\"area\":\"长治
  124. \",\"uid\":\"2F10092A4\",\"app_v\":\"1.1.14\",\"event_type\":\"common\",\"
  125. device_id\":\"1FB872-
  126. 9A1004\",\"os_type\":\"0.5.0\",\"channel\":\"QL\",\"language\":\"chinese\"
  127. ,\"brand\":\"xiaomi-0\"}}";
  128. Map<String, String> map = new HashMap<>();
  129. // new Event
  130. Event event = new SimpleEvent();
  131. map.put("logtype", "event");
  132. event.setHeaders(map);
  133. event.setBody(str.getBytes(Charsets.UTF_8));
  134. // 调用interceptor处理event
  135. LogTypeInterceptor customerInterceptor = new LogTypeInterceptor();
  136. Event outEvent = customerInterceptor.intercept(event);
  137. // 处理结果
  138. Map<String, String> headersMap = outEvent.getHeaders();
  139. System.out.println(JSON.toJSONString(headersMap));
  140. }
  141. }

启动Agent,拷贝日志,检查HDFS文件

# 清理环境
rm -f /data/lagoudw/conf/startlog_position.json
rm -f /data/lagoudw/logs/start/*.log
rm -f /data/lagoudw/logs/event/*.log
# 启动 Agent
flume-ng agent --conf /opt/lagou/servers/flume-1.9/conf --conf-file
/data/lagoudw/conf/flume-log2hdfs4.conf -name a1 -
Dflume.root.logger=INFO,console
# 拷贝日志
cd /data/lagoudw/logs/source
cp event0802.log ../event/
cp start0802.log ../start/
# 检查HDFS文件
hdfs dfs -ls /user/data/logs/event
hdfs dfs -ls /user/data/logs/start
# 生产环境中用以下方式启动Agent
nohup flume-ng agent --conf /opt/lagou/servers/flume-1.9/conf --conf-file
/data/lagoudw/conf/flume-log2hdfs3.conf -name a1 -
Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &

nohup,该命令允许用户退出帐户/关闭终端之后继续运行相应的进程
/dev/null,代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称黑洞
标准输入0,从键盘获得输入 /proc/self/fd/0
标准输出1,输出到屏幕(控制台) /proc/self/fd/1
错误输出2,输出到屏幕(控制台) /proc/self/fd/2
>/dev/null标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
2>&1 错误输出将会和标准输出输出到同一个地方
>/dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中

2.8日志数据采集小结

使用taildir source 监控指定的多个目录,可以给不同目录的日志加上不同 header
在每个目录中可以使用正则匹配多个文件
使用自定义拦截器,主要功能是从json串中获取时间戳,加到event的header中
hdfs sink使用event header中的信息写数据(控制写文件的位置)
hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)
调节flume jvm内存的分配

3.ODS建表和数据加载

image.png
ODS层的数据与源数据的格式基本相同
建表:

use ODS;
create external table ods.ods_start_log(
`str` string)
comment '用户启动日志信息'
partitioned by (`dt` string)
location '/user/data/logs/start';

加载启动日志数据:
/data/lagoudw/script/member_active/ods_load_log.sh
可以传参数确定日志,如果没有传参使用昨天日期

#!/bin/bash
APP=ODS
hive=/opt/apps/hive-2.3.7/bin/hive
# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
sql="
alter table "$APP".ods_start_log add partition(dt='$do_date');
"
$hive -e "$sql"

sh ods_load_log.sh 2020-07-21
sh ods_load_log.sh 2020-07-22
sh ods_load_log.sh 2020-07-23

4.json数据的处理

数据文件中每行必须是一个完整的 json 串,一个 json串 不能跨越多行。
Hive处理json数据的方式:

  • 使用内建的函数get_json_object、json_tuple
  • 使用自定义的UDF
  • 使用第三方的SerDe

    4.1使用内建函数处理

    get_json_object(string json_string, string path)
    返回值:String
    说明:解析json字符串json_string,返回path指定的内容;如果输入的json字符串无效,那么返回NUll;函数每次只能返回一个数据项;
    json_tuple(jsonStr, k1, k2, …)
    返回值:所有的输入参数、输出参数都是String;
    说明:参数为一组键k1,k2,。。。。。和json字符串,返回值的元组。该方法比get_json_object高效,因此可以在一次调用中输入多个键;
    explode,使用explod将Hive一行中复杂的 array 或 map 结构拆分成多行。
    测试数据和建表加载数据:
    user1;18;male;{"id": 1,"ids": [101,102,103],"total_number": 3}
    user2;20;female;{"id": 2,"ids": [201,202,203,204],"total_number": 4}
    user3;23;male;{"id": 3,"ids": [301,302,303,304,305],"total_number": 5}
    user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number": 5}
    user5;35;female;{"id": 5,"ids": [501,502,503],"total_number": 3}
    use test;
    CREATE TABLE IF NOT EXISTS jsont1(
    username string,
    age int,
    sex string,
    json string
    )
    row format delimited fields terminated by ';';
    load data local inpath '/data/lagoudw/data/weibo.json' overwrite into table jsont1;
    
    json的处理:
    -- get 单层值
    select username, age, sex, get_json_object(json, "$.id") id,
    get_json_object(json, "$.ids") ids,
    get_json_object(json, "$.total_number") num
    from jsont1;
    -- get 数组
    select username, age, sex, get_json_object(json, "$.id") id,
    get_json_object(json, "$.ids[0]") ids0,
    get_json_object(json, "$.ids[1]") ids1,
    get_json_object(json, "$.ids[2]") ids2,
    get_json_object(json, "$.ids[3]") ids3,
    get_json_object(json, "$.total_number") num
    from jsont1;
    -- 使用 json_tuple 一次处理多个字段
    select json_tuple(json, 'id', 'ids', 'total_number') from jsont1;
    -- 使用 explode + lateral view
    -- 第一步,将 [101,102,103] 中的 [ ] 替换掉
    -- 第二步,将上一步的字符串变为数组
    -- 第三步,使用explode + lateral view 将数据展开
    with tmp as(
    select username, age, sex, id, ids, num
    from jsont1
    lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids,num
    )
    select username, age, sex, id, ids1, num from tmp
    lateral view explode(split(regexp_replace(ids, "\\[|\\]", ""), ",")) t1 as ids1;
    
    json_tuple 优点是一次可以解析多个json字段,对嵌套结果的解析操作复杂;

    4.2使用UDF处理

    自定义UDF处理json串中的数组
    输入:json串,数组的key
    输出:字符串数组
    pom文件增加依赖项
    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.7</version>
    <scope>provided</scope>
    </dependency>
    
    java文件 ```java package cn.clzy.dw.hive.udf; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Strings; import org.apache.hadoop.hive.ql.exec.UDF; import org.junit.Test;

import java.util.ArrayList;

public class ParseJsonArray extends UDF { public ArrayList evaluate(String jsonStr,String arrKey){ if (Strings.isNullOrEmpty(jsonStr)){ return null; } try { JSONObject object= JSON.parseObject(jsonStr); JSONArray jsonArray=object.getJSONArray(arrKey); ArrayList result = new ArrayList<>(); for (Object o : jsonArray) { result.add(o.toString()); } return result; }catch (JSONException e){ return null; } } @Test public void JunitParseJsonArray(){ String str=”{\”id\”: 1,\”ids\”: [101,102,103],\”total_number\”:3}”; String key=”ids”; ArrayList evaluate = evaluate(str, key); System.out.println(JSON.toJSONString(evaluate)); } }

使用自定义UDF函数:
```java
-- 添加开发的jar包(在Hive命令行中)
add jar /data/lagoudw/jars/model3_dw-1.0-SNAPSHOT-jar-with-dependencies.jar;
-- 创建临时函数。指定类名一定要完整的路径,即包名加类名
create temporary function lagou_json_array as "cn.clzy.dw.hive.udf.ParseJsonArray";
-- 执行查询
-- 解析json串中的数组
select username, age, sex, lagou_json_array(json, "ids") ids from jsont1;
-- 解析json串中的数组,并展开
select username, age, sex, ids1 from jsont1
lateral view explode(lagou_json_array(json, "ids")) t1 as ids1;
-- 解析json串中的id、num
select username, age, sex, id, num
from jsont1
lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;
-- 解析json串中的数组,并展开
select username, age, sex, ids1, id, num
from jsont1
lateral view explode(lagou_json_array(json, "ids")) t1 as ids1
lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;

4.3使用SerDe处理

序列化是对象转换为字节序列的过程;反序列化是字节序列恢复为对象的过程;
对象序列化主要有两种途径:

  • 对象的持久化,把对象转化成字节序列后保存到文件中
  • 对象数据的网络传送

Hive使用Serde进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。
hive自身自带的内置SerDe:
LazySimpleSerDe(默认的SerDe
ParquetHiveSerDe
OrcSerde
对于纯json格式的数据,可以使用JsonSerDe来处理

{"id": 1,"ids": [101,102,103],"total_number": 3}
{"id": 2,"ids": [201,202,203,204],"total_number": 4}
{"id": 3,"ids": [301,302,303,304,305],"total_number": 5}
{"id": 4,"ids": [401,402,403,304],"total_number": 5}
{"id": 5,"ids": [501,502,503],"total_number": 3}
create table jsont2(
id int,
ids array<string>,
total_number int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
load data local inpath '/data/lagoudw/data/json2.dat' into table jsont2;

小结:
1、简单格式的json数据,使用get_json_object、json_tuple处理
2、对于嵌套数据类型,可以使用UDF
3、纯json串可使用JsonSerDe处理更简单

5.DWD层建表和数据加载

主要任务:ODS(包含json串) => DWD
json数据解析,丢弃无用数据(数据清洗),保留有效信息,并将数据展开,形成用户每日启动明细表

5.1创建DWD层表

use DWD;
drop table if exists dwd.dwd_start_log;
CREATE TABLE dwd.dwd_start_log(
`device_id` string,
`area` string,
`uid` string,
`app_v` string,
`event_type` string,
`os_type` string,
`channel` string,
`language` string,
`brand` string,
`entry` string,
`action` string,
`error_code` string
)
PARTITIONED BY (dt string)
STORED AS parquet;

5.2加载DWD层数据

script/member_active/dwd_load_start.sh

#!/bin/bash
source /etc/profile
# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
sql="
with tmp as(
select split(str, ' ')[7] line
from ods.ods_start_log
where dt='$do_date'
)
insert overwrite table dwd.dwd_start_log
partition(dt='$do_date')
select get_json_object(line, '$.attr.device_id'),
get_json_object(line, '$.attr.area'),
get_json_object(line, '$.attr.uid'),
get_json_object(line, '$.attr.app_v'),
get_json_object(line, '$.attr.event_type'),
get_json_object(line, '$.attr.os_type'),
get_json_object(line, '$.attr.channel'),
get_json_object(line, '$.attr.language'),
get_json_object(line, '$.attr.brand'),
get_json_object(line, '$.app_active.json.entry'),
get_json_object(line, '$.app_active.json.action'),
get_json_object(line, '$.app_active.json.error_code')
from tmp;
"
hive -e "$sql"

DWD:json数据的解析,数据清洗

6.活跃会员

活跃会员:打开应用的会员即为活跃会员;
新增会员:第一次使用应用的用户,定义为新增会员;
留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;
活跃会员指标需求:每日、每周、每月的活跃会员数
DWD:会员的每日启动信息明细(会员都是活跃会员;某个会员可能会出现多次)
DWS:每日活跃会员信息(关键)、每周活跃会员信息、每月活跃会员信息
ADS:每日、每周、每月活跃会员数(输出)
处理过程:
1、建表(每日、每周、每月活跃会员信息)
2、每日启动明细 ===> 每日活跃会员
3、每日活跃会员 => 每周活跃会员;每日活跃会员 => 每月活跃会员
4、汇总生成ADS层的数据

6.1创建DWS层表

use dws;
drop table if exists dws.dws_member_start_day;
create table dws.dws_member_start_day
(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string
) COMMENT '会员日启动汇总'
partitioned by(dt string)
stored as parquet;
drop table if exists dws.dws_member_start_week;
create table dws.dws_member_start_week(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`week` string
) COMMENT '会员周启动汇总'
PARTITIONED BY (`dt` string)
stored as parquet;
drop table if exists dws.dws_member_start_month;
create table dws.dws_member_start_month(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`month` string
) COMMENT '会员月启动汇总'
PARTITIONED BY (`dt` string)
stored as parquet;

6.2加载DWS层数据

script/member_active/dws_load_member_start.sh

#!/bin/bash
source /etc/profile
# 可以输入日期;如果未输入日期取昨天的时间
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
# 定义要执行的SQL
# 汇总得到每日活跃会员信息;每日数据汇总得到每周、每月数据
sql="
insert overwrite table dws.dws_member_start_day
partition(dt='$do_date')
select device_id,
concat_ws('|', collect_set(uid)),
concat_ws('|', collect_set(app_v)),
concat_ws('|', collect_set(os_type)),
concat_ws('|', collect_set(language)),
concat_ws('|', collect_set(channel)),
concat_ws('|', collect_set(area)),
concat_ws('|', collect_set(brand))
from dwd.dwd_start_log
where dt='$do_date'
group by device_id;
-- 汇总得到每周活跃会员
insert overwrite table dws.dws_member_start_week
partition(dt='$do_date')
select device_id,
concat_ws('|', collect_set(uid)),
concat_ws('|', collect_set(app_v)),
concat_ws('|', collect_set(os_type)),
concat_ws('|', collect_set(language)),
concat_ws('|', collect_set(channel)),
concat_ws('|', collect_set(area)),
concat_ws('|', collect_set(brand)),
date_add(next_day('$do_date', 'mo'), -7)
from dws.dws_member_start_day
where dt >= date_add(next_day('$do_date', 'mo'), -7)
and dt <= '$do_date'
group by device_id;
-- 汇总得到每月活跃会员
insert overwrite table dws.dws_member_start_month
partition(dt='$do_date')
select device_id,
concat_ws('|', collect_set(uid)),
concat_ws('|', collect_set(app_v)),
concat_ws('|', collect_set(os_type)),
concat_ws('|', collect_set(language)),
concat_ws('|', collect_set(channel)),
concat_ws('|', collect_set(area)),
concat_ws('|', collect_set(brand)),
date_format('$do_date', 'yyyy-MM')
from dws.dws_member_start_day
where dt >= date_format('$do_date', 'yyyy-MM-01')
and dt <= '$do_date'
group by device_id;
"
hive -e "$sql"

DWS(每日、每周、每月活跃会员的汇总表)

6.3创建ADS层

计算当天,当周,当月活跃会员数量

drop table if exists ads.ads_member_active_count;
create table ads.ads_member_active_count(
`day_count` int COMMENT '当日会员数量',
`week_count` int COMMENT '当周会员数量',
`month_count` int COMMENT '当月会员数量'
) COMMENT '活跃会员数'
partitioned by(dt string)
row format delimited fields terminated by ',';

6.4加载ADS层数据

script/member_active/ads_load_member_active.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
with tmp as(
select 'day' datelabel, count(*) cnt, dt
from dws.dws_member_start_day
where dt='$do_date'
group by dt
union all
select 'week' datelabel, count(*) cnt, dt
from dws.dws_member_start_week
where dt='$do_date'
group by dt
union all
select 'month' datelabel, count(*) cnt, dt
from dws.dws_member_start_month
where dt='$do_date'
group by dt
)
insert overwrite table ads.ads_member_active_count
partition(dt='$do_date')
select sum(case when datelabel='day' then cnt end) as day_count,
sum(case when datelabel='week' then cnt end) as week_count,
sum(case when datelabel='month' then cnt end) as month_count
from tmp
group by dt;
"
hive -e "$sql"

或者

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ads.ads_member_active_count
partition(dt='$do_date')
select daycnt, weekcnt, monthcnt
from (select dt, count(*) daycnt
from dws.dws_member_start_day
where dt='$do_date'
group by dt
) day join
(select dt, count(*) weekcnt
from dws.dws_member_start_week
where dt='$do_date'
group by dt
) week on day.dt=week.dt
join
(select dt, count(*) monthcnt
from dws.dws_member_start_month
where dt='$do_date'
group by dt
) month on day.dt=month.dt;
"
hive -e "$sql"

6.5小结

image.png
脚本执行顺序
ods_load_startlog.sh
dwd_load_startlog.sh
dws_load_member_start.sh
ads_load_member_active.sh

7.新增会员

留存会员:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;
新增会员:第一次使用应用的用户,定义为新增会员;卸载再次安装的设备,不会被算作一次新增。
需求:每日新增会员数
流程:

  • 在所有会员信息中增加时间列,表示这个会员是哪一天成为新增会员
  • 只需要一张表:所有会员的信息(id,dt)
  • 将新增会员 插入 所有会员表中

测试:计算新增会员

-- 日启动表 => DWS
drop table t1;
create table t1(id int, dt string)
row format delimited fields terminated by ',';
load data local inpath '/data/lagoudw/data/t10.dat' into table t1;
4,2020-08-02
5,2020-08-02
6,2020-08-02
7,2020-08-02
8,2020-08-02
9,2020-08-02
-- 全量数据 => DWS
drop table t2;
create table t2(id int, dt string)
row format delimited fields terminated by ',';
load data local inpath '/data/lagoudw/data/t2.dat' into table t2;
1,2020-08-01
2,2020-08-01
3,2020-08-01
4,2020-08-01
5,2020-08-01
6,2020-08-01
-- 找出 2020-08-02 的新用户
select t1.id, t1.dt, t2.id, t2.dt
from t1 left join t2 on t1.id=t2.id
where t1.dt="2020-08-02";
select t1.id, t1.dt
from t1 left join t2 on t1.id=t2.id
where t1.dt="2020-08-02"
and t2.id is null;
-- 将找到 2020-08-02 新用户数据插入t2表中
insert into table t2
select t1.id, t1.dt
from t1 left join t2 on t1.id=t2.id
where t1.dt="2020-08-02"
and t2.id is null;
-- 检查结果
select * from t2;
-- t1 加载 2020-08-03 的数据
14,2020-08-03
15,2020-08-03
16,2020-08-03
17,2020-08-03
18,2020-08-03
19,2020-08-03
load data local inpath '/data/lagoudw/data/t3.dat' into table t1;
-- 将找到 2020-08-03 新用户数据插入t2表中
insert into table t2
select t1.id, t1.dt
from t1 left join t
where t1.dt="2020-08-03"
and t2.id is null;
-- 检查结果
select * from t2;

7.1创建DWS层表

drop table if exists dws.dws_member_add_day;
create table dws.dws_member_add_day
(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`dt` string
) COMMENT '每日新增会员明细'
stored as parquet;

7.2加载DWS层

script/member_active/dws_load_member_add_day.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert into table dws.dws_member_add_day
select t1.device_id,
t1.uid,
t1.app_v,
t1.os_type,
t1.language,
t1.channel,
t1.area,
t1.brand,
'$do_date'
from dws.dws_member_start_day t1 left join dws.dws_member_add_day t2
on t1.device_id=t2.device_id
where t1.dt='$do_date'
and t2.device_id is null;
"
hive -e "$sql"

7.3创建ADS层表

drop table if exists ads.ads_new_member_cnt;
create table ads.ads_new_member_cnt
(
`cnt` string
)
partitioned by(dt string)
row format delimited fields terminated by ',';

7.4加载ADS层数据

script/member_active/ads_load_member_add.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ads.ads_new_member_cnt
partition (dt='$do_date')
select count(1)
from dws.dws_member_add_day
where dt = '$do_date'
"
hive -e "$sql"

7.5小结

image.png
调用脚本次序:
dws_load_member_add_day.sh
ads_load_member_add.sh

8.留存会员

留存会员与留存率:某段时间的新增会员,经过一段时间后,仍继续使用应用认为是留存会员;这部分会员占当时新增会员的比例为留存率。
image.png

8.1创建DWS层表

-- 会员留存明细
drop table if exists dws.dws_member_retention_day;
create table dws.dws_member_retention_day
(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`add_date` string comment '会员新增时间',
`retention_date` int comment '留存天数'
)COMMENT '每日会员留存明细'
PARTITIONED BY (`dt` string)
stored as parquet;

8.2加载DWS层数据

script/member_active/dws_load_member_retention_day.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table dws.dws_member_retention_day
partition(dt='$do_date')
(
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
t2.brand,
t2.dt add_date,
1
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -1)
and t1.dt='$do_date'
  union all
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
t2.brand,
t2.dt add_date,
2
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -2)
and t1.dt='$do_date'
union all
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
  t2.brand,
t2.dt add_date,
3
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -3)
and t1.dt='$do_date'
);
"
hive -e "$sql"

若出现return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask的错误
需改写SQL

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
drop table if exists tmp.tmp_member_retention;
create table tmp.tmp_member_retention as
(
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
t2.brand,
t2.dt add_date,
1
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -1)
and t1.dt='$do_date'
union all
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
t2.brand,
t2.dt add_date,
2
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -2)
and t1.dt='$do_date'
union all
select t2.device_id,
t2.uid,
t2.app_v,
t2.os_type,
t2.language,
t2.channel,
t2.area,
t2.brand,
t2.dt add_date,
3
from dws.dws_member_start_day t1 join dws.dws_member_add_day t2 on
t1.device_id=t2.device_id
where t2.dt=date_add('$do_date', -3)
and t1.dt='$do_date'
);
insert overwrite table dws.dws_member_retention_day
partition(dt='$do_date')
select * from tmp.tmp_member_retention;
"
hive -e "$sql"

8.3创建ADS层表

-- 会员留存数
drop table if exists ads.ads_member_retention_count;
create table ads.ads_member_retention_count
(
`add_date` string comment '新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数'
) COMMENT '会员留存数'
partitioned by(dt string)
row format delimited fields terminated by ',';
-- 会员留存率
drop table if exists ads.ads_member_retention_rate;
create table ads.ads_member_retention_rate
(
`add_date` string comment '新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数',
`new_mid_count` bigint comment '当日会员新增数',
`retention_ratio` decimal(10,2) comment '留存率'
) COMMENT '会员留存率'
partitioned by(dt string)
row format delimited fields terminated by ',';

8.4加载ADS层数据

script/member_active/ads_load_member_retention.sh
会员留存率的计算
image.png

#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table ads.ads_member_retention_count
partition (dt='$do_date')
select add_date, retention_date,
count(*) retention_count
from dws.dws_member_retention_day
where dt='$do_date'
group by add_date, retention_date;
insert overwrite table ads.ads_member_retention_rate
partition (dt='$do_date')
select t1.add_date,
t1.retention_day,
t1.retention_count,
t2.cnt,
t1.retention_count/t2.cnt*100
from ads.ads_member_retention_count t1 join ads.ads_new_member_cnt t2 on
t1.add_date=t2.dt
where t1.dt='$do_date';
"
hive -e "$sql"

8.5小结

image.png
脚本调用次序:

# 加载ODS / DWD 层采集
ods_load_startlog.sh
dwd_load_startlog.sh
# 活跃会员
dws_load_member_start.sh
ads_load_member_active.sh
# 新增会员
dws_load_member_add_day.sh
ads_load_member_add.sh
# 会员留存
dws_load_member_retention_day.sh
ads_load_member_retention.sh

9.Datax数据导出

image.png
ADS有4张表需要从数据仓库的ADS层导入MySQL,即:Hive => MySQL
ads.ads_member_active_count
ads.ads_member_retention_count
ads.ads_member_retention_rate
ads.ads_new_member_cnt

-- MySQL 建表
-- 活跃会员数
create database dwads;
drop table if exists dwads.ads_member_active_count;
create table dwads.ads_member_active_count(
`dt` varchar(10) COMMENT '统计日期',
`day_count` int COMMENT '当日会员数量',
`week_count` int COMMENT '当周会员数量',
`month_count` int COMMENT '当月会员数量',
primary key (dt)
);
-- 新增会员数
drop table if exists dwads.ads_new_member_cnt;
create table dwads.ads_new_member_cnt
(
`dt` varchar(10) COMMENT '统计日期',
`cnt` string,
primary key (dt)
);
-- 会员留存数
drop table if exists dwads.ads_member_retention_count;
create table dwads.ads_member_retention_count
(
`dt` varchar(10) COMMENT '统计日期',
`add_date` string comment '新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数',
primary key (dt)
) COMMENT '会员留存情况';
      -- 会员留存率
drop table if exists dwads.ads_member_retention_rate;
create table dwads.ads_member_retention_rate
(
`dt` varchar(10) COMMENT '统计日期',
`add_date` string comment '新增日期',
`retention_day` int comment '截止当前日期留存天数',
`retention_count` bigint comment '留存数',
`new_mid_count` bigint comment '当日会员新增数',
`retention_ratio` decimal(10,2) comment '留存率',
primary key (dt)
) COMMENT '会员留存率';

导出活跃会员数(ads_member_active_count)
export_member_active_count.json
hdfsreader => mysqlwriter

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/lagou/hive/warehouse/ads.db/ads_member_active_count/dt=$do_date/*",
            "defaultFS": "hdfs://hadoop1:9000",
            "column": [
              {
                "type": "string",
                "value": "$do_date"
              },
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              }
            ],
            "fileType": "text",
            "encoding": "UTF-8",
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "replace",
            "username": "hive",
            "password": "12345678",
            "column": [
              "dt",
              "day_count",
              "week_count",
              "month_count"
            ],
            "preSql": [
              ""
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://hadoop2:3306/dwads?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "ads_member_active_count"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

执行命令:

python datax.py -p "-Ddo_date=2020-07-23" /data/lagoudw/script/member_active/t1.json

export_member_active_count.sh

#!/bin/bash
JSON=/data/lagoudw/script/member_active
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date"
$JSON/export_member_active_count.json 2020-07-23

9.1调整map task和reduce task的个数,修改每个task的内存大小

调整每个task内存大小

<property>
<name>mapred.child.java.opts</name>
<value>-Xmx3072m</value>
</property>

调整map个数
mapred.max.split.size=256000000
# mapred.max.split.size=256M;3.5G / 256M = 14 Mapper Task
调整reduce个数
hive.exec.reducers.bytes.per.reducer
hive.exec.reducers.max

第四部分 电商分析之—广告业务

1.需求分析

采集的信息包括:
商品详情页加载:goods_detail_loading
商品列表:loading
消息通知:notification
商品评论:comment
收藏:favorites
点赞:praise
广告:ad

  • action。用户行为;0 曝光;1 曝光后点击;2 购买
  • duration。停留时长
  • shop_id。商家id
  • event_type。”ad”
  • ad_type。格式类型;1 JPG;2 PNG;3 GIF;4 SWF
  • show_style。显示风格,0 静态图;1 动态图
  • product_id。产品id
  • place。广告位置;首页=1,左侧=2,右侧=3,列表页=4
  • sort。排序位置

需求指标:
1、点击次数统计(分时统计)
曝光次数、不同用户id数、不同用户数
点击次数、不同用户id数、不同用户数
购买次数、不同用户id数、不同用户数
2、转化率-漏斗分析
点击率 = 点击次数 / 曝光次数
购买率 = 购买次数 / 点击次数
3、活动曝光效果评估:
行为(曝光、点击、购买)、时间段、广告位、产品,统计对应的次数
时间段、广告位、商品,曝光次数最多的前N个

3.ODS层建表和数据加载

drop table if exists ods.ods_log_event;
CREATE EXTERNAL TABLE ods.ods_log_event(`str` string)
PARTITIONED BY (`dt` string)
STORED AS TEXTFILE
LOCATION '/user/data/logs/event';

/data/lagoudw/script/advertisement/ods_load_event_log.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
alter table ods.ods_log_event add partition (dt='$do_date');
"
hive -e "$sql"

4.DWD层建表和数据加载

ODS => 解析json,从json串中,提取jsonArray数据;将公共信息从json串中解析出来 => 所有事件的明细
所有事件的明细,包括:

  • 分区
  • 事件(json串)
  • 公共信息字段

所有事件的明细 => 广告json串解析 => 广告事件的明细
广告事件的明细:

  • 分区
  • 广告信息字段
  • 公共信息字段

    4.1DWD层建表

    -- 所有事件明细
    drop table if exists dwd.dwd_event_log;
    CREATE EXTERNAL TABLE dwd.dwd_event_log(
    `device_id` string,
    `uid` string,
    `app_v` string,
    `os_type` string,
    `event_type` string,
    `language` string,
    `channel` string,
    `area` string,
    `brand` string,
    `name` string,
    `event_json` string,
    `report_time` string)
    PARTITIONED BY (`dt` string)
    stored as parquet;
    -- 与广告点击明细
    drop table if exists dwd.dwd_ad;
    CREATE TABLE dwd.dwd_ad(
    `device_id` string,
    `uid` string,
    `app_v` string,
    `os_type` string,
    `event_type` string,
    `language` string,
    `channel` string,
    `area` string,
    `brand` string,
    `report_time` string,
    `duration` int,
    `ad_action` int,
    `shop_id` int,
    `ad_type` int,
    `show_style` smallint,
    `product_id` int,
    `place` string,
    `sort` int,
    `hour` string
    )
    PARTITIONED BY (`dt` string)
    stored as parquet;
    

    4.2事件json串解析

    使用UDF处理jsonArray
    第三部分 电商分析之—会员活跃度 => 第4节 json数据处理
    package cn.lagou.dw.hive.udf;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONException;
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.base.Strings;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.junit.Test;
    import java.util.ArrayList;
    public class ParseJsonArray extends UDF {
    public ArrayList<String> evaluate(String jsonStr) {
    // 传入空字符串,返回null
    if (Strings.isNullOrEmpty(jsonStr)){
    return null;
    }
    try{
    // 获取jsonArray
    JSONArray jsonArray = JSON.parseArray(jsonStr);
    ArrayList<String> lst = new ArrayList<>();
    for(Object o: jsonArray) {
    lst.add(o.toString());
    }
    return lst;
    }catch (JSONException e){
    return null;
    }
    }
    @Test
    public void JunitParseJsonArray() {
    String jsonStr = "
    [{\"name\":\"goods_detail_loading\",\"json\":
    {\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"93\",\"acti
    on\":\"3\",\"staytime\":\"56\",\"showtype\":\"2\"},\"time\":15963
    43881690},{\"name\":\"loading\",\"json\":
    {\"loading_time\":\"15\",\"action\":\"3\",\"loading_type\":\"3\",
    \"type\":\"1\"},\"time\":1596356988428},
    {\"name\":\"notification\",\"json\":
    {\"action\":\"1\",\"type\":\"2\"},\"time\":1596374167278},
    {\"name\":\"favorites\",\"json\":
    {\"course_id\":1,\"id\":0,\"userid\":0},\"time\":1596350933962}]"
    ;
    ArrayList<String> result = evaluate(jsonStr);
    System.out.println(result.size());
    System.out.println(JSON.toJSONString(result));
    }
    }
    

    4.3DWD层数据加载

    /data/lagoudw/script/advertisement/dwd_load_event_log.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    use dwd;
    add jar /data/lagoudw/jars/model3_dw-1.0-SNAPSHOT-jar-with-dependencies.jar;
    create temporary function json_array as 'cn.clzy.dw.hive.udf.ParseJsonArray';
    with
    tmp_start as
    (
    select split(str, ' ')[7] as line
    from ods.ods_log_event
    where dt='$do_date'
    )
    insert overwrite table dwd.dwd_event_log
    PARTITION (dt='$do_date')
    select
    device_id,
    uid,
    app_v,
    os_type,
    event_type,
    language,
    channel,
    area,
    brand,
    get_json_object(k,'$.name') as name,
    get_json_object(k,'$.json') as json,
    get_json_object(k,'$.time') as time
    from
    (
    select
    get_json_object(line,'$.attr.device_id') as device_id,
    get_json_object(line,'$.attr.uid') as uid,
    get_json_object(line,'$.attr.app_v') as app_v,
    get_json_object(line,'$.attr.os_type') as os_type,
    get_json_object(line,'$.attr.event_type') as event_type,
    get_json_object(line,'$.attr.language') as language,
    get_json_object(line,'$.attr.channel') as channel,
    get_json_object(line,'$.attr.area') as area,
    get_json_object(line,'$.attr.brand') as brand,
    get_json_object(line,'$.lagou_event') as lagou_event
    from tmp_start
    ) A lateral view explode(json_array(lagou_event)) B as k
    "
    hive -e "$sql"
    
    从全部的事件日志获取广告点击事件:
    /data/lagoudw/script/advertisement/dwd_load_ad_log.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    insert overwrite table dwd.dwd_ad
    PARTITION (dt='$do_date')
    select
    device_id,
    uid,
    app_v,
    os_type,
    event_type,
    language,
    channel,
    area,
    brand,
    report_time,
    get_json_object(event_json,'$.duration') ,
    get_json_object(event_json,'$.ad_action') ,
    get_json_object(event_json,'$.shop_id') ,
    get_json_object(event_json,'$.ad_type'),
    get_json_object(event_json,'$.show_style'),
    get_json_object(event_json,'$.product_id'),
    get_json_object(event_json,'$.place'),
    get_json_object(event_json,'$.sort'),
    from_unixtime(ceil(report_time/1000), 'HH')
    from dwd.dwd_event_log
    where dt='$do_date' and name='ad';
    "
    hive -e "$sql"
    
    日志 => Flume => ODS => 清洗、转换 => 广告事件详细信息

    5.广告点击次数分析

    5.1需求分析

    广告:ad
    action。用户行为;0 曝光;1 曝光后点击;2 购买
    duration。停留时长
    shop_id。商家id
    event_type。”ad”
    ad_type。格式类型;1 JPG;2 PNG;3 GIF;4 SWF
    show_style。显示风格,0 静态图;1 动态图
    product_id。产品id
    place。广告位置;首页=1,左侧=2,右侧=3,列表页=4
    sort。排序位置
    公共字段
    分时统计:
    曝光次数、不同用户id数(公共信息中的uid)、不同用户数(公共信息中的device_id)
    点击次数、不同用户id数、不同用户数(device_id)
    购买次数、不同用户id数、不同用户数(device_id)
    DWD => DWS(不需要) => ADS;在某个分析中不是所有的层都会用到

    5.2创建ADS层

    drop table if exists ads.ads_ad_show;
    create table ads.ads_ad_show(
    cnt bigint,
    u_cnt bigint,
    device_cnt bigint,
    ad_action tinyint,
    hour string
    ) PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ',';
    

    5.3加载ADS层数据

    /data/lagoudw/script/advertisement/ads_load_ad_show.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    insert overwrite table ads.ads_ad_show
    partition (dt='$do_date')
    select count(1),
    count(distinct uid),
    count(distinct device_id),
    ad_action,
    hour
    from dwd.dwd_ad
    where dt='$do_date'
    group by ad_action, hour
    "
    hive -e "$sql"
    

    6.漏斗分析(点击率购买率)

    6.1需求分析

    分时统计:
    点击率 = 点击次数 / 曝光次数
    购买率 = 购买次数 / 点击次数

    6.2创建ADS层表

    drop table if exists ads.ads_ad_show_rate;
    create table ads.ads_ad_show_rate(
    hour string,
    click_rate double,
    buy_rate double
    ) PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ',';
    
    image.png
    列转行
    -- 方法一
    select sum(case when ad_action='0' then cnt end) show_cnt,
    sum(case when ad_action='1' then cnt end) click_cnt,
    sum(case when ad_action='2' then cnt end) buy_cnt,
    hour
    from ads.ads_ad_show
    where dt='2020-08-02' and hour='01'
    group by hour ;
    -- 方法二
    select max(case when ad_action='0' then cnt end) show_cnt,
    max(case when ad_action='1' then cnt end) click_cnt,
    max(case when ad_action='2' then cnt end) buy_cnt,
    hour
    from ads.ads_ad_show
    where dt='2020-08-02' and hour='01'
    group by hour ;
    

    6.2加载ADS层数据

    /data/lagoudw/script/advertisement/ads_load_ad_show_rate.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    with tmp as(
    select max(case when ad_action='0' then cnt end) show_cnt,
    max(case when ad_action='1' then cnt end) click_cnt,
    max(case when ad_action='2' then cnt end) buy_cnt,
    hour
    from ads.ads_ad_show
    where dt='$do_date'
    group by hour
    )
    insert overwrite table ads.ads_ad_show_rate
    partition (dt='$do_date')
    select hour,
    click_cnt / show_cnt as click_rate,
    buy_cnt / click_cnt as buy_rate
    from tmp;
    "
    hive -e "$sql"
    

    7.曝光次数TopN分析

    7.1需求分析

    活动曝光效果评估:
    行为(曝光、点击、购买)、时间段、广告位、商品,统计对应的次数
    时间段、广告位、商品,曝光次数最多的前100个

    7.2创建ADS层表

    drop table if exists ads.ads_ad_show_place;
    create table ads.ads_ad_show_place(
    ad_action tinyint,
    hour string,
    place string,
    product_id int,
    cnt bigint
    )PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ',';
    drop table if exists ads.ads_ad_show_place_window;
    create table ads.ads_ad_show_place_window(
    hour string,
    place string,
    product_id int,
    cnt bigint,
    rank int
    )PARTITIONED BY (`dt` string)
    row format delimited fields terminated by ',';
    

    7.3加载ADS层数据

    /data/lagoudw/script/advertisement/ads_load_ad_show_page.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    insert overwrite table ads.ads_ad_show_place
    partition (dt='$do_date')
    select ad_action,
    hour,
    place,
    product_id,
    count(1)
    from dwd.dwd_ad
    where dt='$do_date'
    group by ad_action, hour, place, product_id;
    "
    hive -e "$sql"
    
    /data/lagoudw/script/advertisement/ads_load_ad_show_page_window.sh
    #!/bin/bash
    source /etc/profile
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    insert overwrite table ads.ads_ad_show_place_window
    partition (dt='$do_date')
    select *
    from (
    select hour,
    place,
    product_id,
    cnt,
    row_number() over (partition by hour, place,
    product_id order by cnt desc) rank
    from ads.ads_ad_show_place
    where dt='$do_date' and ad_action='0'
    ) t
    where rank <= 100
    "
    hive -e "$sql"
    

    8.广告分析小结

    image.png
    脚本调用次序: ```sql ods_load_event_log.sh dwd_load_event_log.sh dwd_load_ad_log.sh

ads_load_ad_show.sh ads_load_ad_show_rate.sh ads_load_ad_show_page.sh ads_load_ad_show_page_window.sh

<a name="R3SIi"></a>
## 9.ADS层数据导出(DataX)
步骤:<br />在MySQL创建对应的表<br />创建配置文件(json)<br />执行命令,使用json配置文件;测试<br />编写执行脚本(shell)<br />shell脚本的测试<br />1.MySQL建表
```sql
drop table if exists dwads.ads_ad_show_place;
create table dwads.ads_ad_show_place(
ad_action tinyint,
hour varchar(2),
place varchar(20),
product_id int,
cnt int,
dt varchar(10)
);

2.创建配置文件
/data/lagoudw/script/advertisement/ads_ad_show_place.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/user/hive/warehouse/ads.db/ads_ad_show_place/dt=$do_date/*",
            "defaultFS": "hdfs://linux121:9000",
            "column": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "string"
              },
              {
                "type": "string",
                "value": "$do_date"
              }
            ],
            "fileType": "text",
            "encoding": "UTF-8",
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "hive",
            "password": "12345678",
            "column": [
              "ad_action",
              "hour",
              "place",
              "product_id",
              "cnt",
              "dt"
            ],
            "preSql": [
              "delete from ads_ad_show_place where dt='$do_date'"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://linux123:3306/dwads?useUnicode=true&characterEncoding=utf-8",
                "table": [
                  "ads_ad_show_place"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

3.执行命令

python /data/modules/datax/bin/datax.py -p "-Ddo_date=2020-07-21"
/data/lagoudw/script/advertisement/ads_ad_show_place.json

4.编写脚本
/data/lagoudw/script/advertisement/ads_ad_show_place.sh

#!/bin/bash
source /etc/profile
JSON=/data/lagoudw/script
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date"
$JSON/advertisement/ads_ad_show_place.json

5.执行脚本
sh /data/lagoudw/script/advertisement/ads_ad_show_place.sh 2020-07-21

第五部分 附录

1.DataX入门

1.1DataX概述

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
DataX作为中间传输载体负责连接各种数据源.
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
image.png
Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework;
Writer: 数据写入模块,负责不断向Framework取数据,并将数据写入到目的端;
Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
核心模块
1.DataX完成单个数据同步的作业,称为Job。
2.DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将Task组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5
4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

1.2 DataX使用案例

Reader插件和Writer插件
DataX3.0版本提供

"reader": {
"name": "mysqlreader", #从mysql数据库获取数据(也支持sqlserverreader,oraclereader)
"name": "txtfilereader", #从本地获取数据
"name": "hdfsreader", #从hdfs文件、hive表获取数据
"name": "streamreader", #从stream流获取数据(常用于测试)
"name": "httpreader", #从http URL获取数据
}
"writer": {
"name":"hdfswriter", #向hdfs,hive表写入数据
"name":"mysqlwriter ", #向mysql写入数据(也支持sqlserverwriter,oraclewriter)
"name":"streamwriter ", #向stream流写入数据。(常用于测试)
}

各种Reader插件、Writer插件的参考文档:https://github.com/alibaba/DataX
json配置文件模板

  • 整个配置文件是一个job的描述;
  • job下面有两个配置项,content和setting,其中content用来描述该任务的源和目的端的信息,setting用来描述任务本身的信息;
  • content又分为两部分,reader和writer,分别用来描述源端和目的端的信息;
  • setting中的speed项表示同时起几个并发执行该任务;
  • job.setting.speed(流量控制)

    Job支持用户对速度的自定义控制,channel的值可以控制同步时的并发数,byte的值可以控制同步时的速度。

  • job.setting.errorLimit(脏数据控制)

Job支持用户对于脏数据的自定义监控和告警,包括对脏数据最大记录数阈值(record值)或者脏数据占比阈值 (percentage值),当Job传输过程出现的脏数据大于用户指定的数量/百分比,DataX Job报错退出。
执行命令

python $DATAX_HOME/bin/datax.py /data/lagoudw/json/stream2stream.json

2.Tez

Hortonworks在2014年左右发布了Stinger Initiative,引入新的runtime框架——Tez,消除Hive的延迟以及吞吐量限制。Tez通过消除不必要的task、障碍同步和对HDFS的读写作业来优化Hive job;

2.1Tez概述

Tez是Apache开源的支持DAG(有向无环图)作业的计算框架,是支持Hadoop 2.x的重要引擎。
image.png
Tez将Map task和Reduce task进一步拆分为:
image.png
Tez的task由Input、processor、output阶段组成,可以表达所有复杂的map和reduce操作
image.png
Tez可以将多个有依赖的作业转换为一个作业(只需写一次HDFS,中间环节较少),从而大大提升DAG作业的性能。Tez已被Hortonworks用于Hive引擎的优化,经测试一般小任务比Hive MR 的2-3倍速度左右,大任务7-10倍左右,根据情况不同可能不一样。

2.2设置

hive命令行设置Tez执行(命令行窗口关闭不再生效)
hive> set hive.execution.engine=tez;
默认使用Tez:
在$HIVE_HOME/conf目录下hive-site.xml 中增加

<property>
<name>hive.execution.engine</name>
<value>tez</value>
</property>