注意⚠️

Flink 版本 1.14.3

环境准备

第一步:安装 Python

PyFlink 需要 Python 3.6 以上版本(3.6, 3.7 或 3.8),请运行以下命令,以确保 Python 版本满足要求。

  1. $ python --version
  2. # the version printed here must be 3.6, 3.7 or 3.8

第二步:安装 JDK

我们知道 Flink 的运行时是使用 Java 语言开发的,所以为了执行 Flink 作业,您还需要安装 JDK。Flink 提供了对于 JDK 8 以及 JDK 11 的全面支持,您需要确认您的开发环境中是否已经安装了上述版本的 JDK,如果没有的话,首先需要安装 JDK。

  1. sudo apt update
  2. sudo apt install default-jdk

第三步:安装 PyFlink

PyFlink 已经被发布到PyPi,可以通过如下方式安装 PyFlink:

  1. # 创建 Python 虚拟环境
  2. python3 -m venv venv
  3. # 使用上述创建的 Python 虚拟环境
  4. ./venv/bin/activate
  5. # 安装 PyFlink 1.14.3
  6. python -m pip install apache-flink==1.14.3

作业开发

PyFlink Table API 作业

1)创建 TableEnvironment 对象

对于 Table API 作业来说,用户首先需要创建一个 TableEnvironment 对象。下面代码示例展示了如何创建一个 TableEnvironment。

  1. from pyflink.table import EnvironmentSettings, TableEnvironment
  2. # create a streaming TableEnvironment
  3. env_settings = EnvironmentSettings.in_streaming_mode()
  4. table_env = TableEnvironment.create(env_settings)
  5. # or create a batch TableEnvironment
  6. env_settings = EnvironmentSettings.in_batch_mode()
  7. table_env = TableEnvironment.create(env_settings)
TableEnvironment 可以用来:

2)配置作业的执行参数

可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。

  1. table_env.get_config().get_configuration().set_string('parallelism.default', '4')

3)创建数据源表

Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。 一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。 接下来,需要为作业创建一个数据源表。PyFlink 中提供了多种方式来定义数据源表。 #### 方式一:from_elements 这种方式通常用于测试阶段,可以快速地创建一个数据源表,验证作业逻辑 你可以使用一个列表对象创建一张表: python from pyflink.table import EnvironmentSettings, TableEnvironment # 创建批处理 TableEnvironment env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) print(table.to_pandas()) 结果为: python _1 _2 0 1 Hi 1 2 Hello 你也可以创建具有指定列名的表: python table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) print(table.to_pandas()) 结果为: python id data 0 1 Hi 1 2 Hello 默认情况下,表结构是从数据中自动提取的。 如果自动生成的表模式不符合你的要求,你也可以手动指定: python table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) # 默认情况下,“id” 列的类型是 64 位整型 default_type = table_without_schema.to_pandas()["id"].dtype print('By default the type of the "id" column is %s.' % default_type) from pyflink.table import DataTypes table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()), DataTypes.FIELD("data", DataTypes.STRING())])) # 现在 “id” 列的类型是 8 位整型 type = table.to_pandas()["id"].dtype print('Now the type of the "id" column is %s.' % type) 结果为: python By default the type of the "id" column is int64. Now the type of the "id" column is int8. 说明: + 第一个参数是必须的,用于指定数据列表,列表中的每一个元素必须为 tuple 类型 + 第二个参数是可选的,用于指定表的 schema #### 方式二:DDL 通过 DDL 的方式来定义数据源表是目前最推荐的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通过 DDL 的方式,在 PyFlink Table API 作业中使用。 以下示例定义了一个名字为 random_source,类型为 datagen 的表。 python from pyflink.table import EnvironmentSettings, TableEnvironment # 创建流计算 TableEnvironment env_settings = EnvironmentSettings.in_streaming_mode() table_env = TableEnvironment.create(env_settings) table_env.execute_sql(""" CREATE TABLE random_source ( id BIGINT, data TINYINT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='3', 'fields.data.kind'='sequence', 'fields.data.start'='4', 'fields.data.end'='6' ) """) table = table_env.from_path("random_source") print(table.to_pandas()) 结果为: python id data 0 1 4 1 2 5 2 3 6 说明: + 当前仅有部分 connector 的实现包含在 Flink 官方提供的发行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的实现当前没有包含在 Flink 官方提供的发行包中,比如 Kafka、ES 等。针对没有包含在 Flink 官方提供的发行包中的 connector,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如针对 Kafka,需要使用 JAR 包,JAR 包可以通过如下方式指定: python # 注意:file:///前缀不能省略 table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar") #### 方式三:catalog TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。 Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。 通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …”,都存储在 catalog 中。 你可以通过 SQL 直接访问 catalog 中的表。 如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象: python # 准备 catalog # 将 Table API 表注册到 catalog 中 table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data']) table_env.create_temporary_view('source_table', table) # 从 catalog 中获取 Table API 表 new_table = table_env.from_path('source_table') print(new_table.to_pandas()) 结果为: python id data 0 1 Hi 1 2 Hello 这种方式和 DDL 的方式类似,只不过表的定义事先已经注册到了 catalog 中了,不需要在作业中重新再定义一遍了。 ### 4)定义作业的计算逻辑 #### 方式一:通过 Table API Table 对象有许多方法,可以用于进行关系操作。 这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。 这些关系操作可以由多个方法调用组成,例如 table.group_by(…).select(…)。 简单的 Table API 聚合查询: python from pyflink.table import EnvironmentSettings, TableEnvironment # 通过 batch table environment 来执行查询 env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue']) # 计算所有来自法国客户的收入 revenue = orders \ .select(orders.name, orders.country, orders.revenue) \ .where(orders.country == 'FRANCE') \ .group_by(orders.name) \ .select(orders.name, orders.revenue.sum.alias('rev_sum')) print(revenue.to_pandas()) 结果为: python name rev_sum 0 Jack 30 简单的 Table API 基于行操作的查询: 试试
  1. from pyflink.table import EnvironmentSettings, TableEnvironment
  2. from pyflink.table import DataTypes
  3. from pyflink.table.udf import udf
  4. import pandas as pd
  5. # 通过 batch table environment 来执行查询
  6. env_settings = EnvironmentSettings.in_batch_mode()
  7. table_env = TableEnvironment.create(env_settings)
  8. orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
  9. ['name', 'country', 'revenue'])
  10. map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
  11. result_type=DataTypes.ROW(
  12. [DataTypes.FIELD("name", DataTypes.STRING()),
  13. DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
  14. func_type="pandas")
  15. orders.map(map_function).alias('name', 'revenue').to_pandas()
结果为:
  1. name revenue
  2. 0 Jack 100
  3. 1 Rose 300
  4. 2 Jack 200
试试
  1. @udf(result_type=DataTypes.STRING())
  2. def sub_string(s: str, begin: int, end: int):
  3. return s[begin:end]
  4. transformed_tab = tab.select(sub_string(col('a'), 2, 4))

方式二:通过 SQL 语句

Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。

简单的 SQL 聚合查询:

  1. from pyflink.table import EnvironmentSettings, TableEnvironment
  2. # 通过 stream table environment 来执行查询
  3. env_settings = EnvironmentSettings.in_streaming_mode()
  4. table_env = TableEnvironment.create(env_settings)
  5. table_env.execute_sql("""
  6. CREATE TABLE random_source (
  7. id BIGINT,
  8. data TINYINT
  9. ) WITH (
  10. 'connector' = 'datagen',
  11. 'fields.id.kind'='sequence',
  12. 'fields.id.start'='1',
  13. 'fields.id.end'='8',
  14. 'fields.data.kind'='sequence',
  15. 'fields.data.start'='4',
  16. 'fields.data.end'='11'
  17. )
  18. """)
  19. table_env.execute_sql("""
  20. CREATE TABLE print_sink (
  21. id BIGINT,
  22. data_sum TINYINT
  23. ) WITH (
  24. 'connector' = 'print'
  25. )
  26. """)
  27. table_env.execute_sql("""
  28. INSERT INTO print_sink
  29. SELECT id, sum(data) as data_sum FROM
  30. (SELECT id / 2 as id, data FROM random_source)
  31. WHERE id > 1
  32. GROUP BY id
  33. """).wait()
结果为:
  1. 2> +I(4,11)
  2. 6> +I(2,8)
  3. 8> +I(3,10)
  4. 6> -U(2,8)
  5. 8> -U(3,10)
  6. 6> +U(2,15)
  7. 8> +U(3,19)

实际上,上述输出展示了 print 结果表所接收到的 change log。 change log 的格式为:

  1. {subtask id}> {消息类型}{值的字符串格式}

例如,“2> +I(4,11)” 表示这条消息来自第二个 subtask,其中 “+I” 表示这是一条插入的消息,”(4, 11)” 是这条消息的内容。 另外,”-U” 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。 “+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。

所以,从上面的 change log,我们可以得到如下结果:

  1. (4, 11)
  2. (2, 15)
  3. (3, 19)
试试
  1. t_env.create_temporary_function("sub_string", sub_string)
  2. transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
说明:
  • TableEnvironment 中提供了多种方式用于执行 SQL 语句,其用途略有不同:如何从 0 到 1 开发 PyFlink API 作业 - 图1

方式三:Table API 和 SQL 的混合使用

在 SQL 中使用 Table 对象:
  1. # 创建一张 sink 表来接收结果数据
  2. table_env.execute_sql("""
  3. CREATE TABLE table_sink (
  4. id BIGINT,
  5. data VARCHAR
  6. ) WITH (
  7. 'connector' = 'print'
  8. )
  9. """)
  10. # 将 Table API 表转换成 SQL 中的视图
  11. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  12. table_env.create_temporary_view('table_api_table', table)
  13. # 将 Table API 表的数据写入结果表
  14. table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
结果为:
  1. 6> +I(1,Hi)
  2. 6> +I(2,Hello)
在 Table API 中使用 SQL 表:
  1. # 创建一张 SQL source 表
  2. table_env.execute_sql("""
  3. CREATE TABLE sql_source (
  4. id BIGINT,
  5. data TINYINT
  6. ) WITH (
  7. 'connector' = 'datagen',
  8. 'fields.id.kind'='sequence',
  9. 'fields.id.start'='1',
  10. 'fields.id.end'='4',
  11. 'fields.data.kind'='sequence',
  12. 'fields.data.start'='4',
  13. 'fields.data.end'='7'
  14. )
  15. """)
  16. # 将 SQL 表转换成 Table API 表
  17. table = table_env.from_path("sql_source")
  18. # 或者通过 SQL 查询语句创建表
  19. table = table_env.sql_query("SELECT * FROM sql_source")
  20. # 将表中的数据写出
  21. print(table.to_pandas())
结果为:
  1. id data
  2. 0 2 5
  3. 1 1 4
  4. 2 4 7
  5. 3 3 6

5)写出结果数据

方式一:collect

你可以使用 TableResult.collect 将 Table 的结果收集到客户端,结果的类型为迭代器类型。 以下代码展示了如何使用 TableResult.collect() 方法:
  1. # 准备 source 表
  2. source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
  3. # 得到 TableResult
  4. res = table_env.execute_sql("select a + 1, b, c from %s" % source)
  5. # 遍历结果
  6. with res.collect() as results:
  7. for result in results:
  8. print(result)
结果为:
  1. <Row(2, 'Hi', 'Hello')>
  2. <Row(3, 'Hello', 'Hello')>
说明:
  • 该方式可以方便地将 table 的结果收集到客户端并查看
  • 由于数据最终会收集到客户端,所以最好限制一下数据条数,比如:

res.limit(10).execute(),限制只收集 10 条数据到客户端

方式二:to_pandas

也可以通过 to_pandas 方法,将 table 的结果转换成 pandas.DataFrame 并查看。
  1. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  2. print(table.to_pandas())
结果为:
  1. id data
  2. 0 1 Hi
  3. 1 2 Hello
说明:
  • 该方式与 collect 类似,也会将 table 的结果收集到客户端,所以最好限制一下结果数据的条数

方式三:通过 DDL

和创建数据源表类似,也可以通过 DDL 的方式来创建结果表。
  1. table_env.execute_sql("""
  2. CREATE TABLE sink_table (
  3. id BIGINT,
  4. data VARCHAR
  5. ) WITH (
  6. 'connector' = 'print'
  7. )
  8. """)
  9. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  10. table.execute_insert("sink_table").wait()
结果为:
  1. 6> +I(1,Hi)
  2. 6> +I(2,Hello)
说明:
  • 当使用 print 作为 sink 时,作业结果会打印到标准输出中。如果不需要查看输出,也可以使用 blackhole 作为 sink。
也可以通过 SQL 来完成:
  1. table_env.create_temporary_view("table_source", table)
  2. table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()
你也可以使用 StatementSet 在一个作业中将 Table 中的数据写入到多张 sink 表中:
  1. # 准备 source 表和 sink 表
  2. table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
  3. table_env.create_temporary_view("simple_source", table)
  4. table_env.execute_sql("""
  5. CREATE TABLE first_sink_table (
  6. id BIGINT,
  7. data VARCHAR
  8. ) WITH (
  9. 'connector' = 'print'
  10. )
  11. """)
  12. table_env.execute_sql("""
  13. CREATE TABLE second_sink_table (
  14. id BIGINT,
  15. data VARCHAR
  16. ) WITH (
  17. 'connector' = 'print'
  18. )
  19. """)
  20. # 创建 statement set
  21. statement_set = table_env.create_statement_set()
  22. # 将 "table" 的数据写入 "first_sink_table"
  23. statement_set.add_insert("first_sink_table", table)
  24. # 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"
  25. statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")
  26. # 执行 statement set
  27. statement_set.execute().wait()
结果为:
  1. 7> +I(1,Hi)
  2. 7> +I(1,Hi)
  3. 7> +I(2,Hello)
  4. 7> +I(2,Hello)

6)查看执行计划

Table API 提供了一种机制来查看 Table 的逻辑查询计划和优化后的查询计划。 这是通过 Table.explain() 或者 StatementSet.explain() 方法来完成的。Table.explain() 可以返回一个 Table 的执行计划。StatementSet.explain() 则可以返回含有多个 sink 的作业的执行计划。这些方法会返回一个字符串,字符串描述了以下三个方面的信息:
  1. 关系查询的抽象语法树,即未经优化的逻辑查询计划,
  2. 优化后的逻辑查询计划,
  3. 物理执行计划。
用户在开发或者调试作业的过程中,可能需要查看作业的执行计划,可以通过如下方式。

方式一:Table.explain

比如,当我们需要知道 transformed_tab 当前的执行计划时,可以执行:print(transformed_tab.explain()),可以得到如下输出:
  1. == Abstract Syntax Tree ==
  2. LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
  3. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])
  4. == Optimized Logical Plan ==
  5. PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
  6. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])
  7. == Physical Execution Plan ==
  8. Stage 1 : Data Source
  9. content : Source: PythonInputFormatTableSource(a)
  10. Stage 2 : Operator
  11. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])
  12. ship_strategy : FORWARD
  13. Stage 3 : Operator
  14. content : StreamExecPythonCalc
  15. ship_strategy : FORWARD

方式二:TableEnvironment.explain_sql

方式一适用于查看某一个 table 的执行计划,有时候并没有一个现成的 table 对象可用,比如:
  1. print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))
其执行计划如下所示:
  1. == Abstract Syntax Tree ==
  2. LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  3. +- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
  4. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])
  5. == Optimized Logical Plan ==
  6. Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  7. +- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
  8. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])
  9. == Physical Execution Plan ==
  10. Stage 1 : Data Source
  11. content : Source: PythonInputFormatTableSource(a)
  12. Stage 2 : Operator
  13. content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])
  14. ship_strategy : FORWARD
  15. Stage 3 : Operator
  16. content : StreamExecPythonCalc
  17. ship_strategy : FORWARD
  18. Stage 4 : Data Sink
  19. content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  20. ship_strategy : FORWARD

7)总结

完整的作业示例如下:
  1. from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
  2. from pyflink.table.expressions import col
  3. from pyflink.table.udf import udf
  4. def table_api_demo():
  5. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  6. t_env = StreamTableEnvironment.create(environment_settings=env_settings)
  7. t_env.get_config().get_configuration().set_string('parallelism.default', '4')
  8. t_env.execute_sql("""
  9. CREATE TABLE my_source (
  10. a VARCHAR,
  11. b VARCHAR
  12. ) WITH (
  13. 'connector' = 'datagen',
  14. 'number-of-rows' = '10'
  15. )
  16. """)
  17. tab = t_env.from_path('my_source')
  18. @udf(result_type=DataTypes.STRING())
  19. def sub_string(s: str, begin: int, end: int):
  20. return s[begin:end]
  21. transformed_tab = tab.select(sub_string(col('a'), 2, 4))
  22. t_env.execute_sql("""
  23. CREATE TABLE my_sink (
  24. `sum` VARCHAR
  25. ) WITH (
  26. 'connector' = 'print'
  27. )
  28. """)
  29. table_result = transformed_tab.execute_insert('my_sink')
  30. # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
  31. # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
  32. table_result.wait()
  33. if __name__ == '__main__':
  34. table_api_demo()
执行结果如下:
  1. 4> +I(a1)
  2. 3> +I(b0)
  3. 2> +I(b1)
  4. 1> +I(37)
  5. 3> +I(74)
  6. 4> +I(3d)
  7. 1> +I(07)
  8. 2> +I(f4)
  9. 1> +I(7f)
  10. 2> +I(da)

PyFlink DataStream API 作业

1)创建 StreamExecutionEnvironment 对象

对于 DataStream API 作业来说,用户首先需要定义一个 StreamExecutionEnvironment 对象。
  1. env = StreamExecutionEnvironment.get_execution_environment()

2)配置作业的执行参数

可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。
  1. env.set_parallelism(4)

3)创建数据源

接下来,需要为作业创建一个数据源。PyFlink 中提供了多种方式来定义数据源。

方式一:from_collection

PyFlink 支持用户从一个列表创建源表。以下示例定义了包含了 3 行数据的表:[(1, ‘aaa|bb’), (2, ‘bb|a’), (3, ‘aaa|a’)],该表有 2 列,列名分别为 a 和 b,类型分别为 VARCHAR 和 BIGINT。
  1. ds = env.from_collection(
  2. collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
  3. type_info=Types.ROW([Types.INT(), Types.STRING()]))
说明:
  • 这种方式通常用于测试阶段,可以方便地创建一个数据源
  • from_collection 方法可以接收两个参数,其中第一个参数用于指定数据列表;第二个参数用于指定数据的类型

方式二:使用 PyFlink DataStream API 中定义的 connector

此外,也可以使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中仅提供了 Kafka connector 的支持。
  1. deserialization_schema = JsonRowDeserializationSchema.builder() \
  2. .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
  3. kafka_consumer = FlinkKafkaConsumer(
  4. topics='test_source_topic',
  5. deserialization_schema=deserialization_schema,
  6. properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
  7. ds = env.add_source(kafka_consumer)
说明:
  • Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR [2],JAR 包可以通过如下方式指定:
  1. # 注意:file:///前缀不能省略
  2. env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
  • 即使是 PyFlink DataStream API 作业,也推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。

方式三:使用 PyFlink Table API 中定义的 connector

以下示例定义了如何将 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作业。
  1. t_env = StreamTableEnvironment.create(stream_execution_environment=env)
  2. t_env.execute_sql("""
  3. CREATE TABLE my_source (
  4. a INT,
  5. b VARCHAR
  6. ) WITH (
  7. 'connector' = 'datagen',
  8. 'number-of-rows' = '10'
  9. )
  10. """)
  11. ds = t_env.to_append_stream(
  12. t_env.from_path('my_source'),
  13. Types.ROW([Types.INT(), Types.STRING()]))
说明:
  • 由于当前 PyFlink DataStream API 中 built-in 支持的 connector 种类还比较少,推荐通过这种方式来创建 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作业中使用。
  • 需要注意的是,TableEnvironment 需要通过以下方式创建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 与 PyFlink Table API 共享同一个 StreamExecutionEnvironment 对象。

4)定义计算逻辑

生成数据源对应的 DataStream 对象之后,接下来就可以使用 PyFlink DataStream API 中定义的各种操作,定义计算逻辑,对 DataStream 对象进行变换了,比如:
  1. def split(s):
  2. splits = s[1].split("|")
  3. for sp in splits:
  4. yield s[0], sp
  5. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  6. .flat_map(split) \
  7. .key_by(lambda i: i[1]) \
  8. .reduce(lambda i, j: (i[0] + j[0], i[1]))

5)写出结果数据

方式一:print

可以调用 DataStream 对象上的 print 方法,将 DataStream 的结果打印到标准输出中,比如:
  1. ds.print()

方式二:使用 PyFlink DataStream API 中定义的 connector

可以直接使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 为例:
  1. serialization_schema = JsonRowSerializationSchema.builder() \
  2. .with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
  3. kafka_producer = FlinkKafkaProducer(
  4. topic='test_sink_topic',
  5. serialization_schema=serialization_schema,
  6. producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
  7. ds.add_sink(kafka_producer)
说明:
  • JDBC、Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通过如下方式指定:
  1. # 注意:file:///前缀不能省略
  2. env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
  • 推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。

方式三:使用 PyFlink Table API 中定义的 connector

以下示例展示了如何将 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作业的 sink。
  1. # 写法一:ds类型为Types.ROW
  2. def split(s):
  3. splits = s[1].split("|")
  4. for sp in splits:
  5. yield Row(s[0], sp)
  6. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  7. .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
  8. .key_by(lambda i: i[1]) \
  9. .reduce(lambda i, j: Row(i[0] + j[0], i[1]))
  10. # 写法二:ds类型为Types.TUPLE
  11. def split(s):
  12. splits = s[1].split("|")
  13. for sp in splits:
  14. yield s[0], sp
  15. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  16. .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
  17. .key_by(lambda i: i[1]) \
  18. .reduce(lambda i, j: (i[0] + j[0], i[1]))
  19. # 将ds写出到sink
  20. t_env.execute_sql("""
  21. CREATE TABLE my_sink (
  22. a INT,
  23. b VARCHAR
  24. ) WITH (
  25. 'connector' = 'print'
  26. )
  27. """)
  28. table = t_env.from_data_stream(ds)
  29. table_result = table.execute_insert("my_sink")
说明:
  • 需要注意的是,t_env.from_data_stream(ds) 中的 ds 对象的 result type 类型必须是复合类型 Types.ROW 或者 Types.TUPLE,这也就是为什么需要显式声明作业计算逻辑中 flat_map 操作的 result 类型
  • 作业的提交,需要通过 PyFlink Table API 中提供的作业提交方式进行提交
  • 由于当前 PyFlink DataStream API 中支持的 connector 种类还比较少,推荐通过这种方式来定义 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以作为 PyFlink DataStream API 作业的 sink。

6)总结

完整的作业示例如下:

方式一(适合调试):

  1. from pyflink.common.typeinfo import Types
  2. from pyflink.datastream import StreamExecutionEnvironment
  3. def data_stream_api_demo():
  4. env = StreamExecutionEnvironment.get_execution_environment()
  5. env.set_parallelism(4)
  6. ds = env.from_collection(
  7. collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
  8. type_info=Types.ROW([Types.INT(), Types.STRING()]))
  9. def split(s):
  10. splits = s[1].split("|")
  11. for sp in splits:
  12. yield s[0], sp
  13. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  14. .flat_map(split) \
  15. .key_by(lambda i: i[1]) \
  16. .reduce(lambda i, j: (i[0] + j[0], i[1]))
  17. ds.print()
  18. env.execute()
  19. if __name__ == '__main__':
  20. data_stream_api_demo()
执行结果如下:
  1. 3> (2, 'aaa')
  2. 3> (2, 'bb')
  3. 3> (6, 'aaa')
  4. 3> (4, 'a')
  5. 3> (5, 'bb')
  6. 3> (7, 'a')

方式二(适合线上作业):

  1. from pyflink.common.typeinfo import Types
  2. from pyflink.datastream import StreamExecutionEnvironment
  3. from pyflink.table import StreamTableEnvironment
  4. def data_stream_api_demo():
  5. env = StreamExecutionEnvironment.get_execution_environment()
  6. t_env = StreamTableEnvironment.create(stream_execution_environment=env)
  7. env.set_parallelism(4)
  8. t_env.execute_sql("""
  9. CREATE TABLE my_source (
  10. a INT,
  11. b VARCHAR
  12. ) WITH (
  13. 'connector' = 'datagen',
  14. 'number-of-rows' = '10'
  15. )
  16. """)
  17. ds = t_env.to_append_stream(
  18. t_env.from_path('my_source'),
  19. Types.ROW([Types.INT(), Types.STRING()]))
  20. def split(s):
  21. splits = s[1].split("|")
  22. for sp in splits:
  23. yield s[0], sp
  24. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  25. .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
  26. .key_by(lambda i: i[1]) \
  27. .reduce(lambda i, j: (i[0] + j[0], i[1]))
  28. t_env.execute_sql("""
  29. CREATE TABLE my_sink (
  30. a INT,
  31. b VARCHAR
  32. ) WITH (
  33. 'connector' = 'print'
  34. )
  35. """)
  36. table = t_env.from_data_stream(ds)
  37. table_result = table.execute_insert("my_sink")
  38. # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
  39. # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
  40. table_result.wait()
  41. if __name__ == '__main__':
  42. data_stream_api_demo()

作业提交

Flink 提供了多种作业部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作业部署方式,请参考 Flink 官方文档 [3],了解更多详细信息。

local

说明:使用该方式执行作业时,会启动一个 minicluster,作业会提交到 minicluster 中执行,该方式适合作业开发阶段。
  1. python3 table_api_demo.py

standalone

说明:使用该方式执行作业时,作业会提交到一个远端的 standalone 集群。
  1. ./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py

YARN Per-Job

说明:使用该方式执行作业时,作业会提交到一个远端的 YARN 集群。
  1. ./bin/flink run --target yarn-per-job --python table_api_demo.py

K8s application mode

说明:使用该方式执行作业时,作业会提交到 K8s 集群,以 application mode 的方式执行。
  1. ./bin/flink run-application \
  2. --target kubernetes-application \
  3. --parallelism 8 \
  4. -Dkubernetes.cluster-id=<ClusterId> \
  5. -Dtaskmanager.memory.process.size=4096m \
  6. -Dkubernetes.taskmanager.cpu=2 \
  7. -Dtaskmanager.numberOfTaskSlots=4 \
  8. -Dkubernetes.container.image=<PyFlinkImageName> \
  9. --pyModule table_api_demo \
  10. --pyFiles file:///path/to/table_api_demo.py

参数说明

除了上面提到的参数之外,通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相关的参数。
参数名 用途描述 示例
-py / —python 指定作业的入口文件 -py file:///path/to/table_api_demo.py
-pym / —pyModule 指定作业的 entry module,功能和—python类似,可用于当作业的 Python 文件为 zip 包,无法通过—python 指定时,相比—python 来说,更通用 -pym table_api_demo -pyfs file:///path/to/table_api_demo.py
-pyfs / —pyFiles 指定一个到多个 Python 文件(.py/.zip等,逗号分割),这些 Python 文件在作业执行的时候,会放到 Python 进程的 PYTHONPATH 中,可以在 Python 自定义函数中访问到 -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip
-pyarch / —pyArchives 指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 进程的 workspace 目录,可以通过相对路径的方式进行访问 -pyarch file:///path/to/venv.zip
-pyexec / —pyExecutable 指定作业执行的时候,Python 进程的路径 -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3
-pyreq / —pyRequirements 指定 requirements 文件,requirements 文件中定义了作业的依赖 -pyreq requirements.txt

问题排查

当我们刚刚上手 PyFlink 作业开发的时候,难免会遇到各种各样的问题,学会如何排查问题是非常重要的。接下来,我们介绍一些常见的问题排查手段。

client 端异常输出

PyFlink 作业也遵循 Flink 作业的提交方式,作业首先会在 client 端编译成 JobGraph,然后提交到 Flink 集群执行。如果作业编译有问题,会导致在 client 端提交作业的时候就抛出异常,此时可以在 client 端看到类似这样的输出:
  1. Traceback (most recent call last):
  2. File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>
  3. data_stream_api_demo()
  4. File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo
  5. table_result = table.execute_insert("my_")
  6. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert
  7. return TableResult(self._j_table.executeInsert(table_path, overwrite))
  8. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
  9. return_value = get_return_value(
  10. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco
  11. raise java_exception
  12. pyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists
  13. at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
  14. at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
  15. at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
  16. at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  17. at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  18. at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  19. at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  20. at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  21. at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  22. at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  23. at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  24. at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
  25. at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
  26. at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
  27. at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
  28. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  29. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  30. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  31. at java.lang.reflect.Method.invoke(Method.java:498)
  32. at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  33. at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  34. at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
  35. at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  36. at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
  37. at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
  38. at java.lang.Thread.run(Thread.java:748)
  39. Process finished with exit code 1
比如上述报错说明作业中使用的名字为”my_”的表不存在。

TaskManager 日志文件

有些错误直到作业运行的过程中才会发生,比如脏数据或者 Python 自定义函数的实现问题等,针对这种错误,通常需要查看 TaskManager 的日志文件,比如以下错误反映用户在 Python 自定义函数中访问的 opencv 库不存在。
  1. Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  2. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
  3. response = task()
  4. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
  5. lambda: self.create_worker().do_instruction(request), request)
  6. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
  7. return getattr(self, request_type)(
  8. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
  9. bundle_processor.process_bundle(instruction_id))
  10. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle
  11. input_op_by_transform_id[element.transform_id].process_encoded(
  12. File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
  13. self.output(decoded_value)
  14. File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  15. File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  16. File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  17. File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  18. File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  19. File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream
  20. File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split
  21. import cv2
  22. ModuleNotFoundError: No module named 'cv2'
  23. at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
  24. at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
  25. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
  26. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
  27. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
  28. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
  29. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
  30. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
  31. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  32. at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  33. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  34. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  35. ... 1 more
说明:
  • local 模式下,TaskManager 的 log 位于 PyFlink 的安装目录下:site-packages/pyflink/log/,也可以通过如下命令找到:
  1. \>>> import pyflink
  2. \>>> print(pyflink.__path__)
  3. ['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下

自定义日志

有时候,异常日志的内容并不足以帮助我们定位问题,此时可以考虑在 Python 自定义函数中打印一些日志信息。PyFlink 支持用户在 Python 自定义函数中通过 logging 的方式输出 log,比如:
  1. def split(s):
  2. import logging
  3. logging.info("s: " + str(s))
  4. splits = s[1].split("|")
  5. for sp in splits:
  6. yield s[0], sp
通过上述方式,split 函数的输入参数,会打印到 TaskManager 的日志文件中。

远程调试

PyFlink 作业,在运行过程中,会启动一个独立的 Python 进程执行 Python 自定义函数,所以如果需要调试 Python 自定义函数,需要通过远程调试的方式进行,可以参见[4],了解如何在 Pycharm 中进行 Python 远程调试。 1)在 Python 环境中安装 pydevd-pycharm: pip install pydevd-pycharm~=203.7717.65 2)在 Python 自定义函数中设置远程调试参数:
  1. def split(s):
  2. import pydevd_pycharm
  3. pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
  4. splits = s[1].split("|")
  5. for sp in splits:
  6. yield s[0], sp
3)按照 Pycharm 中远程调试的步骤,进行操作即可,可以参见[4],也可以参考博客[5]中“代码调试”部分的介绍。 说明:Python 远程调试功能只在 Pycharm 的 professional 版才支持。

参考文档

如何从 0 到 1 开发 PyFlink API 作业

Python API