本章介绍一些同步工具

ReentrantLock

ReentrantLock 是一种可重入锁,是 Java 5 新引入的一种锁。synchronized 本身也是一种可重入锁。

它的使用方法是在需要实现同步的代码块开头使用 lock() 进行加锁,执行完毕之后使用 unlock() 释放锁。unlock() 方法要求放在 finally 块里,使得无论怎样锁最终都会被释放,否则会出现死锁的问题。

  1. public class ReentrantLockDemo{
  2. private int sum = 0;
  3. private Lock lock = new ReentrantLock();
  4. /**
  5. * 使用ReentrantLock实现同步
  6. */
  7. public void add(){
  8. //上锁
  9. lock.lock();
  10. try {
  11. Thread.sleep(10);
  12. sum++;
  13. System.out.println("add:" + sum);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } finally {
  17. //释放锁
  18. lock.unlock();
  19. }
  20. }
  21. }

ReentrantLock 需要手动加锁释放锁,而 synchronized 可以自动释放锁,不需要手动操作,避免了程序员忘记释放锁的情况,这是它们之间其中一个不同点。

那为什么我们还要用 ReentrantLock 呢?ReentrantLock 还是有一些功能强大的地方,如 tryLock() 方法。

tryLock() 方法使用时会对代码块进行尝试锁定,但是不管锁定与否,最终方法还是会继续执行,而 synchronized 如果锁定失败就会阻塞。

  1. public class ReentrantLockDemo{
  2. private int sum = 0;
  3. private Lock lock = new ReentrantLock();
  4. /**
  5. * 使用 tryLock 进行尝试锁定
  6. * 不管锁定与否,方法都会继续执行,因此这里是线程不安全的,sum的值不能保证
  7. */
  8. public void tryLock(){
  9. //尝试上锁,保存返回值
  10. boolean locked = lock.tryLock();
  11. System.out.println("tryLock: " + locked);
  12. try{
  13. Thread.sleep(10);
  14. sum++;
  15. System.out.println("tryLock: " + sum);
  16. }catch (InterruptedException e){
  17. e.printStackTrace();
  18. }finally {
  19. if(locked){
  20. //如果锁定成功了,释放锁
  21. lock.unlock();
  22. }
  23. }
  24. }
  25. }

lockInterruptibly() 这个方法实现了一个可以被打断的加锁,对 interrupt() 方法做出回应。即如果你一个线程拿到了锁之后无限 sleep() ,这时候你再使用 lock() 是无法打断前面那个线程的,其他线程也就永远拿不到那把锁。如果使用 lockInterruptibly() 方法,则调用线程的 interrupt() 方法即可打断它。

  1. public void interrupt(){
  2. Lock lock = new ReentrantLock();
  3. Thread thread1 = new Thread(() -> {
  4. try{
  5. lock.lock();
  6. System.out.println("thread 1 start");
  7. Thread.sleep(Integer.MAX_VALUE);
  8. System.out.println("thread 1 end");
  9. }catch (InterruptedException e){
  10. e.printStackTrace();
  11. }finally {
  12. lock.unlock();
  13. }
  14. });
  15. thread1.start();
  16. Thread thread2 = new Thread(() -> {
  17. try{
  18. //lock.lock();
  19. lock.lockInterruptibly();
  20. System.out.println("thread 2 start");
  21. Thread.sleep(2);
  22. System.out.println("thread 2 end");
  23. }catch (InterruptedException e){
  24. e.printStackTrace();
  25. }finally {
  26. lock.unlock();
  27. }
  28. });
  29. thread2.start();
  30. try {
  31. Thread.sleep(20);
  32. }catch (InterruptedException e){
  33. e.printStackTrace();
  34. }
  35. thread2.interrupt();
  36. }

上例中如果 Thread 2 中使用的是 lock() ,则一直不会执行直到 Thread 1 睡眠结束,thread2.interrupt(); 没有任何作用。如果是 lock.lockInterruptibly(); ,则在调用了 thread2.interrupt();时 Thread 2 就会被打断。

当然这些功能也不要问我有什么应用场景哈,我也不知道。

公平锁,也是 ReentrantLock 的一个特点。原本锁是不公平的,但是当使用 ReentrantLock 并且指定开启了公平锁之后,就会按照先来后到的原则,先来先行,每个人都是公平的执行,而不能插队。

CountDownLatch

CountDown 就是倒数,Latch 就是门槛,意思就是一直倒数,直到倒数完我指定的数了才开门放你走。

用一个例子描述就是,一个人是一个线程,每个人要从学校出发去春游,一共100人,一辆车运走。那 Latch 就是100,我要等 100 个学生都来齐了,我才能开车,不然谁都只能等着。

countDownLatch.await(); 方法就是一直等待直到倒数完毕。countDownLatch.countDown(); 就是符合某个条件之后调用它倒数一下。

  1. public class CountDownLatchDemo {
  2. public static void main(String[] args) {
  3. Thread[] threads = new Thread[100];
  4. CountDownLatch countDownLatch = new CountDownLatch(threads.length);
  5. for(int i = 0; i < threads.length; i++){
  6. threads[i] = new Thread(() -> {
  7. System.out.println("Thread finished");
  8. //一个线程完成了,记录倒数
  9. countDownLatch.countDown();
  10. });
  11. }
  12. for(int i = 0; i < threads.length; i++){
  13. threads[i].start();
  14. }
  15. try{
  16. //筑起门栏,全部线程执行完之前不让执行往下走
  17. countDownLatch.await();
  18. }catch (InterruptedException e){
  19. e.printStackTrace();
  20. }
  21. System.out.println("end");
  22. }
  23. }

另外有一个实现是使用原本线程的 join() 也可以实现这个功能,不过这个功能实现是一个一个join() ,等全部 join() 完了才可以,相较上面的实现不够灵活。

  1. public void join(){
  2. Thread[] threads = new Thread[100];
  3. for(int i = 0; i < threads.length; i++){
  4. threads[i] = new Thread(() -> {
  5. System.out.println("Thread finished");
  6. //一个线程完成了,记录倒数
  7. });
  8. }
  9. for(int i = 0; i < threads.length; i++){
  10. threads[i].start();
  11. }
  12. for(int i = 0; i < threads.length; i++){
  13. try {
  14. threads[i].join();
  15. }catch (InterruptedException e){
  16. e.printStackTrace();
  17. }
  18. }
  19. System.out.println("end");
  20. }

CyclicBarrier

这个同步工具叫 CyclicBarrier,意思是循环栅栏,有一个栅栏,什么时候人满了就把栅栏推倒,然后全部放出去,出去之后栅栏又重新起来,循环操作。

用一个例子举例就是,学生去春游,20人一辆车,100人。栅栏的限额就是20,等20人齐了,开走一辆车,然后下一辆车来了,继续等,等到20人又齐了才继续发车。

该类构造器有两个参数,第一个是栅栏限额,第二个是行为,不传也可以,意思是栅栏满了之后做什么行为。

  1. public class CyclicBarrierDemo {
  2. public static void main(String[] args) {
  3. //定义一个限额20的栅栏,满人之后输出满人了
  4. CyclicBarrier cyclicBarrier = new CyclicBarrier(20,()->{
  5. System.out.println("车满人了,发车!");
  6. });
  7. for(int i = 0; i < 100; i++){
  8. new Thread(() -> {
  9. System.out.println("学生上车了!");
  10. try{
  11. //等车
  12. cyclicBarrier.await();
  13. }catch (InterruptedException e){
  14. e.printStackTrace();
  15. }catch (BrokenBarrierException e){
  16. e.printStackTrace();
  17. }
  18. }).start();
  19. }
  20. }
  21. }

Phaser

Phaser 的翻译是阶段,有点像结合了 CountDownLatch 和 CyclicBarrier,它是按照不同的阶段来对线程进行执行的,本身维护了一个阶段这样的成员变量。每次需要全部线程到达某个阶段了,才能一起进入下一个阶段。

onAdvance() 方法意思是该线程目前到达了一个阶段了,需要等待其他线程一同到达一起进入下一个阶段;
arriveAndAwaitAdvance() 方法是在自定义的阶段里建立栅栏拦截等待全部线程达到同一阶段再进入下一阶段;
arriveAndDeregister() 方法是不再进入下一个阶段,退出流程
register() 方法相反,可以往上加入阶段一起往下走。

ReadWriteLock

ReadWriteLock 是读写锁。读写锁的概念就是共享锁和排他锁,读锁是共享锁,写锁就是排他锁。

当普通使用锁的时候,可能你会把读和写的操作都锁上了,然后这两种操作都是串行操作的,这样一来对于读来说性能就会有所浪费。于是我们可以换一种实现方法,使用读写锁,当读线程进来的时候就使用读锁,大家一起读,但是你写线程就先别进来了,等我读完先;当写线程进来的时候就使用写锁,你们那些读进程就别进来读了,等我写完先。

简单来说读写锁就是对同一个共享数据进行管理的时候,允许多个读线程一起读,但是读的过程中不能写,允许一个线程写,但是读线程不能读。

  1. public class ReadWriteLockDemo {
  2. private int sum = 0;
  3. /**
  4. * 读方法,传读锁
  5. * @param lock 读锁
  6. */
  7. public void read(Lock lock){
  8. try{
  9. lock.lock();
  10. Thread.sleep(1000);
  11. System.out.println("Read: " + sum);
  12. }catch (InterruptedException e){
  13. e.printStackTrace();
  14. }finally {
  15. lock.unlock();
  16. }
  17. }
  18. /**
  19. * 写方法,传入写锁
  20. * @param lock 写锁
  21. */
  22. public void write(Lock lock){
  23. try{
  24. lock.lock();
  25. Thread.sleep(10);
  26. sum++;
  27. System.out.println("Write: " + sum);
  28. }catch (InterruptedException e){
  29. e.printStackTrace();
  30. }finally {
  31. lock.unlock();
  32. }
  33. }
  34. }
  35. public class Client {
  36. private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  37. private static Lock readLock = readWriteLock.readLock();
  38. private static Lock writeLock = readWriteLock.writeLock();
  39. public static void main(String[] args) {
  40. ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();
  41. Runnable runnableRead = () -> readWriteLockDemo.read(readLock);
  42. Runnable runnableWrite = () -> readWriteLockDemo.write(writeLock);
  43. for(int i = 0; i < 100; i++){
  44. new Thread(runnableRead).start();
  45. }
  46. for(int i = 0; i < 20; i++){
  47. new Thread(runnableWrite).start();
  48. }
  49. }
  50. }

Exchanger

Exchanger 叫做交换器,用于两个线程之间交换数据用的。其原理相当于第一个线程把自己的成员变量放入 Exchanger 这个容器的第一个格子中,然后阻塞;第二个线程也把自己的成员变量放入这个容器的第二个格子中,然后这两个格子的数据交换,两个线程就继续往下跑。
Exchanger 只能是两个线程之间,交换只能两两进行。

exchange() 交换

  1. public class ExchangerDemo {
  2. private static Exchanger<String> exchanger = new Exchanger<>();
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. String s1 = "exchange param 1";
  6. try{
  7. System.out.println("Thread 1 not exchanged:" + s1);
  8. s1 = exchanger.exchange(s1);
  9. }catch (InterruptedException e){
  10. e.printStackTrace();
  11. }
  12. System.out.println("Thread 1 exchanged:" + s1);
  13. }).start();
  14. new Thread(() -> {
  15. String s2 = "exchange param 2";
  16. try{
  17. System.out.println("Thread 2 not exchanged:" + s2);
  18. s2 = exchanger.exchange(s2);
  19. }catch (InterruptedException e){
  20. e.printStackTrace();
  21. }
  22. System.out.println("Thread 2 exchanged:" + s2);
  23. }).start();
  24. }
  25. }
  1. 输出:
  2. Thread 1 not exchanged:exchange param 1
  3. Thread 2 not exchanged:exchange param 2
  4. Thread 2 exchanged:exchange param 1
  5. Thread 1 exchanged:exchange param 2

Semaphore

Semaphore 即信号量,又称信号灯,用于限流。创建的时候会按初始值创建一个一定大小的容器,每当有线程调用 acquire() 的时候,这个限量就减一,直到为0了,其他线程调用 acquire() 的时候就会阻塞。调用 release() 的时候这个限量就加一,相当于释放锁,而且每次调用 acquire() 的时候都注意要在最后 release() 掉。

该工具适合用于限流的场景,如秒杀商品。具体操作不举例了,其他文章里有详细介绍。

总结

ReentrantLock 比 synchronized 更灵活更方便;
CountDownLatch 是一个倒计时门槛,计数完了才开门,程序才继续往下执行;
CyclicBarrier 是一个循环栅栏,每次达到一定限额才开门,程序继续执行,然后栅栏恢复继续拦截;
Phaser 是一个分阶段栅栏;
ReadWriteLock 是一个读写锁,重点掌握;
Semaphore 是限流用的;
Exchanger 用于两个线程之间两两交换。