为何要写入ClickHouse数据库,ClickHouse数据库作为专门解决大量数据统计分析的数据库,在保证了海量数据存储的能力,同时又兼顾了响应速度。而且还支持标准SQL,即灵活又易上手。

  1. create table visitor_stats_2021 (
  2. stt DateTime,
  3. edt DateTime,
  4. vc String,
  5. ch String ,
  6. ar String ,
  7. is_new String ,
  8. uv_ct UInt64,
  9. pv_ct UInt64,
  10. sv_ct UInt64,
  11. uj_ct UInt64,
  12. dur_sum UInt64,
  13. ts UInt64
  14. ) engine =ReplacingMergeTree( ts)
  15. partition by toYYYYMMDD(stt)
  16. order by ( stt,edt,is_new,vc,ch,ar);

之所以选用ReplacingMergeTree引擎主要是靠它来保证数据表的幂等性。
Ø paritition by 把日期变为数字类型(如:20201126),用于分区。所以尽量保证查询条件尽量包含stt字段。
Ø order by 后面字段数据在同一分区下,出现重复会被去重,重复数据保留ts最大的数据。

2) 加入ClickHouse依赖包

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

其中flink-connector-jdbc 是官方通用的jdbcSink包。只要引入对应的jdbc驱动,flink可以用它应对各种支持jdbc的数据库,比如phoenix也可以用它。但是这个jdbc-sink只支持数据流对应一张数据表。如果是一流对多表,就必须通过自定义的方式实现了,比如之前的维度数据。
虽然这种jdbc-sink只能一流对一表,但是由于内部使用了预编译器,所以可以实现批量提交以优化写入速度。