一、线程间通信方式
1.volatile关键字
2.使用wait()和notiry()方法
3.使用countDownLatch(倒计时器)
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
原理如下:
内部类Sync直接继承自AQS,并重写了tryAcquireShare和tryReleaseShare方法。实现了当state不为零时将线程入队挂起,在tryReleaseShare执行减一后,如果state为0则,逐个将入队的线程唤醒的操作 当有多个线程调用await时候,当countDown到0的时候,所有调用await的线程都会被唤醒
使用方法如下:
一些线程执行countDownLatch.await(); 另一些线程执行countDownLatch.countDown(); 当countDownLatch倒计时到0的时候从countDownLatch.await();处继续执行
//线程B等待计数器倒计时完成之后才继续向下执行
public class CountDownTest {
static CountDownLatch countDownLatch=new CountDownLatch(2);
public static void main(String[] args) {
new MyThread("B",countDownLatch).start();
new MyThread("A",countDownLatch).start();
new MyThread("A",countDownLatch).start();
}
}
class MyThread extends Thread{
CountDownLatch countDownLatch;
String name;
MyThread(String name,CountDownLatch countDownLatch){
this.name=name;
this.countDownLatch=countDownLatch;
}
@Override
public void run() {
try {
if(!"A".equals(this.name)){
countDownLatch.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name+" "+"执行");
countDownLatch.countDown();
}
}
//输出结果:
A 执行
A 执行
B 执行
4.ReentrantLock和Condition
4.1 ReentrantLock和Synchronized的区别
1.Synchronized是JVM的锁,ReentrantLock是一个类
2.Synchronized加解锁自动进行,ReentrantLock加解锁手动进行,比较麻烦,但是比较灵活
通过lock()加锁,通过unlock()解锁,解锁操作最好放在finally块中
3.Synchronized是不可中断锁,获取不到一个锁就会一直等待,
ReentrantLock是可中断锁,可以设置等待多久返回去做其他事情
4.ReentrantLock可以初始化为公平锁,Synchronized只能是非公平锁。
4.2 抽象队列同步器(AQS)
AQS定义了多线程对临界资源的访问,可以基于AQS定制不同的同步访问逻辑来定制各种锁,是一种模板设计模式
资源的访问可以分为独占式或共享式
AQS的核心思想是,如果被请求的资源空闲,就将请求资源的线程设置为工作线程,将共享资源设置为锁定状态
如果请求的资源已经在锁定状态,那么就使用一个双向链表维护等待队列
要访问的资源的同步状态state(为一个整数)使用volatile修饰,使用CAS方式去更新state:
不同的锁只需要实现如何获取(acquire)资源的部分即可,等待队列已经由AQS实现好了
4.3 基于AQS的ReentrantLock
ReentrantLock只实现了Lock和Serializeable接口,其内部类Sync 继承了AbstractQueuedSynchronizer
利用了AQS框架来实现加锁机制:
//AQS类的逻辑,使用acquire函数来获取锁,核心是tryAcquire
//tryAcquire获取锁成功了就什么的都不干,失败了就acquireQueued入队
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//非公平锁对获取锁的实现
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//非公平锁的tryAcquire调用nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//当资源空闲(c=0),直接通过CAS操作独占
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//否则判断占据的线程是否是自身,是则可重入锁,不是则获取失败
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平锁对tryAcquire实现
final void lock() {
acquire(1);
}
//公平锁的tryAcquire调用tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//这里和非公平锁的区别是,非公平锁cas修改成功就行,这里还要判断是否有等待队列
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//重入机制和非公平锁是一样的
else if (current == getExclusiveOwnerThread()){
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
解锁的unlock过程对于公平锁和非公平锁都是一样的
unlock将状态减去一定数值,当状态为0则表示线程释放锁
public void unlock() {
sync.release(1);
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
流程如下:
4.4 结合Condition
Condition通过Lock.newCondition()获取,相比Synchronized的wait()和notify()来说,condition的await()和signal()更灵活,因为一个Lock可以新建多个Condition,实现有选择的线程通知
condition的await()和signal()更灵活也需要在lock()和unlock()之间执行
下面是一个使用condition的await()和signal()实现生产者和消费者的示例,缓冲区大小为5,满了就停止生产,为0就停止消费,中间过程可以消费和生产:
注意:使用两个condition分别控制空了和满了的停止消费和生产:
public class ConditionTest {
public static void main(String[] args) {
Queue<Integer> buffer=new LinkedList<>();
ReentrantLock lock=new ReentrantLock();
Condition empty=lock.newCondition();
Condition full=lock.newCondition();
new Thread(new Producer(empty,full,buffer,lock)).start();
new Thread(new Producer(empty,full,buffer,lock)).start();
new Thread(new Consumer(empty,full,buffer,lock)).start();
}
}
class Producer implements Runnable{
Condition empty,full;
Queue<Integer> buffer;
Lock lock;
Producer(Condition empty,Condition full,Queue<Integer> buffer,Lock lock){
this.empty=empty;
this.full=full;
this.buffer=buffer;
this.lock=lock;
}
public void produce(){
lock.lock();
if(buffer.size()<5){
buffer.offer(1);
System.out.println(Thread.currentThread().getName()+"生产了一个,buffer有"+buffer.size());
empty.signal();
}else{
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.unlock();
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
produce();
}
}
}
class Consumer implements Runnable {
Condition empty,full;
Queue<Integer> buffer;
Lock lock;
Consumer(Condition empty,Condition full,Queue<Integer> buffer,Lock lock)
{
this.empty=empty;
this.full=full;
this.buffer=buffer;
this.lock=lock;
}
public void consume(){
while (true){
lock.lock();
if(buffer.size()>0){
buffer.poll();
System.out.println(Thread.currentThread().getName()+"消费了一个,buffer有"+buffer.size());
full.signal();
}else {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lock.unlock();
}
}
@Override
public void run() {
consume();
}
}
5.使用CyclicBarrier
CyclicBarrier构造函数:
//代表要阻塞多少个线程(需要多少个线程到达了这里再执行后续)
private int count;
//代表屏障出阻塞的线程数量
public CyclicBarrier(int parties)
//barrierAction可以指定所有线程都到达屏障后执行的函数
public CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier一般作为成员变量传入线程,线程调用CyclicBarrier的await方法代表已经到达了阻塞点
await()方法中对count减1,当不为0说明还要等待其他线程,就使用ReentrantLock变量的condition.await()方法阻塞自身,当为0的时候说明所有线程都已经到达,使用ReentrantLock变量的condition.signalAll()方法通知唤醒其余等待的线程继续执行,具体逻辑如下:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();//里面会调用signalAll唤醒所有阻塞线程
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
6.CyclicBarrier和CountDownLatch的区别
(1)countDownLatch是等待事件到达后执行后续事件,CyclicBarrier是等待一组线程到达之后执行各自线程的 后续部分。
(2)countDownLatch是一次性的,计数器倒计时到0下一次就不会有屏障了,除非手动设置;
但是CyclicBarrier会循环设置屏障阻碍线程
(3)调用countDownLatch的countDown()之后,线程并不会阻塞,调用CyclicBarrier的await()方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行
7.ThreadLocal内存溢出
每个线程维护一个ThreadLocalMap,其中的每个Enrty的key是一个指向ThreadLocal变量的弱引用,当ThreadLocal变量被回收之后,这个key就会指向空,直到线程结束才会被回收,ThreadLocal解决方法是在每次set,remove方法中都判断当前key是否为空,为空则删除,并遍历整个ThreadLocalMap,将所有key为空的entry的value指向null