并发并行
并发:同一时间 应对 多件事情的能力称为并发
并行:同一时间 处理 多件事情的能力称为并行
预备知识
创建线程的三种方式
/***第一种,创建匿名内部类,重写run方法*/new Thread(){@Overridepublic void run() {System.out.println(123);}}.start();//lambda简化new Thread(()->{System.out.println("lambda简化");}).start();/***第二种,创建类继承Runnable接口,重写run方法,将该类作为参数传递**/public class Mythread implements Runnable{@Overridepublic void run() {System.out.println("第二种创建线程的方法");}}Thread t1 =new Thread(new Mythread());t1.start();/***第三种,使用FetureTask,线程有返回值**/FutureTask<Integer> task =new FutureTask<>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return 1;}});Thread t2 =new Thread(task);t2.start();task.get();//阻塞等待线程返回结果
第三章,常用的方法
join() //等待线程运行结束getId() //获取线程唯一长整形idgetName() //获取线程名setName() //修改线程名getPriority() //获取优先级setPriority() //设置优先级getState() //获取线程状态,6个枚举值isAlive() //是否存活isInterrupted() //是否被打断,不会清除打断标记interrupt() //打断线程Thread.currentThread() //获取当前线程Thread.interrupted() //是否被打断,会清除打断标记,静态方法Thread.sleep() //线程休眠Thread.yield() //当前线程让出cup
线程状态
线程的状态(五种)
六种
线程状态转换

假设有线程 Thread t
- new —->runnable
- 调用t.start(),从new 转换为runnable
- runnable —->waiting
- t线程用synchronized(obj)获取对象锁后
- 调用obj.wait()时,t线程从runnable —> waiting
- 调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从waiting —>runnable
- 竞争失败,t线程从waiting —>blocked
- runnable <—->waiting
- 当前线程调用t.join()之后,当前线程从runnable —>waiting
- t运行结束后,当前线程从waiting —>runnable
- 当前线程调用LockSupport.park()之后,当前线程从 runnable —>waiting
- 调用LockSupport.unpark(目标线程),或者调用线程的interrupt(),会让目标线程从waiting —>runnable
- runnable <—>timed_waiting
- t线程用synchronized(obj)获取对象锁后
- 调用obj.wait(long n)方法时,t线程从runnable —>timed_waiting
- t线程等待超过 n毫秒,或者调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从timed_waiting—>runnable
- 竞争失败,t线程从timed_waiting->>blocked
- 当前线程调用t.join(long n),当前线程从runnable —>timed_waiting
- 当前线程等待超过n毫秒,或者t线程运行结束,或者调用了当前线程的 interrupt()时,当前线程从timed_waiting —>runnable
- 当前线程 调用Thread.sleep(long n),当前线程从runnable —>timed_waiting
- 当前线程等待超过n毫秒,当前线程从timed_waiting —>runnable
- 当前线程调用LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis)时,当前线程从runnable —>timed_waiting
- 调用LockSupport.unpark(目标线程),或者调用了线程的interrupt(),或者等待超时,当前线程从timed_waiting —>runnable
- runnable —>blocked
- t线程使用synchronized(obj)获取对象锁时,竞争失败,t线程从runnable —>blocked
- obj锁被其他线程释放后,会唤醒该对象上所有blocked的线程重新竞争,如果t线程竞争成功,从 blocked —>runnable,否则仍处于blocked。
- runnable —>terminated
- 当前线程所有代码运行完毕后,进入terminated
第四章
基本概念
临界区: 一段代码存在对共享资源的多线程读写操作,称这段代码块为临界区
竞态条件: 多个线程在临界区内执行,由于代码块的 执行序列不同而导致结果无法预测,称之为发生了 竞态条件
解决方案
阻塞式解决方案: synchronized,Lock 非阻塞的解决方案: 原子变量
synchronized
语法
synchronized(对象){临界区}
异常
synchronized加在方法上(锁的是this对象)
public synchronized void test(){}//等价于public void test(){synchronized (this){}}
synchronized加在静态方法上(锁的是类对象)
public synchronized static void test(){}//等价于public static void test(){synchronized (Mytest.class){}}
Monitor(监视器)
Monitor是系统提供的,结构如下

工作原理: Thread-0获取锁的时候,Owner指向Thread-0,之后其他线程来获取锁的时候,会存放到EntryList中进入阻塞状态,当Thread-0释放锁之后,会重新唤醒EntryList中的线程进行竞争。当线程没有满足条件,调用wait()方法之后,会释放锁,并且存入waitSet中,当满足条件后会被唤醒并且参与竞争,运行wait()之后的代码。
Mark Word
对象头中的Mark Word(标记字)主要用来表示对象的线程锁状态,另外还可以用来配合GC、存放该对象的hashCode;
轻量级锁

当线程执行synchronized时,会在栈帧中创建一条锁记录,且此时没有其他线程竞争时,锁对象的Mark Word会存放锁记录的地址,并且交换值,表示轻量级锁。

轻量级锁解锁过程: 交换值,删除锁记录。
锁膨胀

当线程执行synchronized时,锁对象被多个线程竞争,或者已经被其他线程使用,会执行锁膨胀过程。申请一个重量锁,Monitor 此时锁对象的Mark Word会指向Monitor的地址,并且当前持有锁的线程存放到Owner,没有竞争到锁的线程存放到EntryList中进行阻塞,等待唤醒。
自旋优化
线程2获取锁时,如果失败并不会立刻进入阻塞状态,而是自旋重新获取锁,重复几次成功则执行临界资源 如果自旋失败,进入Monitor的EntryList进行等待。
偏向锁
java6开始使用偏向锁进行优化。 偏向锁的Mark Word 存放的是 线程id + 101 偏向锁默认开启的。锁对象的Mark Word中存放的不是锁记录的地址,而是线程的id。当无线程竞争的时候,就不会给同一个线程重复加锁。 偏向锁默认开启,但是有延迟,加上JVM参数取消延迟 -XX:BiasedLockingStartupDelay=0 禁用偏向锁: -XX:-UseBiasedLocking
<--需要引入包--><dependency><groupId>org.openjdk.jol</groupId><artifactId>jol-core</artifactId><version>0.16</version></dependency>
偏向锁偏向撤销 1、调用hashCode,将 线程id替换为 锁记录地址 2、其他线程竞争锁,锁会变为轻量级锁
批量重偏向 如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID;当撤销偏向锁达到阈值 20 次后,jvm 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至t2。因为前19次是轻量,释放之后为无锁不可偏向,但是20次后面的是偏向t2,释放之后依然是偏向t2。
批量撤销
- 当一个偏向锁如果撤销次数到达40的时候就认为这个对象设计的有问题;那么JVM会把这个对象所对应的类所有的对象都撤销偏向锁;并且新实例化的对象也是不可偏向的
- t1线程创建40个a对象,t2撤销偏向锁40次,t3开始加锁。t1 中40个对象都是偏向锁,t2撤销19次开始偏向t2,t3撤销19次后所有对象都会被JVM设置为不偏向,并且同一个类中创建的心类也不偏向。
锁消除
锁消除是发生在编译器级别的一种锁优化方式。
有时候我们写的代码完全不需要加锁,却执行了加锁操作。 jvm在运行的时候会优化代码进行锁消除
线程安全分析
设计模式
wait和notify
原理
线程获取锁之后但缺少运算条件,执行wait方法之后会释放锁并进入waitSet,进入waiting状态,当生产该条件的线程执行完毕后,会执行notify唤醒该线程进入entryList重新竞争锁。
API介绍
实例
public class TestWaitify {private static final Object room =new Object();private static Boolean hasOk = false;private static Boolean hasMail = false;public static void main(String[] args) throws InterruptedException {new Thread(()->{synchronized(room){while(!hasOk){try {System.out.println("waiting1");room.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("线程一ok");}}).start();new Thread(()->{synchronized(room){while(!hasMail){System.out.println("waiting2");try {room.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("2Ok");}}}).start();TimeUnit.SECONDS.sleep(3);hasMail = true;synchronized (room){System.out.println("notify");room.notifyAll();}}}
虚假唤醒 notify()没有正确唤醒准备好的线程,这种情况称之为虚假唤醒。
保护性暂停
原理
一个线程等待另一个线程的执行结果,如果结果不断从一个线程到另一个线程,需要使用消息队列。
实例
package com.example.thread;import java.util.concurrent.TimeUnit;/*** @author thesky* @date 2021/9/8 9:59*/public class Stop {public static void main(String[] args) {Guard guard = new Guard();//线程1,等待线程2给结果new Thread(()->{try {Object o = guard.get(2000);if (o !=null){System.out.println("线程1收到结果");}else{System.out.println("超时");}} catch (InterruptedException e) {e.printStackTrace();}},"线程1").start();//线程2new Thread(()->{Object o = new Object();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程2生产");guard.put(o);},"线程2").start();}}class Guard{//结果private Object response;//获取结果public Object get(long timeout) throws InterruptedException {synchronized(this){long begin = System.currentTimeMillis();long passTime = 0;/* 下面超时优化while(response==null){//没有结果,线程等待System.out.println("没有结果,等待。。。。");this.wait();}*/while(response==null){if (passTime > timeout){break;}//没有结果,线程等待long delay = timeout - passTime;System.out.println("没有结果,等待。。。。");this.wait(delay);passTime = System.currentTimeMillis() - begin;}return response;}}//生产结果public void put(Object obj){synchronized(this){this.response = obj;System.out.println("结果传递,唤醒线程");this.notifyAll();}}}
托管多个
package com.example.thread;import java.util.Hashtable;import java.util.Map;import java.util.Set;import java.util.concurrent.TimeUnit;/*** @author thesky* @date 2021/9/8 9:59*/public class Stop {public static void main(String[] args)for (int i = 0; i < 3; i++) {new People().start();}try {System.out.println("休眠");TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}for (Integer id : MailBox.getIds()) {new PostMan(id,id+"123456").start();}}}class Guard{private int id;public int getId(){return id;}public Guard(int id){this.id=id;}//结果private Object response;//获取结果public Object get(long timeout) throws InterruptedException {synchronized(this){long begin = System.currentTimeMillis();long passTime = 0;while(response==null){long delay = timeout - passTime;if (delay <= 0){break;}//没有结果,线程等待System.out.println("没有结果,等待。。。。");this.wait(delay);passTime = System.currentTimeMillis() - begin;}return response;}}//生产结果public void put(Object obj){synchronized(this){this.response = obj;System.out.println("结果传递,唤醒线程");this.notifyAll();}}}class People extends Thread{@Overridepublic void run() {Guard guard = MailBox.creatBox();try {System.out.println("等待");Object o = guard.get(5000);if (o!=null){System.out.println("get message!!!");}} catch (InterruptedException e) {e.printStackTrace();}}}class PostMan extends Thread{private int mailId;private String message;public PostMan(int mailId,String message){this.mailId=mailId;this.message=message;}@Overridepublic void run() {Guard byId = MailBox.getById(mailId);byId.put(message);}}class MailBox{private static Map<Integer,Guard> box=new Hashtable<>();private static int id = 1;//生成idprivate static synchronized int generateId(){return id++;}public static Guard creatBox(){Guard guard = new Guard(generateId());box.put(guard.getId(),guard);return guard;}public static Set<Integer> getIds(){return box.keySet();}public static Guard getById(int id){return box.remove(id);}}
生产者消费者
实例——阻塞队列
package com.example.thread;import java.util.LinkedList;import java.util.concurrent.TimeUnit;/*** @author thesky* @date 2021/9/8 14:07*/public class Link {public static void main(String[] args) {MyQueue myQueue =new MyQueue(2);new Thread(()->{while (true){myQueue.take();}},"123").start();for (int i = 0; i < 3; i++) {int tem = i;new Thread(()->{myQueue.push(tem+"hao");}).start();}}}class MyQueue{private Integer capcity;private LinkedList<String> queues =new LinkedList<>();public MyQueue(int capcity){this.capcity = capcity;}public void push(String message){synchronized (queues){while (queues.size()==capcity){try {System.out.println("生产者等待");queues.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("生成");queues.addLast(message);queues.notifyAll();}}public String take(){synchronized(queues){while (queues.isEmpty()){try {System.out.println("消费者等待");queues.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("消费");queues.notifyAll();String s = queues.removeFirst();return s;}}}
park &unpark
与wait和notify的区别
- wait和notify必须配合Monitor使用,但是unpark不需要
- park和unpark以线程为单位,阻塞唤醒线程,但是notify只能随机唤醒一个线程,notifyAll唤醒所有线程,精确度不同。
- 可以先unpark,但是不能先notify。
原理
每个线程都会关联一个Parker对象。里面有三个属性,counter = 0时,调用park,获得_mutex互斥锁,线程进入_cond条件变量阻塞,调用unpark时,counter=1,且最多为1,所以多次调用unpar无效。此时唤醒_cond条件变量中的线程,设置counter=0。 当先调用unpark时,设置counter = 1,之后调用park,此时counter = 1,无需阻塞,设置counter = 0
实例
package com.example.thread;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;/*** @author thesky* @date 2021/9/8 19:16*/public class Park {public static void main(String[] args) {Thread thread = new Thread(() -> {System.out.println("start");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("park");LockSupport.park();System.out.println("continue");});thread.start();try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("unpark");LockSupport.unpark(thread);}}
固定运行顺序
多把锁
多个不想关的锁可以提高并发度,但是会出现死锁现象
死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象
哲学家问题解决
活锁
两个线程改变彼此的停止条件导致无法停止,称之为活锁 解决方法: 增加随机睡眠时间
线程饥饿
线程优先级太低,导致很长时间不被cup调度导致无法完成。
ReenterantLock
特点
多个条件变量
private Condition condition1 = lock.newCondition(); //多变量private ReentrantLock lock =new ReentrantLock(true); //开启公平锁,默认不填为非公平锁public void reenlock(){Thread thread1 = new Thread(() -> {try {while(true){if (lock.tryLock()){System.out.println("t1上锁成功");if (!a){try {System.out.println("条件不满足");condition1.await();} catch (InterruptedException e) {e.printStackTrace();}}else{System.out.println("条件满足");break;}}}} finally {lock.unlock();System.out.println("t1释放锁");}});thread1.start();try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}if (lock.tryLock()) {System.out.println("main获取锁");a = true;condition1.signalAll();lock.unlock();}}
AQS
aqs是同步器框架,自定义同步器需要继承和重写方法
package demo.thread;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/*** @author thesky* @date 2021/10/20 14:55*/public class MyLock implements Lock {class Mysyn extends AbstractQueuedSynchronizer{@Override //尝试获取锁protected boolean tryAcquire(int arg) {if (compareAndSetState(0,1)){//给当前线程加上锁setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Override //尝试释放锁protected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0);return true;}@Override //是否独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition(){return new ConditionObject();}}private Mysyn mysyn =new Mysyn();@Override // 加锁public void lock() {mysyn.acquire(1);}@Override // 可打断加锁public void lockInterruptibly() throws InterruptedException {mysyn.acquireInterruptibly(1);}@Override //尝试加锁public boolean tryLock() {return mysyn.tryAcquire(1);}@Override //带超时的尝试加锁public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return mysyn.tryAcquireNanos(1,unit.toNanos(time));}@Override //释放锁public void unlock() {mysyn.release(1); //release会唤醒阻塞的线程,tryRelease不会唤醒}@Override //创建条件变量public Condition newCondition() {return mysyn.newCondition();}}
ReenterantLock实现原理,理解AQS
构造器

ReenterantLock 构造器默认实现的是非公平锁
线程加锁成功
非公平锁上锁,调用 compareAndSetState ,成功就将锁设置给当前线程,否则进入 acquire(1)方法。
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
加锁失败
acquire会在次尝试获得锁,依旧失败就 addWaiter()构造一个Node队列 Node双向链表,第一位 是null,成为哑元或者哨兵,用来占位并不关联线程。 这个链表的元素,前一个结点状态是-1,需要唤醒后一个节点,最后的节点状态是 0 当锁释放,队列中第一的线程可以抢占锁,如果此时队列外的线程先一步抢占到了锁,队列中的线程就抢占失败,因此称之为非公平锁。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
锁重入和释放
线程上锁,判断锁的owner是不是自己,如果是自己,就会让state+1,之后释放锁的时候每次 state-1
(不可)可打断原理
不可打断: 线程被打断之后,并不会立即返回打断标记,而是只有当该线程获得锁之后,才会返回打断标记(true),并执行打断操作。 可打断: 线程park之后阻塞,如果被打断,if语句就是true,就立刻抛出异常,该线程也就停止了。
公平锁
如果状态为 0 ,会先判断队列中有没有等待线程,如果没有就获取锁,如果有就不会竞争锁。
条件变量
当调用await后,如果该condition不存在,就创建一个Node,然后将线程存入,state设置为-3,当调用该变量的signal后,会将该节点 加入到 阻塞队列的队尾,并设置状态为0,之前的队尾节点状态设置为-1。
读写锁 ReentrantReadWriteLock
获取写锁之后,未释放写锁还可以获取读锁,称之为降级 获取读锁之后,未释放锁不能获得读锁,称之为升级 可重入
ReentrantReadWriteLock reentrantReadWriteLock =new ReentrantReadWriteLock();try{reentrantReadWriteLock.readLock().lock();System.out.println("read.....");}finally {reentrantReadWriteLock.readLock().unlock();}try{reentrantReadWriteLock.writeLock().lock();System.out.println("写....");//锁降级reentrantReadWriteLock.readLock().lock();System.out.println("read.....");reentrantReadWriteLock.readLock().unlock();}finally {reentrantReadWriteLock.writeLock().unlock();}
StampedLock
Semaphore
在一定时间内限制访问共享资源的线程数量
CountDownLatch
CyclicBarrier
第五章
可见性
原子性
volatile修饰的虽然可以保证可见性,但是不能保证原子性。
有序性
指令重排
jvm在执行代码的时候,会进行优化,因此会产生指令重排
禁用:
使用volatile,volatile指令会加入读写屏障
DCL(double checked locking)
保护共享资源
CAS(Check and Set)
AtomicInteger balance;private volatile AtomicInteger money = new AtomicInteger(1000);public void reenlock(){int a = 100;while (true){int prev = money.get();int next = prev - a;if (money.compareAndSet(prev,next)) {break;}}}
原子整数 AtomicInteger
AtomicInteger money = new AtomicInteger(1000);money.getAndIncrement(); //i++money.incrementAndGet(); //++imoney.decrementAndGet(); //--imoney.addAndGet(10); //先加在读
原子引用
AtomicReference<BigDecimal>
ABA问题
当其他线程修改共享变量为 A -> B ->A时,另一个线程不会发觉共享变量被修改过
解决,加版本号
AtomicStampedReference<String> ref;int stamp = ref.getStamp;ref.compareAndSet(oldvalue,newvalue,stamp,stamp+1);
是否被更改过
AtomicMarkableReference
原子数组
AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray<T>
字段更新器
AtomicIntegerFieldUpdaterAtomicLongFieldUpdaterAtomicReferenceFieldUpdater
原子累加器
@Testpublic void test2(){demo(()->new AtomicLong(0),adder->{adder.getAndIncrement();});//AtomicLong 没有 LongAdder效率高demo(()->new LongAdder(),adder->{adder.increment();});}private <T> void demo(Supplier<T> adder, Consumer<T> action){T a = adder.get();}
扩展,函数式接口
LongAdder源码
缓存行伪共享

cpu有独立的一级缓存 二级缓存,以及共享的三级缓存,三级缓存之下是内存。如果两个cpu处理同一个数据,会造成缓存行时效。 一个缓存行存放64个字节的 cell对象24个字节,一个缓存行可以存放两个cell对象 所以使用@sun.misc.Contended注解,为使用的对象前后各加128个字节大小的padding,使每一个缓存行只能存放一个cell对象,这样不会造成对方的缓存行时效。
Unsafe
非常底层的对象,只能通过反射获得
//反射获取try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true); //允许获取私有属性Unsafe unsafe =(Unsafe) theUnsafe.get(null);System.out.println(unsafe);} catch (Exception e) {e.printStackTrace();}
小结
第七章
不可变对象
例如String 利用保护性拷贝的方式,在修改的时候,创建一个新的对象。但是创建对象过于频繁
享元模式(连接池)
使用场景:重用数量有限的同一类对象时 享元模式(Flyweight Pattern)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式。
public class Pool {Logger logger = LoggerFactory.getLogger(Pool.class);//连接池大小private final int poolsize;//连接对象数组private Connection[] connections;//连接状态数组private AtomicIntegerArray states;//初始化方法public Pool(int size){this.poolsize = size;this.connections = new Connection[size];this.states = new AtomicIntegerArray(new int[poolsize]);for (int i =0;i<size;i++){connections[i] = new MyConnection();}}//借连接public Connection borrow(){while(true){for (int i = 0; i < poolsize; i++) {if (states.get(i)==0) {if (states.compareAndSet(i,0,1)) {logger.info("借出{}",connections[i]);return connections[i];}}}synchronized(this){try {logger.info("满了");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}//归还连接public void free(Connection con){for (int i = 0; i < poolsize; i++) {if (connections[i]==con) {states.set(i,0);logger.info("回收{}",con);synchronized (this){this.notifyAll();}break;}}}}class MyConnection implements Connection{//实现}
final原理
final会增加写屏障
小结
第八章,线程池
阻塞队列
线程池队列的作用,当线程池的线程无法处理过多的任务时,可以讲任务存放到队列中,等线程处理当前任务之后慢慢消费。所以队列功能有两个,存储任务,供线程获取任务。 在存储任务的时候,如果队列空间已满,此时可以用不同的策略处理,比如阻塞等待和带超时的等待或者其他策略。 线程获取任务的时候,如果队列是空的,可以让线程阻塞等待或者超时等待。
package demo.thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayDeque;import java.util.Deque;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/*** @author thesky* @date 2021/10/16 17:23*/public class BlockQueue<T> {Logger logger = LoggerFactory.getLogger(BlockQueue.class);private Deque<T> queue = new ArrayDeque<>();private ReentrantLock lock =new ReentrantLock();private Condition fullWaitSet = lock.newCondition();private Condition emptyWaitSet = lock.newCondition();private int capcity;public BlockQueue(int capcity){this.capcity =capcity;}//带超时的阻塞获取public T poll(long timeout, TimeUnit unit){lock.lock();try{long nanos = unit.toNanos(timeout);while (queue.isEmpty()){try {if (nanos<=0){return null;}nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;}finally{lock.unlock();}}// 阻塞获取任务public T take(){lock.lock();try{while (queue.isEmpty()){try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;}finally{lock.unlock();}}//策略模式//向队列中存放任务public void put(T task){lock.lock();try{while (queue.size()==capcity){try {fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(task);emptyWaitSet.signalAll();}finally {lock.unlock();}}//带超时的put任务public void putTimeout(T task,long timeout, TimeUnit unit){lock.lock();try{long nanos = unit.toNanos(timeout);while (queue.size()==capcity){try {if (nanos>0){nanos = fullWaitSet.awaitNanos(nanos);}} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(task);fullWaitSet.signal();}finally{lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy,T task){lock.lock();try{//队列是否已满if (queue.size()==capcity){logger.info("满了");rejectPolicy.reject(this,task);}else{logger.info("没满");queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}public int size(){lock.lock();try {return queue.size();} finally {lock.unlock();}}}
线程池
线程池定义了 队列类型, 核心线程数以及救济线程数。任务是由线程池执行的,线程池获得任务之后,将任务分配给继承Thread类的线程处理对象(称线程),线程处理任务,当线程处理完之后,线程不能立刻结束,而是应该检查队列中是否有任务,如果有,则取出继续执行。当所有任务处理完之后,核心线程继续阻塞等待。救济线程结束移除。
package demo.thread;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.HashSet;import java.util.concurrent.TimeUnit;/*** @author thesky* @date 2021/10/16 17:52*/public class MyPool {Logger logger = LoggerFactory.getLogger(MyPool.class);private BlockQueue<Runnable> taskQueue;//线程集合private HashSet<Worker> workers = new HashSet();private int coreSize;private long tiemout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;public MyPool(int coreSize, long tiemout, TimeUnit unit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {logger.info("构造线程池");this.coreSize = coreSize;this.tiemout = tiemout;this.timeUnit = unit;this.taskQueue = new BlockQueue<>(queueCapcity);this.rejectPolicy =rejectPolicy;}public void execute(Runnable task){//当任务数没有超过核心时直接执行synchronized (workers){if (workers.size()<coreSize){logger.info("增加worker");Worker worker =new Worker(task);workers.add(worker);worker.start();}//如果超过了,加入任务队列else{logger.info("{}存入队列",task);//taskQueue.put(task);//死等 带超时时间等待 放弃任务 抛出异常 调用线程自己执行taskQueue.tryPut(rejectPolicy,task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task =task;}@Overridepublic void run() {//当task !=null//执行完毕之后,去任务队列中获取//最后结束while (task!=null || (task = taskQueue.poll(tiemout,timeUnit))!=null){try {logger.info("执行{}",task);task.run();} catch (Exception e) {e.printStackTrace();}finally {task = null;}}synchronized (workers){logger.info("worker移除{}",this);workers.remove(this);}}}}
策略模式
策略模式可以灵活的调用 对象提供的策略,让用户自定处理逻辑
package demo.thread;/*** @author thesky* @date 2021/10/17 15:42*/@FunctionalInterfacepublic interface RejectPolicy<T> {void reject(BlockQueue<T> queue,T task);}//例子MyPool myPool = new MyPool(1,1, TimeUnit.SECONDS,2,(queue,task)->{//queue.put(task);queue.poll(1,TimeUnit.SECONDS);});public void tryPut(RejectPolicy<T> rejectPolicy,T task){lock.lock();try{//队列是否已满if (queue.size()==capcity){logger.info("满了");rejectPolicy.reject(this,task); //再此调用传参}else{logger.info("没满");queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
线程池状态
java自带的线程池ThreadPoolExecutor
固定线程池
固定核心线程池
带缓冲线程池
全是救济线程池,队列只能存放1个任务,之后阻塞等待线程取走。
newSingleThreadExecutor

线程固定为1,该线程池队列处理任务先进先处理
线程池提交任务
execute(Runable command);<T> Future submit(Callable<T> task);invokeAllinvokeAny
关闭线程池
shutdown();shutdownNow(); //没有执行的任务返回
工作线程模式
当任务类型不一样的时候,可以设置多个池分别处理
分配合理线程池大小
任务调度
ScheduledExecutorService//延时执行任务pool.schedule(()->{},延时时间,单位)//定时执行任务pool.scheduleAtFixedRate(()->{},延时时间,间隔时间,单位)pool.scheduleWithFixedDelay(()->{},延时时间,间隔时间,单位)
六种
