Lambda表达式是Java SE 8中一个重要的新特性。
    lambda表达式允许你通过表达式来代替功能接口。
    lambda表达式就和方法一样,它提供了一个正常的参数列表和一个使用这些参数的主体。
    Lambda表达式还增强了集合库。
    Java SE 8添加了2个对集合数据进行批量操作的包: java.util.function 包以及java.util.stream 包。
    流(stream)就如同迭代器(iterator),但附加了许多额外的功能。
    在Lambda表达式中this是指外围实例,而匿名类中的this是指匿名类实例。
    如果想在Lambda表达式里面修改外部变量的值也是可以的,可以将变量定义为非局部变量,即为实例变量或者将变量定义为数组。
    Lambda表达式如果引用某局部变量,则直接将其视为final。

    1.lambda表达式没有命名,用来像传递数据一样传递操作。
    单语句可以不用大括号,也不用return,这个语句执行后既返回,或者是输出
    多语句必须指定返回和进行大括号

    参数,有的时候必须和接口保持一致,参数类型可以是泛型,**但注意lambda并不会指定参数的具体复赋值,需要通过唯一接口函数调用时赋值**

    1. Lambda表达式的语法:
    2. ”基本语法: (parameters) -> expression (parameters) ->{ statements; }
    1. // 1. 不需要参数,返回值为 5
    2. () -> 5
    3. // 2. 接收一个参数(数字类型),返回其2倍的值
    4. x -> 2 * x
    5. // 3. 接受2个参数(数字),并返回他们的差值
    6. (x, y) -> x y
    7. // 4. 接收2个int型整数,返回他们的和
    8. (int x, int y) -> x + y
    9. // 5. 接受一个 string 对象,并在控制台打印,不返回任何值(看起来像是返回void)
    10. (String s) -> System.out.print(s)

    lambda流的概念

    stream的特性

    1.stream不存储数据 2.stream不改变源数据 3.stream的延迟执行特性

    通常我们在数组或集合的基础上创建stream,stream不会专门存储数据,对stream的操作也不会影响到创建它的数组和集合,对于stream的聚合、消费或收集操作只能进行一次,再次操作会报错。延迟性是指当stream的终结操作执行的时候,前面的中间操作才执行。

    当我们操作一个流的时候,一般并不会修改流底层的集合(即使集合是线程安全的),如果遍历的时候删除和添加会抛出ConcurrentModificationException异常,而ls.stream().foreach()的时候调用ls的set方法是可以的(比如ls。set(0,0))是可以修改原来集合的元素,如果集合里存的是引用类型也可以重新set或者直接改变对象里的字段。

    由于stream的延迟执行特性,在聚合操作执行前修改数据源是允许的。并且会影响到流里。

    2.函数接口指的是只有一个抽象方法的接口,被当做是lambda表达式的类型。最好使@FunctionalInterface 注解,防止其他人在里面添加方法。``**只需要在想要执行的地方利用传递的对象调用对应的接口中唯一的方法即可**``。

    用来传递的不是变量,是函数语句
    因为接口传递的是函数语句,因此接口的方法个数只能是一个用来对函数语句的填装和赋值以及调用返回值

    **函数式接口实例的创建有三种方式:1、lambda表达式;2、方法引用;3、构造方法引用。**

    常用的函数式接口主要有四种类型,是通过其输入和输出的参数来进行区分的。定义了编码过程中主要的使用场景。

    JDK原装函数式接口
    常用

    接口名 备注
    Function 接收一个T类型的参数,返回一个R类型的结果
    Consumer 接收一个T类型的参数,不返回值
    Predicate 接收一个T类型的参数,返回一个boolean类型的结果
    Supplier 不接受参数,返回一个T类型的结果

    事实上,只要符合注解和接口规范的,都可以称为函数式接口,函数式接口的实现常用lambda来执行,并通过唯一函数名来进行调用

    例子
    image.png

    1. import java.util.function.Consumer;
    2. public class Test {
    3. public static void main(String[] args) {
    4. Consumer consumer = (a) -> System.out.println("this is " + a);
    5. consumer.accept("123");
    6. }
    7. }

    函数式接口的实现使用了lambda表达式,调用则使用了接口中唯一的方法

    3.异步调用使用lambda传递函数

    1. @Configuration
    2. @EnableAsync
    3. @Slf4j
    4. public class ExecutorConfig {
    5. @Value("${async.executor.thread.core_pool_size}")
    6. private int corePoolSize;
    7. @Value("${async.executor.thread.max_pool_size}")
    8. private int maxPoolSize;
    9. @Value("${async.executor.thread.queue_capacity}")
    10. private int queueCapacity;
    11. @Value("${async.executor.thread.name.prefix}")
    12. private String namePrefix;
    13. @Bean(name = "asyncServiceExecutor")
    14. public Executor asyncServiceExecutor() {
    15. log.info("start asyncServiceExecutor");
    16. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    17. //配置核心线程数
    18. executor.setCorePoolSize(corePoolSize);
    19. //配置最大线程数
    20. executor.setMaxPoolSize(maxPoolSize);
    21. //配置队列大小
    22. executor.setQueueCapacity(queueCapacity);
    23. //配置线程池中的线程的名称前缀
    24. executor.setThreadNamePrefix(namePrefix);
    25. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    26. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    27. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    28. //执行初始化
    29. executor.initialize();
    30. return executor;
    31. }
    32. }

    SpringBoot中的线程池bean创建,这是一个通用线程池,我们想要满足不同种类任务的执行,就必须将执行语句放入runnable中,也可以说是一个任务

    1. @FunctionalInterface
    2. public interface ExecutorFunctional {
    3. void execute() throws Exception;
    4. }
    5. //创建一个函数式接口,用来接收不同任务的语句
    1. public interface AsyncService {
    2. void executeAsync(ExecutorFunctional target);
    3. }
    4. //异步任务的接口,用来对异步任务进行定义规范
    5. //Service层规范中也是要接口的,多增加一个关节就会更加灵活
    1. @Service
    2. public class AsyncServiceImpl implements AsyncService {
    3. @Override
    4. @Async("asyncServiceExecutor")
    5. public void executeAsync(ExecutorFunctional target) {
    6. try {
    7. target.execute();
    8. } catch (Exception e) {
    9. e.printStackTrace();
    10. }
    11. }
    12. }
    13. //实现异步任务
    14. //@Async("asyncServiceExecutor")注解表示这是要使用的runnable
    15. //在这个方法中实现对一类任务的控制,日志信息,基本处理和返回处理,具体的函数由lambda进行传入,因为参数是函数式接口
    1. @GetMapping("func")
    2. public void testFunctionalExecutor() {
    3. // CountDownLatch latch = new CountDownLatch(3);
    4. for (int i = 0; i < 3; i++) {
    5. asyncService.executeAsync(() -> {
    6. int random = (int) (2 + Math.random() * 8);
    7. log.info("线程睡{}秒", random);
    8. Thread.sleep(random * 1000);
    9. // latch.countDown();
    10. log.info("子线程睡{}秒,执行完毕", random);
    11. });
    12. }
    13. // latch.await();
    14. log.info("主线程执行完毕");
    15. }
    16. //使用lambda实例化函数式接口。填充方法语句,准备返回值(其实一般不用,毕竟异步任务将函数提交后就不用管了,这一步在上一个方框内进行处理)
    17. //传入Service参数

    那么要打印线程池信息呢?
    我们应该重写ThreadPoolTaskExecutor类,然后在我们的配置类中间加载重写的类

    1. @Slf4j
    2. public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    3. private void showThreadPoolInfo(String prefix) {
    4. ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
    5. if (null == threadPoolExecutor) {
    6. return;
    7. }
    8. log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
    9. this.getThreadNamePrefix(),
    10. prefix,
    11. threadPoolExecutor.getTaskCount(),
    12. threadPoolExecutor.getCompletedTaskCount(),
    13. threadPoolExecutor.getActiveCount(),
    14. threadPoolExecutor.getQueue().size());
    15. }
    16. @Override
    17. public void execute(Runnable task) {
    18. showThreadPoolInfo("1. do execute");
    19. super.execute(task);
    20. }
    21. @Override
    22. public void execute(Runnable task, long startTimeout) {
    23. showThreadPoolInfo("2. do execute");
    24. super.execute(task, startTimeout);
    25. }
    26. @Override
    27. public Future<?> submit(Runnable task) {
    28. showThreadPoolInfo("1. do submit");
    29. return super.submit(task);
    30. }
    31. @Override
    32. public <T> Future<T> submit(Callable<T> task) {
    33. showThreadPoolInfo("2. do submit");
    34. return super.submit(task);
    35. }
    36. @Override
    37. public ListenableFuture<?> submitListenable(Runnable task) {
    38. showThreadPoolInfo("1. do submitListenable");
    39. return super.submitListenable(task);
    40. }
    41. @Override
    42. public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
    43. showThreadPoolInfo("2. do submitListenable");
    44. return super.submitListenable(task);
    45. }
    46. }
    1. @Configuration
    2. @EnableAsync
    3. @Slf4j
    4. public class ExecutorConfig {
    5. @Value("${async.executor.thread.core_pool_size}")
    6. private int corePoolSize;
    7. @Value("${async.executor.thread.max_pool_size}")
    8. private int maxPoolSize;
    9. @Value("${async.executor.thread.queue_capacity}")
    10. private int queueCapacity;
    11. @Value("${async.executor.thread.name.prefix}")
    12. private String namePrefix;
    13. @Bean(name = "asyncServiceExecutor")
    14. public Executor asyncServiceExecutor() {
    15. log.info("start asyncServiceExecutor");
    16. // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    17. ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    18. //配置核心线程数
    19. executor.setCorePoolSize(corePoolSize);
    20. //配置最大线程数
    21. executor.setMaxPoolSize(maxPoolSize);
    22. //配置队列大小
    23. executor.setQueueCapacity(queueCapacity);
    24. //配置线程池中的线程的名称前缀
    25. executor.setThreadNamePrefix(namePrefix);
    26. // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    27. // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    28. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    29. //执行初始化
    30. executor.initialize();
    31. return executor;
    32. }
    33. }