1. 基本要求

  • 统计一小时内的热门商品,每5分钟更新一次
  • 热门度用浏览次数(“pv”)来衡量

2. 解决思路

  • 在所有的用户行为数据中,过滤出浏览(“pv”)行为进行统计
  • 构建滑动窗口,窗口长度为1小时,滑动距离为5分钟

3. 处理流程

image.png

  1. 首先在流数据中进行filter过滤,过滤出pv行为
  2. 根据商品进行分组,key为商品id

image.png

  1. 开窗

image.png
image.png
一份数据同时输入12个滑动窗口(60分钟/5=12)

  1. 计算每个窗口内,每个商品有多少个

image.png
image.png

image.png
image.png

image.png
image.png
image.png


基于 UserBehavior 数据集来进行 分析。项目主体用 Scala 编写,采用 IDEA 作为开发环境进行项目编写,采用 maven 作为项目构建和管理工具。
UserBehaviorAnalysis
HotItemsAnalysis

4. 创建maven项目

4.1 项目框架搭建

打开 IDEA,创建一个 maven 项目,命名为 UserBehaviorAnalysis(不要勾选create from archetype)。由于包含了 多个模块,我们可以以UserBehaviorAnalysis 作为父项目,并在其下建一个名为 HotItemsAnalysis 子项目,用于实时统计热门 top N 商品。
在 UserBehaviorAnalysis 下 新 建 一 个 maven module 作 为 子 项 目 , 命 名 为 HotItemsAnalysis。 父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的, 所以 UserBehaviorAnalysis 下的 src 文件夹可以删掉
image.png
可以发现父pom文件中包含子模块
image.png

4.2 pom文件配置

声明项目中工具的版本信息

整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的 UserBehaviorAnalysis 中声明所有子模块共用的版本信息。
在父pom文件(UserBehaviorAnalysis)中添加配置:
UserBehaviorAnalysis/pom.xml

  1. <properties>
  2. <flink.version>1.11.2</flink.version>
  3. <scala.binary.version>2.11</scala.binary.version>
  4. <scala.version>2.11</scala.version>
  5. </properties>

添加项目依赖

UserBehaviorAnalysis 中引入flink相关组件公有依赖:

  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
      </dependency>

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_${scala.binary.version}</artifactId>
          <version>${kafka.version}</version>
      </dependency>

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
      </dependency>

  </dependencies>

引入maven插件

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <!-- Run shade goal on package phase -->
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>org.apache.logging.log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.example.StreamingJob</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

在 HotItemsAnalysis 子模块中,我们并没有引入更多的依赖,所以不需要改动 pom 文件。

4.3 导入数据

在 src/main/目录下,可以看到已有的默认源文件目录是 java,我们可以将其改 名为 scala。将数据文件 UserBehavior.csv 复制到资源文件目录 src/main/resources 下, 我们将从这里读取数据。 至此,我们的准备工作都已完成,接下来可以写代码了。

5. 代码实现

HotItemsAnalysis/src/scala路径下新建HotItems(Scala object)
创建代码,new时如果没有Scala选项时,
image.png
image.png

5.1 定义样例类

//定义输入数据样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

//定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

5.2 将文本数据写入kafka

实际项目中数据一般是从kafka中读取的,本案例所给数据源为文件数据,为了模拟实际项目中的情况,现将本地文件中的数据写入到kafka中,然后从kafka读取数据。

KafkaProducerUtil工具类

将文本数据写入kafka
src\main\scala\hotitems_analysis\KafkaProducerUtil.scala

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerUtil {
  def main(args: Array[String]): Unit = {
    writeToKafka("hostitems")
  }
  def writeToKafka(topic: String): Unit ={
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.188.8:9092")
    properties.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](properties)

    //从文件读数据,逐行写入kafka
    val bufferedSource =io.Source.fromFile("D:\\IDEA\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
    for (line <-bufferedSource.getLines()){
      val record = new ProducerRecord[String,String](topic,line)
      producer.send(record)
    }
    producer.close()
  }
}
  • 需要导入 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
  • writeToKafka(“hostitems”)自定义写入方法,并传入topic
  • 其他代码讲解,请看之前连接中实例代码讲解:https://www.yuque.com/u1046159/qstfva/ve8ydf#LIZyZ

5.3 从kafka中读取数据

//从kafka读取数据
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.188.8:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
  "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("value.serializer",
  "org.apache.kafka.common.serialization.StringSerializer")

val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hostitems", new SimpleStringSchema(), properties))
  • 使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema:val myConsumer = new FlinkKafkaConsumer08String, properties)

    5.4 将数据转化为样例类

    val dataStream: DataStream[UserBehavior] = inputStream
    .map(data => {
      val arr = data.split(",")
      UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
    })
    

    5.4 提取时间戳生成watermark

    .assignAscendingTimestamps(_.timestamp * 1000L)
    

    之所以使用assignAscendingTimestamps,是因为数据文件中时间戳是单调递增的。此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了
    **.timestamp 1000L,选择事件时间的字段timestamp ,文本数据时间戳为毫秒,需求是分钟,需要乘 1000L*_

    5.5 窗口聚合操作

    val aggStream: DataStream[ItemViewCount] = dataStream
    .filter(_.behavior == "pv")
    .keyBy("itemId")
    .timeWindow(Time.hours(1), Time.minutes(5))
    .aggregate(new CountAgg(), new ItemViewWindowResult())
    
  • 聚合操作前需要先过滤,过滤出行为是pv的数据,然后根据商品id进行分组

  • 使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)
  • .aggregate(AggregateFunction af, WindowFunction wf)增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少state的存储压力。第一个参数是定义窗口聚合规则,第二个函数将每个key每个窗口聚合后的结果带上其他信息进行输出(即CountAgg()输出结果传入ItemViewWindowResult()中)。

CountAgg()

//COUNT统计的聚合函数实现,每出现一条记录就加一
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  //累加器count的初始值为0
  override def createAccumulator(): Long = 0L

  //每来一条数据调用add方法,累加器和传入的value进行累加即count值+1
  override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1

  //得到的结果就是count(count值要传递到CountViewWindowResult进行接下来的操作)
  override def getResult(acc: Long): Long = acc

  //如果有两个分区 合并两个分区的数据
  override def merge(a: Long, b: Long): Long = a + b
}

AggregateFunction()函数输入输出数据类型为:输入数据类型、中间聚合状态类型(此处聚合的是count值,count类型为Long)、输出数据类型
image.png
AggregateFunction需要复写的方法有:

  • createAccumulator:创建一个新的累加器。聚合状态就是当前商品的count值
  • add:将给定的输入添加到给定的累加器,并返回新的累加器值。
  • getResult:从累加器获取聚合结果。
  • merge:合并两个累加器,返回合并后的累加器的状态。

ItemViewWindowResult()

//自定义窗口函数
class ItemViewWindowResult() extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val itemId = key.asInstanceOf[Tuple1[Long]].f0
    val windowsEnd = window.getEnd
    val count = input.iterator.next()
    out.collect(ItemViewCount(itemId, windowsEnd, count))
  }
}

其中WindowFunction(全量窗口函数,WindowFunction 输入数据来自AggregateFuntion, 在窗口结束的时候先执行Aggregate对象的getResult,然后再执行 自己的apply对象)属于Scal,引用时需要注意,WindowFunction传入的参数类型:
image.png
其中输入参数类型为CountAgg()返回的数据类型(本案例为Long),输出为样例类类型ItemViewCountwef,key类型为怎么确定?经过keyby之后得到的KeyedStream是JavaTuple类型,故此处的key为元组类型tuple。窗口类型为TimeWindow
image.png

需要拿到参数有:

  • itemId(就是AggregateFunction()返回的key [因为之前的keyby就是通过商品id进行分组的 ],因为key目前是java tuple类型,需要先进行类型转换(p.asInstanceOf[XX] 把 p 转换成 XX 对象的实例)

注意:此处的Tuple1含义,属于java类型,需要的api是import **org.apache.flink.api.java.tuple.{Tuple, Tuple1}**
要想拿到一元组中的值,tuple1方法中有个:
image.png
image.png

  • windowsEnd:使用getEnd
  • count:input.iterator.next()

输出的时候,apply方法本身没有返回值,输出用Collector,代码使用out.collect()


5.6 进行统计整理

val resultStream: DataStream[String] = aggStream
     //按照窗口进行分组,收集窗口内的商品count数据
  .keyBy("windowEnd")
    //自定义处理流程
  .process(new TopNHotItems(5))

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据ItemViewCount中的windowEnd进行keyBy()操作。然后使用ProcessFunction实现一个自定义的TopN函数TopNHotItems来计算点击量排名前5名的商品,并将排名结果格式化成字符串,便于后续输出。

class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  //定义状态:ListState
  lazy val itemViewCountListState: ListState[ItemViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemViewCount-list", classOf[ItemViewCount]))

  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    //每来一条数据,直接加入ListState
    itemViewCountListState.add(value)
    //注册一个WindowEnd + 1 之后触发的定时器
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  //当定时器触发,可以认为所有窗口统计结果已经到到齐,可以排序输出了
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    //为了方便排序,另外定义一个ListBuffer,保存ListState里面的所有数据
    val allItemViewCounts: ListBuffer[ItemViewCount] = ListBuffer()
    val iter = itemViewCountListState.get().iterator()
    while (iter.hasNext) {
      allItemViewCounts += iter.next()
    }
    //清空状态
    itemViewCountListState.clear()

    //按照count大小进行排序
    val sortedItemViewCounts = allItemViewCounts.sortBy(_.count)(Ordering.Long.reverse).take(topSize)

    //将排序信息格式化成String,便于打印输出可视化展示
    val result: StringBuilder = new StringBuilder
    result.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n")

    //遍历结果列表中的每一个ItemViewCount,输出到一行
    for (i <- sortedItemViewCounts.indices) {
      val currentItemViewCount = sortedItemViewCounts(i)
      result.append("NO").append(i + 1).append(": ")
        .append("商品ID= ").append(currentItemViewCount.itemId).append("\t")
        .append("热门度= ").append(currentItemViewCount.count).append("\n")
    }

    result.append("==================\n\n")
    Thread.sleep(1000)
    out.collect(result.toString())
  }
}

ProcessFunction是主要提供了定时器timer的功能(支持EventTimeProcessingTime)。本案例中我们将利用timer来判断何时收齐了某个window下所有商品的点击量数据。由于Watermark的进度是全局的,在processElement方法中,每当收到一条数据ItemViewCount,我们就注册一个windowEnd+1的定时器(Flink框架会自动忽略同一时间的重复注册)。windowEnd+1的定时器被触发时,意味着收到了windowEnd+1的Watermark,即收齐了该windowEnd下的所有商品窗口统计值。我们在onTimer()中处理将收集的所有商品及点击量进行排序,选出TopN,并将排名信息格式化成字符串后进行输出。

这里我们还使用了ListState来存储收到的每条ItemViewCount消息,保证在发生故障时,状态数据的不丢失和一致性。ListState是Flink提供的类似Java List接口的State API,它集成了框架的checkpoint机制,自动做到了exactly-once的语义保证。

KeyedProcessFunction方法传入的key的类型为tuple(当keyby传入的是字符转类型时,分组后返回的结果是tuple类型)

_ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)_的含义:注册一个WindowEnd + 1 之后触发的定时器,延迟1毫秒触发,当触发时,说明watermark已经来了(watermark来到代表定义的watermark之前的数据已经全部到齐,那么此时的窗口可以关闭。如:若windowEnd是九点,九点一毫秒的时候watermark到来,说明九点之前的所有数据都到齐,说明九点之前要关的窗口可以关闭了 )收齐了属于windowEnd窗口的所有商品数据,统一排序处理
image.png
状态编程的讲解见:理解难点https://www.yuque.com/u1046159/qstfva/msa8xa

10-1 热门实时商品统计 - 图22


6. 数据流程图

10-1 热门实时商品统计 - 图23