Runtime environment configuration
Cluster configuration
execution.checkpointing.unaligned=true
execution.checkpointing.interval=10 min
containerized.taskmanager.env.PYTHONPATH=/home/appweb/shuidi/software/python
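The jobs below assume a PyFlink session (the st_env handle used when registering the UDF). A minimal sketch of creating it and applying the checkpointing options per job; note the containerized.* variable is cluster-level and stays in flink-conf.yaml or the submission -D flags:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Session used by the UDF registration and SQL statements below.
env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(env)

# Checkpointing can be overridden per job via the table config;
# containerized.taskmanager.env.* must be set at cluster/submission level.
conf = st_env.get_config().get_configuration()
conf.set_string("execution.checkpointing.unaligned", "true")
conf.set_string("execution.checkpointing.interval", "10 min")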
Python configuration
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import random

# Candidate URLs assigned to the generated click events (internal dashboards).
URLS = [
    'http://iceye.wacai.info/stream-schema/info',
    'http://druid.router.wacai.info/unified-console.html#query',
    'http://merlin-wdp.wacai.info/flink/job/list',
    'http://cdhmanager.stanlee.wacai.info/cmf/login',
    'http://middleware.wacai.info/hermes-web/dashboard',
    'http://streaming-yarn.stanlee.wacai.info/cluster/apps/RUNNING',
    'http://team.caimi-inc.com/',
    'https://loan.wacai.info/wups/roleApply',
    'http://mario.wacai.info/home#',
    'http://biuc.wacai.info/',
    'https://logs.wacai.info/app/kibana#/home?_g=()',
    'http://awm.wacai.info/awm/notebook',
    'http://grafana.wacai.info/?orgId=1',
    'http://wafe.qsh.wacai.info/configMgr',
    'http://sensordata.wacai.info/login/index.html',
    'http://zeppelin.wacai.info/#/',
    'http://prometheus-qsh.wacai.info/graph',
    'http://bitask.caimi-inc.com/index.php?m=script&a=index#',
    'http://platform.wacai.info/prophet-node/job',
    'http://atlas.wacai.info/index.html#!/glossary'
]

@udf(result_type=DataTypes.STRING(), func_type="pandas")
def generate_url(column):
    # Pandas UDF: receives one pandas.Series per batch and must return a
    # Series of the same length; each entry gets a randomly chosen URL.
    return column.map(lambda item: random.choice(URLS))

# Re-register the function: drop any previous version, then create it.
st_env.drop_temporary_function('generate_url')
st_env.create_temporary_function('generate_url', generate_url)
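Since the UDF body is an ordinary pandas transformation, it can be smoke-tested locally before registration; a minimal sketch in which the hand-built Series stands in for the column batches Flink passes at runtime:

import pandas as pd

# Emulate the pandas-UDF contract: one Series in, one Series of equal length out.
batch = pd.Series(['a', 'b', 'c'])
print(batch.map(lambda _: random.choice(URLS)))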
SQL configuration
SQL runtime settings
-- SQL runtime settings
SET table.dynamic-table-options.enabled=true; -- enable SQL hints
SET table.exec.mini-batch.enabled=true; -- enable mini-batch aggregation
SET table.exec.mini-batch.allow-latency=5 s;
SET table.exec.mini-batch.size=5000;
SET table.exec.state.ttl=48 h; -- how long state is retained
SET python.executable=/data/program/miniconda3/envs/pyflink/bin/python; -- set the path of the Python executable
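When the job is submitted from PyFlink rather than the SQL client, the SET statements above map one-to-one onto table-config options (a sketch reusing the st_env session from the Python section):

# The same session options, applied programmatically.
conf = st_env.get_config().get_configuration()
conf.set_string("table.dynamic-table-options.enabled", "true")
conf.set_string("table.exec.mini-batch.enabled", "true")
conf.set_string("table.exec.mini-batch.allow-latency", "5 s")
conf.set_string("table.exec.mini-batch.size", "5000")
conf.set_string("table.exec.state.ttl", "48 h")
conf.set_string("python.executable", "/data/program/miniconda3/envs/pyflink/bin/python")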
Create catalogs
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://172.16.48.191:9083',
'clients'='5',
'property-version'='1',
'hive-conf-dir'='/data/program/flink-114/conf/iceberg'
);
-- Flink JDBC URL parameters (for reference)
-- jdbc:mysql://192.168.5.12:3307?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true&allowMultiQueries=true&useConfigs=maxPerformance
CREATE CATALOG jdbc WITH(
'type' = 'jdbc',
'default-database' = 'data_dict',
'username' = 'qa_conn',
'password' = 'qa_conn',
'base-url' = 'jdbc:mysql://192.168.5.12:3307'
);
CREATE CATALOG mysql_cdc WITH(
'type' = 'mysql-cdc',
'default-database' = 'wac_edu',
'hostname' = '192.168.5.14',
'port' = '3342',
'username' = 'qa_conn',
'password' = 'qa_conn'
);
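Once the catalogs are registered they can be browsed and queried by fully qualified name; a hedged sketch (some_table is a placeholder, not a real table):

# Switch into the JDBC catalog and list the MySQL tables it exposes.
st_env.execute_sql("USE CATALOG jdbc")
st_env.execute_sql("SHOW TABLES").print()

# Fully qualified catalog.database.table names work without switching:
t = st_env.sql_query("SELECT * FROM mysql_cdc.wac_edu.some_table")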
Create the base tables
-- =================================
-- Define sources and views
-- =================================
-- Generate data
CREATE TABLE clicks_datagen (
`user` VARCHAR COMMENT 'user name',
`cTime` AS LOCALTIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL',
`procTime` AS PROCTIME() COMMENT 'processing time',
WATERMARK FOR `cTime` AS `cTime` - INTERVAL '5' SECOND
)
COMMENT 'user click table'
WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'fields.user.kind' = 'random',
'fields.user.length' = '5',
'fields.url.kind' = 'random',
'fields.url.length' = '5'
);
-- Transform the generated data
CREATE VIEW clicks_datagen_view AS
SELECT
`user`,
`cTime`,
generate_url(`url`) AS `url`,
`procTime`
FROM clicks_datagen;
-- blackhole: anything written here is discarded
CREATE TABLE clicks_blackhole (
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
)
COMMENT 'user click table'
WITH (
'connector' = 'blackhole'
);
datagen --> hermes --> blackhole
CREATE TABLE clicks_hermes (
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
)
COMMENT 'user click table'
WITH (
'connector' = 'hermes',
'properties.hermes.center' = 'http://hermes-center.middleware.wse.test.wacai.info',
'properties.hermes.cluster.id' = 'test',
'properties.group.id' = 'flink-test',
'scan.startup.mode' = 'latest-offset',
'topic' = 'merlin.flink.clicks',
'format' = 'json'
);
-- datagen --> hermes --> blackhole
INSERT INTO clicks_hermes
SELECT
`user`,
`cTime`,
`url`
FROM clicks_datagen_view;
INSERT INTO clicks_blackhole
SELECT * FROM clicks_hermes;
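Note that two INSERT statements only become a single job when they are grouped into a statement set; executed one by one they are submitted as independent jobs. A sketch assuming submission from PyFlink:

# Bundle both INSERTs so they are planned and submitted together.
stmt_set = st_env.create_statement_set()
stmt_set.add_insert_sql(
    "INSERT INTO clicks_hermes "
    "SELECT `user`, `cTime`, `url` FROM clicks_datagen_view")
stmt_set.add_insert_sql(
    "INSERT INTO clicks_blackhole SELECT * FROM clicks_hermes")
stmt_set.execute()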
datagen --> kafka sink --> kafka raw source --> hdfs
-- kafka topic
CREATE TABLE clicks_kafka (
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
)
COMMENT 'user click table'
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '172.16.48.194:9092',
'properties.transactional.id' = 'flink1.11_test-transaction',
'properties.acks' = 'all',
'properties.isolation.level' = 'read_committed',
'properties.group.id' = 'flink-test',
'scan.startup.mode' = 'latest-offset',
'topic' = 'merlin.flink.clicks',
'format' = 'json'
);
-- kafka topic with raw format: the payload is not parsed
CREATE TABLE clicks_kafka_source_raw (
`content` BYTES,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
)
COMMENT 'user click table'
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '172.16.48.194:9092',
'properties.transactional.id' = 'flink1.11_test-transaction',
'properties.acks' = 'all',
'properties.isolation.level' = 'read_committed',
'properties.group.id' = 'flink-test',
'scan.startup.mode' = 'latest-offset',
'topic' = 'merlin.flink.clicks',
'format' = 'raw'
);
-- hdfs sink
CREATE TABLE clicks_hdfs (
`content` BYTES,
`dt` VARCHAR,
`hour` VARCHAR
)
COMMENT 'user click table'
PARTITIONED BY (`dt`, `hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://172.16.48.23:8020/shuidi/flink/tables/clicks',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1 h',
'sink.rolling-policy.rollover-interval' = '60 min',
'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.success-file.name' = '_success',
'format' = 'raw'
);
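With 'sink.partition-commit.trigger' = 'partition-time', a partition such as dt=2021-09-30/hour=13 is committed once the watermark passes the timestamp the pattern reconstructs from its values ('2021-09-30 13:00:00') plus the 1 h delay; on commit the _success marker file is written.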
-- datagen --> kafka sink --> kafka raw source --> hdfs
INSERT INTO clicks_kafka
SELECT
`user`,
`cTime`,
`url`
FROM clicks_datagen_view;
INSERT INTO clicks_hdfs
SELECT
`content`,
CAST(CAST(`ts` AS DATE) AS VARCHAR),
CAST(HOUR(`ts`) AS VARCHAR)
FROM clicks_kafka_source_raw;
datagen --> pulsar
-- pulsar sink
CREATE TABLE clicks_pulsar (
`user` VARCHAR COMMENT 'user name',
`cTime` VARCHAR COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
)
COMMENT 'user click table'
WITH (
'connector' = 'pulsar',
'topic' = 'persistent://merlin/flink-sql/clicks',
'service-url' = 'pulsar://172.16.48.194:6650',
'admin-url' = 'http://172.16.48.194:8080',
'format' = 'json'
);
-- datagen --> pulsar
INSERT INTO clicks_pulsar
SELECT
`user`,
CAST(`cTime` AS VARCHAR),
`url`
FROM clicks_datagen_view;
datagen --> elasticsearch
-- elasticsearch 6
-- =================================
-- elasticsearch sink
CREATE TABLE clicks_es6(
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = 'http://192.168.5.16:9200;http://192.168.5.17:9200',
'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
);
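The {cTime|yyyy-MM-dd} placeholder makes the index dynamic: each record is routed to a daily index derived from its own cTime value, e.g. merlin_flink_clicks-2021-09-30.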
-- datagen --> kafka --> es6
INSERT INTO clicks_es6
SELECT
`user`,
`cTime`,
`url`
FROM clicks_kafka;
-- elasticsearch 7
-- =================================
-- elasticsearch sink
CREATE TABLE clicks_es7(
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL'
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://172.16.48.194:9200',
'index' = 'merlin_flink_clicks-{cTime|yyyy-MM-dd}'
);
-- datagen --> kafka --> es7
INSERT INTO clicks_es7
SELECT
`user`,
`cTime`,
`url`
FROM clicks_kafka;
datagen --> kafka upsert --> blackhole
CREATE TABLE clicks_upsert_kafka (
`user` VARCHAR COMMENT 'user name',
`cTime` TIMESTAMP COMMENT 'click time',
`url` VARCHAR COMMENT 'clicked URL',
PRIMARY KEY (`user`) NOT ENFORCED
)
COMMENT 'user click table'
WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '172.16.48.194:9092',
'properties.group.id' = 'flink-test',
'topic' = 'merlin.flink.clicks.upsert',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);
-- datagen --> kafka upsert --> blackhole
INSERT INTO clicks_upsert_kafka
SELECT
`user`,
`cTime`,
`url`
FROM clicks_datagen_view;
INSERT INTO clicks_blackhole
SELECT * FROM clicks_upsert_kafka;
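Because `user` is the primary key, the sink writes it into the Kafka record key and, with 'value.fields-include' = 'EXCEPT_KEY', omits it from the value; reading the topic back therefore yields an upsert changelog keyed on `user`, not a plain append stream.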
Updating data example
Per-day counts over updating data
SQL
CREATE TABLE teacher_student_relation_source(
`id` BIGINT COMMENT 'primary key',
`teacher_id` BIGINT COMMENT 'teacher id',
`student_id` BIGINT COMMENT 'student id',
`status` INT COMMENT 'status: 1 = bound, 0 = unbound',
`talk_times` INT COMMENT 'number of conversations',
`relation_level` INT COMMENT 'relation level; higher is stronger',
`created_time` TIMESTAMP COMMENT 'creation time',
`updated_time` TIMESTAMP COMMENT 'update time',
WATERMARK FOR `updated_time` AS `updated_time`
) WITH (
'connector' = 'kafka',
'topic' = 'teacher.student.relation',
'properties.bootstrap.servers' = '172.16.48.194:9092',
'properties.group.id' = 'flink-test',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
CREATE TABLE print_sink(
`dt` DATE COMMENT 'date',
`teacher_id` BIGINT COMMENT 'teacher id',
`cnt` BIGINT COMMENT 'count'
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink
SELECT
CAST(`updated_time` AS DATE) AS `dt`,
`teacher_id`,
COUNT(*) AS cnt
FROM teacher_student_relation_source
WHERE `status` = 1
GROUP BY CAST(`updated_time` AS DATE), `teacher_id`;
Input
{"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"after":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"op":"c"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":0,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 13:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":1,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:21:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"op":"u"}
{"before":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":2,"relation_level":1,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:22:10"},"after":{"id":1,"teacher_id":1,"student_id":1,"status":1,"talk_times":100,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 14:40:10"},"op":"u"}
{"before":{"id":2,"teacher_id":1,"student_id":2,"status":1,"talk_times":2,"relation_level":2,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 15:21:10"},"op":"d"}
{"before":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-09-30 11:21:10"},"after":{"id":3,"teacher_id":1,"student_id":3,"status":1,"talk_times":3,"relation_level":3,"created_time":"2021-09-30 11:21:10","updated_time":"2021-10-01 11:21:10"},"op":"u"}
Output
+I(2021-09-30,1,1)
-U(2021-09-30,1,1)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,3)
-U(2021-09-30,1,3)
+U(2021-09-30,1,2)
-U(2021-09-30,1,2)
+U(2021-09-30,1,1)
+I(2021-10-01,1,1)
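Reading the trace: +I inserts a new per-day aggregate, and each upstream update arrives as a retraction pair, so the old count is first withdrawn (-U) and the new one emitted (+U); that is why the 2021-09-30 count oscillates between 1, 2 and 3 as the debezium updates replay. The delete of id=2 (op "d") drops the count from 3 to 2, and the final update, which moves id=3's updated_time into 2021-10-01, retracts it from the 09-30 group and opens the 10-01 group with a fresh +I. Enabling mini-batch aggregation (see the SQL runtime settings above) can fold many of these intermediate -U/+U pairs together.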