不同的问题,都有不同的解决模型,将这些模型简单梳理一下,建立起自己的思维系统,之后遇到这种问题就能够更快做出反应,并且能够站在更加系统性的视角。
image.png

加锁 & 不加锁

线程操作资源,加锁是为了保护资源,限制访问资源的线程数。所以加锁的目标是资源类,常用的做法就是将资源再一次进行封装,添加上锁相关的工具。

  1. // Task is a wrapper around S that adds synchronization.
  2. class Task {
  3. private Lock lock;
  4. private S source;
  5. }
  • 加锁保护资源。当资源会被多线程操作,但需要保证一致性,才需要考虑加锁。对于例子,若 chunk 分配给了某一个线程单独处理,那么对该 chunk 的操作不需要加锁。

  • 加锁控制并发度。当需要线程数超过并发限度,才需要考虑加锁。对于例子,若在设计 API 过程中最多开了 pCount 个线程,那么没有必要通过加锁来控制并发度。

同步 & 互斥

同步是为了控制操作的顺序,互斥是为了控制资源被独占。并发设计模型实际上就是同步模型,核心为生产者消费者模型。

生产者消费者模型

设计一个 API,helper(T[], int chunkSize, int pCount, Processor p)。在 pCount 的并发限度下处理数据 arr。arr 在处理之前,先分块,每一块的大小最多 chunkSize。

生产者生产任务,消费者取来任务执行。任务从生产者到消费者需要一个中转空间,供生产者放置任务、消费者获取任务,所以生产者消费者往往需要结合阻塞队列来实现。下面使用生产者消费者模型来解决这个问题:

  1. // helper(T[], int chunkSize, int pCount, Processor p)
  2. @FunctionalInterface
  3. interface Processor<T> {
  4. void process(T t);
  5. }
  6. class Slice<T> {
  7. final T[] arr;
  8. final int st, en;
  9. public Slice(T[] arr, int st, int en) {
  10. this.arr = arr;
  11. this.st = st;
  12. this.en = en;
  13. }
  14. }
  15. class Task<T> {
  16. private final Slice<T> content;
  17. public Task(Slice<T> content) {
  18. this.content = content;
  19. }
  20. public void run(Processor<T> p, AtomicInteger cnt) {
  21. for (int i = content.st; i < content.en; i++) {
  22. p.process(content.arr[i]);
  23. cnt.incrementAndGet();
  24. }
  25. }
  26. }
  27. class Helper<T> {
  28. private BlockingQueue<Task<T>> q = new LinkedBlockingQueue<>();
  29. private AtomicInteger cnt = new AtomicInteger(0);
  30. public void help(T[] arr, int size, int pc, Processor<T> p) {
  31. // consume
  32. for (int i = 0; i < pc; i++) {
  33. new Thread(() -> {
  34. while (cnt.get() < arr.length) {
  35. Task<T> task = q.poll();
  36. if (task != null) {
  37. task.run(p, cnt);
  38. }
  39. }
  40. }).start();
  41. }
  42. // produce
  43. int i;
  44. for (i = 0; i < arr.length - size; i += size) {
  45. Task<T> task = new Task<>(new Slice<>(arr, i, i + size));
  46. q.offer(task);
  47. }
  48. if (i < arr.length) {
  49. Task<T> task = new Task<>(new Slice<T>(arr, i, i + size));
  50. q.offer(task);
  51. }
  52. }
  53. }
  • Java 中一切皆对象,所以将操作的逻辑抽象为一个接口 Processer,处理的数据抽象为泛型 T
  • 由于初始结构为一个数组,可以用切片的思想来进行分片,但是 Java 没有原生的分片,故定义一个分片类 Slice。那么此时的 Slice,就是最原始的资源。
  • 将资源封装为任务 Task,也就是加上线程安全的保护措施。由于这里我考虑每一个分片仅由一个线程负责,所以不存在数据竞争,无需加锁。在执行任务的时候,我增加了一个 cnt 变量,用于记录完成的任务数,方便最后让线程退出。
  • 实现生产者消费者模型,阻塞队列用于存储任务。生产者往队列中放任务,消费者从队列中取任务。

image.png

等待通知模型

等待通知模型最适用于控制逻辑顺序,等待的是线程,通知的也是线程。对于上一个例子,任务之间的执行实际上没有顺序要求,所以不适合使用等待通知模型。为了不使例子复杂化,使用实现阻塞队列的例子。阻塞队列存在逻辑顺序,队列非空才能够取数据,队列非满才能放数据。(BlockingQueue 三种实现方式

  1. class BlockingQueue<T> {
  2. private Queue<T> q = new LinkedList<>();
  3. private Lock lock = new ReentrantLock();
  4. private Condition notEmpty = lock.newCondition();
  5. private Condition notFull = lock.newCondition();
  6. private int size;
  7. private int capacity;
  8. public T poll() {
  9. T task = null;
  10. lock.lock();
  11. try {
  12. while (size == 0) {
  13. notEmpty.await();
  14. }
  15. task = q.poll();
  16. size--;
  17. notFull.signal();
  18. } finally {
  19. lock.unlock();
  20. }
  21. return task;
  22. }
  23. public void offer(T task) {
  24. lock.lock();
  25. try {
  26. while (size == capacity) {
  27. notFull.await();
  28. }
  29. q.offer(task);
  30. size++;
  31. notEmpty.signal();
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
  36. }

总结

核心点在于认清:线程操作任务,任务是资源的安全包装。在解决问题主要记住两个模型:

  1. 生产者消费者模型,是绝大多数线程执行任务的抽象,常常结合阻塞队列使用。
  2. 等待通知模型,适用于有逻辑顺序约束的情况。