为何要写入ClickHouse数据库,ClickHouse数据库作为专门解决大量数据统计分析的数据库,在保证了海量数据存储的能力,同时又兼顾了响应速度。而且还支持标准SQL,即灵活又易上手。
create table visitor_stats_2021 (
stt DateTime,
edt DateTime,
vc String,
ch String ,
ar String ,
is_new String ,
uv_ct UInt64,
pv_ct UInt64,
sv_ct UInt64,
uj_ct UInt64,
dur_sum UInt64,
ts UInt64
) engine =ReplacingMergeTree( ts)
partition by toYYYYMMDD(stt)
order by ( stt,edt,is_new,vc,ch,ar);
之所以选用ReplacingMergeTree引擎主要是靠它来保证数据表的幂等性。
Ø paritition by 把日期变为数字类型(如:20201126),用于分区。所以尽量保证查询条件尽量包含stt字段。
Ø order by 后面字段数据在同一分区下,出现重复会被去重,重复数据保留ts最大的数据。
2) 加入ClickHouse依赖包
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
其中flink-connector-jdbc 是官方通用的jdbcSink包。只要引入对应的jdbc驱动,flink可以用它应对各种支持jdbc的数据库,比如phoenix也可以用它。但是这个jdbc-sink只支持数据流对应一张数据表。如果是一流对多表,就必须通过自定义的方式实现了,比如之前的维度数据。
虽然这种jdbc-sink只能一流对一表,但是由于内部使用了预编译器,所以可以实现批量提交以优化写入速度。