线程安全与线程不安全

  • 线程安全指多个线程在执行同一段代码的时候采用加锁机制,使每次的执行结果和单线程执行的结果都是一样的,不存在执行程序时出现意外结果。
  • 线程不安全是指不提供加锁机制保护,有可能出现多个线程先后更改数据造成所得到的数据是脏数据。

    单核可以运行多线程程序吗?

    可以,主要是依靠于操作系统的进程的调度算法。
    1.(单核CPU)同一时间,cpu只能处理1个线程,只有1个线程在执行;
    2.多线程同时执行:是CPU快速的在多个线程之间的切换;
    3.cpu调度线程的时间足够快,就造成了多线程的“同时”执行;
    4.如果线程数非常多,cpu会在n个线程之间切换,消耗大量的cpu资源;
    5.每个线程被调度的次数会降低,线程的执行效率降低;

    创建线程的几个方法

    1、继承Thread类
    Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过 Thread 类的 start()实例方法。start()方法是一个 native 方法,它将启动一个新线程,并执行 run()方法。 ```java public class MyThread extends Thread { public void run() {
    1. System.out.println("MyThread.run()");
    } }

MyThread myThread1 = new MyThread(); myThread1.start();

  1. 2、**实现Runnable**
  2. ```java
  3. public class MyThread extends OtherClass implements Runnable {
  4. public void run() {
  5. System.out.println("MyThread.run()");
  6. }
  7. }
  8. //启动 MyThread,需要首先实例化一个 Thread,并传入自己的 MyThread 实例:
  9. MyThread myThread = new MyThread();
  10. Thread thread = new Thread(myThread);
  11. thread.start();
  12. //事实上,当传入一个 Runnable target 参数给 Thread 后,Thread 的 run()方法就会调用
  13. target.run()
  14. public void run() {
  15. if (target != null) {
  16. target.run();
  17. }
  18. }

3、ExecutorService、Callable、Future 有返回值线程
有返回值的任务必须实现 Callable 接口,类似的,无返回值的任务必须实现 Runnable 接口。执行 Callable 任务后,可以获取一个 Future 的对象,在该对象上调用 get 就可以获取到 Callable 任务返回的 Object 了,再结合线程池接口 ExecutorService 就可以实现传说中有返回结果的多线程了。

  1. //创建一个线程池
  2. ExecutorService pool = Executors.newFixedThreadPool(taskSize);
  3. // 创建多个有返回值的任务
  4. List<Future> list = new ArrayList<Future>();
  5. for (int i = 0; i < taskSize; i++) {
  6. Callable c = new MyCallable(i + " ");
  7. // 执行任务并获取 Future 对象
  8. Future f = pool.submit(c);
  9. list.add(f);
  10. }
  11. // 关闭线程池
  12. pool.shutdown();
  13. // 获取所有并发任务的运行结果
  14. for (Future f : list) {
  15. // 从 Future 对象上获取任务的返回值,并输出到控制台
  16. System.out.println("res:" + f.get().toString());
  17. }

4、基于线程池的方式
线程和数据库连接这些资源都是非常宝贵的资源。那么每次需要的时候创建,不需要的时候销毁,是非常浪费资源的。那么我们就可以使用缓存的策略,也就是使用线程池。

  1. // 创建线程池
  2. ExecutorService threadPool = Executors.newFixedThreadPool(10);
  3. while(true) {
  4. threadPool.execute(new Runnable() { // 提交多个线程任务,并执行
  5. @Override
  6. public void run() {
  7. System.out.println(Thread.currentThread().getName() + " is running ..");
  8. try {
  9. Thread.sleep(3000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. });
  15. }

如何停止一个线程

正常运行结束

程序运行结束,线程自动结束。

使用退出标志退出线程

一般 run()方法执行完,线程就会正常结束,然而,常常有些线程是守护线程。它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程。使用一个变量来控制循环,例如:最直接的方法就是设一个 boolean 类型的标志,并通过设置这个标志为 true 或 false 来控制 while循环是否退出,代码示例:

  1. public class ThreadSafe extends Thread {
  2. public volatile boolean exit = false;
  3. public void run() {
  4. while (!exit){
  5. //do something
  6. }
  7. }
  8. }

注意,exit需要使用volatile修饰,可以在多个线程间可见。

Interrupt 方法结束线程

使用 interrupt()方法来中断线程有两种情况:

  1. 线程处于阻塞状态:如使用了 sleep,同步锁的 wait,socket 中的 receiver,accept 等方法时,会使线程处于阻塞状态。当调用线程的 interrupt()方法时,会抛出 InterruptException 异常。阻塞中的那个方法抛出这个异常,通过代码捕获该异常,然后 break 跳出循环状态,从而让我们有机会结束这个线程的执行。通常很多人认为只要调用 interrupt 方法线程就会结束,实际上是错的, 一定要先捕获 InterruptedException 异常之后通过 break 来跳出循环,才能正常结束 run 方法。
  2. 线程未处于阻塞状态:使用 isInterrupted()判断线程的中断标志来退出循环。当使用interrupt()方法时,中断标志就会置 true,和使用自定义的标志来控制循环是一样的道理。
    1. public class ThreadSafe extends Thread {
    2. public void run() {
    3. while (!isInterrupted()){ //非阻塞过程中通过判断中断标志来退出
    4. try {
    5. Thread.sleep(5*1000);//阻塞过程捕获中断异常来退出
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. break;//捕获到异常之后,执行 break 跳出循环
    9. }
    10. }
    11. }

    stop 方法终止线程(线程不安全)

    程序中可以直接使用 thread.stop()来强行终止线程,但是 stop 方法是很危险的,就像突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果,不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出 ThreadDeatherror 的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用 stop 方法来终止线程。

    继承Thread方式和实现Runnable方法有什么区别?

    继承Thread类,就是多个线程各自完成自己的任务;实现Runnable接口,就是多个线程共同完成一个任务。
    如果继承了其他父类,则不能使用继承thread方式,但是它的好处是比较方便,如果实现runnable方法进行创建,则适合于资源共享,且不受单继承的限制,推荐使用实现runnable方式。

    线程状态

  • NEW
  • RUNNABLE
  • BLOCKED
  • WAITING
  • TIMED_WAITING
  • TERMINATED

image-20200302174621019.png

start

start()通知“线程规划器”此线程已经准备就绪,等待调用线程对象的run()方法。这个过程其实就是让系统安排一个时间来调用Thread中的run()方法,也就是使线程得到运行,启动线程,具有异步执行的效果,即不会马上执行。

run

如果调用run()方法则是同步,并不会交给“线程规划器”,而是“main”,它由Java虚拟机在运行相应线程时直接调用。

wait

使调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,直到被再次唤醒;调用之前线程必须获得该对象的对象级别锁,即只能在同步方法同步块中调用wait()方法,这是为了避免出现lost wake up问题(notify在wait之前执行)。
永远在while循环里而不是if语句下使用wait。这样,循环会在线程睡眠前后都检查wait的条件,并在条件实际上并未改变的情况下处理唤醒通知。

notify

随机唤醒等待队列中等待同一共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态。

yield

放弃当前的CPU资源,但持有锁,进入就绪状态,将它让给其他的任务去占用CPU执行时间。只有优先级与当前线程相同,或者优先级比当前线程更高的处于就绪状态的线程才会获得执行机会。

sleep

在指定的毫秒数内让当前“正在执行的线程”休眠,进入阻塞状态,不会释放持有的锁与资源。低优先级的线程也有执行的机会。

join

在线程z中调用线程x,会使所属线程对象x正常执行run()方法中的任务,而使当前线程z进行无限期的阻塞,进入等待状态,等线程x销毁后再继续执行线程z后面的代码。join会释放锁。

线程中断

interrupt()可以停止线程,但不会真的停止,需要其他语句配合,由线程来决定如何以及何时退出。
立即退出的方法:

  • 异常
  • 沉睡:先sleep()再interrupt()
  • 暴力:Thread.stop(),不过该方法已经作废
  • 返回停止:return;

    守护线程

    用户线程:我们平常创建的普通线程。
    守护线程:用来服务于用户线程;不需要上层逻辑介入。
    需要注意的点:

  • thread.setDaemon(true)必须在thread.start()之前设置;

  • 在Daemon线程中产生的新线程也是Daemon的;
  • 守护线程不能用于去访问固有资源,比如读写操作或者计算逻辑。因为它会在任何时候甚至在一个操作的中间发生中断;
  • Java自带的多线程框架,比如ExecutorService,会将守护线程转换为用户线程,所以如果要使用后台线程就不能用Java的线程池。
  • 当所有的用户线程运行结束后,守护线程就会被kill。

    Executor和ExecutorService的区别

    | | Executor | ExecutorService | | —- | —- | —- | | 关系 | 父接口 | 子接口 | | 提交任务的方法 | execute(),无返回值 | submit(),有返回值 | | 可否取消任务 | 不能取消 | 可以使用future.cancel()来取消pending状态的任务 | | 可否关闭线程池 | 没有相应方法 | 可以,使用shutdown()或者shutdownNow() |

CAS

自旋CAS就是循环进行CAS操作直到成功为止,它是乐观锁实现的机制,原理是利用了处理器提供的CMPXCHG指令实现的。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。
三大问题:

  • ABA如果一个值原来是A,变成了B,又变成了A,那么CAS进行检查时,就会发现值没有变化,但是是变化了的。可以通过加上版本号来解决;Atomic包的AtomicStampedReference也可以来解决ABA问题。
  • 循环时间长开销大自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
  • 只能保证一个共享变量的原子操作对于单个变量,可使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证原子性。可以使用锁或者使用AtomicReference将多个共享变量合并为一个共享变量。

    Synchronized

    它可以确保原子性、互斥性、可见性,并且是可重入锁(计数器实现)。主要修饰的主体如下:

  • 修饰普通方法:锁住的是对象实例(this)

  • 修饰静态方法:锁住的是Class实例,又因为Class的相关数据存储在永久带PermGen (jdk1.8 则是 metaspace),永久带是全局共享的,因此静态方法锁相当于类的一个全局锁, 会锁所有调用该方法的线程;
  • 修饰代码块:锁住的是所有以该对象为锁的代码块。它有多个队列, 当多个线程一起访问某个对象监视器的时候,对象监视器会将这些线程存储在不同的容器中。

    实现原理

    JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,编译后monitorenter插入到同步代码块的开始位置,monitorexit插入到方法结束处和异常处。监视器锁(monitor)依赖于底层操作系统的 Mutex Lock 来实现,而操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高。从而会有锁升级的概念。

    锁升级

  • 偏向锁大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁,它根据Mark Word是否存储指向当前线程的偏向锁来判断是否获取锁。当有其他线程竞争锁时,就会释放偏向锁。会根据线程是否活着来进入无锁状态还是轻量级锁。

  • 轻量级锁线程在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,之后尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针,如果替换失败,则使用自旋来获取锁;释放锁时,使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果有竞争发生就膨胀微重量级锁。
  • 重量级锁就是上面所述的基于monitor的同步方式。

    锁消除

    锁消除是指虚拟机即时编译器(JIT)在运行时,对一些代码上要求同步,但是检测到不可能发生数据竞争的锁进行消除。

    锁粗化

    如果虚拟机检测到有这样一串零碎的操作都对同一个对象加锁,将会把加锁同步的范围扩展(粗化)到整个操作序列的外部。

    MarkWord内容

    以64位JVM为例:
    image.png
    由此可知,在无锁状态下,Mark Word中可以存储对象的identity hash code值。当对象的hashCode()方法(非用户自定义)第一次被调用时,JVM会生成对应的identity hash code值,并将该值存储到Mark Word中。后续如果该对象的hashCode()方法再次被调用则不会再通过JVM进行计算得到,而是直接从Mark Word中获取。只有这样才能保证多次获取到的identity hash code的值是相同的。

我们还知道,对于轻量级锁,获取锁的线程栈帧中有锁记录(Lock Record)空间,用于存储Mark Word的拷贝,官方称之为Displaced Mark Word,该拷贝中可以包含identity hash code,所以轻量级锁可以和identity hash code共存;对于重量级锁,ObjectMonitor类里有字段可以记录非加锁状态下的mark word,其中也可以存储identity hash code的值,所以重量级锁也可以和identity hash code共存

对于偏向锁,在线程获取偏向锁时,会用Thread ID和epoch值覆盖identity hash code所在的位置。如果一个对象的hashCode()方法已经被调用过一次之后,这个对象还能被设置偏向锁么?答案是不能。因为如果可以的话,那Mark Word中的identity hash code必然会被偏向线程Id给覆盖,这就会造成同一个对象前后两次调用hashCode()方法得到的结果不一致。

HotSpot VM的锁实现机制是:
当一个对象已经计算过identity hash code,它就无法进入偏向锁状态;
当一个对象当前正处于偏向锁状态,并且需要计算其identity hash code的话,则它的偏向锁会被撤销,并且锁会膨胀为轻量级锁或者重量锁;
轻量级锁的实现中,会通过线程栈帧的锁记录存储Displaced Mark Word;重量锁的实现中,ObjectMonitor类里有字段可以记录非加锁状态下的mark word,其中可以存储identity hash code的值。

ReentrantLock

Lock底层实现基于AQS实现,采用线程独占的方式,在硬件层面依赖特殊的CPU指令(CAS),是可重入锁。
简单来说,ReenTrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态。

可重入

锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取;线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁(计数器自增和自减控制)。

公平锁与非公平锁

所谓的公平是指获取锁的顺序符合请求的绝对时间顺序,也就是FIFO。公平锁获取多了hasQueuedPredecessors()方法来判断当前节点是否有前驱结点,有则等待前驱节点释放锁,由于线程切换次数较多,它比非公平锁性能差了100倍。

与Synchronized区别

  1. ReentrantLock 通过方法 lock()与 unlock()来进行加锁与解锁操作,与 synchronized 会 被 JVM 自动解锁机制不同,ReentrantLock 加锁后需要手动进行解锁。为了避免程序出现异常而无法正常解锁的情况,使用 ReentrantLock 必须在 finally 控制块中进行解锁操作。
  2. ReentrantLock 相比 synchronized 的优势是可中断、公平锁、多个锁。这种情况下需要使用ReentrantLock。

    Java内存模型

    JMM决定一个线程对共享变量的写入何时对另一个线程可见。线程之间的共享变量存储在主内存中,每个线程都有一个本地内存,本地内存中存储了该线程以读/写共享变量的副本。
    两线程通信步骤:线程A把本地内存A中更新过的共享变量刷新到主内存中去;线程B到主内存中去读取线程A之前已更新过的共享变量。JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序提供内存可见性的保证。
    image-20200302201601105.png

    volatile

    volatile保证可见性,但不保证互斥性与原子性。

    读-写内存语义

    写:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。
    读:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效,线程将从主内存中读取共享变量。

    volatile内存语义的实现

    为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。
    volatile禁止指令重排序的规则:

  3. 当第二个操作是voaltile写时,无论第一个操作是什么,都不能进行重排序;

  4. 当第一个操作是volatile读时,不管第二个操作是什么,都不能进行重排序;
  5. 当第一个操作是volatile写时,第二个操作是volatile读时,不能进行重排序。

volatile在单例模式中,就有很好的运用,防止因为多线程场景下由于重排序问题造成的双重锁判断条件过早判断,造成逻辑问题。

使用场景

一个线程写,多个线程读的场景。

AQS

队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步器组件的基础框架,使用int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。基于AQS的有CountDownLatch、CyclicBarrier、Semaphore等。

CountDownLatch

用来协调多个线程之间的同步,或者说起到线程之间的通信,类似于join()方法,可以让当前执行线程等待join线程执行结束,CountDownLatch功能更多。构造方法传入一个n,调用countDown时,n就会减一,当n不为0时,await方法会阻塞当前线程。

  1. public class CountDownLatchTest {
  2. static CountDownLatch c = new CountDownLatch(2);
  3. public static void main(String[] args) throws InterruptedException {
  4. new Thread(() -> {
  5. System.out.println(1);
  6. c.countDown();
  7. System.out.println(2);
  8. c.countDown();
  9. }).start();
  10. c.await();
  11. System.out.println(3);
  12. }
  13. }

程序会顺序输出1,2,3。countDown的本质是调用AQS的releaseShared(1)。

CyclicBarrier

让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门。构造方法传入一个整型数值,代表拦截的线程数量,当线程到达屏障后调用await方法告诉CyclicBarrier,已到达屏障,当前线程被阻塞。与CountDownLatch的区别是CountDownLatch计数器只能使用一次,而CyclicBarrier能使用reset()重置,还有一些额外的api,如统计阻塞线程数量等。

  1. public class CyclicBarrierTest {
  2. static CyclicBarrier c = new CyclicBarrier(2);
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. try {
  6. Thread.sleep(5000);
  7. c.await();
  8. } catch (InterruptedException | BrokenBarrierException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println(1);
  12. }).start();
  13. try {
  14. c.await();
  15. } catch (InterruptedException | BrokenBarrierException e) {
  16. e.printStackTrace();
  17. }
  18. System.out.println(2);
  19. }
  20. }

5秒过后将会随机输出1,2。底层是使用实现了AQS的ReentrantLock与Condition来操作的。

Semaphore

用来通知同时访问特定资源的线程数量,通过协调各个线程,以保证合理地使用公共资源。在控制资源的时候会用到,比如数据库连接。构造方法接受一个整型数值,表示可用的许可证数量,acquire()获取许可证,release()方法归还许可证。

  1. public class SemaphoreTest {
  2. private static final int THREAD_COUNT = 30;
  3. private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
  4. private static Semaphore s = new Semaphore(10);
  5. public static void main(String[] args) {
  6. for (int i = 0; i < THREAD_COUNT; i++) {
  7. threadPool.execute(() -> {
  8. try {
  9. s.acquire();
  10. System.out.println("save data");
  11. s.release();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. });
  16. }
  17. threadPool.shutdown();
  18. }
  19. }

程序将会输出30个save data。

Excutors各个方法的弊端

  • newFixedThreadPool和newSingleThreadExecutor主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。在其构造方法中创建了一个无界队列LinkedBlockingQueuesize,是一个最大值为Integer.MAX_VALUE的线程阻塞队列,当添加任务的速度大于线程池处理任务的速度,可能会在队列堆积大量的请求,消耗很大的内存。
  • newCachedThreadPool和newScheduledThreadPool主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

推荐写法:

  1. // 第一种
  2. ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
  3.     new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
  4. // 第二种
  5. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
  6. //Common Thread Pool
  7. ExecutorService pool = new ThreadPoolExecutor(5, 200,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  10. pool.execute(()-> System.out.println(Thread.currentThread().getName()));
  11. pool.shutdown();//gracefully shutdown

如何配置线程池大小

对于不同性质的任务来说,CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1。
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) CPU数目
*线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

手写线程池

  1. public class MyThreadPool {
  2. private static final int WORK_NUM = 10;
  3. private static final int TASK_NUM = 100;
  4. private int workNum;
  5. private int taskNum;
  6. private final Set<WorkThread> workThreads;
  7. private final BlockingQueue<Runnable> taskQueue;
  8. public MyThreadPool() {
  9. this(WORK_NUM, TASK_NUM);
  10. }
  11. public MyThreadPool(int workNum, int taskNum) {
  12. if (workNum <= 0) {
  13. workNum = WORK_NUM;
  14. }
  15. if (taskNum <= 0) {
  16. taskNum = TASK_NUM;
  17. }
  18. taskQueue = new ArrayBlockingQueue<>(taskNum);
  19. this.workNum = workNum;
  20. this.taskNum = taskNum;
  21. workThreads = new HashSet<>();
  22. // 启动一定数量的线程数,从队列中获取任务处理
  23. for (int i = 0; i < workNum; i++) {
  24. WorkThread workThread = new WorkThread("thread_" + i);
  25. workThread.start();
  26. workThreads.add(workThread);
  27. }
  28. }
  29. public void execute(Runnable task) {
  30. try {
  31. taskQueue.put(task);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. public void destroy() {
  37. System.out.println("ready close thread pool...");
  38. if (workThreads == null || workThreads.isEmpty()) {
  39. return ;
  40. }
  41. for (WorkThread workThread : workThreads) {
  42. workThread.stopWork();
  43. //help gc
  44. workThread = null;
  45. }
  46. workThreads.clear();
  47. }
  48. private class WorkThread extends Thread {
  49. public WorkThread(String name) {
  50. super();
  51. setName(name);
  52. }
  53. @Override
  54. public void run() {
  55. while (!interrupted()) {
  56. try {
  57. // 获取任务
  58. Runnable runnable = taskQueue.take();
  59. if (runnable != null) {
  60. System.out.println(getName() + " ready execute:" + runnable.toString());
  61. // 执行任务
  62. runnable.run();
  63. }
  64. runnable = null;
  65. } catch (Exception e) {
  66. interrupted();
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. public void stopWork() {
  72. interrupted();
  73. }
  74. }
  75. }

手写阻塞队列

  1. public class MyBlockingQueue {
  2. /**
  3. * 队列容器
  4. */
  5. private List<Integer> container = new LinkedList<>();
  6. /**
  7. * 队列最大长度
  8. */
  9. private static final int MAX_NUM = 50;
  10. /**
  11. * 用于记录容器中元素的个数
  12. */
  13. private int count;
  14. /**
  15. * 申明锁
  16. */
  17. private Lock lock = new ReentrantLock();
  18. /**
  19. * 标识,可以表示具体的线程
  20. */
  21. private final Condition conditionNull = lock.newCondition();
  22. private final Condition conditionFull = lock.newCondition();
  23. /**
  24. * 向阻塞队列中添加元素
  25. */
  26. public void add(Integer item) throws InterruptedException {
  27. if (item == null) {
  28. throw new NullPointerException();
  29. }
  30. //申明可中断锁,简单起见也可以直接使用lock.lock(),lock.tryLock()
  31. lock.lockInterruptibly();
  32. try {
  33. try {
  34. while (container.size() == MAX_NUM) {
  35. System.out.println("队列已满,线程进入等待队列");
  36. conditionFull.await();
  37. }
  38. } catch (InterruptedException e) {
  39. System.out.println("出现异常,唤醒阻塞线程conditionFull");
  40. conditionFull.signal();
  41. throw e;
  42. }
  43. System.out.println("添加元素:" + item);
  44. ++count;
  45. container.add(item);
  46. System.out.println("唤醒阻塞线程...");
  47. //添加成功之后,调用signal()发出唤醒通知
  48. //注意这里的唤醒一定要在unlock()之前
  49. conditionNull.signal();
  50. Thread.sleep(1000);
  51. } finally {
  52. System.out.println("添加方法释放锁...");
  53. //手动释放锁
  54. lock.unlock();
  55. }
  56. }
  57. /**
  58. * 从阻塞队列中删除元素
  59. */
  60. public Integer take() throws InterruptedException {
  61. lock.lockInterruptibly();
  62. try {
  63. try {
  64. while (count == 0) {
  65. System.out.println("队列元素为空,进入阻塞...");
  66. conditionNull.await();
  67. }
  68. } catch (InterruptedException ie) {
  69. System.out.println("出现异常,唤醒阻塞线程conditionNull");
  70. conditionNull.signal();
  71. throw ie;
  72. }
  73. --count;
  74. Integer x = container.get(0);
  75. System.out.println("取出方法取出元素:" + x);
  76. container.remove(0);
  77. conditionFull.signal();
  78. return x;
  79. } finally {
  80. System.out.println("取出方法释放锁...");
  81. lock.unlock();
  82. }
  83. }
  84. }

多线程打印ABC

  1. public static void main(String[] args) {
  2. PrintABC printABC = new PrintABC();
  3. new Thread(printABC::printA).start();
  4. new Thread(printABC::printB).start();
  5. new Thread(printABC::printC).start();
  6. }
  7. private Semaphore semaphoreA = new Semaphore(1);
  8. private Semaphore semaphoreB = new Semaphore(0);
  9. private Semaphore semaphoreC = new Semaphore(0);
  10. public void printA() {
  11. print("A", semaphoreA, semaphoreB);
  12. }
  13. public void printB() {
  14. print("B", semaphoreB, semaphoreC);
  15. }
  16. public void printC() {
  17. print("C", semaphoreC, semaphoreA);
  18. }
  19. private void print(String name, Semaphore currentSemaphore, Semaphore nextSemaphore) {
  20. for (int i = 0; i < 10; ) {
  21. try {
  22. currentSemaphore.acquire();
  23. System.out.println(Thread.currentThread().getName() + " try to point " + name);
  24. i++;
  25. nextSemaphore.release();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }