Java Pipeline

目标

通过提供初始输入并传递处理后的输出以供下一阶段使用,从而允许在一系列阶段中进行数据处理。

解释

Pipeline模式为管道模式,也称为流水线模式。通过预先设定好的一系列的阶段来处理输入的数据,每个阶段的输出即是下一个阶段的输入。
模型图如下:
2021-05-06-21-13-32-976643.png
pipeline模式
从图中可以看出,整个流水线内数据流转是从上游到下游,上游的输出是下游的输入,按阶段依次执行。
Source: 表示数据来源,比如:KafkaSource。
Channel:表示对数据进行处理的组件,比如:JsonChannel,对数据进行json转换和处理。
Sink:表示数据落地或下沉的地方,比如:KafkaSink,表示数据发送到指定的kafka;DbSInk表示数据落地到DB。
可以看出,Pipeline是由Source(必须有),Channel(不一定需要),Sink(必须有)三种类型的组件自由组合而成的。

代码示例

  1. /**
  2. * 生命周期
  3. */
  4. public interface LifeCycle {
  5. /**
  6. * 初始化
  7. * @param config
  8. */
  9. void init(String config);
  10. /**
  11. * 启动
  12. */
  13. void startup();
  14. /**
  15. * 结束
  16. */
  17. void shutdown();
  18. }
  1. /**
  2. * 组件
  3. */
  4. public interface Component<T> extends LifeCycle {
  5. /**
  6. * 组件名称
  7. * @return
  8. */
  9. String getName();
  10. /**
  11. * 获取下游组件
  12. * @return
  13. */
  14. Collection<Component> getDownStrems();
  15. /**
  16. * 执行
  17. */
  18. void execute(T o);
  19. }
  1. /**
  2. * 组件抽象实现
  3. * @param <T> 输入
  4. * @param <R> 输出
  5. */
  6. public abstract class AbstractComponent<T, R> implements Component<T>{
  7. @Override
  8. public void execute(T o) {
  9. // 当前组件执行
  10. R r = doExecute(o);
  11. System.out.println(getName() + " receive " + o + " return " + r);
  12. // 获取下游组件,并执行
  13. Collection<Component> downStreams = getDownStrems();
  14. if (!CollectionUtils.isEmpty(downStreams)) {
  15. downStreams.forEach(c -> c.execute(r));
  16. }
  17. }
  18. /**
  19. * 具体组件执行处理
  20. * @param o 传入的数据
  21. * @return
  22. */
  23. protected abstract R doExecute(T o);
  24. @Override
  25. public void startup() {
  26. // 下游 -> 上游 依次启动
  27. Collection<Component> downStreams = getDownStrems();
  28. if (!CollectionUtils.isEmpty(downStreams)) {
  29. downStreams.forEach(Component::startup);
  30. }
  31. // do startup
  32. System.out.println("--------- " + getName() + " is start --------- ");
  33. }
  34. @Override
  35. public void shutdown() {
  36. // 上游 -> 下游 依次关闭
  37. // do shutdown
  38. System.out.println("--------- " + getName() + " is shutdown --------- ");
  39. Collection<Component> downStreams = getDownStrems();
  40. if (!CollectionUtils.isEmpty(downStreams)) {
  41. downStreams.forEach(Component::shutdown);
  42. }
  43. }
  44. }
  1. /**
  2. * 数据来源
  3. */
  4. public abstract class Source<T, R> extends AbstractComponent<T, R>{
  5. }
  1. /**
  2. * 数据处理
  3. */
  4. public abstract class Channel<T, R> extends AbstractComponent<T, R> {
  5. }
  1. /**
  2. * 数据落地/下沉
  3. */
  4. public abstract class Sink<T, R> extends AbstractComponent<T, R> {
  5. }

上面封装了基本的组件实现,下面扩展一下具体的实现,用一个简单的例子说明:
IntegerSource -> IncrChannel -> StringChannel -> ConsoleSink
从上面组件名称和方向可以判断出来要做的流水线是什么,大概过程如:
输入一个数字 -> 数字+1 -> 转为字符串 -> 控制台输出
那么开始来实现这个过程吧。

  1. /**
  2. * 来源
  3. */
  4. public class IntegerSource extends Source<Integer, Integer> {
  5. private int val = 0;
  6. @Override
  7. protected Integer doExecute(Integer o) {
  8. return o;
  9. }
  10. @Override
  11. public void init(String config) {
  12. System.out.println("--------- " + getName() + " init --------- ");
  13. val = 1;
  14. }
  15. @Override
  16. public void startup() {
  17. super.startup();
  18. execute(val);
  19. }
  20. @Override
  21. public String getName() {
  22. return "Integer-Source";
  23. }
  24. @Override
  25. public Collection<Component> getDownStrems() {
  26. return Collections.singletonList(new IncrChannel());
  27. }
  28. }
  1. /**
  2. * 处理:数字+1
  3. */
  4. public class IncrChannel extends Channel<Integer, Integer> {
  5. @Override
  6. protected Integer doExecute(Integer o) {
  7. return o + 1;
  8. }
  9. @Override
  10. public String getName() {
  11. return "Incr-Channel";
  12. }
  13. @Override
  14. public Collection<Component> getDownStrems() {
  15. return Collections.singletonList(new StringChannel());
  16. }
  17. @Override
  18. public void init(String config) {
  19. }
  20. }
  1. /**
  2. * 处理:转为字符串
  3. */
  4. public class StringChannel extends Channel<Integer, String> {
  5. @Override
  6. protected String doExecute(Integer o) {
  7. return "str" + o;
  8. }
  9. @Override
  10. public String getName() {
  11. return "String-Channel";
  12. }
  13. @Override
  14. public Collection<Component> getDownStrems() {
  15. return Collections.singletonList(new ConsoleSink());
  16. }
  17. @Override
  18. public void init(String config) {
  19. }
  20. }
  1. /**
  2. * 控制台
  3. */
  4. public class ConsoleSink extends Sink<String, Void> {
  5. @Override
  6. protected Void doExecute(String o) {
  7. return null;
  8. }
  9. @Override
  10. public String getName() {
  11. return "Console-Sink";
  12. }
  13. @Override
  14. public Collection<Component> getDownStrems() {
  15. return null;
  16. }
  17. @Override
  18. public void init(String config) {
  19. }
  20. }

扩展实现已完成,整个流水线基本已设置好,来测试一下吧

  1. /**
  2. * 流水线
  3. */
  4. public class Pipeline implements LifeCycle{
  5. /**
  6. * 数据源
  7. */
  8. private Source source;
  9. public Pipeline(Source source) {
  10. this.source = source;
  11. }
  12. @Override
  13. public void init(String config) {
  14. // 初始化
  15. System.out.println("--------- Pipeline init --------- ");
  16. source.init(null);
  17. }
  18. @Override
  19. public void startup() {
  20. // 启动
  21. System.out.println("--------- Pipeline startup --------- ");
  22. source.startup();
  23. }
  24. @Override
  25. public void shutdown() {
  26. // 结束
  27. source.shutdown();
  28. System.out.println("--------- Pipeline shutdown --------- ");
  29. }
  30. }
  31. Pipeline pipeline = new Pipeline(new IntegerSource());
  32. pipeline.init(null);
  33. pipeline.startup();
  34. pipeline.shutdown();

执行后结果如下:

  1. --------- Pipeline init ---------
  2. --------- Integer-Source init ---------
  3. --------- Pipeline startup ---------
  4. --------- Console-Sink is start ---------
  5. --------- String-Channel is start ---------
  6. --------- Incr-Channel is start ---------
  7. --------- Integer-Source is start ---------
  8. Integer-Source receive 1 return 1
  9. Incr-Channel receive 1 return 2
  10. String-Channel receive 2 return str2
  11. Console-Sink receive str2 return null
  12. --------- Integer-Source is shutdown ---------
  13. --------- Incr-Channel is shutdown ---------
  14. --------- String-Channel is shutdown ---------
  15. --------- Console-Sink is shutdown ---------
  16. --------- Pipeline shutdown ---------