pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Declare the binding to Maven's compile phase -->
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

WordCount.scala

package com.atguigu.wc

import org.apache.flink.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/16 11:48
  */

// Batch word count
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read data from a file
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)

    // Split into words, then count
    val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
      .map( (_, 1) )
      .groupBy(0)
      .sum(1)

    // Print the result
    wordCountDataSet.print()
  }
}

StreamWordCount.scala

package com.atguigu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/16 14:08
  */

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // Create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    // env.disableOperatorChaining()

    // Receive a socket text stream
    val textDataStream = env.socketTextStream(host, port)

    // Split each line into words and count them
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map( (_, 1) )
      .keyBy(0)
      .sum(1)

    // Print the result
    wordCountDataStream.print().setParallelism(1)

    // Execute the job
    env.execute("stream word count job")
  }
}
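Note: to try StreamWordCount locally, the socket source needs something listening on the given host and port. One common way (an assumption, not part of the original listing) is to start `nc -lk 7777` in a terminal and submit the job with the program arguments `--host localhost --port 7777`; lines typed into the netcat session are then split, counted and printed by the job.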

SourceTest.scala

package com.atguigu.apitest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 10:11
  */

// Case class for sensor readings
case class SensorReading( id: String, timestamp: Long, temperature: Double )

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    // env.fromElements("flink", 1, 32, 3213, 0.324).print("test")

    // 2. Read data from a file
    val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // 3. Read data from Kafka
    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // 4. Custom source
    val stream4 = env.addSource(new SensorSource())

    // sink output
    stream4.print("stream4")

    env.execute("source api test")
  }
}

class SensorSource() extends SourceFunction[SensorReading]{
  // Flag indicating whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator
    val rand = new Random()
    // Initialize random temperatures for 10 sensors; later readings fluctuate around them
    var curTemp = 1.to(10).map(
      i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
    )
    // Emit readings in an endless loop until the source is cancelled
    while(running){
      // Update the temperatures
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      // Current timestamp
      val curTime = System.currentTimeMillis()
      // Wrap each value in a SensorReading and emit it
      curTemp.foreach(
        t => ctx.collect( SensorReading(t._1, curTime, t._2) )
      )
      // Wait 100 ms between batches
      Thread.sleep(100)
    }
  }
}
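The file-based sources here and in the sink examples below read sensor.txt, whose contents are not included in these listings. Given the split(",") parsing into id, timestamp and temperature used throughout, an illustrative (made-up) sensor.txt could look like:

  sensor_1,1547718199,35.8
  sensor_6,1547718201,15.4
  sensor_7,1547718202,6.7
  sensor_10,1547718205,38.1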

TransformTest.scala

package com.atguigu.apitest

import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 11:41
  */

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the input
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // Transform operations
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
        }
      )

    // 1. Aggregation
    val stream1 = dataStream
      .keyBy("id")
      // .sum("temperature")
      .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )

    // 2. Split the stream by whether the temperature exceeds 30 degrees
    val splitStream = dataStream
      .split( sensorData => {
        if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
      } )
    val highTempStream = splitStream.select("high")
    val lowTempStream = splitStream.select("low")
    val allTempStream = splitStream.select("high", "low")

    // 3. Combine two streams
    val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
    val connectedStreams = warningStream.connect(lowTempStream)
    val coMapStream = connectedStreams.map(
      warningData => ( warningData._1, warningData._2, "high temperature warning" ),
      lowData => ( lowData.id, "healthy" )
    )
    val unionStream = highTempStream.union(lowTempStream)

    // Function classes
    dataStream.filter( new MyFilter() ).print()

    // Output
    // dataStream.print()
    // highTempStream.print("high")
    // lowTempStream.print("low")
    // allTempStream.print("all")
    // unionStream.print("union")

    env.execute("transform test job")
  }
}

class MyFilter() extends FilterFunction[SensorReading]{
  override def filter(value: SensorReading): Boolean = {
    value.id.startsWith("sensor_1")
  }
}

class MyMapper() extends RichMapFunction[SensorReading, String]{
  override def map(value: SensorReading): String = {
    "flink"
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)
}

WindowTest.scala

package com.atguigu.apitest

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/18 9:31
  */

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(500)

    // Read the input
    // val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val inputStream = env.socketTextStream("localhost", 7777)

    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        }
      )
      // .assignAscendingTimestamps(_.timestamp * 1000L)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
      // .assignTimestampsAndWatermarks( new MyAssigner() )
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      // .process( new MyProcess() )
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((result, data) => (data._1, result._2.min(data._2))) // minimum temperature within the 10-second window

    dataStream.print()

    env.execute("window api test")
  }
}

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
  // Fixed delay of 3 seconds
  val bound: Long = 3 * 1000L
  // Largest timestamp seen so far
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timestamp * 1000L)
    element.timestamp * 1000L
  }
}

class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
  val bound: Long = 1000L

  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
    if( lastElement.id == "sensor_1" ){
      new Watermark(extractedTimestamp - bound)
    }else{
      null
    }
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    element.timestamp * 1000L
  }
}

SideOutputTest.scala

package com.atguigu.apitest

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/8/24 11:16
  */

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("localhost", 7777)

    val dataStream = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream
      .process( new FreezingAlert() )

    // dataStream.print("input data")
    processedStream.print("processed data")
    processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")

    env.execute("side output test")
  }
}

// Freezing alert: if the temperature is below 32 F, emit an alert message to a side output
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{
  // lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )

  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if( value.temperature < 32.0 ){
      ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
    }
    out.collect( value )
  }
}

ProcessFunctionTest.scala

package com.atguigu.apitest

import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/8/24 10:14
  */

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Checkpointing and restart-strategy configuration
    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(100000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
    // env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))
    // env.setStateBackend( new RocksDBStateBackend("") )

    val stream = env.socketTextStream("localhost", 7777)

    val dataStream = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream.keyBy(_.id)
      .process( new TempIncreAlert() )

    val processedStream2 = dataStream.keyBy(_.id)
      // .process( new TempChangeAlert(10.0) )
      .flatMap( new TempChangeAlert(10.0) )

    val processedStream3 = dataStream.keyBy(_.id)
      .flatMapWithState[(String, Double, Double), Double]{
        // No state yet (no data seen for this key): just store the current temperature
        case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
        // Otherwise compare with the last temperature; emit an alert if the difference exceeds the threshold
        case ( input: SensorReading, lastTemp: Some[Double] ) =>
          val diff = ( input.temperature - lastTemp.get ).abs
          if( diff > 10.0 ){
            ( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
          } else
            ( List.empty, Some(input.temperature) )
      }

    dataStream.print("input data")
    processedStream3.print("processed data")

    env.execute("process function test")
  }
}

class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{
  // State holding the previous temperature
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
  // State holding the timestamp of the registered timer
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // Read the previous temperature
    val preTemp = lastTemp.value()
    // Update the stored temperature
    lastTemp.update( value.temperature )

    val curTimerTs = currentTimer.value()
    if( value.temperature < preTemp || preTemp == 0.0 ){
      // Temperature dropped, or this is the first reading: delete the timer and clear the state
      ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
      currentTimer.clear()
    } else if ( value.temperature > preTemp && curTimerTs == 0 ){
      // Temperature rose and no timer is registered yet: register one
      val timerTs = ctx.timerService().currentProcessingTime() + 5000L
      ctx.timerService().registerProcessingTimeTimer( timerTs )
      currentTimer.update( timerTs )
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Emit the alert
    out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
    currentTimer.clear()
  }
}

class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{
  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // Declare the state variable during initialization
    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  }

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Read the previous temperature
    val lastTemp = lastTempState.value()
    // Emit an alert if the difference to the current temperature exceeds the threshold
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}

class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
  // State variable holding the previous temperature
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
    // Read the previous temperature
    val lastTemp = lastTempState.value()
    // Emit an alert if the difference to the current temperature exceeds the threshold
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}

EsSinkTest.scala

package com.atguigu.apitest.sinktest

import java.util

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:27
  */

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
        }
      )

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Create an Elasticsearch sink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          println("saving data: " + element)
          // Wrap the reading in a Map (or a JSON object)
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", element.id)
          json.put("temperature", element.temperature.toString)
          json.put("ts", element.timestamp.toString)
          // Create the index request that carries the data
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("readingdata")
            .source(json)
          // Hand the request to the indexer, which writes the data
          indexer.add(indexRequest)
          println("data saved.")
        }
      }
    )

    // sink
    dataStream.addSink( esSinkBuilder.build() )

    env.execute("es sink test")
  }
}

JdbcSinkTest.scala

package com.atguigu.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.atguigu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:44
  */

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        }
      )

    // sink
    dataStream.addSink( new MyJdbcSink() )

    env.execute("jdbc sink test")
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // Open the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }

  // Execute the SQL statements for each record
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update did not touch any row, insert instead
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // Clean up on close
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
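Note: MyJdbcSink assumes a MySQL database named test reachable at localhost:3306 with a temperatures table whose columns match the prepared statements. The table definition is not part of the original listing, but something like `CREATE TABLE temperatures (sensor VARCHAR(20), temp DOUBLE);` would fit the INSERT and UPDATE statements above.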

KafkaSinkTest.scala

package com.atguigu.apitest.sinktest

import java.util.Properties

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 15:43
  */

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    // val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          // Convert to String so the record can be serialized for output
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString
        }
      )

    // sink
    dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
    dataStream.print()

    env.execute("kafka sink test")
  }
}

RedisSinkTest.scala

package com.atguigu.apitest.sinktest

import com.atguigu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:12
  */

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
        }
      )

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // sink
    dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

    env.execute("redis sink test")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading]{
  // Redis command used to store the data
  override def getCommandDescription: RedisCommandDescription = {
    // Store sensor id and temperature in a hash: HSET key field value
    new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
  }

  // Value written to Redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString

  // Key written to Redis
  override def getKeyFromData(t: SensorReading): String = t.id
}