基本概念

并发与并行

并发:轮流运行 并行:多核同时执行

进程与线程

进程:程序执行一次的过程 线程:进程中一个独立运行的路径

什么是多线程

多线程:一个进程同时完成多个任务 ,不同的任务由不同的线程独立执行

多线程与多进程

多进程:数据独立 多线程:共享数据 多线程通信更容易 资源消耗更小


线程的状态

操作系统中有5总 java6种 可以通过 t.getState() 获取

  • New(新建)

new Thread(r);

  • Runnable(可运行)

    t.start();
    此时等待CPU调度 抢占时间片 不一定马上被执行

  • Blocked(阻塞)、

尝试获取对象锁 但该锁被占用

  • Waiting(等待)

等待其他线程通知
(如:调用wait join 等待Lock Condition)

  • Timed waiting(计时等待)

等待其他线程通知或超时
(如带有参数的 sleep wait join Lock.tryLock Condition.await)

  • Terminated(终止)

运行结束 或 抛异常 或 主动停止

线程状态转化图

image.png


InterruptedException 中断异常

一个等待状态的线程被调用interrupt()方法就会抛出 所以 sleep wait 方法需要捕获中断异常

代码

  1. public class Test {
  2. public static void main(String[] args) {
  3. //开启睡觉线程
  4. Thread t = new Thread(()->{
  5. try {
  6. Task.sleepTask();
  7. } catch (InterruptedException e) {
  8. //被打断时的处理
  9. System.out.println("睡觉线程被打断了 正在进行业务操作 如:数据回滚");
  10. }
  11. },"睡觉线程");
  12. t.start();
  13. //打断睡觉线程
  14. try {
  15. TimeUnit.SECONDS.sleep(2);
  16. System.out.println("主线程两秒后打断睡觉线程");
  17. t.interrupt();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }
  23. class Task{
  24. public static void sleepTask() throws InterruptedException {
  25. System.out.println(Thread.currentThread().getName()+" 开始睡觉了");
  26. TimeUnit.SECONDS.sleep(10);
  27. }
  28. }

image.png

!注:InterruptedException 推荐抛出 这样调用该的人才能处理


守护进程

t.setDaemon(true); 1.必须在调用前设置 2.一般用于给别的线程提供服务 如:倒计时线程 GC线程 3.当只剩下守护线程时程序就结束


uncaughtException 未捕获异常

1.出现未捕获异常时 线程死亡前会把异常传给UncaughtExceptionHandler的实现类处理 2.默认的处理器是ThreadGroup 如果没设置 就是单纯的打印栈信息 3.给t1线程设置处理器 t1.setUncaughtExceptionHandler(UncaughtExceptionHandler eh); 4.给全部线程设置处理器 Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh);

默认未捕获异常处理器:ThreadGroup的源码
image.png

代码

  1. public static void main(String[] args) throws InterruptedException {
  2. Thread thread = new Thread(()->{
  3. int i = 1/0;
  4. System.out.println("OK");
  5. });
  6. thread.setUncaughtExceptionHandler((t,e)->{
  7. System.out.println(t.getName()+" 线程出现了未知异常:"+e.getMessage()+",正在记录日志");
  8. });
  9. thread.start();
  10. }

结果:image.png


线程优先级

1 ~ 10 默认5 10最大 thread.setPriority(10); / thread.setPriority(Thread.MAX_PRIORITY);


同步

覆盖问题

多个线程同时修改一个对象 发送了覆盖现象


覆盖问题例子

小红小明各有1000元存银行 银行总存款为2000 让两人互相转钱 结果出现银行总存款大于2000 各自余额小于0

image.png
image.png

代码

  1. public class Bank {
  2. //银行账号
  3. private List<Account> accounts = new ArrayList<>();
  4. //开户
  5. public void openAccount(Account account) {
  6. accounts.add(account);
  7. System.out.println(account.getName() + "开户成功 存款:" + account.getBalance()
  8. + " 银行总存款:" + getTotalBalance());
  9. }
  10. //转账
  11. public void transfer(Account from, Account to, int amount) {
  12. from.setBalance(from.getBalance() - amount);
  13. to.setBalance(to.getBalance() + amount);
  14. System.out.println(from.getName() + " 给 " + to.getName() + " 转了:" + amount
  15. + " 当前余额:" + from.getBalance() + " 银行总存款:" + getTotalBalance());
  16. }
  17. //查询银行总存款
  18. public int getTotalBalance() {
  19. return accounts.stream().mapToInt(Account::getBalance).sum();
  20. }
  21. public static void main(String[] args) {
  22. Bank bank = new Bank();
  23. Account a1 = new Account("小明", 1000);
  24. Account a2 = new Account("小红", 1000);
  25. bank.openAccount(a1);
  26. bank.openAccount(a2);
  27. //开两个线程让小红小明互相转钱
  28. for (int i = 0; i < 2; i++) {
  29. new Thread(() -> {
  30. while (true) {
  31. bank.transfer(a1, a2, 500);
  32. bank.transfer(a2, a1, 500);
  33. }
  34. }).start();
  35. }
  36. }
  37. }
  38. class Account {
  39. private String name;
  40. private int balance;
  41. public Account(String name, int balance) {
  42. this.name = name;
  43. this.balance = balance;
  44. }
  45. public String getName() {
  46. return name;
  47. }
  48. public int getBalance() {
  49. return balance;
  50. }
  51. public void setBalance(int balance) {
  52. this.balance = balance;
  53. }
  54. }

原因及图解

转账的操作不具有原子性 例如:线程1 和 线程2 同时执行给账号A转账100的操作 由于转账操作不具有原子性 线程2在线程1将本地内存中的数据写回共享内存前先完成了转账操作 结果线程2的操作结果被线程1给覆盖了 理论存款为1200 实际只有1100 image.png


解法1 重入锁ReentrantLock

  1. 修改转账和查询方法 在方法前后加上重入锁 将方法变为原子性操作

image.png

  1. 在转账前加入余额不足等待操作 在转账后唤醒等待的账号

image.png 重入锁:

  • 线程可以嵌套获得锁对象 比如AB方法都有锁 A调用B 就是锁了两次
  • 请确保调用的是同一个Lock对象锁才生效
  • 解锁推荐放在finally里
  • 有加锁就必须解锁 否则会发送死锁
  • 可以配置公平锁(先来后到)

Condition

  • 判断条件要放在while里 不然会产生虚假唤醒
  • 唤醒最好用signalAll方法 signal只唤醒一个 如果唤醒的这个不符合条件就会产生死锁
  1. public class Bank {
  2. //银行账号
  3. private List<Account> accounts = new ArrayList<>();
  4. //重入锁
  5. private Lock lock = new ReentrantLock();
  6. //余额不足等待条件
  7. private Condition sufficientFunds = lock.newCondition();
  8. //开户
  9. public void openAccount(Account account) {
  10. accounts.add(account);
  11. System.out.println(account.getName() + "开户成功 存款:" + account.getBalance()
  12. + " 银行总存款:" + getTotalBalance());
  13. }
  14. //转账
  15. public void transfer(Account from, Account to, int amount) {
  16. lock.lock();
  17. try {
  18. while (from.getBalance() < amount)
  19. sufficientFunds.await();
  20. from.setBalance(from.getBalance() - amount);
  21. to.setBalance(to.getBalance() + amount);
  22. sufficientFunds.signalAll();
  23. System.out.println(from.getName() + " 给 " + to.getName() + " 转了:" + amount
  24. + " 当前余额:" + from.getBalance() + " 银行总存款:" + getTotalBalance());
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. } finally {
  28. lock.unlock();
  29. }
  30. }
  31. //查询银行总存款
  32. public int getTotalBalance() {
  33. lock.lock();
  34. try {
  35. return accounts.stream().mapToInt(Account::getBalance).sum();
  36. } finally {
  37. lock.unlock();
  38. }
  39. }
  40. public static void main(String[] args) {
  41. Bank bank = new Bank();
  42. Account a1 = new Account("小明", 1000);
  43. Account a2 = new Account("小红", 1000);
  44. bank.openAccount(a1);
  45. bank.openAccount(a2);
  46. //开两个线程让小红小明互相转钱
  47. for (int i = 0; i < 4; i++) {
  48. new Thread(() -> {
  49. while (true) {
  50. bank.transfer(a1, a2, 500);
  51. bank.transfer(a2, a1, 500);
  52. }
  53. }).start();
  54. }
  55. }
  56. }

解法2 synchronized 同步方法

  1. 调用synchronized锁住转账和查询方法

image.png

  1. 加入余额不足等待和唤醒

image.png

synchronized

  • 锁方法就是谁调用锁谁 如果是静态方法就是锁类对象的锁 A.class

与Lock的区别

  • 不能中断
  • 不能指定超时时间 (lock有tryLock方法)
  • 只有一个条件 this.wait 可能不够用
  1. public class Bank {
  2. //银行账号
  3. private List<Account> accounts = new ArrayList<>();
  4. //开户
  5. public void openAccount(Account account) {
  6. accounts.add(account);
  7. System.out.println(account.getName() + "开户成功 存款:" + account.getBalance()
  8. + " 银行总存款:" + getTotalBalance());
  9. }
  10. //转账
  11. public synchronized void transfer(Account from, Account to, int amount){
  12. while (from.getBalance() < amount) {
  13. try {
  14. this.wait();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. from.setBalance(from.getBalance() - amount);
  20. to.setBalance(to.getBalance() + amount);
  21. this.notifyAll();
  22. System.out.println(from.getName() + " 给 " + to.getName() + " 转了:" + amount
  23. + " 当前余额:" + from.getBalance() + " 银行总存款:" + getTotalBalance());
  24. }
  25. //查询银行总存款
  26. public synchronized int getTotalBalance() {
  27. return accounts.stream().mapToInt(Account::getBalance).sum();
  28. }
  29. public static void main(String[] args) {
  30. Bank bank = new Bank();
  31. Account a1 = new Account("小明", 1000);
  32. Account a2 = new Account("小红", 1000);
  33. bank.openAccount(a1);
  34. bank.openAccount(a2);
  35. //开两个线程让小红小明互相转钱
  36. for (int i = 0; i < 4; i++) {
  37. new Thread(() -> {
  38. while (true) {
  39. bank.transfer(a1, a2, 500);
  40. bank.transfer(a2, a1, 500);
  41. }
  42. }).start();
  43. }
  44. }
  45. }

解法3 同步代码块

image.png

  1. public class Bank {
  2. //银行账号
  3. private List<Account> accounts = new ArrayList<>();
  4. //监视器
  5. private Object monitor = new Object();
  6. //开户
  7. public void openAccount(Account account) {
  8. accounts.add(account);
  9. System.out.println(account.getName() + "开户成功 存款:" + account.getBalance()
  10. + " 银行总存款:" + getTotalBalance());
  11. }
  12. //转账
  13. public void transfer(Account from, Account to, int amount) {
  14. synchronized (monitor) {
  15. while (from.getBalance() < amount) {
  16. try {
  17. monitor.wait();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. from.setBalance(from.getBalance() - amount);
  23. to.setBalance(to.getBalance() + amount);
  24. monitor.notifyAll();
  25. System.out.println(from.getName() + " 给 " + to.getName() + " 转了:" + amount
  26. + " 当前余额:" + from.getBalance() + " 银行总存款:" + getTotalBalance());
  27. }
  28. }
  29. //查询银行总存款
  30. public int getTotalBalance() {
  31. synchronized (monitor) {
  32. return accounts.stream().mapToInt(Account::getBalance).sum();
  33. }
  34. }
  35. public static void main(String[] args) {
  36. Bank bank = new Bank();
  37. Account a1 = new Account("小明", 1000);
  38. Account a2 = new Account("小红", 1000);
  39. bank.openAccount(a1);
  40. bank.openAccount(a2);
  41. //开两个线程让小红小明互相转钱
  42. for (int i = 0; i < 4; i++) {
  43. new Thread(() -> {
  44. while (true) {
  45. bank.transfer(a1, a2, 500);
  46. bank.transfer(a2, a1, 500);
  47. }
  48. }).start();
  49. }
  50. }
  51. }

volatile

  1. 保证线程可见性

如下代码 不加volatile线程将不会停止

  1. private volatile static boolean flag = true;
  2. public static void main(String[] args) throws InterruptedException {
  3. new Thread(()->{
  4. while (flag){
  5. }
  6. System.out.println("循环停止");
  7. }).start();
  8. TimeUnit.SECONDS.sleep(1);
  9. flag = false;
  10. }

2.防止指令重排 DCL双重检测锁必须加volatile 或者说多线程下的判断条件最好都加


原子操作

  1. //AtomicInteger
  2. int result = atomicInteger.incrementAndGet();//+1
  3. atomicInteger.compareAndSet(result, 0);//设为0
  4. atomicInteger.updateAndGet((x)->{return Math.max(x, 10);});//和10比较
  5. atomicInteger.accumulateAndGet(3,(x,y)->{return x+y;});//+3
  6. //LongAdder 提升效率的自增 分开计算最后合并
  7. LongAdder adder = new LongAdder();
  8. for (int i = 0; i < 1000; i++) {
  9. new Thread(()->{
  10. adder.increment();
  11. }).start();
  12. }
  13. System.out.println(adder.sum());
  14. //LongAccumulator 表达式操作
  15. LongAccumulator accumulator = new LongAccumulator(Math::max,0);
  16. for (int i = 0; i < 1000; i++) {
  17. new Thread(()->{
  18. accumulator.accumulate(1);
  19. }).start();
  20. }
  21. System.out.println(accumulator.get());

线程局部变量 ThreadLocal

可以保证线程持有一个本地对象 原理是一个Map key为线程本身 value是持有的对象 key和value通过弱引用连接的一起 方便线程消销毁时回收对象

应用: SimpleDateFormat线程不安全 通过ThreadLocal保证其安全

Random线程安全 但是多线程会降低效率 ThreadLocalRandom.current().nextInt(10);可提高效率

  1. public static ThreadLocal<SimpleDateFormat> string = ThreadLocal
  2. .withInitial(() -> {
  3. return new SimpleDateFormat("yyy-MM-dd");
  4. });

死锁

多个线程互相等待对方锁持有的锁而导致程序停住 java语言本身不能解决死锁 必须在程序设计层面解决

死锁是狄杰斯克拉提出来了 有四个必要条件: 条件互斥 请求与保持 不可剥夺 循环等待 破环其中至少一种条件就可解决死锁

常用解法: 管程法 信号灯法

为什么弃用 stop suspend方法

这两个方法容易造成死锁


线程安全的集合

阻塞队列

阻塞队列:队满时插入 和 队空时获取 时可阻塞的队列

当发送队满时插入 和 队空时获取时的三种方法:

  1. 阻塞

put / take 适合做线程管理

  1. 返回值

offer / peek / poll offer和poll可以指定时间

  1. 抛异常

add / element / remove

ArrayBlockingQueue 可以指定是否公平 LinkedBlockingQueue 理论上大小无上线 并且是一个双端队列 PriorityBlockingQueue 可以指定优先级 DelayQueue 延迟队列 时间到了才能获取 比如完成3秒后执行提交的动作 TransferQueue 传递队列 生产者调用传递方法等待消费者接收 可以完成聊天功能

!注: 当poll和peek失败时返回null 所以阻塞队列不能存放null值

  1. //DelayQueue事例
  2. public class Test {
  3. public static void main(String[] args) throws InterruptedException {
  4. DelayQueue queue = new DelayQueue();
  5. queue.put(new MyDelayed(() -> {
  6. System.out.println("执行任务1");
  7. }, 3, TimeUnit.SECONDS));
  8. queue.put(new MyDelayed(() -> {
  9. System.out.println("执行任务2");
  10. }, 6, TimeUnit.SECONDS));
  11. queue.put(new MyDelayed(() -> {
  12. System.out.println("执行任务3");
  13. }, 3, TimeUnit.SECONDS));
  14. while (true) {
  15. MyDelayed task = (MyDelayed) queue.take();
  16. task.execute();
  17. }
  18. }
  19. }
  20. //自定义延迟对象
  21. class MyDelayed implements Delayed {
  22. private long executionTime;
  23. //时间到了后执行的任务
  24. private Runnable runnable;
  25. public MyDelayed(Runnable runnable, long delay, TimeUnit timeUnit) {
  26. this.runnable = runnable;
  27. this.executionTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(delay, timeUnit);
  28. }
  29. //获得过去时间 大于 0 则过期
  30. @Override
  31. public long getDelay(TimeUnit unit) {
  32. return unit.convert( executionTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
  33. }
  34. //队列内部比较 优先返回快过期的任务
  35. @Override
  36. public int compareTo(Delayed o) {
  37. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  38. }
  39. public void execute() {
  40. new Thread(runnable).start();
  41. }
  42. }
  1. //TransferQueue 生产者等待消费者上线
  2. public static void main(String[] args) throws InterruptedException {
  3. TransferQueue queue = new LinkedTransferQueue();
  4. for (int i = 0; i < 10; i++) {
  5. int tmp = i;
  6. new Thread(() -> {
  7. try {
  8. queue.transfer("消费者传过来的数据" + tmp);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }).start();
  13. }
  14. new Thread(() -> {
  15. while (true) {
  16. try {
  17. String msg = (String) queue.take();
  18. System.out.println("我是消费者 我接收到了:" + msg);
  19. TimeUnit.MILLISECONDS.sleep(500);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }).start();
  25. }

阻塞队列例子

判断文件夹下的文件是否包含某个关键字

一个线程读取文件夹下面的全部文件放入阻塞队列中 最后一个文件用特殊标志位代替 另一个线程监听接收队列中的文件 并开启新的线程进行查找操作 当监听到特殊标志位文件时结束

  1. /**
  2. * 类描述: 利用阻塞队列和多线程 读取文件夹下代码的行数
  3. */
  4. public class CountCodeLines {
  5. //文件夹路径
  6. private BlockingQueue<File> queue = new LinkedBlockingQueue<>();
  7. private static Long lines = 0L;
  8. private static AtomicInteger finished = new AtomicInteger();
  9. private File dummy = new File("");//用于做结束的判断条件
  10. //线程池
  11. private ExecutorService threadPool = new ThreadPoolExecutor(2,
  12. Runtime.getRuntime().availableProcessors(),
  13. 30,
  14. TimeUnit.SECONDS,
  15. new ArrayBlockingQueue<Runnable>(30),
  16. Executors.defaultThreadFactory(),
  17. new ThreadPoolExecutor.CallerRunsPolicy());
  18. /**
  19. * 统计代码量
  20. *
  21. * @return
  22. */
  23. public long count(String dir) throws InterruptedException {
  24. //获取所有文件放入阻塞队列
  25. threadPool.execute(()->{doReadFiles(dir);});
  26. //监听阻塞队列并统计
  27. int count = 0;
  28. do {
  29. File file = queue.take();
  30. System.out.println(file.getAbsolutePath());
  31. if (file.equals(dummy)) {
  32. break;
  33. }
  34. //开启线程读取行数
  35. threadPool.execute(()->{readLines(file);});
  36. count++;
  37. } while (true);
  38. threadPool.shutdown();
  39. while (count != finished.get()) {
  40. TimeUnit.MILLISECONDS.sleep(100);
  41. }
  42. return lines;
  43. }
  44. //将java文件从文件夹中读出来并放入阻塞队列
  45. private void doReadFiles(String dir) {
  46. try {
  47. readFiles(dir);
  48. queue.put(dummy);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. //将java文件从文件夹中读出来并放入阻塞队列
  54. private void readFiles(String dir) throws InterruptedException {
  55. File file = new File(dir);
  56. if (file.isDirectory()) {
  57. String[] files = file.list();
  58. for (String subFile : files) {
  59. //递归
  60. this.readFiles(file.getAbsolutePath() + "\\" + subFile);
  61. }
  62. } else if ("java".equals(file.getName().substring(file.getName().lastIndexOf(".") + 1))) {
  63. queue.put(file);
  64. }
  65. }
  66. //读取每个文件的行数 并加到共享变量中
  67. private void readLines(File file) {
  68. try {
  69. int tmp = 0;
  70. Scanner scanner = new Scanner(file);
  71. while (scanner.hasNextLine()) {
  72. scanner.nextLine();
  73. tmp++;
  74. }
  75. synchronized (lines) {
  76. lines += tmp;
  77. }
  78. } catch (FileNotFoundException e) {
  79. e.printStackTrace();
  80. }finally {
  81. scanner.close();
  82. }
  83. finished.incrementAndGet();
  84. }
  85. public static void main(String[] args) {
  86. CountCodeLines countCodeLines = new CountCodeLines();
  87. try {
  88. System.out.println(countCodeLines.count("D:\\eclipse-workspace"));
  89. } catch (InterruptedException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. }

并发的Map Set Queue

ConcurrentHashMap ConcurrentSkipListMap ConcurrentSkipListSet ConcurrentLinkedQueue

这些集合采用了复杂的算法 尽可能提高并发访问时的效率 他们的size方法是通过变量合并的 不是顺发的 mappinngCount方法可以返回Long类型的个数 防止21亿不够用 返回的迭代器是弱一致性的 就是不一定能反应之后的修改情况 但是不会抛出并发修改异常

ConcurrentHashMap 默认支持16个并发 大于16就阻塞 不允许null值 ConcurrentLinkedQueue 线程安全无上限 ConcurrentSkipListSet 跳表数据结构 有序 元素需要实现Comparable ConcurrentSkipListMap 键有序 同上

ConcurrentHashMap 的键可以当并发的set Set set = ConcurrentHashMap.newKeySet();

  1. //保证原子性的自增
  2. ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap();
  3. map.compute("key",(k,v)->{return v == null ? 1 : v + 1;});
  4. System.out.println(map.get("key"));//结果为1
  5. map.merge("key", 2, (x,y)->{return x+y;});
  6. System.out.println(map.get("key"));//结果为3

CopyOnWrite

修改时先拷贝一个新的副本 用新的副本修改后替换掉老的 这样查询的时候就不用上锁了 当查询远大于修改时很推荐使用 但有个缺点:修改时正在遍历的线程可能会访问到老的数据

CopyOnWriteArrayList list = new CopyOnWriteArrayList(); CopyOnWriteArraySet set = new CopyOnWriteArraySet();

image.png


Collections提供的线程方法

  1. List<Object> list = Collections.synchronizedList(new ArrayList<>());
  2. Map<Object, Object> map = Collections.synchronizedMap(new HashMap<>());
  3. Set<Object> set = Collections.synchronizedSet(new HashSet<>());

Collections 给每个子类都提供了支持并发的内部包装类 本质上就是返回包装类对象 该对象大部分方法都上锁 如map: image.png image.png

当经常修改时用Collections的方法比较好 查询比修改多很多时用CopyOnWrite比较好 但是Collections的内部类依然不能遍历时修改 这时要么自己上锁 要么用ConcurrentHashMap或者CopyOnWrite


总结

  1. 需要阻塞就用阻塞队列
  2. 遍历时修改 就用Concurrent的 和 CopyOnWrite 不然会抛并发修改异常

但是这两个都不能保证遍历的数据是最新的

  1. Collections提供的内部类能保证单一的增删改查是原子性的
  2. 如果查操作多于增删改就用 CopyOnWrite 因为这玩意查询方法不用上锁 比较快

高效并发操作


ConcurrentHashMap的批操作

有三种方法 search reduce forEach 搜索 归约遍历

  1. //search方法
  2. //需要指定阀值 就是每个线程大约处理多少数据
  3. //方法判断失败时返回null
  4. //操作键 返回第一个长度大于10的键
  5. String key = map.searchKeys(1000, (k)->{return k.length() > 10 ? k : null;});
  6. //操作值 返回第一个大于1000的值
  7. Integer value = map.searchValues(1000, (v)->{return v > 1000 ? v : null;});
  8. //操作键和值 返回第一个值大于1000的键
  9. String key = map.search(1000, (k, v) -> {return v > 1000 ? k : null;});
  10. //操作键值对 同上
  11. String key = map.searchEntries(1000, (e)->{return e.getValue() > 1000 ? e.getKey() : null;});
  12. //reduce方法
  13. //操作键和值 将值大于1000的数累加
  14. Integer reduce = map.reduce(1000,
  15. (k, v) -> {return v > 1000 ? v : null;},
  16. (v1, v2) -> {return v1 + v2;});
  17. //多了个默认值 当集合为空的时候返回0 但是遍历无结果仍然返回null
  18. Integer reduce = map.reduceToInt(1000,
  19. (k, v) -> {return v > 100000 ? v : null;},
  20. 0,
  21. (v1, v2) -> {return v1 + v2;});
  1. //普通遍历
  2. map.forEach((k, v) -> {
  3. System.out.println(k + " " + v);
  4. });
  5. //并行
  6. map.forEach(1000, (k, v) -> {
  7. System.out.println(k + " " + v);
  8. });
  9. //第二个函数的返回值传给第三个函数作为参数
  10. map.forEach(1000, (k, v) -> {
  11. return k + " " + v;
  12. }, (s) -> {
  13. System.out.println(s);
  14. });

数组的并行操作

  1. 并行排序

Arrays.parallelSort(arr); //并行排序 本质是ForkJoin

  1. 并行填充

Arrays.parallelSetAll(arr, (i)->{return i+1;}); 往arr中填写1 2 3 4 … 等数据

  1. 并行与前一个操作

Arrays.parallelPrefix(ints, (x, y) -> {return x + y;}); 与前一个相加 比如 [ 2,5,1,0] 执行后 -> [2,7,8,8]

  1. //填充ints 数组 1 2 3 4 5 6 7 8 9 10
  2. int[] ints = new int[10];
  3. Arrays.parallelSetAll(ints, (i)->{return i+1;});
  4. //与前一个相加
  5. //比如 [ 2,5,1,0] 执行后 -> [2,7,8,8]
  6. Arrays.parallelPrefix(ints, (x, y) -> {return x + y;});

并行填充本质:
image.png


任务和线程池

线程池:利益池化技术管理线程 强调一个复用的概念 能达到节约资源 提高效率 控制最大并发量的效果 阿里巴巴开发手册不让使用Java自带的线程池对象 因为默认最大值是Int的最大值 可能会发生OOM 因此我们深刻理解线程池参数的含义 掌握自己配置的能力


Callable和Future

callable 是有返回值的任务 但是线程只能接收Runnable对象 于是用Future封装了下 并且通过Future管理(获取结果、取消任务、查询任务状态)

  1. Callable<String> task = ()->{return "我是Callable的call方法";};
  2. FutureTask<String> futureTask = new FutureTask<>(task);
  3. new Thread(futureTask).start();
  4. System.out.println(futureTask.get());

Executors

Executors可以通过静态工厂的方法创建线程池 线程池可以执行Callable或Runnable任务 并返回Future对象 如果是调用Runnable任务返回的Future 调用get时只会简单的返回null 不用时请调用shoutdown或shoutdownNow image.png

  • newCachedThreadPool
    • 任务立即执行 有空闲线程就调用 没有就创建 线程或过期
    • 适合任务快 或者 任务阻塞
  • newFixedThreadPool
    • 固定线程 线程不够了就把任务放入等待队列
  • newSingleThreadExecutor
    • 同上 可以做性能分析 通单线程替换 然后统计运行时间
  • newScheduledThreadPool
    • 固定线程 延迟执行 或 延迟后周期执行
  • newSingleThreadScheduledExecutor
    • 单线程 同上

_

  1. //定时
  2. ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
  3. threadPool.schedule(() -> {
  4. System.out.println("延迟执行");
  5. }, 5, TimeUnit.SECONDS);
  6. threadPool.scheduleAtFixedRate(() -> {
  7. System.out.println("延迟然后周期执行执行");
  8. },2, 3, TimeUnit.SECONDS);
  9. threadPool.scheduleWithFixedDelay(() -> {
  10. System.out.println("延迟然后周期执行 上一个任务完成时才能调度");
  11. }, 3,5, TimeUnit.SECONDS);

invokeAny invokeAll 和 ExecutorCompletionService

invokeAny: 并发执行任务 返回第一个结果 并且返回后其他任务便不往下执行了 适合完成搜索功能 invokeAll: 并发执行任务 等全部执行完后一切返回结果 ExecutorCompletionService: 并发执行任务 通过包装普通执行器创建 会将执行的结果按序放到阻塞队列中 一个一个获取

  1. //返回第一个
  2. String firstResult = threadPool.invokeAny(tasks);
  3. System.out.println(firstResult);
  4. //等全部完成 返回全部
  5. List<Future<String>> futures = threadPool.invokeAll(tasks);
  6. for (Future<String> future : futures) {
  7. System.out.println(future.get());
  8. }
  9. //完成一个返回一个
  10. ExecutorCompletionService<String> service = new ExecutorCompletionService<>(threadPool);
  11. for (Callable<String> task : tasks) {
  12. service.submit(task);
  13. }
  14. for (int i = 0; i < tasks.size(); i++) {
  15. System.out.println(service.take().get());
  16. }

fork-join

类似递归的思想 把大任务分成小任务多线程处理 最后合并 适合大数据计算 有一个任务窃取的概念 任务队列是双向的 执行完任务的空闲线程会从末尾偷取别的线程的任务来工作 使用方法 继承RecursiveTask 或者 RecursiveAction 前者有返回值

  1. //处理double数组 但测试感觉不是很快
  2. class Counter extends RecursiveTask<Integer> {
  3. public static final int THRESHOLD = 10000;
  4. private double[] values;
  5. private int from;
  6. private int to;
  7. private DoublePredicate filter;
  8. public Counter(double[] values, int from, int to, DoublePredicate filter) {
  9. this.values = values;
  10. this.from = from;
  11. this.to = to;
  12. this.filter = filter;
  13. }
  14. @Override
  15. protected Integer compute() {
  16. if ((to - from) < THRESHOLD) {
  17. int count = 0;
  18. for (int i = from; i < to; i++) {
  19. if (filter.test(values[i])) count++;
  20. }
  21. return count;
  22. } else {
  23. //fork-join
  24. int mid = (from + to) / 2;
  25. Counter first = new Counter(values, from, mid, filter);
  26. Counter second = new Counter(values, mid, to, filter);
  27. invokeAll(first, second);
  28. return first.join() + second.join();
  29. }
  30. }
  31. }

invockAll方法是父类的方法 执行多个任务 调用后阻塞 直到所有任务都完成 image.png


异步计算

不用再阻塞等待结果 通过回调方法异步处理


CompletableFuture

通过回调函数异步执行 并且可以编写执行顺序

  1. ExecutorService threadPool = Executors.newFixedThreadPool(4);
  2. // 创建异步执行任务:
  3. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(()->{
  4. try {
  5. TimeUnit.SECONDS.sleep(1);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. return 1.0;},threadPool);
  10. // 如果执行成功:
  11. cf.thenAccept((result) -> {
  12. System.out.println("price: " + result);
  13. });
  14. // 如果执行异常:
  15. cf.exceptionally((e) -> {
  16. e.printStackTrace();
  17. return null;
  18. });
  19. threadPool.shutdown();