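The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.
The full source code of the following and more examples can be found in the flink-examples-batch module of the Flink source repository.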
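Running an example
To run a Flink example, we assume you have a running Flink instance. The "Quickstart" and "Setup" tabs in the navigation describe various ways of starting Flink.
The easiest way is to run ./bin/start-cluster.sh, which by default starts a local cluster with one JobManager and one TaskManager.
Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.
To run the WordCount example, issue the following command: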
./bin/flink run ./examples/batch/WordCount.jar
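The other examples can be started in a similar way.
Note that many examples run without any arguments by using built-in data. To run WordCount with real data, you have to pass the path to the data: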
./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
Note that non-local file systems require a scheme prefix, such as hdfs://.
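The example jars take their options as --key value pairs; the bundled examples parse them with Flink's ParameterTool. The plain-Java sketch below is not Flink code: the class ExampleArgs and its methods parse and schemeOf are hypothetical names chosen for illustration. It only shows the --key value convention and uses java.net.URI to detect a scheme prefix such as hdfs://.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: illustrates the "--key value"
// argument style of the example jars and detects a file-system scheme prefix.
final class ExampleArgs {

    /** Collects "--key value" pairs; a key with no following value maps to "". */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
                params.put(key, hasValue ? args[++i] : "");
            }
        }
        return params;
    }

    /**
     * Returns the scheme ("hdfs", "file", ...) or null if the path has no prefix.
     * Note: URI.create throws IllegalArgumentException on unescaped spaces.
     */
    static String schemeOf(String path) {
        return URI.create(path).getScheme();
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--input", "hdfs://namenode:9000/data/text", "--output", "/tmp/result"});
        System.out.println(schemeOf(params.get("input")));  // prints hdfs
        System.out.println(schemeOf(params.get("output"))); // prints null
    }
}
```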
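Word Count
WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the texts are split into individual words; second, the words are grouped and counted.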
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// sinks are lazy: execute() submits the program
env.execute("WordCount Example");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
The WordCount example implements the algorithm described above and takes the input parameters --input <path> --output <path>. Any text file will do as test data.
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

// sinks are lazy: execute() submits the program
env.execute("Scala WordCount Example")
The Scala version above implements the same algorithm and takes the same input parameters: --input <path> --output <path>.
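The two steps described above can be tried locally without a cluster. The following plain-Java sketch (LocalWordCount and its count method are illustrative names, not part of Flink) applies the same normalization as the Tokenizer, lower-casing and splitting on \W+, and then groups and counts the words in a TreeMap instead of a distributed groupBy/sum.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the two WordCount steps; runs without Flink.
final class LocalWordCount {

    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Step 1: normalize and split the line, as the Tokenizer does.
            for (String token : line.toLowerCase().split("\\W+")) {
                // A line starting with a non-word character yields an empty first token.
                if (!token.isEmpty()) {
                    // Step 2: group by word and add 1 to its running count.
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("To be, or not to be:", "that is the question.")));
        // prints {be=2, is=1, not=1, or=1, question=1, that=1, the=1, to=2}
    }
}
```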
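Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors and computes its new rank as a taxed (damped) sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.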
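Written out, the taxed sum gives page p the following rank in iteration t+1, where d is the dampening factor (DAMPENING_FACTOR in the code), N is the number of pages, In(p) is the set of pages that link to p, and |Out(q)| is the number of outgoing links of page q:

```latex
r_{t+1}(p) = d \sum_{q \in \mathrm{In}(p)} \frac{r_t(q)}{\lvert \mathrm{Out}(q) \rvert} + \frac{1 - d}{N}
```

The sum is the rank distributed by JoinVertexWithEdgesMatch and added up by the grouping step; multiplying by d and adding (1 - d)/N is what the Dampener function does with its randomJump field.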
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
        .types(Long.class, Double.class);

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).with(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
                // termination condition: the iteration stops once this data set is empty
                .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// sinks are lazy: execute() submits the program
env.execute("Basic PageRank Example");

// User-defined functions
public static final class JoinVertexWithEdgesMatch
        implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>, Tuple2<Long, Double>> {

    @Override
    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                     Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neighbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
        implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
The PageRank program implements the above example. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the page IDs (one per line)
val pages = env.readCsvFile[Tuple1[Long]](pagesInputPath).map(_._1)

// read the links (one space-separated "source target" pair per line)
val links = env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ")

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
    (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
          for (targetId <- adjacent.targetIds) {
            out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
          }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

// sinks are lazy: execute() submits the program
env.execute("Basic PageRank Example")
The Scala version above implements the same PageRank example and takes the same parameters: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.
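For experimenting with small graphs without a cluster, the bulk iteration above can be mirrored in memory. In this plain-Java sketch the class LocalPageRank and its run method are hypothetical; it assumes every page is a key of the link map with at least one outgoing link, matching the requirement stated for the input files. Each round distributes ranks along the links, applies the dampening step, and stops early when no rank changes by more than epsilon, the role the EpsilonFilter termination criterion plays in the Flink program.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of bulk-iteration PageRank (no Flink).
final class LocalPageRank {

    static Map<Long, Double> run(Map<Long, List<Long>> links, double dampening,
                                 int maxIterations, double epsilon) {
        int numPages = links.size();
        Map<Long, Double> ranks = new HashMap<>();
        for (Long page : links.keySet()) {
            ranks.put(page, 1.0 / numPages); // uniform initial ranks
        }
        for (int i = 0; i < maxIterations; i++) {
            // Distribute each page's rank evenly over its outgoing links and
            // sum up what every target receives (join + groupBy/sum in Flink).
            Map<Long, Double> received = new HashMap<>();
            for (Map.Entry<Long, List<Long>> entry : links.entrySet()) {
                double share = ranks.get(entry.getKey()) / entry.getValue().size();
                for (Long target : entry.getValue()) {
                    received.merge(target, share, Double::sum);
                }
            }
            // Dampening: received * d + (1 - d) / N. A page that receives nothing
            // keeps only the second term here; the Flink job would drop it instead.
            boolean significant = false;
            Map<Long, Double> next = new HashMap<>();
            for (Long page : links.keySet()) {
                double rank = received.getOrDefault(page, 0.0) * dampening
                        + (1 - dampening) / numPages;
                if (Math.abs(rank - ranks.get(page)) > epsilon) {
                    significant = true;
                }
                next.put(page, rank);
            }
            ranks = next;
            // Stop once no rank moved by more than epsilon.
            if (!significant) {
                break;
            }
        }
        return ranks;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> links = new HashMap<>();
        links.put(1L, Arrays.asList(2L));
        links.put(2L, Arrays.asList(1L, 2L)); // page 2 also points to itself
        // page 1 converges to about 0.351, page 2 to about 0.649
        System.out.println(run(links, 0.85, 100, 1e-9));
    }
}
```

For a three-page cycle 1->2->3->1, every page keeps the rank 1/3, because each page receives exactly the rank it gives away, so the loop stops after the first round.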
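The input files are plain text files and must be formatted as follows:
- Pages are represented as (long) IDs separated by new-line characters.
  - For example, "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs separated by a space character. Links are separated by new-line characters.
  - For example, "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).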
For this simple implementation, each page is required to have at least one incoming and one outgoing link (a page can point to itself).
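Input files can be checked against this format and requirement before submitting a job. The plain-Java sketch below (PageRankInputCheck and its methods are illustrative names, not part of Flink) parses page and link text as described above and reports the pages that lack an incoming or an outgoing link; a self-link counts as both. Note that the sample strings in the format description only illustrate the syntax: checked together, they violate the requirement for pages 1, 12, 42, and 63.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Sketch of a checker for the PageRank input format:
// pages are newline-separated IDs, links are newline-separated "source target" pairs.
final class PageRankInputCheck {

    static List<Long> parsePages(String text) {
        List<Long> pages = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.trim().isEmpty()) {
                pages.add(Long.parseLong(line.trim()));
            }
        }
        return pages;
    }

    static List<long[]> parseLinks(String text) {
        List<long[]> links = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] ids = line.trim().split(" +");
            links.add(new long[] {Long.parseLong(ids[0]), Long.parseLong(ids[1])});
        }
        return links;
    }

    /** Returns the pages that lack an incoming or an outgoing link. */
    static Set<Long> violations(List<Long> pages, List<long[]> links) {
        Set<Long> hasOut = new TreeSet<>();
        Set<Long> hasIn = new TreeSet<>();
        for (long[] link : links) {
            hasOut.add(link[0]);
            hasIn.add(link[1]);
        }
        Set<Long> bad = new TreeSet<>();
        for (Long page : pages) {
            if (!hasOut.contains(page) || !hasIn.contains(page)) {
                bad.add(page);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<Long> pages = parsePages("1\n2\n12\n42\n63\n");
        List<long[]> links = parseLinks("1 2\n2 12\n1 12\n42 63\n");
        System.out.println(violations(pages, links)); // prints [1, 12, 42, 63]
    }
}
```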
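Connected Components
The Connected Components algorithm identifies the parts of a larger graph that are connected, by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor if it is smaller than its own component ID.
This implementation uses a delta iteration: vertices that have not changed their component ID do not participate in the next step. This yields better performance, because the later iterations typically deal only with a few outlier vertices.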
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// sinks are lazy: execute() submits the program
env.execute("Connected Components Example");

// User-defined functions
public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
        implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
        implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
        implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}
The ConnectedComponents program implements the above example. It requires the following parameters to run: --vertices <path> --edges <path> --output <path> --iterations <n>.
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges: emit, for each input edge, the edge itself and an inverted version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}

verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

// sinks are lazy: execute() submits the program
env.execute("Scala Connected Components Example")
The Scala version above implements the same Connected Components example and takes the same parameters: --vertices <path> --edges <path> --output <path> --iterations <n>.
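The delta iteration can be mirrored in memory to see why it pays off. In this plain-Java sketch, LocalConnectedComponents and run are hypothetical names; it assumes every vertex appears as a key of the neighbor map and that edges are already listed in both directions. The solution set maps each vertex to its component ID, and the workset holds only the vertices whose ID changed in the previous step. The method also records how many vertices each step processes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory sketch of a delta iteration for Connected Components (no Flink).
final class LocalConnectedComponents {

    /**
     * neighbors: every vertex mapped to its neighbors, edges listed in both directions.
     * worksetSizes: receives the number of workset vertices processed in each step.
     */
    static Map<Long, Long> run(Map<Long, List<Long>> neighbors, List<Integer> worksetSizes) {
        // Solution set: vertex -> current component ID, initially the vertex's own ID.
        Map<Long, Long> solution = new HashMap<>();
        for (Long v : neighbors.keySet()) {
            solution.put(v, v);
        }
        // Initial workset: all vertices.
        Set<Long> workset = new HashSet<>(neighbors.keySet());
        while (!workset.isEmpty()) {
            worksetSizes.add(workset.size());
            // Only workset vertices send their component ID; keep the minimum offer per neighbor.
            Map<Long, Long> minOffer = new HashMap<>();
            for (Long v : workset) {
                for (Long n : neighbors.get(v)) {
                    minOffer.merge(n, solution.get(v), Math::min);
                }
            }
            // Accept an offer only if it is smaller; the changed vertices are both
            // the solution-set delta and the next workset.
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Long> offer : minOffer.entrySet()) {
                if (offer.getValue() < solution.get(offer.getKey())) {
                    solution.put(offer.getKey(), offer.getValue());
                    changed.add(offer.getKey());
                }
            }
            workset = changed;
        }
        return solution;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> g = new HashMap<>();
        g.put(1L, Arrays.asList(2L, 12L));
        g.put(2L, Arrays.asList(1L, 12L));
        g.put(12L, Arrays.asList(2L, 1L));
        g.put(42L, Arrays.asList(63L));
        g.put(63L, Arrays.asList(42L));
        List<Integer> sizes = new ArrayList<>();
        Map<Long, Long> components = run(g, sizes);
        System.out.println(components + ", workset sizes per step: " + sizes);
    }
}
```

On the sample graph from the input format description (edges 1-2, 2-12, 1-12, 42-63), the first step processes all five vertices and the second only three (2, 12, and 63). No ID changes in the second step, so the iteration ends with component 1 for {1, 2, 12} and component 42 for {42, 63}.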
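The input files are plain text files and must be formatted as follows:
- Vertices are represented as IDs separated by new-line characters.
  - For example, "1\n2\n12\n42\n63\n" gives five vertices (1), (2), (12), (42), and (63).
- Edges are represented as pairs of vertex IDs separated by a space character. Edges are separated by new-line characters.
  - For example, "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).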
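The following plain-Java sketch turns the vertex and edge text format above into an undirected adjacency map by recording each edge in both directions, as the UndirectEdge function does in the Java program. The class ComponentsInput is an illustrative name. Rejecting edges that mention an unlisted vertex is this sketch's own choice; the format description does not say how such edges are handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: parses the vertex/edge text format into an undirected adjacency map.
final class ComponentsInput {

    static Map<Long, List<Long>> parse(String vertices, String edges) {
        Map<Long, List<Long>> adjacency = new TreeMap<>();
        for (String line : vertices.split("\n")) {
            if (!line.trim().isEmpty()) {
                adjacency.put(Long.parseLong(line.trim()), new ArrayList<>());
            }
        }
        for (String line : edges.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] ids = line.trim().split(" +");
            long a = Long.parseLong(ids[0]);
            long b = Long.parseLong(ids[1]);
            // Assumption of this sketch: an edge to an unlisted vertex is an input error.
            if (!adjacency.containsKey(a) || !adjacency.containsKey(b)) {
                throw new IllegalArgumentException("edge references unknown vertex: " + line);
            }
            adjacency.get(a).add(b); // original direction
            adjacency.get(b).add(a); // inverted copy, as UndirectEdge emits
        }
        return adjacency;
    }

    public static void main(String[] args) {
        System.out.println(parse("1\n2\n12\n42\n63\n", "1 2\n2 12\n1 12\n42 63\n"));
        // prints {1=[2, 12], 2=[1, 12], 12=[2, 1], 42=[63], 63=[42]}
    }
}
```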
