并发和并行

  • 并发
    • 多个任务在一个cpu上交替执行
  • 并行

    • 多个任务在多个cpu上同时执行

      同步和异步

  • 同步


  • 异步

    tamcat并发低的原因

  • tomcat的并发处理为什么这么低,其实是因为tomcat的运行模式导致的,当用户去使用浏览器访问tomcat服务器的时候,首先会经过tcp协议的三次握手四次挥手保证网络通畅,最终通过双方的socket来发送请求和响应;

  • 确认连接以后用户访问服务器,这个时候tomcat的线程监听器监听到信息,告知tomcat,tomcat就会从工作线程池中分配线程来完成本次的访问;
  • 我们的项目都是部署在tomcat中,tomcat会将打包好的request发送给controller层,service层,dao层,最后和db进行数据交互,然后将结果打包成Response通过socket返回给用户;
  • 因为tomcat的运行模式是同步阻塞的,所以他这整个执行流程都不会收回线程或者转移线程,而整个请求的完成其实是很费时间的,又因为tomcat本身的总线程并不多,所以tomcat的并发才这么低;
  • 线程总数不多,tps不高

    线程池的7大参数:

  1. 常驻线程数
  2. 最大线程数
  3. 线程销毁时间
  4. 时间单位
  5. 阻塞队列
  6. 默认工厂
  7. 拒绝策略

线程池原理:

线程池刚创建初始化好的时候他的线程数其实是0,当任务提交过来后,线程池会判断当前的线程数是不是小于常驻线程数,小于,就创建对应的常驻线程,当线程池中的线程不再小于常驻线程之后,就暂时不创建线程了,后面来了其他任务,就到阻塞队列排队,当阻塞队列满了,就根据最大线程数把其他零时线程创建出来,当阻塞队列又满了,执行对应的拒绝策略。当一段时间后,如果常驻线程能够把任务处理过来,就把零时线程销毁。

阿里手册上明确指出,原生线程池的阻塞队列是Integer.MAX_VALUE,也叫无界队列,队列太大,无数任务都可以提交到线程池内部,会把线程池 撑爆。

并发编程三大问题

什么是并发编程:

  • 多个线程相互协作,一起完成同一件事情。
  • 可见性问题

    1. public class VolatileDemo {
    2. static int flag = 0;
    3. public static void main(String[] args) {
    4. //线程一
    5. new Thread(() -> {
    6. int localFlag = flag;
    7. while (true) {
    8. if (localFlag != flag) {
    9. System.out.println("线程一读取到被修改的值 " + flag);
    10. localFlag = flag;
    11. }
    12. }
    13. }).start();
    14. //线程二
    15. new Thread(() -> {
    16. int localFlag = flag;
    17. while (true) {
    18. System.out.println("线程二标志位被修改为了:" + ++localFlag);
    19. flag = localFlag;
    20. try {
    21. TimeUnit.SECONDS.sleep(2);
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. }
    26. }).start();
    27. }

    代码理解:根据代码的运行逻辑应该是线程一和线程二交替进行,可是实际运行情况却不是这样,而是线程一只执行了一次就不再执行了

    • cpu内存模型
      • 原因:现代计算机内存和cpu的读写速度差异大,cpu直接操作内存的话会导致性能变的很差,所以一般是给cpu加几层缓存,cpu就不需要频繁的跟内存通信,所以就会出现两个问题:
        • 1.可能当前cpu修改的数据只是存储在了缓存中,并没有刷回内存中
        • 2.就算刷回了内存中,其他cpu并不知道你内存中的数据已经发生了修改
      • 解决方法
        • MESI缓存一致性协议
          • 强制把数据刷写回主内存
          • 嗅探机制:当cpu写完之后,会向其他cpu发送一个指令,让其他cpu中的缓存失效,从而其他cpu要重新从内存中加载数据
    • java内存模型

      • 原理:

        • 基于cpu缓存模型来建立
        • 把存放数据的地方分成了工作内存和主内存

          • 工作步骤:
            • 从主存中读取数据(read),将读取到的数据写入工作内存中(load),从工作内存中读取数据来计算(use),再将计算好的数据重新赋值到工作内存中(assign),将工作内存中的数据写入主存(store),再将store过去的变量值赋值给主存中的变量(write)

          image.png

        • 1.同cpu模型一致,可能当前cpu修改的数据只是存储在了缓存中,并没有刷回内存中

        • 2.就算刷回了内存中,其他cpu并不知道你内存中的数据已经发生了修改
      • volatile解决可见性问题
        • 成员变量加上volatile关键字后,只要修改成功,就会强制将工作内存中的数据写入主存(store),再将store过去的变量值赋值给主存中的变量(write),同时让其他cpu中的工作内存中的数据失效,其他cpu要重新从内存中加载数据。
  • 原子性问题

    1. for (int i = 0; i < 20; i++) {
    2. new Thread(()->{
    3. try {
    4. Thread.sleep(20);
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. System.out.println(num++);
    9. }).start();
    10. }
    • 原子性:多个操作要么同时成功,要么同时失败
    • Java内存模型把操作数据的过程分成了很多步,他执行每一步操作cpu的执行权都可能被别的线程抢走,当多条线程同时对一个数据进行修改时,线程一从工作内存中获取到数据,然后cpu执行权可能就被线程二抢走了,线程一就发生了阻塞,当线程二也获取到工作内存中的数据,然后对数据进行修改,当修改成功,就会强制刷写回主内存,并让其他线程中工作内存的数据失效,但线程一已经获取到工作内存中的数据了,工作内存中数据失效并不会影响他的执行,所以当他抢到cpu执行权时,他也会修改数据,当修改成功也会强制把数据刷写回主存,造成数据覆盖
    • 解决方法
      • 使用atomic原子性类中的CAS无锁化机制来解决
  • 重排序问题

    • 编译器在不改变结果的情况下,对指令进行优化,按照他能理解的,而且性能最高的方式来去对 java的指令进行重新排序,在单线程的时候没关系,但是在多线程就会出问题
    • 解决方案
      • happends-bofore原则:保证在一定情况下,不会发生指令重排
      • 但是当没有满足happends-bofore原则时,还会出现指令重排问题
      • volatile:除了是解决内存可见性问题以外,还可以禁止指令重拍,但是无法解决原子性问题

        AtomicInteger的使用

  • Atomic中CAS无锁化原理

    • CAS(比较并替换):无锁化机制
    • 理解:CAS是一种无锁算法,CAS有3个操作数,内存值A,旧的预期值B,要修改的新值C。当且仅当预期值B和内存值A相同时,将内存值A修改为C,采用的乐观锁的思想;当预期值B和内存值A不相同,就会基于do while 循环 compareAndSwap来进行自旋。

    1651938727729.png ```java

  1. //this:当前要操作的对象
  2. //valueOffset:当前要操作的变量偏移量(变量的内存当前值)
  3. //expect:期望内存中的值
  4. //update:要修改的新值

public final class Unsafe { //省略很多代码…… public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); //省略很多代码…… }

//读取传入对象this(var1)在内存中偏移量为valueOffset(var2)位置的值与期望值expect(var4)作比较。

//相等,就把update(var5)值赋值给valueOffset(var2)位置的值,方法返回true。

//不等,就取消赋值,方法返回false

  1. - 优点:
  2. - 获得cpu底层支持,性能很高
  3. - 局部原子性,如果内存值==预估值,就一定会用修改值去替换内存值。
  4. - 缺点:
  5. - ABA问题:第二次拿到的数据虽然和第一次一样,但可能是别的线程改了后又被另外的线程改回来的数据
  6. - 解决方法:用版本号,标志位之类的方案解决,参考乐观锁
  7. - 高并发下大量线程执行不成功自旋对cpu造成巨大压力
  8. - 解决方法:利用LongAdder分布CAS机制优化多线程自旋问题
  9. - **unsafe类**
  10. - unsafeJDK内部的类,提供了一些底层操作的不安全类,只有jdk内部可以拿到,atomic底层的操作就是依赖于unsafe
  11. ```java
  12. public class CasSpinLock {
  13. //成员变量初始化
  14. static int num = 0;
  15. public static void main(String[] args) {
  16. //创建CAS无锁化对象
  17. SpinLock spinLock = new SpinLockCas();
  18. //循环创建多个线程对数据进行修改
  19. for (int i = 0; i < 20; i++) {
  20. new Thread(()->{
  21. //调用自定义CAS对象实现加锁功能
  22. spinLock.lock();
  23. System.out.println(num++);
  24. spinLock.unlock();
  25. }).start();
  26. }
  27. }
  28. }
  29. interface SpinLock{
  30. void lock();
  31. void unlock();
  32. }
  33. class SpinLockCas implements SpinLock{
  34. //创建AtomicInteger原子类
  35. AtomicInteger atomic = new AtomicInteger(0);
  36. @Override
  37. public void lock() {
  38. //比较
  39. while(!atomic.compareAndSet(0,1)){
  40. }
  41. }
  42. @Override
  43. public void unlock() {
  44. //替换
  45. atomic.compareAndSet(1,0);
  46. }
  47. }

LongAdder优化CAS机制多线程自旋问题原理

  • 这个类是jdk1.8之后提供的类
  • 原理:
    • 当有大量并发来操作base(就是Atomic里的Value),会创建出来很多cell,多个线程就去操作对应的cell
    • 当有一个cell竞争很激烈容易失败,会自动迁移,把线程分配一些到其他cell中,如果每一个cell都竞争激烈,那就再创建cell
    • 其实就是将内存中操作的变量拆分出来,让它变成多个变量( 这里和ConcurrentHashMap的原理就很相似了),然后让线程去竞争这些变量,将这些变量处理完后,然后在进行求和,这样降低了变量的并发度,减少了CAS失败次数

1651941805123.png

Threadlocal

volatile、atomic、threadLocal、读写锁使用建议

(1)关于volatile:简单读,简单写,如果仅仅只是去修改某一句代码的值,但是这个共享变量的值,是有人改,有人读,其实这种情况下就只需要添加volatile就可以了,比如源码分析中的一些标志位
(2)关于atomic:如果仅仅只是单纯的去累加一些值,推荐使用atomic,通过cas原子类来进行累加,性能比syn好得多
(3)关于threadLocal:如果你定义的一个成员变量不需要多个线程去进行共享操作的话,那么就可以使用threadLocal的副本机制,让每个线程都维护一个变量的副本,每个线程操作自己的变量
(4)关于读写锁:当多个线程来并发访问一块空间,而且业务稍微复杂一点,但是不是太复杂,就需要加锁了,优先考虑读写锁,syn锁太重了,在读多写少的这样一种情况下,读锁->大量线程并发的读,写数据时不能很多人同时过来写,也不能读,那么就应该使用读写锁
(5)不要让锁的时间过于长,尽量操作一些内存的一些数据就可以了。
(6)控制锁的粒度

多线程的使用

  • 同步:产生阻塞,一个一个执行
  • 异步:找别人帮我做,我接着做我自己的事

    • 异步编排第一种使用:很多个查询或者很多个没有关联的分支都要执行 ```java @Autowired @Qualifier(“myThreadPool”) private Executor threadPool;

    @Override public void submit(Orders orders) { Long userId = BaseContext.getCurrentId(); //第一条异步线程查询购物车数据并返回结果,supplyAsync方法带返回值 CompletableFuture> taskShoppingCarts = CompletableFuture.supplyAsync(() -> {

    1. List<ShoppingCart> shoppingCarts = shoppingCartService.list(
    2. new LambdaQueryWrapper<ShoppingCart>()
    3. .eq(ShoppingCart::getUserId, userId));
    4. if (shoppingCarts == null || shoppingCarts.size() == 0) {
    5. throw new CustomException("购物车为空,不能下单!");
    6. }
    7. log.info("查询购物车打印时间:"+LocalDateTime.now());
    8. return shoppingCarts;

    },threadPool); //第二条异步线程查询用户数据并返回结果 CompletableFuture taskUser = CompletableFuture.supplyAsync(() -> {

    1. User user = userService.getById(userId);
    2. log.info("查询用户打印时间:"+LocalDateTime.now());
    3. return user;

    }, threadPool);

    //雪花算法生产id Long orderId = IdWorker.getId(); //第三条异步线程查询地址数据并返回结果 CompletableFuture taskAddressBook = CompletableFuture.supplyAsync(() -> {

    1. Long addressBookId = orders.getAddressBookId();
    2. AddressBook addressBook = addressBookService.getById(addressBookId);
    3. if (addressBook == null) {
    4. throw new CustomException("地址信息有误!");
    5. }
    6. log.info("查询地址打印时间:"+LocalDateTime.now());
    7. return addressBook;

    }, threadPool); List shoppingCarts =null; User user =null; AddressBook addressBook =null; try {

    1. CompletableFuture.allOf(taskUser,taskShoppingCarts,taskAddressBook).get();
    2. shoppingCarts = taskShoppingCarts.get();
    3. user = taskUser.get();
    4. addressBook = taskAddressBook.get();

    } catch (Exception e) {

    1. e.printStackTrace();

    } ```

  • 异步编排第二种使用:加载很大的任务,可以以分页的形式把大任务分成很多块。没有线程安全问题 ```java public class Test02 {

    public static void main(String[] args) {

    1. ExecutorService threadPool = Executors.newFixedThreadPool(5);
    2. //每个线程一次执行任务数
    3. int pageSize = 10;
    4. //最大任务量
    5. int allCount = 100;
    6. //计算最大执行次数
    7. int page = allCount % pageSize == 0 ? allCount / pageSize : allCount / pageSize + 1;
    8. //定义信号枪,没执行一次page减一,当page没有变成0,就会调用await()阻塞
    9. CountDownLatch cdl = new CountDownLatch(page);
    10. long startTime = System.currentTimeMillis();
    11. for (int i = 1; i <= page; i++) { // 1 2 3
    12. threadPool.submit(new TaskThread(i, pageSize,cdl));
    13. }
    14. //只有当他听到信号抢之后,主线程再往下边走,
    15. //而且信号枪什么时候响起来呢,当所有分线程都跑完得时候
    16. // countDownLatch 叫信号枪
    17. // countDownLatch countDown await countDownLatch
    18. //他得底层维护了一个变量 10 countDown --> 9 countDown -->8
    19. //当这个变量没有变为0之前,他就会阻塞
    20. try {
    21. cdl.await();
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. long endTime = System.currentTimeMillis();
    26. System.out.println("最后执行结果" + (endTime-startTime));

    }

    static class TaskThread implements Runnable {

    1. int page;
    2. int pageSize;
    3. CountDownLatch countDownLatch;
    4. public TaskThread(int page, int pageSize,CountDownLatch countDownLatch) {
    5. this.page = page;
    6. this.pageSize = pageSize;
    7. this.countDownLatch = countDownLatch;
    8. }
    9. // 想办法 0 1000:执行 0-999
    10. // 想办法:1 1000 执行 1000-1999
    11. // 想办法:2 1000执行 2000-2999
    12. // ....
    13. //当所有的循环都处理好之后,实际上所有的任务都完了
    14. @Override
    15. public void run() {
    16. int startPage = (page-1)*pageSize; // 1 1000
    17. int endPage = (page * pageSize) -1 ; // 999 1999
    18. for (int i = startPage; i <= endPage; i++) {
    19. System.out.println(Thread.currentThread().getName() + "正在处理第" + i );
    20. }
    21. countDownLatch.countDown();
    22. }

    }

}

```