Task Overview

  • Target metrics: real-time vehicle position, real-time next stop, real-time comfort level, real-time vehicle status, real-time speed, real-time cumulative mileage, real-time vehicle-type status distribution
  • Implemented in Scala, with Flink as the real-time compute engine

Data Flow

(data-flow diagram)

  1. Real-time stream
    • Buses send raw CAN-bus packets to a TCP service over HTTP every 10 s
    • The TCP service unpacks the raw HTTP payload, splits it on the frame delimiter (FAF5), and sends the frames to Kafka as byte arrays (see the sketch after this list)
    • Flink converts the byte arrays back into strings with a custom deserialization schema
    • The GPS payload is sliced into fields and decoded according to the Shenzhen-standard (深标) protocol
    • Dimension data is looked up in HBase by the relevant IDs
    • The target metrics are then computed and the results are sunk to the backend MySQL
  2. Dimension tables
    • Raw data from the business databases is ingested by the platform into the ODS layer
    • HQL jobs clean and transform it into DIM-layer dimension tables
    • DataX syncs the dimension tables to HBase for the Flink job to join against
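A minimal sketch of the delimiter split done by the TCP service (a hypothetical helper, not the production code; it assumes every frame begins with the FAF5 marker, and works on the hex encoding for readability):

```scala
object FrameSplitter {
  /** Hex-encode the payload, split on the FAF5 marker, and re-prefix each frame. */
  def split(payload: Array[Byte]): Seq[String] = {
    val hex = payload.map("%02X".format(_)).mkString
    hex.split("FAF5").filter(_.nonEmpty).map("FAF5" + _)
  }
}
```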

Code

source class

  • Uses Flink's built-in Kafka connector FlinkKafkaConsumer09 to create the DataSource
  • Because the Kafka records are byte arrays, the deserializer is swapped for the custom ByteArrayDeserializationSchema

```scala
package com.szbus.source

import java.util.Properties

import com.szbus.util.ByteArrayDeserializationSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

/**
  * @author : 恋晨
  * Date : 2019/7/12 3:44 PM
  * Purpose: builds the sources for the individual data streams
  */
class source {

  /** Kafka consumer properties */
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "10.128.1.13:9092,10.128.1.18:9092,10.128.1.19:9092")
  properties.setProperty("group.id", "prd")
  /** key deserializer */
  properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  /** value deserializer */
  properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  /**
    * Builds the GPS source.
    * @param env the Flink execution environment
    * @param parallelism the source parallelism
    * @return the GPS source stream
    */
  def getGpsSource(env: StreamExecutionEnvironment, parallelism: Int): DataStream[String] = {
    val gpsSource = env
      .addSource(new FlinkKafkaConsumer09[String](
        "bus_gps",
        new ByteArrayDeserializationSchema(),
        properties
      ).setStartFromLatest()).name("gpsSource")
      .setParallelism(parallelism)
    gpsSource
  }
}
```
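Wiring it up (this mirrors the main class at the end of this page):

```scala
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val gpsSource: DataStream[String] = new source().getGpsSource(env, 3)
```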

  • Custom deserialization schema

```java
package com.szbus.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * @author : 恋晨
 * Date : 2019/6/28 5:57 PM
 * Purpose: byte-array deserialization schema for the Kafka topic
 */
public class ByteArrayDeserializationSchema implements DeserializationSchema<String>, SerializationSchema<String> {

    /** Encode a byte array as an upper-case hex string. */
    public static String parseByte2HexStr(byte[] buf) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < buf.length; i++) {
            String hex = Integer.toHexString(buf[i] & 0xFF);
            if (hex.length() == 1) {
                hex = '0' + hex;
            }
            sb.append(hex.toUpperCase());
        }
        return sb.toString();
    }

    @Override
    public String deserialize(byte[] bytes) throws IOException {
        return parseByte2HexStr(bytes);
    }

    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public byte[] serialize(String s) {
        return new byte[0];
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
```
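A quick illustration of the hex encoding the schema produces; parseByte2HexStr is public and static, so it can be exercised directly from Scala:

```scala
import com.szbus.util.ByteArrayDeserializationSchema

val bytes = Array(0xFA.toByte, 0xF5.toByte, 0x41.toByte)
ByteArrayDeserializationSchema.parseByte2HexStr(bytes) // "FAF541"
```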

join class

  • This class extends Flink's AsyncFunction and joins the stream against dimension tables with async I/O
  • The input and output types are the case classes gps and gps_join_stop_previous (sketched below)
  • bus_id and line_code are taken from the input record and used as rowKeys to fetch the matching rows (JSON strings) from the dimension tables dim_hbase_line_stop and dim_hbase_bus_type
  • The needed fields (up_end_order: terminal-stop number of the up direction, down_end_order: terminal-stop number of the down direction, stop_list: the stop list, bus_type: the vehicle model) are pulled from the JSON and combined into a new gps_join_stop_previous
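The case classes themselves are not listed in this document; from the fields used below they plausibly look like this (a sketch with all-String fields assumed, not the original definitions):

```scala
// Plausible shapes inferred from the code below, not the originals.
case class gps(
  busId: String, runType: String, lng: String, lat: String,
  speed: String, mileage: String, dataTime: String, direction: String,
  line_code: String, station_num: String)

case class gps_join_stop_previous(
  busId: String, runType: String, lng: String, lat: String,
  speed: String, mileage: String, dataTime: String, direction: String,
  station_num: String, up_end_order: String, down_end_order: String,
  stop_list: String, bus_type: String)
```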
```scala
package com.szbus.join

import com.szbus.cache.HbaseSideCache
import com.szbus.main.caseClasses.{gps, gps_join_stop_previous}
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

/**
  * @author : 恋晨
  * Date : 2019/7/23 2:26 PM
  * Purpose: async stream/dimension join of GPS records with line, stop and bus-type data
  */
class gpsJoinStop extends AsyncFunction[gps, gps_join_stop_previous] {

  override def asyncInvoke(input: gps, resultFuture: ResultFuture[gps_join_stop_previous]): Unit = {
    val bus_id = input.busId
    val line_code = input.line_code
    /** fetch the line's stop data and the bus type by rowKey */
    val stop_list = HbaseSideCache.getValueByKey("dim_hbase_line_stop", line_code)
    val bus_type = HbaseSideCache.getValueByKey("dim_hbase_bus_type", bus_id)
    val obj = Seq(gps_join_stop_previous(
      input.busId,
      input.runType,
      input.lng,
      input.lat,
      input.speed,
      input.mileage,
      input.dataTime,
      input.direction,
      input.station_num,
      stop_list.getString("up_end_order"),
      stop_list.getString("down_end_order"),
      stop_list.getString("stop_list"),
      bus_type.getString("bus_type")
    ))
    resultFuture.complete(obj)
  }
}
```
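HbaseSideCache is referenced above but not shown. A minimal sketch of what it might look like, assuming fastjson JSONObject values, a naive in-memory cache, and a placeholder column family "f" / qualifier "json" (all assumptions, not the real class):

```scala
package com.szbus.cache

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.concurrent.TrieMap

/** Hypothetical sketch of the dimension-table cache used by gpsJoinStop. */
object HbaseSideCache {
  private lazy val conn: Connection = ConnectionFactory.createConnection()
  private val cache = TrieMap.empty[(String, String), JSONObject]

  /** Fetch a JSON row by rowKey, memoizing results in memory. */
  def getValueByKey(table: String, rowKey: String): JSONObject =
    cache.getOrElseUpdate((table, rowKey), {
      val tbl = conn.getTable(TableName.valueOf(table))
      try {
        val result = tbl.get(new Get(Bytes.toBytes(rowKey)))
        val raw = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("json"))
        if (raw == null) new JSONObject() else JSON.parseObject(Bytes.toString(raw))
      } finally {
        tbl.close()
      }
    })
}
```

A real implementation would bound and expire this cache; an unbounded TrieMap is only for illustration.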

sink class

```scala
package com.szbus.sink

import java.sql.{Connection, PreparedStatement}

import com.szbus.main.caseClasses.gpsSink
import com.szbus.util.MySqlClient
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.slf4j.LoggerFactory

/**
  * @author : 恋晨
  * Date : 2019/7/3 9:06 PM
  * Purpose: custom MySQL sink
  */
class gpsSinkToMySql extends RichSinkFunction[gpsSink] {

  final val log = LoggerFactory.getLogger("gpsSinkToMySql")
  private var connection: Connection = _
  private var ps: PreparedStatement = _

  /**
    * Initialization: acquire the database connection.
    */
  override def open(parameters: Configuration): Unit = {
    connection = MySqlClient.getConnection()
  }

  /**
    * close(): release the statement and the connection.
    */
  override def close(): Unit = {
    try {
      if (ps != null) {
        ps.close()
      }
      if (connection != null) {
        connection.close()
      }
    } catch {
      case e: Exception => log.error(e.getMessage)
    }
  }

  /**
    * invoke() runs once per record: upsert the latest values for the bus.
    */
  override def invoke(value: gpsSink, context: SinkFunction.Context[_]): Unit = {
    try {
      val insertSql = "INSERT INTO bas_tech_bus_real_time(bus_id,run_type,lng,lat,speed,mileage,direction,next_station,event_time,car_type,comfort_level) VALUES (?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE run_type=?,lng=?,lat=?,speed=?,mileage=?,direction=?,next_station=?,event_time=?,car_type=?,comfort_level=?;"
      ps = connection.prepareStatement(insertSql)
      /** bind the INSERT values, then repeat them for the UPDATE clause */
      ps.setString(1, value.busId)
      ps.setString(2, value.runType)
      ps.setString(3, value.lng)
      ps.setString(4, value.lat)
      ps.setString(5, value.speed)
      ps.setString(6, value.mileage)
      ps.setString(7, value.direction)
      ps.setString(8, value.nextStation)
      ps.setString(9, value.dataTime)
      ps.setString(10, value.bus_type)
      ps.setInt(11, value.comfort)
      ps.setString(12, value.runType)
      ps.setString(13, value.lng)
      ps.setString(14, value.lat)
      ps.setString(15, value.speed)
      ps.setString(16, value.mileage)
      ps.setString(17, value.direction)
      ps.setString(18, value.nextStation)
      ps.setString(19, value.dataTime)
      ps.setString(20, value.bus_type)
      ps.setInt(21, value.comfort)
      ps.executeLargeUpdate()
    } catch {
      case e: Exception => log.error(e.getMessage)
    } finally {
      /** close the per-record statement so cursors are not leaked */
      if (ps != null) ps.close()
    }
  }
}
```
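MySqlClient is likewise not shown. A minimal sketch, with host, database and credentials as placeholders (the real class may pool connections):

```scala
package com.szbus.util

import java.sql.{Connection, DriverManager}

/** Hypothetical sketch of MySqlClient; connection details are placeholders. */
object MySqlClient {
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection(
      "jdbc:mysql://<host>:3306/<db>?useUnicode=true&characterEncoding=utf8",
      "<user>", "<password>")
  }
}
```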

Flink main class

```scala
package com.szbus.main

import com.alibaba.fastjson.JSON
import com.szbus.join.gpsJoinStop
import com.szbus.main.caseClasses.{gps, gpsSink}
import com.szbus.sink.gpsSinkToMySql
import com.szbus.source.source
import com.szbus.util.gpsAnalyzeFunctions._
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.{RocksDBOptions, RocksDBStateBackend}
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.TernaryBoolean
import org.slf4j.LoggerFactory

import scala.concurrent.duration
import scala.util.Random

/**
  * @author : 恋晨
  * Date : 2019/6/28 4:59 PM
  * Purpose: main program for parsing the GPS stream
  */
object szBusGps {

  final val log = LoggerFactory.getLogger("szbusGps")
  /** candidate comfort scores */
  final val seq = (70 to 90).toIndexedSeq

  def main(args: Array[String]): Unit = {

    /** Get the Flink execution environment */
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * Enable checkpointing:
      * at-least-once checkpoints every 30000 ms, a minimum pause of 30000 ms
      * between checkpoints, at most one concurrent checkpoint, and a
      * 900000 ms timeout; the default job parallelism is 10.
      *
      * Under heavy load, a shorter checkpoint interval keeps each
      * checkpoint's state small and improves stability.
      */
    env.enableCheckpointing(30000, CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setCheckpointTimeout(900000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    env.setParallelism(10)

    /**
      * Use RocksDB as the state backend.
      *
      * Do not use the memory backend in production.
      * The timer service can live on HEAP (default, faster) or in RocksDB (scales better).
      */
    val checkpointDataUri = "hdfs:///flink/checkpoints-data"
    val tmpDir = "file:///tmp/rocksdb/data/"
    val fsStateBackend: StateBackend = new FsStateBackend(checkpointDataUri)
    val rocksDBBackend: RocksDBStateBackend = new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE)
    val config = new Configuration()
    config.setString(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString)
    rocksDBBackend.configure(config)
    rocksDBBackend.setDbStoragePath(tmpDir)
    env.setStateBackend(rocksDBBackend.asInstanceOf[StateBackend])

    /** Build the source */
    val SOURCE = new source
    val gpsSource = SOURCE.getGpsSource(env, 3)

    /**
      * Drop frames that are not vehicle GPS reports.
      * GPS frames carry command word 41.
      */
    val GpsFilterMapStream = gpsSource
      .filter(x => {
        var out = false
        if (x.length >= 12) {
          /**
            * length: the hex-encoded frame length from the header
            * command: the frame's command word
            * dataLen: the decoded frame length
            */
          val length = x.substring(4, 8)
          val command = x.substring(10, 12)
          val dataLen = Integer.parseInt(length, 16)
          out = "41".equals(command) && x.length > 8 + dataLen * 2
        }
        out
      }).name("drop non-GPS frames")
      .setParallelism(3)
      /** Slice the frame into its raw field substrings */
      .map(x => {(
        x.substring(42, 58),   // bus id
        x.substring(160, 162), // operation status
        x.substring(110, 120), // longitude
        x.substring(120, 128), // latitude
        x.substring(144, 148), // speed
        x.substring(164, 172), // mileage
        x.substring(156, 158), // status bits
        x.substring(128, 140), // data time
        x.substring(58, 74),   // line code
        x.substring(92, 94)    // station number
      )}).name("slice frame")
      .setParallelism(3)

    /** Decode the raw substrings into concrete field values */
    val messageSplitStream = GpsFilterMapStream
      .map(x => {
        var speed = "0.0"
        var mileage = "0.0"
        try {
          speed = (x._5.toDouble / 10).toString
          mileage = (x._6.toDouble / 100).toString
        } catch {
          case e: Exception => log.error(e.getMessage)
        }
        gps(
          asciiToString(stringToDecimal(x._1)),
          // "营运中" means "in service"
          if (!"1".equals(bytes2BinaryStr(hexStringToByte(x._7)).substring(6, 7))) "营运中" else operationStatus2(x._2),
          DfToDd("lon", x._3).toString,
          DfToDd("lat", x._4).toString,
          speed,
          mileage,
          "20" + x._8,
          bytes2BinaryStr(hexStringToByte(x._7)).substring(3, 4),
          asciiToString(stringToDecimal(x._9)).replaceAll("\u0000", ""),
          Integer.parseInt(x._10, 16).toString
        )
      }).name("decode fields per the Shenzhen-standard protocol").setParallelism(6)
      .filter(x => {
        (x.lat != "0.0" || x.lng != "0.0") &&
          x.line_code != null && x.line_code != "" &&
          x.station_num != null && x.station_num != "" &&
          x.busId != null && x.busId != ""
      }).name("drop invalid GPS records")
      .setParallelism(6)

    /** Async-join the GPS stream with the line/stop dimension data */
    val gps_join_stop = AsyncDataStream.unorderedWait(
      messageSplitStream, new gpsJoinStop, 2, duration.MINUTES
    ).name("join GPS with line/stop info").setParallelism(10)

    val gpsNextStream = gps_join_stop
      .map(x => {
        val next_Station = nextStation(x.station_num, x.up_end_order, x.down_end_order, x.stop_list)
        val comfort = Random.shuffle(seq).head
        gpsSink(
          x.busId,
          x.runType,
          x.lng,
          x.lat,
          x.speed,
          x.mileage,
          x.direction,
          next_Station,
          x.dataTime,
          x.bus_type,
          if (x.station_num == x.up_end_order || x.station_num == x.down_end_order) 100 else comfort
        )
      }).name("compute next stop").setParallelism(10)

    gpsNextStream
      .addSink(new gpsSinkToMySql).name("gpsSinkToMySql").setParallelism(10)

    env.execute("GPS real-time job")
  }

  /**
    * Compute the vehicle's real-time next stop.
    * @param current_order  the vehicle's current stop number (up and down directions share one sequence)
    * @param up_end_order   terminal-stop number of the up direction
    * @param down_end_order terminal-stop number of the down direction
    * @param stop_list      the line's stop list as a JSON string
    * @return the stop that follows the current one, or "--" once the vehicle reaches either terminus
    */
  def nextStation(current_order: String, up_end_order: String, down_end_order: String, stop_list: String): String = {
    if (stop_list == null || current_order == null) {
      "--"
    } else if (up_end_order == current_order || down_end_order == current_order) {
      "--"
    } else {
      val next_station = JSON.parseObject(stop_list).getString((current_order.toInt + 1).toString)
      if (next_station == null) "--" else next_station
    }
  }
}
```
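For reference, a worked example of the frame-header check and of nextStation (the frame content and stop names are made up):

```scala
// A made-up frame: FAF5 | length 0x0010 (16 bytes) | 01 | command 41 | 32 hex chars of payload
val frame = "FAF5" + "0010" + "01" + "41" + ("AB" * 16)
val dataLen = Integer.parseInt(frame.substring(4, 8), 16)          // 16
frame.substring(10, 12) == "41" && frame.length > 8 + dataLen * 2  // true: kept as a GPS frame

// nextStation on a three-stop list whose up-direction terminus is stop 3:
val stops = """{"1":"Stop A","2":"Stop B","3":"Stop C"}"""
szBusGps.nextStation("1", "3", "6", stops) // "Stop B"
szBusGps.nextStation("3", "3", "6", stops) // "--" (terminus reached)
```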