Goal: count how often each space-separated word appears in a text file.
Environment: JDK 8; IDE: IntelliJ IDEA 2019.3

Create a Maven project

Add the dependencies to pom.xml

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```

Code example

The DataStream API is used for real-time (stream) processing.
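In stream processing, `keyBy(0).sum(1)` does not wait for the end of the input: for every incoming `(word, 1)` record it updates that word's total and emits the new value immediately. The following plain-Java sketch models that behaviour on a single thread without Flink; the class `RunningWordCount`, its `process` method and the input lines are hypothetical. A real Flink job interleaves updates from several parallel subtasks, so the order of its output can differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink involved) of keyBy(0).sum(1) on a stream:
// each incoming (word, 1) record updates that word's running total,
// and the new total is emitted right away instead of once at the end.
public class RunningWordCount {

    // Processes lines in arrival order and returns every "(word,count)" update that would be emitted.
    static List<String> process(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>(); // per-word running totals (Flink keeps these as keyed state)
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int total = totals.merge(word, 1, Integer::sum); // add this record's count of 1
                emitted.add("(" + word + "," + total + ")");      // emit the updated total immediately
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical input lines, not the author's text file
        List<String> lines = Arrays.asList("hello java", "hello php");
        // Prints (hello,1), (java,1), (hello,2), (php,1), one per line
        process(lines).forEach(System.out::println);
    }
}
```

A batch job over the same input would instead report only the final totals, `(hello,2)`, `(java,1)` and `(php,1)`.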

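```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount { // class name is illustrative
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism; if not set, it defaults to the number of CPU cores
        // (setMaxParallelism would only set the upper bound for rescaling, not the parallelism)
        env.setParallelism(8);
        // Read the data
        DataStream<String> input = env.readTextFile("absolute path of the txt file");
        // Count: split each line into (word, 1), group by the word (field 0), sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).keyBy(0).sum(1);
        // Print the results
        res.print();
        // Launch the job
        env.execute();
    }

    // Splits a line on spaces and emits (word, 1) for every word
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```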
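`MyFlatMapper` splits on a single space character, so two consecutive spaces produce an empty string that is then counted as a word, and a blank line yields one empty-string "word". If the input may contain irregular spacing, splitting on runs of whitespace avoids this. The sketch below compares the two approaches in plain Java; `SplitDemo` and its method names are hypothetical and not part of the original project.

```java
import java.util.Arrays;

// Compares two ways of splitting a line into words.
public class SplitDemo {

    // Same as MyFlatMapper: split on a single space character.
    static String[] splitOnSpace(String line) {
        return line.split(" ");
    }

    // Alternative: trim, then split on runs of whitespace, so no empty tokens are produced.
    static String[] splitOnWhitespace(String line) {
        String trimmed = line.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    public static void main(String[] args) {
        String line = "hello  flink"; // two spaces between the words
        System.out.println(Arrays.toString(splitOnSpace(line)));      // [hello, , flink]
        System.out.println(Arrays.toString(splitOnWhitespace(line))); // [hello, flink]
    }
}
```

To apply this in the Flink job, `MyFlatMapper` could use the same trim-and-`"\\s+"` logic in place of `value.split(" ")`.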


Result

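The number before `>` is the index (starting at 1) of the parallel subtask (thread) that printed the line. The SLF4J warnings only mean that no SLF4J logging binding is on the classpath; they do not affect the job. Because `sum` keeps a running total per word, each occurrence produces an updated count, which is why `hello` appears with the counts 1, 2 and 3.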

```text
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> (hello,1)
2> (hello,2)
4> (cc,1)
3> (word,1)
3> (java,1)
2> (hello,3)
1> (php,1)
```
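All updates for `hello` carry the same prefix because a keyed stream routes every key to a fixed parallel instance: the key is hashed into one of `maxParallelism` key groups, and each key group belongs to exactly one subtask. The sketch below models this two-step routing in plain Java, assuming parallelism 8 and a max parallelism of 128. It is a simplification: Flink applies its own hash to the key before taking the modulus, so the indices printed here will not match the output above. `KeyRoutingSketch` and its methods are hypothetical names.

```java
// Simplified model of how a keyed stream assigns keys to parallel subtasks.
// ASSUMPTION: uses key.hashCode() directly; Flink's real hashing differs,
// so the printed indices will not match a real Flink run.
public class KeyRoutingSketch {

    // Step 1: map a key to one of maxParallelism key groups.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: map a key group to a subtask index in [0, parallelism).
    static int subtaskIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed max parallelism
        int parallelism = 8;      // assumed parallelism
        for (String word : new String[]{"hello", "java", "php", "hello"}) {
            int group = keyGroup(word, maxParallelism);
            // print() labels lines with the 1-based subtask index, hence "+ 1";
            // both "hello" lines always get the same prefix
            System.out.println((subtaskIndex(group, parallelism, maxParallelism) + 1) + "> " + word);
        }
    }
}
```

Because the mapping depends only on the key, the running total for a word always lives in one subtask, which is what lets `sum` keep a correct per-word count.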