多线程回顾
开启多线程
- 继承Thread类,重写run方法,然后通过类的start()方法开启一个新的线程
```java
public class Demo1 {
public static void main(String[] args) {
} }new MyThread().start();
while (true) {
System.out.println("main()方法正在运行");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 两个打印语句交替打印
*/
class MyThread extends Thread { @Override public void run() { while (true) { System.out.println(“MyThread类的run()方法运行”); try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } }
2. 实现runnable接口,通过该类的实例对象构建线程类,在通过线程对象的start()方法开启多线程
```java
public class Demo1 {
public static void main(String[] args) {
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();
while (true) {
System.out.println("main()方法运行");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class MyThread implements Runnable {
@Override
public void run() {
while (true) {
System.out.println("MyThread类的run()方法运行");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- 实现Callable接口实现多线程,但是只有Runnable类型的参数才能构造Thread,而FutureTask类继承了Runnable接口且可以使用Callable类型参数构造,可以把FutureTask看作是适配器Adapter,连接Thread类和Callable的实现类, Callable接口相对于Runnable,可以获得线程异步执行的返回值,但get()获取返回值时会阻塞主线程
```java
public class Demo1 {
public static void main(String[] args) {
} }MyThread myThread = new MyThread();
FutureTask<String> ft = new FutureTask<>(myThread);
new Thread(ft).start();
try {
String myThreadReturn = ft.get();
System.out.println(myThreadReturn);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("main()方法执行");
class MyThread implements Callable
<a name="wmv6k"></a>
## 设置线程优先级
- Thread类中的setPriority()方法
- 线程优先级默认为5,可设置为1-10
```java
public class Demo3 {
public static void main(String[] args) {
Thread A = new Thread(new Task(), "A");
Thread B = new Thread(new Task(), "B");
A.setPriority(1);
B.setPriority(10);
A.start();
B.start();
}
}
class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在运行方法");
}
}
线程让步
- run方法中,满足了某个条件后通过调用yield()方法将CPU执行权让给优先级相同或更高的线程
```java
public class Demo4 {
public static void main(String[] args) {
} }Thread A = new Thread(new Task4(), "A");
Thread B = new Thread(new Task4(), "B");
/* 线程优先级为1-10,默认为5 */
A.setPriority(1);
B.setPriority(10);
A.start();
B.start();
class Task4 implements Runnable { @Override public void run() { for (int i = 0; i < 6; i++) { if (i == 3) { System.out.println(Thread.currentThread().getName() + “线程让步”); Thread.yield(); } System.out.println(Thread.currentThread().getName() + “ “ + i); } } }
<a name="KkGVi"></a>
## 线程插队
- 本线程执行过程中,调用别的线程的join()方法,即可将本线程阻塞,待被调用线程执行完成后再唤醒本线程
```java
public class Demo5 {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Task5(), "A");
t.start();
for (int i = 0; i < 10; i++) {
if (i == 2) {
t.join();
}
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
class Task5 implements Runnable {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}
线程同步
同步方法
- 多线程下要想不出现安全问题,就必须保证同一时刻只有一个线程能访问资源,可以为操作资源的方法加入锁,或把方法加入同步代码块中
- 并不是给run方法加synchronized,而是给操作单个资源的方法加synchronized
- 对于Synchronized修饰的方法,因为有不同的线程进入该方法中被阻塞,所以在操作资源前仍需要对资源的可操作性进行判断,否则资源唤醒后没有验证可操作性直接修改资源,就会出错
```java
public class Demo6 {
public static void main(String[] args) {
} }TicketWarehouse6 tw = new TicketWarehouse6();
new Thread(tw, "A").start();
new Thread(tw, "B").start();
new Thread(tw, "C").start();
new Thread(tw, "D").start();
class TicketWarehouse6 implements Runnable {
private int tickets = 100;
@Override
public void run() {
while (tickets > 0) {
sell();
}
}
public synchronized void sell() {
try {
//模拟卖票时的处理时间
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 "
+ (100 - (--tickets)) + " 张票");
}
}
}
<a name="xMiFC"></a>
#### 同步代码块
- 把操作单个资源方法放入到同步代码块中
```java
public class Demo7 {
public static void main(String[] args) {
TicketWarehouse7 tw = new TicketWarehouse7();
new Thread(tw, "A").start();
new Thread(tw, "B").start();
new Thread(tw, "C").start();
new Thread(tw, "D").start();
}
}
class TicketWarehouse7 implements Runnable {
private int tickets = 100;
@Override
public void run() {
while (tickets > 0) {
synchronized (TicketWarehouse7.class) {
sell();
}
}
}
public void sell() {
try {
//模拟卖票时的处理时间
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 "
+ (100 - (--tickets)) + " 张票");
}
}
}
模拟一个死锁
- A拿到了A锁,想通过拿到B锁执行方法后释放A锁;B拿到了B锁,想通过拿到A锁执行方法后释放B锁
两者都互相持有对方想要的资源,程序无法执行,产生了死锁
public class Demo8 {
public static void main(String[] args) {
Object A = new Object();
Object B = new Object();
new Thread(()->{
synchronized (A) {
System.out.println(Thread.currentThread().getName() + " 拿到了 A 锁");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B) {
System.out.println(Thread.currentThread().getName() + " 拿到了 B 锁");
}
}
}, "线程 A").start();
new Thread(()->{
synchronized (B) {
System.out.println(Thread.currentThread().getName() + " 拿到了 A 锁");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (A) {
System.out.println(Thread.currentThread().getName() + " 拿到了 B 锁");
}
}
}, "线程 B").start();
}
}
并发编程
并发:单个CPU的操作系统在进行多线程操作时,将CPU时间划分为若干个时间段,再将时间段分配给各个线程执行,再一个时间段内只运行一个线程,其他线程处于挂起状态。
- 并行:多CPU操作系统进行多线程操作,不同的CPU执行不同的线程,即线程之间不抢占CPU资源。
sleep和wait区别
- sleep只让出CPU,不释放同步资源锁;wait会释放同步资源锁,notify才能解除线程的wait状态
sleep可以在任何地方使用,wait只能在同步方法或同步代码块中使用
Lock锁
使用三部曲
创建锁
- 操作资源前加锁
- 操作完资源后解锁
```java
public class Demo1SynchronizedAndLock {
public static void main(String[] args) {
} }TicketWarehouse tw = new TicketWarehouse();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
tw.sell();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
tw.sell();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
tw.sell();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
class TicketWarehouse { private int tickets = 100;
ReentrantLock lock = new ReentrantLock();
public void sell() throws InterruptedException {
lock.lock();
if (tickets > 0) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread().getName() + " 卖出了第 "
+ (100 - (--tickets)) + " 张票");
}
lock.unlock();
}
}
<a name="mKIyn"></a>
### Lock锁与synchronized的区别
- synchronized是Java内置关键字,而Lock是一个类
- synchronized无法获取锁的状态,Lock可以判断是否获取到了锁
- synchronized会自动释放锁,Lock必须手动释放,不然会产生死锁
- synchronized 线程1获取到锁后线程2只会等待,而Lock对于这种情况线程2不一定会等待
- synchronized可重入锁,不可中断,Lock可重入锁,可以判断锁,可以设置公平与非公平
- synchronized适合少量的代码同步,Lock适合大量的代码同步
<a name="zlpRt"></a>
## 生产者消费者
<a name="l1sz5"></a>
### synchronized版
- Data类中有一个资源属性,一个增加资源的方法,一个减少资源方法
- increase()方法中判断资源为1则进入等待状态,否则增加资源,唤醒其他线程
- decrease()方法中判断资源为0则进入等待状态,否则减少资源,唤醒其他线程
- 老问题:在操作资源时必须先对资源的可操作性进行判断
```java
public class Demo2SynchronizedPC {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
data.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者1").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
data.decrease();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者2").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
data.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者1").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
try {
data.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者2").start();
}
}
class Data2 {
private int number = 0;
//+1
public synchronized void increase() throws InterruptedException {
while (number != 0) {
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "进行了 +1 操作,number=>" + number);
this.notifyAll();
}
//-1
public synchronized void decrease() throws InterruptedException {
while (number == 0) {
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "进行了 -1 操作,number=>" + number);
this.notifyAll();
}
}
Lock版
- 创建一个lock锁以后,通过lock锁获取condition对象
- 线程的等待方法为condition.await(); 唤醒其他线程的方法为condition.signalAll();
- 不要忘记lock.unlock()解锁!!
```java
public class Demo3LockPC {
public static void main(String[] args) {
} }Data3 data = new Data3();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
data.increase();
}
}, "生产者1").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
data.decrease();
}
}, "消费者1").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
data.decrease();
}
}, "消费者2").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
data.increase();
}
}, "生产者2").start();
class Data3 { private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increase() {
lock.lock();
try {
while (number != 0) {
condition.await(); //等待
}
number++;
System.out.println(Thread.currentThread().getName() + " -> " + number);
condition.signalAll(); //通知其他线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrease() {
lock.lock();
try {
while (number == 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + " -> " + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
<a name="crlBG"></a>
## Lock锁的优势
- 精准唤醒某个线程
- 创建两个线程,分别调用了方法B和方法C,都被阻塞
- 再创建一个新的线程,通过conditionC.signal();来唤醒调用方法C的那个线程
```java
public class Demo4Condition {
public static void main(String[] args) throws InterruptedException {
Task4 task = new Task4();
/* 线程B和C使用根据不同的条件而相机进入阻塞状态 */
new Thread(task::printB, "B").start();
new Thread(task::printC, "C").start();
/* 创建一个新的线程唤醒指定的线程(根据不同的condition),没被唤醒的线程仍是阻塞状态 */
new Thread(task::printA, "A").start();
}
}
class Task4 {
private int flag = 1;
Lock lock = new ReentrantLock();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
public void printA() {
lock.lock();
/*
唤醒C
*/
conditionC.signal();
// conditionB.signal();
lock.unlock();
}
public void printB() {
lock.lock();
try {
conditionB.await();
System.out.println(Thread.currentThread().getName() + ": B");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
conditionC.await();
System.out.println(Thread.currentThread().getName() + ": C");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
8锁问题
- 先发短信还是先打电话?
```java
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
} }Phone phone = new Phone();
new Thread(phone::sendSMS, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(phone::call, "B").start();
class Phone { public synchronized void sendSMS() { System.out.println(“发短信”); }
public synchronized void call() {
System.out.println("打电话");
}
}
**结果:**先发短信,再打电话,但不是程序的顺序执行原因
2. 在1的基础上,让发短信延迟4秒中
```java
public synchronized void sendSMS() {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
结果:还是先发短信再打电话,不是顺序执行的原因,是因为synchronized锁的对象是phone(同一个对象), 谁先拿到谁执行,另一个等待。
- 在Phone类中添加一个普通方法hello(),此时再运行
```java
…
new Thread(()->{
}).start(); …phone.hello();
class Phone{ … public void hello(){ … } }
**结果:**先执行hello(),因为普通方法不受synchronized影响,但是如果把延迟4秒去掉,那么就会变为顺序执行
4. 再创建一个Phone对象,两个线程调用两个不同对象的方法
```java
public class Demo4 {
public static void main(String[] args) throws Exception {
Phone4 phone1 = new Phone4();
Phone4 phone2 = new Phone4();
new Thread(()->{
try {
phone1.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(phone2::call, "B").start();
}
}
class Phone4 {
public synchronized void sendSMS() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
结果:先打电话再发短信,执行:线程A开启,延迟4秒后执行 -> 等待1秒 -> 线程B开启 -> 线程B执行 -> 线程A等待结束,执行
两个synchronized方法都加上static关键字,一个对象调用两个方法,两个对象调用两个方法
class Phone5 {
public static synchronized void sendSMS() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
结果:结果都是先发短信后打电话,因为这时锁的就不是对象了,而是类Class,Class只有一份,不管有多少个对象,谁先拿到Class的锁,谁就先执行
类的两个方法,只有一个是static修饰,使用同一个对象调用两个方法 ```java public class Demo7 { public static void main(String[] args) throws Exception {
Phone7 phone = new Phone7();
new Thread(()->{
try {
phone.sendSMS();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
try {
phone.call();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
} }
class Phone7 { public static synchronized void sendSMS() throws Exception { TimeUnit.SECONDS.sleep(4); System.out.println(“发短信”); }
public synchronized void call() {
System.out.println("打电话");
}
}
**结果:**先打电话再发短信,因为锁的是两个不同的对象,一个是类模板,一个是对象调用者
7. 在6的基础上使用两个对象调用两个方法
**结果:**还是先打电话再发短信,两把锁锁的是不同的对象
<a name="54bbba80"></a>
### 结论
![image.png](https://cdn.nlark.com/yuque/0/2021/png/21390724/1630314900935-a254a7d2-27f7-4974-b620-80b5d1dc59e4.png#clientId=u79faf74b-ca4c-4&from=paste&height=119&id=u5a74d229&margin=%5Bobject%20Object%5D&name=image.png&originHeight=119&originWidth=917&originalType=binary&ratio=1&size=13362&status=done&style=none&taskId=u8d4f95d1-24fe-497d-9a74-943ce0bdde8&width=917)
<a name="QLI5F"></a>
## 不安全的集合类
- 在并发环境下,List、Set、Map都是不安全的,修改可能会出现java.util.ConcurrentModificationException
<a name="fAgwT"></a>
### 解决方案
<a name="zulbl"></a>
#### Collection.synchronized
- 使用ArrayList或Set构造一个线程安全的ArrayList或HashSet
```java
List list = Collections.synchronizedList(new ArrayList<>());
Set<Object> set = Collections.synchronizedSet(new HashSet<>());
Vector代替List
使用线程安全的集合类
- CopyOnWriteArrayList
- CopyOnWriteArraySet
-
辅助工具
CountDownLatch
类似于一个计数器,调用countDown()方法可以减少计数器,当计数器归零时候执行countDownLatch.await()后面的代码
public class Demo01TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(24);
for (int i = 1; i <= 24; i++) {
System.out.println("现在是第" + i + "个小时");
countDownLatch.countDown();
}
countDownLatch.await();
/* 计数器归零了,开始执行以下代码 */
System.out.println("已经过了24个小时");
}
}
CyclickBarrier
允许一组线程互相等待,以达到一个共同的屏障点后开始执行某操作。
创建CyclickBarrier对象的时候可以指定多少个线程达到共同屏障点后执行什么样的操作
public class Demo02TestCyclickBarrier {
public static void main(String[] args) {
/* 允许一组线程相互等待,到达一个共同的屏障点后才继续执行操作 */
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("可以召唤神龙了");
});
for (int i = 1; i <= 7; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 收集了 " + finalI + " 星珠");
try {
//执行完毕后进入等待状态
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
Semaphore
计数信号量,acquire获取许可,release释放许可,通常用于限制访问同一资源的线程数量
public class Demo03TestSemaphore {
public static void main(String[] args) {
//总共由两个信号量,多个线程竞争,使用完毕后释放信号量
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获得了许可证");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
读写锁
写的时候只允许一个线程去写,但读的时候可以允许多个线程去读,这种场景可以使用读写锁(有些也叫独占锁、共享锁)实现。
- 读写锁相对于普通的公平锁/非公平锁,细粒度更高
- 在不加锁的情况下运行,可能会出现一个key还没写入完毕,另一个线程的key就开始写入了,因此我们对于put方法加上写锁,对于get方法加上读锁 ```java ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, String value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + “ 开始写入 “ + key + “ - “ + value); map.put(key, value); System.out.println(Thread.currentThread().getName() + “ 写入完毕”); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } }
public String get(String key) { String s = “”; readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + “ 读取 “ + key + “ 的内容”); s = map.get(key); System.out.println(Thread.currentThread().getName() + “ 读取的内容为 “ + s); return s; } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); return s; } }
- 改进后只有完全执行完put方法后,其他线程才能进行put方法,达到了我们想要的效果。
<a name="nIIai"></a>
## 阻塞队列
<a name="tBQE7"></a>
### 关系树
![image.png](https://cdn.nlark.com/yuque/0/2021/png/21390724/1630336774083-f661c2a0-cd2c-4356-a236-8755bb67a198.png#clientId=u3cc455d9-2476-4&from=paste&height=627&id=uc5382d25&margin=%5Bobject%20Object%5D&name=image.png&originHeight=627&originWidth=799&originalType=binary&ratio=1&size=39173&status=done&style=none&taskId=u40e69d6a-a19a-413f-be6b-9f90eceffdb&width=799)
<a name="Zymyx"></a>
### ArrayBlockingQueue
| **方式** | **抛出异常** | **不会抛出异常,有返回值** | **阻塞 等待** | **超时 等待** |
| --- | --- | --- | --- | --- |
| **添加** | add | offer | put | offer(timenum,timeUnit) |
| **移除** | remove | poll | take | poll(timenum,timeUnit) |
| **判断队列首** | element | peek | - | - |
```java
/**
* 有返回值,抛出异常
* add remove element
*/
public static void test1() {
ArrayBlockingQueue<Object> queue1 = new ArrayBlockingQueue<>(3); //初始化大小为3
System.out.println(queue1.add("1"));
System.out.println(queue1.add("2"));
System.out.println(queue1.add("3"));
System.out.println(queue1.element()); //队首元素
System.out.println(queue1.remove());
System.out.println(queue1.remove());
System.out.println(queue1.remove());
System.out.println(queue1.remove()); //没有元素了还使用remove()方法,则会抛出异常
}
/**
* 不抛出异常,有返回值
* offer poll peek
*/
public static void test2() {
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
System.out.println(queue.offer("1"));
System.out.println(queue.offer("2"));
System.out.println(queue.offer("3"));
System.out.println(queue.peek());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll()); //不抛出异常,直接返回null
}
/**
* 阻塞等待
* put take
*/
public static void test3() throws InterruptedException {
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
queue.put("1");
queue.put("2");
queue.put("3");
System.out.println(queue.peek());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take()); //取不到元素,会进入阻塞状态
}
/**
* 超时等待
* offer poll(time)
*/
public static void test4() throws InterruptedException {
ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
queue.offer("1");
queue.offer("2");
queue.offer("3");
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS)); //等待两秒,若还没有取到元素则返回null
}
同步队列
可以看作是容量为1的队列,放入一个元素后必须取出来才能继续放进去,底层采用lock锁保证线程安全
public class Demo {
public static void main(String[] args) {
SynchronousQueue<Object> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put 1");
queue.put("1");
System.out.println(Thread.currentThread().getName() + " put 2");
queue.put("2");
System.out.println(Thread.currentThread().getName() + " put 3");
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " take " + queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " take " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
线程池
创建
单个线程
- 固定线程
弹性伸缩大小
public class Demo {
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
ExecutorService threadPool2 = Executors.newFixedThreadPool(5);
ExecutorService threadPool3 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool3.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool3.shutdown();
}
}
}
7大参数
指的是ThreadPoolExecutor 构造器中的7个参数
- corePoolSize – 要保留在池中的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
- maximumPoolSize – 池中允许的最大线程数
- keepAliveTime – 多余空闲线程在终止前等待新任务的最长时间
- unit – keepAliveTime参数的时间单位
- workQueue – 用于在执行任务之前保存任务的队列。 这个队列将只保存execute方法提交的Runnable任务
- threadFactory – 执行程序创建新线程时使用的工厂
- handler – 执行被阻塞时使用的处理程序,因为达到了线程边界和队列容量
代码实现
public class Demo2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 2, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); //默认的策略:丢弃任务并抛出RejectedExecutionException异常
//new ThreadPoolExecutor.DiscardPolicy()); 丢弃任务,不抛出异常
//new ThreadPoolExecutor.DiscardOldestPolicy()); 丢弃旧任务(最先加入队列的),再把新任务添加
//new ThreadPoolExecutor.CallerRunsPolicy()); 由调用线程处理该任务
try {
for (int i = 1; i <= 10; i++) { //这里的10代表有10个任务需要处理,但是整个线程池中最大可容纳8个任务,因此会抛出异常
int finalI = i;
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 执行任务" + finalI + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
4种拒绝策略
- new ThreadPoolExecutor.AbortPolicy()); 默认的策略:丢弃任务并抛出RejectedExecutionException异常
- new ThreadPoolExecutor.DiscardPolicy()); 丢弃任务,不抛出异常
- new ThreadPoolExecutor.DiscardOldestPolicy()); 丢弃旧任务(最先加入队列的),再把新任务添加
new ThreadPoolExecutor.CallerRunsPolicy()); 由调用线程处理该任务
如何设置线程池大小
CPU密集型:电脑核数是几核就设置几
int cpuNum = Runtime.getRuntime().availableProcessors(); //获取CPU核心数(这里指的是线程数)
I/O密集型
比如程序中有15个大型任务,IO非常占用资源,此时我们可以设置线程数量为IO数的两倍,如30函数式接口
function函数型接口
接受收一个参数并产生结果的函数。这是一个功能接口,其功能方法是apply(Object)
public class TestFunction {
public static void main(String[] args) {
Function<String, String> function = new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
};
System.out.println(function.apply("hello world"));
//简写
Function<String, String> function1 = (str) -> {
//对str进行处理,之后返回处理结果
return str;
};
System.out.println(function1.apply("hello world"));
}
}
Predicate断定型接口
也是一个功能函数,但是返回结果是布尔类型
public class TestPredicate {
public static void main(String[] args) {
Predicate<String> predicate = (str) -> {
return str.isEmpty();
};
System.out.println(predicate.test("123"));
System.out.println(predicate.test(""));
}
}
Consummer 消费型接口
接受单个输入参数且不返回结果的操作。 与大多数其他功能接口不同, Consumer预计通过副作用进行操作。这是一个功能接口,其功能方法是accept(Object)
public class TestConsummer {
public static void main(String[] args) {
Consumer<String> consumer = (str) -> {
System.out.println(str);
};
consumer.accept("123");
}
}
Supplier供给型接口
结果的提供者。没有要求每次调用供应商时都返回一个新的或不同的结果。这是一个函数式接口,其函数式方法是get()
public class TestSupplier {
public static void main(String[] args) {
Supplier<Integer> supplier = () -> {
return 1024;
};
System.out.println(supplier.get());
}
}
Stream流式计算
public class Demo {
public static void main(String[] args) {
User a = new User(1, "a", 20);
User b = new User(2, "b", 23);
User c = new User(3, "c", 22);
User d = new User(4, "d", 26);
User e = new User(5, "e", 24);
List<User> users = Arrays.asList(a, b, c, d, e);
/**
* 筛选id为偶数
* 年龄>23
* 名字转为大写
* 倒序排序
* 输出一个
*/
users.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((u1, u2)->{
return u2.compareTo(u1);
})
.limit(1)
.forEach(System.out::println);
}
}
ForkJoin
核心思想
工作窃取
空闲线程可以拿到其他线程的未执行完的任务,实现:双端队列
案例:分解求和
- 求和类要继承RecursiveTask
,重写compute方法 - 定义小任务(计算小范围的值)
当范围大于某个值以后分解任务,并将其fork压入队列,返回值为各小任务的join结果相加
public class ForkJoinDemo extends RecursiveTask<Long> {
private long start;
private long end;
//临界值
private long temp = 10000L;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end - start) < temp) {
long sum = 0;
for (long i = start; i < end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinDemo forkJoinDemoTask1 = new ForkJoinDemo(start, middle);
forkJoinDemoTask1.fork(); //将任务压入线程队列
ForkJoinDemo forkJoinDemoTask2 = new ForkJoinDemo(middle, end);
forkJoinDemoTask2.fork();
return forkJoinDemoTask1.join() + forkJoinDemoTask2.join();
}
}
}
测试:实例化我们自定义的计算类,然后将其提交给ForkJoinPool,最后用get获取结果
public class Test {
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo task = new ForkJoinDemo(0, 100_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long aLong = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = " + aLong + " time = " + (end - start));
}
}
异步回调
runAsync
没有返回值的异步任务
public class TestRunAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
System.out.println("123");
System.out.println(task.get());
}
}
执行结果
supplyAsync
有返回值的异步任务,可以自定义程序结束后的success回调和error回调
public class TestSupplyAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
int i = 0;
try {
TimeUnit.SECONDS.sleep(1);
i = 1 / 0;
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
});
System.out.println(task.whenComplete((t, u) -> {
//success回调
System.out.println("t表示正确返回的结果:" + t);
System.out.println("u表示异常抛出的信息:" + u);
}).exceptionally((e)->{
//error回调
System.out.println(e.getMessage());
return 404;
}).get());
}
}
无异常时
- 有异常时
Volatile与JMM
Volatile
- Java对共享变量的操作并不是在主内存中进行的,而是在线程自己的工作内存中
- 操作共享变量需要对变量加锁,同时重新读取主内存中该变量的值
切换到另一个线程的时候需要将变量解锁,同时将值重新写回主内存中
可见性
public class Demo {
private static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
while (num == 0) {
}
}).start();
Thread.sleep(2000);
num = 1;
System.out.println(num);
}
}
- 上述代码,由于没有保证num的可见性,即使main线程中改变了num的值,在另一个线程中也没有读取到,所以线程一直没有终止
解决方法:变量num加上volatile修饰符,保证各线程对共享变量的可见性
原子性
平常我们认为的原子操作,如num++,并不是由一条指令构成,在底层由执行引擎执行多条字节码指令(入栈、相加、出栈)完成
public class Demo3 {
private static int num = 0;
private static void add() {
num++;
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 200; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
Thread.yield();
}
Thread.sleep(2000);
System.out.println(num);
}
}
以上代码理论输出为2000000,但实际上并达不到,即使我们给num加了volatile修饰符
- 解决方案:
- 对变量的操作加锁:synchronized、lock
- 使用AtomicInteger类
AtomicInteger封装了一个Unsafe变量,Unsafe变量的操作都是native方法
指令重排
源代码 -> 编译器优化重排 -> 指令也可能重排 -> 内存系统也可能会重排 -> 执行
可能造成的影响结果:前提:a b x y这四个值 默认都是0 | 线程A | 线程B | | —- | —- | | x = a | y = b | | b = 1 | a = 2 |
正常的结果: x = 0; y =0; | 线程A | 线程B | | —- | —- | | b = 1 | a = 2 | | x = a | y = b |
可能在线程A中会出现,先执行b=1,然后再执行x=a;在B线程中可能会出现,先执行a=2,然后执行y=b;那么就有可能结果如下:x=2; y=1
volatile 可以避免指令重排
- 内存屏障
- 保证特定的操作的执行顺序
- 保证某些变量的内存可见性
CAS
概念
AtomicXXX 类的一个方法,compareAndSet(expect, update),expect指期望的值,update指设置的新值
public class Demo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(123);
//参数:期待的值,设置的新值
System.out.println(atomicInteger.compareAndSet(123, 124)); //true
System.out.println(atomicInteger.get()); //124
System.out.println(atomicInteger.compareAndSet(123, 125)); //false
System.out.println(atomicInteger.get()); //124
}
}
Unsafe 类
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
总结: 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么执行操作,如果不是则一直循环,使用的是自旋锁。
缺点:
当一个线程通过CAS修改某个数据的时候,被另一个线程抢先修改但是又改了回去,该线程并不知道数据被修改过了,不影响最终结果
public class TestABA {
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(123);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean b = atomicInteger.compareAndSet(123, 124);
System.out.println(b);
System.out.println("修改atomicInteger -> " + atomicInteger.get());
}, "A").start();
new Thread(() -> {
atomicInteger.compareAndSet(123, 125);
System.out.println("修改atomicInteger -> " + atomicInteger.get());
atomicInteger.compareAndSet(125, 123);
System.out.println("修改atomicInteger -> " + atomicInteger.get());
}, "B").start();
}
}
解决:使用AtomicStampedReference,定义初始值和版本号,每次对变量的操作都要修改版本号,这样其他线程就可以知道该变量有没有被修改过
public class Demo2 {
static AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(1, 1);
public static void main(String[] args) {
//相当于定义了一个版本号
new Thread(() -> {
int stamp = reference.getStamp(); //当前版本号
System.out.println(Thread.currentThread().getName() + " ==> 当前获取的版本号为:" + stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//满足特定条件才改变:即初始版本号
boolean b = reference.compareAndSet(1, 2, 1, reference.getStamp() + 1);
if (b) {
System.out.println("修改成功");
} else {
System.out.println("修改失败");
}
}, "A").start();
new Thread(() -> {
reference.compareAndSet(1, 3, reference.getStamp(), reference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " ==> 修改值 1 -> 3,stamp " + reference.getStamp() + " -> " + (reference.getStamp() + 1));
reference.compareAndSet(3, 1, reference.getStamp(), reference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + " ==> 修改值 3 -> 1,stamp " + reference.getStamp() + " -> " + (reference.getStamp() + 1));
}, "B").start();
}
}
各种锁
公平锁/非公平锁
公平锁:不能够插队,必须先来后到
非公平锁:可以插队(默认)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁
Synchronized 锁 ```java public class Demo1 { public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
} }
class Phone { public synchronized void sms() { System.out.println(Thread.currentThread().getName() + “ => sms”); call(); //这里也有锁 }
public synchronized void call() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " => call");
}
}
- lock 锁
```java
public class Demo2 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone2{
Lock lock=new ReentrantLock();
public void sms(){
lock.lock(); //细节:这个是两把锁,两个钥匙
//lock.lock(); lock锁必须配对,否则就会死锁在里面
try {
System.out.println(Thread.currentThread().getName()+"=> sms");
call();//这里也有一把锁
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "=> call");
}catch (Exception e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
自旋锁
底层使用的是CAS自旋锁 ```java public class TestSpinLock { public static void main(String[] args) throws InterruptedException {
//使用CAS实现自旋锁
SpinlockDemo spinlockDemo=new SpinlockDemo();
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myunlock();
}
},"t1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
spinlockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
spinlockDemo.myunlock();
}
},"t2").start();
}
}
class SpinlockDemo {
//thread null
AtomicReference<Thread> atomicReference=new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"===> mylock");
//自旋锁
while (!atomicReference.compareAndSet(null,thread)){
System.out.println(Thread.currentThread().getName()+" ==> 自旋中~");
}
}
//解锁
public void myunlock(){
Thread thread=Thread.currentThread();
System.out.println(thread.getName()+"===> myUnlock");
atomicReference.compareAndSet(thread,null);
}
} ```
- 自旋的过程可以看作是一个循环,t2进程必须等待t1进程Unlock后,才能Unlock,在这之前进行自旋等待