ReentrantLock(重入锁)
基本使用
Lock lock = new ReentrantLock();lock.lock();lock.unlock();
原理
Sync extends AbstractQueuedSynchronizer{}class FairSync(公平锁) extends Sync{}NonfairSync(非公平锁) extends Sync{}
AbstractQueuedSynchronizer提供了两种锁
1.独占锁
2.共享锁
state = 0 表示当前资源空闲
state>0 表示当前资源抢占到锁还没释放

1.线程A 判断 state 状态为 0 获得锁 并且将 state 设置成了1 并且将自己设置为获得锁的线程
2.线程B 在去判断state状态为 1 获取不到锁 然后会构造一个阻塞队列 双向链表结构 从尾部插入 如果没有头节点则构建一个头节点 然后将当前节点的pred 指向头节点 将头节点的 next 指向当前节点 并进行自旋 尝试去获取锁 如果通过一次自旋获取不到锁 则进行阻塞 线程C
3.线程C也去判断state 状态是1 获取不到锁 然后将自己从尾部插入阻塞队列 将C节点的前一个pred 指向B节点 B节点的next指向C节点 并自旋
4.线程A释放锁 并将state-1 当state 成0 将 当前获取锁的线程设置为null 然后会去唤醒处于阻塞队列中头节点的下一个节点 此时如果是公平锁的话 B节点会获取到锁 但是如果是非公平锁的情况下 如果此时有线程D抢占到锁 则线程B还是依然会在阻塞队列中等待
加锁
public void lock() {sync.lock();}
final void lock() {//cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程idif (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else//抢占锁逻辑acquire(1);}
public final void acquire(int arg) {//尝试获取独占锁if (!tryAcquire(arg) &&//如果失败则假如aqs 队列中acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
非公平
seata +1
final boolean nonfairTryAcquire(int acquires) {//获取当前线程final Thread current = Thread.currentThread();、//拿到 State 值int c = getState();//如果是0 表示可以去获得锁if (c == 0) {//cas 原子操作修改state 的值 如果能修改成功则把0变成1 然后记录当前线程idif (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//如果当前线程就是获得锁的线程else if (current == getExclusiveOwnerThread()) {//增加重入次数int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
构建链表
private Node addWaiter(Node mode) {//构建一个nodeNode node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// tail = 尾节点 默认是nullNode pred = tail;if (pred != null) {//如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点node.prev = pred;if (compareAndSetTail(pred, node)) {//把上一个节点的next 指针指向刚进来的节点pred.next = node;return node;}}enq(node);return node;}
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize//如果尾节点 = = null 用cas 构建一个节点if (compareAndSetHead(new Node()))//把头节点赋值给尾节点tail = head;} else {//如果尾节点不等于空 把当前节点当成尾节点 然后把prev指针指向上一个节点 把新进来的节点改成尾节点node.prev = t;if (compareAndSetTail(t, node)) {//把上一个节点的next 指针指向刚进来的节点t.next = node;return t;}}}}
加入阻塞队列
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//获取当前节点的前一个节点final Node p = node.predecessor();//如果当前节点的前一个结点是头节点 则说明有资格去争夺锁if (p == head && tryAcquire(arg)) {//把当前节点设置成头节点setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//前置节点的 waitStatusint ws = pred.waitStatus;//等待其他前直接点的线程被释放if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*///说明可以挂起return true;// 大于0 说明prev节点取消了排队 直接移除这个节点if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/设置prev 节点状态为-1compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
阻塞等待
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
private void cancelAcquire(Node node) {// Ignore if node doesn't existif (node == null)return;node.thread = null;// Skip cancelled predecessors//当前节点的前一个节点Node pred = node.prev;//前一个节点的 waitStatus> 0 (结束状态)while (pred.waitStatus > 0)node.prev = pred = pred.prev;// predNext is the apparent node to unsplice. CASes below will// fail if not, in which case, we lost race vs another cancel// or signal, so no further action is necessary.Node predNext = pred.next;// Can use unconditional write instead of CAS here.// After this atomic step, other Nodes can skip past us.// Before, we are free of interference from other threads.//将 node.waitStatus 设置成 CANCELLED 状态node.waitStatus = Node.CANCELLED;//4. 如果node是tail,更新tail为pred,并使pred.next指向null// If we are the tail, remove ourselves.if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;//5. 如果node既不是tail,又不是head的后继节点//则将node的前继节点的waitStatus置为SIGNAL//并使node的前继节点指向node的后继节点if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {//6. 如果node是head的后继节点,则直接唤醒node的后继节点unparkSuccessor(node);}node.next = node; // help GC}}
Node的五种状态
CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
0:新结点入队时的默认状态。
释放锁
public void unlock() {sync.release(1);}
public final boolean release(int arg) {//释放锁成功if (tryRelease(arg)) {Node h = head;//如果头节点不为空 并且状态不为0if (h != null && h.waitStatus != 0)//唤醒unparkSuccessor(h);return true;}return false;}
protected final boolean tryRelease(int releases) {//state -1int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {//如果c =0 表示当前是无锁状态 把线程iq清空free = true;setExclusiveOwnerThread(null);}//重新设置 statesetState(c);return free;}
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)//设置head节点的状态为0compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*///拿到head节点的下一个节点Node s = node.next;//如果下一个节点为null 或者 status>0则表示是 CANCELLED 状态//听过尾部节点开始扫描 找到距离 head最近的一个 waitStatus<=0的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//如果next 节点不等于空直接唤醒这个线程if (s != null)LockSupport.unpark(s.thread);}
ReentrantReadWriteLock(读写锁)
读写分离减少锁的竞争
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();//获取读锁reentrantReadWriteLock.readLock();//获取写锁reentrantReadWriteLock.writeLock();
加锁
protected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*///拿到当前线程Thread current = Thread.currentThread();//拿到 Stateint c = getState();//从 State中查找当前获得写锁的线程数量因为写锁是互斥的如果获得锁个代表了重入int w = exclusiveCount(c);if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)if (w == 0 || current != getExclusiveOwnerThread())return false;//如果重入 判断重入次数是否大于65535次if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquiresetState(c + acquires);return true;}//c==0说明读锁和写锁都没有被线程获取if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}
state采用高低位分开存储读写和写锁 高16位存储读锁状态 低16位存储写锁状态
释放锁
protected final boolean tryRelease(int releases) {//是否等于当前线程if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc = getState() - releases;//递减锁次数boolean free = exclusiveCount(nextc) == 0;if (free)setExclusiveOwnerThread(null);//如果是0 释放成功setState(nextc);return free;}
共享锁
protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*///获取当前线程Thread current = Thread.currentThread();//stateint c = getState();//如果是写锁或独占锁的持有者不是当前线程 直接阻塞if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;//获取读锁的数量int r = sharedCount(c);//当前读锁不需要等待 并且 读锁数量不大于65535if (!readerShouldBlock() &&r < MAX_COUNT &&高低位增加读锁的数量compareAndSetState(c, c + SHARED_UNIT)) {//如果是第一次枷锁if (r == 0) {//第一次获得锁的线程就是当前线程firstReader = current;//重入次数是1firstReaderHoldCount = 1;} else if (firstReader == current) {firstReaderHoldCount++;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}//如果cas执行失败 尝试获取共享锁return fullTryAcquireShared(current);}
锁降级
从写锁降级为读锁 如果A线程获得了写锁 在写锁没有释放的情况下在去获得读锁是允许的
特性
StampedLock
StampedLock reentrantReadWriteLock = new StampedLock();//获取写锁reentrantReadWriteLock.writeLock();//获得悲观锁 不阻塞写操作reentrantReadWriteLock.tryOptimisticRead();//获得读锁reentrantReadWriteLock.readLock();
采用一个long 类型的state 变量来表示同步状态 long类型的长度是64位 这里将state拆成3个部分 低7位表述读锁的状态 当一个线程加了读锁时 就在低7位加1 也就是说 最多同时支持127个线程获得读锁 如果线程数超过127则使用readerOverflow进行记录 第八位用来存储写锁的状态 第9-64位存储stamp 记录写锁的状态变化每触发一次写锁 stamped 就会加1
自旋锁根据cpu核心数决定自旋的次数
