Batch processing:
- Batch processing operates on large, static data sets and returns a result only after the computation has completed. You can think of it as processing a collection of data points grouped into fixed time intervals. The data sets used in batch processing usually share the following characteristics:
  - Bounded: a batch data set represents a finite collection of data
  - Persistent: the data is normally stored durably in some kind of persistent storage
  - Large: batch processing is often the only way to handle extremely large data sets
Stream processing:
- Stream processing computes over data as it enters the system. Instead of operating on the whole data set, it operates on each record as it flows through the system. The data set in stream processing is "unbounded", which has several important consequences:
  - A practically unlimited amount of data can be processed, but only one record at a time, with only minimal state kept between records
  - Processing is event-driven and has no "end" unless it is explicitly stopped
  - Results are available immediately and keep being updated as new data arrives
1. E-commerce user behavior analysis
- Users
  - Login method
  - Time of going online and session duration
  - Page dwell time and navigation between pages
- Products
  - Favorite, rate, review, tag
  - Click, browse, purchase, pay
From this, the data can be analyzed from the following angles:
- Statistical analysis
  - Clicks and views
  - Hot items, recently hot items, hot items per category, traffic statistics
- Preference statistics
  - Favorites, likes, ratings, tags
  - User profiling, recommendation lists (combined with feature engineering and machine-learning algorithms)
- Risk control
  - Order placement, payment, login
  - Click-farming (fake order) monitoring, order-timeout monitoring, malicious-login monitoring
2. Project module design
- Real-time statistical analysis
  - Real-time hot item statistics
  - Real-time hot page traffic statistics
  - Real-time visit traffic statistics
- App market promotion statistics
  - Page advertisement click statistics
- Business flow and risk control
  - Page advertisement blacklist filtering
  - Malicious login monitoring
  - Order payment timeout monitoring
  - Real-time payment reconciliation
3. Data sources
- User behavior data
  - userBehavior.csv(userid,itemId,categoryid,behavior,timestamp) — encrypted user id, encrypted item id, category id of the item, user behavior type, timestamp of the behavior
- Web server logs
  - apache.log(ip,userid,eventTime,method,url) — visitor ip, visiting userid, access time, access method, accessed url
4. Requirements
1. Real-time hot item statistics
- Basic requirements
  - Compute the hot items over the last hour, updated every 5 minutes
  - Popularity is measured by the number of views ("pv" events)
- Approach
  - From all user behavior data, filter out the browse ("pv") events and count them
  - Build a sliding window with a window length of 1 hour and a slide of 5 minutes (a minimal sketch of this job follows this list)
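Since no listing for this module appears in section 5 below, here is a minimal sketch of the windowed count plus top-N ranking, written against the same older DataStream API style used in the later listings and the userBehavior.csv schema from section 3. The names (`HotItemsSketch`, `ItemViewCount`, `UserBehaviorRecord`, the input path) are made up for illustration, not taken from the project.

```scala
package com.learn.hotitems

import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

// one record of userBehavior.csv: userId, itemId, categoryId, behavior, timestamp (seconds)
case class UserBehaviorRecord(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// count of one item in one window, produced by the window aggregation
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItemsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val behaviors: DataStream[UserBehaviorRecord] = env.readTextFile("userBehavior.csv")
      .map(line => { val a = line.split(","); UserBehaviorRecord(a(0).toLong, a(1).toLong, a(2).toInt, a(3), a(4).toLong) })
      .assignAscendingTimestamps(_.timestamp * 1000L) // timestamps in the file are in seconds

    val topN: DataStream[String] = behaviors
      .filter(_.behavior == "pv")                    // only page views count towards popularity
      .keyBy(_.itemId)
      .timeWindow(Time.hours(1), Time.minutes(5))    // 1-hour window, 5-minute slide
      .aggregate(new CountAgg, new ItemWindowResult) // incremental count per item per window
      .keyBy(_.windowEnd)                            // regroup by window to rank the items of one window
      .process(new TopNItems(3))

    topN.print()
    env.execute("hot items sketch")
  }
}

class CountAgg extends AggregateFunction[UserBehaviorRecord, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: UserBehaviorRecord, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

class ItemWindowResult extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))
}

class TopNItems(n: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  // all ItemViewCount records belonging to the current windowEnd
  lazy val itemState: ListState[ItemViewCount] =
    getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    itemState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1) // fires once all counts of this window have arrived
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val all = ListBuffer[ItemViewCount]()
    val iter = itemState.get().iterator()
    while (iter.hasNext) all += iter.next()
    itemState.clear()

    val ranking = all.sortBy(-_.count).take(n)
    val sb = new StringBuilder
    sb.append("window end: ").append(new Timestamp(timestamp - 1)).append("\n")
    for ((item, i) <- ranking.zipWithIndex)
      sb.append(s"No${i + 1}: itemId=${item.itemId} pv=${item.count}\n")
    out.collect(sb.toString)
  }
}
```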
2. Real-time traffic statistics: hot pages
- Basic requirements
  - From the web server log, compute the hot pages in real time
  - Count the visits per minute and output the 5 most visited addresses, updated every 5 seconds
- Approach
  - Convert the time field of the server log events into a timestamp and use it as the event time
  - Build a sliding window with a window length of 1 minute and a slide of 5 seconds
3. Real-time traffic statistics: PV and UV
- Basic requirements
  - From the tracking (clickstream) log, compute PV and UV in real time
  - Count the page views (PV) per hour and deduplicate the users (UV)
- Approach
  - Count the "pv" events in the tracking log and deduplicate user ids with a Set
  - For very large data volumes, a Bloom filter can be used for deduplication instead
4. Marketing analysis: app market promotion statistics
- Basic requirements
  - From the tracking log, compute the app market promotion metrics
  - Compute the statistics separately for each promotion channel
- Approach
  - Filter the user behaviors in the log and count them per channel
  - A process function can be used to produce custom output records
5. Marketing analysis: page advertisement statistics
- Basic requirements
  - From the tracking log, count page advertisement clicks per hour, refreshed every 5 seconds, broken down by province
  - Filter out "click-farming" style frequent clicks and add the offending users to a blacklist
- Approach
  - Key by province and count in a sliding time window with a length of 1 hour and a slide of 5 seconds
  - A process function can do the blacklist filtering: track each user's click count on the same advertisement and, once the limit is exceeded, emit the user to a blacklist side output
6. Malicious login monitoring
- Basic requirements
  - Frequent login failures by a user within a short time may indicate an automated malicious attack
  - If the same user (possibly from different ips) fails to log in consecutively within 2 seconds, an alert is required
- Approach
  - Store login-failure events in ListState and register a timer that fires 2 seconds later; when it fires, check how many failures the ListState contains
  - For more precise detection, the CEP library can be used to do pattern matching on the event stream
7. Real-time order payment monitoring
- Basic requirements
  - After a user places an order, an order timeout should be set, to encourage prompt payment and reduce system risk
  - If an order is not paid within 15 minutes of being placed, emit a monitoring message
- Approach
  - Use the CEP library to match the event pattern and set the matching time interval (a CEP sketch follows this list)
  - Alternatively, use stateful programming and implement the logic with a process function
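No listing for this module appears in section 5, so the following is a minimal sketch of the CEP approach, written in the same style as the CEP example in 5.6.3. The `OrderEvent`/`OrderResult` case classes, the `orderLog.csv` input and its field layout are assumptions made for illustration only.

```scala
package com.learn.orderpay

import java.util
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// hypothetical order event: orderId, eventType ("create" / "pay"), txId, timestamp (seconds)
case class OrderEvent(orderId: Long, eventType: String, txId: String, timestamp: Long)
case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // assumed input file layout: orderId,eventType,txId,timestamp
    val orderStream: DataStream[OrderEvent] = env.readTextFile("orderLog.csv")
      .map(line => { val a = line.split(","); OrderEvent(a(0).toLong, a(1), a(2), a(3).toLong) })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 1. pattern: a "create" event followed by a "pay" event within 15 minutes
    val payPattern: Pattern[OrderEvent, OrderEvent] = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 2. apply the pattern per order
    val patternStream: PatternStream[OrderEvent] = CEP.pattern(orderStream.keyBy(_.orderId), payPattern)

    // 3. fully matched orders go to the main output, timed-out partial matches to a side output
    val timeoutTag = new OutputTag[OrderResult]("order-timeout")
    val paidStream = patternStream.select(timeoutTag, new OrderTimeoutMatch, new OrderPayMatch)

    paidStream.print("paid")
    paidStream.getSideOutput(timeoutTag).print("timeout")
    env.execute("order timeout sketch")
  }
}

// called for partial matches that time out: the order was created but never paid in time
class OrderTimeoutMatch extends PatternTimeoutFunction[OrderEvent, OrderResult] {
  override def timeout(map: util.Map[String, util.List[OrderEvent]], timeoutTs: Long): OrderResult = {
    val createEvent = map.get("create").get(0)
    OrderResult(createEvent.orderId, s"order not paid, timed out at $timeoutTs")
  }
}

// called for complete matches: the order was paid within 15 minutes
class OrderPayMatch extends PatternSelectFunction[OrderEvent, OrderResult] {
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payEvent = map.get("pay").get(0)
    OrderResult(payEvent.orderId, "order paid successfully")
  }
}
```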
8. Real-time order payment reconciliation
- Basic requirements
  - After a user places an order and pays, the corresponding arrival (receipt) record should be looked up for real-time reconciliation
  - If a payment record and an arrival record do not match, emit a warning message
- Approach
  - Read the order payment records and the arrival records from two separate streams and process them together
  - Use connect to join the two streams and a CoProcessFunction to do the matching (a sketch follows this list)
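Again no listing exists for this module, so here is a minimal sketch of the connect + CoProcessFunction matching. It reuses the hypothetical `OrderEvent` case class from the sketch above (same package); `ReceiptEvent`, the `receiptLog.csv` input and the 5-second wait before reporting an unmatched record are also assumptions.

```scala
package com.learn.orderpay

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// hypothetical receipt event from the payment platform: txId, payChannel, timestamp (seconds)
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)

object TxMatchSketch {
  // unmatched records are reported on side outputs
  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // pay events, reusing the OrderEvent schema from the previous sketch
    val payStream = env.readTextFile("orderLog.csv")
      .map(line => { val a = line.split(","); OrderEvent(a(0).toLong, a(1), a(2), a(3).toLong) })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.eventType == "pay")
      .keyBy(_.txId)

    // receipt events from the account-arrival log
    val receiptStream = env.readTextFile("receiptLog.csv")
      .map(line => { val a = line.split(","); ReceiptEvent(a(0), a(1), a(2).toLong) })
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .keyBy(_.txId)

    // connect the two keyed streams and match them by txId
    val matched = payStream.connect(receiptStream).process(new TxPayMatch)
    matched.print("matched")
    matched.getSideOutput(unmatchedPays).print("unmatched pay")
    matched.getSideOutput(unmatchedReceipts).print("unmatched receipt")
    env.execute("tx match sketch")
  }

  class TxPayMatch extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
    // remember whichever event arrived first and wait up to 5 seconds for its counterpart
    lazy val payState: ValueState[OrderEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
    lazy val receiptState: ValueState[ReceiptEvent] =
      getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

    override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      val receipt = receiptState.value()
      if (receipt != null) { out.collect((pay, receipt)); receiptState.clear() }
      else { payState.update(pay); ctx.timerService().registerEventTimeTimer(pay.timestamp * 1000L + 5000L) }
    }

    override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      val pay = payState.value()
      if (pay != null) { out.collect((pay, receipt)); payState.clear() }
      else { receiptState.update(receipt); ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 5000L) }
    }

    override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
      // whichever side is still present had no counterpart in time: report it
      if (payState.value() != null) ctx.output(unmatchedPays, payState.value())
      if (receiptState.value() != null) ctx.output(unmatchedReceipts, receiptState.value())
      payState.clear(); receiptState.clear()
    }
  }
}
```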
5. Implementation code
5.2 Real-time traffic statistics: hot pages
```scala
package com.learn.networkflow

import java.net.URL
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Map
import java.{lang, util}

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.{DataStreamSource, KeyedStream, SingleOutputStreamOperator, WindowedStream}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// input record: visitor ip, userid, access time, access method, accessed url
case class ApacheLogEvent(ip: String, userid: String, eventTime: Long, method: String, url: String)

// window aggregation result
case class PageViewCount(url: String, windowEnd: Long, count: Long)

/**
 * Real-time traffic statistics: hot pages
 *
 * Basic requirements:
 *   - compute the hot pages from the web server log in real time
 *   - count the visits per minute, output the 5 most visited addresses, updated every 5 seconds
 * Approach:
 *   - convert the time field of the log events into a timestamp and use it as event time
 *   - build a sliding window of length 1 minute with a slide of 5 seconds
 */
object HotPageNetworkFlow {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // val resource: URL = getClass.getResource("/apache.log")
    // val inputStream: DataStreamSource[String] = env.readTextFile(resource.getPath)
    val inputStream: DataStreamSource[String] = env.socketTextStream("localhost", 7777)

    val dataStream: SingleOutputStreamOperator[ApacheLogEvent] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(" ")
        val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val ts: Long = format.parse(arr(2)).getTime
        ApacheLogEvent(arr(0), arr(1), ts, arr(3), arr(4))
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
        override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
      })
    dataStream.print("data")

    val filterStream: SingleOutputStreamOperator[ApacheLogEvent] = dataStream.filter(_.method == "GET")
    val keyedByStream: KeyedStream[ApacheLogEvent, String] = filterStream.keyBy(_.url)
    // val windowStream: WindowedStream[ApacheLogEvent, String, TimeWindow] = keyedByStream.timeWindow(Time.minutes(10), Time.seconds(5))
    // implicit val typeInfo = TypeInformation.of(classOf[ApacheLogEvent]) // implicit conversion

    val tag = new OutputTag[ApacheLogEvent]("late")

    val aggStream = dataStream
      .filter(_.method == "GET")
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) // keep the window open for one extra minute before closing it
      .sideOutputLateData(tag)          // records that arrive after the window has closed go to the side output; latest accepted time = watermark - window size - lateness
      .aggregate(new PageCountAgg(), new PageViewCountWindowResult)
    aggStream.print("agg")

    val result: SingleOutputStreamOperator[String] = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPages(5))
    result.print("result")

    aggStream.getSideOutput(tag).print("late") // late data output
    env.execute("Hot Pages Job")
  }
}

class PageCountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class PageViewCountWindowResult extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: lang.Iterable[Long], out: Collector[PageViewCount]): Unit = {
    out.collect(PageViewCount(key, window.getEnd, input.iterator().next()))
  }
}

class TopNHotPages(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {
  // keeps url -> count for the current window
  lazy val mapState: MapState[String, Long] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Long]("pageViewCount", classOf[String], classOf[Long]))

  override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
    mapState.put(value.url, value.count)
    println("current windowEnd = " + value.windowEnd)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    // fires one minute later to clear the state once the window no longer emits results;
    // the delay matches the allowedLateness of the window
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 60000)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val iter: util.Iterator[Map.Entry[String, Long]] = mapState.entries().iterator()
    val allPageViewCount: ListBuffer[(String, Long)] = ListBuffer()
    while (iter.hasNext) {
      val entry = iter.next()
      allPageViewCount += ((entry.getKey, entry.getValue))
    }

    // clear the state once the late-data grace period has passed
    if (timestamp == ctx.getCurrentKey + 60000) {
      mapState.clear()
      return
    }

    // sort and take the top entries
    val sortPageViewCounts: ListBuffer[(String, Long)] = allPageViewCount.sortWith(_._2 > _._2).take(n)
    val res: StringBuilder = new StringBuilder
    res.append("window end time: ").append(new Timestamp(timestamp - 1)).append("\n")
    for (i <- sortPageViewCounts.indices) {
      val item: (String, Long) = sortPageViewCounts(i)
      res.append("No").append(i + 1).append(": ")
        .append("url = ").append(item._1).append("\t")
        .append("popularity = ").append(item._2).append("\n")
    }
    res.append("==============================\n\n")
    // Thread.sleep(1000)
    out.collect(res.toString)
  }
}
```
5.3 Real-time traffic statistics: PV and UV
```scala
package com.learn.networkflow

import java.net.URL

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.{DataStreamSource, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// input record: encrypted user id, encrypted item id, category id, behavior type, timestamp of the behavior
case class UserBehavior(userid: Long, itemId: Long, categoryid: Int, behavior: String, timestamp: Long)

// window aggregation result
case class PvCount(windowEnd: Long, count: Long)

/**
 * Real-time traffic statistics: PV and UV
 *
 * Basic requirements:
 *   - compute PV and UV in real time from the tracking log
 *   - count the page views (PV) per hour and deduplicate the users
 * Approach:
 *   - count the "pv" events in the tracking log, deduplicate with a Set
 *   - for very large data volumes, a Bloom filter can be used for deduplication
 */
object PageView {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // val resource: URL = getClass.getResource("/apache.log")
    // val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)

    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3).toString, arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    val pvStream = dataStream
      .filter(_.behavior == "pv")
      // .map(data => ("pv", 1L))
      .map(new MyKeyByFunction) // with one constant key all records go to the same slot and the parallelism is wasted, so a custom map spreads the records over random keys
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .aggregate(new PvCountAgg(), new PvCountWindowResult)
    // each key produces its own partial result per window, so the partial results
    // still have to be summed per window below
    pvStream.print("pvStream")

    val result = pvStream
      .keyBy(_.windowEnd)              // regroup the partial results of the same window
      .process(new TotalPvCountResult) // sum per windowEnd and emit once the window result is complete
    result.print("result")

    env.execute("pv Count Job")
  }
}

class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class PvCountWindowResult extends WindowFunction[Long, PvCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd, input.iterator.next()))
  }
}

class MyKeyByFunction extends MapFunction[UserBehavior, (String, Long)] {
  override def map(value: UserBehavior): (String, Long) = {
    (new Random().nextString(10), 1)
  }
}

class TotalPvCountResult extends KeyedProcessFunction[Long, PvCount, PvCount] {
  lazy val totalPvCountResultState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pvCount", classOf[Long]))

  override def processElement(value: PvCount, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context, out: Collector[PvCount]): Unit = {
    totalPvCountResultState.update(value.count + totalPvCountResultState.value())
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    out.collect(PvCount(ctx.getCurrentKey, totalPvCountResultState.value()))
    totalPvCountResultState.clear()
  }
}
```
5.4 UV statistics: full-window aggregation
```scala
package com.learn.networkflow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// window aggregation result
case class UvCount(windowEnd: Long, count: Long)

/**
 * UV statistics
 */
object UvCount {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val inputStream: DataStream[String] = env.socketTextStream("localhost", 7777)
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr: Array[String] = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3).toString, arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    val uvStream = dataStream
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.seconds(5))
      .apply(new UvCountResult())

    // print the result and run the job
    uvStream.print("uv")
    env.execute("Uv Count Job")
  }
}

/**
 * Deduplicates with a Set. In production this can consume too much memory and waste resources;
 * deduplication with a Bloom filter is possible instead (sketched below).
 */
class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    var userIdSet = Set[Long]()
    val iter: Iterator[UserBehavior] = input.iterator
    while (iter.hasNext) {
      val behavior: UserBehavior = iter.next()
      userIdSet += behavior.userid
    }
    out.collect(UvCount(window.getEnd, userIdSet.size))
  }
}
```
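As the class comment above suggests, the Set can be replaced with a Bloom filter when the number of distinct users is too large to hold in memory. The variant below is a minimal sketch that only swaps the data structure inside the window function; it assumes Guava's `BloomFilter` is on the classpath, and since the window still buffers its elements, a production version would usually combine a per-element trigger with an external bitmap such as Redis.

```scala
import java.lang

import com.google.common.hash.{BloomFilter, Funnels}
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// drop-in replacement for UvCountResult: approximate deduplication with a Bloom filter
class UvCountWithBloomFilter extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // sized for ~100 million distinct ids at a 1% false-positive rate, far smaller than a Set of all ids
    val bloom: BloomFilter[lang.Long] = BloomFilter.create[lang.Long](Funnels.longFunnel(), 100000000, 0.01)
    var count = 0L
    for (behavior <- input) {
      // a false positive means a genuinely new user is skipped, so the UV is a slight underestimate
      if (!bloom.mightContain(behavior.userid)) {
        bloom.put(behavior.userid)
        count += 1
      }
    }
    out.collect(UvCount(window.getEnd, count))
  }
}
```

To try it, replace `.apply(new UvCountResult())` in the job above with `.apply(new UvCountWithBloomFilter())`.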
5.5 Marketing analysis
5.5.1 App market promotion statistics
```scala
package com.market.analysis

import java.lang
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// input record
case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

// output record
case class MarketViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

// custom test data source
class SimulatedSource extends RichSourceFunction[MarketUserBehavior] {
  // flag controlling whether data is still produced
  var running = true
  val behaviorSet: Seq[String] = Seq("view", "click", "download", "install", "uninstall")
  val channelSet: Seq[String] = Seq("appstore", "weibo", "wechat", "tieba")
  val random: Random = Random

  // produce data
  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val userId: String = random.nextString(20)
      val behavior: String = behaviorSet(random.nextInt(behaviorSet.size))
      val channel: String = channelSet(random.nextInt(channelSet.size))
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(MarketUserBehavior(userId, behavior, channel, timestamp))
      count += 1
      Thread.sleep(500)
    }
  }

  // stop producing data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Counts, per channel and per behavior, the number of events in the past hour.
 */
object AppMarketByChannel {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[MarketUserBehavior] = env.addSource(new SimulatedSource)
      .assignAscendingTimestamps(_.timestamp)

    // statistics
    val filterStream: DataStream[MarketUserBehavior] = dataStream.filter(_.behavior != "uninstall")
    val keyByStream: KeyedStream[MarketUserBehavior, (String, String)] = filterStream.keyBy(data => {
      (data.channel, data.behavior) // keyBy can take a tuple to key by two fields at once
    })
    val windowStream: WindowedStream[MarketUserBehavior, (String, String), TimeWindow] =
      keyByStream.timeWindow(Time.days(1), Time.seconds(5))

    // variant 1: aggregate function + window function
    val result: DataStream[MarketViewCount] = windowStream.aggregate(new AppMarkAggregate, new AppMarkWindowCount)
    result.print("res1")

    // variant 2: process window function
    val res2: DataStream[MarketViewCount] = windowStream.process(new MarketCountByChannel)
    res2.print("res2")

    env.execute("AppMarket Job")
  }
}

// aggregation
class AppMarkAggregate extends AggregateFunction[MarketUserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: MarketUserBehavior, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// window result
class AppMarkWindowCount extends WindowFunction[Long, MarketViewCount, (String, String), TimeWindow] {
  override def apply(key: (String, String), window: TimeWindow, input: Iterable[Long], out: Collector[MarketViewCount]): Unit = {
    out.collect(MarketViewCount(new Timestamp(window.getStart).toString, new Timestamp(window.getEnd).toString,
      key._1, key._2, input.iterator.next()))
  }
}

/**
 * 1. ReduceFunction and AggregateFunction compute incrementally on intermediate state and cover most
 *    scenarios, but some metrics depend on all elements of a window, or need access to window state
 *    and window metadata. That is what ProcessWindowFunction is for: it supports computations over the
 *    full window contents, e.g. sorting a whole window to take a top-N, which cannot be done incrementally.
 *
 * 2. A ProcessWindowFunction receives an iterable over all elements of the window plus a context object
 *    with access to time and state information, which makes it more flexible than the other window
 *    functions. This comes at the cost of performance and resources: elements cannot be aggregated
 *    incrementally and must be buffered internally until the window fires, so with large data volumes
 *    the buffered window contents can exhaust memory.
 */
class MarketCountByChannel extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow] {
  override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
    val start: Long = context.window.getStart
    val end: Long = context.window.getEnd
    val channel = key._1
    val behavior = key._2
    val size: Int = elements.size
    out.collect(MarketViewCount(new Timestamp(start).toString, new Timestamp(end).toString, channel, behavior, size))
  }
}
```
5.5.2 Page advertisement analysis
Count the clicks on each advertisement per province, and filter out users who click more than 100 times.
```scala
package com.market.analysis

import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// input
case class AdClickLog(userId: String, adId: String, province: String, city: String, ts: Long)
// output
case class AdClickCountByProvince(windowEnd: String, province: String, count: Long)
case class BlackListUserWarning(userId: String, adId: String, des: String)

// custom test data source
class AdClickDataSource extends RichSourceFunction[AdClickLog] {
  // flag controlling whether data is still produced
  var running = true
  val gdSet: Seq[String] = Seq("广州", "深圳", "珠海", "中山", "顺德")
  val beijingSet: Seq[String] = Seq("东城区", "西城区", "朝阳区", "崇文区", "宣武区")
  val fujianSet: Seq[String] = Seq("福州", "厦门", "漳州", "泉州")
  val shandongSet: Seq[String] = Seq("青岛", "淄博", "枣庄", "烟台", "济南", "潍坊", "济宁", "日照")
  val provinceSet: Seq[String] = Seq("广东", "北京", "福建", "山东")
  private val provincesMap: Map[String, Seq[String]] =
    Map("广东" -> gdSet, "北京" -> beijingSet, "福建" -> fujianSet, "山东" -> shandongSet)
  val random: Random = Random

  // produce data
  override def run(ctx: SourceFunction.SourceContext[AdClickLog]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val userId: String = UUID.randomUUID().toString.replace("-", "")
      val adId: String = random.nextInt(100).toString
      val province: String = provinceSet(random.nextInt(provinceSet.size))
      val cities: Seq[String] = provincesMap(province)
      val city: String = cities(random.nextInt(cities.size)) // pick a random city of the chosen province
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(AdClickLog(userId, adId, province, city, timestamp))
      count += 1
      Thread.sleep(50)
    }
  }

  // stop producing data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Main job
 */
object AdClinkAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream: DataStream[AdClickLog] = env.addSource(new AdClickDataSource)
      .assignAscendingTimestamps(_.ts)

    // filter out blacklisted users: count every click of a user on the same ad (keyed by (user, ad))
    // and divert users that exceed the limit to a side output
    val filterKeyByStream: KeyedStream[AdClickLog, (String, String)] = dataStream.keyBy(data => {
      (data.userId, data.adId)
    })
    val filterBlackUserStream: DataStream[AdClickLog] = filterKeyByStream.process(new FilterBlackListUserResult)

    val keyByStream: KeyedStream[AdClickLog, String] = filterBlackUserStream.keyBy(_.province)
    val windowStream: WindowedStream[AdClickLog, String, TimeWindow] =
      keyByStream.timeWindow(Time.days(1), Time.seconds(5))
    val result: DataStream[AdClickCountByProvince] = windowStream.aggregate(new AdClickAgg, new AdClickWindowRes)
    result.print("res")

    // blacklist warnings
    filterBlackUserStream
      .getSideOutput(new OutputTag[BlackListUserWarning]("warning"))
      .print("warning")

    env.execute("AdClickJob")
  }
}

class AdClickAgg extends AggregateFunction[AdClickLog, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class AdClickWindowRes extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = {
    out.collect(AdClickCountByProvince(new Timestamp(window.getEnd).toString, key, input.iterator.next()))
  }
}

/**
 * Blacklist filter: once a user has clicked the same advertisement too many times,
 * further clicks are blocked for the rest of the day.
 */
class FilterBlackListUserResult extends KeyedProcessFunction[(String, String), AdClickLog, AdClickLog] {
  // click count of this user on this ad
  lazy val countState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
  // timestamp of the timer that clears the blacklist each day
  lazy val resetTimerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
  // whether this user has already been blacklisted
  lazy val isBlackState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-Black", classOf[Boolean]))
  lazy val maxcount = 10

  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
    val curCount: Long = countState.value()

    // 1. on the first click of the day, register a timer that clears the state at midnight (UTC+8)
    if (curCount == 0) {
      val ts = (ctx.timerService().currentProcessingTime() / (1000 * 3600 * 24) + 1) * (1000 * 3600 * 24) - 8 * 3600 * 1000
      resetTimerTsState.update(ts)
      ctx.timerService().registerProcessingTimeTimer(ts)
    }

    // 2. if the count has reached the threshold, blacklist the user and emit a warning to the side output
    if (curCount >= maxcount) {
      if (!isBlackState.value()) {
        isBlackState.update(true)
        ctx.output(new OutputTag[BlackListUserWarning]("warning"),
          BlackListUserWarning(value.userId, value.adId, "user clicked this ad more than the allowed number of times"))
      }
      return
    }

    // 3. still below the limit: count the click and forward the record
    countState.update(curCount + 1)
    out.collect(value)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(String, String), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
    if (timestamp == resetTimerTsState.value()) {
      isBlackState.clear()
      resetTimerTsState.clear()
      countState.clear()
    }
  }
}
```
5.6 Malicious login monitoring
5.6.1 First version
```scala
package com.learn.loginfail

import java.sql.Timestamp
import java.util

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
import scala.util.Random

// input record: a login event
case class LoginEventData(userId: String, ip: String, eventType: String, timestamp: Long)

// output record
case class LoginFailWarning(userId: String, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

// custom test data source
class LoginEventDataSource extends RichSourceFunction[LoginEventData] {
  // flag controlling whether data is still produced
  var running = true
  val eventTypeSet: Seq[String] = Seq("success", "fail")
  val random: Random = Random

  // produce data
  override def run(ctx: SourceFunction.SourceContext[LoginEventData]): Unit = {
    val maxCount: Long = Long.MaxValue
    var count = 0L
    while (running && count < maxCount) {
      val userId: String = random.nextInt(20).toString
      val ip: String = random.nextInt(100000).toString
      val eventType: String = eventTypeSet(random.nextInt(eventTypeSet.size))
      val timestamp: Long = System.currentTimeMillis()
      ctx.collect(LoginEventData(userId, ip, eventType, timestamp))
      count += 1
      Thread.sleep(500)
    }
  }

  // stop producing data
  override def cancel(): Unit = {
    running = false
  }
}

/**
 * Detects users that fail to log in several times within a short period and raises an alert.
 */
object LoginCheckAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // check the events and emit a warning if the login failures happen within two seconds
    val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream
      .keyBy(_.userId)
      .process(new LoginFailWarningResult(2))

    loginFailWarningStream
      .map(data => (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg))
      .print("fail")

    env.execute("LoginCheckJob")
  }
}

// processing function
class LoginFailWarningResult(failMaxSize: Int) extends KeyedProcessFunction[String, LoginEventData, LoginFailWarning] {
  // all login-failure events seen so far
  lazy val loginFailListState: ListState[LoginEventData] =
    getRuntimeContext.getListState(new ListStateDescriptor[LoginEventData]("failEvent-list", classOf[LoginEventData]))
  // timestamp of the registered timer
  lazy val timeValueState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  // called once for every incoming record
  override def processElement(value: LoginEventData, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    // println("record entered process = " + value)
    if (value.eventType == "fail") {
      // failure: remember it and register a timer that fires 2 seconds later
      loginFailListState.add(value)
      ctx.timerService().registerEventTimeTimer(value.timestamp + 2000L)
      timeValueState.update(value.timestamp + 2000L)
    } else {
      // success: clear the state and the timer
      loginFailListState.clear()
      ctx.timerService().deleteEventTimeTimer(timeValueState.value())
      timeValueState.clear()
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {
    val allLoginFailEventData = new ListBuffer[LoginEventData]
    val iter: util.Iterator[LoginEventData] = loginFailListState.get().iterator()
    while (iter.hasNext) {
      allLoginFailEventData += iter.next()
    }

    // if the number of failures reached the threshold, emit a warning
    if (allLoginFailEventData.size >= failMaxSize) {
      out.collect(new LoginFailWarning(
        allLoginFailEventData.head.userId,
        allLoginFailEventData.head.timestamp, // first failure
        allLoginFailEventData.last.timestamp, // last failure
        "login fail in 2 s for " + allLoginFailEventData.length + " times"))
    }
    loginFailListState.clear()
    timeValueState.clear()
  }
}
```
5.6.2 An alternative version
This is still not the best solution either.
```scala
package com.learn.loginfail

import java.sql.Timestamp
import java.util

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Detects users that fail to log in several times within a short period and raises an alert.
 * Improved version.
 */
object LoginCheckAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // check the events and emit a warning if two login failures happen within two seconds
    val loginFailWarningStream: DataStream[LoginFailWarning] = inputStream
      .keyBy(_.userId)
      .process(new LoginFailWarningAdvanceResult(2))

    loginFailWarningStream
      .map(data => (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg))
      .print("fail")

    env.execute("LoginCheckJob")
  }
}

// improved processing function: warns as soon as two consecutive failures arrive, instead of waiting for the 2-second timer
class LoginFailWarningAdvanceResult(failMaxSize: Int) extends KeyedProcessFunction[String, LoginEventData, LoginFailWarning] {
  // login-failure events seen so far (at most one: the previous failure)
  lazy val loginFailListState: ListState[LoginEventData] =
    getRuntimeContext.getListState(new ListStateDescriptor[LoginEventData]("failEvent-list", classOf[LoginEventData]))

  // called once for every incoming record
  override def processElement(value: LoginEventData, ctx: KeyedProcessFunction[String, LoginEventData, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    // println("record entered process = " + value)
    if (value.eventType == "fail") {
      val iter: util.Iterator[LoginEventData] = loginFailListState.get().iterator()
      if (iter.hasNext) {
        val preData: LoginEventData = iter.next()
        // if the previous failure happened less than 2 seconds ago, emit a warning
        if (value.timestamp < preData.timestamp + 2000L) {
          out.collect(new LoginFailWarning(
            value.userId,
            preData.timestamp, // first failure
            value.timestamp,   // latest failure
            "login fail for 2 times"))
        }
        // whether or not a warning was emitted, keep only the latest failure as the new baseline
        loginFailListState.clear()
        loginFailListState.add(value)
      } else {
        // no previous failure: just remember this one
        loginFailListState.add(value)
      }
    } else {
      // success: clear the state
      loginFailListState.clear()
    }
  }
}
```
5.6.3 CEP version
```xml
<!-- CEP library dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
```scala
package com.learn.loginfail

import java.sql.Timestamp
import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Detects users that fail to log in several times within a short period and raises an alert.
 * CEP version.
 */
object LoginCheckAnalysisCEP {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream: DataStream[LoginEventData] = env.addSource(new LoginEventDataSource)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEventData](Time.seconds(2)) {
        override def extractTimestamp(element: LoginEventData): Long = element.timestamp
      })

    // 1. define the pattern: two consecutive login failures within 2 seconds
    val loginFailPattern: Pattern[LoginEventData, LoginEventData] = Pattern
      .begin[LoginEventData]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2)) // within 2 seconds

    // 2. apply the pattern to the stream, which yields a PatternStream
    val patternStream: PatternStream[LoginEventData] = CEP.pattern(inputStream.keyBy(_.userId), loginFailPattern)

    // 3. extract the matching event sequences with select (each match is handed over as a map)
    val result: DataStream[LoginFailWarning] = patternStream.select(new LoginFailEventMatch)

    result
      .map(data => (data.userId, new Timestamp(data.firstFailTime).toString, new Timestamp(data.lastFailTime).toString, data.warningMsg))
      .print("fail")

    env.execute("LoginCheckJob")
  }
}

/**
 * Custom PatternSelectFunction.
 */
class LoginFailEventMatch extends PatternSelectFunction[LoginEventData, LoginFailWarning] {
  override def select(map: util.Map[String, util.List[LoginEventData]]): LoginFailWarning = {
    val firstFail: LoginEventData = map.get("firstFail").get(0)
    val secondFail: LoginEventData = map.get("secondFail").get(0)
    LoginFailWarning(firstFail.userId, firstFail.timestamp, secondFail.timestamp, "login fail")
  }
}
```
