Add the following to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Spark_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- ClickHouse JDBC driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.1.40</version>
        </dependency>
        <!-- flink-jdbc provides the JDBCAppendTableSink used below;
             keep its version in line with the other Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- Flink streaming runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- Flink Table API with the Blink planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.10.0</version>
        </dependency>
    </dependencies>
</project>
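The job below writes into a table named user_local in the test database, so create it in ClickHouse first. A minimal sketch of the DDL, matching the (id, name) columns the job inserts (the MergeTree engine and ORDER BY key are assumptions; use whatever fits your setup):

CREATE TABLE test.user_local
(
    id   Int64,
    name String
)
ENGINE = MergeTree()
ORDER BY id;

Flink job code: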
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class FlinkSinkClickHouse {

    public static void main(String[] args) {
        String url = "jdbc:clickhouse://localhost:8123/test";
        String user = "";
        String passwd = "";
        String driver = "ru.yandex.clickhouse.ClickHouseDriver";
        // Batch size: the sink flushes to ClickHouse once this many rows are buffered.
        // Use a small value while testing so written data becomes visible immediately.
        int batchSize = 500;

        // Blink planner, streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // A single constant row; the CASTs keep the query's field types aligned
        // with the BIGINT/STRING parameter types declared on the sink below.
        String query = "SELECT CAST(3 AS BIGINT) AS id, CAST('jk' AS VARCHAR) AS name";
        Table table = bsTableEnv.sqlQuery(query);

        String insertIntoCkSql = "INSERT INTO user_local (id, name) VALUES (?, ?)";

        JDBCAppendTableSink sink = JDBCAppendTableSink
                .builder()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery(insertIntoCkSql)
                .setBatchSize(batchSize)
                .setParameterTypes(Types.LONG, Types.STRING)
                .build();

        String[] fieldNames = {"id", "name"};
        TypeInformation[] fieldTypes = {Types.LONG, Types.STRING};

        bsTableEnv.registerTableSink("sink", fieldNames, fieldTypes, sink);
        bsTableEnv.insertInto(table, "sink");

        try {
            bsTableEnv.execute("Flink Table API to ClickHouse Example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
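To confirm the rows arrived, you can read the table back over plain JDBC using the same clickhouse-jdbc driver from the pom. A minimal sketch (the VerifyClickHouse class name is hypothetical; the connection settings mirror the job above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyClickHouse {
    public static void main(String[] args) throws Exception {
        // Ensure the ClickHouse driver is registered (same class the Flink sink uses).
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/test", "", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name FROM user_local")) {
            while (rs.next()) {
                System.out.println(rs.getLong("id") + "\t" + rs.getString("name"));
            }
        }
    }
}

Equivalently, running SELECT * FROM user_local in clickhouse-client shows the same rows.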