1.线程池

1.1newFixedThreadPool

  1. public class FixedTheadPoolTest {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = Executors.newFixedThreadPool(4);
  4. for (int i = 0; i < 100; i++) {
  5. executorService.execute(new Task());
  6. }
  7. }
  8. }
  9. class Task implements Runnable {
  10. @Override
  11. public void run() {
  12. try {
  13. Thread.sleep(500);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println(Thread.currentThread().getName());
  18. }
  19. }

1.2newSingleThreadExecutor

  1. public class FixedTheadPoolTest {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = Executors.newFixedThreadPool(4);
  4. for (int i = 0; i < 100; i++) {
  5. executorService.execute(new Task());
  6. }
  7. }
  8. }
  9. class Task implements Runnable {
  10. @Override
  11. public void run() {
  12. try {
  13. Thread.sleep(500);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. System.out.println(Thread.currentThread().getName());
  18. }
  19. }

1.3newCachedThreadPool

  1. public class CacheTheadPoolTest {
  2. public static void main(String[] args) {
  3. ExecutorService executorService = Executors.newCachedThreadPool();
  4. for (int i = 0; i < 100; i++) {
  5. executorService.execute(new Task());
  6. }
  7. }
  8. }

1.4newScheduledThreadPool

  1. public class ScheduledTheadPoolTest {
  2. public static void main(String[] args) {
  3. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
  4. // 每隔5秒执行一次任务
  5. // scheduledExecutorService.schedule(new Task(),5, TimeUnit.SECONDS);
  6. // 最开始1秒 后续每隔3秒
  7. scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
  8. }
  9. }

1.5停止线程

  1. public class ShowDown {
  2. public static void main(String[] args) throws InterruptedException {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. for (int i = 0; i < 100; i++) {
  5. pool.execute(new ShutDownTask());
  6. }
  7. Thread.sleep(1500);
  8. // 强制停止线程立即执行
  9. List<Runnable> runnables = pool.shutdownNow();
  10. // 在3秒钟内判断线程是否已停止
  11. // pool.shutdown();
  12. // boolean b = pool.awaitTermination(7L, TimeUnit.SECONDS);
  13. // System.out.println(b);
  14. // System.out.println(pool.isShutdown());
  15. // pool.shutdown();
  16. // // 已停止线程后再调用会报错
  17. //// pool.execute(new ShutDownTask());
  18. // // 开始结束后就会返回true
  19. // System.out.println(pool.isShutdown());
  20. // Thread.sleep(10000);
  21. // // 测地执行完后才返回true
  22. // System.out.println(pool.isTerminated());
  23. }
  24. }
  25. class ShutDownTask implements Runnable {
  26. @Override
  27. public void run() {
  28. try {
  29. Thread.sleep(1500);
  30. System.out.println(Thread.currentThread().getName());
  31. } catch (InterruptedException e) {
  32. System.out.println(Thread.currentThread().getName() + "被中断了");
  33. }
  34. }
  35. }

1.6钩子拒绝策略

  1. public class PauseBleThreadPool extends ThreadPoolExecutor {
  2. private boolean isPaused;
  3. private final ReentrantLock lock = new ReentrantLock();
  4. private Condition unpaused = lock.newCondition();
  5. public PauseBleThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
  6. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  7. }
  8. @Override
  9. protected void beforeExecute(Thread t, Runnable r) {
  10. super.beforeExecute(t, r);
  11. lock.lock();
  12. try {
  13. while (isPaused) {
  14. unpaused.await();
  15. }
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
  22. private void pause() {
  23. lock.lock();
  24. try {
  25. isPaused = true;
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. public void resume() {
  31. lock.lock();
  32. try {
  33. isPaused = false;
  34. unpaused.signalAll();
  35. } finally {
  36. lock.unlock();
  37. }
  38. }
  39. public static void main(String[] args) throws InterruptedException {
  40. PauseBleThreadPool threadPool = new PauseBleThreadPool(10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
  41. Runnable ru = () -> {
  42. System.out.println("我被执行了");
  43. try {
  44. Thread.sleep(1500);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. };
  49. for (int i = 0; i < 100; i++) {
  50. threadPool.execute(ru);
  51. }
  52. Thread.sleep(1500);
  53. threadPool.pause();
  54. System.out.println("线程池被暂停了");
  55. threadPool.resume();
  56. System.out.println("线程池被恢复了");
  57. }
  58. }

2.锁

2.1 ReentrantLock经典案例

  1. /**
  2. * 演示多线程预定电影院座位,1个座位只能卖给一个人
  3. */
  4. public class CinemaBookSeat {
  5. private static ReentrantLock lock = new ReentrantLock();
  6. private static void bookSeat() {
  7. lock.lock();
  8. try {
  9. System.out.println(Thread.currentThread().getName() + "开始预定座位");
  10. TimeUnit.SECONDS.sleep(1);
  11. System.out.println(Thread.currentThread().getName() + "完成预定座位");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. } finally {
  15. lock.unlock();
  16. }
  17. }
  18. public static void main(String[] args) {
  19. new Thread(CinemaBookSeat::bookSeat).start();
  20. new Thread(CinemaBookSeat::bookSeat).start();
  21. new Thread(CinemaBookSeat::bookSeat).start();
  22. new Thread(CinemaBookSeat::bookSeat).start();
  23. }
  24. }
  1. public class PrintLock {
  2. static class Outputer {
  3. Lock lock = new ReentrantLock();
  4. public void output(String name) {
  5. int len = name.length();
  6. lock.lock();
  7. try {
  8. for (int i = 0; i < len; i++) {
  9. System.out.print(name.charAt(i));
  10. }
  11. System.out.println("");
  12. } finally {
  13. lock.unlock();
  14. }
  15. }
  16. }
  17. private void init() {
  18. final Outputer outputer = new Outputer();
  19. new Thread(() -> {
  20. while (true) {
  21. try {
  22. Thread.sleep(5);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. outputer.output("中国近代历史");
  27. }
  28. }).start();
  29. new Thread(() -> {
  30. while (true) {
  31. try {
  32. Thread.sleep(5);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. outputer.output("马克思主义");
  37. }
  38. }).start();
  39. }
  40. public static void main(String[] args) {
  41. new PrintLock().init();
  42. }
  43. }

2.2 公平锁和非公平锁案例

  1. public class FairLock {
  2. public static void main(String[] args) {
  3. PrintQueue printQueue = new PrintQueue();
  4. Thread thread[] = new Thread[10];
  5. for (int i = 0; i < 10; i++) {
  6. thread[i] = new Thread(new Job(printQueue));
  7. }
  8. for (int i = 0; i < 10; i++) {
  9. thread[i].start();
  10. try {
  11. Thread.sleep(100);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }
  17. }
  18. class PrintQueue {
  19. // 设置为false就是非公平锁
  20. private Lock queueLock = new ReentrantLock(true);
  21. public void printJob(Object document) {
  22. queueLock.lock();
  23. try {
  24. int duration = new Random().nextInt(10) + 1;
  25. System.out.println(Thread.currentThread().getName() + "正在打印,需要" + duration);
  26. try {
  27. Thread.sleep(duration * 1000);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. } finally {
  32. queueLock.unlock();
  33. }
  34. queueLock.lock();
  35. try {
  36. int duration = new Random().nextInt(10) + 1;
  37. System.out.println(Thread.currentThread().getName() + "正在打印,需要" + duration);
  38. try {
  39. Thread.sleep(duration * 1000);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. } finally {
  44. queueLock.unlock();
  45. }
  46. }
  47. }
  48. class Job implements Runnable {
  49. PrintQueue printQueue;
  50. public Job(PrintQueue printQueue) {
  51. this.printQueue = printQueue;
  52. }
  53. @Override
  54. public void run() {
  55. System.out.println(Thread.currentThread().getName() + "开始打印");
  56. printQueue.printJob(new Object());
  57. System.out.println(Thread.currentThread().getName() + "打印完毕");
  58. }
  59. }

2.3 ReentrantReadWriteLock 案例

  1. public class CinemaReadWrite {
  2. private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
  3. private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
  4. private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
  5. public static void read() {
  6. readLock.lock();
  7. try {
  8. System.out.println(Thread.currentThread().getName() + "得到了读锁,正在读取");
  9. Thread.sleep(1000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } finally {
  13. System.out.println(Thread.currentThread().getName() + "释放读锁");
  14. readLock.unlock();
  15. }
  16. }
  17. public static void write() {
  18. writeLock.lock();
  19. try {
  20. System.out.println(Thread.currentThread().getName() + "得到了写锁,正在写入");
  21. Thread.sleep(1000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. } finally {
  25. System.out.println(Thread.currentThread().getName() + "释放写锁");
  26. writeLock.unlock();
  27. }
  28. }
  29. public static void main(String[] args) {
  30. new Thread(()->read(),"Thread1").start();
  31. new Thread(()->read(),"Thread2").start();
  32. new Thread(()->write(),"Thread3").start();
  33. new Thread(()->write(),"Thread4").start();
  34. }
  35. }

2.3.1 头结点为写锁,读锁排队

  1. public static void main(String[] args) {
  2. new Thread(()->write(),"Thread1").start();
  3. new Thread(()->read(),"Thread2").start();
  4. new Thread(()->read(),"Thread3").start();
  5. new Thread(()->write(),"Thread4").start();
  6. new Thread(()->read(),"Thread5").start();
  7. }

2.3.2 头结点不为写锁时,读锁可以插队

  1. public class NonfairBargeDemo {
  2. private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
  3. private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
  4. private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
  5. private static void read() {
  6. System.out.println(Thread.currentThread().getName() + "开始尝试获取读锁");
  7. readLock.lock();
  8. try {
  9. System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");
  10. try {
  11. Thread.sleep(20);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. } finally {
  16. System.out.println(Thread.currentThread().getName() + "释放读锁");
  17. readLock.unlock();
  18. }
  19. }
  20. private static void write() {
  21. System.out.println(Thread.currentThread().getName() + "开始尝试获取写锁");
  22. writeLock.lock();
  23. try {
  24. System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");
  25. try {
  26. Thread.sleep(40);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. } finally {
  31. System.out.println(Thread.currentThread().getName() + "释放写锁");
  32. writeLock.unlock();
  33. }
  34. }
  35. public static void main(String[] args) {
  36. new Thread(() -> write(), "Thread1").start();
  37. new Thread(() -> read(), "Thread2").start();
  38. new Thread(() -> read(), "Thread3").start();
  39. new Thread(() -> write(), "Thread4").start();
  40. new Thread(() -> read(), "Thread5").start();
  41. new Thread(() -> {
  42. Thread thread[] = new Thread[1000];
  43. for (int i = 0; i < 1000; i++) {
  44. thread[i] = new Thread(() -> {
  45. read();
  46. }, "子线程创建的Thread" + i);
  47. }
  48. for (int i = 0; i < 1000; i++) {
  49. thread[i].start();
  50. }
  51. }).start();
  52. }
  53. }

2.4 锁的降级

  1. public class Upgrading {
  2. private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
  3. private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
  4. private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
  5. private static void readUpgrading() {
  6. System.out.println(Thread.currentThread().getName() + "开始尝试获取读锁");
  7. readLock.lock();
  8. try {
  9. System.out.println(Thread.currentThread().getName() + "得到读锁,正在读取");
  10. try {
  11. Thread.sleep(20);
  12. System.out.println(Thread.currentThread().getName() + "升级会带来阻塞");
  13. writeLock.lock();
  14. System.out.println(Thread.currentThread().getName() + "获取到了写锁,升级成功");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. } finally {
  19. System.out.println(Thread.currentThread().getName() + "释放读锁");
  20. readLock.unlock();
  21. }
  22. }
  23. private static void writeDowngrading() {
  24. System.out.println(Thread.currentThread().getName() + "开始尝试获取写锁");
  25. writeLock.lock();
  26. try {
  27. System.out.println(Thread.currentThread().getName() + "得到写锁,正在写入");
  28. try {
  29. Thread.sleep(40);
  30. readLock.lock();
  31. System.out.println("在不释放写锁的情况下,直接获取写锁,成功降级");
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. } finally {
  36. readLock.unlock();
  37. System.out.println(Thread.currentThread().getName() + "释放写锁");
  38. writeLock.unlock();
  39. }
  40. }
  41. public static void main(String[] args) throws InterruptedException {
  42. System.out.println("先演示降级是可以的");
  43. Thread thread1 = new Thread(() -> writeDowngrading(), "Thread1");
  44. thread1.start();
  45. thread1.join();
  46. System.out.println("----------------");
  47. System.out.println("演示升级是不行的");
  48. Thread thread2 = new Thread(() -> readUpgrading(), "Thread2");
  49. thread2.start();
  50. }
  51. }

3.原子类

3.1 AtomicInteger

  1. public class AtomicIntegerDemo1 implements Runnable {
  2. private static final AtomicInteger atomicInteger = new AtomicInteger();
  3. public void incrementAtomic() {
  4. atomicInteger.getAndIncrement();
  5. }
  6. private static volatile int basicCount = 0;
  7. public void incrementBasic() {
  8. basicCount++;
  9. }
  10. @Override
  11. public void run() {
  12. for (int i = 0; i < 10000; i++) {
  13. incrementAtomic();
  14. incrementBasic();
  15. }
  16. }
  17. public static void main(String[] args) throws InterruptedException {
  18. AtomicIntegerDemo1 r = new AtomicIntegerDemo1();
  19. Thread thread1 = new Thread(r);
  20. Thread thread2 = new Thread(r);
  21. thread1.start();
  22. thread2.start();
  23. thread1.join();
  24. thread2.join();
  25. System.out.println("原子类的结果是:" + atomicInteger.get());
  26. System.out.println("普通变量的结果是:" + basicCount);
  27. }
  28. }

3.2 AtomicIntegerArray

  1. public class AtomicArrayDemo {
  2. public static void main(String[] args) {
  3. AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
  4. Thread[] threadsIncrementer = new Thread[100];
  5. Thread[] threadsDecrementer = new Thread[100];
  6. Incrementer incrementer = new Incrementer(atomicIntegerArray);
  7. Decrementer decrementer = new Decrementer(atomicIntegerArray);
  8. for (int i = 0; i < 100; i++) {
  9. threadsDecrementer[i] = new Thread(decrementer);
  10. threadsIncrementer[i] = new Thread(incrementer);
  11. threadsDecrementer[i].start();
  12. threadsIncrementer[i].start();
  13. }
  14. for (int i = 0; i < 100; i++) {
  15. try {
  16. threadsDecrementer[i].join();
  17. threadsIncrementer[i].join();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. for (int i = 0; i < atomicIntegerArray.length(); i++) {
  23. if(atomicIntegerArray.get(i)!=0){
  24. System.out.println("发现了错误"+i);
  25. }
  26. }
  27. System.out.println("运行结束");
  28. }
  29. }
  30. class Decrementer implements Runnable {
  31. private AtomicIntegerArray array;
  32. public Decrementer(AtomicIntegerArray array) {
  33. this.array = array;
  34. }
  35. @Override
  36. public void run() {
  37. for (int i = 0; i < array.length(); i++) {
  38. array.getAndIncrement(i);
  39. }
  40. }
  41. }
  42. class Incrementer implements Runnable {
  43. private AtomicIntegerArray array;
  44. public Incrementer(AtomicIntegerArray array) {
  45. this.array = array;
  46. }
  47. @Override
  48. public void run() {
  49. for (int i = 0; i < array.length(); i++) {
  50. array.getAndDecrement(i);
  51. }
  52. }
  53. }

3.3 AtomicIntegerFieldUpdater

  1. public class AtomicIntegerFieldUpdaterDemo implements Runnable {
  2. static Candidate tom;
  3. static Candidate peter;
  4. public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
  5. @Override
  6. public void run() {
  7. for (int i = 0; i < 10000; i++) {
  8. peter.score++;
  9. scoreUpdater.getAndIncrement(tom);
  10. }
  11. }
  12. public static class Candidate {
  13. volatile int score;
  14. }
  15. public static void main(String[] args) throws InterruptedException {
  16. tom = new Candidate();
  17. peter = new Candidate();
  18. AtomicIntegerFieldUpdaterDemo r = new AtomicIntegerFieldUpdaterDemo();
  19. Thread t1 = new Thread(r);
  20. Thread t2 = new Thread(r);
  21. t1.start();
  22. t2.start();
  23. t1.join();
  24. t2.join();
  25. System.out.println("普通变量:" + peter.score);
  26. System.out.println("升级后的结果:" + tom.score);
  27. }
  28. }