Java 线程

1、Volatile关键字与内存可见性

volatile关键字:当多个线程操作数据时,可以保证内存中的数据的可见性,相较于synchronize是一种较为轻量级的同步策略。
注意:

  • 1.volatile 不具备“互斥性”
  • 2.volatile 不能保证变量的“原子性”

    A.主存的变量数据在其他线程缓存中不可见的代码演示案例

    ```java /**

    • TestVolatile
    • encoding:UTF-8 *
    • volatile关键字 *
    • @author Fcant 下午 12:49:36 2020/2/24/0024 */ public class TestVolatile { public static void main(String[] args) {

      1. ThreadFh fh = new ThreadFh();
      2. new Thread(fh).start();
      3. while (true) {
      4. if (fh.isFlag()) {
      5. System.out.println("-----------------");
      6. break;
      7. }
      8. }

      } }

class ThreadFh implements Runnable { private boolean flag = false;

  1. public boolean isFlag() {
  2. return flag;
  3. }
  4. public void setFlag(boolean flag) {
  5. this.flag = flag;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. Thread.sleep(200);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. flag = true;
  15. System.out.println("flag=" + isFlag());
  16. }

}

  1. <a name="KHIbU"></a>
  2. ### B.使用同步锁进行内存变量的刷新-同步锁容易造成多个线程访问的阻塞
  3. ```java
  4. /**
  5. * TestVolatile
  6. * <p>
  7. * encoding:UTF-8
  8. *
  9. * volatile关键字
  10. *
  11. * @author Fcant 下午 12:49:36 2020/2/24/0024
  12. */
  13. public class TestVolatile {
  14. public static void main(String[] args) {
  15. ThreadFh fh = new ThreadFh();
  16. new Thread(fh).start();
  17. while (true) {
  18. synchronized (fh) {
  19. if (fh.isFlag()) {
  20. System.out.println("-----------------");
  21. break;
  22. }
  23. }
  24. }
  25. }
  26. }
  27. class ThreadFh implements Runnable {
  28. private boolean flag = false;
  29. public boolean isFlag() {
  30. return flag;
  31. }
  32. public void setFlag(boolean flag) {
  33. this.flag = flag;
  34. }
  35. @Override
  36. public void run() {
  37. try {
  38. Thread.sleep(200);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. flag = true;
  43. System.out.println("flag=" + isFlag());
  44. }
  45. }

C.使用Volatile关键字解决主存变量不可见性的问题

  1. /**
  2. * TestVolatile
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * volatile关键字:当多个线程操作数据时,可以保证内存中的数据的可见性
  7. * 相较于synchronize是一种较为轻量级的同步策略
  8. * 注意:
  9. * 1.volatile 不具备“互斥性”
  10. * 2.volatile 不能保证变量的“原子性”
  11. *
  12. * @author Fcant 下午 12:49:36 2020/2/24/0024
  13. */
  14. public class TestVolatile {
  15. public static void main(String[] args) {
  16. ThreadFh fh = new ThreadFh();
  17. new Thread(fh).start();
  18. while (true) {
  19. if (fh.isFlag()) {
  20. System.out.println("-----------------");
  21. break;
  22. }
  23. }
  24. }
  25. }
  26. class ThreadFh implements Runnable {
  27. private volatile boolean flag = false;
  28. public boolean isFlag() {
  29. return flag;
  30. }
  31. public void setFlag(boolean flag) {
  32. this.flag = flag;
  33. }
  34. @Override
  35. public void run() {
  36. try {
  37. Thread.sleep(200);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. flag = true;
  42. System.out.println("flag=" + isFlag());
  43. }
  44. }

2、原子变量-CAS算法

A.以i++演示原子性问题

  1. /**
  2. * TestAtmoic
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * 一、i++的原子性问题:i++的操作实际上分为三个步骤“读-改-写”
  7. * int i = 10;
  8. * i = i++; // 10
  9. * int temp = i;
  10. * i = i + 1;
  11. * i = temp;
  12. *
  13. * 二、原子变量:JDK1.5后,java.util.concurrent.atomic 包下提供了常用的原子变量
  14. * 1.Volatile保证内存可见性
  15. * 2.CAS(Compare-And-Swap)算法保证数据的原子性
  16. * 内存值 V
  17. * 预估值 A
  18. * 更新值 B
  19. * 当且仅当V == A时,V = B,否则,将不作任何操作
  20. *
  21. * @author Fcant 下午 17:34:21 2020/2/24/0024
  22. */
  23. public class TestAtomic {
  24. public static void main(String[] args) {
  25. AtomicFh atomicFh = new AtomicFh();
  26. for (int i = 0; i < 10; i++) {
  27. new Thread(atomicFh).start();
  28. }
  29. }
  30. }
  31. class AtomicFh implements Runnable {
  32. private volatile int serialNumber = 0;
  33. public int getSerialNumber() {
  34. return serialNumber++;
  35. }
  36. @Override
  37. public void run() {
  38. try {
  39. Thread.sleep(200);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. System.out.println(Thread.currentThread().getName() + " : " + getSerialNumber());
  44. }
  45. }

B.使用java.util.concurrent.atomic提供的原子类解决原子性问题

  1. import java.util.concurrent.atomic.AtomicInteger;
  2. /**
  3. * TestAtmoic
  4. * <p>
  5. * encoding:UTF-8
  6. *
  7. * 一、i++的原子性问题:i++的操作实际上分为三个步骤“读-改-写”
  8. * int i = 10;
  9. * i = i++; // 10
  10. * int temp = i;
  11. * i = i + 1;
  12. * i = temp;
  13. *
  14. * 二、原子变量:JDK1.5后,java.util.concurrent.atomic 包下提供了常用的原子变量
  15. * 1.Volatile保证内存可见性
  16. * 2.CAS(Compare-And-Swap)算法保证数据的原子性
  17. * 内存值 V
  18. * 预估值 A
  19. * 更新值 B
  20. * 当且仅当V == A时,V = B,否则,将不作任何操作
  21. *
  22. * @author Fcant 下午 17:34:21 2020/2/24/0024
  23. */
  24. public class TestAtomic {
  25. public static void main(String[] args) {
  26. AtomicFh atomicFh = new AtomicFh();
  27. for (int i = 0; i < 10; i++) {
  28. new Thread(atomicFh).start();
  29. }
  30. }
  31. }
  32. class AtomicFh implements Runnable {
  33. // private volatile int serialNumber = 0;
  34. private AtomicInteger serialNumber = new AtomicInteger();
  35. public int getSerialNumber() {
  36. return serialNumber.getAndIncrement();
  37. }
  38. @Override
  39. public void run() {
  40. try {
  41. Thread.sleep(200);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. System.out.println(Thread.currentThread().getName() + " : " + getSerialNumber());
  46. }
  47. }

C.CAS算法

CAS(Compare-And-Swap)是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。
CAS是一种无锁的非阻塞算法的实现。
CAS包含了3个操作数:

  1. 需要读写的内存值V
  2. 进行比较的值A
  3. 拟写入的新值B

当且仅当V的值等于A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作。

  1. /**
  2. * TestCompareAndSwap
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * @author Fcant 下午 18:09:34 2020/2/24/0024
  7. */
  8. public class TestCompareAndSwap {
  9. public static void main(String[] args) {
  10. final CompareAndSwap cas = new CompareAndSwap();
  11. for(int i=1 ;i < 10;i++){
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. int expectedValue = cas.getValue();
  16. boolean compareAndSet = cas.compareAndSet(expectedValue, (int) Math.random() * 101);
  17. System.out.println(compareAndSet);
  18. }
  19. });
  20. }
  21. }
  22. }
  23. class CompareAndSwap {
  24. private int value;
  25. // 获取内存值
  26. public synchronized int getValue() {
  27. return value;
  28. }
  29. // 比较
  30. public synchronized int compareAndSwap(int expectedValue, int newValue) {
  31. int oldValue = value;
  32. if (oldValue == expectedValue) {
  33. oldValue = newValue;
  34. }
  35. return oldValue;
  36. }
  37. // 设置
  38. public synchronized boolean compareAndSet(int ecpectedValue, int newValue) {
  39. return ecpectedValue == compareAndSwap(ecpectedValue, newValue);
  40. }
  41. }

3、ConcurrentHashMap锁分段机制

Java5.0在java.util.concurrent包中提供了多种并发容器类来改进同步容器的性能。
ConcurrentHashMap同步容器类是Java5增加的一个线程安全的哈希表。对与多线程的操作,介于HashMapHashtable之间。内部采用“锁分段”机制替代Hashtable的独占锁。进而提高性能。
此包还提供了设计用于多线程上下文中的Collection实现:
ConcurrentHashMapConcurrentSkipListMapConcurrentSkipListSetCopyOnWriteArrayListCopyOnWriteArraySet
当期望许多线程访问一个给定的collection时,ConcurrentHashMap通常优于同步的HashMapConcurrentSkipListMap通常优于同步的TreeMap
当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList优于同步的ArrayList

A.使用ArrayList时出现并发修改异常

  1. import java.util.ArrayList;
  2. import java.util.Collections;
  3. import java.util.Iterator;
  4. import java.util.List;
  5. /**
  6. * TestCopyOnWriteArray
  7. * <p>
  8. * encoding:UTF-8
  9. *
  10. * CopyOnWriteArrayList/CopyOnWriteArraySet:“写入并复制”
  11. *
  12. * @author Fcant 下午 18:47:27 2020/2/24/0024
  13. */
  14. public class TestCopyOnWriteArray {
  15. public static void main(String[] args) {
  16. HelloThread helloThread = new HelloThread();
  17. for (int i = 0; i < 10; i++) {
  18. new Thread(helloThread).start();
  19. }
  20. }
  21. }
  22. class HelloThread implements Runnable {
  23. private static List<String> list = Collections.synchronizedList(new ArrayList<>());
  24. static {
  25. list.add("AA");
  26. list.add("BB");
  27. list.add("CC");
  28. }
  29. @Override
  30. public void run() {
  31. Iterator<String> iterator = list.iterator();
  32. while (iterator.hasNext()) {
  33. System.out.println(iterator.next());
  34. list.add("DD");
  35. }
  36. }
  37. }

B.使用CopyOnWriteArrayList解决并发修改异常

CopyOnWriteArrayList在写入时会复制一个新的链表
注意:添加操作多时,效率低,因为每次添加时都会进行复制。并发迭代操作时可以选择

  1. import java.util.Iterator;
  2. import java.util.concurrent.CopyOnWriteArrayList;
  3. /**
  4. * TestCopyOnWriteArray
  5. * <p>
  6. * encoding:UTF-8
  7. *
  8. * CopyOnWriteArrayList/CopyOnWriteArraySet:“写入并复制”
  9. * 注意:添加操作多时,效率低,因为每次添加时都会进行复制。并发迭代操作时可以选择
  10. *
  11. * @author Fcant 下午 18:47:27 2020/2/24/0024
  12. */
  13. public class TestCopyOnWriteArray {
  14. public static void main(String[] args) {
  15. HelloThread helloThread = new HelloThread();
  16. for (int i = 0; i < 10; i++) {
  17. new Thread(helloThread).start();
  18. }
  19. }
  20. }
  21. class HelloThread implements Runnable {
  22. // private static List<String> list = Collections.synchronizedList(new ArrayList<>());
  23. private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
  24. static {
  25. list.add("AA");
  26. list.add("BB");
  27. list.add("CC");
  28. }
  29. @Override
  30. public void run() {
  31. Iterator<String> iterator = list.iterator();
  32. while (iterator.hasNext()) {
  33. System.out.println(iterator.next());
  34. list.add("DD");
  35. }
  36. }
  37. }

4、CountDownLatch闭锁

Java5在java.util.concurrent包中提供了多种并发容器类来改进同步容器的性能。
CountDownLatch一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:

  1. 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  2. 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
  3. 等待直到某个操作所有参与者都准备就绪再继续执行。

    A.未进行闭锁的操作案例

    ```java import java.util.concurrent.CountDownLatch;

/**

  • TestCountDownLatch
  • encoding:UTF-8 *
  • CountDownLatch:闭锁,在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行 *
  • @author Fcant 下午 19:24:36 2020/2/24/0024 */ public class TestCountDownLatch { public static void main(String[] args) {

    1. final CountDownLatch countDownLatch = new CountDownLatch(5);
    2. LatchFh fh = new LatchFh(countDownLatch);
    3. long start = System.currentTimeMillis();
    4. for (int i = 0; i < 10; i++) {
    5. new Thread(fh).start();
    6. }
    7. long end = System.currentTimeMillis();
    8. System.out.println("耗时为:" + (end - start));

    } }

class LatchFh implements Runnable {

  1. private CountDownLatch countDownLatch;
  2. public LatchFh(CountDownLatch countDownLatch) {
  3. this.countDownLatch = countDownLatch;
  4. }
  5. @Override
  6. public void run() {
  7. for (int i = 0; i < 50000; i++) {
  8. if (i % 2 == 0) {
  9. System.out.println(i);
  10. }
  11. }
  12. }

}

  1. <a name="NVj4g"></a>
  2. ### B.案例修改为闭锁方式
  3. ```java
  4. import java.util.concurrent.CountDownLatch;
  5. /**
  6. * TestCountDownLatch
  7. * <p>
  8. * encoding:UTF-8
  9. *
  10. * CountDownLatch:闭锁,在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行
  11. *
  12. * @author Fcant 下午 19:24:36 2020/2/24/0024
  13. */
  14. public class TestCountDownLatch {
  15. public static void main(String[] args) {
  16. final CountDownLatch countDownLatch = new CountDownLatch(5);
  17. LatchFh fh = new LatchFh(countDownLatch);
  18. long start = System.currentTimeMillis();
  19. for (int i = 0; i < 5; i++) {
  20. new Thread(fh).start();
  21. }
  22. try {
  23. countDownLatch.await();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. long end = System.currentTimeMillis();
  28. System.out.println("耗时为:" + (end - start));
  29. }
  30. }
  31. class LatchFh implements Runnable {
  32. private CountDownLatch countDownLatch;
  33. public LatchFh(CountDownLatch countDownLatch) {
  34. this.countDownLatch = countDownLatch;
  35. }
  36. @Override
  37. public void run() {
  38. synchronized (this) {
  39. try {
  40. for (int i = 0; i < 50000; i++) {
  41. if (i % 2 == 0) {
  42. System.out.println(i);
  43. }
  44. }
  45. }finally {
  46. countDownLatch.countDown();
  47. }
  48. }
  49. }
  50. }

5、实现Callable接口

  • 一、创建执行线程的方式三:实现Callable接口。相较于实现Runnable接口的方式,Callable方法可以有返回值,并且可以抛出异常
  • 二、执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果。FutureTaskFuture接口的实现类 ```java import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask;

/**

  • TestCallable
  • encoding:UTF-8 *
  • 一、创建执行线程的方式三:实现Callable接口。相较于实现Runnable接口的方式,Callable方法可以有返回值,并且可以抛出异常
  • 二、执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果。FutureTask是Future接口的实现类 *
  • @author Fcant 下午 21:17:28 2020/2/24/0024 */ public class TestCallable { public static void main(String[] args) {

    1. ThreadFc fc = new ThreadFc();
    2. // 1.执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果
    3. FutureTask<Integer> futureTask = new FutureTask<>(fc);
    4. new Thread(futureTask).start();
    5. // 2.接收线程运算后的结果
    6. try {
    7. // FutureTask可用于闭锁
    8. System.out.println(futureTask.get());
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. } catch (ExecutionException e) {
    12. e.printStackTrace();
    13. }

    } }

class ThreadFc implements Callable { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i < 100; i++) { sum += i; } return sum; } }

  1. <a name="BNKOc"></a>
  2. ## 6、`Lock`同步锁
  3. <a name="fwGhi"></a>
  4. ### A.多线程访问共享变量异常代码演示
  5. ```java
  6. /**
  7. * TestLock
  8. * <p>
  9. * encoding:UTF-8
  10. *
  11. * 一、用于解决多线程安全问题的方式
  12. * synchronized:隐式锁
  13. * 1.同步代码块
  14. * 2.同步方法
  15. *
  16. * jdk1.5以后:
  17. * 3.同步锁Lock
  18. * 注意:同步锁是一个显示锁,需要通过lock()上锁,必须通过unlock()方法进行释放锁
  19. *
  20. * @author Fcant 上午 10:02:19 2020/2/25/0025
  21. */
  22. public class TestLock {
  23. public static void main(String[] args) {
  24. Ticket ticket = new Ticket();
  25. new Thread(ticket, "1号窗口").start();
  26. new Thread(ticket, "2号窗口").start();
  27. new Thread(ticket, "3号窗口").start();
  28. }
  29. }
  30. class Ticket implements Runnable {
  31. private int tick = 100;
  32. @Override
  33. public void run() {
  34. while (true) {
  35. if (tick > 0) {
  36. try {
  37. Thread.sleep(200);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. System.out.println(Thread.currentThread().getName() + "完成售票,余票为:" + --tick);
  42. }
  43. }
  44. }
  45. }

B.使用Lock同步锁解决多线程访问共享变量的问题

  1. import java.util.concurrent.locks.Lock;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. /**
  4. * TestLock
  5. * <p>
  6. * encoding:UTF-8
  7. *
  8. * 一、用于解决多线程安全问题的方式
  9. * synchronized:隐式锁
  10. * 1.同步代码块
  11. * 2.同步方法
  12. *
  13. * jdk1.5以后:
  14. * 3.同步锁Lock
  15. * 注意:同步锁是一个显示锁,需要通过lock()上锁,必须通过unlock()方法进行释放锁
  16. *
  17. * @author Fcant 上午 10:02:19 2020/2/25/0025
  18. */
  19. public class TestLock {
  20. public static void main(String[] args) {
  21. Ticket ticket = new Ticket();
  22. new Thread(ticket, "1号窗口").start();
  23. new Thread(ticket, "2号窗口").start();
  24. new Thread(ticket, "3号窗口").start();
  25. }
  26. }
  27. class Ticket implements Runnable {
  28. private int tick = 100;
  29. private Lock lock = new ReentrantLock();
  30. @Override
  31. public void run() {
  32. while (true) {
  33. lock.lock();
  34. try {
  35. if (tick > 0) {
  36. try {
  37. Thread.sleep(200);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. System.out.println(Thread.currentThread().getName() + "完成售票,余票为:" + --tick);
  42. }
  43. }finally {
  44. lock.unlock();
  45. }
  46. }
  47. }
  48. }

C.通过Lock实现等待唤醒机制,生产者与消费者模式

①、没有等待唤醒机制的无限消费案例演示

  1. /**
  2. * TestProductorAndConsumer
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * @author Fcant 上午 10:46:52 2020/2/25/0025
  7. */
  8. public class TestProductAndConsumer {
  9. public static void main(String[] args) {
  10. Clerk clerk = new Clerk();
  11. Product product = new Product(clerk);
  12. Consumer consumer = new Consumer(clerk);
  13. new Thread(product, "生产者A").start();
  14. new Thread(consumer, "消费者B").start();
  15. }
  16. }
  17. // 店员
  18. class Clerk{
  19. private int product = 0;
  20. // 进货
  21. public synchronized void get() {
  22. if (product >= 10) {
  23. System.out.println("仓库已满,无法进货!");
  24. } else {
  25. System.out.println(Thread.currentThread().getName() + " : " + ++product);
  26. }
  27. }
  28. // 卖货
  29. public synchronized void sale() {
  30. if (product <= 0) {
  31. System.out.println("缺货中···");
  32. } else {
  33. System.out.println(Thread.currentThread().getName() + " : " + --product);
  34. }
  35. }
  36. }
  37. // 生产者
  38. class Product implements Runnable{
  39. private Clerk clerk;
  40. public Product(Clerk clerk) {
  41. this.clerk = clerk;
  42. }
  43. @Override
  44. public void run() {
  45. for (int i = 0; i < 20; i++) {
  46. clerk.get();
  47. }
  48. }
  49. }
  50. // 消费者
  51. class Consumer implements Runnable{
  52. private Clerk clerk;
  53. public Consumer(Clerk clerk) {
  54. this.clerk = clerk;
  55. }
  56. @Override
  57. public void run() {
  58. for (int i = 0; i < 20; i++) {
  59. clerk.sale();
  60. }
  61. }
  62. }

②、使用等待唤醒机制解决无限消费

  1. /**
  2. * TestProductorAndConsumer
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * @author Fcant 上午 10:46:52 2020/2/25/0025
  7. */
  8. public class TestProductAndConsumer {
  9. public static void main(String[] args) {
  10. Clerk clerk = new Clerk();
  11. Product product = new Product(clerk);
  12. Consumer consumer = new Consumer(clerk);
  13. new Thread(product, "生产者A").start();
  14. new Thread(consumer, "消费者B").start();
  15. }
  16. }
  17. // 店员
  18. class Clerk{
  19. private int product = 0;
  20. // 进货
  21. public synchronized void get() {
  22. if (product >= 10) {
  23. System.out.println("仓库已满,无法进货!");
  24. try {
  25. this.wait();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. } else {
  30. System.out.println(Thread.currentThread().getName() + " : " + ++product);
  31. this.notifyAll();
  32. }
  33. }
  34. // 卖货
  35. public synchronized void sale() {
  36. if (product <= 0) {
  37. try {
  38. this.wait();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. System.out.println("缺货中···");
  43. } else {
  44. System.out.println(Thread.currentThread().getName() + " : " + --product);
  45. this.notifyAll();
  46. }
  47. }
  48. }
  49. // 生产者
  50. class Product implements Runnable{
  51. private Clerk clerk;
  52. public Product(Clerk clerk) {
  53. this.clerk = clerk;
  54. }
  55. @Override
  56. public void run() {
  57. for (int i = 0; i < 20; i++) {
  58. clerk.get();
  59. }
  60. }
  61. }
  62. // 消费者
  63. class Consumer implements Runnable{
  64. private Clerk clerk;
  65. public Consumer(Clerk clerk) {
  66. this.clerk = clerk;
  67. }
  68. @Override
  69. public void run() {
  70. for (int i = 0; i < 20; i++) {
  71. clerk.sale();
  72. }
  73. }
  74. }

③、为了避免虚假唤醒问题,应该总是使用在循环中

  1. /**
  2. * TestProductorAndConsumer
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * @author Fcant 上午 10:46:52 2020/2/25/0025
  7. */
  8. public class TestProductAndConsumer {
  9. public static void main(String[] args) {
  10. Clerk clerk = new Clerk();
  11. Product product = new Product(clerk);
  12. Consumer consumer = new Consumer(clerk);
  13. new Thread(product, "生产者A").start();
  14. new Thread(consumer, "消费者B").start();
  15. new Thread(product, "生产者C").start();
  16. new Thread(consumer, "消费者D").start();
  17. }
  18. }
  19. // 店员
  20. class Clerk{
  21. private int product = 0;
  22. // 进货
  23. public synchronized void get() {
  24. while (product >= 1) {
  25. System.out.println("仓库已满,无法进货!");
  26. try {
  27. this.wait();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. System.out.println(Thread.currentThread().getName() + " : " + ++product);
  33. this.notifyAll();
  34. }
  35. // 卖货
  36. public synchronized void sale() {
  37. while (product <= 0) {
  38. try {
  39. this.wait();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. System.out.println("缺货中···");
  44. }
  45. System.out.println(Thread.currentThread().getName() + " : " + --product);
  46. this.notifyAll();
  47. }
  48. }
  49. // 生产者
  50. class Product implements Runnable{
  51. private Clerk clerk;
  52. public Product(Clerk clerk) {
  53. this.clerk = clerk;
  54. }
  55. @Override
  56. public void run() {
  57. for (int i = 0; i < 20; i++) {
  58. try {
  59. Thread.sleep(200);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. clerk.get();
  64. }
  65. }
  66. }
  67. // 消费者
  68. class Consumer implements Runnable{
  69. private Clerk clerk;
  70. public Consumer(Clerk clerk) {
  71. this.clerk = clerk;
  72. }
  73. @Override
  74. public void run() {
  75. for (int i = 0; i < 20; i++) {
  76. clerk.sale();
  77. }
  78. }
  79. }

7、Condition控制线程通信

Condition接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用Object.wait访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个Lock可能与多个Condition对象关联。为了避免兼容性问题,Condition方法的名称与对应Object版本中的不同。
Condition对象中,与waitnotifynotifyAll方法对应的分别是awaitsignal、和signalAll
Condition实例实质上被绑定到一个锁上。要为特定Lock实例获得Condition实例,请使用其newCondition()方法

使用LockCondition实现生产者和消费者案例

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. /**
  5. * TestProductorAndConsumer
  6. * <p>
  7. * encoding:UTF-8
  8. *
  9. * @author Fcant 上午 10:46:52 2020/2/25/0025
  10. */
  11. public class TestProductAndConsumerByLock {
  12. public static void main(String[] args) {
  13. Clerk clerk = new Clerk();
  14. Product product = new Product(clerk);
  15. Consumer consumer = new Consumer(clerk);
  16. new Thread(product, "生产者A").start();
  17. new Thread(consumer, "消费者B").start();
  18. new Thread(product, "生产者C").start();
  19. new Thread(consumer, "消费者D").start();
  20. }
  21. }
  22. // 店员
  23. class Clerk{
  24. private int product = 0;
  25. private Lock lock = new ReentrantLock();
  26. private Condition condition = lock.newCondition();
  27. // 进货
  28. public void get() {
  29. lock.lock();
  30. try {
  31. while (product >= 1) {
  32. System.out.println("仓库已满,无法进货!");
  33. condition.signal();
  34. }
  35. System.out.println(Thread.currentThread().getName() + " : " + ++product);
  36. this.notifyAll();
  37. }finally {
  38. condition.signalAll();
  39. }
  40. }
  41. // 卖货
  42. public synchronized void sale() {
  43. lock.lock();
  44. try {
  45. while (product <= 0) {
  46. condition.signal();
  47. System.out.println("缺货中···");
  48. }
  49. System.out.println(Thread.currentThread().getName() + " : " + --product);
  50. condition.signalAll();
  51. }finally {
  52. lock.unlock();
  53. }
  54. }
  55. }
  56. // 生产者
  57. class Product implements Runnable{
  58. private Clerk clerk;
  59. public Product(Clerk clerk) {
  60. this.clerk = clerk;
  61. }
  62. @Override
  63. public void run() {
  64. for (int i = 0; i < 20; i++) {
  65. try {
  66. Thread.sleep(200);
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. clerk.get();
  71. }
  72. }
  73. }
  74. // 消费者
  75. class Consumer implements Runnable{
  76. private Clerk clerk;
  77. public Consumer(Clerk clerk) {
  78. this.clerk = clerk;
  79. }
  80. @Override
  81. public void run() {
  82. for (int i = 0; i < 20; i++) {
  83. clerk.sale();
  84. }
  85. }
  86. }

8、线程按序交替

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. /**
  5. * TestSortThread
  6. * <p>
  7. * encoding:UTF-8
  8. *
  9. * 编写一个程序,开启3个线程,这三个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按顺序显示
  10. * 如:ABCABCABC···依次递归
  11. *
  12. * @author Fcant 下午 13:52:37 2020/2/25/0025
  13. */
  14. public class TestSortThread {
  15. public static void main(String[] args) {
  16. Alternate alternate = new Alternate();
  17. new Thread(new Runnable() {
  18. @Override
  19. public void run() {
  20. for (int i = 0; i <= 20; i++) {
  21. alternate.loopA(i);
  22. }
  23. }
  24. }, "A").start();
  25. new Thread(new Runnable() {
  26. @Override
  27. public void run() {
  28. for (int i = 0; i <= 20; i++) {
  29. alternate.loopB(i);
  30. }
  31. }
  32. }, "B").start();
  33. new Thread(new Runnable() {
  34. @Override
  35. public void run() {
  36. for (int i = 0; i <= 20; i++) {
  37. alternate.loopC(i);
  38. }
  39. System.out.println("------------------------");
  40. }
  41. }, "C").start();
  42. }
  43. }
  44. class Alternate {
  45. private int num = 1;
  46. private Lock lock = new ReentrantLock();
  47. private Condition condition1 = lock.newCondition();
  48. private Condition condition2 = lock.newCondition();
  49. private Condition condition3 = lock.newCondition();
  50. public void loopA(int totalLoop) {
  51. lock.lock();
  52. try {
  53. if (num != 1) {
  54. condition1.await();
  55. }
  56. for (int i = 0; i < 5; i++) {
  57. System.out.println(Thread.currentThread().getName() + "-" + i + "-" + totalLoop);
  58. }
  59. num = 2;
  60. condition2.signal();
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. }finally {
  64. lock.unlock();
  65. }
  66. }
  67. public void loopB(int totalLoop) {
  68. lock.lock();
  69. try {
  70. if (num != 2) {
  71. condition2.await();
  72. }
  73. for (int i = 0; i < 15; i++) {
  74. System.out.println(Thread.currentThread().getName() + "-" + i + "-" + totalLoop);
  75. }
  76. num = 3;
  77. condition3.signal();
  78. } catch (Exception e) {
  79. e.printStackTrace();
  80. }finally {
  81. lock.unlock();
  82. }
  83. }
  84. public void loopC(int totalLoop) {
  85. lock.lock();
  86. try {
  87. if (num != 3) {
  88. condition3.await();
  89. }
  90. for (int i = 0; i < 20; i++) {
  91. System.out.println(Thread.currentThread().getName() + "-" + i + "-" + totalLoop);
  92. }
  93. num = 1;
  94. condition1.signal();
  95. } catch (Exception e) {
  96. e.printStackTrace();
  97. }finally {
  98. lock.unlock();
  99. }
  100. }
  101. }

9、ReadWriteLock读写锁

  1. import java.util.concurrent.locks.ReadWriteLock;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. /**
  4. * TestReadWriteLock
  5. * <p>
  6. * encoding:UTF-8
  7. *
  8. * 1.ReadWriteLock:读写锁
  9. * 写写/读写 需要“互斥”
  10. * 读读 不需要互斥
  11. *
  12. * @author Fcant 下午 14:53:31 2020/2/25/0025
  13. */
  14. public class TestReadWriteLock {
  15. public static void main(String[] args) {
  16. ReadWriteLockFh readWriteLockFh = new ReadWriteLockFh();
  17. new Thread(new Runnable() {
  18. @Override
  19. public void run() {
  20. readWriteLockFh.set((int) (Math.random() * 101));
  21. }
  22. }, "Write").start();
  23. for (int i = 0; i < 1000; i++) {
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. readWriteLockFh.get();
  28. }
  29. }, "Read").start();
  30. }
  31. }
  32. }
  33. class ReadWriteLockFh {
  34. private int num = 0;
  35. private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  36. // 读
  37. public void get() {
  38. readWriteLock.readLock().lock();
  39. try {
  40. System.out.println(Thread.currentThread().getName() + " : " + num);
  41. }finally {
  42. readWriteLock.readLock().unlock();
  43. }
  44. }
  45. // 写
  46. public void set(int num) {
  47. readWriteLock.writeLock().lock();
  48. try {
  49. System.out.println(Thread.currentThread().getName());
  50. this.num = num;
  51. }finally {
  52. readWriteLock.writeLock().unlock();
  53. }
  54. }
  55. }

10、线程八锁

线程八锁的关键:
①非静态方法的锁默认为this,静态方法的锁为对应的Class实例
②某一时刻内,只能有一个线程持有锁,无论几个方法

  1. /**
  2. * TestThread8Monitor
  3. * <p>
  4. * encoding:UTF-8
  5. *
  6. * 题目:判断打印的one或two
  7. * 1.两个普通同步方法,两个线程,标准打印 // one two
  8. * 2.新增Thread.sleep() 给 getOne()方法 // one two
  9. * 3.新增普通方法getThree() // three one two
  10. * 4.两个普通同步方法,两个Number对象 // two one
  11. * 5.修改getOne()为静态同步方法 // one two
  12. * 6.修改两个方法均为静态同步方法,一个Number对象 // one two
  13. * 7.一个静态同步方法,一个非静态同步方法,两个Number对象 // two one
  14. * 8.两个静态同步方法,两个Number对象 // one two
  15. *
  16. * 线程八锁的关键:
  17. * ①非静态方法的锁默认为this,静态方法的锁为对应的Class实例
  18. * ②某一时刻内,只能有一个线程持有锁,无论几个方法
  19. *
  20. * @author Fcant 下午 15:11:22 2020/2/25/0025
  21. */
  22. public class TestThread8Monitor {
  23. public static void main(String[] args) {
  24. Number number = new Number();
  25. Number num = new Number();
  26. new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. number.getOne();
  30. }
  31. }).start();
  32. new Thread(new Runnable() {
  33. @Override
  34. public void run() {
  35. num.getTwo();
  36. }
  37. }).start();
  38. new Thread(new Runnable() {
  39. @Override
  40. public void run() {
  41. number.getThree();
  42. }
  43. }).start();
  44. }
  45. }
  46. class Number {
  47. public synchronized void getOne() {
  48. try {
  49. Thread.sleep(3000);
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. System.out.println("ONE");
  54. }
  55. public synchronized void getTwo() {
  56. System.out.println("TWO");
  57. }
  58. public synchronized void getThree() {
  59. System.out.println("Three");
  60. }
  61. }

11、线程池

A.线程池

提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应速率

B.线程池体系结构

java.util.concurrent.Ececutor: 负责线程的使用与调度的根接口
|—ExecutorService 子接口:线程池的主要接口
|—ThreadPoolExecutor 线程池的实现类
|—ScheduledExecutorService 子接口:负责线程的调度
|—ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService
image.png

C.工具类:Executors

ExecutorService newFixedThreadPool() : 创建固定大小的线程池。
ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
ExecutorService newSingleThreadExecutor() : 创建单个线程池,线程池中只有一个线程。
ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。

D.线程池的使用

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.*;
  4. /**
  5. * TestThreadPool
  6. * <p>
  7. * encoding:UTF-8
  8. *
  9. * 一、线程池
  10. * 提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应速率
  11. *
  12. * 二、线程池体系结构
  13. * java.util.concurrent.Ececutor: 负责线程的使用与调度的根接口
  14. * |--ExecutorService 子接口:线程池的主要接口
  15. * |--ThreadPoolExecutor 线程池的实现类
  16. * |--ScheduledExecutorService 子接口:负责线程的调度
  17. * |--ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService
  18. *
  19. * 三、工具类:Executors
  20. * ExecutorService newFixedThreadPool() : 创建固定大小的线程池。
  21. * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
  22. * ExecutorService newSingleThreadExecutor() : 创建单个线程池,线程池中只有一个线程。
  23. * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
  24. *
  25. * @author Fcant 下午 16:03:54 2020/2/25/0025
  26. */
  27. public class TestThreadPool {
  28. public static void main(String[] args) throws ExecutionException, InterruptedException {
  29. ThreadPoolFc fc = new ThreadPoolFc();
  30. // 1.创建线程池
  31. ExecutorService executorService = Executors.newFixedThreadPool(5);
  32. // 2.为线程池分配任务
  33. /*for (int i = 0; i < 10; i++) {
  34. executorService.submit(fc);
  35. }*/
  36. List<Future<Integer>> list = new ArrayList<>();
  37. for (int j = 0; j < 10; j++) {
  38. Future<Integer> future = executorService.submit(new Callable<Integer>() {
  39. @Override
  40. public Integer call() throws Exception {
  41. int sum = 0;
  42. for (int i = 0; i < 100; i++) {
  43. sum += i;
  44. }
  45. return sum;
  46. }
  47. });
  48. list.add(future);
  49. }
  50. // 3.关闭线程池
  51. executorService.shutdown();
  52. for (Future<Integer> integerFuture : list) {
  53. System.out.println(integerFuture.get());
  54. }
  55. // new Thread(fc).start();
  56. // new Thread(fc).start();
  57. }
  58. }
  59. class ThreadPoolFc implements Runnable {
  60. @Override
  61. public void run() {
  62. for (int i = 0; i < 100; i++) {
  63. System.out.println(Thread.currentThread().getName() + " : " + i);
  64. }
  65. }
  66. }

12、线程调度

  1. import java.util.Random;
  2. import java.util.concurrent.*;
  3. /**
  4. * TestSchuledThreadPool
  5. * <p>
  6. * encoding:UTF-8
  7. *
  8. * * 一、线程池
  9. * * 提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应速率
  10. * *
  11. * * 二、线程池体系结构
  12. * * java.util.concurrent.Ececutor: 负责线程的使用与调度的根接口
  13. * * |--ExecutorService 子接口:线程池的主要接口
  14. * * |--ThreadPoolExecutor 线程池的实现类
  15. * * |--ScheduledExecutorService 子接口:负责线程的调度
  16. * * |--ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduledExecutorService
  17. * *
  18. * * 三、工具类:Executors
  19. * * ExecutorService newFixedThreadPool() : 创建固定大小的线程池。
  20. * * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
  21. * * ExecutorService newSingleThreadExecutor() : 创建单个线程池,线程池中只有一个线程。
  22. * * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
  23. *
  24. * @author Fcant 下午 16:58:29 2020/2/25/0025
  25. */
  26. public class TestScheduledThreadPool {
  27. public static void main(String[] args) throws ExecutionException, InterruptedException {
  28. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
  29. for (int i = 0; i < 10; i++) {
  30. ScheduledFuture<Integer> future = scheduledExecutorService.schedule(new Callable<Integer>() {
  31. @Override
  32. public Integer call() throws Exception {
  33. int num = new Random().nextInt(100);
  34. System.out.println(Thread.currentThread().getName() + ":" + num);
  35. return num;
  36. }
  37. }, 3, TimeUnit.SECONDS);
  38. System.out.println(future.get());
  39. }
  40. scheduledExecutorService.shutdown();
  41. }
  42. }

13、ForkJoinPool分支/合并框架 工作窃取

Fork/Join框架:就是在必要情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join汇总。
image.png

Fork/Join框架与传统线程池的区别

采用“工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态,而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行,这种方式减少了线程的等待时间,提高了性能。

  1. public class ForkJoinCalculate extends RecursiveTask<Long> {
  2. public static final long serialVersionUID = 134656970987L;
  3. private Long start;
  4. private Long end;
  5. public ForkJoinCalculate(Long start, Long end) {
  6. this.start = start;
  7. this.end = end;
  8. }
  9. public static final long THRESHOLD = 10000;
  10. @Override
  11. protected Long compute() {
  12. long length = end - start;
  13. if (length <= THRESHOLD) {
  14. long sum = 0;
  15. for (long i = start; i <= end; i++) {
  16. sum += i;
  17. }
  18. return sum;
  19. } else {
  20. long middle = (start + end) / 2;
  21. ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
  22. // 拆分子任务,同时压入线程队列
  23. left.fork();
  24. ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
  25. right.fork();
  26. return left.join() + right.join();
  27. }
  28. }
  29. }
  1. public class ForkJoinTest {
  2. // Fork/Join操作
  3. @Test
  4. public void forkJoinTest() {
  5. Instant start = Instant.now();
  6. ForkJoinPool pool = new ForkJoinPool();
  7. ForkJoinCalculate task = new ForkJoinCalculate(0L, 10000000L);
  8. Long sum = pool.invoke(task);
  9. System.out.println(sum);
  10. Instant end = Instant.now();
  11. System.out.println("耗费时间为:" + Duration.between(start, end).toMillis() + "毫秒");
  12. }
  13. // 常规行程操作
  14. @Test
  15. public void threadTest() {
  16. Instant start = Instant.now();
  17. Long sum = 0L;
  18. for(long i=0 ;i < 10000000L;i++){
  19. sum += i;
  20. }
  21. System.out.println(sum);
  22. Instant end = Instant.now();
  23. System.out.println("耗费时间为:" + Duration.between(start, end).toMillis() + "毫秒");
  24. }
  25. // Java8并行流的操作
  26. @Test
  27. public void parallelTest() {
  28. Instant start = Instant.now();
  29. LongStream.rangeClosed(0, 100000000L)
  30. .parallel()
  31. .reduce(0, Long::sum);
  32. Instant end = Instant.now();
  33. System.out.println("耗时为:" + Duration.between(start, end).toMillis() + "毫秒");
  34. }
  35. }