Preface

The connectors currently supported by Flink SQL are listed below:

| Name | Version | Source | Sink |
| --- | --- | --- | --- |
| Filesystem | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink |
| Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
| Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
| Amazon Kinesis Data Streams | | Unbounded Scan | Streaming Sink |
| JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
| Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |

In real-world development, however, different scenarios and company-specific requirements often mean you have to define your own data source or sink.

Kudu sink (other sinks follow the same pattern)

1. Extend RichSinkFunction

```java
package com.wang.kudu;

import com.wang.util.ConfigOptions;
import com.wang.util.KUDUUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.kudu.client.KuduClient;

import java.util.*;

/**
 * @Desc Concrete sink implementation
 * @Author wang bo
 * @Date 2021/5/24 15:49
 */
public class KuduSinkFunction extends RichSinkFunction<RowData> {

    private final DataStructureConverter converter;
    private final ReadableConfig options;
    private final DataType type;
    private KuduClient connection;

    public KuduSinkFunction(
            DataStructureConverter converter, ReadableConfig options, DataType type) {
        this.converter = converter;
        this.options = options;
        this.type = type;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = KUDUUtil.getConnection();
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        // To filter by change kind, inspect rowData.getRowKind() here, e.g.:
        // RowKind rowKind = rowData.getRowKind();
        // if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT))
        Row data = (Row) converter.toExternal(rowData);
        Set<String> fieldNames = data.getFieldNames(true);
        Map<String, Object> value = new HashMap<>();
        String dataBase = options.get(ConfigOptions.DATA_BASE);
        String tableName = options.get(ConfigOptions.TABLE_NAME);
        for (String fieldName : fieldNames) {
            value.put(fieldName, data.getField(fieldName));
        }
        if (!value.isEmpty()) {
            KUDUUtil.operateRows("upsert", connection,
                    "impala::" + dataBase + "." + tableName, Arrays.asList(value));
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
```
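The `KUDUUtil` helper referenced above is not shown in this post. A minimal sketch of what it might look like, assuming the plain Kudu Java client (`org.apache.kudu:kudu-client`, 1.9+ for `PartialRow.addObject`); the hard-coded master addresses are placeholders, since the real helper would presumably read them from the `hostPort` option:

```java
package com.wang.util;

import java.util.List;
import java.util.Map;

import org.apache.kudu.client.*;

// Hypothetical sketch of the helper used by KuduSinkFunction.
public class KUDUUtil {

    // Placeholder addresses for illustration only; the real helper
    // presumably reads the masters from the "hostPort" connector option.
    private static final String KUDU_MASTERS = "master1:7051,master2:7051,master3:7051";

    public static KuduClient getConnection() {
        return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
    }

    // Applies one operation per row map; "upsert" maps to newUpsert(),
    // anything else falls back to a plain insert.
    public static void operateRows(String op, KuduClient client, String tableName,
                                   List<Map<String, Object>> rows) throws KuduException {
        KuduTable table = client.openTable(tableName);
        KuduSession session = client.newSession();
        try {
            for (Map<String, Object> row : rows) {
                Operation operation = "upsert".equals(op) ? table.newUpsert() : table.newInsert();
                PartialRow partialRow = operation.getRow();
                row.forEach(partialRow::addObject); // PartialRow.addObject(name, value), Kudu 1.9+
                session.apply(operation);
            }
        } finally {
            session.close(); // close() flushes any pending operations
        }
    }
}
```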

2. Implement a DynamicTableSink

```java
package com.wang.kudu;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.DataType;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 15:39
 */
public class KuduSink implements DynamicTableSink {

    private final DataType type;
    private final ReadableConfig options;

    public KuduSink(DataType type, ReadableConfig options) {
        this.type = type;
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        // Accept whatever changelog mode the planner requests.
        return changelogMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        DataStructureConverter converter = context.createDataStructureConverter(type);
        return SinkFunctionProvider.of(new KuduSinkFunction(converter, options, type));
    }

    @Override
    public DynamicTableSink copy() {
        // The planner may copy the sink during optimization.
        return new KuduSink(type, options);
    }

    @Override
    public String asSummaryString() {
        return "Kudu Table Sink";
    }
}
```
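Note that `getChangelogMode` above simply echoes whatever mode the planner requests. Since this sink only ever upserts, one possible stricter variant declares exactly the row kinds it consumes, so the planner rejects unsupported changelogs up front; a sketch using Flink's `ChangelogMode.newBuilder()`:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

// Alternative variant of KuduSink.getChangelogMode: only accept the row
// kinds the upsert-style sink actually handles (no DELETE, no UPDATE_BEFORE).
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .build();
}
```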

3. Configuration options

```java
package com.wang.util;

import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 17:37
 */
public class ConfigOptions {

    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType()
            .noDefaultValue()
            .withDescription("kudu host and port");

    public static final ConfigOption<String> TABLE_NAME = key("tableName")
            .stringType()
            .noDefaultValue()
            .withDescription("table");

    public static final ConfigOption<String> DATA_BASE = key("dataBase")
            .stringType()
            .noDefaultValue()
            .withDescription("database");
}
```
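All three options here are strings without defaults. For contrast, the same builder can produce typed options with default values; a sketch of a hypothetical `flushInterval` option (not part of the original connector):

```java
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

// Hypothetical option for illustration only.
public static final ConfigOption<Integer> FLUSH_INTERVAL = key("flushInterval")
        .intType()
        .defaultValue(1000)
        .withDescription("flush interval in milliseconds");
```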

4. Implement a DynamicTableSinkFactory

```java
package com.wang.kudu;

import com.wang.util.ConfigOptions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 15:35
 */
@Internal
public class KuduFactory implements DynamicTableSinkFactory {

    // Matches the 'connector' = 'kudu' value in the DDL.
    public static final String IDENTIFIER = "kudu";

    public KuduFactory() {}

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new KuduSink(
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                options);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ConfigOptions.HOST_PORT);
        options.add(ConfigOptions.DATA_BASE);
        options.add(ConfigOptions.TABLE_NAME);
        return options;
    }
}
```
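Every option above is registered as optional, so `helper.validate()` only rejects unknown keys. Since the connector cannot work without `hostPort`, a variant that declares it required would make validation fail fast when it is missing; a sketch:

```java
@Override
public Set<ConfigOption<?>> requiredOptions() {
    // Declaring hostPort here makes FactoryUtil's validation reject a
    // CREATE TABLE statement that omits the option, instead of failing
    // later when the sink tries to connect.
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(ConfigOptions.HOST_PORT);
    return options;
}
```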

5. Add the resources file

Create the file `META-INF/services/org.apache.flink.table.factories.Factory` under `resources`, otherwise factory discovery fails with an error. Contents:
```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

com.wang.kudu.KuduFactory
```
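This file is standard Java SPI registration: Flink's `FactoryUtil` uses `ServiceLoader` to find every class listed in such files and matches `factoryIdentifier()` against the `'connector'` option in the DDL. A minimal illustration of that discovery mechanism (not Flink's actual code; `FactoryDiscoveryDemo` is a hypothetical class):

```java
import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        // ServiceLoader reads every META-INF/services/org.apache.flink.table.factories.Factory
        // file on the classpath and instantiates the classes listed inside;
        // with the Kudu connector on the classpath this prints "kudu" among others.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier());
        }
    }
}
```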

6. Test


```scala
def fromCSVToKUDU(): Unit = {
  // Source: read a CSV file with the built-in filesystem connector.
  val sql_csv =
    """
      |CREATE TABLE test_something (
      |  id BIGINT,
      |  order_number STRING,
      |  app_code STRING
      |) WITH (
      |  'connector' = 'filesystem',
      |  'path' = 'D:\idea_project\study\input\something.csv',
      |  'format' = 'csv'
      |)
      |""".stripMargin
  tableBatchEnv.executeSql(sql_csv)

  // Sink: the custom Kudu connector registered above.
  val sql_kudu =
    """
      |CREATE TABLE test_flink (
      |  id BIGINT,
      |  order_number STRING,
      |  app_code STRING
      |) WITH (
      |  'connector' = 'kudu',
      |  'hostPort' = 'xxxx:7051,xxxx:7051,xxxx:7051',
      |  'dataBase' = 'default',
      |  'tableName' = 'test_flink'
      |)
      |""".stripMargin
  tableBatchEnv.executeSql(sql_kudu)

  tableBatchEnv.executeSql("insert into test_flink select * from test_something")
}
```

At this point, query the Kudu table and you will see that the data has landed in Kudu.