hello.txt / sensor.txt (sample data files)
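(The contents of these two resource files were not captured in this listing. Judging from how the jobs below parse them, illustrative samples would look like the following; the sensor values mirror the fromCollection data in SourceTest.scala.)
hello.txt:
hello world
hello flink
how are you
sensor.txt:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1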
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>FlinkTutorial</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- This plugin compiles Scala code into class files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<!-- Bind to Maven's compile phase (the compile goal is needed so src/main/scala is built as well) -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
WordCount.scala
package com.atguigu.wc
import org.apache.flink.api.scala._
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.wc
* Version: 1.0
*
* Created by wushengran on 2019/9/16 11:48
*/
// Batch processing word count
object WordCount {
def main(args: Array[String]): Unit = {
// Create a batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Read data from a file
val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
val inputDataSet = env.readTextFile(inputPath)
// Split each line into words, then count
val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
.map( (_, 1) )
.groupBy(0)
.sum(1)
// Print the result
wordCountDataSet.print()
}
}
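Against the sample hello.txt above, the job prints one (word, count) tuple per distinct word, e.g. (hello,2), (world,1), (flink,1); exact values depend on the actual file contents.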
StreamWordCount.scala
package com.atguigu.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.wc
* Version: 1.0
*
* Created by wushengran on 2019/9/16 14:08
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
val params = ParameterTool.fromArgs(args)
val host: String = params.get("host")
val port: Int = params.getInt("port")
// Create a streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
// env.disableOperatorChaining()
// Receive a socket text stream
val textDataStream = env.socketTextStream(host, port)
// Read the data line by line, split into words, and count
val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
.filter(_.nonEmpty).startNewChain()
.map( (_, 1) )
.keyBy(0)
.sum(1)
// Print the result
wordCountDataStream.print().setParallelism(1)
// Launch the job
env.execute("stream word count job")
}
}
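To try this locally, start a socket text server first (for example, nc -lk 7777 on Linux/macOS), then launch the job with the arguments --host localhost --port 7777; ParameterTool.fromArgs reads both values off the command line. Every line typed into the socket is split on whitespace, and the running counts are printed as they update.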
SourceTest.scala
package com.atguigu.apitest
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 10:11
*/
// Case class for sensor readings
case class SensorReading( id: String, timestamp: Long, temperature: Double )
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1. Read data from a collection
val stream1 = env.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.80018327300259),
SensorReading("sensor_6", 1547718201, 15.402984393403084),
SensorReading("sensor_7", 1547718202, 6.720945201171228),
SensorReading("sensor_10", 1547718205, 38.101067604893444)
))
// env.fromElements("flink", 1, 32, 3213, 0.324).print("test")
// 2. Read data from a file
val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// 3. Read data from Kafka
// Set up the Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// 4. Custom source
val stream4 = env.addSource(new SensorSource())
// Sink: print to stdout
stream4.print("stream4")
env.execute("source api test")
}
}
class SensorSource() extends SourceFunction[SensorReading]{
// Flag indicating whether the source is still running
var running: Boolean = true
override def cancel(): Unit = running = false
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// Create a random number generator
val rand = new Random()
// Randomly initialize temperatures for 10 sensors; the stream then fluctuates randomly around these values
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
)
// Generate stream data in an infinite loop until cancelled
while(running){
// Update the temperature values
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian())
)
// Get the current timestamp
val curTime = System.currentTimeMillis()
// Wrap into SensorReading and emit
curTemp.foreach(
t => ctx.collect( SensorReading(t._1, curTime, t._2) )
)
// Sleep 100 ms between emissions
Thread.sleep(100)
}
}
}
TransformTest.scala
package com.atguigu.apitest
import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 11:41
*/
object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Read input
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// Transform operations
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
// 1. Aggregations
val stream1 = dataStream
.keyBy("id")
// .sum("temperature")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )
// 2. Split the stream by whether the temperature exceeds 30 degrees
val splitStream = dataStream
.split( sensorData => {
if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
} )
val highTempStream = splitStream.select("high")
val lowTempStream = splitStream.select("low")
val allTempStream = splitStream.select("high", "low")
// 3. Combine two streams
val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
val connectedStreams = warningStream.connect(lowTempStream)
val coMapStream = connectedStreams.map(
warningData => ( warningData._1, warningData._2, "high temperature warning" ),
lowData => ( lowData.id, "healthy" )
)
val unionStream = highTempStream.union(lowTempStream)
// Function classes
dataStream.filter( new MyFilter() ).print()
// Output
// dataStream.print()
// highTempStream.print("high")
// lowTempStream.print("low")
// allTempStream.print("all")
// unionStream.print("union")
env.execute("transform test job")
}
}
class MyFilter() extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith("sensor_1")
}
}
class MyMapper() extends RichMapFunction[SensorReading, String]{
override def map(value: SensorReading): String = {
"flink"
}
override def open(parameters: Configuration): Unit = super.open(parameters)
}
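Note: the split/select API used above was deprecated in later Flink releases (and eventually removed) in favor of side outputs; SideOutputTest.scala below shows the ProcessFunction-based alternative.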
WindowTest.scala
package com.atguigu.apitest
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest
* Version: 1.0
*
* Created by wushengran on 2019/9/18 9:31
*/
object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(500)
// Read input
// val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
val inputStream = env.socketTextStream("localhost", 7777)
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// .assignAscendingTimestamps(_.timestamp * 1000L)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// .assignTimestampsAndWatermarks( new MyAssigner() )
.map(data => (data.id, data.temperature))
.keyBy(_._1)
// .process( new MyProcess() )
.timeWindow(Time.seconds(10), Time.seconds(3))
.reduce((result, data) => (data._1, result._2.min(data._2))) // minimum temperature within the last 10 seconds (window slides every 3 s)
dataStream.print()
env.execute("window api test")
}
}
class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
// Fixed delay: 3 seconds
val bound: Long = 3 * 1000L
// Maximum timestamp seen so far
var maxTs: Long = Long.MinValue
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
maxTs = maxTs.max(element.timestamp * 1000L)
element.timestamp * 1000L
}
}
class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
val bound: Long = 1000L
override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
if( lastElement.id == "sensor_1" ){
new Watermark(extractedTimestamp - bound)
}else{
null
}
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
element.timestamp * 1000L
}
}
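A quick worked example of MyAssigner: with bound = 3000 ms, after readings with timestamps 1547718199 and 1547718201 (in seconds) arrive, maxTs is 1547718201000 ms, so getCurrentWatermark returns 1547718201000 - 3000 = 1547718198000. Flink calls getCurrentWatermark at the configured auto-watermark interval (500 ms in WindowTest), and an event-time window fires once the watermark passes its end time.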
SideOutputTest.scala
package com.atguigu.apitest
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest
* Version: 1.0
*
* Created by wushengran on 2019/8/24 11:16
*/
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("localhost", 7777)
val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )
val processedStream = dataStream
.process( new FreezingAlert() )
// dataStream.print("input data")
processedStream.print("processed data")
processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")
env.execute("side output test")
}
}
// Freezing alert: if the temperature is below 32F, emit an alert message to a side output stream
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{
// lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if( value.temperature < 32.0 ){
ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
}
out.collect( value )
}
}
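The OutputTag is constructed twice here, once inside processElement and once when calling getSideOutput; this works because tags with the same name refer to the same side output, though holding a single tag in a field (as in the commented-out lazy val) is the more common style.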
ProcessFunctionTest.scala
package com.atguigu.apitest
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest
* Version: 1.0
*
* Created by wushengran on 2019/8/24 10:14
*/
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(100000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))
// env.setStateBackend( new RocksDBStateBackend("") )
val stream = env.socketTextStream("localhost", 7777)
val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )
val processedStream = dataStream.keyBy(_.id)
.process( new TempIncreAlert() )
val processedStream2 = dataStream.keyBy(_.id)
// .process( new TempChangeAlert(10.0) )
.flatMap( new TempChangeAlert(10.0) )
val processedStream3 = dataStream.keyBy(_.id)
.flatMapWithState[(String, Double, Double), Double]{
// No state yet (no data has arrived for this key): store the current temperature
case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
// Otherwise compare against the last temperature; if the difference exceeds the threshold, emit an alert
case ( input: SensorReading, lastTemp: Some[Double] ) =>
val diff = ( input.temperature - lastTemp.get ).abs
if( diff > 10.0 ){
( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
} else
( List.empty, Some(input.temperature) )
}
dataStream.print("input data")
processedStream3.print("processed data")
env.execute("process function test")
}
}
class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{
// State holding the previous reading's temperature
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
// State holding the timestamp of the registered timer
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// Fetch the previous temperature
val preTemp = lastTemp.value()
// Update the stored temperature
lastTemp.update( value.temperature )
val curTimerTs = currentTimer.value()
if( value.temperature < preTemp || preTemp == 0.0 ){
// If the temperature dropped, or this is the first reading, delete the timer and clear its state
ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
currentTimer.clear()
} else if ( value.temperature > preTemp && curTimerTs == 0 ){
// Temperature rose and no timer is registered yet: register one
val timerTs = ctx.timerService().currentProcessingTime() + 5000L
ctx.timerService().registerProcessingTimeTimer( timerTs )
currentTimer.update( timerTs )
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// Emit the alert
out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
currentTimer.clear()
}
}
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// Declare the state handle during initialization
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
}
override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
// Get the last temperature
val lastTemp = lastTempState.value()
// Diff the current temperature against the last one; if it exceeds the threshold, emit an alert
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
// State variable holding the last temperature
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
// Get the last temperature
val lastTemp = lastTempState.value()
// Diff the current temperature against the last one; if it exceeds the threshold, emit an alert
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
EsSinkTest.scala
package com.atguigu.apitest.sinktest
import java.util
import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest.sinktest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 16:27
*/
object EsSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
// Create a builder for the Elasticsearch sink
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
println("saving data: " + element)
// Wrap the data in a Map (or JsonObject)
val json = new util.HashMap[String, String]()
json.put("sensor_id", element.id)
json.put("temperature", element.temperature.toString)
json.put("ts", element.timestamp.toString)
// Create an index request, ready to send the data
val indexRequest = Requests.indexRequest()
.index("sensor")
.`type`("readingdata")
.source(json)
// Hand the request to the indexer to write the data
indexer.add(indexRequest)
println("data saved.")
}
}
)
// sink
dataStream.addSink( esSinkBuilder.build() )
env.execute("es sink test")
}
}
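With a local Elasticsearch 6.x node on port 9200, the written documents can be checked with a search request against the sensor index (e.g. GET http://localhost:9200/sensor/_search).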
JdbcSinkTest.scala
package com.atguigu.apitest.sinktest
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.atguigu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest.sinktest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 16:44
*/
object JdbcSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// sink
dataStream.addSink( new MyJdbcSink() )
env.execute("jdbc sink test")
}
}
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// JDBC connection and prepared statements
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// Initialization: create the connection and prepared statements
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// Execute the SQL for each record
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
// Run the update statement first
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// If the update matched no rows, run the insert instead
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
// Clean up on close
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
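MyJdbcSink assumes a locally running MySQL with a database named test and a table roughly like temperatures(sensor VARCHAR, temp DOUBLE), inferred here from the INSERT/UPDATE statements above; the update-then-insert pattern keeps exactly one row per sensor holding its latest temperature.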
KafkaSinkTest.scala
package com.atguigu.apitest.sinktest
import java.util.Properties
import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest.sinktest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 15:43
*/
object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
// val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// Transform operations
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // convert to String for easy serialization
}
)
// sink
dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
dataStream.print()
env.execute("kafka sink test")
}
}
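With a local broker running, feed the job through a console producer on topic sensor and watch the results with a console consumer on topic sinkTest. The producer here reuses the consumer's Properties object; the consumer-only keys (group.id, the deserializers, auto.offset.reset) are simply ignored on the producer side, so effectively only bootstrap.servers is used.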
RedisSinkTest.scala
package com.atguigu.apitest.sinktest
import com.atguigu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: FlinkTutorial
* Package: com.atguigu.apitest.sinktest
* Version: 1.0
*
* Created by wushengran on 2019/9/17 16:12
*/
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()
// sink
dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )
env.execute("redis sink test")
}
}
class MyRedisMapper() extends RedisMapper[SensorReading]{
// Define the Redis command used to save data
override def getCommandDescription: RedisCommandDescription = {
// Save sensor id and temperature in a hash table: HSET key field value
new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
}
// Define the value saved to Redis
override def getValueFromData(t: SensorReading): String = t.temperature.toString
// Define the key saved to Redis
override def getKeyFromData(t: SensorReading): String = t.id
}
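After a run, all readings land in a single Redis hash, so redis-cli HGETALL sensor_temperature lists each sensor id alongside its latest temperature; because HSET overwrites the field on every write, the hash always reflects the most recent reading per sensor.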