Chapter 1. Project Overview
1. Real-time vs. offline requirements
- Offline requirements (T+1): reports and other outputs are generally built from the previous day's data; there are many metrics and reports, but latency is not a concern.
- Real-time requirements (T+0): focus on real-time monitoring of the current day's data; the business logic is usually somewhat simpler than on the offline side and there are fewer metrics, but data freshness and user interactivity matter far more.
2. Requirements
- Hourly trend of daily first logins (DAU), with comparison against yesterday
- GMV (transaction amount) and its hourly trend, with comparison against yesterday
- Risk alerting for the shopping-coupon feature
- Flexible (ad-hoc) analysis of user purchase details
3. Architecture
I. Offline data warehouse

II. Spark real-time data warehouse

4. Technologies used in the project

Chapter 2. Base Project Setup
1. Create the parent project (spark-gmall)
pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.0.0</spark.version>
    <scala.version>2.12.10</scala.version>
    <log4j.version>1.2.17</log4j.version>
    <fastjson.version>1.2.62</fastjson.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <kafka.client.version>2.4.1</kafka.client.version>
</properties>
<dependencies>
    <!-- concrete logging implementation -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.client.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
2. Create the gmall-common module
The gmall-common module provides the common utility classes, constants, and dependencies shared by all modules of the project.
I. pom.xml
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
II. MyConstants.java
package com.atguigu.constants;

public interface MyConstants {
    // Kafka topics for the five kinds of user-behavior logs
    String STARTUP_LOG = "STARTUP_LOG";
    String ERROR_LOG = "ERROR_LOG";
    String DISPLAY_LOG = "DISPLAY_LOG";
    String PAGE_LOG = "PAGE_LOG";
    String ACTIONS_LOG = "ACTIONS_LOG";

    // The three kinds of business-data topics needed for purchase details
    String GMALL_ORDER_INFO = "GMALL_ORDER_INFO";
    String GMALL_ORDER_DETAIL = "GMALL_ORDER_DETAIL";
    String GMALL_USER_INFO = "GMALL_USER_INFO";
}
3. Create the gmall-logger module
gmall-logger collects log data and sends it to Kafka.
This module is a web app; choose Spring Boot when creating it.

I. Project setup
- Cut the following configuration out of the gmall-logger module's pom.xml and move it into spark-gmall's pom.xml:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
- Copy the following configuration from the gmall-common module's pom.xml into the gmall-logger module's pom.xml:
<parent>
    <artifactId>spark-gmall</artifactId>
    <groupId>org.example</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>
- Add the following to spark-gmall's pom.xml:
<modules>
    <module>gmall-common</module>
    <module>gmall-logger</module>
    <module>gmall-realtime</module>
    <module>gmall-canalclient</module>
</modules>
- gmall-logger's pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <!-- exclude the bundled logger -->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- fixes java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory -->
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <artifactId>gmall-common</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>1.5.10.RELEASE</version>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
- application.properties
server.context-path=/gmall_logger
server.port=8888

#============== kafka ===================
# Kafka broker addresses (multiple brokers allowed)
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Serializers for message keys and values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- Integrate log4j (log4j.properties)
# A ConsoleAppender named atguigu.MyConsole that writes logs to the console (System.err, so they show in red)
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.err
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout
# year-month-day hour:min:sec, log level padded to 10 chars, (fully qualified class:method) - message, newline
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n

# A DailyRollingFileAppender that writes logs to a file rolled over by date.
# Today's data is written to app.log; after midnight app.log rolls over:
# the old file is renamed app.log.<yesterday's date> and the new file is still called app.log
log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.atguigu.File.file=/opt/module/logs/app.log
log4j.appender.atguigu.File.DatePattern='.'yyyy-MM-dd
log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout
log4j.appender.atguigu.File.layout.ConversionPattern=%m%n

# Specifies which class's logger uses which level and appenders (adjust the fully qualified class name if needed)
log4j.logger.com.atguigu.gmalllogger.controller.LogController=info,atguigu.File,atguigu.MyConsole
- Write the controller
package com.atguigu.gmalllogger.controller;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.constants.MyConstants;
import lombok.extern.log4j.Log4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * The client always sends logs with a fixed parameter name.
 */
@RestController
@Log4j // generates: Logger log = Logger.getLogger(LogController.class)
public class LogController {

    // Inject a KafkaTemplate bean from the container
    @Autowired
    private KafkaTemplate producer;

    @RequestMapping(value = "/applog")
    public Object handle1(String param) {
        //System.out.println(param);
        log.info(param);

        // Parse the JSON string into a Java object (a Map)
        JSONObject jsonObject = JSON.parseObject(param);

        if (jsonObject.get("start") != null) {
            producer.send(MyConstants.STARTUP_LOG, param);
        }
        if (jsonObject.get("actions") != null) {
            producer.send(MyConstants.ACTIONS_LOG, param);
        }
        if (jsonObject.get("page") != null) {
            producer.send(MyConstants.PAGE_LOG, param);
        }
        if (jsonObject.get("displays") != null) {
            producer.send(MyConstants.DISPLAY_LOG, param);
        }
        if (jsonObject.get("err") != null) {
            producer.send(MyConstants.ERROR_LOG, param);
        }
        return "success";
    }

    @RequestMapping(value = "/hello")
    public Object handle2() {
        System.out.println("hello!");
        return "success";
    }
}
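A quick way to smoke-test the controller is to call the endpoint directly with curl. This is only a sketch: the sample JSON payload is invented, while port 8888, the /gmall_logger context path, the /applog mapping, and the param parameter name all come from the configuration above.

curl "http://hadoop102:8888/gmall_logger/applog" --data-urlencode 'param={"common":{"mid":"mid_test"},"start":{"entry":"icon"},"ts":1600000000000}'

If everything is wired up, the response is success and the payload is routed to the STARTUP_LOG topic because the start field is present.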
- Package the application as a jar, upload it to the virtual machines, and distribute it to all nodes.
II. Configure nginx (reverse proxy)
- What are forward and reverse proxies?
# Forward proxy (the proxy stands in for the client's IP)
A forward proxy represents the client. For example, if client A cannot reach domain B directly,
it can send its requests to a proxy server C, which accesses domain B on A's behalf.

# Reverse proxy (the proxy stands in for the server's IP)
A reverse proxy represents the servers. For example, instead of user A accessing domains B, C, and D directly,
A sends every request to a single proxy server, which forwards each request to the appropriate backend (B, C, or D).
- Installation
1. Install the build dependencies with yum
   sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++
2. Upload the tarball to Linux and extract it
   tar -zxvf nginx-1.12.2.tar.gz -C /opt/module
3. Enter the extracted directory and run
   ./configure --prefix=/opt/module/nginx
   make && make install
- Start and stop commands
# Start: in /opt/module/nginx run
sudo sbin/nginx
# Stop: in /opt/module/nginx run
sudo sbin/nginx -s stop
# Reload configuration: in /opt/module/nginx run
sudo sbin/nginx -s reload
- Edit the configuration file (/opt/module/nginx/conf/nginx.conf)
http {
    ..........
    upstream logserver {
        server hadoop102:8888 weight=1;
        server hadoop103:8888 weight=1;
        server hadoop104:8888 weight=1;
    }
    server {
        listen       81;
        server_name  logserver;
        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://logserver;
            proxy_connect_timeout 10;
        }
        ..........
    }
III. gmall-logger cluster script
#!/bin/bash
# One-click start/stop script for the gmall-logger services; accepts a single start or stop argument
if(($#!=1))
then
    echo "Please pass a single start or stop argument!"
    exit
fi

# Validate the argument and build the start or stop command
if [ $1 = start ]
then
    cmd="nohup java -jar /opt/module/gmall-logger.jar > /dev/null 2>&1 &"
elif [ $1 = stop ]
then
    cmd="ps -ef | grep gmall-logger.jar | grep -v grep | awk '{print \$2}' | xargs kill "
else
    echo "Please pass a single start or stop argument!"
fi

# Run the command on hadoop102, hadoop103, and hadoop104
for i in hadoop102 hadoop103 hadoop104
do
    echo "--------------$i-----------------"
    ssh $i $cmd
done
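With nginx and the logger jars running, an end-to-end check could look like the following. This is a sketch: the script name logger-cluster.sh and the sample payload are assumptions, while port 81 comes from the nginx configuration above; the request should be load-balanced to one of the three Spring Boot instances and land in Kafka.

logger-cluster.sh start
curl "http://hadoop102:81/gmall_logger/applog" --data-urlencode 'param={"start":{"entry":"icon"},"ts":1600000000000}'
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic STARTUP_LOG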
4. Create the sparkstreaming module
The sparkstreaming module is mainly used to study exactly-once consumption with Spark Streaming.
I. Configuration files and utility classes
pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.0.5</version>
    </dependency>
</dependencies>
<build>
    <finalName>SparkCoreTest</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
db.properties
# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/sparkproduct?useSSL=false&useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=321074
jdbc.driver.name=com.mysql.cj.jdbc.Driver
JDBCUtil
package com.atguigu.utils

import java.sql.Connection

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * Created by VULCAN on 2020/11/3
 */
object JDBCUtil {

  // Connection pool object
  var dataSource: DataSource = init()

  // Initialize the connection pool
  def init(): DataSource = {
    val paramMap = new java.util.HashMap[String, String]()
    paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))
    paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))
    paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))
    paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))
    paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))
    // Create a Druid connection pool
    DruidDataSourceFactory.createDataSource(paramMap)
  }

  // Get a connection from the pool
  def getConnection(): Connection = {
    dataSource.getConnection
  }
}
PropertiesUtil
package com.atguigu.utils

import java.util.ResourceBundle

/**
 * Created by VULCAN on 2020/11/3
 */
object PropertiesUtil {

  // Bind the configuration file.
  // ResourceBundle is designed for reading resource bundles, so no file extension is needed ("db" -> db.properties)
  val resourceFile: ResourceBundle = ResourceBundle.getBundle("db")

  def getValue(key: String): String = {
    resourceFile.getString(key)
  }
}
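As a quick sanity check of these two utilities, a small test object can read a property and borrow a connection from the pool (a sketch; the object name TestJDBC is invented):

package com.atguigu.utils

object TestJDBC {
  def main(args: Array[String]): Unit = {
    // Read a value from db.properties via the ResourceBundle
    println(PropertiesUtil.getValue("jdbc.url"))
    // Borrow a connection from the Druid pool and return it
    val connection = JDBCUtil.getConnection()
    println(connection.isValid(3)) // prints true if MySQL on hadoop102 is reachable
    connection.close()
  }
}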
II. Reading offsets
package com.atguigu.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Reading offsets.
 * The offsets of the current batch can only be obtained from the original DStream (DirectKafkaInputDStream).
 */
object GetOffsetsFromKafka {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Create a StreamingContext with a batch interval
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("error")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "app1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    val topics = Array("topicA")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    /**
     * In DS.foreachRDD { ... }, only the code inside RDD operators runs on the executors;
     * everything else runs on the driver.
     */
    stream.foreachRDD { rdd =>
      // Get the offsets of the current batch.
      // OffsetRange: the offset range pulled from one partition of one Kafka topic;
      // the number of OffsetRanges equals the number of consumed partitions.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

      for (range <- offsetRanges) {
        println(Thread.currentThread().getName + ":" + range)
      }

      val rdd1 = rdd.map(record => {
        println(Thread.currentThread().getName + ":" + record.value())
        record.value
      })
      rdd1.collect()
    }

    stream.map(record => (record.key, record.value))

    ssc.start()
    ssc.awaitTermination()
  }
}

III. Committing offsets
package com.atguigu.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

/**
 * In Spark Streaming, only two operators turn DStream computations into RDD computations:
 *
 * 1. DS.foreachRDD(rdd => { rdd operators })
 * 2. DS1 = DS.transform(rdd => { rdd operators })
 *
 * foreachRDD: an output operation with no return value.
 * transform:  a transformation with a return value.
 *
 * Only the DStream obtained from Kafka (DirectKafkaInputDStream) can commit offsets.
 */
object GetOffsetsFromKafka2 {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Create a StreamingContext with a batch interval
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("error")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "app1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    val topics = Array("topicA")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // A variable to hold the offsets of the current batch
    var offsetRanges: Array[OffsetRange] = null

    // Use transform to capture the offsets
    val ds1 = stream.transform(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (elem <- offsetRanges) {
        println(elem)
      }
      rdd
    })

    val ds2 = ds1.map(record => record.value())

    // Commit the offsets after the output
    ds2.foreachRDD(rdd => {
      val results = rdd.collect()
      // Output
      results.foreach(println(_))
      // Commit the offsets
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    stream.map(record => (record.key, record.value))

    ssc.start()
    ssc.awaitTermination()
  }
}

IV. Exactly-once consumption: example 1
package com.atguigu.kafka

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

/**
 * at-least-once (offsets kept in Kafka, or in a storage of your choice) + idempotent output
 */
object ConsumeExactlyOnceDemo1 {
  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Create a StreamingContext with a batch interval
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("error")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "app1",
      "auto.offset.reset" -> "latest",
      // 1. Commit offsets manually
      "enable.auto.commit" -> "false"
    )

    val topics = Array("topicA")
    // 2. When offsets are kept in Kafka, the group's last committed positions are fetched from the cluster automatically
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // A variable to hold the offsets of the current batch
    var offsetRanges: Array[OffsetRange] = null

    // Capture the offsets of the current batch
    val ds1 = stream.transform(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (elem <- offsetRanges) {
        println(elem)
      }
      rdd
    })

    // 3. Arbitrary transformations
    val ds2 = ds1.map(_.value())

    // 4. Output
    ds2.foreachRDD(rdd => {
      // 5. Idempotent output: the target store must support idempotent writes
      // (code omitted here .....)

      // 6. Commit the offsets to Kafka's __consumer_offsets manually
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
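The idempotent write that step 5 leaves out could, for example, be an upsert into MySQL keyed by a unique record id, using the JDBCUtil from this module. The snippet below is only a sketch meant to slot into the foreachRDD of the demo above: the table demo_result, its columns, and the assumption that each record looks like "<id> <content>" are invented for illustration, not the course's actual output code.

// requires: import com.atguigu.utils.JDBCUtil
ds2.foreachRDD(rdd => {
  rdd.foreachPartition(records => {
    // One connection per partition, obtained on the executor side
    val connection = JDBCUtil.getConnection()
    // Upsert keyed on id: replaying the same batch rewrites the same rows instead of duplicating them
    val ps = connection.prepareStatement(
      "insert into demo_result(id, content) values(?, ?) on duplicate key update content = values(content)")
    records.foreach(line => {
      val Array(id, content) = line.split(" ", 2) // assumed record layout: "<id> <content>"
      ps.setString(1, id)
      ps.setString(2, content)
      ps.executeUpdate()
    })
    ps.close()
    connection.close()
  })
  // 6. Commit the offsets only after the idempotent write has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})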
V. Exactly-once consumption: example 2
package com.atguigu.kafka

import com.atguigu.utils.JDBCUtil
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable

/**
 * at-least-once (offsets kept in a storage of your choice) + transaction (results and offsets written to the database together).
 * The application maintains the offsets itself in an external store (not Kafka); MySQL is used here.
 * Suitable for aggregation-style computations; word count is used as the example.
 */
object ConsumeExactlyOnceDemo2 {

  val groupId = "test02"
  val topicName = "topicA"

  /**
   * Query the offsets consumed last time.
   */
  def selectLatestOffsetsFromDB(groupId: String, topicName: String) = {
    val offsets = mutable.Map[TopicPartition, Long]()
    var connection: Connection = null
    var ps: PreparedStatement = null
    try {
      connection = JDBCUtil.getConnection()
      val sql =
        """
          |select * from `offsets` where `groupid` = ? and `topic` = ?
          |""".stripMargin
      ps = connection.prepareStatement(sql)
      ps.setString(1, groupId)
      ps.setString(2, topicName)
      val resultSet = ps.executeQuery()
      while (resultSet.next()) {
        val topicPartition = new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partitionid"))
        val offset = resultSet.getLong("offset")
        offsets.put(topicPartition, offset)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw new RuntimeException("Failed to query offsets")
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
    offsets.toMap
  }

  /**
   * Write the results and the offsets to MySQL in a single transaction.
   */
  def writeDataAndOffsetInCommonBatch(results: Array[(String, Int)], offsetRanges: Array[OffsetRange]): Unit = {
    var connection: Connection = null
    var ps1: PreparedStatement = null
    var ps2: PreparedStatement = null
    try {
      connection = JDBCUtil.getConnection()
      // Disable auto-commit so both writes share one transaction
      connection.setAutoCommit(false)

      // Decide first whether this is a stateful or a stateless computation.
      // Stateful computation:
      val sql1 =
        """
          |insert into wordcount values(?,?)
          |on duplicate key update count = count + values(count)
          |""".stripMargin
      val sql2 =
        """
          |insert into offsets values(?,?,?,?)
          |on duplicate key update offset = values(offset)
          |""".stripMargin

      ps1 = connection.prepareStatement(sql1)
      ps2 = connection.prepareStatement(sql2)

      for ((word, count) <- results) {
        ps1.setString(1, word)
        ps1.setInt(2, count)
        // Accumulate into the batch
        ps1.addBatch()
      }
      // Execute the batch
      val dataResults = ps1.executeBatch()

      for (offsetRange <- offsetRanges) {
        ps2.setString(1, groupId)
        ps2.setString(2, offsetRange.topic)
        ps2.setInt(3, offsetRange.partition)
        ps2.setLong(4, offsetRange.untilOffset)
        // Each iteration writes one partition; one OffsetRange represents one partition of one topic
        ps2.addBatch()
      }
      val offsetResult = ps2.executeBatch()

      // Commit the transaction
      connection.commit()
      println("rows of data written: " + dataResults.size)
      println("rows of offsets written: " + offsetResult.size)
    } catch {
      case e: Exception =>
        connection.rollback()
        e.printStackTrace()
        throw new RuntimeException("Failed to write results and offsets")
    } finally {
      if (ps1 != null) {
        ps1.close()
      }
      if (ps2 != null) {
        ps2.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Create a StreamingContext with a batch interval
    val conf = new SparkConf().setMaster("local[4]").setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("error")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      // 1. Commit offsets manually
      "enable.auto.commit" -> "false"
    )

    val topics = Array(topicName)

    /*
      2. Before processing each batch, query the database for the offsets consumed last time:
         which group consumed which topic up to which position.
     */
    val offsets = selectLatestOffsetsFromDB(groupId, topicName)
    println("-------------------------")
    println(offsets)

    // 3. Create the DStream starting from the last consumed positions, then apply transformations
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // A variable to hold the offsets of the current batch
    var offsetRanges: Array[OffsetRange] = null

    // Capture the offsets of the current batch
    val ds1 = stream.transform(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // 4. Arbitrary transformations
      val rdd1 = rdd.map(record => record.value())
      rdd1
    })

    // 4. Arbitrary transformations
    val ds2 = ds1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // 5. Output
    ds2.foreachRDD(rdd => {
      // Collect the aggregated results to the driver
      val results = rdd.collect()
      println("aggregated results: " + results.mkString(","))
      // Write the results and the offsets to MySQL in one transaction
      writeDataAndOffsetInCommonBatch(results, offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
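The demo assumes two tables in the sparkproduct database configured in db.properties. Their DDL is not shown in the notes; a schema like the following would satisfy the select/insert statements and the ON DUPLICATE KEY UPDATE clauses above (the column names come from the code, while the types and key choices are assumptions):

create table if not exists `wordcount` (
    `word`  varchar(200) primary key,  -- unique key required by "on duplicate key update"
    `count` int
);

create table if not exists `offsets` (
    `groupid`     varchar(200),
    `topic`       varchar(200),
    `partitionid` int,
    `offset`      bigint,
    primary key (`groupid`, `topic`, `partitionid`)  -- one row per group/topic/partition
);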
Start a Kafka console producer on a virtual machine to test:
kafka-console-producer.sh --broker-list hadoop102:9092 --topic topicA
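If topicA does not exist yet (and topic auto-creation is disabled on the cluster), create it before producing; the partition and replication-factor values below are arbitrary choices:

kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic topicA --partitions 3 --replication-factor 2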

Run the program

Open a MySQL client to check the results


