目的:三种基本转换算子map、flatMap、filter
环境:jdk8 工具:idea2019.3

创建maven项目

引入jar包

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.10.1</version>
  10. </dependency>

代码演示

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. DataStream<String> dataStream = env.readTextFile("绝对路径");
  3. //对值进行处理后可以用不同格式返回
  4. DataStream<Integer> integerDataStream = dataStream.map(new MapFunction<String, Integer>() {
  5. public Integer map(String value) throws Exception {
  6. return value.length();
  7. }
  8. });
  9. //单个值的输入后可以进行多值输出
  10. DataStream<String> flatMap = dataStream.flatMap(new FlatMapFunction<String, String>() {
  11. public void flatMap(String value, Collector<String> out) throws Exception {
  12. String[] fields=value.split(" ");
  13. for (String field:fields){
  14. out.collect(field);
  15. }
  16. }
  17. });
  18. //进行过滤元素
  19. DataStream<String> stringDataStream=dataStream.filter(new FilterFunction<String>() {
  20. public boolean filter(String value) throws Exception {
  21. return value.contains("test");
  22. }
  23. });
  24. stringDataStream.print("filter");
  25. flatMap.print("flatMap");
  26. integerDataStream.print("map");

基本转换算子.rar

结果

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. map:3> 10
  5. flatMap:3> hello
  6. flatMap:3> test
  7. filter:3> hello test
  8. map:4> 10
  9. flatMap:4> hello
  10. flatMap:4> java
  11. map:2> 10
  12. flatMap:2> hello
  13. flatMap:2> word