Runtime Environment Configuration

Cluster Configuration

  execution.checkpointing.unaligned=true
  execution.checkpointing.interval=10 min
  containerized.taskmanager.env.PYTHONPATH=/home/appweb/shuidi/software/python
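The PYTHONPATH entry must be supplied at deployment time, since it is injected into the TaskManager containers, but the two checkpointing options can also be applied per job from the SQL client. A minimal sketch, in the same unquoted SET style used later on this page:

  SET execution.checkpointing.unaligned=true; -- let barriers overtake in-flight buffers
  SET execution.checkpointing.interval=10 min; -- checkpoint every 10 minutes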

Python Configuration

  import random

  from pyflink.table import DataTypes
  from pyflink.table.udf import udf

  @udf(result_type=DataTypes.STRING(), func_type="pandas")
  def generate_url(column):
      # Pandas UDF: map every value of the input series to a randomly
      # chosen internal URL, returning a string series of the same length.
      urls = [
          'http://iceye.wacai.info/stream-schema/info',
          'http://druid.router.wacai.info/unified-console.html#query',
          'http://merlin-wdp.wacai.info/flink/job/list',
          'http://cdhmanager.stanlee.wacai.info/cmf/login',
          'http://middleware.wacai.info/hermes-web/dashboard',
          'http://streaming-yarn.stanlee.wacai.info/cluster/apps/RUNNING',
          'http://team.caimi-inc.com/',
          'https://loan.wacai.info/wups/roleApply',
          'http://mario.wacai.info/home#',
          'http://biuc.wacai.info/',
          'https://logs.wacai.info/app/kibana#/home?_g=()',
          'http://awm.wacai.info/awm/notebook',
          'http://grafana.wacai.info/?orgId=1',
          'http://wafe.qsh.wacai.info/configMgr',
          'http://sensordata.wacai.info/login/index.html',
          'http://zeppelin.wacai.info/#/',
          'http://prometheus-qsh.wacai.info/graph',
          'http://bitask.caimi-inc.com/index.php?m=script&a=index#',
          'http://platform.wacai.info/prophet-node/job',
          'http://atlas.wacai.info/index.html#!/glossary'
      ]
      return column.map(lambda item: random.choice(urls))

  # (Re-)register the UDF, dropping any previous registration first.
  st_env.drop_temporary_function('generate_url')
  st_env.create_temporary_function('generate_url', generate_url)
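Before wiring generate_url into the pipelines below, it can be smoke-tested on its own. A minimal sketch, where url_check_src and url_check_sink are hypothetical throwaway tables (not part of the setup above):

  -- feed short random strings through the UDF and print the mapped URLs
  CREATE TABLE url_check_src (
      `k` VARCHAR
  ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.k.length' = '5'
  );
  CREATE TABLE url_check_sink (
      `url` VARCHAR
  ) WITH (
      'connector' = 'print'
  );
  INSERT INTO url_check_sink SELECT generate_url(`k`) FROM url_check_src;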

SQL Configuration

SQL Runtime Configuration

  -- SQL runtime configuration
  SET table.dynamic-table-options.enabled=true; -- enable SQL hints (OPTIONS)
  SET table.exec.mini-batch.enabled=true; -- enable mini-batch aggregation
  SET table.exec.mini-batch.allow-latency=5 s;
  SET table.exec.mini-batch.size=5000;
  SET table.exec.state.ttl=48 h; -- how long idle state is retained
  SET python.executable=/data/program/miniconda3/envs/pyflink/bin/python; -- path of the Python interpreter used for Python UDFs
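In the SQL client, SET with no argument lists every option currently in effect, and RESET reverts one, which is a quick way to confirm the values above took hold:

  SET;                          -- list all options in effect
  RESET table.exec.state.ttl;   -- revert to the default (state never expires)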

Creating Catalogs

  CREATE CATALOG iceberg WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hive',
      'uri' = 'thrift://172.16.48.191:9083',
      'clients' = '5',
      'property-version' = '1',
      'hive-conf-dir' = '/data/program/flink-114/conf/iceberg'
  );
  -- Flink JDBC URL parameters, for reference:
  -- jdbc:mysql://192.168.5.12:3307?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true&allowMultiQueries=true&useConfigs=maxPerformance
  CREATE CATALOG jdbc WITH (
      'type' = 'jdbc',
      'default-database' = 'data_dict',
      'username' = 'qa_conn',
      'password' = 'qa_conn',
      'base-url' = 'jdbc:mysql://192.168.5.12:3307'
  );
  CREATE CATALOG mysql_cdc WITH (
      'type' = 'mysql-cdc',
      'default-database' = 'wac_edu',
      'hostname' = '192.168.5.14',
      'port' = '3342',
      'username' = 'qa_conn',
      'password' = 'qa_conn'
  );
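A quick sanity check that the catalogs registered correctly; some_table is a hypothetical table in the data_dict database:

  SHOW CATALOGS;
  USE CATALOG iceberg;
  SHOW DATABASES;
  -- objects in other catalogs stay reachable via fully qualified names:
  SELECT * FROM jdbc.data_dict.some_table; -- hypothetical table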

Creating Basic Tables

  -- =================================
  -- Define the source view
  -- =================================
  -- Generate data
  CREATE TABLE clicks_datagen (
      `user` VARCHAR COMMENT 'user name',
      `cTime` AS LOCALTIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL',
      `procTime` AS PROCTIME() COMMENT 'processing time',
      WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'datagen',
      'rows-per-second' = '100',
      'fields.user.kind' = 'random',
      'fields.user.length' = '5',
      'fields.url.kind' = 'random',
      'fields.url.length' = '5'
  );
  -- Transform the generated data: map the random strings to real URLs
  CREATE VIEW clicks_datagen_view AS
  SELECT
      `user`,
      `cTime`,
      generate_url(`url`) AS `url`,
      `procTime`
  FROM clicks_datagen;
  -- blackhole: everything written to this table is discarded
  CREATE TABLE clicks_blackhole (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'blackhole'
  );
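Because table.dynamic-table-options.enabled was switched on in the runtime configuration, connector options of these tables can be overridden per query with SQL hints, e.g. slowing the generator down for an interactive look at the data:

  SELECT * FROM clicks_datagen /*+ OPTIONS('rows-per-second'='1') */;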

datagen --> hermes --> blackhole

  CREATE TABLE clicks_hermes (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'hermes',
      'properties.hermes.center' = 'http://hermes-center.middleware.wse.test.wacai.info',
      'properties.hermes.cluster.id' = 'test',
      'properties.group.id' = 'flink-test',
      'scan.startup.mode' = 'latest-offset',
      'topic' = 'merlin.flink.clicks',
      'format' = 'json'
  );
  -- datagen --> hermes --> blackhole
  INSERT INTO clicks_hermes
  SELECT
      `user`,
      `cTime`,
      `url`
  FROM clicks_datagen_view;
  INSERT INTO clicks_blackhole
  SELECT * FROM clicks_hermes;
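If the hermes connector honors other startup modes besides the one shown in the DDL (an assumption; only latest-offset appears above), a one-off backfill can override the offset with a hint instead of editing the table:

  -- assumes 'earliest-offset' is a supported scan.startup.mode value
  SELECT * FROM clicks_hermes /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;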

datagen --> kafka sink --> kafka raw source --> hdfs

  -- kafka topic
  CREATE TABLE clicks_kafka (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '172.16.48.194:9092',
      'properties.transactional.id' = 'flink1.11_test-transaction',
      'properties.acks' = 'all',
      'properties.isolation.level' = 'read_committed',
      'properties.group.id' = 'flink-test',
      'scan.startup.mode' = 'latest-offset',
      'topic' = 'merlin.flink.clicks',
      'format' = 'json'
  );
  -- same kafka topic, but the format is raw: the payload is left unparsed
  CREATE TABLE clicks_kafka_source_raw (
      `content` BYTES,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '172.16.48.194:9092',
      'properties.transactional.id' = 'flink1.11_test-transaction',
      'properties.acks' = 'all',
      'properties.isolation.level' = 'read_committed',
      'properties.group.id' = 'flink-test',
      'scan.startup.mode' = 'latest-offset',
      'topic' = 'merlin.flink.clicks',
      'format' = 'raw'
  );
  -- hdfs sink
  CREATE TABLE clicks_hdfs (
      `content` BYTES,
      `dt` VARCHAR,
      `hour` VARCHAR
  )
  COMMENT 'user click table'
  PARTITIONED BY (`dt`, `hour`) WITH (
      'connector' = 'filesystem',
      'path' = 'hdfs://172.16.48.23:8020/shuidi/flink/tables/clicks',
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 h',
      'sink.rolling-policy.rollover-interval' = '60 min',
      'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
      'sink.partition-commit.policy.kind' = 'success-file',
      'sink.partition-commit.success-file.name' = '_success',
      'format' = 'raw'
  );
  -- datagen --> kafka sink --> kafka raw source --> hdfs
  INSERT INTO clicks_kafka
  SELECT
      `user`,
      `cTime`,
      `url`
  FROM clicks_datagen_view;
  INSERT INTO clicks_hdfs
  SELECT
      `content`,
      CAST(CAST(`ts` AS DATE) AS VARCHAR),
      CAST(HOUR(`ts`) AS VARCHAR)
  FROM clicks_kafka_source_raw;
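Once a partition has been committed (its _success file appears after the configured 1 h delay), the filesystem table can be read back in batch mode for spot checks. A sketch with placeholder partition values:

  -- '2021-09-30' and '12' are placeholder partition values
  SELECT COUNT(*) AS cnt
  FROM clicks_hdfs
  WHERE `dt` = '2021-09-30' AND `hour` = '12';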

datagen --> pulsar

  -- pulsar sink
  CREATE TABLE clicks_pulsar (
      `user` VARCHAR COMMENT 'user name',
      `cTime` VARCHAR COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'pulsar',
      'topic' = 'persistent://merlin/flink-sql/clicks',
      'service-url' = 'pulsar://172.16.48.194:6650',
      'admin-url' = 'http://172.16.48.194:8080',
      'format' = 'json'
  );
  -- datagen --> pulsar
  INSERT INTO clicks_pulsar
  SELECT
      `user`,
      CAST(`cTime` AS VARCHAR),
      `url`
  FROM clicks_datagen_view;
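Assuming the pulsar connector in use also supports reading (the DDL above only exercises it as a sink), arrival of rows can be verified by querying the same table back:

  -- read-back sketch; relies on the connector supporting scans
  SELECT `user`, `cTime`, `url` FROM clicks_pulsar;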

datagen --> elasticsearch

  -- elasticsearch 6
  -- =================================
  -- elasticsearch sink
  CREATE TABLE clicks_es6 (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200',
      'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
  );
  -- datagen --> kafka --> es6
  INSERT INTO clicks_es6
  SELECT
      `user`,
      `cTime`,
      `url`
  FROM clicks_kafka;
  -- elasticsearch 7
  -- =================================
  -- elasticsearch sink
  CREATE TABLE clicks_es7 (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL'
  ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://172.16.48.194:9200',
      'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
  );
  -- datagen --> kafka --> es7
  INSERT INTO clicks_es7
  SELECT
      `user`,
      `cTime`,
      `url`
  FROM clicks_kafka;

datagen --> kafka upsert --> blackhole

  CREATE TABLE clicks_upsert_kafka (
      `user` VARCHAR COMMENT 'user name',
      `cTime` TIMESTAMP COMMENT 'click time',
      `url` VARCHAR COMMENT 'clicked URL',
      PRIMARY KEY (`user`) NOT ENFORCED
  )
  COMMENT 'user click table'
  WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = '172.16.48.194:9092',
      'properties.group.id' = 'flink-test',
      'topic' = 'merlin.flink.clicks.upsert',
      'key.format' = 'json',
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY'
  );
  -- datagen --> kafka upsert --> blackhole
  INSERT INTO clicks_upsert_kafka
  SELECT
      `user`,
      `cTime`,
      `url`
  FROM clicks_datagen_view;
  INSERT INTO clicks_blackhole
  SELECT * FROM clicks_upsert_kafka;
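Since `user` is the primary key, the upsert topic keeps at most one current row per user, and reading clicks_upsert_kafka yields a changelog in which a user's later click retracts the earlier one. Counting rows therefore gives the number of distinct users seen so far (a sketch):

  SELECT COUNT(*) AS user_cnt FROM clicks_upsert_kafka;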

Update Data Example

Daily statistics over updating data

SQL

  CREATE TABLE teacher_student_relation_source (
      `id` BIGINT COMMENT 'primary key',
      `teacher_id` BIGINT COMMENT 'teacher id',
      `student_id` BIGINT COMMENT 'student id',
      `status` INT COMMENT 'status: 1 = bound, 0 = unbound',
      `talk_times` INT COMMENT 'number of conversations',
      `relation_level` INT COMMENT 'relation level; higher means stronger',
      `created_time` TIMESTAMP COMMENT 'creation time',
      `updated_time` TIMESTAMP COMMENT 'update time',
      WATERMARK FOR `updated_time` AS `updated_time`
  ) WITH (
      'connector' = 'kafka',
      'topic' = 'teacher.student.relation',
      'properties.bootstrap.servers' = '172.16.48.194:9092',
      'properties.group.id' = 'flink-test',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'debezium-json'
  );
  CREATE TABLE print_sink (
      `dt` DATE COMMENT 'date',
      `teacher_id` BIGINT COMMENT 'teacher id',
      `cnt` BIGINT COMMENT 'count'
  ) WITH (
      'connector' = 'print'
  );
  INSERT INTO print_sink
  SELECT
      CAST(`updated_time` AS DATE) AS `dt`,
      `teacher_id`,
      COUNT(*) AS `cnt`
  FROM teacher_student_relation_source
  WHERE `status` = 1
  GROUP BY CAST(`updated_time` AS DATE), `teacher_id`;

Input

  1. {"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
  2. {"after":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
  3. {"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
  4. {"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"op":"u"}
  5. {"before":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"op":"u"}
  6. {"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"op":"u"}
  7. {"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":100,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:40:10"},"op":"u"}
  8. {"before":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 15:21:10"},"op":"d"}
  9. {"before":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-10-01 11:21:10"},"op":"u"}

Output

  +I(2021-09-30,1,1)
  -U(2021-09-30,1,1)
  +U(2021-09-30,1,2)
  -U(2021-09-30,1,2)
  +U(2021-09-30,1,3)
  -U(2021-09-30,1,3)
  +U(2021-09-30,1,2)
  -U(2021-09-30,1,2)
  +U(2021-09-30,1,3)
  -U(2021-09-30,1,3)
  +U(2021-09-30,1,2)
  -U(2021-09-30,1,2)
  +U(2021-09-30,1,3)
  -U(2021-09-30,1,3)
  +U(2021-09-30,1,2)
  -U(2021-09-30,1,2)
  +U(2021-09-30,1,3)
  -U(2021-09-30,1,3)
  +U(2021-09-30,1,2)
  -U(2021-09-30,1,2)
  +U(2021-09-30,1,1)
  +I(2021-10-01,1,1)
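The -U/+U pairs above are the retract stream produced by the GROUP BY: each qualifying change first withdraws the old count for (dt, teacher_id) and then emits the new one, and the final +I opens a fresh group when updated_time rolls over to 2021-10-01. If downstream only needs the latest count per key, the same query can target an upsert sink keyed by (dt, teacher_id), reusing the upsert-kafka options shown earlier; a sketch, where teacher_daily_cnt and its topic are hypothetical:

  CREATE TABLE teacher_daily_cnt (
      `dt` DATE COMMENT 'date',
      `teacher_id` BIGINT COMMENT 'teacher id',
      `cnt` BIGINT COMMENT 'count',
      PRIMARY KEY (`dt`, `teacher_id`) NOT ENFORCED
  ) WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = '172.16.48.194:9092',
      'topic' = 'merlin.flink.teacher.daily.cnt', -- hypothetical topic
      'key.format' = 'json',
      'value.format' = 'json'
  );
  INSERT INTO teacher_daily_cnt
  SELECT
      CAST(`updated_time` AS DATE) AS `dt`,
      `teacher_id`,
      COUNT(*) AS `cnt`
  FROM teacher_student_relation_source
  WHERE `status` = 1
  GROUP BY CAST(`updated_time` AS DATE), `teacher_id`;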