Chapter 1. Project Requirement 3: Shopping-Coupon Risk Alerting

1. Requirement Analysis

I. Overview

Real-time alerting is a business type that comes up frequently in real-time computing. By detecting system errors or anomalous user behavior in the log data, the job produces corresponding alert logs; displayed on a graphical dashboard, these alerts prompt the monitoring side to investigate promptly and take countermeasures.

II. Requirement

Requirement: if the same device logs in with 2 or more different accounts within 5 minutes, and each of those accounts adds a shipping address, produce one alert log. In addition, the same device is alerted at most once per minute.
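
The once-per-minute rule needs no extra state: as the implementation in section IV below shows, the alert's document id is built from the device id plus a minute-level timestamp, so idempotent writes into ES overwrite duplicate alerts within the same minute. A minimal sketch of that key:

import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

// Alerts from the same device within the same minute share one id, so writing
// them to ES with "es.mapping.id" -> "id" keeps at most one alert per minute.
def alertId(mid: String, tsMillis: Long): String = {
  val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
  val time = LocalDateTime.ofInstant(Instant.ofEpochMilli(tsMillis), ZoneId.of("Asia/Shanghai"))
  mid + "_" + time.format(fmt)
}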

III. Data-Processing Flow

Spark Real-Time Data Warehouse (III) - Figure 1

2. Editing the Submodule (gmall-realtime)

I. pom.xml

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.6.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.0.5</version>
</dependency>

II. Case Classes

package com.atguigu.realtime.beans

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
 * Created by Smexy on 2021/9/30
 */
// Maps to actions: [{action}, ...]
case class Action(action_id: String,
                  item: String,
                  item_type: String,
                  ts: Long)

// Maps to the common section
case class CommonInfo(ar: String,
                      ba: String,
                      ch: String,
                      is_new: Int,
                      md: String,
                      mid: String,
                      os: String,
                      uid: Long,
                      vc: String)

// Maps to one record in Kafka
case class ActionsLog(actions: List[Action],
                      ts: Long,
                      common: CommonInfo)

// Alert log
case class AlertInfo(id: String,
                     // users that logged in on this device during the past 5 minutes
                     uids: mutable.Set[String],
                     // if any of those users added items to favorites, record those item ids
                     itemIds: mutable.Set[String],
                     // all actions produced by those users during the past 5 minutes
                     events: ListBuffer[String],
                     // ts of the alert log
                     ts: Long)
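
For reference, a Kafka record that these case classes map onto looks roughly like this (field values invented for illustration):

{
  "common": { "ar": "110000", "ba": "Xiaomi", "ch": "web", "is_new": 1, "md": "Xiaomi 9",
              "mid": "mid_001", "os": "Android 10.0", "uid": 27, "vc": "v2.1.134" },
  "actions": [ { "action_id": "trade_add_address", "item": "5", "item_type": "sku_id", "ts": 1632988200000 } ],
  "ts": 1632988200000
}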

III. ES Setup (creating the index)

PUT _template/gmall_coupon_alert_template
{
  "index_patterns": ["gmall_coupon_alert*"],
  "settings": {
    "number_of_shards": 3
  },
  "aliases" : {
    "{index}-query": {},
    "gmall_coupon_alert-query": {}
  },
  "mappings": {
    "_doc": {
      "properties": {
        "mid":     { "type": "keyword" },
        "uids":    { "type": "keyword" },
        "itemIds": { "type": "keyword" },
        "events":  { "type": "keyword" },
        "ts":      { "type": "date" }
      }
    }
  }
}
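
Once the PUT succeeds, the template can be read back to verify the mapping (a quick sanity check, not part of the original flow):

GET _template/gmall_coupon_alert_template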

IV. Implementing the Alert

package com.atguigu.realtime.app

import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.constants.MyConstants
import com.atguigu.realtime.beans.{Action, ActionsLog, AlertInfo, CommonInfo}
import com.atguigu.realtime.utils.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import scala.collection.mutable
import scala.util.control.Breaks

/**
 * at least once + ES (idempotent output; offsets maintained in Kafka)
 */
object AlertApp extends BaseApp {
  override var appName: String = "AlertApp"
  override var batchDuration: Int = 10
  val groupId = "gmallrealtime"

  def main(args: Array[String]): Unit = {
    // Write the ES settings into the SparkConf
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(appName)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "hadoop102")
    conf.set("es.port", "9200")
    ssc = new StreamingContext(conf, Seconds(batchDuration))
    runApp {
      // 1. Get the initial DStream
      val ds = MyKafkaUtil.getKafkaStream(Array(MyConstants.ACTIONS_LOG), ssc, groupId)
      var offsetRanges: Array[OffsetRange] = null
      // 2. Capture the offsets of the batch
      val ds1 = ds.transform(rdd => {
        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 3. Wrap each record into the case classes
        rdd.map(record => {
          // Extract actions
          val jSONObject: JSONObject = JSON.parseObject(record.value())
          /**
           * Import the implicit conversions between Scala and Java collections:
           *   Scala collection -> Java:  scalaColl.asJava
           *   Java collection  -> Scala: javaColl.asScala
           */
          import collection.JavaConverters._
          // Convert the java.util.List into a Scala List
          val actions: List[Action] = JSON.parseArray(jSONObject.getString("actions"), classOf[Action]).asScala.toList
          // Extract common
          val common = JSON.parseObject(jSONObject.getString("common"), classOf[CommonInfo])
          ActionsLog(actions, jSONObject.getLong("ts"), common)
        })
      })
      // 4. Open a 5-minute window and group by (mid, uid)
      val ds2 = ds1.window(Minutes(5))
        .map(log => ((log.common.mid, log.common.uid), log))
        .groupByKey()
      // 5. Per requirement, check whether each user added a shipping address
      val ds3 = ds2.map {
        case ((mid, uid), actionLogs) => {
          // Assume no alert is needed
          var ifNeedAlert: Boolean = false
          // Check whether this user is suspicious
          Breaks.breakable {
            actionLogs.foreach(actionsLog => {
              actionsLog.actions.foreach(action => {
                if (action.action_id == "trade_add_address") {
                  ifNeedAlert = true
                  // No need to keep iterating; break out of the loop
                  Breaks.break()
                }
              })
            })
          }
          // If an alert may be needed, return the logs this user produced in the window
          if (ifNeedAlert) {
            (mid, actionLogs)
          } else {
            (null, null)
          }
        }
      }
      // 6. Filter out the (null, null) pairs
      val ds4 = ds3.filter(_._1 != null)
      // 7. Group the suspicious users and their logs by device
      val ds5 = ds4.groupByKey()
      // 8. Only devices where 2 or more users changed the shipping address in the past 5 minutes need an alert
      val ds6 = ds5.filter(_._2.size >= 2)
      // 9. Flatten
      val ds7 = ds6.mapValues(_.flatten)
      // 10. Build the alert record
      val ds8 = ds7.map {
        case (mid, actionsLogs) => {
          val uids = mutable.Set[String]()
          val itemIds = mutable.Set[String]()
          val events = mutable.ListBuffer[String]()
          actionsLogs.foreach(actionsLog => {
            uids.add(actionsLog.common.uid.toString)
            actionsLog.actions.foreach(action => {
              if (action.action_id == "favor_add") {
                itemIds.add(action.item)
              }
              events.append(action.action_id)
            })
          })
          // id = mid_yyyy-MM-dd HH:mm
          val formatter1: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
          val ts: Long = System.currentTimeMillis()
          val time: LocalDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.of("Asia/Shanghai"))
          val id = mid + "_" + time.format(formatter1)
          AlertInfo(id, uids, itemIds, events, ts)
        }
      }
      ds8.cache()
      ds8.print(100)
      // Import the implicit that adds saveToEs to RDDs
      import org.elasticsearch.spark._
      // 11. Write to ES
      ds8.foreachRDD(rdd => {
        rdd.saveToEs("/gmall_coupon_alert" + LocalDate.now() + "/_doc", Map("es.mapping.id" -> "id"))
        // Commit the offsets
        ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })
    }
  }
}

V. Launching the App

Spark Real-Time Data Warehouse (III) - Figure 2

Checking the result from an ES client

Spark Real-Time Data Warehouse (III) - Figure 3
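
The alerts can also be queried through the alias defined in the template; a quick look at the latest alert from Kibana Dev Tools might read like this (illustrative):

GET gmall_coupon_alert-query/_search
{
  "size": 1,
  "sort": [ { "ts": "desc" } ]
}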

3. Publishing a Visualization in Kibana

I. Create an Index Pattern

Spark Real-Time Data Warehouse (III) - Figure 4

Spark Real-Time Data Warehouse (III) - Figure 5

II. Build a Tree Map

Spark Real-Time Data Warehouse (III) - Figure 6

Spark Real-Time Data Warehouse (III) - Figure 7

Spark Real-Time Data Warehouse (III) - Figure 8

Spark Real-Time Data Warehouse (III) - Figure 9

III. Build a Pie Chart

Spark Real-Time Data Warehouse (III) - Figure 10

IV. Dashboard

Spark Real-Time Data Warehouse (III) - Figure 11

Chapter 2. Project Requirement 4: Flexible Analysis

1. Requirement Analysis

I. The Ad-hoc Query Scenario

The warehouse stores a large amount of detail data, but queries over a Hadoop-based warehouse must go through MapReduce, so interactive latency is poor. To let analysts explore the data conveniently, the platform needs a feature that supports flexible analysis driven by free-text keywords and filter options.

Spark Real-Time Data Warehouse (III) - Figure 12

II. Detailed Requirement

Input parameters

  • date: the day whose data is queried
  • keyword: search terms matched against the product (sku) name

Returned results

  • gender ratio: male vs. female
  • age ratio: under 20, 20-30, 30 and above
  • purchase detail records: user id, gender, age, user level, purchase time, product price, order status, etc., with pagination
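
For orientation, the service built in part 4 of this chapter returns a JSON document shaped roughly like this (values invented for illustration):

{
  "total": 62,
  "stats": [
    { "title": "用户年龄占比", "options": [ { "name": "20岁以下", "value": 40.0 },
                                            { "name": "20岁到30岁", "value": 35.0 },
                                            { "name": "30岁及30岁以上", "value": 25.0 } ] },
    { "title": "用户性别占比", "options": [ { "name": "男", "value": 55.0 },
                                            { "name": "女", "value": 45.0 } ] }
  ],
  "detail": [ { "order_detail_id": "101", "user_id": "27", "user_gender": "M", "user_age": 25,
                "sku_name": "小米手机10", "sku_price": 3999.0, "order_status": "1001" } ]
}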

III. Data-Processing Flow

Spark Real-Time Data Warehouse (III) - Figure 13

2. Editing the Submodule (gmall-canalclient)

Write a canal client that collects the detail data into Kafka.

package com.atguigu.client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.constants.MyConstants;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

// canal client connecting to the canal server
public class CanalClient2 {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // 1. Create a client object
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop103", 11111), "example", null, null);
        // 2. Connect
        connector.connect();
        // 3. Subscribe
        connector.subscribe("sparkproduct.*");
        // 4. Pull data
        while (true) {
            Message message = connector.get(100);
            // Check whether any data was pulled
            if (message.getId() == -1) {
                System.out.println("No data yet, retrying later");
                Thread.sleep(5000);
                continue;
            }
            // Parse the message (one pull -> the write changes caused by several SQL statements)
            // List<CanalEntry.Entry> entries holds the changes caused by those statements
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                // Entry: the change caused by one SQL statement.
                // Only ROWDATA entries carry row changes (transaction begin/end entries are skipped)
                if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                    // ByteString storeValue_ stores the data change of one statement;
                    // it cannot be used directly and must be deserialized first
                    String tableName = entry.getHeader().getTableName();
                    ByteString storeValue = entry.getStoreValue();
                    parseData(tableName, storeValue);
                }
            }
        }
    }

    private static void parseData(String tableName, ByteString storeValue) throws InvalidProtocolBufferException {
        // Deserialize
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
        CanalEntry.EventType eventType = rowChange.getEventType();
        // Keep inserts only (plus updates for user_info)
        if (tableName.equals("order_info") && eventType.equals(CanalEntry.EventType.INSERT)) {
            sendDataToKafka(rowChange, MyConstants.GMALL_ORDER_INFO);
        } else if (tableName.equals("order_detail") && eventType.equals(CanalEntry.EventType.INSERT)) {
            sendDataToKafka(rowChange, MyConstants.GMALL_ORDER_DETAIL);
        } else if (tableName.equals("user_info") && (eventType.equals(CanalEntry.EventType.INSERT) || eventType.equals(CanalEntry.EventType.UPDATE))) {
            sendDataToKafka(rowChange, MyConstants.GMALL_USER_INFO);
        }
    }

    public static void sendDataToKafka(CanalEntry.RowChange rowChange, String topicName) {
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        // Wrap all columns of each row into a JSON string
        for (CanalEntry.RowData rowData : rowDatasList) {
            JSONObject jsonObject = new JSONObject();
            // Columns after the insert
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                jsonObject.put(column.getName(), column.getValue());
            }
            System.out.println(jsonObject.toJSONString());
            // Produce to Kafka
            MyProducer.sendData(topicName, jsonObject.toJSONString());
        }
    }
}

3. Editing the Submodule (gmall-realtime)

I. Data-Processing Flow

Spark Real-Time Data Warehouse (III) - Figure 14

II. Case Classes

package com.atguigu.realtime.beans

import java.text.SimpleDateFormat
import java.util

case class OrderDetail(id: String,
                       order_id: String,
                       sku_name: String,
                       sku_id: String,
                       order_price: String,
                       img_url: String,
                       sku_num: String)

case class UserInfo(id: String,
                    login_name: String,
                    user_level: String,
                    birthday: String,
                    gender: String)

case class SaleDetail(var order_detail_id: String = null,
                      var order_id: String = null,
                      var order_status: String = null,
                      var create_time: String = null,
                      var user_id: String = null,
                      var sku_id: String = null,
                      var user_gender: String = null,
                      var user_age: Int = 0,
                      var user_level: String = null,
                      var sku_price: Double = 0D,
                      var sku_name: String = null,
                      var dt: String = null) {

  def this(orderInfo: OrderInfo, orderDetail: OrderDetail) {
    this()
    mergeOrderInfo(orderInfo)
    mergeOrderDetail(orderDetail)
  }

  def mergeOrderInfo(orderInfo: OrderInfo): Unit = {
    if (orderInfo != null) {
      this.order_id = orderInfo.id
      this.order_status = orderInfo.order_status
      this.create_time = orderInfo.create_time
      this.dt = orderInfo.create_date
      this.user_id = orderInfo.user_id
    }
  }

  def mergeOrderDetail(orderDetail: OrderDetail): Unit = {
    if (orderDetail != null) {
      this.order_detail_id = orderDetail.id
      this.sku_id = orderDetail.sku_id
      this.sku_name = orderDetail.sku_name
      this.sku_price = orderDetail.order_price.toDouble
    }
  }

  def mergeUserInfo(userInfo: UserInfo): Unit = {
    if (userInfo != null) {
      this.user_id = userInfo.id
      // Estimate the age from the birthday (millisecond difference / one year)
      val formatter = new SimpleDateFormat("yyyy-MM-dd")
      val date: util.Date = formatter.parse(userInfo.birthday)
      val curTs: Long = System.currentTimeMillis()
      val betweenMs = curTs - date.getTime
      val age = betweenMs / 1000L / 60L / 60L / 24L / 365L
      this.user_age = age.toInt
      this.user_gender = userInfo.gender
      this.user_level = userInfo.user_level
    }
  }
}
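
The millisecond arithmetic above ignores leap years, which is acceptable for bucketing users into age brackets. If an exact age were needed, java.time.Period is an alternative (a sketch, not the original code):

import java.time.{LocalDate, Period}

// Exact age in whole years; leap years are handled by java.time
def ageOf(birthday: String): Int =
  Period.between(LocalDate.parse(birthday), LocalDate.now()).getYears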

III. Caching user_info in Redis

  • The join uses order_info.user_id on one side and user_info.id on the other.
  • The user behind a given order may have registered just now or long ago - both are possible, so the join needs the full user_info table.
  • Where should the full user_info data be stored?
  • Querying user_info directly in MySQL performs poorly.
  • So the full user_info table is copied from MySQL into Redis.
  • Goal: Redis holds the complete set of user records.
  • Option 1: write a one-off program that bulk-reads the user_info table from MySQL and writes it out to Redis (see the sketch after this list).
  • Option 2: if the full binlog is available, use canal to read it, produce the records to Kafka, and write them into Redis from there (the approach adopted here).
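
A minimal sketch of the bulk-load option, assuming MySQL on hadoop102 with a sparkproduct.user_info table and reusing the project's RedisUtil; the host, credentials and query are illustrative, while the key layout mirrors the streaming job below:

import java.sql.DriverManager
import com.atguigu.realtime.utils.RedisUtil

object UserInfoBulkLoad {
  def main(args: Array[String]): Unit = {
    // Illustrative connection settings; requires the MySQL JDBC driver on the classpath
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/sparkproduct", "root", "123456")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("select id, login_name, user_level, birthday, gender from user_info")
    val jedis = RedisUtil.getJedisClient()
    while (rs.next()) {
      // Same key layout as the streaming job: userInfo:<id> -> JSON string
      val json = s"""{"id":"${rs.getString("id")}","login_name":"${rs.getString("login_name")}","user_level":"${rs.getString("user_level")}","birthday":"${rs.getString("birthday")}","gender":"${rs.getString("gender")}"}"""
      jedis.set("userInfo:" + rs.getString("id"), json)
    }
    jedis.close()
    rs.close(); stmt.close(); conn.close()
  }
}

The canal-based version adopted by the project is the UserApp below.
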
package com.atguigu.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.constants.MyConstants
import com.atguigu.realtime.utils.{MyKafkaUtil, RedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserApp extends BaseApp {
  override var appName: String = "UserApp"
  override var batchDuration: Int = 10
  val groupId = "gmallrealtime"

  def main(args: Array[String]): Unit = {
    ssc = new StreamingContext(new SparkConf().setMaster("local[*]").setAppName(appName), Seconds(batchDuration))
    runApp {
      // Get the initial DStream
      val ds = MyKafkaUtil.getKafkaStream(Array(MyConstants.GMALL_USER_INFO), ssc, groupId)
      ds.foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          // Capture the offsets
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition(partition => {
            // One Redis connection per partition
            val jedis = RedisUtil.getJedisClient()
            partition.foreach(record => {
              val jSONObject = JSON.parseObject(record.value())
              jedis.set("userInfo:" + jSONObject.getString("id"), record.value())
            })
            jedis.close()
          })
          ds.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
      })
    }
  }
}

Spark Real-Time Data Warehouse (III) - Figure 15

IV. Creating the ES Index

PUT _template/gmall2021_sale_detail_template
{
  "index_patterns": ["gmall2021_sale_detail*"],
  "settings": {
    "number_of_shards": 3
  },
  "aliases" : {
    "{index}-query": {},
    "gmall2021_sale_detail-query": {}
  },
  "mappings": {
    "_doc": {
      "properties": {
        "order_detail_id": { "type": "keyword" },
        "order_id":        { "type": "keyword" },
        "create_time":     { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
        "dt":              { "type": "date" },
        "order_status":    { "type": "keyword" },
        "sku_id":          { "type": "keyword" },
        "sku_name":        { "type": "text", "analyzer": "ik_max_word" },
        "sku_price":       { "type": "float" },
        "user_age":        { "type": "long" },
        "user_gender":     { "type": "keyword" },
        "user_id":         { "type": "keyword" },
        "user_level":      { "type": "keyword", "index": false }
      }
    }
  }
}

V. Double-Stream Join

Spark Real-Time Data Warehouse (III) - Figure 16

package com.atguigu.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.constants.MyConstants
import com.atguigu.realtime.beans.{OrderDetail, OrderInfo, SaleDetail, UserInfo}
import com.atguigu.realtime.utils.{MyKafkaUtil, RedisUtil}
import com.google.gson.Gson
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.collection.mutable

/**
 * Notes on double-stream joins in SparkStreaming:
 *   1. Both streams must come from the same StreamingContext.
 *   2. Only a DStream[K, V] can call the join operators.
 *   3. The join is performed on the key.
 *   4. Two records can only be joined when they are processed in the same batch.
 *   5. If network delays put the matching records into different batches, a plain join misses them.
 */
object SaleDetailApp extends BaseApp {
  override var appName: String = "SaleDetailApp"
  override var batchDuration: Int = 10
  val groupId = "gmallrealtime"

  def main(args: Array[String]): Unit = {
    // Write the ES settings into the SparkConf
    val conf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(appName)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "hadoop102")
    conf.set("es.port", "9200")
    ssc = new StreamingContext(conf, Seconds(batchDuration))
    runApp {
      // Get the initial DStreams
      val ds1 = MyKafkaUtil.getKafkaStream(Array(MyConstants.GMALL_ORDER_DETAIL), ssc, groupId)
      val ds2 = MyKafkaUtil.getKafkaStream(Array(MyConstants.GMALL_ORDER_INFO), ssc, groupId)
      // Capture the offsets of the current batch and turn the initial DStreams into DStream[K, V]
      var orderInfoOffsetRanges: Array[OffsetRange] = null
      var orderDetailOffsetRanges: Array[OffsetRange] = null
      val ds3 = ds1.transform(rdd => {
        orderDetailOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map(record => {
          val orderDetail = JSON.parseObject(record.value(), classOf[OrderDetail])
          (orderDetail.order_id, orderDetail)
        })
      })
      val ds4 = ds2.transform(rdd => {
        orderInfoOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        parseRecordToOrderInfo(rdd)
      })
      val ds5 = ds4.fullOuterJoin(ds3)
      // Post-process the joined stream with a cache: records that match within the batch are joined,
      // the rest go into the cache and wait for a later batch.
      // Redis serves as the cache; connections are obtained per partition.
      val ds6 = ds5.mapPartitions(partition => {
        val jedis = RedisUtil.getJedisClient()
        val gson = new Gson()
        // Collect the detail records that could be joined
        val results = mutable.ListBuffer[SaleDetail]()
        partition.foreach {
          case (orderId, (orderInfoOption, orderDetailOption)) => {
            if (orderInfoOption != None) {
              val oi = orderInfoOption.get
              if (orderDetailOption != None) {
                val od = orderDetailOption.get
                // Join the records that matched within this batch
                val saleDetail = new SaleDetail(oi, od)
                results.append(saleDetail)
              }
              // Write the orderInfo into Redis for late-arriving details
              jedis.setex("orderInfo:" + orderId, 5 * 2 * 60, gson.toJson(oi))
              // Look in Redis for orderDetails that arrived early in a previous batch and join them
              val set = jedis.smembers("orderDetail:" + orderId)
              // A missing key does not cause an NPE; set is then an empty [], not null
              set.forEach(orderDetailStr => {
                val orderDetail = JSON.parseObject(orderDetailStr, classOf[OrderDetail])
                val saleDetail = new SaleDetail(oi, orderDetail)
                results.append(saleDetail)
              })
            } else {
              val od = orderDetailOption.get
              // Look in the cache for an orderInfo that arrived early and join it
              val str = jedis.get("orderInfo:" + orderId)
              if (str != null) {
                val info = JSON.parseObject(str, classOf[OrderInfo])
                val saleDetail = new SaleDetail(info, od)
                results.append(saleDetail)
              } else {
                // No early orderInfo in the cache, so this orderDetail arrived early; cache it
                jedis.sadd("orderDetail:" + orderId, gson.toJson(od))
                // Set an expiry
                jedis.expire("orderDetail:" + orderId, 5 * 2 * 60)
              }
            }
          }
        }
        jedis.close()
        results.iterator
      })
      // ds6 holds the joined orderInfo + orderDetail records;
      // enrich them with the user data fetched from Redis by user_id
      val ds7 = ds6.mapPartitions(partition => {
        val jedis = RedisUtil.getJedisClient()
        // Materialize the partition before closing the connection; a lazy iterator
        // would otherwise be consumed after jedis.close()
        val result = partition.map(saleDetail => {
          val userInfoStr = jedis.get("userInfo:" + saleDetail.user_id)
          val userInfo = JSON.parseObject(userInfoStr, classOf[UserInfo])
          saleDetail.mergeUserInfo(userInfo)
          saleDetail
        }).toList
        jedis.close()
        result.iterator
      })
      ds7.cache()
      // Print how many records are about to be written, plus a sample
      ds7.count().print()
      ds7.print(1000)
      import org.elasticsearch.spark._
      // Write to ES
      ds7.foreachRDD(rdd => {
        // resource: the destination; cfg: extra settings
        rdd.saveToEs("/gmall2021_sale_detail" + LocalDate.now() + "/_doc", Map("es.mapping.id" -> "order_detail_id"))
        // Commit the offsets
        ds1.asInstanceOf[CanCommitOffsets].commitAsync(orderDetailOffsetRanges)
        ds2.asInstanceOf[CanCommitOffsets].commitAsync(orderInfoOffsetRanges)
      })
    }
  }

  def parseRecordToOrderInfo(rdd: RDD[ConsumerRecord[String, String]]): RDD[(String, OrderInfo)] = {
    rdd.map(record => {
      val orderInfo = JSON.parseObject(record.value(), classOf[OrderInfo])
      // "create_time": "2021-09-28 21:12:40"
      val formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd")
      val formatter2 = DateTimeFormatter.ofPattern("HH")
      val formatter3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
      // First parse "2021-09-28 21:12:40" into a LocalDateTime
      val localDateTime = LocalDateTime.parse(orderInfo.create_time, formatter3)
      orderInfo.create_date = localDateTime.format(formatter1)
      orderInfo.create_hour = localDateTime.format(formatter2)
      (orderInfo.id, orderInfo)
    })
  }
}

Spark Real-Time Data Warehouse (III) - Figure 17

Spark Real-Time Data Warehouse (III) - Figure 18

4. Editing the Submodule (gmall-publisher)

Request path and parameters

http://localhost:8070/sale_detail?date=2021-08-21&startpage=1&size=5&keyword=小米手机

I. pom.xml

<!-- ES dependencies -->
<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>5.3.3</version>
</dependency>

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

II. Configure application.properties

#es
spring.elasticsearch.jest.uris=http://hadoop102:9200

III. Business Code

  1. pojo layer

Option.java (a single stat option)

package com.example.gmallpublisher.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Option {
    String name;
    Double value;
}

SaleDetail.java (sale detail record)

package com.example.gmallpublisher.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Created by Smexy on 2021/10/8
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SaleDetail {

    private String order_detail_id;
    private String order_id;
    private String order_status;
    private String create_time;
    private String user_id;
    private String sku_id;
    private String user_gender;
    private Integer user_age;
    private String user_level;
    private Double sku_price;
    private String sku_name;
    private String dt;

    private String es_metadata_id;
}

Stat.java (a group of stat options)

package com.example.gmallpublisher.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;
@Data
@AllArgsConstructor
public class Stat {
    String title;
    List<Option> options;
}
  2. dao layer

Why a separate dao package?

  • To keep these classes distinct from the MyBatis mappers.
  • The main class is annotated with @MapperScan. If ESDao were also placed under the mapper package, Spring would generate a proxy object for it as well and, while creating the bean, would try to match each ESDao method name against an SQL statement; since no such SQL exists, startup would fail.

ESDao.java

package com.example.gmallpublisher.dao;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;

/**
 * http://localhost:8070/sale_detail?date=2021-08-21&startpage=1&size=5&keyword=小米手机
 */

public interface ESDao {
    JSONObject getData(String date, Integer startpage, Integer size, String keyword) throws IOException;
}

ESDaoImpl.java

package com.example.gmallpublisher.dao;

import com.alibaba.fastjson.JSONObject;
import com.example.gmallpublisher.pojo.Option;
import com.example.gmallpublisher.pojo.SaleDetail;
import com.example.gmallpublisher.pojo.Stat;
import io.searchbox.client.JestClient;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * http://localhost:8070/sale_detail?date=2021-08-21&startpage=1&size=5&keyword=小米手机
 *   date:      query the purchase details of this day;
 *              also used to build the index name, e.g. gmall2021_sale_detail2021-10-09
 *   size:      how many records to return
 *   startpage: which page to return
 *   keyword:   the search keyword
 */
@Repository
public class ESDaoImpl implements ESDao {

    @Autowired
    private JestClient jestClient;

    public SearchResult getResult(String date, Integer startpage, Integer size, String keyword) throws IOException {
        int from = (startpage - 1) * size;
        String indexName = "gmall2021_sale_detail" + date;
        // Build the match {} clause
        MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("sku_name", keyword);
        // Build the aggs {} clauses
        TermsBuilder termsBuilder1 = AggregationBuilders.terms("gender_count").field("user_gender").size(2);
        TermsBuilder termsBuilder2 = AggregationBuilders.terms("age_count").field("user_age").size(150);
        // Build the outer {}
        String queryString = new SearchSourceBuilder().query(matchQueryBuilder).aggregation(termsBuilder1).aggregation(termsBuilder2)
                .from(from).size(size).toString();
        Search search = new Search.Builder(queryString).addIndex(indexName).addType("_doc").build();
        return jestClient.execute(search);
    }

    // detail: [{SaleDetail}, {...}]
    public ArrayList<SaleDetail> getDetail(SearchResult result) {
        ArrayList<SaleDetail> results = new ArrayList<>();
        List<SearchResult.Hit<SaleDetail, Void>> hits = result.getHits(SaleDetail.class);
        for (SearchResult.Hit<SaleDetail, Void> hit : hits) {
            SaleDetail source = hit.source;
            source.setEs_metadata_id(hit.id);
            results.add(source);
        }
        return results;
    }

    public Stat getAgeStat(SearchResult result) {
        MetricAggregation aggregations = result.getAggregations();
        TermsAggregation ageCount = aggregations.getTermsAggregation("age_count");
        List<TermsAggregation.Entry> buckets = ageCount.getBuckets();
        int ageFrom20To30 = 0;
        int agelt20 = 0;
        int agegt30 = 0;
        for (TermsAggregation.Entry bucket : buckets) {
            if (Integer.parseInt(bucket.getKey()) < 20) {
                agelt20 += bucket.getCount();
            } else if (Integer.parseInt(bucket.getKey()) >= 30) {
                agegt30 += bucket.getCount();
            } else {
                ageFrom20To30 += bucket.getCount();
            }
        }
        double sumCount = ageFrom20To30 + agelt20 + agegt30;
        DecimalFormat decimalFormat = new DecimalFormat("###.##");
        ArrayList<Option> options = new ArrayList<>();
        options.add(new Option("20岁到30岁", Double.parseDouble(decimalFormat.format(ageFrom20To30 / sumCount * 100))));
        options.add(new Option("30岁及30岁以上", Double.parseDouble(decimalFormat.format(agegt30 / sumCount * 100))));
        options.add(new Option("20岁以下", Double.parseDouble(decimalFormat.format(agelt20 / sumCount * 100))));
        return new Stat("用户年龄占比", options);
    }

    public Stat getGenderStat(SearchResult result) {
        MetricAggregation aggregations = result.getAggregations();
        TermsAggregation genderCount = aggregations.getTermsAggregation("gender_count");
        List<TermsAggregation.Entry> buckets = genderCount.getBuckets();
        int maleCount = 0;
        int femaleCount = 0;
        for (TermsAggregation.Entry bucket : buckets) {
            if (bucket.getKey().equals("M")) {
                maleCount += bucket.getCount();
            } else {
                femaleCount += bucket.getCount();
            }
        }
        double sumCount = maleCount + femaleCount;
        DecimalFormat decimalFormat = new DecimalFormat("###.##");
        ArrayList<Option> options = new ArrayList<>();
        options.add(new Option("男", Double.parseDouble(decimalFormat.format(maleCount / sumCount * 100))));
        options.add(new Option("女", Double.parseDouble(decimalFormat.format(femaleCount / sumCount * 100))));
        return new Stat("用户性别占比", options);
    }

    @Override
    public JSONObject getData(String date, Integer startpage, Integer size, String keyword) throws IOException {
        SearchResult searchResult = getResult(date, startpage, size, keyword);
        JSONObject jsonObject = new JSONObject();
        ArrayList<Stat> stats = new ArrayList<>();
        stats.add(getAgeStat(searchResult));
        stats.add(getGenderStat(searchResult));
        jsonObject.put("total", searchResult.getTotal());
        jsonObject.put("stats", stats);
        jsonObject.put("detail", getDetail(searchResult));
        return jsonObject;
    }
}
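
For reference, the query body that the SearchSourceBuilder above produces looks roughly like this (illustrative, with keyword=小米手机, startpage=1 and size=5; the builder also emits some default match options omitted here):

{
  "from": 0,
  "size": 5,
  "query": { "match": { "sku_name": { "query": "小米手机" } } },
  "aggregations": {
    "gender_count": { "terms": { "field": "user_gender", "size": 2 } },
    "age_count":    { "terms": { "field": "user_age", "size": 150 } }
  }
}
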
  3. service layer

PublisherService.java

package com.example.gmallpublisher.service;

import com.alibaba.fastjson.JSONObject;
import com.example.gmallpublisher.pojo.DAUPerHour;
import com.example.gmallpublisher.pojo.GMVPerHour;

import java.io.IOException;
import java.util.List;

public interface PublisherService {
    // Daily active users for one day
    Integer getDAUByDate(String date);
    // Number of new devices on one day
    Integer getNewMidCountByDate(String date);
    // Hourly device counts for one day
    List<DAUPerHour> getDAUPerHourData(String date);
    // Cumulative GMV for one day
    Double getGMVByDate(String date);
    // Hourly GMV for one day
    List<GMVPerHour> getGMVPerHourDate(String date);
    // Flexible-analysis query
    JSONObject getData(String date, Integer startpage, Integer size, String keyword) throws IOException;
}

PublisherServiceImpl.java

package com.example.gmallpublisher.service;

import com.alibaba.fastjson.JSONObject;
import com.example.gmallpublisher.dao.ESDao;
import com.example.gmallpublisher.mapper.DAUMapper;
import com.example.gmallpublisher.mapper.GMVMapper;
import com.example.gmallpublisher.pojo.DAUPerHour;
import com.example.gmallpublisher.pojo.GMVPerHour;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;

@Service
public class PublisherServiceImpl implements PublisherService {

    @Autowired
    private DAUMapper dauMapper;
    @Autowired
    private GMVMapper gmvMapper;
    @Autowired
    private ESDao esDao;

    @Override
    public Integer getDAUByDate(String date) {
        System.out.println("going through the service-layer steps");
        return dauMapper.getDAUByDate(date);
    }

    @Override
    public Integer getNewMidCountByDate(String date) {
        System.out.println("going through the service-layer steps");
        return dauMapper.getNewMidCountByDate(date);
    }

    @Override
    public List<DAUPerHour> getDAUPerHourData(String date) {
        System.out.println("going through the service-layer steps");
        return dauMapper.getDAUPerHourData(date);
    }

    @Override
    public Double getGMVByDate(String date) {
        System.out.println("going through the service-layer steps");
        return gmvMapper.getGMVByDate(date);
    }

    @Override
    public List<GMVPerHour> getGMVPerHourDate(String date) {
        System.out.println("going through the service-layer steps");
        return gmvMapper.getGMVPerHourDate(date);
    }

    @Override
    public JSONObject getData(String date, Integer startpage, Integer size, String keyword) throws IOException {
        return esDao.getData(date, startpage, size, keyword);
    }
}
  4. controller layer

Add the handler below to the existing controller from the earlier requirements:

    @Autowired
    private PublisherService publisherService;

    @RequestMapping(value = "/sale_detail")
    public Object handle3(String date, Integer startpage, Integer size, String keyword) throws IOException {
        return publisherService.getData(date, startpage, size, keyword);
    }
  5. index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
</head>
<body>
<a href="http://localhost:8070/realtime-total?date=2021-10-07">Today's DAU and GMV</a> <br>
<a href="http://localhost:8070/realtime-hours?id=dau&date=2021-10-07">Hourly DAU for today and yesterday</a>
<br/>
<a href="http://localhost:8070/realtime-hours?id=order_amount&date=2021-10-07">Hourly GMV for today and yesterday</a>
<br/>
<a href="http://localhost:8070/sale_detail?date=2021-10-09&startpage=2&size=10&keyword=小米手机">Purchase details for 2021-10-09</a>
</body>
</html>

IV. Run the Program

Spark Real-Time Data Warehouse (III) - Figure 19