CountDownLatch

示例一

1,注意:ArrayList 不是线程安全的,多个线程并发 add 时可能丢失元素或留下 null 槽位(这并不是 JVM 来不及初始化对象造成的);示例中的休眠只是错开了各线程 add 的时机、降低了冲突概率,并不能保证安全。 1-1,可以直接使用 CopyOnWriteArrayList,其底层 add 操作加了锁,无需依赖休眠。 2,如何在多个任务间传递数据

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.Random;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.TimeUnit;
  /**
   * Mutable value holder passed between the two task stages of this example:
   * the first stage fills in the name, the second stage fills in the id.
   */
  class User {
      private String name;
      private long id;

      /** Sets the display name. */
      public void setName(String name) {
          this.name = name;
      }

      /** Sets the numeric id. */
      public void setId(long id) {
          this.id = id;
      }

      /** Returns the name, or {@code null} if it has not been set. */
      public String getName() {
          return name;
      }

      /** Returns the id, or {@code 0} if it has not been set. */
      public long getId() {
          return id;
      }

      @Override
      public String toString() {
          return "User{" +
                  "name='" + name + '\'' +
                  ", id=" + id +
                  '}';
      }
  }
  31. class TaskPart1 implements Runnable {
  32. private static int counter = 0;
  33. private final int id = counter++;
  34. private static Random rand = new Random(47);
  35. private final CountDownLatch latch;
  36. private List<User> users;
  37. TaskPart1(CountDownLatch latch,List<User> users) {
  38. this.latch = latch;
  39. this.users = users;
  40. }
  41. @Override
  42. public void run() {
  43. try {
  44. doWork();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. latch.countDown();
  49. // System.out.println(Thread.currentThread().getName()+"->"+users);
  50. }
  51. public void doWork() throws InterruptedException {
  52. User user = new User();
  53. user.setName("aa->"+rand.nextInt(100));
  54. TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
  55. users.add(user);//如果不休眠的话 可能user还未分配空间,此时存放的user为null
  56. }
  57. }
  58. class TaskPart2 implements Runnable {
  59. private static int counter = 0;
  60. private final int id = counter++;
  61. private final CountDownLatch latch;
  62. private static Random rand = new Random(47);
  63. private List<User> users;
  64. TaskPart2(CountDownLatch latch,List<User> users) {
  65. this.latch = latch;
  66. this.users = users;
  67. }
  68. @Override
  69. public void run() {
  70. try {
  71. latch.await();
  72. //发起远程调用公安系统的API 查询用户的身份证号
  73. if(users.size()!=0 && users!=null) {
  74. for (User user : users) {
  75. if(user!=null) {
  76. user.setId(rand.nextLong());
  77. }
  78. // System.out.println(user);
  79. }
  80. }
  81. } catch (InterruptedException e) {
  82. e.printStackTrace();
  83. }
  84. }
  85. }
  86. public class CountDownLatchDemo2 {
  87. static final int SIZE = 10;
  88. public static void main(String[] args) throws InterruptedException {
  89. ExecutorService exec = Executors.newCachedThreadPool();
  90. // All must share a single CountDownLatch object:
  91. List<User> users = new ArrayList<>();
  92. CountDownLatch latch = new CountDownLatch(10);
  93. for (int i = 0; i < SIZE; i++) {
  94. exec.execute(new TaskPart1(latch,users));
  95. }
  96. exec.execute(new TaskPart2(latch,users));
  97. TimeUnit.SECONDS.sleep(5);
  98. for (User user : users) {
  99. System.out.println(user);
  100. }
  101. exec.shutdown();
  102. }
  103. }

示例二

countDown() 只是把构造时传入的计数(public CountDownLatch(int count){})减 1,与由哪个线程调用无关;因此同一线程可以连续执行 count 次 countDown() 使计数归零

package com.thinking.in.java.course.chapter21.waxomatic2;


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Shows that a single thread may count a latch all the way down: after a
 * two-second pause it calls {@code countDown()} five times in a row.
 */
class Task1 implements Runnable{

    private CountDownLatch countDown;

    /** @param count latch this task counts down */
    Task1(CountDownLatch count){
        this.countDown = count;
    }

    /** Pauses, counts the latch down five times, then reports completion. */
    public void doWork1() {
        try {
            TimeUnit.SECONDS.sleep(2);
            for (int i = 0; i < 5; i++) {
                countDown.countDown();
            }
            System.out.println("finish do work 1");
        }catch (InterruptedException e){
            // Interrupted before counting down: keep the flag set for the caller.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        doWork1();
    }
}
/**
 * Waits until the shared latch reaches zero, pauses one more second and then
 * reports completion.
 */
class Task2 implements Runnable{

    private CountDownLatch countDown;

    /** @param count latch this task waits on */
    Task2(CountDownLatch count){
        this.countDown = count;
    }

    /** Blocks on the latch, then pauses and prints the completion message. */
    public void doWork2() {
        try {
            countDown.await();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("finish  work ");
        }catch (InterruptedException e){
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        doWork2();
    }
}
/**
 * Pauses five seconds, reports completion and then counts the shared latch
 * down once.
 */
class Task3 implements Runnable{

    private CountDownLatch countDown;

    /** @param count latch this task counts down once */
    Task3(CountDownLatch count){
        this.countDown = count;
    }

    /** Pauses, prints the completion message, then counts the latch down. */
    public void doWork3() {
        try {
            TimeUnit.SECONDS.sleep(5);
            System.out.println("finish do work 3");
            countDown.countDown();
        }catch (InterruptedException e){
            // The work did not complete, so the latch is deliberately left
            // untouched; only the interrupt status is handed back.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        doWork3();
    }
}

/**
 * Starts one waiting task and one counting task on a latch of five, showing
 * that the waiter is released once a single thread has counted down five times.
 */
public class CountDownLatchDemo3 {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch sharedLatch = new CountDownLatch(5);

        Thread waiter = new Thread(new Task2(sharedLatch));
        Thread counter = new Thread(new Task1(sharedLatch));

        // The waiter is started first, then the thread that releases it.
        waiter.start();
        counter.start();
        // Task3 is intentionally not started in this demo.
    }
}

CyclicBarrier

//nHorses 开栅之前等待的线程数
//barrierAction 开栅之后要去执行的任务(动作)
barrierAction 每次开栅只执行一次,并且只由一个线程执行:由 parties 个线程中最后一个到达栅栏的线程去执行,执行完毕后其他等待中的线程才会被释放;同一时间点只有一个线程在执行它;

// Constructor excerpt shown for reference; the enclosing class and its fields
// (parties, count, barrierCommand) are not part of this snippet.
// parties:       value stored in both this.parties and this.count; must be > 0.
// barrierAction: Runnable stored as this.barrierCommand.
public CyclicBarrier(int parties, Runnable barrierAction) {
// Zero or negative party counts are rejected.
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
// count starts out equal to parties (presumably the number of threads still
// awaited in the current round — confirm against the full class).
this.count = parties;
this.barrierCommand = barrierAction;
}

示例

更精彩的例子参考21.7.2赛马示例

package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Repeatedly announces itself and then waits at a {@link CyclicBarrier} until
 * the running thread is interrupted.
 */
class RunTask implements Runnable{
    private static int counter = 0;
    private final int id = counter++;

    // Per-instance: a static field here would make every task silently use the
    // barrier passed to whichever RunTask was constructed last.
    private final CyclicBarrier barrier;

    /** @param barrier barrier this task waits on after every announcement */
    public RunTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println(this+"->"+Thread.currentThread().getName());
                barrier.await();
            }
        }catch (InterruptedException e) {
            System.out.println(e.getMessage());
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // This one we want to know about
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "RunTask " + id + " ";
    }
}

public class CyclicBarrierDemo {
    private CyclicBarrier barrier;
    private ExecutorService exec = Executors.newCachedThreadPool();

    CyclicBarrierDemo(int count){
        //CyclicBarrier中的任务 由等待中的count个线程中任意一个去执行
        barrier = new CyclicBarrier(count, new Runnable() {
            AtomicInteger execCount = new AtomicInteger(1);
            @Override
            public void run() {
                System.out.println(count+"任务执行完成后再去执行 "+execCount.get()+" 次数"+"->"+Thread.currentThread().getName());
                execCount.incrementAndGet();
                if(execCount.get() > 5){
                    exec.shutdownNow();
                }
            }
        });

        for (int i = 0; i < count; i++) {
            RunTask task = new RunTask(barrier);
            exec.execute(task);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new CyclicBarrierDemo(3);
    }
}
DelayQueue

DelayQueue

元素的存放顺序与放入的先后无关,队列内部按 compareTo 定义的顺序(通常是到期时间)组织;take 只会取出延迟已经到期的元素,并且总是先取最早到期的那个,队首尚未到期时 take 会阻塞

示例

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

/**
 * An element for a {@code DelayQueue} that becomes available a given number
 * of milliseconds after it was created.
 *
 * <p>The class does not need to implement {@code Runnable}; the consumer
 * calls {@link #run()} directly.
 */
class DelayedTask implements  Delayed {
    private static int counter = 0;
    private final int id = counter++;
    // Requested delay in milliseconds.
    private final int delta;
    // Absolute System.nanoTime() value at which the task becomes available.
    private final long trigger;
    // Counted down once per ordinary task run; EndSentinel waits on it.
    static CountDownLatch latch = new CountDownLatch(20);

    /** @param delayInMilliseconds delay, from now, after which the task may be taken */
    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() +  TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
    }

    /** Returns the remaining delay in {@code unit}; zero or negative once expired. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /** Orders tasks by trigger time, earliest first. */
    @Override
    public int compareTo(Delayed arg) {
        if (arg instanceof DelayedTask) {
            return Long.compare(trigger, ((DelayedTask) arg).trigger);
        }
        // Foreign Delayed implementations are compared by remaining delay.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), arg.getDelay(TimeUnit.NANOSECONDS));
    }

    /**
     * Simulates work and then counts the shared latch down. The later the task
     * was triggered, the shorter the first pause.
     */
    public void run() {
        try {
            // The pause is derived from the requested delay (milliseconds), not
            // from the absolute nanoTime stamp, so it really shrinks for
            // later-triggered tasks.
            TimeUnit.MILLISECONDS.sleep(Math.max(0, 5000 - delta));
            TimeUnit.MILLISECONDS.sleep(800);
        } catch (InterruptedException e) {
            // Keep the interrupt visible so the consuming loop can stop.
            Thread.currentThread().interrupt();
        }
        latch.countDown();
    }

    @Override
    public String toString() {
        return String.format("[%1$-4d]", delta) +     " Task " + id;
    }

    /**
     * Final queue element: waits until every ordinary task has run, then shuts
     * the executor down.
     */
    public static class EndSentinel extends DelayedTask {
        private ExecutorService exec;

        /**
         * @param delay delay in milliseconds before the sentinel may be taken
         * @param e     executor to shut down when the sentinel runs
         */
        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;
        }

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

/**
 * Takes tasks from a {@code DelayQueue} and runs each one on the calling
 * thread until that thread is interrupted.
 */
class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> q;

    /** @param q queue to take tasks from */
    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }

    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                // Blocks until the head of the queue has expired.
                final DelayedTask next = q.take();
                next.run(); // executed by this consumer thread, not a new one
                System.out.println(Thread.currentThread().getName()+"获取了->"+next+"任务");
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked in take(): just leave the loop.
        }
    }
}

/**
 * Fills a {@code DelayQueue} with twenty randomly delayed tasks plus an end
 * sentinel, then starts twenty consumers on a cached thread pool.
 */
public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final ExecutorService exec = Executors.newCachedThreadPool();
        final DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        final Random rand = new Random(47);

        // Twenty ordinary tasks with delays below five seconds.
        int queued = 0;
        while (queued < 20) {
            queue.put(new DelayedTask(rand.nextInt(5000)));
            queued++;
        }
        // The sentinel is queued with a five-second delay.
        queue.add(new DelayedTask.EndSentinel(5000, exec));

        int consumers = 0;
        while (consumers < 20) {
            exec.execute(new DelayedTaskConsumer(queue));
            consumers++;
        }
    }
}


PriorityBlockingQueue

PriorityBlockingQueue

实时将队列中优先级最高的元素取出来 典型的业务场景:银行办理业务

示例

理解点1:`System.out.println("当前队列中的元素数量->"+q.size()+"");` 与 `q.take().run();` 配合观察——take() 只能取出"此刻已经在队列中"的元素里优先级最高的那个任务。
理解点2:在启动消费者之前先休眠一段时间,让所有的元素准备就绪:`TimeUnit.SECONDS.sleep(5);`,以此验证 PriorityBlockingQueue 中所有的元素已经就绪后确实按优先级出队。

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

/**
 * A task with an integer priority. Natural ordering is descending, so a
 * {@code PriorityBlockingQueue} hands out the highest priority first.
 * Every created task is also recorded in {@link #sequence}.
 */
class PrioritizedTask implements  Runnable, Comparable<PrioritizedTask> {
    // Shared generator: a per-instance Random with the same seed would give
    // every task an identical "random" pause.
    private static final Random rand = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;
    // Creation-order record of all tasks.
    protected static List<PrioritizedTask> sequence =  new ArrayList<PrioritizedTask>();

    /** @param priority larger values are served first */
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    /** Descending order: a higher priority compares as "smaller". */
    @Override
    public int compareTo(PrioritizedTask arg) {
        return Integer.compare(arg.priority, priority);
    }

    /** Pauses for a short pseudo-random time, then prints this task. */
    @Override
    public void run() {
        try {
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
        } catch (InterruptedException e) {
            // Acceptable way to exit; keep the flag so the caller's loop sees it.
            Thread.currentThread().interrupt();
        }
        System.out.println(this);
    }

    @Override
    public String toString() {
        return String.format("[%1$-3d]", priority) +  " Task " + id;
    }

    /** Returns a compact {@code (id:priority)} description. */
    public String summary() {
        return "(" + id + ":" + priority + ")";
    }

    /** Lowest-priority task (-1) that shuts the executor down when it runs. */
    public static class EndSentinel extends PrioritizedTask {
        private ExecutorService exec;

        /** @param e executor to shut down when the sentinel runs */
        public EndSentinel(ExecutorService e) {
            super(-1);
            exec = e;
        }

        @Override
        public void run() {
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

/**
 * Fills the queue in three waves: twenty tasks with random priorities, ten
 * priority-10 tasks spaced 250 ms apart, then ten tasks with priorities 0..9,
 * followed by the end sentinel.
 */
class PrioritizedTaskProducer implements Runnable {
    private Random rand = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    /**
     * @param q queue to fill
     * @param e executor handed to the end sentinel
     */
    public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
        queue = q;
        exec = e; // Used for EndSentinel
    }

    public void run() {
        // Ordinary customers who arrive as soon as the doors open.
        int walkIns = 0;
        while (walkIns < 20) {
            queue.add(new PrioritizedTask(rand.nextInt(10)));
            walkIns++;
        }

        try {
            // A little later ten VIP customers trickle in.
            int vips = 0;
            while (vips < 10) {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
                vips++;
            }
            // Finally ten more customers, with priorities 0 through 9.
            for (int priority = 0; priority < 10; priority++) {
                queue.add(new PrioritizedTask(priority));
            }
            queue.add(new PrioritizedTask.EndSentinel(exec));
        } catch (InterruptedException e) {
            // Interrupted while pacing the VIP wave: stop producing.
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}

/**
 * Drains a {@code PriorityBlockingQueue}, running each task on the calling
 * thread until that thread is interrupted.
 */
class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> q;

    /** @param q queue to take tasks from */
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
        this.q = q;
    }

    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                System.out.println("当前队列中的元素数量->"+q.size()+"");
                // take() can only pick the best-ranked task that is in the
                // queue at this moment.
                Runnable next = q.take();
                next.run();
            }
        } catch (InterruptedException e) {
            // Acceptable way to exit
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) throws Exception {
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        PriorityBlockingQueue<Runnable> queue =  new PriorityBlockingQueue<Runnable>();
        exec.execute(new PrioritizedTaskProducer(queue, exec));
       1
        exec.execute(new PrioritizedTaskConsumer(queue));
    }
}

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor

对于线程池中的计划任务,同一线程池,谁空闲谁去跑任务,任务与线程池中线程非绑定关系 示例参考:GreenhouseScheduler

示例一

jdk自带的Timer类使用方式 1, timer.scheduleAtFixedRate强调的是执行的频率 2,当某个任务发生异常的时候,在不处理异常的情况下,其他任务亦无法执行


package com.thinking.in.java.course.chapter21.timer;

import java.util.Random;
import java.util.TimerTask;

/**
 * Timer task that counts its own runs. When the run number equals the next
 * pseudo-random draw it raises a simulated failure and handles it itself;
 * otherwise it prints a progress line.
 */
public class TTask extends TimerTask {

    private final Random random = new Random(47);

    // Number of times run() has been called so far.
    private int i;

    private String flag;

    /** @param flag label used in this task's output */
    public TTask(String flag){
        this.flag = flag;
    }

    @Override
    public void run() {
        i++;
        boolean unlucky = (i == random.nextInt(5));
        if (!unlucky) {
            System.out.println(flag+"=======执行任务======"+i);
            return;
        }
        // The failure is raised and caught inside the task, so nothing
        // propagates out of run().
        try {
            throw new RuntimeException("随机异常" + flag);
        } catch (RuntimeException e) {
            System.out.println("(。・∀・)ノ゙嗨,失误了");
        }
    }
}


package com.thinking.in.java.course.chapter21.timer;

import java.util.Date;
import java.util.Timer;
import java.util.concurrent.TimeUnit;

/**
 * Schedules five {@link TTask}s at a fixed one-second rate on one
 * {@link Timer} and cancels the timer after five seconds.
 *
 * <p>Original note: when a TTask throws, none of the other tasks can run.
 */
public class TimerTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        final Timer timer = new Timer();

        // Simulate submitting five tasks, each starting immediately.
        int submitted = 0;
        while (submitted < 5) {
            Date firstRun = new Date(System.currentTimeMillis());
            timer.scheduleAtFixedRate(new TTask("t" + submitted), firstRun, 1000L);
            submitted++;
        }

        TimeUnit.SECONDS.sleep(5);
        timer.cancel();
    }
}

示例二:


import java.util.Random;

/**
 * Runnable that counts its own runs. When the run number equals the next
 * pseudo-random draw it throws an unchecked exception and does not catch it;
 * otherwise it prints a progress line.
 */
public class Task implements Runnable{
    private final Random random = new Random(47);

    // Number of times run() has been called so far.
    private int i;

    private String flag;

    /** @param flag label used in this task's output and exception message */
    public Task(String flag){
        this.flag = flag;
    }

    @Override
    public void run() {
        i++;
        boolean unlucky = (i == random.nextInt(5));
        if (unlucky) {
            // Deliberately left uncaught: this affects how many more times the
            // task is executed at its scheduled rate.
            throw new RuntimeException("随机异常" + flag);
        }
        System.out.println(flag+"=======执行任务======"+i);
    }
}

package com.thinking.in.java.course.chapter21.schedule;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Schedules five {@link Task}s at a fixed one-second rate on a single-thread
 * scheduled executor and stops the executor after five seconds.
 */
public class ScheduledDemo {
    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);

        int submitted = 0;
        while (submitted < 5) {
            Task task = new Task("t" + submitted);
            service.scheduleAtFixedRate(task, 0, 1000L, TimeUnit.MILLISECONDS);
            submitted++;
        }

        TimeUnit.SECONDS.sleep(5);
        service.shutdownNow();
    }
}
package com.thinking.in.java.course.chapter21;


import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Runs a fixed-rate task and a fixed-delay task side by side on a two-thread
 * scheduled executor for ten seconds. Each execution takes two seconds, which
 * is longer than the one-second period/delay.
 */
public class ScheduledThreadPoolExecutorTestCase {

    /** Simulates two seconds of work, keeping the interrupt flag if interrupted. */
    private static void simulateWork() {
        try {
            TimeUnit.MILLISECONDS.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(2);

        // Initial delay 0 starts immediately. The original passed -2000, which
        // the executor also treats as a request for immediate execution.
        schedule.scheduleAtFixedRate(() -> {
            simulateWork();
            System.out.println("按照固定的频率执行(scheduleAtFixedRate)");
        }, 0, 1000, TimeUnit.MILLISECONDS);

        // Fixed delay: the one-second gap is measured from the end of one
        // execution to the start of the next.
        schedule.scheduleWithFixedDelay(() -> {
            simulateWork();
            System.out.println("按照固定的延迟执行(scheduleWithFixedDelay)");
        }, 0, 1000, TimeUnit.MILLISECONDS);

        TimeUnit.MILLISECONDS.sleep(10000);
        schedule.shutdown();
    }
}

Semaphore

线程池、数据库连接池的原理

示例

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

/**
 * Fixed-size object pool. A fair {@link Semaphore} limits how many items may
 * be checked out at once; a boolean array, index-aligned with the item list,
 * records which items are currently out.
 *
 * @param <T> type of the pooled objects; must have a no-argument constructor
 */
public class Pool<T> {
    private final int size;
    // Index i of this list corresponds to index i of checkedOut.
    private final List<T> items = new ArrayList<T>();
    // Only read and written inside synchronized methods, so no volatile is
    // needed (volatile on the reference would not cover the elements anyway).
    private final boolean[] checkedOut;
    private final Semaphore available;

    /**
     * Creates the pool and eagerly instantiates {@code size} objects.
     *
     * @param classObject class of the pooled objects
     * @param size        number of objects to create
     * @throws RuntimeException if an object cannot be instantiated reflectively
     */
    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size]; // all false initially
        available = new Semaphore(size, true);
        // Load pool with objects that can be checked out:
        for (int i = 0; i < size; ++i) {
            try {
                // Class.newInstance() is deprecated; go through the constructor.
                items.add(classObject.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Takes an item out of the pool, blocking while none is available.
     *
     * @return the checked-out item
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    /**
     * Returns an item to the pool. Items that do not belong to the pool, or
     * that are not currently checked out, are ignored.
     *
     * @param x item to return
     */
    public void checkIn(T x) {
        if (releaseItem(x))
            available.release();
    }

    /** Marks the first free item as checked out and returns it. */
    private synchronized T getItem() {
        for (int i = 0; i < size; ++i)
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        return null; // Semaphore prevents reaching here
    }

    /** Clears the checked-out mark of {@code item}; returns whether it was set. */
    private synchronized boolean releaseItem(T item) {
        int index = items.indexOf(item);
        if (index == -1) return false; // Not in the list
        if (checkedOut[index]) {
            checkedOut[index] = false;
            return true;
        }
        return false; // Wasn't checked out
    }
}

//--------------------------
package com.thinking.in.java.course.chapter21;

/**
 * Stand-in for an object that is expensive to construct; each instance gets a
 * sequential id.
 */
public class Fat {
  private volatile double d; // Prevent optimization
  private static int counter = 0;
  private final int id = counter++;

  /** Burns some CPU time to simulate an expensive construction. */
  public Fat() {
    int i = 1;
    while (i < 10000) {
      d += (Math.PI + Math.E) / (double) i;
      i++;
    }
  }

  /** Prints this object. */
  public void operation() {
    System.out.println(this);
  }

  public String toString() {
    return "Fat id: " + id;
  }
}

//--------------------------
package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

// A task to check a resource out of a pool:
/**
 * Checks an item out of a {@code Pool}, holds it for three seconds and checks
 * it back in.
 *
 * @param <T> type of the pooled objects
 */
class CheckoutTask<T> implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private Pool<T> pool;

    /** @param pool pool to borrow from */
    public CheckoutTask(Pool<T> pool) {
        this.pool = pool;
    }

    public void run() {
        T item = null;
        try {
            item = pool.checkOut();
            System.out.println(this + "checked out " + item);
            TimeUnit.SECONDS.sleep(3); // hold the item for three seconds
        } catch (InterruptedException e) {
            // Acceptable way to terminate; keep the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            // Return the item even when interrupted while holding it,
            // otherwise it would stay checked out forever.
            if (item != null) {
                System.out.println(this + "checking in " + item);
                pool.checkIn(item);
            }
        }
    }

    public String toString() {
        return "CheckoutTask " + id + " ";
    }
}

/**
 * Checks every item out of a pool from the main thread, shows that one more
 * check-out from another thread blocks until it is cancelled, then checks
 * everything back in twice.
 */
public class SemaphoreDemo {
    final static int SIZE = 25;

    public static void main(String[] args) throws Exception {
        final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE); // creates 25 Fat objects
        ExecutorService exec = Executors.newCachedThreadPool();

        // The main thread takes every item out of the pool.
        List<Fat> list = new ArrayList<Fat>();
        int taken = 0;
        while (taken < SIZE) {
            System.out.println(taken + ": main() thread checked out ");
            Fat borrowed = pool.checkOut();
            borrowed.operation();
            list.add(borrowed);
            taken++;
        }

        // Everything is checked out, so this extra checkOut() cannot get a
        // permit and blocks.
        Runnable extraCheckout = () -> {
            try {
                pool.checkOut();
                System.out.println("=========");
            } catch (InterruptedException e) {
                System.out.println("checkOut() Interrupted");
            }
        };
        Future<?> blocked = exec.submit(extraCheckout);

        TimeUnit.SECONDS.sleep(5);
        blocked.cancel(true); // Break out of blocked call
        System.out.println("Checking in objects in " + list);
        list.forEach(pool::checkIn);
        list.forEach(pool::checkIn); // Second checkIn ignored
        exec.shutdown();
    }
}

Exchanger

交换对象的栅栏 应用场景:对于创建代价高昂的对象,边创建边使用

示例

package com.thinking.in.java.course.chapter21;

import com.thinking.in.java.course.util.BasicGenerator;
import com.thinking.in.java.course.util.Generator;

import java.util.concurrent.*;
import java.util.*;

/**
 * Fills its list with {@code ExchangerDemo.size} generated objects, then
 * swaps the list with the partner thread through an {@code Exchanger}, and
 * repeats until interrupted.
 *
 * @param <T> type of the produced objects
 */
class ExchangerProducer<T> implements Runnable {
    private Generator<T> generator;
    private Exchanger<List<T>> exchanger;
    private List<T> holder;

    /**
     * @param exchg  exchanger shared with the consumer
     * @param gen    source of new objects
     * @param holder list to fill first
     */
    ExchangerProducer(Exchanger<List<T>> exchg, Generator<T> gen, List<T> holder) {
        exchanger = exchg;
        generator = gen;
        this.holder = holder;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                int produced = 0;
                while (produced < ExchangerDemo.size) {
                    final T fresh = generator.next();
                    holder.add(fresh);
                    System.out.println("生产了编号 " + produced + " 的对象 " + fresh);
                    produced++;
                }
                System.out.println("before exchange->"+holder.size());
                // Exchange full for empty; blocks until the partner arrives.
                holder = exchanger.exchange(holder);
                System.out.println("after  exchange->"+holder.size());
            }
        } catch (InterruptedException e) {
            // OK to terminate this way.
        }
    }
}

/**
 * Swaps its list with the partner thread through an {@code Exchanger}, then
 * consumes the received elements one by one, pausing two seconds after each,
 * and repeats until interrupted.
 *
 * @param <T> type of the consumed objects
 */
class ExchangerConsumer<T> implements Runnable {
    private Exchanger<List<T>> exchanger;
    private List<T> holder;
    private volatile T value;

    /**
     * @param ex     exchanger shared with the producer
     * @param holder list offered in the first exchange
     */
    ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) {
        exchanger = ex;
        this.holder = holder;
    }

    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                holder = exchanger.exchange(holder);
                // The whole received list is consumed before the next exchange.
                for (T element : holder) {
                    value = element; // Fetch out value
                    holder.remove(element); // OK for CopyOnWriteArrayList
                    System.out.println("Consumer Final value: " + value);
                    TimeUnit.MILLISECONDS.sleep(2000);
                }
            }
        } catch (InterruptedException e) {
            // OK to terminate this way.
        }
    }
}

/**
 * Runs one producer and one consumer that swap {@code Fat} lists through an
 * {@code Exchanger}, and stops them after {@code delay} seconds.
 */
public class ExchangerDemo {
    static int size = 10;
    static int delay = 50; // Seconds

    public static void main(String[] args) throws Exception {
        final ExecutorService exec = Executors.newCachedThreadPool();
        final Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();

        // Both sides start with their own empty list.
        List<Fat> producerList = new CopyOnWriteArrayList<Fat>();
        List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();

        ExchangerProducer<Fat> producer =
                new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList);
        exec.execute(producer);

        ExchangerConsumer<Fat> consumer = new ExchangerConsumer<Fat>(xc, consumerList);
        exec.execute(consumer);

        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
    }
}