Java并发编程
AQS简介及其原理
AbstractQueueSynchronize,抽象的队列式同步器,简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类的实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch等
原理
它维护了一个volatile int state共享资源和一个FIFO线程等待队列(多线程争用资源时会进入此队列)
资源state的访问方式:getState()、setState()、compareAndSetState()
AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同事执行,如Semaphore、CountDownLatch)
不同的自定义同步去争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队、唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现一下几种方法:
- isHeldExclusively():表示该线程是否正在独占资源,只有用到condition才需要实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,否则返回false
- tryRelease(int):独占方式。尝试释放资源,成功则返回true
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败,0表示成功,但没有剩余可用资源,大于0表示成功,且有剩余资源
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点则返回true,否则返回false
以ReentrantLock为例。state初始化为0,表示未锁定状态。当A线程lock()时,会调用tryAcquire()独占该锁,并将state+1,此时,其他线程在tryAcquire()时就会失败,直到A线程unloc()到state=0(即释放锁)为止,其它线程才有机会获取该锁。线程释放锁之前,也是能够重复获取此锁的,即state会累加,这就是可重入的概念。需要注意的是,获取多少次就要释放多少次,这样才能保证state回到0的状态
CountdownLatch。线程任务会分为N个子线程去执行,state也会初始化为N(N要与线程个数一致),这N个线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减一。等所有子线程都执行完后,即state=0,会unpark()调用主线程,主线程就会从await()返回,继续后续动作
一般来说,自定义同步器要么是独占的,要么是共享的,它们只需要实现tryAcquire(int)-tryRelease(int),或者tryAcquireShared(int)-tryReleaseShared(int)中的一种即可。但AQS也支持自定义同步器实现独占和共享两种方式,如ReentrantReadWriteLock。
底层使用node来封装每个等待获取资源的线程,除了包含线程本身还有其等待状态,通过waitStatus变量表示,有5种状态,>0表示节点已被取消,即线程不会再被调度,<0表示处于有效的等待状态,=0表示线程入队时的默认状态
并发问题的一些解决方案
无锁
- 局部变量
由于局部变量仅仅存在每个线程的工作内存中,而每个线程只能通过主内存进行通信,并不会影响自己内存中的数据,所以不会引发线程安全问题 - 不可变对象
不可变对象一经创建,对外的状态就不会改变,所以也不会引发线程安全问题 - ThreadLocal
本质上也是在每个线程创建自己的一个副本,每个线程的副本是互不影响的,所以也不会引发线程安全问题 cas原子类
CAS:比较并置换,只有当内存地址所对应的值和旧的预期值相等的时候,才会将内存地址对应的值更新为新的值,CAS底层使用dowhile循环
Atomic系列使用的底层使用的就是CAS操作来保证数据得原子性,从而保证线程安全,底层存在一个unsafe实例,从而提供硬件级别的原子操作,因为java无法直接访问到操作系统底层的的硬件,为此java使用native方法来拓展这部分功能,其中unsafe类就是一个操作入口,unsafe提供分配和释放内存、挂起和恢复线程、定位对象字段内存地址、修改对象的字段值、cas操作等功能。
加锁
- synchronized
- reentrantload
synchronized和reentrantload都是采用了悲观锁的策略
利用不可变对象解决并发问题
如果类中存在数据或集合,在提供给外部访问之前需要做防御性复制
案例路劲:F:\typora笔记\java并发编程\案例\04_案例实战:百万流量的短信网关系统,如何基于不可变模式解决并发问题?
实体类是用final修饰类和属性,将其改造为不可变对象
然后通过Collection提供的unmodifiableMap做防御性复制,将返回的对象进行封装。这样即便外部改变获取到对象的状态,也不会影响本身的数据
接着提供一个直接替换实例的方法,便于用来刷新整个服务商信息:
当短信服务商列表发生变化的时候,我们通过调用changeRouteInfo方法,更新数据库中的服务商信息,接着替换整个SmsRouter实例。
这样一来,SmsRouter在构造函数的时候会调用loadSmsInfoRouteMapFromDb方法将更新后的短信服务商列表从数据库中读取出来,然后更新到内存中。
到此为止,我们就通过不可变模式避免了短信网关中服务商列表更新的线程安全问题,这归功于短信服务上信息SmsInfo的不可变性,从而避免了修改SmsInfo在多线程环境下的线程安全问题,另外在SmsRouter获取服务上列表的过程中,对服务商列表进行了了防御性复制,避免外部其他的类对SmsRouter中的短信服务商列表的进行修改。
Guarded Suspension模式
即保护性暂挂模式,如果某个线程执行特定的操作前需要满足一定的条件,则在该条件为满足时将线程暂停运行(即暂挂线程,使其处于等待状态,直到该条件满足时才继续该线程的运行),可以大大降低多线程那个获取锁时锁冲突带来的性能开销
在多线程开发中,常常为了提高应用程序的并发性,会将一个任务分解为多个子任务交个多个线程并行执行,而多个线程之间相互协作时,一个线程需要等待另外的线程完成后继续下一步操作,而使用Guarded Suspension模式就可以解决这类问题。
public class GuardedQueue {
private final Queue<Integer> sourceList;
public GuardedQueue() {
this.sourceList = new LinkedBlockingQueue<>();
}
public synchronized Integer get() {
while (sourceList.isEmpty()) {
try {
wait(); // <--- 如果队列为null,等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sourceList.peek();
}
public synchronized void put(Integer e) {
sourceList.add(e);
notifyAll(); //<--- 通知,继续执行 }
}
public class App {
public static void main(String[] args) {
GuardedQueue guardedQueue = new GuardedQueue();
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(() -> {
guardedQueue.get();
}
);
Thread.sleep(2000);
executorService.execute(() -> {
guardedQueue.put(20);
}
);
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
}
}
notify()是随机地通知等待队列中的任意一个线程,而notifyAll()会通知等待队列中的所有线程,随机通知的坏处就是可能有些线程会永远都收不到通知
轮询的方式,当并发量增大,获取锁的冲突增加的时候,这种方案就不合适了。因为在这种场景下,可能要循环上万次才能获得锁,非常消耗性能,互联网高并发下显然不合适
Guarded Suspension模式在ArrayBlockingQueue阻塞式队列中应用到:
- 当执行put操作时,如果队列满了,就加入到notFull的等待队列,此时若有任务通过take操作移除元素时,会唤醒notFull condition条件等待队列中的线程,执行 put操作
- 当执行take操作时,如果队列为空,就加入到notEmpty的等待队列中,此时若有任务通过put操作插入元素时,会唤醒notEmpty condition条件等待队列中的线程,执行take操作
两阶段终止模式
网络连接等原因造成线程获取不到网络资源
而可能导致网络连接出现问题的原因:
- 线程资源长期被占用着
- 已建立的网络连接,无法被正常释放
这些情况进而会导致服务器资源被白白占用,客户端的请求也会堆积的越来越多,无法被处理。为了避免事态进一步恶化,必须想办法释放线程资源:
使用另一个线程添加标记位,通过改变标记位状态,改变线程运行状态。这也要保证标记位的可见性,因此需要用volitale修饰
而既要保证线程灵活的切换运行状态,又要保证线程需处理完当前任务,就需要进行两阶段终止模式
关于阻塞状态中的线程如何终止,可以使用interrupt方法
shutdownNow方法用到了两阶段终止模式,它是线程池的关闭方法,其中advanceRunState(STOP)方法本质上就是一个线程标记位,改变线程池的状态,而interruptWorkers()方法,就是终止所有的线程,底层是interrupt操作
示例
原子性
在统计接口访问次数的时候,如果只是单纯的每次访问接口时一次,此时根据java内存模型来看,当多个线程同时时,由于从主内存读取的数据是一样的,当++成功后,刷新回主内存时,最后的结构也都是一致的,所以这是线程不安全的,这时需要引用原子性:一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行
案例
基于保护性暂挂模式的智能安防的报警功能
初始化报警服务,ConnectingTask()通过blocker去唤醒,唤醒方法为condition.signal()。然后通过ScheduledThreadPoolExecutor创建线程池,每隔5s发送一次心跳到报警服务器,HeartbeatTask()为自定义心跳检查线程,连接断开了则重新连接,服务器的连接状态通过一个volatile修饰的变量来保证
public void init() {
// 报警服务于报警服务器连接的线程
Thread connectingThread = new Thread(new ConnectingTask());
connectingThread.start();
// 每个5s发送一次心跳到报警服务器
ScheduledThreadPoolExecutor heartbeatExecutor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
private AtomicInteger index = new AtomicInteger();
/**
* @Author 容创伟
* @Description 构造一个新线程。实现还可以初始化优先级、名称、守护进程状态、线程组等。
* @Date 9:40 2022/3/11
* @Param [r]
* @return
**/
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread();
thread.setName("heartbeat-thread-" + index);
// 当jvm退出的时候退出,当唯一运行的线程为守护线程时,jvm退出,此方法将线程设置为守护线程
thread.setDaemon(true);
return thread;
}
});
// 每5s执行一次
heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), 5000, 2000, TimeUnit.MILLISECONDS);
}
上报信息给服务器。构建一个发送信息的线程,通过blocker执行该线程,判断是否持有锁,不满足则等待,满足则继续执行,通过Runnable.call()方式,因为GuardedAction实例是通过实现Runnable接口的方式构建的,所以直接调用
/**
* 上报报警信息给报警服务
*/
public void sendAlarm(AlarmInfo alarmInfo) throws Exception {
// 构建guardedAction
GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
@Override
public Void call() throws Exception {
// 执行目标函数
doSendAlarm(alarmInfo);
return null;
}
};
// 通过blocker执行目标
blocker.callWithGuard(guardedAction);
}
Blocker中通过callWithGuard判断是否建立连接,否,线程循环等待,是则执行目标方法
@Override
public <V> V callWithGuard(GuardedAction<V> guardedAction) throws Exception {
lock.lockInterruptibly();
try {
// 判断条件是否满足,满足则执行目标动作,不满足则进入到条件等待队列中
final Predicate predicate = guardedAction.predicate;
while (!predicate.evaluate()) {
System.out.println("alarm connecting alarm system,thread wait");
// 条件不满足
condition.await();
// 当线程从条件等待队列欢迎后,获取锁成功,然后再次尝试去判断条件是否满足
}
// 条件满足,执行目标内容
System.out.println("alarm connected execute call");
return guardedAction.call();
} finally {
lock.unlock();
}
}
报警信息发送器注册到报警管理器中,报警管理器收到下线命令,通知报警信息管理器也跟着下线,同时报警管理器处理好信息后才下线
/**
* 当多个线程共享一个terminationToken实例时,通过队列的方式来记录所有的停止线程,减少锁的方式来实现
*/
private final Queue<WeakReference<Termination>> coordinatedThreads;
/**
* 注册一个线程到terminationToken上
*
* @param thread 线程
*/
public void register(Termination thread) {
coordinatedThreads.add(new WeakReference<>(thread));
}
通过队列来记录报警信息发送器,当报警管理器通过interrupt()关闭后,报警信息发送器的线程通过死循环的方式来判断报警管理器是否关闭,是否还有任务执行
/**
* 终止线程执行
*/
@Override
public void run() {
Exception ex = null;
try {
for (; ; ) {
// 死循环 先判断中断实例的标识是否为true,同时没有未完成的任务
System.out.println("告警线程执行,此时中断标志位:" + terminationToken.isToShutdown() + ", 未完成的任务数量:" + terminationToken.reservations.get());
if (terminationToken.isToShutdown() && terminationToken.reservations.get() <= 0) {
// 线程已经终止了 中断线程退出
System.out.println("中断标志为true,未完成任务为0,告警线程退出");
break;
}
// 执行具体的业务逻辑
doRun();
}
} catch (Exception e) {
// 中断线程可能给调用interrupt被中断
ex = e;
if (e instanceof InterruptedException) {
// 中断线程响应退出
System.out.println("中断响应:" + e);
}
} finally {
try {
System.out.println("告警线程停止,回调终止后的清理工作");
doCleanup(ex);
} finally {
// 通知terminationToken管理的所有线程实例进行退出
System.out.println("标志实例对象中一个线程终止,通知其他线程终止");
terminationToken.notifyThreadTermination(this);
}
}
}
/**
* 通知terminationToken中所有实例,有一个线程停止了,通知其他线程也停止
*
* @param thread
*/
public void notifyThreadTermination(Termination thread) {
WeakReference<Termination> wrThread;
Termination otherThread;
while ((wrThread = coordinatedThreads.poll()) != null) {
otherThread = wrThread.get();
if (otherThread != null && otherThread != thread) {
otherThread.terminate();
}
}
}
Promis模式(异步编程模式)
Executor需在Promisor的create()方法中执行。create()的返回值就是Promise。Result则是Promise中get()方法的返回值
非异步模式下可通过join方法串行执行
步骤
Promisor类对外暴露一个方法create(),一旦执行了这个create方法,两件事就会发生:
执行器Executor开始异步执行任务
Promisor会创建并返回一个Promise,立刻返回,不会阻塞程序运行
可通过FutureTask来启动异步线程
案例
个人云盘和网盘之间进行数据同步的时候,需要先去建立网络连接然后再扫描本地文件的内容同步文件到云盘中,建立网络连接和扫码文件的操作串行化太慢,这时可使用异步模式
需网络连接完成和本地文件扫码完成才算准备完毕
生产者和消费者模式
生产者和消费者线程在实际的开发中一般选择jdk自带的线程池来实现,至于中间的任务队列,由于多线程并发操作需要线程安全问题,所以一般使用juc自带的并发集合进行封装,还有就是队列一般也会做长度限制,队列满了之后生产者线程需要阻塞,队列空了之后消费者线程需要阻塞,可以直接选择jdk自带的线程安全的有界的阻塞队列
public class Test {
public static void main(String[] args) {
// 生产者线程池
ExecutorService producerThreads = Executors.newFixedThreadPool(3);
// 消费者线程池
ExecutorService consumerThreads = Executors.newFixedThreadPool(2);
// 任务队列,长度为10
ArrayBlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(10);
// 生产者提交任务
producerThreads.submit(() -> {
try {
taskQueue.put(new Task("任务"));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 消费者处理任务
consumerThreads.submit(() -> {
try {
Task task = taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
static class Task {
// 任务名称
private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
}
}
过饱问题
消费者消费速度比生产者生产速度慢,经过某段时间导致阻塞队列堆满,从而造成客户端响应时间过长
解决办法无非就增大阻塞队列大小(需考虑业务容忍的最长响应时间)、限制生产者生产速度(严重的资源浪费)、增加消费者线程(成本高)
JUC
结构
tools:工具类,又叫信号量三组工具类,含有:
- CountDownLatch(闭锁),是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
- CyclicBarrier(栅栏),之所以叫barrier,是因为这也是一个同步辅助类,允许一组线程互相等待,知道到达某个公共屏障点,并且在释放等待线程后可以重用
- Semaphore(信号量),是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()方法来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可,否则线程必须等待,直到有可用的许可为止。线程可以用过releasa()来释放它所持有的信号量许可。
executor(执行者):是Java里面线程池的顶级接口,但它只是一个执行线程的工具,真正的线程池接口是ExecutorService,里面包含的类有:
- ScheduledExecutorService,解决那些需要任务重复执行的问题
- ScheduledThreadPoolExecutor,周期性任务调度的类实现
- atomic(原子性包),是jdk提供的一组原子操作类,包含有AtomicBoolean、AtomicInteger、AtomicIntegerArray等原子变量类,它们的实现原理大多是持有它们各自的对应的类型变量value,而且被volatile关键字修饰,保证每次线程要使用这些变量都会拿到最新的值。
locks(锁包):是JDK提供的锁机制,相比synchronized关键字来进行同步锁,功能更加强大,它为锁提供了一个框架,该框架允许更灵活的使用锁,包含的实现类有:
- ReentrantLock,独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁
- ReentrantReadWriteLock,它包括子类ReadLock(共享锁)和WriteLock(独占锁)。
- LockSupport,它具备阻塞线程和接触阻塞线程的功能,并且不会引发死锁
collections(集合类):主要是提供线程安全的集合,比如:
- ArrayList对应的高并发集合类是CopyOnWriteArrayList
- HashSet对应的高并发集合类是CopyOnWriteArraySet
- HashMap对应的高并发集合类是ConcurrentHashMap等等
集合类不安全
List不安全
//java.util.ConcurrentModificationException 并发修改异常!
public class ListTest {
public static void main(String[] args) {
//并发下 arrayList 是不安全的
/**
* 解决方案
* 1. 使用vector解决
* 2. List<String> arrayList = Collections.synchronizedList(new ArrayList<>());
* 3. List<String> arrayList = new CopyOnWriteArrayList<>();
*/
//copyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略
//多个线程调用的时候, list, 读取的时候固定的,写入的时候,可能会覆盖
//在写入的时候避免覆盖造成数据问题
//CopyOnWriteArrayList 比 vector牛逼在哪里
//读写分离
List<String> arrayList = new CopyOnWriteArrayList<>();
for (int i = 0; i < 100; i++) {
new Thread(()->{
arrayList.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(arrayList);
},String.valueOf(i)).start();
}
}
}
Set不安全
/**
* 同理可证
*/
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
//如何解决hashSet线程安全问题
//1. Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 5));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
写时复制思想
指的是实行读写分离,如果执行的是写操作,则复制一个新集合,在新集合内添加或者删除元素。待一切修改完成之后,在将原集合的引用指向新的集合,而原集合没有任何引用指向它,就会自动被gc回收。好处就是,可以高并发地对COW进行读和遍历操作,而不需要加锁,因为当前集合不会添加任何操作
HashMap不安全
常用的辅助类
CountdownLatch
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助
CountdownLatch用给定的技术初始化。await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await调用立即返回。这是一个一次性的现象,即计数无法重置,如需重置,则需考虑使用CycliBarrier
//计数器
public class demo02 {
public static void main(String[] args) throws InterruptedException {
//相当于计数器
CountDownLatch countDownLatch = new CountDownLatch(5);
//计数器总数是5,当减少为0,任务才继续向下执行
for (int i = 1; i <6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"==>start");
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("main线程继续向下执行");
}
}
Semaphore
一个计数信号量。在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,知道许可证可用,然后才能使用它。每个release()添加许可证,潜在地释放阻塞获取放。但是,没有使用实际的许可证对象;Semaphore只保留可用数量的计数,并相应地执行。
信号量通常用于限制线程数,而不是访问某些资源
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
int temp = i;
new Thread(()->{
try {
semaphore.acquire(); //获取
System.out.println(temp + "号车抢到车位");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); //释放
System.out.println(temp + "号车离开车位");
}
}).start();
}
}
}
锁知识
死锁
在很多互联网公司的交易系统中,会出现这样的changing,账户A转账给账户B,同事账户B也转账给账户A,两个账户都需要锁住余额,所以通常会申请两把锁。转账时,先锁住自己的账户,并获取对方的锁,保证同一时刻只能由一个线程去执行转账。但这时可能就会出现,对方给我转账,同时我也给对方转账,那么双方都持有自己的锁,且尝试去获取对象的锁,这就造成可能一直申请不到对方的锁,循环等待发生死锁的问题。如此,用户体验肯定是极差的
所谓的死锁,就是两个或两个以上的线程在执行过程中,互相持有对象所需要的资源,导致这些线程处于等待状态,无法继续执行。若无外力作用下,它们都将无法继续执行下去,就进入了永久的阻塞状态。
如上图所示:线程01获得账户A的锁,且同时去尝试获取账户B的锁,但是,线程02已经获得了账户B的锁,所以线程01只能等待,同样,线程02无法获得账户A的锁(已经被线程01获取),也只能等待。于是,线程01和02都在等待对方持有的锁,且会无期限的等待下去,这就是死锁的产生
public class DeadLock {
public static String obj1 = "obj1";
public static String obj2 = "obj2";
public static void main(String[] args) {
Thread a = new Thread(new Lock1());
Thread b = new Thread(new Lock2());
a.start();
b.start();
}
}
class Lock1 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock1 running");
while (true) {
synchronized (DeadLock.obj1) {
System.out.println("Lock1 lock obj1");
Thread.sleep(3000);
synchronized (DeadLock.obj2) {
System.out.println("Lock1 lock obj2");
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Lock2 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock2 running");
while (true) {
synchronized (DeadLock.obj2) {
System.out.println("Lock2 lock obj2");
Thread.sleep(3000);
synchronized (DeadLock.obj1) {
System.out.println("Lock2 lock obj1");
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
输出结果:
Lock1 running
Lock1 lock obj1
Lock2 running
Lock2 lock obj2
并没有打印Lock1 lock obj2和Lock2 lock obj1,是因为发生了死锁现象
死锁产生的原因
虽然进程在运行过程中,可能发生死锁,但死锁的繁盛也必须具备一定的条件死锁的发生必须具备一下四个必要条件:
- 互斥,共享资源X和Y只能被一个线程占用
- 占有且等待,线程01已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X
- 不可抢占,其他线程不能强行抢占线程01占有的资源
- 循环等待,线程01等待线程02占有的资源,线程02等在线程01占有的资源,就是循环等待。
如何避免死锁
应用程序一旦发生死锁,并没有什么特别好的办法解决,通过情况下,我们只能重启系统,因此,解决死锁的最后的办法就是避免死锁:
首先,互斥是没有办法避免的;对于占用且等待这个条件,可以一次性申请所有的资源,这样就不存在等待了。
对于不可抢占,占用部分资源的进程进一步申请其他资源时,拖过申请不到,可以在一定时间后,主动释放它占用的资源,这样就解决了不可抢占这个条件
对于循环等待,我们可以靠按次序申请资源来预防。所谓按序申请,就是给资源设定顺序,申请的时候可以先申请序号小的资源,在申请序号大的资源,这样资源线性化后,自然就不存在循环等待了
破坏占用且等待条件
我们要破坏占用且等待,就是某个线程一次性申请占有所有的资源
public class DeadLock02 {
public static void main(String[] args) {
Account a = new Account();
Account b = new Account();
a.transfer(b,100);
b.transfer(a,200);
}
static class Allocator {
private List<Object> als = new ArrayList<Object>();
// 一次性申请所有资源
synchronized boolean apply(Object from, Object to){
if(als.contains(from) || als.contains(to)){
return false;
} else {
als.add(from);
als.add(to);
}
return true;
}
synchronized void clean(Object from, Object to){
als.remove(from);
als.remove(to);
}
}
static class Account {
private Allocator actr = DeadLock02.getInstance();
private int balance;
void transfer(Account target, int amt){
while(!actr.apply(this, target));
try{
synchronized(this){
System.out.println(this.toString()+" lock obj1");
synchronized(target){
System.out.println(this.toString()+" lock obj2");
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
} finally {
//执行完后,再释放持有的资源
actr.clean(this, target);
}
}
}
private void Allocator(){};
private static class SingleTonHoler{
private static Allocator INSTANCE = new Allocator();
}
public static Allocator getInstance(){
return SingleTonHoler.INSTANCE;
}
}
得到的结果
Account@3cd1f1c8 lock obj1
Account@3cd1f1c8 lock obj2
Account@3a4afd6d lock obj1
Account@3a4afd6d lock obj2
破坏不可抢占条件
破坏不可抢占条件,需要发生死锁的线程能够主动释放它占有的资源,但使用synchronized是做不到的。原因是synchronized申请不到资源时,线程直接进入阻塞状态,而线程进入了阻塞状态也就没有办法释放它占有的资源了。不过jdk中concurrent包提供了lock解决这个问题
显式视同lock类中的定时tryLock功能来代替内置锁机制,可以检测死锁和从死锁中恢复过来。使用内置锁的线程获取不到锁会被阻塞,而显示锁可以指定一个超时实现,在等待超过该时间后tryLock就会返回一个失败信息,也会释放其拥有的资源
public class DeadLock {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread a = new Thread(new Lock1());
Thread b = new Thread(new Lock2());
a.start();
b.start();
}
static class Lock1 implements Runnable {
@Override
public void run() {
try {
System.out.println("Lock1 running");
while (true) {
if (lock1.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock1 lock obj1");
//Thread.sleep(3000);
if (lock2.tryLock(1, TimeUnit.MILLISECONDS)) {
System.out.println("Lock1 lock obj2");
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.unlock();
lock2.unlock();
}
}
}
破坏循环等待
破坏这个条件,只需要对系统中的资源进行统一编号,进程可在任何时刻提出资源申请,必须按照资源的编号顺序提出。这样就能保证系统不会出现死锁,这就是资源有序分配法
class Account {
private int id;
private int balance;
void transfer(Account target, int amt){
Account left = this;
Account right = target;
if (this.id > target.id) {
left = target;
right = this;
}
synchronized(left){
synchronized(right){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
Synchronized
Synchronized是Java中的关键字,是一种同步锁
作用域
- 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象
- 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
- 修个一个静态方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象
- 修饰一个类,其作用的范围是synchronized后面括起来的部分,作用主的对象是这个类的所有对象
互斥锁
多个线程并发竞争锁的时候,没有抢到锁的线程会进入休眠状态,即sleep-waiting。当锁被释放的时候,处于休眠状态的一个线程会再次获取到锁
缺点就是这一系列过程需要线程切换,需要执行很多CPU指令。如果CPU执行线程切换的时间比锁占用的时间还长,那么可能还不如使用自旋锁。因此互斥锁适用于锁占用时间长的场合。
CLH锁
是一种基于逻辑队列非线程饥饿的一种自旋公平锁。
原理
首先通过尾结点指针来构建等待线程的逻辑队列,因此能确保线程先到先服务的公平性,可以说这个尾指针就是构建逻辑队列的桥梁。此外,这个尾结点指针是原子引用类型,避免了多线程并发操作的线程安全性问题。
构建完之后通过等待锁的每个线程在自己的某个变量上自旋等待,这个变量将由前一个线程写入。由于某个线程获取锁操作时总是通过尾结点指针获取到前一线程写入的变量,而尾结点指针又是原子引用类型,因此确保了这个变量获取出来总是线程安全的。
Lock锁
Lock l = ....;
l.lock();//加锁
try{
...
}finally{
l.unlock;//释放锁
}
ReentrantLock(可重用锁)
public ReentrantLock() {
sync = new NonfairSync(); //无参默认非公平锁
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();//传参为true为公平锁
}
公平锁:先来后到,一定要排队,不允许插队
非公平锁:可以插队(ReentrantLock默认为非公平锁)
ReentrantReadWriteLock
ReentrantReadWriteLock.ReadLock;//读锁
ReentrantReadWriteLock.WriteLock;//写锁
Condition
Condition是个接口,用来精确的通知和唤醒线程。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition();
Condition的基本的方式就是await()和signal(),且调用这些方法必须在lock保护之内
而Condition中的await()方式对应Object的wait()方法;signal()方法对应Object的notify()方法;signalAll()对应Object的notifyAll();
生产者消费者模式(完成加一减一各一次操作):
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PC {
public static void main(String[] args) {
a a = new a();
new Thread(()->{
for (int i =0;i<10;i++){
a.increment();
}
},"A").start();
new Thread(()->{
for (int i =0;i<10;i++){
a.decrease();
}
},"B").start();
}
}
class a{
public int nummber=0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try {
while(nummber!=0){
condition.await();
}
nummber++;
System.out.println(Thread.currentThread().getName()+">>"+nummber);
condition.signalAll();
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrease(){
lock.lock();
try {
while(nummber!=1){
condition.await();
}
nummber--;
System.out.println(Thread.currentThread().getName()+">>"+nummber);
condition.signalAll();
}
catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
分布式锁
为什么要使用分布式锁
关于分布式锁,可能绝大部分人都会或多或少涉及到。 我举二个例子: 场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有两笔一样的单子同时到达系统后台。
场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题。 在接收消息的时候,消息推送重复。如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大。
类似这种场景,我们有很多种方法,可以使用幂等操作,也可以使用锁的操作。 我们先来解释一下什么是幂等操作: 所谓幂等,简单地说,就是对接口的多次调用所产生的结果和调用一次是一致的。扩展一下,这里的接口,可以理解为对外发布的HTTP接口或者Thrift接口,也可以是接收消息的内部接口,甚至是一个内部方法或操作。
在分布式环境中,网络环境更加复杂, 因前端操作抖动、网络故障、消息重复、响应速度慢等原因,对接口的重复调用概率会比集中式环境下更大,尤其是重复消息在分布式环境中很难避免。Tyler Treat也在《You Cannot Have Exactly-Once Delivery》一文中提到:
Within the context of a distributed system, you cannot have exactly-once message delivery.(在分布式系统的上下文中,您不能只进行一次消息传递。)
分布式环境中,有些接口是天然保证幂等性的,如查询操作。有些对数据的修改是一个常量,并且无其他记录和操作,那也可以说是具有幂等性的。其他情况下,所有涉及对数据的修改、状态的变更就都有必要防止重复性操作的发生。通过间接的实现接口的幂等性来防止重复操作所带来的影响,成为了一种有效的解决方案。
redis实现分布式锁
一般在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁。
RLock lock = redisson. getLock(“myLock” );
lock.lock();
lock.unlock();
此外,Redisson还支持redis单实例、redis哨兵、redis cluster、redis master-slave等各种部署架构,都可以给你完美实现。
上述Redis分布式锁的缺点
其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。
但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。
接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。
此时就会导致多个客户端对一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。
用分布式锁如何解决库存超卖问题?
同一个锁key,同一时间只能有一个客户端拿到锁,其他客户端会陷入无限的等待来尝试获取那个锁,只有获取到锁的客户端才能执行下面的业务逻辑。
有没有其他方案可以解决库存超卖问题?
当然有啊!比如悲观锁,分布式锁,乐观锁,队列串行化,异步队列分散,Redis原子操作,等等,很多方案,我们对库存超卖有自己的一整套优化机制。
分布式锁的方案在高并发场景下的问题
分布式锁一旦加了之后,对同一个商品的下单请求,会导致所有客户端都必须对同一个商品的库存锁key进行加锁。
比如,对iphone这个商品的下单,都必对“iphone_stock”这个锁key来加锁。这样会导致对同一个商品的下单请求,就必须串行化,一个接一个的处理。
大家再回去对照上面的图反复看一下,应该能想明白这个问题。
假设加锁之后,释放锁之前,查库存 -> 创建订单 -> 扣减库存,这个过程性能很高吧,算他全过程20毫秒,这应该不错了。
那么1秒是1000毫秒,只能容纳50个对这个商品的请求依次串行完成处理。
比如一秒钟来50个请求,都是对iphone下单的,那么每个请求处理20毫秒,一个一个来,最后1000毫秒正好处理完50个请求。
大家看一眼下面的图,加深一下感觉。
所以看到这里,大家起码也明白了,简单的使用分布式锁来处理库存超卖问题,存在什么缺陷。
缺陷就是同一个商品多用户同时下单的时候,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单的请求。
这种方案,要是应对那种低并发、无秒杀场景的普通小电商系统,可能还可以接受。
因为如果并发量很低,每秒就不到10个请求,没有瞬时高并发秒杀单个商品的场景的话,其实也很少会对同一个商品在一秒内瞬间下1000个订单,因为小电商系统没那场景。
如何对分布式锁进行高并发优化?
好了,终于引入正题了,那么现在怎么办呢?
面试官说,我现在就卡死,库存超卖就是用分布式锁来解决,而且一秒对一个iphone下上千订单,怎么优化?
现在按照刚才的计算,你一秒钟只能处理针对iphone的50个订单。
其实说出来也很简单,相信很多人看过java里的ConcurrentHashMap的源码和底层原理,应该知道里面的核心思路,就是分段加锁!
把数据分成很多个段,每个段是一个单独的锁,所以多个线程过来并发修改数据的时候,可以并发的修改不同段的数据。不至于说,同一时间只能有一个线程独占修改ConcurrentHashMap中的数据。
另外,Java 8中新增了一个LongAdder类,也是针对Java 7以前的AtomicLong进行的优化,解决的是CAS类操作在高并发场景下,使用乐观锁思路,会导致大量线程长时间重复循环。
LongAdder中也是采用了类似的分段CAS操作,失败则自动迁移到下一个分段进行CAS的思路。
其实分布式锁的优化思路也是类似的,之前我们是在另外一个业务场景下落地了这个方案到生产中,不是在库存超卖问题里用的。
但是库存超卖这个业务场景不错,很容易理解,所以我们就用这个场景来说一下。大家看看下面的图:
其实这就是分段加锁。你想,假如你现在iphone有1000个库存,那么你完全可以给拆成20个库存段,要是你愿意,可以在数据库的表里建20个库存字段,比如stock_01,stock_02,类似这样的,也可以在redis之类的地方放20个库存key。
总之,就是把你的1000件库存给他拆开,每个库存段是50件库存,比如stock_01对应50件库存,stock_02对应50件库存。
接着,每秒1000个请求过来了,好!此时其实可以是自己写一个简单的随机算法,每个请求都是随机在20个分段库存里,选择一个进行加锁。
bingo!这样就好了,同时可以有最多20个下单请求一起执行,每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。
这相当于什么呢?相当于一个20毫秒,可以并发处理掉20个下单请求,那么1秒,也就可以依次处理掉20 50 = 1000个对iphone的下单请求了。
一旦对某个数据做了分段处理之后,有一个坑大家一定要注意:就是如果某个下单请求,咔嚓加锁,然后发现这个分段库存里的库存不足了,此时咋办?
*这时你得自动释放锁,然后立马换下一个分段库存,再次尝试加锁后尝试处理。这个过程一定要实现。
分布式锁并发优化方案有没有什么不足?
不足肯定是有的,最大的不足,大家发现没有,很不方便啊!实现太复杂了。
- 首先,你得对一个数据分段存储,一个库存字段本来好好的,现在要分为20个分段库存字段;
- 其次,你在每次处理库存的时候,还得自己写随机算法,随机挑选一个分段来处理;
- 最后,如果某个分段中的数据不足了,你还得自动切换到下一个分段数据去处理。
这个过程都是要手动写代码实现的,还是有点工作量,挺麻烦的。
不过我们确实在一些业务场景里,因为用到了分布式锁,然后又必须要进行锁并发的优化,又进一步用到了分段加锁的技术方案,效果当然是很好的了,一下子并发性能可以增长几十倍。
synchronized和lock锁的区别
synchronized内置的java关键字,Lock是一个java类
synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁
synchronized会自动释放锁,Lock必须要手动释放锁,如果不释放就会产生死锁
synchronized也是可重入锁,但不可以终断,非公平;Lock锁,可重入的,可以判断锁,可自行设置是否公平
synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码
线程知识
进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1-n个线程,进程是资源分配的最小单位。进程和线程一样分为五个简短:创建、就绪、运行、阻塞、终止。多进程是指操作系统能同时运行多个程序
线程:同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小,线程是cpu调度的最小单位。多线程是指在同一个程序中有多个顺序流在执行。
线程状态
一个枚举类
new(初始化)
实现runnable接口和继承Thread可以得到一个线程类,将其实例new出来,线程就进入了初始化的状态
public class ThreadTest extends Thread {
private String name;
public ThreadTest(String name) {
this.name = name;
} public void run() {
for (int i = 1; i < 11; i++) {
System.out.println(Thread.currentThread().getName() + " thread " + i);
}
}
public static void main(String[] args) {
ThreadTest t1 = new ThreadTest("thread1");
ThreadTest t2 = new ThreadTest("thread2");
ThreadTest t3 = new ThreadTest("thread3");
t1.start();
t2.start();
t3.start();
}
}
public class TestRunnable implements Runnable {
public void run() {
for(int i=1;i<11;i++){
System.out.println(Thread.currentThread().getName()+" thread "+ i);
}
}
public static void main(String[] args) {
Thread t1=new Thread(new TestRunnable(),"thread1");
Thread t2=new Thread(new TestRunnable(),"thread2");
Thread t3=new Thread(new TestRunnable(),"thread3");
t1.start();
t2.start();
t3.start();
}
}
runnable(可运行状态)
READY(就绪)
该状态的线程位于可运行线程池中,此线程池中的线程都在等待获取CPU的使用权
就绪状态的意思就是只要调度程序没有选中线程,那么该线程永远是就绪状态
当调用线程的start方法,此线程就进入就绪状态
当前线程sleep方法结束,其他线程join结束,等待用户操作完毕,某个线程拿到对象锁,这些线程也将进入就绪状态
当前线程时间片用完了,调用当前线程的yield方法,当前线程也会进入就绪状态
锁池里的线程拿到对象锁后,进入就绪状态
running(运行中)
线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
blocked(阻塞)
运行中的线程因某种原因放弃了CPU的使用权,暂时停止运行。知道线程进入就绪状态,才有机会转到运行状态
阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)之前时的状态
等待阻塞
运行的线程执行wait()方法,JVM会把该线程放入等待池中,且wait方法会释放持有的锁
同步阻塞
运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。
其他阻塞
运行的线程执行sleep()或join()方法,又或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/o处理完毕,线程重新转入就绪状态,需要注意的是sleep()是不会释放锁的
waiting(等待,无时限)
调用sleep或者wait方法后线程处于waiting状态,等待被唤醒
timed_waiting(超时等待,有时限)
调用sleep或者wait方法后线程处于timed_waiting状态,等待被唤醒或时间超时自动唤醒
terminated(终止)
当线程的run方法完成时,或者主线程的main方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是它已经不是一个单独执行的线程。线程一旦终止,就不能复生
在一个终止的线程上调用start方法,会放出illegalThreadStateException异常
线程状态之间的转换
new到runnable状态
new状态的线程是不会被操作系统调度的,因此也不会被执行,只有当调用这个线程的start方法,将new状态转换到runnable状态才会被系统调度
runnable与blocked状态的装换
目前只有当线程等待synchronized的隐式锁时,线程才会从runnable向blocked转换
被synchronized修饰的方法、代码块在同一时刻只允许一个线程执行,其他线程只能等待。这种情况下,等待的线程就会冲runnable转换到blocked状态。而当等待的线程获得synchronized隐式锁时,就有会从blocked转换到runnable状态
public class Account {
public void draw(double drawAmount) {
synchronized(this) {
System.out.println("hello");
}
}
public synchronized void drawTask(double drawAmount) {
System.out.println("hello");
}
}
runnable与waiting的转换
有三种场景会触发线程从runnable想waiting转换:
- 获得synchronized隐式锁的线程,调用Object.wait()方法
public void startThread() {
Thread t = new Thread( new Runnable() {
public void run() {
System.out.println( "开始执行线程...");
System.out.println( "进入等待状态...");
synchronized (object) {
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println( "线程结束...");
}
});
t.start();
}
- 调用线程同步Thread.join()方法。例如有一个线程对象Thread A,当调用A.join()的时候,执行这条语句的线程会等待A执行完,而等待中的这个线程,其状态会从runnable转换到waiting。当线程a执行完,原来等待它的线程又会从waiting状态转换到runnable
- 调用LockSupport.park()方法。java并发包中的锁,都是基于LockSupport对象实现的,调用其park方法,当前线程就会阻塞,线程的状态会从runnable转换到waiting。调用LockSupport.unpark(Thread t)方法,可唤醒目标线程,目标线程的状态又会从waiting状态转换到runnable
runnable与timed_waiting的状态转换
有5中场景会触发runnable向timed_waiting转换:
- 调用带超时参数的Thread.sleep(long millis)方法
public void startThread() {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("开始执行线程...");
System.out.println("进入睡眠状态...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程结束...");
}
});
t.start();
}
- 获得synchronized隐式锁的线程,调用带超时参数的Object.wait(long timeout)方法
- 调用带超时参数的Thread.join(long millis)方法
- 调用带超时参数的LockSupport.parkNanos(Object blocker,long deadline)方法
- 调用带超时参数的LockSupport.parkUntil(long deadline)方法。
runnable到terminated
线程执行完run方法后,会自动转换到terminated状态。不过当执行run的过程中抛出异常,也会导致线程终止
如果需要强制终端run方法的执行,则调用interrupt方法。interrupt方法仅仅是通知线程,让线程有机会执行一些后续操作,同事也可以无视这个通知
public class ThreadTerminatedState implements Runnable {
public static void main(String[] args) {
Thread thread = new Thread(new ThreadTerminatedState());
//NEW
System.out.println( thread.getState());
thread.start();
//RUNNABLE
System.out.println( thread.getState());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//TERMINATED System.out.println( thread.getState());
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(i);
}
}
}
线程的实现方式
继承Thread类
package com.multithread.learning;
class Thread1 extends Thread{
private String name;
public Thread1(String name) {
this.name=name;
}
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Thread1 mTh1=new Thread1("A");
Thread1 mTh2=new Thread1("B");
mTh1.start();
mTh2.start();
}
}
每次运行输出的结果可能不同。当程序运行时,java虚拟机启动一个进程,其中主线程main在main()调用的时候被创建,随着main线程的启动,其中mTh1和mTh2线程的start()方法也会被调用,这样整个应用就是在多线程运行
需要注意的是start方法调用后并不是立即执行多线程打码,而是使得该线程变为可运行状态(Runnable),什么时候运行是由系统决定的,这就是每次运行时输出结果可能不同的原因,且是不能重复调用的,会报线程状态异常
实现Runnable接口
重写run即可,然后new一个实例,执行start方法
package com.multithread.runnable;
class Thread2 implements Runnable{
private String name;
public Thread2(String name) {
this.name=name;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(name + "运行 : " + i);
try {
Thread.sleep((int) Math.random() * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
new Thread(new Thread2("C")).start();
new Thread(new Thread2("D")).start();
}
}
Thread类实际上也是实现了Runnable接口的类。但不管是Thread类还是Runnable接口,其所有的线程代码都是通过允星河Thread的start方法来使线程变为可运行的,即所有的线程都是通过Thread对象的API来控制的
实现Callable接口
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"a").start();
System.out.println(futureTask.get());
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call()方法被调用了");
return 1024;
}
}
与Runnable不同的是,Callable可以有返回值,且能够抛出异常,启动线程的方法也从run()=》call()
Thread和Runnable的区别
继承Thread的不适合资源共享,而实现Runnable接口则很容易实现资源共享
Runnable接口适合多个相同的程序代码的线程去处理同一个资源
Runnable接口可以避免java中的单继承的限制
Runnable能够增加程序的健壮性,代码可以被多个线程共享,代码和数据可以独立
线程池只能放入实现Runnable或callable类线程,不能直接放入继承Thread的类
线程调度
线程优先级
Java线程有优先级,优先级高的线程会获得较多的运行机会。
Java线程的优先级用整数表示,取值范围1~10,Thread类有一下三个静态常量:
static int MAX_PRIORITY; //线程可以具有的最高优先级,取值为10。
static int MIN_PRIORITY; //线程可以具有的最低优先级,取值为1。
static int NORM_PRIORITY; //分配给线程的默认优先级,取值为5。
Thread类的setPriority()和getPriority()方法分别用来设置和获取线程的优先级。
每个线程都有默认的优先级。主线程的默认优先级为Thread.NORM_PRIORITY。
线程的优先级有继承关系,比如A线程中创建了B线程,那么B将和A具有相同的优先级。
JVM提供了10个线程优先级,但与常见的操作系统都不能很好的映射。如果希望程序能够移植到各个操作系统中,应该仅仅使用Thread类提供的三个静态常量作为优先级,这样能保证同样的优先级采用了同样的调度方式。
线程睡眠
Thread.sleep(long millis)方法,是线程转到阻塞状态。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪状态。sleep()平台移植性好。
线程等待
Object类中的wait()方法,使当前线程进入等待,知道其他线程调用此对象的notify()或者notifyAll()来唤醒该线程。这两个唤醒方法也是Object类中的方法,行为等价与调用wait(0),区别不同的是notify()是随机从等待池中唤醒某个线程,而notifyAll()会唤醒等待池中全部的线程。
Obj.wait()与Obj.notify()或者Obj.notifyAll()方法必须与synchronized(Obj)一起使用,也就是Obj.wait()与Obj.notify()或者Obj.notifyAll()方法是针对已经获取了Obj锁后进行的操作。从语法角度来说这三个方法必须在synchronized(Obj){…}语句块内执行;从功能上来说wait就是线程在获取对象锁后,主动释放对象锁,同时该线程休眠,直到有其他线程调用对象的notify或者notifyAll方法来唤醒该线程,该线程才能继续获取对象锁,并继续执行。有一点需要注意的是,wait调用后,并不是马上就释放对象锁的,而是在相应的synchronized(){}语句块执行结束后,自动释放锁。当调用该对象的notify后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其独享锁,唤醒该线程
线程让步
Thread.yield()方法,暂停当前正在执行的线程对象(即将运行中状态转为可运行状态),把执行机会让给相同或者更高优先级的线程
虽如此,然并不能保证能够达到让步的目的,因为即使状态变为可运行的,但还是有可能再次被调度程序选中
线程加入
join()方法,等待该线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个线程运行结束,当前线程再由阻塞状态转为就绪状态。
在很多情况下,主线程生成并启动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后在结束,这个时候就要用到join方法了
线程唤醒
Object类中的notify()方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会随机选择唤醒其中一个线程。线程通过调用其中一个线程的wait方法,在对象的监视器上等待。直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与该对象上主动同步的其他所有线程进行竞争。例如,唤醒的线程在作为锁定此对象的下一个线程方面没有可靠的特权或劣势,类似的方法还有notifyAll(),唤醒在此对象监视器上等待的所有线程。
类似的功能在1.5之前还有suspend()和resume(),不过1.5后已经被移出,因为有死锁倾向。
线程同步
synchronized关键字
线程数据的传递
在传统的同步开发模式下,当我们调用一个函数时,通过这个函数的参数将数据传入,并通过这个函数的返回值来返回最终的结果。但在多线程的异步模式下,数据的传递与返回和同步开发模式是有很大区别的。由于线程的运行和结束是不可预料的,因此,在传递和返回数据时就无法像函数一样通过函数参数和return语句来返回数据。
通过构造方法来传递数据
在黄建线程时,必须要建立一个Thread类或其子类的实例。因此,当我们在调用start方法之前通过线程类的构造方法将数据传入线程,并将传入的数据使用类变量保存起来,以便线程使用(其实就是在run方法中使用)
public class MyThread1 extends Thread
{
private String name;
public MyThread1(String name)
{
this.name = name;
}
public void run()
{
System.out.println("hello " + name);
}
public static void main(String[] args)
{
Thread thread = new MyThread1("world");
thread.start();
}
}
由于这种方法是在创建线程对象的同时传递数据的,因此,在线程运行之前这些数据就已经到位了,这样就不会造成数据在线程运行后才传入的现象。如果要传递更复杂的数据,可以使用集合类等数据结构。
使用构造方法来传递数据虽然安全,打如果要传递的数据比较多时,就会造成很多不便。由于Java没有默认参数,要想实现类似默认参数的效果,就得使用重载,这样不但使构造方法本身过于复杂,又会使构造方法在数量上大增。因此,要想避免这种情况,可以通过类方法或类变量来传递数据
通过类变量和类方法来传递数据
向对象中传入数据一般有两次机会,第一次是在建立对象是通过构造方法将数据传入,另外一种就是在类中定义一系列的public的方法或者变量。然后在建立完对象后,通过对象实例逐个赋值
public class MyThread2 implements Runnable
{
private String name;
public void setName(String name)
{
this.name = name;
}
public void run()
{
System.out.println("hello " + name);
}
public static void main(String[] args)
{
MyThread2 myThread = new MyThread2();
myThread.setName("world");
Thread thread = new Thread(myThread);
thread.start();
}
}
通过回调函数传递数据
上面两种虽然是最常用的,但这两种方式都是在main方法中主动将数据传入线程类,这对于线程来说,是被动接收这些数据的。然而,在有些应用中需要在线程运行的过程中动态地获取数据
class Data
{
public int value = 0;
}
class Work
{
public void process(Data data, Integer numbers)
{
for (int n : numbers)
{
data.value += n;
}
}
}
public class MyThread3 extends Thread
{
private Work work;
public MyThread3(Work work)
{
this.work = work;
}
public void run()
{
java.util.Random random = new java.util.Random();
Data data = new Data();
int n1 = random.nextInt(1000);
int n2 = random.nextInt(2000);
int n3 = random.nextInt(3000);
work.process(data, n1, n2, n3); // 使用回调函数
System.out.println(String.valueOf(n1) + "+" + String.valueOf(n2) + "+"
+ String.valueOf(n3) + "=" + data.value);
}
public static void main(String[] args)
{
Thread thread = new MyThread3(new Work());
thread.start();
}
}
线程不安全
多个线程相互竞争同一个资源,导致最后的结果与预期的不符,这就叫线程不安全
线程池
本质是hashset+阻塞队列,hashset用于存放正在工作的线程,阻塞队列用于存放工作任务
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待这监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建于销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过渡调度
当派发任务,线程池中正在执行的任务满后,后加入的任务会进入到阻塞队列中,当阻塞队列也满后,会在线程池中创建额外的线程来执行任务,此时如果创建的线程也达到阈值后,后加入的任务就会被拒绝
作用
减少在创建和销毁线程上所花的时间以及系统资源的开销
将当前任务主主线程隔离,能够实现和主线程的异步执行,特别是很多可以分开重复执行的任务。但是,一昧的开启现充也不一定能带来性能上的优化,线程池休眠也是要占用一定的内存空间的,所以合理的选择线程池大小也是有一定的依据。
提高相应速度,方便管理
Executors创建线程池
Java中创建线程池很简单,只需要调用Executors中相应的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不仅隐藏了复杂性,也为我们埋下了潜在的隐患(OOM(内存用完了),线程耗尽)。
Executors创建线程池便捷方法列表:
方法名 | 功能 |
---|---|
newFixedThreadPool(int nThreads) | 创建固定大小的线程池 |
newSingleThreadExecutor() | 创建只有一个线程的线程池 |
newCachedThreadPool() | 创建一个可缓存、不限线程数上限的线程池,任何提交的任务都将立即执行。当如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加事,会增加线程数量 |
newScheduledThreadPool() | 创建一个固定大小的线程池,而且以延迟或者定时的方式来执行,类似Timer |
小程序使用这些快捷方法没什么问题,对于服务端需要长期运行的程序,创建线程池应该直接使用ThreadPoolExecutor的构造方法。没错,上述Executors方法创建的线程池就是ThreadPoolExecutor。
阿里巴巴开发收车有提到线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,避免资源耗尽的风险,且Executors返回的线程池对象的弊端有:
- newFixedThreadPool和newSingleThreadExecutor允许的请求队列长度为Integer.MAX_VALUE,约为21亿,可能会堆积大量的请求,从而导致OOM
newCachedThreadPool和newScheduledThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
ThreadPoolExecutor构造方法
Executors中创建线程池的快捷方法,实际上是调用了ThreadPoolExecutor的构造方法(定时任务使用的是ScheduledThreadPoolExecutor),该类构造方法参数列表如下:
// Java线程池的完整构造函数
public ThreadPoolExecutor(
int corePoolSize, // 线程池长期维持的线程数,即使线程处于Idle状态,也不会回收。
int maximumPoolSize, // 线程数的上限
long keepAliveTime, TimeUnit unit, // 超过corePoolSize的线程的idle时长,
// 超过这个时间,多余的线程会被回收。
BlockingQueueworkQueue, // 任务的排队队列
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler) // 拒绝策略
竟然有7个参数,很无奈,构造一个线程池确实需要这么多参数。这些参数中,比较容易引起问题的有corePoolSize, maximumPoolSize, workQueue以及handler:corePoolSize和maximumPoolSize设置不但会影响效率,甚至会耗尽线程;
- workQueue设置不当容易导致OOM;
- handler设置会导致提交任务时抛出异常。
ExecutorService threadPool =
new ThreadPoolExecutor(3, //核心线程池大小
5, //最大并发数
10, //超时时间
TimeUnit.SECONDS, //时间单位
new LinkedBlockingQueue<>(),//线程等候队列
Executors.defaultThreadFactory(), //线程创建工厂
new ThreadPoolExecutor.DiscardOldestPolicy());//拒绝策略
四种默认的拒绝策略
/*
new ThreadPoolExecutor.AbortPolicy() 超出最大处理线程抛出异常(最大处理线程数=最大线程数+阻塞队列大小)
new ThreadPoolExecutor.CallerRunsPolicy() 从哪个线程创建就由那个线程执行(如main线程创建,就在main线程执行)
new ThreadPoolExecutor.DiscardPolicy() 队列满了不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy() 尝试去和第一个竞争,也不会抛出异常
/
线程池的工作顺序
corePoolSize -> 任务队列 -> maximumPoolSize -> 拒绝策略
如果运行的线程少于corePoolSize,执行器总是更喜欢添加一个新线程,而不是排队。 如果corePoolSize或更多线程正在运行,执行器总是更喜欢对请求进行排队,而不是添加新线程。 如果请求无法排队,则会创建一个新线程,除非该线程超过最大大小,在这种情况下,任务将被拒绝。
Runnable和Callable
可以向线程池提交的任务有两种:Runnable和Callable,二者的区别如下:
- 方法签名不同,void Runnable.run(), V Callable.call() throws Exception
- 是否允许有返回值,Callable允许有返回值
- 是否允许抛出异常,Callable允许抛出异常。
Callable是JDK1.5时加入的接口,作为Runnable的一种补充,允许有返回值,允许抛出异常。
三种提交任务的方式:
提交方式 | 是否关心返回结果 |
---|---|
Future |
是 |
void execute(Runnable command) | 否 |
Future<?> submit(Runnable task) | 否,虽然返回Future,但是其get()方法总是返回null |
如何正确使用线程池
避免使用无界队列
不要使用Executors.newXXXThreadPool()快捷方法创建线程池,因为这种方式会使用无界的任务队列,为避免OOM,我们应该使用ThreadPoolExecutor的构造方法手动指定队列的最大长度:
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512), // 使用有界队列,避免OOM
new ThreadPoolExecutor.DiscardPolicy());
明确拒绝任务时的行为
任务队列总有占满的时候,这是再submit()提交新的任务会怎么样呢?RejectedExecutionHandler接口为我们提供了控制方式,接口定义如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
线程池给我们提供了几种常见的拒绝策略:
拒绝策略 | 拒绝行为 |
---|---|
AbortPolicy | 抛出RejectedExecutionException |
DiscardPolicy | 什么也不做,直接忽略 |
DiscardOldestPolicy | 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置 |
CallerRunsPolicy | 直接由提交任务者执行这个任务 |
线程池默认的拒绝行为是AbortPolicy,也就是抛出RejectedExecutionHandler异常,该异常是非受检异常,很容易忘记捕获。如果不关心任务被拒绝的事件,可以将拒绝策略设置成DiscardPolicy,这样多余的任务会悄悄的被忽略。
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(512),
new ThreadPoolExecutor.DiscardPolicy());// 指定拒绝策略
获取处理结果和异常
线程池的处理结果、以及处理过程中的异常都被包装到Future中,并在调用Future.get()方法时获取,执行过程中的异常会被包装成ExecutionException,submit()方法本身不会传递结果和任务执行过程中的异常。获取执行结果的代码可以这样写:
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future