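Goal: learn reduce and the other aggregation operators. keyBy is a prerequisite for all of them, because in the DataStream API these rolling aggregations are only available on the KeyedStream that keyBy returns.
Environment: JDK 8. IDE: IntelliJ IDEA.
Create a Maven project and add the following dependencies to pom.xml: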
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code demonstration:
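import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name is illustrative; Test is the POJO with fields id, score and content.
public class TransformRollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Each line of the file has the form "<id> <score> <content>", separated by single spaces
        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Luo\\Desktop\\com.helloflink\\src\\main\\resources\\text");
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });
        // reduce: combines each new element with the previous result for the same key;
        // the input and output types must be the same
        dataStream.keyBy("id").reduce(new ReduceFunction<Test>() {
            @Override
            public Test reduce(Test value1, Test value2) throws Exception {
                // keep the higher score and the most recent content
                return new Test(value1.getId(), Math.max(value1.getScore(), value2.getScore()), value2.getContent());
            }
        }).print("reduce");
        dataStream.keyBy("id").sum("score").print("sum");     // running sum of score
        dataStream.keyBy("id").min("score").print("min");     // running minimum of score
        dataStream.keyBy("id").max("score").print("max");     // running maximum of score
        dataStream.keyBy("id").maxBy("score").print("maxBy"); // whole record with the maximum score
        dataStream.keyBy("id").minBy("score").print("minBy"); // whole record with the minimum score
        env.execute();
    }
}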
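The demo uses a Test class that the original does not show. Below is a minimal sketch of it, assuming the field names and types implied by the constructor call, the getters and the printed output. Flink only treats a class as a POJO (which keyBy("id") and sum("score") rely on) if it is public, has a public no-argument constructor, and exposes its fields publicly or through getters and setters. In the project it would therefore be `public class Test` in its own Test.java; the `public` modifier is left off below only so the sketch compiles inside a single file.

```java
// Test.java: sketch of the POJO the demo assumes (hypothetical; not shown in the original).
// In a real project declare it as "public class Test", since Flink only treats public classes as POJOs.
class Test {
    private String id;       // key field used by keyBy("id")
    private int score;       // field aggregated by sum/min/max/minBy/maxBy
    private String content;

    // Flink's POJO analysis requires a public no-argument constructor
    public Test() {
    }

    public Test(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public int getScore() { return score; }
    public void setScore(int score) { this.score = score; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    // Produces the format seen in the printed output, e.g. Test{id='1', score=80, content='bs'}
    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }
}
```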
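Output. All six print sinks write to the same console, so their lines are interleaved. The SLF4J warning only means that no SLF4J logging binding is on the classpath; it does not affect the job.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.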
max> Test{id='1', score=80, content='bs'}
minBy> Test{id='1', score=80, content='bs'}
minBy> Test{id='2', score=99, content='jj'}
minBy> Test{id='3', score=95, content='jw'}
minBy> Test{id='1', score=80, content='bs'}
maxBy> Test{id='1', score=80, content='bs'}
reduce> Test{id='1', score=80, content='bs'}
max> Test{id='2', score=99, content='jj'}
reduce> Test{id='2', score=99, content='jj'}
max> Test{id='3', score=95, content='jw'}
reduce> Test{id='3', score=95, content='jw'}
maxBy> Test{id='2', score=99, content='jj'}
maxBy> Test{id='3', score=95, content='jw'}
max> Test{id='1', score=99, content='bs'}
maxBy> Test{id='1', score=99, content='jj'}
reduce> Test{id='1', score=99, content='jj'}
min> Test{id='1', score=80, content='bs'}
min> Test{id='2', score=99, content='jj'}
min> Test{id='3', score=95, content='jw'}
min> Test{id='1', score=80, content='bs'}
sum> Test{id='1', score=80, content='bs'}
sum> Test{id='2', score=99, content='jj'}
sum> Test{id='3', score=95, content='jw'}
sum> Test{id='1', score=179, content='bs'}
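The interleaved output is easier to read once you see that every keyed aggregation keeps one accumulated value per key and emits the updated value for each incoming element. The plain-Java sketch below (no Flink involved) reproduces the reduce and sum lines above. Row is a hypothetical stand-in for Test, and the four input records are inferred from the printed output, since the input file itself is not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java sketch (no Flink) of a keyed rolling aggregation:
// one accumulated value per key, and one output per input element.
class RollingReduceSketch {

    // Hypothetical stand-in for the article's Test POJO
    static final class Row {
        final String id;
        final int score;
        final String content;

        Row(String id, int score, String content) {
            this.id = id;
            this.score = score;
            this.content = content;
        }

        @Override
        public String toString() {
            return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
        }
    }

    // Combines each element with the current value of its key and emits the new value
    static List<Row> rollingReduce(List<Row> input, BinaryOperator<Row> fn) {
        Map<String, Row> state = new LinkedHashMap<>();
        List<Row> out = new ArrayList<>();
        for (Row r : input) {
            Row prev = state.get(r.id);
            Row next = (prev == null) ? r : fn.apply(prev, r); // first element of a key passes through
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }

    // Input inferred from the printed output (hypothetical file contents)
    static List<Row> sampleInput() {
        List<Row> in = new ArrayList<>();
        in.add(new Row("1", 80, "bs"));
        in.add(new Row("2", 99, "jj"));
        in.add(new Row("3", 95, "jw"));
        in.add(new Row("1", 99, "jj"));
        return in;
    }

    public static void main(String[] args) {
        // Same logic as the article's ReduceFunction: higher score, most recent content
        List<Row> reduced = rollingReduce(sampleInput(),
                (a, b) -> new Row(a.id, Math.max(a.score, b.score), b.content));
        // sum-like: add the scores, keep the other fields of the accumulated value
        List<Row> summed = rollingReduce(sampleInput(),
                (a, b) -> new Row(a.id, a.score + b.score, a.content));
        reduced.forEach(r -> System.out.println("reduce> " + r));
        summed.forEach(r -> System.out.println("sum> " + r));
    }
}
```

Running it prints the same four reduce lines and four sum lines as the Flink job, ending with `reduce> Test{id='1', score=99, content='jj'}` and `sum> Test{id='1', score=179, content='bs'}`.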
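Discussion
What is the difference between min() and minBy()?
min() only guarantees the minimum of the specified field; the other fields of the emitted record are not guaranteed to come from the record that holds that minimum. In the output above they stay as they were in the first record of each key: max prints Test{id='1', score=99, content='bs'}, although the score 99 came from the record with content 'jj'.
minBy() returns the entire record that holds the minimum, so all of its fields belong together: maxBy prints Test{id='1', score=99, content='jj'}. The same distinction applies to max() versus maxBy().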
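With the article's own input the difference never shows up for min, because the second id='1' record has the higher score. The plain-Java sketch below (no Flink) folds two records of one key both ways on hypothetical input where it does matter. The step functions only mimic the behaviour visible in the output above (field-level update versus whole-record replacement); they are not Flink's implementation, and Row is a hypothetical stand-in for Test.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink) contrasting min-style and minBy-style folding for a single key
class MinVsMinBySketch {

    // Hypothetical stand-in for the article's Test POJO
    static final class Row {
        final String id;
        final int score;
        final String content;

        Row(String id, int score, String content) {
            this.id = id;
            this.score = score;
            this.content = content;
        }

        @Override
        public String toString() {
            return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
        }
    }

    // min-style: only the score of the accumulated record changes; other fields stay as they were
    static Row minStep(Row acc, Row in) {
        return in.score < acc.score ? new Row(acc.id, in.score, acc.content) : acc;
    }

    // minBy-style: the whole record holding the smaller score replaces the accumulator
    // (on a tie this sketch keeps the earlier record)
    static Row minByStep(Row acc, Row in) {
        return in.score < acc.score ? in : acc;
    }

    // Folds all records of one key with the chosen step function
    static Row fold(List<Row> rows, boolean wholeRecord) {
        Row acc = rows.get(0);
        for (int i = 1; i < rows.size(); i++) {
            acc = wholeRecord ? minByStep(acc, rows.get(i)) : minStep(acc, rows.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        // Hypothetical input: a later record of key "1" has a lower score
        List<Row> rows = Arrays.asList(new Row("1", 80, "bs"), new Row("1", 70, "xx"));
        System.out.println("min>   " + fold(rows, false)); // Test{id='1', score=70, content='bs'}
        System.out.println("minBy> " + fold(rows, true));  // Test{id='1', score=70, content='xx'}
    }
}
```

The min-style result pairs the new minimum 70 with the stale content 'bs', which is exactly the "other fields are not guaranteed" case; the minBy-style result is the genuine record.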