Flink最锋利的武器Flink SQL(入门篇)
原创 左右 大数据左右手 2021-06-08 08:12:45
收录于话题
#Flink
本文目录简介
o 介绍
o 计划器
o Flink与Blink流批环境
o 表
o 支持连接资源
o 创建数据源表
o 创建数据结果表
o 创建数据维表
o 小案例
介绍
Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner和Old Planner,Old Planner 在1.9之前就已经存在了 Planner 的作用主要是把关系型的操作翻译成可执行的、经过优化的 Flink 任务。两种 Planner 所使用的优化规则以及运行时类都不一样。它们在支持的功能上也有些差异。
两种计划器
o Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
o Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
o 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
o 基于字符串的键值配置选项仅在 Blink 计划器中使用。
o PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
o Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
o 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
1.13版本Maven资源
<properties><br /> <flink.version>1.13.0</flink.version><br /> </properties>
<dependencies><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-table-api-scala-bridge_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency>
<dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-table-planner-blink_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-streaming-scala_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-clients_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-connector-jdbc_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>com.alibaba.ververica</groupId><br /> <artifactId>flink-connector-mysql-cdc</artifactId><br /> <version>1.4.0</version><br /> </dependency>
<dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-connector-kafka_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>com.alibaba.ververica</groupId><br /> <artifactId>flink-format-changelog-json</artifactId><br /> <version>1.1.1</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-json</artifactId><br /> <version>${flink.version}</version><br /> </dependency>
<dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-csv</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> </dependencies>
环境:TableEnvironment
功能
- 在内部的 catalog 中注册 Table
2. 注册外部的 catalog
3. 加载可插拔模块
4. 执行 SQL 查询
5. 注册自定义函数 (scalar、table 或 aggregation)
6. 将 DataStream 或 DataSet 转换成 Table
7. 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用Flink与Blink流批环境
流处理
o Flink Stream Table Environment
def flink_streaming_env(): StreamTableEnvironment ={
val fsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(fsEnv, fsSettings)
fsTableEnv
}
o Blink Stream Table Environment
def blink_streaming_env(): StreamTableEnvironment ={
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv
}批处理
o Flink Batch Table Environment
def flink_batch_env(): BatchTableEnvironment ={
val fbEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(fbEnv)
fbTableEnv
}
o Blink Batch Table Environment
def blink_batch_env(): TableEnvironment ={
val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv: TableEnvironment = TableEnvironment.create(bbSettings)
bbTableEnv
}数据类型
时间格式的较多,当然也有自定义的数据类型
SQL | |
---|---|
CHAR | CHAR/CHAR(n) |
VARCHAR / STRING | VARCHAR/VARCHAR(n)/STRING |
BINARY | BINARY/BINARY(n) |
VARBINARY / BYTES | VARBINARY/VARBINARY(n)/BYTES |
DECIMAL | DECIMAL/DECIMAL(p)/DECIMAL(p, s)/DEC/DEC(p)/DEC(p, s)/NUMERIC/NUMERIC(p)/NUMERIC(p, s) |
TINYINT | TINYINT |
INT | INT/INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE/DOUBLE PRECISION |
DATE | DATE |
TIME | TIME/TIME(p) |
TIMESTAMP | TIMESTAMP/TIMESTAMP(p)/TIMESTAMP WITHOUT TIME ZONE/TIMESTAMP(p) WITHOUT TIME ZONE |
TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE/TIMESTAMP(p) WITH TIME ZONE |
TIMESTAMP_LTZ | TIMESTAMP_LTZ/TIMESTAMP_LTZ(p)/TIMESTAMP WITH LOCAL TIME ZONE/TIMESTAMP(p) WITH LOCAL TIME ZONE |
INTERVAL YEAR TO MONTH | INTERVAL YEAR/INTERVAL YEAR(p)/INTERVAL YEAR(p) TO MONTH/INTERVAL MONTH |
INTERVAL DAY TO SECOND | INTERVAL DAY/INTERVAL DAY(p1)/INTERVAL DAY(p1) TO HOUR/INTERVAL DAY(p1) TO MINUTE/INTERVAL DAY(p1) TO SECOND(p2)/INTERVAL HOUR/INTERVAL HOUR TO MINUTE/INTERVAL HOUR TO SECOND(p2)/INTERVAL MINUTE/INTERVAL MINUTE TO SECOND(p2)/INTERVAL SECOND/INTERVAL SECOND(p2) |
ARRAY | ARRAY< t>/ t ARRAY |
MAP | MAP |
MULTISET | MULTISET < t>/t MULTISET |
ROW | ROW |
BOOLEAN | BOOLEAN |
NULL | NULL |
创建表
临时表(Temporary Table)
临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
val sql=
“””
|CREATE TEMPORARY TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘xxxx’
|)
|”””.stripMargin
永久表(Permanent Table)
永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除
val sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘xxxx’
|)
|”””.stripMargin
支持的连接资源一览
Name | Version | Source | Sink |
---|---|---|---|
Filesystem | Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink | ||
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
Amazon Kinesis Data Streams | Unbounded Scan | Streaming Sink | |
JDBC | Bounded Scan, Lookup | Streaming Sink, Batch Sink | |
Apache HBase | 1.4.x & 2.2.x Bounded Scan, Lookup | Streaming Sink, Batch Sink | |
Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
创建数据源表
环境初始化
// flink环境 private var bsEnv: StreamExecutionEnvironment =
// 流处理table环境 private var tableEnv: StreamTableEnvironment =
// 批处理table环境 private var tableBatchEnv: TableEnvironment =
def initenvironment(): Unit ={
bsEnv=StreamExecutionEnvironment.getExecutionEnvironment
// 流处理 val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
tableEnv=StreamTableEnvironment.create(bsEnv, bsSettings)
_// 批处理 val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
tableBatchEnv=TableEnvironment.create(bbSettings)
}
CSV
def fromFilesystemWithCSV(): Unit ={
val sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘filesystem’,
| ‘path’ = ‘xxx\xx\something.csv’,
| ‘format’ = ‘csv’
|)
|”””.stripMargin
tableBatchEnv.executeSql(sql)<br /> tableBatchEnv.executeSql("select * from users").print()<br /> }<br />
Kafka
o kafka基本
_// kafka基础 def fromKafkaWithBasic(): Unit ={
val kafka_sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.bootstrap.servers’ = ‘xxxxxxxx’,
| ‘format’ = ‘json’
|)
|”””.stripMargin
tableEnv.executeSql(kafka_sql)
tableEnv.executeSql(“select * from users”).print()
}
o 可用的kafka元数据操作
def fromKafkaWithMetadata(): Unit ={
val kafka_with_metadata=
“””
|CREATE TABLE users (
| event_time TIMESTAMP(3) METADATA FROM ‘timestamp’,
| partition BIGINT METADATA VIRTUAL,
| offset BIGINT METADATA VIRTUAL,
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.group.id’ = ‘testGroup’,
| ‘properties.bootstrap.servers’ = ‘xxxxxxx’,
| ‘format’ = ‘json’
|)
|”””.stripMargin
tableEnv.executeSql(kafka_with_metadata)<br /> tableEnv.executeSql("select partition,offset,id,age,name from users").print()<br /> }
元信息列
您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据
Key | 数据类型 | 说明 |
---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | Kafka消息所在的Topic名称。 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka消息所在的Partition ID。 |
headers | MAP |
Kafka消息的消息头(header)。 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka消息的Leader epoch。 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka消息的偏移量(offest)。 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka消息的时间戳。 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka消息的时间戳类型:NoTimestampType:消息中没有定义时间戳CreateTime:消息产生的时间。LogAppendTime:消息被添加到Kafka Broker的时间。 |
WITH参数
参数 | 说明 | 是否必选 | 数据类型 |
---|---|---|---|
connector | 源表类型。 | 是 | String |
topic | topic名称。 | 是 | String |
topic-pattern | 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 | 否 | String |
properties.bootstrap.servers | Kafka Broker地址。 | 是 | String |
properties.group.id | Kafka消费组ID。 | 是 | String |
properties.* | Kafka配置。 | 否 | String |
format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String |
value.format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String |
key.format | 反序列化Kafka消息键(key)时使用的格式。 | 否 | String |
key.fields | Kafka消息键(key)解析出来的数据存放的字段。 | 否 | String |
key.fields-prefix | 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 | 否 | String |
value.fields-include | 在解析消息体时,是否要包含消息键字段。 | 否 | String |
scan.startup.mode | Kafka读取数据的启动位点。 | 否 | String |
scan.startup.specific-offsets | 在specific-offsets启动模式下,指定每个分区的启动偏移量。 | 否 | String |
scan.startup.timestamp-millis | 在timestamp启动模式下,指定启动位点时间戳。 | 否 | Long |
以上以kafka和filesystem为例,其他可以参考官网
创建数据结果表
create table kafka_sink(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) with (
‘connector’ = ‘kafka’,
‘topic’ = ‘
‘properties.bootstrap.servers’ = ‘
‘format’ = ‘csv’
)
WITH参数
参数 | 说明 | 是否必选 | 数据类型 |
---|---|---|---|
connector | 结果表类型 | 是 | STRING |
topic | 结果表对应的Topic | 是 | STRING |
properties.bootstrap.servers | Kafka Broker地址 | 是 | STRING |
format | Flink Kafka Connector在反序列化来自Kafka的消息时使用的格式。 | 是 | STRING |
sink.partitioner | 从Flink分区到Kafka分区的映射模式。 | 否 | STRING |
小案例从csv文件读取落到mysql
def fromCSVToJDBC(): Unit ={
val sql_csv=<br /> """<br /> |CREATE TABLE person (<br /> | id BIGINT,<br /> | name STRING,<br /> | age INT<br /> |) WITH (<br /> | 'connector' = 'filesystem',<br /> | 'path' = 'xxx\xxx\something.csv',<br /> | 'format' = 'csv'<br /> |)<br /> |""".stripMargin
tableBatchEnv.executeSql(sql_csv)
val sql=<br /> """<br /> |CREATE TABLE users (<br /> | id BIGINT,<br /> | name STRING,<br /> | age INT,<br /> | PRIMARY KEY (id) NOT ENFORCED<br /> |) WITH (<br /> | 'connector' = 'jdbc',<br /> | 'username' = 'root',<br /> | 'password' = 'root',<br /> | 'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC',<br /> | 'driver'='com.mysql.cj.jdbc.Driver',<br /> | 'table-name' = 'users'<br /> |)<br /> |""".stripMargin
tableBatchEnv.executeSql(sql)<br /> tableBatchEnv.executeSql("insert into users select id,name,age from person")<br /> }
创建数据维表
CREATE TABLE filesystem_dim (
id STRING,
name STRING) WITH (
‘connector’ = ‘filesystem’,
‘path’ = ‘csv/json/avro/parquet/orc/raw’,
‘format’ = ‘CSV’
)
WITH 参数
参数 | 说明 | 是否必选 | 备注 |
---|---|---|---|
connector | 维表类型 | 是 | 固定值为filesystem。 |
path | 文件路径 | 是 | URI格式,例如:oss://my_path/my_file。 |
format | 文件格式 | 是 | 参数取值如下:CSV/JSON/AVRO/PARQUET/ORC/RAW |
lookup.join.cache.ttl | 重新读取数据的TTL时间 | 否 | 默认值为60分钟,即每隔60分钟重新读取数据。 |
代码示例
—创建event源表
CREATE TABLE event (
id STRING,
data STRING) WITH (
‘connector’ = ‘datahub’
…
);
—创建white_list维表。
CREATE TABLE white_list (
id STRING,
name STRING,
) WITH (
‘connector’ = ‘filesystem’,
‘path’ = ‘${remote_path_uri}’,
‘format’ = ‘${format}’
);
—关联event源表和white_list维表。
SELECT e., w.FROM event AS eJOIN white_list FOR SYSTEM_TIME AS OF proctime() AS wON e.id = w.id;
本篇总结
本篇向大家介绍了 Flink SQL的入门简介,并没有细究原理与实现逻辑。就是先让大家对Flink SQL先有一个直观的体验。先有兴趣才有深究的动力,未完待续……
转发,点赞,在看。共同学习,同登彼岸~~~