目的:用split将一个流分为N个流,用select查询
环境:jdk8 工具:idea

创建maven项目

引入jar包

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.10.1</version>
  10. </dependency>

代码演示

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. DataStream<String> inputStream = env.readTextFile("文件绝对路径");
  4. DataStream<Test> dataStream = inputStream.map(new MapFunction<String, Test>() {
  5. @Override
  6. public Test map(String value) throws Exception {
  7. String[] words = value.split(" ");
  8. return new Test(words[0], Integer.parseInt(words[1]), words[2]);
  9. }
  10. });
  11. //分流
  12. SplitStream<Test> spl=dataStream.split(new OutputSelector<Test>() {
  13. @Override
  14. public Iterable<String> select(Test value) {
  15. return (value.getScore() >= 80) ? Collections.singletonList("high") :
  16. Collections.singletonList("low");
  17. }
  18. });
  19. //根据key获取分流
  20. DataStream<Test> highTempStream = spl.select("high");
  21. DataStream<Test> lowTempStream = spl.select("low");
  22. highTempStream.print("highTempStream");
  23. lowTempStream.print("lowTempStream");
  24. env.execute();

分流算子.rar(6 KB)

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. highTempStream> Test{id='1', score=80, content='bs'}
  5. highTempStream> Test{id='2', score=99, content='jj'}
  6. highTempStream> Test{id='3', score=95, content='jw'}
  7. highTempStream> Test{id='1', score=99, content='jj'}