为什么要使用多线程
应用场景
秒杀
思考:图书秒杀案例,为什么在高并发下会出问题?<br />项目代码:[https://gitee.com/eleorc/seckill.git](https://gitee.com/eleorc/seckill.git)<br />Apache压力测试工具:[https://www.apachehaus.com/cgi-bin/download.plx](https://www.apachehaus.com/cgi-bin/download.plx)
抢票
经典抢票案例。
- 同步代码的粒度
- 双重检查 ```java package com.qf.sy2103.thread02;
import java.util.concurrent.TimeUnit;
/**
- 并发问题的根源:
- 如果程序中
- (1)存在共享资源(实例对象的实例属性),
- (2)并且多个线程可以同时访问到该共享资源,
- (3)并且访问共享资源的方法不能保证原子性
- 就会出现并发问题。
- 解决方案:采用适当的同步措施。 */
public class SaleTicket { public static void main(String[] args) {
Ticket ticket = new Ticket();
int i = 0;
for (; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 30; i++) {
ticket.sale();
}
}
}).start();
}
}
}
class Ticket {
private int number = 5;// 初始有5张票
public void sale() {
if (number > 0) {
synchronized (this) {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + ":出票,票号" + (number--));
}
}
}
}
}
- 一个奇怪的结果,同一个线程似乎有较大的概率获取到锁(非公平锁)
- synchronized是非公平锁
- 公平锁?
```java
package com.qf.sy2103.seckill;
import java.util.concurrent.TimeUnit;
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
int i = 0;
for (;i<100;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0;i<30;i++) {
ticket.sale();
}
}
}).start();
}
}
}
class Ticket{
private int number = 5 ;// 初始有5张票
public void sale(){
if(number>0){
System.out.println(Thread.currentThread().getName()+":出票,票号"+(number--));
}
}
}
- 大数据处理
- 生产者消费者模型:使用多线程可以简化程序开发,典型的就是生产者消费者模型。生产者和消费者使用不同的线程。
多线程理想的状态:无冲突并行
多线程编程模型的引入
web编程中自动引入了多线程模型
JavaAPI中自动引入多线程模型
如何控制多线程程序的正确性?加锁
利用JUC包的类自定义锁
package com.qf.sy2103.seckill;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
public class QfLock {
private AtomicInteger state = new AtomicInteger(0);
private Thread owner ;
private ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();
public void lock(){
if (tryAquire()){
return;
}
Thread currentThread = Thread.currentThread();
for (;;){
if (tryAquire()){
return;
}
queue.add(currentThread);
LockSupport.park(currentThread);
}
}
private boolean tryAquire() {
final boolean b = state.compareAndSet(0, 1);
if (b){
this.owner = Thread.currentThread();
return true;
}else {
return false;
}
}
public void unlock(){
if (this.owner!=Thread.currentThread()){
throw new RuntimeException("!!!");
}
final boolean b = state.compareAndSet(1, 0);
if (b){
this.owner = null;
final Thread next = queue.poll();
if (next!=null){
LockSupport.unpark(next);
}
}
}
public static void main(String[] args) {
final QfLock qfLock = new QfLock();
for (int i=0;i<3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
qfLock.lock();
System.out.println(Thread.currentThread().getName());
qfLock.unlock();
}
}).start();
}
}
}
公平锁、非公平锁
- 公平锁:当有多个线程进行申请同一把锁的时候,如果线程是按照申请锁的先后顺序依次获取到该锁的,这种锁就叫做公平锁。a,b,c ,假如a线程拿到了synchronized锁,d线程进入争抢锁了,公平锁会保证,一定是b接下来拿到锁,然后是c接下来拿到锁,然后才是d。
- 非公平锁:synchronized 。 a,b,c ,加入a线程拿到了synchronized锁,d线程进入争抢锁了,d就竞争到了锁,那么b和c依然在等待,d可以运行了。 ```java private Lock lock = new ReentrantLock();// 创建一个juc包里的可重入锁 非公平锁
// private Lock lock = new ReentrantLock(true); // 创建一个公平锁。
<a name="TgXP2"></a>
### 可重入锁
```java
package com.qf.sy2103.thread02;
/**
* synchronized 是可重入锁
*/
public class ReentrantLockDemo {
public static void main(String[] args) {
final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
reentrantLockDemo.test3();
}
public synchronized void test1(){
System.out.println("test1 is runnning..");
test2();
}
public synchronized void test2(){
System.out.println("test2 is runnning..");
}
public void test3(){
test1();
}
}
package com.qf.sy2103.thread02;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock 是可重入锁
*/
public class ReentrantLockDemo {
private ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
final ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
reentrantLockDemo.test3();
}
public void test1(){
System.out.println("test1 is runnning..");
test2();
}
public void test2(){
System.out.println("test2 is runnning..");
}
public void test3(){
lock.lock();
lock.lock();
try {
test1();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
lock.unlock();
}
}
}
自旋锁
自旋:一段死循环 。
package com.qf.juc;
import java.util.concurrent.atomic.AtomicReference;
/**
* 自定义自旋锁
*/
public class MySpinLock {
private AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock(){
while ( !atomicReference.compareAndSet(null,Thread.currentThread()) ){
System.out.println("....");
}
}
public void unlock(){
while ( !atomicReference.compareAndSet(Thread.currentThread(),null) ){
}
}
public static void main(String[] args) {
MySpinLock mySpinLock = new MySpinLock();
mySpinLock.lock();
System.out.println("tets");
mySpinLock.unlock();
}
}
读写锁
package com.qf.sy2103.thread02;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>();
public void put(Integer key,Integer value){
try {
writeLock.lock();
System.out.println(Thread.currentThread().getName()+"write ...");
TimeUnit.SECONDS.sleep(3);
map.put(key,value);
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public Integer get(Integer key){
Integer v = null;
try {
readLock.lock();
System.out.println(Thread.currentThread().getName()+"read...");
TimeUnit.SECONDS.sleep(1);
v = map.get(key);
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
return v;
}
public static void main(String[] args) {
final ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <3 ; i++) {
readWriteLockDemo.put(i,i);
}
}
}).start();
for (int i = 0 ;i<5;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0 ;i<3;i++) {
final Integer integer = readWriteLockDemo.get(1);
}
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <3 ; i++) {
readWriteLockDemo.put(i,i);
}
}
}).start();
}
}
死锁
检测死锁
- jps : 列出本机所有的java进程号
- jstack 进程号: 列出死锁信息 ```java package com.qf.juc;
import java.util.concurrent.TimeUnit;
public class DeadLock {
public static void main(String[] args) {
Object locka = new Object();
Object lockb = new Object();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (locka) {
System.out.println(Thread.currentThread().getName() + "获取到了locka");
try {
TimeUnit.SECONDS.sleep(1);
synchronized (lockb) {
System.out.println(Thread.currentThread().getName() + "获取到了lockb");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "A").start();
new Thread(new Runnable() {
@Override
public void run() {
synchronized (lockb) {
System.out.println(Thread.currentThread().getName() + "获取到了lockb");
synchronized (locka) {
System.out.println(Thread.currentThread().getName() + "获取到了locka");
}
}
}
}, "B").start();
}
}
<a name="AIU7m"></a>
## 多线程协作编程模型
<a name="w7p3h"></a>
### 生产者和消费者问题
口诀: 判断、等待、业务、通知
```java
package com.qf.juc;
public class ProduceConsum {
public static void main(String[] args) {
Product product = new Product();
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i=0;i<10;i++) {
product.add();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i=0;i<10;i++) {
product.sub();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"B").start();
}
}
class Product {
private int count = 0;
// 生产商品,如果商品的数量大于0,则停止生产商品
public synchronized void add() throws InterruptedException {
// 1 判断
if(count > 0) {
this.wait(); // 2 等待
}
// 3 、业务代码处理
System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));
// 4 、通知其他等待线程
this.notifyAll();
}
// 消费商品,如果商品的数量小于1,则停止消费商品
public synchronized void sub() throws InterruptedException {
// 1 判断
if(count<1){
this.wait(); // 2 等待
}
// 3 业务
System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));
// 4 通知其他线程
this.notifyAll();
}
}
虚假唤醒问题
上面的示例如果生产者和消费者的线程不是一比一,则会出现问题。
class Product {
private int count = 0;
// 生产商品,如果商品的数量大于0,则停止生产商品
public synchronized void add() throws InterruptedException {
// 1 判断
while (count > 0) {
this.wait(); // 2 等待
}
// 3 、业务代码处理
System.out.println(Thread.currentThread().getName()+":生产了一个商品,当前的商品数量:"+(++count));
// 4 、通知其他等待线程
this.notifyAll();
}
// 消费商品,如果商品的数量小于1,则停止消费商品
public synchronized void sub() throws InterruptedException {
// 1 判断
while(count<1){
this.wait(); // 2 等待
}
// 3 业务
System.out.println(Thread.currentThread().getName()+":消费了一个商品,当前的商品数量为:"+(--count));
// 4 通知其他线程
this.notifyAll();
}
}
精准通知问题
package com.qf.juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Demo5 {
public static void main(String[] args) {
A a = new A();
new Thread(new Runnable() {
@Override
public void run() {
for (int i =0;i<5;i++) {
a.test1();
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<5;i++) {
a.test2();
}
}
},"B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<5;i++) {
a.test3();
}
}
},"C").start();
}
}
class A {
private int state = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void test1() {
// 1 加锁
lock.lock();
try {
while (state != 1) {
// 让当前线程在 condition1 上进行等待
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "test1 start... ");
// 指定唤醒的test2 这个方法
this.state = 2;
// 定向通知,通知在condition2 上等待的线程,可以唤醒执行了
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void test2() {
// 1 加锁
lock.lock();
try {
while (state != 2) {
// 让当前线程在 condition1 上进行等待
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "test2 start... ");
// 指定唤醒的test2 这个方法
this.state = 3;
// 定向通知,通知在condition3 上等待的线程,可以唤醒执行了
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void test3() {
// 1 加锁
lock.lock();
try {
while (state != 3) {
// 让当前线程在 condition1 上进行等待
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "test3 start... ");
// 指定唤醒的test1 这个方法
this.state = 1;
// 定向通知,通知在condition1 上等待的线程,可以唤醒执行了
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
JUC常用辅助类
CountDownLatch
package com.qf.juc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Demo2 {
// 共享资源
public int a = 0 ;
public static void main(String[] args) throws InterruptedException {
Demo2 demo2 = new Demo2();
// 创建一个倒计时器,并且赋值为2.如果主线程在latch对象上等待的化,那么需要其他线程
// 调用2次 countdown方法,主线程才会被唤醒继续执行。
CountDownLatch latch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<100000;i++) {
demo2.a++;
}
// 把latch对象的计数器减1
latch.countDown();
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<100000;i++) {
demo2.a++;
}
latch.countDown();
}
},"B").start();
// 让main线程等待一会
// Thread.sleep(1000);
// todo: juc 里有单独的类解决这个问题 : CountDownLatch
// TimeUnit.SECONDS.sleep(10);
// 让主线程在 latch对象上等待,那么需要等待多久呢?
// 需要等到 latch对象的计数器变成0 .
latch.await();
System.out.println(demo2.a);
}
}
CyclicBarrier (召唤神龙)
package com.qf.juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Demo7 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, new Runnable() {
@Override
public void run() {
System.out.println("集齐了七颗龙珠可以召唤神龙了!");
}
});
for (int i=0;i<7;i++) {
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":收集到了龙珠:"+ finalI);
try {
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+":执行结束");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
},""+i).start();
}
}
}
Semaphore
package com.qf.juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Demo8 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i=0;i<6;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i=0;i<5;i++) {
try {
// 在信号量上申请,如果成功,可以执行业务代码,并且信号两的计数器会减一
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":抢到了车位");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 信号两的计数器加1
semaphore.release();
}
}
}
},""+i).start();
}
}
}