一. 基于服务器 log 的热门页面浏览量统计

基本需求

  • 读取服务器日志中的每 一行 log,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示

解决思路

  • 每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL
  • 与之前的热门实时商品统计类似

    需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个 SimpleDateFormat 将其转换为我们需要的时间戳格式:

    1. 208.115.111.72 - - 17/05/2015:11:05:49 +0000 GET /blog/geekery/oniguruma-named-capture-example.html?commentlimit=0
    2. 208.115.111.72 - - 17/05/2015:11:05:50 +0000 GET /blog/geekery/oniguruma-named-capture-example.html?commentlimit=0
    3. 208.115.111.72 - - 17/05/2015:11:05:46 +0000 GET /blog/geekery/oniguruma-named-capture-example.html?commentlimit=0
    4. 208.115.111.72 - - 17/05/2015:11:05:51 +0000 GET /blog/geekery/oniguruma-named-capture-example.html?commentlimit=0
    5. 208.115.111.72 - - 17/05/2015:11:05:31 +0000 GET /blog/geekery/oniguruma-named-capture-example.html?commentlimit=0

    HotPagesNetworkFlow.scala

    定义样例类

    ```scala
    //输入数据的样例类
    case class ApacheLogEvent(ip:String,userId:String,timestamp:Long,method:String,url:String)

    //定义一个窗口聚合结果样例类
    case class PageViewCount(url:String,windowEnd:Long,count:Long)
    ```

<a name="bYpY4"></a>
## 读取数据转化为样例类
```scala
val dataStream = inputStream
  .map(data =>{
    val arr = data.split(" ")
    // Raw log date format, e.g. "17/05/2015:11:05:49" (field 3 after splitting on spaces)
    val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    // parse() yields a java.util.Date; getTime converts it to epoch milliseconds
    val ts = simpleDateFormat.parse(arr(3)).getTime
    ApacheLogEvent(arr(0),arr(1),ts,arr(5),arr(6))
  })
  1. 注意:原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个 SimpleDateFormat 将其转换为我们需要的时间戳格式
  2. ApacheLogEvent(arr(0),arr(1),ts,arr(5),arr(6)),注意定义的样例类在文本文件中的位置
  3. getTime 返回的是毫秒数

    完整代码

    ```scala
package networkflow_analysis

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Input case class: one parsed server-log line.
// BUG FIX: comment and code were merged onto one line, so the whole case class
// definition was commented out and never compiled.
case class ApacheLogEvent(ip:String,userId:String,timestamp:Long,method:String,url:String)

// Window aggregation result: count of hits on `url` in the window ending at `windowEnd`.
// BUG FIX: comment and code were merged onto one line, commenting out the definition.
case class PageViewCount(url:String,windowEnd:Long,count:Long)

object HotPagesNetworkFlow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read raw lines, convert to the case class, and extract timestamps/watermarks.
    val inputStream = env.socketTextStream("192.168.188.8", 7777)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(" ")
        // Convert the event-time string "dd/MM/yyyy:HH:mm:ss" into epoch millis
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val ts = simpleDateFormat.parse(arr(3)).getTime // getTime returns milliseconds
        ApacheLogEvent(arr(0), arr(1), ts, arr(5), arr(6))
      })
      // BUG FIX: the original line lost the generic parameter and the bound
      // argument; the extractor needs [ApacheLogEvent] and a max-out-of-orderness
      // bound (1s here — estimate from the data's observed disorder).
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
          override def extractTimestamp(element: ApacheLogEvent): Long = element.timestamp
        })

    // Windowed aggregation: per-url counts over a 10-min window sliding every 5s.
    val aggStream = dataStream
      .filter(_.method == "GET")
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .allowedLateness(Time.minutes(1)) // tolerate out-of-order/late records
      .sideOutputLateData(new OutputTag[ApacheLogEvent]("late"))
      .aggregate(new PageCountAgg(), new PageViewCountWindowResult())

    // Per-window top-N ranking, keyed by window end time.
    val resultStream = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))

    dataStream.print("data")
    aggStream.print("agg")
    aggStream.getSideOutput(new OutputTag[ApacheLogEvent]("late")).print()
    resultStream.print()
    env.execute("hot pages job")
  }
}

// Incremental pre-aggregator: a plain Long counter, +1 per GET event.
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

// Wraps the single pre-aggregated count with its url key and the window end time.
class PageViewCountWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    val count = input.iterator.next()
    out.collect(PageViewCount(key, window.getEnd, count))
  }
}

// Keyed by windowEnd: collects every url's latest count for that window in
// MapState, then on the window's firing timer sorts and emits the top n urls.
class TopNHotPage(n: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {

  // url -> latest count. MapState (not ListState) so a late update for the same
  // url overwrites instead of duplicating.
  // BUG FIX: the original `new MapStateDescriptorString,Long` lost its type
  // parameters, name and classOf arguments in the paste.
  lazy val pageViewCountMapState: MapState[String, Long] =
    getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Long]("pageViewCount-map", classOf[String], classOf[Long]))

  override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
    // BUG FIX: store value.count, not value.count + 1 — the count is already
    // the window's aggregate; +1 inflated every result by one.
    pageViewCountMapState.put(value.url, value.count)
    // Fires just after the watermark passes the window end: time to rank.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
    // Second timer one minute later (= allowedLateness): the window is then
    // fully closed, no more updates can arrive, so the state can be cleared.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 60000L)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Cleanup timer: key is the windowEnd, so windowEnd + 1 min marks final close.
    if (timestamp == ctx.getCurrentKey + 60000L) {
      pageViewCountMapState.clear()
    } else {
      // Snapshot the state into a buffer of (url, count) tuples.
      val allPageViewCounts: ListBuffer[(String, Long)] = ListBuffer()
      val iter = pageViewCountMapState.entries().iterator()
      while (iter.hasNext) {
        val entry = iter.next()
        allPageViewCounts += ((entry.getKey, entry.getValue))
      }

      // Sort by count descending and keep the top n.
      val sortedPageViewCounts = allPageViewCounts.sortWith(_._2 > _._2).take(n)

      // Format the ranking for display; timestamp - 1 restores the window end.
      val result: StringBuilder = new StringBuilder
      result.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n")
      for (i <- sortedPageViewCounts.indices) {
        val currentItemViewCount = sortedPageViewCounts(i)
        result.append("NO").append(i + 1).append(": ")
          .append("页面url= ").append(currentItemViewCount._1).append("\t")
          .append("热门度= ").append(currentItemViewCount._2).append("\n")
      }
      result.append("==================\n\n")
      Thread.sleep(1000) // deliberate throttle so console output is readable
      out.collect(result.toString())
    }
  }
}

<a name="vnPNU"></a>
# 二.  基于埋点日志数据的网络流量统计  
     从 web 服务器 log 中得到的 url,往往更多的是请求某个资源地址 (/*.js、/*.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应 用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。 这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据 中的“pv”行为来得到。  

<a name="J2hmo"></a>
##  网站总浏览量(PV)的统计  
 衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)
```scala
package networkflow_analysis

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ListState, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input record: one user-behavior event parsed from UserBehavior.csv
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// Output of the per-window pv aggregation
case class PvCount(windowEnd:Long,count:Long)


object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // parallelism deliberately left at the default (not forced to 1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Source: the behavior log shipped as a classpath resource.
    val resource = getClass.getResource("/UserBehavior.csv")
    val rawStream: DataStream[String] = env.readTextFile(resource.getPath)

    // Parse each CSV line into a UserBehavior. Timestamps in this file are
    // already ascending, so assignAscendingTimestamps suffices for watermarks.
    val behaviorStream: DataStream[UserBehavior] = rawStream
      .map(line => {
        val fields = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Count pv events per 1-hour tumbling window. MyMapper assigns a random
    // key per record so the load is spread over subtasks instead of one key.
    val pvStream = behaviorStream
      .filter(_.behavior == "pv")
      .map(new MyMapper())
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .aggregate(new PvCountAgg(), new PvCountWindowResult())

    // Merge the per-key partial counts of each window into one total.
    val totalPvStream = pvStream
      .keyBy(_.windowEnd)
      .process(new TotalPvCountResult())

    totalPvStream.print()
    env.execute("pv job")
  }
}

//自定义预聚合函数
// Pre-aggregator: counts tuples with a Long accumulator, one increment each.
class PvCountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

//自定义窗口函数
// Emits the single pre-aggregated count together with its window's end time.
class PvCountWindowResult() extends WindowFunction[Long, PvCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit =
    out.collect(PvCount(window.getEnd, input.head))
}

//自定义mapper,随机分组生成key,防止数据倾斜
// Maps each event to (randomKey, 1L). The random 10-char key spreads records
// across parallel subtasks, avoiding the skew of a single dummy key.
class MyMapper() extends MapFunction[UserBehavior, (String, Long)] {
  override def map(value: UserBehavior): (String, Long) = (Random.nextString(10), 1L)
}

// Keyed by windowEnd: accumulates the partial pv counts of one window and
// emits the total once the watermark passes the window end (timer at +1 ms).
class TotalPvCountResult() extends KeyedProcessFunction[Long, PvCount, PvCount] {
  // Running total of all partial counts seen for this windowEnd.
  lazy val totalPvCountResultState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))

  override def processElement(value: PvCount, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#Context, out: Collector[PvCount]): Unit = {
    // Fold this partial count into the running total.
    val runningTotal = totalPvCountResultState.value()
    totalPvCountResultState.update(runningTotal + value.count)
    // Re-registering the same timer per element is a no-op after the first.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    // The key IS the windowEnd; emit the final total and release the state.
    out.collect(PvCount(ctx.getCurrentKey, totalPvCountResultState.value()))
    totalPvCountResultState.clear()
  }
}

网站独立访客数(UV)的统计

在上节的例子中,我们统计的是所有用户对页面的所有浏览行为,也就是说, 同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,在一段 时间内到底有多少不同的用户访问了网站。
另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV 指的是一段时间(比如一小时)内访问网站的总人数,1 天内同一访客的多次访问 只记录为一个访客。
对于 UserBehavior 数据源来说,我们直接可以根据 userId 来区分不同的 用户。
在 src/main/scala 下创建 UniqueVisitor.scala 文件

package networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Output of the per-window UV (unique visitor) count
case class UvCount(windowEnd:Long,count:Long)

object UniqueVisitor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Source: behavior log from the classpath.
    val resource = getClass.getResource("/UserBehavior.csv")
    val rawStream: DataStream[String] = env.readTextFile(resource.getPath)

    // Parse CSV lines; timestamps are ascending so no watermark delay needed.
    val behaviorStream: DataStream[UserBehavior] = rawStream
      .map(line => {
        val fields = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // No keyBy: one 1-hour tumbling window over the entire (non-keyed) stream.
    val uvStream = behaviorStream
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1))
      .apply(new UvCountResult())

    uvStream.print()
    env.execute("uv job")
  }

}

//自定义全窗口函数,用一个set结构保存数据,进行自动去重
// All-window function that deduplicates userIds with a Set and emits the
// set size as the window's unique-visitor count.
class UvCountResult() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // Building a Set of ids performs the dedupe in one pass.
    val distinctUserIds = input.map(_.userId).toSet
    out.collect(UvCount(window.getEnd, distinctUserIds.size))
  }
}

上例中,把所有数据的userId 都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用 redis这种内存级 k v 数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算 UV 。如果放到redis 中,亿级的用户id (每个20字节左右的话)可能需要几G甚至几十G的空间来存储。当然放到 redis 中,用集群进行扩展也不是不可以,但明显代价太大了。一个更好的想法是,其实我们不需要完整地存储用户ID 的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilisticdata structure ),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存在或者可能存在”。
它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0,就是 1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用 空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。 我们的目标就是,利用某种方法(一般是 Hash 函数)把每个数据,对应到一个 位图的某一位上去;如果数据存在,那一位就是 1,不存在则为 0。

超大数据量去重——布隆过滤器查重-过滤的UV统计

package networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

////定义输出输出UV统计样例类
//case class UvCount(windowEnd:Long,count:Long)

object UvWithBloom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Source: behavior log from the classpath.
    val resource = getClass.getResource("/UserBehavior.csv")
    val rawStream: DataStream[String] = env.readTextFile(resource.getPath)

    // Parse lines; timestamps ascend, so no out-of-orderness bound is needed.
    val behaviorStream: DataStream[UserBehavior] = rawStream
      .map(line => {
        val fields = line.split(",")
        UserBehavior(fields(0).toLong, fields(1).toLong, fields(2).toInt, fields(3), fields(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // One dummy key, 1-hour tumbling window. The custom trigger fires and
    // purges per element, so the window buffers nothing; dedupe state lives
    // in Redis via the Bloom filter instead.
    val uvStream = behaviorStream
      .filter(_.behavior == "pv")
      .map(record => ("uv", record.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    uvStream.print()
    env.execute("uv with bloom job")
  }
}

//每来一条数据直接触发窗口计算,并清空窗口状态
class MyTrigger() extends Trigger[(String,Long),TimeWindow]{
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.FIRE_AND_PURGE
  }
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}

//自定义布隆过滤器,主要就是一个位图和hash函数
class Bloom(size:Long) extends Serializable{
  private val cap = size   //默认cap是2的整次幂

  //hash函数
  def hash(value: String,seed:Int):Long = {
    var result = 0
    for (i <- 0 until value.length){
      result = result * seed + value.charAt(i)
    }
    //返回hash值,要映射到cap范围内
    (cap -1) & result
  }
}

//实现自定义的窗口处理函数
class UvCountWithBloom() extends ProcessWindowFunction[(String,Long),UvCount,String,TimeWindow]{
//  定义Redis连接以及布隆过滤器
  lazy val jedis = new Jedis("localhost",6379)
  lazy val bloomFilter = new Bloom(1<<29)  //位的个数 64MB = 2^6(64) * 2^20(1M)  * 2^3(8bit)
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    //首先先定义Redis中存储位图的key
    val storedBitMapKey = context.window.getEnd.toString

    //另外将当前窗口的uv count值,作为状态保存到Redis中,用一个叫做uvcount的hash表来保存(windEnd,count)
    val uvCountMap = "uvcount"
    val currentKey = context.window.getEnd.toString
    var count = 0L
    //从Redis窗口中取出当前窗口的uv count值
    if(jedis.hget(uvCountMap,currentKey) !=null){
      count = jedis.hget(uvCountMap,currentKey).toLong
    }
    //去重:判断当前useID对应的hash值是否为0
    val userId = elements.last._2.toString
    //计算hash值,就对应为图中的偏移量
    val offset = bloomFilter.hash(userId,61)
    //用Redis的位操作命令,取bitmap中对应的值
    val isExsit = jedis.getbit(storedBitMapKey,offset)
    if (!isExsit){
      //如果不存在,那么位图随影位置为1,并且将count值加1
      jedis.setbit(storedBitMapKey,offset,true)
      jedis.hset(uvCountMap,currentKey,(count+1).toString)

    }


  }
}

三. App市场推广统计


import java.sql.Timestamp
import java.util.UUID

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Input: one simulated market user-behavior event
case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

// Output: per-window count for one (channel, behavior) pair
case class MarketViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

//自定义测试数据源
// Test source that emits random MarketUserBehavior events (~20/s) until cancelled.
class SimulatedSource extends RichSourceFunction[MarketUserBehavior] {
  // Flag flipped by cancel() to stop the emit loop.
  var running = true
  // Candidate behaviors and channels to sample from.
  val behaviorSet: Seq[String] = Seq("view", "download", "install", "uninstall")
  val channelSet: Seq[String] = Seq("appstore", "weibo", "wechat", "tieba")
  val rand: Random = Random

  override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {
    val maxCounts = Long.MaxValue // effectively unbounded
    var emitted = 0L

    while (running && emitted < maxCounts) {
      // Random user id plus one random pick from each candidate set.
      val event = MarketUserBehavior(
        UUID.randomUUID().toString,
        behaviorSet(rand.nextInt(behaviorSet.size)),
        channelSet(rand.nextInt(channelSet.size)),
        System.currentTimeMillis()
      )
      ctx.collect(event)
      emitted += 1
      Thread.sleep(50L)
    }
  }

  override def cancel(): Unit = running = false
}

object MarketAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // BUG FIX: removed the unused `inputStream2 = env.readTextFile(...)` with a
    // machine-specific absolute path — it was never consumed but still added a
    // dead file source to the job, breaking the job on any other machine.

    // Simulated source uses wall-clock timestamps, which are ascending.
    val dataStream = env.addSource(new SimulatedSource)
      .assignAscendingTimestamps(_.timestamp)

    // Count events per (channel, behavior) in a 1-day window sliding every 5s,
    // excluding uninstalls.
    val resultStream = dataStream
      .filter(_.behavior != "uninstall")
      .keyBy(data => (data.channel, data.behavior))
      .timeWindow(Time.days(1), Time.seconds(5))
      .process(new MarketCountByChannel())

    resultStream.print()

    env.execute("app market by channel job")
  }
}

//自定义ProcessFunction
class MarketCountByChannel() extends ProcessWindowFunction[MarketUserBehavior, MarketViewCount, (String, String), TimeWindow] {

  override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketViewCount]): Unit = {
    val start = new Timestamp(context.window.getStart).toString
    val end = new Timestamp(context.window.getEnd).toString
    val channel = key._1
    val behavior = key._2
    val count = elements.size
    out.collect(MarketViewCount(start, end, channel, behavior, count))

  }
}

四. 页面广告统计

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input: one ad-click log record
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

// Output: per-window click count for one province
case class AdClickCountryByProvince(windowEnd: String, province: String, count: Long)

object AdClickAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read the ad-click log from the classpath.
    // BUG FIX: the path must start with "/" to resolve from the classpath root
    // (consistent with "/UserBehavior.csv" elsewhere); without it getResource
    // searches relative to this class's package and returns null.
    val resource = getClass.getResource("/AdClickLog.csv")
    val inputStream = env.readTextFile(resource.getPath)

    // Parse to the case class and extract (ascending) event-time timestamps.
    val adCountResult = inputStream
      .map(data => {
        val arr = data.split(",")
        AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

      // Windowed count per province. AdCountWindowResult supplies the window
      // end time, which the pre-aggregator cannot see.
      .keyBy(_.province)
      .timeWindow(Time.days(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountWindowResult())

    adCountResult.print()
    env.execute("ad count statistics job")
  }

}

// Pre-aggregator: one Long counter per province key, +1 per click.
class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

// Attaches the window end (as a readable timestamp) and the province key to
// the pre-aggregated count.
class AdCountWindowResult() extends WindowFunction[Long, AdClickCountryByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountryByProvince]): Unit = {
    val windowEnd = new Timestamp(window.getEnd).toString
    out.collect(AdClickCountryByProvince(windowEnd, key, input.head))
  }
}

输入数据类型:
image.png
输出数据:
image.png

五. 黑名单过滤

上节我们进行的点击量统计,同一用户的重复点击是会叠加计算的。在实际场 景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣; 但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点 击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束, 如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并 报警,此后其点击行为不应该再统计。

方法:用 ProcessFunction 进行黑名单过滤,检测出用户对同一广告的点击量,如果超出上限就将用户信息以侧输出流输出到黑名单中

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Input: one ad-click log record
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

// Output: per-window click count for one province
case class AdClickCountryByProvince(windowEnd: String, province: String, count: Long)

// Side-output warning emitted when a user is added to the blacklist
case class BlackListUserWarning(userId: Long, adId: Long, msg: String)

object AdClickAnalysis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Read the ad-click log from the classpath.
    // BUG FIX: leading "/" so the resource resolves from the classpath root
    // (matches "/UserBehavior.csv" usage elsewhere); without it getResource
    // searches this class's package and returns null.
    val resource = getClass.getResource("/AdClickLog.csv")
    val inputStream = env.readTextFile(resource.getPath)

    // Parse to the case class and extract (ascending) event-time timestamps.
    val adLogStream = inputStream
      .map(data => {
        val arr = data.split(",")
        AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Filter step: users clicking one ad more than 100 times go to the
    // "warning" side output; normal clicks pass through.
    val filterBlackUserStream: DataStream[AdClickLog] = adLogStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FilterBlackListUserResult(100))

    // Windowed count per province. AdCountWindowResult supplies the window
    // info the pre-aggregator cannot see.
    val adCountResultStream = filterBlackUserStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.seconds(5))
      .aggregate(new AdCountAgg(), new AdCountWindowResult())

    adCountResultStream.print("count result")
    filterBlackUserStream.getSideOutput(new OutputTag[BlackListUserWarning]("warning")).print("warning")
    env.execute("ad count statistics job")
  }

}

// Pre-aggregator: one Long counter per province key, +1 per click.
class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

// Attaches the window end (as a readable timestamp) and the province key to
// the pre-aggregated count.
class AdCountWindowResult() extends WindowFunction[Long, AdClickCountryByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountryByProvince]): Unit = {
    val windowEnd = new Timestamp(window.getEnd).toString
    out.collect(AdClickCountryByProvince(windowEnd, key, input.head))
  }
}

//自定义keyedProcessFunction
// Keyed by (userId, adId): counts clicks, forwards normal clicks downstream,
// and diverts users exceeding maxCount to the "warning" side output. State is
// cleared daily at midnight (UTC+8) by a timer.
class FilterBlackListUserResult(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] {
  // Click count for the current (user, ad) key.
  lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
  // Timestamp of the daily reset timer.
  lazy val resetTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("reset-ts", classOf[Long]))
  // Whether this key was already reported to the blacklist.
  lazy val isBlackState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-black", classOf[Boolean]))

  override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
    val currentCount = countState.value()

    // First click for this key: register the state-reset timer at the next
    // midnight (the -8h shifts the day boundary to UTC+8 local time).
    if (currentCount == 0) {
      val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (24 * 60 * 60 * 1000) - 8 * 60 * 60 * 1000
      resetTimerTsState.update(ts)
      // BUG FIX: ts is derived from processing time, so this must be a
      // processing-time timer; the original registered an event-time timer,
      // which would only fire if the watermark ever reached wall-clock time.
      ctx.timerService().registerProcessingTimeTimer(ts)
    }

    if (currentCount >= maxCount) {
      // Over the limit: report to the side output only once per key.
      if (!isBlackState.value()) {
        isBlackState.update(true)
        ctx.output(new OutputTag[BlackListUserWarning]("warning"), BlackListUserWarning(value.userId, value.adId, "Click ad over" + maxCount + "times today"))
      }
    } else {
      // Normal case: count the click and forward the record unchanged.
      countState.update(currentCount + 1)
      out.collect(value)
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
    // BUG FIX: compare against the stored Long, not the ValueState object —
    // `timestamp == resetTimerTsState` compared a Long with the state handle
    // and could never be true.
    if (timestamp == resetTimerTsState.value()) {
      isBlackState.clear()
      countState.clear()
    }
  }
}

六. 恶意登陆监控

image.png

需要在 pom 文件中引入 CEP 的相关依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

PYTHONUNBUFFERED=1

七.