AQS
AQS 的概述
AQS 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
AQS 的特点是:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState:获取 state 状态
- setState:设置 state 状态
- compareAndSetState:CAS 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
-
AQS 实现不可重入锁示例
自定义同步器

// 自定义同步器class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}@Overrideprotected int tryAcquireShared(int arg) {return super.tryAcquireShared(arg);}@Overrideprotected boolean tryReleaseShared(int arg) {return super.tryReleaseShared(arg);}// 判断当前的线程是否已经获取到了锁,该线程是否正在独占资源@Overrideprotected boolean isHeldExclusively() {return getState() == 1;}@Overridepublic String toString() {return super.toString();}protected Condition newCondition() {return new ConditionObject();}}
自定义锁
// 使用自定义同步器实现自定义锁class MyLock implements Lock {MySync mySync = new MySync();// 尝试加锁,不成功进入等待队列@Overridepublic void lock() {mySync.acquire(1);}// 尝试加锁,不成功进入等待队列,可打断@Overridepublic void lockInterruptibly() throws InterruptedException {mySync.acquireInterruptibly(1);}// 尝试一次加锁,不成功返回,不进入队列@Overridepublic boolean tryLock() {return mySync.tryAcquire(1);}// 尝试加锁,不成功,进入等待队列,有时限@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return mySync.tryAcquireNanos(1, unit.toNanos(time));}// 释放锁@Overridepublic void unlock() {mySync.release(1);}// 生成条件变量@Overridepublic Condition newCondition() {return mySync.newCondition();}}
测试
public static void main(String[] args) {MyLock myLock = new MyLock();new Thread(new Runnable() {@Overridepublic void run() {myLock.lock();try {System.out.println("lock1()");Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("unLock1()");myLock.unlock();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {myLock.lock();try {System.out.println("lock2()");Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("unLock2()");myLock.unlock();}}}).start();}
ReentrantReadWriteLock
ReentrantReadWriteLock 是读写锁。
读写锁 - 读读可以并发,读 - 写互斥,写 - 写互斥。使用示例
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法。 ```java public class Main { public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();new Thread(() -> {try {dataContainer.read();} catch (InterruptedException e) {e.printStackTrace();}}, "t1").start();new Thread(() -> {try {dataContainer.read();} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();
} }
class DataContainer {
private int data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock rLock = rw.readLock();private ReentrantReadWriteLock.WriteLock wLock = rw.writeLock();public int read() throws InterruptedException {System.out.println("获取读锁");rLock.lock();try {System.out.println("读取数据");Thread.sleep(1000);return data;} finally {System.out.println("释放读锁");rLock.unlock();}}public void write() throws InterruptedException {System.out.printf("获取写锁");wLock.lock();try {System.out.println("写入数据");Thread.sleep(1000);} finally {System.out.printf("", "释放写锁");wLock.unlock();}}
}
<a name="XN7mG"></a>## 注意事项读锁不支持条件变量<br />重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待<br />即支持锁降级,但不支持锁升级。<a name="bX4Wc"></a># SemaphoreSemaphore 信号量,可以用来限制能同时访问共享资源的线程上限。```javapublic class Main {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);// 实现的效果:一次最多只能有 3 个线程打印 runningfor (int i = 0; i < 10; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {// 获得许可,可用许可数量 - 1semaphore.acquire();System.out.println("running");Thread.sleep(10000);System.out.println("end");} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放许可,可用许可数量 + 1semaphore.release();}}}).start();}}}
CountdownLatch
CountdownLatch 可以用来进行线程同步协作,等待所有线程完成倒计时。
public class Main {public static void main(String[] args) throws InterruptedException {// 设置计数值CountDownLatch countDownLatch = new CountDownLatch(3);for (int i = 0; i < 3; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {System.out.println("running");Thread.sleep(1000);System.out.println("end");// 让计数值 - 1countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}// 等待计数值为 0 再继续执行后面的代码countDownLatch.await();System.out.println("主线程执行");}}
CyclicBarrier
[ˈsaɪklɪk ˈbæriɚ] CyclicBarrier 循环栅栏,用来进行线程协作,等待线程满足某个计数。
构造时设置『计数值』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当计数值达到设置值时,继续执行。
public class Main {public static void main(String[] args) throws InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(3);for (int i = 0; i < 6; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {// 等待计数值达到设定值再继续执行后面的代码// 并且可以循环多次使用cyclicBarrier.await();System.out.println("running");Thread.sleep(1000);System.out.println("end");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}).start();}}}
