pom.xml文件引入
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Spark_Test</artifactId><version>1.0-SNAPSHOT</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.1.40</version><!-- <scope>test</scope>--></dependency><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-clients_2.11</artifactId>--><!-- <version>1.3.2</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-table-api-java-bridge_2.11</artifactId>--><!-- <version>1.10.0</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-table</artifactId>--><!-- <version>1.10.0</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-runtime_2.11</artifactId>--><!-- <version>1.8.2</version>--><!-- </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.1</version></dependency><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-scala_2.11</artifactId>--><!-- <version>1.7.2</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-java</artifactId>--><!-- <version>1.8.2</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-streaming-scala_2.11</artifactId>--><!-- <version>1.7.2</version>--><!-- </dependency>--><!-- <dependency>--><!-- <groupId>org.apache.flink</groupId>--><!-- <artifactId>flink-streaming-java_2.11</artifactId>--><!-- <version>1.7.2</version>--><!-- </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.4</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime-blink_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.10.0</version></dependency></dependencies></project>
import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.StreamTableEnvironment;public class FlinkSinkClickHouse {public static void main(String[] args) {String url = "jdbc:clickhouse://localhost:8123/test";String user = "";String passwd = "";String driver = "ru.yandex.clickhouse.ClickHouseDriver";int batchsize = 500; // 设置batch size,测试的话可以设置小一点,这样可以立刻看到数据被写入EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);String query = "select 3,'jk'";Table table = bsTableEnv.sqlQuery(query);String insertIntoCkSql = "INSERT INTO user_local(id,name)\n" +"VALUES(?,?)";JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setDrivername(driver).setDBUrl(url).setUsername(user).setPassword(passwd).setQuery(insertIntoCkSql).setBatchSize(batchsize).setParameterTypes(Types.LONG, Types.STRING).build();String[] arr = {"id","name"};TypeInformation[] type = {Types.LONG, Types.STRING};bsTableEnv.registerTableSink("sink",arr,type,sink);bsTableEnv.insertInto(table, "sink");try {bsTableEnv.execute("Flink Table API to ClickHouse Example");} catch (Exception e) {e.printStackTrace();}}// public static void main(String[] args) {//// String url = "jdbc:clickhouse://localhost:8123/test";// String user = "";// String passwd = "";// String driver = "ru.yandex.clickhouse.ClickHouseDriver";// int batchsize = 500; // 设置batch size,测试的话可以设置小一点,这样可以立刻看到数据被写入////// // 创建执行环境//// EnvironmentSettings settings = EnvironmentSettings//// .newInstance()//// .useBlinkPlanner()//// .inStreamingMode()//// .build();////// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//// StreamTableEnvironment tbEnv = TableEnvironment(env);//////// String kafkaSource11 = "" +//// "CREATE TABLE user_behavior ( " +//// " `user_id` BIGINT, -- 用户id\n" +//// " `item_id` BIGINT, -- 商品id\n" +//// " `cat_id` BIGINT, -- 品类id\n" +//// " `action` STRING, -- 用户行为\n" +//// " `province` INT, -- 用户所在的省份\n" +//// " `ts` BIGINT, -- 用户行为发生的时间戳\n" +//// " `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列\n" +//// " `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间\n" +//// " WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 在eventTime上定义watermark\n" +//// ") WITH ( 'connector' = 'kafka', -- 使用 kafka connector\n" +//// " 'topic' = 'user_behavior', -- kafka主题\n" +//// " 'scan.startup.mode' = 'earliest-offset', -- 偏移量,从起始 offset 开始读取\n" +//// " 'properties.group.id' = 'group1', -- 消费者组\n" +//// " 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址\n" +//// " 'format' = 'json', -- 数据源格式为 json\n" +//// " 'json.fail-on-missing-field' = 'true',\n" +//// " 'json.ignore-parse-errors' = 'false'" +//// ")";//////// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);//////////// // Kafka Source//// try {//// tEnv.execute(kafkaSource11);//// } catch (Exception e) {//// e.printStackTrace();//// }//// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);//// String query = "select 1,'1';";// Table table = tEnv.sqlQuery(query);////// -- INSERT INTO user_local VALUES(1,'tom'),(2,'jack');// String insertIntoCkSql = "INSERT INTO test.user_local(id,name)\n" +// "VALUES(?,?)";//// //将数据写入 ClickHouse Sink// JDBCAppendTableSink sink = JDBCAppendTableSink// .builder()// .setDrivername(driver)// .setDBUrl(url)// .setUsername(user)// .setPassword(passwd)// .setQuery(insertIntoCkSql)// .setBatchSize(batchsize)// .setParameterTypes(Types.LONG, Types.STRING)// .build();//// String[] arr = {"id","name"};// TypeInformation[] type = {Types.LONG, Types.STRING};//// tEnv.registerTableSink(// "sink",// arr,// type,// sink// );//// tEnv.insertInto(table, "sink");//// try {// tEnv.execute("Flink Table API to ClickHouse Example");// } catch (Exception e) {// e.printStackTrace();// }// }}
