1. 发布对象
- 发布对象:使一个对象能够被当前范围之外的代码所使用
- 对象逸出:一种错误的发布。当一个对象还没有枃造完成时,就使它被其他线程所见
对于发布对象:
@Slf4j
@NotThreadSafe
public class UnsafePublish {
private String[] states = {"a", "b", "c"};
public String[] getStates() {
return states;
}
public static void main(String[] args) {
UnsafePublish unsafePublish = new UnsafePublish();
log.info("{}", Arrays.toString(unsafePublish.getStates()));
unsafePublish.getStates()[0] = "d";
log.info("{}", Arrays.toString(unsafePublish.getStates()));
}
}
这种发布对象是不安全的,我们没法只有有没有其他线程去修改这个类,从而造成类里这个状态的错误。
对于对象溢出:
@Slf4j
@NotThreadSafe
@NotRecommend
public class Escape {
private int thisCanBeEscape = 0;
public Escape () {
new InnerClass();
}
private class InnerClass {
public InnerClass() {
log.info("{}", Escape.this.thisCanBeEscape);
}
}
public static void main(String[] args) {
new Escape();
}
}
在对象未完成构造之前,不可以将其发布。可以采用工厂方法或者私有构造函数来完成对象创建和监听器的注册等操作。
安全发布对象
- 在静态初始化函数中初始化一个对象引用
- 将对象的引用保存到 volatile 类型域或者 Atomicreference 对象中
- 将对象的引用保存到某个正确构造对象的 final 类型域中
- 将对象的引用保存到一个由锁保护的域中
具体看单例模式:https://www.yuque.com/zhangcq/concurrent/hvhk76
2. 不可变对象
不可变对象需要满足的条件
- 对象创建以后其状态就不能修改
- 对象所有域都是 final 类型
- 对象是正确创建的(在对象创建期间,this 引用没有逸出)
例如 String 类型。
final
- 修饰类:类不能被继承
- 修饰方法:锁定方法不被继承类修改(private 方法会隐式的被指定为 final 方法)
- 修饰变量
- 基本数据变量:不能被修改了
- 引用类型:初始化后不能再指向另外一个对象(但是里面的值是可以修改的)
Collections.unmodifiableXxx:Collection/List/Set/Map…
转变为不可变对象
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
map.put(1, 3);
log.info("{}", map.get(1));
}
此时,map 不能被修改(包括里面的值),会派出异常:
Exception in thread "main" java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
at com.mmall.concurrency.example.immutable.ImmutableExample2.main(ImmutableExample2.java:24)
Guava: ImmutableXxx:Collection/List/Set/Map…
转变为不可变对象,并且提供了带初始化数据的声明方法。
@ThreadSafe
public class ImmutableExample3 {
private final static ImmutableList<Integer> list = ImmutableList.of(1, 2, 3);
private final static ImmutableSet set = ImmutableSet.copyOf(list);
private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1, 2, 3, 4);
private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder()
.put(1, 2).put(3, 4).put(5, 6).build();
public static void main(String[] args) {
System.out.println(map2.get(3));
set.add("aa");
}
}
上边展示了 google 的 ImmutableXxx 下集合的使用方式。并且如果这些集合如果执行 add
方法,同样报 java.lang.UnsupportedOperationException
3. 线程封闭
- Ad-hoc 线程封闭:程序控制实现,最糟糕,忽略
- 堆栈封闭:局部变量,无并发问题(全局变量容易引起并发问题)(就是我们平常写的方法)
- Threadlocal 线程封闭:特別好的封闭方法
- 内部维护了一个 map。key 为线程名称,value 是要封闭的对象
- 场景:数据库连接对应JDBC的 connection 对象,从连接池获取 connection 对象,并将 connection 对象封闭再线程里面,做到了线程安全。
- 场景2:结合登录过滤器,将用户信息存储到 threadLocal,方便任何地方获取用户信息。方便且线程安全。实例可以看本机代码/Java并发课程资料/课程资料/SourceCode/Concurrency/src/main/java/com/mmall/concurrency/example/threadLocal/RequestHolder.java)
4. 线程不安全类与写法
StringBuild —> StringBuffer
SimpleDateFromat —> JodaTime
SimpledateFormat 使用错误示例:
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
// 请求总数
public static int clientTotal = 500;
// 同时并发执行的线程数
public static int threadTotal = 200;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
update();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
}
private static void update() {
try {
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
simpleDateFormat.parse("20180208");
} catch (Exception e) {
log.error("parse exception", e);
}
}
执行会报错。解决方式是不用使用全局变量,使用局部没量,每次定义新的 SimpleDateFormat
(31行注释)。
ArrayList, HashSet, HashMap 等 Collections
先检查再执行:if (condition (a) { handle (a) }
5. 线程安全 - 同步容器
- ArrayList —> Vector,Stack
- HashMap —> HashTable(key,value 不能为null)
Collections.synchronizedXxx(List、Set、Map)
6. 并发容器(JUC)
ArrayList —> CopyOnWriteArrayList
- 读写分离(读不加锁,写加锁)
- 最终一致性
- 使用时另外开辟空间
HashSet —> CopyOnWriteArraySet
- CopyOnWriteArraySet 底层是 CopyOnWriteArrayList
- 适用于都多写少
TreeSet —> ConcurrentSkipListSet
- 支持自然排序,并且在构造的时候定义比较器
- 底层基于map
- 批量操作不是原子的,有线程安全问题
- 不能添加空元素
HashMap —> ConcurrentHashMap
- 不允许空值
- TreeMap —> ConcurrentSkipListMap
- key 是有序的
- 支持更高的并发。存取时间和线程数没有关系,在数据量一定的情况下,线程数越多越能提现出它的优势
7. 安全共享对象策略 - 总结
- 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
- 共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访向,但是任何线程都不能修改它
- 线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访向它
- 被守护对象:被守护对象只能通过获取特定的锁来访向
8. AQS 以及同步组件
8.1 AQS 讲解
AbstractQueuedSynchronizer 用来构建锁或者同步装置的基础框架。底层的数据结构如下:
底层是使用了双向列表,是队列的一种实现。
- 使用 Node 实现 FIFO 队列,用来构建锁或者同步装置的基础框架
- 利用了一个 int 类型标识状态
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态 {acquire 和 release}的方法操纵状态
- 可以同时实现排它锁和共享锁模式(独占、共享)
具体实现见深入理解 AQS
一些基于 AQS 的子类:
- CountDownLatch:闭锁,通过计数来保证线程是否阻塞
- Semaphore:控制同一时间并发线程的数目
- CyclicBarrier
- ReentrantLock
- Condition
- FutureTask
8.2 CountDownLatch
CountDownLatch 是一个同步辅助类,借助他可以完成类似于阻塞当前线程的功能。利用一个给定的计数器来进行初始化,改计数器的操作是原子操作,即同时只能有一个线程去操作该计数器。
- 调用该类的
await
方法的对象会一直处于阻塞状态,直到其他线程调用countDown
这个方法(使计数器减一),使计数器变为 0 为止。 - 此时所有因调用
await
方法而处于等待状态的线程就回继续往下执行。 - 这种操作只会出现一次,因为计数器是不能被重置的。
- 如果业务上需要一个可以重置次数的计数器,可以用 CyclicBarrier
**
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
// 10 毫秒之后,虽然计数器没有减为0,也会执行 shutdown
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
Thread.sleep(100);
log.info("{}", threadNum);
}
}
使用场景:程序执行需要等待某个条件完成后才能继续执行后续操作,典型的应用:并行计算。当运算量很大时,可以将运算拆分成多个子任务,等多个任务都完成之后,父任务再拿到多个子任务的计算结果进行汇总。
注意:
- 如果是
countDownLatch.await(10, TimeUnit.MILLISECONDS);
则 10 毫秒(10 毫秒 是test
方法执行的时间)之后,虽然计数器没有减为0,也会执行shutdown
(执行完shutdown
方法后,线程程序不会立刻销毁所有线程,而是让当前已有的线程全部执行完,再把当前线程池销毁)。 countDownLatch.countDown();
为了因程序异常而无法执行,最好放到finally
里执行。
8.3 Semaphore
控制并发访问的线程个数(控制某个资源可悲同时访问的个数),有两个方法:
acquire()
:获取一个许可,如果没有则进行等待release()
:在操作完成后释放一个许可
Semaphore 维护了当前访问的个数,通过提供同步机制来控制同时访问的个数(有限大小的列表)。
使用场景:提供有限访问的资源,比如数据库的连接数。
获取一个许可
**
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
semaphore.acquire(); // 获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws Exception {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
结果是 大约每一秒执行 3 个线程。
同时获取和释放多个许可
try {
semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可
} catch (Exception e) {
log.error("exception", e);
}
因为 final Semaphore semaphore = new Semaphore(3);
,所以此时相当于单线程执行:
尝试获取许可
if (semaphore.tryAcquire()) { // 尝试获取一个许可
test(threadNum);
semaphore.release(); // 释放一个许可
}
进行尝试获取许可,如果能获取则执行然后释放,获取不到则直接丢弃。因为 semaphore 初始化是 3,所以执行结果为只有三个线程同时获得了许可:
tryAcquire
还有另外几个重载的方法:
boolean tryAcquire()
boolean tryAcquire(long timeout, TimeUnit unit)
:在一定时间内尝试获取许可boolean tryAcquire(int permits)
:尝试获取的个数boolean tryAcquire(int permits, long timeout, TimeUnit unit)
8.4 CyclicBarriar
一个同步输助类,允许一组 thread 互相等待,直到某个公共屏障点(common barriar point:countdown=0等)。通过它可以完成过个线程之间的相互等待,只有当每个线程都准备就绪后,才能各自继续往下执行。当某个线程执行了 await
方法后,该线程进入等待状态,而且计数器执行 +1 操作,当计数器的值达到设置的初始值时,之前调用 await
方法的线程会被唤醒,继续执行后续的操作。因为 CyclicBarriar 释放后可以重用,因此被称为「循环屏障」。
使用场景:用于多线程计算数据,最后合并计算结果的应用场景。
与 CountDownLatch 比较:
- 相同点
- 都是通过计数器实现
- 不同点
- CountDownLatch 的计数器只能使用一次,而 CyclicBarriar 的计数器可以使用
reset
方法重置,循环使用。 - CountDownLatch 是一个或多个线需要等待其他线程完成某项操作之后才能继续往下执行,描述的是一个或多个线程等他其他线程的关系。CyclicBarriar 是多个线程之间相互等待,直到所有线程都满足之后才能继续执行,描述的是各个线程内部相互等待的关系。
- CyclicBarriar 能处理更加复杂的事情
- CountDownLatch 的计数器只能使用一次,而 CyclicBarriar 的计数器可以使用
示例:
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
结果是先每隔 1s 执行一个线程,当5个线程都执行完,再同时执行 log.info("{} continue", threadNum);
,然后再依次这样执行下面5个。
int await(long timeout, TimeUnit unit)
也可以控制等待时间,但是注意为了不影响其他线程执行,需要捕获异常。
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
输出:
16:53:34.934 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample2 - 0 is ready
16:53:35.930 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample2 - 1 is ready
16:53:36.933 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample2 - 2 is ready
16:53:36.948 [pool-1-thread-1] WARN com.mmall.concurrency.example.aqs.CyclicBarrierExample2 - BarrierException
java.util.concurrent.TimeoutException: null
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.mmall.concurrency.example.aqs.CyclicBarrierExample2.race(CyclicBarrierExample2.java:37)
at com.mmall.concurrency.example.aqs.CyclicBarrierExample2.lambda$main$0(CyclicBarrierExample2.java:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
16:53:36.948 [pool-1-thread-3] WARN com.mmall.concurrency.example.aqs.CyclicBarrierExample2 - BarrierException
...
在声明 CyclicBarriar 的时候,可以指定一个 runnable,此时在线程到达屏障的时候,优先执行 runnable 。
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
输出:
8.3 ReentrantLock 与锁
ReentrantLock 和 synchronize 区别
- 可重入性:两者都是可重入锁
- 锁实现
- synchronize 基于 JVM 实现(出现异常时,jvm 自动释放锁)
- ReentrantLock 是 JDK 实现(手动在 finally 中释放锁)
- 性能区别
- synchronize 引入偏向锁、自旋锁后,两者性能差不多,官方推荐 synchronize
- 功能区别
- ReentrantLock 可以指定公平锁还是非公平锁
- ReentrantLock 有一个 Condition,可以分组唤醒需要唤醒的线程
- ReentrantLock 提供能够中断等待锁的线程的机制,
lock.lockInterruptibly()
- 自旋锁、cas
Condition 使用
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await(); // 进入 Condition 队列,同时释放锁并等待唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll(); // 唤醒其他线程
log.info("send signal ~ "); // 3
reentrantLock.unlock(); // 释放锁后,其他线程才能拿到锁
}).start();
}
执行结果是:
- wait signal
- get lock
- send signal ~
- get signal
8.4 ReentrantReadWriteLock
不能同时拥有读锁和写锁,当读多写少的时候会让写线程遭遇饥饿,就是读锁一直在用,没有释放,此时没法获取写锁,没法进行写入。不常用。
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
class Data {
}
8.5 StampedLock
官方实例:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
使用时和 ReentrantLock 有点差异:
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
8.6 FutureTask
- Callable 与 Runnable 接口对比
- Future 接口
- 可以监视目标线程调用 call 的情况,当你调用 future 的
get
方法的时候就可以获取结果。
- 可以监视目标线程调用 call 的情况,当你调用 future 的
- FutureTask 类
- 实现了 RunnableFuture 接口,而 RunnableFuture 继承了 Future 和 Runnable
- 既可以作为 Runnable 执行,又可以作为 Future 得到返回值。比如一个费时操作,可以先执行其他操作,再得到线程返回值。
public class FutureExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
执行结果:
- do something in main
- do something in callable
- result:Done
FutureTask 方式实现
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
8.7 BlockingQueue
阻塞队列:当队列满了或者队列消费空的时候,生产线程或消费线程会阻塞,直到有线程进行了消费或者生产。对应的方法如下:
- | 抛出异常 | 返回特殊值(true/false) | 阻塞 | 设置超时时间,超时后返回特殊值 |
---|---|---|---|---|
Insert | add(o) | offer(o) | put(o) | offer(o,timeout,timeunit) |
Remove | remove(o) | poll | take() | poll(timeout,timeunit) |
Examine | element() | peek() |
BlockQueue 的实现类
- ArrayBlockingQueue:一个有界的阻塞队列,底层是一个数组。先进先出方式存储数据
- DelayQueue:里面的元素必须实现 Delayed 接口,一般按照元素过期时间的优先级进行排序
- LInkedBlockQueue:内部实现是链表,先进先出方式存储数据。大小能变动
- PriorityBlockingQueue:没有边界,但是有排序规则。对象必须实现 comparator 接口
- SynchronusQueue:内部仅允许容纳一个元素。当容器插入一个元素后就阻塞,除非这个元素被另一个线程消费。同步无界队列。