一、车辆超速信息监控
- 从Kafka读取卡口车辆的速度信息,关联卡口的限速信息表,统计超速20%以上的车辆,并保存到MySQL结果表中。
- 注意:广播数据流的使用场景——两个流需要关联,一个流大、一个流小,且小流会定期变化;此时可以将小流全量广播到每个TaskManager中。
- 主要涉及kafkasource、mysqlsink、广播算子的样例代码
```scala
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.Properties
import com.bjsxt.traffic.utils.{JDBCSink, MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringSerializer
object OverSpeedCarAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer configuration.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    props.setProperty("group.id", "group1125")

    // Kafka source on topic "monitortopic1125", reading from the earliest offset.
    val kafkaSource: FlinkKafkaConsumerBase[String] =
      new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest()

    // Raw per-vehicle monitoring records.
    val ds1: DataStream[String] = env.addSource(kafkaSource)
    // val ds1: DataStream[String] = env.socketTextStream("mynode5",9999)

    // Parse tab-separated fields into MonitorCarInfo:
    // areaId, roadId, monitorId, cameraId, actionTime(Long), car, speed(Double)
    val carMonitorDS: DataStream[MonitorCarInfo] = ds1.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    })

    // Speed-limit table per area/road/monitor, re-read from MySQL every 30 minutes.
    val limitSpeedDs: DataStream[MonitorLimitSpeedInfo] = env.addSource(new RichSourceFunction[MonitorLimitSpeedInfo] {
      var conn: Connection = _
      var pst: PreparedStatement = _
      // Volatile: cancel() is called from a different thread than run().
      @volatile var stop = false

      // Called once when the source is initialized: open the JDBC connection.
      override def open(parameters: Configuration): Unit = {
        conn = DriverManager.getConnection("jdbc:mysql://192.168.179.14:3306/traffic_monitor", "root", "123456")
        pst = conn.prepareStatement("select area_id,road_id,monitor_id,speed_limit from t_monitor_speed_limit_info")
      }

      // Emit the full speed-limit table, sleep 30 minutes, repeat until cancelled.
      override def run(ctx: SourceFunction.SourceContext[MonitorLimitSpeedInfo]): Unit = {
        while (!stop) {
          val set: ResultSet = pst.executeQuery()
          while (set.next()) {
            val areaId: String = set.getString("area_id")
            val roadId: String = set.getString(2)
            val monitorId: String = set.getString(3)
            val speedLimit: Double = set.getDouble(4)
            ctx.collect(new MonitorLimitSpeedInfo(areaId, roadId, monitorId, speedLimit))
          }
          // Release the ResultSet before sleeping (was previously leaked per iteration).
          set.close()
          Thread.sleep(30 * 1000 * 60)
        }
      }

      // Invoked when the Flink job is cancelled: stop the loop and release JDBC resources.
      override def cancel(): Unit = {
        stop = true
        pst.close()
        conn.close()
      }
    })

    // Broadcast the speed limits; broadcast state maps "area_road_monitor" -> limit.
    val mapStateDesc = new MapStateDescriptor[String, Double]("map-state", classOf[String], classOf[Double])
    val bcLimitSpeed: BroadcastStream[MonitorLimitSpeedInfo] = limitSpeedDs.broadcast(mapStateDesc)

    // Connect the car stream with the broadcast limits and flag cars more than 20% over the limit.
    val overSpeedInfoDs: DataStream[OverSpeedCarInfo] = carMonitorDS.connect(bcLimitSpeed).process(new BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo] {
      // Per-record processing on the (non-broadcast) car stream.
      override def processElement(value: MonitorCarInfo, ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#ReadOnlyContext, out: Collector[OverSpeedCarInfo]): Unit = {
        // BUG FIX: the lookup key was previously areaId_monitorId_cameraId, which can
        // never match the areaId_roadId_monitorId keys written by processBroadcastElement,
        // so every car silently fell back to the default limit.
        val key = value.areaId + "_" + value.roadId + "_" + value.monitorId
        val readOnlyBS: ReadOnlyBroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
        // Use the broadcast limit when present; otherwise fall back to the default of 60.
        val limit: Double = if (readOnlyBS.contains(key)) readOnlyBS.get(key) else 60.0
        if (value.speed > limit * 1.2) {
          // Over-speed vehicle: emit downstream.
          out.collect(new OverSpeedCarInfo(value.car, value.monitorId, value.roadId, value.speed, limit, value.actionTime))
        }
      }

      // Broadcast-side processing: refresh the stored limit for this monitor.
      override def processBroadcastElement(value: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[MonitorCarInfo, MonitorLimitSpeedInfo, OverSpeedCarInfo]#Context, out: Collector[OverSpeedCarInfo]): Unit = {
        val broadCastState: BroadcastState[String, Double] = ctx.getBroadcastState(mapStateDesc)
        val key = value.areaId + "_" + value.roadId + "_" + value.monitorId // key: area_road_monitor
        broadCastState.put(key, value.limitSpeed)
      }
    })

    // Persist flagged cars to MySQL table t_speed_info.
    overSpeedInfoDs.addSink(new JDBCSink[OverSpeedCarInfo]("OverSpeedCarInfo"))
    env.execute()
  }
}
import java.util

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.conf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.{TableName, client}

/**
 * HBase sink: writes each incoming batch (java.util.List[Put]) into HBase table "a1".
 *
 * Fixes over the garbled original: restored the `= _` default initializers, replaced
 * smart quotes with plain string literals, closed the per-invoke Table handle, and
 * closed the HBase connection when the sink is disposed.
 */
class HBaseSink extends RichSinkFunction[java.util.List[Put]] {
  var configuration: conf.Configuration = _
  var conn: client.Connection = _

  // Called once when the RichSinkFunction is initialized: build the HBase connection.
  override def open(parameters: Configuration): Unit = {
    configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181")
    conn = ConnectionFactory.createConnection(configuration)
  }

  // Called once per element: write the whole batch of Puts to table "a1".
  override def invoke(value: util.List[Put], context: SinkFunction.Context[_]): Unit = {
    val table: Table = conn.getTable(TableName.valueOf("a1"))
    try {
      table.put(value)
    } finally {
      // Table handles are lightweight but must be closed after use.
      table.close()
    }
  }

  // Release the shared HBase connection when the sink shuts down.
  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}
```
二、每个区域每个道路每个卡扣的平均速度
- 每隔1分钟统计过去5分钟,每个区域每个道路每个卡扣的平均速度
- 涉及EventTime时间语义
```scala
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import com.bjsxt.traffic.utils.{JDBCSink, MonitorAvgSpeedInfo, MonitorCarInfo}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Every 1 minute, compute the average speed over the last 5 minutes
 * for each area/road/monitor (event-time sliding window).
 */
object MonitorAvgSpeedAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Single parallelism + event-time semantics.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112502")

    // Real-time vehicle records from Kafka, starting at the earliest offset.
    // NOTE(review): this statement was garbled in the source; reconstructed from the
    // identical pattern in TopNMinAvgSpeedMonitorAnaly — confirm topic and schema.
    val monitorInfosDs: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())
    // val monitorInfosDs: DataStream[String] = env.socketTextStream("mynode5",9999)

    // Parse tab-separated records and assign event-time timestamps.
    // NOTE(review): the max out-of-orderness was lost in the garbled source; 5s is
    // assumed to match the Top-N job in this file — confirm.
    val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MonitorCarInfo](Time.seconds(5)) {
      override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime // event-time field
    })

    // 5-minute window sliding every 1 minute, keyed by area_road_monitor.
    // aggregate = incremental (count, speed-sum) accumulator + window function for output.
    val monitorAvgSpeedDS: DataStream[MonitorAvgSpeedInfo] = transferDS.keyBy(mi => {
      mi.areaId + "_" + mi.roadId + "_" + mi.monitorId
    }).timeWindow(Time.minutes(5), Time.minutes(1))
      .aggregate(new AggregateFunction[MonitorCarInfo, (Long, Double), (Long, Double)] {
        // Accumulator is (car count, speed sum); was (0L, 0L), now type-correct (0L, 0.0).
        override def createAccumulator(): (Long, Double) = (0L, 0.0)
        override def add(value: MonitorCarInfo, accumulator: (Long, Double)): (Long, Double) =
          (accumulator._1 + 1, accumulator._2 + value.speed)
        override def getResult(accumulator: (Long, Double)): (Long, Double) = accumulator
        override def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) =
          (a._1 + b._1, a._2 + b._2)
      },
      new WindowFunction[(Long, Double), MonitorAvgSpeedInfo, String, TimeWindow] {
        // key: area_road_monitor; input: the single accumulator emitted by getResult.
        override def apply(key: String, window: TimeWindow, input: Iterable[(Long, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          val startTime: String = sdf.format(new Date(window.getStart))
          val endTime: String = sdf.format(new Date(window.getEnd))
          // Average speed for this monitor over the window, rounded to 2 decimals.
          val last: (Long, Double) = input.last
          val avgSpeed: Double = (last._2 / last._1).formatted("%.2f").toDouble
          out.collect(new MonitorAvgSpeedInfo(startTime, endTime, key, avgSpeed, last._1))
        }
      })

    // Persist results to MySQL.
    monitorAvgSpeedDS.addSink(new JDBCSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))
    env.execute()
  }
}
// Average-speed record for one area_road_monitor key over one window.
// BUG FIX: in the source, the whole definition was fused onto a `//` comment line,
// leaving the case class commented out / undefined.
case class MonitorAvgSpeedInfo(windowStartTime: String, windowEndTime: String, monitorId: String, avgSpeed: Double, carCount: Long)
<a name="X3shh"></a>
## 三、每隔1分钟统计最近5分钟 最通畅的topN卡扣信息
```scala
import java.util.Properties
import com.bjsxt.traffic.utils._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer
object TopNMinAvgSpeedMonitorAnaly {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Single parallelism + event-time semantics.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112502")

    // Real-time vehicle records from Kafka, starting at the earliest offset.
    val monitorInfosDs: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())
    // val monitorInfosDs: DataStream[String] = env.socketTextStream("mynode5",9999)

    // Parse tab-separated records and assign event-time timestamps (5s max out-of-orderness).
    val transferDS: DataStream[MonitorCarInfo] = monitorInfosDs.map(line => {
      val arr: Array[String] = line.split("\t")
      MonitorCarInfo(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5), arr(6).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MonitorCarInfo](Time.seconds(5)) {
      override def extractTimestamp(element: MonitorCarInfo): Long = element.actionTime
    })

    // Every 1 minute over the last 5 minutes: top-5 most free-flowing monitors.
    val top5MonitorInfoDS: DataStream[Top5MonitorInfo] = transferDS.timeWindowAll(Time.minutes(5), Time.minutes(1))
      .process(new ProcessAllWindowFunction[MonitorCarInfo, Top5MonitorInfo, TimeWindow] {
        // context: Flink context; elements: all records in the window; out: collector.
        override def process(context: Context, elements: Iterable[MonitorCarInfo], out: Collector[Top5MonitorInfo]): Unit = {
          // Speed-class counters per "area_road_monitor" key.
          val counters = scala.collection.mutable.Map[String, MonitorSpeedClsCount]()
          for (info <- elements) {
            val key = info.areaId + "_" + info.roadId + "_" + info.monitorId
            // getOrElseUpdate replaces the original contains/put branching and its
            // unsafe Option.get calls; then bump the bucket this speed falls into.
            val c = counters.getOrElseUpdate(key, MonitorSpeedClsCount(0L, 0L, 0L, 0L))
            if (info.speed >= 120) c.hightSpeedCarCount += 1
            else if (info.speed >= 90) c.middleSpeedCount += 1
            else if (info.speed >= 60) c.normalSpeedCarCount += 1
            else c.lowSpeedCarCount += 1
          }
          // Rank monitors (ordering supplied by MonitorSpeedClsCount) and keep the top 5.
          val top5: List[(String, MonitorSpeedClsCount)] =
            counters.toList.sortWith((tp1, tp2) => tp1._2 > tp2._2).take(5)
          // Window bounds are loop-invariant: format them once instead of per element.
          val windowStartTime: String = DateUtils.timestampToDataStr(context.window.getStart)
          val windowEndTime: String = DateUtils.timestampToDataStr(context.window.getEnd)
          for ((monitorKey, c) <- top5) {
            out.collect(new Top5MonitorInfo(windowStartTime, windowEndTime, monitorKey, c.hightSpeedCarCount, c.middleSpeedCount, c.normalSpeedCarCount, c.lowSpeedCarCount))
          }
        }
      })

    // Persist results to MySQL.
    top5MonitorInfoDS.addSink(new JDBCSink[Top5MonitorInfo]("Top5MonitorInfo"))
    env.execute()
  }
}
四、实时对违法车辆-套牌车辆 进行监控
import java.util.Properties
import com.bjsxt.traffic.utils.{DateUtils, JDBCSink, MonitorCarInfo, ViolationCarInfo}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.common.serialization.StringDeserializer
object RepatitionCarInfo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Kafka consumer settings.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("group.id", "group112504")

    // Vehicle records from Kafka, starting at the earliest offset.
    val carMonitorInfoDS: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String]("monitortopic1125", new SimpleStringSchema(), props).setStartFromEarliest())

    // Parse tab-separated fields into MonitorCarInfo.
    val transferDs: DataStream[MonitorCarInfo] = carMonitorInfoDS.map { line =>
      val fields = line.split("\t")
      MonitorCarInfo(fields(0), fields(1), fields(2), fields(3), fields(4).toLong, fields(5), fields(6).toDouble)
    }

    // Keyed state per plate number: a car captured at two checkpoints within 10s
    // is flagged as a suspected cloned plate.
    val violationCarInfoDS: DataStream[ViolationCarInfo] = transferDs.keyBy(_.car)
      .process(new KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo] {
        // Latest checkpoint pass time recorded for the current car (0 = not seen yet).
        private lazy val carStateTime: ValueState[Long] =
          getRuntimeContext.getState(new ValueStateDescriptor[Long]("carTimeState", classOf[Long]))

        // value: current record; ctx: keyed context; out: collector.
        override def processElement(value: MonitorCarInfo, ctx: KeyedProcessFunction[String, MonitorCarInfo, ViolationCarInfo]#Context, out: Collector[ViolationCarInfo]): Unit = {
          val previousTime = carStateTime.value()
          if (previousTime == 0) {
            // First sighting of this plate: just remember the pass time.
            carStateTime.update(value.actionTime)
          } else {
            val actionTime = value.actionTime
            val car = value.car
            // Two sightings less than 10 seconds apart cannot be the same vehicle.
            if ((actionTime - previousTime).abs < 10 * 1000) {
              out.collect(new ViolationCarInfo(car,
                "涉嫌套牌",
                DateUtils.timestampToDataStr(System.currentTimeMillis()),
                s"车辆${car}通过上一次卡扣时间${previousTime},本次通过卡扣时间${actionTime},两次时间差值为${(previousTime - actionTime).abs}ms"))
            }
            // Keep the newer of the two timestamps as the latest sighting.
            carStateTime.update(actionTime.max(previousTime))
          }
        }
      })

    violationCarInfoDS.addSink(new JDBCSink[ViolationCarInfo]("ViolationCarInfo"))
    env.execute()
  }
}