为什么要使用多线程

应用场景

  • 秒杀

    1. 思考:图书秒杀案例,为什么在高并发下会出问题?<br />项目代码:[https://gitee.com/eleorc/seckill.git](https://gitee.com/eleorc/seckill.git)<br />Apache压力测试工具:[https://www.apachehaus.com/cgi-bin/download.plx](https://www.apachehaus.com/cgi-bin/download.plx)
  • 抢票

经典抢票案例。

  • 同步代码的粒度
  • 双重检查 ```java package com.qf.sy2103.thread02;

import java.util.concurrent.TimeUnit;

/**

  • 并发问题的根源:
  • 如果程序中
  • (1)存在共享资源(实例对象的实例属性),
  • (2)并且多个线程可以同时访问到该共享资源,
  • (3)并且访问共享资源的方法不能保证原子性
  • 就会出现并发问题。
  • 解决方案:采用适当的同步措施。 */

public class SaleTicket { public static void main(String[] args) {

  1. Ticket ticket = new Ticket();
  2. int i = 0;
  3. for (; i < 100; i++) {
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. TimeUnit.SECONDS.sleep(1);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. for (int i = 0; i < 30; i++) {
  13. ticket.sale();
  14. }
  15. }
  16. }).start();
  17. }
  18. }

}

class Ticket {

  1. private int number = 5;// 初始有5张票
  2. public void sale() {
  3. if (number > 0) {
  4. synchronized (this) {
  5. if (number > 0) {
  6. System.out.println(Thread.currentThread().getName() + ":出票,票号" + (number--));
  7. }
  8. }
  9. }
  10. }

}

  1. - 一个奇怪的结果,同一个线程似乎有较大的概率获取到锁(非公平锁)
  2. - synchronized是非公平锁
  3. - 公平锁?
  4. ```java
  5. package com.qf.sy2103.seckill;
  6. import java.util.concurrent.TimeUnit;
  7. public class SaleTicket {
  8. public static void main(String[] args) {
  9. Ticket ticket = new Ticket();
  10. int i = 0;
  11. for (;i<100;i++) {
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. TimeUnit.SECONDS.sleep(1);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. for (int i = 0;i<30;i++) {
  21. ticket.sale();
  22. }
  23. }
  24. }).start();
  25. }
  26. }
  27. }
  28. class Ticket{
  29. private int number = 5 ;// 初始有5张票
  30. public void sale(){
  31. if(number>0){
  32. System.out.println(Thread.currentThread().getName()+":出票,票号"+(number--));
  33. }
  34. }
  35. }
  • 大数据处理
  • 生产者消费者模型:使用多线程可以简化程序开发,典型的就是生产者消费者模型。生产者和消费者使用不同的线程。

多线程理想的状态:无冲突并行

多线程编程模型的引入

web编程中自动引入了多线程模型

JavaAPI中自动引入多线程模型

如何控制多线程程序的正确性?加锁

利用JUC包的类自定义锁

  1. package com.qf.sy2103.seckill;
  2. import java.util.concurrent.ConcurrentLinkedQueue;
  3. import java.util.concurrent.atomic.AtomicInteger;
  4. import java.util.concurrent.locks.LockSupport;
  5. public class QfLock {
  6. private AtomicInteger state = new AtomicInteger(0);
  7. private Thread owner ;
  8. private ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();
  9. public void lock(){
  10. if (tryAquire()){
  11. return;
  12. }
  13. Thread currentThread = Thread.currentThread();
  14. for (;;){
  15. if (tryAquire()){
  16. return;
  17. }
  18. queue.add(currentThread);
  19. LockSupport.park(currentThread);
  20. }
  21. }
  22. private boolean tryAquire() {
  23. final boolean b = state.compareAndSet(0, 1);
  24. if (b){
  25. this.owner = Thread.currentThread();
  26. return true;
  27. }else {
  28. return false;
  29. }
  30. }
  31. public void unlock(){
  32. if (this.owner!=Thread.currentThread()){
  33. throw new RuntimeException("!!!");
  34. }
  35. final boolean b = state.compareAndSet(1, 0);
  36. if (b){
  37. this.owner = null;
  38. final Thread next = queue.poll();
  39. if (next!=null){
  40. LockSupport.unpark(next);
  41. }
  42. }
  43. }
  44. public static void main(String[] args) {
  45. final QfLock qfLock = new QfLock();
  46. for (int i=0;i<3;i++) {
  47. new Thread(new Runnable() {
  48. @Override
  49. public void run() {
  50. qfLock.lock();
  51. System.out.println(Thread.currentThread().getName());
  52. qfLock.unlock();
  53. }
  54. }).start();
  55. }
  56. }
  57. }

公平锁、非公平锁

  1. 公平锁:当有多个线程进行申请同一把锁的时候,如果线程是按照申请锁的先后顺序依次获取到该锁的,这种锁就叫做公平锁。a,b,c ,假如a线程拿到了synchronized锁,d线程进入争抢锁了,公平锁会保证,一定是b接下来拿到锁,然后是c接下来拿到锁,然后才是d。
  2. 非公平锁:synchronized 。 a,b,c ,加入a线程拿到了synchronized锁,d线程进入争抢锁了,d就竞争到了锁,那么b和c依然在等待,d可以运行了。 ```java private Lock lock = new ReentrantLock();// 创建一个juc包里的可重入锁 非公平锁

// private Lock lock = new ReentrantLock(true); // 创建一个公平锁。

  1. <a name="TgXP2"></a>
  2. ### 可重入锁
  3. ```java
  4. package com.qf.sy2103.thread02;
  5. /**
  6. * synchronized 是可重入锁
  7. */
  8. public class ReentrantLockDemo {
  9. public static void main(String[] args) {
  10. final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
  11. reentrantLockDemo.test3();
  12. }
  13. public synchronized void test1(){
  14. System.out.println("test1 is runnning..");
  15. test2();
  16. }
  17. public synchronized void test2(){
  18. System.out.println("test2 is runnning..");
  19. }
  20. public void test3(){
  21. test1();
  22. }
  23. }
  1. package com.qf.sy2103.thread02;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. /**
  4. * ReentrantLock 是可重入锁
  5. */
  6. public class ReentrantLockDemo {
  7. private ReentrantLock lock = new ReentrantLock();
  8. public static void main(String[] args) {
  9. final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
  10. reentrantLockDemo.test3();
  11. }
  12. public void test1(){
  13. System.out.println("test1 is runnning..");
  14. test2();
  15. }
  16. public void test2(){
  17. System.out.println("test2 is runnning..");
  18. }
  19. public void test3(){
  20. lock.lock();
  21. lock.lock();
  22. try {
  23. test1();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. lock.unlock();
  29. }
  30. }
  31. }

自旋锁

自旋:一段死循环 。

  1. package com.qf.juc;
  2. import java.util.concurrent.atomic.AtomicReference;
  3. /**
  4. * 自定义自旋锁
  5. */
  6. public class MySpinLock {
  7. private AtomicReference<Thread> atomicReference = new AtomicReference<>();
  8. public void lock(){
  9. while ( !atomicReference.compareAndSet(null,Thread.currentThread()) ){
  10. System.out.println("....");
  11. }
  12. }
  13. public void unlock(){
  14. while ( !atomicReference.compareAndSet(Thread.currentThread(),null) ){
  15. }
  16. }
  17. public static void main(String[] args) {
  18. MySpinLock mySpinLock = new MySpinLock();
  19. mySpinLock.lock();
  20. System.out.println("tets");
  21. mySpinLock.unlock();
  22. }
  23. }

读写锁

  1. package com.qf.sy2103.thread02;
  2. import java.util.HashMap;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. public class ReadWriteLockDemo {
  6. private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  7. private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  8. private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  9. private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>();
  10. public void put(Integer key,Integer value){
  11. try {
  12. writeLock.lock();
  13. System.out.println(Thread.currentThread().getName()+"write ...");
  14. TimeUnit.SECONDS.sleep(3);
  15. map.put(key,value);
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. } finally {
  19. writeLock.unlock();
  20. }
  21. }
  22. public Integer get(Integer key){
  23. Integer v = null;
  24. try {
  25. readLock.lock();
  26. System.out.println(Thread.currentThread().getName()+"read...");
  27. TimeUnit.SECONDS.sleep(1);
  28. v = map.get(key);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. } finally {
  32. readLock.unlock();
  33. }
  34. return v;
  35. }
  36. public static void main(String[] args) {
  37. final ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();
  38. new Thread(new Runnable() {
  39. @Override
  40. public void run() {
  41. for (int i = 0; i <3 ; i++) {
  42. readWriteLockDemo.put(i,i);
  43. }
  44. }
  45. }).start();
  46. for (int i = 0 ;i<5;i++) {
  47. new Thread(new Runnable() {
  48. @Override
  49. public void run() {
  50. for (int i = 0 ;i<3;i++) {
  51. final Integer integer = readWriteLockDemo.get(1);
  52. }
  53. }
  54. }).start();
  55. }
  56. new Thread(new Runnable() {
  57. @Override
  58. public void run() {
  59. for (int i = 0; i <3 ; i++) {
  60. readWriteLockDemo.put(i,i);
  61. }
  62. }
  63. }).start();
  64. }
  65. }

死锁

检测死锁

  1. jps : 列出本机所有的java进程号
  2. jstack 进程号: 列出死锁信息 ```java package com.qf.juc;

import java.util.concurrent.TimeUnit;

public class DeadLock {

  1. public static void main(String[] args) {
  2. Object locka = new Object();
  3. Object lockb = new Object();
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. synchronized (locka) {
  8. System.out.println(Thread.currentThread().getName() + "获取到了locka");
  9. try {
  10. TimeUnit.SECONDS.sleep(1);
  11. synchronized (lockb) {
  12. System.out.println(Thread.currentThread().getName() + "获取到了lockb");
  13. }
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }
  19. }, "A").start();
  20. new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. synchronized (lockb) {
  24. System.out.println(Thread.currentThread().getName() + "获取到了lockb");
  25. synchronized (locka) {
  26. System.out.println(Thread.currentThread().getName() + "获取到了locka");
  27. }
  28. }
  29. }
  30. }, "B").start();
  31. }

}

  1. <a name="AIU7m"></a>
  2. ## 多线程协作编程模型
  3. <a name="w7p3h"></a>
  4. ### 生产者和消费者问题
  5. 口诀: 判断、等待、业务、通知
  6. ```java
  7. package com.qf.juc;
  8. public class ProduceConsum {
  9. public static void main(String[] args) {
  10. Product product = new Product();
  11. new Thread(new Runnable() {
  12. @Override
  13. public void run() {
  14. try {
  15. for (int i=0;i<10;i++) {
  16. product.add();
  17. }
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. },"A").start();
  23. new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. try {
  27. for (int i=0;i<10;i++) {
  28. product.sub();
  29. }
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. },"B").start();
  35. }
  36. }
  37. class Product {
  38. private int count = 0;
  39. // 生产商品,如果商品的数量大于0,则停止生产商品
  40. public synchronized void add() throws InterruptedException {
  41. // 1 判断
  42. if(count > 0) {
  43. this.wait(); // 2 等待
  44. }
  45. // 3 、业务代码处理
  46. System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));
  47. // 4 、通知其他等待线程
  48. this.notifyAll();
  49. }
  50. // 消费商品,如果商品的数量小于1,则停止消费商品
  51. public synchronized void sub() throws InterruptedException {
  52. // 1 判断
  53. if(count<1){
  54. this.wait(); // 2 等待
  55. }
  56. // 3 业务
  57. System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));
  58. // 4 通知其他线程
  59. this.notifyAll();
  60. }
  61. }

虚假唤醒问题

上面的示例如果生产者和消费者的线程不是一比一,则会出现问题。

  1. class Product {
  2. private int count = 0;
  3. // 生产商品,如果商品的数量大于0,则停止生产商品
  4. public synchronized void add() throws InterruptedException {
  5. // 1 判断
  6. while (count > 0) {
  7. this.wait(); // 2 等待
  8. }
  9. // 3 、业务代码处理
  10. System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));
  11. // 4 、通知其他等待线程
  12. this.notifyAll();
  13. }
  14. // 消费商品,如果商品的数量小于1,则停止消费商品
  15. public synchronized void sub() throws InterruptedException {
  16. // 1 判断
  17. while(count<1){
  18. this.wait(); // 2 等待
  19. }
  20. // 3 业务
  21. System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));
  22. // 4 通知其他线程
  23. this.notifyAll();
  24. }
  25. }

精准通知问题

  1. package com.qf.juc;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class Demo5 {
  6. public static void main(String[] args) {
  7. A a = new A();
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. for (int i =0;i<5;i++) {
  12. a.test1();
  13. }
  14. }
  15. },"A").start();
  16. new Thread(new Runnable() {
  17. @Override
  18. public void run() {
  19. for (int i=0;i<5;i++) {
  20. a.test2();
  21. }
  22. }
  23. },"B").start();
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. for (int i=0;i<5;i++) {
  28. a.test3();
  29. }
  30. }
  31. },"C").start();
  32. }
  33. }
  34. class A {
  35. private int state = 1;
  36. private Lock lock = new ReentrantLock();
  37. private Condition condition1 = lock.newCondition();
  38. private Condition condition2 = lock.newCondition();
  39. private Condition condition3 = lock.newCondition();
  40. public void test1() {
  41. // 1 加锁
  42. lock.lock();
  43. try {
  44. while (state != 1) {
  45. // 让当前线程在 condition1 上进行等待
  46. condition1.await();
  47. }
  48. System.out.println(Thread.currentThread().getName() + "test1 start... ");
  49. // 指定唤醒的test2 这个方法
  50. this.state = 2;
  51. // 定向通知,通知在condition2 上等待的线程,可以唤醒执行了
  52. condition2.signal();
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. } finally {
  56. lock.unlock();
  57. }
  58. }
  59. public void test2() {
  60. // 1 加锁
  61. lock.lock();
  62. try {
  63. while (state != 2) {
  64. // 让当前线程在 condition1 上进行等待
  65. condition2.await();
  66. }
  67. System.out.println(Thread.currentThread().getName() + "test2 start... ");
  68. // 指定唤醒的test2 这个方法
  69. this.state = 3;
  70. // 定向通知,通知在condition3 上等待的线程,可以唤醒执行了
  71. condition3.signal();
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. } finally {
  75. lock.unlock();
  76. }
  77. }
  78. public void test3() {
  79. // 1 加锁
  80. lock.lock();
  81. try {
  82. while (state != 3) {
  83. // 让当前线程在 condition1 上进行等待
  84. condition3.await();
  85. }
  86. System.out.println(Thread.currentThread().getName() + "test3 start... ");
  87. // 指定唤醒的test1 这个方法
  88. this.state = 1;
  89. // 定向通知,通知在condition1 上等待的线程,可以唤醒执行了
  90. condition1.signal();
  91. } catch (InterruptedException e) {
  92. e.printStackTrace();
  93. } finally {
  94. lock.unlock();
  95. }
  96. }
  97. }

JUC常用辅助类

CountDownLatch

  1. package com.qf.juc;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.TimeUnit;
  4. public class Demo2 {
  5. // 共享资源
  6. public int a = 0 ;
  7. public static void main(String[] args) throws InterruptedException {
  8. Demo2 demo2 = new Demo2();
  9. // 创建一个倒计时器,并且赋值为2.如果主线程在latch对象上等待的化,那么需要其他线程
  10. // 调用2次 countdown方法,主线程才会被唤醒继续执行。
  11. CountDownLatch latch = new CountDownLatch(2);
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. for (int i=0;i<100000;i++) {
  16. demo2.a++;
  17. }
  18. // 把latch对象的计数器减1
  19. latch.countDown();
  20. }
  21. },"A").start();
  22. new Thread(new Runnable() {
  23. @Override
  24. public void run() {
  25. for (int i=0;i<100000;i++) {
  26. demo2.a++;
  27. }
  28. latch.countDown();
  29. }
  30. },"B").start();
  31. // 让main线程等待一会
  32. // Thread.sleep(1000);
  33. // todo: juc 里有单独的类解决这个问题 : CountDownLatch
  34. // TimeUnit.SECONDS.sleep(10);
  35. // 让主线程在 latch对象上等待,那么需要等待多久呢?
  36. // 需要等到 latch对象的计数器变成0 .
  37. latch.await();
  38. System.out.println(demo2.a);
  39. }
  40. }

CyclicBarrier (召唤神龙)

  1. package com.qf.juc;
  2. import java.util.concurrent.BrokenBarrierException;
  3. import java.util.concurrent.CyclicBarrier;
  4. public class Demo7 {
  5. public static void main(String[] args) {
  6. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {
  7. @Override
  8. public void run() {
  9. System.out.println("集齐了七颗龙珠可以召唤神龙了!");
  10. }
  11. });
  12. for (int i=0;i<7;i++) {
  13. int finalI = i;
  14. new Thread(new Runnable() {
  15. @Override
  16. public void run() {
  17. System.out.println(Thread.currentThread().getName()+":收集到了龙珠:"+ finalI);
  18. try {
  19. cyclicBarrier.await();
  20. System.out.println(Thread.currentThread().getName()+":执行结束");
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. } catch (BrokenBarrierException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. },""+i).start();
  28. }
  29. }
  30. }

Semaphore

  1. package com.qf.juc;
  2. import java.util.concurrent.Semaphore;
  3. import java.util.concurrent.TimeUnit;
  4. public class Demo8 {
  5. public static void main(String[] args) {
  6. Semaphore semaphore = new Semaphore(3);
  7. for (int i=0;i<6;i++) {
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. for (int i=0;i<5;i++) {
  12. try {
  13. // 在信号量上申请,如果成功,可以执行业务代码,并且信号两的计数器会减一
  14. semaphore.acquire();
  15. System.out.println(Thread.currentThread().getName()+":抢到了车位");
  16. TimeUnit.SECONDS.sleep(1);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }finally {
  20. // 信号两的计数器加1
  21. semaphore.release();
  22. }
  23. }
  24. }
  25. },""+i).start();
  26. }
  27. }
  28. }