Source is where the data comes from, and Transform is the actual work Flink does: a series of operations on that data. Once the computation is finished, the result is sent by a Sink to some destination (which can be MySQL, Elasticsearch, Kafka, Cassandra, and so on). That said, "sink" does not necessarily mean storing the data somewhere; the term Connector used in the official documentation describes the destination better, and Connectors exist for MySQL, Elasticsearch, Kafka, Cassandra, RabbitMQ, and more.
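To make the three stages concrete, here is a minimal sketch of a Source -> Transform -> Sink pipeline. The class name, the element values, and the job name are purely illustrative and not taken from the original post.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class SourceTransformSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink", "sink", "connector")   // Source: where the data comes from
           .map(String::toUpperCase)                     // Transform: what Flink does with the data
           .addSink(new PrintSinkFunction<>());          // Sink: where the result goes

        env.execute("source-transform-sink-sketch");
    }
}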
1. Taking PrintSinkFunction as an example
1.1 Implementation
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
 * {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
 * taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
 * output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
As you can see, it simply extends the RichSinkFunction abstract class and overrides the invoke method; here invoke just prints the record and does nothing else.
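A custom sink follows the same pattern: extend RichSinkFunction, open resources in open, write each record in invoke, and release resources in close. The sketch below is illustrative only; the class name and the "print to stdout" behaviour are assumptions, not part of the Flink source shown above.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical custom sink; names and behaviour are illustrative only.
public class LoggingSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Acquire external resources (connections, clients, ...) here.
    }

    @Override
    public void invoke(IN value, Context context) throws Exception {
        // Called once per record; replace with a write to your external system.
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // Release resources here.
        super.close();
    }
}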
1.2 Usage
package com.wells.flink.demo.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;

/**
 * Description
 * Created by wells on 2020-05-07 23:31:31
 */
public class PrintSinkTest {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = 8090;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream(host, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);

        // counts.print().setParallelism(1);
        // This has the same effect as print() above
        counts.addSink(new PrintSinkFunction<>());

        env.execute();
    }
}
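To try it out locally, first start a plain text server on port 8090 (for example with nc -lk 8090, or nc -l 8090 on macOS; the exact command depends on your netcat variant and is not part of the original post), then run the job and type a few lines of words into the netcat session. The running word counts are printed to standard out by the sink.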
The sinks Flink currently provides out of the box on DataStream are: writeAsText(), writeAsCsv(), print() / printToErr(), writeUsingOutputFormat(), writeToSocket(), and addSink(), which is the entry point for custom and connector-provided sinks.
Connector support (each connector requires its own dependency package on the classpath) includes Apache Kafka, Apache Cassandra, Elasticsearch, Hadoop FileSystem, RabbitMQ, Apache NiFi, and others, plus community connectors such as Redis and ActiveMQ from Apache Bahir.
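As an illustration of a connector-based sink, the sketch below wires a word-count stream like the one from section 1.2 to Kafka. It is only a sketch: it assumes the flink-connector-kafka dependency is on the classpath, and the broker address and topic name are made-up values.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkSketch {

    // Illustrative only: broker address and topic name are assumptions.
    public static void addKafkaSink(DataStream<Tuple2<String, Integer>> counts) {
        counts
            .map(new MapFunction<Tuple2<String, Integer>, String>() {
                @Override
                public String map(Tuple2<String, Integer> value) throws Exception {
                    return value.f0 + "," + value.f1;   // serialize each count as "word,count"
                }
            })
            .addSink(new FlinkKafkaProducer<String>(
                "127.0.0.1:9092",            // Kafka bootstrap servers (assumed)
                "word-counts",               // target topic (assumed)
                new SimpleStringSchema()));  // how records are turned into bytes
    }
}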
