Flink没有类似于spark中的foreach方法,让用户可以进行迭代操作。虽有对外的输出操作都要利用Sink完成。Sink的功能就是负责把flink处理后的数据输出到外部系统中。
    Sink的操作,官方第三方提供了多种sink源,如:dataStream.addSink( new Sink ) 即可给数据添加一个Sink。

    1. package datasource
    2. import org.apache.flink.api.common.serialization.SimpleStringEncoder
    3. import org.apache.flink.core.fs.Path
    4. import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
    5. import org.apache.flink.streaming.api.environment._
    6. import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    7. object SinkDemo {
    8. def main(args: Array[String]): Unit = {
    9. val env = StreamExecutionEnvironment.getExecutionEnvironment
    10. env.setParallelism(1)
    11. val inputStream: DataStreamSource[String] = env.readTextFile("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\sensor.txt")
    12. val dataStream:DataStream[SensorReading] = inputStream.map(data=>{
    13. val rs = data.split(",")
    14. SensorReading(rs(0).trim.toString,rs(1).trim.toLong,rs(2).trim.toDouble)
    15. })
    16. //第一种sink,直接打印在控制台
    17. dataStream.print()
    18. //第二种sink,写入到文件
    19. dataStream.writeAsText("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\out.csv")
    20. //第三种sink,写到文件
    21. val sink: StreamingFileSink[SensorReading] = StreamingFileSink.forRowFormat(
    22. new Path("C:\\ideaworkshop\\FlinkDemo\\LearnFirst\\src\\main\\resources\\output"),
    23. new SimpleStringEncoder[SensorReading]("UTF-8")) // 所有数据都写到同一个路径
    24. .build()
    25. dataStream.addSink(sink)
    26. //第四种sink,kafka
    27. dataStream.addSink(new FlinkKafkaProducer011[String]("localhost:9092","sinktest",new SimpleStringSchema()))
    28. env.execute("Sink Test")
    29. }
    30. }

    第三个Sink说明:StreamingFileSink 有两个方法可以输出到文件 forRowFormat 和 forBulkFormat,名字差不多代表的方法的含义:行编码格式和块编码格式;forRowFormat 比较简单,只提供了 SimpleStringEncoder 写文本文件,可以指定编码。

    自定义Sink

    1. /**
    2. * 自定义的Sink
    3. */
    4. class MyJDBCSink() extends RichSinkFunction{
    5. //定义SQL转换、预编译
    6. var conn : Connection = _
    7. var insertStat : PreparedStatement = _
    8. var updateStat : PreparedStatement = _
    9. /**
    10. * 初始化参数和预编译sql语句
    11. * @param parameters
    12. */
    13. override def open(parameters: Configuration): Unit = {
    14. super.open(parameters)
    15. conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","user","password")
    16. insertStat = conn.prepareStatement("")
    17. updateStat = conn.prepareStatement("")
    18. }
    19. /**
    20. * 调用连接器,执行sql
    21. * @param value 可以指定数据类型(泛型)
    22. * @param context
    23. */
    24. def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    25. updateStat.setDouble(1,value.temperature)
    26. val i = updateStat.executeUpdate()
    27. if(i == 0){
    28. insertStat.setDouble(1,value.temperature)
    29. insertStat.execute()
    30. }
    31. }
    32. override def close(): Unit = {
    33. insertStat.close()
    34. updateStat.close()
    35. conn.close()
    36. }
    37. }