Python编程指南(试用版)

Flink中的分析程序是在数据集上实现转换(例如过滤、映射、连接、分组)的常规程序。数据集最初是从某些源(例如,通过读取文件或从集合)创建的。结果通过接收器返回,接收器可以将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序在各种环境下运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中执行,也可以在许多机器的集群中执行。

为了创建自己的Flink程序,我们鼓励您从程序框架(Program Skeleton)开始,逐步添加自己的转化模块(Transformations)。其余部分作为附加操作和高级功能的参考。

示例程序

下面的程序是wordcount的一个完整的工作示例。您可以复制和粘贴代码以在本地运行它。

  1. from flink.plan.Environment import get_environment
  2. from flink.functions.GroupReduceFunction import GroupReduceFunction
  3. class Adder(GroupReduceFunction):
  4. def reduce(self, iterator, collector):
  5. count, word = iterator.next()
  6. count += sum([x[0] for x in iterator])
  7. collector.collect((count, word))
  8. env = get_environment()
  9. data = env.from_elements("Who's there?",
  10. "I think I hear them. Stand, ho! Who's there?")
  11. data \
  12. .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  13. .group_by(1) \
  14. .reduce_group(Adder(), combinable=True) \
  15. .output()
  16. env.execute(local=True)

程序框架

正如我们在示例中看到的,Flink程序看起来像常规的Python程序。每个程序由相同的基本部分组成:

1.获取“环境”,

2.加载/创建初始数据,

3.指定此数据的转换,

4。指定计算结果的放置位置,以及

5。执行你的程序。

现在,我们将对这些步骤进行概述,但请参阅各个部分了解更多详细信息。

“环境”是所有Flink程序的基础。您可以在类“environment”上使用这些静态方法获得一个:

  1. get_environment()

为了指定数据源,执行环境有几种方法可以从文件中读取数据。要将文本文件作为一系列行读取,可以使用:

  1. env = get_environment()
  2. text = env.read_text("file:///path/to/file")

这将为您提供一个数据集,您可以在该数据集上应用转换。有关数据源和输入格式的详细信息,请参阅[数据源](数据源)。

一旦拥有了一个数据集,您就可以应用转换来创建一个新的数据集,然后您可以将其写入文件、再次转换或与其他数据集组合。通过使用自己的自定义转换函数对数据集调用方法来应用转换。例如,映射转换如下所示:

  1. data.map(lambda x: x*2)

这将通过将原始数据集中的每个值加倍来创建新的数据集。有关更多信息和所有转换的列表,请参阅[转换](转换)。

一旦您有了一个需要写入磁盘的数据集,就可以对数据集调用以下方法之一:

  1. data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
  2. write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
  3. output()

最后一个方法只对在本地机器上开发/调试有用,它将把数据集的内容输出到标准输出。(请注意,在集群中,结果将进入集群节点的标准输出流,并最终进入工作人员的输出文件中)。前两个按名字说的做。有关写入文件的详细信息,请参阅[数据接收器](数据接收器)。

一旦指定了完整的程序,就需要在“environment”上调用“execute”。这将提交您的程序以在集群上执行。

项目设置

除了设置Flink,不需要额外的工作。python包可以在flink发行版的/resource文件夹中找到。运行作业时,Flink包以及计划和可选包会通过HDFS自动分布在集群中。

在安装了python 2.7或3.4的Linux/Windows系统上测试了pythonAPI。

默认情况下,Flink将通过调用“python”来启动python进程。通过在flink-conf.yaml中设置“python.binary.path”键,可以修改此行为以使用所选的二进制文件。

延迟执行

所有Flink程序都是延迟执行的:当程序的主要方法被执行时,数据加载和转换不会直接发生。相反,每个操作都被创建并添加到程序的计划中。当对环境对象调用“execute()”方法之一时,实际执行操作。

延迟执行允许您构建复杂的程序,Flink作为一个整体计划单元执行。

转化

数据转换将一个或多个数据集转换为新的数据集。程序可以将多个转换组合成复杂的程序集。

本节简要概述了可用的转换。[转换文档](dataset_transforms.html)用示例对所有转换进行了完整描述。

转化 描述
Map 获取一个元素并生成一个元素.
  1. data.map(lambda x: x * 2)

| | FlatMap | Takes one element and produces zero, one, or more elements.

  1. data.flat_map(
  2. lambda x,c: [(1,word) for word in line.lower().split() for line in x])

| | MapPartition | Transforms a parallel partition in a single function call. The function get the partition as an Iterator and can produce an arbitrary number of result values. The number of elements in each partition depends on the parallelism and previous operations.

  1. data.map_partition(lambda x,c: [value * 2 for value in x])

| | Filter | Evaluates a boolean function for each element and retains those for which the function returns true.

  1. data.filter(lambda x: x &gt; 1000)

| | Reduce | Combines a group of elements into a single element by repeatedly combining two elements into one. Reduce may be applied on a full data set, or on a grouped data set.

  1. data.reduce(lambda x,y : x + y)

| | ReduceGroup | Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.

  1. class Adder(GroupReduceFunction):
  2. def reduce(self, iterator, collector):
  3. count, word = iterator.next()
  4. count += sum([x[0] for x in iterator)
  5. collector.collect((count, word))
  6. data.reduce_group(Adder())

| | Aggregate | Performs a built-in operation (sum, min, max) on one field of all the Tuples in a data set or in each group of a data set. Aggregation can be applied on a full dataset or on a grouped data set.

  1. #此代码查找第一个字段中所有值的和以及第二个字段中所有值的最大值
  2. data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)
  3. # min(), max(), and sum()都是可用的
  4. data.sum(0).and_agg(Aggregation.Max, 1)

| | Join | Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See keys on how to define join keys.

  1. #在这种情况下,元组字段用作键。
  2. #“0”是第一个元组上的联接字段
  3. #“1”是第二个元组上的联接字段。
  4. result = input1.join(input2).where(0).equal_to(1)

| | CoGroup | The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See keys on how to define coGroup keys.

  1. data1.co_group(data2).where(0).equal_to(1)

| | Cross | Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.

  1. result = data1.cross(data2)

| | Union | Produces the union of two data sets.

  1. data.union(data2)

| | ZipWithIndex | Assigns consecutive indexes to each element. For more information, please refer to the Zip Elements Guide.

  1. data.zip_with_index()

|

指定键

某些转换(如join或cogroup)要求在参数数据集上定义键,而其他转换(reduce、groupreduce)允许在应用前将数据集分组到键上。

数据集分组为

  1. reduced = data \
  2. .group_by(<define key here>) \
  3. .reduce_group(<do something>)

Flink的数据模型不是基于键值对的。因此,您不需要将数据集类型物理打包为键和值。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组运算符。

定义元组的键

最简单的情况是在元组的一个或多个字段上对元组的数据集进行分组:

  1. reduced = data \
  2. .group_by(0) \
  3. .reduce_group(<do something>)

数据集分组在元组的第一个字段上。因此,group reduce函数将在第一个字段中接收具有相同值的元组组。

  1. grouped = data \
  2. .group_by(0,1) \
  3. .reduce(/*do something*/)

数据集分组在由第一个和第二个字段组成的复合键上,因此reduce函数将接收两个字段具有相同值的组。

关于嵌套元组的说明:如果有一个数据集具有一个嵌套元组,并且指定了“group_by(&lt;index of tuple&gt;)”,则系统将使用完整的元组作为键。

将函数传递给Flink

某些操作需要用户定义的函数,而所有这些操作都接受lambda函数和Rich functions作为参数。

  1. data.filter(lambda x: x > 5)
  1. class Filter(FilterFunction):
  2. def filter(self, value):
  3. return value > 5
  4. data.filter(Filter())

Rich Function允许使用导入的函数,提供对广播变量的访问,可以使用init()参数化,并且是复杂函数的转到选项。它们也是为reduce操作定义可选“combine”函数的唯一方法。

lambda函数允许轻松插入一个liners。请注意,如果操作可以返回多个值,lambda函数必须返回iterable。(所有接收收集器参数的函数)

数据类型

Flink的python api目前只提供对原始python类型(int、float、bool、string)和字节数组的本机支持。

可以通过向环境传递序列化程序、反序列化程序和类型类来扩展类型支持。

  1. class MyObj(object):
  2. def __init__(self, i):
  3. self.value = i
  4. class MySerializer(object):
  5. def serialize(self, value):
  6. return struct.pack(">i", value.value)
  7. class MyDeserializer(object):
  8. def _deserialize(self, read):
  9. i = struct.unpack(">i", read(4))[0]
  10. return MyObj(i)
  11. env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

元组/列表

可以将元组(或列表)用于复合类型。python元组被映射到flink tuple类型,其中包含固定数量的各种类型的字段(最多25个)。一个元组的每个字段都可以是一个基元类型——包括更多的元组,从而产生嵌套的元组。

  1. word_counts = env.from_elements(("hello", 1), ("world",2))
  2. counts = word_counts.map(lambda x: x[1])

当使用需要键来分组或匹配记录的运算符时,元组允许您简单地指定要用作键的字段的位置。可以指定多个位置来使用组合键(请参见[节数据转换](转换))。

  1. wordCounts \
  2. .group_by(0) \
  3. .reduce(MyReduceFunction())

数据源

数据源创建初始数据集,例如来自文件或集合。

基于文件的:

  • read_text(path)-逐行读取文件并将其作为字符串返回。
  • read csv(path,type)-解析逗号(或其他字符)分隔字段的文件。返回元组的数据集。支持基本的Java类型和它们的值对应类型作为字段类型。

基于集合:

  • from_elements(*args)-从seq创建数据集。所有元素
  • generate_sequence(from, to) - 并行生成给定间隔内的数字序列。

例子

  1. env = get_environment
  2. \# read text file from local files system
  3. localLiens = env.read_text("file:#/path/to/my/textfile")
  4. \# read text file from a HDFS running at nnHost:nnPort
  5. hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
  6. \# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
  7. csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
  8. \# create a set from some given elements
  9. values = env.from_elements("Foo", "bar", "foobar", "fubar")
  10. \# generate a number sequence
  11. numbers = env.generate_sequence(1, 10000000)

数据汇

数据接收器使用数据集并用于存储或返回它们:

  • write_text()-将元素按行写入字符串。字符串是通过调用每个元素的_str()方法获得的。
  • write_csv(…)-将元组作为逗号分隔的值文件写入。行和字段分隔符是可配置的。每个字段的值来自对象的\str()方法。
  • output()-在standard out上打印每个元素的\str()值。 A DataSet can be input to multiple operations. Programs can write or print a data set and at the same time run additional transformations on them.

Examples

Standard data sink methods:

  1. write DataSet to a file on the local file system
  2. textData.write_text("file:///my/result/on/localFS")
  3. write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
  4. textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
  5. write DataSet to a file and overwrite the file if it exists
  6. textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
  7. tuples as lines with pipe as the separator "a|b|c"
  8. values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
  9. this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
  10. values.write_text("file:///path/to/the/result/file")

广播变量

广播变量允许您将数据集除了操作的常规输入外,还可用于操作的所有并行实例。这对于辅助数据集或依赖于数据的参数化很有用。然后,可以在操作员处作为集合访问数据集。

  • broadcast:广播集通过’with_broadcast_set(dataset,string)按名称注册`
  • 访问:通过“self.context.get_broadcast_variable(string)”在目标运算符处访问
  1. class MapperBcv(MapFunction):
  2. def map(self, value):
  3. factor = self.context.get_broadcast_variable("bcv")[0][0]
  4. return value * factor
  5. # 1\. The DataSet to be broadcast
  6. toBroadcast = env.from_elements(1, 2, 3)
  7. data = env.from_elements("a", "b")
  8. # 2\. Broadcast the DataSet
  9. data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

在注册和访问广播数据集时,请确保名称(在上一个示例中为“bcv”)匹配。 :由于广播变量的内容保存在每个节点的内存中,不应太大。对于更简单的事情,比如标量值,您可以简单地参数化RICH FUNCTION。

并行执行

本节描述如何在Flink中配置程序的并行执行。Flink程序由多个任务(操作员、数据源和接收器)组成。一个任务被分成几个并行实例来执行,每个并行实例处理任务输入数据的一个子集。任务的并行实例数称为“并行度”或“并行度”。

任务的并行度可以在不同级别的Flink中指定。

执行环境级别

Flink程序在[执行环境](程序框架)的上下文中执行。执行环境为它执行的所有运算符、数据源和数据接收器定义默认的并行性。通过显式配置运算符的并行性,可以覆盖执行环境的并行性。

可以通过调用’set_parallelism()’方法来指定执行环境的默认并行性。要执行[wordcount](example program)示例程序的所有运算符、数据源和数据接收器,并行度为“3”,请按以下方式设置执行环境的默认并行度:

  1. env = get_environment()
  2. env.set_parallelism(3)
  3. text.flat_map(lambda x,c: x.lower().split()) \
  4. .group_by(1) \
  5. .reduce_group(Adder(), combinable=True) \
  6. .output()
  7. env.execute()

系统等级

通过在./conf/flink-conf.yaml中设置’parallelism.default’属性,可以定义所有执行环境的系统范围默认并行性。有关详细信息,请参阅[配置](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html)文档。

执行计划

要使用flink运行计划,请转到flink发行版,然后从/bin文件夹运行pyflink.sh脚本。包含该计划的脚本必须作为第一个参数传递,后面是一些附加的python包,最后由-个附加的参数分隔,这些参数将被馈送到脚本中。

  1. ./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]