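A Source is where the data comes from. Transform is the work Flink itself performs: a series of operations applied to that data. Once processing is done, the computed results are sent to a Sink. "Sink" does not necessarily mean the data has to be stored somewhere. The term Connector, which the official documentation uses, describes the destination more accurately. Connectors exist for MySQL, ElasticSearch, Kafka, Cassandra, RabbitMQ, and others.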
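To make the Source → Transform → Sink split concrete, here is a toy, single-threaded model in plain Java. It is only an analogy and not Flink's API. The names `ToyPipeline`, `Sink`, `ListSink` and `run` are hypothetical. The source is a fixed list, the transform is a plain `Function`, and the sink receives every result through an `invoke` call, mirroring the method name Flink sinks implement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Toy model of Source -> Transform -> Sink (hypothetical names, not Flink's API).
 */
public class ToyPipeline {

    /** Hypothetical stand-in for a sink: receives each result record. */
    interface Sink<T> {
        void invoke(T record);
    }

    /** A sink that "connects" to an in-memory list instead of MySQL, Kafka, etc. */
    static class ListSink<T> implements Sink<T> {
        final List<T> received = new ArrayList<>();

        @Override
        public void invoke(T record) {
            received.add(record);
        }
    }

    /** Reads every element from the source, transforms it, and hands the result to the sink. */
    static <IN, OUT> void run(Iterable<IN> source, Function<IN, OUT> transform, Sink<OUT> sink) {
        for (IN element : source) {
            sink.invoke(transform.apply(element));
        }
    }

    public static void main(String[] args) {
        ListSink<Integer> sink = new ListSink<>();
        // Source: three strings; Transform: string length; Sink: in-memory list
        run(Arrays.asList("flink", "sink", "connector"), String::length, sink);
        System.out.println(sink.received); // [5, 4, 9]
    }
}
```

Swapping `ListSink` for another `Sink` implementation changes where the results go without touching the source or the transform. That is the role connectors play in Flink.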

1. Example: PrintSinkFunction

1.1 Implementation

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

/**
 * Implementation of the SinkFunction writing every tuple to the standard
 * output or standard error stream.
 *
 * <p>
 * Four possible format options:
 * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1
 * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1
 * taskId> output <- no {@code sinkIdentifier} provided, parallelism > 1
 * output <- no {@code sinkIdentifier} provided, parallelism == 1
 * </p>
 *
 * @param <IN> Input record type
 */
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
```
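The Javadoc's four format options come from the prefix that `PrintSinkOutputWriter` builds in its `open(subtaskIndex, parallelism)` method. The following is a simplified plain-Java re-implementation of that rule, written from my reading of the Flink source rather than copied from it. The class name `PrintPrefixSketch` is hypothetical. One detail is worth noting: the task ID shown is the 0-based subtask index plus one.

```java
/**
 * Simplified sketch of the prefix rule used by Flink's print sink
 * (hypothetical class; based on PrintSinkOutputWriter's behavior).
 */
public class PrintPrefixSketch {

    /**
     * @param sinkIdentifier optional identifier, "" when none was given
     * @param subtaskIndex   0-based index of this parallel sink instance
     * @param parallelism    number of parallel sink instances
     */
    static String prefix(String sinkIdentifier, int subtaskIndex, int parallelism) {
        String prefix = sinkIdentifier;
        if (parallelism > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            // The task ID is printed 1-based
            prefix += (subtaskIndex + 1);
        }
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        String record = "(hello,1)";
        System.out.println(prefix("counts", 2, 4) + record); // counts:3> (hello,1)
        System.out.println(prefix("counts", 0, 1) + record); // counts> (hello,1)
        System.out.println(prefix("", 2, 4) + record);       // 3> (hello,1)
        System.out.println(prefix("", 0, 1) + record);       // (hello,1)
    }
}
```

The four lines of output correspond, in order, to the four format options listed in the Javadoc. This explains the familiar `3> (hello,1)` style lines that a parallel word count prints.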

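PrintSinkFunction simply extends the abstract class RichSinkFunction and implements the invoke method, which prints each record and does nothing else. The only other logic is in open(). It passes the subtask index and the parallelism to the writer, so the writer can build the output prefix described in the class Javadoc.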
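A custom sink follows the same pattern. In Flink, the sink object is constructed on the client, serialized, and shipped to the workers. `open()` then runs once per parallel instance before the first `invoke()`, and `close()` runs at the end. Resources such as database connections therefore belong in `open()` rather than in the constructor. The plain-Java sketch below models only that call order. `MiniRichSink`, `CountingSink` and `drive` are hypothetical names, not Flink classes. A real sink would extend `RichSinkFunction` instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model of the open -> invoke -> close lifecycle (hypothetical names, not Flink's API).
 */
public class RichSinkLifecycleSketch {

    /** Hypothetical stand-in for RichSinkFunction. */
    static abstract class MiniRichSink<IN> {
        void open(int subtaskIndex, int parallelism) {}
        abstract void invoke(IN record);
        void close() {}
    }

    /** Creates its resource in open(), uses it per record, and releases it in close(). */
    static class CountingSink extends MiniRichSink<String> {
        final List<String> log = new ArrayList<>();
        private StringBuilder buffer; // stands in for a connection created at runtime
        private int written;

        @Override
        void open(int subtaskIndex, int parallelism) {
            buffer = new StringBuilder();
            log.add("open " + (subtaskIndex + 1) + "/" + parallelism);
        }

        @Override
        void invoke(String record) {
            buffer.append(record).append('\n');
            written++;
            log.add("write " + record);
        }

        @Override
        void close() {
            log.add("close after " + written + " records");
            buffer = null;
        }
    }

    /** Calls the sink the way a runtime would: open once, invoke per record, close once. */
    static <IN> void drive(MiniRichSink<IN> sink, List<IN> records, int subtaskIndex, int parallelism) {
        sink.open(subtaskIndex, parallelism);
        try {
            for (IN record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        drive(sink, Arrays.asList("(hello,1)", "(flink,1)"), 0, 2);
        sink.log.forEach(System.out::println);
        // open 1/2
        // write (hello,1)
        // write (flink,1)
        // close after 2 records
    }
}
```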

1.2 Usage

```java
package com.wells.flink.demo.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.Collector;

/**
 * Word count over a socket text stream; results are printed by PrintSinkFunction.
 * Created by wells on 2020-05-07 23:31:31
 */
public class PrintSinkTest {
    public static void main(String[] args) throws Exception {
        // A text server must already be listening here, e.g. started with `nc -lk 8090`
        String host = "127.0.0.1";
        int port = 8090;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream(host, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Emit (word, 1) for every space-separated word
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        })
        // Key by tuple field 0 (the word) and keep a running sum of field 1;
        // later Flink versions deprecate position-based keyBy in favor of a KeySelector
        .keyBy(0).sum(1);

        // counts.print().setParallelism(1);
        // Same effect as counts.print() (minus the setParallelism(1)): print() itself adds a PrintSinkFunction
        counts.addSink(new PrintSinkFunction<>());
        env.execute();
    }
}
```
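`keyBy(0).sum(1)` is a rolling aggregation: for every incoming word it updates that word's count and emits the new total. The sink therefore receives an updated tuple on each input, not a single final count. The plain-Java sketch below simulates that behavior on a fixed list of lines so the emitted sequence is easy to see. `RollingWordCountSketch` and `rollingCounts` are hypothetical names. A `HashMap` stands in for Flink's per-key state. With parallelism greater than 1, the real job would interleave keys across subtasks and add the `N> ` prefix to each line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Single-threaded simulation of flatMap -> keyBy(0).sum(1) -> print (hypothetical names).
 */
public class RollingWordCountSketch {

    /** Returns one "(word,count)" string per input word, in arrival order. */
    static List<String> rollingCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // stands in for per-key state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {              // flatMap: line -> (word, 1)
                int sum = state.merge(word, 1, Integer::sum);  // keyBy(0).sum(1): update running total
                emitted.add("(" + word + "," + sum + ")");     // every update reaches the sink
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingCounts(Arrays.asList("hello flink", "hello sink")));
        // [(hello,1), (flink,1), (hello,2), (sink,1)]
    }
}
```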

[Figure: Sinks currently supported by Flink]

[Figure: Supported connector components (each one requires importing its own dependency package)]

Reference: http://www.54tianzhisheng.cn/2018/10/29/flink-sink/