最近觉得一直看框架知识也不太好,感觉再怎么学都只是crud,以前学的多线程的知识都没怎么用过,顶多就是在读取内容比较多的时候,开几个线程读一下。
所以想进阶一下看看,顺便学了一句话,多学java中不变的知识,毕竟像框架这种东西,更新换代太快。这些不变的知识才是更应该牢牢掌握的基础(并不是说框架知识不重要!)。
JUC就是java.util .concurrent工具包的简称,是java多线程部分必须要掌握的知识。
Lock 锁
在学习 juc 之前,通常都是使用synchronized关键来进行线程之间的同步的,但是在juc中,出现了一个新的锁:Lock锁。
通常我们在使用同步方法的时候会整个方法都被锁起来,不是很灵活,虽然也可以用同步代码块,但是新的Lock锁,看起来好像更加不错。
Lock 是一个接口,它被称为可重入锁,它的一些实现类比较常用的是ReentrantLock。
下边我们来使用Lock锁来改写一下 练习synchronized 时候的卖票小案例。
但是要注意一点,现在我们要遵循一个规范
线程,操作,资源类。这种三步走策略
这种方法是高内聚,低耦合的,
操作是资源自身的操作,应该在资源类里边。
比如例子中的票Ticket,这个就是资源类,卖票是票 Ticket 这个类自己的方法,应该写在Ticket这个类中,这就称之为内聚。
线程在用到的时候最好使用lambda表达式简化,这一套模板能让我们的代码看起来更加简洁,也使我们的代码耦合度更低。
具体可以看下边例子。
Lock 卖票小案例
class Ticket {
private int number = 30;
private final Lock lock = new ReentrantLock();
public void salTicket() {
lock.lock();
try {
if (number > 0) {
System.out.println(Thread.currentThread().getName()
+ "拿到了第" + number + "张票,还剩下" + --number + "张票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class LockTest {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(()-> {
for (int i =0; i < 35; i++) {
ticket.salTicket();
}
},"A").start();
new Thread(()-> {
for (int i =0; i < 35; i++) {
ticket.salTicket();
}
},"B").start();
new Thread(()-> {
for (int i =0; i < 35; i++) {
ticket.salTicket();
}
},"C").start();
}
}
Lock的线程通信
在使用Lock进行线程通信之前,我们回顾一下之前使用 synchronized 的时候是怎么进行通信的。
但是要始终记住一句话 先判断,再操作,最后通知!
现在给出一个场景。
有一个面包店有一个订单的功能
要求有两个线程,一个是顾客来订单,一个是店家做蛋糕,有一定订单就来生产一个蛋糕
两者各循环10次,对一个变量cake(初始也为0) 进行操作,要求最后cake仍未0
这个就是生产者消费者的变种类型的问题。实质也是生产者消费者模型。
synchronized 的通信
class Order {
private int cake = 0;
public synchronized void produce() throws InterruptedException {
if (cake != 0) {
this.wait();
}
cake++;
System.out.println(Thread.currentThread().getName()+" : "+cake);
notifyAll();
}
public synchronized void consume() throws InterruptedException {
if (cake == 0) {
this.wait();
}
cake--;
System.out.println(Thread.currentThread().getName()+" : "+cake);
notifyAll();
}
}
public class Communicate {
public static void main(String[] args) {
Order order = new Order();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
order.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
order.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
Lock的通信模型。
在juc中,我们知道了我们的锁从synchronized同步锁变成了 Lock 锁,那么问题来了,这个时候我们还能使用wait和notify吗?
答案是不行的,wait和notify 是Object类的两个方法,你可以在Lock中调用,但是一定会报错,wait和notify,又或者notifyAll 这三种方法只能在 synchronized 同步方法或者同步代码块中使用,否则报错。
而我们Lock 锁也有了新的通信机制。Condition
又有了新的方法叫 await 和 signal 或者signalAll 这三种方法。
于是我们在juc中有了一套新的同步/通信模型。
Lock 对应 synchronized
await 对应 wait
signal ,signalAll 对应 notify,notifyAll
class Order {
private int cake = 0;
private final Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void produce() {
lock.lock();
try {
if (cake != 0) {
condition.await();
}
cake++;
System.out.println(Thread.currentThread().getName()+" : "+cake);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
if (cake == 0) {
condition.await();
}
cake--;
System.out.println(Thread.currentThread().getName()+" : "+cake);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Communicate {
public static void main(String[] args) {
Order order = new Order();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
order.produce();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
order.consume();
}
}, "B").start();
}
}
虚假唤醒
但是如果现在需求变更,我们蛋糕店做大了,又招了一个厨师。现在我们可以同时招待两个顾客。
那这个时候,我们就需要再开辟两个线程了,我们直接复制这两个线程 然后一个叫C,一个叫D
C为新的糕点师傅,D为顾客
这个时候运行,会发现一件事。我们的cake数量开始改变了,不再是我们所想的1,0,1,0交替。而是出现其它我们不期望出现的值
那这是怎么回事呢?
这种情况我们称之为虚假唤醒!
我们在判断的时候使用的if,假设现在cake 初始化为 0,线程A进入。cake变为1,这个时候我们要唤醒其它线程了,我们所期望的是 最好线程B或者D这两个顾客线程进入消费,来出现 cake—的情况。
但是,现在C进程进来了,判断cake = 1,不符合生产情况,进入 wait状态(await 和 wait 是会释放锁的!),那这个时候,A再次被cpu调度,cake = 1,继续进入wait,两个生产者全部进入阻塞态,要唤消费者了,也就是顾客线程了,比如现在是顾客线程B执行完后,cake—,变为0,然后再次唤醒线程,这次仍然唤醒了C,C来继续执行,cake++变为1,然后继续唤醒别的线程,这次唤醒了A,问题就处在了这里!!!,我们现在阻塞的位置是await,如果被唤醒我们就要执行cake++操作了,根本不会没有判断 cake 的情况!!!!。
这就是虚假唤醒。我们可以将 if 判断 改为 while判断来进行解决。
在线程通信的时候记住两句话!!!!!!
- 线程通信的步骤: 判断/操作/通知
- 在判断时不要使用if,而要使用while来防止线程的虚假唤醒
为什么选用 Lock,不用Synchronized?
看到这里,我们明白,既然现在Synchronized 能做的改成了Lock,那么为什么要使用Lock,而不像原来那样直接使用 Synchronized的呢?
要知道,新的api的出现,一定是为了去掉 旧的api 不好的地方。
假设现在的场景需要三个线程A,B,C,需要我们按着 A->B->C的顺序 来进行交替打印,
如果是A打印5次,如果是B打印10次,如果是C打印15次
这样循环5次。这个时候我们要怎么做呢?
我们首先要搞一个标志位!假设现在让 number = 1时,线程A执行,number =2,B执行,number =3,C执行。
我们使用 Synchronized 其实也可以实现,但是我们在 notifyAll的时候是唤醒所有的线程,让这些线程抢夺资源,
那么问题来了,这样唤醒时耗费的资源,如果我们是不是能省去,指定下一个就是B进程被唤醒呢?
接下来就能体会出 Lock的 优点,准确唤醒!
Lock 锁的精准唤醒
首先要知道一个Lock锁,是可以有多把钥匙的。每把钥匙都可以进行开锁行为。
假设现在condition1 是A拿的钥匙,condition2是B拿的钥匙,condition3是C拿的钥匙。
我们在执行完A操作时先进行判断,
不符合情况拿着condition1 await,进入阻塞态
符合情况执行完打印操作,用condition2去signal,自然就可以指定下一个被唤醒的是B
class Printer {
private int number = 1;
private final Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void print5() {
lock.lock();
try {
while (number != 1) {
//A 进入阻塞态
condition1.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
//指定标志位 为 B
number = 2;
//唤醒B
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
while (number != 2) {
//B 进入阻塞态
condition2.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
//指定标志位 为 C
number = 3;
//唤醒C
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
while (number != 3) {
//C 进入阻塞态
condition3.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
//指定标志位 为 A
number = 1;
//唤醒A
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SignalTest {
public static void main(String[] args) {
Printer printer = new Printer();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
printer.print5();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
printer.print10();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
printer.print15();
}
}, "C").start();
}
}
多线程八锁
线程使用锁的八种情况。
在此之前我们先来回顾一些知识。
- synchronized 在同步方法时,监视器(锁)为 对象锁(this)
- synchronized 在同步静态方法时,锁为类锁 (对象.class)
- 所有的静态同步方法共享一个类锁。
- 所有的非静态同步方法,共享一个对象锁。
- 当一个线程拿到锁之后,其它线程必须进入等待状态,直到该线程释放锁。
- 静态同步方法与非静态同步方法之间是不会有竞争条件的,因为它们竞争的不是一把锁。当然它们与普通方法也没有竞争条件。
标准访问 ```java class Phone {
public synchronized void sendEmail() {
//发邮件 System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信 System.out.println("send SMS ....");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendEmail,"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(phone::sendSMS,"B").start();
}
}
在标准访问中,也就是线程A去发邮件,线程B去发短信。<br />输出顺序是什么?<br />send Email ....<br />send SMS ....<br />(因为我们在开启线程A后,让main线程进入了睡眠状态,这个时间足够A执行完了,如果不进行睡眠,那就无法预知顺序,看cpu心情)
2. 如果在发送邮件中睡眠4秒,是先输出发送邮件还是发送短信?
仍然是<br />send Email ....<br />send SMS ....
因为同步方法为对象锁,线程A先拿到了锁,线程B只能等待。
```java
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendEmail,"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(phone::sendSMS,"B").start();
}
}
- 新增一个普通的方法 hello,那么先打印发送邮件还是hello?
先打印 hello,因为hello是普通方法,不带Synchronized,并不需要锁就可以去访问
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(phone::sendEmail,"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(phone::hello,"B").start();
}
}
- 现在有两部手机对象,先打印发邮件还是发短信
send SMS ….
send Email ….
因为对象变成了两个,锁也变成了两个,线程A拿phone这个锁,线程B拿phone2这个锁,并不是同一把锁,不产生竞争条件。
class Phone {
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(phone::sendEmail,"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(phone2::sendSMS,"B").start();
}
}
- 两个静态同步方法,同一部手机,先打印发邮件还是发短信
先打印发邮件
send Email ….
send SMS ….
因为线程A先拿到 类锁,B也需要类锁,B必须等待A释放
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public static synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//Phone phone2 = new Phone();
//这里就不采用方法引用了
new Thread(()-> phone.sendEmail(),"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(()-> phone.sendSMS(),"B").start();
}
}
- 两个静态同步方法,2部手机,先打印发邮件还是发短信
send Email ….
send SMS ….
仍然先打印发邮件,因为所有的静态同步方法共享一个类锁。虽然有多个对象,但是类锁只有一个,仍然存在竞争条件。
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public static synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(()-> phone.sendEmail(),"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(()-> phone2.sendSMS(),"B").start();
}
}
- 一个普通同步方法,一个静态同步方法,一部手机,先打印发邮件还是发短信
现在发送邮件方法加上 static,发送短信方法不加 static
线程A拿到了类锁,线程B拿到了对象锁,两者也不构成竞争条件。
所以先发送短信,再发送邮件
send SMS ….
send Email ….
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
//Phone phone2 = new Phone();
new Thread(()-> phone.sendEmail(),"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(()-> phone.sendSMS(),"B").start();
}
}
- 一个普通同步方法,一个静态同步方法,两部手机,先打印发邮件还是发短信
同第7种情况一样,理由也一样。
send SMS ….
send Email ….
class Phone {
public static synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发邮件
System.out.println("send Email ....");
}
public synchronized void sendSMS() {
//发短信
System.out.println("send SMS ....");
}
public void hello() {
System.out.println("hello");
}
}
public class Lock8 {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(()-> phone.sendEmail(),"A").start();
//这里睡眠,是为了先让 A执行
Thread.sleep(200);
new Thread(()-> phone2.sendSMS(),"B").start();
}
}
List 不安全
在多线程环境下,ArrayList 是不安全的。我们可以验证一下。
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
}).start();
}
}
有的线程直接报 java.util.ConcurrentModificationException,并发修改异常。
查看ArrayList 代码可以看到,底层的add方法并没有加锁。
这样多线程操作同一个资源,毫无疑问会导致数据不一致。
那么怎么解决呢?还记得vector吗?就是List的前身,它的底层是用synchronized锁的,所以线程安全问题是解决了,但是由于get其实也加了锁,所以无论读写,都只能同时有一个进程进入,这大大降低了效率。
还有一个方法就是List
有没有一种方法让我们读的时候效率很快,写的时候线程也安全呢?
CopyOnWriteArrayList
使用CopyOnWriteArrayList 就可以解决这个问题,观看底层源码可以看到。
public CopyOnWriteArrayList() {
setArray(new Object[0]);
}
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es);
return true;
}
}
public E get(int index) {
return elementAt(getArray(), index);
}
它的底层其实也是一个数组。
但add一个元素的时候,使用Lock锁,每次进行扩容(java11中还是变成了 synchronized关键字),这样线程安全的问题是绝对解决了的。
我们再来看看读写的问题。
仔细观看我们首先是获取到当前这个底层数组getArray(); 赋值给 es,这就相当于将之前的数组备份了一份。
我们将新数组写进es,在写的时候,如果有线程进来读,getArray()读的还是原来的老数组。当我们完成写操作之后,再将原来的老数组覆盖。
是不是有种读写分离的味道?没错,这样我们就可以提高读的效率的,同时保证数据一致性。
Set 和 map也不安全
CopyOnWriteArraySet
它是线程安全的无序的set集合,底层仍然是Lock锁(java11中还是变成了 synchronized关键字)
CopyOnWriteArraySet和HashSet虽然都继承于共同的父类AbstractSet。但是,HashSet是通过 散列表(HashMap)实现的,而CopyOnWriteArraySet则是通过 动态数组(CopyOnWriteArrayList) 实现的,并不是散列表。
CopyOnWriteSet底层包含一个CopyOnWriteList,几乎所有操作都是借助CopyOnWriteList但是,HashSet就不一样了,底层是一个HashMap,set的value为 HashMap的 key,而HashMa p的value 则是一个固定的Object对象。
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
具体可以自行测试和观看源码。
ConcurrentHashMap
代替原来线程不安全的HashMap。
具体源码以及设计思想怎么实现的可以跳转 https://www.jianshu.com/p/d0b37b927c48
这里主要说一下HashMap的 底层结构数组+链表+红黑树。离之前学习的太久了,现在回顾一下。
static final float DEFAULT_LOAD_FACTOR = 0.75f;
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Node<K,V> next;
...
...
}
在创建一个空参HashMap对象时,默认的容器大小为16,负载因子为0.75。
也就是说当有容器中put 12个对象的时候,HashMap进行扩容。二进制位左移一位,从16变成了32(2的5次方,变成2的6次方)。
默认情况下,底层存放的是Node
如果链表过长了,就会将链表转为红黑树。
读写锁 ReadWriteLock
在使用Lock的时候,我们解决了数据一致性的问题。但是我们的效率问题就下来了。
仔细想想,我们在读的时候似乎根本没必要去加锁,如果我们使用Lock,无论读写,都只能进去一个线程。
juc 帮我们准备了一个读写锁。使用我们在写的时候加写锁保证唯一写,在读的时候加写锁,也可以并发读。
我们现在启动5个写线程,5个读线程。
至于 volatile 关键字的知识,我们现在只需要指定用volatile关键字修饰一下 map来达到多个线程共享数据的目的就行。
扩展:
至于具体知识点:https://blog.csdn.net/z956281507/article/details/79683796
关于 volatile 解决指令重排的讲解 https://www.cnblogs.com/chenyangyao/p/5269622.html
https://www.cnblogs.com/hangzhi/p/11279495.html
class MyCache {
//volatile 保证数据在多个线程中可以共享(多个线程可见)
private volatile Map<String, Object> map = new HashMap<>();
public void put(String key, Object val) {
System.out.println(Thread.currentThread().getName() + "---写入数据" + key + "::" + val);
map.put(key, val);
System.out.println(Thread.currentThread().getName() + "写入完成");
}
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "开始读取");
System.out.println(Thread.currentThread().getName() + "读取到数据"+map.get(key));
}
}
public class ReadWriterLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(()-> {
myCache.put(index+"",index);
},"write:" + i).start();
}
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(()-> {
myCache.get(index+"");
},"read:" + i).start();
}
}
}
输出:
read:1开始读取
read:4开始读取
read:0开始读取
read:3开始读取
read:2开始读取
write:3---写入数据3::3
write:4---写入数据4::4
write:2---写入数据2::2
write:0---写入数据0::0
write:1---写入数据1::1
read:1读取到数据null
read:2读取到数据null
read:0读取到数据null
read:3读取到数据null
read:4读取到数据null
write:3写入完成
write:1写入完成
write:0写入完成
write:2写入完成
write:4写入完成
可以看到,用volatile修饰的map 不具有原子性。我们的数据在写的时候,写入还没有完成,立刻被切换到了读线程。
这样并不是我们所期望的,我们期望的是在像 mysql的事务一样,我们可以正常并发读,但是我写的时候必须具有原子性,要么写入成功,要么写入失败,中间不允许被打断。
当我们采取了读写锁之后
class MyCache {
//volatile 保证数据在多个线程中可以共享(多个线程可见)
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object val){
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "---写入数据" + key + "::" + val);
try {
TimeUnit.MILLISECONDS.sleep(300);
map.put(key, val);
System.out.println(Thread.currentThread().getName() + "写入完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key) {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "开始读取");
try {
TimeUnit.MILLISECONDS.sleep(300);
System.out.println(Thread.currentThread().getName() + "读取到数据"+map.get(key));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
public class ReadWriterLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//在不加读写锁的情况下我们可以看到
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(()-> {
myCache.put(index+"",index);
},"write:" + i).start();
}
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(()-> {
myCache.get(index+"");
},"read:" + i).start();
}
}
}
输出
write:2---写入数据2::2
write:2写入完成
write:3---写入数据3::3
write:3写入完成
read:0开始读取
read:0读取到数据null
write:0---写入数据0::0
write:0写入完成
read:3开始读取
read:3读取到数据3
write:4---写入数据4::4
write:4写入完成
read:1开始读取
read:4开始读取
read:1读取到数据null
read:4读取到数据4
write:1---写入数据1::1
write:1写入完成
read:2开始读取
read:2读取到数据2
Process finished with exit code 0
juc 下的辅助类
CountDownLatch
public static void main(String[] args) throws InterruptedException {
//表示出我们最后要等待几个线程
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()-> {
System.out.println(Thread.currentThread().getName() + "离开了房间");
//需要等待的线程数量减少1。
countDownLatch.countDown();
},String.valueOf(i)).start();
}
//阻塞在这里,直到 CountDownLatch中的count为0
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "关上了门");
}
CyclicBarrier
public static void main(String[] args) {
//不同于 CountDown,cycleBarrier 用来做加法,当线程数量到达指定值的时候,运行对应的线程。
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> System.out.println("召唤神龙"));
for (int i = 1; i <= 7; i++) {
final int index = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() +"收集到了第" + index +"颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
Semaphore
semaphore 就类似于操作系统中的信号量机制
它的 acquire() 和 release() 相当于信号量的P,V操作
当 信号量值为1的时候为同步,类似于Synchronized
当 信号量 >1的时候用来做互斥信号量,限制进入的线程数量
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(4);
for (int i = 0; i < 6; i++) {
final int index = i;
new Thread(()-> {
try {
//用来获取信号量,此时信号量的值-1
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢占了车位");
//需要等待的线程数量减少1。
TimeUnit.SECONDS.sleep(index);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放信号量,信号量值+1
semaphore.release();
}
},String.valueOf(i)).start();
}
}
BolckingQueue阻塞队列
BolckingQueue :阻塞队列,从上图可以看出它其实也是Collection家族下的一员。
下边三个类则是三个常用实现类。
下边来介绍一下阻塞队列的四组API
add 在队列满的时候报错,正常情况下返回true。remove 在队列空的时候报错,正常情况下返回值。
无参的offer 在队列满的时候返回false,正常情况下返回true。poll 在队列空的时候返回null,正常情况下返回值。
put 在队列满的时候阻塞,正常情况下无返回。take 在队列空的时候阻塞,正常情况下返回值。
有参的offer 和 poll 则会指定 阻塞多长时间。参数第一个为等待多长时间,第二个为时间单位
当然你也可以混合使用这些api
public static void main(String[] args) throws InterruptedException {
//创建一个阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
// addAndRemove(blockingQueue);
// noArgsOfferAndPoll(blockingQueue);
// putAndTake(blockingQueue);
argsOfferAndPoll(blockingQueue);
}
private static void argsOfferAndPoll(BlockingQueue<String> blockingQueue) throws InterruptedException {
blockingQueue.offer("a",2, TimeUnit.SECONDS);
blockingQueue.offer("b",2, TimeUnit.SECONDS);
blockingQueue.offer("c",2, TimeUnit.SECONDS);
blockingQueue.offer("d",2, TimeUnit.SECONDS);
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
// System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
}
private static void putAndTake(BlockingQueue<String> blockingQueue) throws InterruptedException {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
private static void noArgsOfferAndPoll(BlockingQueue<String> blockingQueue) {
//不会报异常,只会返回false和 null
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
private static void addAndRemove(BlockingQueue<String> blockingQueue) {
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//满了则报错
// System.out.println(blockingQueue.add("d"));
//remove 同理
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove());
}
SynchronousQueue 同步队列
SynchronousQueue相比于其它两个而言比较特殊。
SynchronousQueue是一个不存储元素的队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
它非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务。
所以如果你只有一个线程put 或者take会阻塞。(我只在 main函数 put之后,线程进入阻塞状态)
public static void main(String[] args) throws InterruptedException {
//synchronousQueue 不存储元素 而是要put一个取出一个,否则线程阻塞。
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
new Thread(()->{
try {
synchronousQueue.put("a");
synchronousQueue.put("b");
synchronousQueue.put("c");
System.out.println(Thread.currentThread().getName()+":end");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
TimeUnit.MILLISECONDS.sleep(300);
System.out.println(synchronousQueue.take());
System.out.println(synchronousQueue.take());
System.out.println(synchronousQueue.take());
System.out.println(Thread.currentThread().getName()+":end");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
详解:https://blog.csdn.net/chenssy/article/details/77371992
https://blog.csdn.net/qq_38293564/article/details/80604194
线程池
优点
- 减少系统资源的消耗
- 控制线程数量,可控最大并发数
- 方便管理,线程复用
juc中内置的线程池
- newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)
- newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)
- newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
- newScheduledThreadPool:适用于执行延时或者周期性任务。
不过阿里巴巴开发手册中明确说到
所以一般我们要自己手搓线程池,而且虽然我们这些内置的线程池有四种,但点进源码会发现,
其实都是ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
ThreadPoolExecutor 参数详解
public ThreadPoolExecutor(int corePoolSize, //指定线程池中的线程数量
int maximumPoolSize, //指定了线程池中最大线程数量
long keepAliveTime, //当线程池数量超过corePoolSize,多余的空闲线程存活时间(超过corePoolSize的空闲线程会在多久内销毁)
TimeUnit unit, //keepAliveTime的单位
BlockingQueue
ThreadFactory threadFactory, //线程工厂,用来创建线程,一般使用默认的即可
RejectedExecutionHandler handler) //拒绝策略,当任务太多来不及处理,如何进行拒绝
线程池工作流程
在创建了线程池后,等待提交过来的任务请求
当调用execute()方法添加一个请求任务时,线程池会做出如下判断
如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
- 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程运行这个任务;
- 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉
所以线程池的所有任务完成后,它会最终收缩到corePoolSize的大小
线程池的拒绝策略
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
尝试再次提交当前任务。
- DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。
如果允许任务丢失,这是最好的一种策略。
手搓一个简单的线程池
public static void main(String[] args) {
final Integer corePoolSize = 3;
final Integer maximumPoolSize = 5;
final Long keepAliveTime = 1L;
// 自定义线程池,只改变了LinkBlockingQueue的队列大小
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,//一般为cpu逻辑核心数 + 1 Runtime.getRuntime().availableProcessors() + 1;
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
// 循环十次,模拟业务办理,让5个线程处理 <= 8(maximumPoolSize + 队列容量capacity )个请求不会出现问题。
//如果超过 8 ,就要看拒绝策略了,像默认的AbortPolicy(如果cpu处理过超快,有的时候也不会,但基本超过了都会报错)
for (int i = 0; i < 10; i++) {
final int tempInt = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + ":给用户:" + tempInt + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
线程池详解:https://www.jianshu.com/p/7726c70cdc40
https://blog.csdn.net/LookForDream_/article/details/106081021
ForkJoin
它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
ForkJoinPool
既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是ForkJoinPool,它与其它的ExecutorService区别主要在于它使用“工作窃取“,那什么是工作窃取呢?
一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的块,有些干得慢。于是干得快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。
ForkJoinTask
ForkJoinTask就是ForkJoinPool里面的每一个任务。他主要有两个子类:RecursiveAction和RecursiveTask。然后通过fork()方法去分配任务执行任务,通过join()方法汇总任务结果
public class TestForkJoin {
static class MyForkJoinTask extends RecursiveTask<Integer> {
private Integer start;
private Integer end;
//临界值
private final static Integer MAX = 1000;
public MyForkJoinTask(Integer start, Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if ((start - end) > 1000) {
int middle = (start + end) / 2;
MyForkJoinTask task1 = new MyForkJoinTask(start, middle);
MyForkJoinTask task2 = new MyForkJoinTask(middle + 1, end);
task1.fork();
task2.fork();
return task1.join() + task2.join();
} else {
//正常计算
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}
}
public static void main(String[] args) {
// 这是Fork/Join框架的线程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,10000));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
//还有一种方式是并行流
int reduce = IntStream.rangeClosed(0, 10000).parallel().reduce(0, Integer::sum);
System.out.println(reduce);
}
}
经过网上查找资料分析,Forkjoin 其实并不如 java8的 并行流处理
异步回调
在java 8 中新加的特性
廖雪峰的讲解 https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
public static void main(String[] args) throws ExecutionException, InterruptedException {
//CompletableFuture 是Future的一个实现类。
//completableFuture这套使用异步任务的操作都是创建成了守护线程。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("future 异步执行");
// int res = 10 / 0;
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"异步执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1024;
}).whenCompleteAsync((res, err)->{
if (err == null){
System.out.println("结果" + res);
}
}).exceptionally(err -> {
System.out.println("出错啦" + err);
return 0;
});
//由于任务是异步的,所以这里应该先被执行
System.out.println("halo");
//如果这里不使用get阻塞,上边的任务线程执行到一半就会强制关闭。
//System.out.println(future.get());
//为了解决这个问题。我们可以这样,等待allOf(里边所有的线程运行结束)
CompletableFuture.allOf(future).get();
}