spark实时项目课堂笔记.txt
Sqoop底层是MR,默认是4个map,没有reduce,没有落盘
Sqoop采集时要注意:mysql和Hive中null值问题:mysql是null, Hive是/N
Sqoop数据一致性问题:可以通过参数指定,采集到的数据会存到一个表里,保证数据没问题了,再存到HDFS
写代码时:
- 要考虑调用某个算子时需不需要返回值
当写库操作时通常使用
def saveMidToRedis(filterByMidDStream: DStream[StartUpLog]) = {
filterByMidDStream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
...
})
})
}
当要做
map
操作时,可以先做mapPartition
- 首先对于创建连接来说,可以减少连接次数,仅在每个分区下创建一次连接
- 对于操作数据来说,可以并行执行,一个分区就是一个线程,如果用的是
map
会把所有数据发到一个线程中串行执行
- 在SparkStreaming对接kafka时,Spark分区数量和kafka中创建的topic的分区数量一致
离线需求:数据时间:当天处理昨天的数据(T+1)
程序执行时间:凌晨12点半开始跑,上班前跑完。
实时需求:数据时间:当天分析当天实时产生的数据(T+0)
程序执行时间:一直跑724
离线和实时需求:需求单一
即席需求:需求灵活
架构图一:
优点:耦合性低,系统稳定
缺点:性能不好
架构图二:
优点:效率高
缺点:耦合度太高,系统不稳定
————————————-日活需求————————————
1.数据来源 kafka -> startUp topic中数据 启动日志
2.业务处理:统计日活 ->去重
先做批次间去重(redis ->set)
再做批次内去重(groupBykey-> 对时间排序(sort) ->取第一条数据(take))
A B
200 150 120
B A
200 170 120
结论:先用去重能力强的,再用去重能力弱的。这样可以避免我们操作的数据总量过大。
3.数据精准一次消费(数据不丢->手动维护偏移量,数据不重->幂等性)
数据去向 Hbase 明细数据
优:灵活性高
缺:数据量大,占用磁盘
也可以将数据存至mysql 存结果数据
优:数据量小
缺:需求单一,灵活性低
往Redis写数据
1.存什么
mid
2.用什么类型
set
3.redisKey怎么设计
“DAU:”+logdate
使用批次间去重方案三缺点:
缺点一:不能广播太大的数据量
200M
2
200 0000
20000000
缺点二:
4-16[a,b,c]
4-17[]
2021-4-16 23:59:57 [a,b]
2021-4-17 00:01:02 [c]
[]
4-17[c]
方案一:获取两天的数据,根据key来判断要和哪天的数据来做对比
方案二:什么都不用管
create table gmall1116_dau
(
mid varchar,
uid varchar,
appid varchar,
area varchar,
os varchar,
ch varchar,
type varchar,
vs varchar,
logDate varchar,
logHour varchar,
ts bigint
CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));
springboot分层
Controller
接收请求
Service
处理请求
DAO mapper(mybatis ->JDBC)
操作数据
@RestController 标识为Controller层并返回但是普通对象,如String
@RequestMapping(“xxx”) 通过这个注解可以将发送过来的请求映射到对应的方法中从而调用对应的方法
@RequestParam(“xxx”) 通过这个注解可以将发送过来携带参数的请求映射到方法中的具体某个参数
@RestController = @Controller + @ResponseBody =>可以返回一个普通对象而不是一个网页
@Service标识为service层
———Lombok(小辣椒)插件 通常用于javaBean
@Data =>get/set方法
@NoArgsConstructor =>空参构造
@AllArgsConstructor =>全参构造
select LOGHOUR lh, count() ct from GMALL1116DAU where LOGDATE=’2021-04-17’ group by LOGHOUR
+——-+———+
| LH | CT |
+——-+———+
| 11 | 990 |
| 12 | 9 |
+——-+———+
[(LH -> 11),(CT->990),
(LH->12),(CT->9)]
(11->990),(12->9)
—————————————-交易额需求————————————
binlog分类
STATEMENT
语句级别,保存的是mysql的执行语句 类比于redis的AOF
ROW
行级别,保存的是具体数据 类比redis的RDB
MIXED
混合
存储过程:
指定模拟哪天的数据(日期) 生成几个订单 生成几个用户 生成数据前是否要情况这些表 true:清空 false:不清空
———————————————-预警需求——————————————————
同一设备,(后:mid groupBykey)
5分钟内 (先:开5分钟窗口)
三次及以上用不同账号领取优惠劵,(uid >=3 ? 去重->set set->集合大小来判断有几个不同用户 coupon?)
并且过程中没有浏览商品。(反向判断,是否浏览商品,浏览的话就不符合预警日志)
达到以上要求则产生一条预警日志。(生成预警日志)
并且同一设备,每分钟只记录一次预警。(保存到ES,doc id->mid+精确到分钟的时间)
——————————————灵活分析需求——————————————————
实时模块作用:
将order_info、order_detail、user_info这三张表的数据关联起来,然后明细数据保存至ES。
重点解决:
因网络延迟所带来的数据丢失问题。
orderInfo
1.存什么
orderInfo
2.用什么类型
string
3.redisKey怎么设计
“OrderInfo:”orderId
orderDetail
1.存什么
orderDetail
2.用什么类型
set
3.redisKey怎么设计
“orderDetail:”orderId
userInfo
1.存什么
userInfo
2.用什么类型
string
3.redisKey怎么设计
“userInfo:”userId
import collection.JavaConverters.
通过导入上面的包可以将java集合转为scala集合,用法:在java集合后面.asScala
import org.json4s.native.Serialization
implicit val formats=org.json4s.DefaultFormats
val orderInfoJson: String = Serialization.write(orderInfo)
通过以上方式可以将样例类转为json字符串
对于双流Json+缓存方式
orderInfo:必要干的事:
1.将自己写入缓存
2.查对方缓存,看有没有能关联上的数据
orderDetail:必要干的事:
1.查对方缓存,看有没有能关联上的数据
接口数据结构分析:
Map
total 62 int
stat List