为什么要使用多线程
应用场景
秒杀
思考:图书秒杀案例,为什么在高并发下会出问题?<br />项目代码:[https://gitee.com/eleorc/seckill.git](https://gitee.com/eleorc/seckill.git)<br />Apache压力测试工具:[https://www.apachehaus.com/cgi-bin/download.plx](https://www.apachehaus.com/cgi-bin/download.plx)
抢票
经典抢票案例。
- 同步代码的粒度
- 双重检查 ```java package com.qf.sy2103.thread02;
import java.util.concurrent.TimeUnit;
/**
- 并发问题的根源:
- 如果程序中
- (1)存在共享资源(实例对象的实例属性),
- (2)并且多个线程可以同时访问到该共享资源,
- (3)并且访问共享资源的方法不能保证原子性
- 就会出现并发问题。
- 解决方案:采用适当的同步措施。 */
public class SaleTicket { public static void main(String[] args) {
Ticket ticket = new Ticket();int i = 0;for (; i < 100; i++) {new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for (int i = 0; i < 30; i++) {ticket.sale();}}}).start();}}
}
class Ticket {
private int number = 5;// 初始有5张票public void sale() {if (number > 0) {synchronized (this) {if (number > 0) {System.out.println(Thread.currentThread().getName() + ":出票,票号" + (number--));}}}}
}
- 一个奇怪的结果,同一个线程似乎有较大的概率获取到锁(非公平锁)- synchronized是非公平锁- 公平锁?```javapackage com.qf.sy2103.seckill;import java.util.concurrent.TimeUnit;public class SaleTicket {public static void main(String[] args) {Ticket ticket = new Ticket();int i = 0;for (;i<100;i++) {new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for (int i = 0;i<30;i++) {ticket.sale();}}}).start();}}}class Ticket{private int number = 5 ;// 初始有5张票public void sale(){if(number>0){System.out.println(Thread.currentThread().getName()+":出票,票号"+(number--));}}}
- 大数据处理
- 生产者消费者模型:使用多线程可以简化程序开发,典型的就是生产者消费者模型。生产者和消费者使用不同的线程。
多线程理想的状态:无冲突并行
多线程编程模型的引入
web编程中自动引入了多线程模型
JavaAPI中自动引入多线程模型
如何控制多线程程序的正确性?加锁
利用JUC包的类自定义锁
package com.qf.sy2103.seckill;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.LockSupport;public class QfLock {private AtomicInteger state = new AtomicInteger(0);private Thread owner ;private ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();public void lock(){if (tryAquire()){return;}Thread currentThread = Thread.currentThread();for (;;){if (tryAquire()){return;}queue.add(currentThread);LockSupport.park(currentThread);}}private boolean tryAquire() {final boolean b = state.compareAndSet(0, 1);if (b){this.owner = Thread.currentThread();return true;}else {return false;}}public void unlock(){if (this.owner!=Thread.currentThread()){throw new RuntimeException("!!!");}final boolean b = state.compareAndSet(1, 0);if (b){this.owner = null;final Thread next = queue.poll();if (next!=null){LockSupport.unpark(next);}}}public static void main(String[] args) {final QfLock qfLock = new QfLock();for (int i=0;i<3;i++) {new Thread(new Runnable() {@Overridepublic void run() {qfLock.lock();System.out.println(Thread.currentThread().getName());qfLock.unlock();}}).start();}}}
公平锁、非公平锁
- 公平锁:当有多个线程进行申请同一把锁的时候,如果线程是按照申请锁的先后顺序依次获取到该锁的,这种锁就叫做公平锁。a,b,c ,假如a线程拿到了synchronized锁,d线程进入争抢锁了,公平锁会保证,一定是b接下来拿到锁,然后是c接下来拿到锁,然后才是d。
- 非公平锁:synchronized 。 a,b,c ,加入a线程拿到了synchronized锁,d线程进入争抢锁了,d就竞争到了锁,那么b和c依然在等待,d可以运行了。 ```java private Lock lock = new ReentrantLock();// 创建一个juc包里的可重入锁 非公平锁
// private Lock lock = new ReentrantLock(true); // 创建一个公平锁。
<a name="TgXP2"></a>### 可重入锁```javapackage com.qf.sy2103.thread02;/*** synchronized 是可重入锁*/public class ReentrantLockDemo {public static void main(String[] args) {final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();reentrantLockDemo.test3();}public synchronized void test1(){System.out.println("test1 is runnning..");test2();}public synchronized void test2(){System.out.println("test2 is runnning..");}public void test3(){test1();}}
package com.qf.sy2103.thread02;import java.util.concurrent.locks.ReentrantLock;/*** ReentrantLock 是可重入锁*/public class ReentrantLockDemo {private ReentrantLock lock = new ReentrantLock();public static void main(String[] args) {final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();reentrantLockDemo.test3();}public void test1(){System.out.println("test1 is runnning..");test2();}public void test2(){System.out.println("test2 is runnning..");}public void test3(){lock.lock();lock.lock();try {test1();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();lock.unlock();}}}
自旋锁
自旋:一段死循环 。
package com.qf.juc;import java.util.concurrent.atomic.AtomicReference;/*** 自定义自旋锁*/public class MySpinLock {private AtomicReference<Thread> atomicReference = new AtomicReference<>();public void lock(){while ( !atomicReference.compareAndSet(null,Thread.currentThread()) ){System.out.println("....");}}public void unlock(){while ( !atomicReference.compareAndSet(Thread.currentThread(),null) ){}}public static void main(String[] args) {MySpinLock mySpinLock = new MySpinLock();mySpinLock.lock();System.out.println("tets");mySpinLock.unlock();}}
读写锁
package com.qf.sy2103.thread02;import java.util.HashMap;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockDemo {private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>();public void put(Integer key,Integer value){try {writeLock.lock();System.out.println(Thread.currentThread().getName()+"write ...");TimeUnit.SECONDS.sleep(3);map.put(key,value);} catch (Exception e) {e.printStackTrace();} finally {writeLock.unlock();}}public Integer get(Integer key){Integer v = null;try {readLock.lock();System.out.println(Thread.currentThread().getName()+"read...");TimeUnit.SECONDS.sleep(1);v = map.get(key);} catch (Exception e) {e.printStackTrace();} finally {readLock.unlock();}return v;}public static void main(String[] args) {final ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i <3 ; i++) {readWriteLockDemo.put(i,i);}}}).start();for (int i = 0 ;i<5;i++) {new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0 ;i<3;i++) {final Integer integer = readWriteLockDemo.get(1);}}}).start();}new Thread(new Runnable() {@Overridepublic void run() {for (int i = 0; i <3 ; i++) {readWriteLockDemo.put(i,i);}}}).start();}}
死锁
检测死锁
- jps : 列出本机所有的java进程号
- jstack 进程号: 列出死锁信息 ```java package com.qf.juc;
import java.util.concurrent.TimeUnit;
public class DeadLock {
public static void main(String[] args) {Object locka = new Object();Object lockb = new Object();new Thread(new Runnable() {@Overridepublic void run() {synchronized (locka) {System.out.println(Thread.currentThread().getName() + "获取到了locka");try {TimeUnit.SECONDS.sleep(1);synchronized (lockb) {System.out.println(Thread.currentThread().getName() + "获取到了lockb");}} catch (InterruptedException e) {e.printStackTrace();}}}}, "A").start();new Thread(new Runnable() {@Overridepublic void run() {synchronized (lockb) {System.out.println(Thread.currentThread().getName() + "获取到了lockb");synchronized (locka) {System.out.println(Thread.currentThread().getName() + "获取到了locka");}}}}, "B").start();}
}
<a name="AIU7m"></a>## 多线程协作编程模型<a name="w7p3h"></a>### 生产者和消费者问题口诀: 判断、等待、业务、通知```javapackage com.qf.juc;public class ProduceConsum {public static void main(String[] args) {Product product = new Product();new Thread(new Runnable() {@Overridepublic void run() {try {for (int i=0;i<10;i++) {product.add();}} catch (InterruptedException e) {e.printStackTrace();}}},"A").start();new Thread(new Runnable() {@Overridepublic void run() {try {for (int i=0;i<10;i++) {product.sub();}} catch (InterruptedException e) {e.printStackTrace();}}},"B").start();}}class Product {private int count = 0;// 生产商品,如果商品的数量大于0,则停止生产商品public synchronized void add() throws InterruptedException {// 1 判断if(count > 0) {this.wait(); // 2 等待}// 3 、业务代码处理System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));// 4 、通知其他等待线程this.notifyAll();}// 消费商品,如果商品的数量小于1,则停止消费商品public synchronized void sub() throws InterruptedException {// 1 判断if(count<1){this.wait(); // 2 等待}// 3 业务System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));// 4 通知其他线程this.notifyAll();}}
虚假唤醒问题
上面的示例如果生产者和消费者的线程不是一比一,则会出现问题。
class Product {private int count = 0;// 生产商品,如果商品的数量大于0,则停止生产商品public synchronized void add() throws InterruptedException {// 1 判断while (count > 0) {this.wait(); // 2 等待}// 3 、业务代码处理System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));// 4 、通知其他等待线程this.notifyAll();}// 消费商品,如果商品的数量小于1,则停止消费商品public synchronized void sub() throws InterruptedException {// 1 判断while(count<1){this.wait(); // 2 等待}// 3 业务System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));// 4 通知其他线程this.notifyAll();}}
精准通知问题
package com.qf.juc;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class Demo5 {public static void main(String[] args) {A a = new A();new Thread(new Runnable() {@Overridepublic void run() {for (int i =0;i<5;i++) {a.test1();}}},"A").start();new Thread(new Runnable() {@Overridepublic void run() {for (int i=0;i<5;i++) {a.test2();}}},"B").start();new Thread(new Runnable() {@Overridepublic void run() {for (int i=0;i<5;i++) {a.test3();}}},"C").start();}}class A {private int state = 1;private Lock lock = new ReentrantLock();private Condition condition1 = lock.newCondition();private Condition condition2 = lock.newCondition();private Condition condition3 = lock.newCondition();public void test1() {// 1 加锁lock.lock();try {while (state != 1) {// 让当前线程在 condition1 上进行等待condition1.await();}System.out.println(Thread.currentThread().getName() + "test1 start... ");// 指定唤醒的test2 这个方法this.state = 2;// 定向通知,通知在condition2 上等待的线程,可以唤醒执行了condition2.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void test2() {// 1 加锁lock.lock();try {while (state != 2) {// 让当前线程在 condition1 上进行等待condition2.await();}System.out.println(Thread.currentThread().getName() + "test2 start... ");// 指定唤醒的test2 这个方法this.state = 3;// 定向通知,通知在condition3 上等待的线程,可以唤醒执行了condition3.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void test3() {// 1 加锁lock.lock();try {while (state != 3) {// 让当前线程在 condition1 上进行等待condition3.await();}System.out.println(Thread.currentThread().getName() + "test3 start... ");// 指定唤醒的test1 这个方法this.state = 1;// 定向通知,通知在condition1 上等待的线程,可以唤醒执行了condition1.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}
JUC常用辅助类
CountDownLatch
package com.qf.juc;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class Demo2 {// 共享资源public int a = 0 ;public static void main(String[] args) throws InterruptedException {Demo2 demo2 = new Demo2();// 创建一个倒计时器,并且赋值为2.如果主线程在latch对象上等待的化,那么需要其他线程// 调用2次 countdown方法,主线程才会被唤醒继续执行。CountDownLatch latch = new CountDownLatch(2);new Thread(new Runnable() {@Overridepublic void run() {for (int i=0;i<100000;i++) {demo2.a++;}// 把latch对象的计数器减1latch.countDown();}},"A").start();new Thread(new Runnable() {@Overridepublic void run() {for (int i=0;i<100000;i++) {demo2.a++;}latch.countDown();}},"B").start();// 让main线程等待一会// Thread.sleep(1000);// todo: juc 里有单独的类解决这个问题 : CountDownLatch// TimeUnit.SECONDS.sleep(10);// 让主线程在 latch对象上等待,那么需要等待多久呢?// 需要等到 latch对象的计数器变成0 .latch.await();System.out.println(demo2.a);}}
CyclicBarrier (召唤神龙)
package com.qf.juc;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Demo7 {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {@Overridepublic void run() {System.out.println("集齐了七颗龙珠可以召唤神龙了!");}});for (int i=0;i<7;i++) {int finalI = i;new Thread(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+":收集到了龙珠:"+ finalI);try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+":执行结束");} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}},""+i).start();}}}
Semaphore
package com.qf.juc;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class Demo8 {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i=0;i<6;i++) {new Thread(new Runnable() {@Overridepublic void run() {for (int i=0;i<5;i++) {try {// 在信号量上申请,如果成功,可以执行业务代码,并且信号两的计数器会减一semaphore.acquire();System.out.println(Thread.currentThread().getName()+":抢到了车位");TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}finally {// 信号两的计数器加1semaphore.release();}}}},""+i).start();}}}
