一、车辆超速信息监控

  • 从 Kafka 读取卡口车辆的速度信息,关联卡口限速信息表,统计车速超过限速 20% 的车辆,并将结果保存到 MySQL 结果表中。
  • 注意:这里使用广播数据流。适用场景:两个流需要关联,其中一个流数据量大,另一个流数据量小且会定期变化,此时可以将小流全量广播到每个 TaskManager 中。
  • 主要涉及kafkasource、mysqlsink、广播算子的样例代码 ```scala import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet} import java.util.Properties

import com.bjsxt.traffic.utils.{JDBCSink, MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Over-speed monitoring job.
 *
 * Reads per-vehicle checkpoint records from Kafka, joins them against the
 * speed-limit table (polled from MySQL and broadcast to every parallel task),
 * and writes every vehicle driving more than 20% above the limit to MySQL
 * through `JDBCSink`.
 */
object OverSpeedCarAnaly {

  /** Speed limit applied when the broadcast state has no entry for a checkpoint. */
  private val DefaultSpeedLimit: Double = 60.0

  /** A vehicle is reported when its speed exceeds `limit * OverSpeedFactor`. */
  private val OverSpeedFactor: Double = 1.2

  /** Pause between two full reloads of the speed-limit table (30 minutes). */
  private val LimitRefreshIntervalMs: Long = 30L * 60 * 1000

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer configuration. Consumer-side settings must name
    // deserializer classes, not serializers.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group1125")

    // Kafka source on topic "monitortopic1125", reading from the earliest offset.
    val kafkaSource: FlinkKafkaConsumerBase[String] =
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest()

    // Raw vehicle-monitoring lines.
    val ds1: DataStream[String] = env.addSource(kafkaSource)
    // val ds1: DataStream[String] = env.socketTextStream("mynode5",9999)

    // Parse tab-separated lines into MonitorCarInfo:
    // areaId:String, roadId:String, monitorId:String, cameraId:String,
    // actionTime:Long, car:String, speed:Double
    val carMonitorDS: DataStream[MonitorCarInfo] = ds1.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    })

    // Area / road / checkpoint speed limits, re-read in full from MySQL every
    // LimitRefreshIntervalMs.
    val limitSpeedDs: DataStream[MonitorLimitSpeedInfo] = env.addSource(new RichSourceFunction[MonitorLimitSpeedInfo] {
      private var conn: Connection = _
      private var pst: PreparedStatement = _
      // Written by cancel(), which may run on a different thread than run().
      @volatile private var stop = false

      // Called once when the source is initialised: open the JDBC resources.
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://192.168.179.14:3306/traffic_monitor", "root", "123456")
        pst = conn.prepareStatement("select area_id,road_id,monitor_id,speed_limit from t_monitor_speed_limit_info")
      }

      // Emits every row of the limit table, sleeps, and repeats until cancelled.
      override def run(ctx: SourceFunction.SourceContext[MonitorLimitSpeedInfo]): Unit = {
        while (!stop) {
          val set: ResultSet = pst.executeQuery()
          try {
            while (set.next()) {
              val areaId: String = set.getString("area_id")
              val roadId: String = set.getString("road_id")
              val monitorId: String = set.getString("monitor_id")
              val speedLimit: Double = set.getDouble("speed_limit")
              ctx.collect(new MonitorLimitSpeedInfo(areaId, roadId, monitorId, speedLimit))
            }
          } finally {
            // Release the cursor before the long sleep.
            set.close()
          }
          Thread.sleep(LimitRefreshIntervalMs)
        }
      }

      // Only signals the loop to stop; the JDBC resources are released in
      // close() so they are not torn down underneath a query still running
      // in run().
      override def cancel(): Unit = {
        stop = true
      }

      // Releases the JDBC resources; tolerates a failed or skipped open().
      override def close(): Unit = {
        try Option(pst).foreach(_.close())
        finally Option(conn).foreach(_.close())
      }
    })

    // Broadcast the speed limits; state maps "area_road_monitor" -> limit.
    val mapStateDesc = new MapStateDescriptor[String, Double]("map-state", classOf[String], classOf[Double])
    val bcLimitSpeed: BroadcastStream[MonitorLimitSpeedInfo] = limitSpeedDs.broadcast(mapStateDesc)

    // Connect the vehicle stream with the broadcast limits and flag over-speed vehicles.
    val overSpeedInfoDs: DataStream[OverSpeedCarInfo] = carMonitorDS.connect(bcLimitSpeed).process(
      new BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo] {

        // Handles one vehicle record: look up the limit of its checkpoint and
        // emit the record when the speed is above limit * OverSpeedFactor.
        override def processElement(value: MonitorCarInfo,
                                    ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#ReadOnlyContext,
                                    out: Collector[OverSpeedCarInfo]): Unit = {
          // Must be built exactly like the key written in processBroadcastElement
          // (area_road_monitor); otherwise the lookup can never hit.
          val key = value.areaId + "_" + value.roadId + "_" + value.monitorId
          val readOnlyBS: ReadOnlyBroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
          // contains() is required: get() on a missing key would unbox to 0.0.
          val limit: Double = if (readOnlyBS.contains(key)) readOnlyBS.get(key) else DefaultSpeedLimit
          if (value.speed > limit * OverSpeedFactor) {
            out.collect(new OverSpeedCarInfo(value.car, value.monitorId, value.roadId, value.speed, limit, value.actionTime))
          }
        }

        // Handles one broadcast limit record: store or overwrite the limit for its checkpoint.
        override def processBroadcastElement(value: MonitorLimitSpeedInfo,
                                             ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#Context,
                                             out: Collector[OverSpeedCarInfo]): Unit = {
          val broadCastState: BroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
          val key = value.areaId + "_" + value.roadId + "_" + value.monitorId // key: area_road_monitor
          broadCastState.put(key, value.limitSpeed)
        }
      })

    // Write the results to MySQL through the project's JDBCSink.
    overSpeedInfoDs.addSink(new JDBCSink[OverSpeedCarInfo]("OverSpeedCarInfo"))
    env.execute()
  }
}


import java.util import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table} import org.apache.hadoop.conf import org.apache.hadoop.hbase.{HBaseConfiguration, TableName, client} /**

  * HBase sink: writes each incoming batch of `Put` mutations to an HBase table.
  */
class HBaseSink extends RichSinkFunction[java.util.List[Put]] {
  var configuration: conf.Configuration = _
  var conn: client.Connection = _

  // Called once when the sink is initialised: create the HBase connection.
  override def open(parameters: Configuration): Unit = {
    configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181")
    conn = ConnectionFactory.createConnection(configuration)
  }

  // Called once per record: put the whole batch into table "a1".
  override def invoke(value: util.List[Put], context: SinkFunction.Context[_]): Unit = {
    val table: Table = conn.getTable(TableName.valueOf("a1"))
    // A Table handle is obtained per call, so it is released per call as well.
    try table.put(value)
    finally table.close()
  }

  // Releases the HBase connection; tolerates a failed or skipped open().
  override def close(): Unit = {
    Option(conn).foreach(_.close())
  }
}
```

二、每个区域每个道路每个卡扣的平均速度

  • 每隔1分钟统计过去5分钟,每个区域每个道路每个卡扣的平均速度
  • 涉及EventTime时间语义 ```scala import java.text.SimpleDateFormat import java.util.{Date, Properties}

import com.bjsxt.traffic.utils.{JDBCSink, MonitorAvgSpeedInfo, MonitorCarInfo} import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector import org.apache.kafka.common.serialization.StringDeserializer

/**

  * Every minute, computes the average speed over the last five minutes for
  * each area / road / checkpoint, using event-time sliding windows, and
  * writes the result to MySQL through `JDBCSink`.
  */
object MonitorAvgSpeedAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Parallelism 1 and event-time semantics.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration for the real-time vehicle monitoring data.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112502")

    // NOTE(review): the topic name and the 5-second out-of-orderness bound below
    // were lost in the garbled original and are restored from the otherwise
    // identical pipeline in TopNMinAvgSpeedMonitorAnaly -- confirm.
    val monitorInfosDs: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())
    // val monitorInfosDs: DataStream[String] = env.socketTextStream("mynode5",9999)

    // Parse tab-separated lines and use actionTime as the event timestamp.
    val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MonitorCarInfo](Time.seconds(5)) {
      override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime
    })

    // 5-minute window sliding every minute, keyed by area_road_monitor.
    // The AggregateFunction keeps an incremental (count, speedSum); the
    // WindowFunction turns the single aggregated value into the output record.
    val monitorAvgSpeedDS: DataStream[MonitorAvgSpeedInfo] = transferDS.keyBy(mi => {
      mi.areaId + "_" + mi.roadId + "_" + mi.monitorId
    }).timeWindow(Time.minutes(5), Time.minutes(1))
      .aggregate(
        new AggregateFunction[MonitorCarInfo, (Long, Double), (Long, Double)] {
          // (car count, sum of speeds); the sum is seeded with a Double literal.
          override def createAccumulator(): (Long, Double) = (0L, 0.0)

          override def add(value: MonitorCarInfo, accumulator: (Long, Double)): (Long, Double) = {
            (accumulator._1 + 1, accumulator._2 + value.speed)
          }

          override def getResult(accumulator: (Long, Double)): (Long, Double) = accumulator

          override def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) = (a._1 + b._1, a._2 + b._2)
        },
        new WindowFunction[(Long, Double), MonitorAvgSpeedInfo, String, TimeWindow] {
          // key: area_road_monitor; window: the fired window;
          // input: the aggregate produced above; out: result collector.
          override def apply(key: String, window: TimeWindow, input: Iterable[(Long, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {
            val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            val startTime: String = sdf.format(new Date(window.getStart))
            val endTime: String = sdf.format(new Date(window.getEnd))
            val (carCount, speedSum) = input.last
            // Round to two decimals. Formatting with Locale.ROOT keeps the
            // decimal point, so toDouble cannot fail on JVMs whose default
            // locale uses a decimal comma.
            val avgSpeed: Double = "%.2f".formatLocal(java.util.Locale.ROOT, speedSum / carCount).toDouble
            out.collect(new MonitorAvgSpeedInfo(startTime, endTime, key, avgSpeed, carCount))
          }
        })

    // Persist to MySQL.
    monitorAvgSpeedDS.addSink(new JDBCSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))
    env.execute()
  }
}

// Average-speed result for one checkpoint key over one window:
// window start/end as formatted strings, the checkpoint key, the average speed and the car count.
case class MonitorAvgSpeedInfo(windowStartTime: String, windowEndTime: String, monitorId: String, avgSpeed: Double, carCount: Long)

<a name="X3shh"></a>
## 三、每隔1分钟统计最近5分钟最通畅的topN卡扣信息
  3. ```scala
  4. import java.util.Properties
  5. import com.bjsxt.traffic.utils._
  6. import org.apache.flink.api.common.serialization.SimpleStringSchema
  7. import org.apache.flink.streaming.api.TimeCharacteristic
  8. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  9. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  10. import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
  11. import org.apache.flink.streaming.api.windowing.time.Time
  12. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  13. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  14. import org.apache.flink.util.Collector
  15. import org.apache.kafka.common.serialization.StringDeserializer
  /**
    * Every minute, looks at the last five minutes of vehicle records, counts
    * the vehicles of each area / road / checkpoint per speed band, and writes
    * the five highest-ranked checkpoints to MySQL through `JDBCSink`.
    *
    * Ranking uses the `>` comparison of `MonitorSpeedClsCount`, which is
    * defined outside this file.
    */
  object TopNMinAvgSpeedMonitorAnaly {
    def main(args: Array[String]): Unit = {
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      import org.apache.flink.streaming.api.scala._

      // Parallelism 1 and event-time semantics.
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // Kafka consumer configuration for the real-time vehicle monitoring data.
      val props = new Properties()
      props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
      props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
      props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
      props.setProperty("group.id", "group112502")

      // Raw vehicle records from Kafka, read from the earliest offset.
      val monitorInfosDs: DataStream[String] = env.addSource(
        new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())
      // val monitorInfosDs: DataStream[String] = env.socketTextStream("mynode5",9999)

      // Parse tab-separated lines; actionTime is the event timestamp and
      // records may arrive up to 5 seconds out of order.
      val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => {
        val arr: Array[String] = line.split("\t")
        MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MonitorCarInfo](Time.seconds(5)) {
        override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime
      })

      // 5-minute window sliding every minute over the whole (non-keyed) stream.
      val top5MonitorInfoDS: DataStream[Top5MonitorInfo] = transferDS
        .timeWindowAll(Time.minutes(5), Time.minutes(1))
        .process(new ProcessAllWindowFunction[MonitorCarInfo, Top5MonitorInfo, TimeWindow] {
          // context: window context; elements: every record in the window;
          // out: result collector.
          override def process(context: Context, elements: Iterable[MonitorCarInfo], out: Collector[Top5MonitorInfo]): Unit = {
            // area_road_monitor -> per-speed-band vehicle counters
            val countsByMonitor = scala.collection.mutable.Map[String, MonitorSpeedClsCount]()

            elements.foreach { info =>
              val key = info.areaId + "_" + info.roadId + "_" + info.monitorId
              // One lookup that also creates the zeroed counters on first sight,
              // so the band classification is written only once.
              val counts = countsByMonitor.getOrElseUpdate(key, MonitorSpeedClsCount(0L, 0L, 0L, 0L))
              if (info.speed >= 120) counts.hightSpeedCarCount += 1
              else if (info.speed >= 90) counts.middleSpeedCount += 1
              else if (info.speed >= 60) counts.normalSpeedCarCount += 1
              else counts.lowSpeedCarCount += 1
            }

            // Identical for every output row of this window, so format once.
            val windowStartTime: String = DateUtils.timestampToDataStr(context.window.getStart)
            val windowEndTime: String = DateUtils.timestampToDataStr(context.window.getEnd)

            countsByMonitor.toList
              .sortWith((left, right) => left._2 > right._2)
              .take(5)
              .foreach { case (key, counts) =>
                out.collect(new Top5MonitorInfo(windowStartTime, windowEndTime, key,
                  counts.hightSpeedCarCount, counts.middleSpeedCount, counts.normalSpeedCarCount, counts.lowSpeedCarCount))
              }
          }
        })

      // Persist the ranking to MySQL.
      top5MonitorInfoDS.addSink(new JDBCSink[Top5MonitorInfo]("Top5MonitorInfo"))
      env.execute()
    }
  }

四、实时监控违法车辆(涉嫌套牌车辆)

  1. import java.util.Properties
  2. import com.bjsxt.traffic.utils.{DateUtils, JDBCSink, MonitorCarInfo, ViolationCarInfo}
  3. import org.apache.flink.api.common.serialization.SimpleStringSchema
  4. import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  5. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  7. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
  8. import org.apache.flink.util.Collector
  9. import org.apache.kafka.common.serialization.StringDeserializer
  /**
    * Flags vehicles suspected of using a cloned licence plate: the same plate
    * captured twice with capture times less than 10 seconds apart. Results are
    * written to MySQL through `JDBCSink`.
    */
  object RepatitionCarInfo {
    def main(args: Array[String]): Unit = {
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      import org.apache.flink.streaming.api.scala._

      // Kafka consumer configuration.
      val props = new Properties()
      props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
      props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
      props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
      props.setProperty("group.id", "group112504")

      // Raw vehicle records from Kafka, read from the earliest offset.
      val kafkaConsumer = new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props)
      val carMonitorInfoDS: DataStream[String] = env.addSource(kafkaConsumer.setStartFromEarliest())

      // Parse tab-separated lines into MonitorCarInfo.
      val transferDs: DataStream[MonitorCarInfo] = carMonitorInfoDS.map { line =>
        val fields: Array[String] = line.split("\t")
        MonitorCarInfo(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5), fields(6).toDouble)
      }

      // Keyed by plate number; per-plate state remembers the latest capture time.
      val violationCarInfoDS: DataStream[ViolationCarInfo] = transferDs
        .keyBy(_.car)
        .process(new KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo] {
          // Latest capture time seen for the current plate. Lazy because the
          // runtime context is not available when the function is constructed.
          private lazy val carStateTime: ValueState[Long] =
            getRuntimeContext.getState(new ValueStateDescriptor[Long]("carTimeState", classOf[Long]))

          // value: current record; ctx: process context; out: result collector.
          override def processElement(value: MonitorCarInfo,
                                      ctx: KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo]#Context,
                                      out: Collector[ViolationCarInfo]): Unit = {
            val previousTime: Long = carStateTime.value()
            val currentTime: Long = value.actionTime

            if (previousTime != 0) {
              // A capture time is already stored: compare it with this record.
              val plate: String = value.car
              val gapMs: Long = (currentTime - previousTime).abs
              if (gapMs < 10 * 1000) {
                out.collect(new ViolationCarInfo(
                  plate,
                  "涉嫌套牌",
                  DateUtils.timestampToDataStr(System.currentTimeMillis()),
                  s"车辆${plate}通过上一次卡扣时间${previousTime},本次通过卡扣时间${currentTime},两次时间差值为${(previousTime - currentTime).abs}ms"))
              }
              // Keep the later of the two capture times.
              carStateTime.update(currentTime.max(previousTime))
            } else {
              // A stored value of 0 is treated as "no state yet": just remember this capture.
              carStateTime.update(currentTime)
            }
          }
        })

      violationCarInfoDS.addSink(new JDBCSink[ViolationCarInfo]("ViolationCarInfo"))
      env.execute()
    }
  }