运行环境配置
集群配置
execution.checkpointing.unaligned=true
execution.checkpointing.interval=10 min
containerized.taskmanager.env.PYTHONPATH=/home/appweb/shuidi/software/python
Python配置
import random


@udf(result_type=DataTypes.STRING(), func_type="pandas")
def generate_url(column):
    """Pandas UDF: map every incoming value to a randomly picked internal URL.

    The input is a pandas Series (pandas-mode UDF); the original values are
    ignored — each row is replaced by a random choice from a fixed URL pool.
    """
    candidate_urls = [
        'http://iceye.wacai.info/stream-schema/info',
        'http://druid.router.wacai.info/unified-console.html#query',
        'http://merlin-wdp.wacai.info/flink/job/list',
        'http://cdhmanager.stanlee.wacai.info/cmf/login',
        'http://middleware.wacai.info/hermes-web/dashboard',
        'http://streaming-yarn.stanlee.wacai.info/cluster/apps/RUNNING',
        'http://team.caimi-inc.com/',
        'https://loan.wacai.info/wups/roleApply',
        'http://mario.wacai.info/home#',
        'http://biuc.wacai.info/',
        'https://logs.wacai.info/app/kibana#/home?_g=()',
        'http://awm.wacai.info/awm/notebook',
        'http://grafana.wacai.info/?orgId=1',
        'http://wafe.qsh.wacai.info/configMgr',
        'http://sensordata.wacai.info/login/index.html',
        'http://zeppelin.wacai.info/#/',
        'http://prometheus-qsh.wacai.info/graph',
        'http://bitask.caimi-inc.com/index.php?m=script&a=index#',
        'http://platform.wacai.info/prophet-node/job',
        'http://atlas.wacai.info/index.html#!/glossary',
    ]
    return column.map(lambda _value: random.choice(candidate_urls))


# Re-register the UDF: drop any previous definition, then register the new one.
st_env.drop_temporary_function('generate_url')
st_env.create_temporary_function('generate_url', generate_url)
SQL配置
SQL运行配置
-- SQL runtime configuration
SET table.dynamic-table-options.enabled=true;   -- enable SQL hints
SET table.exec.mini-batch.enabled=true;         -- enable mini-batch execution
SET table.exec.mini-batch.allow-latency=5 s;
SET table.exec.mini-batch.size=5000;
SET table.exec.state.ttl=48 h;                  -- how long state is retained
SET python.executable=/data/program/miniconda3/envs/pyflink/bin/python; -- python interpreter path
创建Catalog
-- Iceberg catalog backed by the Hive metastore.
CREATE CATALOG iceberg WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://172.16.48.191:9083',
    'clients' = '5',
    'property-version' = '1',
    'hive-conf-dir' = '/data/program/flink-114/conf/iceberg'
);

-- flink jdbc url tuning parameters (reference):
-- jdbc:mysql://192.168.5.12:3307?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true&allowMultiQueries=true&useConfigs=maxPerformance
CREATE CATALOG jdbc WITH (
    'type' = 'jdbc',
    'default-database' = 'data_dict',
    'username' = 'qa_conn',
    'password' = 'qa_conn',
    'base-url' = 'jdbc:mysql://192.168.5.12:3307'
);

-- MySQL CDC catalog.
CREATE CATALOG mysql_cdc WITH (
    'type' = 'mysql-cdc',
    'default-database' = 'wac_edu',
    'hostname' = '192.168.5.14',
    'port' = '3342',
    'username' = 'qa_conn',
    'password' = 'qa_conn'
);
创建基本的表
-- =================================
-- Source / view definitions
-- =================================

-- Randomly generated click events.
CREATE TABLE clicks_datagen (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` AS LOCALTIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接',
    `procTime` AS PROCTIME() COMMENT '处理时间',
    WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND
)
COMMENT '用户点击表'
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.user.kind' = 'random',
    'fields.user.length' = '5',
    'fields.url.kind' = 'random',
    'fields.url.length' = '5'
);

-- Rewrite the random url column through the generate_url UDF.
CREATE VIEW clicks_datagen_view AS
SELECT
    `user`,
    `cTime`,
    generate_url(`url`) AS `url`,
    `procTime`
FROM clicks_datagen;

-- Blackhole sink: everything written here is discarded.
CREATE TABLE clicks_blackhole (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
)
COMMENT '用户点击表'
WITH (
    'connector' = 'blackhole'
);
datagen —> hermes —> blackhole
-- Hermes topic for click events.
CREATE TABLE clicks_hermes (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
)
COMMENT '用户点击表'
WITH (
    'connector' = 'hermes',
    'properties.hermes.center' = 'http://hermes-center.middleware.wse.test.wacai.info',
    'properties.hermes.cluster.id' = 'test',
    'properties.group.id' = 'flink-test',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'merlin.flink.clicks',
    'format' = 'json'
);

-- Pipeline: datagen --> hermes --> blackhole
INSERT INTO clicks_hermes
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_datagen_view;

INSERT INTO clicks_blackhole
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_hermes;
datagen —> kafka sink —> kafka raw source —> hdfs
-- Kafka topic (JSON format).
CREATE TABLE clicks_kafka (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
)
COMMENT '用户点击表'
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '172.16.48.194:9092',
    'properties.transactional.id' = 'flink1.11_test-transaction',
    'properties.acks' = 'all',
    'properties.isolation.level' = 'read_committed',
    'properties.group.id' = 'flink-test',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'merlin.flink.clicks',
    'format' = 'json'
);

-- Same Kafka topic read as raw bytes (payload is not parsed).
CREATE TABLE clicks_kafka_source_raw (
    `content` BYTES,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
)
COMMENT '用户点击表'
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '172.16.48.194:9092',
    'properties.transactional.id' = 'flink1.11_test-transaction',
    'properties.acks' = 'all',
    'properties.isolation.level' = 'read_committed',
    'properties.group.id' = 'flink-test',
    'scan.startup.mode' = 'latest-offset',
    'topic' = 'merlin.flink.clicks',
    'format' = 'raw'
);

-- HDFS sink, partitioned by day and hour.
CREATE TABLE clicks_hdfs (
    `content` BYTES,
    `dt` VARCHAR,
    `hour` VARCHAR
)
COMMENT '用户点击表'
PARTITIONED BY (`dt`, `hour`)
WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://172.16.48.23:8020/shuidi/flink/tables/clicks',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 h',
    'sink.rolling-policy.rollover-interval' = '60 min',
    'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.success-file.name' = '_success',
    'format' = 'raw'
);

-- Pipeline: datagen --> kafka sink --> kafka raw source --> hdfs
INSERT INTO clicks_kafka
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_datagen_view;

INSERT INTO clicks_hdfs
SELECT
    `content`,
    CAST(CAST(`ts` AS DATE) AS VARCHAR),
    CAST(HOUR(`ts`) AS VARCHAR)
FROM clicks_kafka_source_raw;
datagen —> pulsar
-- Pulsar sink.
CREATE TABLE clicks_pulsar (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` VARCHAR COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
)
COMMENT '用户点击表'
WITH (
    'connector' = 'pulsar',
    'topic' = 'persistent://merlin/flink-sql/clicks',
    'service-url' = 'pulsar://172.16.48.194:6650',
    'admin-url' = 'http://172.16.48.194:8080',
    'format' = 'json'
);

-- Pipeline: datagen --> pulsar (cTime serialized as string)
INSERT INTO clicks_pulsar
SELECT
    `user`,
    CAST(`cTime` AS VARCHAR),
    `url`
FROM clicks_datagen_view;
datagen —> elasticsearch
-- elasticsearch 6
-- =================================
-- Daily-rolled index sink.
CREATE TABLE clicks_es6 (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200',
    'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
);

-- Pipeline: datagen --> kafka --> es6
INSERT INTO clicks_es6
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_kafka;

-- elasticsearch 7
-- =================================
-- Daily-rolled index sink.
CREATE TABLE clicks_es7 (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接'
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://172.16.48.194:9200',
    'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
);

-- Pipeline: datagen --> kafka --> es7
INSERT INTO clicks_es7
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_kafka;
datagen —> kafka upsert —> blackhole
-- Upsert-kafka topic keyed by user.
CREATE TABLE clicks_upsert_kafka (
    `user` VARCHAR COMMENT '用户名称',
    `cTime` TIMESTAMP COMMENT '点击时间',
    `url` VARCHAR COMMENT '点击链接',
    PRIMARY KEY (`user`) NOT ENFORCED
)
COMMENT '用户点击表'
WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = '172.16.48.194:9092',
    'properties.group.id' = 'flink-test',
    'topic' = 'merlin.flink.clicks.upsert',
    'key.format' = 'json',
    'value.format' = 'json',
    'value.fields-include' = 'EXCEPT_KEY'
);

-- Pipeline: datagen --> kafka upsert --> blackhole
INSERT INTO clicks_upsert_kafka
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_datagen_view;

INSERT INTO clicks_blackhole
SELECT
    `user`,
    `cTime`,
    `url`
FROM clicks_upsert_kafka;
更新数据实例
按天统计更新数据
SQL
-- CDC changelog source of teacher/student bindings.
CREATE TABLE teacher_student_relation_source (
    `id` BIGINT COMMENT '主键id',
    `teacher_id` BIGINT COMMENT '讲师id',
    `student_id` BIGINT COMMENT '学生id',
    `status` INT COMMENT '状态,1表示绑定,0表示解绑',
    `talk_times` INT COMMENT '交流次数',
    `relation_level` INT COMMENT '关系等级,值越高等级越强',
    `created_time` TIMESTAMP COMMENT '创建时间',
    `updated_time` TIMESTAMP COMMENT '更新时间',
    WATERMARK FOR `updated_time` AS `updated_time`
) WITH (
    'connector' = 'kafka',
    'topic' = 'teacher.student.relation',
    'properties.bootstrap.servers' = '172.16.48.194:9092',
    'properties.group.id' = 'flink-test',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

-- Debug sink that prints the changelog to stdout.
CREATE TABLE print_sink (
    `dt` DATE COMMENT '日期',
    `teacher_id` BIGINT COMMENT '讲师id',
    `cnt` BIGINT COMMENT '个数'
) WITH (
    'connector' = 'print'
);

-- Per-day, per-teacher count of currently bound (status = 1) relations.
INSERT INTO print_sink
SELECT
    CAST(`updated_time` AS DATE) AS `dt`,
    `teacher_id`,
    COUNT(*) AS cnt
FROM teacher_student_relation_source
WHERE `status` = 1
GROUP BY
    CAST(`updated_time` AS DATE),
    `teacher_id`;
输入
{"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"after":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":100,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:40:10"},"op":"u"}
{"before":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 15:21:10"},"op":"d"}
{"before":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-10-01 11:21:10"},"op":"u"}
输出
+I(2021-09-30,1,1)
-U(2021-09-30,1,1)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,1)
+I(2021-10-01,1,1)