ReentrantLock

可以替代synchronized

相对于Synchronized的优势

可以使用trylock

  1. public class T03_ReentrantLock3 {
  2. Lock lock = new ReentrantLock();
  3. void m1() {
  4. try {
  5. lock.lock();
  6. for (int i = 0; i < 7; i++) {
  7. TimeUnit.SECONDS.sleep(1);
  8. System.out.println(i);
  9. }
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  16. /**
  17. * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
  18. * 可以根据tryLock的返回值来判定是否锁定
  19. * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
  20. */
  21. void m2() {
  22. /*
  23. boolean locked = lock.tryLock();
  24. System.out.println("m2 ..." + locked);
  25. if(locked) lock.unlock();
  26. */
  27. boolean locked = false;
  28. try {
  29. locked = lock.tryLock(5, TimeUnit.SECONDS);
  30. System.out.println("m2 ..." + locked);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. } finally {
  34. if(locked) lock.unlock();
  35. }
  36. }
  37. public static void main(String[] args) {
  38. T03_ReentrantLock3 rl = new T03_ReentrantLock3();
  39. new Thread(rl::m1).start();
  40. try {
  41. TimeUnit.SECONDS.sleep(1);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. new Thread(rl::m2).start();
  46. }
  47. }

可以打断线程等待 lockInterruptibly

  1. public class T04_ReentrantLock4 {
  2. public static void main(String[] args) {
  3. Lock lock = new ReentrantLock();
  4. Thread t1 = new Thread(()->{
  5. try {
  6. lock.lock();
  7. System.out.println("t1 start");
  8. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
  9. System.out.println("t1 end");
  10. } catch (InterruptedException e) {
  11. System.out.println("interrupted!");
  12. } finally {
  13. lock.unlock();
  14. }
  15. });
  16. t1.start();
  17. Thread t2 = new Thread(()->{
  18. try {
  19. //lock.lock();
  20. lock.lockInterruptibly(); //可以对interrupt()方法做出响应
  21. System.out.println("t2 start");
  22. TimeUnit.SECONDS.sleep(5);
  23. System.out.println("t2 end");
  24. } catch (InterruptedException e) {
  25. System.out.println("interrupted!");
  26. } finally {
  27. lock.unlock();
  28. }
  29. });
  30. t2.start();
  31. try {
  32. TimeUnit.SECONDS.sleep(1);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. t1.interrupt(); //打断线程2的等待
  37. }
  38. }

公平锁

线程获取锁有先后顺序(通过队列实现)

  1. public class T05_ReentrantLock5 extends Thread {
  2. private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果
  3. public void run() {
  4. for(int i=0; i<100; i++) {
  5. lock.lock();
  6. try{
  7. System.out.println(Thread.currentThread().getName()+"获得锁");
  8. }finally{
  9. lock.unlock();
  10. }
  11. }
  12. }
  13. public static void main(String[] args) {
  14. T05_ReentrantLock5 rl=new T05_ReentrantLock5();
  15. Thread th1=new Thread(rl);
  16. Thread th2=new Thread(rl);
  17. th1.start();
  18. th2.start();
  19. }
  20. }

CountDownLatch

  1. public class T06_TestCountDownLatch {
  2. public static void main(String[] args) {
  3. usingJoin();
  4. usingCountDownLatch();
  5. }
  6. private static void usingCountDownLatch() {
  7. Thread[] threads = new Thread[100];
  8. CountDownLatch latch = new CountDownLatch(threads.length);
  9. for(int i=0; i<threads.length; i++) {
  10. threads[i] = new Thread(()->{
  11. int result = 0;
  12. for(int j=0; j<10000; j++) result += j;
  13. latch.countDown(); // threads - 1,为0时 latch.await() 处继续
  14. });
  15. }
  16. for (int i = 0; i < threads.length; i++) {
  17. threads[i].start();
  18. }
  19. try {
  20. latch.await();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println("end latch");
  25. }
  26. private static void usingJoin() {
  27. Thread[] threads = new Thread[100];
  28. for(int i=0; i<threads.length; i++) {
  29. threads[i] = new Thread(()->{
  30. int result = 0;
  31. for(int j=0; j<10000; j++) result += j;
  32. });
  33. }
  34. for (int i = 0; i < threads.length; i++) {
  35. threads[i].start();
  36. }
  37. for (int i = 0; i < threads.length; i++) {
  38. try {
  39. threads[i].join();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. System.out.println("end join");
  45. }
  46. }

CyclicBarrier

  1. public class T07_TestCyclicBarrier {
  2. public static void main(String[] args) {
  3. //CyclicBarrier barrier = new CyclicBarrier(20);
  4. CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
  5. /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
  6. @Override
  7. public void run() {
  8. System.out.println("满人,发车");
  9. }
  10. });*/
  11. for(int i=0; i<100; i++) {
  12. new Thread(()->{
  13. try {
  14. barrier.await(); // 阻塞
  15. // ... 执行代码
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } catch (BrokenBarrierException e) {
  19. e.printStackTrace();
  20. }
  21. }).start();
  22. }
  23. }
  24. }
  25. // result
  26. // 满人
  27. // 满人
  28. // 满人
  29. // 满人
  30. // 满人

场景

限流(Guava RateLimiter)

Phaser

  1. public class T09_TestPhaser2 {
  2. static Random r = new Random();
  3. static MarriagePhaser phaser = new MarriagePhaser();
  4. static void milliSleep(int milli) {
  5. try {
  6. TimeUnit.MILLISECONDS.sleep(milli);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }
  11. public static void main(String[] args) {
  12. phaser.bulkRegister(7);
  13. for(int i=0; i<5; i++) {
  14. new Thread(new Person("p" + i)).start();
  15. }
  16. new Thread(new Person("新郎")).start();
  17. new Thread(new Person("新娘")).start();
  18. }
  19. static class MarriagePhaser extends Phaser {
  20. @Override
  21. protected boolean onAdvance(int phase, int registeredParties) {
  22. switch (phase) {
  23. case 0:
  24. System.out.println("所有人到齐了!" + registeredParties);
  25. System.out.println();
  26. return false;
  27. case 1:
  28. System.out.println("所有人吃完了!" + registeredParties);
  29. System.out.println();
  30. return false;
  31. case 2:
  32. System.out.println("所有人离开了!" + registeredParties);
  33. System.out.println();
  34. return false;
  35. case 3:
  36. System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
  37. return true;
  38. default:
  39. return true;
  40. }
  41. }
  42. }
  43. static class Person implements Runnable {
  44. String name;
  45. public Person(String name) {
  46. this.name = name;
  47. }
  48. public void arrive() {
  49. milliSleep(r.nextInt(1000));
  50. System.out.printf("%s 到达现场!\n", name);
  51. phaser.arriveAndAwaitAdvance();
  52. }
  53. public void eat() {
  54. milliSleep(r.nextInt(1000));
  55. System.out.printf("%s 吃完!\n", name);
  56. phaser.arriveAndAwaitAdvance();
  57. }
  58. public void leave() {
  59. milliSleep(r.nextInt(1000));
  60. System.out.printf("%s 离开!\n", name);
  61. phaser.arriveAndAwaitAdvance();
  62. }
  63. private void hug() {
  64. if(name.equals("新郎") || name.equals("新娘")) {
  65. milliSleep(r.nextInt(1000));
  66. System.out.printf("%s 洞房!\n", name);
  67. phaser.arriveAndAwaitAdvance();
  68. } else {
  69. phaser.arriveAndDeregister();
  70. //phaser.register()
  71. }
  72. }
  73. @Override
  74. public void run() {
  75. arrive();
  76. eat();
  77. leave();
  78. hug();
  79. }
  80. }
  81. }

ReadWriteLock -> StampedLock

共享锁

排它锁

使用

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];
    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }
    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

Semaphore

使用

public class T11_TestSemaphore {
    public static void main(String[] args) {
        //Semaphore s = new Semaphore(2);
        Semaphore s = new Semaphore(2, true);
        //允许一个线程同时执行
        //Semaphore s = new Semaphore(1);

        new Thread(()->{
            try {
                s.acquire(); // 获取许可 number - 1

                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");

                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

场景

  • 用于多个共享资源的互斥使用.
  • 用于并发线程数的控制.

    Exchanger

    两线程间交换数据 ```java public class T12_TestExchanger {

    static Exchanger exchanger = new Exchanger<>();

    public static void main(String[] args) {

      new Thread(()->{
          String s = "T1";
          try {
              s = exchanger.exchange(s); // 阻塞
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          System.out.println(Thread.currentThread().getName() + " " + s);
    
      }, "t1").start();
    
    new Thread(()->{
        String s = "T2";
        try {
            s = exchanger.exchange(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " " + s);

    }, "t2").start();


}

}

<a name="dmmoq"></a>
## LockSupport
```java
public class T13_TestLockSupport {
    public static void main(String[] args) {
        Thread t = new Thread(()->{
            for (int i = 0; i < 10; i++) {
                System.out.println(i);
                if(i == 5) {
                    LockSupport.park();
//                    LockSupport.park();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t.start();

        LockSupport.unpark(t);

    }
}