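Goal: learn the reduce operator and the other aggregation operators (sum, min, max, minBy, maxBy). keyBy is a prerequisite for all of them, because they are called on the KeyedStream that keyBy returns.
Environment: JDK 8; IDE: IntelliJ IDEA
Create a Maven project and add the following dependencies: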
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code example
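import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name AggregationDemo is hypothetical (not given in the original).
// Test is the author's POJO (not shown) with fields id (String), score (int), content (String).
// keyBy("id") only works if Flink recognizes Test as a POJO: public class,
// public no-arg constructor, and public fields or getters/setters.
public class AggregationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Luo\\Desktop\\com.helloflink\\src\\main\\resources\\text");

        // Parse each line "id score content" into a Test object
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });

        // reduce: rolling aggregation per key. Each new element is combined with the previous result,
        // and the input and output types must be the same.
        dataStream.keyBy("id").reduce(new ReduceFunction<Test>() {
            @Override
            public Test reduce(Test value1, Test value2) throws Exception {
                return new Test(value1.getId(), Math.max(value1.getScore(), value2.getScore()), value2.getContent());
            }
        }).print("reduce");

        dataStream.keyBy("id").sum("score").print("sum");     // sum of score
        dataStream.keyBy("id").min("score").print("min");     // minimum score
        dataStream.keyBy("id").max("score").print("max");     // maximum score
        dataStream.keyBy("id").maxBy("score").print("maxBy"); // whole record with the maximum score
        dataStream.keyBy("id").minBy("score").print("minBy"); // whole record with the minimum score

        env.execute();
    }
}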
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
max> Test{id='1', score=80, content='bs'}
minBy> Test{id='1', score=80, content='bs'}
minBy> Test{id='2', score=99, content='jj'}
minBy> Test{id='3', score=95, content='jw'}
minBy> Test{id='1', score=80, content='bs'}
maxBy> Test{id='1', score=80, content='bs'}
reduce> Test{id='1', score=80, content='bs'}
max> Test{id='2', score=99, content='jj'}
reduce> Test{id='2', score=99, content='jj'}
max> Test{id='3', score=95, content='jw'}
reduce> Test{id='3', score=95, content='jw'}
maxBy> Test{id='2', score=99, content='jj'}
maxBy> Test{id='3', score=95, content='jw'}
max> Test{id='1', score=99, content='bs'}
maxBy> Test{id='1', score=99, content='jj'}
reduce> Test{id='1', score=99, content='jj'}
min> Test{id='1', score=80, content='bs'}
min> Test{id='2', score=99, content='jj'}
min> Test{id='3', score=95, content='jw'}
min> Test{id='1', score=80, content='bs'}
sum> Test{id='1', score=80, content='bs'}
sum> Test{id='2', score=99, content='jj'}
sum> Test{id='3', score=95, content='jw'}
sum> Test{id='1', score=179, content='bs'}
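The reduce> lines in the output above come from a rolling aggregation. Below is a minimal plain-Java sketch that simulates keyBy("id").reduce(...) without Flink (JDK 8 only). It assumes Flink's semantics of one stored value per key. The first element of a key is emitted unchanged. Each later element is combined with the stored value by the reduce function, and the new result is both stored and emitted. The classes Rec and RollingReduceDemo are hypothetical. The four input records are reconstructed from the printed output, because the original text file is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-in for the author's Test POJO; toString mimics the printed format.
final class Rec {
    final String id;
    final int score;
    final String content;

    Rec(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }
}

final class RollingReduceDemo {
    // Simulates keyBy(keyFn).reduce(reducer): one stored value per key,
    // and every incoming element produces one emitted value, in input order.
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyFn, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T value : input) {
            // merge() stores the first value of a key as-is, otherwise stores reducer(old, value)
            T result = state.merge(keyFn.apply(value), value, reducer);
            emitted.add(result);
        }
        return emitted;
    }

    // Same logic as the ReduceFunction in the example: keep the higher score,
    // take content from the newer record.
    static Rec maxScoreLatestContent(Rec acc, Rec next) {
        return new Rec(acc.id, Math.max(acc.score, next.score), next.content);
    }

    public static void main(String[] args) {
        // Assumed input, reconstructed from the job output: "id score content" per line.
        List<Rec> input = Arrays.asList(
                new Rec("1", 80, "bs"),
                new Rec("2", 99, "jj"),
                new Rec("3", 95, "jw"),
                new Rec("1", 99, "jj"));
        for (Rec r : rollingReduce(input, rec -> rec.id, RollingReduceDemo::maxScoreLatestContent)) {
            System.out.println("reduce> " + r);
        }
    }
}
```

Running main should print the same four reduce> lines as the job output. The sketch also shows why id 1 appears twice: every element triggers an emission with the updated value for its key. In Flink, the runtime keeps this per-key state for the KeyedStream.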
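Question to think about
What is the difference between min() and minBy()?
min() only updates the aggregated field. The emitted record carries the current minimum of score, but its other fields (such as content) keep the values of the first record seen for that key. Those fields therefore do not necessarily belong to the record that actually holds the minimum. minBy() emits the whole record that holds the minimum, so all of its fields are consistent. The same applies to max() and maxBy(). In the output above, the last max line for id 1 is score=99, content='bs', with content taken from the first record. The corresponding maxBy line is score=99, content='jj'.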
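The difference can be reproduced with another plain-Java simulation (no Flink, JDK 8). This sketch assumes the following semantics. min()/max() start from the first record of a key and only overwrite the score field when a more extreme value arrives. minBy()/maxBy() switch to the whole new record when its score is strictly smaller or larger, and in this sketch ties keep the earlier record. Score and MinVsMinByDemo are hypothetical names. The first four records are reconstructed from the job output. The fifth record ("1", 60, "xx") is invented only to create a new minimum, because in the original data the minimum for id 1 is also its first record, so min and minBy happen to agree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for the author's Test POJO; toString mimics the printed format.
final class Score {
    final String id;
    final int score;
    final String content;

    Score(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }
}

final class MinVsMinByDemo {
    // Like max(): keep the accumulator's other fields, overwrite only score with a larger value.
    static Score maxField(Score acc, Score next) {
        return next.score > acc.score ? new Score(acc.id, next.score, acc.content) : acc;
    }

    // Like maxBy(): switch to the whole record holding a larger score (ties keep the earlier one).
    static Score maxBy(Score acc, Score next) {
        return next.score > acc.score ? next : acc;
    }

    // Like min(): only the score field follows the minimum.
    static Score minField(Score acc, Score next) {
        return next.score < acc.score ? new Score(acc.id, next.score, acc.content) : acc;
    }

    // Like minBy(): the whole record holding the minimum is kept.
    static Score minBy(Score acc, Score next) {
        return next.score < acc.score ? next : acc;
    }

    // Final value per key, i.e. the last line the keyed operator would emit for each id.
    static Map<String, Score> aggregate(List<Score> input, BinaryOperator<Score> op) {
        Map<String, Score> state = new HashMap<>();
        for (Score s : input) {
            state.merge(s.id, s, op);
        }
        return state;
    }

    public static void main(String[] args) {
        // First four records reconstructed from the job output.
        List<Score> input = new ArrayList<>(Arrays.asList(
                new Score("1", 80, "bs"),
                new Score("2", 99, "jj"),
                new Score("3", 95, "jw"),
                new Score("1", 99, "jj")));
        System.out.println("max>   " + aggregate(input, MinVsMinByDemo::maxField).get("1"));
        System.out.println("maxBy> " + aggregate(input, MinVsMinByDemo::maxBy).get("1"));

        // Hypothetical extra record giving id 1 a new minimum with different content.
        input.add(new Score("1", 60, "xx"));
        System.out.println("min>   " + aggregate(input, MinVsMinByDemo::minField).get("1"));
        System.out.println("minBy> " + aggregate(input, MinVsMinByDemo::minBy).get("1"));
    }
}
```

With this input, the max line ends at score=99 with content 'bs' and the maxBy line ends at score=99 with content 'jj', matching the job output. After the extra record, min reports score=60 but still carries content 'bs', while minBy reports the actual record with content 'xx'.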
