算子介绍(Operators)
Flink 的转换(Transformations)就是用户通过算子将一个 DataStream 转为新的 DataStream 的过程,主要分为三类:
- 数据流转换(DataStream Transformations):进行数据流相关的转换操作;
- 物理分区(Physical partitioning):Flink 提供的底层 API,允许用户在数据转换完成后对数据分区进行更细粒度的配置;
- 算子链和资源组(Task chaining and resource groups):允许用户对算子链和资源组进行细粒度的控制(物理分区与算子链的用法可参考本列表后的示意代码)。
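下面是一个针对物理分区与算子链/资源组的简单示意(仅为示意,类名与 slot 共享组名为假设,实际使用时可按需选择 rebalance/shuffle/broadcast 等分区方式):
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class PartitionAndChainingDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);
        dataStream
                // 物理分区:rebalance 将数据以轮询(round-robin)方式发送到下游所有并行子任务
                .rebalance()
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return value * 2;
                    }
                })
                // 算子链:从当前算子开始一条新的算子链
                .startNewChain()
                // 资源组:将该算子放入名为 "demo" 的 slot 共享组(组名仅为示意)
                .slotSharingGroup("demo")
                .print();
        env.execute("Partition And Chaining Demo");
    }
}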
算子分类
Map
Transformation(数据流转换):[DataStream → DataStream]
描述:最常用的算子之一,对数据做一对一转换:输入一个元素,输出一个元素
代码示例:
给定1-9的数字,将其乘以2
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class OperatorTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(1,2,3,4,5,6,7,8,9);
DataStream<Integer> mapDataStream = dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer integer) throws Exception {
// 所有元素都乘以2
return integer * 2;
}
});
mapDataStream.print();
env.execute("Opeartor Test");
}
}
// 输出: 2,4,6,8,10,12,14,16,18
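Map 也可以用 Lambda 写法(示意片段,可替换上例中的匿名 MapFunction;Types 来自 org.apache.flink.api.common.typeinfo.Types):
// Lambda 写法(示意):所有元素乘以 2,用 returns 显式声明返回类型
DataStream<Integer> mapDataStream = dataStream.map(x -> x * 2).returns(Types.INT);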
FlatMap
Transformation(数据流转换):DataStream → DataStream
描述:与 Map 类似,但 FlatMap 输入一个元素,可以输出 0 个、1 个或多个元素
代码示例:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class FlatMapTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String in = "Flink,Spark,Storm,Hive";
DataStream<String> dataStream = env.fromElements(in);
DataStream<String> flatMapStream = dataStream.flatMap(
new FlatMapFunction<String, String>(){
@Override
public void flatMap(String in, Collector<String> out) throws Exception {
for(String s: in.split(",")){
out.collect(s);
}
}
}
).returns(Types.STRING);
// Lambda 写法
// DataStream<String> flatMapStream = dataStream.flatMap((String s,Collector<String> out) ->
// Arrays.stream(s.split(",")).forEach(x -> out.collect(x))
// ).returns(Types.STRING);
flatMapStream.print();
env.execute("FlatMap");
}
}
KeyBy
Transformation(数据流转换):DataStream -> KeyedStream
描述:KeyBy 将具有相同键的元素划分到同一个分区,在逻辑上基于 key 对流进行分区;在内部,它使用 hash 函数对流进行分区
代码示例:
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Flink",20),
new Tuple2<>("Spark",30),
new Tuple2<>("Spark",15),
new Tuple2<>("Storm",20),
new Tuple2<>("Storm",5)
);
KeyedStream<Tuple2<String, Integer>,String> keyedStream = dataStream.keyBy(
new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> in) throws Exception {
return in.f0;
}
}
);
keyedStream.print();
env.execute("KeyBy Test");
}
}
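KeyBy 也可以用 Lambda 形式的 KeySelector(示意片段,与上例的匿名 KeySelector 等价):
// Lambda 写法(示意):按元组的第一个字段分区
KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(value -> value.f0);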
Reduce
Transformation(数据流转换):KeyedStream -> DataStream
描述:Reduce 在 KeyedStream 上做滚动聚合:每处理一个元素,就将其与当前聚合值合并并产生一个新的结果值。sum、min、max、count、average 等常见聚合都可以用 reduce 实现(本例实现按 key 求和,代码后另附一个按 key 计数的示意)。
代码示例:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class reduceTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Flink",20),
new Tuple2<>("Spark",30),
new Tuple2<>("Spark",15),
new Tuple2<>("Storm",20),
new Tuple2<>("Storm",5)
);
KeyedStream<Tuple2<String, Integer>,String> keyedStream = dataStream.keyBy(
new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> in) throws Exception {
return in.f0;
}
}
);
DataStream<Tuple2<String,Integer>> reduceStream = keyedStream.reduce(
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
return new Tuple2<>(stringIntegerTuple2.f0,stringIntegerTuple2.f1 + t1.f1);
}
}
);
// lambda写法
// SingleOutputStreamOperator<Tuple2<String,Integer>> reduceStream = dataStream.keyBy(0)
// .reduce((value1,value2) -> new Tuple2<>(value1.f0,value1.f1+value2.f1)).returns(Types.TUPLE(Types.STRING, Types.INT));
reduceStream.print();
env.execute("reduce Test");
}
}
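前面提到 count 等聚合同样可以用 reduce 实现。下面是一个按 key 滚动计数的简单示意(沿用上例中的 dataStream,Lambda 写法仅为示意):
// 先把每个元素映射为 (key, 1),再用 reduce 对计数字段累加
DataStream<Tuple2<String, Integer>> countStream = dataStream
        .map(value -> new Tuple2<>(value.f0, 1)).returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(value -> value.f0)
        .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1)).returns(Types.TUPLE(Types.STRING, Types.INT));
// countStream.print(); // 输出每个 key 当前累计的元素个数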
Filter
Transformation(数据流转换):DataStream -> DataStream
描述:过滤出符合条件的数据
代码示例:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class filterTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
DataStream<Integer> filterDataStream = dataStream.filter(
new FilterFunction<Integer>() {
@Override
public boolean filter(Integer integer) throws Exception {
return integer % 2 == 0;
}
}
);
// Lambda 写法
// DataStream<Integer> filterDataStream = dataStream.filter(x -> x % 2 == 0 );
filterDataStream.print();
env.execute("filter Test");
}
}
Union
Transformation(数据流转换):DataStream → DataStream
描述:Union 将两个或多个类型相同的数据流合并为一个包含所有元素的新流。如果将一个流与它自身合并,那么结果流中每条记录会出现两次
代码示例:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
public class UnionTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream1 = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Flink",20)
);
DataStream<Tuple2<String,Integer>> dataStream2 = env.fromElements(
new Tuple2<>("Spark",15),
new Tuple2<>("Spark",25));
DataStream<Tuple2<String,Integer>> unionStream = dataStream1.union(dataStream2);
unionStream.print();
env.execute("Union Test");
}
}
Connection
Transformation(数据流转换):DataStream,DataStream → ConnectedStream
描述:Connect 将两个数据流连接到一起。与 Union 算子的区别:
- ConnectedStreams 只能连接两个流,而 union 可以连接多于两个流。
- ConnectedStreams 连接的两个流类型可以不一致,而 union 连接的流的类型必须一致。
- ConnectedStreams 会对两个流的数据应用不同的处理方法,并且双流之间可以共享状态。这在第一个流的输入会影响第二个流时, 会非常有用。
- Connect 之后,可以在 map 中使用 CoMapFunction、在 flatMap 中使用 CoFlatMapFunction,分别处理合并流中来自两个流的元素
代码示例:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream1 = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Spark",15)
);
DataStream<Tuple2<String,String>> dataStream2 = env.fromElements(
new Tuple2<>("Flink","流计算"),
new Tuple2<>("Spark","微批计算")
);
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> connectedStreams = dataStream1.connect(dataStream2);
DataStream<Object> dataStream3 = connectedStreams.keyBy(0,0)
.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, String>, Object>() {
@Override
public String map1(Tuple2<String, Integer> value) throws Exception {
return value.f0 + "-" + value.f1;
}
@Override
public Tuple3<String,String,String> map2(Tuple2<String, String> value) throws Exception {
return new Tuple3<String,String,String>(value.f0,"-", value.f1);
}
});
dataStream3.print();
env.execute("Connection Test");
}
}
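上例中的 keyBy(0,0) 在较新的 Flink 版本中已不推荐使用,也可以改用 KeySelector 形式指定两个流各自的 key(示意片段,需要额外引入 org.apache.flink.api.java.functions.KeySelector,效果与按 f0 分区等价):
// 用 KeySelector 分别指定两个流的 key(示意)
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> keyedConnectedStreams =
        connectedStreams.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> in) throws Exception {
                        return in.f0;
                    }
                },
                new KeySelector<Tuple2<String, String>, String>() {
                    @Override
                    public String getKey(Tuple2<String, String> in) throws Exception {
                        return in.f0;
                    }
                }
        );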
Aggregations
Transformation(数据流转换):KeyedStream -> DataStream
描述:Aggregations 是官方提供的聚合算子,封装了常用的聚合操作
代码示例:
Tuple
访问 Tuple 的元素时可以用一个或多个位置键
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class AggregationsTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple6<String,String,String,Integer,Double,Integer>> dataStream = env.fromElements(
new Tuple6<>("技术部","小明","男",25,5000.0,2),
new Tuple6<>("技术部","李华","男",26,6000.0,3),
new Tuple6<>("技术部","玛丽亚","女",24,4500.0,1),
new Tuple6<>("技术部","Jane","男",28,8000.0,5),
new Tuple6<>("技术部","康康","男",30,10000.0,10)
);
KeyedStream<Tuple6<String,String,String,Integer,Double,Integer>, Tuple> keyedStream = dataStream.keyBy(0);
// 滚动计算指定字段的最小值,字段可以通过位置索引或者字段名来指定(这里以工资字段,索引 4 为例)
keyedStream.min(4).print();
// 滚动计算指定字段的最大值
keyedStream.max(4).print();
// 滚动计算指定字段的最小值,并返回该最小值所在的完整元素
keyedStream.minBy(4).print();
// 滚动计算指定字段的最大值,并返回该最大值所在的完整元素
keyedStream.maxBy(4).print();
env.execute("Aggregations Test");
}
}
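除 min/max/minBy/maxBy 外,Aggregations 还提供 sum 等聚合(示意片段,沿用上例中的 keyedStream):
// 按 key 滚动累加指定字段(这里是工资字段,索引 4)
keyedStream.sum(4).print();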
POJOs
使用 POJO 时不能使用位置索引,需要通过字段名来指定 key
定义 POJO:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@AllArgsConstructor
@EqualsAndHashCode
public class Company {
public String department;
public String name;
public String gender;
public int age;
public double salary;
public int workYear;
public Company(){};
}
import dao.Company;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class AggregationsPojoTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Company> dataStream = env.fromElements(
new Company("技术部","小明","男",25,5000.0,2),
new Company("技术部","李华","男",26,6000.0,3),
new Company("技术部","玛丽亚","女",24,4500.0,1),
new Company("技术部","Jane","男",28,8000.0,5),
new Company("技术部","康康","男",30,10000.0,10)
);
KeyedStream<Company, Tuple> keyedStream = dataStream.keyBy("gender");
// 滚动计算指定字段的最小值,POJO 只能通过字段名来指定(这里以 salary 为例)
keyedStream.min("salary").print();
// 滚动计算指定字段的最大值
keyedStream.max("salary").print();
// 滚动计算指定字段的最小值,并返回该最小值所在的完整元素
keyedStream.minBy("salary").print();
// 滚动计算指定字段的最大值,并返回该最大值所在的完整元素
keyedStream.maxBy("salary").print();
env.execute("Aggregations Test");
}
}
Window
Transformation(数据流转换):KeyedStream → WindowedStream
描述:Window 允许按时间或其他条件对 KeyedStream 中的元素进行分组。以下示例按 5 秒的滚动处理时间窗口进行聚合:
代码示例:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class windowTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Flink",20),
new Tuple2<>("Spark",30),
new Tuple2<>("Spark",15),
new Tuple2<>("Storm",20),
new Tuple2<>("Storm",5)
);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = dataStream.keyBy(value->value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
// 也可以使用事件时间窗口,例如 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 转为DataStream
windowedStream.reduce(
(value1,value2) -> new Tuple2<>(value1.f0, value1.f1+value2.f1)).returns(Types.TUPLE(Types.STRING, Types.INT)
).print();
env.execute("Windows Test");
}
}
WindowAll
Transformation(数据流转换):DataStream → AllWindowedStream
描述:Flink 也可以在常规(未按 key 分区的)DataStream 上定义窗口,windowAll 根据某些特征(例如时间)对所有流事件进行分组。注意:这通常是非并行的转换,所有记录会汇集到同一个任务上处理
代码示例:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class windowAllTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Flink",20),
new Tuple2<>("Spark",30),
new Tuple2<>("Spark",15),
new Tuple2<>("Storm",20),
new Tuple2<>("Storm",5)
);
AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedStream = dataStream
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)));
// 也可以使用事件时间窗口,例如 .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
windowedStream.reduce(
(value1,value2) -> new Tuple2<>(value1.f0, value1.f1+value2.f1)).returns(Types.TUPLE(Types.STRING, Types.INT)
).print();
env.execute("WindowsAll Test");
}
}
Window Join
Transformation(数据流转换):DataStream,DataStream → DataStream
描述:Window Join 对两个流中具有相同 key 且位于同一窗口中的元素进行连接。窗口由窗口分配器(WindowAssigner)定义,并同时作用于来自两个流的元素;满足连接条件的元素对会被传给用户定义的 JoinFunction 或 FlatJoinFunction,由其产出连接结果。
代码示例:
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowJoinTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream1 = env.fromElements(
new Tuple2<>("Flink",10),
new Tuple2<>("Spark",15)
);
DataStream<Tuple2<String,String>> dataStream2 = env.fromElements(
new Tuple2<>("Flink","流计算"),
new Tuple2<>("Spark","微批计算")
);
dataStream1.join(dataStream2).
where(value -> value.f0)
.equalTo(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, Object>() {
@Override
public Object join(Tuple2<String, Integer> value1, Tuple2<String, String> value2) throws Exception {
return new Tuple3<String,Integer,String>(value1.f0,value1.f1,value2.f1);
}
}).print()
;
env.execute("Window Join Test");
}
}
Window Reduce
Transformation(数据流转换):WindowedStream → DataStream
描述:Window Reduce 对窗口内的元素应用 ReduceFunction 做增量聚合,窗口触发时输出聚合结果,并将 WindowedStream 转回 DataStream
代码示例:
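下面是一个在窗口上用 reduce 做增量求和的简单示意(做法与上文 Window 一节一致,类名 WindowReduceTest 为假设):
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowReduceTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
                new Tuple2<>("Flink", 10),
                new Tuple2<>("Flink", 20),
                new Tuple2<>("Spark", 30),
                new Tuple2<>("Spark", 15)
        );
        // 按 key 分区后开 5 秒的滚动处理时间窗口,窗口内用 reduce 增量求和
        DataStream<Tuple2<String, Integer>> result = dataStream
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                });
        result.print();
        env.execute("Window Reduce Test");
    }
}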