Union

定义

Transformation Description
DataStream* → DataStream Union of two or more data streams creating a new stream containing all the elements from all the streams. Creates a new DataStream by merging DataStream outputs of the same type with each other. The DataStreams merged using this operator will be transformed simultaneously.

说明

将两个或两个以上的流合并成一个流,参与合并的所有流的数据类型必须一致。

样例

代码

  1. public class UnionDemo {
  2. public static void main(String[] args) throws Exception{
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource("orangeStream"));
  6. DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource("greenStream"));
  7. orangeStream.union(greenStream).print("union");
  8. env.execute("Union Demo");
  9. }
  10. private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
  11. private volatile boolean running = true;
  12. private volatile String name;
  13. public DataSource(String name) {
  14. this.name = name;
  15. }
  16. @Override
  17. public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
  18. Random random = new Random();
  19. int bound = 100;
  20. final long numElements = 6;
  21. int i = 0;
  22. while (running && i < numElements) {
  23. Thread.sleep(1500);
  24. Tuple2 data = new Tuple2<>("foo", random.nextInt(bound));
  25. ctx.collect(data);
  26. System.out.println(Thread.currentThread().getId() + "-" + this.name + "-sand data:" + data);
  27. i++;
  28. }
  29. }
  30. @Override
  31. public void cancel() {
  32. running = false;
  33. }
  34. }
  35. }

输出结果

  1. 59-greenStream-sand data:(foo,6)
  2. 58-orangeStream-sand data:(foo,33)
  3. union> (foo,33)
  4. union> (foo,6)
  5. 59-greenStream-sand data:(foo,99)
  6. 58-orangeStream-sand data:(foo,99)
  7. union> (foo,99)
  8. union> (foo,99)
  9. 58-orangeStream-sand data:(foo,23)
  10. 59-greenStream-sand data:(foo,36)
  11. union> (foo,23)
  12. union> (foo,36)
  13. 59-greenStream-sand data:(foo,78)
  14. 58-orangeStream-sand data:(foo,7)
  15. union> (foo,7)
  16. union> (foo,78)
  17. 59-greenStream-sand data:(foo,7)
  18. 58-orangeStream-sand data:(foo,58)
  19. union> (foo,58)
  20. union> (foo,7)
  21. 59-greenStream-sand data:(foo,66)
  22. 58-orangeStream-sand data:(foo,80)
  23. union> (foo,80)
  24. union> (foo,66)

说明

DataSource类作为数据源,每隔1.5秒发送一条随机数据,共发送6条;程序将orangeStream和greenStream两个流做union后输出。注意union只是把各个流的元素合并到一起,并不保证来自不同流的元素之间的先后顺序。

Connect&CoFlatMap

定义

Transformation Description
DataStream,DataStream → ConnectedStreams “Connects” two data streams retaining their types. Connect allows for shared state between the two streams.
ConnectedStreams → DataStream (CoFlatMap) Similar to flatMap, but on a connected data stream.

说明

Connect可以连接两个数据类型不同的流,这是它和union最主要的区别;其次,union支持两个以上流的合并,而Connect只能连接两个流。连接之后可以借助CoFlatMap分别处理两个流的数据,例如把它们转换成统一的类型。

样例

代码

  1. public class ConnectDemo {
  2. public static void main(String[] args) throws Exception{
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource1());
  6. DataStream<Tuple3<String, Integer, Integer>> greenStream = env.addSource(new DataSource2());
  7. orangeStream.connect(greenStream).flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, Object>() {
  8. @Override
  9. public void flatMap1(Tuple2<String, Integer> value, Collector<Object> out) throws Exception {
  10. if (!value.f0.contains("@")){
  11. out.collect(new Tuple3<>(value.f0, value.f1, RandomUtils.nextInt(0, value.f1)));
  12. }
  13. }
  14. @Override
  15. public void flatMap2(Tuple3<String, Integer, Integer> value, Collector<Object> out) throws Exception {
  16. for (String s : value.f0.split("@")) {
  17. out.collect(new Tuple3<>(s, value.f1, value.f2));
  18. }
  19. }
  20. }).print("Connect");
  21. env.execute("Connect Demo");
  22. }
  23. private static class DataSource1 extends RichParallelSourceFunction<Tuple2<String, Integer>> {
  24. private volatile boolean running = true;
  25. @Override
  26. public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
  27. int bound = 50;
  28. String[] keys = new String[]{"foo@xxyz", "bar", "baz"};
  29. final long numElements = RandomUtils.nextLong(10, 20);
  30. int i = 0;
  31. while (running && i < numElements) {
  32. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  33. Tuple2 data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound));
  34. ctx.collect(data);
  35. System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
  36. i++;
  37. }
  38. }
  39. @Override
  40. public void cancel() {
  41. running = false;
  42. }
  43. }
  44. private static class DataSource2 extends RichParallelSourceFunction<Tuple3<String, Integer, Integer>> {
  45. private volatile boolean running = true;
  46. @Override
  47. public void run(SourceContext<Tuple3<String, Integer, Integer>> ctx) throws Exception {
  48. int bound = 50;
  49. String[] keys = new String[]{"foo@xxyz", "bar", "baz"};
  50. final long numElements = RandomUtils.nextLong(10, 20);
  51. int i = 0;
  52. while (running && i < numElements) {
  53. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  54. Tuple3 data = new Tuple3<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(0, bound), RandomUtils.nextInt(0, bound));
  55. ctx.collect(data);
  56. System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
  57. i++;
  58. }
  59. }
  60. @Override
  61. public void cancel() {
  62. running = false;
  63. }
  64. }
  65. }

输出结果

  1. 58-sand data:(foo@xxyz,44)
  2. 59-sand data:(foo@xxyz,47,41)
  3. Connect> (foo,47,41)
  4. Connect> (xxyz,47,41)
  5. 58-sand data:(foo@xxyz,0)
  6. 59-sand data:(baz,12,12)
  7. Connect> (baz,12,12)
  8. 58-sand data:(foo@xxyz,39)
  9. 59-sand data:(baz,23,27)
  10. Connect> (baz,23,27)

说明

  1. DataSource1模拟流的数据类型为Tuple2,DataSource2模拟流的数据类型为Tuple3
  2. 通过CoFlatMap将数据做一个类型格式统一

Iterate

定义

Transformation Description
DataStream → IterativeStream → DataStream Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.

说明

Iterate在流计算中提供了一种类似递归(循环)的机制:把某个算子的输出通过反馈通道重新发送给前面的算子,满足反馈条件的元素会被反复处理,其余元素则向下游输出。

样例

代码

  1. public class IterateDemo {
  2. public static void main(String[] args) throws Exception{
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
  4. env.setParallelism(1);
  5. DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource());
  6. IterativeStream<Tuple2<String, Integer>> iteration = orangeStream.iterate(5000);
  7. DataStream<Tuple2<String, Integer>> iterationBody = iteration.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
  8. @Override
  9. public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
  10. return new Tuple2<>(value.f0, value.f1-5);
  11. }
  12. });
  13. DataStream<Tuple2<String, Integer>> feedback = iterationBody.filter(new FilterFunction<Tuple2<String, Integer>>() {
  14. @Override
  15. public boolean filter(Tuple2<String, Integer> value) throws Exception {
  16. return value.f1 > 25;
  17. }
  18. });
  19. iteration.closeWith(feedback);
  20. DataStream<Tuple2<String, Integer>> output = iterationBody.filter(new FilterFunction<Tuple2<String, Integer>>() {
  21. @Override
  22. public boolean filter(Tuple2<String, Integer> value) throws Exception {
  23. return value.f1 <= 25;
  24. }
  25. });
  26. feedback.print("Iterate feedback");
  27. output.print("Iterate output");
  28. env.execute("Iterate Demo");
  29. }
  30. private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
  31. private volatile boolean running = true;
  32. @Override
  33. public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
  34. int bound = 20;
  35. String[] keys = new String[]{"foo", "bar", "baz"};
  36. final long numElements = RandomUtils.nextLong(10, 20);
  37. int i = 0;
  38. while (running && i < numElements) {
  39. Thread.sleep(RandomUtils.nextLong(1, 5) * 1000L);
  40. if (i == 0){
  41. ctx.collect(new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], 36));
  42. }else {
  43. Tuple2 data = new Tuple2<>(keys[RandomUtils.nextInt(0, 3)], RandomUtils.nextInt(10, bound));
  44. ctx.collect(data);
  45. System.out.println(Thread.currentThread().getId() + "-sand data:" + data);
  46. }
  47. i++;
  48. }
  49. }
  50. @Override
  51. public void cancel() {
  52. running = false;
  53. }
  54. }
  55. }

输出结果

  1. Iterate feedback> (bar,31)
  2. Iterate feedback> (bar,26)
  3. Iterate output> (bar,21)
  4. 59-sand data:(baz,11)
  5. Iterate output> (baz,6)
  6. 59-sand data:(foo,14)

说明

数据源发送的第一条数据值固定为36:经map减5后为31,仍大于25,于是进入feedback流,被送回迭代头再次执行map;31减5得到26,仍大于25,再次被反馈;26减5得到21,不再大于25,于是从output流输出。其余数据取值在10~19之间,减5后不大于25,只经过一次map就直接输出。