AQS

AQS 的概述

AQS 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

AQS 的特点是:

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState:获取 state 状态
    • setState:设置 state 状态
    • compareAndSetState:CAS 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

    AQS 实现不可重入锁示例

    自定义同步器

    image.png

    1. // 自定义同步器
    2. class MySync extends AbstractQueuedSynchronizer {
    3. @Override
    4. protected boolean tryAcquire(int arg) {
    5. if (compareAndSetState(0, 1)) {
    6. setExclusiveOwnerThread(Thread.currentThread());
    7. return true;
    8. }
    9. return false;
    10. }
    11. @Override
    12. protected boolean tryRelease(int arg) {
    13. if (getState() == 0) {
    14. throw new IllegalMonitorStateException();
    15. }
    16. setExclusiveOwnerThread(null);
    17. setState(0);
    18. return true;
    19. }
    20. @Override
    21. protected int tryAcquireShared(int arg) {
    22. return super.tryAcquireShared(arg);
    23. }
    24. @Override
    25. protected boolean tryReleaseShared(int arg) {
    26. return super.tryReleaseShared(arg);
    27. }
    28. // 判断当前的线程是否已经获取到了锁,该线程是否正在独占资源
    29. @Override
    30. protected boolean isHeldExclusively() {
    31. return getState() == 1;
    32. }
    33. @Override
    34. public String toString() {
    35. return super.toString();
    36. }
    37. protected Condition newCondition() {
    38. return new ConditionObject();
    39. }
    40. }

    自定义锁

    1. // 使用自定义同步器实现自定义锁
    2. class MyLock implements Lock {
    3. MySync mySync = new MySync();
    4. // 尝试加锁,不成功进入等待队列
    5. @Override
    6. public void lock() {
    7. mySync.acquire(1);
    8. }
    9. // 尝试加锁,不成功进入等待队列,可打断
    10. @Override
    11. public void lockInterruptibly() throws InterruptedException {
    12. mySync.acquireInterruptibly(1);
    13. }
    14. // 尝试一次加锁,不成功返回,不进入队列
    15. @Override
    16. public boolean tryLock() {
    17. return mySync.tryAcquire(1);
    18. }
    19. // 尝试加锁,不成功,进入等待队列,有时限
    20. @Override
    21. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    22. return mySync.tryAcquireNanos(1, unit.toNanos(time));
    23. }
    24. // 释放锁
    25. @Override
    26. public void unlock() {
    27. mySync.release(1);
    28. }
    29. // 生成条件变量
    30. @Override
    31. public Condition newCondition() {
    32. return mySync.newCondition();
    33. }
    34. }

    测试

    1. public static void main(String[] args) {
    2. MyLock myLock = new MyLock();
    3. new Thread(new Runnable() {
    4. @Override
    5. public void run() {
    6. myLock.lock();
    7. try {
    8. System.out.println("lock1()");
    9. Thread.sleep(1000);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. } finally {
    13. System.out.println("unLock1()");
    14. myLock.unlock();
    15. }
    16. }
    17. }).start();
    18. new Thread(new Runnable() {
    19. @Override
    20. public void run() {
    21. myLock.lock();
    22. try {
    23. System.out.println("lock2()");
    24. Thread.sleep(500);
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. } finally {
    28. System.out.println("unLock2()");
    29. myLock.unlock();
    30. }
    31. }
    32. }).start();
    33. }

    ReentrantReadWriteLock

    ReentrantReadWriteLock 是读写锁。
    读写锁 - 读读可以并发,读 - 写互斥,写 - 写互斥。

    使用示例

    提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。 ```java public class Main { public static void main(String[] args) {

    1. DataContainer dataContainer = new DataContainer();
    2. new Thread(() -> {
    3. try {
    4. dataContainer.read();
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. }, "t1").start();
    9. new Thread(() -> {
    10. try {
    11. dataContainer.read();
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }, "t2").start();

    } }

class DataContainer {

  1. private int data;
  2. private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
  3. private ReentrantReadWriteLock.ReadLock rLock = rw.readLock();
  4. private ReentrantReadWriteLock.WriteLock wLock = rw.writeLock();
  5. public int read() throws InterruptedException {
  6. System.out.println("获取读锁");
  7. rLock.lock();
  8. try {
  9. System.out.println("读取数据");
  10. Thread.sleep(1000);
  11. return data;
  12. } finally {
  13. System.out.println("释放读锁");
  14. rLock.unlock();
  15. }
  16. }
  17. public void write() throws InterruptedException {
  18. System.out.printf("获取写锁");
  19. wLock.lock();
  20. try {
  21. System.out.println("写入数据");
  22. Thread.sleep(1000);
  23. } finally {
  24. System.out.printf("", "释放写锁");
  25. wLock.unlock();
  26. }
  27. }

}

  1. <a name="XN7mG"></a>
  2. ## 注意事项
  3. 读锁不支持条件变量<br />重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待<br />即支持锁降级,但不支持锁升级。
  4. <a name="bX4Wc"></a>
  5. # Semaphore
  6. Semaphore 信号量,可以用来限制能同时访问共享资源的线程上限。
  7. ```java
  8. public class Main {
  9. public static void main(String[] args) {
  10. Semaphore semaphore = new Semaphore(3);
  11. // 实现的效果:一次最多只能有 3 个线程打印 running
  12. for (int i = 0; i < 10; i++) {
  13. new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. // 获得许可,可用许可数量 - 1
  18. semaphore.acquire();
  19. System.out.println("running");
  20. Thread.sleep(10000);
  21. System.out.println("end");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } finally {
  25. // 释放许可,可用许可数量 + 1
  26. semaphore.release();
  27. }
  28. }
  29. }).start();
  30. }
  31. }
  32. }

CountdownLatch

CountdownLatch 可以用来进行线程同步协作,等待所有线程完成倒计时。

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. // 设置计数值
  4. CountDownLatch countDownLatch = new CountDownLatch(3);
  5. for (int i = 0; i < 3; i++) {
  6. new Thread(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. System.out.println("running");
  11. Thread.sleep(1000);
  12. System.out.println("end");
  13. // 让计数值 - 1
  14. countDownLatch.countDown();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. }
  21. // 等待计数值为 0 再继续执行后面的代码
  22. countDownLatch.await();
  23. System.out.println("主线程执行");
  24. }
  25. }

CyclicBarrier

[ˈsaɪklɪk ˈbæriɚ] CyclicBarrier 循环栅栏,用来进行线程协作,等待线程满足某个计数。
构造时设置『计数值』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当计数值达到设置值时,继续执行。

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  4. for (int i = 0; i < 6; i++) {
  5. new Thread(new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. // 等待计数值达到设定值再继续执行后面的代码
  10. // 并且可以循环多次使用
  11. cyclicBarrier.await();
  12. System.out.println("running");
  13. Thread.sleep(1000);
  14. System.out.println("end");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } catch (BrokenBarrierException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }).start();
  22. }
  23. }
  24. }

ConcurrentHashMap