Condition的设计目的


image.png :::tips Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
Condition 将 Object监视器(monitor)中 的方法(waitnotifynotifyAll)分解成不同的java对象,可以将这些对象与任意 Lock 接口的实现组合起来,为每一个对象提供多个等待集合(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了Object监视器方法的使用。 :::

  • LockCondition出现的目的是实现类似synchornized中的wait()和notify()机制,以此来实现多个线程之间的协调与通信。
  • 使用Condition,整个过程都是由开发者来控制的,相比于synchornized的传统方式,更加灵活,功能也更加强大。

Condition的使用

  • Condition必须绑定一个Lock实例来使用
  • 调用Condition.await()方法将会在wait之前自动释放锁,在wait返回之前会重新尝试获取锁。
  • AQS 是Condition的一个重要实现类。

    await

    1. class BoundedBuffer {
    2. final Lock lock = new ReentrantLock();
    3. //多个等待集合
    4. final Condition notFull = lock.newCondition();
    5. final Condition notEmpty = lock.newCondition();
    6. final Object[] items = new Object[100];
    7. int putptr, takeptr, count;
    8. public void put(Object x) throws InterruptedException {
    9. lock.lock();
    10. try {
    11. while (count == items.length)
    12. notFull.await();
    13. items[putptr] = x;
    14. if (++putptr == items.length) putptr = 0;
    15. ++count;
    16. notEmpty.signal();
    17. } finally {
    18. lock.unlock();
    19. }
    20. }
    21. public Object take() throws InterruptedException {
    22. lock.lock();
    23. try {
    24. while (count == 0)
    25. notEmpty.await();
    26. Object x = items[takeptr];
    27. if (++takeptr == items.length) takeptr = 0;
    28. --count;
    29. notFull.signal();
    30. return x;
    31. } finally {
    32. lock.unlock();
    33. }
    34. }
    35. }

    awaitUninterruptibly

    使用condition.awaitUninterruptibly()后,调用thread.interrupt(0则不会报错

awaitNanos

该方法返回了所剩毫微秒数的一个估计值,以等待所提供的 nanosTimeout 值的时间,如果超时,则返回一个小于等于 0 的值。可以用此值来确定在等待返回但某一等待条件仍不具备的情况下,是否要再次等待,以及再次等待的时间。

  1. synchronized boolean aMethod(long timeout, TimeUnit unit) {
  2. long nanosTimeout = unit.toNanos(timeout);
  3. while (!conditionBeingWaitedFor) {
  4. if (nanosTimeout > 0)
  5. nanosTimeout = theCondition.awaitNanos(nanosTimeout);
  6. else
  7. return false;
  8. }
  9. // ...
  10. }

signal

唤醒一个等待线程。
AQS 链表中的第一个元素

signalAll

唤醒所有等待线程。

package com.shengsiyuan.concurrency4;

/*
    传统上,我们可以通过synchronized关键字 + wait + notify/notifyAll 来实现多个线程之间的协调与通信,整个过程都是由JVM来帮助
    我们实现的;开发者无需(也是无法)了解底层的实现细节

    从JDK 5开始,并发包提供了Lock, Condition(await与signal/signalAll)来实现多个线程之间的协调与通信,

    Thread.sleep与await(或是Object的wait方法)的本质区别:sleep方法本质上不会释放锁,而await会释放锁,并且在signal后,还需要
    重新获得锁才能继续执行(该行为与Object的wait方法完全一致)
 */


import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

public class MyTest2 {
    public static void main(String[] args) {
        BoundedContainer boundedContainer = new BoundedContainer();

        IntStream.range(0, 10).forEach(i -> new Thread(() -> {
            try {
                boundedContainer.put("hello");
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }).start());

        IntStream.range(0, 10).forEach(i -> new Thread(() -> {
            try {
                boundedContainer.take();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }).start());
    }
}


class BoundedContainer {

    private String[] elements = new String[10];

    private Lock lock = new ReentrantLock();

    private Condition notEmptyCondition = lock.newCondition();

    private Condition notFullCondition = lock.newCondition();

    private int elementCount; // elements数组中已有的元素数量

    private int putIndex;

    private int takeIndex;


    public void put(String element) throws InterruptedException {
        this.lock.lock();

        try {
            while (this.elementCount == this.elements.length) {
                notFullCondition.await();
            }

            elements[putIndex] = element;

            if (++putIndex == this.elements.length) {
                putIndex = 0;
            }

            ++elementCount;

            System.out.println("put method: " + Arrays.toString(elements));

            notEmptyCondition.signal();
        } finally {
            this.lock.unlock();
        }
    }

    public String take() throws InterruptedException {
        this.lock.lock();

        try {
            while (0 == this.elementCount) {
                notEmptyCondition.await();
            }

            String element = elements[takeIndex];

            elements[takeIndex] = null;

            if (++takeIndex == this.elements.length) {
                takeIndex = 0;
            }

            --elementCount;

            System.out.println("take method: " + Arrays.toString(elements));

            notFullCondition.signal();

            return element;

        } finally {
            this.lock.unlock();
        }
    }
}