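Goal: use split to divide one stream into N streams, then use select to retrieve each of them
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project
Add the JAR dependencies to pom.xml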
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code demo
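import java.util.Collections;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SplitSelectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> inputStream = env.readTextFile("absolute path to the input file");
        // Each input line has the form "id score content"; Test is the author's own class (not shown).
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });
        // Split the stream: tag each record as "high" (score >= 80) or "low"
        SplitStream<Test> spl = dataStream.split(new OutputSelector<Test>() {
            @Override
            public Iterable<String> select(Test value) {
                return (value.getScore() >= 80)
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });
        // Retrieve each sub-stream by its tag
        DataStream<Test> highTempStream = spl.select("high");
        DataStream<Test> lowTempStream = spl.select("low");
        highTempStream.print("highTempStream");
        lowTempStream.print("lowTempStream");
        env.execute();
    }
}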
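The demo relies on a Test class that the post does not show. The following is a minimal sketch of what it might look like, reconstructed from the constructor call, the getScore() call, and the printed records; the field names and the setters are assumptions, not the author's actual code. In a real project the class would normally be declared public in its own Test.java file, since Flink's POJO rules expect a public class with a public no-argument constructor.

```java
// Hypothetical reconstruction of the Test class used in the demo (not shown in the original post).
class Test {
    private String id;
    private int score;
    private String content;

    // Public no-argument constructor, as Flink's POJO rules expect.
    public Test() {}

    public Test(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public int getScore() { return score; }
    public void setScore(int score) { this.score = score; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    // Formats the record in the same shape as the printed output, e.g. Test{id='1', score=80, content='bs'}.
    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }
}
```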
Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
highTempStream> Test{id='1', score=80, content='bs'}
highTempStream> Test{id='2', score=99, content='jj'}
highTempStream> Test{id='3', score=95, content='jw'}
highTempStream> Test{id='1', score=99, content='jj'}
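All four printed records have a score of at least 80, so nothing appears under lowTempStream. To see how the same rule would route a lower score, here is a rough Flink-free sketch in plain Java. SplitSelectSketch, selectTags and route are illustrative names invented for this example and are not part of Flink; the sketch only mimics the tag-based routing and does not reproduce how Flink implements split and select internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of tag-based routing; it does not use Flink.
class SplitSelectSketch {

    // Same rule as the OutputSelector in the demo: score >= 80 is tagged "high", otherwise "low".
    static Iterable<String> selectTags(int score) {
        return (score >= 80)
                ? Collections.singletonList("high")
                : Collections.singletonList("low");
    }

    // Groups input lines of the form "id score content" under every tag their score maps to.
    static Map<String, List<String>> route(List<String> lines) {
        Map<String, List<String>> byTag = new LinkedHashMap<>();
        for (String line : lines) {
            String[] words = line.split(" ");
            int score = Integer.parseInt(words[1]);
            for (String tag : selectTags(score)) {
                byTag.computeIfAbsent(tag, k -> new ArrayList<>()).add(line);
            }
        }
        return byTag;
    }
}
```

Looking up a tag in the returned map plays the role of select: a line such as "1 80 bs" ends up under "high" because 80 >= 80, while a hypothetical line with a score of 79 would end up under "low".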
