JUC(java.util.concurrent) 并发工具包
可重入锁
意思就是,一个线程可以多次的获得锁,但是获得了多少次锁,就得解多少次锁,否则的话,其他的线程无法获得锁。synchronized 默认是可重入锁。
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
lock.lock();
lock.lock();
lock.lock();
// 锁了多少次,就得解开多少次
lock.unlock();
lock.unlock();
lock.unlock();
lock.unlock();
}
CountDownLatch
倒计时闭锁,用于协调一组线程的工作
简单粗暴的api
- countDown(); 倒计时减一
await(); 让其中一线程等待,倒计时锁减到0 ```java public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
// 每个工人开始干活
int second = new Random().nextInt(10);
try {
Thread.sleep(second * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + finalI + "活儿干完了");
// 活干完了,倒计时减一
latch.countDown();
}).start();
}
latch.await();
System.out.println("我是老板,所有的工人的活儿都干完了");
}
输出
```java
线程2活儿干完了
线程0活儿干完了
线程6活儿干完了
线程4活儿干完了
线程5活儿干完了
线程8活儿干完了
线程1活儿干完了
线程7活儿干完了
线程3活儿干完了
线程9活儿干完了
我是老板,所有的工人的活儿都干完了
CyclicBarrier
让多个线程,等在一个起跑线上。让所有线程等在一个点,没有任何一个线程可以单独通过这个屏障。只有所有线程都执行到这个点的时候,才能穿过
api更加粗暴
await();
public static void main(String[] args) { CyclicBarrier barrier = new CyclicBarrier(10); for (int i = 0; i < 10; i++) { int finalI = i; new Thread(() -> { // 每个工人开始干活 int second = new Random().nextInt(10); try { Thread.sleep(second * 10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + finalI + "活儿干完了"); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("等其他人到了,我们才能继续"); }).start(); } }
输出
线程4活儿干完了 线程0活儿干完了 线程1活儿干完了 线程9活儿干完了 线程5活儿干完了 线程3活儿干完了 线程8活儿干完了 线程6活儿干完了 线程2活儿干完了 线程7活儿干完了 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续 等其他人到了,我们才能继续
Semaphore
信号量
想想以下有一大片麦子,线程就是工人,5个工人,要割麦子必须要用到镰刀,那么这个镰刀就是信号量。加入有三把镰刀,那么同时就只能有三个线程进行工作。 如果只有一把镰刀,那么这个信号量就是锁
apiacquire(); 获取信号量
- release(); 释放信号量
BlockingQueue/BlockingDeque
BlockingQueue
就是一个会阻塞的队列,指定这个队列的容量,加入超过这个容量,那么当前线程就会卡住,只要别的线程取走一个元素,腾出来空间,那么才会完成这个put的操作
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
blockingQueue.put(1);
blockingQueue.put(1);
blockingQueue.put(1); // 这里会卡出,知道别的线程调用了take方法,腾给出空间
new Thread(() -> {
try {
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
BlockingDeque
这个一个双端队列,满足先进先出,但是头和尾都可以进出
线程池/future
future
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 一个线程池,里面有10个线程
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// 给线程池提交一个任务(runnable实现),但是future会立即返回,
Future<?> future = threadPool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我结束工作了");
// 如果在这里抛出异常,那么future.get的时候,就会把这个异常拿到
});
System.out.println("我把工作提交了");
// future.get() 会等待线程池里面的某个线程把任务执行完,才能拿到结果,在那之前,get方法会阻塞
System.out.println(future.get());
System.out.println("工作做完");
}
线程池
线程池的参数们
- corePoolSize 核心员工数量
- maximumPoolSize 最大招募的员工数量
- keepAliveTime/unit 员工闲下来多久后炒掉他们
- workQueue 订单队列
- threadFactory 造人的工程
- handler 订单实在太多的处理策略(拒绝策略)
- AbortPolicy A handler for rejected tasks that throws a{@link RejectedExecutionException}. default policy 放弃,并且丢个异常说明处理不了
- CallerRunsPolicy 让调用者执行
- DiscardOldestPolicy 丢弃最旧的任务丢掉
- DiscardPolicy 偷摸的把最新进来的任务丢掉
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
用Executors的方法来创建线程池
newFixedThreadPool
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newWorkStealingPool 线程执行完自己的任务,会偷其他线程的任务
/** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * * @param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
newSingleThreadExecutor 单线程线程池
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newCachedThreadPool 缓存线程池
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
等等等等。。 具体的看Executors的源码