压缩数据集中的元素

在一些算法中,可能需要为数据集元素分配唯一标识符。这个文件展示了我们能如何使用数据集达到这个目的 (https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java).

具有密集索引的zip

zipwithindex为元素分配连续标签,接收数据集作为输入,并返回一个新的数据集(唯一ID,初始值)2-tuples。这个过程需要两个过程,第一个是计数,然后是标记元素,由于计数的同步,所以不能进行流水线操作。备选方案“zipwithuniqueid”以流水线方式工作,并且在唯一标签足够时首选。例如以下代码:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(2);
  3. DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
  4. DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
  5. result.writeAsCsv(resultPath, "\n", ",");
  6. env.execute();
  1. import org.apache.flink.api.scala._
  2. val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  3. env.setParallelism(2)
  4. val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
  5. val result: DataSet[(Long, String)] = input.zipWithIndex
  6. result.writeAsCsv(resultPath, "\n", ",")
  7. env.execute()
  1. from flink.plan.Environment import get_environment
  2. env = get_environment()
  3. env.set_parallelism(2)
  4. input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")
  5. result = input.zip_with_index()
  6. result.write_text(result_path)
  7. env.execute()

代码可能产生元组: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)

Back to top

具有独特的标识符。

在许多情况下,可能不需要指定连续标签。zipwithuniqueid以流水线方式工作,加快了标签分配过程。此方法接收一个数据集作为输入,并返回一个新的数据集(唯一ID,初始值)2-tuple。例如,以下代码:

  1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(2);
  3. DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
  4. DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
  5. result.writeAsCsv(resultPath, "\n", ",");
  6. env.execute();
  1. import org.apache.flink.api.scala._
  2. val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  3. env.setParallelism(2)
  4. val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
  5. val result: DataSet[(Long, String)] = input.zipWithUniqueId
  6. result.writeAsCsv(resultPath, "\n", ",")
  7. env.execute()

可能产生结果: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)

Back to top