Union
Definition
| Transformation | Description |
|---|---|
| DataStream* → DataStream | Union of two or more data streams, creating a new stream that contains all the elements from all the streams. Creates a new DataStream by merging DataStream outputs of the same type with each other. The DataStreams merged using this operator will be transformed simultaneously. |
Description
Merges two or more streams into one. All of the streams must have the same data type.
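
The merge semantics can be modelled without Flink. The following is only a plain-Java sketch: `UnionSketch` and its `union` helper are hypothetical names, and lists stand in for streams. It illustrates that every input must share one element type `T`, that more than two inputs are accepted, and that duplicates are kept. Unlike a real Flink job, where elements from different inputs interleave in arrival order, the sketch simply concatenates the inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionSketch {
    // Hypothetical helper: merges any number of same-typed inputs into one list.
    // The single type parameter T plays the role of the "same data type" rule.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other); // no de-duplication: every element is kept
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> orange = Arrays.asList(33, 99);
        List<Integer> green = Arrays.asList(6, 99);
        List<Integer> blue = Arrays.asList(7);
        // prints [33, 99, 6, 99, 7]
        System.out.println(union(orange, green, blue));
    }
}
```

Passing a `List<String>` together with a `List<Integer>` would not give the merged result a usable common element type, which mirrors the type restriction described above.
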
Example
Code

    import java.util.Random;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class UnionDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);

            DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource("orangeStream"));
            DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource("greenStream"));

            // Merge the two same-typed streams and print each element with the prefix "union".
            orangeStream.union(greenStream).print("union");
            env.execute("Union Demo");
        }

        // Emits six ("foo", random int) tuples, one every 1.5 seconds.
        private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
            private volatile boolean running = true;
            private final String name;

            public DataSource(String name) {
                this.name = name;
            }

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                Random random = new Random();
                int bound = 100;
                final long numElements = 6;
                int i = 0;
                while (running && i < numElements) {
                    Thread.sleep(1500);
                    Tuple2<String, Integer> data = new Tuple2<>("foo", random.nextInt(bound));
                    ctx.collect(data);
                    System.out.println(Thread.currentThread().getId() + "-" + this.name + "-sand data:" + data);
                    i++;
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
Output

    59-greenStream-sand data:(foo,6)
    58-orangeStream-sand data:(foo,33)
    union> (foo,33)
    union> (foo,6)
    59-greenStream-sand data:(foo,99)
    58-orangeStream-sand data:(foo,99)
    union> (foo,99)
    union> (foo,99)
    58-orangeStream-sand data:(foo,23)
    59-greenStream-sand data:(foo,36)
    union> (foo,23)
    union> (foo,36)
    59-greenStream-sand data:(foo,78)
    58-orangeStream-sand data:(foo,7)
    union> (foo,7)
    union> (foo,78)
    59-greenStream-sand data:(foo,7)
    58-orangeStream-sand data:(foo,58)
    union> (foo,58)
    union> (foo,7)
    59-greenStream-sand data:(foo,66)
    58-orangeStream-sand data:(foo,80)
    union> (foo,80)
    union> (foo,66)
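Explanation
The DataSource class acts as the data source and emits random values. The two resulting streams are merged with union and printed.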
Connect&CoFlatMap
Definition
| Transformation | Description |
|---|---|
| DataStream,DataStream → ConnectedStreams | “Connects” two data streams retaining their types. Connect allows for shared state between the two streams. |
| ConnectedStreams → DataStream | Similar to flatMap on a connected data stream |
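Description
Connect can join two streams with different data types, which is its main difference from union. In addition, union can merge more than two streams, whereas Connect supports exactly two. CoFlatMap can then be applied to the connected streams, for example to convert the two differently typed inputs into one common type.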
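
To make the type-unification idea concrete, here is a plain-Java sketch with no Flink dependency. `CoFlatMapSketch` and `Triple` are hypothetical names: `Triple` stands in for a three-field tuple, and the two static methods play the role of the two callbacks of a co-flat-map, one per input type. As a simplifying assumption, the first handler fills the missing third field with a fixed `0` so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

class CoFlatMapSketch {
    // Hypothetical common output type that both inputs are converted to.
    static final class Triple {
        final String key;
        final int a;
        final int b;

        Triple(String key, int a, int b) {
            this.key = key;
            this.a = a;
            this.b = b;
        }

        @Override
        public String toString() {
            return "(" + key + "," + a + "," + b + ")";
        }
    }

    // Handler for the first input, whose elements are (key, value) pairs.
    // Keys containing "@" are dropped; others are padded to three fields.
    static List<Triple> flatMap1(String key, int value) {
        List<Triple> out = new ArrayList<>();
        if (!key.contains("@")) {
            out.add(new Triple(key, value, 0)); // 0 is a placeholder (assumption)
        }
        return out;
    }

    // Handler for the second input, whose elements already have three fields.
    // A key such as "foo@xxyz" is split into one output element per part.
    static List<Triple> flatMap2(String key, int a, int b) {
        List<Triple> out = new ArrayList<>();
        for (String part : key.split("@")) {
            out.add(new Triple(part, a, b));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(flatMap1("foo@xxyz", 44));     // prints []
        System.out.println(flatMap1("bar", 7));           // prints [(bar,7,0)]
        System.out.println(flatMap2("foo@xxyz", 47, 41)); // prints [(foo,47,41), (xxyz,47,41)]
    }
}
```

Because each input has its own handler, the two inputs never need to share a type; only the output type is common.
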
Example
Code

    import org.apache.commons.lang3.RandomUtils;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.util.Collector;

    public class ConnectDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);

            DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource1());
            DataStream<Tuple3<String, Integer, Integer>> greenStream = env.addSource(new DataSource2());

            // Both callbacks emit Tuple3, so the output type is declared as Tuple3 instead of Object.
            orangeStream.connect(greenStream)
                .flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>>() {
                    // First input: drop keys containing "@", pad the rest to three fields.
                    @Override
                    public void flatMap1(Tuple2<String, Integer> value, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                        if (!value.f0.contains("@")) {
                            out.collect(new Tuple3<>(value.f0, value.f1, RandomUtils.nextInt(0, value.f1)));
                        }
                    }

                    // Second input: split the key on "@" and emit one element per part.
                    @Override
                    public void flatMap2(Tuple3<String, Integer, Integer> value, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
                        for (String s : value.f0.split("@")) {
                            out.collect(new Tuple3<>(s, value.f1, value.f2));
                        }
                    }
                })
                .print("Connect");
            env.execute("Connect Demo");
        }

        private static class DataSource1 extends RichParallelSourceFunction<Tuple2<String, Integer>> {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int bound = 50;
                String[] keys = new String[]{"foo@xxyz", "bar", "baz"};
                final long numElements = RandomUtils.nextLong(10, 20);
                int i = 0;
                while (running && i < numElements) {
                    Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                    Tuple2<String, Integer> data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
                    ctx.collect(data);
                    System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
                    i++;
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }

        private static class DataSource2 extends RichParallelSourceFunction<Tuple3<String, Integer, Integer>> {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple3<String, Integer, Integer>> ctx) throws Exception {
                int bound = 50;
                String[] keys = new String[]{"foo@xxyz", "bar", "baz"};
                final long numElements = RandomUtils.nextLong(10, 20);
                int i = 0;
                while (running && i < numElements) {
                    Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                    Tuple3<String, Integer, Integer> data = new Tuple3<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound), RandomUtils.nextInt(0, bound));
                    ctx.collect(data);
                    System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
                    i++;
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
Output

    58-sand data:(foo@xxyz,44)
    59-sand data:(foo@xxyz,47,41)
    Connect> (foo,47,41)
    Connect> (xxyz,47,41)
    58-sand data:(foo@xxyz,0)
    59-sand data:(baz,12,12)
    Connect> (baz,12,12)
    58-sand data:(foo@xxyz,39)
    59-sand data:(baz,23,27)
    Connect> (baz,23,27)
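Explanation
- DataSource1 simulates a stream whose elements are of type Tuple2, and DataSource2 simulates a stream whose elements are of type Tuple3.
- CoFlatMap converts the elements of both streams into one unified format (Tuple3).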
Iterate
Definition
| Transformation | Description |
|---|---|
| DataStream → IterativeStream → DataStream | Creates a “feedback” loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. An iteration starts with a stream and applies the iteration body continuously: elements that satisfy the feedback condition are sent back through the feedback channel, and the rest are forwarded downstream. The Flink documentation's example feeds back elements greater than 0; the demo below feeds back elements greater than 25. |
Description
Iterate provides a recursion-like mechanism for stream processing.
Example
Code

    import org.apache.commons.lang3.RandomUtils;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    public class IterateDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(1);

            DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
            IterativeStream<Tuple2<String, Integer>> iteration = orangeStream.iterate(5000);

            // Iteration body: subtract 5 from the value on every pass.
            DataStream<Tuple2<String, Integer>> iterationBody = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                    return new Tuple2<>(value.f0, value.f1 - 5);
                }
            });

            // Elements still greater than 25 are fed back into the iteration.
            DataStream<Tuple2<String, Integer>> feedback = iterationBody.filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 > 25;
                }
            });
            iteration.closeWith(feedback);

            // Elements at or below 25 leave the loop and are forwarded downstream.
            DataStream<Tuple2<String, Integer>> output = iterationBody.filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 <= 25;
                }
            });

            feedback.print("Iterate feedback");
            output.print("Iterate output");
            env.execute("Iterate Demo");
        }

        private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int bound = 20;
                String[] keys = new String[]{"foo", "bar", "baz"};
                final long numElements = RandomUtils.nextLong(10, 20);
                int i = 0;
                while (running && i < numElements) {
                    Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
                    if (i == 0) {
                        // The first element has the fixed value 36 so that it triggers the feedback loop.
                        ctx.collect(new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], 36));
                    } else {
                        Tuple2<String, Integer> data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(10, bound));
                        ctx.collect(data);
                        System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
                    }
                    i++;
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
Output

    Iterate feedback> (bar,31)
    Iterate feedback> (bar,26)
    Iterate output> (bar,21)
    59-sand data:(baz,11)
    Iterate output> (baz,6)
    59-sand data:(foo,14)
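Explanation
The source first emits an element with the value 36. After the map step subtracts 5, the result (31) is still greater than 25, so the element is fed back into the iteration and passes through map again. This repeats until the value no longer meets the feedback condition, at which point the element is emitted as output: 36 → 31 → 26 → 21.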
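
The same arithmetic can be traced in plain Java. This is a single-threaded model with hypothetical names (`IterateSketch`, `run`), not Flink code: a queue stands in for the feedback channel, and the step size 5 and threshold 25 are taken from the demo above. In a real job the feedback is asynchronous, so fed-back elements may interleave with new input differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class IterateSketch {
    static final int STEP = 5;       // amount subtracted by the map step
    static final int THRESHOLD = 25; // values above this are fed back

    // Processes the inputs and returns a log of "feedback"/"output" events in order.
    static List<String> run(int... inputs) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int value : inputs) {
            pending.add(value);
        }
        List<String> log = new ArrayList<>();
        while (!pending.isEmpty()) {
            int mapped = pending.poll() - STEP; // iteration body
            if (mapped > THRESHOLD) {
                log.add("feedback " + mapped);
                pending.add(mapped);            // send back into the loop
            } else {
                log.add("output " + mapped);    // leave the loop
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [feedback 31, feedback 26, output 21]
        System.out.println(run(36));
    }
}
```

The loop always terminates because each pass lowers the value by 5, so every element eventually drops to 25 or below.
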
