Basic

CAS

compare and swap(exchange)
image.png
ABA 问题(读取的时候是 A,然后被别的线程修改成 B,然后再修改回 A)可以通过使用 AtomicStampedReference 来给值添加一个版本号,当修改值的时候,同时修改版本,比较值的时候同时比较版本号。(基础类型无所谓,但是引用类型被修改了其中的值,就会发生变化)

  1. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

lock cmpxchg 指令 cmpxchg 非原子性
lock 指令在执行后面的指令的时候锁定一个北桥信号(硬件)
在线程数量比较多的情况下(未必):LongAdder(分段锁) > AtomicLong > Sychronized

对象在内存中的存储布局

image.png

  • java -XX:+PrintCommandLineFlags -version
  • image.png
  • 对象头 12 个字节:markword 8 个字节,class pointer 4 个字节(开启压缩,不开启:8 个字节)
  • instance date 用来存放对象中的属性
  • 对齐 padding:如果对象前面占用的内存不能被 8 整除,用来补全剩下的。

Object o = new Object(); o 在内存中占用 16 个字节(对象头 12 个字节,padding 4 个字节)

锁升级

image.png

过程

new - 偏向锁 - 轻量级锁(无锁,自旋锁,自适应自旋) - 重量级锁
对象 O 刚 new 出来为无锁态;线程 A 是第一个使用对象的线程,就在对象头的 MarkWord 里面添加自己的线程 id,然后这个时候升级为偏向锁;当有线程前来和线程 A 抢夺对象的使用权,这个时候进行 CAS,看哪个线程可以将自己线程生成的 Lock Record 存放到对象头中,然后别的线程会在旁边进行自旋等待,这个时候升级为轻量级锁;当有线程进行了 10 次自旋,或者CPU 线程占用到一半升级为重量级锁,这个时候除了正在使用对象的线程,其余的线程在锁的栈中进行等待。(用户态 -> 内核态)

锁消除(lock eliminate

  1. public void add(String str1, String str2) {
  2. StringBuffer sb = new StringBuffer();
  3. sb.append(str1).append(str2);
  4. //当这个StringBuffer只是在add这个方法中适用的时候,那么append就不需要再加锁了
  5. }

锁粗化(lock coarsening

  1. public void test(String str) {
  2. StringBuffer sb = new StringBuffer();
  3. int i = 1;
  4. while (i > 100) { //锁加在while循环上
  5. sb.append(str);
  6. i++;
  7. }
  8. }

Cache Line

缓存行对齐 64 字节

MESI Cache 一致性协议

实现数据一致性(MESI,锁总线)
cpu 每个 cache line 标记四种状态

  • Modified 修改
  • Exclusive 独占
  • Shared 共享
  • Invalid 失效

    内存屏障

    屏障两边的指令不可以重排!保障有序!(sfence mfence lfence)
    JSR 内存屏障(Java Standard Request)

  • LoadLoad 屏障:Load1;LoadLoad;Load2,在 Load2 及后续读取操作要读取的数据被访问前,保证 Load1 要读取的数据被读取完毕。

  • StoreStore 屏障:Store1;StoreStore;Store2,在 Store2 及后续写入操作执行前,保证 Store1 的写入操作对其它处理器可见。
  • LoadStore 屏障: Load1;LoadStore;Store2,在Store2 及后续写入操作执行前,保证 Load1 要读取的数据被读取完毕。
  • StoreLoad 屏障:Store1;StoreLoad;Load2,在 Load2 及后续所有读取操作执行前,保证 Store1 的写入对所有处理器可见。

    volatile 如何解决指令重排序

    volatile:禁止指令重排;线程可见;不保证原子性
    只是加了一个标记(ACC_VOLATILE)
    涉及:DCL(Double Check Load)单例需要添加 volatile 吗?需要 ```java //DCL单例 private b = 5; private static volatile T instance;

public static T getInstance() { if (instance == null) { synchronized (instance) { if (instance == null) { instance = new T(); } } } return instance; }

  1. ```java
  2. 0 new #2 <T> --------> 半初始化
  3. 7 astore_1 --------> 执行构造方法
  4. 4 invokespecial #3 <T.<init>> --------> 建立联系
  5. ========== 指令重排 ============
  6. 0 new #2 <T> --------> 半初始化
  7. 4 invokespecial #3 <T.<init>> --------> 建立联系
  8. 7 astore_1 --------> 执行构造方法
  9. ========== 这个时候 =========
  10. if (isntance != null) xxx -> 使用了半初始化的对象(里面的属性的值只是默认值,并没有开始赋值)
  1. ============ jvm内存屏障 ==============
  2. StoreStoreBarrier
  3. volatile 写操作
  4. StoreLoadBarrier
  5. ================
  6. LoadLoadBarrier
  7. volatile 读操作
  8. LoadStoreBarrier
  9. ============ hotspot =============
  10. lock; addl (空操作)

Java 中的引用类型

强软弱虚

强引用

当没有引用指向堆中的对象,对象就会被回收。(普通引用)

软引用

  1. //-Xmx20M
  2. public class Soft {
  3. public static void main(String[] args) {
  4. SoftReference<byte[]> m = new SoftReference<>(new byte[1024 * 1024 * 10]);
  5. System.out.println(m.get());
  6. System.gc();
  7. try {
  8. Thread.sleep(100);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println(m.get());
  13. //新创建一个数组,heap装不下,这个时候会进行一次垃圾回收,将软引用的对象回收掉
  14. byte[] bytes = new byte[1024 * 1024 * 11];
  15. System.out.println(m.get()); //null
  16. }
  17. }
  18. // 经常被用来当做缓存

弱引用

  1. public class Weak {
  2. public static void main(String[] args) {
  3. WeakReference<M> m = new WeakReference<>(new M());
  4. System.out.println(m.get());
  5. System.gc();
  6. //进行垃圾回收的时候就会弱引用的对象回收,一次性
  7. System.out.println(m.get()); //null
  8. }
  9. }

ThreadLocal

线程本地对象

  1. public void set(T value) {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t); //ThreadLocal.ThreadLocalMap threadLocals = null;
  4. if (map != null)
  5. map.set(this, value); //将当前的ThreadLocal作为map的key,存入的值作为value
  6. else
  7. createMap(t, value);
  8. }
  9. private void remove(ThreadLocal<?> key) {
  10. Entry[] tab = table;
  11. int len = tab.length;
  12. int i = key.threadLocalHashCode & (len-1);
  13. for (Entry e = tab[i];
  14. e != null;
  15. e = tab[i = nextIndex(i, len)]) {
  16. if (e.get() == key) {
  17. e.clear();
  18. expungeStaleEntry(i);
  19. return;
  20. }
  21. }
  22. }
  23. static class ThreadLocalMap {
  24. //若使用强引用,即使使threadLocal = null;那么key的引用仍然使用ThreadLocal对象,对出现内存泄漏
  25. //但是弱引用还是存在内存泄漏,ThreadLocal被回收掉,那么key就为null,那么value就永远不可达
  26. //所以在使用ThradLocal之后,需要执行threadLocal.remove();
  27. static class Entry extends WeakReference<ThreadLocal<?>> {
  28. /** The value associated with this ThreadLocal. */
  29. Object value;
  30. Entry(ThreadLocal<?> k, Object v) {
  31. super(k);
  32. value = v;
  33. }
  34. }
  35. }

虚引用

管理堆外内存(Zero Copy)

  1. public class Phantom {
  2. //NIO
  3. //当Queue里面的东西被回收了,就会触发
  4. private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();
  5. public static void main(String[] args) {
  6. PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);
  7. System.out.println(phantomReference.get()); //null
  8. System.gc();
  9. System.out.println(phantomReference.get()); //null
  10. }
  11. }

Lock

ReentrantLock

  1. public class T_ReentrantLock {
  2. Lock lock = new ReentrantLock();
  3. public void m() {
  4. try {
  5. lock.lock();
  6. for (int i = 0; i < 10; i++) {
  7. TimeUnit.SECONDS.sleep(1);
  8. System.out.println(i);
  9. }
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  16. public void n() {
  17. try {
  18. lock.lock();
  19. System.out.println("n()...");
  20. } finally {
  21. lock.unlock();
  22. }
  23. }
  24. public static void main(String[] args) {
  25. T_ReentrantLock t = new T_ReentrantLock();
  26. new Thread(t :: m).start();
  27. try {
  28. TimeUnit.SECONDS.sleep(1);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. new Thread(t :: n).start();
  33. }
  34. }
  • CAS VS Sync
  • tryLock();
  • lockInterruptibly();
  • 可以进行公平锁和非公平锁的切换(判断锁的等待队列里面有没有线程在等待)

    ReadLock

    共享锁

    WriteLock

    排他锁

    ReadWriteLock

    ```java public class T_ReadWriteLock {

    static Lock lock = new ReentrantLock(); private static int value;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); static Lock readLock = readWriteLock.readLock(); //执行读操作的时候,允许其他的读操作共同执行,分享锁 static Lock writeLock = readWriteLock.writeLock(); //执行写操作的时候,不允许执行其他的写操作,排他锁

    public static void read(Lock lock) {

    1. try {
    2. lock.lock();
    3. Thread.sleep(1000);
    4. System.out.println("read over");
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. } finally {
    8. lock.unlock();
    9. }

    }

    public static void write(Lock lock, int var) {

    1. try {
    2. lock.lock();
    3. Thread.sleep(1000);
    4. value = var;
    5. System.out.println("write over");
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. } finally {
    9. lock.unlock();
    10. }

    }

    public static void main(String[] args) { // Lock lockRead = lock; // Lock lockWrite = lock;

    1. Lock lockRead = readLock;
    2. Lock lockWrite = writeLock;
    3. for (int i = 0; i < 18; i++) {
    4. new Thread(() -> read(lockRead)).start();
    5. }
    6. for (int i = 0; i < 2; i++) {
    7. new Thread(() -> write(lockWrite, new Random().nextInt())).start();
    8. }

    }

}

  1. <a name="gBLE3"></a>
  2. ### _CountDownLatch_
  3. ```java
  4. public class T_CountDown {
  5. public static void usingCountDown() {
  6. Thread[] threads = new Thread[100];
  7. CountDownLatch latch = new CountDownLatch(threads.length);
  8. for (int i = 0; i < threads.length; i++) {
  9. threads[i] = new Thread(() -> {
  10. for (int j = 0; j < 10000; j++) {
  11. j++;
  12. }
  13. latch.countDown();
  14. });
  15. }
  16. for (int i = 0; i < threads.length; i++) {
  17. threads[i].start();
  18. }
  19. try {
  20. latch.await(); //拦截其他的线程,等待当前线程执行完毕
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(latch.getCount());
  25. }
  26. public static void main(String[] args) {
  27. usingCountDown();
  28. }
  29. }

CyclicBarrier

  1. public class T_CyclicBarrier {
  2. public static void main(String[] args) {
  3. CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满员"));
  4. for (int i = 0; i < 100; i++) {
  5. new Thread(() -> {
  6. try {
  7. barrier.await(); //等待满20个线程,执行barrier
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. } catch (BrokenBarrierException e) {
  11. e.printStackTrace();
  12. }
  13. }).start();
  14. }
  15. }
  16. }

Semaphore

限流

  1. public class T_Semaphore {
  2. public static void main(String[] args) {
  3. Semaphore s = new Semaphore(2); //代表可以同时执行几个线程
  4. new Thread(() -> {
  5. try {
  6. s.acquire(); //将Semaphore中的1置为0,其他的线程不允许访问
  7. System.out.println("T1 running ...");
  8. Thread.sleep(500);
  9. System.out.println("T1 end ...");
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } finally {
  13. s.release(); //将Semaphore中的0置为1,其他的线程可以开始访问
  14. }
  15. }).start();
  16. new Thread(() -> {
  17. try {
  18. s.acquire();
  19. System.out.println("T2 running ...");
  20. Thread.sleep(500);
  21. System.out.println("T2 end ...");
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } finally {
  25. s.release();
  26. }
  27. }).start();
  28. }
  29. }

Exchanger

  1. public class T_Exchanger {
  2. static Exchanger<String> exchanger = new Exchanger();
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. String s = "T1";
  6. try {
  7. s = exchanger.exchange(s); //线程堵塞,等待下一个可以交换的线程
  8. //然后将两个值进行交换,线程继续执行
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println(Thread.currentThread().getName() + "\t" + s);
  13. }, "s1").start();
  14. new Thread(() -> {
  15. String s = "T2";
  16. try {
  17. s = exchanger.exchange(s);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println(Thread.currentThread().getName() + "\t" + s);
  22. }, "s2").start();
  23. }
  24. }

LockSupport

  1. public class T_LockSupport {
  2. public static void main(String[] args) {
  3. Thread thread = new Thread(() -> {
  4. for (int i = 0; i < 10; i++) {
  5. System.out.println(i);
  6. try {
  7. TimeUnit.SECONDS.sleep(1);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. if (i == 5) {
  12. LockSupport.park(); //当i等于5的时候,线程阻塞
  13. }
  14. }
  15. });
  16. thread.start();
  17. try {
  18. TimeUnit.SECONDS.sleep(8);
  19. System.out.println("after 8 seconds");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. LockSupport.unpark(thread); //让线程继续运行
  24. }
  25. }

Phaser

  1. //分段,等到所有需要执行这个阶段的线程执行阶段完毕,进入下一个阶段
  2. public class T_Phaser {
  3. static MarryPhaser phaser = new MarryPhaser();
  4. public static void main(String[] args) {
  5. phaser.bulkRegister(7);
  6. for (int i = 0; i < 5; i++) {
  7. new Thread(new Person("person" + i, 0)).start();
  8. }
  9. new Thread(new Person("新郎", 1)).start();
  10. new Thread(new Person("新娘", 1)).start();
  11. }
  12. static class Person implements Runnable {
  13. String name;
  14. int id;
  15. public Person(){}
  16. public Person(String name, int id) {
  17. this.name = name;
  18. this.id = id;
  19. }
  20. public void arrive() throws InterruptedException {
  21. TimeUnit.SECONDS.sleep(1);
  22. System.out.println(name + "已经到达");
  23. phaser.arriveAndAwaitAdvance();
  24. }
  25. public void eat() throws InterruptedException {
  26. TimeUnit.SECONDS.sleep(1);
  27. System.out.println(name + "吃完");
  28. phaser.arriveAndAwaitAdvance();
  29. }
  30. public void leave() throws InterruptedException {
  31. TimeUnit.SECONDS.sleep(1);
  32. System.out.println(name + "离开");
  33. phaser.arriveAndAwaitAdvance();
  34. }
  35. public void intoBridalRoom() throws InterruptedException {
  36. TimeUnit.SECONDS.sleep(1);
  37. if (id == 1) {
  38. System.out.println(name + "入洞房");
  39. phaser.arriveAndAwaitAdvance();
  40. } else {
  41. phaser.arriveAndDeregister();
  42. }
  43. }
  44. public void getUp() throws InterruptedException {
  45. TimeUnit.SECONDS.sleep(1);
  46. if (id == 1) {
  47. System.out.println(name + "起床");
  48. phaser.arriveAndAwaitAdvance();
  49. }
  50. }
  51. @Override
  52. public void run() {
  53. try {
  54. arrive();
  55. eat();
  56. leave();
  57. intoBridalRoom();
  58. getUp();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. }
  64. static class MarryPhaser extends Phaser {
  65. @Override
  66. protected boolean onAdvance(int phase, int registeredParties) {
  67. switch (phase) {
  68. case 0:
  69. System.out.println("所有人都到齐:" + registeredParties);
  70. System.out.println();
  71. return false;
  72. case 1:
  73. System.out.println("所有人吃完饭:" + registeredParties);
  74. System.out.println();
  75. return false;
  76. case 2:
  77. System.out.println("所有人离开:" + registeredParties);
  78. System.out.println();
  79. return false;
  80. case 3:
  81. System.out.println("新娘新郎入洞房:" + registeredParties);
  82. System.out.println();
  83. return false;
  84. case 4:
  85. System.out.println("新娘新郎起床:" + registeredParties);
  86. return true;
  87. default:
  88. return false;
  89. }
  90. }
  91. }
  92. }

Test_01

  1. /**
  2. * 实现一个容器,提供两个方法add,size
  3. * 写两个线程,线程1添加10个元素到容器中,
  4. * 线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
  5. */

Using CountDownLatch

  1. //使用CountDownLatch
  2. public class T_Test_02 {
  3. //volatile尽量不要用来使用修饰引用对象
  4. // volatile List lists = Collections.synchronizedList(new LinkedList<>());
  5. volatile List lists = new LinkedList();
  6. public void add(Object o) {
  7. lists.add(o);
  8. }
  9. public int size() {
  10. return lists.size();
  11. }
  12. public static void main(String[] args) {
  13. T_Test_02 t = new T_Test_02();
  14. CountDownLatch latch1 = new CountDownLatch(1);
  15. CountDownLatch latch2 = new CountDownLatch(1);
  16. new Thread(() -> {
  17. System.out.println("t2 start");
  18. if (t.size() != 5) {
  19. try {
  20. latch1.await();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. latch2.countDown();
  26. System.out.println("t2 end");
  27. }, "t2").start();
  28. try {
  29. TimeUnit.SECONDS.sleep(1);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. new Thread(() -> {
  34. System.out.println("t1 start");
  35. for (int i = 0; i < 10; i++) {
  36. t.add(new Object());
  37. System.out.println("add " + i);
  38. if (t.size() == 5) {
  39. latch1.countDown();
  40. try {
  41. latch2.await();
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. // try {
  47. // TimeUnit.SECONDS.sleep(1);
  48. // } catch (InterruptedException e) {
  49. // e.printStackTrace();
  50. // }
  51. }
  52. System.out.println("t1 end");
  53. }, "t1").start();
  54. }
  55. }

Using LockSupport

  1. //LockSupport
  2. //condition1
  3. public class T_Test_03 {
  4. volatile List lists = new LinkedList();
  5. public void add(Object o) {
  6. lists.add(o);
  7. }
  8. public int size() {
  9. return lists.size();
  10. }
  11. public static void main(String[] args) {
  12. T_Test_03 t = new T_Test_03();
  13. Thread t1 = new Thread(() -> {
  14. for (int i = 0; i < 10; i++) {
  15. t.add(new Object());
  16. System.out.println("add " + t.size());
  17. if (t.size() == 5) {
  18. LockSupport.park();
  19. }
  20. }
  21. }, "t1");
  22. t1.start();
  23. new Thread(() -> {
  24. while (true) {
  25. if (t.size() == 5) {
  26. LockSupport.unpark(t1);
  27. break;
  28. }
  29. }
  30. System.out.println("t2 end");
  31. }, "t2").start();
  32. }
  33. }
  34. //condition2
  35. public class T_Test_03 {
  36. volatile List lists = new LinkedList();
  37. public void add(Object o) {
  38. lists.add(o);
  39. }
  40. public int size() {
  41. return lists.size();
  42. }
  43. static Thread t1, t2;
  44. public static void main(String[] args) {
  45. T_Test_03 t = new T_Test_03();
  46. t2 = new Thread(() -> {
  47. System.out.println("t2 start");
  48. LockSupport.park();
  49. System.out.println("t2 end");
  50. LockSupport.unpark(t1);
  51. }, "t2");
  52. t2.start();
  53. try {
  54. TimeUnit.SECONDS.sleep(1);
  55. } catch (InterruptedException e) {
  56. e.printStackTrace();
  57. }
  58. t1 = new Thread(() -> {
  59. for (int i = 0; i < 10; i++) {
  60. t.add(new Object());
  61. System.out.println("add " + t.size());
  62. if (t.size() == 5) {
  63. LockSupport.unpark(t2);
  64. LockSupport.park();
  65. }
  66. }
  67. }, "t1");
  68. t1.start();
  69. }
  70. }

Using Synchronized

  1. //synchronized
  2. public class T_Test_04 {
  3. volatile List lists = new LinkedList();
  4. public void add(Object o) {
  5. lists.add(o);
  6. }
  7. public int size() {
  8. return lists.size();
  9. }
  10. public static void main(String[] args) {
  11. T_Test_04 t = new T_Test_04();
  12. final Object lock = new Object();
  13. new Thread(() -> {
  14. synchronized (lock) {
  15. System.out.println("t2 start");
  16. if (t.size() != 5) {
  17. try {
  18. lock.wait();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. System.out.println("t2 end");
  24. lock.notify();
  25. }
  26. }, "t2").start();
  27. try {
  28. TimeUnit.SECONDS.sleep(1);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. new Thread(() -> {
  33. synchronized (lock) {
  34. for (int i = 0; i < 10; i++) {
  35. t.add(new Object());
  36. System.out.println("add " + t.size());
  37. if (t.size() == 5) {
  38. lock.notify();
  39. try {
  40. lock.wait();
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  46. }
  47. }, "t1").start();
  48. }
  49. }

Test_02

  1. /**
  2. * 写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
  3. * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
  4. */

Using Synchronized

  1. //synchronized
  2. public class MyContainer_01<T> {
  3. final private LinkedList<T> list = new LinkedList<>();
  4. final private int MAX = 10;
  5. private int count = 0;
  6. private synchronized void put(T t) {
  7. while (list.size() == MAX) {
  8. try {
  9. this.wait();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. list.add(t);
  15. ++count;
  16. System.out.println("当前有" + count + "个");
  17. this.notifyAll(); //通知消费者线程进行消费
  18. }
  19. private synchronized T get() {
  20. T t = null;
  21. while (list.size() == 0) {
  22. try {
  23. this.wait();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. t = list.removeFirst();
  29. -- count;
  30. System.out.println("当前有" + count + "个");
  31. this.notifyAll(); //通知生产者进行生产
  32. return t;
  33. }
  34. public static void main(String[] args) {
  35. MyContainer_01<String> c = new MyContainer_01<>();
  36. //启动消费者线程
  37. for (int i = 0; i < 10; i++) {
  38. new Thread(() -> {
  39. for (int j = 0; j < 5; j++) {
  40. System.out.println(c.get());
  41. }
  42. }, "c" + i).start();
  43. }
  44. try {
  45. TimeUnit.SECONDS.sleep(2);
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. //启动生产者线程
  50. for (int i = 0; i < 2; i++) {
  51. new Thread(() -> {
  52. for (int j = 0; j < 25; j++) {
  53. c.put(Thread.currentThread().getName() + " " + j);
  54. }
  55. }, "p" + i).start();
  56. }
  57. }
  58. }

Using Condition

  1. public class MyContainer_02<T> {
  2. final private LinkedList<T> list = new LinkedList<>();
  3. final private int MAX = 10;
  4. private int count = 0;
  5. private Lock lock = new ReentrantLock();
  6. private Condition producer = lock.newCondition();
  7. private Condition consumer = lock.newCondition();
  8. private void put(T t) {
  9. try {
  10. lock.lock();
  11. while (list.size() == MAX) {
  12. producer.await();
  13. }
  14. list.add(t);
  15. ++count;
  16. consumer.signalAll(); //通知消费者线程消费
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. } finally {
  20. lock.unlock();
  21. }
  22. }
  23. private T get() {
  24. T t = null;
  25. try {
  26. lock.lock();
  27. while (list.size() == 0) {
  28. consumer.await();
  29. }
  30. t = list.removeFirst();
  31. -- count;
  32. producer.signalAll(); //通知生产者生产
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. } finally {
  36. lock.unlock();
  37. }
  38. return t;
  39. }
  40. public static void main(String[] args) {
  41. MyContainer_02<String> c = new MyContainer_02<>();
  42. //启动消费者线程
  43. for (int i = 0; i < 10; i++) {
  44. new Thread(() -> {
  45. for (int j = 0; j < 5; j++) {
  46. System.out.println(c.get());
  47. }
  48. }, "c" + i).start();
  49. }
  50. try {
  51. TimeUnit.SECONDS.sleep(2);
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. //启动生产者线程
  56. for (int i = 0; i < 2; i++) {
  57. new Thread(() -> {
  58. for (int j = 0; j < 25; j++) {
  59. c.put(Thread.currentThread().getName() + " " + j);
  60. }
  61. }, "p" + i).start();
  62. }
  63. }
  64. }

AQS

AbstractQueuedSynchronizer
AQS 就是基于 CLH 队列,用 volatile 修饰共享变量 state,线程通过 CAS 去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

JUC - 图5
reentrantLock.lock();

  1. //ReentrantLock
  2. public void lock() {
  3. sync.lock();
  4. }
  5. final boolean nonfairTryAcquire(int acquires) {
  6. final Thread current = Thread.currentThread();
  7. int c = getState();
  8. if (c == 0) {
  9. if (compareAndSetState(0, acquires)) {
  10. setExclusiveOwnerThread(current);
  11. return true;
  12. }
  13. }
  14. else if (current == getExclusiveOwnerThread()) {
  15. int nextc = c + acquires;
  16. if (nextc < 0) // overflow
  17. throw new Error("Maximum lock count exceeded");
  18. setState(nextc);
  19. return true;
  20. }
  21. return false;
  22. }
  23. //AbstractQueuedSynchronizer
  24. /**
  25. * The synchronization state.
  26. */
  27. private volatile int state;