Flink最锋利的武器Flink SQL(入门篇)
原创 左右 大数据左右手 2021-06-08 08:12:45
收录于话题
#Flink
FlinkSQL(入门篇) - 图1

本文目录简介

o 介绍
o 计划器
o Flink与Blink流批环境
o 表
o 支持连接资源
o 创建数据源表
o 创建数据结果表
o 创建数据维表
o 小案例

介绍

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner和Old Planner,Old Planner 在1.9之前就已经存在了 Planner 的作用主要是把关系型的操作翻译成可执行的、经过优化的 Flink 任务。两种 Planner 所使用的优化规则以及运行时类都不一样。它们在支持的功能上也有些差异。

两种计划器

o Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
o Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
o 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
o 基于字符串的键值配置选项仅在 Blink 计划器中使用。
o PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
o Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
o 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。

1.13版本Maven资源

  1. <properties><br /> <flink.version>1.13.0</flink.version><br /> </properties>
  2. <dependencies><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-table-api-scala-bridge_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency>
  3. <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-table-planner-blink_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-streaming-scala_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-clients_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-connector-jdbc_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>com.alibaba.ververica</groupId><br /> <artifactId>flink-connector-mysql-cdc</artifactId><br /> <version>1.4.0</version><br /> </dependency>
  4. <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-connector-kafka_2.11</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> <dependency><br /> <groupId>com.alibaba.ververica</groupId><br /> <artifactId>flink-format-changelog-json</artifactId><br /> <version>1.1.1</version><br /> </dependency><br /> <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-json</artifactId><br /> <version>${flink.version}</version><br /> </dependency>
  5. <dependency><br /> <groupId>org.apache.flink</groupId><br /> <artifactId>flink-csv</artifactId><br /> <version>${flink.version}</version><br /> </dependency><br /> </dependencies>

环境:TableEnvironment

功能

  1. 在内部的 catalog 中注册 Table
    2. 注册外部的 catalog
    3. 加载可插拔模块
    4. 执行 SQL 查询
    5. 注册自定义函数 (scalar、table 或 aggregation)
    6. 将 DataStream 或 DataSet 转换成 Table
    7. 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

    Flink与Blink流批环境

    流处理

    o Flink Stream Table Environment
    def flink_streaming_env(): StreamTableEnvironment ={
    val fsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val fsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val fsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(fsEnv, fsSettings)
    fsTableEnv
    }
    o Blink Stream Table Environment
    def blink_streaming_env(): StreamTableEnvironment ={
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    bsTableEnv
    }

    批处理

    o Flink Batch Table Environment
    def flink_batch_env(): BatchTableEnvironment ={
    val fbEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val fbTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(fbEnv)
    fbTableEnv
    }
    o Blink Batch Table Environment
    def blink_batch_env(): TableEnvironment ={
    val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val bbTableEnv: TableEnvironment = TableEnvironment.create(bbSettings)
    bbTableEnv
    }

    数据类型

    时间格式的较多,当然也有自定义的数据类型
SQL
CHAR CHAR/CHAR(n)
VARCHAR / STRING VARCHAR/VARCHAR(n)/STRING
BINARY BINARY/BINARY(n)
VARBINARY / BYTES VARBINARY/VARBINARY(n)/BYTES
DECIMAL DECIMAL/DECIMAL(p)/DECIMAL(p, s)/DEC/DEC(p)/DEC(p, s)/NUMERIC/NUMERIC(p)/NUMERIC(p, s)
TINYINT TINYINT
INT INT/INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE/DOUBLE PRECISION
DATE DATE
TIME TIME/TIME(p)
TIMESTAMP TIMESTAMP/TIMESTAMP(p)/TIMESTAMP WITHOUT TIME ZONE/TIMESTAMP(p) WITHOUT TIME ZONE
TIMESTAMP WITH TIME ZONE TIMESTAMP WITH TIME ZONE/TIMESTAMP(p) WITH TIME ZONE
TIMESTAMP_LTZ TIMESTAMP_LTZ/TIMESTAMP_LTZ(p)/TIMESTAMP WITH LOCAL TIME ZONE/TIMESTAMP(p) WITH LOCAL TIME ZONE
INTERVAL YEAR TO MONTH INTERVAL YEAR/INTERVAL YEAR(p)/INTERVAL YEAR(p) TO MONTH/INTERVAL MONTH
INTERVAL DAY TO SECOND INTERVAL DAY/INTERVAL DAY(p1)/INTERVAL DAY(p1) TO HOUR/INTERVAL DAY(p1) TO MINUTE/INTERVAL DAY(p1) TO SECOND(p2)/INTERVAL HOUR/INTERVAL HOUR TO MINUTE/INTERVAL HOUR TO SECOND(p2)/INTERVAL MINUTE/INTERVAL MINUTE TO SECOND(p2)/INTERVAL SECOND/INTERVAL SECOND(p2)
ARRAY ARRAY< t>/ t ARRAY
MAP MAP
MULTISET MULTISET < t>/t MULTISET
ROW ROW
BOOLEAN BOOLEAN
NULL NULL

创建表

临时表(Temporary Table)

临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
val sql=
“””
|CREATE TEMPORARY TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘xxxx’
|)
|”””.stripMargin

永久表(Permanent Table)

永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除
val sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘xxxx’
|)
|”””.stripMargin

支持的连接资源一览

Name Version Source Sink
Filesystem Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x Not supported Streaming Sink, Batch Sink
Apache Kafka 0.10+ Unbounded Scan Streaming Sink, Batch Sink
Amazon Kinesis Data Streams Unbounded Scan Streaming Sink
JDBC Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache HBase 1.4.x & 2.2.x Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache Hive Supported Versions Unbounded Scan, Bounded Scan, Lookup Streaming Sink, Batch Sink

注意:数据资源如果没有开发需要的?别着急,可以自定义

创建数据源表

环境初始化

// flink环境 private var bsEnv: StreamExecutionEnvironment =
// 流处理table环境 private var tableEnv: StreamTableEnvironment =
// 批处理table环境 private var tableBatchEnv: TableEnvironment =

def initenvironment(): Unit ={
bsEnv=StreamExecutionEnvironment.getExecutionEnvironment
// 流处理 val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
tableEnv=StreamTableEnvironment.create(bsEnv, bsSettings)
_// 批处理 val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
tableBatchEnv=TableEnvironment.create(bbSettings)
}

CSV

def fromFilesystemWithCSV(): Unit ={
val sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘filesystem’,
| ‘path’ = ‘xxx\xx\something.csv’,
| ‘format’ = ‘csv’
|)
|”””.stripMargin

  1. tableBatchEnv.executeSql(sql)<br /> tableBatchEnv.executeSql("select * from users").print()<br /> }<br />

Kafka

o kafka基本
_// kafka基础 def fromKafkaWithBasic(): Unit ={
val kafka_sql=
“””
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.bootstrap.servers’ = ‘xxxxxxxx’,
| ‘format’ = ‘json’
|)
|”””.stripMargin
tableEnv.executeSql(kafka_sql)
tableEnv.executeSql(“select * from users”).print()
}
o 可用的kafka元数据操作
def fromKafkaWithMetadata(): Unit ={
val kafka_with_metadata=
“””
|CREATE TABLE users (
| event_time TIMESTAMP(3) METADATA FROM ‘timestamp’,
| partition BIGINT METADATA VIRTUAL,
| offset BIGINT METADATA VIRTUAL,
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| ‘connector’ = ‘kafka’,
| ‘topic’ = ‘testtopic’,
| ‘properties.group.id’ = ‘testGroup’,
| ‘properties.bootstrap.servers’ = ‘xxxxxxx’,
| ‘format’ = ‘json’
|)
|”””.stripMargin

  1. tableEnv.executeSql(kafka_with_metadata)<br /> tableEnv.executeSql("select partition,offset,id,age,name from users").print()<br /> }

元信息列

您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据

Key 数据类型 说明
topic STRING NOT NULL METADATA VIRTUAL Kafka消息所在的Topic名称。
partition INT NOT NULL METADATA VIRTUAL Kafka消息所在的Partition ID。
headers MAP NOT NULL METADATA VIRTUAL Kafka消息的消息头(header)。
leader-epoch INT NOT NULL METADATA VIRTUAL Kafka消息的Leader epoch。
offset BIGINT NOT NULL METADATA VIRTUAL Kafka消息的偏移量(offest)。
timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL Kafka消息的时间戳。
timestamp-type STRING NOT NULL METADATA VIRTUAL Kafka消息的时间戳类型:NoTimestampType:消息中没有定义时间戳CreateTime:消息产生的时间。LogAppendTime:消息被添加到Kafka Broker的时间。

WITH参数
参数 说明 是否必选 数据类型
connector 源表类型。 String
topic topic名称。 String
topic-pattern 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 String
properties.bootstrap.servers Kafka Broker地址。 String
properties.group.id Kafka消费组ID。 String
properties.* Kafka配置。 String
format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String
value.format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String
key.format 反序列化Kafka消息键(key)时使用的格式。 String
key.fields Kafka消息键(key)解析出来的数据存放的字段。 String
key.fields-prefix 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 String
value.fields-include 在解析消息体时,是否要包含消息键字段。 String
scan.startup.mode Kafka读取数据的启动位点。 String
scan.startup.specific-offsets 在specific-offsets启动模式下,指定每个分区的启动偏移量。 String
scan.startup.timestamp-millis 在timestamp启动模式下,指定启动位点时间戳。 Long

以上以kafka和filesystem为例,其他可以参考官网

创建数据结果表

create table kafka_sink(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) with (
‘connector’ = ‘kafka’,
‘topic’ = ‘‘,
‘properties.bootstrap.servers’ = ‘‘,
‘format’ = ‘csv’
)

WITH参数
参数 说明 是否必选 数据类型
connector 结果表类型 STRING
topic 结果表对应的Topic STRING
properties.bootstrap.servers Kafka Broker地址 STRING
format Flink Kafka Connector在反序列化来自Kafka的消息时使用的格式。 STRING
sink.partitioner 从Flink分区到Kafka分区的映射模式。 STRING

小案例从csv文件读取落到mysql

def fromCSVToJDBC(): Unit ={

  1. val sql_csv=<br /> """<br /> |CREATE TABLE person (<br /> | id BIGINT,<br /> | name STRING,<br /> | age INT<br /> |) WITH (<br /> | 'connector' = 'filesystem',<br /> | 'path' = 'xxx\xxx\something.csv',<br /> | 'format' = 'csv'<br /> |)<br /> |""".stripMargin
  2. tableBatchEnv.executeSql(sql_csv)
  3. val sql=<br /> """<br /> |CREATE TABLE users (<br /> | id BIGINT,<br /> | name STRING,<br /> | age INT,<br /> | PRIMARY KEY (id) NOT ENFORCED<br /> |) WITH (<br /> | 'connector' = 'jdbc',<br /> | 'username' = 'root',<br /> | 'password' = 'root',<br /> | 'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC',<br /> | 'driver'='com.mysql.cj.jdbc.Driver',<br /> | 'table-name' = 'users'<br /> |)<br /> |""".stripMargin
  4. tableBatchEnv.executeSql(sql)<br /> tableBatchEnv.executeSql("insert into users select id,name,age from person")<br /> }

创建数据维表

CREATE TABLE filesystem_dim (
id STRING,
name STRING) WITH (
‘connector’ = ‘filesystem’,
‘path’ = ‘csv/json/avro/parquet/orc/raw’,
‘format’ = ‘CSV’
)

WITH 参数
参数 说明 是否必选 备注
connector 维表类型 固定值为filesystem。
path 文件路径 URI格式,例如:oss://my_path/my_file。
format 文件格式 参数取值如下:CSV/JSON/AVRO/PARQUET/ORC/RAW
lookup.join.cache.ttl 重新读取数据的TTL时间 默认值为60分钟,即每隔60分钟重新读取数据。

代码示例

—创建event源表
CREATE TABLE event (
id STRING,
data STRING) WITH (
‘connector’ = ‘datahub’

);

—创建white_list维表。
CREATE TABLE white_list (
id STRING,
name STRING,
) WITH (
‘connector’ = ‘filesystem’,
‘path’ = ‘${remote_path_uri}’,
‘format’ = ‘${format}’
);

—关联event源表和white_list维表。
SELECT e., w.FROM event AS eJOIN white_list FOR SYSTEM_TIME AS OF proctime() AS wON e.id = w.id;

本篇总结

本篇向大家介绍了 Flink SQL的入门简介,并没有细究原理与实现逻辑。就是先让大家对Flink SQL先有一个直观的体验。先有兴趣才有深究的动力,未完待续……
转发,点赞,在看。共同学习,同登彼岸~~~