一、CountDownLatch:
用来同步一个或多个任务,强制他们等待有其他任务执行的一组操作完成,可以向CountDownLathc设置一个初始计数值,任何在这个对象上调用wait()/await()方法都将等待,直到数值为0,可以通过在其他任务上调用countDown()来减少计数值。CountDwonLatch的典型用法就是将一个程序化分为n个互相独立的可解决任务。
1.示列一:
class TaskProtion implements Runnable{
public static int cont=0;
private final int id=cont++;
private Random random=new Random();
private CountDownLatch downLatch;
TaskProtion(CountDownLatch lo){
downLatch=lo;
}
@Override
public void run() {
try {
doWork();
downLatch.countDown();
System.out.println("countDown后的count的值"+downLatch.getCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
System.out.println(this+"completed");
}
@Override
public String toString() {
return String.format("for mat "+id);
}
}
class WaitingTask implements Runnable{
private static int count=0;
private final int id=count++;
private CountDownLatch downLatch;
WaitingTask(CountDownLatch xuxu){
downLatch=xuxu;
}
@Override
public void run() {
try {
downLatch.await();
System.out.println(this+"InterruptExection");
} catch (InterruptedException e) {
}
}
@Override
public String toString() {
return String.format("Task "+id);
}
}
public class CountDemo {
static final int SIZE=10;
public static void main(String[] args) throws InterruptedException {
final ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i <2 ; i++) {
service.execute(new WaitingTask(latch));
}
for (int i = 0; i <SIZE ; i++) {
service.execute(new TaskProtion(latch));
}
TimeUnit.SECONDS.sleep(5);
service.shutdown();
}
}
上述程序展示啦CountDownLatch的使用。
注意:
1.countDown()只和初始化的public CountDownLatch(int count){}有关系,同一线程可以执行count次的countDown()
2.在使用countdown()的时候不会阻塞,只有await/wait()方法时候countdown的值不会0时才会被阻塞。
2. 示列二:
class User {
private String name;
private String id;
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "{User"+" name: "+name+" id: "+id+"}";
}
}
class TaskDemo1 implements Runnable {
private CountDownLatch count;
private List<User> users;
Random random = new Random();
private int id = 0;
TaskDemo1(CountDownLatch count, List<User> users) {
this.count = count;
this.users = users;
}
@Override
public void run() {
try {
info();
} catch (InterruptedException e) {
e.printStackTrace();
}
count.countDown();
}
public void info() throws InterruptedException {
User user = new User();
user.setName("s" + random.nextInt(10));
users.add(user);
}
}
class TaskDemo2 implements Runnable {
private CountDownLatch count;
private List<User> users;
Random random = new Random();
TaskDemo2(CountDownLatch count, List<User> users) {
this.count = count;
this.users = users;
}
@Override
public void run() {
try {
count.await();
if (users.size() != 0 && users != null)
for (User user : users) {
if (user.getName() != null && user.getId() == null)
user.setId("" + random.nextInt(10));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ContdwonDemo {
public static void main(String[] args) throws InterruptedException {
final List<User> list = new CopyOnWriteArrayList<>();
final CountDownLatch count = new CountDownLatch(10);
final ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(new TaskDemo1(count, list));
}
TimeUnit.MILLISECONDS.sleep(10);
service.execute(new TaskDemo2(count, list));
for (User user : list) {
System.out.println(user);
}
service.shutdown();
}
}
注意:
1.在使用多线程中需要实例对象的时候,要注意给jvm一定时间去实例化对象。或者直接使用CopyOnWriteArrayList不用给定时间,底层add操作有占用一定时间
二、CyclicBarrier:
通过CycliBarrier构造器中开辟一定数量的线程,这些线程在执行到await的时候将挂起,直到其中有一个线程执行CycliBarrier构造器中的任务后才会继续执行.
note:当CycliBarrier的构造器没有传入任务的时候,cyclicBarrier.await();将不会阻塞
示例一:
class HoresDemo implements Runnable{
private int coun=0;
private CyclicBarrier cyclicBarrier;
HoresDemo(CyclicBarrier cyclicBarrier){
this.cyclicBarrier=cyclicBarrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" run()任务");
try {
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+" 等待后执行的任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierDemo {
static final int SIZE=5;
public static void info(int count,Runnable run){
ExecutorService service = Executors.newCachedThreadPool();
CyclicBarrier barrier = new CyclicBarrier(count,run);
for (int i = 0; i <count ; i++) {
service.execute(new HoresDemo(barrier));
}
}
public static void main(String[] args) {
Runnable runnable=()->{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" CyclicBarrier构造器中的任务");
System.exit(0);
};
info(SIZE,runnable);
}
}
示列详解:
示例一 展示了CyclicBarrier:的用法:程序中的 CyclicBarrier(count,run);会根据传入的count开启执行的线程数量,同时这些线程在运行到await()时都会等待。知道其中一个线程执行了CyclicBarrier(count,run);中的run任务之后才会继续执行。如果构造器中没有传入run任务的话await将不会阻塞
示例二:赛马游戏
class Horse implements Runnable {
private static int count = 0;
private final int id = count++;
private int strides = 0;//跨步数
private Random random = new Random();
private CyclicBarrier cyclicBarrier;
Horse(CyclicBarrier c){
cyclicBarrier=c;
}
public synchronized int getStrides() {
return strides;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
synchronized (this){
strides+=random.nextInt(3);//每个马随机跨的栏数
}
cyclicBarrier.await();//每个线程等待后将执行CyclicBarrier中的run任务
System.out.println(id+"号马跨了"+getStrides()+"栏");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
throw new RuntimeException("BrokenBarrierException error");
}
}
@Override
public String toString() {
return "Horse"+id+" "+" ";
}
public String tracks(){
StringBuilder stringBuilder=new StringBuilder();
for (int i = 0; i <getStrides() ; i++) {
stringBuilder.append("*");
}
stringBuilder.append(id);
return stringBuilder.toString();
}
}
public class HorseRace {
static final int SIZE=75;//总栏数
private List<Horse> horses=new ArrayList<>();//存放跑的马
private ExecutorService service=Executors.newCachedThreadPool();
private CyclicBarrier cycli;
public HorseRace(int nHose,final int pause){
cycli=new CyclicBarrier(nHose, new Runnable() {
@Override
public void run() {
final StringBuilder builder = new StringBuilder();
for (int i = 0; i <SIZE ; i++) {
builder.append("|");//打印栏数
}
System.out.println(builder);
for (Horse hors : horses) {
System.out.println(hors.tracks());//打印出每个马跑的步数
}
for (Horse hors : horses) {
if (hors.getStrides()>=SIZE) {
System.out.println(hors + "won !!!");
service.shutdownNow();
return;
}
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println(e);
}
}
});
for (int i = 0; i <nHose ; i++) {//开辟了nHose匹马
final Horse horse = new Horse(cycli);
horses.add(horse);
service.execute(horse);
}
}
public static void main(String[] args) {
int nHosr=7;
int pause=200;
new HorseRace(nHosr,pause);
}
}
示列详解:
当所以的马都跨过他们的栅栏数目时,这个时候遇到了await方法,然后将会抽出一个线程去执行打印马儿栏数和总栅栏的任务。等待这个任务执行完毕后马儿们才会继续跨栏。
三、DelayQueue:
Delay接口:一种混合样式接口,用于标记在给定延迟后应执行操作的对象。用于标记在给定延迟后应执行操作的对象
DelayQueue是一个无界的阻塞队列,该队列存放实现Dealy接口的元素,该队列在存放元素时时无序的,在take时候会按照compareTo()方法设定的排序规则取出元素。
示例一:
class DelayTask implements Runnable, Delayed {
private static int count = 0;
private final int id = count++;
private final int delta;
private final long trigger;
static CountDownLatch countd = new CountDownLatch(10);
protected static List<DelayTask> sequence = new ArrayList<>();
public DelayTask(int delta) {
this.delta = delta;
trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
sequence.add(this);
}
@Override
public void run() {
System.out.println(this + " ");
countd.countDown();
}
@Override
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
final DelayTask delayed = (DelayTask) o;
if (trigger < delayed.trigger)
return -1;
if (trigger > delayed.trigger)
return 1;
return 0;
}
public static class EndSentinel extends DelayTask {
private ExecutorService service = Executors.newCachedThreadPool();
public EndSentinel(int delta, ExecutorService executorService) {
super(delta);
service = executorService;
}
public void run() {
for (DelayTask task : sequence) {
System.out.println(task.summary() + " ");
}
System.out.println();
try {
countd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this + "Calling shutdownNow ");
service.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayTask> delayQueue;
public DelayedTaskConsumer(DelayQueue<DelayTask> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
delayQueue.take().run();//进行消费,take 时按照定义的排序顺序
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费完毕");
}
}
public class DelayQueueDemo {
public static void main(String[] args) {
info();
}
public static void info() {
final Random random = new Random();
ExecutorService e = Executors.newCachedThreadPool();
DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
for (int i = 0; i < 10; i++) {
delayQueue.put(new DelayTask(random.nextInt(5000)));//put时无序
}
delayQueue.add(new DelayTask.EndSentinel(5000, e));
e.execute(new DelayedTaskConsumer(delayQueue));
}
}
注意:
配合CountDownLatch的使用还可保证,队列中的前面的任务都执行完毕后才回去执行队列最后的结束任务
四、PriorityBlockingQueue:
是一个很的优先级队列,它具有可阻塞的读取操作。同样的在放入对列的时候无序,但在取出对列的时候按照定义排序的顺序取出。
示列:实时取出优先级最高的元素
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 :
(priority > arg.priority ? -1 : 0);
}
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch (InterruptedException e) {
}
System.out.println(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) +
" Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for (PrioritizedTask pt : sequence) {
System.out.print(pt.summary());
if (++count % 5 == 0)
System.out.println();
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e;
}
public void run() {
for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
for (int i = 0; i < 10; i++)
queue.add(new PrioritizedTask(i));
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(
PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
public void run() {
try {
while (!Thread.interrupted())
q.take().run();
} catch (InterruptedException e) {
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
//TimeUnit.SECONDS.sleep(2);//加入休眠手等待所以任务添加完队列之后在take
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
示列详解:
1.该程序是实时存放实时却出的:消费者线程会第一时间拿出存入对列中的最高优先级元素 2.在97行处加入休眠的话会在所有的任务都放入对列中之后在取出元素。
五、定时任务:
通过使用定时任务,使用控制一个任务在多长时间后执行,或者在多长时间后隔一定的时长执行一次任务
1. ScheduleThreadPoolExecutor
ScheduleThreadPoolExecutor中的schedule()可设置多久后执行指定任务 scheduleAtFixedRate()方法可以设定在多长时间后隔一定的时长重复执行任务
示列:
public class ScheduleDemo {
public static void main(String[] args) {
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
executor.schedule(()->{
System.out.println("两秒执行一次");
},2000, TimeUnit.MILLISECONDS);
executor.scheduleAtFixedRate(()->{
System.out.println("每隔两秒执行一次");
},0,2,TimeUnit.SECONDS);
}
}
2.Timer:
示列:
public class TimerDemo {
static int count=0;
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("两秒后执行一次");
}
},2000l);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println("没隔一秒执行一次");
}
},new Date(System.currentTimeMillis()),2000l);
//new Date(System.currentTimeMillis()代表当前时间
}
}
3.两者的区别:
1.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法不能提前累加任务,而Timer却可以
示列
public class PkDemo {
static int id = 0;
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
//1.ScheduledThreadPoolExecutor不能累加任务
executor.scheduleAtFixedRate(() -> {
id++;
System.out.println(id);
System.out.println("ScheduledThreadPoolExecutor不能累加任务");
if (id == 3)
executor.shutdownNow();
}, -2000, 2000, TimeUnit.MILLISECONDS);
TimeUnit.SECONDS.sleep(6);
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
int id = 0;
@Override
public void run() {
id++;
System.out.println(id);
System.out.println("Tiemr可以累计任务");
if (id==4)timer.cancel();
}
}, new Date(System.currentTimeMillis() - 4000), 2000L);
}
}
2.如果出现异常.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法会吞掉异常,而TImer会抛出异常
示列
class Task extends TimerTask {
volatile int id = 0;
@Override
public void run() {
id++;
System.out.println("sasa");
if (id == 3) throw new RuntimeException("出现异常");
}
}
public class PkDemo2 {
static Timer timer = new Timer();
static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
public static void main(String[] args) throws InterruptedException {
final Task task = new Task();
timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);
TimeUnit.SECONDS.sleep(6);
executor.scheduleAtFixedRate(new Runnable() {
int id=0;
@Override
public void run() {
id++;
System.out.println("dsads "+id);
if (id==3) {
System.out.println("出现异常,但不抛出");
throw new RuntimeException("运行时异常");
}
}
},0,2000, TimeUnit.MILLISECONDS);
TimeUnit.SECONDS.sleep(6);
executor.shutdownNow();
}
}
但是只要正确的处理异常都能保证任务的继续执行
class Task extends TimerTask {
volatile int id = 0;
@Override
public void run() {
id++;
System.out.println("sasa"+id);
if (id == 3) {
try {
throw new RuntimeException("出现异常");
}catch (RuntimeException e){
System.out.println("已经处理完异常");
}
}
}
}
public class PkDemo2 {
static Timer timer = new Timer();
static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
public static void main(String[] args) throws InterruptedException {
final Task task = new Task();
timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);
TimeUnit.SECONDS.sleep(10);
timer.cancel();
executor.scheduleAtFixedRate(new Runnable() {
int id=0;
@Override
public void run() {
id++;
System.out.println("dsads "+id);
if (id==3) {
System.out.println("出现异常,但不抛出");
try{
throw new RuntimeException("运行时异常");
} catch (RuntimeException e){
System.out.println("处理了异常");
}
}
}
},0,2000, TimeUnit.MILLISECONDS);
TimeUnit.SECONDS.sleep(8);
executor.shutdownNow();
}
}
六、Semaphore:
正常的锁都允许一个任务访问一项资源,而Semaphore(计数信号量)将同时允许n个任务访问一项资源。可以将该信号量看作向外分发许可证,如果线程取得分为的许可证就可以放访问该资源。
Semaphore主要方法:
release();//释放许可证,将可用许可证的数量增加一个
acquire();//从这个信号量获取一个许可证,如果没用将阻塞直到一个可用,或者线程被中断。如果有,并立即返回,将可用许可证的数量减少一个
列:对象池
public class Pool<T> {
private int size;//设置可以 Semaphore可用的数量
private List<T> items=new ArrayList<>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> al,int size){
this.size=size;
checkedOut=new boolean[size];
available=new Semaphore(size,true);
for (int i = 0; i < size; i++) {
try {
items.add(al.newInstance());//通过反射创建对象
} catch (Exception e) {
throw new RuntimeException("运行时异常");
}
}
}
public T checkOut() throws InterruptedException {
available.acquire();//从这个信号量获取一个许可
return getItem();
}
public void checkIn(T x){//把对象返回池中,
if (releaseItem(x)) {
available.release();//释放许可证,将可用许可证的数量增加一个
}
}
private synchronized T getItem(){
for (int i = 0; i <size ; i++) {
if (!checkedOut[i]){//检查是该对象是否已经被调用,同时取出集合中对应的对象元素
checkedOut[i]=true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item){
int index=items.indexOf(item);//先获取元素在集合中的索引
if (index==-1)return false;
if (checkedOut[index]){//检查对象是否被掉用,如果掉用则将标志改为fales
checkedOut[index]=false;
return true;
}
return false;
}
}
Fat.Fat类的构造器运行比较耗时,通过对象池将减少着损耗。
public class Fat {
private volatile double d;
private static int count=0;
private final int id=count++;
Fat(){
for (int i = 0; i <100 ; i++) {
d+=(Math.PI+Math.E)/(double)i;
}
}
public void operation(){
System.out.println(this);
}
@Override
public String toString() {
return "Fat id: "+id;
}
}
测试:
class CheckOutTask<T> implements Runnable{
private static int count=0;
private final int id=count++;
private Pool<T> pool;
CheckOutTask(Pool<T> p){
pool=p;
}
@Override
public void run() {
try {
T out = pool.checkOut();
System.out.println(this+" checkOut的对象为:"+out);
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(this+" check in的对象为:"+out);
pool.checkIn(out);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "CheckOutTask " +
"id: " + id ;
}
}
public class PoolTest {
final static int SIZE=25;
public static void main(String[] args) throws InterruptedException {
final Pool<Fat> pool = new Pool<>(Fat.class,SIZE);
final ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i <SIZE ; i++) {
service.execute(new CheckOutTask<Fat>(pool));
}
TimeUnit.SECONDS.sleep(2);
service.shutdownNow();
final List<Fat> list = new ArrayList<>();
for (int i = 0; i <SIZE ; i++) {
final Fat out = pool.checkOut();
list.add(out);
}
for (Fat fat : list) {
System.out.println(fat);
}
}
}
七、Exchanger:
可以在两任务之间交换对象,Exchanger的典型应用场景是:如果一个对象创建比较高昂,那么可以让更多的对象在被创建的同时被消费。
同上例中的fat对象在构造器中比较耗时,通过Exchanger将在这些对象在创建的时候被一个消费者消费
class ExchangerProducer<T> implements Runnable {
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> list;
ExchangerProducer(Generator<T> gen, Exchanger<List<T>> exchanger, List<T> list) {
generator = gen;
this.exchanger = exchanger;
this.list = list;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
for (int i = 0; i < ExchangerDemo.size; i++) {
list.add(generator.next());
}
list = exchanger.exchange(list);
}
} catch (InterruptedException e) {
}
}
}
class ExchangerConsumer<T> implements Runnable {
Exchanger<List<T>> exchanger;
private List<T> list;
private volatile T value;
ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> list) {
this.exchanger = exchanger;
this.list = list;
}
@Override
public void run() {
try{
while (!Thread.interrupted()){
list=exchanger.exchange(list);
for (T t : list) {
value=t;
list.remove(t);
}
}
}catch (InterruptedException e){
System.out.println(e);
}
System.out.println("Final value"+value);
}
}
public class ExchangerDemo {
static int size = 10;
static int delay=5;
public static void main(String[] args) throws InterruptedException {
final ExecutorService service = Executors.newCachedThreadPool();
Exchanger<List<Fat>> exchanger=new Exchanger<>();
List<Fat> producerList=new CopyOnWriteArrayList<Fat>(),
consumerList=new CopyOnWriteArrayList<>();
service.execute(new ExchangerProducer<>(BasicGenerator.create(Fat.class),exchanger,producerList));
service.execute(new ExchangerConsumer<>(exchanger,consumerList));
TimeUnit.SECONDS.sleep(delay);
service.shutdownNow();
}
}
注意:
1.当你调用Exchanger.exchanger()进行生产添加时,他将阻塞直到一个消费者任务也调用Exchanger.exchanger()。
public class Example {
public static void main(String[] args) throws IOException {
List<String > list=new CopyOnWriteArrayList<>();
Exchanger<List<String>> exchanger=new Exchanger<>();
new Thread(){
@Override
public void run() {
try {
list.add("ss");
System.out.println(Thread.currentThread().getName()+"开始阻塞");
exchanger.exchange(list);
System.out.println("因为线程受阻不会输出");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"没用去调用exchanger.exchange输出所以线程被阻塞");
}
}.start();
}
}
2.当消费者调用exchanger.exchange(list);时,如果生产者还没有exchanger.exchange(list);添加数据,则消费者会一直等待
public class Example {
public static void main(String[] args) throws IOException, InterruptedException {
List<String > list=new CopyOnWriteArrayList<>();
Exchanger<List<String>> exchanger=new Exchanger<>();
new Thread(){
@Override
public void run() {
try {
for (int i = 0; i <10 ; i++) {
list.add("ss");
}
System.out.println("添加完毕,但没有放入交换的集合中去");
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
new Thread(){
@Override
public void run() {
System.out.println("消费者开始消费");
while (!Thread.interrupted()) {
try {
System.out.println("获取不到生成者的集合,开始等待");
List<String> strings = exchanger.exchange(list);
for (String string : strings) {
System.out.println(string);
strings.remove(string);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者消费完毕");
}
}
}.start();
}
}