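Source is where the data comes from. Transform is the work Flink itself does: a series of operations on the data. Once the computation is finished, the results are written (sinked) to a destination such as MySQL, ElasticSearch, Kafka, or Cassandra. A sink does not necessarily have to store the data somewhere, though. The term the official documentation uses, Connector, describes the destination more accurately; connectors exist for MySQL, ElasticSearch, Kafka, Cassandra, RabbitMQ, and others.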
I. PrintSinkFunction as an Example
1.1 Implementation
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 *   {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1
 *   {@code sinkIdentifier}> output        <- {@code sinkIdentifier} provided, parallelism == 1
 *   taskId> output                        <- no {@code sinkIdentifier} provided, parallelism > 1
 *   output                                <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
As you can see, it simply extends the RichSinkFunction abstract class and overrides the invoke method. Here invoke just prints each record and does nothing else.
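The four output formats listed in the Javadoc above can be illustrated without any Flink dependency. The following is a minimal sketch, not Flink's actual PrintSinkOutputWriter: the class name PrintPrefixSketch and its methods are hypothetical, and the 1-based task id (subtask index plus one) is an assumption about how the id is displayed.

```java
/**
 * Simplified, Flink-free model of the prefix rules described in the
 * PrintSinkFunction Javadoc. All names here are hypothetical.
 */
final class PrintPrefixSketch {

    /** Builds the prefix that is placed in front of every printed record. */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int parallelism) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier == null ? "" : sinkIdentifier);
        if (parallelism > 1) {
            // With several parallel subtasks, the task id is appended,
            // separated from the identifier (if any) by a colon.
            if (prefix.length() > 0) {
                prefix.append(':');
            }
            // Assumption: the displayed task id is 1-based.
            prefix.append(subtaskIndex + 1);
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    /** Formats one output line: prefix followed by the record's string form. */
    static String format(String prefix, Object record) {
        return prefix + record;
    }

    public static void main(String[] args) {
        // One line per format option from the Javadoc.
        System.out.println(format(buildPrefix("wc", 0, 4), "(hello,1)"));
        System.out.println(format(buildPrefix("wc", 0, 1), "(hello,1)"));
        System.out.println(format(buildPrefix("", 2, 4), "(hello,1)"));
        System.out.println(format(buildPrefix("", 0, 1), "(hello,1)"));
    }
}
```

Computing the prefix once (in Flink, during open) rather than per record keeps the per-record work in invoke down to a single write.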
1.2 Usage
package com.wells.flink.demo.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;

/**
 * Description
 * Created by wells on 2020-05-07 23:31:31
 */
public class PrintSinkTest {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = 8090;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read lines of text from a socket
        DataStreamSource<String> source = env.socketTextStream(host, port);

        // Transform: split each line into words, then keep a running count per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }).keyBy(0).sum(1);

        // counts.print().setParallelism(1);
        // Sink: this has the same effect as the print() call above
        counts.addSink(new PrintSinkFunction<>());

        env.execute();
    }
}
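To make it clearer what records reach the sink in the job above, here is a plain-Java sketch of the flatMap / keyBy / sum logic. It is only a single-threaded model under stated assumptions, not Flink code: the class RunningWordCountSketch and its process method are hypothetical, and the "(word,count)" string form is assumed to mirror how a Tuple2 is printed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Flink-free model of the word-count pipeline: for every incoming word,
 * the updated running count for that word is emitted downstream.
 */
final class RunningWordCountSketch {

    // Plays the role of the per-key state that keyBy(0).sum(1) maintains.
    private final Map<String, Integer> counts = new HashMap<>();

    /** Processes one input line and returns the records a print sink would receive. */
    List<String> process(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            // Increment the count for this word and emit the new total.
            int updated = counts.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCountSketch job = new RunningWordCountSketch();
        // Prints (hello,1), (flink,1), (hello,2), one per line.
        job.process("hello flink hello").forEach(System.out::println);
    }
}
```

In a real job running with parallelism greater than 1, each line would additionally carry a task id prefix, and the relative order of records for different words is not guaranteed.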
Flink currently supports the following sinks:
Connector support (each connector requires its own dependency to be imported):