前言

进程:内存运行的程序。

线程:进程中的一个执行单元。

创建多线程的方式

本质都是调用自己写的run方法。

1继承thread抽象类

  1. Thread thread = new Thread(){
  2. @Override
  3. public void run() {
  4. System.out.println("线程创建的第一种方式:"+Thread.currentThread().getName());
  5. }
  6. };
  7. thread.start();

2实现Runnable接口

  1. Thread thread2 = new Thread(new Runnable() {
  2. @Override
  3. public void run() {
  4. System.out.println("线程创建的第二种方式:"+Thread.currentThread().getName());
  5. }
  6. });
  7. thread2.start();

3实现Callable接口

可以有返回值,可以抛出异常

  1. public class TestCallAble implements Callable<String> {
  2. /**
  3. * Computes a result, or throws an exception if unable to do so.
  4. *
  5. * @return computed result
  6. * @throws Exception if unable to compute a result
  7. */
  8. @Override
  9. public String call() throws Exception {
  10. return "123";
  11. }
  12. }
  13. # 调用
  14. Future<String> submit = executorService.submit(new TestCallAble());
  15. System.out.println(submit.get());

lambda简写

改进实现runnable接口的形式

jdk8 lambda简写:

  1. new Thread(()->{
  2. System.out.println(123);
  3. }).start();

完整版

  1. new Thread(new Runnable() {
  2. @Override
  3. public void run() {
  4. for (int i = 0; i < 10000; i++) {
  5. System.out.println(Thread.currentThread().getName() + i);
  6. }
  7. }
  8. }).start();

线程池

  • Executors创建线程池

阿里巴巴不推荐

  1. ExecutorService service = Executors.newFixedThreadPool(2);
  2. service.submit(new Runnable() {
  3. @Override
  4. public void run() {
  5. System.out.println("新线程执行");
  6. }
  7. });
  1. 有什么线程池?
  2. new SingleThreadExecutor:创建一个单线程的线程池
  3. new FixedThreadPool:创建固定大小的线程池
  4. new CachedThreadPool:创建一个可缓存的线程池,最大空闲时间默认为1分钟,超过就会被删除。
  5. new ScheduledThreadPool:创建一个大小无限的线程池,可以延迟、定时循环执行任务。
  • 线程池启动策略
  1. ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
  2. 20,
  3. 60L,
  4. TimeUnit.SECONDS,
  5. new SynchronousQueue<Runnable>(),
  6. Executors.defaultThreadFactory(),
  7. new ThreadPoolExecutor.AbortPolicy()
  8. );
  1. 线程池创建
  2. 添加任务时:
  3. 如果当前线程数量小于corePoolSize,则马上创建线程运行任务。
  4. 如果大于等于core Pool Size,任务放入队列。
  5. 如果队列满了,线程总数小于最大值,创建线程运行任务。
  6. 如何,线程总数等于,则报错。

线程安全

多个线程同时对一个数据进行写操作,会出现安全问题。

同步代码块

所有线程都要使用同一把锁,比如: “”

  1. synchronized(同步锁){
  2. # 需要同步操作的代码
  3. }
  4. # 强制处于waiting状态
  5. 同步锁.waiting();
  6. # 唤醒处于waiting状态的线程来抢锁:
  7. 同步锁.notify();

同步方法

对于非static方法,同步锁就是this。 对于static方法,我们使用当前方法所在类的字节码对象(类名.class)

  1. public synchronized void method(){
  2. 可能会产生线程安全问题的代码
  3. }

锁机制

同理:一定要使用同一把锁!!!

  1. Lock lock = new ReentrantLock();
  2. ExecutorService service = Executors.newFixedThreadPool(2);
  3. service.submit(new Runnable() {
  4. @Override
  5. public void run() {
  6. lock.lock();
  7. System.out.println(123);
  8. lock.unlock();
  9. }
  10. });

线程状态

java-线程 - 图1

多线程处理数据

多线程执行任务,并获取返回的结果,按照返回结果的顺序依次处理。

参考:https://my.oschina.net/hongliangsun/blog/1546370

  1. import java.util.concurrent.Callable;
  2. import java.util.concurrent.CompletionService;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorCompletionService;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Future;
  8. public class ExecutorCallableTest {
  9. public static void main(String[] args) {
  10. //创建一个线程池
  11. ExecutorService pools = Executors.newFixedThreadPool(5);
  12. CompletionService<Integer> s = new ExecutorCompletionService<Integer>(pools);
  13. //创建多个有返回值的任务, 要用lambda表达式。
  14. for(int i = 0 ; i <= 10 ; i++){
  15. s.submit(()-> new Integer(id) );
  16. }
  17. for(int i = 0 ; i <= 10 ; i++){
  18. try {
  19. System.out.println(s.take().get());
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. pools.shutdown();
  25. }
  26. }
  27. 任务[1]开始执行
  28. 任务[4]开始执行
  29. 任务[5]开始执行
  30. 任务[6]开始执行
  31. 任务[0]开始执行
  32. 任务[2]开始执行
  33. 任务[9]开始执行
  34. 任务[8]开始执行
  35. 任务[7]开始执行
  36. 4
  37. 1
  38. 任务[10]开始执行
  39. 5
  40. 6
  41. 0
  42. 2
  43. 9
  44. 8
  45. 7
  46. 10
  47. 任务[3]开始执行
  48. 3

限制线程访问数

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.Semaphore;
  4. public class SemaphoreDemo {
  5. public static void main(String[] args) {
  6. ExecutorService exec = Executors.newCachedThreadPool();
  7. // 最多10个线程同时访问
  8. final Semaphore semaphore = new Semaphore(10);
  9. // 20个线程同时启动
  10. for (int i = 1; i <= 20; i++) {
  11. final int index = i;
  12. exec.execute(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. // 获取许可
  17. semaphore.acquire();
  18. // 调用资源
  19. callRomote(index);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } finally {
  23. // 访问完后,释放
  24. semaphore.release();
  25. //availablePermits()指的是当前库中有多少个许可可以被使用
  26. System.out.println("availablePermits => " + semaphore.availablePermits());
  27. }
  28. }
  29. });
  30. }
  31. // 退出线程池
  32. exec.shutdown();
  33. }
  34. /**
  35. * 被调用资源
  36. *
  37. * @param arg
  38. */
  39. public static void callRomote(int arg) {
  40. System.out.println("arg: " + arg);
  41. try {
  42. Thread.sleep((long) (Math.random() * 6000));
  43. } catch (InterruptedException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }

CompletableFuture

参考:https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
jdk8新功能

回调函数

针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. // 创建异步执行任务:
  4. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
  5. // 如果执行成功:
  6. cf.thenAccept((result) -> {
  7. System.out.println("price: " + result);
  8. });
  9. // 如果执行异常:
  10. cf.exceptionally((e) -> {
  11. e.printStackTrace();
  12. return null;
  13. });
  14. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  15. Thread.sleep(200);
  16. }
  17. static Double fetchPrice() {
  18. try {
  19. Thread.sleep(100);
  20. } catch (InterruptedException e) {
  21. }
  22. if (Math.random() < 0.3) {
  23. throw new RuntimeException("fetch price failed!");
  24. }
  25. return 5 + Math.random() * 20;
  26. }
  27. }

并行执行

功能:
image.png

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. // 两个CompletableFuture执行异步查询:
  4. CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
  5. return queryCode("中国石油", "https://finance.sina.com.cn/code/");
  6. });
  7. CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
  8. return queryCode("中国石油", "https://money.163.com/code/");
  9. });
  10. // 用anyOf合并为一个新的CompletableFuture:
  11. CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
  12. // 两个CompletableFuture执行异步查询:
  13. CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
  14. return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
  15. });
  16. CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
  17. return fetchPrice((String) code, "https://money.163.com/price/");
  18. });
  19. // 用anyOf合并为一个新的CompletableFuture:
  20. CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
  21. // 最终结果:
  22. cfFetch.thenAccept((result) -> {
  23. System.out.println("price: " + result);
  24. });
  25. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  26. Thread.sleep(200);
  27. }
  28. static String queryCode(String name, String url) {
  29. System.out.println("query code from " + url + "...");
  30. try {
  31. Thread.sleep((long) (Math.random() * 100));
  32. } catch (InterruptedException e) {
  33. }
  34. return "601857";
  35. }
  36. static Double fetchPrice(String code, String url) {
  37. System.out.println("query price from " + url + "...");
  38. try {
  39. Thread.sleep((long) (Math.random() * 100));
  40. } catch (InterruptedException e) {
  41. }
  42. return 5 + Math.random() * 20;
  43. }
  44. }

等待完成

  1. // 开始 等待所有任务执行完成
  2. CompletableFuture<Void> all = CompletableFuture.allOf(cf1, cf2);
  3. System.out.println("start block");
  4. all.join();
  5. System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));