JUC (java.util.concurrent) concurrency utilities

Reentrant lock

A thread may acquire the same lock multiple times, but it must release it exactly as many times as it acquired it; otherwise other threads can never acquire the lock. synchronized is reentrant by default.

```java
private static Lock lock = new ReentrantLock();

public static void main(String[] args) {
    lock.lock();
    lock.lock();
    lock.lock();
    lock.lock();
    // however many times you locked, you must unlock that many times
    lock.unlock();
    lock.unlock();
    lock.unlock();
    lock.unlock();
}
```
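Reentrancy matters when one locked method calls another that takes the same lock: the same thread re-enters without deadlocking, as long as every lock() is matched by an unlock(). A minimal sketch (the class name and the `outer`/`inner` methods are just illustrative):

```java
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantDemo {
    private static final ReentrantLock lock = new ReentrantLock();

    static void outer() {
        lock.lock();
        try {
            inner(); // the same thread re-acquires the lock it already holds; no deadlock
        } finally {
            lock.unlock(); // always release in finally, once per lock()
        }
    }

    static void inner() {
        lock.lock();
        try {
            System.out.println("hold count = " + lock.getHoldCount()); // prints 2
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        outer();
    }
}
```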

CountDownLatch

A countdown latch, used to coordinate the work of a group of threads.
The API is simple and blunt:

  • countDown(); decrements the count by one
  • await(); makes the calling thread wait until the count reaches 0

```java
public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
        int finalI = i;
        new Thread(() -> {
            // each worker starts working
            int second = new Random().nextInt(10);
            try {
                Thread.sleep(second * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread " + finalI + " finished its work");
            // the work is done, count down by one
            latch.countDown();
        }).start();
    }
    // the boss waits until the count reaches 0
    latch.await();
    System.out.println("I'm the boss; all the workers have finished their work");
}
```
Output

```
Thread 2 finished its work
Thread 0 finished its work
Thread 6 finished its work
Thread 4 finished its work
Thread 5 finished its work
Thread 8 finished its work
Thread 1 finished its work
Thread 7 finished its work
Thread 3 finished its work
Thread 9 finished its work
I'm the boss; all the workers have finished their work
```

CyclicBarrier

Lines up multiple threads at the same starting line: all threads wait at one point, and no single thread can pass the barrier alone; only once every thread has reached that point can they all pass through.
The API is even blunter:

  • await();

```java
public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(10);

    for (int i = 0; i < 10; i++) {
        int finalI = i;
        new Thread(() -> {
            // each worker starts working
            int second = new Random().nextInt(10);

            try {
                Thread.sleep(second * 10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Thread " + finalI + " finished its work");

            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("Everyone has arrived; now we can continue");

        }).start();
    }
}
```
    

Output

```
Thread 4 finished its work
Thread 0 finished its work
Thread 1 finished its work
Thread 9 finished its work
Thread 5 finished its work
Thread 3 finished its work
Thread 8 finished its work
Thread 6 finished its work
Thread 2 finished its work
Thread 7 finished its work
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
Everyone has arrived; now we can continue
```
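Unlike CountDownLatch, a CyclicBarrier can be reused once everyone has passed, and its two-argument constructor takes a barrier action that runs each time the last thread arrives. A small sketch (the thread count, round count, and class name are arbitrary illustration):

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
    public static void main(String[] args) {
        int workers = 3;
        // the barrier action runs once each time all 3 threads have arrived
        CyclicBarrier barrier = new CyclicBarrier(workers,
                () -> System.out.println("Everyone arrived, starting the next round"));

        for (int i = 0; i < workers; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    for (int round = 0; round < 2; round++) { // the barrier is reused across rounds
                        System.out.println("Thread " + finalI + " finished round " + round);
                        barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
```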
    

Semaphore

A semaphore.
Imagine a large wheat field: the threads are workers, five of them. To harvest wheat a worker needs a sickle, and the sickle is the semaphore (a permit). If there are three sickles, at most three threads can work at the same time; if there is only one sickle, the semaphore is effectively a lock.
The API (a small sketch follows the list):

  • acquire(); acquires a permit
  • release(); releases a permit
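Continuing the sickle analogy, a minimal sketch (the permit count, worker count, sleep times, and class name are arbitrary):

```java
import java.util.Random;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        // three sickles: at most three of the five workers can harvest at once
        Semaphore sickles = new Semaphore(3);

        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    sickles.acquire(); // take a sickle, blocking if none is free
                    try {
                        System.out.println("Worker " + finalI + " got a sickle and is harvesting");
                        Thread.sleep(new Random().nextInt(1000));
                    } finally {
                        sickles.release(); // hand the sickle back for the next worker
                        System.out.println("Worker " + finalI + " returned the sickle");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
```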

BlockingQueue/BlockingDeque

BlockingQueue

A queue that blocks. You give it a capacity; if a put would exceed that capacity, the calling thread blocks, and only once another thread takes an element and frees up space can the put complete.

```java
public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
    blockingQueue.put(1);
    blockingQueue.put(1);

    // another thread that takes one element after a moment, freeing up space
    new Thread(() -> {
        try {
            Thread.sleep(1000);
            blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    // the queue is full, so this put blocks until the thread above takes an element
    blockingQueue.put(1);
}
```

BlockingDeque

A double-ended blocking queue: it is still a queue (FIFO), but elements can be inserted and removed at both the head and the tail.
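A short sketch with LinkedBlockingDeque showing both ends in use (the capacity, values, and class name are arbitrary):

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class DequeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<Integer> deque = new LinkedBlockingDeque<>(3);
        deque.putLast(1);   // enqueue at the tail (normal FIFO behaviour)
        deque.putLast(2);
        deque.putFirst(0);  // but the head accepts insertions too
        System.out.println(deque.takeFirst()); // 0
        System.out.println(deque.takeLast());  // 2
        System.out.println(deque.takeFirst()); // 1
    }
}
```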

Thread pool / Future

Future

```java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // a thread pool with 10 threads
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    // submit a task (a Runnable) to the pool; the Future is returned immediately
    Future<?> future = threadPool.submit(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("I've finished my work");
        // if an exception were thrown here, future.get() would surface it (wrapped in an ExecutionException)
    });

    System.out.println("I've submitted the work");
    // future.get() waits until a pool thread finishes the task; until then, get() blocks
    // (a Runnable has no result, so it returns null)
    System.out.println(future.get());
    System.out.println("Work done");
    threadPool.shutdown(); // let the pool threads exit so the JVM can terminate
}
```
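submit() also accepts a Callable, in which case get() returns the task's result instead of null. A quick sketch (the computation, pool size, and class name are arbitrary):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        // a Callable returns a result (and may throw a checked exception)
        Future<Integer> future = threadPool.submit(() -> {
            Thread.sleep(500);
            return 1 + 1;
        });

        System.out.println(future.get()); // blocks until the task finishes, then prints 2
        threadPool.shutdown();
    }
}
```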

Thread pool

The thread pool's parameters

  • corePoolSize: the number of core workers
  • maximumPoolSize: the maximum number of workers that can be hired
  • keepAliveTime/unit: how long an extra worker may sit idle before being let go
  • workQueue: the queue of orders (pending tasks)
  • threadFactory: the factory that creates the worker threads
  • handler: the policy for when there are simply too many orders (the rejection policy)
    • AbortPolicy: the default; gives up and throws a RejectedExecutionException to say the task cannot be handled
    • CallerRunsPolicy: the calling thread runs the task itself
    • DiscardOldestPolicy: discards the oldest queued task (and then retries the new one)
    • DiscardPolicy: silently discards the newly submitted task

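Putting the parameters together, a hand-configured pool might look like the sketch below; all the sizes, the queue capacity, and the choice of CallerRunsPolicy are arbitrary illustration, not a recommended configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                   // corePoolSize: core workers kept around
                4,                                   // maximumPoolSize: hire at most 4 workers
                60, TimeUnit.SECONDS,                // keepAliveTime/unit: dismiss extra idle workers after 60s
                new ArrayBlockingQueue<>(10),        // workQueue: at most 10 queued "orders"
                Thread::new,                         // threadFactory: how new worker threads are created
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler: overflow tasks run on the caller

        for (int i = 0; i < 20; i++) {
            int finalI = i;
            pool.execute(() -> System.out.println(
                    Thread.currentThread().getName() + " handles order " + finalI));
        }
        pool.shutdown();
    }
}
```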

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Creating thread pools with the factory methods on Executors

  • newFixedThreadPool

      /**
       * Creates a thread pool that reuses a fixed number of threads
       * operating off a shared unbounded queue.  At any point, at most
       * {@code nThreads} threads will be active processing tasks.
       * If additional tasks are submitted when all threads are active,
       * they will wait in the queue until a thread is available.
       * If any thread terminates due to a failure during execution
       * prior to shutdown, a new one will take its place if needed to
       * execute subsequent tasks.  The threads in the pool will exist
       * until it is explicitly {@link ExecutorService#shutdown shutdown}.
       *
       * @param nThreads the number of threads in the pool
       * @return the newly created thread pool
       * @throws IllegalArgumentException if {@code nThreads <= 0}
       */
      public static ExecutorService newFixedThreadPool(int nThreads) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
      }
    
  • newWorkStealingPool: a thread that has finished its own tasks will steal tasks from other threads

      /**
       * Creates a thread pool that maintains enough threads to support
       * the given parallelism level, and may use multiple queues to
       * reduce contention. The parallelism level corresponds to the
       * maximum number of threads actively engaged in, or available to
       * engage in, task processing. The actual number of threads may
       * grow and shrink dynamically. A work-stealing pool makes no
       * guarantees about the order in which submitted tasks are
       * executed.
       *
       * @param parallelism the targeted parallelism level
       * @return the newly created thread pool
       * @throws IllegalArgumentException if {@code parallelism <= 0}
       * @since 1.8
       */
      public static ExecutorService newWorkStealingPool(int parallelism) {
          return new ForkJoinPool
              (parallelism,
               ForkJoinPool.defaultForkJoinWorkerThreadFactory,
               null, true);
      }
    
  • newSingleThreadExecutor: a thread pool with a single thread

      /**
       * Creates an Executor that uses a single worker thread operating
       * off an unbounded queue. (Note however that if this single
       * thread terminates due to a failure during execution prior to
       * shutdown, a new one will take its place if needed to execute
       * subsequent tasks.)  Tasks are guaranteed to execute
       * sequentially, and no more than one task will be active at any
       * given time. Unlike the otherwise equivalent
       * {@code newFixedThreadPool(1)} the returned executor is
       * guaranteed not to be reconfigurable to use additional threads.
       *
       * @return the newly created single-threaded Executor
       */
      public static ExecutorService newSingleThreadExecutor() {
          return new FinalizableDelegatedExecutorService
              (new ThreadPoolExecutor(1, 1,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>()));
      }
    
  • newCachedThreadPool: a cached thread pool

      /**
       * Creates a thread pool that creates new threads as needed, but
       * will reuse previously constructed threads when they are
       * available.  These pools will typically improve the performance
       * of programs that execute many short-lived asynchronous tasks.
       * Calls to {@code execute} will reuse previously constructed
       * threads if available. If no existing thread is available, a new
       * thread will be created and added to the pool. Threads that have
       * not been used for sixty seconds are terminated and removed from
       * the cache. Thus, a pool that remains idle for long enough will
       * not consume any resources. Note that pools with similar
       * properties but different details (for example, timeout parameters)
       * may be created using {@link ThreadPoolExecutor} constructors.
       *
       * @return the newly created thread pool
       */
      public static ExecutorService newCachedThreadPool() {
          return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                        60L, TimeUnit.SECONDS,
                                        new SynchronousQueue<Runnable>());
      }
    

And so on; see the source of Executors for the rest.