Chapter 1. Requirement 1: Daily Active Users (DAU)

1. Create the sub-module (gmall-realtime)

This is the real-time processing module; it is responsible for processing the collected data in real time.

I. pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.spark</groupId>
  4. <artifactId>spark-core_2.12</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-streaming_2.12</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.apache.spark</groupId>
  12. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  13. </dependency>
  14. <!-- Jedis: connect to Redis from Java -->
  15. <dependency>
  16. <groupId>redis.clients</groupId>
  17. <artifactId>jedis</artifactId>
  18. <version>2.9.0</version>
  19. </dependency>
  20. <dependency>
  21. <artifactId>gmall-common</artifactId>
  22. <groupId>org.example</groupId>
  23. <version>1.0-SNAPSHOT</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.apache.phoenix</groupId>
  27. <artifactId>phoenix-spark</artifactId>
  28. <version>5.0.0-HBase-2.0</version>
  29. <exclusions>
  30. <exclusion>
  31. <groupId>org.glassfish</groupId>
  32. <artifactId>javax.el</artifactId>
  33. </exclusion>
  34. </exclusions>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.apache.spark</groupId>
  38. <artifactId>spark-sql_2.12</artifactId>
  39. </dependency>
  40. <dependency>
  41. <groupId>commons-httpclient</groupId>
  42. <artifactId>commons-httpclient</artifactId>
  43. <version>3.1</version>
  44. </dependency>
  45. </dependencies>
  46. <build>
  47. <plugins>
  48. <!-- This plugin compiles the Scala sources into class files -->
  49. <plugin>
  50. <groupId>net.alchim31.maven</groupId>
  51. <artifactId>scala-maven-plugin</artifactId>
  52. <version>3.2.0</version>
  53. <executions>
  54. <execution>
  55. <!-- Bind these goals to Maven's compile phase -->
  56. <goals>
  57. <goal>compile</goal>
  58. <goal>testCompile</goal>
  59. </goals>
  60. </execution>
  61. </executions>
  62. </plugin>
  63. </plugins>
  64. </build>

II. Configuration files

  1. config.properties
  1. # Kafka configuration
  2. kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
  3. # Redis configuration
  4. redis.host=hadoop102
  5. redis.port=6379
  2. log4j.properties
  1. # Defines a ConsoleAppender named atguigu.MyConsole that writes log output to the console via System.err (shown in red in the IDE)
  2. log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
  3. log4j.appender.atguigu.MyConsole.target=System.err
  4. log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout
  5. # year-month-day hour:minute:second, log level padded to a width of 10, (fully-qualified class:method) - message, newline
  6. log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
  7. # Specifies which level and appender a logger uses for output; the class names may need to be adjusted
  8. log4j.rootLogger=error,atguigu.MyConsole

III. Utility classes

  • Consume the data from Kafka
  • Use Redis to filter out devices already counted in today's DAU
  • Save each batch's newly added DAU records to HBase
  • Query the data from HBase and publish it as a data API for the visualization project to call
  1. PropertiesUtil.scala
  1. package com.atguigu.realtime.utils
  2. import java.io.InputStreamReader
  3. import java.util.Properties
  4. // Loads a properties file from the classpath: just call load, it reads the file and returns a populated Properties object
  5. object PropertiesUtil {
  6. def load(propertieName:String): Properties ={
  7. val prop=new Properties()
  8. prop.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName) , "UTF-8"))
  9. prop
  10. }
  11. }
  2. MyKafkaUtil.scala
  1. package com.atguigu.realtime.utils
  2. import java.util.Properties
  3. import org.apache.kafka.clients.consumer.ConsumerRecord
  4. import org.apache.kafka.common.TopicPartition
  5. import org.apache.kafka.common.serialization.StringDeserializer
  6. import org.apache.spark.streaming.StreamingContext
  7. import org.apache.spark.streaming.dstream.InputDStream
  8. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  9. object MyKafkaUtil {
  10. // Read the configuration file
  11. private val properties: Properties = PropertiesUtil.load("config.properties")
  12. // Kafka cluster address
  13. val broker_list: String = properties.getProperty("kafka.broker.list")
  14. def getKafkaStream(topics: Array[String], ssc: StreamingContext, groupId:String, saveOffsetToMysql:Boolean = false ,
  15. offsetsMap: Map[TopicPartition, Long] =null , ifAutoCommit:String ="false" ): InputDStream[ConsumerRecord[String, String]] = {
  16. // Kafka consumer configuration
  17. val kafkaParam = Map(
  18. "bootstrap.servers" -> broker_list,
  19. "key.deserializer" -> classOf[StringDeserializer],
  20. "value.deserializer" -> classOf[StringDeserializer],
  21. "group.id" -> groupId,
  22. "auto.offset.reset" -> "earliest",
  23. "enable.auto.commit" -> ifAutoCommit
  24. )
  25. var ds: InputDStream[ConsumerRecord[String, String]] =null;
  26. if (saveOffsetToMysql){
  27. ds = KafkaUtils.createDirectStream[String, String](
  28. ssc,
  29. LocationStrategies.PreferConsistent,
  30. ConsumerStrategies.Subscribe[String, String](topics, kafkaParam,offsetsMap)
  31. )
  32. }else{
  33. ds = KafkaUtils.createDirectStream[String, String](
  34. ssc,
  35. LocationStrategies.PreferConsistent,
  36. ConsumerStrategies.Subscribe[String, String](topics, kafkaParam))
  37. }
  38. ds
  39. }
  40. }
  3. RedisUtil.scala
  1. package com.atguigu.realtime.utils
  2. import java.util.Properties
  3. import redis.clients.jedis.{Jedis}
  4. object RedisUtil {
  5. val config: Properties = PropertiesUtil.load("config.properties")
  6. val host: String = config.getProperty("redis.host")
  7. val port: String = config.getProperty("redis.port")
  8. def getJedisClient():Jedis={
  9. new Jedis(host,port.toInt)
  10. }
  11. }

IV. DAU implementation

  1. Flow diagram

Spark Real-Time Data Warehouse (II) - Figure 1

  2. Design of the Redis key and value (see the sketch below)

| key | value |
| --- | --- |
| date of the current batch (logDate) | device id (mid) |
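A rough Jedis sketch of this key design (assuming a reachable Redis at the host/port from config.properties; the `DAU:` prefix matches what DAUApp uses later, and the sample mid is made up): each logDate gets one Redis set of mids, membership in that set is the dedup test, and the key expires after one day.

```scala
import redis.clients.jedis.Jedis

object DauRedisKeySketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop102", 6379)   // host/port from config.properties
    val key   = "DAU:2021-10-07"               // one set per logDate
    val mid   = "mid_00001"                    // hypothetical device id

    if (!jedis.sismember(key, mid)) {          // not counted for this day yet
      jedis.sadd(key, mid)                     // remember the device
      jedis.expire(key, 60 * 60 * 24)          // drop the whole set after 24 hours
    }
    jedis.close()
  }
}
```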
  3. Create the table in Phoenix (a note on UPSERT idempotence follows the DDL)
  1. create table gmall2021_startuplog
  2. (
  3. ar varchar,
  4. ba varchar,
  5. ch varchar,
  6. is_new Integer,
  7. md varchar,
  8. mid varchar,
  9. os varchar,
  10. uid BIGINT,
  11. vc varchar,
  12. entry varchar,
  13. loading_time Integer,
  14. open_ad_id Integer,
  15. open_ad_ms Integer,
  16. open_ad_skip_ms Integer,
  17. logDate varchar,
  18. logHour varchar,
  19. ts BIGINT,
  20. CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));
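Because Phoenix only has UPSERT semantics, writing the same (mid, logDate) row again simply overwrites it; this is what lets DAUApp below treat "at least once + idempotent HBase sink" as effectively exactly-once. A rough JDBC sketch of that property (assuming the Phoenix JDBC driver is on the classpath and the table above exists; the sample values are invented):

```scala
import java.sql.DriverManager

object PhoenixUpsertSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val ps = conn.prepareStatement(
      "upsert into GMALL2021_STARTUPLOG(MID, LOGDATE, LOGHOUR, TS) values(?,?,?,?)")
    ps.setString(1, "mid_00001")
    ps.setString(2, "2021-10-07")
    ps.setString(3, "10")
    ps.setLong(4, 1633572000000L)
    ps.executeUpdate()
    ps.executeUpdate()   // replaying the same row leaves exactly one row behind
    conn.commit()        // Phoenix connections do not auto-commit by default
    conn.close()
  }
}
```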
  4. To make the data easier to parse, wrap it in case classes (StartUpLog, StartLogInfo); a small parsing sketch follows the code below
  1. package com.atguigu.realtime.beans
  2. case class StartUpLog(
  3. //common
  4. var ar:String,
  5. var ba:String,
  6. var ch:String,
  7. var is_new:Int,
  8. var md:String,
  9. var mid:String,
  10. var os:String,
  11. var uid:Long,
  12. var vc:String,
  13. // start
  14. var entry:String,
  15. var loading_time:Int,
  16. var open_ad_id:Int,
  17. var open_ad_ms:Int,
  18. var open_ad_skip_ms:Int,
  19. // Not present in the Kafka message; extra fields added for the final statistics, derived from ts
  20. var logDate:String,
  21. var logHour:String,
  22. var ts:Long){
  23. def mergeStartInfo(startInfo:StartLogInfo):Unit={
  24. if (startInfo != null){
  25. this.entry = startInfo.entry
  26. this.loading_time = startInfo.loading_time
  27. this.open_ad_id = startInfo.open_ad_id
  28. this.open_ad_ms = startInfo.open_ad_ms
  29. this.open_ad_skip_ms = startInfo.open_ad_skip_ms
  30. }
  31. }
  32. }
  33. // Wraps the attributes of the start:{} part of the startup log so it can be flattened conveniently
  34. case class StartLogInfo(
  35. entry:String,
  36. loading_time:Int,
  37. open_ad_id:Int,
  38. open_ad_ms:Int,
  39. open_ad_skip_ms:Int
  40. )
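For reference, a rough sketch of how one startup-log line maps onto these case classes, mirroring what parseRecordToBean in DAUApp does below (the JSON values are invented for illustration; only the common/start/ts layout is assumed):

```scala
import com.alibaba.fastjson.JSON
import com.atguigu.realtime.beans.{StartLogInfo, StartUpLog}

object StartupLogParseSketch {
  def main(args: Array[String]): Unit = {
    // a hypothetical startup-log line as it would arrive from Kafka
    val line =
      """{"common":{"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":1,"md":"Mi 10",
        |"mid":"mid_00001","os":"Android 11.0","uid":26,"vc":"v2.1.111"},
        |"start":{"entry":"icon","loading_time":1200,"open_ad_id":5,"open_ad_ms":7000,"open_ad_skip_ms":0},
        |"ts":1633572000000}""".stripMargin

    val obj          = JSON.parseObject(line)
    val startUpLog   = JSON.parseObject(obj.getString("common"), classOf[StartUpLog])   // flatten "common"
    val startLogInfo = JSON.parseObject(obj.getString("start"), classOf[StartLogInfo])  // parse "start"
    startUpLog.mergeStartInfo(startLogInfo)                                             // merge the two parts
    startUpLog.ts = obj.getLong("ts")
    println(startUpLog)
  }
}
```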
  5. Factor the code shared by every requirement into an abstract class (BaseApp); a minimal subclass sketch follows the class below
  1. package com.atguigu.realtime.app
  2. import org.apache.spark.streaming.StreamingContext
  3. // Parent class that every real-time app extends
  4. abstract class BaseApp {
  5. // Abstract fields (declared but not assigned), to be provided by subclasses
  6. var appName:String
  7. var batchDuration:Int
  8. var ssc:StreamingContext = null
  9. def runApp(op: =>Unit){
  10. try{
  11. // the app-specific code passed in by the subclass
  12. op
  13. ssc.start()
  14. ssc.awaitTermination()
  15. }catch{
  16. case e:Exception =>
  17. e.printStackTrace()
  18. throw new RuntimeException("运行失败")
  19. }
  20. }
  21. }
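A minimal, hypothetical subclass just to show the contract (the real DAUApp and GMVApp below follow the same shape): the subclass supplies appName and batchDuration, creates the StreamingContext in main, and passes its per-batch logic to runApp.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DemoApp extends BaseApp {
  override var appName: String = "DemoApp"
  override var batchDuration: Int = 5

  def main(args: Array[String]): Unit = {
    ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName(appName), Seconds(batchDuration))
    runApp {
      // build DStreams and register output operations here, as DAUApp does below
      ssc.socketTextStream("localhost", 9999).print()
    }
  }
}
```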
  6. DAU statistics program (DAUApp)
  1. package com.atguigu.realtime.app
  2. import com.alibaba.fastjson.JSON
  3. import com.atguigu.constants.MyConstants
  4. import com.atguigu.realtime.beans.{StartLogInfo, StartUpLog}
  5. import com.atguigu.realtime.utils.{MyKafkaUtil, RedisUtil}
  6. import org.apache.hadoop.hbase.HBaseConfiguration
  7. import org.apache.kafka.clients.consumer.ConsumerRecord
  8. import org.apache.spark.SparkConf
  9. import org.apache.spark.rdd.RDD
  10. import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
  11. import org.apache.spark.streaming.{Seconds, StreamingContext}
  12. import java.time.{Instant, LocalDateTime, ZoneId}
  13. import java.time.format.DateTimeFormatter
  14. import scala.Seq
  15. // Import the helpers provided by Phoenix (adds saveToPhoenix to RDDs)
  16. import org.apache.phoenix.spark._
  17. /**
  18. * Exactly-once: at least once (offsets maintained in Kafka) + HBase (idempotent sink)
  19. */
  20. object DAUApp extends BaseApp {
  21. override var appName: String = "DAUApp"
  22. override var batchDuration: Int = 10
  23. val groupId = "gmallrealtime"
  24. /**
  25. * Wrap the RDD records in the StartUpLog case class
  26. * @param rdd
  27. * @return
  28. */
  29. def parseRecordToBean(rdd: RDD[ConsumerRecord[String, String]]):RDD[StartUpLog] = {
  30. rdd.map(record => {
  31. // Parse record.value() into a JSON object
  32. val jSONObject = JSON.parseObject(record.value())
  33. // Flatten the common part into a StartUpLog
  34. val startUpLog = JSON.parseObject(jSONObject.getString("common"), classOf[StartUpLog])
  35. // Parse the start part and merge it in
  36. val startLogInfo = JSON.parseObject(jSONObject.getString("start"), classOf[StartLogInfo])
  37. startUpLog.mergeStartInfo(startLogInfo)
  38. // Merge ts
  39. startUpLog.ts = jSONObject.getLong("ts")
  40. // Derive the date and hour of this log entry
  41. val formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  42. val formatter2 = DateTimeFormatter.ofPattern("HH")
  43. val time = LocalDateTime.ofInstant(Instant.ofEpochMilli(startUpLog.ts), ZoneId.of("Asia/Shanghai"))
  44. startUpLog.logDate = time.format(formatter1)
  45. startUpLog.logHour = time.format(formatter2)
  46. startUpLog
  47. })
  48. }
  49. /**
  50. * Deduplicate within a batch (by the batch's date and the device id)
  51. * @param rdd
  52. * @return
  53. */
  54. def removeDuplicateLogsInCommonBatch(rdd: RDD[StartUpLog]):RDD[StartUpLog] = {
  55. val rdd1 = rdd.map(log => {
  56. ((log.logDate, log.mid), log)
  57. })
  58. val rdd2 = rdd1.groupByKey()
  59. // Only the values are transformed, so mapValues can be used
  60. val rdd3 = rdd2.mapValues({
  61. _.toList.sortBy(_.ts).take(1)
  62. })
  63. val rdd4 = rdd3.map(date => {
  64. ((date._1, date._2.head))
  65. })
  66. rdd4.values
  67. }
  68. /**
  69. * Filter across batches (against the Redis set)
  70. * @param rdd
  71. * @return
  72. */
  73. def removeDuplicateLogsInDiffBatch(rdd: RDD[StartUpLog]):RDD[StartUpLog] = {
  74. // Connect to Redis once per partition
  75. rdd.mapPartitions(partition=>{
  76. // Obtain a connection
  77. val jedis = RedisUtil.getJedisClient()
  78. // Keep a log only if its mid is not yet in the Redis set for its logDate
  79. val filterdPartition = partition.filter(log => !jedis.sismember("DAU:" + log.logDate, log.mid))
  80. // Close the connection
  81. jedis.close()
  82. // Return the filtered partition
  83. filterdPartition
  84. })
  85. }
  86. def main(args: Array[String]): Unit = {
  87. ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName(appName),Seconds(batchDuration))
  88. runApp{
  89. // 1. Get the initial DStream
  90. val ds = MyKafkaUtil.getKafkaStream(Array(MyConstants.STARTUP_LOG),ssc,
  91. groupId)
  92. ds.foreachRDD(rdd => {
  93. // Only run the computation when the batch actually contains data; otherwise a job is still submitted with nothing to compute
  94. if (!rdd.isEmpty()){
  95. // 2. Get the offset ranges of this batch
  96. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  97. // 3. Wrap the records as case classes
  98. val rdd1 = parseRecordToBean(rdd)
  99. println("当前这批数据消费到:" + rdd1.count())
  100. // 4. Deduplicate within the batch
  101. val rdd2 = removeDuplicateLogsInCommonBatch(rdd1)
  102. println("同批次去重后: " + rdd2.count())
  103. // 5. Filter across batches
  104. val rdd3 = removeDuplicateLogsInDiffBatch(rdd2)
  105. rdd3.cache()
  106. println("跨批次过滤后: " + rdd3.count())
  107. // 6. Write to HBase via Phoenix
  108. rdd3.saveToPhoenix("GMALL2021_STARTUPLOG",
  109. Seq("AR", "BA", "CH", "IS_NEW", "MD", "MID", "OS", "UID", "VC", "ENTRY", "LOADING_TIME","OPEN_AD_ID","OPEN_AD_MS","OPEN_AD_SKIP_MS","LOGDATE","LOGHOUR","TS"),
  110. HBaseConfiguration.create(),
  111. Some("hadoop102:2181"))
  112. // 7. Record the (logDate, mid) pairs that were written to HBase in Redis
  113. rdd3.foreachPartition(partition=>{
  114. val jedis = RedisUtil.getJedisClient()
  115. partition.foreach(log=>{
  116. jedis.sadd("DAU:"+log.logDate,log.mid)
  117. // Set the key to expire after 24 hours
  118. jedis.expire("DAU:"+log.logDate,60*60*24)
  119. })
  120. jedis.close()
  121. })
  122. // 8. Commit the offsets back to Kafka
  123. ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  124. }
  125. })
  126. }
  127. }
  128. }
  7. Write the data to the database and test

Start gmall-logger, nginx, zookeeper, kafka, hadoop, hbase, redis and phoenix respectively.

Produce data into Kafka:

Spark Real-Time Data Warehouse (II) - Figure 2

Start the app:

Spark Real-Time Data Warehouse (II) - Figure 3

Verify in the Redis and Phoenix clients respectively:

Spark Real-Time Data Warehouse (II) - Figure 4

Spark Real-Time Data Warehouse (II) - Figure 5

2. Create the sub-module (gmall-publisher)

This module is the DAU data query interface, a Spring Boot project responsible for querying the data produced by the real-time processing.

I. pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <artifactId>spark-gmall</artifactId>
  7. <groupId>org.example</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <artifactId>gmall-publisher</artifactId>
  11. <version>0.0.1-SNAPSHOT</version>
  12. <name>gmall-publisher</name>
  13. <description>Demo project for Spring Boot</description>
  14. <properties>
  15. <java.version>1.8</java.version>
  16. </properties>
  17. <dependencies>
  18. <dependency>
  19. <groupId>org.springframework.boot</groupId>
  20. <artifactId>spring-boot-starter-web</artifactId>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.mybatis.spring.boot</groupId>
  24. <artifactId>mybatis-spring-boot-starter</artifactId>
  25. <version>2.2.0</version>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.projectlombok</groupId>
  29. <artifactId>lombok</artifactId>
  30. <optional>true</optional>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-test</artifactId>
  35. <scope>test</scope>
  36. </dependency>
  37. <dependency>
  38. <artifactId>gmall-common</artifactId>
  39. <groupId>org.example</groupId>
  40. <version>1.0-SNAPSHOT</version>
  41. </dependency>
  42. <dependency>
  43. <groupId>org.apache.phoenix</groupId>
  44. <artifactId>phoenix-core</artifactId>
  45. <version>5.0.0-HBase-2.0</version>
  46. <exclusions>
  47. <exclusion>
  48. <groupId>org.glassfish</groupId>
  49. <artifactId>javax.el</artifactId>
  50. </exclusion>
  51. </exclusions>
  52. </dependency>
  53. <!-- Guava, Google's Java utility library; many frameworks depend on it -->
  54. <dependency>
  55. <groupId>com.google.guava</groupId>
  56. <artifactId>guava</artifactId>
  57. <version>20.0</version>
  58. </dependency>
  59. <!--java.lang.NoSuchMethodError: org.apache.hadoop.security.authentication.util.KerberosUtil.hasKerberosKeyTab-->
  60. <dependency>
  61. <groupId>org.apache.hadoop</groupId>
  62. <artifactId>hadoop-common</artifactId>
  63. <version>2.7.2</version>
  64. </dependency>
  65. <!-- Switch between heterogeneous data sources (MySQL and HBase via Phoenix) -->
  66. <dependency>
  67. <groupId>com.baomidou</groupId>
  68. <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  69. <version>2.4.2</version>
  70. </dependency>
  71. <dependency>
  72. <groupId>mysql</groupId>
  73. <artifactId>mysql-connector-java</artifactId>
  74. </dependency>
  75. </dependencies>
  76. <build>
  77. <plugins>
  78. <plugin>
  79. <groupId>org.springframework.boot</groupId>
  80. <artifactId>spring-boot-maven-plugin</artifactId>
  81. <version>1.5.10.RELEASE</version>
  82. <configuration>
  83. <excludes>
  84. <exclude>
  85. <groupId>org.projectlombok</groupId>
  86. <artifactId>lombok</artifactId>
  87. </exclude>
  88. </excludes>
  89. </configuration>
  90. </plugin>
  91. </plugins>
  92. </build>
  93. </project>

II. Configuration files

  1. application.properties
  1. # Bind port 8070; there is no context path, so access the app at http://<host>:8070/
  2. server.port=8070
  3. # Use Spring Boot's default logger with level error
  4. logging.level.root=error
  5. #mybatis
  6. mybatis.mapperLocations=classpath:mybatis/*.xml
  7. mybatis.configuration.map-underscore-to-camel-case=true
  2. application.yml
  1. spring:
  2.   datasource:
  3.     dynamic:
  4.       primary: hbase # default data source (or data source group); the library default is "master"
  5.       strict: false # strict matching: true throws when the requested data source is missing, false falls back to the default
  6.       datasource:
  7.         hbase:
  8.           url: jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
  9.           username:
  10.           password:
  11.           driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
  12.         mysql:
  13.           url: jdbc:mysql://hadoop103:3306/sparkproduct
  14.           username: root
  15.           password: "321074"
  16.           driver-class-name: com.mysql.jdbc.Driver

III. Business code implementation

  1. POJO layer (DAUPerHour)
  1. package com.example.gmallpublisher.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @NoArgsConstructor
  6. @AllArgsConstructor
  7. @Data
  8. public class DAUPerHour {
  9. private String hour;
  10. private Integer dau;
  11. }
  2. Mapper layer (DAUMapper)
  1. package com.example.gmallpublisher.mapper;
  2. import com.baomidou.dynamic.datasource.annotation.DS;
  3. import com.example.gmallpublisher.pojo.DAUPerHour;
  4. import org.springframework.stereotype.Repository;
  5. import java.util.List;
  6. @Repository
  7. @DS("hbase")
  8. public interface DAUMapper {
  9. // DAU of a single day
  10. Integer getDAUByDate(String date);
  11. // Number of new devices of the day
  12. Integer getNewMidCountByDate(String date);
  13. // Per-hour device counts of the day
  14. List<DAUPerHour> getDAUPerHourData(String date);
  15. }
  3. DAUMapper.xml
  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper
  3. PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  4. "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  5. <!-- namespace binds this file to the corresponding Dao/Mapper interface -->
  6. <mapper namespace="com.example.gmallpublisher.mapper.DAUMapper">
  7. <!-- DAU of the day -->
  8. <select id="getDAUByDate" resultType="int">
  9. select
  10. count(*)
  11. from GMALL2021_STARTUPLOG
  12. where logdate = #{logdate}
  13. </select>
  14. <!-- Number of new devices of the day -->
  15. <select id="getNewMidCountByDate" resultType="int">
  16. select count(*)
  17. from
  18. (select mid
  19. from GMALL2021_STARTUPLOG
  20. where logdate = #{logdate}) t1
  21. left join
  22. (select mid
  23. from GMALL2021_STARTUPLOG
  24. where logdate &lt; #{logdate}) t2
  25. on t1.mid = t2.mid
  26. where t2.mid is null
  27. </select>
  28. <!-- Per-hour device counts of the day -->
  29. <select id="getDAUPerHourData" resultType="com.example.gmallpublisher.pojo.DAUPerHour">
  30. select
  31. loghour hour,
  32. count(*) dau
  33. from GMALL2021_STARTUPLOG
  34. where logdate = #{logdate}
  35. group by loghour
  36. </select>
  37. </mapper>
  4. Service layer (PublisherService, PublisherServiceImpl)
  1. package com.example.gmallpublisher.service;
  2. import com.example.gmallpublisher.pojo.DAUPerHour;
  3. import com.example.gmallpublisher.pojo.GMVPerHour;
  4. import java.util.List;
  5. public interface PublisherService {
  6. // DAU of a single day
  7. Integer getDAUByDate(String date);
  8. // Number of new devices of the day
  9. Integer getNewMidCountByDate(String date);
  10. // Per-hour device counts of the day
  11. List<DAUPerHour> getDAUPerHourData(String date);
  12. }
  1. package com.example.gmallpublisher.service;
  2. import com.example.gmallpublisher.mapper.DAUMapper;
  3. import com.example.gmallpublisher.mapper.GMVMapper;
  4. import com.example.gmallpublisher.pojo.DAUPerHour;
  5. import com.example.gmallpublisher.pojo.GMVPerHour;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import java.util.List;
  9. @Service
  10. public class PublisherServiceImpl implements PublisherService {
  11. @Autowired
  12. private DAUMapper dauMapper;
  13. @Override
  14. public Integer getDAUByDate(String date) {
  15. System.out.println("遵循的业务步骤");
  16. return dauMapper.getDAUByDate(date);
  17. }
  18. @Override
  19. public Integer getNewMidCountByDate(String date) {
  20. System.out.println("遵循的业务步骤");
  21. return dauMapper.getNewMidCountByDate(date);
  22. }
  23. @Override
  24. public List<DAUPerHour> getDAUPerHourData(String date) {
  25. System.out.println("遵循业务步骤");
  26. return dauMapper.getDAUPerHourData(date);
  27. }
  28. }
  5. Controller layer
  1. package com.example.gmallpublisher.controller;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.example.gmallpublisher.pojo.DAUPerHour;
  4. import com.example.gmallpublisher.pojo.GMVPerHour;
  5. import com.example.gmallpublisher.service.PublisherService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.time.LocalDate;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. @RestController
  13. public class PublisherController {
  14. @Autowired
  15. private PublisherService publisherService;
  16. /**
  17. * [
  18. * {"id":"dau","name":"当日日活数","value":1200},
  19. * {"id":"new_mid","name":"新增设备数","value":233}
  20. *
  21. * ]
  22. * @param date
  23. * @return
  24. */
  25. @RequestMapping(value="/realtime-total")
  26. public Object handle1(String date){
  27. // Get the DAU of the day
  28. Integer dauByDate = publisherService.getDAUByDate(date);
  29. // Get the number of new devices of the day
  30. Integer newMidCountByDate = publisherService.getNewMidCountByDate(date);
  31. ArrayList<JSONObject> result = new ArrayList<>();
  32. JSONObject jsonObject1 = new JSONObject();
  33. JSONObject jsonObject2 = new JSONObject();
  34. /*JSONObject jsonObject3 = new JSONObject();*/
  35. jsonObject1.put("id","dau");
  36. jsonObject1.put("name","当日日活数");
  37. jsonObject1.put("value",dauByDate);
  38. jsonObject2.put("id","new_mid");
  39. jsonObject2.put("name","新增设备数");
  40. jsonObject2.put("value",newMidCountByDate);
  41. result.add(jsonObject1);
  42. result.add(jsonObject2);
  43. /*result.add(jsonObject3);*/
  44. return result;
  45. }
  46. /**
  47. *
  48. * {"yesterday":
  49. {
  50. "11":383,
  51. "12":123,
  52. "17":88,
  53. "19":200
  54. },
  55. "today":
  56. {"12":38,"13":1233,"17":123,"19":688 }
  57. }
  58. *
  59. * @param date
  60. * @return
  61. */
  62. @RequestMapping(value="realtime-hours")
  63. public Object handle2(String id,String date){
  64. JSONObject result = new JSONObject();
  65. // Compute yesterday's date from today's date
  66. String yestodayDate = LocalDate.parse(date).minusDays(1).toString();
  67. if("dau".equals(id)){
  68. List<DAUPerHour> todayData = publisherService.getDAUPerHourData(date);
  69. List<DAUPerHour> yestodayData = publisherService.getDAUPerHourData(yestodayDate);
  70. result.put("yesterday",parseDAUDate(yestodayData));
  71. result.put("today",parseDAUDate(todayData));
  72. }
  73. return result;
  74. }
  75. public JSONObject parseDAUDate(List<DAUPerHour> data){
  76. JSONObject result = new JSONObject();
  77. for (DAUPerHour dauPerHour : data) {
  78. result.put(dauPerHour.getHour(),dauPerHour.getDau());
  79. }
  80. return result;
  81. }
  82. }
  6. Main application class
  1. package com.example.gmallpublisher;
  2. import org.mybatis.spring.annotation.MapperScan;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @MapperScan(basePackages = "com.example.gmallpublisher.mapper")
  7. public class GmallPublisherApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(GmallPublisherApplication.class, args);
  10. }
  11. }
  7. index.html
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>首页</title>
  6. </head>
  7. <body>
  8. <a href=" http://localhost:8070/realtime-total?date=2021-10-07">请求今天DAU</a> <br>
  9. <a href=" http://localhost:8070/realtime-hours?id=dau&date=2021-10-07">请求今天和昨天分时DAU</a>
  10. <br/>
  11. </body>
  12. </html>

IV. Run the program

Open http://localhost:8070/index.html in a browser.

Spark Real-Time Data Warehouse (II) - Figure 6

  1. Request today's DAU

Spark Real-Time Data Warehouse (II) - Figure 7

  2. Request today's and yesterday's per-hour DAU

Spark Real-Time Data Warehouse (II) - Figure 8

Chapter 2. Requirement 2: Transaction Amount (GMV) Statistics

Data processing flow diagram

Spark Real-Time Data Warehouse (II) - Figure 9

1. Collecting the business data

GMV statistics need business data, which normally lives in a relational database (MySQL). Canal can be used to collect the business data into Kafka in real time.

How Canal works: it masquerades as a MySQL slave and pretends to replicate data from the master.

I. MySQL preparation

  1. Use the stored procedure to simulate data
  1. call `init_data`('2021-10-06',10,3,FALSE);
  2. call `init_data`('2021-10-07',10,3,FALSE);
  2. Enable binlog

MySQL must record write operations on the master in its log, so binlog has to be enabled.

On the host where MySQL is installed (hadoop103), edit /etc/my.cnf and add the following under [mysqld]:

  1. log-bin=mysql-bin
  2. server-id=3
  3. binlog_format=row
  4. binlog-do-db=sparkproduct

Restart MySQL after the change:

  1. sudo systemctl restart mysqld

II. Install Canal

  1. Download Canal

Download from: https://github.com/alibaba/canal/releases

  2. Upload the archive to the Linux host and extract it

Note: create a directory for Canal first and extract into it.

  3. Edit canal.properties (the global configuration file)
  1. vim conf/canal.properties
  2. canal.ip = hadoop103
  4. Edit instance.properties (the instance configuration file)
  1. vim conf/example/instance.properties
  2. canal.instance.mysql.slaveId=0
  3. # position info
  4. canal.instance.master.address=hadoop103:3306
  5. canal.instance.master.journal.name=mysql-bin.000001
  6. canal.instance.master.position=154
  5. Execute the following in MySQL (grant privileges)
  1. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
  2. FLUSH PRIVILEGES;
  6. Start Canal
  1. [atguigu@hadoop103 canal]$ bin/startup.sh

2. Create the sub-module (gmall-canalclient)

This module is the canal client; it subscribes to the data that the canal server has pulled.

I. pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>spark-gmall</artifactId>
  7. <groupId>org.example</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>gmall-canalclient</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. <dependencies>
  17. <dependency>
  18. <artifactId>gmall-common</artifactId>
  19. <groupId>org.example</groupId>
  20. <version>1.0-SNAPSHOT</version>
  21. </dependency>
  22. <!--canal client-->
  23. <dependency>
  24. <groupId>com.alibaba.otter</groupId>
  25. <artifactId>canal.client</artifactId>
  26. <version>1.1.2</version>
  27. </dependency>
  28. <!-- Kafka: a producer is needed to write the data into Kafka -->
  29. <dependency>
  30. <groupId>org.apache.kafka</groupId>
  31. <artifactId>kafka-clients</artifactId>
  32. </dependency>
  33. </dependencies>
  34. </project>

II. Producer (MyProducer.java)

  1. package com.atguigu.client;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class MyProducer {
  7. private static Producer producer;
  8. static{
  9. producer= getProducer();
  10. }
  11. // Build a producer
  12. public static Producer getProducer(){
  13. Properties properties = new Properties();
  14. // Cluster address and the key/value serializers
  15. properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
  16. properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
  17. properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
  18. return new KafkaProducer<String,String>(properties);
  19. }
  20. public static void sendData(String topic,String value){
  21. producer.send(new ProducerRecord(topic,value));
  22. }
  23. }

III. Canal client (CanalClient.java)

  1. package com.atguigu.client;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.otter.canal.client.CanalConnector;
  4. import com.alibaba.otter.canal.client.CanalConnectors;
  5. import com.alibaba.otter.canal.protocol.CanalEntry;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import com.atguigu.constants.MyConstants;
  8. import com.google.protobuf.ByteString;
  9. import com.google.protobuf.InvalidProtocolBufferException;
  10. import java.net.InetSocketAddress;
  11. import java.util.List;
  12. // Canal client that connects to the canal server
  13. public class CanalClient {
  14. public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
  15. // 1. Create a connector (client) object
  16. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop103", 11111), "example", null, null);
  17. // 2. Connect
  18. connector.connect();
  19. // 3. Subscribe to the sparkproduct.order_info table
  20. connector.subscribe("sparkproduct.order_info");
  21. // 4. Pull data in a loop
  22. while(true){
  23. Message message = connector.get(100);
  24. // Check whether this pull returned any data
  25. if(message.getId() == -1){
  26. System.out.println("歇会再去");
  27. Thread.sleep(5000);
  28. continue;
  29. }
  30. // Parse the data (one message = one pull, containing the write changes caused by several SQL statements)
  31. // List<CanalEntry.Entry> entries holds the changes caused by those SQL statements
  32. List<CanalEntry.Entry> entries = message.getEntries();
  33. for (CanalEntry.Entry entry : entries) {
  34. // Entry: the change caused by one SQL statement
  35. // Only INSERT statements are of interest
  36. if(entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){
  37. // ByteString storeValue holds the row changes of one SQL statement; it must be deserialized before use
  38. ByteString storeValue = entry.getStoreValue();
  39. parseDate(storeValue);
  40. }
  41. }
  42. }
  43. //
  44. }
  45. private static void parseDate(ByteString storeValue) throws InvalidProtocolBufferException {
  46. // Deserialize
  47. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
  48. // Only keep INSERT events
  49. if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
  50. List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
  51. // Wrap all columns of each row into a JSON string
  52. for (CanalEntry.RowData rowData : rowDatasList) {
  53. JSONObject jsonObject = new JSONObject();
  54. // Get the column values after the insert
  55. // Column exposes the column name plus its value before and after the change
  56. List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
  57. for (CanalEntry.Column column : afterColumnsList) {
  58. jsonObject.put(column.getName(),column.getValue());
  59. }
  60. //System.out.println(jsonObject.toJSONString());
  61. // Send to Kafka
  62. MyProducer.sendData(MyConstants.GMALL_ORDER_INFO,jsonObject.toJSONString());
  63. }
  64. }
  65. }
  66. }

IV. Run the program and check with a Kafka console consumer

Spark Real-Time Data Warehouse (II) - Figure 10

3. Edit the sub-module (gmall-realtime)

Process, in real time, the data that the canal client sends to Kafka.

I. pom.xml

  1. <dependency>
  2. <groupId>mysql</groupId>
  3. <artifactId>mysql-connector-java</artifactId>
  4. <version>5.1.30</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.elasticsearch</groupId>
  8. <artifactId>elasticsearch-hadoop</artifactId>
  9. <version>6.6.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.alibaba</groupId>
  13. <artifactId>druid</artifactId>
  14. <version>1.0.5</version>
  15. </dependency>

II. Configuration file (db.properties)

  1. #Mysql
  2. jdbc.datasource.size=20
  3. jdbc.url=jdbc:mysql://hadoop103:3306/sparkproduct?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
  4. jdbc.user=root
  5. jdbc.password=321074
  6. jdbc.driver.name=com.mysql.jdbc.Driver

III. Utility class (JDBCUtil.scala)

  1. package com.atguigu.realtime.utils
  2. import java.sql.{Connection, PreparedStatement, ResultSet}
  3. import java.util.Properties
  4. import com.alibaba.druid.pool.DruidDataSourceFactory
  5. import javax.sql.DataSource
  6. object JDBCUtil {
  7. private val properties: Properties = PropertiesUtil.load("db.properties")
  8. // The connection pool instance
  9. var dataSource:DataSource = init()
  10. // Initialize the connection pool
  11. def init():DataSource = {
  12. val paramMap = new java.util.HashMap[String, String]()
  13. paramMap.put("driverClassName", properties.getProperty("jdbc.driver.name"))
  14. paramMap.put("url", properties.getProperty("jdbc.url"))
  15. paramMap.put("username", properties.getProperty("jdbc.user"))
  16. paramMap.put("password", properties.getProperty("jdbc.password"))
  17. paramMap.put("maxActive", properties.getProperty("jdbc.datasource.size"))
  18. // Build a Druid connection pool
  19. DruidDataSourceFactory.createDataSource(paramMap)
  20. }
  21. // Get a connection from the pool
  22. def getConnection(): Connection = {
  23. dataSource.getConnection
  24. }
  25. def main(args: Array[String]): Unit = {
  26. println(getConnection())
  27. }
  28. }

IV. GMV implementation

  1. Create two tables, one for the GMV results and one for the offsets
  1. CREATE TABLE `gmvstats` (
  2. `date` VARCHAR(10) NOT NULL,
  3. `hour` VARCHAR(10) NOT NULL,
  4. `gmv` DECIMAL(20,2) DEFAULT NULL,
  5. PRIMARY KEY (`date`,`hour`)
  6. ) ENGINE=INNODB DEFAULT CHARSET=utf8;
  1. CREATE TABLE `offsets` (
  2. `group_id` VARCHAR(200) NOT NULL,
  3. `topic` VARCHAR(200) NOT NULL,
  4. `partition` INT(5) NOT NULL,
  5. `offset` BIGINT(10) ,
  6. PRIMARY KEY (`group_id`,`topic`,`partition`)
  7. ) ENGINE=INNODB DEFAULT CHARSET=utf8;
  2. Case class (OrderInfo)
  1. package com.atguigu.realtime.beans
  2. case class OrderInfo(
  3. id: String,
  4. province_id: String,
  5. consignee: String,
  6. order_comment: String,
  7. var consignee_tel: String,
  8. order_status: String,
  9. payment_way: String,
  10. user_id: String,
  11. img_url: String,
  12. total_amount: Double,
  13. expire_time: String,
  14. delivery_address: String,
  15. create_time: String,
  16. operate_time: String,
  17. tracking_no: String,
  18. parent_order_id: String,
  19. out_trade_no: String,
  20. trade_body: String,
  21. // Extra fields added to make per-day and per-hour statistics easier
  22. var create_date: String,
  23. var create_hour: String)
  3. GMV statistics program (GMVApp)
  1. package com.atguigu.realtime.app
  2. import com.alibaba.fastjson.JSON
  3. import com.atguigu.constants.MyConstants
  4. import com.atguigu.realtime.app.DAUApp.{appName, batchDuration, groupId, ssc}
  5. import com.atguigu.realtime.beans.OrderInfo
  6. import com.atguigu.realtime.utils.{JDBCUtil, MyKafkaUtil}
  7. import org.apache.kafka.clients.consumer.ConsumerRecord
  8. import org.apache.kafka.common.TopicPartition
  9. import org.apache.spark.SparkConf
  10. import org.apache.spark.rdd.RDD
  11. import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
  12. import org.apache.spark.streaming.{Seconds, StreamingContext}
  13. import java.sql.{Connection, PreparedStatement}
  14. import java.time.LocalDateTime
  15. import java.time.format.DateTimeFormatter
  16. import scala.collection.mutable
  17. /**
  18. *
  19. *
  20. * Compute the GMV of all orders per day:
  21. * the total GMV of each day
  22. * and the GMV of each hour of each day.
  23. *
  24. * Result: K (time description) - V (GMV)
  25. *
  26. * Aggregation: use at least once + a transaction (aggregated result + offsets) maintained in MySQL!
  27. *
  28. *
  29. */
  30. object GMVApp extends BaseApp {
  31. override var appName: String = "GMVApp"
  32. override var batchDuration: Int = 10
  33. val groupId="gmallrealtime"
  34. /**
  35. * Query the offsets committed by the last run
  36. * @param groupId
  37. * @param topicName
  38. * @return
  39. */
  40. def selectLatestOffsetsFromDB(groupId: String, topicName: String) = {
  41. val offsets = mutable.Map[TopicPartition, Long]()
  42. var connection:Connection = null
  43. var ps:PreparedStatement = null
  44. try {
  45. connection = JDBCUtil.getConnection()
  46. val sql =
  47. """
  48. |select * from `offsets` where `group_id` = ? and `topic` = ?
  49. |""".stripMargin
  50. ps = connection.prepareStatement(sql)
  51. ps.setString(1,groupId)
  52. ps.setString(2,topicName)
  53. val resultSet = ps.executeQuery()
  54. while(resultSet.next()){
  55. val topicPartition = new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partition"))
  56. val offset = resultSet.getLong("offset")
  57. offsets.put(topicPartition,offset)
  58. }
  59. } catch {
  60. case e:Exception =>{
  61. e.printStackTrace()
  62. throw new RuntimeException("查询偏移量失败")
  63. }
  64. } finally {
  65. if(ps != null){
  66. ps.close()
  67. }
  68. if (connection != null){
  69. connection.close()
  70. }
  71. }
  72. offsets.toMap
  73. }
  74. /**
  75. * Wrap the RDD records in the OrderInfo case class
  76. * @param rdd
  77. * @return
  78. */
  79. def parseRecordToOrderInfo(rdd: RDD[ConsumerRecord[String, String]]):RDD[OrderInfo] = {
  80. rdd.map(record=>{
  81. val orderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])
  82. //"create_time": "2021-09-28 21:12:40",
  83. val formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd")
  84. val formatter2 = DateTimeFormatter.ofPattern("HH")
  85. val formatter3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  86. // First convert "2021-09-28 21:12:40" into a LocalDateTime
  87. val localDateTime = LocalDateTime.parse(orderInfo.create_time, formatter3)
  88. orderInfo.create_date=localDateTime.format(formatter1)
  89. orderInfo.create_hour=localDateTime.format(formatter2)
  90. orderInfo
  91. })
  92. }
  93. /**
  94. * Write the results and the offsets to MySQL in a single transaction
  95. * @param results
  96. * @param offsetRanges
  97. */
  98. def writeDataAndOffsetInCommonBatch(results: Array[((String, String), Double)], offsetRanges: Array[OffsetRange]): Unit = {
  99. var connection:Connection = null
  100. var ps1:PreparedStatement = null
  101. var ps2:PreparedStatement = null
  102. try {
  103. connection = JDBCUtil.getConnection()
  104. // Turn off auto-commit so the results and the offsets go into one transaction
  105. connection.setAutoCommit(false)
  106. // Decide whether this is a stateful or a stateless computation:
  107. // here it is stateful, the per-day/per-hour GMV accumulates across batches
  108. val sql1 =
  109. """
  110. |insert into gmvstats values(?,?,?)
  111. |on duplicate key update gmv = gmv + values(gmv)
  112. |""".stripMargin
  113. val sql2 =
  114. """
  115. |insert into offsets values(?,?,?,?)
  116. |on duplicate key update offset = values(offset)
  117. |""".stripMargin
  118. ps1 = connection.prepareStatement(sql1)
  119. ps2 = connection.prepareStatement(sql2)
  120. for (((date, hour), gmv)<- results) {
  121. ps1.setString(1,date)
  122. ps1.setString(2,hour)
  123. ps1.setDouble(3,gmv)
  124. // Add to the batch
  125. ps1.addBatch
  126. }
  127. // Execute the batch of result rows
  128. val dataResults = ps1.executeBatch()
  129. for (offsetRange <- offsetRanges) {
  130. ps2.setString(1,groupId)
  131. ps2.setString(2,offsetRange.topic)
  132. ps2.setInt(3,offsetRange.partition)
  133. ps2.setLong(4,offsetRange.untilOffset)
  134. // One row per partition; each OffsetRange describes one partition of one topic
  135. ps2.addBatch()
  136. }
  137. val offsetResult = ps2.executeBatch()
  138. // Commit the transaction
  139. connection.commit()
  140. println("写入数据" + dataResults.size)
  141. println("写入偏移量" + offsetResult.size)
  142. } catch {
  143. case e:Exception =>{
  144. connection.rollback()
  145. e.printStackTrace()
  146. throw new RuntimeException("写入结果和偏移量失败")
  147. }
  148. } finally {
  149. if(ps1 != null){
  150. ps1.close()
  151. }
  152. if(ps2 != null){
  153. ps2.close()
  154. }
  155. if (connection != null){
  156. connection.close()
  157. }
  158. }
  159. }
  160. def main(args: Array[String]): Unit = {
  161. ssc = new StreamingContext(new SparkConf().setMaster("local[4]").setAppName(appName),Seconds(batchDuration))
  162. runApp{
  163. // 1. Query the offsets of the last consumed batch from MySQL
  164. val offsets = selectLatestOffsetsFromDB(groupId, MyConstants.GMALL_ORDER_INFO)
  165. // 2. Build the DStream starting from those offsets
  166. val ds = MyKafkaUtil.getKafkaStream(Array(MyConstants.GMALL_ORDER_INFO),ssc,
  167. groupId,true,offsets)
  168. // 3. Get the offset ranges of the current batch
  169. ds.foreachRDD(rdd=>{
  170. if(!rdd.isEmpty()){
  171. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  172. // 4. Wrap as OrderInfo case classes
  173. val rdd1 = parseRecordToOrderInfo(rdd)
  174. // 5. Compute the GMV per (date, hour)
  175. val rdd2 = rdd1.map(orderInfo => ((orderInfo.create_date, orderInfo.create_hour), orderInfo.total_amount))
  176. .reduceByKey(_ + _)
  177. // 6. Collect the results to the driver and write them, together with the offsets, to MySQL in one transaction
  178. val result = rdd2.collect()
  179. writeDataAndOffsetInCommonBatch(result,offsetRanges)
  180. }
  181. })
  182. }
  183. }
  184. }
  4. Start the app and check in a MySQL client

Spark Real-Time Data Warehouse (II) - Figure 11

Spark Real-Time Data Warehouse (II) - Figure 12

4. Edit the sub-module (gmall-publisher)

I. Business code implementation

  1. POJO layer (GMVPerHour.java)
  1. package com.example.gmallpublisher.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @NoArgsConstructor
  6. @AllArgsConstructor
  7. @Data
  8. public class GMVPerHour {
  9. private String item;
  10. private Double gmv; // gmv is DECIMAL(20,2) in MySQL, so Double avoids truncating the cents
  11. }
  2. Mapper layer (GMVMapper)
  1. package com.example.gmallpublisher.mapper;
  2. import com.baomidou.dynamic.datasource.annotation.DS;
  3. import com.example.gmallpublisher.pojo.GMVPerHour;
  4. import org.springframework.stereotype.Repository;
  5. import java.util.List;
  6. @Repository
  7. @DS("mysql")
  8. public interface GMVMapper {
  9. // Cumulative GMV of the day
  10. Double getGMVByDate(String date);
  11. // Per-hour GMV of the day
  12. List<GMVPerHour> getGMVPerHourDate(String date);
  13. }
  3. GMVMapper.xml
  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper
  3. PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  4. "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  5. <!-- namespace binds this file to the corresponding Dao/Mapper interface -->
  6. <mapper namespace="com.example.gmallpublisher.mapper.GMVMapper">
  7. <!-- Cumulative GMV of the day -->
  8. <select id="getGMVByDate" resultType="double">
  9. select
  10. sum(gmv)
  11. from gmvstats
  12. where `date` = #{logdate}
  13. </select>
  14. <!-- Per-hour GMV -->
  15. <select id="getGMVPerHourDate" resultType="com.example.gmallpublisher.pojo.GMVPerHour">
  16. select
  17. `hour` item,
  18. sum(gmv) gmv
  19. from gmvstats
  20. where `date`= #{logdate}
  21. group by `hour`
  22. </select>
  23. </mapper>
  4. Service layer (PublisherService, PublisherServiceImpl)
  1. package com.example.gmallpublisher.service;
  2. import com.example.gmallpublisher.pojo.DAUPerHour;
  3. import com.example.gmallpublisher.pojo.GMVPerHour;
  4. import java.util.List;
  5. public interface PublisherService {
  6. // DAU of a single day
  7. Integer getDAUByDate(String date);
  8. // Number of new devices of the day
  9. Integer getNewMidCountByDate(String date);
  10. // Per-hour device counts of the day
  11. List<DAUPerHour> getDAUPerHourData(String date);
  12. // Cumulative GMV of the day
  13. Double getGMVByDate(String date);
  14. // Per-hour GMV of the day
  15. List<GMVPerHour> getGMVPerHourDate(String date);
  16. }
  1. package com.example.gmallpublisher.service;
  2. import com.example.gmallpublisher.mapper.DAUMapper;
  3. import com.example.gmallpublisher.mapper.GMVMapper;
  4. import com.example.gmallpublisher.pojo.DAUPerHour;
  5. import com.example.gmallpublisher.pojo.GMVPerHour;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import java.util.List;
  9. @Service
  10. public class PublisherServiceImpl implements PublisherService {
  11. @Autowired
  12. private DAUMapper dauMapper;
  13. @Autowired
  14. private GMVMapper gmvMapper;
  15. @Override
  16. public Integer getDAUByDate(String date) {
  17. System.out.println("遵循的业务步骤");
  18. return dauMapper.getDAUByDate(date);
  19. }
  20. @Override
  21. public Integer getNewMidCountByDate(String date) {
  22. System.out.println("遵循的业务步骤");
  23. return dauMapper.getNewMidCountByDate(date);
  24. }
  25. @Override
  26. public List<DAUPerHour> getDAUPerHourData(String date) {
  27. System.out.println("遵循业务步骤");
  28. return dauMapper.getDAUPerHourData(date);
  29. }
  30. @Override
  31. public Double getGMVByDate(String date) {
  32. System.out.println("遵循业务步骤");
  33. return gmvMapper.getGMVByDate(date);
  34. }
  35. @Override
  36. public List<GMVPerHour> getGMVPerHourDate(String date) {
  37. System.out.println("遵循业务步骤");
  38. return gmvMapper.getGMVPerHourDate(date);
  39. }
  40. }
  5. Controller layer
  1. package com.example.gmallpublisher.controller;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.example.gmallpublisher.pojo.DAUPerHour;
  4. import com.example.gmallpublisher.pojo.GMVPerHour;
  5. import com.example.gmallpublisher.service.PublisherService;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. import java.time.LocalDate;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. @RestController
  13. public class PublisherController {
  14. @Autowired
  15. private PublisherService publisherService;
  16. /**
  17. * [
  18. * {"id":"dau","name":"当日日活数","value":1200},
  19. * {"id":"new_mid","name":"新增设备数","value":233}
  20. * {"id":"order_amount","name":"当日交易额","value":1000.2 }
  21. * ]
  22. * @param date
  23. * @return
  24. */
  25. @RequestMapping(value="/realtime-total")
  26. public Object handle1(String date){
  27. // Get the DAU of the day
  28. Integer dauByDate = publisherService.getDAUByDate(date);
  29. // Get the number of new devices of the day
  30. Integer newMidCountByDate = publisherService.getNewMidCountByDate(date);
  31. // Get the cumulative GMV of the day
  32. Double gmvByDate = publisherService.getGMVByDate(date);
  33. ArrayList<JSONObject> result = new ArrayList<>();
  34. JSONObject jsonObject1 = new JSONObject();
  35. JSONObject jsonObject2 = new JSONObject();
  36. JSONObject jsonObject3 = new JSONObject();
  37. jsonObject1.put("id","dau");
  38. jsonObject1.put("name","当日日活数");
  39. jsonObject1.put("value",dauByDate);
  40. jsonObject2.put("id","new_mid");
  41. jsonObject2.put("name","新增设备数");
  42. jsonObject2.put("value",newMidCountByDate);
  43. jsonObject3.put("id","order_amount");
  44. jsonObject3.put("name","当日交易额");
  45. jsonObject3.put("value",gmvByDate);
  46. result.add(jsonObject1);
  47. result.add(jsonObject2);
  48. result.add(jsonObject3);
  49. return result;
  50. }
  51. /**
  52. *
  53. * {"yesterday":
  54. {
  55. "11":383,
  56. "12":123,
  57. "17":88,
  58. "19":200
  59. },
  60. "today":
  61. {"12":38,"13":1233,"17":123,"19":688 }
  62. }
  63. *
  64. * @param date
  65. * @return
  66. */
  67. @RequestMapping(value="realtime-hours")
  68. public Object handle2(String id,String date){
  69. JSONObject result = new JSONObject();
  70. // Compute yesterday's date from today's date
  71. String yestodayDate = LocalDate.parse(date).minusDays(1).toString();
  72. if("dau".equals(id)){
  73. List<DAUPerHour> todayData = publisherService.getDAUPerHourData(date);
  74. List<DAUPerHour> yestodayData = publisherService.getDAUPerHourData(yestodayDate);
  75. result.put("yesterday",parseDAUDate(yestodayData));
  76. result.put("today",parseDAUDate(todayData));
  77. }
  78. if ("order_amount".equals(id)){
  79. List<GMVPerHour> todayData = publisherService.getGMVPerHourDate(date);
  80. List<GMVPerHour> yestodayData = publisherService.getGMVPerHourDate(yestodayDate);
  81. result.put("yesterday",parseGMVData(yestodayData));
  82. result.put("today",parseGMVData(todayData));
  83. }
  84. return result;
  85. }
  86. private Object parseGMVData(List<GMVPerHour> data) {
  87. JSONObject result = new JSONObject();
  88. for (GMVPerHour gmvPerHour : data) {
  89. result.put(gmvPerHour.getItem(),gmvPerHour.getGmv());
  90. }
  91. return result;
  92. }
  93. public JSONObject parseDAUDate(List<DAUPerHour> data){
  94. JSONObject result = new JSONObject();
  95. for (DAUPerHour dauPerHour : data) {
  96. result.put(dauPerHour.getHour(),dauPerHour.getDau());
  97. }
  98. return result;
  99. }
  100. }
  6. Main application class
  1. package com.example.gmallpublisher;
  2. import org.mybatis.spring.annotation.MapperScan;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. @SpringBootApplication
  6. @MapperScan(basePackages = "com.example.gmallpublisher.mapper")
  7. public class GmallPublisherApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(GmallPublisherApplication.class, args);
  10. }
  11. }
  7. index.html
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>首页</title>
  6. </head>
  7. <body>
  8. <a href=" http://localhost:8070/realtime-total?date=2021-10-07">请求今天DAU和GMV</a> <br>
  9. <a href=" http://localhost:8070/realtime-hours?id=dau&date=2021-10-07">请求今天和昨天分时DAU</a>
  10. <br/>
  11. <a href=" http://localhost:8070/realtime-hours?id=order_amount&date=2021-10-07">请求今天和昨天分时GMV</a>
  12. </body>
  13. </html>

II. Run the program

Open http://localhost:8070/index.html in a browser.

Spark Real-Time Data Warehouse (II) - Figure 13

  1. Request today's DAU and GMV

Spark Real-Time Data Warehouse (II) - Figure 14

  2. Request today's and yesterday's per-hour DAU

Spark Real-Time Data Warehouse (II) - Figure 15

  3. Request today's and yesterday's per-hour GMV

Spark Real-Time Data Warehouse (II) - Figure 16