一、线程间通信方式

1.volatile关键字

强制不同线程每次去主存读取信息

2.使用wait()和notiry()方法

这是Object类提供的方法

3.使用countDownLatch(倒计时器)

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
原理如下:

内部类Sync直接继承自AQS,并重写了tryAcquireShare和tryReleaseShare方法。实现了当state不为零时将线程入队挂起,在tryReleaseShare执行减一后,如果state为0则,逐个将入队的线程唤醒的操作 当有多个线程调用await时候,当countDown到0的时候,所有调用await的线程都会被唤醒

使用方法如下:

一些线程执行countDownLatch.await(); 另一些线程执行countDownLatch.countDown(); 当countDownLatch倒计时到0的时候从countDownLatch.await();处继续执行

  1. //线程B等待计数器倒计时完成之后才继续向下执行
  2. public class CountDownTest {
  3. static CountDownLatch countDownLatch=new CountDownLatch(2);
  4. public static void main(String[] args) {
  5. new MyThread("B",countDownLatch).start();
  6. new MyThread("A",countDownLatch).start();
  7. new MyThread("A",countDownLatch).start();
  8. }
  9. }
  10. class MyThread extends Thread{
  11. CountDownLatch countDownLatch;
  12. String name;
  13. MyThread(String name,CountDownLatch countDownLatch){
  14. this.name=name;
  15. this.countDownLatch=countDownLatch;
  16. }
  17. @Override
  18. public void run() {
  19. try {
  20. if(!"A".equals(this.name)){
  21. countDownLatch.await();
  22. }
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println(this.name+" "+"执行");
  27. countDownLatch.countDown();
  28. }
  29. }
  30. //输出结果:
  31. A 执行
  32. A 执行
  33. B 执行

4.ReentrantLock和Condition

4.1 ReentrantLock和Synchronized的区别

1.Synchronized是JVM的锁,ReentrantLock是一个类
2.Synchronized加解锁自动进行,ReentrantLock加解锁手动进行,比较麻烦,但是比较灵活
通过lock()加锁,通过unlock()解锁,解锁操作最好放在finally块中
3.Synchronized是不可中断锁,获取不到一个锁就会一直等待,
ReentrantLock是可中断锁,可以设置等待多久返回去做其他事情
4.ReentrantLock可以初始化为公平锁,Synchronized只能是非公平锁。

4.2 抽象队列同步器(AQS)

AQS定义了多线程对临界资源的访问,可以基于AQS定制不同的同步访问逻辑来定制各种锁,是一种模板设计模式
资源的访问可以分为独占式或共享式
AQS的核心思想是,如果被请求的资源空闲,就将请求资源的线程设置为工作线程,将共享资源设置为锁定状态
如果请求的资源已经在锁定状态,那么就使用一个双向链表维护等待队列
要访问的资源的同步状态state(为一个整数)使用volatile修饰,使用CAS方式去更新state:

不同的锁只需要实现如何获取(acquire)资源的部分即可,等待队列已经由AQS实现好了

4.3 基于AQS的ReentrantLock

ReentrantLock只实现了Lock和Serializeable接口,其内部类Sync 继承了AbstractQueuedSynchronizer
利用了AQS框架来实现加锁机制:

  1. //AQS类的逻辑,使用acquire函数来获取锁,核心是tryAcquire
  2. //tryAcquire获取锁成功了就什么的都不干,失败了就acquireQueued入队
  3. public final void acquire(int arg) {
  4. if (!tryAcquire(arg) &&
  5. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }
  8. //非公平锁对获取锁的实现
  9. final void lock() {
  10. if (compareAndSetState(0, 1))
  11. setExclusiveOwnerThread(Thread.currentThread());
  12. else
  13. acquire(1);
  14. }
  15. //非公平锁的tryAcquire调用nonfairTryAcquire
  16. final boolean nonfairTryAcquire(int acquires) {
  17. final Thread current = Thread.currentThread();
  18. int c = getState();
  19. //当资源空闲(c=0),直接通过CAS操作独占
  20. if (c == 0) {
  21. if (compareAndSetState(0, acquires)) {
  22. setExclusiveOwnerThread(current);
  23. return true;
  24. }
  25. }
  26. //否则判断占据的线程是否是自身,是则可重入锁,不是则获取失败
  27. else if (current == getExclusiveOwnerThread()) {
  28. int nextc = c + acquires;
  29. if (nextc < 0) // overflow
  30. throw new Error("Maximum lock count exceeded");
  31. setState(nextc);
  32. return true;
  33. }
  34. return false;
  35. }
  36. // 公平锁对tryAcquire实现
  37. final void lock() {
  38. acquire(1);
  39. }
  40. //公平锁的tryAcquire调用tryAcquire
  41. protected final boolean tryAcquire(int acquires) {
  42. final Thread current = Thread.currentThread();
  43. int c = getState();
  44. if (c == 0) {
  45. //这里和非公平锁的区别是,非公平锁cas修改成功就行,这里还要判断是否有等待队列
  46. if (!hasQueuedPredecessors() &&
  47. compareAndSetState(0, acquires)) {
  48. setExclusiveOwnerThread(current);
  49. return true;
  50. }
  51. }
  52. //重入机制和非公平锁是一样的
  53. else if (current == getExclusiveOwnerThread()){
  54. int nextc = c + acquires;
  55. if (nextc < 0)
  56. throw new Error("Maximum lock count exceeded");
  57. setState(nextc);
  58. return true;
  59. }
  60. return false;
  61. }

解锁的unlock过程对于公平锁和非公平锁都是一样的
unlock将状态减去一定数值,当状态为0则表示线程释放锁

  1. public void unlock() {
  2. sync.release(1);
  3. }
  4. protected final boolean tryRelease(int releases) {
  5. int c = getState() - releases;
  6. if (Thread.currentThread() != getExclusiveOwnerThread())
  7. throw new IllegalMonitorStateException();
  8. boolean free = false;
  9. if (c == 0) {
  10. free = true;
  11. setExclusiveOwnerThread(null);
  12. }
  13. setState(c);
  14. return free;
  15. }

流程如下:
image.png

4.4 结合Condition

Condition通过Lock.newCondition()获取,相比Synchronized的wait()和notify()来说,condition的await()和signal()更灵活,因为一个Lock可以新建多个Condition,实现有选择的线程通知
condition的await()和signal()更灵活也需要在lock()和unlock()之间执行
下面是一个使用condition的await()和signal()实现生产者和消费者的示例,缓冲区大小为5,满了就停止生产,为0就停止消费,中间过程可以消费和生产:
注意:使用两个condition分别控制空了和满了的停止消费和生产:

  1. public class ConditionTest {
  2. public static void main(String[] args) {
  3. Queue<Integer> buffer=new LinkedList<>();
  4. ReentrantLock lock=new ReentrantLock();
  5. Condition empty=lock.newCondition();
  6. Condition full=lock.newCondition();
  7. new Thread(new Producer(empty,full,buffer,lock)).start();
  8. new Thread(new Producer(empty,full,buffer,lock)).start();
  9. new Thread(new Consumer(empty,full,buffer,lock)).start();
  10. }
  11. }
  12. class Producer implements Runnable{
  13. Condition empty,full;
  14. Queue<Integer> buffer;
  15. Lock lock;
  16. Producer(Condition empty,Condition full,Queue<Integer> buffer,Lock lock){
  17. this.empty=empty;
  18. this.full=full;
  19. this.buffer=buffer;
  20. this.lock=lock;
  21. }
  22. public void produce(){
  23. lock.lock();
  24. if(buffer.size()<5){
  25. buffer.offer(1);
  26. System.out.println(Thread.currentThread().getName()+"生产了一个,buffer有"+buffer.size());
  27. empty.signal();
  28. }else{
  29. try {
  30. full.await();
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. lock.unlock();
  36. }
  37. @Override
  38. public void run() {
  39. for (int i = 0; i < 10; i++) {
  40. produce();
  41. }
  42. }
  43. }
  44. class Consumer implements Runnable {
  45. Condition empty,full;
  46. Queue<Integer> buffer;
  47. Lock lock;
  48. Consumer(Condition empty,Condition full,Queue<Integer> buffer,Lock lock)
  49. {
  50. this.empty=empty;
  51. this.full=full;
  52. this.buffer=buffer;
  53. this.lock=lock;
  54. }
  55. public void consume(){
  56. while (true){
  57. lock.lock();
  58. if(buffer.size()>0){
  59. buffer.poll();
  60. System.out.println(Thread.currentThread().getName()+"消费了一个,buffer有"+buffer.size());
  61. full.signal();
  62. }else {
  63. try {
  64. empty.await();
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. lock.unlock();
  70. }
  71. }
  72. @Override
  73. public void run() {
  74. consume();
  75. }
  76. }

5.使用CyclicBarrier

CyclicBarrier构造函数:

  1. //代表要阻塞多少个线程(需要多少个线程到达了这里再执行后续)
  2. private int count;
  3. //代表屏障出阻塞的线程数量
  4. public CyclicBarrier(int parties)
  5. //barrierAction可以指定所有线程都到达屏障后执行的函数
  6. public CyclicBarrier(int parties, Runnable barrierAction)

CyclicBarrier一般作为成员变量传入线程,线程调用CyclicBarrier的await方法代表已经到达了阻塞点
await()方法中对count减1,当不为0说明还要等待其他线程,就使用ReentrantLock变量的condition.await()方法阻塞自身,当为0的时候说明所有线程都已经到达,使用ReentrantLock变量的condition.signalAll()方法通知唤醒其余等待的线程继续执行,具体逻辑如下:

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. final Generation g = generation;
  8. if (g.broken)
  9. throw new BrokenBarrierException();
  10. if (Thread.interrupted()) {
  11. breakBarrier();
  12. throw new InterruptedException();
  13. }
  14. int index = --count;
  15. if (index == 0) { // tripped
  16. boolean ranAction = false;
  17. try {
  18. final Runnable command = barrierCommand;
  19. if (command != null)
  20. command.run();
  21. ranAction = true;
  22. nextGeneration();//里面会调用signalAll唤醒所有阻塞线程
  23. return 0;
  24. } finally {
  25. if (!ranAction)
  26. breakBarrier();
  27. }
  28. }
  29. // loop until tripped, broken, interrupted, or timed out
  30. for (;;) {
  31. try {
  32. if (!timed)
  33. trip.await();
  34. else if (nanos > 0L)
  35. nanos = trip.awaitNanos(nanos);
  36. } catch (InterruptedException ie) {
  37. if (g == generation && ! g.broken) {
  38. breakBarrier();
  39. throw ie;
  40. } else {
  41. // We're about to finish waiting even if we had not
  42. // been interrupted, so this interrupt is deemed to
  43. // "belong" to subsequent execution.
  44. Thread.currentThread().interrupt();
  45. }
  46. }
  47. if (g.broken)
  48. throw new BrokenBarrierException();
  49. if (g != generation)
  50. return index;
  51. if (timed && nanos <= 0L) {
  52. breakBarrier();
  53. throw new TimeoutException();
  54. }
  55. }
  56. } finally {
  57. lock.unlock();
  58. }
  59. }

6.CyclicBarrier和CountDownLatch的区别

(1)countDownLatch是等待事件到达后执行后续事件,CyclicBarrier是等待一组线程到达之后执行各自线程的 后续部分。
image.pngimage.png
(2)countDownLatch是一次性的,计数器倒计时到0下一次就不会有屏障了,除非手动设置;
但是CyclicBarrier会循环设置屏障阻碍线程

(3)调用countDownLatch的countDown()之后,线程并不会阻塞,调用CyclicBarrier的await()方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行

7.ThreadLocal内存溢出

每个线程维护一个ThreadLocalMap,其中的每个Enrty的key是一个指向ThreadLocal变量的弱引用,当ThreadLocal变量被回收之后,这个key就会指向空,直到线程结束才会被回收,ThreadLocal解决方法是在每次set,remove方法中都判断当前key是否为空,为空则删除,并遍历整个ThreadLocalMap,将所有key为空的entry的value指向null