1. 基本转换算子

1.1 map

  1. package com.nkong.blink.transform;
  2. import org.apache.flink.api.common.functions.FilterFunction;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. /**
  10. * @author nkong
  11. * @time 2022/2/7 15:55
  12. */
  13. public class MyTransform {
  14. public static void main(String[] args) throws Exception {
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. env.setParallelism(1);
  17. DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("flink", "java", "mysql", "blink"));
  18. // map
  19. dataStream.map(new MapFunction<String, Integer>() {
  20. @Override
  21. public Integer map(String value) throws Exception {
  22. return value.length();
  23. }
  24. }).print("map");
  25. env.execute();
  26. }
  27. }

1.2 flatMap

  1. package com.nkong.blink.transform;
  2. import org.apache.flink.api.common.functions.FilterFunction;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. /**
  10. * @author nkong
  11. * @time 2022/2/7 15:55
  12. */
  13. public class MyTransform {
  14. public static void main(String[] args) throws Exception {
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. env.setParallelism(1);
  17. DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("flink", "java", "mysql", "blink"));
  18. // flatMap
  19. dataStream.flatMap(new FlatMapFunction<String, String>() {
  20. @Override
  21. public void flatMap(String value, Collector<String> out) throws Exception {
  22. out.collect(value.toLowerCase());
  23. }
  24. }).print("flatMap");
  25. env.execute();
  26. }
  27. }

1.3 filter

  1. package com.nkong.blink.transform;
  2. import org.apache.flink.api.common.functions.FilterFunction;
  3. import org.apache.flink.api.common.functions.FlatMapFunction;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. /**
  10. * @author nkong
  11. * @time 2022/2/7 15:55
  12. */
  13. public class MyTransform {
  14. public static void main(String[] args) throws Exception {
  15. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16. env.setParallelism(1);
  17. DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("flink", "java", "mysql", "blink"));
  18. // filter
  19. dataStream.filter(new FilterFunction<String>() {
  20. @Override
  21. public boolean filter(String value) throws Exception {
  22. return value.contains("k");
  23. }
  24. }).print("filter");
  25. env.execute();
  26. }
  27. }

2. 多流转换算子

2.1 union

将多条流连接为一条流。
源码:

  1. @SafeVarargs
  2. public final DataStream<T> union(DataStream<T>... streams) {
  3. List<Transformation<T>> unionedTransforms = new ArrayList<>();
  4. unionedTransforms.add(this.transformation);
  5. for (DataStream<T> newStream : streams) {
  6. if (!getType().equals(newStream.getType())) {
  7. throw new IllegalArgumentException(
  8. "Cannot union streams of different types: "
  9. + getType()
  10. + " and "
  11. + newStream.getType());
  12. }
  13. unionedTransforms.add(newStream.getTransformation());
  14. }
  15. return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
  16. }
注:元素类型要保持一致。

使用示例:
  1. package com.nkong.blink.transform;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. /**
  7. * @author nkong
  8. * @time 2022/2/8 9:42
  9. */
  10. public class JoinJob {
  11. public static void main(String[] args) throws Exception {
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. env.setParallelism(1);
  14. DataStream<String> dataStream1 = env.fromCollection(Lists.newArrayList("flink", "java", "flink"));
  15. DataStream<String> dataStream2 = env.fromCollection(Lists.newArrayList("flink", "python"));
  16. DataStream<Tuple2<String, Integer>> dataStream3 = env.fromCollection(Lists.newArrayList(
  17. new Tuple2<>("flink", 1),
  18. new Tuple2<>("blink", 2)));
  19. DataStream<String> union = dataStream1.union(dataStream2);
  20. union.print("union");
  21. env.execute();
  22. }
  23. }

输出:

  1. union> flink
  2. union> java
  3. union> flink
  4. union> flink
  5. union> python

2.2 join

作用于两条流,只返回匹配到的结果,形成一条新的流。相当于MySQL里的内连接。

语法:
dataStream.join(otherStream)
    .where(0)
    .equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {…});

2.2.1 join执行流程分析:

  1. public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
  2. return new JoinedStreams<>(this, otherStream);
  3. }
  1. 1. 调用join方法,返回JoinedStreams<T, T2>对象,该对象提供where方法:
  1. public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
  2. requireNonNull(keySelector);
  3. final TypeInformation<KEY> keyType =
  4. TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
  5. return where(keySelector, keyType);
  6. }
  1. 2. 调用where方法,返回Where<KEY>对象,该对象提供equalTo方法:
  1. public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
  2. requireNonNull(keySelector);
  3. final TypeInformation<KEY> otherKey =
  4. TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
  5. return equalTo(keySelector, otherKey);
  6. }
  1. 3. 调用equalTo方法,返回window对象。
  1. public <W extends Window> WithWindow<T1, T2, KEY, W> window(
  2. WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
  3. return new WithWindow<>(
  4. input1,
  5. input2,
  6. keySelector1,
  7. keySelector2,
  8. keyType,
  9. assigner,
  10. null,
  11. null,
  12. null);
  13. }
  1. 使用示例:
  1. package com.nkong.blink.transform;
  2. import org.apache.flink.api.common.functions.JoinFunction;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
  6. import org.apache.flink.streaming.api.datastream.DataStream;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
  9. import org.apache.flink.streaming.api.windowing.time.Time;
  10. import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
  11. /**
  12. * @author nkong
  13. * @time 2022/2/8 9:42
  14. */
  15. public class JoinJob {
  16. public static void main(String[] args) throws Exception {
  17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. env.setParallelism(1);
  19. DataStream<String> dataStream1 = env.fromCollection(Lists.newArrayList("flink", "java", "flink"));
  20. DataStream<String> dataStream2 = env.fromCollection(Lists.newArrayList("flink", "python"));
  21. DataStream<Tuple2<String, Integer>> dataStream3 = env.fromCollection(Lists.newArrayList(
  22. new Tuple2<>("flink", 1),
  23. new Tuple2<>("blink", 2)));
  24. DataStream<String> join = dataStream2.join(dataStream3).where(new KeySelector<String, String>() {
  25. @Override
  26. public String getKey(String value) throws Exception {
  27. return value;
  28. }
  29. }).equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
  30. @Override
  31. public String getKey(Tuple2<String, Integer> value) throws Exception {
  32. return value.f0;
  33. }
  34. }).window(ProcessingTimeSessionWindows.withGap(Time.seconds(100))).trigger(CountTrigger.of(1))
  35. .apply(new JoinFunction<String, Tuple2<String, Integer>, String>() {
  36. @Override
  37. public String join(String first, Tuple2<String, Integer> second) throws Exception {
  38. return "first:" + first + " " +
  39. "second:" + second.f0 + " ";
  40. }
  41. });
  42. join.print("join");
  43. env.execute();
  44. }
  45. }
输出:
join> first:flink second:flink

2.3 coGroup

将两个数据流/集合按照key进行group,然后将相同key的数据进行处理,但是它和join操作稍有区别,它在一个流/数据集中没有找到与另一个匹配的数据还是会输出。执行流程类似join。

语法: dataStream.coGroup(otherStream)
.where(0) .equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {…});

使用示例:
package com.nkong.blink.transform;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

/**
 * Demonstrates the coGroup transformation: groups both streams by key and
 * processes matching groups together. Unlike join, elements that have no
 * match on the other side are still emitted.
 *
 * @author nkong
 * @time 2022/2/8 9:42
 */
public class JoinJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream1 = env.fromCollection(Lists.newArrayList("flink", "java", "flink"));
        DataStream<String> dataStream2 = env.fromCollection(Lists.newArrayList("flink", "python"));
        DataStream<Tuple2<String, Integer>> dataStream3 = env.fromCollection(Lists.newArrayList(
                new Tuple2<>("flink", 1),
                new Tuple2<>("blink", 2)));

        // key both streams by the element itself
        DataStream<String> coGroup = dataStream1.coGroup(dataStream2)
                .where(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                .equalTo(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // session window with a 100s gap; fire on every arriving element
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(100)))
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                        StringBuilder builder = new StringBuilder();
                        for (String s : first) {
                            builder.append("first:").append(s).append(" ");
                        }
                        for (String o : second) {
                            builder.append("second:").append(o).append(" ");
                        }
                        out.collect(builder.toString());
                    }
                });
        coGroup.print("coGroup");

        env.execute();
    }

}

输出:

coGroup> first:flink 
coGroup> first:java 
coGroup> first:flink first:flink 
coGroup> first:flink first:flink second:flink 
coGroup> second:python

2.4 connect

connect提供了和union类似的功能,用来连接两个数据流。
它与union的区别在于:

  1. connect只能连接两个数据流,union可以连接多个数据流。
  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
  3. 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

    语法:
    DataStream<Integer> someStream = //…
    DataStream<String> otherStream = //…
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

    使用示例:

```java
package com.nkong.blink.transform;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author nkong
 * @time 2022/2/8 9:42
 */
public class JoinJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream1 = env.fromCollection(Lists.newArrayList("flink", "java", "flink"));
        DataStream<String> dataStream2 = env.fromCollection(Lists.newArrayList("flink", "python"));
        DataStream<Tuple2<String, Integer>> dataStream3 = env.fromCollection(Lists.newArrayList(
                new Tuple2<>("flink", 1),
                new Tuple2<>("blink", 2)));

        // connect allows the two inputs to have different element types
        DataStream<String> connect = dataStream1.connect(dataStream3)
                .map(new CoMapFunction<String, Tuple2<String, Integer>, String>() {

                    // 处理流1: handles elements from the first stream
                    @Override
                    public String map1(String value) throws Exception {
                        return value;
                    }

                    // 处理流2: handles elements from the second stream
                    @Override
                    public String map2(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                });
        connect.print("connect");

        env.execute();
    }

}

输出:
```java
connect> flink
connect> java
connect> flink
connect> flink
connect> blink

3. 聚合算子

3.1 keyBy

**DataStream → KeyedStream**:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
1、KeyBy会重新分区;
2、不同的key有可能分到一起,因为是通过hash原理实现的;

package com.nkong.blink.transform;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates keyBy: logically partitions the stream so that elements
 * with the same key land in the same partition (hash based).
 *
 * @author nkong
 * @time 2022/2/7 15:55
 */
public class MyTransform {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("flink", "java", "mysql", "flink"));
        // key the stream by the element itself
        dataStream.keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        env.execute();
    }

}

3.2 Rolling Aggregation

这些算子可以针对KeyedStream的每一个支流做聚合。

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()

使用示例:

```java
package com.nkong.blink.start;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @author nkong
 * @time 2022/02/08 14:38
 */
public class SumTransformJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read lines from a local socket, e.g. started with: nc -l 9000
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
        DataStream<Tuple2<String, Integer>> dataStream = text
                .flatMap(new MyFlatMap())
                .keyBy(new MyKeySelector())
                // 滚动时间窗口: tumbling processing-time window of 5 seconds
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // sum the count field (index 1) per key and window
                .sum(1);
        dataStream.print();
        env.execute("SumTransformJob");
    }

    private static class MyFlatMap implements FlatMapFunction> {

     @Override
     public void flatMap(String inStr, Collector<Tuple2<String, Integer>> collector) throws Exception {
         String[] tokens = inStr.toLowerCase().split("\t", -1);
         for (String token : tokens) {
             if (token.length() > 0) {
                 collector.collect(new Tuple2<String, Integer>(token, 1));
             }
         }
     }
    

    }

    private static class MyKeySelector implements KeySelector, Object> {

     @Override
     public Object getKey(Tuple2<String, Integer> stringIntegerTuple) throws Exception {
         return stringIntegerTuple.f0;
     }
    

    }

}

3.3 reduce
**KeyedStream**→ **DataStream**: 一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
```java
package com.nkong.blink.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates reduce on a keyed stream: merges each incoming element with
 * the previously aggregated value for its key and emits every intermediate
 * result (not only the final one).
 *
 * @author nkong
 * @time 2022/2/8 15:55
 */
public class ReduceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromCollection(Lists.newArrayList("flink,1", "java,2", "mysql,3", "flink,4"));
        // parse "word,count" lines into (word, count) tuples
        DataStream<Tuple2<String, Integer>> streamOperator = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] parts = s.split(",", -1);
                collector.collect(new Tuple2<String, Integer>(parts[0], Integer.valueOf(parts[1])));
            }
        });
        // key by the word (field f0)
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = streamOperator.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        DataStream<Tuple2<String, Integer>> reduce = keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            /**
             * @param value1 上次聚合计算保存的结果状态数据 (previously aggregated state)
             * @param value2 本次流入的数据 (newly arrived element)
             * @return 聚合计算后的结果 (merged result)
             * @throws Exception
             */
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                // NOTE: the key strings are concatenated here as well, which is why
                // the sample output below shows (flinkflink,5)
                return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + value2.f1);
            }
        });
        reduce.print();
        env.execute();
    }

}
```

输出:
10> (flink,1)
10> (mysql,3)
2> (java,2)
10> (flinkflink,5)