CountDownLatch
示例一
1,注意:在多线程环境下,多个线程并发向同一个ArrayList执行add时,由于ArrayList不是线程安全的,可能出现元素为null或元素丢失的情况(并不是JVM需要时间去初始化对象);示例中通过随机休眠错开各线程的add来降低冲突概率。 1-1,或者直接使用CopyOnWriteArrayList,不用休眠,其底层add操作加了锁。 2,如何在多个任务间传递数据。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Mutable value holder passed between the two task phases of the latch demo.
 * Holds a display name and a numeric id; neither field is validated.
 */
class User {
    private String name;
    private long id;

    /** Returns the current name, or {@code null} if none was set. */
    public String getName() {
        return this.name;
    }

    /** Replaces the name. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the current id ({@code 0} until one is set). */
    public long getId() {
        return this.id;
    }

    /** Replaces the id. */
    public void setId(long id) {
        this.id = id;
    }

    /** Renders the user as {@code User{name='...', id=...}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("name='").append(name).append('\'');
        text.append(", id=").append(id);
        text.append('}');
        return text.toString();
    }
}
/**
 * First-phase task: builds one {@link User}, appends it to the shared list and
 * then counts the latch down so the second phase can start.
 *
 * <p>The shared list may be a plain {@code ArrayList}, which is not safe for
 * concurrent {@code add} calls (concurrent adds can lose elements or leave
 * {@code null} slots). The append is therefore guarded by the list's monitor.
 */
class TaskPart1 implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random rand = new Random(47);
    private final CountDownLatch latch;
    private List<User> users;

    /**
     * @param latch latch shared with the waiting second-phase task
     * @param users shared list that receives the created user
     */
    TaskPart1(CountDownLatch latch, List<User> users) {
        this.latch = latch;
        this.users = users;
    }

    /** Runs {@link #doWork()} and always counts the latch down exactly once. */
    @Override
    public void run() {
        try {
            doWork();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Count down even on failure so the waiting task is never stranded.
            latch.countDown();
        }
    }

    /**
     * Creates a user with a random name, simulates work with a random sleep
     * of up to two seconds, then publishes the user to the shared list.
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    public void doWork() throws InterruptedException {
        User user = new User();
        user.setName("aa->" + rand.nextInt(100));
        TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        // Serialize the append: the sleep alone only makes collisions less likely.
        synchronized (users) {
            users.add(user);
        }
    }
}
/**
 * Second-phase task: waits until the latch reaches zero, then assigns a random
 * id to every non-null user in the shared list.
 */
class TaskPart2 implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final CountDownLatch latch;
    private static Random rand = new Random(47);
    private List<User> users;

    /**
     * @param latch latch counted down by the first-phase tasks
     * @param users shared list filled by the first-phase tasks; may be {@code null}
     */
    TaskPart2(CountDownLatch latch, List<User> users) {
        this.latch = latch;
        this.users = users;
    }

    /** Blocks on the latch, then fills in the ids. */
    @Override
    public void run() {
        try {
            latch.await();
            // Stand-in for a remote call that looks up each user's id number.
            // The null check must come first; the original dereferenced the
            // list before testing it for null.
            if (users != null && !users.isEmpty()) {
                for (User user : users) {
                    if (user != null) {
                        user.setId(rand.nextLong());
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt status for the executor that owns the thread.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Driver for the two-phase latch demo: {@code SIZE} producer tasks fill a
 * shared list, one consumer task waits on the latch and post-processes it,
 * and the main thread prints the result once all tasks have finished.
 */
public class CountDownLatchDemo2 {
    static final int SIZE = 10;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        List<User> users = new ArrayList<>();
        // All tasks must share a single CountDownLatch object. Its count is
        // tied to SIZE so that changing SIZE cannot leave the waiter blocked.
        CountDownLatch latch = new CountDownLatch(SIZE);
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new TaskPart1(latch, users));
        }
        exec.execute(new TaskPart2(latch, users));

        // Wait for the tasks themselves instead of sleeping a fixed 5 seconds.
        exec.shutdown();
        if (!exec.awaitTermination(5, TimeUnit.SECONDS)) {
            // Something is stuck: cancel it and give it a moment to unwind.
            exec.shutdownNow();
            exec.awaitTermination(1, TimeUnit.SECONDS);
        }
        for (User user : users) {
            System.out.println(user);
        }
    }
}
示例二
countDown()只和初始化的public CountDownLatch(int count){}有关系,同一线程可以执行count次的countDown()
package com.thinking.in.java.course.chapter21.waxomatic2;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates that a single thread may call {@code countDown()} as many
 * times as it likes: after a two-second pause it counts the latch down five
 * times in a row.
 */
class Task1 implements Runnable {
    private CountDownLatch countDown;

    Task1(CountDownLatch count) {
        countDown = count;
    }

    /**
     * Sleeps two seconds, then decrements the latch five times and reports
     * completion. If the sleep is interrupted nothing is counted down.
     */
    public void doWork1() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        for (int released = 0; released < 5; released++) {
            countDown.countDown();
        }
        System.out.println("finish do work 1");
    }

    @Override
    public void run() {
        doWork1();
    }
}
/**
 * Waiting side of the latch demo: blocks until the latch reaches zero, pauses
 * one more second and then reports completion.
 */
class Task2 implements Runnable {
    private CountDownLatch countDown;

    Task2(CountDownLatch count) {
        countDown = count;
    }

    /**
     * Awaits the latch, sleeps one second and prints a message. An
     * interruption during either wait is printed and ends the method early.
     */
    public void doWork2() {
        try {
            countDown.await();
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        System.out.println("finish work ");
    }

    @Override
    public void run() {
        doWork2();
    }
}
/**
 * Slow contributor for the latch demo: after five seconds it reports
 * completion and counts the latch down once.
 */
class Task3 implements Runnable {
    private CountDownLatch countDown;

    Task3(CountDownLatch count) {
        countDown = count;
    }

    /**
     * Sleeps five seconds, prints a message, then decrements the latch once.
     * If the sleep is interrupted the latch is left untouched.
     */
    public void doWork3() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        System.out.println("finish do work 3");
        countDown.countDown();
    }

    @Override
    public void run() {
        doWork3();
    }
}
/**
 * Starts one waiting thread ({@code Task2}) and one thread that counts the
 * shared latch down five times on its own ({@code Task1}).
 */
public class CountDownLatchDemo3 {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch count = new CountDownLatch(5);
        Thread waiter = new Thread(new Task2(count));
        waiter.start();
        Thread releaser = new Thread(new Task1(count));
        releaser.start();
        // A Task3 thread could be started here as well; it is left disabled.
    }
}
CyclicBarrier
//nHorses 开栅之前等待的线程数
//barrierAction 开栅之后要去执行的任务(动作);barrierAction只需要一个线程去执行:由等待中的parties个线程中最后一个到达栅栏的线程去执行,同一时间点只有一个线程在执行
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
示例
更精彩的例子参考21.7.2赛马示例
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Worker for the barrier demo: repeatedly prints its identity and then waits
 * at the barrier until the thread is interrupted.
 *
 * <p>The barrier is held per instance. It used to be a {@code static} field
 * assigned from the constructor, so every task silently used whichever
 * barrier was passed to the most recently constructed task.
 */
class RunTask implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final CyclicBarrier barrier;

    /**
     * @param barrier barrier this task waits on after every round
     */
    public RunTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    /**
     * Loops until interrupted. A broken barrier is unexpected and is
     * rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                System.out.println(this + "->" + Thread.currentThread().getName());
                barrier.await();
            }
        } catch (InterruptedException e) {
            // Normal shutdown path: report and fall out of the loop.
            System.out.println(e.getMessage());
        } catch (BrokenBarrierException e) {
            // This one we want to know about
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "RunTask " + id + " ";
    }
}
public class CyclicBarrierDemo {
private CyclicBarrier barrier;
private ExecutorService exec = Executors.newCachedThreadPool();
CyclicBarrierDemo(int count){
//CyclicBarrier中的任务 由等待中的count个线程中任意一个去执行
barrier = new CyclicBarrier(count, new Runnable() {
AtomicInteger execCount = new AtomicInteger(1);
@Override
public void run() {
System.out.println(count+"任务执行完成后再去执行 "+execCount.get()+" 次数"+"->"+Thread.currentThread().getName());
execCount.incrementAndGet();
if(execCount.get() > 5){
exec.shutdownNow();
}
}
});
for (int i = 0; i < count; i++) {
RunTask task = new RunTask(barrier);
exec.execute(task);
}
}
public static void main(String[] args) throws InterruptedException {
new CyclicBarrierDemo(3);
}
}
DelayQueue
DelayQueue
存放的时候是无序的,take是按照自定义的顺序获取
示例
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
/**
 * An element for a {@link DelayQueue} that becomes available a fixed number of
 * milliseconds after construction. It does not need to implement
 * {@code Runnable}; consumers simply call {@link #run()} directly.
 */
class DelayedTask implements Delayed {
    private static int counter = 0;
    private final int id = counter++;
    /** Requested delay in milliseconds. */
    private final int delta;
    /** Absolute {@link System#nanoTime()} value at which the task becomes due. */
    private final long trigger;
    /** Counted down once per completed {@link #run()}; the sentinel waits on it. */
    static CountDownLatch latch = new CountDownLatch(20);

    /**
     * @param delayInMilliseconds delay, measured from now, before the task is due
     */
    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
    }

    /** Returns the remaining delay in the requested unit (negative once due). */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Orders tasks by due time, earliest first. Other {@link Delayed}
     * implementations are compared by remaining delay instead of failing
     * with a {@link ClassCastException}.
     */
    @Override
    public int compareTo(Delayed arg) {
        if (arg == this) {
            return 0;
        }
        if (arg instanceof DelayedTask) {
            // nanoTime values must be compared by difference, not directly,
            // so the ordering stays correct if the counter wraps.
            long diff = trigger - ((DelayedTask) arg).trigger;
            return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), arg.getDelay(TimeUnit.NANOSECONDS));
    }

    /**
     * Simulates work, then counts the shared latch down once. The latch is
     * counted down even if the sleep is interrupted.
     */
    public void run() {
        try {
            // The later the task triggers, the shorter this pause. The original
            // subtracted the absolute nanoTime-based trigger, which gives a
            // meaningless (normally negative, possibly enormous) duration.
            TimeUnit.NANOSECONDS.sleep(Math.max(0, 5000 - delta));
            TimeUnit.MILLISECONDS.sleep(800);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Let the calling consumer loop observe the interruption.
            Thread.currentThread().interrupt();
        }
        latch.countDown();
    }

    @Override
    public String toString() {
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }

    /**
     * Final queue element: waits until every regular task has counted the
     * latch down, then shuts the executor down.
     */
    public static class EndSentinel extends DelayedTask {
        private ExecutorService exec;

        /**
         * @param delay delay in milliseconds before the sentinel becomes due
         * @param e     executor to shut down once all tasks have run
         */
        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;
        }

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}
/**
 * Takes tasks from a {@link DelayQueue} as they become due and runs each one
 * on the consuming thread, until that thread is interrupted.
 */
class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> q;

    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }

    public void run() {
        while (!Thread.interrupted()) {
            final DelayedTask due;
            try {
                due = q.take();
            } catch (InterruptedException e) {
                // Interrupted while waiting for the next task: stop consuming.
                return;
            }
            due.run(); // Run task with the current thread
            System.out.println(Thread.currentThread().getName()+"获取了->"+due+"任务");
        }
    }
}
/**
 * Fills a {@link DelayQueue} with 20 randomly delayed tasks plus a shutdown
 * sentinel, then starts 20 consumers that drain it.
 */
public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Used for both the task count and the consumer count.
        final int taskCount = 20;
        Random rand = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();

        int queued = 0;
        while (queued < taskCount) {
            queue.put(new DelayedTask(rand.nextInt(5000)));
            queued++;
        }
        // The sentinel is due after 5000 ms, later than any random delay above.
        queue.add(new DelayedTask.EndSentinel(5000, exec));

        int consumers = 0;
        while (consumers < taskCount) {
            exec.execute(new DelayedTaskConsumer(queue));
            consumers++;
        }
    }
}
PriorityBlockingQueue
PriorityBlockingQueue
实时将队列中优先级最高的元素取出来 典型的业务场景:银行办理业务
示例
理解点1, System.out.println("当前队列中的元素数量->"+q.size()+"");
q.take().run();//只能取出当前队列中优先级比较高的任务 理解点2, 休眠一段时间,让所有的元素准备就绪, TimeUnit.SECONDS.sleep(5);//验证PriorityBlockingQueue中所有的元素已经就绪
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
/**
 * A runnable with an integer priority. Natural ordering is descending by
 * priority, so a priority queue hands out the highest priority first. Every
 * instance registers itself in the static {@code sequence} list on creation.
 */
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    private Random rand = new Random(47);
    private static int counter = 0;
    private final int id = counter++;
    private final int priority;
    protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();

    /**
     * @param priority larger values are served earlier
     */
    public PrioritizedTask(int priority) {
        this.priority = priority;
        sequence.add(this);
    }

    /** Descending order: a higher priority compares as "smaller". */
    public int compareTo(PrioritizedTask arg) {
        if (priority == arg.priority) {
            return 0;
        }
        return priority < arg.priority ? 1 : -1;
    }

    /** Sleeps a short pseudo-random time, then prints this task. */
    public void run() {
        final int pauseMillis = rand.nextInt(250);
        try {
            TimeUnit.MILLISECONDS.sleep(pauseMillis);
        } catch (InterruptedException e) {
            // Acceptable way to exit
        }
        System.out.println(this);
    }

    public String toString() {
        String paddedPriority = String.format("[%1$-3d]", priority);
        return paddedPriority + " Task " + id;
    }

    /** Compact {@code (id:priority)} form. */
    public String summary() {
        return "(" + id + ":" + priority + ")";
    }

    /**
     * Lowest-priority marker task ({@code -1}) that shuts the executor down
     * when it is finally run.
     */
    public static class EndSentinel extends PrioritizedTask {
        private ExecutorService exec;

        public EndSentinel(ExecutorService e) {
            super(-1);
            exec = e;
        }

        public void run() {
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}
/**
 * Feeds the queue in three waves: 20 random-priority tasks at once, then 10
 * top-priority tasks trickling in, then one task for each priority 0-9
 * followed by the shutdown sentinel.
 */
class PrioritizedTaskProducer implements Runnable {
    private Random rand = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;

    public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
        queue = q;
        exec = e; // Used for EndSentinel
    }

    public void run() {
        // Ordinary customers who arrive as soon as the doors open.
        int ordinary = 0;
        while (ordinary < 20) {
            queue.add(new PrioritizedTask(rand.nextInt(10)));
            ordinary++;
        }
        try {
            enqueueLateArrivals();
        } catch (InterruptedException e) {
            // Interrupted while pacing the VIP arrivals: stop producing.
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }

    /** Adds the paced VIP wave, the final mixed wave and the sentinel. */
    private void enqueueLateArrivals() throws InterruptedException {
        // A while later ten VIP customers arrive, one every 250 ms.
        for (int vip = 0; vip < 10; vip++) {
            TimeUnit.MILLISECONDS.sleep(250);
            queue.add(new PrioritizedTask(10));
        }
        // After that, one more customer for each priority 0..9.
        for (int level = 0; level < 10; level++) {
            queue.add(new PrioritizedTask(level));
        }
        queue.add(new PrioritizedTask.EndSentinel(exec));
    }
}
/**
 * Repeatedly reports the queue size, takes the head of the priority queue
 * and runs it on the consuming thread, until interrupted.
 */
class PrioritizedTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> q;

    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
        this.q = q;
    }

    public void run() {
        while (!Thread.interrupted()) {
            System.out.println("当前队列中的元素数量->" + q.size());
            final Runnable head;
            try {
                // Only the best element among those queued right now is taken.
                head = q.take();
            } catch (InterruptedException e) {
                // Acceptable way to exit
                break;
            }
            head.run();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
1
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
对于线程池中的计划任务,同一线程池,谁空闲谁去跑任务,任务与线程池中线程非绑定关系 示例参考:GreenhouseScheduler
示例一
jdk自带的Timer类使用方式 1, timer.scheduleAtFixedRate强调的是执行的频率 2,当某个任务发生异常的时候,在不处理异常的情况下,其他任务亦无法执行
package com.thinking.in.java.course.chapter21.timer;
import java.util.Random;
import java.util.TimerTask;
/**
 * Timer task that counts its own executions and, when the counter happens to
 * match a pseudo-random draw, raises and immediately handles an exception
 * instead of doing its normal work.
 */
public class TTask extends TimerTask {
    private final Random random = new Random(47);
    private int i;
    private String flag;

    /**
     * @param flag label printed with every execution
     */
    public TTask(String flag) {
        this.flag = flag;
    }

    @Override
    public void run() {
        final int attempt = ++i;
        final boolean fails = attempt == random.nextInt(5);
        if (!fails) {
            System.out.println(flag+"=======执行任务======"+attempt);
            return;
        }
        // The failure is caught here so that it never escapes to the timer.
        try {
            throw new RuntimeException("随机异常" + flag);
        } catch (RuntimeException e) {
            System.out.println("(。・∀・)ノ゙嗨,失误了");
        }
    }
}
package com.thinking.in.java.course.chapter21.timer;
import java.util.Date;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
/**
 * Schedules five {@code TTask}s on one {@link Timer} at a one-second fixed
 * rate and cancels the timer after five seconds.
 *
 * <p>Note from the original author: if a task lets an exception escape, none
 * of the other tasks on the same timer can run any more.
 */
public class TimerTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        final Timer timer = new Timer();
        // Simulate submitting five tasks.
        int index = 0;
        while (index < 5) {
            Date firstTime = new Date(System.currentTimeMillis());
            timer.scheduleAtFixedRate(new TTask("t" + index), firstTime, 1000L);
            index++;
        }
        TimeUnit.SECONDS.sleep(5);
        timer.cancel();
    }
}
示例二:
import java.util.Random;
/**
 * Runnable that counts its own executions and throws an unchecked exception
 * when the counter matches a pseudo-random draw. The exception is left
 * uncaught on purpose, to show how it affects the task's later scheduled runs.
 */
public class Task implements Runnable {
    private final Random random = new Random(47);
    private int i;
    private String flag;

    /**
     * @param flag label printed with every execution and included in the
     *             exception message
     */
    public Task(String flag) {
        this.flag = flag;
    }

    @Override
    public void run() {
        i++;
        final int draw = random.nextInt(5);
        if (i != draw) {
            System.out.println(flag+"=======执行任务======"+i);
            return;
        }
        // Deliberately not caught here (unlike the Timer variant): an escaping
        // exception affects how often this task runs afterwards.
        throw new RuntimeException("随机异常" + flag);
    }
}
package com.thinking.in.java.course.chapter21.schedule;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Schedules five {@code Task}s at a one-second fixed rate on a
 * single-threaded scheduled executor and stops everything after five seconds.
 */
public class ScheduledDemo {
    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
        int index = 0;
        while (index < 5) {
            Task job = new Task("t" + index);
            service.scheduleAtFixedRate(job, 0, 1000L, TimeUnit.MILLISECONDS);
            index++;
        }
        TimeUnit.SECONDS.sleep(5);
        service.shutdownNow();
    }
}
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Runs the same two-second job under {@code scheduleAtFixedRate} and under
 * {@code scheduleWithFixedDelay} on a two-thread scheduler for ten seconds,
 * so the two scheduling modes can be compared side by side.
 */
public class ScheduledThreadPoolExecutorTestCase {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(2);
        // A one-shot schedule(...) with a one-second delay could be tried here too.
        Runnable fixedRateJob = slowJob("两秒后开始执行,并且每个一秒执行一次");
        Runnable fixedDelayJob = slowJob("按照固定的频率执行");
        // Both jobs are scheduled with a negative initial delay of -2000 ms.
        schedule.scheduleAtFixedRate(fixedRateJob, -2000, 1000, TimeUnit.MILLISECONDS);
        schedule.scheduleWithFixedDelay(fixedDelayJob, -2000, 1000, TimeUnit.MILLISECONDS);
        TimeUnit.MILLISECONDS.sleep(10000);
        schedule.shutdown();
    }

    /** Builds a job that sleeps two seconds and then prints {@code message}. */
    private static Runnable slowJob(final String message) {
        return () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(message);
        };
    }
}
Semaphore
线程池、数据库连接池的原理
示例
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
/**
 * Fixed-size object pool guarded by a fair {@link Semaphore}: at most
 * {@code size} objects can be checked out at once, and further
 * {@link #checkOut()} calls block until something is checked back in.
 *
 * @param <T> pooled type; must have an accessible no-argument constructor
 */
public class Pool<T> {
    private final int size;
    // Index i of this list corresponds to index i of checkedOut.
    private final List<T> items = new ArrayList<T>();
    // Checked-out flags; only touched inside synchronized methods.
    private final boolean[] checkedOut;
    private final Semaphore available;

    /**
     * Creates the pool and eagerly instantiates all pooled objects.
     *
     * @param classObject class to instantiate via its no-argument constructor
     * @param size        number of objects in the pool
     * @throws RuntimeException if an instance cannot be created
     */
    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size]; // all false by default
        available = new Semaphore(size, true);
        // Load pool with objects that can be checked out:
        for (int i = 0; i < size; ++i) {
            try {
                // Class.newInstance() is deprecated; go through the constructor.
                items.add(classObject.getDeclaredConstructor().newInstance());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Takes an object out of the pool, blocking while none is available.
     *
     * @return a pooled object not currently held by anyone else
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    /**
     * Returns an object to the pool. Objects that do not belong to the pool,
     * or that are not currently checked out, are ignored.
     *
     * @param x object previously obtained from {@link #checkOut()}
     */
    public void checkIn(T x) {
        if (releaseItem(x)) {
            available.release();
        }
    }

    /** Marks the first free slot as taken and returns its object. */
    private synchronized T getItem() {
        for (int i = 0; i < size; ++i) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null; // Semaphore prevents reaching here
    }

    /**
     * Frees the slot holding exactly this object. The lookup is by identity:
     * an equals-based search would find the first equal object, and so free
     * the wrong slot whenever pooled instances are equal to each other.
     *
     * @return {@code true} if a checked-out slot was released
     */
    private synchronized boolean releaseItem(T item) {
        for (int i = 0; i < size; ++i) {
            if (items.get(i) == item) {
                if (!checkedOut[i]) {
                    return false; // Wasn't checked out
                }
                checkedOut[i] = false;
                return true;
            }
        }
        return false; // Not in the list
    }
}
//--------------------------
package com.thinking.in.java.course.chapter21;
/**
 * An object that is deliberately costly to construct, used as the pooled or
 * exchanged resource in the demos. Each instance gets a sequential id.
 */
public class Fat {
    private volatile double d; // Prevent optimization
    private static int counter = 0;
    private final int id = counter++;

    /** Burns some CPU time summing a series so that construction is expensive. */
    public Fat() {
        // Expensive, interruptible operation:
        int i = 1;
        while (i < 10000) {
            d += (Math.PI + Math.E) / (double) i;
            i++;
        }
    }

    /** Prints this object to standard output. */
    public void operation() {
        System.out.println(this);
    }

    public String toString() {
        return "Fat id: " + id;
    }
}
//--------------------------
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
// A task to check a resource out of a pool:
/**
 * Borrows one object from a {@code Pool}, holds it for three seconds and
 * returns it, logging both steps.
 *
 * @param <T> type of the pooled objects
 */
class CheckoutTask<T> implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private Pool<T> pool;

    public CheckoutTask(Pool<T> pool) {
        this.pool = pool;
    }

    public void run() {
        final T borrowed;
        try {
            borrowed = pool.checkOut();
            System.out.println(this + "checked out " + borrowed);
            TimeUnit.SECONDS.sleep(3); // hold the object for three seconds
        } catch (InterruptedException e) {
            // Acceptable way to terminate
            return;
        }
        System.out.println(this + "checking in " + borrowed);
        pool.checkIn(borrowed);
    }

    public String toString() {
        return "CheckoutTask " + id + " ";
    }
}
/**
 * Drains a pool of {@code Fat} objects from the main thread, shows that one
 * more check-out blocks until it is cancelled, then checks everything back in
 * twice to show that the second check-in is ignored.
 */
public class SemaphoreDemo {
    final static int SIZE = 25;

    public static void main(String[] args) throws Exception {
        final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE); // creates 25 Fat objects
        ExecutorService exec = Executors.newCachedThreadPool();
        // (A variant that starts SIZE CheckoutTask<Fat> workers is left disabled.)

        List<Fat> held = new ArrayList<Fat>();
        int taken = 0;
        while (taken < SIZE) {
            System.out.println(taken + ": main() thread checked out ");
            Fat f = pool.checkOut();
            f.operation();
            held.add(f);
            taken++;
        }

        // Everything is checked out, so the semaphore has no permits left
        // and this extra check-out blocks.
        Runnable extraCheckout = () -> {
            try {
                pool.checkOut();
                System.out.println("=========");
            } catch (InterruptedException e) {
                System.out.println("checkOut() Interrupted");
            }
        };
        Future<?> blocked = exec.submit(extraCheckout);

        TimeUnit.SECONDS.sleep(5);
        blocked.cancel(true); // Break out of blocked call
        System.out.println("Checking in objects in " + held);
        // Two full passes: the second check-in of each object is ignored.
        for (int pass = 0; pass < 2; pass++) {
            for (Fat f : held) {
                pool.checkIn(f);
            }
        }
        exec.shutdown();
    }
}
Exchanger
交换对象的栅栏 应用场景:对于创建代价高昂的对象,边创建边使用
示例
package com.thinking.in.java.course.chapter21;
import com.thinking.in.java.course.util.BasicGenerator;
import com.thinking.in.java.course.util.Generator;
import java.util.concurrent.*;
import java.util.*;
/**
 * Producer half of the exchanger demo: generates {@code ExchangerDemo.size}
 * objects into its list, then swaps that list with the consumer's list, and
 * repeats until interrupted.
 *
 * @param <T> type of the generated objects
 */
class ExchangerProducer<T> implements Runnable {
    private Generator<T> generator;
    private Exchanger<List<T>> exchanger;
    private List<T> holder;

    ExchangerProducer(Exchanger<List<T>> exchg, Generator<T> gen, List<T> holder) {
        exchanger = exchg;
        generator = gen;
        this.holder = holder;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                int produced = 0;
                while (produced < ExchangerDemo.size) {
                    final T made = generator.next();
                    holder.add(made);
                    System.out.println("生产了编号 " + produced + " 的对象 " + made);
                    produced++;
                }
                System.out.println("before exchange->"+holder.size());
                // Exchange full for empty; blocks until a consumer is ready to swap.
                List<T> received = exchanger.exchange(holder);
                holder = received;
                System.out.println("after exchange->"+holder.size());
            }
        } catch (InterruptedException e) {
            // OK to terminate this way.
        }
    }
}
/**
 * Consumer half of the exchanger demo: swaps its list for the producer's
 * list, then removes and reports each element, two seconds apart, before
 * exchanging again. Runs until interrupted.
 *
 * @param <T> type of the consumed objects
 */
class ExchangerConsumer<T> implements Runnable {
    private Exchanger<List<T>> exchanger;
    private List<T> holder;
    private volatile T value;

    ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) {
        exchanger = ex;
        this.holder = holder;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                List<T> received = exchanger.exchange(holder);
                holder = received;
                // Consume the whole batch before the next exchange.
                Iterator<T> batch = holder.iterator();
                while (batch.hasNext()) {
                    T x = batch.next();
                    value = x; // Fetch out value
                    holder.remove(x); // OK for CopyOnWriteArrayList
                    System.out.println("Consumer Final value: " + value);
                    TimeUnit.MILLISECONDS.sleep(2000);
                }
            }
        } catch (InterruptedException e) {
            // OK to terminate this way.
        }
    }
}
/**
 * Wires one producer and one consumer of {@code Fat} objects together through
 * an {@link Exchanger}, lets them run for {@code delay} seconds and then
 * interrupts both.
 */
public class ExchangerDemo {
    static int size = 10;
    static int delay = 50; // Seconds

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
        // Each side starts with its own empty copy-on-write list.
        List<Fat> producerList = new CopyOnWriteArrayList<Fat>();
        List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();

        ExchangerProducer<Fat> producer =
                new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList);
        ExchangerConsumer<Fat> consumer = new ExchangerConsumer<Fat>(xc, consumerList);
        exec.execute(producer);
        exec.execute(consumer);

        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
    }
}