一、CountDownLatch:

用来同步一个或多个任务,强制他们等待有其他任务执行的一组操作完成,可以向CountDownLathc设置一个初始计数值,任何在这个对象上调用wait()/await()方法都将等待,直到数值为0,可以通过在其他任务上调用countDown()来减少计数值。CountDwonLatch的典型用法就是将一个程序化分为n个互相独立的可解决任务。

1.示列一:

  1. class TaskProtion implements Runnable{
  2. public static int cont=0;
  3. private final int id=cont++;
  4. private Random random=new Random();
  5. private CountDownLatch downLatch;
  6. TaskProtion(CountDownLatch lo){
  7. downLatch=lo;
  8. }
  9. @Override
  10. public void run() {
  11. try {
  12. doWork();
  13. downLatch.countDown();
  14. System.out.println("countDown后的count的值"+downLatch.getCount());
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. public void doWork() throws InterruptedException {
  20. TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
  21. System.out.println(this+"completed");
  22. }
  23. @Override
  24. public String toString() {
  25. return String.format("for mat "+id);
  26. }
  27. }
  28. class WaitingTask implements Runnable{
  29. private static int count=0;
  30. private final int id=count++;
  31. private CountDownLatch downLatch;
  32. WaitingTask(CountDownLatch xuxu){
  33. downLatch=xuxu;
  34. }
  35. @Override
  36. public void run() {
  37. try {
  38. downLatch.await();
  39. System.out.println(this+"InterruptExection");
  40. } catch (InterruptedException e) {
  41. }
  42. }
  43. @Override
  44. public String toString() {
  45. return String.format("Task "+id);
  46. }
  47. }
  48. public class CountDemo {
  49. static final int SIZE=10;
  50. public static void main(String[] args) throws InterruptedException {
  51. final ExecutorService service = Executors.newCachedThreadPool();
  52. final CountDownLatch latch = new CountDownLatch(SIZE);
  53. for (int i = 0; i <2 ; i++) {
  54. service.execute(new WaitingTask(latch));
  55. }
  56. for (int i = 0; i <SIZE ; i++) {
  57. service.execute(new TaskProtion(latch));
  58. }
  59. TimeUnit.SECONDS.sleep(5);
  60. service.shutdown();
  61. }
  62. }

上述程序展示啦CountDownLatch的使用。
注意:

1.countDown()只和初始化的public CountDownLatch(int count){}有关系,同一线程可以执行count次的countDown()
2.在使用countdown()的时候不会阻塞,只有await/wait()方法时候countdown的值不会0时才会被阻塞。

2. 示列二:

  1. class User {
  2. private String name;
  3. private String id;
  4. public void setId(String id) {
  5. this.id = id;
  6. }
  7. public String getId() {
  8. return id;
  9. }
  10. public void setName(String name) {
  11. this.name = name;
  12. }
  13. public String getName() {
  14. return name;
  15. }
  16. @Override
  17. public String toString() {
  18. return "{User"+" name: "+name+" id: "+id+"}";
  19. }
  20. }
  21. class TaskDemo1 implements Runnable {
  22. private CountDownLatch count;
  23. private List<User> users;
  24. Random random = new Random();
  25. private int id = 0;
  26. TaskDemo1(CountDownLatch count, List<User> users) {
  27. this.count = count;
  28. this.users = users;
  29. }
  30. @Override
  31. public void run() {
  32. try {
  33. info();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. count.countDown();
  38. }
  39. public void info() throws InterruptedException {
  40. User user = new User();
  41. user.setName("s" + random.nextInt(10));
  42. users.add(user);
  43. }
  44. }
  45. class TaskDemo2 implements Runnable {
  46. private CountDownLatch count;
  47. private List<User> users;
  48. Random random = new Random();
  49. TaskDemo2(CountDownLatch count, List<User> users) {
  50. this.count = count;
  51. this.users = users;
  52. }
  53. @Override
  54. public void run() {
  55. try {
  56. count.await();
  57. if (users.size() != 0 && users != null)
  58. for (User user : users) {
  59. if (user.getName() != null && user.getId() == null)
  60. user.setId("" + random.nextInt(10));
  61. }
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }
  67. public class ContdwonDemo {
  68. public static void main(String[] args) throws InterruptedException {
  69. final List<User> list = new CopyOnWriteArrayList<>();
  70. final CountDownLatch count = new CountDownLatch(10);
  71. final ExecutorService service = Executors.newCachedThreadPool();
  72. for (int i = 0; i < 10; i++) {
  73. service.execute(new TaskDemo1(count, list));
  74. }
  75. TimeUnit.MILLISECONDS.sleep(10);
  76. service.execute(new TaskDemo2(count, list));
  77. for (User user : list) {
  78. System.out.println(user);
  79. }
  80. service.shutdown();
  81. }
  82. }

注意:

1.在使用多线程中需要实例对象的时候,要注意给jvm一定时间去实例化对象。或者直接使用CopyOnWriteArrayList不用给定时间,底层add操作有占用一定时间

二、CyclicBarrier:

通过CycliBarrier构造器中开辟一定数量的线程,这些线程在执行到await的时候将挂起,直到其中有一个线程执行CycliBarrier构造器中的任务后才会继续执行.
note:当CycliBarrier的构造器没有传入任务的时候,cyclicBarrier.await();将不会阻塞

示例一:

  1. class HoresDemo implements Runnable{
  2. private int coun=0;
  3. private CyclicBarrier cyclicBarrier;
  4. HoresDemo(CyclicBarrier cyclicBarrier){
  5. this.cyclicBarrier=cyclicBarrier;
  6. }
  7. @Override
  8. public void run() {
  9. System.out.println(Thread.currentThread().getName()+" run()任务");
  10. try {
  11. cyclicBarrier.await();
  12. System.out.println(Thread.currentThread().getName()+" 等待后执行的任务");
  13. } catch (InterruptedException | BrokenBarrierException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }
  18. public class CyclicBarrierDemo {
  19. static final int SIZE=5;
  20. public static void info(int count,Runnable run){
  21. ExecutorService service = Executors.newCachedThreadPool();
  22. CyclicBarrier barrier = new CyclicBarrier(count,run);
  23. for (int i = 0; i <count ; i++) {
  24. service.execute(new HoresDemo(barrier));
  25. }
  26. }
  27. public static void main(String[] args) {
  28. Runnable runnable=()->{
  29. try {
  30. TimeUnit.SECONDS.sleep(10);
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. System.out.println(Thread.currentThread().getName()+" CyclicBarrier构造器中的任务");
  35. System.exit(0);
  36. };
  37. info(SIZE,runnable);
  38. }
  39. }

示列详解:

示例一 展示了CyclicBarrier:的用法:程序中的 CyclicBarrier(count,run);会根据传入的count开启执行的线程数量,同时这些线程在运行到await()时都会等待。知道其中一个线程执行了CyclicBarrier(count,run);中的run任务之后才会继续执行。如果构造器中没有传入run任务的话await将不会阻塞

示例二:赛马游戏

  1. class Horse implements Runnable {
  2. private static int count = 0;
  3. private final int id = count++;
  4. private int strides = 0;//跨步数
  5. private Random random = new Random();
  6. private CyclicBarrier cyclicBarrier;
  7. Horse(CyclicBarrier c){
  8. cyclicBarrier=c;
  9. }
  10. public synchronized int getStrides() {
  11. return strides;
  12. }
  13. @Override
  14. public void run() {
  15. try {
  16. while (!Thread.interrupted()){
  17. synchronized (this){
  18. strides+=random.nextInt(3);//每个马随机跨的栏数
  19. }
  20. cyclicBarrier.await();//每个线程等待后将执行CyclicBarrier中的run任务
  21. System.out.println(id+"号马跨了"+getStrides()+"栏");
  22. }
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. } catch (BrokenBarrierException e) {
  26. throw new RuntimeException("BrokenBarrierException error");
  27. }
  28. }
  29. @Override
  30. public String toString() {
  31. return "Horse"+id+" "+" ";
  32. }
  33. public String tracks(){
  34. StringBuilder stringBuilder=new StringBuilder();
  35. for (int i = 0; i <getStrides() ; i++) {
  36. stringBuilder.append("*");
  37. }
  38. stringBuilder.append(id);
  39. return stringBuilder.toString();
  40. }
  41. }
  42. public class HorseRace {
  43. static final int SIZE=75;//总栏数
  44. private List<Horse> horses=new ArrayList<>();//存放跑的马
  45. private ExecutorService service=Executors.newCachedThreadPool();
  46. private CyclicBarrier cycli;
  47. public HorseRace(int nHose,final int pause){
  48. cycli=new CyclicBarrier(nHose, new Runnable() {
  49. @Override
  50. public void run() {
  51. final StringBuilder builder = new StringBuilder();
  52. for (int i = 0; i <SIZE ; i++) {
  53. builder.append("|");//打印栏数
  54. }
  55. System.out.println(builder);
  56. for (Horse hors : horses) {
  57. System.out.println(hors.tracks());//打印出每个马跑的步数
  58. }
  59. for (Horse hors : horses) {
  60. if (hors.getStrides()>=SIZE) {
  61. System.out.println(hors + "won !!!");
  62. service.shutdownNow();
  63. return;
  64. }
  65. }
  66. try {
  67. TimeUnit.MILLISECONDS.sleep(pause);
  68. } catch (InterruptedException e) {
  69. System.out.println(e);
  70. }
  71. }
  72. });
  73. for (int i = 0; i <nHose ; i++) {//开辟了nHose匹马
  74. final Horse horse = new Horse(cycli);
  75. horses.add(horse);
  76. service.execute(horse);
  77. }
  78. }
  79. public static void main(String[] args) {
  80. int nHosr=7;
  81. int pause=200;
  82. new HorseRace(nHosr,pause);
  83. }
  84. }

示列详解:

当所以的马都跨过他们的栅栏数目时,这个时候遇到了await方法,然后将会抽出一个线程去执行打印马儿栏数和总栅栏的任务。等待这个任务执行完毕后马儿们才会继续跨栏。

三、DelayQueue:

Delay接口:一种混合样式接口,用于标记在给定延迟后应执行操作的对象。用于标记在给定延迟后应执行操作的对象
DelayQueue是一个无界的阻塞队列,该队列存放实现Dealy接口的元素,该队列在存放元素时时无序的,在take时候会按照compareTo()方法设定的排序规则取出元素。

示例一:

  1. class DelayTask implements Runnable, Delayed {
  2. private static int count = 0;
  3. private final int id = count++;
  4. private final int delta;
  5. private final long trigger;
  6. static CountDownLatch countd = new CountDownLatch(10);
  7. protected static List<DelayTask> sequence = new ArrayList<>();
  8. public DelayTask(int delta) {
  9. this.delta = delta;
  10. trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
  11. sequence.add(this);
  12. }
  13. @Override
  14. public void run() {
  15. System.out.println(this + " ");
  16. countd.countDown();
  17. }
  18. @Override
  19. public String toString() {
  20. return String.format("[%1$-4d]", delta) + " Task " + id;
  21. }
  22. public String summary() {
  23. return "(" + id + ":" + delta + ")";
  24. }
  25. @Override
  26. public long getDelay(TimeUnit unit) {
  27. return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
  28. }
  29. @Override
  30. public int compareTo(Delayed o) {
  31. final DelayTask delayed = (DelayTask) o;
  32. if (trigger < delayed.trigger)
  33. return -1;
  34. if (trigger > delayed.trigger)
  35. return 1;
  36. return 0;
  37. }
  38. public static class EndSentinel extends DelayTask {
  39. private ExecutorService service = Executors.newCachedThreadPool();
  40. public EndSentinel(int delta, ExecutorService executorService) {
  41. super(delta);
  42. service = executorService;
  43. }
  44. public void run() {
  45. for (DelayTask task : sequence) {
  46. System.out.println(task.summary() + " ");
  47. }
  48. System.out.println();
  49. try {
  50. countd.await();
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. System.out.println(this + "Calling shutdownNow ");
  55. service.shutdownNow();
  56. }
  57. }
  58. }
  59. class DelayedTaskConsumer implements Runnable {
  60. private DelayQueue<DelayTask> delayQueue;
  61. public DelayedTaskConsumer(DelayQueue<DelayTask> delayQueue) {
  62. this.delayQueue = delayQueue;
  63. }
  64. @Override
  65. public void run() {
  66. try {
  67. while (!Thread.interrupted()) {
  68. delayQueue.take().run();//进行消费,take 时按照定义的排序顺序
  69. }
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. System.out.println("消费完毕");
  74. }
  75. }
  76. public class DelayQueueDemo {
  77. public static void main(String[] args) {
  78. info();
  79. }
  80. public static void info() {
  81. final Random random = new Random();
  82. ExecutorService e = Executors.newCachedThreadPool();
  83. DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
  84. for (int i = 0; i < 10; i++) {
  85. delayQueue.put(new DelayTask(random.nextInt(5000)));//put时无序
  86. }
  87. delayQueue.add(new DelayTask.EndSentinel(5000, e));
  88. e.execute(new DelayedTaskConsumer(delayQueue));
  89. }
  90. }

注意:

配合CountDownLatch的使用还可保证,队列中的前面的任务都执行完毕后才回去执行队列最后的结束任务

四、PriorityBlockingQueue:

是一个很的优先级队列,它具有可阻塞的读取操作。同样的在放入对列的时候无序,但在取出对列的时候按照定义排序的顺序取出。

示列:实时取出优先级最高的元素

  1. class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
  2. private Random rand = new Random(47);
  3. private static int counter = 0;
  4. private final int id = counter++;
  5. private final int priority;
  6. protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
  7. public PrioritizedTask(int priority) {
  8. this.priority = priority;
  9. sequence.add(this);
  10. }
  11. public int compareTo(PrioritizedTask arg) {
  12. return priority < arg.priority ? 1 :
  13. (priority > arg.priority ? -1 : 0);
  14. }
  15. public void run() {
  16. try {
  17. TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
  18. } catch (InterruptedException e) {
  19. }
  20. System.out.println(this);
  21. }
  22. public String toString() {
  23. return String.format("[%1$-3d]", priority) +
  24. " Task " + id;
  25. }
  26. public String summary() {
  27. return "(" + id + ":" + priority + ")";
  28. }
  29. public static class EndSentinel extends PrioritizedTask {
  30. private ExecutorService exec;
  31. public EndSentinel(ExecutorService e) {
  32. super(-1); // Lowest priority in this program
  33. exec = e;
  34. }
  35. public void run() {
  36. int count = 0;
  37. for (PrioritizedTask pt : sequence) {
  38. System.out.print(pt.summary());
  39. if (++count % 5 == 0)
  40. System.out.println();
  41. }
  42. System.out.println();
  43. System.out.println(this + " Calling shutdownNow()");
  44. exec.shutdownNow();
  45. }
  46. }
  47. }
  48. class PrioritizedTaskProducer implements Runnable {
  49. private Random rand = new Random(47);
  50. private Queue<Runnable> queue;
  51. private ExecutorService exec;
  52. public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
  53. queue = q;
  54. exec = e;
  55. }
  56. public void run() {
  57. for (int i = 0; i < 20; i++) {
  58. queue.add(new PrioritizedTask(rand.nextInt(10)));
  59. Thread.yield();
  60. }
  61. try {
  62. for (int i = 0; i < 10; i++) {
  63. TimeUnit.MILLISECONDS.sleep(250);
  64. queue.add(new PrioritizedTask(10));
  65. }
  66. for (int i = 0; i < 10; i++)
  67. queue.add(new PrioritizedTask(i));
  68. queue.add(new PrioritizedTask.EndSentinel(exec));
  69. } catch (InterruptedException e) {
  70. }
  71. System.out.println("Finished PrioritizedTaskProducer");
  72. }
  73. }
  74. class PrioritizedTaskConsumer implements Runnable {
  75. private PriorityBlockingQueue<Runnable> q;
  76. public PrioritizedTaskConsumer(
  77. PriorityBlockingQueue<Runnable> q) {
  78. this.q = q;
  79. }
  80. public void run() {
  81. try {
  82. while (!Thread.interrupted())
  83. q.take().run();
  84. } catch (InterruptedException e) {
  85. }
  86. System.out.println("Finished PrioritizedTaskConsumer");
  87. }
  88. }
  89. public class PriorityBlockingQueueDemo {
  90. public static void main(String[] args) throws Exception {
  91. ExecutorService exec = Executors.newCachedThreadPool();
  92. PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
  93. exec.execute(new PrioritizedTaskProducer(queue, exec));
  94. //TimeUnit.SECONDS.sleep(2);//加入休眠手等待所以任务添加完队列之后在take
  95. exec.execute(new PrioritizedTaskConsumer(queue));
  96. }
  97. }

示列详解:

1.该程序是实时存放实时却出的:消费者线程会第一时间拿出存入对列中的最高优先级元素 2.在97行处加入休眠的话会在所有的任务都放入对列中之后在取出元素。


五、定时任务:

  1. 通过使用定时任务,使用控制一个任务在多长时间后执行,或者在多长时间后隔一定的时长执行一次任务

1. ScheduleThreadPoolExecutor

ScheduleThreadPoolExecutor中的schedule()可设置多久后执行指定任务 scheduleAtFixedRate()方法可以设定在多长时间后隔一定的时长重复执行任务

示列:

  1. public class ScheduleDemo {
  2. public static void main(String[] args) {
  3. final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
  4. executor.schedule(()->{
  5. System.out.println("两秒执行一次");
  6. },2000, TimeUnit.MILLISECONDS);
  7. executor.scheduleAtFixedRate(()->{
  8. System.out.println("每隔两秒执行一次");
  9. },0,2,TimeUnit.SECONDS);
  10. }
  11. }

2.Timer:

示列:

  1. public class TimerDemo {
  2. static int count=0;
  3. public static void main(String[] args) {
  4. Timer timer = new Timer();
  5. timer.schedule(new TimerTask() {
  6. @Override
  7. public void run() {
  8. System.out.println("两秒后执行一次");
  9. }
  10. },2000l);
  11. timer.scheduleAtFixedRate(new TimerTask() {
  12. @Override
  13. public void run() {
  14. System.out.println("没隔一秒执行一次");
  15. }
  16. },new Date(System.currentTimeMillis()),2000l);
  17. //new Date(System.currentTimeMillis()代表当前时间
  18. }
  19. }

3.两者的区别:

1.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法不能提前累加任务,而Timer却可以
示列

  1. public class PkDemo {
  2. static int id = 0;
  3. public static void main(String[] args) throws InterruptedException {
  4. ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
  5. //1.ScheduledThreadPoolExecutor不能累加任务
  6. executor.scheduleAtFixedRate(() -> {
  7. id++;
  8. System.out.println(id);
  9. System.out.println("ScheduledThreadPoolExecutor不能累加任务");
  10. if (id == 3)
  11. executor.shutdownNow();
  12. }, -2000, 2000, TimeUnit.MILLISECONDS);
  13. TimeUnit.SECONDS.sleep(6);
  14. Timer timer = new Timer();
  15. timer.scheduleAtFixedRate(new TimerTask() {
  16. int id = 0;
  17. @Override
  18. public void run() {
  19. id++;
  20. System.out.println(id);
  21. System.out.println("Tiemr可以累计任务");
  22. if (id==4)timer.cancel();
  23. }
  24. }, new Date(System.currentTimeMillis() - 4000), 2000L);
  25. }
  26. }

2.如果出现异常.ScheduleThreadPoolExecutor的scheduleAtFixedRate()方法会吞掉异常,而TImer会抛出异常
示列

  1. class Task extends TimerTask {
  2. volatile int id = 0;
  3. @Override
  4. public void run() {
  5. id++;
  6. System.out.println("sasa");
  7. if (id == 3) throw new RuntimeException("出现异常");
  8. }
  9. }
  10. public class PkDemo2 {
  11. static Timer timer = new Timer();
  12. static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
  13. public static void main(String[] args) throws InterruptedException {
  14. final Task task = new Task();
  15. timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);
  16. TimeUnit.SECONDS.sleep(6);
  17. executor.scheduleAtFixedRate(new Runnable() {
  18. int id=0;
  19. @Override
  20. public void run() {
  21. id++;
  22. System.out.println("dsads "+id);
  23. if (id==3) {
  24. System.out.println("出现异常,但不抛出");
  25. throw new RuntimeException("运行时异常");
  26. }
  27. }
  28. },0,2000, TimeUnit.MILLISECONDS);
  29. TimeUnit.SECONDS.sleep(6);
  30. executor.shutdownNow();
  31. }
  32. }

但是只要正确的处理异常都能保证任务的继续执行

  1. class Task extends TimerTask {
  2. volatile int id = 0;
  3. @Override
  4. public void run() {
  5. id++;
  6. System.out.println("sasa"+id);
  7. if (id == 3) {
  8. try {
  9. throw new RuntimeException("出现异常");
  10. }catch (RuntimeException e){
  11. System.out.println("已经处理完异常");
  12. }
  13. }
  14. }
  15. }
  16. public class PkDemo2 {
  17. static Timer timer = new Timer();
  18. static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
  19. public static void main(String[] args) throws InterruptedException {
  20. final Task task = new Task();
  21. timer.scheduleAtFixedRate(task,new Date(System.currentTimeMillis()),2000);
  22. TimeUnit.SECONDS.sleep(10);
  23. timer.cancel();
  24. executor.scheduleAtFixedRate(new Runnable() {
  25. int id=0;
  26. @Override
  27. public void run() {
  28. id++;
  29. System.out.println("dsads "+id);
  30. if (id==3) {
  31. System.out.println("出现异常,但不抛出");
  32. try{
  33. throw new RuntimeException("运行时异常");
  34. } catch (RuntimeException e){
  35. System.out.println("处理了异常");
  36. }
  37. }
  38. }
  39. },0,2000, TimeUnit.MILLISECONDS);
  40. TimeUnit.SECONDS.sleep(8);
  41. executor.shutdownNow();
  42. }
  43. }

六、Semaphore:

正常的锁都允许一个任务访问一项资源,而Semaphore(计数信号量)将同时允许n个任务访问一项资源。可以将该信号量看作向外分发许可证,如果线程取得分为的许可证就可以放访问该资源。
Semaphore主要方法:

  1. release();//释放许可证,将可用许可证的数量增加一个
  2. acquire();//从这个信号量获取一个许可证,如果没用将阻塞直到一个可用,或者线程被中断。如果有,并立即返回,将可用许可证的数量减少一个

列:对象池

  1. public class Pool<T> {
  2. private int size;//设置可以 Semaphore可用的数量
  3. private List<T> items=new ArrayList<>();
  4. private volatile boolean[] checkedOut;
  5. private Semaphore available;
  6. public Pool(Class<T> al,int size){
  7. this.size=size;
  8. checkedOut=new boolean[size];
  9. available=new Semaphore(size,true);
  10. for (int i = 0; i < size; i++) {
  11. try {
  12. items.add(al.newInstance());//通过反射创建对象
  13. } catch (Exception e) {
  14. throw new RuntimeException("运行时异常");
  15. }
  16. }
  17. }
  18. public T checkOut() throws InterruptedException {
  19. available.acquire();//从这个信号量获取一个许可
  20. return getItem();
  21. }
  22. public void checkIn(T x){//把对象返回池中,
  23. if (releaseItem(x)) {
  24. available.release();//释放许可证,将可用许可证的数量增加一个
  25. }
  26. }
  27. private synchronized T getItem(){
  28. for (int i = 0; i <size ; i++) {
  29. if (!checkedOut[i]){//检查是该对象是否已经被调用,同时取出集合中对应的对象元素
  30. checkedOut[i]=true;
  31. return items.get(i);
  32. }
  33. }
  34. return null;
  35. }
  36. private synchronized boolean releaseItem(T item){
  37. int index=items.indexOf(item);//先获取元素在集合中的索引
  38. if (index==-1)return false;
  39. if (checkedOut[index]){//检查对象是否被掉用,如果掉用则将标志改为fales
  40. checkedOut[index]=false;
  41. return true;
  42. }
  43. return false;
  44. }
  45. }

Fat.Fat类的构造器运行比较耗时,通过对象池将减少着损耗。

  1. public class Fat {
  2. private volatile double d;
  3. private static int count=0;
  4. private final int id=count++;
  5. Fat(){
  6. for (int i = 0; i <100 ; i++) {
  7. d+=(Math.PI+Math.E)/(double)i;
  8. }
  9. }
  10. public void operation(){
  11. System.out.println(this);
  12. }
  13. @Override
  14. public String toString() {
  15. return "Fat id: "+id;
  16. }
  17. }

测试:

  1. class CheckOutTask<T> implements Runnable{
  2. private static int count=0;
  3. private final int id=count++;
  4. private Pool<T> pool;
  5. CheckOutTask(Pool<T> p){
  6. pool=p;
  7. }
  8. @Override
  9. public void run() {
  10. try {
  11. T out = pool.checkOut();
  12. System.out.println(this+" checkOut的对象为:"+out);
  13. TimeUnit.MILLISECONDS.sleep(1000);
  14. System.out.println(this+" check in的对象为:"+out);
  15. pool.checkIn(out);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. @Override
  21. public String toString() {
  22. return "CheckOutTask " +
  23. "id: " + id ;
  24. }
  25. }
  26. public class PoolTest {
  27. final static int SIZE=25;
  28. public static void main(String[] args) throws InterruptedException {
  29. final Pool<Fat> pool = new Pool<>(Fat.class,SIZE);
  30. final ExecutorService service = Executors.newCachedThreadPool();
  31. for (int i = 0; i <SIZE ; i++) {
  32. service.execute(new CheckOutTask<Fat>(pool));
  33. }
  34. TimeUnit.SECONDS.sleep(2);
  35. service.shutdownNow();
  36. final List<Fat> list = new ArrayList<>();
  37. for (int i = 0; i <SIZE ; i++) {
  38. final Fat out = pool.checkOut();
  39. list.add(out);
  40. }
  41. for (Fat fat : list) {
  42. System.out.println(fat);
  43. }
  44. }
  45. }

七、Exchanger:

可以在两任务之间交换对象,Exchanger的典型应用场景是:如果一个对象创建比较高昂,那么可以让更多的对象在被创建的同时被消费。
同上例中的fat对象在构造器中比较耗时,通过Exchanger将在这些对象在创建的时候被一个消费者消费

  1. class ExchangerProducer<T> implements Runnable {
  2. private Generator<T> generator;
  3. private Exchanger<List<T>> exchanger;
  4. private List<T> list;
  5. ExchangerProducer(Generator<T> gen, Exchanger<List<T>> exchanger, List<T> list) {
  6. generator = gen;
  7. this.exchanger = exchanger;
  8. this.list = list;
  9. }
  10. @Override
  11. public void run() {
  12. try {
  13. while (!Thread.interrupted()) {
  14. for (int i = 0; i < ExchangerDemo.size; i++) {
  15. list.add(generator.next());
  16. }
  17. list = exchanger.exchange(list);
  18. }
  19. } catch (InterruptedException e) {
  20. }
  21. }
  22. }
  23. class ExchangerConsumer<T> implements Runnable {
  24. Exchanger<List<T>> exchanger;
  25. private List<T> list;
  26. private volatile T value;
  27. ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> list) {
  28. this.exchanger = exchanger;
  29. this.list = list;
  30. }
  31. @Override
  32. public void run() {
  33. try{
  34. while (!Thread.interrupted()){
  35. list=exchanger.exchange(list);
  36. for (T t : list) {
  37. value=t;
  38. list.remove(t);
  39. }
  40. }
  41. }catch (InterruptedException e){
  42. System.out.println(e);
  43. }
  44. System.out.println("Final value"+value);
  45. }
  46. }
  47. public class ExchangerDemo {
  48. static int size = 10;
  49. static int delay=5;
  50. public static void main(String[] args) throws InterruptedException {
  51. final ExecutorService service = Executors.newCachedThreadPool();
  52. Exchanger<List<Fat>> exchanger=new Exchanger<>();
  53. List<Fat> producerList=new CopyOnWriteArrayList<Fat>(),
  54. consumerList=new CopyOnWriteArrayList<>();
  55. service.execute(new ExchangerProducer<>(BasicGenerator.create(Fat.class),exchanger,producerList));
  56. service.execute(new ExchangerConsumer<>(exchanger,consumerList));
  57. TimeUnit.SECONDS.sleep(delay);
  58. service.shutdownNow();
  59. }
  60. }

注意:
1.当你调用Exchanger.exchanger()进行生产添加时,他将阻塞直到一个消费者任务也调用Exchanger.exchanger()。

  1. public class Example {
  2. public static void main(String[] args) throws IOException {
  3. List<String > list=new CopyOnWriteArrayList<>();
  4. Exchanger<List<String>> exchanger=new Exchanger<>();
  5. new Thread(){
  6. @Override
  7. public void run() {
  8. try {
  9. list.add("ss");
  10. System.out.println(Thread.currentThread().getName()+"开始阻塞");
  11. exchanger.exchange(list);
  12. System.out.println("因为线程受阻不会输出");
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }.start();
  18. new Thread(){
  19. @Override
  20. public void run() {
  21. System.out.println(Thread.currentThread().getName()+"没用去调用exchanger.exchange输出所以线程被阻塞");
  22. }
  23. }.start();
  24. }
  25. }
  1. 2.当消费者调用exchanger.exchange(list);时,如果生产者还没有exchanger.exchange(list);添加数据,则消费者会一直等待
  1. public class Example {
  2. public static void main(String[] args) throws IOException, InterruptedException {
  3. List<String > list=new CopyOnWriteArrayList<>();
  4. Exchanger<List<String>> exchanger=new Exchanger<>();
  5. new Thread(){
  6. @Override
  7. public void run() {
  8. try {
  9. for (int i = 0; i <10 ; i++) {
  10. list.add("ss");
  11. }
  12. System.out.println("添加完毕,但没有放入交换的集合中去");
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. }.start();
  18. new Thread(){
  19. @Override
  20. public void run() {
  21. System.out.println("消费者开始消费");
  22. while (!Thread.interrupted()) {
  23. try {
  24. System.out.println("获取不到生成者的集合,开始等待");
  25. List<String> strings = exchanger.exchange(list);
  26. for (String string : strings) {
  27. System.out.println(string);
  28. strings.remove(string);
  29. }
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. System.out.println("消费者消费完毕");
  34. }
  35. }
  36. }.start();
  37. }
  38. }