Operators

Flink transformations are the process by which a user converts one DataStream into a new DataStream through operators. They fall into three main categories:

  • DataStream transformations: conversions applied to the data stream itself;
  • Physical partitioning: a low-level API Flink provides so that, after a transformation, users can configure how the data is partitioned at a finer granularity;
  • Task chaining and resource groups: fine-grained control over operator chaining and resource groups (see the sketch after this list).
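
The last two categories get no dedicated section below, so here is a minimal sketch of what they look like in code. The calls rebalance, disableChaining, and slotSharingGroup are standard DataStream API methods, but the surrounding pipeline (and the group name "filters") is illustrative, assuming an existing StreamExecutionEnvironment env:

  // Sketch: physical partitioning plus chaining/resource-group hints on a simple pipeline.
  DataStream<String> source = env.fromElements("a", "b", "c");
  source
      .rebalance()                      // physical partitioning: round-robin redistribution across subtasks
      .map(String::toUpperCase)
      .disableChaining()                // task chaining: do not chain this map with its neighbours
      .filter(s -> !s.isEmpty())
      .slotSharingGroup("filters")      // resource groups: place this operator in its own slot-sharing group
      .print();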

Operator Categories

Map

Transformation: DataStream → DataStream
Description: the most frequently used operator; it takes one element as input and emits exactly one element.
Code example:
Given the numbers 1-9, multiply each by 2.

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class OperatorTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
          DataStream<Integer> mapDataStream = dataStream.map(new MapFunction<Integer, Integer>() {
              @Override
              public Integer map(Integer integer) throws Exception {
                  // Multiply every element by 2
                  return integer * 2;
              }
          });
          mapDataStream.print();
          env.execute("Operator Test");
      }
  }
  // Output (order may vary with parallelism): 2,4,6,8,10,12,14,16,18

FlatMap

Transformation: DataStream → DataStream
Description: similar to Map, but FlatMap takes one element as input and emits zero or more elements.
Code example:

  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.util.Collector;
  import java.util.Arrays;

  public class FlatMapTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          String in = "Flink,Spark,Storm,Hive";
          DataStream<String> dataStream = env.fromElements(in);
          DataStream<String> flatMapStream = dataStream.flatMap(
              new FlatMapFunction<String, String>() {
                  @Override
                  public void flatMap(String value, Collector<String> out) throws Exception {
                      // Split the input string and emit one record per token
                      for (String s : value.split(",")) {
                          out.collect(s);
                      }
                  }
              }
          ).returns(Types.STRING);
          // Lambda version:
          // DataStream<String> flatMapStream = dataStream.flatMap((String s, Collector<String> out) ->
          //     Arrays.stream(s.split(",")).forEach(out::collect)
          // ).returns(Types.STRING);
          flatMapStream.print();
          env.execute("FlatMap Test");
      }
  }


KeyBy

Transformation: DataStream → KeyedStream
Description: KeyBy assigns records with the same key to the same partition; logically it partitions the stream by key. Internally, it uses a hash function to partition the stream.
Code example:

  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class KeyByTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20),
              new Tuple2<>("Spark", 30),
              new Tuple2<>("Spark", 15),
              new Tuple2<>("Storm", 20),
              new Tuple2<>("Storm", 5)
          );
          KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(
              new KeySelector<Tuple2<String, Integer>, String>() {
                  @Override
                  public String getKey(Tuple2<String, Integer> in) throws Exception {
                      // Use the first tuple field as the key
                      return in.f0;
                  }
              }
          );
          keyedStream.print();
          env.execute("KeyBy Test");
      }
  }

Reduce

Transformation: KeyedStream → DataStream
Description: Reduce rolls a keyed stream into a single result value per key, creating a new aggregated value every time it processes an element. Common aggregations such as average, sum, min, max, and count can all be implemented with reduce (a sketch of average via reduce follows the example).
Code example:

  import org.apache.flink.api.common.functions.ReduceFunction;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class ReduceTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20),
              new Tuple2<>("Spark", 30),
              new Tuple2<>("Spark", 15),
              new Tuple2<>("Storm", 20),
              new Tuple2<>("Storm", 5)
          );
          KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(
              new KeySelector<Tuple2<String, Integer>, String>() {
                  @Override
                  public String getKey(Tuple2<String, Integer> in) throws Exception {
                      return in.f0;
                  }
              }
          );
          DataStream<Tuple2<String, Integer>> reduceStream = keyedStream.reduce(
              new ReduceFunction<Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                      // Sum the counts per key; a new Tuple2 is created for every element processed
                      return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                  }
              }
          );
          // Lambda version:
          // SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = dataStream.keyBy(value -> value.f0)
          //     .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
          //     .returns(Types.TUPLE(Types.STRING, Types.INT));
          reduceStream.print();
          env.execute("Reduce Test");
      }
  }
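
The description claims count and average can also be expressed with reduce; since reduce only ever sees two values of the stream's own type, an average has to carry a (key, sum, count) triple through the aggregation. A minimal sketch reusing dataStream from the example above; the Tuple3 layout is an illustrative choice, and org.apache.flink.api.java.tuple.Tuple3 must additionally be imported:

  // Per-key running average via reduce: accumulate (key, sum, count), then divide.
  DataStream<Tuple3<String, Integer, Integer>> sumCount = dataStream
      .map(t -> new Tuple3<String, Integer, Integer>(t.f0, t.f1, 1))
      .returns(Types.TUPLE(Types.STRING, Types.INT, Types.INT))
      .keyBy(t -> t.f0)
      // reduce creates a new accumulated Tuple3 for every element it processes
      .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2));
  sumCount
      .map(t -> new Tuple2<>(t.f0, t.f1 / (double) t.f2))
      .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
      .print();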


Filter

Transformation: DataStream → DataStream
Description: keeps only the elements for which the predicate returns true.
Code example:

  import org.apache.flink.api.common.functions.FilterFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class FilterTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
          DataStream<Integer> filterDataStream = dataStream.filter(
              new FilterFunction<Integer>() {
                  @Override
                  public boolean filter(Integer integer) throws Exception {
                      // Keep only even numbers
                      return integer % 2 == 0;
                  }
              }
          );
          // Lambda version:
          // DataStream<Integer> filterDataStream = dataStream.filter(x -> x % 2 == 0);
          filterDataStream.print();
          env.execute("Filter Test");
      }
  }


Union

Transformation: DataStream → DataStream
Description: Union combines two or more data streams into one, so that the streams can be processed together in parallel. If a stream is combined with itself, each record appears twice in the output (see the self-union sketch after the example).
Code example:

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class UnionTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20)
          );
          DataStream<Tuple2<String, Integer>> dataStream2 = env.fromElements(
              new Tuple2<>("Spark", 15),
              new Tuple2<>("Spark", 25)
          );
          DataStream<Tuple2<String, Integer>> unionStream = dataStream1.union(dataStream2);
          unionStream.print();
          env.execute("Union Test");
      }
  }
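
To make the self-union behaviour concrete, a one-line sketch reusing dataStream1 from the example:

  // Unioning a stream with itself duplicates every record:
  // ("Flink",10) and ("Flink",20) are each printed twice.
  dataStream1.union(dataStream1).print();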

Connect

Transformation: DataStream, DataStream → ConnectedStreams
Description: Connect merges two data streams. Differences from the Union operator:

  • ConnectedStreams can only connect two streams, whereas union can combine more than two.
  • The two streams in ConnectedStreams may have different types, whereas all streams passed to union must have the same type.
  • ConnectedStreams applies a different processing method to each of the two streams, and the two streams can share state between them. This is very useful when input from the first stream influences the second (a state-sharing sketch follows the example below).
  • After a connect, each inner stream is handled with the CoMapFunction of map or the CoFlatMapFunction of flatMap.

Code example:

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.datastream.ConnectedStreams;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.co.CoMapFunction;

  public class ConnectTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Spark", 15)
          );
          DataStream<Tuple2<String, String>> dataStream2 = env.fromElements(
              new Tuple2<>("Flink", "stream computing"),
              new Tuple2<>("Spark", "micro-batch computing")
          );
          ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> connectedStreams = dataStream1.connect(dataStream2);
          // Key both streams by their first field, then handle each stream with its own map method
          DataStream<Object> dataStream3 = connectedStreams.keyBy(value -> value.f0, value -> value.f0)
              .map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, String>, Object>() {
                  @Override
                  public String map1(Tuple2<String, Integer> value) throws Exception {
                      // Applied to elements of the first stream
                      return value.f0 + "-" + value.f1;
                  }

                  @Override
                  public Tuple3<String, String, String> map2(Tuple2<String, String> value) throws Exception {
                      // Applied to elements of the second stream
                      return new Tuple3<>(value.f0, "-", value.f1);
                  }
              });
          dataStream3.print();
          env.execute("Connect Test");
      }
  }
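
A sketch of the state-sharing point from the list above, under stated assumptions: the class name ControlFilter and its blocking semantics are illustrative, not from the original text. The first (control) stream marks keys, and the keyed ValueState it writes is visible to the second (data) stream's callback for the same key:

  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
  import org.apache.flink.util.Collector;

  public class ControlFilter extends RichCoFlatMapFunction<String, String, String> {
      // Keyed state shared by flatMap1 and flatMap2 for the current key
      private transient ValueState<Boolean> blocked;

      @Override
      public void open(Configuration parameters) {
          blocked = getRuntimeContext().getState(
              new ValueStateDescriptor<>("blocked", Boolean.class));
      }

      @Override
      public void flatMap1(String controlKey, Collector<String> out) throws Exception {
          // Control stream: mark this key as blocked
          blocked.update(Boolean.TRUE);
      }

      @Override
      public void flatMap2(String dataValue, Collector<String> out) throws Exception {
          // Data stream: only forward keys the control stream has not blocked yet
          if (blocked.value() == null) {
              out.collect(dataValue);
          }
      }
  }
  // Usage: controlStream.connect(dataStream).keyBy(k -> k, k -> k).flatMap(new ControlFilter()).print();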

Aggregations

Transformation: KeyedStream → DataStream
Description: Aggregations are the built-in aggregation operators, which wrap common rolling aggregation operations.
Code examples:

Tuple

When the stream elements are Tuples, fields can be referenced with one or more positional keys (a sum sketch follows the example).

  import org.apache.flink.api.java.tuple.Tuple;
  import org.apache.flink.api.java.tuple.Tuple6;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class AggregationsTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // Fields: (department, name, gender, age, salary, years of service)
          DataStream<Tuple6<String, String, String, Integer, Double, Integer>> dataStream = env.fromElements(
              new Tuple6<>("Tech", "Xiaoming", "male", 25, 5000.0, 2),
              new Tuple6<>("Tech", "Lihua", "male", 26, 6000.0, 3),
              new Tuple6<>("Tech", "Maria", "female", 24, 4500.0, 1),
              new Tuple6<>("Tech", "Jane", "male", 28, 8000.0, 5),
              new Tuple6<>("Tech", "Kangkang", "male", 30, 10000.0, 10)
          );
          // Key by department (position 0); with Tuples both key and aggregated field can be given by position
          KeyedStream<Tuple6<String, String, String, Integer, Double, Integer>, Tuple> keyedStream = dataStream.keyBy(0);
          // Rolling minimum of the salary field (position 4)
          keyedStream.min(4).print();
          // Rolling maximum of the salary field
          keyedStream.max(4).print();
          // Rolling minimum of the salary field, returning the element that contains it
          keyedStream.minBy(4).print();
          // Rolling maximum of the salary field, returning the element that contains it
          keyedStream.maxBy(4).print();
          env.execute("Aggregations Test");
      }
  }
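
sum, mentioned in the description, follows the same rolling pattern; a one-line sketch reusing keyedStream from above:

  // Rolling sum of the salary field (position 4) per department
  keyedStream.sum(4).print();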

POJOs

Positional keys cannot be used with POJOs; fields must be specified by name.
Define the POJO (fields are declared in the same order as the Tuple6 example, since Lombok's @AllArgsConstructor generates the constructor in field-declaration order):

  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.EqualsAndHashCode;

  @Data
  @AllArgsConstructor
  @EqualsAndHashCode
  public class Company {
      public String department;
      public String name;
      public String gender;
      public int age;
      public double salary;
      public int workYear;

      public Company() {}
  }

Use the POJO with field-name keys:
  import dao.Company;
  import org.apache.flink.api.java.tuple.Tuple;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class AggregationsPojoTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Company> dataStream = env.fromElements(
              new Company("Tech", "Xiaoming", "male", 25, 5000.0, 2),
              new Company("Tech", "Lihua", "male", 26, 6000.0, 3),
              new Company("Tech", "Maria", "female", 24, 4500.0, 1),
              new Company("Tech", "Jane", "male", 28, 8000.0, 5),
              new Company("Tech", "Kangkang", "male", 30, 10000.0, 10)
          );
          // Key by gender; with POJOs both key and aggregated field are specified by field name
          KeyedStream<Company, Tuple> keyedStream = dataStream.keyBy("gender");
          // Rolling minimum of salary per gender
          keyedStream.min("salary").print();
          // Rolling maximum of salary per gender
          keyedStream.max("salary").print();
          // Rolling minimum of salary, returning the element that contains it
          keyedStream.minBy("salary").print();
          // Rolling maximum of salary, returning the element that contains it
          keyedStream.maxBy("salary").print();
          env.execute("Aggregations POJO Test");
      }
  }


Window

Transformation: KeyedStream → WindowedStream
Description: window functions group an existing KeyedStream by time or other criteria. The following aggregates over 5-second tumbling windows (an event-time variant is sketched after the example):
Code example:

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.WindowedStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

  public class WindowTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20),
              new Tuple2<>("Spark", 30),
              new Tuple2<>("Spark", 15),
              new Tuple2<>("Storm", 20),
              new Tuple2<>("Storm", 5)
          );
          // Key by the first field, then group each key's records into 5-second tumbling processing-time windows
          WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
              .keyBy(value -> value.f0)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
          // Reducing each window turns the WindowedStream back into a DataStream
          windowedStream.reduce(
              (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)
          ).returns(Types.TUPLE(Types.STRING, Types.INT)).print();
          env.execute("Window Test");
      }
  }
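
The example above uses processing time. An event-time window additionally needs timestamps and watermarks; a minimal sketch, assuming Flink 1.11+ for WatermarkStrategy and deriving an illustrative timestamp from the count field (a real job would use the record's own event time). It also needs imports for org.apache.flink.api.common.eventtime.WatermarkStrategy and TumblingEventTimeWindows:

  // Assign event timestamps and watermarks, then window by event time
  DataStream<Tuple2<String, Integer>> withTimestamps = dataStream.assignTimestampsAndWatermarks(
      WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
          // Illustrative timestamp: treat the count field as seconds
          .withTimestampAssigner((event, previous) -> event.f1 * 1000L));
  withTimestamps
      .keyBy(value -> value.f0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
      .returns(Types.TUPLE(Types.STRING, Types.INT))
      .print();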

WindowAll

Transformation: DataStream → AllWindowedStream
Description: Flink can also define windows on a regular (non-keyed) DataStream. WindowAll groups all stream events according to some characteristic. This is usually a non-parallel transformation, because it runs on a non-partitioned data stream.
Code example:

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.AllWindowedStream;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

  public class WindowAllTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20),
              new Tuple2<>("Spark", 30),
              new Tuple2<>("Spark", 15),
              new Tuple2<>("Storm", 20),
              new Tuple2<>("Storm", 5)
          );
          // All records, regardless of key, share the same 10-second tumbling processing-time windows
          AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedStream = dataStream
              .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
          windowedStream.reduce(
              (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)
          ).returns(Types.TUPLE(Types.STRING, Types.INT)).print();
          env.execute("WindowAll Test");
      }
  }

Window Join

Transformation: DataStream, DataStream → DataStream
Description: a window join joins elements of two streams that share a common key and lie in the same window. The windows are defined with a window assigner and are evaluated over elements from both streams. The elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction, which can emit results that satisfy the join condition.
Code example:

  import org.apache.flink.api.common.functions.JoinFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;

  public class WindowJoinTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Spark", 15)
          );
          DataStream<Tuple2<String, String>> dataStream2 = env.fromElements(
              new Tuple2<>("Flink", "stream computing"),
              new Tuple2<>("Spark", "micro-batch computing")
          );
          dataStream1.join(dataStream2)
              // Join on the first field of both streams
              .where(value -> value.f0)
              .equalTo(value -> value.f0)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, Tuple3<String, Integer, String>>() {
                  @Override
                  public Tuple3<String, Integer, String> join(Tuple2<String, Integer> value1, Tuple2<String, String> value2) throws Exception {
                      return new Tuple3<>(value1.f0, value1.f1, value2.f1);
                  }
              })
              .print();
          env.execute("Window Join Test");
      }
  }

Window Reduce

Transformation: WindowedStream → DataStream
Description: applies a functional ReduceFunction to each window and emits the incrementally aggregated result, turning the WindowedStream back into a DataStream.
Code example:
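
The original draft leaves this example empty; below is a minimal sketch (the class name WindowReduceTest is illustrative), reusing the Tuple2 data and the 5-second tumbling processing-time window from the Window section, that sums the counts per key within each window:

  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;

  public class WindowReduceTest {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
              new Tuple2<>("Flink", 10),
              new Tuple2<>("Flink", 20),
              new Tuple2<>("Spark", 30)
          );
          // Sum the counts per key within each 5-second tumbling processing-time window
          dataStream
              .keyBy(value -> value.f0)
              .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
              .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
              .returns(Types.TUPLE(Types.STRING, Types.INT))
              .print();
          env.execute("Window Reduce Test");
      }
  }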