Batch Examples

The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.

The full source code of the following and more examples can be found in the flink-examples-batch module of the Flink source repository.

Running an example

In order to run a Flink example, we assume you have a running Flink instance available. The "Quickstart" and "Setup" tabs in the navigation describe various ways of starting Flink.

The easiest way is to run ./bin/start-cluster.sh, which by default starts a local cluster with one JobManager and one TaskManager.

Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.

To run the WordCount example, issue the following command:

  ./bin/flink run ./examples/batch/WordCount.jar

The other examples can be started in a similar way.

Note that many examples run without passing any arguments, by using built-in data. To run WordCount with real data, you have to pass a path to the data:

  ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a schema prefix, such as hdfs://.
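For example, reading from and writing to HDFS instead of the local file system would look as follows; the namenode host, port, and paths are placeholders for illustration, not shipped sample data:

  ./bin/flink run ./examples/batch/WordCount.jar --input hdfs://namenode:8020/path/to/some/text/data --output hdfs://namenode:8020/path/to/result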

Word Count

WordCount is the "Hello World" of big data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are split into individual words. Second, the words are grouped and counted.

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSet<String> text = env.readTextFile("/path/to/file");

  DataSet<Tuple2<String, Integer>> counts =
          // split up the lines in pairs (2-tuples) containing: (word,1)
          text.flatMap(new Tokenizer())
          // group by the tuple field "0" and sum up tuple field "1"
          .groupBy(0)
          .sum(1);

  counts.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions
  public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
          // normalize and split the line
          String[] tokens = value.toLowerCase().split("\\W+");

          // emit the pairs
          for (String token : tokens) {
              if (token.length() > 0) {
                  out.collect(new Tuple2<String, Integer>(token, 1));
              }
          }
      }
  }

The same algorithm implemented with Flink's Scala API:

  val env = ExecutionEnvironment.getExecutionEnvironment

  // get input data
  val text = env.readTextFile("/path/to/file")

  val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    .map { (_, 1) }
    .groupBy(0)
    .sum(1)

  counts.writeAsCsv(outputPath, "\n", " ")

The WordCount example implements the above described algorithm with input parameters: --input <path> --output <path>. As test data, any text file will do.
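As a quick sanity check (the input file contents below are made up for illustration), an input of

  to be
  or not to be

yields the following word-count pairs, written in no particular order with the "\n" row delimiter and " " field delimiter passed to writeAsCsv:

  be 2
  not 1
  or 1
  to 2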

Page Rank

The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
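Written out, with dampening factor d and N pages, one iteration updates the rank of a page p to R(p) = (1 - d) / N + d * Σ R(q) / out(q), where the sum ranges over all pages q that link to p and out(q) is the number of outgoing links of q. This is the update performed by the implementations below: the per-page rank sums are computed first, and the Dampener step then applies the factor d together with the (1 - d) / N random-jump term.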

In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // read the pages and initial ranks by parsing a CSV file
  DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
          .types(Long.class, Double.class);

  // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
  DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

  // set iterative data set
  IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

  DataSet<Tuple2<Long, Double>> newRanks = iteration
          // join pages with outgoing edges and distribute rank
          .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
          // collect and sum ranks
          .groupBy(0).sum(1)
          // apply dampening factor
          .map(new Dampener(DAMPENING_FACTOR, numPages));

  DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
          newRanks,
          newRanks.join(iteration).where(0).equalTo(0)
          // termination condition
          .filter(new EpsilonFilter()));

  finalPageRanks.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions

  public static final class JoinVertexWithEdgesMatch
          implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                      Tuple2<Long, Double>> {

      @Override
      public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                       Collector<Tuple2<Long, Double>> out) {
          Long[] neighbors = adj.f1;
          double rank = page.f1;
          double rankToDistribute = rank / ((double) neighbors.length);

          for (int i = 0; i < neighbors.length; i++) {
              out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
          }
      }
  }

  public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
      private final double dampening, randomJump;

      public Dampener(double dampening, double numVertices) {
          this.dampening = dampening;
          this.randomJump = (1 - dampening) / numVertices;
      }

      @Override
      public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
          value.f1 = (value.f1 * dampening) + randomJump;
          return value;
      }
  }

  public static final class EpsilonFilter
          implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

      @Override
      public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
          return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
      }
  }

The same program implemented with Flink's Scala API:

  // User-defined types
  case class Link(sourceId: Long, targetId: Long)
  case class Page(pageId: Long, rank: Double)
  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

  // set up execution environment
  val env = ExecutionEnvironment.getExecutionEnvironment

  // read the pages and initial ranks by parsing a CSV file
  val pages = env.readCsvFile[Page](pagesInputPath)

  // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
  val links = env.readCsvFile[Link](linksInputPath)

  // assign initial ranks to pages
  val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

  // build adjacency list from link input
  val adjacencyLists = links
    // initialize lists
    .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
    // concatenate lists
    .groupBy("sourceId").reduce {
      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
    }

  // start iteration
  val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
    currentRanks =>
      val newRanks = currentRanks
        // distribute ranks to target pages
        .join(adjacencyLists).where("pageId").equalTo("sourceId") {
          (page, adjacent, out: Collector[Page]) =>
            for (targetId <- adjacent.targetIds) {
              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
            }
        }
        // collect ranks and sum them up
        .groupBy("pageId").aggregate(SUM, "rank")
        // apply dampening factor
        .map { p =>
          Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
        }

      // terminate if no rank update was significant
      val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
        (current, next, out: Collector[Int]) =>
          // check for significant update
          if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
      }

      (newRanks, termination)
  }

  val result = finalRanks

  // emit result
  result.writeAsCsv(outputPath, "\n", " ")

The PageRank program implements the above example. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

Input files are plain text files and must be formatted as follows:

  • Pages are represented as (long) IDs separated by new-line characters.
    • For example "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
  • Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
    • For example "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
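As a minimal end-to-end sketch (all file names and contents here are made up for illustration), a three-page cycle satisfies this requirement because every page has exactly one incoming and one outgoing link:

  printf "1\n2\n3\n" > /tmp/pages.txt
  printf "1 2\n2 3\n3 1\n" > /tmp/links.txt
  ./bin/flink run ./examples/batch/PageRank.jar --pages /tmp/pages.txt --links /tmp/links.txt --output /tmp/pagerank-result --numPages 3 --iterations 10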

Connected Components

The Connected Components algorithm identifies parts of a larger graph which are connected by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.

This implementation uses a delta iteration: Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
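A small trace illustrates both the propagation and the shrinking workset. Take the undirected edges (1)-(2) and (2)-(12), with every vertex initially carrying its own ID as component ID. In the first step, vertex 2 adopts ID 1 from neighbor 1 and vertex 12 adopts ID 2 from neighbor 2, while vertex 1 sees no smaller candidate and drops out of the workset. In the second step only vertex 12 changes again, to ID 1. The third step produces no changes at all, so the iteration terminates with all three vertices in component 1.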

  // read vertex and edge data
  DataSet<Long> vertices = getVertexDataSet(env);
  DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

  // assign the initial component IDs (equal to the vertex ID)
  DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

  // open a delta iteration
  DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
          verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

  // apply the step logic:
  DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
          // join with the edges
          .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
          // select the minimum neighbor component ID
          .groupBy(0).aggregate(Aggregations.MIN, 1)
          // update if the component ID of the candidate is smaller
          .join(iteration.getSolutionSet()).where(0).equalTo(0)
          .flatMap(new ComponentIdFilter());

  // close the delta iteration (delta and new workset are identical)
  DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

  // emit result
  result.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions

  public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

      @Override
      public Tuple2<T, T> map(T vertex) {
          return new Tuple2<T, T>(vertex, vertex);
      }
  }

  public static final class UndirectEdge
          implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
      Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

      @Override
      public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
          invertedEdge.f0 = edge.f1;
          invertedEdge.f1 = edge.f0;
          out.collect(edge);
          out.collect(invertedEdge);
      }
  }

  public static final class NeighborWithComponentIDJoin
          implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

      @Override
      public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
          return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
      }
  }

  public static final class ComponentIdFilter
          implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                     Tuple2<Long, Long>> {

      @Override
      public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                          Collector<Tuple2<Long, Long>> out) {
          if (value.f0.f1 < value.f1.f1) {
              out.collect(value.f0);
          }
      }
  }

The same program implemented with Flink's Scala API:

  // set up execution environment
  val env = ExecutionEnvironment.getExecutionEnvironment

  // read vertex and edge data
  // assign the initial components (equal to the vertex id)
  val vertices = getVerticesDataSet(env).map { id => (id, id) }

  // undirected edges by emitting for each input edge the input edge itself and an inverted version
  val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

  // open a delta iteration
  val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
    (s, ws) =>
      // apply the step logic: join with the edges
      val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
        (edge._2, vertex._2)
      }

      // select the minimum neighbor
      val minNeighbors = allNeighbors.groupBy(0).min(1)

      // update if the component of the candidate is smaller
      val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
        (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
          if (newVertex._2 < oldVertex._2) out.collect(newVertex)
      }

      // delta and new workset are identical
      (updatedComponents, updatedComponents)
  }

  verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

The ConnectedComponents program implements the above example. It requires the following parameters to run: --vertices <path> --edges <path> --output <path> --iterations <n>

Input files are plain text files and must be formatted as follows:

  • Vertices are represented as IDs and separated by new-line characters.
    • For example "1\n2\n12\n42\n63\n" gives five vertices (1), (2), (12), (42), and (63).
  • Edges are represented as pairs of vertex IDs which are separated by space characters. Edges are separated by new-line characters:
    • For example "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).