这个专题算是尚硅谷的视频笔记
在什么情况下可以用最少的资源做最多的事情,这个我们要知道
不能想到多线程就加锁
volatile关键字与内存可见性
来看一下不用volatile的方式, 在ThreadDemo类中定义了flag, 多线程下修改了值为true,但是在main线程了没有打印出结果,就说明在main线程下flag的值还是false.那是什么原因呢?是因为内存可见性问题,当有多个线程访问共享数据的时候,JVM会为每一个线程分配一个独立的缓存来提高效率, 这也就带来了内存可见性问题.
public class VolatileTest {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
if(td.isFlag()){
System.out.println("-----------------");
break;
}
}
}
}
class ThreadDemo implements Runnable {
private boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag = "+flag);
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
当然,要解决内存可见性问题,使用 synchronized 来加锁也是可以的,但是如果使用 synchronized 加锁就会带来性能问题.
//这样使用synchronized也可以,但是影响性能
while (true) {
synchronized(td){
if(td.isFlag()){
System.out.println("-----------------");
break;
}
}
}
相较于synchronized来说, volatile是一种较为轻量级的同步策略.
只需要在定义flag的时候加上volatile关键字即可.
// 加上volatile
private volatile boolean flag = false;
但是需要注意的是:
- volatile不具有互斥性.
- volatile不能保证变量的原子性
原子性
向i++
的操作,就不是原子性的.
在java.util.concurrent.atomic
包下面有很多常用的原子变量.
原子变量都是用volatile修饰的,保证了内存可见性,用了cas算法保证了数字的原子性.
CAS算法是硬件对于并发操作共享数据的支持.
CAS包含了三个操作数:
内存值V
预估值 E
更新值 N
当且仅当 V == E时,V=N,否则,什么都不做.
CAS算法
应用
java5开始,在java.util.concurrent
包下提供了大量支持高效并发访问的集合接口和实现类. 如ConcurrentHashMap等线程安全集合.
引入概念
这些线程安全类底层实现用的就是CAS算法(Compare and Swap 比较交换 ).实现方式是基于硬件平台的汇编指令.也就是说,CAS是靠硬件实现的.
优点
这些算法相对于 synchronized 是比较乐观的, 它不会像 synchronized 一样,当一个线程访问数据的时候,其他线程都阻塞. synchronized 不管是否有线程冲突都会进行加锁,由于 CAS 是非阻塞的,所以不会有死锁问题.并且线程之间的相互影响也非常小,更重要的是,使用无锁的方式完全没有锁竞争带来的系统开销.也没有线程频繁调度带来的开销,所以它要比锁的方式拥有更优越的性能.
实现原理
假设现在有两个线程 T1和T2,在他们各自的运行环境中都有共享变量的副本 V1 , V2 , 预期只存中的值还没有被改变 , 假设现在在并发环境 , 并且T1先拿到了执行权限 , 失败的线程不会被挂起 , 而是被告知这次竞争失败 , 并可以再次发起尝试 .
此时T1线程比较主存中的V和T1线程中的E1,发现E1 = V,也就是说预期是正确的 , 所以执行N1 = V1+1 , 并且将 N1 的值传回主存 . 这时候主存中的V = 21.
线程T2拿到执行权的时候,也要将V和E2进行比较 , 此时因为主存已经被修改 , 所以T2线程再将主存中的值更新到自己的副本中,再发起重试. 直到满足条件.
CurrentHashMap锁分段机制
HashMap是线程不安全的,HashTable是线程安全的。
HashTable底层加了锁,是对整张表加了锁,线程安全,效率低。
CurrentHashMap采用锁分段机制。
CurrentLevel 锁分段级别。
每个段都是独立的锁,就是Segment
但是jdk1.8之后,segment也被取消了,CurrentHashMap底层开始使用CAS。
实现Callable接口
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//如果是实现Runnable接口,可以直接Thread.start(td)就行了
//但是,我们这个有返回值,需要一个东西去接收返回结果
// 执行Callable方式,使用FutureTask实现类去接收结果
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
try {
//可以使用get方法来获取返回值
result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable {
// 实现Callable的接口,重写call方法
// call方法和run方法的区别可以有返回值,并且可以抛出异常
@Override
public Object call() throws Exception {
int sum = 0;
for(int i = 0;i<=100;i++){
sum+=i;
System.out.println(i);
}
return sum;
}
}
同步锁Lock
Synchronized是隐式锁
Lock是一个显示锁,需要通过lock来加锁,使用unlock来释放锁。
如果不加锁,就会产生线程安全问题
public class TestLock {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket,"1号窗口").start();
new Thread(ticket,"2号窗口").start();
new Thread(ticket,"3号窗口").start();
}
}
//这样就会产生多线程安全问题
class Ticket implements Runnable {
private int tick = 100;
@Override
public void run() {
while(tick>0){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + "完成售票,余票为"+ --tick);
}
}
}
加个锁
public class TestLock {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket,"1号窗口").start();
new Thread(ticket,"2号窗口").start();
new Thread(ticket,"3号窗口").start();
}
}
//这样就会产生多线程安全问题
class Ticket implements Runnable {
private int tick = 100;
private Lock lock = new ReentrantLock();
@Override
public void run() {
while(tick>0){
lock.lock();
try {
if(tick>0){
System.out.println(Thread.currentThread().getName() + "完成售票,余票为"+ --tick);
}
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
}
使用同步锁Lock来完成等待唤醒机制
完成等待唤醒机制就是使用synchronized锁时候的那个wait和notify。
完成等待唤醒最经典的案例就是生产者和消费者案例。
这个还没写完,后续再补充。
public class TestProductAndConsumer {
public static void main(String[] args) {
//一个店员
Clerk clerk = new Clerk();
Productor pro = new Productor(clerk);
Concumer cus = new Concumer(clerk);
new Thread(pro,"生产者A").start();
new Thread(cus,"消费者A").start();
}
}
//店员
class Clerk{
private int product = 0;
//进货
public void get(){
if(product>=10){
System.out.println("产品已满!");
}else{
System.out.println(Thread.currentThread().getName()+" :" + ++product);
}
}
//卖货
public void sale(){
if(product<=0){
System.out.println("缺货");
}else{
System.out.println(Thread.currentThread().getName()+" :" + --product);
}
}
}
//生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk){
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
//消费者
class Concumer implements Runnable{
private Clerk clerk;
public Concumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}
Condition 线程通信
在Condition对象中,与wait、notify和notifyAll方法对应的分别是await,signal和signalAll。
编写一个程序,开启3个线程,这三个线程的ID分别是A B C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按照顺序显示。如ABCABCABC… 依次递归。
public class TestABCAiternate {
/**
* 编写一个程序,开启3个线程,这三个线程的ID分别是A B C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按照顺序显示。
* 如ABCABCABC... 依次递归
*/
public static void main(String[] args) {
ABCDemo demo = new ABCDemo();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
demo.LoopA();
}
}
},"A").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
demo.LoopB();
}
}
},"B").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
demo.LoopC();
}
}
},"C").start();
}
}
class ABCDemo {
private int number = 1; //当前正在执行线程的标记
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void LoopA(){
lock.lock();
try {
//判断
if(number!=1){
condition1.await();
}
//打印
System.out.println(Thread.currentThread().getName());
//唤醒
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void LoopB(){
lock.lock();
try {
//判断
if(number!=2){
condition2.await();
}
//打印
System.out.println(Thread.currentThread().getName());
//唤醒
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void LoopC(){
lock.lock();
try {
//判断
if(number!=3){
condition3.await();
}
//打印
System.out.println(Thread.currentThread().getName());
//唤醒
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
ReadWriteLock 读写锁
可以多个线程读,只能一个线程写,和前面用lock是一样的,不过需要定义的是ReadWriteLock。
//定义的时候用ReadWriteLock
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 使用的时候也加上readLock或者writeLock
lock.ReadLock().lock;
lock.WriteLock.lock();
线程池
首先,线程池是有类别的,有得是核心线程,有的是非核心线程。所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。
其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,一般使用BlockingQueue阻塞队列来充当。
最后,当任务越来越多而线程处理却不及时,队列满了,线程数也达到最大线程数了,这时候就需要走拒绝策略。常用的拒绝策略有丢弃当前任务,丢弃最老的任务,调用者自己处理等。
为什么要你区分核心线程呢?
这是为了控制系统中线程的数量。
- 当线程池中的线程数没有达到核心线程数的时候,来了一个任务加一个线程是可以的,可以提高任务执行的效率。
- 当线程池中的线程数达到核心线程数的时候,就要控制一下线程的数量了,来任务先进队列。
- 如果任务执行足够快,这些核心线程很快就可以处理完队列中的任务,就没有必要新增线程。
- 如果队列中的任务也满了,这时候只靠核心线程就没有办法处理了,就要增加新的线程,但是线程也不能无限制的增加,所以需要控制最大线程数量maxSize。
实现线程池
//拒绝策略接口
public interface RejectPolicy {
void reject(Runnable task,MyThreadPoolExecutor myThreadPoolExecutor);
}
/**
* 丢弃当前任务
*/
public class DiscardRejectPolicy implements RejectPolicy{
@Override
public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
System.out.println("discard one task");
}
}
实现一个线程池
public class MyThreadPoolExecutor implements Executor {
//线程池的名字
private String name;
//线程序列号
private AtomicInteger sequence = new AtomicInteger(0);
//核心线程数
private int coreSize;
//最大线程数
private int maxSize;
//任务队列
private BlockingQueue<Runnable> taskQueue;
//拒绝策略
private RejectPolicy rejectPolicy;
//当前运行的线程数
private AtomicInteger runningCount = new AtomicInteger(0);
public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
this.name = name;
this.coreSize = coreSize;
this.maxSize = maxSize;
this.taskQueue = taskQueue;
this.rejectPolicy = rejectPolicy;
}
@Override
public void execute(Runnable task) {
//正在运行的线程数
int count = runningCount.get();
//如果正在运行的线程数小于核心线程数,直接加一个线程
if (count<coreSize){
// 这里不一定能添加成功,addWorker方法里面还要判断一次是不是确实小于核心线程
if(addWorker(task,true)){
return;
}
//如果添加核心线程失败
//todo 创建核心线程失败的逻辑
}
//如果达到了核心线程数,尝试让任务入队
//这里使用offer,是因为offer在队列满了的时候会返回false
if(taskQueue.offer(task)){
}else{
//如果入队失败,说明队列满了,添加一个非核心线程
if(!addWorker(task,false)){
//如果添加非核心线程失败了,就执行拒绝策略
rejectPolicy.reject(task,this);
}
}
}
private boolean addWorker(Runnable newTask,boolean core){
//自旋判断是不是真的可以创建一个线程
for (;;){
// 正在运行的线程数
int count = runningCount.get();
//核心线程还是非核心线程
int max = core?coreSize:maxSize;
//如果不能满足创建线程的条件,直接返回false
if(count>=max){
return false;
}
//修改runningCount成功,可以创建线程
if(runningCount.compareAndSet(count,count+1)){
//线程的名字
String threadName = (core?"core_":"")+name+sequence.incrementAndGet();
//创建线程并启动
new Thread(()->{
System.out.println("thread name : "+Thread.currentThread().getName());
//运行的任务
Runnable task = newTask;
// 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
while(task!=null || (task = getTask())!=null){
try{
//执行任务
task.run();
}finally {
//任务执行完成,置为空
task = null;
}
}
},threadName).start();
break;
}
}
return true;
}
private Runnable getTask(){
try{
//task方法会一直阻塞直到取到任务为止
return taskQueue.take();
}catch(InterruptedException e){
//线程中断了,返回null可以结束当前线程
//当前线程要结束了,要把runningCount的数量减一
runningCount.decrementAndGet();
return null;
}
}
}
进行测试
public class MyThreadPoolExecutortest {
public static void main(String[] args) {
Executor threadPool = new MyThreadPoolExecutor("test",5,10,new ArrayBlockingQueue<>(15),new DiscardRejectPolicy());
AtomicInteger num = new AtomicInteger(0);
for (int i = 0; i < 100; i++) {
threadPool.execute(()->{
try{
Thread.sleep(1000);
System.out.println("running :"+System.currentTimeMillis()+" : "+num.incrementAndGet());
}catch (InterruptedException e){
e.printStackTrace();
}
});
}
}
}