1.什么是Worker Thread 设计模式

Worker-Thread模式有时也称为流水线设计模式,这种设计模式类似于工厂流水线,上游工作人员完成了某个电子产品的组装之后,将半成品放到流水线传送带上,接下来的加工工作则会交给下游的工人,如图所示。
image.png

2.Worker Thread模式实现

根据我们前面的描述可以看出Worker-Thread模式需要有如下几个角色。

  • 流水线工人:流水线工人主要用来对传送带上的产品进行加工。
  • 流水线传送带:用于传送来自上游的产品。
  • 产品组装说明书:用来说明该产品如何组装。

2.1 产品及组装说明书

  1. // 在流水线上需要被加工的产品,create作为一个模版方法,提供了加工产品的说明书
  2. public abstract class InstructionBook {
  3. public final void create() {
  4. this.firstProcess();
  5. this.secondProcess();
  6. }
  7. protected abstract void firstProcess();
  8. protected abstract void secondProcess();
  9. }

抽象类InstructionBook,代表着组装产品的说明书,其中经过流水线传送带的产品将通过create() 方法进行加工,而firstProcess() 和secondProcess() 则代表着加工每个产品的步骤,这就是说明书的作用。
传送带上的产品除了说明书以外还需要有产品自身,产品继承了说明书,每个产品都有产品编号, 通用Production的代码如清单所示。

  1. public class Production extends InstructionBook {
  2. // 产品编号
  3. private final int proID;
  4. public Production(int proID) {
  5. this.proID = proID;
  6. }
  7. @Override
  8. protected void firstProcess() {
  9. System.out.println("execute the " + proID + " first process");
  10. }
  11. @Override
  12. protected void secondProcess() {
  13. System.out.println("execute the " + proID + " second process");
  14. }
  15. }

2.2流水线传送带

流水线的传送带主要用于传送待加工的产品,上游的工作人员将完成的半成品放到传送带上, 工作人员从传送带上取下产品进行再次加工, 传送带Production Channel的代码所示。

  1. // 产品传送带, 在传送带上除了负责产品加工的工人之外,还有在传送带上等待加工的产品
  2. public class ProductionChannel {
  3. // 传送带上最多可以有多少个待加工的产品
  4. private final static int MAX_PROD = 100;
  5. // 主要用来存放待加工的产品,也就是传送带
  6. private final Production[] productionQueue;
  7. // 队列尾
  8. private int tail;
  9. // 队列头
  10. private int head;
  11. // 当前在流水线上有多少个待加工的产品
  12. private int total;
  13. // 在流水线上工作的工人
  14. private final Worker[] workers;
  15. // 创建ProductionChannel时应指需要多少个流水线工人
  16. public ProductionChannel(int workerSize) {
  17. this.workers = new Worker[workerSize];
  18. this.productionQueue = new Production[MAX_PROD];
  19. // 实例化每一个工人(Worker线程)并且启动
  20. for(int i = 0; i < workerSize; i++) {
  21. workers[i] = new Worker("Worker-" + i, this);
  22. workers[i].start();
  23. }
  24. }
  25. // 接受来自上游的半成品(待加工的产品)
  26. public void offerProduction(Production production) {
  27. synchronized (this) {
  28. //当传送带上待加工的产品超过了最大值时需要阻塞上游再次传送产品
  29. while( total >= productionQueue.length ) {
  30. try {
  31. this.wait();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. // 将产品放到传送带,并且通知工人线程工作
  37. productionQueue[tail] = production;
  38. tail = (tail + 1) % productionQueue.length;
  39. total++;
  40. this.notifyAll();
  41. }
  42. }
  43. // 工人线程(Worker)从传送带上获取产品,并且进行加工
  44. public Production takeProductino() {
  45. synchronized (this) {
  46. //当传送带上没有产品时,工人等待着产品从上游传送到传送带上
  47. while(total <= 0 ) {
  48. try {
  49. this.wait();
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. // 获取产品
  55. Production prod = productionQueue[head];
  56. head = (head+1) % productionQueue.length;
  57. total--;
  58. this.notifyAll();
  59. return prod;
  60. }
  61. }
  62. }

传送带是搁置产品的地方,如果工人们处理比较慢则会导致无限制的产品积压,因此我们需要做的是让上游的流水线阻塞并等待,直至流水线有位置可以用于放置新的产品为止,MAX_PROD的作用就在于此,其用于控制传送带的最大容量,传送带被创建的同时,流水线上的工人们也已经就绪到位,等待着流水线上产品的到来。

2.3流水线工人

流水线工人是Thread的子类,不断地从流水线上提取产品然后进行再次加工,加工的方法是create() (对该产品的加工方法说明书) ,流水线工人示例代码如所示。

  1. import java.util.Random;
  2. import java.util.concurrent.TimeUnit;
  3. public class Worker extends Thread{
  4. private final ProductionChannel channel;
  5. //主要用于获取一个随机值,模拟加工一个产品需要耗费一定的时间,当然每个工人操作时所花费的时间可也能不一样
  6. private final static Random random = new Random(System.currentTimeMillis());
  7. public Worker(String workerName, ProductionChannel channel) {
  8. super(workerName);
  9. this.channel = channel;
  10. }
  11. @Override
  12. public void run() {
  13. while(true) {
  14. try {
  15. // 从传送带上获取产品
  16. Production production = channel.takeProductino();
  17. System.out.println(getName() + " process the " + production);
  18. production.create();
  19. TimeUnit.SECONDS.sleep(random.nextInt(10));
  20. }catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. }

2.4 流水线测试

  1. import java.util.concurrent.TimeUnit;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import java.util.stream.IntStream;
  4. import static java.util.concurrent.ThreadLocalRandom.current;
  5. public class Test {
  6. public static void main(String[] args) {
  7. // 流水线上有5个工人
  8. final ProductionChannel channel = new ProductionChannel(5);
  9. AtomicInteger productionNo = new AtomicInteger();
  10. // 流水线上有8个工作人员往传送带上不断地放置等待加工的半成品
  11. IntStream.range(1,8).forEach(i -> new Thread(
  12. () -> {
  13. while(true) {
  14. channel.offerProduction(new Production(productionNo.getAndIncrement()));
  15. try {
  16. TimeUnit.SECONDS.sleep(current().nextInt(10));
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }
  22. ).start());
  23. }
  24. }

2.5 Worker-Thread和Producer-Consumer

(1) Producer-Consumer模式

图是生产者消费者模式关键角色之间的关系图。
image.png

首先Producer、Consumer对Queue都是依赖关系, 其次Producer要做的就是不断地往Queue中生产数据, 而Consumer则是不断地从Queue中获取数据,Queue既不知道Producer的存在也不知道Consumer的存在,最后Consumer对Queue中数据的消费并不依赖于数据本身的方法(使用说明书)。

(2) Worker-Thread模式

图是Worker-Thread模式关键角色之间的关系图。
左侧的线程, 也就是传送带上游的线程, 同样在不断地往传送带(Queue) 中生产数据,而当Channel被启动的时候, 就会同时创建并启动若干数量的Worker线程, 因此我们可以看出Worker于Channel来说并不是单纯的依赖关系, 而是聚合关系, Channel必须知道Worker的存在。
image.png