上次更新:2020-02-24
前言
一般我们说并发编程,其实就是让我们的程序执行的更快,我们可以用多线程的,一个线程干活变成多个线程干活。但由于线程是由CPU分配资源进行调度的,多个线程执行一个任务会变得不可控。
拿数钱这个活来说吧,原来一个线程数数2万耗费2秒,该线程的CPU分配的资源用完了,就标记下,当前数到1万了,等再分配到资源后,继续从刚才1万的位置开始数。现在为了加快速度,把数钱这个任务分开,让2个线程数2万,每个线程数1万,这样只要1秒。
// 有问题
static int totalMoney = 0;
public static void main(String[] args) throws Exception {
final int count = 10000;
Thread thread1 = new Thread(() -> {
for (int i = 0; i < count; i++) {
totalMoney++;
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < count; i++) {
totalMoney++;
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
// 结果小于2万
System.out.println(totalMoney);
}
理想很美好,现实很骨感。上述结果最后得到的是小于2万的。但是要解决数钱这个问题,有很多种手段,这就要学习并发编程了。
上面啰里啰嗦的说了一大堆,总结起来并发编程有三个核心问题:分工、同步和互斥
- 分工:将一个任务拆成若干个子任务去完成
- 同步:子任务有的执行快,有的执行慢,如何保证大家的步调一致
- 互斥:上述两步是为了提供程序的性能,互斥是为了解决程序的正确性
将一个大问题拆解成三个小问题,逐个攻破。
一些基本的理论,不会再去讲,网上有很多了,假设读者已经知道这几点:
- 什么是进程
- 什么是线程
- 进程、线程的关系
- 线程的几种状态
- 并行和并发的概念
Java内存模型
Happens-Before规则
它约束了编译器的优化行为,前面一个操作的结果对后续操作是可见的,提出了六项规则:
1. 顺序性原则
2. volatile 变量原则
对一个volatile变量的写操作对后续这个volatile变量读操作是可见的
3. 传递性
如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C
4. 管程中锁的规则
一个锁的解锁 Happens-Before 于后续对这个锁的加锁
5. start()规则
主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作
6. join()规则
线程A等待线程B完成(调用了线程B的join()方法),线程B完成后,线程A能看到线程B的操作
使用
线程使用
继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
实现 Runnable 接口
class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
Callable
class MyThread implements Callable {
@Override
public Object call() throws Exception {
System.out.println("hello world " + Thread.currentThread().getName());
return "hello world ";
}
}
Lambda 的形式
new Thread(() ->
System.out.println("hello world " + Thread.currentThread().getName()
).start();
线程休眠
sleep();阻塞,让出CPU资源
Thread.sleep();
线程让出CPU资源
yield(); 当前线程愿意让出已获得的 CPU 执行资源,由系统重新进行调度。
Thread.yield();
线程等待
join(); 会阻塞当前线程,直到另一线程执行结束
public static void main(String[] args) {
Thread threadA = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
System.out.println("线程A执行结束");
});
Thread threadB = new Thread(() -> {
try {
// 这里会阻塞,直到线程A执行结束
threadA.join();
System.out.println("线程B开始执行");
} catch (Exception e) {
}
});
// 输出结果:
// 线程A执行结束
// 线程B开始执行
threadB.start();
threadA.start();
}
wait()/notify()
这两个方法在 Object 类中,也就是说所有的类都有可以使用,用来等待和通知,调用前需要获取锁
public static void main(String[] args) {
ObjectWaitNotify objectWaitNotify = new ObjectWaitNotify();
new Thread(() -> {
try {
System.out.println("等待中...");
synchronized (objectWaitNotify) {
objectWaitNotify.wait();
}
System.out.println("被唤醒了,等待结束...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectWaitNotify) {
objectWaitNotify.notify();
}
System.out.println("唤醒...");
}).start();
}
ThreadLocal
ThreadLocal,让每个线程可以有自己的局部空间,实现线程之间的隔离。本质是一个 Map, key 就是其本身,value 则存的数据,而且 key 的引用类型是 WeakReference,这样就保证了一旦该线程结束,下次GC就会被回收,从而防止内存泄漏。
典型应用就是Spring的事务管理器中。
public static void main(String[] args) {
ThreadLocal<String> threadLocalA = new ThreadLocal<>();
ThreadLocal<String> threadLocalB = new ThreadLocal<>();
threadLocalA.set("A-1");
threadLocalB.set("B-1");
// 输出: A-1
System.out.println(threadLocalA.get());
// 输出: B-1
System.out.println(threadLocalB.get());
}
final
被 final 关键字修饰的变量是不可以改变的,编译器可以使劲的优化。
volatile
它是一个关键字,用来修饰变量。通过插入内存屏障指令禁止编译器和CPU对程序进行重排序。
特性:
- 数据的可见性,一个线程修改了值,另一个线程可以马上得知。
- 禁止指令重排序
可见性
导致可见性的原因是缓存。每个线程都有自己的内存空间,volatile 的作用就是让线程操作该变量时不使用自己的内存,而是直接操作内存,这样每修改该变量,其它线程都能看得到,保证了数据的可见性。
重排序
导致有序性的原因是编译优化。为了性能优化,JMM 在不改变正确语义的前提下,会允许编译器和处理器对指令序列进行重排序。使用了 volatile 可以禁止重排序。比如在单例中的经典使用:
public class DoubleCheckSingleton {
/**
* volatile 禁止指令重排,可见性
*/
private volatile static DoubleCheckSingleton doubleCheckSingleton;
private DoubleCheckSingleton () {
}
public static DoubleCheckSingleton getSingleton() {
if (null == doubleCheckSingleton) {
synchronized (DoubleCheckSingleton.class) {
if (doubleCheckSingleton == null) {
doubleCheckSingleton = new DoubleCheckSingleton();
}
}
}
return doubleCheckSingleton;
}
}
synchronized
synchronzied 保证线程同步,是可重入锁,会存在阻塞,导致线程的上下文切换。Java中每个对象都有对象都有一个对象头,锁的对象就是存放在这里面的。
反编译一个 class 文件: javap -verbose xxx.class
得到如下结果(截取部分片段),会在 synchronized 块中插入 monitorenter 和 monitorexit:
public static void main(String[] args) {
ThreadDemo demo = new ThreadDemo();
int i = 0;
synchronized (demo) {
i ++;
}
System.out.println(i);
}
//=============反编译后的============================
13: monitorenter # 进入
14: iinc 2, 1
17: aload_3
18: monitorexit # 退出
线程同步:
public class SynchronizedDemo2 implements Runnable{
private int count = 5;
@Override
public synchronized void run() {
try {
// 为了效果,否则太快了
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
// 打印结果每个count值不相等
System.out.println(count);
}
public static void main(String[] args) {
SynchronizedDemo2 s1 = new SynchronizedDemo2();
// 一个对象多次访问共享数据,需要在方法中加synchronized,对象锁来保证数据正确,
for (int i=0; i< 5; i++) {
new Thread(s1).start();
}
}
}
ReentrantLock
用法比 Synchronized 更灵活,有多个构造方法可以使用,例如锁是否公平等等
public class ReentrantLockDemo implements Runnable{
/**
* true设置为公平锁,反之
*/
private ReentrantLock lock = new ReentrantLock(true);
@Override
public void run() {
for (int i=0; i<2; i++) {
try {
// 先获取锁
lock.lock();
System.out.println(Thread.currentThread().getName() + " get lock!");
Thread.sleep(1);
}catch (Exception e) {
e.printStackTrace();
}finally {
// 一定要在finally释放锁
lock.unlock();
}
}
}
public static void main(String[] args) {
ReentrantLockDemo rntrt = new ReentrantLockDemo();
Thread t1 = new Thread(rntrt,"T1");
Thread t2 = new Thread(rntrt,"T2");
// 打印结果,交替获取锁
t1.start();
t2.start();
}
}
Condition
通过 Lock 获取,提供的 await() 和 signal() 功能和 wait() 和 notify 类似,但是实现机制不一样,而且更灵活
public class ConditionDemo {
public static void main(String[] args) {
WaitSignal waitSignal = new WaitSignal();
new Thread(() -> {
try {
System.out.println("等待中...");
waitSignal.await();
System.out.println("被唤醒了,等待结束...");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitSignal.signal();
System.out.println("唤醒");
}).start();
}
static class WaitSignal {
private final ReentrantLock lock;
private final Condition condition;
public WaitSignal() {
lock = new ReentrantLock();
condition = lock.newCondition();
}
public void await() throws InterruptedException {
// 先获取锁
lock.lock();
condition.await();
lock.unlock();
}
public void signal() {
lock.lock();
condition.signal();
lock.unlock();
}
}
}
原子操作
J.U.C下 的 atomic 包提供了很多原子操作的类,使用上都类似,也好理解
public class IntegerAtomicDemo {
public static void main(String[] args) throws InterruptedException {
AtomicLong atomicLong = new AtomicLong(100);
System.out.println(atomicLong.get());
atomicLong.set(101);
System.out.println(atomicLong.get());
System.out.println(atomicLong.compareAndSet(100, 102));
System.out.println(atomicLong.compareAndSet(101, 500));
System.out.println(atomicLong.incrementAndGet());
System.out.println(atomicLong.getAndIncrement());
System.out.println(atomicLong.get());
}
}
CountDownLatch
线程协同工具,用“乘客到齐再发车”来形容再形象不过了
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
new CountDownLatchDemo().go();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void go() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(new Task(countDownLatch),"Thread1").start();
new Thread(new Task(countDownLatch),"Thread2").start();
new Thread(new Task(countDownLatch),"Thread3").start();
countDownLatch.await();
System.out.println("所有线程已经到达,主线程开始执行 " + System.currentTimeMillis());
}
class Task implements Runnable {
private CountDownLatch countDownLatch;
public Task(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + " 已到达 " + System.currentTimeMillis());
// 减一个
this.countDownLatch.countDown();
}
}
}
CyclicBarrier
译为循环栅栏,和 CountDownLatch 类似,但是它可以循环使用
public class CyclicBarrierDemo {
public static void main(String[] args) {
try {
new CyclicBarrierDemo().go();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void go() throws InterruptedException {
// 第一个参数是要等待的数量,另一个是执行的操作
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("所有线程已经完成:" + System.currentTimeMillis());
}
});
new Thread(new Task(cyclicBarrier), "Thread1").start();
new Thread(new Task(cyclicBarrier), "Thread2").start();
new Thread(new Task(cyclicBarrier), "Thread3").start();
new Thread(new Task(cyclicBarrier), "Thread1").start();
cyclicBarrier.reset();
new Thread(new Task(cyclicBarrier), "Thread1").start();
new Thread(new Task(cyclicBarrier), "Thread2").start();
new Thread(new Task(cyclicBarrier), "Thread3").start();
}
class Task implements Runnable {
private CyclicBarrier cyclicBarrier;
public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + " 已到达 " + System.currentTimeMillis());
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Semaphore
译为:信号量,或许叫“许可证“更好理解,可以用来控制最大并发数
public class SemaphoreDemo {
private static ExecutorService pool = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new MyThreadFactory("my-test"));
public static void main(String[] args) {
// 设置最多同时获取的数量
Semaphore semaphore = new Semaphore(3);
//模拟用户访问
for (int i = 0; i < 10; i++) {
final int No = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
// 拿到许可
semaphore.acquire();
System.out.println("Accessing: " + No);
Thread.sleep(2000);
// 释放
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 放到线程池中执行,可以改变线程池的core数,观察结果
pool.execute(runnable);
}
}
}
ConcurrentHashMap
在 JDK1.8 中,使用到了数组+链表+红黑树这三种结构;使用了 CAS 和 synchronized 来保证并发操作;其中 key 和 value 不能为 null;使用 synchronized 对每个链表的头结点加锁;如果链表节点大于等于8就会转换为红黑树;节点为6的时候又转为链表。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
put 的过程:
将 key 相应的的 hash 值插入对应的数组中,如果该数组位置为 null,使用 CAS 插入,如果已经有值了就使用 synchronized 锁住,并追加到链表中,如果 Node 元素是红黑树类型,那么就插入到红黑树中。
get 的过程:
get 不涉及到并发,只要将 key 相应的 hash 从 Node 数组中获取,如果是链表或红黑树则需要遍历。
线程池
任何消耗资源且需要频繁重复创建的,理论上都可以用池化技术来优化,线程池也是基于这个理念。线程池使用的是生产者-消费者模型,线程池本身就是消费者。JDK中提供了多种类型的线程池:
- CacheThreadPool,短时间内处理大量工作的线程池,会根据任务数量产生对应的线程,并试图缓存线程以便重复使用,如果限制 60 秒没被使用,则会被移除缓存
- FixedThreadPool,创建一个数量固定的线程池,超出的任务会在队列中等待空闲的线程,可用于控制程序的最大并发数
- SingleThreadPool,创建一个单线程线程池
- ScheduledThreadPool,创建一个数量固定的线程池,支持执行定时性或周期性任务
执行过程:当一个任务进来,如果线程池中的线程小于核心线程数,那么就创建,如果线程池中的线程数大于核心线程数但是小于工作队列???
以定时任务线程池举例:
public class SchedulerPoolDemo {
/**
* 创建工作线程数为1
* 默认使用AbortPolicy策略
*/
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
scheduledThreadPool.scheduleWithFixedDelay(() -> {
System.out.println("ThreadPool:" + LocalDateTime.now());
}, 1,1L, TimeUnit.SECONDS);
}
}
线程池的参数:
corePoolSize
线程池核心线程数量,也就是该线程池无论如何都会保留的数量
maximumPoolSize
线程池的最大数量,如果workQueue满了且核心线程数小于该值,会创建新的线程,但不能超过它,当空闲下来该数量就会减少;如果核心线程等于该值,将执行拒绝策略
keepAliveTime 和 unit
时间单位,一个线程是空闲还是忙的状态,取决于这两个参数
workQueue
工作队列
threadFactory
创建线程的工厂
RejectedExecutionHandler handler
如果所有线程都在忙,工作队列也满了,就会执行该决绝策略。可以自定义拒绝策略,有:
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入 到工作队列。
注意事项:
- 使用有界队列,否则容易OOM
- 建议自定义拒绝策略
- 在线程池中执行的任务报错只会让该线程终止,而不会抛异常,一定要在业务逻辑中进行异常处理
一般使用线程池的任务可以分为两种: I/O密集型和CPU密集型,使用时workQueue要设置大小,否则可能会抛OOM异常。
其实目的都是为了将硬件的性能发挥极致,如果只是CPU计算型的话,那么几核CPU用多个个线程即可,顶多再加1个线程数,防止阻塞;I/O密集型只要算好I/O耗时和CPU计算时间,做到让CPU无缝切换。
I/O密集型计算场景:
最佳线程数 = CPU核数 * (1 +(I/O 耗时 / CPU 耗时))
CPU密集型计算场景:
最佳线程数 = CPU核数 + 1
虽然提供了Executors来方便的创建线程池,但是在阿里巴巴开发手册中不建议这样做,可能会出现OOM。所以还是使用 ThreadPoolExecutor 来创建,而且能让开发者更明确现线程池的运行规则。
线程工厂
如果你在排查问题使用过 jstack 这个命令就会知道线程的命名有多重要:
一般项目中我们都会实现 ThreadFactory 接口,自定义一个,例如看 Dubbo 中是如何自定义一个线程工厂:
public class NamedThreadFactory implements ThreadFactory {
protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
protected final AtomicInteger mThreadNum = new AtomicInteger(1);
protected final String mPrefix;
protected final boolean mDaemon;
protected final ThreadGroup mGroup;
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemon) {
mPrefix = prefix + "-thread-";
mDaemon = daemon;
SecurityManager s = System.getSecurityManager();
mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
@Override
public Thread newThread(Runnable runnable) {
String name = mPrefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemon);
return ret;
}
public ThreadGroup getThreadGroup() {
return mGroup;
}
}
锁优化
自旋锁
当线程被阻塞,该线程就会被挂起,会发生上下文切换,也就是从用户态切换到内核态,性能影响大。所以有了自旋锁,发生阻塞时,不挂起,而是原地自旋等待,减少了上下文切换带来的性能开销
锁的分类
悲观锁
悲观锁认为对于同一个数据的并发操作,一定是会发生修改的。synchronized是典型的悲观锁
乐观锁
乐观锁正好和悲观锁相反,它获取数据的时候,并不担心数据被修改,每次获取数据的时候也不会加锁,只是在更新数据的时候,通过判断现有的数据是否和原数据一致来判断数据是否被其他线程操作。CAS就是乐观锁
共享锁
独占锁
公平锁
抢占锁的机制,是否遵循FIFO顺序,非公平锁的性能比公平锁好,因为公平锁需要调度。
例如:ReentrantLock pairLock = new ReentrantLock(true);
非公平锁
不遵循FIFO顺序获得锁,例如:ReentrantLock pairLock = new ReentrantLock(false);
可重入锁
可重入锁指的是该线程获取了该锁之后,可以无限次的进入该锁锁住的代码
自旋锁
自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,但是循环会消耗 CPU 资源
常见面试题
优雅停止线程的方法是什么?
模拟出并发访问场景
线程的状态切换过程和相关方法的使用
使用wait()和notify()模拟阻塞队列
什么是CAS?
Compare And Swap,比较并交换,它有3个值,分别是内存中的V,预期值A和新的值B,先将内存位置中的值V与预期的值A进行比较,如果相等,那么将内存中的值V更新为新的值B。整个操作是原子的。
但它会出现ABA问题,可以使用版本号的来解决
它是乐观锁
什么是锁升级?
锁的状态有4中,分别是无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,会随着竞争而逐渐升级。
偏向锁
当一线程访问同步块的时候,会在对象头和栈帧中的锁记录存储偏向锁的线程 ID,以后该线程进入或者退出同步块时,不需要进行 CAS 操作来加锁或者解锁,只需要简单测试一下对象头的 Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示该线程获得了锁。如果测试失败,需要再测试一下 Mark Word 中偏向锁的标识是否设置为 1(表示当前是偏向锁) ,如果没有设置,则使用 CAS 竞争锁,如果设置了,则尝试使用 CAS 将对象头的偏向锁指向当前线程。
轻量级锁
线程在执行同步块之前,JVM 会先在当前线程的栈帧中创建用于存储锁的记录空间,并将对象头中的 MarkWord 字段复制到锁记录中,当前线程尝试使用 CAS 将对象头中的 Mark Word 替换成指向锁记录的指针。如果获得成功,当前线程获得了锁,如果失败,表示其他线程竞争锁,当前线程采用 CAS 来获取锁。
轻量级锁解锁的时候,会使用原子的 CAS 操作将 锁记录中的 Mark Word 替换会对象头,如果成功,表示没有竞争发送,如果失败,当前存在锁竞争,锁就会膨胀成重量级锁。一旦锁变成了重量级锁,就不会再恢复到轻量级锁,当锁处于这个状态下,其他线程试图获取锁是,都会被阻塞,当持有的线程释放锁之后会唤醒这些线程,被唤醒的线程开始新一轮的锁争抢
什么是内存泄漏?
当一个变量在代码中一直未被使用,占用内存,并且不能被GC回收,就是内存泄漏