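Goal: use split to divide one stream into N streams, then use select to retrieve each of them by tag
Environment: JDK 8. IDE: IntelliJ IDEA
Create a Maven project
Add the dependencies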
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Code demo
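import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SplitSelectDemo is a placeholder class name; Test is the record type used by the demo.
public class SplitSelectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Each input line is parsed as: <id> <score> <content>, separated by single spaces
        DataStream<String> inputStream = env.readTextFile("absolute path to the file");
        DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
            @Override
            public Test map(String value) throws Exception {
                String[] words = value.split(" ");
                return new Test(words[0], Integer.parseInt(words[1]), words[2]);
            }
        });
        // Split the stream: tag each record "high" (score >= 80) or "low"
        SplitStream<Test> spl = dataStream.split(new OutputSelector<Test>() {
            @Override
            public Iterable<String> select(Test value) {
                return (value.getScore() >= 80) ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });
        // Retrieve each sub-stream by its tag
        DataStream<Test> highTempStream = spl.select("high");
        DataStream<Test> lowTempStream = spl.select("low");
        highTempStream.print("highTempStream");
        lowTempStream.print("lowTempStream");
        env.execute();
    }
}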
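The Test class used by the demo is not shown. The following is a minimal sketch that assumes only what the demo reveals: a constructor taking (id, score, content), a getScore() that returns an int, and a toString() in the format seen in the printed results. The parse helper is a hypothetical name that mirrors the body of the map function; it is not part of the original code.

```java
// Sketch of the Test type assumed by the demo. In a real project, declare it
// public in its own Test.java so that Flink can treat it as a POJO.
class Test {
    private String id;
    private int score;
    private String content;

    // Flink's POJO rules require a public no-argument constructor.
    public Test() {
    }

    public Test(String id, int score, String content) {
        this.id = id;
        this.score = score;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getScore() {
        return score;
    }

    public void setScore(int score) {
        this.score = score;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Test{id='" + id + "', score=" + score + ", content='" + content + "'}";
    }

    // Hypothetical helper: parses one line of the form "<id> <score> <content>".
    static Test parse(String line) {
        String[] words = line.split(" ");
        return new Test(words[0], Integer.parseInt(words[1]), words[2]);
    }
}
```

With this sketch, a hypothetical input line such as `1 80 bs` parses to a record whose toString() is Test{id='1', score=80, content='bs'}.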
Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
highTempStream> Test{id='1', score=80, content='bs'}
highTempStream> Test{id='2', score=99, content='jj'}
highTempStream> Test{id='3', score=95, content='jw'}
highTempStream> Test{id='1', score=99, content='jj'}
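Only records tagged high appear in the results shown here. To check the routing rule itself without starting a Flink job, the selector's condition can be exercised in plain Java. This is a sketch and not Flink code: RoutingCheck, tagFor and route are hypothetical names, and the scores 79 and 10 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoutingCheck {
    // Same condition as the OutputSelector in the demo: 80 and above is "high".
    static String tagFor(int score) {
        return score >= 80 ? "high" : "low";
    }

    // Groups scores by tag, mimicking what select("high") and select("low") hand back.
    static Map<String, List<Integer>> route(List<Integer> scores) {
        Map<String, List<Integer>> byTag = new LinkedHashMap<String, List<Integer>>();
        byTag.put("high", new ArrayList<Integer>());
        byTag.put("low", new ArrayList<Integer>());
        for (int score : scores) {
            byTag.get(tagFor(score)).add(score);
        }
        return byTag;
    }

    public static void main(String[] args) {
        // 80, 99 and 95 occur in the demo results; 79 and 10 are illustrative.
        Map<String, List<Integer>> byTag = route(Arrays.asList(80, 79, 99, 10, 95));
        System.out.println("high=" + byTag.get("high"));
        System.out.println("low=" + byTag.get("low"));
    }
}
```

The boundary case matters: a score of exactly 80 is routed to high, which agrees with the score=80 record printed under highTempStream.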