一、车辆超速信息监控
- 从kafka读取卡口车辆的速度信息,关联卡口的限速信息表,统计车辆超速20%车辆,保存到mysql结果表中。
- 注意:本例使用广播数据流。适用场景:两个流需要关联,其中一个流数据量大、另一个流数据量小且会定期变化,此时可以将小流全量广播到每个 TaskManager 中。
- 主要涉及kafkasource、mysqlsink、广播算子的样例代码 ```scala import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet} import java.util.Properties
import com.bjsxt.traffic.utils.{JDBCSink, MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase} import org.apache.flink.util.Collector import org.apache.kafka.common.serialization.StringSerializer
object OverSpeedCarAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer configuration.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    props.setProperty("group.id", "group1125")

    // Kafka source reading vehicle/checkpoint events from topic "monitortopic1125",
    // starting from the earliest offset.
    val kafkaSource: FlinkKafkaConsumerBase[String] =
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest()
    val ds1: DataStream[String] = env.addSource(kafkaSource)
    // val ds1: DataStream[String] = env.socketTextStream("mynode5", 9999)

    // Parse tab-separated lines into MonitorCarInfo:
    // areaId, roadId, monitorId, cameraId, actionTime (epoch ms), car, speed.
    val carMonitorDS: DataStream[MonitorCarInfo] = ds1.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    })

    // Speed-limit side input: re-read the full MySQL limit table every 30 minutes.
    val limitSpeedDs: DataStream[MonitorLimitSpeedInfo] = env.addSource(new RichSourceFunction[MonitorLimitSpeedInfo] {
      var conn: Connection = _
      var pst: PreparedStatement = _
      @volatile var stop = false // volatile: cancel() is called from another thread

      // Called once when the source is initialized: open the JDBC connection.
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://192.168.179.14:3306/traffic_monitor", "root", "123456")
        pst = conn.prepareStatement("select area_id,road_id,monitor_id,speed_limit from t_monitor_speed_limit_info")
      }

      // Emit the full limit table, then sleep 30 minutes and repeat.
      override def run(ctx: SourceFunction.SourceContext[MonitorLimitSpeedInfo]): Unit = {
        while (!stop) {
          val set: ResultSet = pst.executeQuery()
          try {
            while (set.next()) {
              // FIX: read all columns by name (the original mixed name and index access).
              val areaId: String = set.getString("area_id")
              val roadId: String = set.getString("road_id")
              val monitorId: String = set.getString("monitor_id")
              val speedLimit: Double = set.getDouble("speed_limit")
              ctx.collect(new MonitorLimitSpeedInfo(areaId, roadId, monitorId, speedLimit))
            }
          } finally {
            // FIX: the ResultSet was never closed, leaking a cursor on every 30-minute cycle.
            set.close()
          }
          Thread.sleep(30 * 1000 * 60)
        }
      }

      // Called automatically when the Flink job is cancelled.
      override def cancel(): Unit = {
        stop = true
        pst.close()
        conn.close()
      }
    })

    // Broadcast the speed-limit stream to every parallel instance, keyed by
    // "areaId_roadId_monitorId" -> speed limit.
    val mapStateDesc = new MapStateDescriptor[String, Double]("map-state", classOf[String], classOf[Double])
    val bcLimitSpeed: BroadcastStream[MonitorLimitSpeedInfo] = limitSpeedDs.broadcast(mapStateDesc)

    // Connect the vehicle stream with the broadcast limits and flag cars
    // exceeding the limit by more than 20%.
    val overSpeedInfoDs: DataStream[OverSpeedCarInfo] = carMonitorDS.connect(bcLimitSpeed)
      .process(new BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo] {
        // Fallback limit when no row exists for this checkpoint.
        private val DefaultLimit = 60.0

        // Per-vehicle element: look up the limit for this checkpoint and test for over-speed.
        override def processElement(value: MonitorCarInfo, ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#ReadOnlyContext, out: Collector[OverSpeedCarInfo]): Unit = {
          // BUG FIX: the original built the key as areaId_monitorId_cameraId, which can
          // never match the broadcast-side key areaId_roadId_monitorId — every record
          // silently fell through to the default limit. Use the same composition here.
          val key = value.areaId + "_" + value.roadId + "_" + value.monitorId
          val readOnlyBS: ReadOnlyBroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
          val limit: Double = if (readOnlyBS.contains(key)) readOnlyBS.get(key) else DefaultLimit
          if (value.speed > limit * 1.2) {
            // Over-speed by more than 20%: emit a result record.
            out.collect(new OverSpeedCarInfo(value.car, value.monitorId, value.roadId, value.speed, limit, value.actionTime))
          }
        }

        // Per-broadcast element: upsert this checkpoint's limit into broadcast state.
        override def processBroadcastElement(value: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#Context, out: Collector[OverSpeedCarInfo]): Unit = {
          val broadCastState: BroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
          val key = value.areaId + "_" + value.roadId + "_" + value.monitorId // key: area_road_monitor
          broadCastState.put(key, value.limitSpeed)
        }
      })

    // Persist over-speed records to MySQL table t_speed_info.
    overSpeedInfoDs.addSink(new JDBCSink[OverSpeedCarInfo]("OverSpeedCarInfo"))
    env.execute()
  }
}
import java.util import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table} import org.apache.hadoop.conf import org.apache.hadoop.hbase.{HBaseConfiguration, TableName, client} /**
HBase sink: batch-inserts a list of Puts into HBase table "a1". */
class HBaseSink extends RichSinkFunction[java.util.List[Put]] {
  // FIX: the original declared these vars with no initializer
  // (`var configuration: conf.Configuration =`), which does not compile;
  // uninitialized vars need `= _`.
  var configuration: conf.Configuration = _
  var conn: client.Connection = _

  // Called once when the sink is initialized: build the HBase connection.
  // FIX: the original used typographic quotes (“ ”) in the string literals,
  // which do not compile; replaced with straight quotes.
  override def open(parameters: Configuration): Unit = {
    configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181")
    conn = ConnectionFactory.createConnection(configuration)
  }

  // Invoked once per element (here: one batch of Puts).
  override def invoke(value: util.List[Put], context: SinkFunction.Context[_]): Unit = {
    val table: Table = conn.getTable(TableName.valueOf("a1"))
    try {
      table.put(value)
    } finally {
      // FIX: Table was never closed — leaked a table reference on every invoke.
      table.close()
    }
  }

  // FIX: release the HBase connection when the sink is disposed.
  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}
```
二、每个区域每个道路每个卡扣的平均速度
- 每隔1分钟统计过去5分钟,每个区域每个道路每个卡扣的平均速度
- 涉及EventTime时间语义 ```scala import java.text.SimpleDateFormat import java.util.{Date, Properties}
import com.bjsxt.traffic.utils.{JDBCSink, MonitorAvgSpeedInfo, MonitorCarInfo} import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector import org.apache.kafka.common.serialization.StringDeserializer
/**
每隔1分钟统计过去5分钟,每个区域每个道路每个卡扣的平均速度 */ object MonitorAvgSpeedAnaly { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.streaming.api.scala._
//设置并行度和事件时间 env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//读取Kafka中 实时监控车辆数据 val props = new Properties() props.setProperty(“bootstrap.servers”,”mynode1:9092,mynode2:9092,mynode3:9092”) props.setProperty(“key.deserializer”,classOf[StringDeserializer].getName) props.setProperty(“value.deserializer”,classOf[StringDeserializer].getName) props.setProperty(“group.id”,”group112502”)
//读取Kafka 中监控到实时的车辆信息 val monitorInfosDs: DataStream[String] = env.addSource(new FlinkKafkaConsumerString,props).setStartFromEarliest()) // val monitorInfosDs: DataStream[String] = env.socketTextStream(“mynode5”,9999) //数据类型转换 val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => { val arr: Array[String] = line.split(“\t”) MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorMonitorCarInfo) { override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime//设置EventTime的字段 })
//设置窗口,每隔1分钟计算每个卡扣的平均速度,使用aggregate 增量计算+全量计算val monitorAvgSpeedDS: DataStream[MonitorAvgSpeedInfo] = transferDS.keyBy(mi => {mi.areaId + "_" + mi.roadId + "_" + mi.monitorId}).timeWindow(Time.minutes(5), Time.minutes(1)).aggregate(new AggregateFunction[MonitorCarInfo, (Long, Double), (Long, Double)] {override def createAccumulator(): (Long, Double) = (0L, 0L)override def add(value: MonitorCarInfo, accumulator: (Long, Double)): (Long, Double) = {(accumulator._1 + 1, accumulator._2 + value.speed)}override def getResult(accumulator: (Long, Double)): (Long, Double) = accumulatoroverride def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) = (a._1 + b._1, a._2 + b._2)},new WindowFunction[(Long, Double), MonitorAvgSpeedInfo, String, TimeWindow] {//key : 区域_道路_卡扣 window : 当前窗口,input :输入的数据 ,out:回收数据对象override def apply(key: String, window: TimeWindow, input: Iterable[(Long, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val startTime: String = sdf.format(new Date(window.getStart))val endTime: String = sdf.format(new Date(window.getEnd))//计算当前卡扣的平均速度val last: (Long, Double) = input.lastval avgSpeed: Double = (last._2 / last._1).formatted("%.2f").toDoubleout.collect(new MonitorAvgSpeedInfo(startTime, endTime, key, avgSpeed, last._1))}})//保存到mysql 数据库中monitorAvgSpeedDS.addSink(new JDBCSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))env.execute()
} }
// Checkpoint average-speed record: window bounds, checkpoint key
// (area_road_monitor), average speed, and number of cars in the window.
case class MonitorAvgSpeedInfo(windowStartTime:String,windowEndTime:String,monitorId:String,avgSpeed:Double,carCount:Long)
<a name="X3shh"></a>## 三、每隔1分钟统计最近5分钟 最通畅的topN卡扣信息
```scala
import java.util.Properties
import com.bjsxt.traffic.utils._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer

/** Every 1 minute, find the 5 least-congested checkpoints of the past 5 minutes. */
object TopNMinAvgSpeedMonitorAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Single parallelism + event-time semantics.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112502")

    // Read real-time vehicle events from Kafka, starting from the earliest offset.
    val monitorInfosDs: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())
    // val monitorInfosDs: DataStream[String] = env.socketTextStream("mynode5", 9999)

    // Parse records and assign event-time timestamps with 5s bounded out-of-orderness.
    val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MonitorCarInfo](Time.seconds(5)) {
      override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime
    })

    // All-window (5 min size, 1 min slide): bucket each car into a speed class
    // per checkpoint, then emit the top-5 least-congested checkpoints.
    val top5MonitorInfoDS: DataStream[Top5MonitorInfo] = transferDS
      .timeWindowAll(Time.minutes(5), Time.minutes(1))
      .process(new ProcessAllWindowFunction[MonitorCarInfo, Top5MonitorInfo, TimeWindow] {
        // context: Flink context; elements: all events in the window; out: collector.
        override def process(context: Context, elements: Iterable[MonitorCarInfo], out: Collector[Top5MonitorInfo]): Unit = {
          // area_road_monitor -> per-speed-class car counts.
          val map = scala.collection.mutable.Map[String, MonitorSpeedClsCount]()
          val iter: Iterator[MonitorCarInfo] = elements.iterator
          while (iter.hasNext) {
            val currentInfo: MonitorCarInfo = iter.next()
            val currentKey = currentInfo.areaId + "_" + currentInfo.roadId + "_" + currentInfo.monitorId
            // FIX: replaced the duplicated contains/get(...).get branches with a single
            // getOrElseUpdate — same behavior, no unsafe Option.get, no repeated logic.
            val counts = map.getOrElseUpdate(currentKey, MonitorSpeedClsCount(0L, 0L, 0L, 0L))
            val speed: Double = currentInfo.speed
            // Bucket this car's speed into one of four classes.
            if (speed >= 120) {
              counts.hightSpeedCarCount += 1
            } else if (speed >= 90) {
              counts.middleSpeedCount += 1
            } else if (speed >= 60) {
              counts.normalSpeedCarCount += 1
            } else {
              counts.lowSpeedCarCount += 1
            }
          }

          // Rank checkpoints; `>` ordering is defined on MonitorSpeedClsCount
          // (project class — presumably "more fast cars" ranks as less congested;
          // verify against its definition).
          val tuples: List[(String, MonitorSpeedClsCount)] = map.toList.sortWith((tp1, tp2) => {
            tp1._2 > tp2._2
          }).take(5)

          // FIX: window start/end are loop-invariant — format them once, not per element.
          val windowStartTime: String = DateUtils.timestampToDataStr(context.window.getStart)
          val windowEndTime: String = DateUtils.timestampToDataStr(context.window.getEnd)
          for (elem <- tuples) {
            out.collect(new Top5MonitorInfo(windowStartTime, windowEndTime, elem._1, elem._2.hightSpeedCarCount, elem._2.middleSpeedCount, elem._2.normalSpeedCarCount, elem._2.lowSpeedCarCount))
          }
        }
      })

    // Persist results to MySQL.
    top5MonitorInfoDS.addSink(new JDBCSink[Top5MonitorInfo]("Top5MonitorInfo"))
    env.execute()
  }
}
```
四、实时对违法车辆-套牌车辆 进行监控
```scala
import java.util.Properties
import com.bjsxt.traffic.utils.{DateUtils, JDBCSink, MonitorCarInfo, ViolationCarInfo}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer

/** Real-time detection of suspected cloned-plate vehicles: the same plate
  * captured at checkpoints less than 10 seconds apart is flagged. */
object RepatitionCarInfo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer settings.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112504")

    // Read raw monitoring events from Kafka, from the earliest offset.
    val carMonitorInfoDS: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())

    // Parse tab-separated fields into MonitorCarInfo.
    val transferDs: DataStream[MonitorCarInfo] = carMonitorInfoDS.map(line => {
      val fields: Array[String] = line.split("\t")
      MonitorCarInfo(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5), fields(6).toDouble)
    })

    // Keyed state per plate holds the latest capture time seen so far; two
    // captures of the same plate within 10 seconds are suspicious.
    val violationCarInfoDS: DataStream[ViolationCarInfo] = transferDs.keyBy(_.car)
      .process(new KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo] {
        // Last checkpoint-capture time for the current plate (0 = never seen).
        private lazy val carStateTime: ValueState[Long] =
          getRuntimeContext.getState(new ValueStateDescriptor[Long]("carTimeState", classOf[Long]))

        override def processElement(value: MonitorCarInfo,
                                    ctx: KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo]#Context,
                                    out: Collector[ViolationCarInfo]): Unit = {
          val preLastActionTime: Long = carStateTime.value()
          if (preLastActionTime == 0) {
            // First sighting of this plate — just remember its capture time.
            carStateTime.update(value.actionTime)
          } else {
            val actionTime: Long = value.actionTime
            val car: String = value.car
            // Compare with the stored time; a gap under 10 seconds (either
            // direction, hence .abs) means a suspected cloned plate.
            if ((actionTime - preLastActionTime).abs < 10 * 1000) {
              out.collect(new ViolationCarInfo(car, "涉嫌套牌", DateUtils.timestampToDataStr(System.currentTimeMillis()), s"车辆${car}通过上一次卡扣时间${preLastActionTime},本次通过卡扣时间${actionTime},两次时间差值为${(preLastActionTime - actionTime).abs}ms"))
            }
            // Keep the newer of the two timestamps as the reference.
            carStateTime.update(actionTime.max(preLastActionTime))
          }
        }
      })

    violationCarInfoDS.addSink(new JDBCSink[ViolationCarInfo]("ViolationCarInfo"))
    env.execute()
  }
}
```
