- 环境准备
- 作业开发
- PyFlink Table API 作业
- PyFlink DataStream API 作业">PyFlink DataStream API 作业
- 作业提交
- 问题排查
注意⚠️
Flink 版本 1.14.3
环境准备
第一步:安装 Python
PyFlink 需要 Python 3.6 以上版本(3.6, 3.7 或 3.8),请运行以下命令,以确保 Python 版本满足要求。
$ python --version
# the version printed here must be 3.6, 3.7 or 3.8
第二步:安装 JDK
我们知道 Flink 的运行时是使用 Java 语言开发的,所以为了执行 Flink 作业,您还需要安装 JDK。Flink 提供了对于 JDK 8 以及 JDK 11 的全面支持,您需要确认您的开发环境中是否已经安装了上述版本的 JDK,如果没有的话,首先需要安装 JDK。
sudo apt update
sudo apt install default-jdk
第三步:安装 PyFlink
PyFlink 已经被发布到PyPi,可以通过如下方式安装 PyFlink:
# 创建 Python 虚拟环境
python3 -m venv venv
# 使用上述创建的 Python 虚拟环境
./venv/bin/activate
# 安装 PyFlink 1.14.3
python -m pip install apache-flink==1.14.3
作业开发
PyFlink Table API 作业
1)创建 TableEnvironment 对象
对于 Table API 作业来说,用户首先需要创建一个 TableEnvironment 对象。下面代码示例展示了如何创建一个 TableEnvironment。
TableEnvironment 可以用来:
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# or create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
- 创建 Table
- 将 Table 注册成临时表
- 执行 SQL 查询,更多细节可查阅 SQL
- 注册用户自定义的 (标量,表值,或者聚合) 函数, 更多细节可查阅 普通的用户自定义函数 和 向量化的用户自定义函数
- 配置作业,更多细节可查阅 Python 配置
- 管理 Python 依赖,更多细节可查阅 依赖管理
- 提交作业执行
2)配置作业的执行参数
可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。
table_env.get_config().get_configuration().set_string('parallelism.default', '4')
3)创建数据源表
Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。 一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。 接下来,需要为作业创建一个数据源表。PyFlink 中提供了多种方式来定义数据源表。 #### 方式一:from_elements 这种方式通常用于测试阶段,可以快速地创建一个数据源表,验证作业逻辑 你可以使用一个列表对象创建一张表:python
from pyflink.table import EnvironmentSettings, TableEnvironment
# 创建批处理 TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
print(table.to_pandas())
结果为:
python
_1 _2
0 1 Hi
1 2 Hello
你也可以创建具有指定列名的表:
python
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
print(table.to_pandas())
结果为:
python
id data
0 1 Hi
1 2 Hello
默认情况下,表结构是从数据中自动提取的。
如果自动生成的表模式不符合你的要求,你也可以手动指定:
python
table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
# 默认情况下,“id” 列的类型是 64 位整型
default_type = table_without_schema.to_pandas()["id"].dtype
print('By default the type of the "id" column is %s.' % default_type)
from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
DataTypes.FIELD("data", DataTypes.STRING())]))
# 现在 “id” 列的类型是 8 位整型
type = table.to_pandas()["id"].dtype
print('Now the type of the "id" column is %s.' % type)
结果为:
python
By default the type of the "id" column is int64.
Now the type of the "id" column is int8.
说明:
+ 第一个参数是必须的,用于指定数据列表,列表中的每一个元素必须为 tuple 类型
+ 第二个参数是可选的,用于指定表的 schema
#### 方式二:DDL
通过 DDL 的方式来定义数据源表是目前最推荐的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通过 DDL 的方式,在 PyFlink Table API 作业中使用。
以下示例定义了一个名字为 random_source,类型为 datagen 的表。
python
from pyflink.table import EnvironmentSettings, TableEnvironment
# 创建流计算 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TABLE random_source (
id BIGINT,
data TINYINT
) WITH (
'connector' = 'datagen',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='3',
'fields.data.kind'='sequence',
'fields.data.start'='4',
'fields.data.end'='6'
)
""")
table = table_env.from_path("random_source")
print(table.to_pandas())
结果为:
python
id data
0 1 4
1 2 5
2 3 6
说明:
+ 当前仅有部分 connector 的实现包含在 Flink 官方提供的发行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的实现当前没有包含在 Flink 官方提供的发行包中,比如 Kafka、ES 等。针对没有包含在 Flink 官方提供的发行包中的 connector,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如针对 Kafka,需要使用 JAR 包,JAR 包可以通过如下方式指定:
python
# 注意:file:///前缀不能省略
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
#### 方式三:catalog
TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。
Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。
通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …”,都存储在 catalog 中。
你可以通过 SQL 直接访问 catalog 中的表。
如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:
python
# 准备 catalog
# 将 Table API 表注册到 catalog 中
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', table)
# 从 catalog 中获取 Table API 表
new_table = table_env.from_path('source_table')
print(new_table.to_pandas())
结果为:
python
id data
0 1 Hi
1 2 Hello
这种方式和 DDL 的方式类似,只不过表的定义事先已经注册到了 catalog 中了,不需要在作业中重新再定义一遍了。
### 4)定义作业的计算逻辑
#### 方式一:通过 Table API
Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(…).select(…)。
简单的 Table API 聚合查询:
python
from pyflink.table import EnvironmentSettings, TableEnvironment
# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
['name', 'country', 'revenue'])
# 计算所有来自法国客户的收入
revenue = orders \
.select(orders.name, orders.country, orders.revenue) \
.where(orders.country == 'FRANCE') \
.group_by(orders.name) \
.select(orders.name, orders.revenue.sum.alias('rev_sum'))
print(revenue.to_pandas())
结果为:
python
name rev_sum
0 Jack 30
简单的 Table API 基于行操作的查询:
试试
结果为:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd
# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
result_type=DataTypes.ROW(
[DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()
试试
name revenue
0 Jack 100
1 Rose 300
2 Jack 200
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
transformed_tab = tab.select(sub_string(col('a'), 2, 4))
方式二:通过 SQL 语句
Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。简单的 SQL 聚合查询:
结果为:
from pyflink.table import EnvironmentSettings, TableEnvironment
# 通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql("""
CREATE TABLE random_source (
id BIGINT,
data TINYINT
) WITH (
'connector' = 'datagen',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='8',
'fields.data.kind'='sequence',
'fields.data.start'='4',
'fields.data.end'='11'
)
""")
table_env.execute_sql("""
CREATE TABLE print_sink (
id BIGINT,
data_sum TINYINT
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
INSERT INTO print_sink
SELECT id, sum(data) as data_sum FROM
(SELECT id / 2 as id, data FROM random_source)
WHERE id > 1
GROUP BY id
""").wait()
2> +I(4,11)
6> +I(2,8)
8> +I(3,10)
6> -U(2,8)
8> -U(3,10)
6> +U(2,15)
8> +U(3,19)
实际上,上述输出展示了 print 结果表所接收到的 change log。 change log 的格式为:
{subtask id}> {消息类型}{值的字符串格式}
例如,“2> +I(4,11)” 表示这条消息来自第二个 subtask,其中 “+I” 表示这是一条插入的消息,”(4, 11)” 是这条消息的内容。 另外,”-U” 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。 “+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。
所以,从上面的 change log,我们可以得到如下结果:
试试
(4, 11)
(2, 15)
(3, 19)
说明:
t_env.create_temporary_function("sub_string", sub_string)
transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
- TableEnvironment 中提供了多种方式用于执行 SQL 语句,其用途略有不同:
方式三:Table API 和 SQL 的混合使用
在 SQL 中使用 Table 对象:结果为:
# 创建一张 sink 表来接收结果数据
table_env.execute_sql("""
CREATE TABLE table_sink (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
# 将 Table API 表转换成 SQL 中的视图
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 将 Table API 表的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
在 Table API 中使用 SQL 表:
6> +I(1,Hi)
6> +I(2,Hello)
结果为:
# 创建一张 SQL source 表
table_env.execute_sql("""
CREATE TABLE sql_source (
id BIGINT,
data TINYINT
) WITH (
'connector' = 'datagen',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='4',
'fields.data.kind'='sequence',
'fields.data.start'='4',
'fields.data.end'='7'
)
""")
# 将 SQL 表转换成 Table API 表
table = table_env.from_path("sql_source")
# 或者通过 SQL 查询语句创建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 将表中的数据写出
print(table.to_pandas())
id data
0 2 5
1 1 4
2 4 7
3 3 6
5)写出结果数据
方式一:collect
你可以使用 TableResult.collect 将 Table 的结果收集到客户端,结果的类型为迭代器类型。 以下代码展示了如何使用 TableResult.collect() 方法:结果为:
# 准备 source 表
source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
# 得到 TableResult
res = table_env.execute_sql("select a + 1, b, c from %s" % source)
# 遍历结果
with res.collect() as results:
for result in results:
print(result)
说明:
<Row(2, 'Hi', 'Hello')>
<Row(3, 'Hello', 'Hello')>
- 该方式可以方便地将 table 的结果收集到客户端并查看
- 由于数据最终会收集到客户端,所以最好限制一下数据条数,比如:
res.limit(10).execute(),限制只收集 10 条数据到客户端
方式二:to_pandas
也可以通过 to_pandas 方法,将 table 的结果转换成 pandas.DataFrame 并查看。结果为:
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
print(table.to_pandas())
说明:
id data
0 1 Hi
1 2 Hello
- 该方式与 collect 类似,也会将 table 的结果收集到客户端,所以最好限制一下结果数据的条数
方式三:通过 DDL
和创建数据源表类似,也可以通过 DDL 的方式来创建结果表。结果为:
table_env.execute_sql("""
CREATE TABLE sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute_insert("sink_table").wait()
说明:
6> +I(1,Hi)
6> +I(2,Hello)
- 当使用 print 作为 sink 时,作业结果会打印到标准输出中。如果不需要查看输出,也可以使用 blackhole 作为 sink。
你也可以使用 StatementSet 在一个作业中将 Table 中的数据写入到多张 sink 表中:
table_env.create_temporary_view("table_source", table)
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()
结果为:
# 准备 source 表和 sink 表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
CREATE TABLE first_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
CREATE TABLE second_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
# 创建 statement set
statement_set = table_env.create_statement_set()
# 将 "table" 的数据写入 "first_sink_table"
statement_set.add_insert("first_sink_table", table)
# 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
# 执行 statement set
statement_set.execute().wait()
7> +I(1,Hi)
7> +I(1,Hi)
7> +I(2,Hello)
7> +I(2,Hello)
6)查看执行计划
Table API 提供了一种机制来查看 Table 的逻辑查询计划和优化后的查询计划。 这是通过 Table.explain() 或者 StatementSet.explain() 方法来完成的。Table.explain() 可以返回一个 Table 的执行计划。StatementSet.explain() 则可以返回含有多个 sink 的作业的执行计划。这些方法会返回一个字符串,字符串描述了以下三个方面的信息:- 关系查询的抽象语法树,即未经优化的逻辑查询计划,
- 优化后的逻辑查询计划,
- 物理执行计划。
方式一:Table.explain
比如,当我们需要知道 transformed_tab 当前的执行计划时,可以执行:print(transformed_tab.explain()),可以得到如下输出:
== Abstract Syntax Tree ==
LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])
== Optimized Logical Plan ==
PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: PythonInputFormatTableSource(a)
Stage 2 : Operator
content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])
ship_strategy : FORWARD
Stage 3 : Operator
content : StreamExecPythonCalc
ship_strategy : FORWARD
方式二:TableEnvironment.explain_sql
方式一适用于查看某一个 table 的执行计划,有时候并没有一个现成的 table 对象可用,比如:其执行计划如下所示:
print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
+- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])
== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
+- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: PythonInputFormatTableSource(a)
Stage 2 : Operator
content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])
ship_strategy : FORWARD
Stage 3 : Operator
content : StreamExecPythonCalc
ship_strategy : FORWARD
Stage 4 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
ship_strategy : FORWARD
7)总结
完整的作业示例如下:执行结果如下:
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf
def table_api_demo():
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_string('parallelism.default', '4')
t_env.execute_sql("""
CREATE TABLE my_source (
a VARCHAR,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
tab = t_env.from_path('my_source')
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
return s[begin:end]
transformed_tab = tab.select(sub_string(col('a'), 2, 4))
t_env.execute_sql("""
CREATE TABLE my_sink (
`sum` VARCHAR
) WITH (
'connector' = 'print'
)
""")
table_result = transformed_tab.execute_insert('my_sink')
# 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
# 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
table_result.wait()
if __name__ == '__main__':
table_api_demo()
4> +I(a1)
3> +I(b0)
2> +I(b1)
1> +I(37)
3> +I(74)
4> +I(3d)
1> +I(07)
2> +I(f4)
1> +I(7f)
2> +I(da)
PyFlink DataStream API 作业
1)创建 StreamExecutionEnvironment 对象
对于 DataStream API 作业来说,用户首先需要定义一个 StreamExecutionEnvironment 对象。
env = StreamExecutionEnvironment.get_execution_environment()
2)配置作业的执行参数
可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。
env.set_parallelism(4)
3)创建数据源
接下来,需要为作业创建一个数据源。PyFlink 中提供了多种方式来定义数据源。方式一:from_collection
PyFlink 支持用户从一个列表创建源表。以下示例定义了包含了 3 行数据的表:[(1, ‘aaa|bb’), (2, ‘bb|a’), (3, ‘aaa|a’)],该表有 2 列,列名分别为 a 和 b,类型分别为 VARCHAR 和 BIGINT。说明:
ds = env.from_collection(
collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
type_info=Types.ROW([Types.INT(), Types.STRING()]))
- 这种方式通常用于测试阶段,可以方便地创建一个数据源
- from_collection 方法可以接收两个参数,其中第一个参数用于指定数据列表;第二个参数用于指定数据的类型
方式二:使用 PyFlink DataStream API 中定义的 connector
此外,也可以使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中仅提供了 Kafka connector 的支持。说明:
deserialization_schema = JsonRowDeserializationSchema.builder() \
.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_consumer = FlinkKafkaConsumer(
topics='test_source_topic',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds = env.add_source(kafka_consumer)
- Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR [2],JAR 包可以通过如下方式指定:
# 注意:file:///前缀不能省略
env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
- 即使是 PyFlink DataStream API 作业,也推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。
方式三:使用 PyFlink Table API 中定义的 connector
以下示例定义了如何将 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作业。说明:
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
- 由于当前 PyFlink DataStream API 中 built-in 支持的 connector 种类还比较少,推荐通过这种方式来创建 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作业中使用。
- 需要注意的是,TableEnvironment 需要通过以下方式创建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 与 PyFlink Table API 共享同一个 StreamExecutionEnvironment 对象。
4)定义计算逻辑
生成数据源对应的 DataStream 对象之后,接下来就可以使用 PyFlink DataStream API 中定义的各种操作,定义计算逻辑,对 DataStream 对象进行变换了,比如:
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
5)写出结果数据
方式一:print
可以调用 DataStream 对象上的 print 方法,将 DataStream 的结果打印到标准输出中,比如:
ds.print()
方式二:使用 PyFlink DataStream API 中定义的 connector
可以直接使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 为例:说明:
serialization_schema = JsonRowSerializationSchema.builder() \
.with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
kafka_producer = FlinkKafkaProducer(
topic='test_sink_topic',
serialization_schema=serialization_schema,
producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
ds.add_sink(kafka_producer)
- JDBC、Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通过如下方式指定:
# 注意:file:///前缀不能省略
env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
- 推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。
方式三:使用 PyFlink Table API 中定义的 connector
以下示例展示了如何将 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作业的 sink。说明:
# 写法一:ds类型为Types.ROW
def split(s):
splits = s[1].split("|")
for sp in splits:
yield Row(s[0], sp)
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: Row(i[0] + j[0], i[1]))
# 写法二:ds类型为Types.TUPLE
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
# 将ds写出到sink
t_env.execute_sql("""
CREATE TABLE my_sink (
a INT,
b VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
- 需要注意的是,t_env.from_data_stream(ds) 中的 ds 对象的 result type 类型必须是复合类型 Types.ROW 或者 Types.TUPLE,这也就是为什么需要显式声明作业计算逻辑中 flat_map 操作的 result 类型
- 作业的提交,需要通过 PyFlink Table API 中提供的作业提交方式进行提交
- 由于当前 PyFlink DataStream API 中支持的 connector 种类还比较少,推荐通过这种方式来定义 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以作为 PyFlink DataStream API 作业的 sink。
6)总结
完整的作业示例如下:方式一(适合调试):
执行结果如下:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
ds = env.from_collection(
collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
type_info=Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
ds.print()
env.execute()
if __name__ == '__main__':
data_stream_api_demo()
3> (2, 'aaa')
3> (2, 'bb')
3> (6, 'aaa')
3> (4, 'a')
3> (5, 'bb')
3> (7, 'a')
方式二(适合线上作业):
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
env.set_parallelism(4)
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
t_env.execute_sql("""
CREATE TABLE my_sink (
a INT,
b VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
# 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
# 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
table_result.wait()
if __name__ == '__main__':
data_stream_api_demo()
作业提交
Flink 提供了多种作业部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作业部署方式,请参考 Flink 官方文档 [3],了解更多详细信息。local
说明:使用该方式执行作业时,会启动一个 minicluster,作业会提交到 minicluster 中执行,该方式适合作业开发阶段。
python3 table_api_demo.py
standalone
说明:使用该方式执行作业时,作业会提交到一个远端的 standalone 集群。
./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py
YARN Per-Job
说明:使用该方式执行作业时,作业会提交到一个远端的 YARN 集群。
./bin/flink run --target yarn-per-job --python table_api_demo.py
K8s application mode
说明:使用该方式执行作业时,作业会提交到 K8s 集群,以 application mode 的方式执行。
./bin/flink run-application \
--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image=<PyFlinkImageName> \
--pyModule table_api_demo \
--pyFiles file:///path/to/table_api_demo.py
参数说明
除了上面提到的参数之外,通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相关的参数。参数名 | 用途描述 | 示例 |
---|---|---|
-py / —python | 指定作业的入口文件 | -py file:///path/to/table_api_demo.py |
-pym / —pyModule | 指定作业的 entry module,功能和—python类似,可用于当作业的 Python 文件为 zip 包,无法通过—python 指定时,相比—python 来说,更通用 | -pym table_api_demo -pyfs file:///path/to/table_api_demo.py |
-pyfs / —pyFiles | 指定一个到多个 Python 文件(.py/.zip等,逗号分割),这些 Python 文件在作业执行的时候,会放到 Python 进程的 PYTHONPATH 中,可以在 Python 自定义函数中访问到 | -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip |
-pyarch / —pyArchives | 指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 进程的 workspace 目录,可以通过相对路径的方式进行访问 | -pyarch file:///path/to/venv.zip |
-pyexec / —pyExecutable | 指定作业执行的时候,Python 进程的路径 | -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3 |
-pyreq / —pyRequirements | 指定 requirements 文件,requirements 文件中定义了作业的依赖 | -pyreq requirements.txt |
问题排查
当我们刚刚上手 PyFlink 作业开发的时候,难免会遇到各种各样的问题,学会如何排查问题是非常重要的。接下来,我们介绍一些常见的问题排查手段。client 端异常输出
PyFlink 作业也遵循 Flink 作业的提交方式,作业首先会在 client 端编译成 JobGraph,然后提交到 Flink 集群执行。如果作业编译有问题,会导致在 client 端提交作业的时候就抛出异常,此时可以在 client 端看到类似这样的输出:比如上述报错说明作业中使用的名字为”my_”的表不存在。
Traceback (most recent call last):
File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>
data_stream_api_demo()
File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo
table_result = table.execute_insert("my_")
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert
return TableResult(self._j_table.executeInsert(table_path, overwrite))
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
return_value = get_return_value(
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco
raise java_exception
pyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1
TaskManager 日志文件
有些错误直到作业运行的过程中才会发生,比如脏数据或者 Python 自定义函数的实现问题等,针对这种错误,通常需要查看 TaskManager 的日志文件,比如以下错误反映用户在 Python 自定义函数中访问的 opencv 库不存在。说明:
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
response = task()
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
return getattr(self, request_type)(
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream
File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split
import cv2
ModuleNotFoundError: No module named 'cv2'
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
- local 模式下,TaskManager 的 log 位于 PyFlink 的安装目录下:site-packages/pyflink/log/,也可以通过如下命令找到:
\>>> import pyflink
\>>> print(pyflink.__path__)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下
自定义日志
有时候,异常日志的内容并不足以帮助我们定位问题,此时可以考虑在 Python 自定义函数中打印一些日志信息。PyFlink 支持用户在 Python 自定义函数中通过 logging 的方式输出 log,比如:通过上述方式,split 函数的输入参数,会打印到 TaskManager 的日志文件中。
def split(s):
import logging
logging.info("s: " + str(s))
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
远程调试
PyFlink 作业,在运行过程中,会启动一个独立的 Python 进程执行 Python 自定义函数,所以如果需要调试 Python 自定义函数,需要通过远程调试的方式进行,可以参见[4],了解如何在 Pycharm 中进行 Python 远程调试。 1)在 Python 环境中安装 pydevd-pycharm: pip install pydevd-pycharm~=203.7717.65 2)在 Python 自定义函数中设置远程调试参数:3)按照 Pycharm 中远程调试的步骤,进行操作即可,可以参见[4],也可以参考博客[5]中“代码调试”部分的介绍。 说明:Python 远程调试功能只在 Pycharm 的 professional 版才支持。
def split(s):
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
参考文档