Preface
The connectors currently supported by Flink SQL are listed below:
Name | Version | Source | Sink |
---|---|---|---|
Filesystem | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink |
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
Amazon Kinesis Data Streams | | Unbounded Scan | Streaming Sink |
JDBC | | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Apache HBase | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink |
Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
However, in real-world development you will run into scenarios and company-specific requirements that the built-in connectors do not cover, and then you need to define your own source or sink.
Kudu sink (other sinks follow the same pattern)
1. Extend RichSinkFunction
```java
package com.wang.kudu;

import com.wang.util.ConfigOptions;
import com.wang.util.KUDUUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.kudu.client.KuduClient;

import java.util.*;

/**
 * @Desc Concrete sink implementation
 * @Author wang bo
 * @Date 2021/5/24 15:49
 */
public class KuduSinkFunction extends RichSinkFunction<RowData> {

    private final DataStructureConverter converter;
    private final ReadableConfig options;
    private final DataType type;
    private KuduClient connection;

    public KuduSinkFunction(DataStructureConverter converter, ReadableConfig options, DataType type) {
        this.converter = converter;
        this.options = options;
        this.type = type;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = KUDUUtil.getConnection();
    }

    @Override
    public void invoke(RowData rowData, Context context) throws Exception {
        // RowKind rowKind = rowData.getRowKind();
        // if (rowKind.equals(RowKind.UPDATE_AFTER) || rowKind.equals(RowKind.INSERT))
        // Convert the internal RowData into an external Row, collect its fields
        // into a map, and upsert the map into the target Kudu table.
        Row data = (Row) converter.toExternal(rowData);
        Set<String> fieldNames = data.getFieldNames(true);
        Map<String, Object> value = new HashMap<>();
        String dataBase = options.get(ConfigOptions.DATA_BASE);
        String tableName = options.get(ConfigOptions.TABLE_NAME);
        for (String fieldName : fieldNames) {
            Object fieldValue = data.getField(fieldName);
            value.put(fieldName, fieldValue);
        }
        if (!value.isEmpty()) {
            KUDUUtil.operateRows("upsert", connection, "impala::" + dataBase + "." + tableName, Arrays.asList(value));
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
```
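The `KUDUUtil` helper referenced above (`getConnection` / `operateRows`) is not shown in the article. As a rough guide, here is a minimal sketch of what such a helper might look like on top of the Apache Kudu Java client; the hard-coded master addresses and the exact `operateRows` signature are assumptions inferred from the call sites, not the author's actual code:

```java
package com.wang.util;

import org.apache.kudu.client.*;

import java.util.List;
import java.util.Map;

public class KUDUUtil {

    // Assumption: master addresses are hard-coded here for brevity; a real helper
    // would read them from the 'hostPort' table option or an external config.
    private static final String KUDU_MASTERS = "xxxx:7051,xxxx:7051,xxxx:7051";

    public static KuduClient getConnection() {
        return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
    }

    // Applies one operation per row map; only "upsert" and "insert" are sketched here.
    public static void operateRows(String operation, KuduClient client, String tableName,
                                   List<Map<String, Object>> rows) throws KuduException {
        KuduTable table = client.openTable(tableName);
        KuduSession session = client.newSession();
        try {
            for (Map<String, Object> row : rows) {
                Operation op = "upsert".equals(operation) ? table.newUpsert() : table.newInsert();
                PartialRow partialRow = op.getRow();
                for (Map.Entry<String, Object> entry : row.entrySet()) {
                    // addObject dispatches on the column's schema type (Kudu client 1.6+).
                    partialRow.addObject(entry.getKey(), entry.getValue());
                }
                session.apply(op);
            }
            session.flush();
        } finally {
            session.close();
        }
    }
}
```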
2. Implement DynamicTableSink
```java
package com.wang.kudu;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.types.DataType;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 15:39
 */
public class KuduSink implements DynamicTableSink {

    private final DataType type;
    private final ReadableConfig options;

    public KuduSink(DataType type, ReadableConfig options) {
        this.type = type;
        this.options = options;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        // Accept whatever changelog mode the planner requests.
        return changelogMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        DataStructureConverter converter = context.createDataStructureConverter(type);
        return SinkFunctionProvider.of(new KuduSinkFunction(converter, options, type));
    }

    @Override
    public DynamicTableSink copy() {
        // The planner may copy the sink during optimization; returning null here would break it.
        return new KuduSink(type, options);
    }

    @Override
    public String asSummaryString() {
        return "Kudu Table Sink";
    }
}
```
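One thing to be aware of: `getChangelogMode` above simply echoes back whatever mode the planner requests, so the sink may also receive `UPDATE_BEFORE` and `DELETE` rows, which `invoke()` currently upserts like any other row (the commented-out `RowKind` check in `KuduSinkFunction` hints at this concern). If the sink should only consume inserts and updated rows, the mode can be narrowed explicitly; a sketch of the alternative method (requires `import org.apache.flink.types.RowKind;`):

```java
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Declare that this sink only accepts INSERT and UPDATE_AFTER rows;
    // the planner will reject queries that emit other row kinds instead of
    // silently feeding them into the upsert path.
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .build();
}
```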
3. Define the configuration options
```java
package com.wang.util;

import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 17:37
 */
public class ConfigOptions {

    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType()
            .noDefaultValue()
            .withDescription("kudu host and port");

    public static final ConfigOption<String> TABLE_NAME = key("tableName")
            .stringType()
            .noDefaultValue()
            .withDescription("kudu table name");

    public static final ConfigOption<String> DATA_BASE = key("dataBase")
            .stringType()
            .noDefaultValue()
            .withDescription("kudu database name");
}
```
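As an aside, the same builder can also produce typed options with defaults, which users may then omit in the DDL. The option below is purely a hypothetical illustration and not part of this connector:

```java
// Hypothetical example: an integer option with a default value.
public static final ConfigOption<Integer> BATCH_SIZE = key("batchSize")
        .intType()
        .defaultValue(1000)
        .withDescription("max rows to buffer before flushing to kudu");
```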
4. Implement DynamicTableSinkFactory
```java
package com.wang.kudu;

import com.wang.util.ConfigOptions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.HashSet;
import java.util.Set;

/**
 * @Desc
 * @Author wang bo
 * @Date 2021/5/24 15:35
 */
@Internal
public class KuduFactory implements DynamicTableSinkFactory {

    // The value users put in the DDL as 'connector' = 'kudu'.
    public static final String IDENTIFIER = "kudu";

    public KuduFactory() {}

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new KuduSink(
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
                options
        );
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ConfigOptions.HOST_PORT);
        options.add(ConfigOptions.DATA_BASE);
        options.add(ConfigOptions.TABLE_NAME);
        return options;
    }
}
```
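A caveat with the factory as written: because every option is declared optional, `helper.validate()` passes even when `hostPort`, `dataBase`, or `tableName` is missing, and the job only fails later at runtime. To surface misconfiguration at DDL time instead, the options can be moved into `requiredOptions()`; a sketch:

```java
@Override
public Set<ConfigOption<?>> requiredOptions() {
    // Declaring the options as required makes FactoryUtil's validation
    // fail fast on an incomplete WITH (...) clause.
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(ConfigOptions.HOST_PORT);
    options.add(ConfigOptions.DATA_BASE);
    options.add(ConfigOptions.TABLE_NAME);
    return options;
}
```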
5. Add the service file under resources
Create the file `META-INF/services/org.apache.flink.table.factories.Factory` under `resources` (note: a `services` directory inside `META-INF`, not a single directory named `META-INF.services`). Without it, Flink's SPI discovery cannot find the factory and table creation will fail. Content:
```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

com.wang.kudu.KuduFactory
```
6. Test
```scala
def fromCSVToKUDU(): Unit = {
  val sql_csv =
    """
      |CREATE TABLE test_something (
      |  id BIGINT,
      |  order_number STRING,
      |  app_code STRING
      |) WITH (
      |  'connector' = 'filesystem',
      |  'path' = 'D:\idea_project\study\input\something.csv',
      |  'format' = 'csv'
      |)
      |""".stripMargin
  tableBatchEnv.executeSql(sql_csv)

  val sql_kudu =
    """
      |CREATE TABLE test_flink (
      |  id BIGINT,
      |  order_number STRING,
      |  app_code STRING
      |) WITH (
      |  'connector' = 'kudu',
      |  'hostPort' = 'xxxx:7051,xxxx:7051,xxxx:7051',
      |  'dataBase' = 'default',
      |  'tableName' = 'test_flink'
      |)
      |""".stripMargin
  tableBatchEnv.executeSql(sql_kudu)
  tableBatchEnv.executeSql("insert into test_flink select * from test_something")
}
```
At this point, query the Kudu table and you will see that the data has landed in it.