简介
什么是juc
**JUC**
就是**java.util.concurrent**
工具包的简称。这是一个处理线程的工具包,jdk1.5
开始出现;
线程和进程概念
进程与线程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描述,进程是程序的实体。
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
总结来说:
- 进程:指在系统中正在运行的一个应用程序,程序一旦运行就是进程,进程一一资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程一一程序执行的最小单位。
线程的状态
NEW
新建RUNNABLE
准备就绪BLOCKED
阻塞WAITING
不见不散TIMIED_WAITING
过时不候TERMINATED
终结wait
和sleep
区别
sleep
是Thread
的静态方法,wait
是Object
的方法,任何对象实例都能调用。sleep
不会释放锁,它也不需要占用锁。wait
会释放锁,但调用它的前提是当前线程占有锁(即代码要在synchronized
中)。- 它们都可以被
interrupted
方法中断。并发和执行
串行模式
- 串行表示所有任务都依次按先后顺序进行。串行意味着必须先装完一车柴才能运送这车柴,只有运送到了,才能卸下这车柴,并且只有完成了这整个三个步骤,才能进行下一个步骤。
-
并行模式
并行意味着可以同时取得多个任务,并同时去执行所取得的这些任务。并行模式相当于将长长的一条队列,划分成了多条短队列,所以并行缩短了任务队列的长度。并行的效率从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU。
并发
并发(concurrent)指的是多个程序可以同时运行的现象,更细化的是多进程可以同时运行或者多指令可以同时运行。但这不是重点,在描述并发的时候也不会去扣这种字眼是否精确,并发的重点在于它是一种现象,并发描述的是多进程同时运行的现象。但实际上,对于单核心 CPU来说,同一时刻只能运行一个线程。所以,这里的”同时运行”表示的不是真的同一时刻有多个线程运行的现象,这是并行的概念,而是提供一种功能让用户看来多个程序同时运行起来了,但实际上这些程序中的进程不是一直霸占CPU的,而是执行一会停一会。
小结
并发:同一时刻多个线程在访问同一个资源,多个线程对一个点;
例子:春运抢票 电商秒杀.
并行:多项工作一起执行,之后再汇总。
-
管程(Monitor)
Monitor
翻译过来是监视器,我们平时所说的锁就是一种监视器,是一种同步机制,保证同一个时间只能有一个线程能去访问被保护的数据或代码;-
用户线程and守护线程
用户线程
自定义线程,平时用到的线程;
特定:主线程结束,用户线程还在运行,jvm存活;Thread aa = new Thread(() -> {
//线程名:Thread.currentThread().getName()
//是否是守护线程:Thread.currentThread().isDaemon()
...
}, "用户线程名");
aa.start();
守护线程
特殊线程,运行在后台,比如垃圾回收;
特定:主线程结束,没有用户线程,守护线程结束,jvm结束;Thread aa = new Thread(() -> {
//线程名:Thread.currentThread().getName()
//是否是守护线程:Thread.currentThread().isDaemon()
...
}, "守护线程名");
aa.setDaemon(true);
aa.start();
Lock
接口关键字
Synchronized
作用范围
synchronized
是Java中的关键字,是一种同步锁。它修饰的对象有以下几种: 修饰一个代码块,被修饰的代码块称为同步语句块,
- 其作用的范围是
大括号{
括起来的代码,作用的对象是调用这个代码块的对象;。
- 其作用的范围是
修饰一个方法,被修饰的方法称为同步方法,
第一步:创建资源类,在资源类创建属性和操作方法;
- 第二步:创建多个线程,调用资源类的操作方法;
//第一步:创建资源类,在资源类创建属性和操作方法;
class Ticket{
private int number=30;
public synchronized void sale(){
//加上synchronized关键字就可对方法加锁,保证数据安全性
if(number>0){
System.out.print(Thread.currentThread().getName());
System.out.println(":卖出:1张票,还剩下:"+--number);
}
}
}
public class SaleTicker {
public static void main(String[] args) {
Ticket ticket=new Ticket();
//第二步:创建多个线程,调用资源类的操作方法;
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<30;i++){
ticket.sale();
}
}
},"线程1").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<30;i++){
ticket.sale();
}
}
},"线程2").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<30;i++){
ticket.sale();
}
}
},"线程3").start();
}
}
什么是
lock
接口介绍
Lock
锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作。它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对象。Lock
提供了比synchronized
更多的功能。Lock
与的Synchfonized
区别:Lock
不是Java
语言内置的,synchronized
是Java
语言的关键字,因此是内置特性。Lock
是一个类,通过这个类可以实现同步访问。Lock
和synchronized
有一点非常大的不同,采用synchronized
不需要用户去手动释放锁,当synchronized
方法或者synchronized
代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock
则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。。使用
可重入锁:可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁;class Ticket{
private int number=30;
private final ReentrantLock lock=new ReentrantLock();//创建可重入锁
public void sale(){
lock.lock();//上锁
if(number>0){
System.out.print(Thread.currentThread().getName());
System.out.println(":卖出:1张票,还剩下:"+--number);
}
lock.unlock();//解锁
}
}
Lock
和synchronized
有以下几点不同:
Lock
是一个接口,而synchronized
是Java中的关键字,synchronized
是内置的语言实现;synchronized
在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock
在发生异常时,如果没有主动通过unLock()
去释放锁,则很可能造成死锁现象,因此使用Lock
时需要在finally
块中释放锁。Lock
可以让等待锁的线程响应中断,而synchronized
却不行,使用synchronized
时,等待的线程会一直等待下去,不能够响应中断。- 通过
Lock
可以知道有没有成功获取锁,而synchronized
却无法办到。 Lock
可以提高多个线程进行读操作的效率:- 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时
Lock
的性能要远远优于synchronized
。线程间通信
jdk
官方解释:Class Object
void notify()
译:唤醒在此对象监视器上等待的单个线程; void notifyAll()
译:唤醒在此对象监视器上等待的所有线程; void wait( )
译:导致当前的线程等待,直到其他线程调用此对象的notify()
方法或notifyAll()
方法; void wait(long timeout)
译:导致当前的线程等待,直到其他线程调用此对象的notify()
方法或notifyAll()
方法,或者指定的时间过完。 void wait(long timeout, int nanos)
译:导致当前的线程等待,直到其他线程调用此对象的notify()
方法或notifyAll()
方法,或者其他线程打断了当前线程,或者指定的时间过完。
- 在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时
小总结:
wait()
,notify()
,notifyAll()
都不属于Thread
类,而是属于Object
基础类,也就是每个对象都有wait()
,notify()
,notifyAll()
功能,因为每个对象都有锁,锁是每个对象的基础,当然操作锁的方法也是最基础了。- 当需要调用以上的方法的时候,一定要对竞争资源进行加锁,如果不加锁的话,则会报
IllegalMonitorStateException
异常; - 当想要调用
wait()
进行线程等待时,必须要取得这个锁对象的控制权(对象监视器),一般是放到synchronized(obj)
代码中。 - 在while循环里而不是if语句下使用
wait
,这样,会在线程暂停恢复后都检查wait
的条件,并在条件实际上并未改变的情况下处理唤醒通知; - 调用
obj.wait()
释放了obj
的锁,否则其他线程也无法获得obj
的锁,也就无法在synchronized(obj){obj.notify()}
代码段内唤醒A。 notify()
方法只会通知等待队列中的第一个相关线程(不会通知优先级比较高的线程);notifyAll()
通知所有等待该竞争资源的线程(也不会按照线程的优先级来执行);- 假设有三个线程执行了
obj.wait()
,那么obj.notifyAll()
则能全部唤醒tread1
,thread2
,thread3
,但是要继续执行obj.wait()
的下一条语句,必须获得obj
锁,因此,tread1
,thread2
,thread3
只有一个有机会获得锁继续执行,例如tread1
,其余的需要等待thread1
释放obj
锁之后才能继续执行。 当调用
obj.notify/notifyAll
后,调用线程依旧持有obj
锁,因此,tread1
,thread2
,thread3
虽被唤醒,但是仍无法获得obj锁。直到调用线程退出synchronized
块,释放obj
锁后,thread1,thread2,thread3
中的一个才有机会获得锁继续执行。实操
class Share{ private int number=0;//初始值 public synchronized void incr() throws InterruptedException{ if(number!=0){//如果number不是0,等待 this.wait();//wait()方法特点:在哪里等待,在哪里被唤醒 } number++;//number值是0,加1 System.out.println(Thread.currentThread().getName()+"::"+number); this.notifyAll();//通知其他所有线程【线程间通信】 } public synchronized void decr() throws InterruptedException{ if(number!=1){//如果number不是1,等待 this.wait(); } number--;//number值是0,减1 System.out.println(Thread.currentThread().getName()+"::"+number); this.notifyAll();//通知其他所有线程【线程间通信】 } } public class SaleTicker { public static void main(String[] args) { Share share=new Share(); new Thread(()->{ //lambda表达式方式实现Runable for(int i=0;i<10;i++){ try { share.incr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"加一线程").start(); new Thread(()->{ for(int i=0;i<10;i++){ try { share.decr(); } catch (InterruptedException e) { e.printStackTrace(); } } },"减一线程").start(); } }
虚假唤醒问题
解决方法:用while()
来替代if()
:class Share{ private int number=0;//初始值 public synchronized void incr() throws InterruptedException{ while(number!=0){//如果number不是0,等待 //使用while()防止虚假唤醒问题 this.wait(); } number++;//number值是0,加1 System.out.println(Thread.currentThread().getName()+"::"+number); this.notifyAll();//通知其他所有线程【线程间通信】 } public synchronized void decr() throws InterruptedException{ while(number!=1){//如果number不是1,等待 this.wait(); } number--;//number值是0,减1 System.out.println(Thread.currentThread().getName()+"::"+number); this.notifyAll();//通知其他所有线程【线程间通信】 } }
不同方法实现线程间通信总结
synchronized
方法如上:
资源类方法名使用
synchronized
关键字修饰,- 等待使用
this.wait();
, -
Lock
方法 Lock
替换synchronized
方法和语句的使用,Condition
取代了对象监视器方法的使用。class Share1{ private int number=0; private Lock lock=new ReentrantLock();//创建一个可重入锁 private Condition condition=lock.newCondition(); //new一个Condition对象,使用这个对象来等待和通知其他线程 public void incr() throws InterruptedException { lock.lock();//上锁 try { while (number!=0){//使用while()防止虚假唤醒问题 condition.await();//等待 } number++; System.out.println(Thread.currentThread().getName()+"::"+number); condition.signalAll();//通知其他all线程 }finally { lock.unlock();//释放锁 } } }
线程间定制化通信
就是使线程按照约定的顺序进行调用;
没有什么新技术,就是设置一个标志位,例如
private int flag=1;
然后使用while()
对这个标志位进行判断是wait
还是进行操作,并且进行操作后对这个标志位进行赋值,然后另一个也使用while()
进行判断。- 总结来说就是使用
while()
来判断标志位,之后根据判断结果wait
或进行操作,抢占顺序之类的没有影响,因为有这个while()
判断;集合的线程安全
线程不安全演示
ArrayList
报错:List<String> list=new ArrayList<>(); for(int i=0;i<10;i++){ new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); }
Exception in thread “8” java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911) at java.util.ArrayList$Itr.next(ArrayList.java:861) at java.util.AbstractCollection.toString(AbstractCollection.java:461) at java.lang.String.valueOf(String.java:2994) at java.io.PrintStream.println(PrintStream.java:821)
问题就出在System.out.println(list);
这,主要由于ArrayList
的add()
没加synchronized
关键字:
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
也就是线程不安全的,由于在高并发的情况下使得输出的时候可能其他线程正在add()
中,使两者产生冲突报错;
HashSet
Set<String> set= new HashSet<>();
for(int i=0;i<1000;i++){
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
},String.valueOf(i)).start();
}
并发现修改问题,报错如下:
Exception in thread “998” Exception in thread “186” java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1469) at java.util.HashMap$KeyIterator.next(HashMap.java:1493) at java.util.AbstractCollection.toString(AbstractCollection.java:461) at java.lang.String.valueOf(String.java:2994) at java.io.PrintStream.println(PrintStream.java:821)
HashMap
解决方案
ArrayList
Vector
方案
使用Vector
来替代ArrayList
:同ArrayList
一样,Vector
也是List
接口的实现类,并且观察Vector
的源码发现,他的大多数实现方法都是加了synchronized
关键字的;
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
collections
方案
可以通过collections
类里面的一个静态方法,来修饰ArrayList
,使其线程安全;主要操作如下:
List<String> list= Collections.synchronizedList(new ArrayList<>());
同样collections
类里面还有其他类似的静态方法
但这种方案依然比较古老,不推荐;
CopyOnWriteArrayList
方案
JUC,java.util.concurrent
包中的CopyOnWriteArrayList
类来替代ArrayList
;
List<String> list= new CopyOnWriteArrayList<>();
HashSet
同样使用JUC,java.util.concurrent
包中的类来实现,这里使用CopyOnWriteArraySet
类来替代HashSet
:
Set<String> set= new CopyOnWriteArraySet<>();
HashMap
同样使用JUC,java.util.concurrent
包中的类来实现,这里使用ConcurrentHashMap
类来替代HashMap
:
Map<String,String> map=new ConcurrentHashMap<>();
多线程锁
synchronized
锁的范围分析
当一个资源类里的非静态方法使用synchronized
修饰时:
- 就好比我们把这个资源类比作是厕所,方法比作是单间坑位,
synchronized
锁其中一个厕所里所有的有锁坑位,并不影响去另一个厕所的人,只影响去这个厕所里有锁的坑位的人;
当一个资源类里的静态方法static
使用synchronized
修饰时:
- 这时锁的就不是当前对象(厕所)的此方法(坑位)
this
,而是当前资源类的class
字节码对象;
总结:synchronized
实现同步的基础:Java中的每一个对象都可以作为锁,具体表现为以下3种形式。
- 对于普通同步方法,锁是当前实例对象。
- 对于静态同步方法,锁是当前类的
class
对象。 -
公平锁和非公平锁
公平锁:
private Lock lock=new ReentrantLock(true);
所有线程都会干到活;
- 执行效率相对较低;
非公平锁:private Lock lock=new ReentrantLock(false);
或private Lock lock=new ReentrantLock();
- 是抢占式的,抢的多抢的少全靠自己,可能出现一个线程把活全干了,其他线程饿死的情况;
优点是执行效率高;
public ReentrantLock() { sync = new NonfairSync(); } //NonfairSync():非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //FairSync():公平锁
可重入锁
可重入锁
ReentrantLock
:可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁;synchronized
(隐式)和lock
(显式)都是可重入锁;死锁
概念:两个或以上的线程由于争夺对方锁资源,会不退让,同时等待对方释放锁的现象,如果没有外力干涉,程序无法再执行下去;
产生原因:
- 系统资源不足;
- 进程运行推进顺序不合理;
- 资源分配不当;
死锁演示
死锁验证,首先控制台输入命令public class DeadLock { public static String a=new String(); public static String b=new String(); public static void main(String[] args) { Thread t1= new Thread(() -> { synchronized (a) { System.out.println(Thread.currentThread().getName() + "已获取a的锁,正在争取b的锁"); synchronized (b) { System.out.println(Thread.currentThread().getName() + "已获取ab的锁"); } } }); Thread t2= new Thread(() -> { synchronized (b) { System.out.println(Thread.currentThread().getName() + "已获取b的锁,正在争取a的锁"); synchronized (a) { System.out.println(Thread.currentThread().getName() + "已获取ba的锁"); } } }); t1.start(); t2.start(); } }
jps -l
(类似于linux中的**ps -ef**
)查看所有进程,选择一个你认为有死锁的进程,记住进程号,之后控制台输入jstack 进程号
,如果出现下面情况说明有死锁:
Callable
接口
目前我们学习了有两种创建线程的方法-一种是通过创建Thread
类,另一种是通过使用Runnable
创建线程。但是,Runnable
缺少的一项功能是:当线程终止时(即run()
完成时),我们无法使线程返回结果。为了支持此功能,Java中提供了Callable
接口。
Callable 接口的特点如下(重点):
- 为了实现
Runnable
,需要实现不返回任何内容的run()
方法,而对于Callable
,需要实现在完成时返回结果的call()
方法。 call()
方法可以引发异常,而run()
则不能。-
FutureTask
import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; class MyCallableThread implements Callable { @Override public Object call() throws Exception { return 250; } } public class CallableTest { public static void main(String[] args) { FutureTask<Integer> futureTask1=new FutureTask<>(new MyCallableThread()); //传统方法 FutureTask<Integer> futureTask2=new FutureTask<>(()->{ return 666; });//使用lambda表达式方法 } }
FutureTask
创建线程使用
FutureTask(未来任务)
有一个特点:只有第一次才进行计算,之后调用get()
方法得到返回值都是直接返回第一次计算出的结果;FutureTask<Integer> futureTask2=new FutureTask<>(()->{ System.out.println(Thread.currentThread().getName()+"来了"); return 666; });//使用lambda表达式方法 new Thread(futureTask2,"线程2").start(); System.out.println("返回结果:"+futureTask2.get());
JUC-辅助类
减少计数
CountDownLatch
CountDownLatch
类可以设置一个计数器,然后 通过
countDown
方法来进行减1的操作,- 使用
await
方法等待计数器不大于0,然后继续执行await
方法之后的语句。
CountDownLatch
主要有两个方法,
- 当一个或多个线程调用
await
方法时,这些线程会阻塞。 - 其它线程调用
countDown
方法会将计数器减1(调用countDown
方法的线程不会阻塞)。 - 当计数器的值变为0时,因
await
方法阻塞的线程会被唤醒,继续执行。
输出(同学的离开顺序不确定,但是最后锁门是已经确定的):import java.util.concurrent.CountDownLatch; public class CountDownLatchTest { public static void main(String[] args) { /*使的最后才锁门*/ CountDownLatch countDownLatch=new CountDownLatch(6); //创建一个CountDownLatch对象,初始值设置为6 for(int i=0;i<6;i++){ new Thread(()->{ System.out.println(Thread.currentThread().getName()+"号离开教室"); countDownLatch.countDown();//计数器-1 },String.valueOf(i)).start(); } try { countDownLatch.await();//等待,直到计数器值为0才执行下面代码 System.out.println("班长锁门了"); } catch (InterruptedException e) { e.printStackTrace(); } } }
0号离开教室 2号离开教室 3号离开教室 1号离开教室 4号离开教室 5号离开教室 班长锁门了
循环栅栏CyclicBarrier
允许一组线程全部等待彼此达到共同屏障点的同步辅助。循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。CyclicBarrier
看英文单词可以看出大概就是循环阻塞的意思,在使用中CyclicBarrier
的构造方法第一个参数是目标障碍数,每次执行CyclicBarrier
的await()
方法一次障碍数会加一,如果达到了目标障碍数,才会执行 cyclicBarrier.await()
之后的语句和初始化后设定的达到障碍数之后做的事情。可以将CyclicBarrier
的await()
方法理解为加1操作。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static int NUMVER=7;
public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier(NUMVER,
()->{System.out.println("\n集齐了7颗龙珠了");});
//第1个参数是目标障碍数;第2个参数是达到障碍数之后做的事情(Runnable对象)
for (int i=1;i<NUMVER+1;i++){
new Thread(()->{
try {
System.out.print(" 龙珠"+Thread.currentThread().getName());
cyclicBarrier.await();
System.out.print ("许愿望 ");
} catch (Exception e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
输出:
龙珠1 龙珠3 龙珠4 龙珠2 龙珠5 龙珠6 龙珠7 集齐了7颗龙珠了 许愿望 许愿望 许愿望 许愿望 许愿望 许愿望 许愿望
信号塔Semaphore
一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()
(从该信号量获取许可证)都会阻塞,直到许可证可用,然后才能使用它。每个release()
(释放许可证,将其返回到信号量)添加许可证,潜在地释放阻塞获取方。但是,没有使用实际的许可证对象;Semaphore
只保留可用数量的计数,并相应地执行。信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(3);
for(int i=1;i<=6;i++){
new Thread(()->{
System.out.println("线程"+Thread.currentThread().getName()+"等待中***");
try {
semaphore.acquire();//抢占
System.out.println("线程"+Thread.currentThread().getName()+"抢到<----");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("线程"+Thread.currentThread().getName()+"将要释放---->");
semaphore.release();//释放
}
},String.valueOf(i)).start();
}
}
}
结果:
线程1等待中 线程2等待中 线程2抢到武器<—— 线程3等待中 线程3抢到武器<—— 线程5等待中 线程1抢到武器<—— 线程4等待中 线程6等待中 线程3将要释放——> 线程2将要释放——> 线程1将要释放——> 线程4抢到武器<—— 线程5抢到武器<—— 线程6抢到武器<—— 线程6将要释放——> 线程5将要释放——> 线程4将要释放——>
JUC-读写锁ReentrantReadWriteLock
- 读锁:共享锁;
- 写锁:独占锁;
- 读锁和写锁之间会发生死锁,写锁和写锁之间会发生死锁,读锁和读锁之间不会发生死锁;
- 对于不同线程之间读读共享,读写、写写互斥;
- 但是对于同一个线程来说获取读锁时也可以获取其写锁(必须先获取写锁再获取读锁才行);
使用
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//首先声明一个ReentrantReadWriteLock对象 readWriteLock.writeLock().lock();//添加写锁并上锁 readWriteLock.writeLock().unlock();//释放写锁 readWriteLock.readLock().lock();//添加读锁并上锁 readWriteLock.readLock().unlock();//释放读锁
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
private volatile Map<String,Object> map=new HashMap<>();
/*
volatile是Java提供的一种轻量级的同步机制。Java语言包含两种内在的同步机制:
同步块(或方法)和 volatile变量,相比于synchronized(synchronized通常称为重量级锁),
volatile更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile变量的同步性较差
(有时它更简单并且开销更低),而且其使用也更容易出错。
*/
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
//首先声明一个ReentrantReadWriteLock对象
//放数据
public void put(String key,Object value){
readWriteLock.writeLock().lock();//添加写锁并上锁
System.out.println(Thread.currentThread().getName()+"正在对《"+key+"》进行写操作");
try {
Thread.sleep(300);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入数据《"+key+"》成功");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();//释放写锁
}
}
//取数据
public Object get(String key){
readWriteLock.readLock().lock();//添加读锁并上锁
System.out.println(Thread.currentThread().getName()+"正在对《"+key+"》进行读操作");
Object r=new Object();
try {
Thread.sleep(300);
r= map.get(key);
System.out.println("数据《"+r+"》已读到");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();//释放读锁
}
return r;
}
}
public class readWrite {
public static void main(String[] args) {
MyCache myCache=new MyCache();
for (int i=0;i<5;i++){
final int num=i;
new Thread(()->{
myCache.put(num+"",num+"");
},String.valueOf(i)).start();
}
for (int i=0;i<5;i++){
final int num=i;
new Thread(()->{
myCache.get(num+"");
},String.valueOf(i)).start();
}
}
}
读写锁的演变
**Synchronized**
与**ReentrantLock**
区别:
- 这两种同步方式有很多相似之处,它们都是加锁方式同步,而且都是阻塞式的同步,也就是说当如果一个线程获得了对象锁,进入了同步块,其他访问该同步块的线程都必须阻塞在同步块外面等待,而进行线程阻塞和唤醒的代价是比较高的;
- 这两种方式最大区别就是对于
Synchronized
来说,它是java语言的关键字,是原生语法层面的互斥,需要jvm
实现。而ReentrantLock
它是JDK 1.5
之后提供的API
层面的互斥锁,需要lock()
和unlock()
方法配合try/finally
语句块来完成 - 便利性:很明显
Synchronized
的使用比较方便简洁,并且由编译器去保证锁的加锁和释放,而ReentrantLock
需要手工声明来加锁和释放锁,为了避免忘记手工释放锁造成死锁,所以最好在finally
中声明释放锁。 锁的细粒度和灵活度:很明显
ReentrantLock
优于Synchronized
;读写锁的降级
读锁
不能升级为写锁
;ReentrantReadWriteLock rrwl=new ReentrantReadWriteLock(); ReentrantReadWriteLock.WriteLock writeLock = rrwl.writeLock(); ReentrantReadWriteLock.ReadLock readLock = rrwl.readLock(); new Thread(()->{ writeLock.lock();//1、获取写锁 ..... readLock.lock();//2、获取读锁 ..... writeLock.unlock();//3、释放写锁(此步就是锁降级操作) readLock.unlock();//4、释放读锁 },"1").start();
阻塞队列
BlockingQueue
介绍
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue
都给你一手包办了。常见的
BlockingQueue
ArrayBlockingQueue
(常用)基于数组的阻塞队列实现,在
ArrayBlockingQueue
内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue
内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue
在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue
;按照实现原理来分析,ArrayBlockingQueue
完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。之所以没这样去做,也许是因为ArrayBlockingQueue
的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。ArrayBlockingQueue
和LinkedBlockingQueue
间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node
对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue
时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。-
LinkedBlockingQueue
(常用) 基于链表的阻塞队列,同
ArrayListBlockingQueue
类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue
可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue
之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。ArrayBlockingQueue
和LinkedBlockingQueue
是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。一句话总结∶由链表结构组成的有界(大小默认值为
Integer.MaxValue)
阻塞队列。DelayQueue
DelayQueue
中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue
是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。-
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的
Compator
对象来决定),但需要注意的是PriorityBlockingQueue
并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。- 因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现
PriorityBlockingQueue
时,内部控制线程同步的锁采用的是公平锁。 -
SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的
BlockingQueue
来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。- 一句话总结:不存储元素的阻塞队列,也即是单个元素的队列;
声明一个
SynchronousQueue
有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
公平模式:
SynchronousQueue
会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;。非公平模式(
SynchronousQueue
默认):SynchronousQueue
采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。LinkedTransferQueue
LinkedTransferQueue
是一个由链表结构组成的无界阻塞TransferQueue
队列。相对于其他阻塞队列,LinkedTransferQueue
多了tryTransfer
和transfer
方法。LinkedTransferQueue
采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null
)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null
的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。-
LinkedBlockingDeque
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。- 对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情况:
- 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回
false
表示操作失败,也可以不设置超时参数一直阻塞,中断后抛出InterruptedException
异常。 - 读取元素时:如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可以通过设置超时参数。
- 插入元素时:如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时再讲该元素插入,该操作可以通过设置超时参数,超时后返回
-
BlockingQueue
核心方法使用
BlockingQueue blockingQueue=new ArrayBlockingQueue(3); blockingQueue.xxx
ThreadPool
线程池线程池简介
线程池(英语∶
thread pool
):一种线程使用模式。线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。- 线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
- 特征:线程池中的线程处于一定的量,可以很好的控制线程的并发量。线程可以重复被使用,在显示关闭之前,都将一直存在。超出一定量的线程被提交时候需在队列中等待;
输出://newFixedThreadPool1池n线程 ExecutorService threadPool1 = Executors.newFixedThreadPool(5);//这里只有5个线程 try { for(int i=0;i<10;i++){ threadPool1.execute(()->{ System.out.println("线程池名字:"+Thread.currentThread().getName()); }); } }finally { threadPool1.shutdown();//连接放回线程池 }
线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-2 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-1
Executors.newSingleThreadExecutor()
(一个任务一个任务执行,一池一线程)
//一池一线程
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();//1个线程
try {
for(int i=0;i<10;i++){
threadExecutor.execute(()->{
System.out.println("线程池名字:"+Thread.currentThread().getName());
});
}
}finally {
threadExecutor.shutdown();//连接放回线程池
}
输出:
线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-1
Executors.newCachedThreadPool()
(线程池根据需求创建线程,可扩容,遇强则强)
//可扩容线程
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for(int i=0;i<10;i++){
threadPool.execute(()->{
System.out.println("线程池名字:"+Thread.currentThread().getName());
});
}
}finally {
threadPool.shutdown();//连接放回线程池
}
输出:
线程池名字:pool-1-thread-1 线程池名字:pool-1-thread-6 线程池名字:pool-1-thread-3 线程池名字:pool-1-thread-4 线程池名字:pool-1-thread-7 线程池名字:pool-1-thread-8 线程池名字:pool-1-thread-2 线程池名字:pool-1-thread-5 线程池名字:pool-1-thread-10 线程池名字:pool-1-thread-9
源码
//newFixedThreadPool,1池n线程
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//newSingleThreadExecutor,//一池一线程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//newCachedThreadPool,可扩容线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
观察源码发现,三者的源码都是ThreadPoolExecutor
;
ThreadPoolExecutor
七个参数介绍
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...
**corePoolSize**
:核心线程数- 核心线程会一直存活,即使没有任务需要执行;
- 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理;
- 设置
allowCoreThreadTimeout=true
(默认false
)时,核心线程会超时关闭;
**maximumPoolSize**
:最大线程数;**keepAliveTime**
:非核心线程存活时间;**unit**
:存活时间单位;**workQueue**
:阻塞队列,当核心线程都用完后,请求就会放到阻塞队列中进行等待;**threadFactory**
:线程工厂,用于创建线程;**handler**
:超过阻塞队列时新来的线程请求拒绝策略;ThreadPoolExecutor
工作流程自定义线程池
//自定义线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, 5, 2L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );
Fork/Join
分支合并框架简介
Fork/Join
它可以将一个大的任务拆分成多个子任务(子任务分配线程来操作,实现多线程)进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join
框架要完成两件事情:Fork
:把一个复杂任务进行分拆,大事化小。-
步骤
任务分割:首先
Fork/Join
框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。- 执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
使用示例
实现一个从1加到100的操作,一次只能两个加数相加,并且两个加数的差必须小于10
package juc.lock;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyTask extends RecursiveTask<Integer>{
//拆分时两个加数差值不能超过10
private static final Integer VALUE=10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
System.out.println("线程名子:"+Thread.currentThread().getName());
if((end-begin)<VALUE){
for(int i=begin;i<=end;i++){
result+=i;
}
}else {
int mid=(begin+end)/2;//求出中间值
MyTask myTask1=new MyTask(begin,mid);//左边
MyTask myTask2=new MyTask(mid+1,end);//右边
myTask1.fork();//调用方法拆分
myTask2.fork();
result=myTask1.join()+myTask2.join();//合并
}
return result;
}
}
public class Fork_join_Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask=new MyTask(1,100);
//创建我们自定义的RecursiveTask对象
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//创建分支合并对象
System.out.println("最终结果:"+forkJoinTask.get());
forkJoinPool.shutdown();//关闭池对象
}
}
输出:
线程名子:ForkJoinPool-1-worker-12 …… 最终结果:5050
CompletableFuture
异步回调
//异步调用,无返回值
CompletableFuture<Void> completableFuture1=CompletableFuture.runAsync(()->{
System.out.println("无返回值线程名称:"+Thread.currentThread().getName());
});
completableFuture1.get();
//异步调用,有返回值
CompletableFuture<Integer> completableFuture2=CompletableFuture.supplyAsync(()->{
System.out.println("有返回值线程名称:"+Thread.currentThread().getName());
return 110;
});
completableFuture2.whenComplete((t,u)->{
System.out.println("-----t:"+t);//返回值
System.out.println("-----u:"+u);//异常信息
}).get();