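Batch processing:
- Batch processing operates on large, static data sets and returns the result once the computation has finished. You can think of it as processing a collection of data points grouped by a fixed time interval. The data sets used in batch processing usually have the following characteristics:
  - Bounded: a batch data set represents a finite collection of data
  - Persistent: the data is almost always stored in some kind of persistent storage
  - Large: batch operations are often the only way to process extremely large data sets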
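Stream processing:
- Stream processing computes on data whenever it enters the system. Instead of operating on the whole data set, it operates on each record as it passes through the system. The data set in stream processing is "unbounded", which has several important consequences:
  - It can handle a virtually unlimited amount of data, but only one record is processed at a time, and only minimal state is kept between records
  - Processing is event-based and has no "end" unless it is explicitly stopped
  - Results are available immediately and keep being updated as new data arrives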
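The difference can be made concrete with a minimal plain-Scala sketch (no Flink involved; `BatchVsStream`, `batchCount` and `RunningCounter` are hypothetical names). The batch version needs the whole bounded collection before it returns anything. The streaming version updates and exposes its result after every record and keeps only the running counts as state.

```scala
object BatchVsStream {
  // Batch: the input is bounded, so the result is produced once, after all data has been read
  def batchCount(events: Seq[String]): Map[String, Int] =
    events.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // Stream: one record at a time; the only state kept between records is the running counts
  final class RunningCounter {
    private val counts = scala.collection.mutable.Map.empty[String, Int]

    // Processes a single record and returns the updated (immediately available) count for its key
    def onEvent(e: String): Int = {
      val n = counts.getOrElse(e, 0) + 1
      counts(e) = n
      n
    }

    // Current result so far
    def snapshot: Map[String, Int] = counts.toMap
  }
}
```

Once all records of a bounded input have gone through the counter, both approaches agree. The streaming side, however, has already produced intermediate results along the way.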
I. E-commerce user behavior analysis
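- Users
  - Login method
  - Time of going online and session length
  - Page dwell time and navigation
- Items
  - Favoriting, rating, reviewing, tagging
  - Clicking, browsing, purchasing, paying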
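From this data, the following kinds of analysis are possible:
- Statistical analysis
  - Clicks
  - Hot items, recently hot items, hot items per category, traffic statistics
- Preference statistics
  - Favorites, likes, ratings, tags
  - User profiles, recommendation lists (combined with feature engineering and machine learning algorithms)
- Risk control
  - Order placement, payment, login
  - Fake-order ("brushing") monitoring, order timeout monitoring, malicious login monitoring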
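II. Project module design
- Real-time statistical analysis
  - Real-time hot item statistics
  - Real-time hot page traffic statistics
  - Real-time traffic statistics
  - App marketing promotion statistics
  - Page ad click statistics
- Business flow and risk control
  - Page ad blacklist filtering
  - Malicious login monitoring
  - Order payment timeout monitoring
  - Real-time payment reconciliation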
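III. Data sources
- User behavior data
  - userBehavior.csv (userid, itemId, categoryid, behavior, timestamp): encrypted user id, encrypted item id, item category id, behavior type, event timestamp (in seconds)
- Web server logs
  - apache.log (ip, userid, eventTime, method, url): client IP, user id, access time, HTTP method, URL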
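Both sources are plain text, so every job starts by parsing a line into a case class. The sketch below is minimal and makes several assumptions. The field order follows the schemas above. The access time has the form `dd/MM/yyyy:HH:mm:ss`, the pattern used by the hot-pages code in 5.2, and the sketch interprets it as UTC, whereas the Flink code uses `SimpleDateFormat` with the JVM's default time zone. `RecordParsing` is a hypothetical name.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object RecordParsing {
  // Field order follows the data source schemas
  case class UserBehavior(userid: Long, itemId: Long, categoryid: Int, behavior: String, timestamp: Long)
  case class ApacheLogEvent(ip: String, userid: String, eventTime: Long, method: String, url: String)

  // Assumption: access times look like 17/05/2015:10:05:03 and are interpreted as UTC here
  private val logTimeFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy:HH:mm:ss")

  // One comma-separated userBehavior.csv line; the timestamp stays in seconds
  def parseUserBehavior(line: String): UserBehavior = {
    val a = line.split(",").map(_.trim)
    UserBehavior(a(0).toLong, a(1).toLong, a(2).toInt, a(3), a(4).toLong)
  }

  // One space-separated apache.log line (ip userid eventTime method url); eventTime becomes epoch millis
  def parseApacheLog(line: String): ApacheLogEvent = {
    val a = line.split(" ")
    val ts = LocalDateTime.parse(a(2), logTimeFormat).toInstant(ZoneOffset.UTC).toEpochMilli
    ApacheLogEvent(a(0), a(1), ts, a(3), a(4))
  }
}
```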
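IV. Requirements
1. Real-time hot item statistics
- Basic requirements
  - Find the hottest items of the last hour, updated every 5 minutes
  - Popularity is measured by the number of page views ("pv")
- Approach
  - Filter the page-view ("pv") behaviors out of all user behavior data and count them
  - Build a sliding window with a length of 1 hour and a slide of 5 minutes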
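With a 1-hour window and a 5-minute slide, every record belongs to 3600 / 300 = 12 overlapping windows. The following plain-Scala sketch follows the idea behind Flink's sliding event-time window assignment. It assumes a window offset of 0 and non-negative timestamps, and `HotItemsWindowSketch` is a hypothetical name. It first finds the windows that contain a timestamp, then counts pv records per (window end, item).

```scala
object HotItemsWindowSketch {
  case class Behavior(itemId: Long, behavior: String, tsMillis: Long)

  // Start times of all sliding windows [start, start + size) that contain ts (offset 0, ts >= 0 assumed)
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - ts % slide
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // pv count per (window end, item): each pv record is added to every window it falls into
  def pvPerWindow(events: Seq[Behavior], size: Long, slide: Long): Map[(Long, Long), Int] =
    events
      .filter(_.behavior == "pv")
      .flatMap(e => windowStarts(e.tsMillis, size, slide).map(start => (start + size, e.itemId)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```

The factor size / slide is why a very small slide on a long window, such as 1 day with a 5-second slide, multiplies per-record work and state.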
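2. Real-time traffic statistics: hot pages
- Basic requirements
  - Find the most visited pages in real time from the web server log
  - Count visits per minute and output the 5 most visited URLs, updated every 5 seconds
- Approach
  - Convert the access time of each log event into a timestamp and use it as the event time
  - Build a sliding window with a length of 1 minute and a slide of 5 seconds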
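Once the per-URL counts for a window are known, the ranking step groups them by window end and sorts. Here is a minimal in-memory sketch of that step (`TopNSketch` is a hypothetical name). A later count for the same URL and window replaces the earlier one. A window re-fired by late data produces exactly this kind of update, which is why the implementation in 5.2 keys its state by URL.

```scala
object TopNSketch {
  case class PageViewCount(url: String, windowEnd: Long, count: Long)

  // For each window end, rank URLs by count (descending, ties broken by URL) and keep the first n
  def topN(counts: Seq[PageViewCount], n: Int): Map[Long, Seq[(String, Long)]] =
    counts.groupBy(_.windowEnd).map { case (windowEnd, pvs) =>
      // toMap keeps the last count seen for each URL, i.e. an update overwrites the previous value
      val latest = pvs.map(p => p.url -> p.count).toMap
      windowEnd -> latest.toSeq.sortBy { case (url, c) => (-c, url) }.take(n)
    }
}
```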
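3. Real-time traffic statistics: PV and UV
- Basic requirements
  - Compute PV and UV in real time from the event-tracking logs
  - Count page views (PV) per hour, and count distinct users (UV) by deduplicating user ids
- Approach
  - Count the pv behaviors in the tracking log and deduplicate users with a Set
  - For very large data volumes, consider a Bloom filter for deduplication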
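A Bloom filter replaces the Set with a fixed-size bit array. Memory stays at `numBits` bits no matter how many users arrive, at the cost of occasionally treating a new user as already seen. Below is a minimal sketch that uses k seeded MurmurHash3 hashes from the Scala standard library. `SimpleBloomFilter`, `UvWithBloom` and the sizes are illustrative assumptions. In a Flink job the bit array would also have to live in state or an external store, and that wiring is not shown.

```scala
import java.util.BitSet
import scala.util.hashing.MurmurHash3

// A minimal Bloom filter: k seeded MurmurHash3 hashes set k bits in an m-bit array
final class SimpleBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new BitSet(numBits)

  // Bit positions for a key, one per seed
  private def positions(key: String): Seq[Int] =
    (0 until numHashes).map(seed => Math.floorMod(MurmurHash3.stringHash(key, seed), numBits))

  // May return true for a key never added (false positive), never false for an added key
  def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))

  def add(key: String): Unit = positions(key).foreach(i => bits.set(i))
}

object UvWithBloom {
  // Counts a user only when the filter has (probably) not seen it before.
  // False positives can only make the UV estimate too low, never too high.
  def uniqueVisitors(userIds: Seq[String], filter: SimpleBloomFilter): Long =
    userIds.foldLeft(0L) { (uv, id) =>
      if (filter.mightContain(id)) uv
      else { filter.add(id); uv + 1 }
    }
}
```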
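4. Marketing analysis: app promotion statistics
- Basic requirements
  - Compute app promotion metrics from the event-tracking logs
  - Report the statistics separately for each promotion channel
- Approach
  - Filter the user behaviors in the log and count them per channel
  - A process function can be used to produce custom output records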
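Inside one window, the per-channel statistic is a count grouped by (channel, behavior). The job in 5.5.1 also ignores uninstalls. This in-memory sketch shows just that grouping (`ChannelStatsSketch` is a hypothetical name). The real job evaluates it once per sliding window.

```scala
object ChannelStatsSketch {
  case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

  // Count events per (channel, behavior), ignoring uninstalls
  def countByChannel(events: Seq[MarketUserBehavior]): Map[(String, String), Int] =
    events
      .filter(_.behavior != "uninstall")
      .groupBy(e => (e.channel, e.behavior))
      .map { case (key, es) => key -> es.size }
}
```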
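5. Marketing analysis: page ad statistics
- Basic requirements
  - From the event-tracking logs, count page ad clicks per hour, refreshed every 5 seconds, broken down by province
  - Filter out abusive high-frequency ("click farming") clicks and add the offending user to a blacklist
- Approach
  - Key by province and count in a sliding time window with a length of 1 hour and a slide of 5 seconds
  - Blacklist filtering can be done with a ProcessFunction: track each user's clicks on the same ad and, once the limit is exceeded, emit the user to a blacklist side output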
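In a keyed ProcessFunction, the blacklist logic comes down to a per-(user, ad) counter, a limit, and a "warned already" flag. The sketch below reproduces it with plain mutable maps instead of Flink state (`BlacklistSketch` and its members are hypothetical). Clicks at or beyond the limit are dropped, and exactly one warning is recorded per (user, ad). `reset` stands in for the daily reset timer.

```scala
object BlacklistSketch {
  case class AdClick(userId: String, adId: String, province: String)
  case class Warning(userId: String, adId: String, msg: String)

  // Per (user, ad) click counter with a limit, mirroring ValueState-based logic in plain Scala
  final class BlacklistFilter(maxCount: Int) {
    private val counts = scala.collection.mutable.Map.empty[(String, String), Int]
    private val blacklisted = scala.collection.mutable.Set.empty[(String, String)]
    val warnings = scala.collection.mutable.ListBuffer.empty[Warning]

    // Returns Some(click) if the click passes the filter, None if it is dropped
    def process(click: AdClick): Option[AdClick] = {
      val key = (click.userId, click.adId)
      val n = counts.getOrElse(key, 0)
      if (n >= maxCount) {
        // Set.add returns true only the first time, so each key is warned once
        if (blacklisted.add(key)) warnings += Warning(click.userId, click.adId, s"more than $maxCount clicks")
        None
      } else {
        counts(key) = n + 1
        Some(click)
      }
    }

    // Stand-in for the daily reset timer
    def reset(): Unit = { counts.clear(); blacklisted.clear() }
  }
}
```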
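6. Malicious login monitoring
- Basic requirements
  - Frequent login failures by one user within a short time may indicate an automated attack
  - Raise an alert when the same user (possibly from different IPs) fails to log in repeatedly within 2 seconds
- Approach
  - Store the user's failed logins in a ListState, register a timer that fires 2 seconds later, and then check how many failures the ListState holds
  - For more precise detection, use the CEP library to match patterns on the event stream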
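The core rule is "two consecutive failures of the same user less than 2 seconds apart, with any success resetting the sequence". The in-memory sketch below applies it (`LoginFailSketch` is a hypothetical name). It sorts by timestamp first. That is an assumption standing in for what watermarks provide in Flink, and a process function that compares events only in arrival order cannot guarantee it.

```scala
object LoginFailSketch {
  case class LoginEvent(userId: String, eventType: String, timestamp: Long)
  case class LoginFailWarning(userId: String, firstFailTime: Long, lastFailTime: Long)

  // Alerts when two consecutive failures of the same user are less than windowMs apart.
  // Events are processed in timestamp order; a success clears the previous failure.
  def detect(events: Seq[LoginEvent], windowMs: Long = 2000L): Seq[LoginFailWarning] = {
    val lastFail = scala.collection.mutable.Map.empty[String, Long]
    val warnings = scala.collection.mutable.ListBuffer.empty[LoginFailWarning]
    for (e <- events.sortBy(_.timestamp)) {
      if (e.eventType == "fail") {
        lastFail.get(e.userId).foreach { prev =>
          if (e.timestamp - prev < windowMs) warnings += LoginFailWarning(e.userId, prev, e.timestamp)
        }
        lastFail(e.userId) = e.timestamp
      } else {
        lastFail -= e.userId
      }
    }
    warnings.toList
  }
}
```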
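7. Real-time order payment monitoring
- Basic requirements
  - An order should expire a set time after it is placed, which encourages users to pay and reduces system risk
  - If an order is not paid within 15 minutes of being placed, emit a monitoring message
- Approach
  - Use the CEP library to match patterns on the event stream, with a time limit on the match
  - Alternatively, use state programming and implement the logic in a ProcessFunction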
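The state-programming variant boils down to this: remember the creation time, set a deadline of creation + 15 minutes, and decide once either the payment arrives or the watermark passes the deadline. Below is a minimal in-memory sketch of that decision (`OrderTimeoutSketch` and the `"create"`/`"pay"` event types are assumptions, since the notes include no code for this requirement). An order whose deadline has not been reached yet produces no result.

```scala
object OrderTimeoutSketch {
  case class OrderEvent(orderId: Long, eventType: String, timestamp: Long)

  sealed trait OrderResult
  case class Paid(orderId: Long) extends OrderResult
  case class TimedOut(orderId: Long) extends OrderResult

  // For every "create": Paid if a "pay" for the same order arrives by create + timeoutMs,
  // TimedOut once the watermark has reached that deadline without a payment, otherwise still pending
  def check(events: Seq[OrderEvent], watermark: Long, timeoutMs: Long = 15 * 60 * 1000L): Seq[OrderResult] = {
    val creates = events.filter(_.eventType == "create").map(e => e.orderId -> e.timestamp).toMap
    val pays = events.filter(_.eventType == "pay").map(e => e.orderId -> e.timestamp).toMap
    creates.toSeq.sortBy(_._1).flatMap { case (id, createTs) =>
      val deadline = createTs + timeoutMs
      val result: Option[OrderResult] = pays.get(id) match {
        case Some(payTs) if payTs <= deadline => Some(Paid(id))
        case _ if watermark >= deadline       => Some(TimedOut(id))
        case _                                => None
      }
      result.toList
    }
  }
}
```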
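8. Real-time payment reconciliation
- Basic requirements
  - After a user places and pays for an order, look up the matching receipt (the record that the money has arrived) and reconcile in real time
  - If a payment or a receipt has no counterpart, emit a message
- Approach
  - Read order payments and receipts from two separate streams and process them together
  - Combine the two streams with connect and do the matching in a CoProcessFunction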
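In a CoProcessFunction keyed by transaction id, each side buffers its event until the other side arrives, and a timer reports whatever is still waiting. Here is a minimal plain-Scala sketch of that matching. `ReconcileSketch`, the field names and the explicit `onTimer` call are hypothetical, because the notes contain no code for this requirement.

```scala
object ReconcileSketch {
  case class Payment(txId: String, orderId: Long, timestamp: Long)
  case class Receipt(txId: String, channel: String, timestamp: Long)

  sealed trait Outcome
  case class Matched(p: Payment, r: Receipt) extends Outcome
  case class UnmatchedPayment(p: Payment) extends Outcome
  case class UnmatchedReceipt(r: Receipt) extends Outcome

  // One buffer per input side, like two ValueStates per txId in a CoProcessFunction
  final class Reconciler {
    private val pendingPays = scala.collection.mutable.Map.empty[String, Payment]
    private val pendingReceipts = scala.collection.mutable.Map.empty[String, Receipt]

    // Input 1: a payment either matches a waiting receipt or is buffered
    def onPayment(p: Payment): Option[Outcome] =
      pendingReceipts.remove(p.txId) match {
        case Some(r) => Some(Matched(p, r))
        case None    => pendingPays(p.txId) = p; None
      }

    // Input 2: a receipt either matches a waiting payment or is buffered
    def onReceipt(r: Receipt): Option[Outcome] =
      pendingPays.remove(r.txId) match {
        case Some(p) => Some(Matched(p, r))
        case None    => pendingReceipts(r.txId) = r; None
      }

    // Called when the waiting time for txId has expired: report whichever side is still unmatched
    def onTimer(txId: String): Option[Outcome] =
      pendingPays.remove(txId).map(UnmatchedPayment(_))
        .orElse(pendingReceipts.remove(txId).map(UnmatchedReceipt(_)))
  }
}
```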
V. Implementation
5.2 Real-time traffic statistics: hot pages
```scala
package com.learn.networkflow

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Input record: client IP, user id, access time, HTTP method, URL
case class ApacheLogEvent(ip: String, userid: String, eventTime: Long, method: String, url: String)

// Window aggregation result
case class PageViewCount(url: String, windowEnd: Long, count: Long)

/**
 * Real-time traffic statistics: hot pages
 *
 * - Requirement
 *   - Find the most visited pages in real time from the web server log
 *   - Count visits per URL over the last minute, output the top 5 URLs, update every 5 seconds
 * - Approach
 *   - Convert the access time of each log event into a timestamp and use it as the event time
 *   - Build a sliding window with a length of 1 minute and a slide of 5 seconds
 */
object HotPageNetworkFlow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read from a file instead of a socket:
    // val inputStream: DataStream[String] = env.readTextFile(getClass.getResource("/apache.log").getPath)
    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)

    val dataStream: DataStream[ApacheLogEvent] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(" ")
        val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val ts: Long = format.parse(arr(2)).getTime
        ApacheLogEvent(arr(0), arr(1), ts, arr(3), arr(4))
      })
      // The watermark trails the largest timestamp seen so far by 1 second
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })
    dataStream.print("data")

    // Side output for records that arrive after all their windows have been purged
    val lateTag = new OutputTag[ApacheLogEvent]("late")

    val aggStream: DataStream[PageViewCount] = dataStream
      .filter(_.method == "GET")
      .keyBy(_.url)
      .timeWindow(Time.minutes(1), Time.seconds(5))
      // Keep each window 1 more minute after the watermark passes its end;
      // every late record in that period re-fires the window with an updated count
      .allowedLateness(Time.minutes(1))
      // Records whose windows are all purged (watermark >= window end - 1 + allowed lateness)
      // go to this side output instead of being dropped
      .sideOutputLateData(lateTag)
      .aggregate(new PageCountAgg(), new PageViewCountWindowResult())
    aggStream.print("agg")

    val result: DataStream[String] = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPages(5))
    result.print("result")

    // Late data
    aggStream.getSideOutput(lateTag).print("late")

    env.execute("Hot Pages Job")
  }
}

// Incremental count per URL inside one window
class PageCountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Attaches the window end to the pre-aggregated count
class PageViewCountWindowResult extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    out.collect(PageViewCount(key, window.getEnd, input.head))
  }
}

// Collects the counts of all URLs that share a window end and emits the top n
class TopNHotPages(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
  // url -> count. A MapState (rather than a ListState) lets a re-fired late window
  // overwrite the previous count of the same URL instead of adding a duplicate entry
  lazy val mapState: MapState[String, Long] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Long]("pageViewCount", classOf[String], classOf[Long]))

  override def processElement(value: PageViewCount,
                              ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    mapState.put(value.url, value.count)
    // Fires once the watermark passes the window end: emit the ranking
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    // Fires when the allowed lateness (1 minute) has expired: no more updates can arrive, so clear the state
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 60000L)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Clean-up timer: the window can no longer change
    if (timestamp == ctx.getCurrentKey + 60000L) {
      mapState.clear()
      return
    }

    // Sort by count in descending order and take the first n
    val topN: List[(String, Long)] = mapState.entries().asScala
      .map(entry => (entry.getKey, entry.getValue))
      .toList
      .sortWith(_._2 > _._2)
      .take(n)

    val res = new StringBuilder
    res.append("Window end: ").append(new Timestamp(timestamp - 1)).append("\n")
    for (i <- topN.indices) {
      val (url, count) = topN(i)
      res.append("No").append(i + 1).append(": ")
        .append("url = ").append(url).append("\t")
        .append("hits = ").append(count).append("\n")
    }
    res.append("==============================\n\n")
    out.collect(res.toString)
  }
}
```
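Which records end up in the "late" side output depends on when a window's state is purged. The sketch below reflects my reading of Flink's window operator, so treat it as an approximation rather than the library's code (`LatenessSketch` is a hypothetical name). An event-time window [start, end) is kept until the watermark reaches end - 1 + allowedLateness. A record is side-output only when every window it belongs to is already past that point.

```scala
object LatenessSketch {
  // A window [start, end) is purged once the watermark reaches end - 1 + allowedLateness
  def isWindowPurged(windowEnd: Long, allowedLateness: Long, watermark: Long): Boolean =
    windowEnd - 1 + allowedLateness <= watermark

  // A record goes to the late side output only if all windows it belongs to are already purged
  def goesToSideOutput(windowEnds: Seq[Long], allowedLateness: Long, watermark: Long): Boolean =
    windowEnds.forall(end => isWindowPurged(end, allowedLateness, watermark))
}
```

For the hot-pages job, the watermark trails the largest timestamp by 1 second and the lateness is 1 minute. A window ending at 60000 therefore still accepts updates until the watermark reaches 119999.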
5.3 Real-time traffic statistics: PV and UV
```scala
package com.learn.networkflow

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input record: encrypted user id, encrypted item id, item category id, behavior type, event timestamp (seconds)
case class UserBehavior(userid: Long, itemId: Long, categoryid: Int, behavior: String, timestamp: Long)

// Window aggregation result
case class PvCount(windowEnd: Long, count: Long)

/**
 * Real-time traffic statistics: PV and UV
 *
 * - Requirement
 *   - Compute PV and UV in real time from the event-tracking logs
 *   - Count page views (PV) per hour and deduplicate users (UV)
 * - Approach
 *   - Count the pv behaviors in the tracking log; deduplicate users with a Set
 *   - For very large data volumes, consider a Bloom filter for deduplication
 */
object PageView {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read from a file instead of a socket:
    // val inputStream: DataStream[String] = env.readTextFile(getClass.getResource("/userBehavior.csv").getPath)
    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)

    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })
      // Assumes timestamps arrive in ascending order; seconds are converted to milliseconds
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val pvStream: DataStream[PvCount] = dataStream
      .filter(_.behavior == "pv")
      // Keying every record by the constant "pv" would send all data to a single subtask and make
      // the parallelism pointless, so each record gets a random key from a small range instead
      .map(new MyKeyByFunction)
      .keyBy(_._1)
      // 5 seconds for testing; the hourly requirement would use Time.hours(1)
      .timeWindow(Time.seconds(5))
      .aggregate(new PvCountAgg(), new PvCountWindowResult())
    // Each random key emits its own partial count for the same window
    pvStream.print("pvStream")

    // Second stage: sum the partial counts per window end and emit once the window is complete
    val result: DataStream[PvCount] = pvStream
      .keyBy(_.windowEnd)
      .process(new TotalPvCountResult)
    result.print("result")

    env.execute("pv Count Job")
  }
}

// Incremental count per (random key, window)
class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Emits the partial count of one random key together with the window end
class PvCountWindowResult extends WindowFunction[Long, PvCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd, input.head))
  }
}

// Assigns a random key from a small range (4 here, an arbitrary choice) so that counting is spread
// over several subtasks; a fully random string would give nearly every record its own key
class MyKeyByFunction extends MapFunction[UserBehavior, (String, Long)] {
  override def map(value: UserBehavior): (String, Long) = {
    (Random.nextInt(4).toString, 1L)
  }
}

// Sums the partial counts of one window and emits the total when the window end has passed
class TotalPvCountResult extends KeyedProcessFunction[Long, PvCount, PvCount] {
  // Running total for the current window end (an empty Scala Long state reads as 0)
  lazy val totalPvCountResultState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("total-pvCount", classOf[Long]))

  override def processElement(value: PvCount,
                              ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context,
                              out: Collector[PvCount]): Unit = {
    totalPvCountResultState.update(totalPvCountResultState.value() + value.count)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext,
                       out: Collector[PvCount]): Unit = {
    out.collect(PvCount(ctx.getCurrentKey, totalPvCountResultState.value()))
    totalPvCountResultState.clear()
  }
}
```
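The two-stage "salted key" aggregation above gives the same total as a single global count: stage 1 counts per (random bucket, window), stage 2 sums the buckets per window. The in-memory check below takes the window end of each pv record as given (`SaltedPvSketch` is a hypothetical name, and 4 buckets is an arbitrary choice). It shows that the totals do not depend on which bucket a record lands in.

```scala
import scala.util.Random

object SaltedPvSketch {
  // Stage 1: random bucket in [0, buckets) per record, count per (bucket, windowEnd).
  // Stage 2: sum the partial counts per windowEnd.
  def pvPerWindow(windowEnds: Seq[Long], buckets: Int, rnd: Random): Map[Long, Long] = {
    val partial: Map[(Int, Long), Long] =
      windowEnds
        .map(end => (rnd.nextInt(buckets), end))
        .groupBy(identity)
        .map { case (key, hits) => key -> hits.size.toLong }
    partial.toSeq
      .groupBy { case ((_, end), _) => end }
      .map { case (end, parts) => end -> parts.map(_._2).sum }
  }
}
```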
5.4 UV statistics: all-window
```scala
package com.learn.networkflow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Window aggregation result
case class UvCount(windowEnd: Long, count: Long)

/**
 * UV (unique visitor) statistics over a non-keyed window
 */
object UvCount {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val uvStream: DataStream[UvCount] = dataStream
      .filter(_.behavior == "pv")
      // 5 seconds for testing; the hourly requirement would use Time.hours(1)
      .timeWindowAll(Time.seconds(5))
      .apply(new UvCountResult())
    uvStream.print("uv")

    env.execute("uv Count Job")
  }
}

/**
 * Deduplicates user ids with a Set. The whole window content and the Set are held in memory,
 * which is too expensive at production scale; a Bloom filter is a common alternative.
 */
class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    val userIdSet: Set[Long] = input.map(_.userid).toSet
    out.collect(UvCount(window.getEnd, userIdSet.size))
  }
}
```
5.5 Marketing analysis
5.5.1 App promotion statistics
```scala
package com.market.analysis

import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input record
case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)
// Output record
case class MarketViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

// Custom test source that emits random promotion events
class SimulatedSource extends RichSourceFunction[MarketUserBehavior] {
  // Keeps the source running until cancel() is called (from another thread, hence @volatile)
  @volatile var running = true
  val behaviorSet: Seq[String] = Seq("view", "click", "download", "install", "uninstall")
  val channelSet: Seq[String] = Seq("appstore", "weibo", "wechat", "tieba")
  val random: Random = Random

  // Generate data
  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val userId: String = UUID.randomUUID().toString
      val behavior: String = behaviorSet(random.nextInt(behaviorSet.size))
      val channel: String = channelSet(random.nextInt(channelSet.size))
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(MarketUserBehavior(userId, behavior, channel, timestamp))
      count += 1
      Thread.sleep(500)
    }
  }

  // Stop generating data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Counts events per channel and per behavior over the past hour, refreshed every 5 seconds
 */
object AppMarketByChannel {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[MarketUserBehavior] = env.addSource(new SimulatedSource)
      .assignAscendingTimestamps(_.timestamp)

    val windowStream: WindowedStream[MarketUserBehavior, (String, String), TimeWindow] = dataStream
      .filter(_.behavior != "uninstall")
      // A tuple key groups by two fields at once
      .keyBy(data => (data.channel, data.behavior))
      // Each record falls into 1 h / 5 s = 720 overlapping windows
      .timeWindow(Time.hours(1), Time.seconds(5))

    // Approach 1: incremental AggregateFunction plus a WindowFunction for the window metadata
    val res1: DataStream[MarketViewCount] = windowStream.aggregate(new AppMarkAggregate, new AppMarkWindowCount)
    res1.print("res1")

    // Approach 2: a ProcessWindowFunction over all buffered elements of the window
    val res2: DataStream[MarketViewCount] = windowStream.process(new MarketCountByChannel)
    res2.print("res2")

    env.execute("AppMarket Job")
  }
}

/**
 * Incremental count
 */
class AppMarkAggregate extends AggregateFunction[MarketUserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: MarketUserBehavior, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

/**
 * Wraps the count with the window start/end and the key
 */
class AppMarkWindowCount extends WindowFunction[Long, MarketViewCount, (String, String), TimeWindow] {
  override def apply(key: (String, String), window: TimeWindow, input: Iterable[Long], out: Collector[MarketViewCount]): Unit = {
    out.collect(MarketViewCount(new Timestamp(window.getStart).toString, new Timestamp(window.getEnd).toString,
      key._1, key._2, input.head))
  }
}

/**
 * 1. ReduceFunction and AggregateFunction are incremental window functions built on an intermediate state.
 *    They cover most use cases, but some metrics depend on all elements of the window, or need the window's
 *    state and metadata. For these, use a ProcessWindowFunction, which computes results over the complete
 *    window content; sorting the whole window to take the top N, for example, requires it.
 *
 * 2. A ProcessWindowFunction receives an Iterable over all elements of the window plus a Context with access
 *    to time and state information, which makes it more flexible than the other window functions. The price is
 *    performance and resource usage: elements cannot be aggregated incrementally and are buffered until the
 *    window fires. Watch the data volume; with a heap state backend, very large windows can exhaust memory.
 */
class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow] {
  override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior],
                       out: Collector[MarketViewCount]): Unit = {
    val start: String = new Timestamp(context.window.getStart).toString
    val end: String = new Timestamp(context.window.getEnd).toString
    val (channel, behavior) = key
    out.collect(MarketViewCount(start, end, channel, behavior, elements.size))
  }
}
```
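5.5.2 Page ad analysis
Count the clicks on ads per province, and filter out users who click the same ad too often (the limit in the code below is 10 clicks).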
```scala
package com.market.analysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input record
case class AdClickLog(userId: String, adId: String, province: String, city: String, ts: Long)
// Output records
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long)
case class BlackListUserWarning(userId: String, adId: String, des: String)

// Custom test source that emits random ad clicks
class AdClickDataSource extends RichSourceFunction[AdClickLog] {
  // Keeps the source running until cancel() is called
  @volatile var running = true
  val gdSet: Seq[String] = Seq("Guangzhou", "Shenzhen", "Zhuhai", "Zhongshan", "Shunde")
  val beijingSet: Seq[String] = Seq("Dongcheng", "Xicheng", "Chaoyang", "Chongwen", "Xuanwu")
  val fujianSet: Seq[String] = Seq("Fuzhou", "Xiamen", "Zhangzhou", "Quanzhou")
  val shandongSet: Seq[String] = Seq("Qingdao", "Zibo", "Zaozhuang", "Yantai", "Jinan", "Weifang", "Jining", "Rizhao")
  val provinceSet: Seq[String] = Seq("Guangdong", "Beijing", "Fujian", "Shandong")
  private val provincesMap: Map[String, Seq[String]] =
    Map("Guangdong" -> gdSet, "Beijing" -> beijingSet, "Fujian" -> fujianSet, "Shandong" -> shandongSet)
  val random: Random = Random

  // Generate data
  override def run(ctx: SourceFunction.SourceContext[AdClickLog]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      // Small pools of users and ads, so that (user, ad) pairs repeat and can reach the blacklist limit
      val userId: String = random.nextInt(20).toString
      val adId: String = random.nextInt(10).toString
      val province: String = provinceSet(random.nextInt(provinceSet.size))
      // Pick a random city of the chosen province
      val cities: Seq[String] = provincesMap(province)
      val city: String = cities(random.nextInt(cities.size))
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(AdClickLog(userId, adId, province, city, timestamp))
      count += 1
      Thread.sleep(50)
    }
  }

  // Stop generating data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Main job
 */
object AdClinkAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[AdClickLog] = env.addSource(new AdClickDataSource)
      .assignAscendingTimestamps(_.ts)

    // Blacklist filtering: key by (user, ad), count each user's clicks on each ad, pass clicks below
    // the limit and send a warning for users above it to a side output
    val filterBlackUserStream: DataStream[AdClickLog] = dataStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FilterBlackListUserResult(10))

    // Clicks per province over the past hour, refreshed every 5 seconds
    val result: DataStream[AdClickCountByProvince] = filterBlackUserStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdClickAgg, new AdClickWindowRes)
    result.print("res")

    // Blacklist warnings from the side output (OutputTags are matched by their id)
    filterBlackUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("warning")

    env.execute("AdClickJob")
  }
}

// Incremental click count per province
class AdClickAgg extends AggregateFunction[AdClickLog, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// Attaches the window end and the province to the count
class AdClickWindowRes extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {
    out.collect(AdClickCountByProvince(new Timestamp(window.getEnd).toString, key, input.head))
  }
}

/**
 * Blacklist filter: once a user has clicked the same ad maxCount times, further clicks are dropped
 * and a single warning is sent to the side output. The counters are reset at the next midnight (UTC+8).
 */
class FilterBlackListUserResult(maxCount: Long) extends KeyedProcessFunction[(String, String), AdClickLog, AdClickLog] {
  // Clicks of this user on this ad
  lazy val countState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("count", classOf[Long]))
  // Time of the daily reset timer
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
  // Whether the user has already been blacklisted (and warned) for this ad
  lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(
    new ValueStateDescriptor[Boolean]("is-Black", classOf[Boolean]))

  override def processElement(value: AdClickLog,
                              ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#Context,
                              out: Collector[AdClickLog]): Unit = {
    val curCount: Long = countState.value()

    // 1. On the first click, register a processing-time timer for the next midnight (UTC+8) that resets the state
    if (curCount == 0) {
      val dayMs = 24 * 3600 * 1000L
      val offsetMs = 8 * 3600 * 1000L
      val now = ctx.timerService().currentProcessingTime()
      // Shift to UTC+8, round up to the next day boundary, then shift back to UTC
      val ts = ((now + offsetMs) / dayMs + 1) * dayMs - offsetMs
      resetTimerTsState.update(ts)
      ctx.timerService().registerProcessingTimeTimer(ts)
    }

    // 2. Limit reached: blacklist the user, emit one warning to the side output, drop the click
    if (curCount >= maxCount) {
      if (!isBlackState.value()) {
        isBlackState.update(true)
        ctx.output(new OutputTag[BlackListUserWarning]("warning"),
          BlackListUserWarning(value.userId, value.adId, s"Click count exceeded the limit of $maxCount"))
      }
      return
    }

    // 3. Below the limit: count the click and pass it on
    countState.update(curCount + 1)
    out.collect(value)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#OnTimerContext,
                       out: Collector[AdClickLog]): Unit = {
    // Daily reset
    if (timestamp == resetTimerTsState.value()) {
      isBlackState.clear()
      resetTimerTsState.clear()
      countState.clear()
    }
  }
}
```
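The daily reset needs the epoch time of the next local midnight in UTC+8. The arithmetic is easy to get wrong. A formula that subtracts the offset without adding it first, `(now / day + 1) * day - offset`, returns a time in the past whenever it is between 00:00 and 08:00 Beijing time. This minimal sketch compares the shifted formula with `java.time` (`NextMidnightSketch` is a hypothetical name, and a fixed +8 offset is assumed).

```scala
import java.time.{Instant, LocalDate, ZoneId}

object NextMidnightSketch {
  private val DayMs = 24 * 3600 * 1000L

  // Epoch millis of the next midnight in a fixed UTC offset (nowMs >= 0 assumed)
  def nextMidnight(nowMs: Long, offsetHours: Int = 8): Long = {
    val offsetMs = offsetHours * 3600 * 1000L
    // Shift into local time, round up to the next day boundary, shift back
    ((nowMs + offsetMs) / DayMs + 1) * DayMs - offsetMs
  }

  // The same value computed with java.time, for comparison
  def nextMidnightJavaTime(nowMs: Long, zone: ZoneId): Long = {
    val today: LocalDate = Instant.ofEpochMilli(nowMs).atZone(zone).toLocalDate
    today.plusDays(1).atStartOfDay(zone).toInstant.toEpochMilli
  }
}
```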
5.6 Malicious login monitoring
5.6.1 First version
```scala
package com.learn.loginfail

import java.sql.Timestamp

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.util.Random

/**
 * Input record: a login event
 */
case class LoginEventData(userId: String, ip: String, eventType: String, timestamp: Long)

/**
 * Output record
 */
case class LoginFailWarning(userId: String, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

// Custom test source that emits random login events
class LoginEventDataSource extends RichSourceFunction[LoginEventData] {
  // Keeps the source running until cancel() is called
  @volatile var running = true
  val eventTypeSet: Seq[String] = Seq("success", "fail")
  val random: Random = Random

  // Generate data
  override def run(ctx: SourceFunction.SourceContext[LoginEventData]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val userId: String = random.nextInt(20).toString
      val ip: String = random.nextInt(100000).toString
      val eventType: String = eventTypeSet(random.nextInt(eventTypeSet.size))
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(LoginEventData(userId, ip, eventType, timestamp))
      count += 1
      Thread.sleep(500)
    }
  }

  // Stop generating data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Raises an alert when the same user fails to log in several times within a short period
 */
object LoginCheckAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // Alert on users with at least 2 failed logins within 2 seconds
    val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream
      .keyBy(_.userId)
      .process(new LoginFailWarningResult(2))

    loginFailWarningStream.map(data => {
      (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg)
    }).print("fail")

    env.execute("LoginCheckJob")
  }
}

// Collects failures and checks them when a timer fires 2 seconds after the first failure
class LoginFailWarningResult(failMaxSize: Int) extends KeyedProcessFunction[String, LoginEventData, LoginFailWarning] {
  // All failed logins since the timer was registered
  lazy val loginFailListState: ListState[LoginEventData] = getRuntimeContext.getListState(
    new ListStateDescriptor[LoginEventData]("failEvent-list", classOf[LoginEventData]))
  // Timestamp of the pending timer (0 = no timer)
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  // Called once for every incoming record
  override def processElement(value: LoginEventData,
                              ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context,
                              out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      loginFailListState.add(value)
      // Only the first failure registers the 2-second timer
      if (timerTsState.value() == 0L) {
        val ts = value.timestamp + 2000L
        ctx.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
      }
    } else {
      // A successful login clears the failures and deletes the pending timer
      loginFailListState.clear()
      ctx.timerService().deleteEventTimeTimer(timerTsState.value())
      timerTsState.clear()
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#OnTimerContext,
                       out: Collector[LoginFailWarning]): Unit = {
    val allLoginFailEventData: List[LoginEventData] = loginFailListState.get().asScala.toList
    // Alert once the number of failures reaches the limit
    if (allLoginFailEventData.size >= failMaxSize) {
      out.collect(LoginFailWarning(
        allLoginFailEventData.head.userId,
        allLoginFailEventData.head.timestamp, // first failure
        allLoginFailEventData.last.timestamp, // last failure
        "login fail in 2 s for " + allLoginFailEventData.size + " times"))
    }
    loginFailListState.clear()
    timerTsState.clear()
  }
}
```
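5.6.2 An alternative version
This is still not an ideal solution. Each failure is compared only with the previous one, so the check detects exactly two consecutive failures. It also assumes that events arrive in timestamp order.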
```scala
package com.learn.loginfail

import java.sql.Timestamp

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Raises an alert when the same user fails to log in several times within a short period.
 * Improved version; LoginEventData, LoginFailWarning and LoginEventDataSource come from 5.6.1.
 * The object has its own name so that it can live in the same package as LoginCheckAnalysis.
 */
object LoginCheckAnalysisAdvance {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // Alert when a user fails to log in twice within 2 seconds
    val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream
      .keyBy(_.userId)
      .process(new LoginFailWarningAdvanceResult(2))

    loginFailWarningStream.map(data => {
      (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg)
    }).print("fail")

    env.execute("LoginCheckJob")
  }
}

// Improved version: alerts as soon as the second consecutive failure arrives instead of waiting 2 seconds
class LoginFailWarningAdvanceResult(failMaxSize: Int) extends KeyedProcessFunction[String, LoginEventData, LoginFailWarning] {
  // The most recent failed login
  lazy val loginFailListState: ListState[LoginEventData] = getRuntimeContext.getListState(
    new ListStateDescriptor[LoginEventData]("failEvent-list", classOf[LoginEventData]))

  // Called once for every incoming record
  override def processElement(value: LoginEventData,
                              ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context,
                              out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      val iter = loginFailListState.get().iterator()
      if (iter.hasNext) {
        val preData: LoginEventData = iter.next()
        // The previous failure was less than 2 seconds ago: alert immediately
        if (value.timestamp < preData.timestamp + 2000L) {
          out.collect(LoginFailWarning(value.userId,
            preData.timestamp, // first failure
            value.timestamp,   // second failure
            "login fail for 2 times"))
        }
        // Either way, keep only the latest failure as the reference for the next one
        loginFailListState.clear()
        loginFailListState.add(value)
      } else {
        // No earlier failure: just store this one
        loginFailListState.add(value)
      }
    } else {
      // A successful login clears the stored failure
      loginFailListState.clear()
    }
  }
}
```
5.6.3 CEP version
```xml
<!-- CEP library dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
```scala
package com.learn.loginfail

import java.sql.Timestamp
import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Raises an alert when the same user fails to log in several times within a short period.
 * CEP version.
 */
object LoginCheckAnalysisCEP {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // 1. Define the pattern: two consecutive failed logins within 2 seconds
    val loginFailPattern: Pattern[LoginEventData, LoginEventData] = Pattern
      .begin[LoginEventData]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail") // next = strict contiguity, no other event in between
      .within(Time.seconds(2))

    // 2. Apply the pattern to the keyed stream to get a PatternStream
    val patternStream: PatternStream[LoginEventData] = CEP.pattern(inputStream.keyBy(_.userId), loginFailPattern)

    // 3. Extract the matches with select; each match is passed as a map from pattern name to the events it matched
    val result: DataStream[LoginFailWarning] = patternStream.select(new LoginFailEventMatch)

    result.map(data => {
      (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg)
    }).print("fail")

    env.execute("LoginCheckJob")
  }
}

/**
 * Custom PatternSelectFunction: turns one match into a warning
 */
class LoginFailEventMatch extends PatternSelectFunction[LoginEventData, LoginFailWarning] {
  override def select(map: util.Map[String, util.List[LoginEventData]]): LoginFailWarning = {
    val firstFail: LoginEventData = map.get("firstFail").get(0)
    val secondFail: LoginEventData = map.get("secondFail").get(0)
    LoginFailWarning(firstFail.userId, firstFail.timestamp, secondFail.timestamp, "login fail")
  }
}
```