并发并行

并发:同一时间 应对 多件事情的能力称为并发

并行:同一时间 处理 多件事情的能力称为并行

预备知识

创建线程的三种方式

  1. /**
  2. *第一种,创建匿名内部类,重写run方法
  3. */
  4. new Thread(){
  5. @Override
  6. public void run() {
  7. System.out.println(123);
  8. }
  9. }.start();
  10. //lambda简化
  11. new Thread(()->{
  12. System.out.println("lambda简化");
  13. }).start();
  14. /**
  15. *第二种,创建类继承Runnable接口,重写run方法,将该类作为参数传递
  16. *
  17. */
  18. public class Mythread implements Runnable{
  19. @Override
  20. public void run() {
  21. System.out.println("第二种创建线程的方法");
  22. }
  23. }
  24. Thread t1 =new Thread(new Mythread());
  25. t1.start();
  26. /**
  27. *第三种,使用FetureTask,线程有返回值
  28. *
  29. */
  30. FutureTask<Integer> task =new FutureTask<>(new Callable<Integer>() {
  31. @Override
  32. public Integer call() throws Exception {
  33. return 1;
  34. }
  35. });
  36. Thread t2 =new Thread(task);
  37. t2.start();
  38. task.get();//阻塞等待线程返回结果

第三章,常用的方法

  1. join() //等待线程运行结束
  2. getId() //获取线程唯一长整形id
  3. getName() //获取线程名
  4. setName() //修改线程名
  5. getPriority() //获取优先级
  6. setPriority() //设置优先级
  7. getState() //获取线程状态,6个枚举值
  8. isAlive() //是否存活
  9. isInterrupted() //是否被打断,不会清除打断标记
  10. interrupt() //打断线程
  11. Thread.currentThread() //获取当前线程
  12. Thread.interrupted() //是否被打断,会清除打断标记,静态方法
  13. Thread.sleep() //线程休眠
  14. Thread.yield() //当前线程让出cup

线程状态

线程的状态(五种) image.png 六种

线程状态转换

image.png

假设有线程 Thread t

  1. new —->runnable
    1. 调用t.start(),从new 转换为runnable
  2. runnable —->waiting
    1. t线程用synchronized(obj)获取对象锁后
      1. 调用obj.wait()时,t线程从runnable —> waiting
      2. 调用obj.notify(),obj.notifyAll(),t.interrupt()时
        1. 竞争锁成功,t线程从waiting —>runnable
        2. 竞争失败,t线程从waiting —>blocked
  3. runnable <—->waiting
    1. 当前线程调用t.join()之后,当前线程从runnable —>waiting
    2. t运行结束后,当前线程从waiting —>runnable
    3. 当前线程调用LockSupport.park()之后,当前线程从 runnable —>waiting
    4. 调用LockSupport.unpark(目标线程),或者调用线程的interrupt(),会让目标线程从waiting —>runnable
  4. runnable <—>timed_waiting
    1. t线程用synchronized(obj)获取对象锁后
      1. 调用obj.wait(long n)方法时,t线程从runnable —>timed_waiting
      2. t线程等待超过 n毫秒,或者调用obj.notify(),obj.notifyAll(),t.interrupt()时
        1. 竞争锁成功,t线程从timed_waiting—>runnable
        2. 竞争失败,t线程从timed_waiting->>blocked
    2. 当前线程调用t.join(long n),当前线程从runnable —>timed_waiting
    3. 当前线程等待超过n毫秒,或者t线程运行结束,或者调用了当前线程的 interrupt()时,当前线程从timed_waiting —>runnable
    4. 当前线程 调用Thread.sleep(long n),当前线程从runnable —>timed_waiting
    5. 当前线程等待超过n毫秒,当前线程从timed_waiting —>runnable
    6. 当前线程调用LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis)时,当前线程从runnable —>timed_waiting
    7. 调用LockSupport.unpark(目标线程),或者调用了线程的interrupt(),或者等待超时,当前线程从timed_waiting —>runnable
  5. runnable —>blocked
    1. t线程使用synchronized(obj)获取对象锁时,竞争失败,t线程从runnable —>blocked
    2. obj锁被其他线程释放后,会唤醒该对象上所有blocked的线程重新竞争,如果t线程竞争成功,从 blocked —>runnable,否则仍处于blocked。
  6. runnable —>terminated
    1. 当前线程所有代码运行完毕后,进入terminated

第四章

基本概念

临界区: 一段代码存在对共享资源的多线程读写操作,称这段代码块为临界区

竞态条件: 多个线程在临界区内执行,由于代码块的 执行序列不同而导致结果无法预测,称之为发生了 竞态条件

解决方案

阻塞式解决方案: synchronized,Lock 非阻塞的解决方案: 原子变量

synchronized

语法

  1. synchronized(对象){
  2. 临界区
  3. }

异常

image.png

synchronized加在方法上(锁的是this对象)

  1. public synchronized void test(){
  2. }
  3. //等价于
  4. public void test(){
  5. synchronized (this){
  6. }
  7. }

synchronized加在静态方法上(锁的是类对象)

  1. public synchronized static void test(){
  2. }
  3. //等价于
  4. public static void test(){
  5. synchronized (Mytest.class){
  6. }
  7. }

Monitor(监视器)

Monitor是系统提供的,结构如下

image.png

工作原理: Thread-0获取锁的时候,Owner指向Thread-0,之后其他线程来获取锁的时候,会存放到EntryList中进入阻塞状态,当Thread-0释放锁之后,会重新唤醒EntryList中的线程进行竞争。当线程没有满足条件,调用wait()方法之后,会释放锁,并且存入waitSet中,当满足条件后会被唤醒并且参与竞争,运行wait()之后的代码。

Mark Word

对象头中的Mark Word(标记字)主要用来表示对象的线程锁状态,另外还可以用来配合GC、存放该对象的hashCode;

image.png

轻量级锁

image.png

当线程执行synchronized时,会在栈帧中创建一条锁记录,且此时没有其他线程竞争时,锁对象的Mark Word会存放锁记录的地址,并且交换值,表示轻量级锁。

image.png

轻量级锁解锁过程: 交换值,删除锁记录。

锁膨胀

image.png

当线程执行synchronized时,锁对象被多个线程竞争,或者已经被其他线程使用,会执行锁膨胀过程。申请一个重量锁,Monitor 此时锁对象的Mark Word会指向Monitor的地址,并且当前持有锁的线程存放到Owner,没有竞争到锁的线程存放到EntryList中进行阻塞,等待唤醒。

image.png

自旋优化

线程2获取锁时,如果失败并不会立刻进入阻塞状态,而是自旋重新获取锁,重复几次成功则执行临界资源 如果自旋失败,进入Monitor的EntryList进行等待。

偏向锁

java6开始使用偏向锁进行优化。 偏向锁的Mark Word 存放的是 线程id + 101 偏向锁默认开启的。锁对象的Mark Word中存放的不是锁记录的地址,而是线程的id。当无线程竞争的时候,就不会给同一个线程重复加锁。 偏向锁默认开启,但是有延迟,加上JVM参数取消延迟 -XX:BiasedLockingStartupDelay=0 禁用偏向锁: -XX:-UseBiasedLocking

  1. <--需要引入包-->
  2. <dependency>
  3. <groupId>org.openjdk.jol</groupId>
  4. <artifactId>jol-core</artifactId>
  5. <version>0.16</version>
  6. </dependency>

偏向锁偏向撤销 1、调用hashCode,将 线程id替换为 锁记录地址 2、其他线程竞争锁,锁会变为轻量级锁

批量重偏向 如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID;当撤销偏向锁达到阈值 20 次后,jvm 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至t2。因为前19次是轻量,释放之后为无锁不可偏向,但是20次后面的是偏向t2,释放之后依然是偏向t2。

批量撤销

  • 当一个偏向锁如果撤销次数到达40的时候就认为这个对象设计的有问题;那么JVM会把这个对象所对应的类所有的对象都撤销偏向锁;并且新实例化的对象也是不可偏向的
  • t1线程创建40个a对象,t2撤销偏向锁40次,t3开始加锁。t1 中40个对象都是偏向锁,t2撤销19次开始偏向t2,t3撤销19次后所有对象都会被JVM设置为不偏向,并且同一个类中创建的心类也不偏向。

锁消除

锁消除是发生在编译器级别的一种锁优化方式。
有时候我们写的代码完全不需要加锁,却执行了加锁操作。 jvm在运行的时候会优化代码进行锁消除

线程安全分析

image.png

设计模式

wait和notify

原理

线程获取锁之后但缺少运算条件,执行wait方法之后会释放锁并进入waitSet,进入waiting状态,当生产该条件的线程执行完毕后,会执行notify唤醒该线程进入entryList重新竞争锁。

API介绍

image.png

实例

  1. public class TestWaitify {
  2. private static final Object room =new Object();
  3. private static Boolean hasOk = false;
  4. private static Boolean hasMail = false;
  5. public static void main(String[] args) throws InterruptedException {
  6. new Thread(()->{
  7. synchronized(room){
  8. while(!hasOk){
  9. try {
  10. System.out.println("waiting1");
  11. room.wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. System.out.println("线程一ok");
  17. }
  18. }).start();
  19. new Thread(()->{
  20. synchronized(room){
  21. while(!hasMail){
  22. System.out.println("waiting2");
  23. try {
  24. room.wait();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. System.out.println("2Ok");
  29. }
  30. }
  31. }).start();
  32. TimeUnit.SECONDS.sleep(3);
  33. hasMail = true;
  34. synchronized (room){
  35. System.out.println("notify");
  36. room.notifyAll();
  37. }
  38. }
  39. }

虚假唤醒 notify()没有正确唤醒准备好的线程,这种情况称之为虚假唤醒。

保护性暂停

原理

一个线程等待另一个线程的执行结果,如果结果不断从一个线程到另一个线程,需要使用消息队列。

实例

  1. package com.example.thread;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * @author thesky
  5. * @date 2021/9/8 9:59
  6. */
  7. public class Stop {
  8. public static void main(String[] args) {
  9. Guard guard = new Guard();
  10. //线程1,等待线程2给结果
  11. new Thread(()->{
  12. try {
  13. Object o = guard.get(2000);
  14. if (o !=null){
  15. System.out.println("线程1收到结果");
  16. }else{
  17. System.out.println("超时");
  18. }
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. },"线程1").start();
  23. //线程2
  24. new Thread(()->{
  25. Object o = new Object();
  26. try {
  27. TimeUnit.SECONDS.sleep(1);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println("线程2生产");
  32. guard.put(o);
  33. },"线程2").start();
  34. }
  35. }
  36. class Guard{
  37. //结果
  38. private Object response;
  39. //获取结果
  40. public Object get(long timeout) throws InterruptedException {
  41. synchronized(this){
  42. long begin = System.currentTimeMillis();
  43. long passTime = 0;
  44. /* 下面超时优化
  45. while(response==null){
  46. //没有结果,线程等待
  47. System.out.println("没有结果,等待。。。。");
  48. this.wait();
  49. }
  50. */
  51. while(response==null){
  52. if (passTime > timeout){
  53. break;
  54. }
  55. //没有结果,线程等待
  56. long delay = timeout - passTime;
  57. System.out.println("没有结果,等待。。。。");
  58. this.wait(delay);
  59. passTime = System.currentTimeMillis() - begin;
  60. }
  61. return response;
  62. }
  63. }
  64. //生产结果
  65. public void put(Object obj){
  66. synchronized(this){
  67. this.response = obj;
  68. System.out.println("结果传递,唤醒线程");
  69. this.notifyAll();
  70. }
  71. }
  72. }

托管多个

  1. package com.example.thread;
  2. import java.util.Hashtable;
  3. import java.util.Map;
  4. import java.util.Set;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * @author thesky
  8. * @date 2021/9/8 9:59
  9. */
  10. public class Stop {
  11. public static void main(String[] args)
  12. for (int i = 0; i < 3; i++) {
  13. new People().start();
  14. }
  15. try {
  16. System.out.println("休眠");
  17. TimeUnit.SECONDS.sleep(2);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. for (Integer id : MailBox.getIds()) {
  22. new PostMan(id,id+"123456").start();
  23. }
  24. }
  25. }
  26. class Guard{
  27. private int id;
  28. public int getId(){
  29. return id;
  30. }
  31. public Guard(int id){
  32. this.id=id;
  33. }
  34. //结果
  35. private Object response;
  36. //获取结果
  37. public Object get(long timeout) throws InterruptedException {
  38. synchronized(this){
  39. long begin = System.currentTimeMillis();
  40. long passTime = 0;
  41. while(response==null){
  42. long delay = timeout - passTime;
  43. if (delay <= 0){
  44. break;
  45. }
  46. //没有结果,线程等待
  47. System.out.println("没有结果,等待。。。。");
  48. this.wait(delay);
  49. passTime = System.currentTimeMillis() - begin;
  50. }
  51. return response;
  52. }
  53. }
  54. //生产结果
  55. public void put(Object obj){
  56. synchronized(this){
  57. this.response = obj;
  58. System.out.println("结果传递,唤醒线程");
  59. this.notifyAll();
  60. }
  61. }
  62. }
  63. class People extends Thread{
  64. @Override
  65. public void run() {
  66. Guard guard = MailBox.creatBox();
  67. try {
  68. System.out.println("等待");
  69. Object o = guard.get(5000);
  70. if (o!=null){
  71. System.out.println("get message!!!");
  72. }
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. }
  78. class PostMan extends Thread{
  79. private int mailId;
  80. private String message;
  81. public PostMan(int mailId,String message){
  82. this.mailId=mailId;
  83. this.message=message;
  84. }
  85. @Override
  86. public void run() {
  87. Guard byId = MailBox.getById(mailId);
  88. byId.put(message);
  89. }
  90. }
  91. class MailBox{
  92. private static Map<Integer,Guard> box=new Hashtable<>();
  93. private static int id = 1;
  94. //生成id
  95. private static synchronized int generateId(){
  96. return id++;
  97. }
  98. public static Guard creatBox(){
  99. Guard guard = new Guard(generateId());
  100. box.put(guard.getId(),guard);
  101. return guard;
  102. }
  103. public static Set<Integer> getIds(){
  104. return box.keySet();
  105. }
  106. public static Guard getById(int id){
  107. return box.remove(id);
  108. }
  109. }

生产者消费者

image.png

实例——阻塞队列

  1. package com.example.thread;
  2. import java.util.LinkedList;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * @author thesky
  6. * @date 2021/9/8 14:07
  7. */
  8. public class Link {
  9. public static void main(String[] args) {
  10. MyQueue myQueue =new MyQueue(2);
  11. new Thread(()->{
  12. while (true){
  13. myQueue.take();
  14. }
  15. },"123").start();
  16. for (int i = 0; i < 3; i++) {
  17. int tem = i;
  18. new Thread(()->{
  19. myQueue.push(tem+"hao");
  20. }).start();
  21. }
  22. }
  23. }
  24. class MyQueue{
  25. private Integer capcity;
  26. private LinkedList<String> queues =new LinkedList<>();
  27. public MyQueue(int capcity){
  28. this.capcity = capcity;
  29. }
  30. public void push(String message){
  31. synchronized (queues){
  32. while (queues.size()==capcity){
  33. try {
  34. System.out.println("生产者等待");
  35. queues.wait();
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. System.out.println("生成");
  41. queues.addLast(message);
  42. queues.notifyAll();
  43. }
  44. }
  45. public String take(){
  46. synchronized(queues){
  47. while (queues.isEmpty()){
  48. try {
  49. System.out.println("消费者等待");
  50. queues.wait();
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. System.out.println("消费");
  56. queues.notifyAll();
  57. String s = queues.removeFirst();
  58. return s;
  59. }
  60. }
  61. }

park &unpark

与wait和notify的区别

  1. wait和notify必须配合Monitor使用,但是unpark不需要
  2. park和unpark以线程为单位,阻塞唤醒线程,但是notify只能随机唤醒一个线程,notifyAll唤醒所有线程,精确度不同。
  3. 可以先unpark,但是不能先notify。

原理

每个线程都会关联一个Parker对象。里面有三个属性,counter = 0时,调用park,获得_mutex互斥锁,线程进入_cond条件变量阻塞,调用unpark时,counter=1,且最多为1,所以多次调用unpar无效。此时唤醒_cond条件变量中的线程,设置counter=0。 当先调用unpark时,设置counter = 1,之后调用park,此时counter = 1,无需阻塞,设置counter = 0

实例

  1. package com.example.thread;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.LockSupport;
  4. /**
  5. * @author thesky
  6. * @date 2021/9/8 19:16
  7. */
  8. public class Park {
  9. public static void main(String[] args) {
  10. Thread thread = new Thread(() -> {
  11. System.out.println("start");
  12. try {
  13. TimeUnit.SECONDS.sleep(1);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println("park");
  18. LockSupport.park();
  19. System.out.println("continue");
  20. });
  21. thread.start();
  22. try {
  23. TimeUnit.SECONDS.sleep(2);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("unpark");
  28. LockSupport.unpark(thread);
  29. }
  30. }

固定运行顺序

多把锁

多个不想关的锁可以提高并发度,但是会出现死锁现象

死锁

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象

定位死锁,使用Jconsole测试、jps 、jstack

哲学家问题解决

活锁

两个线程改变彼此的停止条件导致无法停止,称之为活锁 解决方法: 增加随机睡眠时间

线程饥饿

线程优先级太低,导致很长时间不被cup调度导致无法完成。

ReenterantLock

特点

可重入、可中断、设置超时、公平锁,支持多个条件变量

多个条件变量

  1. private Condition condition1 = lock.newCondition(); //多变量
  2. private ReentrantLock lock =new ReentrantLock(true); //开启公平锁,默认不填为非公平锁
  3. public void reenlock(){
  4. Thread thread1 = new Thread(() -> {
  5. try {
  6. while(true){
  7. if (lock.tryLock()){
  8. System.out.println("t1上锁成功");
  9. if (!a){
  10. try {
  11. System.out.println("条件不满足");
  12. condition1.await();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }else{
  17. System.out.println("条件满足");
  18. break;
  19. }
  20. }
  21. }
  22. } finally {
  23. lock.unlock();
  24. System.out.println("t1释放锁");
  25. }
  26. });
  27. thread1.start();
  28. try {
  29. TimeUnit.SECONDS.sleep(3);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. if (lock.tryLock()) {
  34. System.out.println("main获取锁");
  35. a = true;
  36. condition1.signalAll();
  37. lock.unlock();
  38. }
  39. }

AQS

aqs是同步器框架,自定义同步器需要继承和重写方法

  1. package demo.thread;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.Lock;
  6. /**
  7. * @author thesky
  8. * @date 2021/10/20 14:55
  9. */
  10. public class MyLock implements Lock {
  11. class Mysyn extends AbstractQueuedSynchronizer{
  12. @Override //尝试获取锁
  13. protected boolean tryAcquire(int arg) {
  14. if (compareAndSetState(0,1)){
  15. //给当前线程加上锁
  16. setExclusiveOwnerThread(Thread.currentThread());
  17. return true;
  18. }
  19. return false;
  20. }
  21. @Override //尝试释放锁
  22. protected boolean tryRelease(int arg) {
  23. setExclusiveOwnerThread(null);
  24. setState(0);
  25. return true;
  26. }
  27. @Override //是否独占锁
  28. protected boolean isHeldExclusively() {
  29. return getState() == 1;
  30. }
  31. public Condition newCondition(){
  32. return new ConditionObject();
  33. }
  34. }
  35. private Mysyn mysyn =new Mysyn();
  36. @Override // 加锁
  37. public void lock() {
  38. mysyn.acquire(1);
  39. }
  40. @Override // 可打断加锁
  41. public void lockInterruptibly() throws InterruptedException {
  42. mysyn.acquireInterruptibly(1);
  43. }
  44. @Override //尝试加锁
  45. public boolean tryLock() {
  46. return mysyn.tryAcquire(1);
  47. }
  48. @Override //带超时的尝试加锁
  49. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  50. return mysyn.tryAcquireNanos(1,unit.toNanos(time));
  51. }
  52. @Override //释放锁
  53. public void unlock() {
  54. mysyn.release(1); //release会唤醒阻塞的线程,tryRelease不会唤醒
  55. }
  56. @Override //创建条件变量
  57. public Condition newCondition() {
  58. return mysyn.newCondition();
  59. }
  60. }

ReenterantLock实现原理,理解AQS

image.png

构造器

image.png

ReenterantLock 构造器默认实现的是非公平锁

线程加锁成功

非公平锁上锁,调用 compareAndSetState ,成功就将锁设置给当前线程,否则进入 acquire(1)方法。

  1. final void lock() {
  2. if (compareAndSetState(0, 1))
  3. setExclusiveOwnerThread(Thread.currentThread());
  4. else
  5. acquire(1);
  6. }

加锁失败

acquire会在次尝试获得锁,依旧失败就 addWaiter()构造一个Node队列 Node双向链表,第一位 是null,成为哑元或者哨兵,用来占位并不关联线程。 这个链表的元素,前一个结点状态是-1,需要唤醒后一个节点,最后的节点状态是 0 当锁释放,队列中第一的线程可以抢占锁,如果此时队列外的线程先一步抢占到了锁,队列中的线程就抢占失败,因此称之为非公平锁。

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

锁重入和释放

线程上锁,判断锁的owner是不是自己,如果是自己,就会让state+1,之后释放锁的时候每次 state-1

(不可)可打断原理

不可打断: 线程被打断之后,并不会立即返回打断标记,而是只有当该线程获得锁之后,才会返回打断标记(true),并执行打断操作。 可打断: 线程park之后阻塞,如果被打断,if语句就是true,就立刻抛出异常,该线程也就停止了。

公平锁

如果状态为 0 ,会先判断队列中有没有等待线程,如果没有就获取锁,如果有就不会竞争锁。

条件变量

当调用await后,如果该condition不存在,就创建一个Node,然后将线程存入,state设置为-3,当调用该变量的signal后,会将该节点 加入到 阻塞队列的队尾,并设置状态为0,之前的队尾节点状态设置为-1。

读写锁 ReentrantReadWriteLock

获取写锁之后,未释放写锁还可以获取读锁,称之为降级 获取读锁之后,未释放锁不能获得读锁,称之为升级 可重入

  1. ReentrantReadWriteLock reentrantReadWriteLock =new ReentrantReadWriteLock();
  2. try{
  3. reentrantReadWriteLock.readLock().lock();
  4. System.out.println("read.....");
  5. }finally {
  6. reentrantReadWriteLock.readLock().unlock();
  7. }
  8. try{
  9. reentrantReadWriteLock.writeLock().lock();
  10. System.out.println("写....");
  11. //锁降级
  12. reentrantReadWriteLock.readLock().lock();
  13. System.out.println("read.....");
  14. reentrantReadWriteLock.readLock().unlock();
  15. }finally {
  16. reentrantReadWriteLock.writeLock().unlock();
  17. }

StampedLock

Semaphore

在一定时间内限制访问共享资源的线程数量

CountDownLatch

CyclicBarrier

第五章

可见性

volatile

原子性

volatile修饰的虽然可以保证可见性,但是不能保证原子性。

有序性

指令重排

jvm在执行代码的时候,会进行优化,因此会产生指令重排

禁用:
使用volatile,volatile指令会加入读写屏障

DCL(double checked locking)

保护共享资源

CAS(Check and Set)

  1. AtomicInteger balance;
  2. private volatile AtomicInteger money = new AtomicInteger(1000);
  3. public void reenlock(){
  4. int a = 100;
  5. while (true){
  6. int prev = money.get();
  7. int next = prev - a;
  8. if (money.compareAndSet(prev,next)) {
  9. break;
  10. }
  11. }
  12. }

原子整数 AtomicInteger

  1. AtomicInteger money = new AtomicInteger(1000);
  2. money.getAndIncrement(); //i++
  3. money.incrementAndGet(); //++i
  4. money.decrementAndGet(); //--i
  5. money.addAndGet(10); //先加在读

原子引用

  1. AtomicReference<BigDecimal>

ABA问题

当其他线程修改共享变量为 A -> B ->A时,另一个线程不会发觉共享变量被修改过

解决,加版本号

  1. AtomicStampedReference<String> ref;
  2. int stamp = ref.getStamp;
  3. ref.compareAndSet(oldvalue,newvalue,stamp,stamp+1);

是否被更改过

  1. AtomicMarkableReference

原子数组

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray<T>

字段更新器

  1. AtomicIntegerFieldUpdater
  2. AtomicLongFieldUpdater
  3. AtomicReferenceFieldUpdater

原子累加器

  1. @Test
  2. public void test2(){
  3. demo(
  4. ()->new AtomicLong(0),
  5. adder->{
  6. adder.getAndIncrement();
  7. }
  8. );
  9. //AtomicLong 没有 LongAdder效率高
  10. demo(
  11. ()->new LongAdder(),
  12. adder->{
  13. adder.increment();
  14. }
  15. );
  16. }
  17. private <T> void demo(Supplier<T> adder, Consumer<T> action){
  18. T a = adder.get();
  19. }

扩展,函数式接口

LongAdder源码

缓存行伪共享

image.png

cpu有独立的一级缓存 二级缓存,以及共享的三级缓存,三级缓存之下是内存。如果两个cpu处理同一个数据,会造成缓存行时效。 一个缓存行存放64个字节的 cell对象24个字节,一个缓存行可以存放两个cell对象 所以使用@sun.misc.Contended注解,为使用的对象前后各加128个字节大小的padding,使每一个缓存行只能存放一个cell对象,这样不会造成对方的缓存行时效。

Unsafe

非常底层的对象,只能通过反射获得

  1. //反射获取
  2. try {
  3. Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
  4. theUnsafe.setAccessible(true); //允许获取私有属性
  5. Unsafe unsafe =(Unsafe) theUnsafe.get(null);
  6. System.out.println(unsafe);
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }

image.png

小结

image.png

第七章

不可变对象

例如String 利用保护性拷贝的方式,在修改的时候,创建一个新的对象。但是创建对象过于频繁

享元模式(连接池)

使用场景:重用数量有限的同一类对象时 享元模式(Flyweight Pattern)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式。

  1. public class Pool {
  2. Logger logger = LoggerFactory.getLogger(Pool.class);
  3. //连接池大小
  4. private final int poolsize;
  5. //连接对象数组
  6. private Connection[] connections;
  7. //连接状态数组
  8. private AtomicIntegerArray states;
  9. //初始化方法
  10. public Pool(int size){
  11. this.poolsize = size;
  12. this.connections = new Connection[size];
  13. this.states = new AtomicIntegerArray(new int[poolsize]);
  14. for (int i =0;i<size;i++){
  15. connections[i] = new MyConnection();
  16. }
  17. }
  18. //借连接
  19. public Connection borrow(){
  20. while(true){
  21. for (int i = 0; i < poolsize; i++) {
  22. if (states.get(i)==0) {
  23. if (states.compareAndSet(i,0,1)) {
  24. logger.info("借出{}",connections[i]);
  25. return connections[i];
  26. }
  27. }
  28. }
  29. synchronized(this){
  30. try {
  31. logger.info("满了");
  32. this.wait();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. }
  38. }
  39. //归还连接
  40. public void free(Connection con){
  41. for (int i = 0; i < poolsize; i++) {
  42. if (connections[i]==con) {
  43. states.set(i,0);
  44. logger.info("回收{}",con);
  45. synchronized (this){
  46. this.notifyAll();
  47. }
  48. break;
  49. }
  50. }
  51. }
  52. }
  53. class MyConnection implements Connection{
  54. //实现
  55. }

final原理

final会增加写屏障

小结

image.png

第八章,线程池

阻塞队列

线程池队列的作用,当线程池的线程无法处理过多的任务时,可以讲任务存放到队列中,等线程处理当前任务之后慢慢消费。所以队列功能有两个,存储任务,供线程获取任务。 在存储任务的时候,如果队列空间已满,此时可以用不同的策略处理,比如阻塞等待和带超时的等待或者其他策略。 线程获取任务的时候,如果队列是空的,可以让线程阻塞等待或者超时等待。

  1. package demo.thread;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.util.ArrayDeque;
  5. import java.util.Deque;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.locks.Condition;
  8. import java.util.concurrent.locks.ReentrantLock;
  9. /**
  10. * @author thesky
  11. * @date 2021/10/16 17:23
  12. */
  13. public class BlockQueue<T> {
  14. Logger logger = LoggerFactory.getLogger(BlockQueue.class);
  15. private Deque<T> queue = new ArrayDeque<>();
  16. private ReentrantLock lock =new ReentrantLock();
  17. private Condition fullWaitSet = lock.newCondition();
  18. private Condition emptyWaitSet = lock.newCondition();
  19. private int capcity;
  20. public BlockQueue(int capcity){
  21. this.capcity =capcity;
  22. }
  23. //带超时的阻塞获取
  24. public T poll(long timeout, TimeUnit unit){
  25. lock.lock();
  26. try{
  27. long nanos = unit.toNanos(timeout);
  28. while (queue.isEmpty()){
  29. try {
  30. if (nanos<=0){
  31. return null;
  32. }
  33. nanos = emptyWaitSet.awaitNanos(nanos);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. T t = queue.removeFirst();
  39. fullWaitSet.signal();
  40. return t;
  41. }finally{
  42. lock.unlock();
  43. }
  44. }
  45. // 阻塞获取任务
  46. public T take(){
  47. lock.lock();
  48. try{
  49. while (queue.isEmpty()){
  50. try {
  51. emptyWaitSet.await();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. T t = queue.removeFirst();
  57. fullWaitSet.signal();
  58. return t;
  59. }finally{
  60. lock.unlock();
  61. }
  62. }
  63. //策略模式
  64. //向队列中存放任务
  65. public void put(T task){
  66. lock.lock();
  67. try{
  68. while (queue.size()==capcity){
  69. try {
  70. fullWaitSet.await();
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. queue.addLast(task);
  76. emptyWaitSet.signalAll();
  77. }finally {
  78. lock.unlock();
  79. }
  80. }
  81. //带超时的put任务
  82. public void putTimeout(T task,long timeout, TimeUnit unit){
  83. lock.lock();
  84. try{
  85. long nanos = unit.toNanos(timeout);
  86. while (queue.size()==capcity){
  87. try {
  88. if (nanos>0){
  89. nanos = fullWaitSet.awaitNanos(nanos);
  90. }
  91. } catch (InterruptedException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. queue.addLast(task);
  96. fullWaitSet.signal();
  97. }finally{
  98. lock.unlock();
  99. }
  100. }
  101. public void tryPut(RejectPolicy<T> rejectPolicy,T task){
  102. lock.lock();
  103. try{
  104. //队列是否已满
  105. if (queue.size()==capcity){
  106. logger.info("满了");
  107. rejectPolicy.reject(this,task);
  108. }else{
  109. logger.info("没满");
  110. queue.addLast(task);
  111. emptyWaitSet.signal();
  112. }
  113. }finally {
  114. lock.unlock();
  115. }
  116. }
  117. public int size(){
  118. lock.lock();
  119. try {
  120. return queue.size();
  121. } finally {
  122. lock.unlock();
  123. }
  124. }
  125. }

线程池

线程池定义了 队列类型, 核心线程数以及救济线程数。任务是由线程池执行的,线程池获得任务之后,将任务分配给继承Thread类的线程处理对象(称线程),线程处理任务,当线程处理完之后,线程不能立刻结束,而是应该检查队列中是否有任务,如果有,则取出继续执行。当所有任务处理完之后,核心线程继续阻塞等待。救济线程结束移除。

  1. package demo.thread;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.util.HashSet;
  5. import java.util.concurrent.TimeUnit;
  6. /**
  7. * @author thesky
  8. * @date 2021/10/16 17:52
  9. */
  10. public class MyPool {
  11. Logger logger = LoggerFactory.getLogger(MyPool.class);
  12. private BlockQueue<Runnable> taskQueue;
  13. //线程集合
  14. private HashSet<Worker> workers = new HashSet();
  15. private int coreSize;
  16. private long tiemout;
  17. private TimeUnit timeUnit;
  18. private RejectPolicy<Runnable> rejectPolicy;
  19. public MyPool(int coreSize, long tiemout, TimeUnit unit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
  20. logger.info("构造线程池");
  21. this.coreSize = coreSize;
  22. this.tiemout = tiemout;
  23. this.timeUnit = unit;
  24. this.taskQueue = new BlockQueue<>(queueCapcity);
  25. this.rejectPolicy =rejectPolicy;
  26. }
  27. public void execute(Runnable task){
  28. //当任务数没有超过核心时直接执行
  29. synchronized (workers){
  30. if (workers.size()<coreSize){
  31. logger.info("增加worker");
  32. Worker worker =new Worker(task);
  33. workers.add(worker);
  34. worker.start();
  35. }
  36. //如果超过了,加入任务队列
  37. else{
  38. logger.info("{}存入队列",task);
  39. //taskQueue.put(task);
  40. //死等 带超时时间等待 放弃任务 抛出异常 调用线程自己执行
  41. taskQueue.tryPut(rejectPolicy,task);
  42. }
  43. }
  44. }
  45. class Worker extends Thread{
  46. private Runnable task;
  47. public Worker(Runnable task){
  48. this.task =task;
  49. }
  50. @Override
  51. public void run() {
  52. //当task !=null
  53. //执行完毕之后,去任务队列中获取
  54. //最后结束
  55. while (task!=null || (task = taskQueue.poll(tiemout,timeUnit))!=null){
  56. try {
  57. logger.info("执行{}",task);
  58. task.run();
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. }finally {
  62. task = null;
  63. }
  64. }
  65. synchronized (workers){
  66. logger.info("worker移除{}",this);
  67. workers.remove(this);
  68. }
  69. }
  70. }
  71. }

策略模式

策略模式可以灵活的调用 对象提供的策略,让用户自定处理逻辑

  1. package demo.thread;
  2. /**
  3. * @author thesky
  4. * @date 2021/10/17 15:42
  5. */
  6. @FunctionalInterface
  7. public interface RejectPolicy<T> {
  8. void reject(BlockQueue<T> queue,T task);
  9. }
  10. //例子
  11. MyPool myPool = new MyPool(1,1, TimeUnit.SECONDS,2,(queue,task)->{
  12. //queue.put(task);
  13. queue.poll(1,TimeUnit.SECONDS);
  14. });
  15. public void tryPut(RejectPolicy<T> rejectPolicy,T task){
  16. lock.lock();
  17. try{
  18. //队列是否已满
  19. if (queue.size()==capcity){
  20. logger.info("满了");
  21. rejectPolicy.reject(this,task); //再此调用传参
  22. }else{
  23. logger.info("没满");
  24. queue.addLast(task);
  25. emptyWaitSet.signal();
  26. }
  27. }finally {
  28. lock.unlock();
  29. }
  30. }

线程池状态

image.png

java自带的线程池ThreadPoolExecutor

image.png

固定线程池

固定核心线程池

image.png

带缓冲线程池

全是救济线程池,队列只能存放1个任务,之后阻塞等待线程取走。

image.png

newSingleThreadExecutor

image.png

线程固定为1,该线程池队列处理任务先进先处理

线程池提交任务

  1. execute(Runable command);
  2. <T> Future submit(Callable<T> task);
  3. invokeAll
  4. invokeAny

关闭线程池

  1. shutdown();
  2. shutdownNow(); //没有执行的任务返回

工作线程模式

当任务类型不一样的时候,可以设置多个池分别处理

分配合理线程池大小

image.png

任务调度

  1. ScheduledExecutorService
  2. //延时执行任务
  3. pool.schedule(()->{},延时时间,单位)
  4. //定时执行任务
  5. pool.scheduleAtFixedRate(()->{},延时时间,间隔时间,单位)
  6. pool.scheduleWithFixedDelay(()->{},延时时间,间隔时间,单位)

tomcat线程池

forkjoinpool

线程安全的集合类 JUC