-- Register an Iceberg catalog backed by the Hive Metastore.
-- (Fixed: removed stray "N." line-number prefixes left over from a
-- copy-paste, which made the statement unparseable.)
CREATE CATALOG iceberg WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://172.16.48.191:9083',
    'clients' = '5',               -- Hive metastore client pool size
    'property-version' = '1',
    'hive-conf-dir' = '/data/program/flink-114/conf/iceberg'
);
    9. -- =================================
    10. -- 定义SourceView
    11. -- =================================
    12. -- 生成数据
    13. CREATE TABLE clicks_datagen
    14. (
    15. `user` VARCHAR COMMENT '用户名称',
    16. `cTime` AS LOCALTIMESTAMP COMMENT '点击时间',
    17. `url` VARCHAR COMMENT '点击链接',
    18. `procTime` AS PROCTIME() COMMENT '处理时间',
    19. WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND
    20. ) COMMENT '用户点击表'
    21. WITH (
    22. 'connector' = 'datagen',
    23. 'rows-per-second' = '100',
    24. 'fields.user.kind' = 'random',
    25. 'fields.user.length' = '5',
    26. 'fields.url.kind' = 'random',
    27. 'fields.url.length' = '5'
    28. );
    29. -- elasticsearch 6
    30. -- =================================
    31. -- elasticsearch sink
    32. CREATE TABLE clicks_es6
    33. (
    34. `user` VARCHAR COMMENT '用户名称',
    35. `cTime` TIMESTAMP COMMENT '点击时间',
    36. `url` VARCHAR COMMENT '点击链接'
    37. ) WITH (
    38. 'connector' = 'elasticsearch-6',
    39. 'document-type' = '_doc',
    40. 'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200',
    41. 'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
    42. );
    43. -- datagen --> kafka --> es6
    44. INSERT INTO iceberg.shuidi.clicks
    45. SELECT `user`,
    46. `cTime`,
    47. `url`
    48. FROM clicks_datagen;