Chapter 1. Project Overview

1. Real-time vs. offline requirements

  • Offline requirements (T+1): reports and other outputs are generated from the previous day's data. There are many metrics and reports, but latency is not a concern.
  • Real-time requirements (T+0): focus on monitoring the current day's data as it arrives. The business logic is usually somewhat simpler than for offline requirements and there are fewer metrics, but timeliness and user interactivity matter far more.

2. Requirements

  • Daily first-login users (daily active users): hourly trend chart, compared against the previous day
  • Transaction amount: hourly trend chart, compared against the previous day
  • Risk alerting for the shopping-coupon feature
  • Flexible (ad-hoc) analysis of user purchase details

3. Architecture

I. Offline data warehouse

Spark Real-Time Data Warehouse (1) - Figure 1

II. Spark real-time data warehouse

Spark Real-Time Data Warehouse (1) - Figure 2

4. Technologies used in the project

Spark Real-Time Data Warehouse (1) - Figure 3

Chapter 2. Base Project Setup

1. Create the parent project (spark-gmall)

pom.xml

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.0.0</spark.version>
    <scala.version>2.12.10</scala.version>
    <log4j.version>1.2.17</log4j.version>
    <fastjson.version>1.2.62</fastjson.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <kafka.client.version>2.4.1</kafka.client.version>
</properties>
<dependencies>
    <!-- the concrete logging implementation -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.client.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

2. Create the gmall-common sub-module

The gmall-common module provides the utility classes, constants, and dependencies shared by all modules of the project.

I. pom.xml

<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>

II. MyConstants.java

package com.atguigu.constants;

public interface MyConstants {
    // Kafka topics for the five kinds of user-behavior logs
    String STARTUP_LOG = "STARTUP_LOG";
    String ERROR_LOG = "ERROR_LOG";
    String DISPLAY_LOG = "DISPLAY_LOG";
    String PAGE_LOG = "PAGE_LOG";
    String ACTIONS_LOG = "ACTIONS_LOG";
    // The three kinds of business data needed for purchase-detail analysis
    String GMALL_ORDER_INFO = "GMALL_ORDER_INFO";
    String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";
    String GMALL_USER_INFO = "GMALL_USER_INFO";
}
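These constants double as Kafka topic names: the LogController in the gmall-logger module below sends each type of log to the topic of the same name. A minimal sketch of pre-creating the behavior-log topics, assuming the Kafka command-line tools are on the PATH of a broker host; the partition and replication-factor values are illustrative only:

# Hypothetical topic setup; adjust partitions/replication to your cluster
for t in STARTUP_LOG ERROR_LOG DISPLAY_LOG PAGE_LOG ACTIONS_LOG
do
    kafka-topics.sh --bootstrap-server hadoop102:9092 \
        --create --topic $t --partitions 3 --replication-factor 2
done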

3. Create the gmall-logger sub-module

The gmall-logger module collects log data and sends it to Kafka.

It is a web application; when creating the module, choose Spring Boot.

Spark Real-Time Data Warehouse (1) - Figure 4

I. Project setup

1. Cut the following configuration out of the gmall-logger module's pom.xml and into spark-gmall's pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
2. Copy the following configuration from the gmall-common module's pom.xml into the gmall-logger module's pom.xml:

<parent>
    <artifactId>spark-gmall</artifactId>
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>
3. Add the following to spark-gmall's pom.xml:

<modules>
    <module>gmall-common</module>
    <module>gmall-logger</module>
    <module>gmall-realtime</module>
    <module>gmall-canalclient</module>
</modules>
4. gmall-logger's pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <!-- exclude the bundled logging starter -->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- fixes java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory -->
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.example</groupId>
        <artifactId>gmall-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>1.5.10.RELEASE</version>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
5. application.properties:

server.context-path=/gmall_logger
server.port=8888
#============== kafka ===================
# Kafka broker addresses; more than one may be listed
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
6. Integrate log4j (log4j.properties):

# Define a ConsoleAppender named atguigu.MyConsole that writes logs to the console (System.err, so the IDE shows them in red)
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.err
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout
# Pattern: yyyy-MM-dd HH:mm:ss, log level padded to 10 characters, (fully qualified class:method) - message, newline
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
# Define a DailyRollingFileAppender that writes logs to a file rolled by date. Today's logs go to app.log;
# when the date changes, app.log is rolled to app.log.<yesterday's date> and a new app.log is started.
log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.atguigu.File.file=/opt/module/logs/app.log
log4j.appender.atguigu.File.DatePattern='.'yyyy-MM-dd
log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout
log4j.appender.atguigu.File.layout.ConversionPattern=%m%n
# Bind this class's logger to the given level and appenders; the fully qualified class name may need to be adjusted
log4j.logger.com.atguigu.gmalllogger.controller.LogController=info,atguigu.File,atguigu.MyConsole
7. Write the controller:

package com.atguigu.gmalllogger.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.constants.MyConstants;
import lombok.extern.log4j.Log4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * The client sends its logs with a fixed parameter name.
 */
@RestController
@Log4j // equivalent to: Logger log = Logger.getLogger(LogController.class)
public class LogController {

    // Inject the KafkaTemplate bean from the container
    @Autowired
    private KafkaTemplate<String, String> producer;

    @RequestMapping(value = "/applog")
    public Object handle1(String param) {
        //System.out.println(param);
        log.info(param);
        // Parse param from a JSON string into a JSONObject
        JSONObject jsonObject = JSON.parseObject(param);
        if (jsonObject.get("start") != null) {
            producer.send(MyConstants.STARTUP_LOG, param);
        }
        if (jsonObject.get("actions") != null) {
            producer.send(MyConstants.ACTIONS_LOG, param);
        }
        if (jsonObject.get("page") != null) {
            producer.send(MyConstants.PAGE_LOG, param);
        }
        if (jsonObject.get("displays") != null) {
            producer.send(MyConstants.DISPLAY_LOG, param);
        }
        if (jsonObject.get("err") != null) {
            producer.send(MyConstants.ERROR_LOG, param);
        }
        return "success";
    }

    @RequestMapping(value = "/hello")
    public Object handle2() {
        System.out.println("hello!");
        return "success";
    }
}
8. Package the program into a jar, upload it to the virtual machines, and distribute it to every node; a sketch of the commands is given below.
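A minimal sketch of this step, assuming the build produces gmall-logger-1.0-SNAPSHOT.jar and that it is renamed to the /opt/module/gmall-logger.jar path expected by the cluster script below; the jar name and the use of plain scp are assumptions:

# Build the logger module (run in the spark-gmall project root)
mvn -pl gmall-logger -am clean package
# Copy the Spring Boot fat jar to every node under the name the cluster script expects
for host in hadoop102 hadoop103 hadoop104
do
    ssh $host "mkdir -p /opt/module/logs"     # directory used by log4j.properties
    scp gmall-logger/target/gmall-logger-1.0-SNAPSHOT.jar \
        $host:/opt/module/gmall-logger.jar
done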

II. Configure Nginx (reverse proxy)

1. What are forward and reverse proxies?

# Forward proxy (the proxy server stands in for the client's IP)
A forward proxy proxies the client's IP.
Example: client A cannot reach domain B directly,
so A first goes through a proxy server C, and C's IP is used to access domain B.
# Reverse proxy (the proxy server stands in for the server's IP)
A reverse proxy proxies the server's IP.
Example: user A needs to reach domains B, C, and D.
A only talks to the proxy server, which forwards each request on to B, C, or D.
2. Installation:

# 1. Install the build dependencies with yum
sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++
# 2. Upload the tarball to Linux and extract it
tar -zxvf nginx-1.12.2.tar.gz -C /opt/module
# 3. Enter the extracted directory and run
./configure --prefix=/opt/module/nginx
make && make install
3. Start, stop, and reload commands:

# Start: run in /opt/module/nginx
sudo sbin/nginx
# Stop: run in /opt/module/nginx
sudo sbin/nginx -s stop
# Reload the configuration: run in /opt/module/nginx
sudo sbin/nginx -s reload
4. Edit the configuration file (/opt/module/nginx/conf/nginx.conf):

http {
    ..........
    upstream logserver {
        server hadoop102:8888 weight=1;
        server hadoop103:8888 weight=1;
        server hadoop104:8888 weight=1;
    }
    server {
        listen 81;
        server_name logserver;
        location / {
            root html;
            index index.html index.htm;
            proxy_pass http://logserver;
            proxy_connect_timeout 10;
        }
    }
    ..........
}
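Once Nginx has been reloaded, the proxy can be smoke-tested from any machine. The URL below is only an example: port 81 and the upstream come from the configuration above, the /gmall_logger/applog path comes from application.properties and the controller, and the URL-encoded param is a minimal {"start":1} test record:

# Send a test log through Nginx; it is load-balanced to one of the three logger instances
curl "http://hadoop102:81/gmall_logger/applog?param=%7B%22start%22%3A1%7D"
# expected response: success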

III. gmall-logger cluster script

#!/bin/bash
# One-click start/stop script for the gmall-logger service; it accepts a single start or stop argument
if (( $# != 1 ))
then
    echo "Please pass a single argument: start or stop!"
    exit
fi
# Validate the argument and build the start or stop command
if [ $1 = start ]
then
    cmd="nohup java -jar /opt/module/gmall-logger.jar > /dev/null 2>&1 &"
elif [ $1 = stop ]
then
    cmd="ps -ef | grep gmall-logger.jar | grep -v grep | awk '{print \$2}' | xargs kill"
else
    echo "Please pass a single argument: start or stop!"
    exit
fi
# Run the command on hadoop102, hadoop103 and hadoop104
for i in hadoop102 hadoop103 hadoop104
do
    echo "--------------$i-----------------"
    ssh $i "$cmd"
done
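Usage sketch (the script name lg-cluster.sh is hypothetical):

chmod +x lg-cluster.sh
./lg-cluster.sh start     # start the logger on all three nodes
./lg-cluster.sh stop      # stop it again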

4. Create the sparkstreaming sub-module

The sparkstreaming module is used to study exactly-once consumption with Spark Streaming.

Reference: SparkStreaming精确一次性消费.pdf (notes on exactly-once consumption in Spark Streaming)

I. Configuration files and utility classes

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.0.5</version>
    </dependency>
</dependencies>
<build>
    <finalName>SparkCoreTest</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

db.properties

# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/sparkproduct?useSSL=false&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=321074
jdbc.driver.name=com.mysql.cj.jdbc.Driver

JDBCUtil

package com.atguigu.utils

import java.sql.Connection

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * Created by VULCAN on 2020/11/3
 */
object JDBCUtil {

    // The connection-pool object
    var dataSource: DataSource = init()

    // Initialize the connection pool
    def init(): DataSource = {
        val paramMap = new java.util.HashMap[String, String]()
        paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))
        paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))
        paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))
        paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))
        paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))
        // Build a Druid connection pool from these parameters
        DruidDataSourceFactory.createDataSource(paramMap)
    }

    // Get a connection from the pool
    def getConnection(): Connection = {
        dataSource.getConnection
    }
}

PropertiesUtil

package com.atguigu.utils

import java.util.ResourceBundle

/**
 * Created by VULCAN on 2020/11/3
 */
object PropertiesUtil {

    // Bind the configuration file.
    // ResourceBundle is made for reading such files, so the file extension is not needed.
    val resourceFile: ResourceBundle = ResourceBundle.getBundle("db")

    def getValue(key: String): String = {
        resourceFile.getString(key)
    }
}

II. Reading offsets

package com.atguigu.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Reading offsets.
 * The offsets of the current batch can only be obtained from the initial DStream
 * (the DirectKafkaInputDStream returned by KafkaUtils.createDirectStream).
 */
object GetOffsetsFromKafka {

    def main(args: Array[String]): Unit = {
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // Create the StreamingContext and set the batch duration
        val conf = new SparkConf().setMaster("local[4]").setAppName("test")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Set the log level
        ssc.sparkContext.setLogLevel("error")

        // Kafka parameters
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "app1",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> "false"
        )
        val topics = Array("topicA")
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )

        /**
         * In DS.foreachRDD { ... } only the code inside RDD operators runs on the executors;
         * everything else runs on the driver.
         */
        stream.foreachRDD { rdd =>
            // Get the offsets of the current batch.
            // An OffsetRange describes, for one partition of one topic, the offset range of the data pulled from it.
            // The number of OffsetRanges equals the number of partitions consumed.
            val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
            for (range <- offsetRanges) {
                println(Thread.currentThread().getName + ":" + range)
            }
            val rdd1 = rdd.map(record => {
                println(Thread.currentThread().getName + ":" + record.value())
                record.value
            })
            rdd1.collect()
        }

        stream.map(record => (record.key, record.value))

        ssc.start()
        ssc.awaitTermination()
    }
}
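Before running the program, topicA has to exist and receive some data; a hedged sketch (the partition count is an arbitrary choice; with 3 partitions each batch prints 3 OffsetRanges):

kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic topicA --partitions 3 --replication-factor 2
# then type a few test lines into the topic
kafka-console-producer.sh --broker-list hadoop102:9092 --topic topicA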

Spark Real-Time Data Warehouse (1) - Figure 5

III. Committing offsets

package com.atguigu.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

/**
 * In Spark Streaming only two operators turn DStream operations into RDD operations:
 *
 *   1. DS.foreachRDD(rdd => { rdd operators })
 *   2. DS1 = DS.transform(rdd => { rdd operators })
 *
 * foreachRDD is an output operation and returns nothing;
 * transform is a transformation and returns a new DStream.
 *
 * Only the DStream obtained from Kafka (DirectKafkaInputDStream) can commit offsets.
 */
object GetOffsetsFromKafka2 {

    def main(args: Array[String]): Unit = {
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // Create the StreamingContext and set the batch duration
        val conf = new SparkConf().setMaster("local[4]").setAppName("test")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Set the log level
        ssc.sparkContext.setLogLevel("error")

        // Kafka parameters
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "app1",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> "false"
        )
        val topics = Array("topicA")
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )

        // A variable to hold the offsets of the current batch
        var offsetRanges: Array[OffsetRange] = null

        // Capture the offsets inside transform
        val ds1 = stream.transform(rdd => {
            offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            for (elem <- offsetRanges) {
                println(elem)
            }
            rdd
        })
        val ds2 = ds1.map(record => record.value())

        // Commit only after the output has been produced
        ds2.foreachRDD(rdd => {
            val results = rdd.collect()
            // Output
            results.foreach(println(_))
            // Commit the offsets
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })

        stream.map(record => (record.key, record.value))

        ssc.start()
        ssc.awaitTermination()
    }
}

Spark Real-Time Data Warehouse (1) - Figure 6

IV. Exactly-once consumption: example 1

package com.atguigu.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

/**
 * at-least-once (offsets kept in Kafka, or in a store of your choosing) + idempotent output
 */
object ConsumeExactlyOnceDemo1 {

    def main(args: Array[String]): Unit = {
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // Create the StreamingContext and set the batch duration
        val conf = new SparkConf().setMaster("local[4]").setAppName("test")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Set the log level
        ssc.sparkContext.setLogLevel("error")

        // Kafka parameters
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "app1",
            "auto.offset.reset" -> "latest",
            // 1. Commit offsets manually
            "enable.auto.commit" -> "false"
        )
        val topics = Array("topicA")

        // 2. When offsets are kept in Kafka, the group's last consumed positions are fetched from the Kafka cluster automatically
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )

        // A variable to hold the offsets of the current batch
        var offsetRanges: Array[OffsetRange] = null

        // Capture the offsets of the current batch
        val ds1 = stream.transform(rdd => {
            offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            for (elem <- offsetRanges) {
                println(elem)
            }
            // 3. Arbitrary transformations
            rdd
        })
        // 3. Arbitrary transformations
        val ds2 = ds1.map(_.value())

        // 4. Output
        ds2.foreachRDD(rdd => {
            // 5. Idempotent output: make sure the target store supports idempotent writes
            //    (code omitted here .....)
            // 6. Only then commit the offsets to Kafka's __consumer_offsets topic
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}

V. Exactly-once consumption: example 2

package com.atguigu.kafka

import com.atguigu.utils.JDBCUtil
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable

/**
 * at-least-once + transaction (the results and the offsets are written to the database together).
 * The offsets are maintained by ourselves in an external store (not Kafka), MySQL in this example.
 * Suitable for aggregation-style jobs; word count is used as the example.
 */
object ConsumeExactlyOnceDemo2 {

    val groupId = "test02"
    val topicName = "topicA"

    /**
     * Query the offsets of the previous consumption.
     * @param groupId
     * @param topicName
     * @return
     */
    def selectLatestOffsetsFromDB(groupId: String, topicName: String) = {
        val offsets = mutable.Map[TopicPartition, Long]()
        var connection: Connection = null
        var ps: PreparedStatement = null
        try {
            connection = JDBCUtil.getConnection()
            val sql =
                """
                  |select * from `offsets` where `groupid` = ? and `topic` = ?
                  |""".stripMargin
            ps = connection.prepareStatement(sql)
            ps.setString(1, groupId)
            ps.setString(2, topicName)
            val resultSet = ps.executeQuery()
            while (resultSet.next()) {
                val topicPartition = new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partitionid"))
                val offset = resultSet.getLong("offset")
                offsets.put(topicPartition, offset)
            }
        } catch {
            case e: Exception =>
                e.printStackTrace()
                throw new RuntimeException("failed to query the offsets")
        } finally {
            if (ps != null) {
                ps.close()
            }
            if (connection != null) {
                connection.close()
            }
        }
        offsets.toMap
    }

    /**
     * Write the results and the offsets to MySQL in a single transaction.
     * @param results
     * @param offsetRanges
     */
    def writeDataAndOffsetInCommonBatch(results: Array[(String, Int)], offsetRanges: Array[OffsetRange]): Unit = {
        var connection: Connection = null
        var ps1: PreparedStatement = null
        var ps2: PreparedStatement = null
        try {
            connection = JDBCUtil.getConnection()
            // Turn off auto-commit so that both statements join one transaction
            connection.setAutoCommit(false)
            // Decide whether the computation is stateful or stateless; word count here is stateful,
            // so new counts are added onto the counts already stored
            val sql1 =
                """
                  |insert into wordcount values(?,?)
                  |on duplicate key update count = count + values(count)
                  |""".stripMargin
            val sql2 =
                """
                  |insert into offsets values(?,?,?,?)
                  |on duplicate key update offset = values(offset)
                  |""".stripMargin
            ps1 = connection.prepareStatement(sql1)
            ps2 = connection.prepareStatement(sql2)
            for ((word, count) <- results) {
                ps1.setString(1, word)
                ps1.setInt(2, count)
                // accumulate into the batch
                ps1.addBatch()
            }
            // execute the batch
            val dataResults = ps1.executeBatch()
            for (offsetRange <- offsetRanges) {
                ps2.setString(1, groupId)
                ps2.setString(2, offsetRange.topic)
                ps2.setInt(3, offsetRange.partition)
                ps2.setLong(4, offsetRange.untilOffset)
                // one OffsetRange per partition of a topic, so each iteration writes one partition's row
                ps2.addBatch()
            }
            val offsetResult = ps2.executeBatch()
            // commit the transaction
            connection.commit()
            println("data rows written: " + dataResults.size)
            println("offset rows written: " + offsetResult.size)
        } catch {
            case e: Exception =>
                if (connection != null) {
                    connection.rollback()
                }
                e.printStackTrace()
                throw new RuntimeException("failed to write the results and the offsets")
        } finally {
            if (ps1 != null) {
                ps1.close()
            }
            if (ps2 != null) {
                ps2.close()
            }
            if (connection != null) {
                connection.close()
            }
        }
    }

    def main(args: Array[String]): Unit = {
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // Create the StreamingContext and set the batch duration
        val conf = new SparkConf().setMaster("local[4]").setAppName("test")
        val ssc = new StreamingContext(conf, Seconds(5))
        // Set the log level
        ssc.sparkContext.setLogLevel("error")

        // Kafka parameters
        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> groupId,
            "auto.offset.reset" -> "latest",
            // 1. Commit offsets manually
            "enable.auto.commit" -> "false"
        )
        val topics = Array(topicName)

        /*
           2. Before starting to consume, query the database for the offsets of the previous consumption:
              which group consumed which topic up to which position.
         */
        val offsets = selectLatestOffsetsFromDB(groupId, topicName)
        println("-------------------------")
        println(offsets)

        // 3. Create the DStream starting from those positions, then apply the transformations
        val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
        )

        // A variable to hold the offsets of the current batch
        var offsetRanges: Array[OffsetRange] = null

        // Capture the offsets of the current batch
        val ds1 = stream.transform(rdd => {
            offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            // 4. Arbitrary transformations
            val rdd1 = rdd.map(record => record.value())
            rdd1
        })
        // 4. Arbitrary transformations
        val ds2 = ds1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

        // 5. Output
        ds2.foreachRDD(rdd => {
            // Collect the aggregated results to the driver
            val results = rdd.collect()
            println("aggregated results: " + results.mkString(","))
            // Write the results and the offsets to MySQL in one transaction
            writeDataAndOffsetInCommonBatch(results, offsetRanges)
        })

        ssc.start()
        ssc.awaitTermination()
    }
}
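The notes do not show the MySQL table definitions; the sketch below is inferred from the column names and upsert statements used above and from db.properties (database sparkproduct on hadoop102), so adjust types and sizes as needed:

mysql -uroot -p sparkproduct <<'SQL'
-- offsets: one row per (group, topic, partition); the upsert keys on this primary key
CREATE TABLE IF NOT EXISTS `offsets` (
    `groupid`     VARCHAR(100) NOT NULL,
    `topic`       VARCHAR(100) NOT NULL,
    `partitionid` INT NOT NULL,
    `offset`      BIGINT,
    PRIMARY KEY (`groupid`, `topic`, `partitionid`)
);
-- wordcount: the accumulated word counts; the upsert adds new counts onto the stored value
CREATE TABLE IF NOT EXISTS `wordcount` (
    `word`  VARCHAR(200) NOT NULL,
    `count` INT,
    PRIMARY KEY (`word`)
);
SQL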

Open a Kafka console producer on one of the virtual machines to send test data:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic topicA

Spark Real-Time Data Warehouse (1) - Figure 7

Run the program.

Spark Real-Time Data Warehouse (1) - Figure 8

Open the MySQL client to check the results.
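A hedged sketch of the check (database and table names as above):

# Inspect the committed offsets and the accumulated word counts
mysql -uroot -p sparkproduct -e "select * from offsets; select * from wordcount;"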

Spark Real-Time Data Warehouse (1) - Figure 9

Spark Real-Time Data Warehouse (1) - Figure 10