Flink没有类似于spark中的foreach方法,让用户可以进行迭代操作。虽有对外的输出操作都要利用Sink完成。Sink的功能就是负责把flink处理后的数据输出到外部系统中。
Sink的操作,官方和第三方提供了多种sink源,如:dataStream.addSink( new Sink ) 即可给数据添加一个Sink。
package datasource
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
object SinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream: DataStreamSource[String] = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")
val dataStream:DataStream[SensorReading] = inputStream.map(data=>{
val rs = data.split(",")
SensorReading(rs(0).trim.toString,rs(1).trim.toLong,rs(2).trim.toDouble)
})
//第一种sink,直接打印在控制台
dataStream.print()
//第二种sink,写入到文件
dataStream.writeAsText("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\out.csv")
//第三种sink,写到文件
val sink: StreamingFileSink[SensorReading] = StreamingFileSink.forRowFormat(
new Path("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\output"),
new SimpleStringEncoder[SensorReading]("UTF-8")) // 所有数据都写到同一个路径
.build()
dataStream.addSink(sink)
//第四种sink,kafka
dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092","sinktest",new SimpleStringSchema()))
env.execute("Sink Test")
}
}
第三个Sink说明:StreamingFileSink 有两个方法可以输出到文件 forRowFormat 和 forBulkFormat,名字差不多代表的方法的含义:行编码格式和块编码格式;forRowFormat 比较简单,只提供了 SimpleStringEncoder 写文本文件,可以指定编码。
自定义Sink
/**
* 自定义的Sink
*/
class MyJDBCSink() extends RichSinkFunction{
//定义SQL转换、预编译
var conn : Connection = _
var insertStat : PreparedStatement = _
var updateStat : PreparedStatement = _
/**
* 初始化参数和预编译sql语句
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","user","password")
insertStat = conn.prepareStatement("")
updateStat = conn.prepareStatement("")
}
/**
* 调用连接器,执行sql
* @param value 可以指定数据类型(泛型)
* @param context
*/
def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStat.setDouble(1,value.temperature)
val i = updateStat.executeUpdate()
if(i == 0){
insertStat.setDouble(1,value.temperature)
insertStat.execute()
}
}
override def close(): Unit = {
insertStat.close()
updateStat.close()
conn.close()
}
}