一、CountDownLatch:
用来同步一个或多个任务,强制他们等待有其他任务执行的一组操作完成,可以向CountDownLathc设置一个初始计数值,任何在这个对象上调用wait()/await()方法都将等待,直到数值为0,可以通过在其他任务上调用countDown()来减少计数值。CountDwonLatch的典型用法就是将一个程序化分为n个互相独立的可解决任务。
1.示列一:
class TaskProtion implements Runnable{public static int cont=0;private final int id=cont++;private Random random=new Random();private CountDownLatch downLatch;TaskProtion(CountDownLatch lo){downLatch=lo;}@Overridepublic void run() {try {doWork();downLatch.countDown();System.out.println("countDown后的count的值"+downLatch.getCount());} catch (InterruptedException e) {e.printStackTrace();}}public void doWork() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));System.out.println(this+"completed");}@Overridepublic String toString() {return String.format("for mat "+id);}}class WaitingTask implements Runnable{private static int count=0;private final int id=count++;private CountDownLatch downLatch;WaitingTask(CountDownLatch xuxu){downLatch=xuxu;}@Overridepublic void run() {try {downLatch.await();System.out.println(this+"InterruptExection");} catch (InterruptedException e) {}}@Overridepublic String toString() {return String.format("Task "+id);}}public class CountDemo {static final int SIZE=10;public static void main(String[] args) throws InterruptedException {final ExecutorService service = Executors.newCachedThreadPool();final CountDownLatch latch = new CountDownLatch(SIZE);for (int i = 0; i <2 ; i++) {service.execute(new WaitingTask(latch));}for (int i = 0; i <SIZE ; i++) {service.execute(new TaskProtion(latch));}TimeUnit.SECONDS.sleep(5);service.shutdown();}}
上述程序展示啦CountDownLatch的使用。
注意:
1.countDown()只和初始化的public CountDownLatch(int count){}有关系,同一线程可以执行count次的countDown()
2.在使用countdown()的时候不会阻塞,只有await/wait()方法时候countdown的值不会0时才会被阻塞。
2. 示列二:
class User {private String name;private String id;public void setId(String id) {this.id = id;}public String getId() {return id;}public void setName(String name) {this.name = name;}public String getName() {return name;}@Overridepublic String toString() {return "{User"+" name: "+name+" id: "+id+"}";}}class TaskDemo1 implements Runnable {private CountDownLatch count;private List<User> users;Random random = new Random();private int id = 0;TaskDemo1(CountDownLatch count, List<User> users) {this.count = count;this.users = users;}@Overridepublic void run() {try {info();} catch (InterruptedException e) {e.printStackTrace();}count.countDown();}public void info() throws InterruptedException {User user = new User();user.setName("s" + random.nextInt(10));users.add(user);}}class TaskDemo2 implements Runnable {private CountDownLatch count;private List<User> users;Random random = new Random();TaskDemo2(CountDownLatch count, List<User> users) {this.count = count;this.users = users;}@Overridepublic void run() {try {count.await();if (users.size() != 0 && users != null)for (User user : users) {if (user.getName() != null && user.getId() == null)user.setId("" + random.nextInt(10));}} catch (InterruptedException e) {e.printStackTrace();}}}public class ContdwonDemo {public static void main(String[] args) throws InterruptedException {final List<User> list = new CopyOnWriteArrayList<>();final CountDownLatch count = new CountDownLatch(10);final ExecutorService service = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {service.execute(new TaskDemo1(count, list));}TimeUnit.MILLISECONDS.sleep(10);service.execute(new TaskDemo2(count, list));for (User user : list) {System.out.println(user);}service.shutdown();}}
注意:
1.在使用多线程中需要实例对象的时候,要注意给jvm一定时间去实例化对象。或者直接使用CopyOnWriteArrayList不用给定时间,底层add操作有占用一定时间
二、CyclicBarrier:
通过CycliBarrier构造器中开辟一定数量的线程,这些线程在执行到await的时候将挂起,直到其中有一个线程执行CycliBarrier构造器中的任务后才会继续执行.
note:当CycliBarrier的构造器没有传入任务的时候,cyclicBarrier.await();将不会阻塞
示例一:
class HoresDemo implements Runnable{private int coun=0;private CyclicBarrier cyclicBarrier;HoresDemo(CyclicBarrier cyclicBarrier){this.cyclicBarrier=cyclicBarrier;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+" run()任务");try {cyclicBarrier.await();System.out.println(Thread.currentThread().getName()+" 等待后执行的任务");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}public class CyclicBarrierDemo {static final int SIZE=5;public static void info(int count,Runnable run){ExecutorService service = Executors.newCachedThreadPool();CyclicBarrier barrier = new CyclicBarrier(count,run);for (int i = 0; i <count ; i++) {service.execute(new HoresDemo(barrier));}}public static void main(String[] args) {Runnable runnable=()->{try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+" CyclicBarrier构造器中的任务");System.exit(0);};info(SIZE,runnable);}}
示列详解:
示例一 展示了CyclicBarrier:的用法:程序中的 CyclicBarrier(count,run);会根据传入的count开启执行的线程数量,同时这些线程在运行到await()时都会等待。知道其中一个线程执行了CyclicBarrier(count,run);中的run任务之后才会继续执行。如果构造器中没有传入run任务的话await将不会阻塞
示例二:赛马游戏
class Horse implements Runnable {private static int count = 0;private final int id = count++;private int strides = 0;//跨步数private Random random = new Random();private CyclicBarrier cyclicBarrier;Horse(CyclicBarrier c){cyclicBarrier=c;}public synchronized int getStrides() {return strides;}@Overridepublic void run() {try {while (!Thread.interrupted()){synchronized (this){strides+=random.nextInt(3);//每个马随机跨的栏数}cyclicBarrier.await();//每个线程等待后将执行CyclicBarrier中的run任务System.out.println(id+"号马跨了"+getStrides()+"栏");}} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {throw new RuntimeException("BrokenBarrierException error");}}@Overridepublic String toString() {return "Horse"+id+" "+" ";}public String tracks(){StringBuilder stringBuilder=new StringBuilder();for (int i = 0; i <getStrides() ; i++) {stringBuilder.append("*");}stringBuilder.append(id);return stringBuilder.toString();}}public class HorseRace {static final int SIZE=75;//总栏数private List<Horse> horses=new ArrayList<>();//存放跑的马private ExecutorService service=Executors.newCachedThreadPool();private CyclicBarrier cycli;public HorseRace(int nHose,final int pause){cycli=new CyclicBarrier(nHose, new Runnable() {@Overridepublic void run() {final StringBuilder builder = new StringBuilder();for (int i = 0; i <SIZE ; i++) {builder.append("|");//打印栏数}System.out.println(builder);for (Horse hors : horses) {System.out.println(hors.tracks());//打印出每个马跑的步数}for (Horse hors : horses) {if (hors.getStrides()>=SIZE) {System.out.println(hors + "won !!!");service.shutdownNow();return;}}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.out.println(e);}}});for (int i = 0; i <nHose ; i++) {//开辟了nHose匹马final Horse horse = new Horse(cycli);horses.add(horse);service.execute(horse);}}public static void main(String[] args) {int nHosr=7;int pause=200;new HorseRace(nHosr,pause);}}
示列详解:
当所以的马都跨过他们的栅栏数目时,这个时候遇到了await方法,然后将会抽出一个线程去执行打印马儿栏数和总栅栏的任务。等待这个任务执行完毕后马儿们才会继续跨栏。
三、DelayQueue:
Delay接口:一种混合样式接口,用于标记在给定延迟后应执行操作的对象。用于标记在给定延迟后应执行操作的对象
DelayQueue是一个无界的阻塞队列,该队列存放实现Dealy接口的元素,该队列在存放元素时时无序的,在take时候会按照compareTo()方法设定的排序规则取出元素。
示例一:
class DelayTask implements Runnable, Delayed {private static int count = 0;private final int id = count++;private final int delta;private final long trigger;static CountDownLatch countd = new CountDownLatch(10);protected static List<DelayTask> sequence = new ArrayList<>();public DelayTask(int delta) {this.delta = delta;trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);sequence.add(this);}@Overridepublic void run() {System.out.println(this + " ");countd.countDown();}@Overridepublic String toString() {return String.format("[%1$-4d]", delta) + " Task " + id;}public String summary() {return "(" + id + ":" + delta + ")";}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) {final DelayTask delayed = (DelayTask) o;if (trigger < delayed.trigger)return -1;if (trigger > delayed.trigger)return 1;return 0;}public static class EndSentinel extends DelayTask {private ExecutorService service = Executors.newCachedThreadPool();public EndSentinel(int delta, ExecutorService executorService) {super(delta);service = executorService;}public void run() {for (DelayTask task : sequence) {System.out.println(task.summary() + " ");}System.out.println();try {countd.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(this + "Calling shutdownNow ");service.shutdownNow();}}}class DelayedTaskConsumer implements Runnable {private DelayQueue<DelayTask> delayQueue;public DelayedTaskConsumer(DelayQueue<DelayTask> delayQueue) {this.delayQueue = delayQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {delayQueue.take().run();//进行消费,take 时按照定义的排序顺序}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费完毕");}}public class DelayQueueDemo {public static void main(String[] args) {info();}public static void info() {final Random random = new Random();ExecutorService e = Executors.newCachedThreadPool();DelayQueue<DelayTask> delayQueue = new DelayQueue<>();for (int i = 0; i < 10; i++) {delayQueue.put(new DelayTask(random.nextInt(5000)));//put时无序}delayQueue.add(new DelayTask.EndSentinel(5000, e));e.execute(new DelayedTaskConsumer(delayQueue));}}
注意:
配合CountDownLatch的使用还可保证,队列中的前面的任务都执行完毕后才回去执行队列最后的结束任务
四、PriorityBlockingQueue:
是一个很的优先级队列,它具有可阻塞的读取操作。同样的在放入对列的时候无序,但在取出对列的时候按照定义排序的顺序取出。
示列:实时取出优先级最高的元素
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {private Random rand = new Random(47);private static int counter = 0;private final int id = counter++;private final int priority;protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();public PrioritizedTask(int priority) {this.priority = priority;sequence.add(this);}public int compareTo(PrioritizedTask arg) {return priority < arg.priority ? 1 :(priority > arg.priority ? -1 : 0);}public void run() {try {TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));} catch (InterruptedException e) {}System.out.println(this);}public String toString() {return String.format("[%1$-3d]", priority) +" Task " + id;}public String summary() {return "(" + id + ":" + priority + ")";}public static class EndSentinel extends PrioritizedTask {private ExecutorService exec;public EndSentinel(ExecutorService e) {super(-1); // Lowest priority in this programexec = e;}public void run() {int count = 0;for (PrioritizedTask pt : sequence) {System.out.print(pt.summary());if (++count % 5 == 0)System.out.println();}System.out.println();System.out.println(this + " Calling shutdownNow()");exec.shutdownNow();}}}class PrioritizedTaskProducer implements Runnable {private Random rand = new Random(47);private Queue<Runnable> queue;private ExecutorService exec;public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {queue = q;exec = e;}public void run() {for (int i = 0; i < 20; i++) {queue.add(new PrioritizedTask(rand.nextInt(10)));Thread.yield();}try {for (int i = 0; i < 10; i++) {TimeUnit.MILLISECONDS.sleep(250);queue.add(new PrioritizedTask(10));}for (int i = 0; i < 10; i++)queue.add(new PrioritizedTask(i));queue.add(new PrioritizedTask.EndSentinel(exec));} catch (InterruptedException e) {}System.out.println("Finished PrioritizedTaskProducer");}}class PrioritizedTaskConsumer implements Runnable {private PriorityBlockingQueue<Runnable> q;public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {this.q = q;}public void run() {try {while (!Thread.interrupted())q.take().run();} catch (InterruptedException e) {}System.out.println("Finished PrioritizedTaskConsumer");}}public class PriorityBlockingQueueDemo {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();exec.execute(new PrioritizedTaskProducer(queue, exec));//TimeUnit.SECONDS.sleep(2);//加入休眠手等待所以任务添加完队列之后在takeexec.execute(new PrioritizedTaskConsumer(queue));}}
示列详解:
1.该程序是实时存放实时却出的:消费者线程会第一时间拿出存入对列中的最高优先级元素 2.在97行处加入休眠的话会在所有的任务都放入对列中之后在取出元素。
五、定时任务:
通过使用定时任务,使用控制一个任务在多长时间后执行,或者在多长时间后隔一定的时长执行一次任务
1. ScheduleThreadPoolExecutor
ScheduleThreadPoolExecutor中的schedule()可设置多久后执行指定任务 scheduleAtFixedRate()方法可以设定在多长时间后隔一定的时长重复执行任务
示列:
public class ScheduleDemo {public static void main(String[] args) {final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);executor.schedule(()->{System.out.println("两秒执行一次");},2000, TimeUnit.MILLISECONDS);executor.scheduleAtFixedRate(()->{System.out.println("每隔两秒执行一次");},0,2,TimeUnit.SECONDS);}}
2.Timer:
示列:
public class TimerDemo {static int count=0;public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("两秒后执行一次");}},2000l);timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {System.out.println("没隔一秒执行一次");}},new Date(System.currentTimeMillis()),2000l);//new Date(System.currentTimeMillis()代表当前时间}}
3.两者的区别:
1.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法不能提前累加任务,而Timer却可以
示列
public class PkDemo {static int id = 0;public static void main(String[] args) throws InterruptedException {ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);//1.ScheduledThreadPoolExecutor不能累加任务executor.scheduleAtFixedRate(() -> {id++;System.out.println(id);System.out.println("ScheduledThreadPoolExecutor不能累加任务");if (id == 3)executor.shutdownNow();}, -2000, 2000, TimeUnit.MILLISECONDS);TimeUnit.SECONDS.sleep(6);Timer timer = new Timer();timer.scheduleAtFixedRate(new TimerTask() {int id = 0;@Overridepublic void run() {id++;System.out.println(id);System.out.println("Tiemr可以累计任务");if (id==4)timer.cancel();}}, new Date(System.currentTimeMillis() - 4000), 2000L);}}
2.如果出现异常.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法会吞掉异常,而TImer会抛出异常
示列
class Task extends TimerTask {volatile int id = 0;@Overridepublic void run() {id++;System.out.println("sasa");if (id == 3) throw new RuntimeException("出现异常");}}public class PkDemo2 {static Timer timer = new Timer();static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);public static void main(String[] args) throws InterruptedException {final Task task = new Task();timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);TimeUnit.SECONDS.sleep(6);executor.scheduleAtFixedRate(new Runnable() {int id=0;@Overridepublic void run() {id++;System.out.println("dsads "+id);if (id==3) {System.out.println("出现异常,但不抛出");throw new RuntimeException("运行时异常");}}},0,2000, TimeUnit.MILLISECONDS);TimeUnit.SECONDS.sleep(6);executor.shutdownNow();}}
但是只要正确的处理异常都能保证任务的继续执行
class Task extends TimerTask {volatile int id = 0;@Overridepublic void run() {id++;System.out.println("sasa"+id);if (id == 3) {try {throw new RuntimeException("出现异常");}catch (RuntimeException e){System.out.println("已经处理完异常");}}}}public class PkDemo2 {static Timer timer = new Timer();static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);public static void main(String[] args) throws InterruptedException {final Task task = new Task();timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);TimeUnit.SECONDS.sleep(10);timer.cancel();executor.scheduleAtFixedRate(new Runnable() {int id=0;@Overridepublic void run() {id++;System.out.println("dsads "+id);if (id==3) {System.out.println("出现异常,但不抛出");try{throw new RuntimeException("运行时异常");} catch (RuntimeException e){System.out.println("处理了异常");}}}},0,2000, TimeUnit.MILLISECONDS);TimeUnit.SECONDS.sleep(8);executor.shutdownNow();}}
六、Semaphore:
正常的锁都允许一个任务访问一项资源,而Semaphore(计数信号量)将同时允许n个任务访问一项资源。可以将该信号量看作向外分发许可证,如果线程取得分为的许可证就可以放访问该资源。
Semaphore主要方法:
release();//释放许可证,将可用许可证的数量增加一个acquire();//从这个信号量获取一个许可证,如果没用将阻塞直到一个可用,或者线程被中断。如果有,并立即返回,将可用许可证的数量减少一个
列:对象池
public class Pool<T> {private int size;//设置可以 Semaphore可用的数量private List<T> items=new ArrayList<>();private volatile boolean[] checkedOut;private Semaphore available;public Pool(Class<T> al,int size){this.size=size;checkedOut=new boolean[size];available=new Semaphore(size,true);for (int i = 0; i < size; i++) {try {items.add(al.newInstance());//通过反射创建对象} catch (Exception e) {throw new RuntimeException("运行时异常");}}}public T checkOut() throws InterruptedException {available.acquire();//从这个信号量获取一个许可return getItem();}public void checkIn(T x){//把对象返回池中,if (releaseItem(x)) {available.release();//释放许可证,将可用许可证的数量增加一个}}private synchronized T getItem(){for (int i = 0; i <size ; i++) {if (!checkedOut[i]){//检查是该对象是否已经被调用,同时取出集合中对应的对象元素checkedOut[i]=true;return items.get(i);}}return null;}private synchronized boolean releaseItem(T item){int index=items.indexOf(item);//先获取元素在集合中的索引if (index==-1)return false;if (checkedOut[index]){//检查对象是否被掉用,如果掉用则将标志改为falescheckedOut[index]=false;return true;}return false;}}
Fat.Fat类的构造器运行比较耗时,通过对象池将减少着损耗。
public class Fat {private volatile double d;private static int count=0;private final int id=count++;Fat(){for (int i = 0; i <100 ; i++) {d+=(Math.PI+Math.E)/(double)i;}}public void operation(){System.out.println(this);}@Overridepublic String toString() {return "Fat id: "+id;}}
测试:
class CheckOutTask<T> implements Runnable{private static int count=0;private final int id=count++;private Pool<T> pool;CheckOutTask(Pool<T> p){pool=p;}@Overridepublic void run() {try {T out = pool.checkOut();System.out.println(this+" checkOut的对象为:"+out);TimeUnit.MILLISECONDS.sleep(1000);System.out.println(this+" check in的对象为:"+out);pool.checkIn(out);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic String toString() {return "CheckOutTask " +"id: " + id ;}}public class PoolTest {final static int SIZE=25;public static void main(String[] args) throws InterruptedException {final Pool<Fat> pool = new Pool<>(Fat.class,SIZE);final ExecutorService service = Executors.newCachedThreadPool();for (int i = 0; i <SIZE ; i++) {service.execute(new CheckOutTask<Fat>(pool));}TimeUnit.SECONDS.sleep(2);service.shutdownNow();final List<Fat> list = new ArrayList<>();for (int i = 0; i <SIZE ; i++) {final Fat out = pool.checkOut();list.add(out);}for (Fat fat : list) {System.out.println(fat);}}}
七、Exchanger:
可以在两任务之间交换对象,Exchanger的典型应用场景是:如果一个对象创建比较高昂,那么可以让更多的对象在被创建的同时被消费。
同上例中的fat对象在构造器中比较耗时,通过Exchanger将在这些对象在创建的时候被一个消费者消费
class ExchangerProducer<T> implements Runnable {private Generator<T> generator;private Exchanger<List<T>> exchanger;private List<T> list;ExchangerProducer(Generator<T> gen, Exchanger<List<T>> exchanger, List<T> list) {generator = gen;this.exchanger = exchanger;this.list = list;}@Overridepublic void run() {try {while (!Thread.interrupted()) {for (int i = 0; i < ExchangerDemo.size; i++) {list.add(generator.next());}list = exchanger.exchange(list);}} catch (InterruptedException e) {}}}class ExchangerConsumer<T> implements Runnable {Exchanger<List<T>> exchanger;private List<T> list;private volatile T value;ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> list) {this.exchanger = exchanger;this.list = list;}@Overridepublic void run() {try{while (!Thread.interrupted()){list=exchanger.exchange(list);for (T t : list) {value=t;list.remove(t);}}}catch (InterruptedException e){System.out.println(e);}System.out.println("Final value"+value);}}public class ExchangerDemo {static int size = 10;static int delay=5;public static void main(String[] args) throws InterruptedException {final ExecutorService service = Executors.newCachedThreadPool();Exchanger<List<Fat>> exchanger=new Exchanger<>();List<Fat> producerList=new CopyOnWriteArrayList<Fat>(),consumerList=new CopyOnWriteArrayList<>();service.execute(new ExchangerProducer<>(BasicGenerator.create(Fat.class),exchanger,producerList));service.execute(new ExchangerConsumer<>(exchanger,consumerList));TimeUnit.SECONDS.sleep(delay);service.shutdownNow();}}
注意:
1.当你调用Exchanger.exchanger()进行生产添加时,他将阻塞直到一个消费者任务也调用Exchanger.exchanger()。
public class Example {public static void main(String[] args) throws IOException {List<String > list=new CopyOnWriteArrayList<>();Exchanger<List<String>> exchanger=new Exchanger<>();new Thread(){@Overridepublic void run() {try {list.add("ss");System.out.println(Thread.currentThread().getName()+"开始阻塞");exchanger.exchange(list);System.out.println("因为线程受阻不会输出");} catch (InterruptedException e) {e.printStackTrace();}}}.start();new Thread(){@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"没用去调用exchanger.exchange输出所以线程被阻塞");}}.start();}}
2.当消费者调用exchanger.exchange(list);时,如果生产者还没有exchanger.exchange(list);添加数据,则消费者会一直等待
public class Example {public static void main(String[] args) throws IOException, InterruptedException {List<String > list=new CopyOnWriteArrayList<>();Exchanger<List<String>> exchanger=new Exchanger<>();new Thread(){@Overridepublic void run() {try {for (int i = 0; i <10 ; i++) {list.add("ss");}System.out.println("添加完毕,但没有放入交换的集合中去");} catch (Exception e) {e.printStackTrace();}}}.start();new Thread(){@Overridepublic void run() {System.out.println("消费者开始消费");while (!Thread.interrupted()) {try {System.out.println("获取不到生成者的集合,开始等待");List<String> strings = exchanger.exchange(list);for (String string : strings) {System.out.println(string);strings.remove(string);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者消费完毕");}}}.start();}}
