线程同步

线程同步的背景:如果多个线程同时读写一个共享变量,会出现数据不一致的问题。
多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:

这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。
可见,保证一段代码的原子性就是通过加锁和解锁实现的。Java程序使用synchronized关键字对一个对象进行加锁:

  1. synchronized(lock) {
  2. n = n + 1;
  3. }
  1. class DecThread extends Thread {
  2. public void run() {
  3. for (int i=0; i<10000; i++) {
  4. synchronized(Counter.lock) {
  5. Counter.count -= 1;
  6. }
  7. }
  8. }
  1. synchronized(Counter.lock) { // 获取锁
  2. ...
  3. } // 释放锁

使用synchronized解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized会降低程序的执行效率。
synchronized用法:

  1. 找出修改共享变量的线程代码块(临界区);
  2. 选择一个共享实例作为锁;
  3. 使用synchronized(lockObject) { ... }

注意:在期望两个线程同步时,使用synchronized方法中的锁参数必须是同一个对象,因为JVM只保证同一个锁在任意时刻只能被一个线程获取,但两个不同的锁在同一时刻可以被两个线程分别获取。

同步方法

当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:

  1. public void add(int n) {
  2. synchronized(this) { // 锁住this
  3. count += n;
  4. } // 解锁
  5. }
  6. public synchronized void add(int n) { // 锁住this
  7. count += n;
  8. } // 解锁

因此,用synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁。
注意:当对静态方法添加synchronized修饰符时,它的锁对象为该类的Class实例,一下两种方法是等价的。

  1. public synchronized static void test(int n) {
  2. ...
  3. }
  4. public class Counter {
  5. public static void test(int n) {
  6. synchronized(Counter.class) {
  7. ...
  8. }
  9. }
  10. }

死锁

锁的目的是保证不同线程间,不能调用统一方法。但是对于统一线程,是否可以调用同一锁的不同方法?答案是肯定的。JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁。
关于死锁:

  1. public void add(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value += m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another += m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }
  9. public void dec(int m) {
  10. synchronized(lockB) { // 获得lockB的锁
  11. this.another -= m;
  12. synchronized(lockA) { // 获得lockA的锁
  13. this.value -= m;
  14. } // 释放lockA的锁
  15. } // 释放lockB的锁
  16. }

在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。对于上述代码,线程1和线程2如果分别执行add()dec()方法时:

  • 线程1:进入add(),获得lockA
  • 线程2:进入dec(),获得lockB

随后:

  • 线程1:准备获得lockB,失败,等待中;
  • 线程2:准备获得lockA,失败,等待中。

此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。死锁无解决方法,只能强行结束进程。
避免方法,应保证不同线程间加锁的顺序一致,将dec()方法改写如下

  1. public void dec(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value -= m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another -= m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }

wait()和notify()

光靠synchronized无法解决所有多线程协调问题。例如:

  1. class TaskQueue {
  2. Queue<String> queue = new LinkedList<>();
  3. public synchronized void addTask(String s) {
  4. this.queue.add(s);
  5. }
  6. public synchronized String getTask() {
  7. while (queue.isEmpty()) {
  8. }
  9. return queue.remove();
  10. }
  11. }

在以上代码中,实际上getTask()中的while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。故该线程会因为死循环占用CPU资源。
此时我们可以在while()中添加方法:

  1. public synchronized String getTask() {
  2. while (queue.isEmpty()) {
  3. // 释放this锁:
  4. this.wait();
  5. // 重新获得this锁:
  6. }
  7. return queue.remove();
  8. }

this.wait()表示线程进入等待状态,注意wait()方法必须在当前获取的锁对象上调用,这里获取的是this锁,因此调用this.wait()
调用wait()方法后,线程进入等待状态,wait()方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后(notify),wait()方法才会返回,然后,继续执行下一条语句。
进入wait状态后,应在其他线程中使用相同对象锁上调用notify()方法:

  1. public synchronized void addTask(String s) {
  2. this.queue.add(s);
  3. this.notify(); // 唤醒在this锁等待的线程
  4. }

注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),从而使得等待线程从this.wait()方法返回。
注意:我们在while()循环中调用wait(),而不是if语句中调用,原因是在等待的这段时间内,无论是while还是if,其中因为变量的改变,导致判断的结果可能发生了变化,因此需要重新再判断一次需要用while()循环。

使用ReentrantLock

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。
java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:

  1. public class Counter {
  2. private int count;
  3. public void add(int n) {
  4. synchronized(this) {
  5. count += n;
  6. }
  7. }
  8. }

如果用ReentrantLock替代,可以把代码改造为:

  1. public class Counter {
  2. private final Lock lock = new ReentrantLock();
  3. private int count;
  4. public void add(int n) {
  5. lock.lock();
  6. try {
  7. count += n;
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. }

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。
顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。
synchronized不同的是,ReentrantLock可以尝试获取锁:

  1. if (lock.tryLock(1, TimeUnit.SECONDS)) {
  2. try {
  3. ...
  4. } finally {
  5. lock.unlock();
  6. }
  7. }

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。
所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

使用Condition


使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。
但是,synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写waitnotify的功能呢?
答案是使用Condition对象来实现waitnotify的功能。
我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLockCondition来实现:

  1. class TaskQueue {
  2. private final Lock lock = new ReentrantLock();
  3. private final Condition condition = lock.newCondition();
  4. private Queue<String> queue = new LinkedList<>();
  5. public void addTask(String s) {
  6. lock.lock();
  7. try {
  8. queue.add(s);
  9. condition.signalAll();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. public String getTask() {
  15. lock.lock();
  16. try {
  17. while (queue.isEmpty()) {
  18. condition.await();
  19. }
  20. return queue.remove();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. }

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

  1. if (condition.await(1, TimeUnit.SECOND)) {
  2. // 被其他线程唤醒
  3. } else {
  4. // 指定时间内没有被其他线程唤醒
  5. }

可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

ReadWriteLock

在同步的实际情况中,一般允许多个线程能够读取数据,但当只要有一个线程在写,其他进程必须等待,不能读和写。

|

| 读 | 写 | | —- | —- | —- |

| 读 | 允许 | 不允许 |

| 写 | 不允许 | 不允许 |

这时使用ReadWriteLock可以解决这个问题:

  • 只允许一个线程写入(此时其他进程既不能读也不能写)
  • 没有写入时,多个线程允许同时读(提高性能)

ReadWriteLock实现起来也非常容易:

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

StampedLock

当深入分析ReadWriteLock析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock
StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

使用Concurrent集合


之前通过ReentrantLockCondition实现了一个BlockingQueue:
因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue
除了BlockingQueue外,针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

| interface | non-thread-safe(线程不安全) | thread-safe(线程安全) | | —- | —- | —- |

| List | ArrayList | CopyOnWriteArrayList |

| Map | HashMap | ConcurrentHashMap |

| Set | HashSet / TreeSet | CopyOnWriteArraySet |

| Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |

| Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |

这些集合的使用与非线程安全的集合类完全相同,线程安全的集合类实现同步和加锁解锁都在集合内部实现,对外部调用者,只需要正常按接口引用。
此外java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

使用Atomic


concurrent除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic
我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

CAS:Compare and Set。若是自己通过CAS编写incrementAndGet()如下所示:

public int incrementAndGet(AtomicInteger var) {
    int prev, next;
    do {
        prev = var.get();
        next = prev + 1;
    } while ( ! var.compareAndSet(prev, next));
    return prev;
}

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

线程池


创建线程需要操作系统资源,如果对于服务器经常频繁创建线程是十分消耗资源的。
那么我们就可以把很多小任务让一组线程来这些,而不是一个任务对应一个新线程。这种接受大量小任务并进行分发处理的就是线程池。
Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

public class Main {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池:
        ExecutorService es = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 6; i++) {
            es.submit(new Task("" + i));
        }
        // 关闭线程池:
        es.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("start task " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("end task " + name);
    }
}

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
  • SingleThreadExecutor:仅单线程执行的线程池。

在线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池时,会等待正在执行的任务执行完再关闭。而shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

ScheduledThreadPool


还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。
创建一个ScheduledThreadPool仍然是通过Executors类:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

│░░░░   │░░░░░░ │░░░    │░░░░░  │░░░  
├───────┼───────┼───────┼───────┼────>
│<─────>│<─────>│<─────>│<─────>│

而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

│░░░│       │░░░░░│       │░░│       │░
└───┼───────┼─────┼───────┼──┼───────┼──>
    │<─────>│     │<─────>│  │<─────>│

因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

使用Future


在执行多任务时,如果我们需要一个异步的返回结果,就要用到Java标准库里的Callable接口

class Task implements Callable<String> {//泛型,可指定返回的类型
    public String call() throws Exception {
        return longTimeCalculation(); 
    }
}

如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞

一个Future接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

    使用ThreadLocal


ThreadLocal的背景:
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

public void process(User user) {
    checkPermission();
    doWork();
    saveStatus();
    sendResponse();
}

然后,通过线程池去执行这些任务。
观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

public void process(User user) {
    checkPermission(user);
    doWork(user);
    saveStatus(user);
    sendResponse(user);
}

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

void doWork(User user) {
    queryStatus(user);
    checkStatus();
    setNewStatus(user);
    log();
}

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。
ThreadLocal实例通常总是以静态字段初始化如下:

static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

void processUser(user) {
    try {
        threadLocalUser.set(user);
        step1();
        step2();
    } finally {
        threadLocalUser.remove();
    }
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

void step1() {
    User u = threadLocalUser.get();
    log();
    printUser();
}

void log() {
    User u = threadLocalUser.get();
    println(u.name);
}

void step2() {
    User u = threadLocalUser.get();
    checkUser(u.id);
}

注意到普通的方法调用一定是同一个线程执行的,所以,step1()step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。
实际上,可以把ThreadLocal看成一个全局Map:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。
最后,特别注意ThreadLocal一定要在finally中清除:

try {
    threadLocalUser.set(user);
    ...
} finally {
    threadLocalUser.remove();
}