-- Iceberg catalog named `iceberg`, using a Hive metastore ('catalog-type' = 'hive')
-- reached over thrift at 'uri'. Tables are later addressed as iceberg.<db>.<table>.
-- 'hive-conf-dir' points at a local directory of Hive configuration for this Flink install.
CREATE CATALOG iceberg WITH (
    'type'             = 'iceberg',
    'catalog-type'     = 'hive',
    'uri'              = 'thrift://172.16.48.191:9083',
    'clients'          = '5',
    'property-version' = '1',
    'hive-conf-dir'    = '/data/program/flink-114/conf/iceberg'
);
-- =================================
-- Define source tables
-- =================================
-- 生成数据
CREATE TABLE clicks_datagen
(
`user` VARCHAR COMMENT '用户名称',
`cTime` AS LOCALTIMESTAMP COMMENT '点击时间',
`url` VARCHAR COMMENT '点击链接',
`procTime` AS PROCTIME() COMMENT '处理时间',
WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND
) COMMENT '用户点击表'
WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'fields.user.kind' = 'random',
'fields.user.length' = '5',
'fields.url.kind' = 'random',
'fields.url.length' = '5'
);
-- elasticsearch 6
-- =================================
-- elasticsearch sink
CREATE TABLE clicks_es6
(
`user` VARCHAR COMMENT '用户名称',
`cTime` TIMESTAMP COMMENT '点击时间',
`url` VARCHAR COMMENT '点击链接'
) WITH (
'connector' = 'elasticsearch-6',
'document-type' = '_doc',
'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200',
'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
);
-- Pipeline: clicks_datagen (datagen source) --> iceberg.shuidi.clicks (Iceberg table via the Hive catalog).
-- This statement involves no Kafka or Elasticsearch table.
-- There is no column list, so Flink maps the selected columns to the target by position.
-- NOTE(review): assumes the target's first three columns are (user, cTime, url) in that
-- order; confirm against the iceberg.shuidi.clicks schema.
-- If the clicks_es6 sink should also be fed, add a second INSERT. For example, wrap both
-- INSERTs in a STATEMENT SET so they run as one job.
INSERT INTO iceberg.shuidi.clicks
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_datagen;