工作没来就一直等待,工作来了就干活。

    突然想起了当年在外面去富士康工作的经验,流水线作业。一根皮带,贯穿南北,我负责安装相机皮圈,来一个手机我就安一个,没来,我就等待着,和工友扯皮,当然来了一样可以扯皮。

    那么我们就先来个一个车间,车间里可以容纳 N 个工人,外面会一直有工作不定时的发布。然后工人拿到这个工作,开始工作。

    其实我们会发现这个和生产者消费者的模式很形似,只不过针对的对象不同,只需要针对客户线程,而工人线程怎么做我们就不必关系。只关心任务的发布即可。消费者和生成者则是两端都要关心。本质是一致的,对外的表现是不同的。

    1. public class Channel {
    2. private final int MAX_REQUEST = 10;
    3. private WorkThread[] threadPool;
    4. private Request[] requests;
    5. private int count;//工作线程的数量
    6. private int tail;//下次放的位置
    7. private int head;//下次拿的位置
    8. public Channel(int threadNum) {
    9. threadPool = new WorkThread[threadNum];
    10. requests = new Request[MAX_REQUEST];
    11. for (int i = 0; i < threadPool.length; i++) {
    12. threadPool[i] = new WorkThread("work-" + i, this);
    13. }
    14. }
    15. public synchronized void putRequest(Request request) {
    16. while (count >= requests.length) {
    17. try {
    18. wait();
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. }
    23. requests[tail] = request;
    24. tail = (tail + 1) % MAX_REQUEST;
    25. count++;
    26. notifyAll();
    27. }
    28. public synchronized Request poll() {
    29. while (count <= 0) {
    30. try {
    31. wait();
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. Request request = requests[head];
    37. head = (head + 1) % MAX_REQUEST;
    38. count--;
    39. notifyAll();
    40. return request;
    41. }
    42. public void startWorks() {
    43. for (int i = 0; i < threadPool.length; i++) {
    44. threadPool[i].start();
    45. }
    46. }
    47. }
    1. public class Request {
    2. private String name;
    3. private int number;
    4. private Random random = new Random();
    5. public Request(String name, int number) {
    6. this.name = name;
    7. this.number = number;
    8. }
    9. public void exec() {
    10. System.out.println(Thread.currentThread().getName()+" exec:" + name + "--" + number);
    11. try {
    12. Thread.sleep(random.nextInt(1000));
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. }
    1. public class ClientThread extends Thread {
    2. private Channel channel;
    3. private String name;
    4. private int number;
    5. private Random random = new Random();
    6. public ClientThread(String name, Channel channel) {
    7. this.channel = channel;
    8. this.name = name;
    9. }
    10. @Override
    11. public void run() {
    12. super.run();
    13. while (true) {
    14. channel.putRequest(new Request(name, number++));
    15. try {
    16. Thread.sleep(random.nextInt(1000));
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. }
    21. }
    22. }
    1. public class WorkThread extends Thread {
    2. private final Channel channel;
    3. public WorkThread(String name, Channel channel) {
    4. super(name);
    5. this.channel = channel;
    6. }
    7. @Override
    8. public void run() {
    9. super.run();
    10. while (true) {
    11. Request request = channel.poll();
    12. request.exec();
    13. }
    14. }
    15. }
    1. public class Main {
    2. public static void main(String[] args){
    3. Channel channel = new Channel(5);
    4. channel.startWorks();
    5. new ClientThread("AAA",channel).start();
    6. new ClientThread("BBB",channel).start();
    7. new ClientThread("CCC",channel).start();
    8. }
    9. }

    运行结果:
    image.png