Goal: count how many times each space-separated word occurs in a text file.
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3.

Create a Maven project.

Add the Flink dependencies to pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```

Code example

The DataSet API is used for offline (batch) data statistics:

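```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Class name chosen for illustration
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the data (replace with the absolute path of your .txt file)
        DataSource<String> input = env.readTextFile("absolute path of the txt file");
        // Count: emit (word, 1), group by tuple field 0, sum tuple field 1
        DataSet<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
        // Print the result; for a DataSet job, print() triggers execution itself
        res.print();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split each line on single spaces and emit (word, 1) for every token
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```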
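To sanity-check the expected counts on a small input without starting Flink, the same logic can be mirrored in plain Java. This is only a sketch: the class `LocalWordCount` and the sample lines are hypothetical, and a `HashMap` with `merge` stands in for Flink's `groupBy(0).sum(1)`. It also exposes one caveat of `value.split(" ")`: two consecutive spaces yield an empty-string token that gets counted as a "word". If that is undesirable, trimming the line and splitting on `"\\s+"` is one common alternative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java mirror of flatMap -> groupBy(0).sum(1); no Flink involved.
public class LocalWordCount {
    // Tokenizes each line exactly like MyFlatMapper does: split(" ")
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // merge() plays the role of sum(1) inside each word group
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample input; the second line contains a double space
        List<String> lines = Arrays.asList("hello java", "hello  php");
        // HashMap iteration order is unspecified, so the printed order may vary
        System.out.println(count(lines));
    }
}
```

With this input, `hello` is counted twice, `java` and `php` once each, and the empty string once because of the double space.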

Project attachment: 批处理单词统计.rar (batch word count)

Output

```text
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(php,2)
(java,2)
(word,3)
(c,1)
(scale,2)
(hello,10)
```

Something to think about:

```java
input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
```

What do the position arguments of groupBy (the grouping position) and sum (the summing position) do?
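As a hint: on a DataSet of tuples, the integers are tuple field positions. `groupBy(0)` groups records by field `f0` (the word), and `sum(1)` adds up field `f1` (the `1` emitted per occurrence) within each group. The plain-Java sketch below imitates that selection with `Object[]` rows standing in for tuples; the class `GroupSumPositions` and method `groupBySum` are hypothetical and are not part of the Flink API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink code) of what the positions in groupBy(0).sum(1) select.
public class GroupSumPositions {
    // Each row stands in for a Flink tuple; index i corresponds to field f<i>.
    static Map<Object, Integer> groupBySum(List<Object[]> rows, int keyPos, int sumPos) {
        Map<Object, Integer> result = new LinkedHashMap<>(); // keeps first-seen key order
        for (Object[] row : rows) {
            Object key = row[keyPos];          // groupBy(keyPos): field that forms the group key
            int value = (Integer) row[sumPos]; // sum(sumPos): field that is added up
            result.merge(key, value, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // (word, 1) pairs, shaped like the output of MyFlatMapper
        List<Object[]> rows = Arrays.asList(
                new Object[]{"hello", 1},
                new Object[]{"java", 1},
                new Object[]{"hello", 1});
        System.out.println(groupBySum(rows, 0, 1)); // prints {hello=2, java=1}
    }
}
```

Swapping the positions (grouping on the count field and summing the word field) would make no sense here, which is why the key position points at the word and the sum position points at the count.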