1.线程池
1.1 newFixedThreadPool
/**
 * Demonstrates {@link Executors#newFixedThreadPool(int)}: 100 tasks are handed to a pool
 * created with 4 threads, and each task prints the name of the worker that ran it.
 */
public class FixedTheadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < 100; i++) {
                executorService.execute(new Task());
            }
        } finally {
            // Orderly shutdown: already-submitted tasks still run, but the idle workers
            // are released afterwards so the JVM can exit instead of hanging forever.
            executorService.shutdown();
        }
    }
}

/** Sleeps for 500 ms and then prints the name of the thread that executed it. */
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName());
    }
}
1.2 newSingleThreadExecutor
/**
 * Demonstrates {@link Executors#newSingleThreadExecutor()}: 100 tasks are handed to an
 * executor backed by a single worker thread, and each task prints the worker's name.
 *
 * <p>NOTE(review): the class name was carried over from the fixed-pool example; consider
 * renaming it (e.g. {@code SingleThreadExecutorTest}) if nothing depends on the current name.
 */
public class FixedTheadPoolTest {
    public static void main(String[] args) {
        // This section is about the single-thread executor, not newFixedThreadPool(4).
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < 100; i++) {
                executorService.execute(new Task());
            }
        } finally {
            // Orderly shutdown: queued tasks still run, then the worker is released so
            // the JVM can exit.
            executorService.shutdown();
        }
    }
}

/** Sleeps for 500 ms and then prints the name of the thread that executed it. */
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the owning executor can still observe it.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName());
    }
}
1.3 newCachedThreadPool
/**
 * Demonstrates {@link Executors#newCachedThreadPool()} by submitting 100 {@code Task}
 * instances to a cached pool.
 */
public class CacheTheadPoolTest {
    public static void main(String[] args) {
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        int remaining = 100;
        while (remaining > 0) {
            cachedPool.execute(new Task());
            remaining--;
        }
    }
}
1.4 newScheduledThreadPool
public class ScheduledTheadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); // 每隔5秒执行一次任务// scheduledExecutorService.schedule(new Task(),5, TimeUnit.SECONDS); // 最开始1秒 后续每隔3秒 scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); }}
1.5 停止线程
/**
 * Demonstrates stopping a thread pool: 100 tasks are submitted to a 10-thread pool and,
 * after 1.5 seconds, the pool is stopped with {@code shutdownNow()}.
 */
public class ShowDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        int submitted = 0;
        while (submitted < 100) {
            pool.execute(new ShutDownTask());
            submitted++;
        }

        Thread.sleep(1500);

        // Stop immediately; the returned list holds the tasks that were still waiting
        // in the queue and therefore never started.
        List<Runnable> neverStarted = pool.shutdownNow();

        // Alternative experiments, kept for reference:
        //
        // Wait (up to 7 seconds) to find out whether the pool has terminated.
        // pool.shutdown();
        // boolean b = pool.awaitTermination(7L, TimeUnit.SECONDS);
        // System.out.println(b);
        //
        // System.out.println(pool.isShutdown());
        // pool.shutdown();
        // // Submitting after shutdown is rejected with an exception.
        // // pool.execute(new ShutDownTask());
        // // isShutdown() returns true as soon as shutdown has begun.
        // System.out.println(pool.isShutdown());
        // Thread.sleep(10000);
        // // isTerminated() returns true only once everything has finished.
        // System.out.println(pool.isTerminated());
    }
}

/**
 * Sleeps for 1.5 seconds and prints the worker's name, or reports that the worker was
 * interrupted if the sleep is cut short.
 */
class ShutDownTask implements Runnable {
    @Override
    public void run() {
        final String workerName = Thread.currentThread().getName();
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            System.out.println(workerName + "被中断了");
            return;
        }
        System.out.println(workerName);
    }
}
1.6钩子拒绝策略
/**
 * A {@link ThreadPoolExecutor} that can be paused and resumed.
 *
 * <p>The {@link #beforeExecute(Thread, Runnable)} hook makes every worker wait on a
 * condition while the pool is paused, so no new task starts until {@link #resume()} is
 * called. Tasks that are already running are not affected by {@link #pause()}.
 */
public class PauseBleThreadPool extends ThreadPoolExecutor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition unpaused = lock.newCondition();
    /** Guarded by {@link #lock}. */
    private boolean isPaused;

    public PauseBleThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Blocks the worker thread while the pool is paused.
     *
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            // Re-assert the interrupt instead of swallowing it, so that shutdownNow()
            // is still visible to the task that is about to run.
            t.interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Pauses the pool: workers will wait before starting their next task. */
    public void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /** Resumes the pool and wakes up every worker waiting in {@code beforeExecute}. */
    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseBleThreadPool threadPool = new PauseBleThreadPool(
                10, 20, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable ru = () -> {
            System.out.println("我被执行了");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 100; i++) {
            threadPool.execute(ru);
        }

        Thread.sleep(1500);
        threadPool.pause();
        System.out.println("线程池被暂停了");

        // Stay paused long enough for the gap in the task output to be observable;
        // resuming immediately would make the pause invisible.
        Thread.sleep(1500);
        threadPool.resume();
        System.out.println("线程池被恢复了");

        // Let the queued tasks finish, then release the workers so the JVM can exit.
        threadPool.shutdown();
    }
}
2.锁
2.1 ReentrantLock经典案例
/**
 * Demonstrates several threads booking cinema seats: a shared {@link ReentrantLock}
 * ensures only one thread is inside the booking section at a time, so one seat is only
 * ever sold to one customer.
 */
public class CinemaBookSeat {
    private static ReentrantLock lock = new ReentrantLock();

    /** Books a seat while holding the lock; the booking itself takes one second. */
    private static void bookSeat() {
        final String customer = Thread.currentThread().getName();
        lock.lock();
        try {
            System.out.println(customer + "开始预定座位");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(customer + "完成预定座位");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Four customers compete for the lock.
        for (int customer = 0; customer < 4; customer++) {
            new Thread(CinemaBookSeat::bookSeat).start();
        }
    }
}
/**
 * Demonstrates using a {@link Lock} so that two threads printing character by character
 * never interleave their output within a line.
 */
public class PrintLock {

    /** Prints a string one character at a time while holding a lock. */
    static class Outputer {
        Lock lock = new ReentrantLock();

        /**
         * Prints {@code name} followed by a line break, holding the lock for the whole line.
         *
         * @param name the text to print
         */
        public void output(String name) {
            char[] letters = name.toCharArray();
            lock.lock();
            try {
                for (char letter : letters) {
                    System.out.print(letter);
                }
                System.out.println("");
            } finally {
                lock.unlock();
            }
        }
    }

    /** Starts a thread that prints {@code text} through {@code outputer} every 5 ms, forever. */
    private static void startPrinter(Outputer outputer, String text) {
        new Thread(() -> {
            for (;;) {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                outputer.output(text);
            }
        }).start();
    }

    private void init() {
        final Outputer sharedOutputer = new Outputer();
        startPrinter(sharedOutputer, "中国近代历史");
        startPrinter(sharedOutputer, "马克思主义");
    }

    public static void main(String[] args) {
        PrintLock demo = new PrintLock();
        demo.init();
    }
}
2.2 公平锁和非公平锁案例
/**
 * Demonstrates fair versus non-fair locking: ten threads, started 100 ms apart, each
 * acquire the same lock twice in a row to "print" a job.
 */
public class FairLock {
    public static void main(String[] args) {
        PrintQueue sharedQueue = new PrintQueue();
        Thread[] workers = new Thread[10];
        for (int idx = 0; idx < workers.length; idx++) {
            workers[idx] = new Thread(new Job(sharedQueue));
        }
        for (Thread worker : workers) {
            worker.start();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/** A print queue whose access is serialized by a single lock. */
class PrintQueue {
    // Pass false here to get a non-fair lock instead.
    private Lock queueLock = new ReentrantLock(true);

    /**
     * Prints the document in two rounds, releasing and re-acquiring the lock in between.
     *
     * @param document the document to print (not inspected by this demo)
     */
    public void printJob(Object document) {
        printOneRound();
        printOneRound();
    }

    /** Acquires the lock, announces a random 1-10 second duration and sleeps that long. */
    private void printOneRound() {
        queueLock.lock();
        try {
            int duration = new Random().nextInt(10) + 1;
            System.out.println(Thread.currentThread().getName() + "正在打印,需要" + duration);
            try {
                Thread.sleep(duration * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            queueLock.unlock();
        }
    }
}

/** A runnable that sends one document to the shared {@link PrintQueue}. */
class Job implements Runnable {
    PrintQueue printQueue;

    public Job(PrintQueue printQueue) {
        this.printQueue = printQueue;
    }

    @Override
    public void run() {
        String worker = Thread.currentThread().getName();
        System.out.println(worker + "开始打印");
        printQueue.printJob(new Object());
        System.out.println(worker + "打印完毕");
    }
}
2.3 ReentrantReadWriteLock 案例
/**
 * Demonstrates {@link ReentrantReadWriteLock}: two reader threads and two writer threads
 * each hold their lock for one second.
 */
public class CinemaReadWrite {
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    /** Acquires the read lock, holds it for one second, then releases it. */
    public static void read() {
        final String who = Thread.currentThread().getName();
        readLock.lock();
        try {
            System.out.println(who + "得到了读锁,正在读取");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(who + "释放读锁");
            readLock.unlock();
        }
    }

    /** Acquires the write lock, holds it for one second, then releases it. */
    public static void write() {
        final String who = Thread.currentThread().getName();
        writeLock.lock();
        try {
            System.out.println(who + "得到了写锁,正在写入");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(who + "释放写锁");
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(CinemaReadWrite::read, "Thread1").start();
        new Thread(CinemaReadWrite::read, "Thread2").start();
        new Thread(CinemaReadWrite::write, "Thread3").start();
        new Thread(CinemaReadWrite::write, "Thread4").start();
    }
}
2.3.1 头结点为写锁,读锁排队
/**
 * Starts one writer first, followed by two readers, another writer and a final reader,
 * so the readers have to queue behind the writer at the head.
 */
public static void main(String[] args) {
    Runnable reader = () -> read();
    Runnable writer = () -> write();

    new Thread(writer, "Thread1").start();
    new Thread(reader, "Thread2").start();
    new Thread(reader, "Thread3").start();
    new Thread(writer, "Thread4").start();
    new Thread(reader, "Thread5").start();
}
2.3.2 头结点不为写锁时,读锁可以插队
/**
 * Demonstrates barging on a non-fair {@link ReentrantReadWriteLock}: a handful of named
 * readers and writers compete with 1000 extra reader threads created by a launcher thread.
 */
public class NonfairBargeDemo {
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    /** Announces the attempt, acquires the read lock, holds it for 20 ms and releases it. */
    private static void read() {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "开始尝试获取读锁");
        readLock.lock();
        try {
            System.out.println(who + "得到读锁,正在读取");
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(who + "释放读锁");
            readLock.unlock();
        }
    }

    /** Announces the attempt, acquires the write lock, holds it for 40 ms and releases it. */
    private static void write() {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "开始尝试获取写锁");
        writeLock.lock();
        try {
            System.out.println(who + "得到写锁,正在写入");
            Thread.sleep(40);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(who + "释放写锁");
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(NonfairBargeDemo::write, "Thread1").start();
        new Thread(NonfairBargeDemo::read, "Thread2").start();
        new Thread(NonfairBargeDemo::read, "Thread3").start();
        new Thread(NonfairBargeDemo::write, "Thread4").start();
        new Thread(NonfairBargeDemo::read, "Thread5").start();

        // The launcher builds all 1000 readers first and only then starts them.
        Runnable launcher = () -> {
            final int readerCount = 1000;
            Thread[] readers = new Thread[readerCount];
            for (int n = 0; n < readerCount; n++) {
                readers[n] = new Thread(NonfairBargeDemo::read, "子线程创建的Thread" + n);
            }
            for (Thread reader : readers) {
                reader.start();
            }
        };
        new Thread(launcher).start();
    }
}
2.4 锁的降级与升级
/**
 * Demonstrates lock downgrading and upgrading on a {@link ReentrantReadWriteLock}:
 * a thread holding the write lock can also take the read lock (downgrade), while a
 * thread holding the read lock blocks when it asks for the write lock (upgrade).
 */
public class Upgrading {
    private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    /**
     * Takes the read lock and then tries to take the write lock without releasing it.
     *
     * <p>The {@code writeLock.lock()} call is expected to block forever, because the write
     * lock cannot be granted while a read lock (here: the caller's own) is still held.
     * That hang is the point of the demonstration.
     */
    private static void readUpgrading() {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "开始尝试获取读锁");
        readLock.lock();
        try {
            System.out.println(who + "得到读锁,正在读取");
            Thread.sleep(20);
            System.out.println(who + "升级会带来阻塞");
            writeLock.lock();
            try {
                System.out.println(who + "获取到了写锁,升级成功");
            } finally {
                // Pair the acquisition with a release in case it ever succeeds.
                writeLock.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(who + "释放读锁");
            readLock.unlock();
        }
    }

    /**
     * Takes the write lock and then, still holding it, takes the read lock as well.
     */
    private static void writeDowngrading() {
        final String who = Thread.currentThread().getName();
        System.out.println(who + "开始尝试获取写锁");
        writeLock.lock();
        try {
            System.out.println(who + "得到写锁,正在写入");
            Thread.sleep(40);
            readLock.lock();
            try {
                // The message now names the read lock, which is what was just acquired.
                System.out.println("在不释放写锁的情况下,直接获取读锁,成功降级");
            } finally {
                // Only unlock the read lock when it was actually acquired; unlocking it
                // after an interrupted sleep would throw IllegalMonitorStateException.
                readLock.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(who + "释放写锁");
            writeLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("先演示降级是可以的");
        Thread thread1 = new Thread(() -> writeDowngrading(), "Thread1");
        thread1.start();
        thread1.join();
        System.out.println("----------------");
        System.out.println("演示升级是不行的");
        // Thread2 is expected to hang on the upgrade attempt; stop the JVM manually.
        Thread thread2 = new Thread(() -> readUpgrading(), "Thread2");
        thread2.start();
    }
}
3.原子类
3.1 AtomicInteger
/**
 * Compares an {@link AtomicInteger} with a plain {@code volatile int} counter: two threads
 * each perform 10000 increments on both, and the final values are printed.
 */
public class AtomicIntegerDemo1 implements Runnable {
    private static final AtomicInteger atomicInteger = new AtomicInteger();
    private static volatile int basicCount = 0;

    /** Increments the atomic counter by one. */
    public void incrementAtomic() {
        atomicInteger.incrementAndGet();
    }

    /** Increments the volatile counter by one using a plain read-modify-write. */
    public void incrementBasic() {
        basicCount++;
    }

    @Override
    public void run() {
        int remaining = 10000;
        while (remaining-- > 0) {
            incrementAtomic();
            incrementBasic();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerDemo1 sharedTask = new AtomicIntegerDemo1();
        Thread[] workers = {new Thread(sharedTask), new Thread(sharedTask)};
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("原子类的结果是:" + atomicInteger.get());
        System.out.println("普通变量的结果是:" + basicCount);
    }
}
3.2 AtomicIntegerArray
/**
 * Demonstrates {@link AtomicIntegerArray}: 100 incrementing threads and 100 decrementing
 * threads each touch every element of a 1000-element array once, so every element must
 * be back at zero when all threads have finished.
 */
public class AtomicArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
        Thread[] threadsIncrementer = new Thread[100];
        Thread[] threadsDecrementer = new Thread[100];
        Incrementer incrementer = new Incrementer(atomicIntegerArray);
        Decrementer decrementer = new Decrementer(atomicIntegerArray);

        for (int i = 0; i < 100; i++) {
            threadsDecrementer[i] = new Thread(decrementer);
            threadsIncrementer[i] = new Thread(incrementer);
            threadsDecrementer[i].start();
            threadsIncrementer[i].start();
        }

        for (int i = 0; i < 100; i++) {
            try {
                threadsDecrementer[i].join();
                threadsIncrementer[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Any non-zero element would mean an update was lost.
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            if (atomicIntegerArray.get(i) != 0) {
                System.out.println("发现了错误" + i);
            }
        }
        System.out.println("运行结束");
    }
}

/** Atomically decrements every element of the shared array by one. */
class Decrementer implements Runnable {
    private final AtomicIntegerArray array;

    public Decrementer(AtomicIntegerArray array) {
        this.array = array;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            // Previously this class incremented, contradicting its name.
            array.getAndDecrement(i);
        }
    }
}

/** Atomically increments every element of the shared array by one. */
class Incrementer implements Runnable {
    private final AtomicIntegerArray array;

    public Incrementer(AtomicIntegerArray array) {
        this.array = array;
    }

    @Override
    public void run() {
        for (int i = 0; i < array.length(); i++) {
            // Previously this class decremented, contradicting its name.
            array.getAndIncrement(i);
        }
    }
}
3.3 AtomicIntegerFieldUpdater
/**
 * Demonstrates {@link AtomicIntegerFieldUpdater}: two threads each bump the scores of two
 * candidates 10000 times, one through a plain {@code ++} on the volatile field and the
 * other through the field updater, and the final scores are printed.
 */
public class AtomicIntegerFieldUpdaterDemo implements Runnable {
    static Candidate tom;
    static Candidate peter;

    public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    /** Holder for the volatile {@code score} field targeted by the updater. */
    public static class Candidate {
        volatile int score;
    }

    @Override
    public void run() {
        int round = 0;
        while (round < 10000) {
            peter.score++;                      // plain read-modify-write
            scoreUpdater.incrementAndGet(tom);  // atomic update through the updater
            round++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        tom = new Candidate();
        peter = new Candidate();

        AtomicIntegerFieldUpdaterDemo sharedTask = new AtomicIntegerFieldUpdaterDemo();
        Thread first = new Thread(sharedTask);
        Thread second = new Thread(sharedTask);
        first.start();
        second.start();
        first.join();
        second.join();

        System.out.println("普通变量:" + peter.score);
        System.out.println("升级后的结果:" + tom.score);
    }
}