Introduction to Operators
In Flink, a transformation is the process by which a user turns one DataStream into a new DataStream via operators. Transformations fall into three categories:
- DataStream transformations: transformation operations on data streams;
- Physical partitioning: lower-level APIs that let users configure data partitioning at a finer granularity after a transformation;
- Task chaining and resource groups: allow fine-grained control over operator chaining and resource groups.
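The last two categories are configured directly on a DataStream rather than by transforming its elements. A minimal sketch of both (assuming the standard DataStream API; the class name, data, and slot-sharing group name are illustrative):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionChainTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);
        // Physical partitioning: redistribute records round-robin across downstream subtasks
        dataStream.rebalance()
                .map(x -> x * 2)
                .returns(Types.INT)
                // Task chaining / resource groups: start a new operator chain at this map
                // and place it in a dedicated slot sharing group
                .startNewChain()
                .slotSharingGroup("doubled")
                .print();
        env.execute("Partitioning and Chaining Test");
    }
}
```

`rebalance()` is one of several physical-partitioning methods (`shuffle()`, `rescale()`, `broadcast()` work the same way); `startNewChain()` and `slotSharingGroup(...)` affect only scheduling, not the records themselves.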
Operator Categories
Map
Transformation: DataStream → DataStream
Description: the most commonly used operator: it takes one element and produces one element.
Code example:
Given the numbers 1-10, multiply each by 2.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DataStream<Integer> mapDataStream = dataStream.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer integer) throws Exception {
                // multiply every element by 2
                return integer * 2;
            }
        });
        mapDataStream.print();
        env.execute("Operator Test");
    }
}
// Output (order may vary): 2,4,6,8,10,12,14,16,18,20
```
FlatMap
Transformation: DataStream → DataStream
Description: like Map, but FlatMap produces zero or more output elements for each input element.
Code example:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class FlatMapTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String in = "Flink,Spark,Storm,Hive";
        DataStream<String> dataStream = env.fromElements(in);
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // split on "," and emit one record per token
                for (String s : value.split(",")) {
                    out.collect(s);
                }
            }
        }).returns(Types.STRING);
        // Lambda version:
        // DataStream<String> flatMapStream = dataStream.flatMap((String s, Collector<String> out) ->
        //         Arrays.stream(s.split(",")).forEach(out::collect)
        // ).returns(Types.STRING);
        flatMapStream.print();
        env.execute("FlatMap");
    }
}
```
KeyBy
Transformation: DataStream → KeyedStream
Description: KeyBy logically partitions the stream by key, assigning records with the same key to the same partition. Internally it uses a hash function to partition the stream.
Code example:
```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30), new Tuple2<>("Spark", 15),
                new Tuple2<>("Storm", 20), new Tuple2<>("Storm", 5));
        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> in) throws Exception {
                        // key by the first tuple field
                        return in.f0;
                    }
                });
        keyedStream.print();
        env.execute("KeyBy Test");
    }
}
```
Reduce
Transformation: KeyedStream → DataStream
Description: Reduce emits a single rolling result: for every incoming element it combines the element with the previously reduced value and produces a new value. Common aggregations such as average, sum, min, max, and count can all be implemented with reduce.
Code example:
```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30), new Tuple2<>("Spark", 15),
                new Tuple2<>("Storm", 20), new Tuple2<>("Storm", 5));
        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> in) throws Exception {
                        return in.f0;
                    }
                });
        DataStream<Tuple2<String, Integer>> reduceStream =
                keyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) throws Exception {
                        // rolling sum of the second field, per key
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        // Lambda version:
        // SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = dataStream
        //         .keyBy(value -> value.f0)
        //         .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
        //         .returns(Types.TUPLE(Types.STRING, Types.INT));
        reduceStream.print();
        env.execute("Reduce Test");
    }
}
```
Filter
Transformation: DataStream → DataStream
Description: keeps only the elements that satisfy the filter condition.
Code example:
```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        DataStream<Integer> filterDataStream = dataStream.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer integer) throws Exception {
                // keep even numbers only
                return integer % 2 == 0;
            }
        });
        // Lambda version:
        // DataStream<Integer> filterDataStream = dataStream.filter(x -> x % 2 == 0);
        filterDataStream.print();
        env.execute("Filter Test");
    }
}
```
Union
Transformation: DataStream → DataStream
Description: Union combines two or more data streams into one, so the streams can be processed together in parallel. If a stream is unioned with itself, every record appears twice in the output.
Code example:
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20));
        DataStream<Tuple2<String, Integer>> dataStream2 = env.fromElements(
                new Tuple2<>("Spark", 15), new Tuple2<>("Spark", 25));
        // both streams must have the same element type
        DataStream<Tuple2<String, Integer>> unionStream = dataStream1.union(dataStream2);
        unionStream.print();
        env.execute("Union Test");
    }
}
```
Connect
Transformation: DataStream, DataStream → ConnectedStreams
Description: connects two data streams. Differences from the Union operator:
- ConnectedStreams can only connect two streams, while union can combine more than two.
- The two streams of a ConnectedStreams may have different types, while all streams passed to union must share the same type.
- ConnectedStreams applies a separate processing function to each stream, and the two streams can share state. This is very useful when input on the first stream should influence processing of the second.
- After connect, the merged streams can be processed with a CoMapFunction (via map) or a CoFlatMapFunction (via flatMap), each of which handles the two streams separately.
Code example:
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Spark", 15));
        DataStream<Tuple2<String, String>> dataStream2 = env.fromElements(
                new Tuple2<>("Flink", "流计算"), new Tuple2<>("Spark", "微批计算"));
        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> connectedStreams =
                dataStream1.connect(dataStream2);
        // key both streams on their first field, then handle each stream in its own map method
        DataStream<Object> dataStream3 = connectedStreams.keyBy(0, 0)
                .map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, String>, Object>() {
                    @Override
                    public String map1(Tuple2<String, Integer> value) throws Exception {
                        return value.f0 + "-" + value.f1;
                    }

                    @Override
                    public Tuple3<String, String, String> map2(Tuple2<String, String> value) throws Exception {
                        return new Tuple3<>(value.f0, "-", value.f1);
                    }
                });
        dataStream3.print();
        env.execute("Connection Test");
    }
}
```
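The shared-state point above can be sketched with a RichCoFlatMapFunction holding keyed state: records on the control stream enable a key, and only data records for enabled keys are emitted. This is a hedged sketch; the class name, streams, and state name are illustrative, not part of the original example:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConnectStateTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the control stream decides which keys the data stream may emit
        DataStream<String> control = env.fromElements("Flink", "Spark");
        DataStream<Tuple2<String, Integer>> data = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Storm", 5));
        control.connect(data)
                // key both streams so they share the same keyed state
                .keyBy(value -> value, value -> value.f0)
                .flatMap(new RichCoFlatMapFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    private transient ValueState<Boolean> enabled;

                    @Override
                    public void open(Configuration parameters) {
                        enabled = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("enabled", Boolean.class));
                    }

                    @Override
                    public void flatMap1(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // a control record enables the key; the state is visible to flatMap2
                        enabled.update(true);
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        if (Boolean.TRUE.equals(enabled.value())) {
                            out.collect(value);
                        }
                    }
                })
                .print();
        env.execute("Connect Shared State Test");
    }
}
```

Note that the two streams are only logically connected; whether a data record passes depends on the arrival order of control and data records for its key.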
Aggregations
Transformation: KeyedStream → DataStream
Description: Aggregations are the built-in rolling aggregation operators, wrapping the most common aggregation operations.
Code example:
Tuple
When aggregating on tuples, fields can be addressed by one or more positional keys:
```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AggregationsTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fields: department, name, gender, age, salary, years of service
        DataStream<Tuple6<String, String, String, Integer, Double, Integer>> dataStream = env.fromElements(
                new Tuple6<>("技术部", "小明", "男", 25, 5000.0, 2),
                new Tuple6<>("技术部", "李华", "男", 26, 6000.0, 3),
                new Tuple6<>("技术部", "玛丽亚", "女", 24, 4500.0, 1),
                new Tuple6<>("技术部", "Jane", "男", 28, 8000.0, 5),
                new Tuple6<>("技术部", "康康", "男", 30, 10000.0, 10));
        // key by department (position 0)
        KeyedStream<Tuple6<String, String, String, Integer, Double, Integer>, Tuple> keyedStream =
                dataStream.keyBy(0);
        // rolling minimum of the salary field (position 4); the field can be given by index or name
        keyedStream.min(4).print();
        // rolling maximum of the salary field
        keyedStream.max(4).print();
        // rolling minimum of salary, returning the whole element that holds it
        keyedStream.minBy(4).print();
        // rolling maximum of salary, returning the whole element that holds it
        keyedStream.maxBy(4).print();
        env.execute("Aggregations Test");
    }
}
```
POJOs
With POJOs, positional keys cannot be used; the key field must be specified by name.
Define the POJO:
```java
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Company {
    // field order matches the constructor calls: department first, then name
    public String department;
    public String name;
    public String gender;
    public int age;
    public double salary;
    public int workYear;

    public Company() {}
}
```
```java
import dao.Company;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AggregationsPojoTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Company> dataStream = env.fromElements(
                new Company("技术部", "小明", "男", 25, 5000.0, 2),
                new Company("技术部", "李华", "男", 26, 6000.0, 3),
                new Company("技术部", "玛丽亚", "女", 24, 4500.0, 1),
                new Company("技术部", "Jane", "男", 28, 8000.0, 5),
                new Company("技术部", "康康", "男", 30, 10000.0, 10));
        // POJOs are keyed by field name, not by position
        KeyedStream<Company, Tuple> keyedStream = dataStream.keyBy("gender");
        // rolling minimum of the salary field, per gender
        keyedStream.min("salary").print();
        // rolling maximum of the salary field
        keyedStream.max("salary").print();
        // rolling minimum of salary, returning the element that holds it
        keyedStream.minBy("salary").print();
        // rolling maximum of salary, returning the element that holds it
        keyedStream.maxBy("salary").print();
        env.execute("Aggregations Test");
    }
}
```
Window
Transformation: KeyedStream → WindowedStream
Description: Window groups the records of an existing KeyedStream by time or other criteria. The following aggregates over 5-second tumbling windows:
Code example:
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30), new Tuple2<>("Spark", 15),
                new Tuple2<>("Storm", 20), new Tuple2<>("Storm", 5));
        // 5-second tumbling processing-time windows per key;
        // for event time use TumblingEventTimeWindows.of(...) instead
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        // reduce turns the WindowedStream back into a DataStream
        windowedStream.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .print();
        env.execute("Windows Test");
    }
}
```
WindowAll
Transformation: DataStream → AllWindowedStream
Description: Flink can also define windows on regular (non-keyed) DataStreams. windowAll groups all stream events according to some characteristic; this is usually a non-parallel transformation, because it runs on a non-partitioned stream.
Code example:
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowAllTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30), new Tuple2<>("Spark", 15),
                new Tuple2<>("Storm", 20), new Tuple2<>("Storm", 5));
        // 10-second tumbling windows over the whole (non-keyed) stream
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedStream =
                dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .print();
        env.execute("WindowsAll Test");
    }
}
```
Window Join
Transformation: DataStream, DataStream → DataStream
Description: a window join joins elements of two streams that share a common key and lie in the same window. The windows are defined with a window assigner and evaluated on elements from both streams. Elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction, which can emit results that satisfy the join condition.
Code example:
```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream1 = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Spark", 15));
        DataStream<Tuple2<String, String>> dataStream2 = env.fromElements(
                new Tuple2<>("Flink", "流计算"), new Tuple2<>("Spark", "微批计算"));
        dataStream1.join(dataStream2)
                // join on the first field of each stream
                .where(value -> value.f0)
                .equalTo(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, Object>() {
                    @Override
                    public Object join(Tuple2<String, Integer> value1,
                                       Tuple2<String, String> value2) throws Exception {
                        return new Tuple3<>(value1.f0, value1.f1, value2.f1);
                    }
                })
                .print();
        env.execute("Window Join Test");
    }
}
```
Window Reduce
Transformation: WindowedStream → DataStream
Description: applies a functional ReduceFunction to a WindowedStream, incrementally combining the elements of each window into a single value per key and window, and returns the aggregated stream.
Code example:
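Reduce on a WindowedStream can be sketched as follows, reusing the keyed Tuple2 data from the earlier examples (a minimal sketch; the class name and the 5-second window size are illustrative):

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowReduceTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10), new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30), new Tuple2<>("Spark", 15));
        dataStream.keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // sum the second field of all elements that fall into the same key and window
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                          Tuple2<String, Integer> value2) {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();
        env.execute("Window Reduce Test");
    }
}
```

Unlike the rolling reduce on a KeyedStream shown earlier, this variant emits one result per key per window when the window fires, rather than one result per input element.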
