Flink没有类似于spark中的foreach方法,让用户可以进行迭代操作。虽有对外的输出操作都要利用Sink完成。Sink的功能就是负责把flink处理后的数据输出到外部系统中。
Sink的操作,官方和第三方提供了多种sink源,如:dataStream.addSink( new Sink ) 即可给数据添加一个Sink。
package datasourceimport org.apache.flink.api.common.serialization.SimpleStringEncoderimport org.apache.flink.core.fs.Pathimport org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}import org.apache.flink.streaming.api.environment._import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkobject SinkDemo {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val inputStream: DataStreamSource[String] = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")val dataStream:DataStream[SensorReading] = inputStream.map(data=>{val rs = data.split(",")SensorReading(rs(0).trim.toString,rs(1).trim.toLong,rs(2).trim.toDouble)})//第一种sink,直接打印在控制台dataStream.print()//第二种sink,写入到文件dataStream.writeAsText("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\out.csv")//第三种sink,写到文件val sink: StreamingFileSink[SensorReading] = StreamingFileSink.forRowFormat(new Path("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\output"),new SimpleStringEncoder[SensorReading]("UTF-8")) // 所有数据都写到同一个路径.build()dataStream.addSink(sink)//第四种sink,kafkadataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092","sinktest",new SimpleStringSchema()))env.execute("Sink Test")}}
第三个Sink说明:StreamingFileSink 有两个方法可以输出到文件 forRowFormat 和 forBulkFormat,名字差不多代表的方法的含义:行编码格式和块编码格式;forRowFormat 比较简单,只提供了 SimpleStringEncoder 写文本文件,可以指定编码。
自定义Sink
/*** 自定义的Sink*/class MyJDBCSink() extends RichSinkFunction{//定义SQL转换、预编译var conn : Connection = _var insertStat : PreparedStatement = _var updateStat : PreparedStatement = _/*** 初始化参数和预编译sql语句* @param parameters*/override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","user","password")insertStat = conn.prepareStatement("")updateStat = conn.prepareStatement("")}/*** 调用连接器,执行sql* @param value 可以指定数据类型(泛型)* @param context*/def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {updateStat.setDouble(1,value.temperature)val i = updateStat.executeUpdate()if(i == 0){insertStat.setDouble(1,value.temperature)insertStat.execute()}}override def close(): Unit = {insertStat.close()updateStat.close()conn.close()}}
