两个线程同时开启,如何让两个线程顺序执行

  1. synchronized,然后用 wait notify
  2. locksupport 的 park,unpark
  3. cas,不停的去判断另一个线程运行完了没有
  4. 使用多种基于 AQS 的同步工具,如 CountDownLatch、Semaphore、ReentrantLock 等
  5. blockingqueue
  6. exchanger
  7. 单一线程池——控制线程顺序运行
  8. Future
  9. PipedInputStream 和 PipedOutputStream

使用join()

join在线程里面意味着“插队”,对哪个线程调用join,就代表让哪个线程插队先执行——但是插谁的队是有讲究的,不是说你可以插到队头去做第一个吃螃蟹的人,而是插到当前运行线程的前面。比如系统目前运行线程A,在线程A里面调用了线程B.join方法,则线程A会阻塞等待,线程B先执行,等到线程B全部执行完后才继续执行线程A。

而在JDK的解释中,join方法被解释成等待这个线程死亡(Waits for this thread to die),也就是调用方线程要等待这个线程全部执行完后,才继续执行接下来的代码。

image.png

  1. @Test
  2. @SneakyThrows
  3. public void useJoin() {
  4. Thread t1 = new Thread(() -> System.out.println(T1_WORD));
  5. Thread t2 = new Thread(() -> {
  6. try {
  7. t1.join(); //插队
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println(T2_WORD);
  12. });
  13. t1.start();
  14. t2.start();
  15. t2.join();
  16. }

使用synchronized关键字和 wait&notify

  1. private static volatile Integer STATUS = 1;
  2. @Test
  3. @SneakyThrows
  4. public void useSynchronized() {
  5. Object object = new Object();
  6. Thread t1 = new Thread(() -> {
  7. while (true) {
  8. while (STATUS == 1) {
  9. synchronized (object) {
  10. try {
  11. System.out.println(T1_WORD);
  12. STATUS = 2;
  13. object.wait();
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }
  19. }
  20. }, "T1");
  21. Thread t2 = new Thread(() -> {
  22. while (true) {
  23. while (STATUS == 2) {
  24. synchronized (object) {
  25. try {
  26. System.out.println(T2_WORD);
  27. TimeUnit.SECONDS.sleep(2);
  28. STATUS = 1;
  29. object.notify();
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. }
  36. }, "T2");
  37. t2.start();
  38. t1.start();
  39. t2.join();
  40. }

使用LockSupport

  1. @Test
  2. @SneakyThrows
  3. public void useLockSupport() {
  4. Thread t2 = new Thread(() -> {
  5. LockSupport.park();
  6. System.out.println(T2_WORD);
  7. });
  8. Thread t1 = new Thread(() -> {
  9. System.out.println(T1_WORD);
  10. try {
  11. TimeUnit.SECONDS.sleep(2);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. LockSupport.unpark(t2);
  16. });
  17. t1.start();
  18. t2.start();
  19. t2.join();
  20. }

使用CAS

  1. private final AtomicBoolean THREAD_A = new AtomicBoolean(false);
  2. @Test
  3. @SneakyThrows
  4. public void useCas() {
  5. Thread t1 = new Thread(() -> {
  6. System.out.println(T1_WORD);
  7. try {
  8. TimeUnit.SECONDS.sleep(2);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. THREAD_A.set(true);
  13. });
  14. Thread t2 = new Thread(() -> {
  15. while (true) {
  16. if (THREAD_A.get()) {
  17. System.out.println(T2_WORD);
  18. break;
  19. }
  20. }
  21. });
  22. t1.start();
  23. t2.start();
  24. t2.join();
  25. }

使用多种基于 AQS 的同步工具,如 CountDownLatch、Semaphore、ReentrantLock 等

CountDownLatch(2021.04.02 修正)

  1. private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
  2. @Test
  3. @SneakyThrows
  4. public void useCountDownLatch() {
  5. Thread t1 = new Thread(() -> {
  6. try {
  7. while (true) {
  8. while (STATUS == 1) {
  9. System.out.println(T1_WORD);
  10. COUNT_DOWN_LATCH.countDown();
  11. STATUS = 2;
  12. COUNT_DOWN_LATCH.await();
  13. }
  14. }
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. });
  19. Thread t2 = new Thread(() -> {
  20. try {
  21. while (true) {
  22. while (STATUS == 2) {
  23. COUNT_DOWN_LATCH.await();
  24. System.out.println(T2_WORD);
  25. TimeUnit.SECONDS.sleep(2);
  26. STATUS = 1;
  27. COUNT_DOWN_LATCH.countDown();
  28. }
  29. }
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. });
  34. t1.start();
  35. t2.start();
  36. t2.join();
  37. }

Semaphore(2021.04.02 修正)

  1. private static final Semaphore SEMAPHORE_A = new Semaphore(1);
  2. private static final Semaphore SEMAPHORE_B = new Semaphore(0);
  3. @SneakyThrows
  4. @Test
  5. public void useSemaphore() {
  6. Thread t1 = new Thread(() -> {
  7. try {
  8. while (true) {
  9. SEMAPHORE_A.acquire();
  10. System.out.println(T1_WORD);
  11. SEMAPHORE_B.release();
  12. }
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. });
  17. Thread t2 = new Thread(() -> {
  18. try {
  19. while (true) {
  20. SEMAPHORE_B.acquire();
  21. System.out.println(T2_WORD);
  22. System.out.println("-------------------");
  23. TimeUnit.SECONDS.sleep(2);
  24. SEMAPHORE_A.release();
  25. }
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. });
  30. t1.start();
  31. t2.start();
  32. t2.join();
  33. }

ReentrantLock

    private static final ReentrantLock REENTRANT_LOCK = new ReentrantLock();

    /**
     * Orders two threads with a {@link ReentrantLock} plus a condition.
     *
     * <p>The lock alone only provides mutual exclusion: whichever thread reaches {@code lock()}
     * first wins, so T2 could print before T1. T2 therefore waits on a condition, guarded by a
     * flag, that T1 signals once it has finished.
     */
    @Test
    @SneakyThrows
    public void useReentryLock() {

        final java.util.concurrent.locks.Condition t1Finished = REENTRANT_LOCK.newCondition();
        // Guarded by REENTRANT_LOCK; a one-element array so both lambdas can share and mutate it.
        final boolean[] t1Done = {false};

        Thread t1 = new Thread(() -> {
            // Acquire outside try so finally never unlocks a lock this thread does not hold.
            REENTRANT_LOCK.lock();
            try {
                System.out.println(T1_WORD);
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // T1 has printed by now, so T2 may proceed even if the sleep was cut short.
                t1Done[0] = true;
                t1Finished.signalAll();
                REENTRANT_LOCK.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            REENTRANT_LOCK.lock();
            try {
                // Guarded wait: covers both "T2 got the lock first" and spurious wake-ups.
                while (!t1Done[0]) {
                    t1Finished.await();
                }
                System.out.println(T2_WORD);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                REENTRANT_LOCK.unlock();
            }
        });
        t1.start();
        t2.start();
        t2.join();
    }

使用BlockingQueue:take(),put()

    private static final BlockingQueue<Integer> BLOCKING_QUEUE = new ArrayBlockingQueue<>(1);

    /**
     * Orders two threads with a {@link BlockingQueue}: T2 blocks in {@code take()} until T1 has
     * printed and put a token into the queue.
     */
    @Test
    @SneakyThrows
    public void useBlockingQueue() {

        Thread t1 = new Thread(() -> {
            try {
                System.out.println(T1_WORD);
                TimeUnit.SECONDS.sleep(3);
                // The value is irrelevant; the element only acts as a "T1 is done" token.
                BLOCKING_QUEUE.put(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread t2 = new Thread(() -> {
            try {
                BLOCKING_QUEUE.take();
            } catch (InterruptedException e) {
                // No token was received, so T1 may not have run yet; do not print out of order.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println(T2_WORD);
        });
        t1.start();
        t2.start();
        t2.join();
    }

使用Exchanger

    private static final Exchanger<Boolean> EXCHANGER = new Exchanger<>();

    /**
     * Orders two threads with an {@link Exchanger}: the first thread prints and then offers
     * {@code true}; the second thread offers {@code false} and prints only if the value it
     * receives in return is {@code true}.
     */
    @Test
    @SneakyThrows
    public void useExchanger() {

        Runnable firstTask = () -> {
            System.out.println(T1_WORD);
            try {
                EXCHANGER.exchange(true);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        };
        Runnable secondTask = () -> {
            try {
                // Blocks until the first thread arrives at its own exchange point.
                Boolean fromFirst = EXCHANGER.exchange(false);
                if (fromFirst) {
                    System.out.println(T2_WORD);
                }
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        };

        Thread first = new Thread(firstTask);
        Thread second = new Thread(secondTask);
        first.start();
        second.start();
        second.join();

    }

使用单线程池

     /**
      * Orders two tasks by submitting them to a single-thread executor, which runs them one
      * after another in submission order.
      *
      * <p>Completion is detected with {@code shutdown()} + {@code awaitTermination} rather than
      * the class-level latch, which another test may already have opened.
      */
    @Test
    @SneakyThrows
    public void useSingleThreadPool() {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Plain Runnables: the executor supplies the worker thread, no Thread objects needed.
        Runnable first = () -> {
            System.out.println(T1_WORD);
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable second = () -> System.out.println(T2_WORD);

        try {
            executor.execute(first);
            executor.execute(second);
        } finally {
            // Already-submitted tasks still run to completion after shutdown().
            executor.shutdown();
        }
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish within 1 minute");
        }

    }

使用Future

    /**
     * Orders two steps with a {@link Future}: the first step runs asynchronously and the
     * calling thread blocks on {@code get()} before printing the second message.
     */
    @Test
    @SneakyThrows
    public void useFuture() {

        Runnable firstStep = () -> {
            System.out.println(T1_WORD);
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        };

        Future<Void> pending = CompletableFuture.runAsync(firstStep);
        // Wait for the asynchronous step to complete before continuing.
        pending.get();
        System.out.println(T2_WORD);
    }

使用PipedInputStream和PipedOutputStream

    /**
     * Orders two threads with a connected pipe: T2 reads until end-of-stream, which it only
     * sees once T1 has printed, written its message and closed the writing end.
     *
     * <p>Bytes are encoded and decoded as UTF-8 explicitly, and decoded once after the whole
     * message has arrived, so a multi-byte character is never split across two buffer reads.
     */
    @Test
    @SneakyThrows
    public void usePipedInputStreamAndPipedOutputStream() {

        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        pis.connect(pos);
        Thread t1 = new Thread(() -> {
            System.out.println(T1_WORD);
            // try-with-resources closes the writing end on every path, which is what lets the
            // reader observe end-of-stream.
            try (PipedOutputStream out = pos) {
                TimeUnit.SECONDS.sleep(3);
                out.write(T1_WORD.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        Thread t2 = new Thread(() -> {
            try (PipedInputStream in = pis) {
                java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
                byte[] chunk = new byte[20];
                int readLength;
                while ((readLength = in.read(chunk)) != -1) {
                    received.write(chunk, 0, readLength);
                }
                if (received.size() > 0) {
                    String newData = new String(
                            received.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("读出数据:" + newData);
                }
                System.out.println(T2_WORD);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        t2.start();
        t2.join();
    }