批处理:

  • 批处理主要操作大容量的静态数据集,并在计算过程完成后返回结果。可以认为,批处理处理的是按固定时间间隔分组的数据点集合。批处理模式中使用的数据集通常符合下列特征:
    • 有界:批处理数据集代表数据的有限集合
    • 持久:数据通常始终存储在某种类型的持久存储位置中
    • 大量:批处理操作通常是处理超大规模数据集的唯一方法

流处理:

  • 流处理可以对随时进入系统的数据进行计算。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每一条数据执行操作。流处理中的数据集是"无边界"的,这就产生了几个重要的影响:
    • 可以处理几乎无限量的数据,但是同一时间只能处理一条数据,不同记录间只维持最少量的状态
    • 处理工作是基于事件的,除非明确停止,否则没有"尽头"
    • 处理结果立刻可用,并会随着新数据的抵达持续更新(下面给出一个有界/无界数据源的简单示意)
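
下面用后文会用到的 Flink Scala API 给出一个极简的示意(代码中的文件内容和端口都是假设的示例,不属于原文实现):同一套 DataStream API 下,读取有界数据的部分处理完就没有新的输入;读取 socket 这类无界数据源的作业则会一直运行,结果随新数据持续更新。

import org.apache.flink.streaming.api.scala._

object BoundedVsUnboundedDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 有界数据集:元素是有限的,处理完这部分数据就不再有新的输入(批处理的典型形态)
    env.fromElements("a", "b", "c").map(word => (word, 1)).print("bounded")
    // 无界数据流:socket 源会持续产生数据,作业不会自行结束,结果随新数据不断更新(端口为示意)
    env.socketTextStream("localhost", 7777).map(line => (line, 1)).print("unbounded")
    env.execute("bounded vs unbounded demo")
  }
}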

一、电商用户行为分析

  • 用户
    • 登录方式
    • 上线时间点和时长
    • 页面停留和跳转
  • 商品
    • 收藏、评分、评价、打标签
    • 点击、浏览、购买、支付

由此可以从如下几个角度进行分析:

  • 统计分析
    • 点击
    • 热门商品、近期热门商品、分类热门商品、流量统计
  • 偏好统计
    • 收藏、喜欢、评分、打标签
    • 用户画像、推荐列表(结合特征工程和机器学习算法)
  • 风险控制
    • 下订单、支付、登录
    • 刷单监控、订单失效监控、恶意登录监控

二、项目模块设计

  • 实时统计分析
    • 实时热门商品统计
    • 实时热门页面流量统计
    • 实时访问流量统计
    • APP市场推广统计
    • 页面广告点击量统计
  • 业务流程及风险控制
    • 页面广告黑名单过滤
    • 恶意登录监控
    • 订单支付失效监控
    • 支付实时对账

三、数据源

  • 用户行为数据
    • userBehavior.csv(userid,itemId,categoryid,behavior,timestamp)(加密后用户id,加密后商品id,商品所属类别id,用户行为类型,行为发生时间戳)
  • web服务器日志
    • apache.log(ip,userid,eventTime,method,url)(访问ip,访问userid,访问时间,访问方法,访问url)

四、需求

1、实时热门商品统计

  • 基本需求
    • 统计最近1小时的热门商品,每5分钟更新一次
    • 热门度用浏览次数(“pv”)来衡量
  • 解决思路
    • 在所有用户行为数据中,过滤出浏览(“pv”)行为进行统计
    • 再构建滑动窗口,窗口长度为1小时,滑动距离为5分钟(最小实现示意见下方代码)
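
下面是这条思路的一个最小实现草图,仅供参考:其中 HotItemsUserBehavior 与 5.3 节代码中的 UserBehavior 含义相同,为避免与后文定义重名而改了名字;窗口大小按上述需求设置。这里只输出每个窗口内各商品的浏览次数,完整的 TopN 排序可以仿照 5.2 节热门页面中 KeyedProcessFunction 的做法实现。

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// 输入:加密后用户id,加密后商品id,商品所属类别id,用户行为类型,行为发生时间戳(秒)
case class HotItemsUserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// 窗口聚合结果:商品id,窗口结束时间,浏览次数
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItemsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 与其他示例一致,从 socket 读取 userBehavior.csv 格式的数据
    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
    val dataStream: DataStream[HotItemsUserBehavior] = inputStream.map(data => {
      val arr = data.split(",")
      HotItemsUserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
    }).assignAscendingTimestamps(_.timestamp * 1000L)
    // 过滤出浏览("pv")行为,按商品分组,开 1 小时、滑动 5 分钟的窗口做增量聚合
    val aggStream: DataStream[ItemViewCount] = dataStream
      .filter(_.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new ItemCountAgg, new ItemCountWindowResult)
    // 此处直接打印各商品的窗口内浏览次数;TopN 可再按 windowEnd 分组后用 KeyedProcessFunction 排序输出
    aggStream.print("item count")
    env.execute("Hot Items Sketch")
  }
}

class ItemCountAgg extends AggregateFunction[HotItemsUserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: HotItemsUserBehavior, accumulator: Long): Long = accumulator + 1L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class ItemCountWindowResult extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
}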

2、实时流量统计-热门页面

  • 基本需求
    • 从web服务器日志中,统计实时的热门访问页面
    • 统计每分钟的ip访问量,取出访问量最大的5个地址,每5秒更新一次
  • 解决思路
    • 将服务器日志中的时间字段,转换成时间戳,作为EventTime
    • 构建滑动窗口,窗口长度为1分钟,滑动距离为5秒

3、实时流量统计-PV和UV

  • 基本需求
    • 从埋点日志中,统计实时的PV和UV
    • 统计每小时的访问量(PV),并对用户进行去重
  • 解决思路
    • 统计埋点日志中的pv行为,利用set数据结构进行去重
    • 对于超大规模的数据,可以考虑用布隆过滤器进行去重

4、市场营销分析-APP市场推广统计

  • 基本需求
    • 从埋点日志中,统计APP市场推广的数据指标
    • 按照不同的推广渠道,分别统计数据
  • 解决思路
    • 通过过滤日志中的用户行为,按照不同的渠道进行统计
    • 可以用 ProcessWindowFunction 等全窗口函数处理,得到自定义的输出数据信息

5、市场营销分析-页面广告统计

  • 基本需求
    • 从埋点日志中,统计每小时页面广告的点击量,5秒刷新一次,并按照不同省份进行划分
    • 对于“刷单”式的频繁点击行为进行过滤,并将该用户加入黑名单
  • 解决思路
    • 根据省份进行分组,创建长度为1小时、滑动距离为5秒的时间窗口进行统计
    • 可以用 ProcessFunction 进行黑名单过滤,检测用户对同一广告的点击量,如果超过上限,则将该用户信息通过侧输出流输出到黑名单

6、恶意登录监控

  • 基本需求
    • 用户在短时间内频繁登录失败,有程序恶意攻击的可能
    • 同一用户(可以是不同ip)在2秒内连续登录失败,需要报警
  • 解决思路
    • 将用户登录失败行为存入ListState,设定定时器2秒后触发,查看ListState中有几次失败登录
    • 更加精确的检测,可以使用CEP库实现事件流的模式匹配

7、订单支付实时监控

  • 基本需求
    • 用户下单后,应设置订单失效时间,以提高用户支付的意愿,并降低系统风险
    • 用户下单后15分钟未支付,则输出监控信息
  • 解决思路
    • 利用CEP库进行事件流的模式匹配,并设定匹配的时间间隔
    • 也可以利用状态编程,用 ProcessFunction 实现处理逻辑(下面给出CEP方式的最小实现示意)
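
下面是按第一条思路(CEP + 时间约束)给出的最小实现草图,仅供参考:其中 OrderEvent、OrderResult 等样例类、字段名和测试数据都是示意性的假设,并非原文给出的实现;超时未支付的订单通过侧输出流输出。

import java.util
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// 订单事件:订单id,事件类型(create/pay),交易码,时间戳(秒),均为示意字段
case class OrderEvent(orderId: Long, eventType: String, txId: String, timestamp: Long)
// 输出结果:订单id,结果说明
case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 示意用的测试数据:订单1正常支付,订单2超过15分钟未支付
    val orderEventStream: DataStream[OrderEvent] = env.fromElements(
      OrderEvent(1L, "create", "", 1558430842L),
      OrderEvent(2L, "create", "", 1558430843L),
      OrderEvent(1L, "pay", "tx_1", 1558430844L),
      OrderEvent(2L, "pay", "tx_2", 1558432000L) // 距下单已超过15分钟
    ).assignAscendingTimestamps(_.timestamp * 1000L)
    // 1、定义匹配模式:下单之后15分钟内支付
    val orderPayPattern: Pattern[OrderEvent, OrderEvent] = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))
    // 2、将模式应用到按订单id分组的数据流上
    val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)
    // 3、超时的部分匹配通过侧输出流输出,正常匹配走主流
    val orderTimeoutTag = new OutputTag[OrderResult]("orderTimeout")
    val resultStream: DataStream[OrderResult] =
      patternStream.select(orderTimeoutTag, new OrderTimeoutSelect, new OrderPaySelect)
    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeoutTag).print("timeout")
    env.execute("Order Timeout Sketch")
  }
}

// 处理超时(只匹配到create、15分钟内没有pay)的订单
class OrderTimeoutSelect extends PatternTimeoutFunction[OrderEvent, OrderResult] {
  override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long): OrderResult = {
    val orderId = pattern.get("create").get(0).orderId
    OrderResult(orderId, "order timeout at " + timeoutTimestamp)
  }
}

// 处理正常支付的订单
class OrderPaySelect extends PatternSelectFunction[OrderEvent, OrderResult] {
  override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult =
    OrderResult(pattern.get("pay").get(0).orderId, "order payed successfully")
}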

8、订单支付实时对账

  • 基本需求
    • 用户下单并支付后,应查询到账信息,进行实时对账
    • 如果有不匹配的支付信息或者到账信息,输出提示信息
  • 解决思路
    • 从两条流中分别读取订单支付信息和到账信息,合并处理
    • 用 connect 连接合并两条流,用 CoProcessFunction 做匹配处理(最小实现示意见下方代码)
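
下面是这条思路的最小实现草图,仅供参考:OrderPayEvent、ReceiptEvent 等样例类、字段、测试数据和等待阈值都是示意性假设。两条流 connect 之后按交易码 txId 分组,在 CoProcessFunction 里互相等待对方,等不到就通过侧输出流输出提示信息。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// 支付流:订单id,交易码,支付时间戳(秒)
case class OrderPayEvent(orderId: Long, txId: String, timestamp: Long)
// 到账流:交易码,支付渠道,到账时间戳(秒)
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)

object TxMatchSketch {
  // 侧输出流标签:只有支付没有到账、只有到账没有支付
  val unmatchedPayTag = new OutputTag[OrderPayEvent]("unmatched-pay")
  val unmatchedReceiptTag = new OutputTag[ReceiptEvent]("unmatched-receipt")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 示意用测试数据:tx_1 两边都有,tx_2 只有支付没有到账
    val payStream = env.fromElements(
      OrderPayEvent(1L, "tx_1", 1558430844L),
      OrderPayEvent(2L, "tx_2", 1558430845L)
    ).assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)
    val receiptStream = env.fromElements(
      ReceiptEvent("tx_1", "wechat", 1558430847L)
    ).assignAscendingTimestamps(_.timestamp * 1000L).keyBy(_.txId)
    // 合并两条流,按txId做匹配
    val result = payStream.connect(receiptStream).process(new TxPayMatch)
    result.print("matched")
    result.getSideOutput(unmatchedPayTag).print("unmatched pay")
    result.getSideOutput(unmatchedReceiptTag).print("unmatched receipt")
    env.execute("Tx Match Sketch")
  }
}

class TxPayMatch extends CoProcessFunction[OrderPayEvent, ReceiptEvent, (OrderPayEvent, ReceiptEvent)] {
  // 分别保存已到达、还未匹配上的支付事件和到账事件
  lazy val payState: ValueState[OrderPayEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[OrderPayEvent]("pay", classOf[OrderPayEvent]))
  lazy val receiptState: ValueState[ReceiptEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  override def processElement1(pay: OrderPayEvent,
                               ctx: CoProcessFunction[OrderPayEvent, ReceiptEvent, (OrderPayEvent, ReceiptEvent)]#Context,
                               out: Collector[(OrderPayEvent, ReceiptEvent)]): Unit = {
    val receipt = receiptState.value()
    if (receipt != null) { // 到账事件已先到,匹配成功
      out.collect((pay, receipt))
      receiptState.clear()
    } else { // 到账事件还没来,保存支付事件并注册定时器等待(这里等5秒,阈值为示意)
      payState.update(pay)
      ctx.timerService().registerEventTimeTimer(pay.timestamp * 1000L + 5000L)
    }
  }

  override def processElement2(receipt: ReceiptEvent,
                               ctx: CoProcessFunction[OrderPayEvent, ReceiptEvent, (OrderPayEvent, ReceiptEvent)]#Context,
                               out: Collector[(OrderPayEvent, ReceiptEvent)]): Unit = {
    val pay = payState.value()
    if (pay != null) { // 支付事件已先到,匹配成功
      out.collect((pay, receipt))
      payState.clear()
    } else { // 支付事件还没来,保存到账事件并注册定时器等待
      receiptState.update(receipt)
      ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 5000L)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[OrderPayEvent, ReceiptEvent, (OrderPayEvent, ReceiptEvent)]#OnTimerContext,
                       out: Collector[(OrderPayEvent, ReceiptEvent)]): Unit = {
    // 定时器触发时还有未匹配的事件,说明对账不一致,输出到侧输出流
    if (payState.value() != null) ctx.output(TxMatchSketch.unmatchedPayTag, payState.value())
    if (receiptState.value() != null) ctx.output(TxMatchSketch.unmatchedReceiptTag, receiptState.value())
    payState.clear()
    receiptState.clear()
  }
}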

五、实现代码

5.2、实时流量统计-热门页面

  1. package com.learn.networkflow
  2. import java.net.URL
  3. import java.sql.Timestamp
  4. import java.text.SimpleDateFormat
  5. import java.util.Map
  6. import java.{lang, util}
  7. import org.apache.flink.api.common.functions.AggregateFunction
  8. import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
  9. import org.apache.flink.streaming.api.TimeCharacteristic
  10. import org.apache.flink.streaming.api.datastream.{DataStreamSource, KeyedStream, SingleOutputStreamOperator, WindowedStream}
  11. import org.apache.flink.streaming.api.environment._
  12. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  13. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  14. import org.apache.flink.streaming.api.functions.windowing.WindowFunction
  15. import org.apache.flink.streaming.api.scala.OutputTag
  16. import org.apache.flink.api.scala._
  17. import org.apache.flink.streaming.api.windowing.time.Time
  18. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  19. import org.apache.flink.util.Collector
  20. import scala.collection.mutable.ListBuffer
  21. //定义输入数据
  22. //访问ip,访问userid,访问时间,访问方法,访问url
  23. case class ApacheLogEvent(ip:String , userid: String , eventTime: Long, method:String ,url:String)
  24. //窗口聚合结果样例类
  25. case class PageViewCount(url:String , windowEnd: Long , count: Long)
  26. /**
  27. * ## 实时流量统计-热门页面
  28. *
  29. * - 基本需求
  30. * - 从web服务器日志中,统计实时的热门访问页面
  31. * - 统计每分钟的ip访问量,取出访问量最大的5个地址,每5秒更新一次
  32. * - 解决思路
  33. * - 将服务器日志中的事件,转换成时间戳,作为EventTime
  34. * - 构建滑动窗口,窗口长度为1分钟,滑动距离为5秒
  35. *
  36. */
  37. object HotPageNetworkFlow {
  38. def main(args: Array[String]): Unit = {
  39. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  40. env.setParallelism(1)
  41. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  42. // val resource: URL = getClass.getResource("/apache.log")
  43. // val inputStream: DataStreamSource[String] = env.readTextFile(resource.getPath)
  44. val inputStream: DataStreamSource[String] = env.socketTextStream("localhost",7777)
  45. val dataStream: SingleOutputStreamOperator[ApacheLogEvent] = inputStream.map(data => {
  46. val arr: Array[String] = data.split(" ")
  47. val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  48. val ts: Long = format.parse(arr(2)).getTime
  49. ApacheLogEvent(arr(0), arr(1), ts, arr(3), arr(4))
  50. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
  51. override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
  52. })
  53. dataStream.print("data")
  54. val filterStream: SingleOutputStreamOperator[ApacheLogEvent] = dataStream.filter(_.method=="GET")
  55. val keyedByStream: KeyedStream[ApacheLogEvent, String] = filterStream.keyBy(_.url)
  56. // val windowStream: WindowedStream[ApacheLogEvent, String, TimeWindow] = keyedByStream.timeWindow(Time.minutes(10) , Time.seconds(5))
  57. // implicit val typeInfo = TypeInformation.of(classOf[ApacheLogEvent])//隐式转换
  58. val tag = new OutputTag[ApacheLogEvent]("late")
  59. val aggStream = dataStream
  60. .filter(_.method == "GET")
  61. .keyBy(_.url)
  62. .timeWindow(Time.minutes(10), Time.seconds(5))
  63. .allowedLateness(Time.minutes(1))//延迟窗口多1分钟再关闭
  64. .sideOutputLateData(tag) //标记迟到数据(窗口彻底关闭后才到达的数据)进入侧输出流;窗口彻底关闭时间 = windowEnd + watermark延迟(1秒) + allowedLateness(1分钟)
  65. .aggregate(new PageCountAgg(), new PageViewCountWindowResult)
  66. aggStream.print("agg")
  67. val result: SingleOutputStreamOperator[String] = aggStream.keyBy(_.windowEnd)
  68. .process(new TopNHotPages(5))
  69. result.print("result")
  70. aggStream.getSideOutput(tag).print("late")//延迟数据输出
  71. env.execute("Hot Pages Job")
  72. }
  73. }
  74. class PageCountAgg extends AggregateFunction[ApacheLogEvent , Long , Long]{
  75. override def createAccumulator(): Long = 0L
  76. override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator+1L
  77. override def getResult(accumulator: Long): Long = accumulator
  78. override def merge(a: Long, b: Long): Long = a+b
  79. }
  80. class PageViewCountWindowResult extends WindowFunction[Long , PageViewCount , String , TimeWindow]{
  81. override def apply(key: String, window: TimeWindow, input: lang.Iterable[Long], out: Collector[PageViewCount]): Unit = {
  82. out.collect(PageViewCount(key , window.getEnd , input.iterator().next()))
  83. }
  84. }
  85. class TopNHotPages(n: Int) extends KeyedProcessFunction[Long , PageViewCount , String]{
  86. //保存 url-count
  87. lazy val mapState: MapState[String,Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String,Long]("pageViewCount",classOf[String],classOf[Long]))
  88. override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
  89. mapState.put(value.url , value.count)
  90. println("当前windowEnd = "+value.windowEnd)
  91. ctx.timerService().registerEventTimeTimer(value.windowEnd+1)
  92. ctx.timerService().registerEventTimeTimer(value.windowEnd+60000) //再注册一个windowEnd+1分钟的定时器:窗口彻底关闭、不再更新结果后清空状态,时长与allowedLateness一致
  93. }
  94. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  95. val iter: util.Iterator[Map.Entry[String, Long]] = mapState.entries().iterator()
  96. val allPageViewCount:ListBuffer[(String , Long)] = ListBuffer()
  97. while (iter.hasNext){
  98. val entry = iter.next()
  99. allPageViewCount+= ((entry.getKey , entry.getValue))
  100. }
  101. //清空状态
  102. if (timestamp == ctx.getCurrentKey + 60000){
  103. mapState.clear()
  104. return
  105. }
  106. //排序,取前几的数据
  107. val sortPageViewCounts: ListBuffer[(String, Long)] = allPageViewCount.sortWith(_._2 > _._2).take(n)
  108. val res:StringBuilder = new StringBuilder
  109. res.append("窗口结束时间:").append(new Timestamp(timestamp-1)).append("\n")
  110. for (i <- sortPageViewCounts.indices){
  111. val item: (String , Long) = sortPageViewCounts(i)
  112. res.append("No").append(i+1).append(": ")
  113. .append("url = ").append(item._1).append("\t")
  114. .append("热门度 = ").append(item._2).append("\n")
  115. }
  116. res.append("==============================\n\n")
  117. // Thread.sleep(1000)
  118. out.collect(res.toString)
  119. }
  120. }
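
本地测试时,可以先用 nc -lk 7777 启动 socket,再按照代码中 split(" ") 和 dd/MM/yyyy:HH:mm:ss 的约定逐行输入日志,例如下面这几行完全是编造的示例数据:

83.149.9.216 u_1001 17/05/2015:10:05:03 GET /blog
83.149.9.216 u_1001 17/05/2015:10:05:05 GET /blog
83.149.9.216 u_1002 17/05/2015:10:05:08 GET /list

然后观察 data、agg、result、late 几个打印流的输出即可。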

5.3、实时流量统计-PV和UV

  1. package com.learn.networkflow
  2. import java.net.URL
  3. import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
  4. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.datastream.{DataStreamSource, SingleOutputStreamOperator}
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  8. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  9. import org.apache.flink.streaming.api.scala._
  10. import org.apache.flink.streaming.api.scala.function.WindowFunction
  11. import org.apache.flink.streaming.api.windowing.time.Time
  12. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  13. import org.apache.flink.util.Collector
  14. import scala.util.Random
  15. //定义输出数据样例类
  16. case class UserBehavior(userid:Long , itemId:Long , categoryid:Int , behavior:String , timestamp:Long)
  17. //加密后用户id,加密后商品id,商品所属类别id,用户行为类型,行为发生时间戳
  18. //定义窗口聚合结果样例类
  19. case class PvCount( windowEnd: Long , count:Long)
  20. /**
  21. * 实时流量统计-PV和UV
  22. *
  23. * - 基本需求
  24. * - 从埋点日志中,统计实时的PV和UV
  25. * - 统计每小时的访问量(PV),并对用户进行去重
  26. * - 解决思路
  27. * - 统计埋点日志中的pv行为,利用set数据结构进行去重
  28. * - 对于超大规模的数据,可以考虑用布隆过滤器进行去重
  29. */
  30. object PageView {
  31. def main(args: Array[String]): Unit = {
  32. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  33. env.setParallelism(1)
  34. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  35. // val resource: URL = getClass.getResource("/apache.log")
  36. // val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
  37. val inputStream: DataStream[String] = env.socketTextStream("localhost",7777)
  38. val dataStream: DataStream[UserBehavior] = inputStream.map(
  39. data => {
  40. val arr: Array[String] = data.split(",")
  41. UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3).toString, arr(4).toLong)
  42. }
  43. ).assignAscendingTimestamps(_.timestamp*1000)
  44. val pvStream = dataStream.filter(_.behavior == "pv")
  45. // .map(data=>("pv",1L))
  46. .map(new MyKeyByFunction)
  47. .keyBy(_._1) //如果key都相同,所有数据会分到同一个并行子任务,并行度就失去意义,因此用自定义map生成随机key把数据打散
  48. .timeWindow(Time.seconds(5))
  49. .aggregate(new PvCountAgg() , new PvCountWindowResult)
  50. pvStream.print("pvStream") // 由于key被随机打散,每个窗口会输出多个分区的部分统计结果,还需要按windowEnd再聚合一次
  51. val result = pvStream
  52. .keyBy(_.windowEnd) //再把每个window的结果聚合
  53. .process(new TotalPvCountResult) //按windowEnd分组累加,注册定时器在窗口结束后统一输出总和
  54. result.print("result")
  55. env.execute("pv Count Job")
  56. }
  57. }
  58. class PvCountAgg() extends AggregateFunction[(String,Long) , Long , Long]{
  59. override def createAccumulator(): Long = 0L
  60. override def add(value: (String, Long), accumulator: Long): Long = accumulator+1
  61. override def getResult(accumulator: Long): Long = accumulator
  62. override def merge(a: Long, b: Long): Long = a+b
  63. }
  64. class PvCountWindowResult extends WindowFunction[Long , PvCount , String , TimeWindow]{
  65. override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
  66. out.collect(PvCount( window.getEnd , input.iterator.next()))
  67. }
  68. }
  69. class MyKeyByFunction extends MapFunction[UserBehavior, (String,Long)]{
  70. override def map(value: UserBehavior): (String, Long) = {
  71. (Random.nextInt(8).toString , 1L) //用有限个随机整数作为key,把数据打散到不同的并行子任务
  72. }
  73. }
  74. class TotalPvCountResult extends KeyedProcessFunction[Long , PvCount , PvCount]{
  75. lazy val totalPvCountResultState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pvCount",classOf[Long]))
  76. override def processElement(value: PvCount, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context, out: Collector[PvCount]): Unit = {
  77. totalPvCountResultState.update(value.count + totalPvCountResultState.value())
  78. ctx.timerService().registerEventTimeTimer(value.windowEnd+1)
  79. }
  80. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
  81. out.collect(PvCount(ctx.getCurrentKey , totalPvCountResultState.value()))
  82. totalPvCountResultState.clear()
  83. }
  84. }

5.4、UV统计-全窗口

  1. package com.learn.networkflow
  2. import org.apache.flink.streaming.api.TimeCharacteristic
  3. import org.apache.flink.streaming.api.scala._
  4. import org.apache.flink.streaming.api.scala.function.AllWindowFunction
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  7. import org.apache.flink.util.Collector
  8. //定义窗口聚合结果样例类
  9. case class UvCount( windowEnd: Long , count:Long)
  10. /**
  11. * uv统计
  12. */
  13. object UvCount {
  14. def main(args: Array[String]): Unit = {
  15. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  16. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  17. env.setParallelism(1)
  18. val inputStream: DataStream[String] = env.socketTextStream("localhost",7777)
  19. val dataStream: DataStream[UserBehavior] = inputStream.map(
  20. data => {
  21. val arr: Array[String] = data.split(",")
  22. UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3).toString, arr(4).toLong)
  23. }
  24. ).assignAscendingTimestamps(_.timestamp*1000)
  25. val uvStream = dataStream.filter(_.behavior == "pv")
  26. .timeWindowAll(Time.seconds(5))
  27. .apply(new UvCountResult())
  28. uvStream.print("uv")
  29. env.execute("Uv Count Job")
  30. }
  31. }
  32. /**
  33. * 通过set实现数据去重,但在实际生产环境中这种方法会造成内存占用过大,浪费资源
  34. * 可以改用布隆过滤器进行去重(见下方示意代码)
  35. */
  36. class UvCountResult() extends AllWindowFunction[UserBehavior , UvCount , TimeWindow]{
  37. override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
  38. var userIdSet = Set[Long]()
  39. val iter: Iterator[UserBehavior] = input.iterator
  40. while (iter.hasNext){
  41. val behavior: UserBehavior = iter.next()
  42. userIdSet+=behavior.userid
  43. }
  44. out.collect(UvCount(window.getEnd , userIdSet.size))
  45. }
  46. }
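
针对注释里提到的布隆过滤器去重,这里给出一个仅供参考的最小草图:复用前面定义的 UserBehavior 和 UvCount 样例类;位图大小、哈希方式都是示意性的假设,且只用了一个哈希函数。真实生产环境通常把位图放在 Redis 等外部存储中,并配合自定义 Trigger 逐条触发计算,避免把窗口内的全量数据缓存在内存里。

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// 一个极简的布隆过滤器:size 为位图大小(取2的整数次幂),用一个简单哈希把 userId 映射到某一位
// 真实的布隆过滤器会用多个哈希函数以降低误判率,这里仅作示意
class SimpleBloomFilter(size: Long) extends Serializable {
  private val cap = size
  private val bits = new java.util.BitSet() // 示意用的本地位图,生产中通常放到 Redis 位图里

  private def hash(value: String, seed: Int): Long = {
    var result = 0L
    for (c <- value) result = result * seed + c
    result & (cap - 1) // cap 为 2 的整数次幂,按位与等价于取模
  }

  // 若该值可能已存在则返回 true;否则置位并返回 false
  def containsOrAdd(value: String): Boolean = {
    val offset = hash(value, 61).toInt
    if (bits.get(offset)) true
    else { bits.set(offset); false }
  }
}

// 用布隆过滤器近似去重的全窗口函数,可以替换上面的 UvCountResult
class UvCountWithBloom extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    val bloom = new SimpleBloomFilter(1 << 27) // 位图大小约 1.3 亿 bit,数值为假设
    var uvCount = 0L
    for (behavior <- input)
      if (!bloom.containsOrAdd(behavior.userid.toString)) uvCount += 1
    out.collect(UvCount(window.getEnd, uvCount))
  }
}

使用时,把上面 timeWindowAll 之后的 .apply(new UvCountResult()) 换成 .apply(new UvCountWithBloom) 即可得到近似去重的 UV 统计(布隆过滤器存在误判率,结果是近似值)。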

5.5、市场营销分析

5.5.1、APP市场推广统计

  1. package com.market.analysis
  2. import java.lang
  3. import java.sql.Timestamp
  4. import org.apache.flink.api.common.functions.AggregateFunction
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
  7. import org.apache.flink.streaming.api.scala._
  8. import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
  9. import org.apache.flink.streaming.api.windowing.time.Time
  10. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  11. import org.apache.flink.util.Collector
  12. import scala.util.Random
  13. //输入数据类型
  14. case class MarketUserBehavior(userId: String , behavior: String , channel: String , timestamp: Long)
  15. //输出数据类型
  16. case class MarketViewCount(windowStart: String , windowEnd: String , channel: String , behavior: String , count: Long)
  17. //自定义数据测试源
  18. class SimulatedSource extends RichSourceFunction[MarketUserBehavior]{
  19. //定义数据是否产生标志位
  20. var running = true
  21. val behaviorSet: Seq[String] = Seq("view" , "click" , "download" , "install" , "uninstall")
  22. val channelSet: Seq[String] = Seq("appstore" , "weibo" , "wechat" , "tieba")
  23. val random:Random = Random
  24. //生产数据
  25. override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
  26. val maxCount: Long = Long.MaxValue
  27. var count = 0L
  28. while (running && count < maxCount){
  29. val userId: String = random.nextString(20)
  30. val behavior: String = behaviorSet(random.nextInt(behaviorSet.size))
  31. val channel: String = channelSet(random.nextInt(channelSet.size))
  32. val timestamp: Long = System.currentTimeMillis()
  33. ctx.collect(MarketUserBehavior(userId,behavior,channel,timestamp))
  34. count+=1
  35. Thread.sleep(500)
  36. }
  37. }
  38. //关闭数据产生
  39. override def cancel(): Unit = {
  40. running = false
  41. }
  42. }
  43. /**
  44. * 统计分析不同渠道、不同行为在过去一天(滑动窗口,每5秒输出一次)内的数量
  45. */
  46. object AppMarketByChannel {
  47. def main(args: Array[String]): Unit = {
  48. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  49. env.setParallelism(1)
  50. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  51. val dataStream: DataStream[MarketUserBehavior] = env.addSource(new SimulatedSource)
  52. .assignAscendingTimestamps(_.timestamp)
  53. //统计
  54. val filterStream: DataStream[MarketUserBehavior] = dataStream.filter(_.behavior != "uninstall")
  55. val keyByStream: KeyedStream[MarketUserBehavior, (String, String)] = filterStream.keyBy(data => {
  56. (data.channel, data.behavior) //可以传入一个二元组,表示根据两个字段进行keyBy
  57. })
  58. val windowStream: WindowedStream[MarketUserBehavior, (String, String), TimeWindow] = keyByStream.timeWindow(Time.days(1) , Time.seconds(5))
  59. /**
  60. * 实现一、通过Agg函数和WindowFunction实现
  61. */
  62. val result: DataStream[MarketViewCount] = windowStream.aggregate(new AppMarkAggregate , new AppMarkWindowCount)
  63. result.print("res1")
  64. /**
  65. * 实现二、通过processFunction实现
  66. */
  67. val res2: DataStream[MarketViewCount] = windowStream.process(new MarketCountByChannel)
  68. res2.print("res2")
  69. env.execute("AppMarket Job")
  70. }
  71. }
  72. /**
  73. * 聚合
  74. */
  75. class AppMarkAggregate extends AggregateFunction[MarketUserBehavior , Long , Long]{
  76. override def createAccumulator(): Long = 0L
  77. override def add(value: MarketUserBehavior, accumulator: Long): Long = accumulator+1
  78. override def getResult(accumulator: Long): Long = accumulator
  79. override def merge(a: Long, b: Long): Long = a+b
  80. }
  81. /**
  82. * 窗口统计
  83. */
  84. class AppMarkWindowCount extends WindowFunction[Long , MarketViewCount , (String , String) , TimeWindow]{
  85. override def apply(key: (String, String), window: TimeWindow, input: Iterable[Long], out: Collector[MarketViewCount]): Unit = {
  86. out.collect(MarketViewCount(new Timestamp(window.getStart).toString , new Timestamp(window.getEnd).toString , key._1 , key._2 , input.iterator.next()))
  87. }
  88. }
  89. /**
  90. * 1、前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景,
  91. * 但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,
  92. * 这时就需要使用 ProcessWindowFunction,它能够更加灵活地支持基于窗口全部数据元素的结果计算,
  93. * 例如对整个窗口数据排序取 TopN,这样的需求就必须使用 ProcessWindowFunction。
  94. *
  95. * 2、ProcessWindowFunction获得一个包含窗口所有元素的可迭代器,以及一个具有时间和状态信息访问权的上下文对象,这使得它比其他窗口函数提供更大的灵活性。
  96. * 这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到窗口可以触发计算为止。使用该函数需要注意数据量:数据量太大时,全量数据都保存在内存中,可能造成内存溢出。
  97. *
  98. */
  99. class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior , MarketViewCount , (String,String) , TimeWindow]{
  100. override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
  101. val start: Long = context.window.getStart
  102. val end: Long = context.window.getEnd
  103. val channel = key._1
  104. val behavior = key._2
  105. val size: Int = elements.size
  106. out.collect(MarketViewCount(new Timestamp(start).toString , new Timestamp(end).toString , key._1 , key._2 , size))
  107. }
  108. }

5.5.2、页面广告分析

根据不同的省份地区,统计对不同广告的点击量,并且过滤掉点击次数超过阈值(代码中为10次)的用户,将其加入黑名单

  1. package com.market.analysis
  2. import java.sql.Timestamp
  3. import java.util.UUID
  4. import org.apache.flink.api.common.functions.AggregateFunction
  5. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  6. import org.apache.flink.streaming.api.TimeCharacteristic
  7. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  8. import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
  9. import org.apache.flink.streaming.api.scala._
  10. import org.apache.flink.streaming.api.scala.function.WindowFunction
  11. import org.apache.flink.streaming.api.windowing.time.Time
  12. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  13. import org.apache.flink.util.Collector
  14. import scala.util.Random
  15. //输入
  16. case class AdClickLog(userId:String , adId: String , province: String , city: String , ts: Long)
  17. //输出
  18. case class AdClickCountByProvince(windowEnd: String , province: String , count: Long)
  19. case class BlackListUserWarning(userId: String , adId: String , des: String)
  20. //自定义数据测试源
  21. class AdClickDataSource extends RichSourceFunction[AdClickLog]{
  22. //定义数据是否产生标志位
  23. var running = true
  24. val gdSet: Seq[String] = Seq("广州" , "深圳" , "珠海" , "中山" , "顺德")
  25. val beijingSet: Seq[String] = Seq("东城区" , "西城区" , "朝阳区" , "崇文区" , "宣武区")
  26. val fujianSet: Seq[String] = Seq("福州" , "厦门" , "漳州" , "泉州" )
  27. val shandongSet: Seq[String] = Seq("青岛" , "淄博" , "枣庄" , "烟台" , "济南" , "潍坊" , "济宁" , "日照")
  28. val provinceSet: Seq[String] = Seq("广东" , "北京" , "福建" , "山东")
  29. private val provincesMap: Map[String, Seq[String]] = Map("广东"->gdSet , "北京"->beijingSet , "福建"->fujianSet , "山东"->shandongSet)
  30. val random:Random = Random
  31. //生产数据
  32. override def run(ctx: SourceFunction.SourceContext[AdClickLog]): Unit = {
  33. val maxCount: Long = Long.MaxValue
  34. var count = 0L
  35. while (running && count < maxCount){
  36. val userId: String = UUID.randomUUID().toString.replace("-","")
  37. val adId: String = random.nextInt(100).toString
  38. val province: String = provinceSet(random.nextInt(provinceSet.size))
  39. val city: String = provincesMap(province)(random.nextInt(provincesMap(province).size)) //在该省份的城市列表中随机取一个
  40. val timestamp: Long = System.currentTimeMillis()
  41. ctx.collect(AdClickLog(userId,adId,province,city,timestamp))
  42. count+=1
  43. Thread.sleep(50)
  44. }
  45. }
  46. //关闭数据产生
  47. override def cancel(): Unit = {
  48. running = false
  49. }
  50. }
  51. /**
  52. * 主方法实现
  53. */
  54. object AdClinkAnalysis {
  55. def main(args: Array[String]): Unit = {
  56. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  57. env.setParallelism(1)
  58. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  59. val dataStream: DataStream[AdClickLog] = env.addSource(new AdClickDataSource)
  60. .assignAscendingTimestamps(_.ts)
  61. /**
  62. * 过滤黑名单用户:先按(userId, adId)做keyBy分组,同一用户每点击同一个广告计数加1,超过一定次数则输出到侧输出流(加入黑名单)
  63. */
  64. val filterKeyByStream: KeyedStream[AdClickLog, (String, String)] = dataStream.keyBy(data=>{(data.userId,data.adId)})
  65. val filterBlackUserStream: DataStream[AdClickLog] = filterKeyByStream.process(new FilterBlackListUserResult)
  66. val keyByStream: KeyedStream[AdClickLog, String] = filterBlackUserStream.keyBy(_.province)
  67. val windowStream: WindowedStream[AdClickLog, String, TimeWindow] = keyByStream.timeWindow(Time.days(1) , Time.seconds(5))
  68. val result: DataStream[AdClickCountByProvince] = windowStream.aggregate(new AdClickAgg,new AdClickWindowRes)
  69. result.print("res")
  70. /**
  71. *
  72. */
  73. filterBlackUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("warning")
  74. env.execute("AdClickJob")
  75. }
  76. }
  77. class AdClickAgg extends AggregateFunction[AdClickLog , Long , Long]{
  78. override def createAccumulator(): Long = 0L
  79. override def add(value: AdClickLog, accumulator: Long): Long = accumulator+1
  80. override def getResult(accumulator: Long): Long = accumulator
  81. override def merge(a: Long, b: Long): Long = a+b
  82. }
  83. class AdClickWindowRes extends WindowFunction[Long,AdClickCountByProvince,String,TimeWindow]{
  84. override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {
  85. out.collect(AdClickCountByProvince(new Timestamp(window.getEnd).toString , key , input.iterator.next()))
  86. }
  87. }
  88. /**
  89. * 黑名单过滤器
  90. * 实现同一用户多次点击同一个广告即禁止其访问
  91. */
  92. class FilterBlackListUserResult extends KeyedProcessFunction[(String,String) , AdClickLog , AdClickLog]{
  93. //用户点击量
  94. lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
  95. //清空当日黑名单时间
  96. lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts",classOf[Long]))
  97. //是否触发了黑名单
  98. lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-Black",classOf[Boolean]))
  99. lazy val maxcount = 10
  100. override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
  101. val curCount: Long = countState.value()
  102. //1、判断只要是第一个数据来了,注册当天清空状态的定时器
  103. if (curCount == 0){
  104. val ts = (ctx.timerService().currentProcessingTime()/(1000*3600*24)+1)*(1000*3600*24) - 8*3600*1000 // 获取第二天0点的时间戳
  105. resetTimerTsState.update(ts)
  106. ctx.timerService().registerProcessingTimeTimer(ts)
  107. }
  108. //2、判断count值是否到达阈值,到达了则加入黑名单,输出到侧输出流
  109. if (curCount >= maxcount){
  110. if(!isBlackState.value()){
  111. isBlackState.update(true)
  112. ctx.output(new OutputTag[BlackListUserWarning]("warning"),BlackListUserWarning(value.userId , value.adId , "当前用户点击超过限制次数"))
  113. }
  114. return
  115. }
  116. //3、没有到达点击限制
  117. countState.update(curCount+1)
  118. out.collect(value)
  119. }
  120. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
  121. if (timestamp == resetTimerTsState.value()){
  122. isBlackState.clear()
  123. resetTimerTsState.clear()
  124. countState.clear()
  125. }
  126. }
  127. }

5.6、恶意登录监控

5.6.1、第一个版本

  1. package com.learn.loginfail
  2. import java.util
  3. import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  6. import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
  7. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  8. import org.apache.flink.streaming.api.scala._
  9. import org.apache.flink.streaming.api.windowing.time.Time
  10. import org.apache.flink.util.Collector
  11. import scala.collection.mutable.ListBuffer
  12. import scala.util.Random
  13. import java.sql.Timestamp
  14. /**
  15. * 输入数据,登陆数据
  16. */
  17. case class LoginEventData(userId:String , ip:String , eventType:String , timestamp:Long)
  18. /**
  19. * 输出样例类
  20. */
  21. case class LoginFailWarning(userId:String , firstFailTime:Long , lastFailTime:Long , warningMsg:String)
  22. //自定义数据测试源
  23. class LoginEventDataSource extends RichSourceFunction[LoginEventData]{
  24. //定义数据是否产生标志位
  25. var running = true
  26. val eventTypeSet: Seq[String] = Seq("success" , "fail" )
  27. val random:Random = Random
  28. //生产数据
  29. override def run(ctx: SourceFunction.SourceContext[LoginEventData]): Unit = {
  30. val maxCount: Long = Long.MaxValue
  31. var count = 0L
  32. while (running && count < maxCount){
  33. val userId: String = random.nextInt(20).toString
  34. val ip: String = random.nextInt(100000).toString
  35. val eventType: String = eventTypeSet(random.nextInt(eventTypeSet.size))
  36. val timestamp: Long = System.currentTimeMillis()
  37. ctx.collect(LoginEventData(userId,ip,eventType,timestamp))
  38. count+=1
  39. Thread.sleep(500)
  40. }
  41. }
  42. //关闭数据产生
  43. override def cancel(): Unit = {
  44. running = false
  45. }
  46. }
  47. /**
  48. * 检测同一用户在一段时间内多次登录失败的行为并报警
  49. */
  50. object LoginCheckAnalysis {
  51. def main(args: Array[String]): Unit = {
  52. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  53. env.setParallelism(1)
  54. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  55. val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
  56. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
  57. override def extractTimestamp(element: LoginEventData): Long = element.timestamp
  58. })
  59. //进行判断和检测,如果两秒内登陆失败则输出报警信息
  60. val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream.keyBy(_.userId)
  61. .process(new LoginFailWarningResult(2))
  62. loginFailWarningStream.map(data=>{
  63. (data.userId , new Timestamp(data.firstFailTime).toString , new Timestamp(data.lastFailTime).toString , data.warningMsg)
  64. }).print("fail")
  65. env.execute("LoginCheckJob")
  66. }
  67. }
  68. //处理类
  69. class LoginFailWarningResult(failMaxSize: Int) extends KeyedProcessFunction[String , LoginEventData , LoginFailWarning]{
  70. //保存所有登陆失败事件
  71. lazy val loginFailListState: ListState[LoginEventData] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEventData]("failEvent-list",classOf[LoginEventData]))
  72. //保存定时器时间戳
  73. lazy val timeValueState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts",classOf[Long]))
  74. //每来一条数据,对应调用一次processElement
  75. override def processElement(value: LoginEventData, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
  76. // println("数据进入process = "+value)
  77. //判断是否失败事件
  78. if (value.eventType == "fail"){
  79. loginFailListState.add(value)
  80. //注册2秒后触发报警定时器
  81. ctx.timerService().registerEventTimeTimer(value.timestamp+2000L)
  82. timeValueState.update(value.timestamp+2000L)
  83. }else{
  84. //如果是成功,直接清空状态 和 定时器
  85. loginFailListState.clear()
  86. ctx.timerService().deleteEventTimeTimer(timeValueState.value())
  87. timeValueState.clear()
  88. }
  89. }
  90. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {
  91. val allLoginFailEventData = new ListBuffer[LoginEventData]
  92. val iter: util.Iterator[LoginEventData] = loginFailListState.get().iterator()
  93. while (iter.hasNext){
  94. allLoginFailEventData += iter.next()
  95. }
  96. //当超过一定失败次数,触发输出
  97. if (allLoginFailEventData.size >= failMaxSize){
  98. out.collect(new LoginFailWarning(allLoginFailEventData.head.userId ,
  99. allLoginFailEventData.head.timestamp, // 第一个元素
  100. allLoginFailEventData.last.timestamp, // 最后一个元素
  101. "login fail in 2 s for "+ allLoginFailEventData.length +" times"
  102. ))
  103. }
  104. loginFailListState.clear()
  105. timeValueState.clear()
  106. }
  107. }

5.6.2、另一种版本

这种写法也不是最好的解决方案:它只比较相邻两次登录失败事件,对乱序数据和"N秒内失败N次"这类更一般的需求支持不好,更精确的做法见下一节的CEP版本。

  1. package com.learn.loginfail
  2. import java.util
  3. import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  6. import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
  7. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  8. import org.apache.flink.streaming.api.scala._
  9. import org.apache.flink.streaming.api.windowing.time.Time
  10. import org.apache.flink.util.Collector
  11. import scala.collection.mutable.ListBuffer
  12. import scala.util.Random
  13. import java.sql.Timestamp
  14. /**
  15. * 检测同一用户在一段时间内多次登录失败的行为并报警
  16. * 改进版:出现连续两次间隔在2秒内的失败立即报警,不必等定时器触发
  17. */
  18. object LoginCheckAnalysisAdvance { //与上一版的 LoginCheckAnalysis 在同一个包中,重命名避免冲突
  19. def main(args: Array[String]): Unit = {
  20. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  21. env.setParallelism(1)
  22. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  23. val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
  24. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
  25. override def extractTimestamp(element: LoginEventData): Long = element.timestamp
  26. })
  27. //进行判断和检测,如果两秒内登陆失败则输出报警信息
  28. val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream.keyBy(_.userId)
  29. .process(new LoginFailWarningAdvanceResult(2))
  30. loginFailWarningStream.map(data=>{
  31. (data.userId , new Timestamp(data.firstFailTime).toString , new Timestamp(data.lastFailTime).toString , data.warningMsg)
  32. }).print("fail")
  33. env.execute("LoginCheckJob")
  34. }
  35. }
  36. //处理类,改进版,当数据出现连续的失败时立即报警,而不是等待2秒后
  37. class LoginFailWarningAdvanceResult(failMaxSize: Int) extends KeyedProcessFunction[String , LoginEventData , LoginFailWarning]{
  38. //保存所有登陆失败事件
  39. lazy val loginFailListState: ListState[LoginEventData] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEventData]("failEvent-list",classOf[LoginEventData]))
  40. //每来一条数据,对应调用一次processElement
  41. override def processElement(value: LoginEventData, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
  42. // println("数据进入process = "+value)
  43. //判断是否失败事件
  44. if (value.eventType == "fail"){
  45. val iter: util.Iterator[LoginEventData] = loginFailListState.get().iterator()
  46. if (iter.hasNext){
  47. val preData: LoginEventData = iter.next()
  48. //如果和之前失败的时间间隔在2秒内,则报警输出
  49. if (value.timestamp < preData.timestamp+2000L){
  50. out.collect(new LoginFailWarning(value.userId ,
  51. preData.timestamp, // 第一个元素
  52. value.timestamp, // 最后一个元素
  53. "login fail for 2 times"
  54. ))
  55. }
  56. //不管有没有在2秒内间隔的失败都将状态清空,再以最新的数据作为下一次失败的基准
  57. loginFailListState.clear()
  58. loginFailListState.add(value)
  59. }else{
  60. //之前没有失败的数据,直接添加到状态中
  61. loginFailListState.add(value)
  62. }
  63. }else{
  64. //如果是成功,直接清空状态 和 定时器
  65. loginFailListState.clear()
  66. }
  67. }
  68. }

5.6.3、CEP版本

  1. <!-- cep库相关依赖 -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  1. package com.learn.loginfail
  2. import java.sql.Timestamp
  3. import java.util
  4. import org.apache.flink.cep.PatternSelectFunction
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  7. import org.apache.flink.streaming.api.scala._
  8. import org.apache.flink.streaming.api.windowing.time.Time
  9. import org.apache.flink.cep.scala.{CEP, PatternStream}
  10. import org.apache.flink.cep.scala.pattern.Pattern
  11. /**
  12. * 检测同一用户在一段时间内多次登录失败的行为并报警
  13. * CEP版本
  14. */
  15. object LoginCheckAnalysisCEP {
  16. def main(args: Array[String]): Unit = {
  17. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  18. env.setParallelism(1)
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  20. val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
  21. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
  22. override def extractTimestamp(element: LoginEventData): Long = element.timestamp
  23. })
  24. //1、定义匹配模式,连续两秒内登陆失败
  25. val loginFailPattern: Pattern[LoginEventData, LoginEventData] = Pattern
  26. .begin[LoginEventData]("firstFail").where(_.eventType=="fail")
  27. .next("secondFail").where(_.eventType=="fail")
  28. .within(Time.seconds(2))//在2秒内
  29. //2、将模式应用到数据流上,得到一个PatternStream
  30. val patternStream: PatternStream[LoginEventData] = CEP.pattern(inputStream.keyBy(_.userId),loginFailPattern)
  31. //3、检出符合模式的数据流,需要调用select,(将检测出的一组数据放到一个map里面进行操作)
  32. val result: DataStream[LoginFailWarning] = patternStream.select(new LoginFailEventMatch)
  33. result.map(data=>{
  34. (data.userId , new Timestamp(data.firstFailTime).toString , new Timestamp(data.lastFailTime).toString , data.warningMsg)
  35. }).print("fail")
  36. env.execute("LoginCheckJob")
  37. }
  38. }
  39. /**
  40. * 实现自定义PatternSelectFunction
  41. */
  42. class LoginFailEventMatch extends PatternSelectFunction[LoginEventData , LoginFailWarning]{
  43. override def select(map: util.Map[String, util.List[LoginEventData]]): LoginFailWarning = {
  44. val firstFail: LoginEventData = map.get("firstFail").get(0)
  45. val secondFail: LoginEventData = map.get("secondFail").get(0)
  46. LoginFailWarning(firstFail.userId , firstFail.timestamp , secondFail.timestamp , "login fail")
  47. }
  48. }