竞争条件
两个或以上的线程对操作同一个对象,最终的结果又取决于之前操作的顺序,这时候就出现了竞争条件。例如两个线程同时执行指令:
count++。
问题在于这是一个非原子操作。该指令可能被处理如下:
1) 将count加载到寄存器(读取count)
2) 增加1
3) 将结果回写到count(回写到内存)
如果线程A执行完1,2两步后被剥夺了运行权,线程B被唤醒并成功将count加1。然后线程A被唤醒继续执行第3步。最终的结果是count增加了1,而我们需要的结果是将count加2。
在竞争条件下,为了保证业务逻辑的正确性,我们需要使用锁来实现串行化执行。除了直接使用 synchronized
关键字实现锁外,并发编程API还提供了Lock
接口。Lock接口提供了一系列方法用于提供出色的锁控制,因此相比于直接的监控,功能更丰富。
jdk提供了多种锁实现。
ReentrantLock
ReentrantLock
类与synchronized
关键字一样具备互斥锁的行为,但是有许多扩展的功能。正如名字所言,ReentrantLock
具有可重入性,这点与synchronized
相同。
我们使用ReentrantLock
来实现并发环境下的计数器:
ReentrantLock lock = new ReentrantLock();
int count = 0;
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
通过 lock()
上锁, unlock()
释放锁。很重要的一点是,必须将代码包含在 try/finally
块中,确保异常情况下锁可以正常释放。当多个线程同时执行该方法,只有一个线程能够进入lock保护的代码块,并且,只有这个线程释放了锁,其他线程才有机会进入受保护代码块。已获得锁的线程可以再次进入。
锁还支持其他丰富的功能,如下所示
ExecutorService executor = Executors.newFixedThreadPool(2);
ReentrantLock lock = new ReentrantLock();
executor.submit(() -> {
lock.lock();
try {
sleep(1);
} finally {
lock.unlock();
}
});
executor.submit(() -> {
System.out.println("Locked: " + lock.isLocked());
System.out.println("Held by me: " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
});
stop(executor);
当第一个任务持有锁1秒钟,第二个任务可以通过这几个api(isLocked, isHeldByCurrentThread, tryLock)检测锁的状态:
Locked: true
Held by me: false
Lock acquired: false
不同于lock()方法,tryLock()方法尝试获取锁,但不会使当前线程阻塞,我们需要根据返回但boolean值来判断是否已经获得锁。
条件对象
通常,线程进入临界区,却发现在某一条件满足之后它才能执行。要使用一个条件对象来管理那些已经获得了一个锁但是却不能做有用工作的线程。在上述银行转账的例子中,我们要避免选择没有足够余额的账户来作为转出账户。我们又不能使用如下的代码来进行控制:
if (bank.geBalance(from) > amount) {
bank.transfer(from, to, amount);
}
因为当前线程完全可能在完成判断之后且在调用transfer方法之前被中断。在线程再次运行前,账户余额可能已经低于提款金额。必须确保没有其他线程在本检查余额和转账活动之间修改余额。通过锁来保护检查和转账动作来做到这一点:
public void transfer(int from, int to, int amount) {
bankLock.lock();
try {
while (accounts[from] < amount) {
// wait
// ...
}
// transfer funds
// ...
}
finally {
bankLock.unlock();
}
}
当账户中没有足够的余额时,当前线程应该做什么呢?等待另一个线程向账户中转入资金。但是当前线程刚刚获得了bankLock的排他性访问,其他线程根本没有机会进行存款操作。这就是我们需要条件对象的原因。
一个锁对象可以有一个或多个条件对象。可以使用newCondition()方法获得一个条件对象。
public class Bank {
private final double[] accounts;
private Lock bankLock = new ReentrantLock();
private Condition sufficientFunds;
public Bank(int n, double initialBalance) {
accounts = new double[n];
for (int i=0; i<n; i++) {
accounts[i] = initialBalance;
}
sufficientFunds = bankLock.newCondition();
}
public void transfer(int from, int to, int amount) throws InterruptedException {
bankLock.lock();
try {
while (accounts[from] < amount) {
sufficientFunds.await();
}
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf("%10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf("Total Balance: %10.2f%n", getTotalBalance());
sufficientFunds.signalAll();
}
finally {
bankLock.unlock();
}
}
public double getTotalBalance() {
bankLock.lock();
try {
double sum = 0;
for (double a : accounts) {
sum += a;
}
return sum;
}
finally {
bankLock.unlock();
}
}
}
当前线程调用了sufficientFunds.await()方法后阻塞了,并放弃了锁。这样另一个线程有机会获得锁来进行转账操作。
调用了await方法的线程和普通的等待获取锁的线程有本质区别。一旦一个线程调用了await方法,该线程进入该条件的等待集。当锁可用时,该线程不能立刻解除阻塞。直到另一个线程调用了同一个条件对象的signalAll方法为止。通常在对象的状态往有利方向改变时调用signalAll方法,本例中,一旦有转账发生,则调用signalAll方法。另一个方法signal则是随机解除等待集中某个线程的组赛状态。这更有效但是也更有风险,如果随机选择的一个线程发现自己仍然不能运行,它再次阻塞,如果没有其他线程来再次调用signal,那么系统就死锁了。
ReadWriteLock
ReadWriteLock
接口实现了另一种锁操作:用于读和写操作的一对锁。读写锁的核心思想是:如果没有线程更改某个变量,那么对这个变量的并发读操作是安全的。所以如果写锁没有被持有,读锁可以同时被多个线程持有。在读比写更频繁的情况下,读写锁可以有效提升性能和吞吐量
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
});
上面这个示例中,首先获取一个写锁,往sleep 1秒后往map中放入一个值。在这个任务结束之前,另外两个任务试图从map中读数据,并sleep 1秒
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.readLock().unlock();
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
当你执行这段代码的时候,你会发现,两个读任务必须等待写任务结束后才能开始;写任务结束后,两个读任务并行执行,并同时向控制台输出结果,他们不需要互相等待对方结束,因为写锁在不被持有的情况下,读锁可以同时被多个线程持有。
StampedLock
Java 8 提供的 StampedLock
同样支持读写锁。与 ReadWriteLock
不同的是, StampedLock
的锁操作返回一个long类型的标签,你可以使用这些标签来释放或者检测锁是否有效。标签锁还支持另一种锁模式:乐观锁。
让我们用 StampedLock
来代替 ReadWriteLock
重写上面的例子
ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.writeLock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.unlockWrite(stamp);
}
});
Runnable readTask = () -> {
long stamp = lock.readLock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.unlockRead(stamp);
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
调用 readLock()
或 writeLock()
返回一个标签,用作后续释放锁。请记住,标签锁没有实现可重入特性。每一次调用返回一个新的标签,如果锁已经被持有,则阻塞,即使该线程已经获得了锁。
这段代码的执行效果与ReadWriteLock
示例一样。
接下来的示例展示乐观锁:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(1);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(2);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
} finally {
lock.unlock(stamp);
}
});
executor.submit(() -> {
long stamp = lock.writeLock();
try {
System.out.println("Write Lock acquired");
sleep(2);
} finally {
lock.unlock(stamp);
System.out.println("Write done");
}
});
stop(executor);
通过调用tryOptimisticRead()
方法可以获得一个乐观读锁,返回一个标签,且不阻塞线程,无论锁是否可获得。如果已经有一把激活的写锁,则返回的标签等于0。你可以通过调用lock.validate(stamp)
来检查标签是否有效。
执行上述代码得到以下输出:
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
与普通读锁对比,乐观读锁不会阻止其他线程立刻获得写锁(普通情况下读写锁不能被同时获得)。当第一个线程获得乐观读锁并睡了1秒后,第二个线程无需等待乐观读锁释放,就可以立刻获得写锁,即刻起乐观读锁失效,即使写锁被释放了,该乐观读锁依然无效。
因此当你使用乐观锁时,每次访问共享可变变量后,你都要验证读锁是否依然有效。
有时候,我们需要将一个读锁转换为写锁,而不需要重新释放和获取锁。 StampedLock
提供了tryConvertToWriteLock()
方法来实现这一目的:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.readLock();
try {
if (count == 0) {
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
stamp = lock.writeLock();
}
count = 23;
}
System.out.println(count);
} finally {
lock.unlock(stamp);
}
});
stop(executor);
该任务首先获得一个读锁,并将 count
变量的当前值输出到控制台。但是,如果当前值是0的话,我们希望将其设置为23。我们首先尝试将读锁转换为写锁,不中断其他线程的并发访问。调用 tryConvertToWriteLock()
不会阻塞,但是可能返回0,代表当前没有写锁可用,这种情况下,我们调用 writeLock()
方法,阻塞当前线程直到有写锁可用。
Semaphores
并发API还支持计数信号量。普通锁提供对变量或资源的互斥访问,然而信号量则用于操作一组访问权限。这可以用作在许多场景中,限制应用某个部分的并发量。
ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
boolean permit = false;
try {
permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
if (permit) {
System.out.println("Semaphore acquired");
sleep(5);
} else {
System.out.println("Could not acquire semaphore");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
if (permit) {
semaphore.release();
}
}
}
IntStream.range(0, 10)
.forEach(i -> executor.submit(longRunningTask));
stop(executor);
执行器可能并发执行10个任务,但是我们设置信号量的大小为5,因此并发数被限制为5。切记要使用try/finally
代码块以释放信号量。
执行上述代码获得以下输出:
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
死锁
死锁发生在两个或以上的线程获取其它线程正持有的锁时。当多个线程同时需要同样的锁,但是以不同的顺序来获取它们,就有可能造成死锁。
产生死锁的四个条件:
- 互斥条件:一段时间内,某资源只能由一个线程占用
- 占有且等待条件:线程已经持有至少一个资源,但又提出了新的资源请求,而该资源已经其它线程占有
- 非抢占条件:线程已经获得的资源,在未使用完之前,不能被剥夺
- 循环等待条件:存在一个封闭的等待链,链中的每个线程至少持有一个此链中下一个线程所需要的资源
举个例子,如果线程1获得了锁A,并且尝试获取锁B,而线程2已经获得锁B,并试图获得锁A,死锁就诞生了。线程1永远无法获得锁B,同样线程2永远无法获得锁A。并且,它们各自都不知道,依然拿着自己的锁不放。
Thread 1 locks A, waits for B
Thread 2 locks B, waits for A
以下是一个例子,TreeNode类的不同实例调用synchronized方法
public class TreeNode {
TreeNode parent = null;
List children = new ArrayList();
public synchronized void addChild(TreeNode child){
if(!this.children.contains(child)) {
this.children.add(child);
child.setParentOnly(this);
}
}
public synchronized void addChildOnly(TreeNode child){
if(!this.children.contains(child){
this.children.add(child);
}
}
public synchronized void setParent(TreeNode parent){
this.parent = parent;
parent.addChildOnly(this);
}
public synchronized void setParentOnly(TreeNode parent){
this.parent = parent;
}
}
如果线程1调用了parent.addChild(child) 方法,同时线程2调用了child.setParent(parent)方法,这里的parent和child是同一个,那么死锁就有可能发生。
Thread 1: parent.addChild(child); //locks parent
--> child.setParentOnly(parent);
Thread 2: child.setParent(parent); //locks child
--> parent.addChildOnly()
首先线程1调用parent.addChild(child),由于addChild()方法是synchronized修饰的,这样一来,线程1就锁住了parent对象(其它线程无法调用parent的同步方法,因为其内部锁已经被线程1占有了)。
接着线程2调用child.setParent(parent),由于setParent()也是synchronized修饰的,这样一来,线程2就锁住了child对象(其它线程无法调用child的同步方法,因为其内部锁已经被线程2占有了)。
现在,parent和child对象同时被两个线程分别锁住了。接下来,线程1想调用child.setParentOnly()方法,但是child对象被线程2锁住了,所以线程1阻塞了。同样线程2想调用parent.addChildOnly()方法,而parent被线程1锁住了,线程2也只能阻塞。于是,线程1和线程2就都在阻塞等待获取对方持有的锁。
注意,这两个线程必须同时调用parent.addChild(child) 和 child.setParent(parent) ,才会造成死锁。以上代码也许能完好地执行很长时间,直到在突然的一瞬间,死锁发生了。
再次强调,这两个线程必须同时获取锁才会造成死锁。举例,如果线程1比线程2稍早一点,锁住了parent和child,那么线程2就没有机会获得child的内部锁,死锁就不会发生。由于线程调度是无法预测的,因此没有办法预测死锁会在什么时候发生,我们只能知道,它可能发生。
更复杂的死锁
死锁可能包含多于两个线程的场景。这让其更难被预知。以下是四个线程导致死锁的例子:
Thread 1 locks A, waits for B
Thread 2 locks B, waits for C
Thread 3 locks C, waits for D
Thread 4 locks D, waits for A
线程1等待线程2,线程2等待线程3,线程3等待线程4,线程4等待线程1。
数据库死锁
一个更复杂的死锁场景是数据库事务。一个数据库事务可能包含多个sql更新请求。当一条记录在一个事物中被更新,这条记录被上锁,以阻止其它事务更改它,直到第一个事务结束。因此同一个事务中的每个请求都可能锁上数据库中的某些记录。
如果多个事务同时执行,都想更新同样的记录,那么就有死锁的风险。
举例
Transaction 1, request 1, locks record 1 for update
Transaction 2, request 1, locks record 2 for update
Transaction 1, request 2, tries to lock record 2 for update.
Transaction 2, request 2, tries to lock record 1 for update.
由于锁是在不同的请求中获取的,不是所有的锁都能事先被感知,很难检测或阻止数据库事务中的死锁
如何防止死锁
锁排序
死锁发生在多个线程以不同的顺序获取锁。如果能够确保任意线程都以相同的方式获取锁,死锁就不会发生。举例:
Thread 1:
lock A
lock B
Thread 2:
wait for A
lock C (when A locked)
Thread 3:
wait for A
wait for B
wait for C
如果一个线程,如thread 3,需要获取多个锁,它必须以指定的顺序来获取它们。它不能跳过锁序列中前面的锁,而直接获取后面的锁。
thread 2和thread 3在获取锁A之前,都不可能锁C。而由于thread 1获得了锁A,thread 2和thread 3都必须等待,直到thread 2释放了锁A。然后它们才能获得锁A,继而才能尝试获得锁C。
锁排序是一种简单且有效的防止死锁的方式。但是,只有在你事先知道所有需要的锁时才能使用锁排序。事情往往并没有那么简单。
锁超时
另外一个死锁防止机制是,在尝试获取锁时加上超时时间。这意味着,一个线程在尝试获取一把锁时,最多尝试这么长时间。如果该线程在经过这么长时间的尝试之后,依然没有获得锁,它将回头,释放所有已持有的锁,经过一段随机时间的等待后再次尝试。这个随机等待时间,使其它等待获取这些相同锁的线程有机会成功,从而使应用得以继续运行。
以下是一个关于两个线程试图以不同的顺序获取两把相同的锁,且线程回首重试的例子
Thread 1 locks A
Thread 2 locks B
Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked
Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.
Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.
在以上例子中,线程2大约在线程1重试获取锁B之前200ms开始重试获取锁A,因此线程2能够成功获得2把锁。当线程2完成并释放了两把锁后,线程1也有机会获得它们。
需要记住的是,使用超时锁,并不仅仅是为了防止死锁,也适用于被锁的代码块需要很长处理时间的场景。
另外,如果有足够多的线程竞争相同的资源,即使添加了超时和回头机制,它们也有可能一遍又一遍地重试。对于2个线程、等待0~500毫秒的场景,这种情况也许不会发生,但是对于10或20个线程,情况就不同了,这时最好让每个线程在重试之前等待更多的时间,一个通用的算法是让等待时间与重试次数成正比。
锁超时机制的一个问题是,无法给synchronized块设置超时时间。你必须创建一个自定义的锁类,或者使用java.util.concurrency包中已内置的锁类。以下是使用锁超时机制的代码示例
public static void main(String[] args) throws Exception {
final ReentrantLock lock1 = new ReentrantLock();
final ReentrantLock lock2 = new ReentrantLock();
Runnable try1_2 = getRunnable(lock1, "lock 1", lock2, "lock 2");
Runnable try2_1 = getRunnable(lock2, "lock 2", lock1, "lock 1");
new Thread(try1_2).start();
new Thread(try2_1).start();
}
private static Runnable getRunnable(final ReentrantLock lock1, final String lock1Name, final ReentrantLock lock2, final String lock2Name) {
return new Runnable() {
@Override
public void run() {
try {
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(lock1Name + " acquired in thread " + Thread.currentThread());
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(lock2Name + " acquired in thread " + Thread.currentThread());
Thread.sleep(2000);
} else {
System.out.println("Could not acquire "+lock2Name + " in thread " + Thread.currentThread());
lock1.unlock();
System.out.println(lock1Name + " released in thread " + Thread.currentThread());
}
} else {
System.out.println("Could not acquire " + lock1Name + " in thread " + Thread.currentThread());
}
} catch (InterruptedException e) {
//you should not ignore it
} finally {
if (lock1.isHeldByCurrentThread()) lock1.unlock();
if (lock2.isHeldByCurrentThread()) lock2.unlock();
}
}
};
}
死锁检测
死锁检测是一个更重的死锁防止机制,旨在针对无法进行锁排序,且锁超时不可行的场景。
每当一个线程获得了一把锁,在一个数据结构(如map或者graph)中记录线程和锁。另外,每当一个线程请求一把锁,也记录到这个数据结构中。
当一个线程请求一把锁,但是请求被拒绝时,该线程可以浏览这个锁图来检测是否有死锁。举例,如果线程A请求锁7,但是锁7已经被线程B持有,那么线程A可以检查线程B是否已经请求了线程A已经持有的锁。如果线程B确实请求了这样的一把锁,那么死锁已经发生了。
当然,死锁的剧情可能比两个线程互相持有对方请求的锁复杂得多。线程A可能在等待线程B,线程B等待线程C,线程C等待线程D,线程D等待线程A。线程A为了检测死锁,必须检测B请求的所有锁,逐个排查线程B请求的锁正在被哪个线程占用,最终会找到线程C,而排查线程C请求的锁,可以找到线程D,排查线程D请求的锁,最终发现其被线程A占有,于是就能发现死锁已经发生了。
下图展示了锁被4个线程(A,B,C,D)占有和请求的场景。这样一个数据结构可以用作死锁检测
那么,当死锁被检测到后,线程又该怎么做呢?
一个可能的做法是,释放所有锁,回退,随机等待一段时间后再试。但是,如果有许多线程都在竞争同样的锁,可能导致重复出现死锁,即便实施了回退和等待。
更好的选择是,给这些线程赋予优先级,使得只有一个或少部分线程回退,剩下的线程继续尝试获取需要的锁。为了避免线程被分配到优先级始终是固定的,我们可以在检测到死锁的时候,随机分配优先级。
自旋锁
线程通过 busy-wait-loop 方式来获取锁, 任何时刻只有一个线程能够获得锁, 其它线程忙等待直到获得锁。
应用场景
spinlock 不会有线程状态切换,所以响应更快。
使用spinlock时, 临界区要尽量短,不要有显示或隐式的系统调用。如读写文件等操作。
临界区:指的是一个访问共用资源(例如:共用设备或是共用存储器)的程序片段,当有线程进入临界区段时,其他线程或是进程必须等待
当存在大量线程,竞争激烈时,不适合使用自旋锁,因为每个线程都在在执行,占用CPU时间。
Java 实现自旋锁
本例子使用了CAS原子操作,非公平锁, 即获得锁的先后顺序,不会按照进入lock的先后顺序进行。
package concurrent.lock;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock implements Lock{
private AtomicReference<Thread> sign = new AtomicReference<Thread>();
public void lock(){
Thread current=Thread.currentThread();
while(!sign.compareAndSet(null, current)){
}
}
public void unlock(){
Thread current=Thread.currentThread();
sign.compareAndSet(current, null);
}
}
使用ReentrantLock的tryLock是更好的自旋锁方案?