CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://172.16.48.191:9083', 'clients'='5', 'property-version'='1', 'hive-conf-dir'='/data/program/flink-114/conf/iceberg');-- =================================-- 定义Source,View-- =================================-- 生成数据CREATE TABLE clicks_datagen( `user` VARCHAR COMMENT '用户名称', `cTime` AS LOCALTIMESTAMP COMMENT '点击时间', `url` VARCHAR COMMENT '点击链接', `procTime` AS PROCTIME() COMMENT '处理时间', WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND) COMMENT '用户点击表'WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.user.kind' = 'random', 'fields.user.length' = '5', 'fields.url.kind' = 'random', 'fields.url.length' = '5');-- elasticsearch 6-- =================================-- elasticsearch sinkCREATE TABLE clicks_es6( `user` VARCHAR COMMENT '用户名称', `cTime` TIMESTAMP COMMENT '点击时间', `url` VARCHAR COMMENT '点击链接') WITH ( 'connector' = 'elasticsearch-6', 'document-type' = '_doc', 'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200', 'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}');-- datagen --> kafka --> es6INSERT INTO iceberg.shuidi.clicksSELECT `user`,`cTime`,`url`FROM clicks_datagen;