并发并行
并发:同一时间 应对 多件事情的能力称为并发
并行:同一时间 处理 多件事情的能力称为并行
预备知识
创建线程的三种方式
/**
*第一种,创建匿名内部类,重写run方法
*/
new Thread(){
@Override
public void run() {
System.out.println(123);
}
}.start();
//lambda简化
new Thread(()->{
System.out.println("lambda简化");
}).start();
/**
*第二种,创建类继承Runnable接口,重写run方法,将该类作为参数传递
*
*/
public class Mythread implements Runnable{
@Override
public void run() {
System.out.println("第二种创建线程的方法");
}
}
Thread t1 =new Thread(new Mythread());
t1.start();
/**
*第三种,使用FetureTask,线程有返回值
*
*/
FutureTask<Integer> task =new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
Thread t2 =new Thread(task);
t2.start();
task.get();//阻塞等待线程返回结果
第三章,常用的方法
join() //等待线程运行结束
getId() //获取线程唯一长整形id
getName() //获取线程名
setName() //修改线程名
getPriority() //获取优先级
setPriority() //设置优先级
getState() //获取线程状态,6个枚举值
isAlive() //是否存活
isInterrupted() //是否被打断,不会清除打断标记
interrupt() //打断线程
Thread.currentThread() //获取当前线程
Thread.interrupted() //是否被打断,会清除打断标记,静态方法
Thread.sleep() //线程休眠
Thread.yield() //当前线程让出cup
线程状态
线程的状态(五种) 六种
线程状态转换
假设有线程 Thread t
- new —->runnable
- 调用t.start(),从new 转换为runnable
- runnable —->waiting
- t线程用synchronized(obj)获取对象锁后
- 调用obj.wait()时,t线程从runnable —> waiting
- 调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从waiting —>runnable
- 竞争失败,t线程从waiting —>blocked
- runnable <—->waiting
- 当前线程调用t.join()之后,当前线程从runnable —>waiting
- t运行结束后,当前线程从waiting —>runnable
- 当前线程调用LockSupport.park()之后,当前线程从 runnable —>waiting
- 调用LockSupport.unpark(目标线程),或者调用线程的interrupt(),会让目标线程从waiting —>runnable
- runnable <—>timed_waiting
- t线程用synchronized(obj)获取对象锁后
- 调用obj.wait(long n)方法时,t线程从runnable —>timed_waiting
- t线程等待超过 n毫秒,或者调用obj.notify(),obj.notifyAll(),t.interrupt()时
- 竞争锁成功,t线程从timed_waiting—>runnable
- 竞争失败,t线程从timed_waiting->>blocked
- 当前线程调用t.join(long n),当前线程从runnable —>timed_waiting
- 当前线程等待超过n毫秒,或者t线程运行结束,或者调用了当前线程的 interrupt()时,当前线程从timed_waiting —>runnable
- 当前线程 调用Thread.sleep(long n),当前线程从runnable —>timed_waiting
- 当前线程等待超过n毫秒,当前线程从timed_waiting —>runnable
- 当前线程调用LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis)时,当前线程从runnable —>timed_waiting
- 调用LockSupport.unpark(目标线程),或者调用了线程的interrupt(),或者等待超时,当前线程从timed_waiting —>runnable
- runnable —>blocked
- t线程使用synchronized(obj)获取对象锁时,竞争失败,t线程从runnable —>blocked
- obj锁被其他线程释放后,会唤醒该对象上所有blocked的线程重新竞争,如果t线程竞争成功,从 blocked —>runnable,否则仍处于blocked。
- runnable —>terminated
- 当前线程所有代码运行完毕后,进入terminated
第四章
基本概念
临界区: 一段代码存在对共享资源的多线程读写操作,称这段代码块为临界区
竞态条件: 多个线程在临界区内执行,由于代码块的 执行序列不同而导致结果无法预测,称之为发生了 竞态条件
解决方案
阻塞式解决方案: synchronized,Lock 非阻塞的解决方案: 原子变量
synchronized
语法
synchronized(对象){
临界区
}
异常
synchronized加在方法上(锁的是this对象)
public synchronized void test(){
}
//等价于
public void test(){
synchronized (this){
}
}
synchronized加在静态方法上(锁的是类对象)
public synchronized static void test(){
}
//等价于
public static void test(){
synchronized (Mytest.class){
}
}
Monitor(监视器)
Monitor是系统提供的,结构如下
工作原理: Thread-0获取锁的时候,Owner指向Thread-0,之后其他线程来获取锁的时候,会存放到EntryList中进入阻塞状态,当Thread-0释放锁之后,会重新唤醒EntryList中的线程进行竞争。当线程没有满足条件,调用wait()方法之后,会释放锁,并且存入waitSet中,当满足条件后会被唤醒并且参与竞争,运行wait()之后的代码。
Mark Word
对象头中的Mark Word(标记字)主要用来表示对象的线程锁状态,另外还可以用来配合GC、存放该对象的hashCode;
轻量级锁
当线程执行synchronized时,会在栈帧中创建一条锁记录,且此时没有其他线程竞争时,锁对象的Mark Word会存放锁记录的地址,并且交换值,表示轻量级锁。
轻量级锁解锁过程: 交换值,删除锁记录。
锁膨胀
当线程执行synchronized时,锁对象被多个线程竞争,或者已经被其他线程使用,会执行锁膨胀过程。申请一个重量锁,Monitor 此时锁对象的Mark Word会指向Monitor的地址,并且当前持有锁的线程存放到Owner,没有竞争到锁的线程存放到EntryList中进行阻塞,等待唤醒。
自旋优化
线程2获取锁时,如果失败并不会立刻进入阻塞状态,而是自旋重新获取锁,重复几次成功则执行临界资源 如果自旋失败,进入Monitor的EntryList进行等待。
偏向锁
java6开始使用偏向锁进行优化。 偏向锁的Mark Word 存放的是 线程id + 101 偏向锁默认开启的。锁对象的Mark Word中存放的不是锁记录的地址,而是线程的id。当无线程竞争的时候,就不会给同一个线程重复加锁。 偏向锁默认开启,但是有延迟,加上JVM参数取消延迟 -XX:BiasedLockingStartupDelay=0 禁用偏向锁: -XX:-UseBiasedLocking
<--需要引入包-->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.16</version>
</dependency>
偏向锁偏向撤销 1、调用hashCode,将 线程id替换为 锁记录地址 2、其他线程竞争锁,锁会变为轻量级锁
批量重偏向 如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID;当撤销偏向锁达到阈值 20 次后,jvm 会这样觉得,我是不是偏向错了呢,于是会在给这些对象加锁时重新偏向至t2。因为前19次是轻量,释放之后为无锁不可偏向,但是20次后面的是偏向t2,释放之后依然是偏向t2。
批量撤销
- 当一个偏向锁如果撤销次数到达40的时候就认为这个对象设计的有问题;那么JVM会把这个对象所对应的类所有的对象都撤销偏向锁;并且新实例化的对象也是不可偏向的
- t1线程创建40个a对象,t2撤销偏向锁40次,t3开始加锁。t1 中40个对象都是偏向锁,t2撤销19次开始偏向t2,t3撤销19次后所有对象都会被JVM设置为不偏向,并且同一个类中创建的心类也不偏向。
锁消除
锁消除是发生在编译器级别的一种锁优化方式。
有时候我们写的代码完全不需要加锁,却执行了加锁操作。 jvm在运行的时候会优化代码进行锁消除
线程安全分析
设计模式
wait和notify
原理
线程获取锁之后但缺少运算条件,执行wait方法之后会释放锁并进入waitSet,进入waiting状态,当生产该条件的线程执行完毕后,会执行notify唤醒该线程进入entryList重新竞争锁。
API介绍
实例
public class TestWaitify {
private static final Object room =new Object();
private static Boolean hasOk = false;
private static Boolean hasMail = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
synchronized(room){
while(!hasOk){
try {
System.out.println("waiting1");
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("线程一ok");
}
}).start();
new Thread(()->{
synchronized(room){
while(!hasMail){
System.out.println("waiting2");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("2Ok");
}
}
}).start();
TimeUnit.SECONDS.sleep(3);
hasMail = true;
synchronized (room){
System.out.println("notify");
room.notifyAll();
}
}
}
虚假唤醒 notify()没有正确唤醒准备好的线程,这种情况称之为虚假唤醒。
保护性暂停
原理
一个线程等待另一个线程的执行结果,如果结果不断从一个线程到另一个线程,需要使用消息队列。
实例
package com.example.thread;
import java.util.concurrent.TimeUnit;
/**
* @author thesky
* @date 2021/9/8 9:59
*/
public class Stop {
public static void main(String[] args) {
Guard guard = new Guard();
//线程1,等待线程2给结果
new Thread(()->{
try {
Object o = guard.get(2000);
if (o !=null){
System.out.println("线程1收到结果");
}else{
System.out.println("超时");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"线程1").start();
//线程2
new Thread(()->{
Object o = new Object();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2生产");
guard.put(o);
},"线程2").start();
}
}
class Guard{
//结果
private Object response;
//获取结果
public Object get(long timeout) throws InterruptedException {
synchronized(this){
long begin = System.currentTimeMillis();
long passTime = 0;
/* 下面超时优化
while(response==null){
//没有结果,线程等待
System.out.println("没有结果,等待。。。。");
this.wait();
}
*/
while(response==null){
if (passTime > timeout){
break;
}
//没有结果,线程等待
long delay = timeout - passTime;
System.out.println("没有结果,等待。。。。");
this.wait(delay);
passTime = System.currentTimeMillis() - begin;
}
return response;
}
}
//生产结果
public void put(Object obj){
synchronized(this){
this.response = obj;
System.out.println("结果传递,唤醒线程");
this.notifyAll();
}
}
}
托管多个
package com.example.thread;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author thesky
* @date 2021/9/8 9:59
*/
public class Stop {
public static void main(String[] args)
for (int i = 0; i < 3; i++) {
new People().start();
}
try {
System.out.println("休眠");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (Integer id : MailBox.getIds()) {
new PostMan(id,id+"123456").start();
}
}
}
class Guard{
private int id;
public int getId(){
return id;
}
public Guard(int id){
this.id=id;
}
//结果
private Object response;
//获取结果
public Object get(long timeout) throws InterruptedException {
synchronized(this){
long begin = System.currentTimeMillis();
long passTime = 0;
while(response==null){
long delay = timeout - passTime;
if (delay <= 0){
break;
}
//没有结果,线程等待
System.out.println("没有结果,等待。。。。");
this.wait(delay);
passTime = System.currentTimeMillis() - begin;
}
return response;
}
}
//生产结果
public void put(Object obj){
synchronized(this){
this.response = obj;
System.out.println("结果传递,唤醒线程");
this.notifyAll();
}
}
}
class People extends Thread{
@Override
public void run() {
Guard guard = MailBox.creatBox();
try {
System.out.println("等待");
Object o = guard.get(5000);
if (o!=null){
System.out.println("get message!!!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class PostMan extends Thread{
private int mailId;
private String message;
public PostMan(int mailId,String message){
this.mailId=mailId;
this.message=message;
}
@Override
public void run() {
Guard byId = MailBox.getById(mailId);
byId.put(message);
}
}
class MailBox{
private static Map<Integer,Guard> box=new Hashtable<>();
private static int id = 1;
//生成id
private static synchronized int generateId(){
return id++;
}
public static Guard creatBox(){
Guard guard = new Guard(generateId());
box.put(guard.getId(),guard);
return guard;
}
public static Set<Integer> getIds(){
return box.keySet();
}
public static Guard getById(int id){
return box.remove(id);
}
}
生产者消费者
实例——阻塞队列
package com.example.thread;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
* @author thesky
* @date 2021/9/8 14:07
*/
public class Link {
public static void main(String[] args) {
MyQueue myQueue =new MyQueue(2);
new Thread(()->{
while (true){
myQueue.take();
}
},"123").start();
for (int i = 0; i < 3; i++) {
int tem = i;
new Thread(()->{
myQueue.push(tem+"hao");
}).start();
}
}
}
class MyQueue{
private Integer capcity;
private LinkedList<String> queues =new LinkedList<>();
public MyQueue(int capcity){
this.capcity = capcity;
}
public void push(String message){
synchronized (queues){
while (queues.size()==capcity){
try {
System.out.println("生产者等待");
queues.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("生成");
queues.addLast(message);
queues.notifyAll();
}
}
public String take(){
synchronized(queues){
while (queues.isEmpty()){
try {
System.out.println("消费者等待");
queues.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消费");
queues.notifyAll();
String s = queues.removeFirst();
return s;
}
}
}
park &unpark
与wait和notify的区别
- wait和notify必须配合Monitor使用,但是unpark不需要
- park和unpark以线程为单位,阻塞唤醒线程,但是notify只能随机唤醒一个线程,notifyAll唤醒所有线程,精确度不同。
- 可以先unpark,但是不能先notify。
原理
每个线程都会关联一个Parker对象。里面有三个属性,counter = 0时,调用park,获得_mutex互斥锁,线程进入_cond条件变量阻塞,调用unpark时,counter=1,且最多为1,所以多次调用unpar无效。此时唤醒_cond条件变量中的线程,设置counter=0。 当先调用unpark时,设置counter = 1,之后调用park,此时counter = 1,无需阻塞,设置counter = 0
实例
package com.example.thread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* @author thesky
* @date 2021/9/8 19:16
*/
public class Park {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("start");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("park");
LockSupport.park();
System.out.println("continue");
});
thread.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("unpark");
LockSupport.unpark(thread);
}
}
固定运行顺序
多把锁
多个不想关的锁可以提高并发度,但是会出现死锁现象
死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象
哲学家问题解决
活锁
两个线程改变彼此的停止条件导致无法停止,称之为活锁 解决方法: 增加随机睡眠时间
线程饥饿
线程优先级太低,导致很长时间不被cup调度导致无法完成。
ReenterantLock
特点
多个条件变量
private Condition condition1 = lock.newCondition(); //多变量
private ReentrantLock lock =new ReentrantLock(true); //开启公平锁,默认不填为非公平锁
public void reenlock(){
Thread thread1 = new Thread(() -> {
try {
while(true){
if (lock.tryLock()){
System.out.println("t1上锁成功");
if (!a){
try {
System.out.println("条件不满足");
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
System.out.println("条件满足");
break;
}
}
}
} finally {
lock.unlock();
System.out.println("t1释放锁");
}
});
thread1.start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock.tryLock()) {
System.out.println("main获取锁");
a = true;
condition1.signalAll();
lock.unlock();
}
}
AQS
aqs是同步器框架,自定义同步器需要继承和重写方法
package demo.thread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author thesky
* @date 2021/10/20 14:55
*/
public class MyLock implements Lock {
class Mysyn extends AbstractQueuedSynchronizer{
@Override //尝试获取锁
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)){
//给当前线程加上锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override //尝试释放锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override //是否独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition(){
return new ConditionObject();
}
}
private Mysyn mysyn =new Mysyn();
@Override // 加锁
public void lock() {
mysyn.acquire(1);
}
@Override // 可打断加锁
public void lockInterruptibly() throws InterruptedException {
mysyn.acquireInterruptibly(1);
}
@Override //尝试加锁
public boolean tryLock() {
return mysyn.tryAcquire(1);
}
@Override //带超时的尝试加锁
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return mysyn.tryAcquireNanos(1,unit.toNanos(time));
}
@Override //释放锁
public void unlock() {
mysyn.release(1); //release会唤醒阻塞的线程,tryRelease不会唤醒
}
@Override //创建条件变量
public Condition newCondition() {
return mysyn.newCondition();
}
}
ReenterantLock实现原理,理解AQS
构造器
ReenterantLock 构造器默认实现的是非公平锁
线程加锁成功
非公平锁上锁,调用 compareAndSetState ,成功就将锁设置给当前线程,否则进入 acquire(1)方法。
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
加锁失败
acquire会在次尝试获得锁,依旧失败就 addWaiter()构造一个Node队列 Node双向链表,第一位 是null,成为哑元或者哨兵,用来占位并不关联线程。 这个链表的元素,前一个结点状态是-1,需要唤醒后一个节点,最后的节点状态是 0 当锁释放,队列中第一的线程可以抢占锁,如果此时队列外的线程先一步抢占到了锁,队列中的线程就抢占失败,因此称之为非公平锁。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
锁重入和释放
线程上锁,判断锁的owner是不是自己,如果是自己,就会让state+1,之后释放锁的时候每次 state-1
(不可)可打断原理
不可打断: 线程被打断之后,并不会立即返回打断标记,而是只有当该线程获得锁之后,才会返回打断标记(true),并执行打断操作。 可打断: 线程park之后阻塞,如果被打断,if语句就是true,就立刻抛出异常,该线程也就停止了。
公平锁
如果状态为 0 ,会先判断队列中有没有等待线程,如果没有就获取锁,如果有就不会竞争锁。
条件变量
当调用await后,如果该condition不存在,就创建一个Node,然后将线程存入,state设置为-3,当调用该变量的signal后,会将该节点 加入到 阻塞队列的队尾,并设置状态为0,之前的队尾节点状态设置为-1。
读写锁 ReentrantReadWriteLock
获取写锁之后,未释放写锁还可以获取读锁,称之为降级 获取读锁之后,未释放锁不能获得读锁,称之为升级 可重入
ReentrantReadWriteLock reentrantReadWriteLock =new ReentrantReadWriteLock();
try{
reentrantReadWriteLock.readLock().lock();
System.out.println("read.....");
}finally {
reentrantReadWriteLock.readLock().unlock();
}
try{
reentrantReadWriteLock.writeLock().lock();
System.out.println("写....");
//锁降级
reentrantReadWriteLock.readLock().lock();
System.out.println("read.....");
reentrantReadWriteLock.readLock().unlock();
}finally {
reentrantReadWriteLock.writeLock().unlock();
}
StampedLock
Semaphore
在一定时间内限制访问共享资源的线程数量
CountDownLatch
CyclicBarrier
第五章
可见性
原子性
volatile修饰的虽然可以保证可见性,但是不能保证原子性。
有序性
指令重排
jvm在执行代码的时候,会进行优化,因此会产生指令重排
禁用:
使用volatile,volatile指令会加入读写屏障
DCL(double checked locking)
保护共享资源
CAS(Check and Set)
AtomicInteger balance;
private volatile AtomicInteger money = new AtomicInteger(1000);
public void reenlock(){
int a = 100;
while (true){
int prev = money.get();
int next = prev - a;
if (money.compareAndSet(prev,next)) {
break;
}
}
}
原子整数 AtomicInteger
AtomicInteger money = new AtomicInteger(1000);
money.getAndIncrement(); //i++
money.incrementAndGet(); //++i
money.decrementAndGet(); //--i
money.addAndGet(10); //先加在读
原子引用
AtomicReference<BigDecimal>
ABA问题
当其他线程修改共享变量为 A -> B ->A时,另一个线程不会发觉共享变量被修改过
解决,加版本号
AtomicStampedReference<String> ref;
int stamp = ref.getStamp;
ref.compareAndSet(oldvalue,newvalue,stamp,stamp+1);
是否被更改过
AtomicMarkableReference
原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray<T>
字段更新器
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
原子累加器
@Test
public void test2(){
demo(
()->new AtomicLong(0),
adder->{
adder.getAndIncrement();
}
);
//AtomicLong 没有 LongAdder效率高
demo(
()->new LongAdder(),
adder->{
adder.increment();
}
);
}
private <T> void demo(Supplier<T> adder, Consumer<T> action){
T a = adder.get();
}
扩展,函数式接口
LongAdder源码
缓存行伪共享
cpu有独立的一级缓存 二级缓存,以及共享的三级缓存,三级缓存之下是内存。如果两个cpu处理同一个数据,会造成缓存行时效。 一个缓存行存放64个字节的 cell对象24个字节,一个缓存行可以存放两个cell对象 所以使用@sun.misc.Contended注解,为使用的对象前后各加128个字节大小的padding,使每一个缓存行只能存放一个cell对象,这样不会造成对方的缓存行时效。
Unsafe
非常底层的对象,只能通过反射获得
//反射获取
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true); //允许获取私有属性
Unsafe unsafe =(Unsafe) theUnsafe.get(null);
System.out.println(unsafe);
} catch (Exception e) {
e.printStackTrace();
}
小结
第七章
不可变对象
例如String 利用保护性拷贝的方式,在修改的时候,创建一个新的对象。但是创建对象过于频繁
享元模式(连接池)
使用场景:重用数量有限的同一类对象时 享元模式(Flyweight Pattern)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结构的方式。
public class Pool {
Logger logger = LoggerFactory.getLogger(Pool.class);
//连接池大小
private final int poolsize;
//连接对象数组
private Connection[] connections;
//连接状态数组
private AtomicIntegerArray states;
//初始化方法
public Pool(int size){
this.poolsize = size;
this.connections = new Connection[size];
this.states = new AtomicIntegerArray(new int[poolsize]);
for (int i =0;i<size;i++){
connections[i] = new MyConnection();
}
}
//借连接
public Connection borrow(){
while(true){
for (int i = 0; i < poolsize; i++) {
if (states.get(i)==0) {
if (states.compareAndSet(i,0,1)) {
logger.info("借出{}",connections[i]);
return connections[i];
}
}
}
synchronized(this){
try {
logger.info("满了");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//归还连接
public void free(Connection con){
for (int i = 0; i < poolsize; i++) {
if (connections[i]==con) {
states.set(i,0);
logger.info("回收{}",con);
synchronized (this){
this.notifyAll();
}
break;
}
}
}
}
class MyConnection implements Connection{
//实现
}
final原理
final会增加写屏障
小结
第八章,线程池
阻塞队列
线程池队列的作用,当线程池的线程无法处理过多的任务时,可以讲任务存放到队列中,等线程处理当前任务之后慢慢消费。所以队列功能有两个,存储任务,供线程获取任务。 在存储任务的时候,如果队列空间已满,此时可以用不同的策略处理,比如阻塞等待和带超时的等待或者其他策略。 线程获取任务的时候,如果队列是空的,可以让线程阻塞等待或者超时等待。
package demo.thread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author thesky
* @date 2021/10/16 17:23
*/
public class BlockQueue<T> {
Logger logger = LoggerFactory.getLogger(BlockQueue.class);
private Deque<T> queue = new ArrayDeque<>();
private ReentrantLock lock =new ReentrantLock();
private Condition fullWaitSet = lock.newCondition();
private Condition emptyWaitSet = lock.newCondition();
private int capcity;
public BlockQueue(int capcity){
this.capcity =capcity;
}
//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()){
try {
if (nanos<=0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally{
lock.unlock();
}
}
// 阻塞获取任务
public T take(){
lock.lock();
try{
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally{
lock.unlock();
}
}
//策略模式
//向队列中存放任务
public void put(T task){
lock.lock();
try{
while (queue.size()==capcity){
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signalAll();
}finally {
lock.unlock();
}
}
//带超时的put任务
public void putTimeout(T task,long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
while (queue.size()==capcity){
try {
if (nanos>0){
nanos = fullWaitSet.awaitNanos(nanos);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
fullWaitSet.signal();
}finally{
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy,T task){
lock.lock();
try{
//队列是否已满
if (queue.size()==capcity){
logger.info("满了");
rejectPolicy.reject(this,task);
}else{
logger.info("没满");
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
public int size(){
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
线程池
线程池定义了 队列类型, 核心线程数以及救济线程数。任务是由线程池执行的,线程池获得任务之后,将任务分配给继承Thread类的线程处理对象(称线程),线程处理任务,当线程处理完之后,线程不能立刻结束,而是应该检查队列中是否有任务,如果有,则取出继续执行。当所有任务处理完之后,核心线程继续阻塞等待。救济线程结束移除。
package demo.thread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* @author thesky
* @date 2021/10/16 17:52
*/
public class MyPool {
Logger logger = LoggerFactory.getLogger(MyPool.class);
private BlockQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet();
private int coreSize;
private long tiemout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public MyPool(int coreSize, long tiemout, TimeUnit unit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
logger.info("构造线程池");
this.coreSize = coreSize;
this.tiemout = tiemout;
this.timeUnit = unit;
this.taskQueue = new BlockQueue<>(queueCapcity);
this.rejectPolicy =rejectPolicy;
}
public void execute(Runnable task){
//当任务数没有超过核心时直接执行
synchronized (workers){
if (workers.size()<coreSize){
logger.info("增加worker");
Worker worker =new Worker(task);
workers.add(worker);
worker.start();
}
//如果超过了,加入任务队列
else{
logger.info("{}存入队列",task);
//taskQueue.put(task);
//死等 带超时时间等待 放弃任务 抛出异常 调用线程自己执行
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task =task;
}
@Override
public void run() {
//当task !=null
//执行完毕之后,去任务队列中获取
//最后结束
while (task!=null || (task = taskQueue.poll(tiemout,timeUnit))!=null){
try {
logger.info("执行{}",task);
task.run();
} catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers){
logger.info("worker移除{}",this);
workers.remove(this);
}
}
}
}
策略模式
策略模式可以灵活的调用 对象提供的策略,让用户自定处理逻辑
package demo.thread;
/**
* @author thesky
* @date 2021/10/17 15:42
*/
@FunctionalInterface
public interface RejectPolicy<T> {
void reject(BlockQueue<T> queue,T task);
}
//例子
MyPool myPool = new MyPool(1,1, TimeUnit.SECONDS,2,(queue,task)->{
//queue.put(task);
queue.poll(1,TimeUnit.SECONDS);
});
public void tryPut(RejectPolicy<T> rejectPolicy,T task){
lock.lock();
try{
//队列是否已满
if (queue.size()==capcity){
logger.info("满了");
rejectPolicy.reject(this,task); //再此调用传参
}else{
logger.info("没满");
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
线程池状态
java自带的线程池ThreadPoolExecutor
固定线程池
固定核心线程池
带缓冲线程池
全是救济线程池,队列只能存放1个任务,之后阻塞等待线程取走。
newSingleThreadExecutor
线程固定为1,该线程池队列处理任务先进先处理
线程池提交任务
execute(Runable command);
<T> Future submit(Callable<T> task);
invokeAll
invokeAny
关闭线程池
shutdown();
shutdownNow(); //没有执行的任务返回
工作线程模式
当任务类型不一样的时候,可以设置多个池分别处理
分配合理线程池大小
任务调度
ScheduledExecutorService
//延时执行任务
pool.schedule(()->{},延时时间,单位)
//定时执行任务
pool.scheduleAtFixedRate(()->{},延时时间,间隔时间,单位)
pool.scheduleWithFixedDelay(()->{},延时时间,间隔时间,单位)