hello.txt / sensor.txt data files
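The contents of the two data files are not included in the listing. Judging from how they are parsed (WordCount splits hello.txt on spaces; the sensor examples split sensor.txt on commas into id, timestamp and temperature), they look roughly as follows. The hello.txt lines are illustrative placeholders; the sensor.txt lines reuse the readings from the fromCollection example in SourceTest.scala.

hello.txt (illustrative example):

hello world
hello flink
hello scala

sensor.txt (illustrative example):

sensor_1,1547718199,35.80018327300259
sensor_6,1547718201,15.402984393403084
sensor_7,1547718202,6.720945201171228
sensor_10,1547718205,38.101067604893444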
pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
WordCount.scala
package com.atguigu.wc

import org.apache.flink.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/16 11:48
  */

// Batch word count
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the data from a file
    val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)

    // Split into words, then count
    val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
      .map( (_, 1) )
      .groupBy(0)
      .sum(1)

    // Print the result
    wordCountDataSet.print()
  }
}
StreamWordCount.scala
package com.atguigu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/16 14:08
  */

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // Create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(1)
//    env.disableOperatorChaining()

    // Receive the socket text stream
    val textDataStream = env.socketTextStream(host, port)

    // Read the data line by line, split into words and count
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map( (_, 1) )
      .keyBy(0)
      .sum(1)

    // Print the result
    wordCountDataStream.print().setParallelism(1)

    // Execute the job
    env.execute("stream word count job")
  }
}
SourceTest.scala
package com.atguigu.apitest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 10:11
  */

// Case class for sensor readings
case class SensorReading( id: String, timestamp: Long, temperature: Double )

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
//    env.fromElements("flink", 1, 32, 3213, 0.324).print("test")

    // 2. Read data from a file
    val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // 3. Read data from Kafka
    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // 4. Custom source
    val stream4 = env.addSource(new SensorSource())

    // Sink: print the output
    stream4.print("stream4")

    env.execute("source api test")
  }
}

class SensorSource() extends SourceFunction[SensorReading]{
  // Flag indicating whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Create a random number generator
    val rand = new Random()

    // Randomly initialize temperatures for 10 sensors; the stream then fluctuates randomly around them
    var curTemp = 1.to(10).map(
      i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
    )

    // Produce data in an endless loop until the source is cancelled
    while(running){
      // Update the temperature values
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      // Get the current timestamp
      val curTime = System.currentTimeMillis()
      // Wrap each value into a SensorReading and emit it
      curTemp.foreach(
        t => ctx.collect( SensorReading(t._1, curTime, t._2) )
      )
      // Wait 100 ms between emissions
      Thread.sleep(100)
    }
  }
}
TransformTest.scala
package com.atguigu.apitest

import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 11:41
  */

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the input
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // Transform operations
    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      })

    // 1. Aggregations
    val stream1 = dataStream.keyBy("id")
//      .sum("temperature")
      .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )

    // 2. Split the stream by whether the temperature exceeds 30 degrees
    val splitStream = dataStream.split( sensorData => {
      if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
    } )
    val highTempStream = splitStream.select("high")
    val lowTempStream = splitStream.select("low")
    val allTempStream = splitStream.select("high", "low")

    // 3. Merge two streams
    val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
    val connectedStreams = warningStream.connect(lowTempStream)
    val coMapStream = connectedStreams.map(
      warningData => ( warningData._1, warningData._2, "high temperature warning" ),
      lowData => ( lowData.id, "healthy" )
    )
    val unionStream = highTempStream.union(lowTempStream)

    // Function classes
    dataStream.filter( new MyFilter() ).print()

    // Print the outputs
//    dataStream.print()
//    highTempStream.print("high")
//    lowTempStream.print("low")
//    allTempStream.print("all")
//    unionStream.print("union")

    env.execute("transform test job")
  }
}

class MyFilter() extends FilterFunction[SensorReading]{
  override def filter(value: SensorReading): Boolean = {
    value.id.startsWith("sensor_1")
  }
}

class MyMapper() extends RichMapFunction[SensorReading, String]{
  override def map(value: SensorReading): String = {
    "flink"
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)
}
WindowTest.scala
package com.atguigu.apitest

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/18 9:31
  */

object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(500)

    // Read the input
//    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val inputStream = env.socketTextStream("localhost", 7777)

    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
//      .assignAscendingTimestamps(_.timestamp * 1000L)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
//      .assignTimestampsAndWatermarks( new MyAssigner() )
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
//      .process( new MyProcess() )
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((result, data) => (data._1, result._2.min(data._2))) // minimum temperature within the 10-second window

    dataStream.print()

    env.execute("window api test")
  }
}

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
  // Fixed delay of 3 seconds
  val bound: Long = 3 * 1000L
  // Maximum timestamp seen so far
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timestamp * 1000L)
    element.timestamp * 1000L
  }
}

class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
  val bound: Long = 1000L

  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
    if( lastElement.id == "sensor_1" ){
      new Watermark(extractedTimestamp - bound)
    } else {
      null
    }
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    element.timestamp * 1000L
  }
}
SideOutputTest.scala
package com.atguigu.apitest

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/8/24 11:16
  */

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env.socketTextStream("localhost", 7777)

    val dataStream = stream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream.process( new FreezingAlert() )

//    dataStream.print("input data")
    processedStream.print("processed data")
    processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")

    env.execute("side output test")
  }
}

// Freezing alert: readings below 32F emit an alert message to a side output
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{
//  lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )

  override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
    if( value.temperature < 32.0 ){
      ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
    }
    out.collect( value )
  }
}
ProcessFunctionTest.scala
package com.atguigu.apitest

import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest
  * Version: 1.0
  *
  * Created by wushengran on 2019/8/24 10:14
  */

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(100000)
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
//    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))

//    env.setStateBackend( new RocksDBStateBackend("") )

    val stream = env.socketTextStream("localhost", 7777)

    val dataStream = stream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream.keyBy(_.id)
      .process( new TempIncreAlert() )

    val processedStream2 = dataStream.keyBy(_.id)
//      .process( new TempChangeAlert(10.0) )
      .flatMap( new TempChangeAlert(10.0) )

    val processedStream3 = dataStream.keyBy(_.id)
      .flatMapWithState[(String, Double, Double), Double]{
        // If there is no state yet (no data has arrived), just store the current temperature in the state
        case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
        // If there is state, compare with the previous temperature and emit an alert if the difference exceeds the threshold
        case ( input: SensorReading, lastTemp: Some[Double] ) =>
          val diff = ( input.temperature - lastTemp.get ).abs
          if( diff > 10.0 ){
            ( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
          } else
            ( List.empty, Some(input.temperature) )
      }

    dataStream.print("input data")
    processedStream3.print("processed data")

    env.execute("process function test")
  }
}

class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{
  // State holding the previous temperature value
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
  // State holding the timestamp of the registered timer
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    // Fetch the previous temperature
    val preTemp = lastTemp.value()
    // Update the state with the current temperature
    lastTemp.update( value.temperature )

    val curTimerTs = currentTimer.value()

    if( value.temperature < preTemp || preTemp == 0.0 ){
      // If the temperature dropped, or this is the first reading, delete the timer and clear the state
      ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
      currentTimer.clear()
    } else if ( value.temperature > preTemp && curTimerTs == 0 ){
      // If the temperature rose and no timer has been registered yet, register one
      val timerTs = ctx.timerService().currentProcessingTime() + 5000L
      ctx.timerService().registerProcessingTimeTimer( timerTs )
      currentTimer.update( timerTs )
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Emit the alert message
    out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
    currentTimer.clear()
  }
}

class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{

  private var lastTempState: ValueState[Double] = _

  override def open(parameters: Configuration): Unit = {
    // Declare the state variable during initialization
    lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  }

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    // Fetch the previous temperature
    val lastTemp = lastTempState.value()
    // Compare the current temperature with the previous one; if the difference exceeds the threshold, emit an alert
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}

class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
  // State variable holding the previous temperature value
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
    // Fetch the previous temperature
    val lastTemp = lastTempState.value()
    // Compare the current temperature with the previous one; if the difference exceeds the threshold, emit an alert
    val diff = (value.temperature - lastTemp).abs
    if(diff > threshold){
      out.collect( (value.id, lastTemp, value.temperature) )
    }
    lastTempState.update(value.temperature)
  }
}
EsSinkTest.scala
package com.atguigu.apitest.sinktest

import java.util

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:27
  */

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Create an ElasticsearchSink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("saving data: " + element)
        // Wrap the data into a Map (or a JsonObject)
        val json = new util.HashMap[String, String]()
        json.put("sensor_id", element.id)
        json.put("temperature", element.temperature.toString)
        json.put("ts", element.timestamp.toString)
        // Create the index request that carries the data
        val indexRequest = Requests.indexRequest().index("sensor").`type`("readingdata").source(json)
        // Add the request to the indexer to write the data
        indexer.add(indexRequest)
        println("data saved.")
      }
    })

    // sink
    dataStream.addSink( esSinkBuilder.build() )

    env.execute("es sink test")
  }
}
JdbcSinkTest.scala
package com.atguigu.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.atguigu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:44
  */

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // sink
    dataStream.addSink( new MyJdbcSink() )

    env.execute("jdbc sink test")
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading]{
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // Initialization: create the connection and the prepared statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }

  // Use the connection to execute the SQL
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Run the update statement first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update did not match any row, run the insert statement
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // Clean up when the sink is closed
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
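MyJdbcSink assumes the test database already contains a temperatures table with a sensor column and a temp column; the table definition itself is not part of the listing. A minimal one-off setup program along the following lines could create it. The object name CreateTemperaturesTable and the column types VARCHAR(20)/DOUBLE are assumptions for illustration, not part of the tutorial.

package com.atguigu.apitest.sinktest

import java.sql.DriverManager

object CreateTemperaturesTable {
  def main(args: Array[String]): Unit = {
    // Reuse the same connection settings as MyJdbcSink.open()
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    val stmt = conn.createStatement()
    // Assumed schema: sensor id plus the latest temperature as a double
    stmt.execute("CREATE TABLE IF NOT EXISTS temperatures (sensor VARCHAR(20), temp DOUBLE)")
    stmt.close()
    conn.close()
  }
}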
KafkaSinkTest.scala
package com.atguigu.apitest.sinktest

import java.util.Properties

import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 15:43
  */

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
//    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Transform operations
    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // convert to String for easy serialization
      })

    // sink
    dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
    dataStream.print()

    env.execute("kafka sink test")
  }
}
RedisSinkTest.scala
package com.atguigu.apitest.sinktest

import com.atguigu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.sinktest
  * Version: 1.0
  *
  * Created by wushengran on 2019/9/17 16:12
  */

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream.map(
      data => {
        val dataArray = data.split(",")
        SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
      })

    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

    // sink
    dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

    env.execute("redis sink test")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading]{
  // The redis command used to save the data
  override def getCommandDescription: RedisCommandDescription = {
    // Store sensor id and temperature in a hash: HSET key field value
    new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
  }

  // The value written to redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString

  // The key written to redis
  override def getKeyFromData(t: SensorReading): String = t.id
}
