简述: Java 8引入了全新的Stream API。这里的Stream和I/O流不同,它更像具有Iterable的集合类,但行为和集合类又有所不同。

最新添加的Stream API(java.util.stream) 把真正的函数式编程风格引入到Java中。这是目前为止对Java类库最好的补充,因为Stream API可以极大提供Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。

什么是流

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

你可以将 Streams 视为 Java 中第一个充分利用了lambda表达式的强大功能的库,但它没有什么特别奇妙的地方(尽管它被紧密集成到核心 JDK 库中)。Streams 不是该语言的一部分 — 它是一个精心设计的库,充分利用了一些较新的语言特性。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

  • 1.0-1.4 中的 java.lang.Thread
  • 5.0 中的 java.util.concurrent
  • 6.0 中的 Phasers 等
  • 7.0 中的 Fork/Join 框架
  • 8.0 中的 Lambda

Stream 的另外一大特点是,数据源本身可以是无限的。
**

集合和流的差异

  • 流并不存储其元素
  • 流的操作不会修改其数据源
  • 流的操作是尽可能惰性执行的

流的创建

  • Collection接口下的两个方法可以获取流
  1. public interface Collection<E> extends Iterable<E> {
  2. //将集合转换为一个流
  3. default Stream<E> stream() {
  4. return StreamSupport.stream(spliterator(), false);
  5. }
  6. //产生当前集合中所有元素的顺序流或并行流
  7. default Stream<E> parallelStream() {
  8. return StreamSupport.stream(spliterator(), true);
  9. }
  10. }
  • 数组的静态Stream.of方法
  1. public interface Stream<T> extends BaseStream<T, Stream<T>> {
  2. //产生一个元素为给定值的流
  3. public static<T> Stream<T> of(T t) {
  4. return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
  5. }
  6. //产生一个元素为给定值的流,这里是不定长参数
  7. public static<T> Stream<T> of(T... values) {
  8. return Arrays.stream(values);
  9. }
  10. }
  • Array.stream(array, from, to)方法可以从数组中位于from(包括)和to(不包括)的元素中创建一个流
  1. public class Arrays {
  2. public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
  3. return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
  4. }
  5. public static <T> Stream<T> stream(T[] array) {
  6. return stream(array, 0, array.length);
  7. }
  8. }
  • 创建不包含任何元素的流
  1. public interface Stream<T> extends BaseStream<T, Stream<T>> {
  2. public static<T> Stream<T> empty() {
  3. return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
  4. }
  5. }
  • 创建无限流的静态方法
  1. public interface Stream<T> extends BaseStream<T, Stream<T>> {
  2. //接受一个不包含任何引元的函数(从技术上将,是一个Supplier<T>接口的对象),创建无限流
  3. public static<T> Stream<T> generate(Supplier<T> s) {
  4. Objects.requireNonNull(s);
  5. return StreamSupport.stream(
  6. new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
  7. }
  8. //接受一个“种子”值,以及一个函数(UnaryOperation<T>),产生无限序列
  9. public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
  10. Objects.requireNonNull(f);
  11. final Iterator<T> iterator = new Iterator<T>() {
  12. @SuppressWarnings("unchecked")
  13. T t = (T) Streams.NONE;
  14. @Override
  15. public boolean hasNext() {
  16. return true;
  17. }
  18. @Override
  19. public T next() {
  20. return t = (t == Streams.NONE) ? seed : f.apply(t);
  21. }
  22. };
  23. return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
  24. iterator,
  25. Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
  26. }
  27. }

Java API中还有大量可以创建流的方法,这里就不一一列举了。

  1. //Collection接口的stream()方法
  2. ArrayList<Integer> arrayList = new ArrayList<>();
  3. arrayList.add(1);
  4. Stream<Integer> stream = arrayList.stream();
  5. //并行流
  6. Stream<Integer> parallelStream = arrayList.parallelStream();
  7. //静态Stream.of()方法
  8. Stream<Integer> stream1 = Stream.of(1,2,3,4);
  9. Stream stream3 = Stream.empty();
  10. //Arrays.stream()方法,把数组中的元素创建一个流
  11. int[] sum = {1,2,3,4,5};
  12. Stream<Integer> stream2 = (Stream<Integer>) Arrays.stream(sum);

流的操作

流的操作类型分为两种:

  • Intermediate(转换操作):一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
  • Terminal(终止操作):一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

具体方法

filter、map、flatMap方法

  1. public interface Stream<T> extends BaseStream<T, Stream<T>> {
  2. //返回一个流,该流包含与给定谓词匹配的该流的元素。
  3. Stream<T> filter(Predicate<? super T> predicate);
  4. //返回一个流,该流包括将给定函数应用于此流的元素的结果。
  5. <R> Stream<R> map(Function<? super T, ? extends R> mapper);
  6. //返回一个流,该流包括将流中的每个元素替换为通过将提供的映射函数应用于每个元素
  7. //而生成的映射流的内容而得到的结果。
  8. <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
  9. }

filter转换会产生一个流,它的元素与某种条件相匹配。它的引元是Predicate,即从T到boolean的函数。
Predicate就是一个函数式接口,是一个布尔值函数,即里面的test方法返回的是一个布尔值。

  1. //转换为只包含长单词的另一个流
  2. List<String> wordList = ...;
  3. Stream<String> longWords = wordList.stream().filter(w -> w.length() > 12);

map方法可以按照某种方式来转换流中的值,传递执行该转换的函数。

  1. //将所有单词都转换为小写
  2. Stream<String> lowerCaseWords = words.stream().map(String::toLowerCase);
  3. //也可以使用java8的新特性lambda表达式
  4. //产生的流中包含了所有单词的首字母
  5. Stream<String> firstLetters = words.stream().map(s -> s.substring(0,1));

如果我们在一个字符串流上执行映射操作

  1. Stream<Stream<String>> result = words.stream().map((s -> s.substring(0,1));
  2. //result: 得到一个流的流

就会得到一个流的流,比如 [… [“b”,”o”,”a”,”t”], [“a”,”a”,”a”], …],这并不是我们想要的,如果我们想要摊平它变回一个流 [… “b”,”o”,”a”,”t”, “a”,”a”,”a”, …],我们可以使用flatMap方法。

  1. Stream<String> flatResult = words.stream().flatMap(s -> s.substring(0,1));

抽取子流和连接流

  1. public interface Stream<T> extends BaseStream<T, Stream<T>> {
  2. //返回由该流的元素组成的流,其maxSize长度被截断为不超过长度
  3. Stream<T> limit(long maxSize);
  4. //在丢弃流的第一个n元素之后,返回由该流的其余元素组成的流。
  5. Stream<T> skip(long n);
  6. //创建一个延迟串联的流,其元素是第一个流的所有元素,然后是第二个流的所有元素。
  7. public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
  8. Objects.requireNonNull(a);
  9. Objects.requireNonNull(b);
  10. @SuppressWarnings("unchecked")
  11. Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
  12. (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
  13. Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
  14. return stream.onClose(Streams.composedClose(a, b));
  15. }
  16. }

调用limit方法会返回一个新的流,它在n个元素之后结束(如果原来的流更短,那么就会在流结束时结束)。这个方法对于裁剪无限流的尺寸会显得特别有用

  1. Stream<Double> randoms = Stream.generate(Math::random).limit(100);

调用skip方法正好相反,它会丢弃前n个元素。例如在将文本分隔为单词时,按照split方法的工作方式,第一个元素是没什么用的空字符串,这时候可以用skip跳过它

  1. Stream<String> words = Stream.of(contents.split("\\PL+")).skip(1);

concat方法可以将两个流连接起来。当然第一流不应该是无限的,否则第二个流永远都不会得到处理的机会。

Stream<String> combined = Stream.concat(stream1, stream2);

其他的流转换

public interface Stream<T> extends BaseStream<T, Stream<T>> {

    //返回由该流的不同元素组成的流(根据 Object.equals(Object))。
    Stream<T> distinct();

    //返回由该流的元素组成的流,并根据自然顺序排序。
    Stream<T> sorted();

    //返回由该流的元素组成的流,并根据提供的进行排序Comparator。
    Stream<T> sorted(Comparator<? super T> comparator);

    //返回由该流的元素组成的流,并在从结果流中消耗元素时对每个元素另外执行提供的操作。
    Stream<T> peek(Consumer<? super T> action);
}

distinct方法会返回一个流,它的元素是原有流中经过剔除重复元素后产生的。这个流能够记住它已经看到过的元素。

Stream<String> uniqueWords = Stream.of("merrily", "merrily", "merrily", "merrily", "gently").distinct();
//只有一个merrily

sorted方法会产生一个新的流,并会对元素进行排序。其中一种用于操作Comparable元素的流,而另一种可以接受一个Comparator。

Stream<String> longestFirst = words.stream().sorted(Compartor.comparing(String::length).reversed());

peek方法会产生另一个流,它的元素与原来的流中的元素相同,但是在每次获取一个元素时,都会调用一个函数。

//实际访问一个元素时,就会打印出来一条消息
Object[] powers = Stream.iterate(1.0, p -> p*2).peek(e -> System.out.println(e).limit(20).toArray());

约简操作

约简是一种终结操作,它们会将流约简为可以在程序中使用的非法值。

public interface Stream<T> extends BaseStream<T, Stream<T>> {

    //返回此流中的元素计数。
    long count();

    //根据提供的返回此流的最小元素 Comparator。
    Optional<T> min(Comparator<? super T> comparator);

    //根据提供的返回此流的最大元素 Comparator。
    Optional<T> max(Comparator<? super T> comparator);

    //返回Optional描述此流的第一个元素的描述;Optional如果流为空,则返回null 。
    Optional<T> findFirst();

    //返回Optional描述流中某些元素的描述;Optional如果流为空,则返回空。
    Optional<T> findAny();

    //返回此流的任何元素是否与提供的谓词匹配。
    boolean anyMatch(Predicate<? super T> predicate);

    //返回此流的所有元素是否与提供的谓词匹配。
    boolean allMatch(Predicate<? super T> predicate);

    //返回此流中是否没有元素与提供的谓词匹配。
    boolean noneMatch(Predicate<? super T> predicate);

    //使用提供的标识值和关联 累加函数对此流的元素 执行归约,然后返回归约后的值。
    T reduce(T identity, BinaryOperator<T> accumulator);

    //执行减少有关此流的元件,使用 缔合累积功能,并返回一个Optional描述该减小值,如果有的话。
    Optional<T> reduce(BinaryOperator<T> accumulator);

    //执行减少有关此流的元件,使用所提供的身份,积累和组合功能。
    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);


}

count方法就是一种简单约简,它会返回流中元素的数量

Stream<String> words = Stream.of("A","B","C");
System.out.println(words.count());
//输出 :3

max和min方法会返回流中元素的最大值和最小值。另外,这些方法返回的是一个类型Optional的值,它要么在其中包装了答案,要么表示没有任何值(因为流碰巧为空)。

Stream<String> words = Stream.of("A","B","C");
System.out.println(words.max(String::compareToIgnoreCase));
//输出 Optional[C]

findFirst返回的是非空集合中的第一个值。它通常会在与filter组合使用时显得很有用。例如,下面展示了如何找到第一个以字母A开头的单词。如果没有则Optional为空

Stream<String> words = Stream.of("ADD","BDD","CDD");
System.out.println(words.filter(s -> s.startsWith("A")).findFirst());
//输出 Optional[ADD]

如果不强调使用第一个匹配,而是使用任意的匹配都可以,那么就可以使用findAny方法。

如果只想知道是否存在匹配,那么可以使用anyMatch。这个方法会接受一个断言引元,因此不需要使用filter。
同样的allMatch和noneMatch方法,它们分别会在所有元素和没有任何元素匹配断言的情况下返回true。

boolean aWordStartsWithQ = words.parallel().anyMatch(s -> s.startsWith("A"));

reduce方法是一种用于从流中计算某个值的通用机制,其最简单的形式将接受一个二元函数,并从前两个元素开始持续应用它。比如求和函数:

List<Integer> values =...;
Optional<Integer> sum = values.stream().reduce((x,y) -> x+y);
//reduce会计算v0+v1+v2+...,如果流为空,会返回空Optional
//上面也可写成reduce(Integer::sum);

如果reduce有一项约简操作op,那么该约简就会产生 v0 op v1 op v2 op … , 其中我们调用函数op(vi,vi+1)写作vi op vi+1。这项操作应该是可结合的

通常会有一个幺元值e,使得 e op x = x,可以使用这个元素作为计算的起点。然后可以调用第二种形式的reduce,如果流为空,则会返回幺元值,就不再需要处理Optional类了

List<Integer> values =...;
Optional<Integer> sum = values.stream().reduce(0, (x,y) -> x+y);

假设你有一个对象流,并且想对某些属性求和,例如字符串流中的所有字符串的长度,那么可以提供一个“累积器”函数(total, word) -> total + word.length()。而且需要合并结果,因此需要提供第二个函数执行处理。例如

int result = words.reduce(0, (total, word) -> total + word.length(),
                          (total1, total2) -> total1 + total2);

收集结果

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {

    //产生一个用于获取当前流中各个元素的迭代器。这是一种终结操作    
    Iterator<T> iterator();

}
public interface Stream<T> extends BaseStream<T, Stream<T>> {

    //在流的每个元素上调用action,这是一种终结操作
    void forEach(Consumer<? super T> action);

    //如果流具有定义的遇到顺序,则按流的遇到顺序对此流的每个元素执行操作。
    void forEachOrdered(Consumer<? super T> action);

    //返回包含此流元素的数组。
    Object[] toArray();

    //产生一个对象数组,或者在将构造器引用传递进去时,返回一个对应类型的数组。终结操作
    <A> A[] toArray(IntFunction<A[]> generator);

    //在此流的元素上执行可变的归约运算。
    <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

    //使用给定的收集器来收集当前流中的元素
    <R, A> R collect(Collector<? super T, A, R> collector);
}

当处理完流,通常会想要查看其元素。此时可以调用iterator方法,它会产生可以用来访问元素的旧式风格的迭代器。

或者可以调用forEach方法,将某个函数应用于每个元素。在并行流上,forEach方法会以任意顺序遍历各个元素。也可以用forEachOrdered方法按照流中的顺序来处理。不过这个方法会丧失并行处理的部分甚至全部优势。

//遍历输出流中的元素
stream.forEach(System.out::println);

toArray方法可以获得由流的元素构成的数组,但是由于无法在运行时创建泛型数组,所以会返回一个Object[]数组。也可以通过传递构造器引用来获取正确的数组类型。

//将流转换为数组,获得Object[]数组
Object[] result = stream.toArray();
//传入String的构造器引用,获得String[]数组
String[] result = stream.toArray(String::new);

针对将流中的元素收集到另一个目标中,有一个便捷方法collect可用,它会接受一个Collector接口的实例。Collectors类提供了大量用于生成公共收集器的工厂方法。调用collect方法可以进行多种操作:

//将流收集到列表中
List<String> result = stream.collect(Collectors.toList());
//将流收集到集中
Set<String> result = stream.collect(Collectors.toSet());

//如果想要获取集的具体实现类,可以
TreeSet<String> result = stream.collect(Collectors.toCollection(TreeSet::new));

//如果想要通过连接操作收集流中的所有字符串
String result = stream.collect(Collectors.joining());

//还可以在元素之间添加分隔符
String result = stream.collect(Collectors.joining(","));

基本类型流

之前我们都是将整数收集到Stream中,尽管很明显,将每个整数都包装到包装器对象中是很低效的。对其他基本类型来说,情况也是一样的,这些基本类型是:double、float、short、char、byte和boolean。流库中有专门的类型IntStream、LongStream和DoubleStream,用来直接存储基本类型值。

如果想要存储short,char,byte和boolean,可以使用IntStream;
对于float,可以使用DoubleStream。

仅列举一个为例,其余两个API都拥有相似的方法

public interface IntStream extends BaseStream<Integer, IntStream> {

    //计算当前流的总和
    int sum();

    //计算当前流的最小值
    OptionalInt min();

    //计算当前流的最大值
    OptionalInt max();

    //计算当前流的平均值
    OptionalDouble average();

    //获取这些结果的所有四种值的对象
    IntSummaryStatistics summaryStatistics();

    //产生一个由给定元素构成的IntStream
    public static IntStream of(int t) {
        return StreamSupport.intStream(new Streams.IntStreamBuilderImpl(t), false);
    }

    //产生一个由给定元素构成的IntStream
    public static IntStream of(int... values) {
        return Arrays.stream(values);
    }

    //产生一个由给定范围内的整数构成的IntStream
    public static IntStream range(int startInclusive, int endExclusive) {
        if (startInclusive >= endExclusive) {
            return empty();
        } else {
            return StreamSupport.intStream(
                    new Streams.RangeIntSpliterator(startInclusive, endExclusive, false), false);
        }
    }

    //产生一个由给定范围内的整数构成的IntStream
    public static IntStream rangeClosed(int startInclusive, int endInclusive) {
        if (startInclusive > endInclusive) {
            return empty();
        } else {
            return StreamSupport.intStream(
                    new Streams.RangeIntSpliterator(startInclusive, endInclusive, true), false);
        }
    }

    //产生用于当前流中的元素的包装器对象流
    Stream<Integer> boxed();

    //产生一个由当前流中的元素构成的数组
    int[] toArray();
}

当你拥有一个对象流时,可以用mapToInt、mapToLong和mapToDouble将其转换为基本类型流。

Stream<String> words = ... ;
IntStream lengths = words.mapToInt(String::length);

将基本类型流转换为对象流则需要使用boxed方法

Stream<Integer> integers = IntStream.range(0,100).boxed();
//range方法可以生成步长为1的整数范围的流
//boxed方法将基本类型流包装为包装类

并行流

在上面流的介绍里,我们谈到的流大多数是串行流,而上文也提到过并行流。在流的创建里,stream()方法就是创建串行流,parallelStream()方法创建的是并行流。那么他们的区别是什么呢?

串行流,即单线程执行的;并行流,即多线程执行操作。
**

public interface Collection<E> extends Iterable<E> { 
    //用当前集合中的元素产生一个并行流
    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
}
public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {

    //产生一个与当前流中元素相同的并行流
    S parallel();

    //产生一个与当前流中元素相同的无序流
    S unordered();
}

**
流使得并行处理块操作变得更容易。这个过程几乎是自动的,但是需要遵守一些规则,首先,必须有一个并行流。可以用Collection.parallelStream()方法从任何集合中获取一个并行流:
而且,parallel方法可以将任意的顺序流转换为并行流。

Stream<String> parallelWords = words.parallelStream();

Stream<String> parallelWords = Stream.of(wordArray).parallel();

只要在终结方法执行时,流处于并行模式,那么所有的中间流操作都将被并行化。

在Java中,并行流使用默认的fork-join池(ForkJoinPool)来操作流的各个部分,并且该池是所有并行流共享的。
**

并行流的作用

并行流就是支持多线程操作的流,它使得并行处理变得简单。

Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。

举一个简单的栗子看效果:

//创建一个串行流,并且遍历输出全部元素
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.stream()
       .forEach(System.out::println);

// 结果是 1 2 3 4 5 6 7 8 9

//创建一个并行流,并且遍历输出全部元素
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(System.out::println);

//结果是 6 5 7 9 1 8 4 3 2  (但其实结果每次都不一样)

上面的栗子可以看到,并行流在进行操作时,将一个大操作分成了多个小操作并行进行,再将结果组合起来,于是输出的结果顺序是任意顺序。

倘若你想要结果按照原来元素的顺序,就上面的例子,你可以这样做

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(System.out::println);
//输出 1 2 3 4 5 6 7 8 9

注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。

并行流的性能问题

要想深入的研究parallelStream,我们必须先了解ForkJoin框架和ForkJoinPool。

简单了解Fork/Join 框架

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架,Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算 1+2+。。+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。

ForkJoinPool


Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(System.out::println);

对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。

并行流的陷阱

从java8开始,并行编程变得很容易,通过并行流(parallelStream),可以很轻松的实现多线程并行处理。但是,这里面有个性能“陷阱”,如果不注意,使用并行流的效果反而更差,这个陷阱是什么呢?

这个陷阱就是,并行流默认都是用同一个默认的ForkJoinPool,这个ForkJoinPool的线程数和CPU的核心数相同。如果是计算密集型的操作,直接使用是没有问题的,因为这个ForkJoinPool会将所有的CPU打满,系统资源是没有浪费的。但是,如果其中还有IO操作或等待操作,这个默认的ForkJoinPool只能消耗一部分CPU,而另外的并行流因为获取不到该ForkJoinPool的使用权,性能将大大降低。可见,默认的ForkJoinPool必须只能处理计算密集型的任务


Optional类型

Optional 对象是一种包装器对象,要么包装了类型T的对象,要么没有包装任何对象。

对于第一种情况,我们称为这种值为存在的。Optional类型被当作一种更安全的方式,用来替代类型T的引用,这种引用要么引用某个对象,要么为null。

Optional 类的引入很好的解决空指针异常。

创建Optional值

public final class Optional<T> {

    ////产生一个具有给定值的Optional。如果value为null,那么会抛出NullPointerException异常
    public static <T> Optional<T> of(T value) {
        return new Optional<>(value);
    }

    //产生一个具有给定值的Optional。如果value为null,那么会产生一个空Optional
    public static <T> Optional<T> ofNullable(T value) {
        return value == null ? empty() : of(value);
    }

    //产生一个空的Optional
    public static<T> Optional<T> empty() {
        @SuppressWarnings("unchecked")
        Optional<T> t = (Optional<T>) EMPTY;
        return t;
    }
}

例如:

public static Optional<Double> inverse(Double x){
    return x == 0 ? Optional.empty() : Optional.of(1 / x);
}

ofNullable方法被用来作为可能出现的null值和可选值之间的桥梁。该方法会在obj不为null的情况下返回Optional.of(obj),否则返回Optional.empty()。

使用Optional值

public final class Optional<T> { 

    //产生这个Optional的值,或者在该Optional为空时,产生other
    public T orElse(T other) {
        return value != null ? value : other;
    }

    //产生这个Optional的值,或者在该Optional为空时,产生调用other的结果
    public T orElseGet(Supplier<? extends T> other) {
        return value != null ? value : other.get();
    }

    //产生将该Optional的值传递给mapper后的结果,只要这个Optional不为空且结果不为null,
    //否则产生一个空Optional
    public <X extends Throwable> T orElseThrow(Supplier<? extends X> exceptionSupplier) throws X {
        if (value != null) {
            return value;
        } else {
            throw exceptionSupplier.get();
        }
    }

    //如果该Optional不为空,那么就将它的值传递给consumer
    public void ifPresent(Consumer<? super T> consumer) {
        if (value != null)
            consumer.accept(value);
    }

    //产生将该Optional的值传递给mapper后的结果,只要这个Optional不为空且结果不为null,否则产生空Optional
    public<U> Optional<U> map(Function<? super T, ? extends U> mapper) {
        Objects.requireNonNull(mapper);
        if (!isPresent())
            return empty();
        else {
            return Optional.ofNullable(mapper.apply(value));
        }
    }
}

有效使用Optional的关键是要使用这样的方法:它在值不存在的情况下会产生一个可替代物,而只有在值存在的情况下才会使用这个值。

//有值使产生string,无值时产生空字符串""
String result = optionalString.orElse("");

//值不存在的时候调用这个lambda表达式,计算默认值
String result = optionalString.orElseGet(() -> Local.getDefault().getDisplayName());

//在没有任何值时抛出异常
String result = optionalString.orElseThrow(IllegalStateException::new);

上面是在不存在任何值的情况下产生相应的替代物。另一种使用可选值的策略是只有其存在的情况下才消费该值

ifPresent方法会接受一个函数。如果该可选值存在,那么它会被传递给该函数。否则,不发生任何事情。

//如果值存在时添加到某个集合中
optionaValue.ifPresent(v -> results.add(v));
//或者这样用,lambda表达式的方法引用
optionaValue.ifPresent(results::add);

当调用ifPresent时,从该函数不会返回任何值。如果想要处理函数的结果,应该使用map

Optional<Boolean> added = optionalValue.map(results::add);
//若值存在,Optional为处理结果true或者false,若不存在,则为空Optional

不适合使用Optional的方式

如果没有正确地使用Optional值,那么相比较以往地得到“某物或null”地方式,你并没有得到任何好处。

public final class Optional<T> {

    //产生这个Optional的值,或者在该Optional为空时,抛出一个NoSuchElementException异常
    public T get() {
        if (value == null) {
            throw new NoSuchElementException("No value present");
        }
        return value;
    }

    //如果该Optional不为空,则返回true
    public boolean isPresent() {
        return value != null;
    }
}

get方法会在Optional值存在的情况下获得其中包装的元素,或者在不存在的情况下抛出一个NoSuchElementEception对象。因此,

Optional<T> optionalValue = ...;
optionalValue.get().someMethod();

//并不比下面的方式更安全

T value = ...;
value.someMehtod();

isPresent方法报告某个Optional对象是否具有一个值。但是

if(optionalValue.isPresent())optionalValue.get().someMethod();

//并不比下面的方式更容易处理

if(value != null)value.someMethod();

flatMap构建Optional值

假如你有一个可以产生Optional对象的方法f,并且目标类型T具有一个可以产生Optional对象的方法g。如果他们是普通的方法,你可以通过s.f().g()来组合起来,但是这种组合无法工作,因为s.f()的类型为Optional,而不是T。

public final class Optional<T> {

    //产生将mapper应用于当前Optional值所产生的结果,或者在当前Optional为空时,返回一个空Optional
    public<U> Optional<U> flatMap(Function<? super T, Optional<U>> mapper) {
        Objects.requireNonNull(mapper);
        if (!isPresent())
            return empty();
        else {
            return Objects.requireNonNull(mapper.apply(value));
        }
    }
}

为了解决这个问题,我们可以调用flatMap方法。

Optional<Y> result = s.f().flatMap(T::g);
//如果s.f()的值存在,那么可以调用T.g(),否则返回一个空Optional<Y>

Collectors类型

Collector<T, A, R>是一个接口,其中Collectors类有用于多种收集器的工厂方法

在流里,collect()方法大量运用到了Collectors里的工厂方法进行收集操作等,该方法里具体能做到什么取决于Collectiors类。

由于该类型的收集器工厂方法太多了,下面列举了部分方法,不一一举例了,需要用到的时候再详细研究。

收集结果

下面这些方法在上面流里有提到过,就不再具体举例。

public final class Collectors {

    //将元素收集到列表中的收集器
    public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

    //将元素收集到集中的收集器
    public static <T>
    Collector<T, ?, Set<T>> toSet() {
        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_UNORDERED_ID);
    }

    //将元素收集到任意集合中的收集器
    public static <T, C extends Collection<T>>
    Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
        return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                                   (r1, r2) -> { r1.addAll(r2); return r1; },
                                   CH_ID);
    }

    //连接字符串的收集器
    public static Collector<CharSequence, ?, String> joining() {
        return new CollectorImpl<CharSequence, StringBuilder, String>(
                StringBuilder::new, StringBuilder::append,
                (r1, r2) -> { r1.append(r2); return r1; },
                StringBuilder::toString, CH_NOID);
    }

    //连接字符串,并以指定分隔符分隔的收集器
    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
        return joining(delimiter, "", "");
    }

    //连接字符串,并以指定分隔符分隔,第一个字符串之前可以有前缀,最后一个字符串有后缀的收集器
    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
                                                             CharSequence prefix,
                                                             CharSequence suffix) {
        return new CollectorImpl<>(
                () -> new StringJoiner(delimiter, prefix, suffix),
                StringJoiner::add, StringJoiner::merge,
                StringJoiner::toString, CH_NOID);
    }
}

收集到映射表

public final class Collectors {

    /**
     *产生一个收集器,它会产生一个映射表或并发映射表。keyMapper和valueMapper函数
     *会应用于每个收集到的元素上,从而在所产生的映射表中生存一个键值项。默认情况下,当
     *两个元素产生相同的键时,会抛出一个IllegalStateException异常。你可以提供一个
     *mergeFunction来合并具有相同键的值。默认情况下,其结果是一个HashMap或ConcurrentHashMap。
     *你可以提供一个mapSupplier,它会产生所期望的映射表实例
     */
    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper) {
        return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
    }

    public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper,
                                    BinaryOperator<U> mergeFunction) {
        return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
    }

    public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

    public static <T, K, U>
    Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
                                                        Function<? super T, ? extends U> valueMapper) {
        return toConcurrentMap(keyMapper, valueMapper, throwingMerger(), ConcurrentHashMap::new);
    }

    public static <T, K, U>
    Collector<T, ?, ConcurrentMap<K,U>>
    toConcurrentMap(Function<? super T, ? extends K> keyMapper,
                    Function<? super T, ? extends U> valueMapper,
                    BinaryOperator<U> mergeFunction) {
        return toConcurrentMap(keyMapper, valueMapper, mergeFunction, ConcurrentHashMap::new);
    }

    public static <T, K, U, M extends ConcurrentMap<K, U>>
    Collector<T, ?, M> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
                                       Function<? super T, ? extends U> valueMapper,
                                       BinaryOperator<U> mergeFunction,
                                       Supplier<M> mapSupplier) {
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_CONCURRENT_ID);
    }
}

基础数据类型收集器

public final class Collectors { 

    //产生能够生产IntSummaryStatistics对象的收集器,通过它可以获得将mapper应用于
    //每个元素后所产生的结果的个数、总和、平均值、最大值和最小值
    public static <T>
    Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(
                IntSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsInt(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }

    //产生能够生产LongSummaryStatistics对象的收集器,通过它可以获得将mapper应用于
    //每个元素后所产生的结果的个数、总和、平均值、最大值和最小值
    public static <T>
    Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<T, LongSummaryStatistics, LongSummaryStatistics>(
                LongSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsLong(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }

    //产生能够生产DoubleSummaryStatistics对象的收集器,通过它可以获得将mapper应用于
    //每个元素后所产生的结果的个数、总和、平均值、最大值和最小值
    public static <T>
    Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) {
        return new CollectorImpl<T, DoubleSummaryStatistics, DoubleSummaryStatistics>(
                DoubleSummaryStatistics::new,
                (r, t) -> r.accept(mapper.applyAsDouble(t)),
                (l, r) -> { l.combine(r); return l; }, CH_ID);
    }
}

群组和分区

public final class Collectors { 

   //产生一个收集器,它会产生一个映射表,
   //其键是将classifier应用于所有收集到的元素上所产生的结果,而值是由具有相同键的元素构成的一个个列表
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
        return groupingBy(classifier, toList());
    }

   //产生一个收集器,它会产生一个并发映射表
   //其键是将classifier应用于所有收集到的元素上所产生的结果,而值是由具有相同键的元素构成的一个个列表
   public static <T, K>
    Collector<T, ?, ConcurrentMap<K, List<T>>>
    groupingByConcurrent(Function<? super T, ? extends K> classifier) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
    }

   //产生一个收集器,他会产生一个映射表,其键是true/false,而值是由满足/不满足断言的元素构成的列表
    public static <T>
    Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
        return partitioningBy(predicate, toList());
    }
}

下游收集器

public final class Collectors { 

    //产生一个可以对收集到的元素进行计数的收集器
    public static <T> Collector<T, ?, Long>
    counting() {
        return reducing(0L, e -> 1L, Long::sum);
    }

    //产生一个收集器,对将mapper应用到收集到的元素上之后产生的值计算总和
    public static <T> Collector<T, ?, Integer>
    summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1],
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }

    //产生一个收集器,对将mapper应用到收集到的元素上之后产生的值计算总和
    public static <T> Collector<T, ?, Long>
    summingLong(ToLongFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[1],
                (a, t) -> { a[0] += mapper.applyAsLong(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }

    //产生一个收集器,对将mapper应用到收集到的元素上之后产生的值计算总和
    public static <T> Collector<T, ?, Double>
    summingDouble(ToDoubleFunction<? super T> mapper) {
        /*
         * In the arrays allocated for the collect operation, index 0
         * holds the high-order bits of the running sum, index 1 holds
         * the low-order bits of the sum computed via compensated
         * summation, and index 2 holds the simple sum used to compute
         * the proper result if the stream contains infinite values of
         * the same sign.
         */
        return new CollectorImpl<>(
                () -> new double[3],
                (a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t));
                            a[2] += mapper.applyAsDouble(t);},
                (a, b) -> { sumWithCompensation(a, b[0]);
                            a[2] += b[2];
                            return sumWithCompensation(a, b[1]); },
                a -> computeFinalSum(a),
                CH_NOID);
    }

    //产生一个收集器,使用comparator指定的排序方法,计算收集到的元素中的最大值
    public static <T> Collector<T, ?, Optional<T>>
    maxBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.maxBy(comparator));
    }

    //产生一个收集器,使用comparator指定的排序方法,计算收集到的元素中的最小值
    public static <T> Collector<T, ?, Optional<T>>
    minBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.minBy(comparator));
    }

    //产生一个收集器,它会产生一个映射表,其键是将mapper应用到收集到的数据上而产生的,
    //其值是使用downstream收集器收集到的具有相同键的元素
    public static <T, U, A, R>
    Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                               Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                                   (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                   downstream.combiner(), downstream.finisher(),
                                   downstream.characteristics());
    }
}