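Goal: learn reduce and the other rolling aggregation operators. keyBy is a prerequisite for all of them, because these aggregations are only available on a keyed stream.
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3.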

Create a Maven project

Add the dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

Code demo

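    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // The class name AggregationDemo is illustrative; Test is the author's POJO (id, score, content).
    public class AggregationDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> inputStream = env.readTextFile("C:\\Users\\Luo\\Desktop\\com.helloflink\\src\\main\\resources\\text");
            // Each input line is split on single spaces into id, score and content.
            DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
                @Override
                public Test map(String value) throws Exception {
                    String[] words = value.split(" ");
                    return new Test(words[0], Integer.parseInt(words[1]), words[2]);
                }
            });
            // reduce combines the previous result for the key with the new element;
            // the input and output types must be the same.
            dataStream.keyBy("id").reduce(new ReduceFunction<Test>() {
                @Override
                public Test reduce(Test value1, Test value2) throws Exception {
                    return new Test(value1.getId(), Math.max(value1.getScore(), value2.getScore()), value2.getContent());
                }
            }).print("reduce");
            dataStream.keyBy("id").sum("score").print("sum");     // running sum of score
            dataStream.keyBy("id").min("score").print("min");     // running minimum of score
            dataStream.keyBy("id").max("score").print("max");     // running maximum of score
            dataStream.keyBy("id").maxBy("score").print("maxBy"); // element with the maximum score
            dataStream.keyBy("id").minBy("score").print("minBy"); // element with the minimum score
            env.execute();
        }
    }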
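The demo relies on a Test class that is not shown in the post. Here is a minimal sketch of what it might look like, assuming the field names and the toString format seen in the printed results. For keyBy("id") and sum("score") to resolve field names, Flink has to recognize the type as a POJO: a public class with a public no-argument constructor whose fields are either public or reachable through getters and setters.

```java
// Assumed shape of the Test type used in the demo; the original class is not shown in the post.
public class Test {
    private String id;
    private int score;
    private String content;

    // Flink's POJO rules require a public no-argument constructor.
    public Test() {
    }

    public Test(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public int getScore() { return score; }
    public void setScore(int score) { this.score = score; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    // Format chosen to match the lines printed in the results.
    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }
}
```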


Result

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    max> Test{id='1', score=80, content='bs'}
    minBy> Test{id='1', score=80, content='bs'}
    minBy> Test{id='2', score=99, content='jj'}
    minBy> Test{id='3', score=95, content='jw'}
    minBy> Test{id='1', score=80, content='bs'}
    maxBy> Test{id='1', score=80, content='bs'}
    reduce> Test{id='1', score=80, content='bs'}
    max> Test{id='2', score=99, content='jj'}
    reduce> Test{id='2', score=99, content='jj'}
    max> Test{id='3', score=95, content='jw'}
    reduce> Test{id='3', score=95, content='jw'}
    maxBy> Test{id='2', score=99, content='jj'}
    maxBy> Test{id='3', score=95, content='jw'}
    max> Test{id='1', score=99, content='bs'}
    maxBy> Test{id='1', score=99, content='jj'}
    reduce> Test{id='1', score=99, content='jj'}
    min> Test{id='1', score=80, content='bs'}
    min> Test{id='2', score=99, content='jj'}
    min> Test{id='3', score=95, content='jw'}
    min> Test{id='1', score=80, content='bs'}
    sum> Test{id='1', score=179, content='bs'}
    sum> Test{id='2', score=99, content='jj'}
    sum> Test{id='3', score=95, content='jw'}
    sum> Test{id='1', score=179, content='bs'}
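Each operator prints one line per input record because these are rolling aggregations: the state for a key is updated and emitted every time a record with that key arrives. The plain-Java sketch below imitates that behavior without Flink. It is only an illustration, not how Flink is implemented: the Rec type and the rollingReduce helper are hypothetical, and the four input records are inferred from the printed results rather than taken from the original text file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingReduceSketch {
    // Hypothetical stand-in for the article's Test type.
    static final class Rec {
        final String id;
        final int score;
        final String content;

        Rec(String id, int score, String content) {
            this.id = id;
            this.score = score;
            this.content = content;
        }

        @Override
        public String toString() {
            return id + "," + score + "," + content;
        }
    }

    // Keeps one accumulator per id and emits the updated accumulator for every input record.
    static List<String> rollingReduce(List<Rec> input, BinaryOperator<Rec> fn) {
        Map<String, Rec> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Rec rec : input) {
            // The first record of a key is stored unchanged; later ones are combined via fn.
            Rec acc = state.merge(rec.id, rec, fn);
            emitted.add(acc.toString());
        }
        return emitted;
    }

    // Assumption: input reconstructed from the printed results.
    static final List<Rec> INPUT = Arrays.asList(
            new Rec("1", 80, "bs"), new Rec("2", 99, "jj"),
            new Rec("3", 95, "jw"), new Rec("1", 99, "jj"));

    public static void main(String[] args) {
        // Same logic as the ReduceFunction in the demo: max score, content of the newer record.
        System.out.println(rollingReduce(INPUT,
                (a, b) -> new Rec(a.id, Math.max(a.score, b.score), b.content)));
        // Sum-like behavior: only score changes, the other fields stay as first seen.
        System.out.println(rollingReduce(INPUT,
                (a, b) -> new Rec(a.id, a.score + b.score, a.content)));
    }
}
```

The second record with id 1 is the only one that meets existing state, which is why only the last line of each operator's output differs from its input.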

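Discussion

What is the difference between min() and minBy()?

  1. min returns only the minimum value of the given field; the other fields of the emitted record are not guaranteed to belong to the element that holds that minimum.
  2. minBy returns the entire element that holds the minimum value, so its other fields are guaranteed to match.

The same distinction applies to max and maxBy. In the results above, the last max line for id 1 shows score=99 with content='bs', while the last maxBy line shows score=99 with content='jj'.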
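The difference can be sketched in plain Java as two combine functions. This only illustrates the semantics described above and is not Flink's actual code; Rec and both helper methods are hypothetical, and the two records in main are made-up sample data, chosen so that the second one carries the smaller score.

```java
class MinVersusMinBy {
    // Hypothetical stand-in for the article's Test type.
    static final class Rec {
        final String id;
        final int score;
        final String content;

        Rec(String id, int score, String content) {
            this.id = id;
            this.score = score;
            this.content = content;
        }

        @Override
        public String toString() {
            return id + "," + score + "," + content;
        }
    }

    // min-style: keep the accumulated record and overwrite only the compared field.
    static Rec min(Rec acc, Rec next) {
        return next.score < acc.score ? new Rec(acc.id, next.score, acc.content) : acc;
    }

    // minBy-style: return the whole record that holds the smaller value.
    static Rec minBy(Rec acc, Rec next) {
        return next.score < acc.score ? next : acc;
    }

    public static void main(String[] args) {
        Rec first = new Rec("1", 80, "bs");  // made-up sample data
        Rec second = new Rec("1", 70, "zz"); // made-up sample data
        System.out.println("min   -> " + min(first, second));
        System.out.println("minBy -> " + minBy(first, second));
    }
}
```

With this sample data the min-style result mixes the new score with the first record's content, producing a record that never appeared in the input, whereas the minBy-style result is the second record itself.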

Test procedure