Dependencies to include in pom.xml

The example targets Flink 1.10 (Scala 2.11 builds) with the Blink planner for the Table API, the flink-jdbc connector that provides JDBCAppendTableSink, and the ClickHouse JDBC driver:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.example</groupId>
        <artifactId>Spark_Test</artifactId>
        <version>1.0-SNAPSHOT</version>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>8</source>
                        <target>8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
        <dependencies>
            <!-- ClickHouse JDBC driver -->
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.1.40</version>
            </dependency>
            <!-- provides JDBCAppendTableSink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.4</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
            </dependency>
            <!-- Table API with the Blink planner -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-runtime-blink_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.10.0</version>
            </dependency>
        </dependencies>
    </project>
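
The job below inserts into a table named user_local in the test database, which must exist before the job runs. A minimal sketch that creates it over the same JDBC driver; the Int64/String column types and the MergeTree engine with ORDER BY id are assumptions, so adjust them to your actual schema:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateUserLocal {
        public static void main(String[] args) throws Exception {
            Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
            try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/test");
                 Statement stmt = conn.createStatement()) {
                // Hypothetical schema: the engine and ordering key are assumptions,
                // chosen only so the example runs on a plain ClickHouse install.
                stmt.execute("CREATE TABLE IF NOT EXISTS user_local ("
                        + " id Int64,"
                        + " name String"
                        + ") ENGINE = MergeTree() ORDER BY id");
            }
        }
    }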
The job itself writes a single constant row into user_local through JDBCAppendTableSink:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class FlinkSinkClickHouse {
        public static void main(String[] args) {
            String url = "jdbc:clickhouse://localhost:8123/test";
            String user = "";
            String passwd = "";
            String driver = "ru.yandex.clickhouse.ClickHouseDriver";
            int batchSize = 500; // use a small batch size when testing, so rows are flushed and visible immediately

            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

            // Cast the literals so the result types match the sink's declared LONG/STRING fields
            String query = "SELECT CAST(3 AS BIGINT), CAST('jk' AS STRING)";
            Table table = bsTableEnv.sqlQuery(query);

            String insertIntoCkSql = "INSERT INTO user_local(id, name) VALUES(?, ?)";

            JDBCAppendTableSink sink = JDBCAppendTableSink
                    .builder()
                    .setDrivername(driver)
                    .setDBUrl(url)
                    .setUsername(user)
                    .setPassword(passwd)
                    .setQuery(insertIntoCkSql)
                    .setBatchSize(batchSize)
                    .setParameterTypes(Types.LONG, Types.STRING)
                    .build();

            String[] fieldNames = {"id", "name"};
            TypeInformation[] fieldTypes = {Types.LONG, Types.STRING};
            bsTableEnv.registerTableSink("sink", fieldNames, fieldTypes, sink);

            bsTableEnv.insertInto(table, "sink");

            try {
                bsTableEnv.execute("Flink Table API to ClickHouse Example");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
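
With batchSize set to 500 the single row never fills a batch, but the sink flushes whatever remains when it closes, so the row is visible once the job finishes. A minimal read-back over plain JDBC to confirm the write (same database and table as above):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class VerifyUserLocal {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/test");
                 Statement stmt = conn.createStatement();
                 // Read back what the sink wrote
                 ResultSet rs = stmt.executeQuery("SELECT id, name FROM user_local")) {
                while (rs.next()) {
                    System.out.println(rs.getLong("id") + "\t" + rs.getString("name"));
                }
            }
        }
    }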
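
The constant SELECT above is only a smoke test. To feed the sink from real events, the source can instead be a Kafka-backed table declared in DDL, as in the sketch below; the topic, consumer group, and broker addresses are site-specific placeholders. Note that the 'connector' = 'kafka' option style shown is the syntax introduced in Flink 1.11 (it also needs the matching Kafka SQL connector and flink-json jars, neither of which is in the pom above); with the Flink 1.10 dependencies used here, the legacy 'connector.type' option keys apply instead.

    // Inside main() above, in place of the constant query:
    String kafkaSourceDdl =
            "CREATE TABLE user_behavior (\n" +
            "  `user_id`   BIGINT,   -- user id\n" +
            "  `item_id`   BIGINT,   -- product id\n" +
            "  `cat_id`    BIGINT,   -- category id\n" +
            "  `action`    STRING,   -- user action\n" +
            "  `province`  INT,      -- user's province\n" +
            "  `ts`        BIGINT,   -- timestamp of the action\n" +
            "  `proctime`  AS PROCTIME(),   -- processing-time attribute via a computed column\n" +
            "  `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),   -- event time\n" +
            "  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'properties.group.id' = 'group1',\n" +
            "  'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',\n" +
            "  'format' = 'json',\n" +
            "  'json.fail-on-missing-field' = 'true',\n" +
            "  'json.ignore-parse-errors' = 'false'\n" +
            ")";
    bsTableEnv.sqlUpdate(kafkaSourceDdl);
    // user_id BIGINT and action STRING line up with the sink's LONG/STRING fields
    Table table = bsTableEnv.sqlQuery("SELECT user_id, action FROM user_behavior");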