• Stream API就是流操作api,Java中 流 都是指数据,尤其是大量数据,所以Stream API是为了方便处理大量数据的,也就是给大数据运算做基础的。

  • 注意区别于io流,io的流操作的数据是字节码或字符,主要是处理输入输出。

  • Stream api的流操作的数据是数组或集合,主要是数据的计算,把一个集合转变为另一个集合的过程中,进行一系列流水线式的操作,主要是数据的查找、过滤和映射等操作。

  • 集合是数据,流(Stream)是指计算 数据集合或数据数组的计算,称为Stream,也就是我们常说的流式计算

  • 相通于Hadoop框架的流式计算,map-reduce思想是一致的

image.png
注意:

  • stream api是基于lambda表达式的,也就是基于函数式编程的
  • stream不会自己存储数据,只是处理数据
  • stream不会改吧源数据对象,处理结束后会返回一个新生成的stream
  • stream操作是有延时的,意味着它们会等到·需要结果的时候才执行

    一、创建Stream

  1. //1. 通过Collection系列集合提供的 stream()或ParallelStream() 获取及河流
  2. List<String> list = new ArrayList<>();
  3. Stream<String> stream = list.stream();//非并行的流
  4. Stream<String> parallelStream = list.parallelStream();//并行的流
  5. //2. 通过Arrays的静态方法 stream()获取数组流
  6. Stream<Integer> stream1 = Arrays.stream(new Integer[10]);
  7. //3. 通过Stream类的静态方法of()
  8. Stream<Serializable> stream2 = Stream.of(1, 2, "initit.com");
  9. //4. 创建无限流
  10. //4.1 迭代方式
  11. Stream<Integer> stream3 = Stream.iterate(1001, x -> x + 2);
  12. /*
  13. 说明:
  14. 方法:iterate(final T seed, final UnaryOperator<T> f)
  15. 其中 UnaryOperator 接口如下:
  16. @FunctionalInterface
  17. public interface UnaryOperator<T> extends Function<T, T> {
  18. static <T> UnaryOperator<T> identity() {
  19. return t -> t;
  20. }
  21. }
  22. */
  23. //终止 测试
  24. stream3.limit(5).forEach(System.out::println);
  25. // 4.2 生成方式
  26. Stream<Double> stream4 = Stream.generate(Math::random);
  27. stream4.limit(100).forEach(System.out::println);
  28. //说明: generate(Supplier<T> s)
  29. //@FunctionalInterface
  30. //public interface Supplier<T> {
  31. // T get();
  32. //}

二、中间操作

  • filter 过滤数据
  • limit 截断数据,不输出后边的数据
  • skip 跳过数据,不输出前边的数据
  • distinct 去重
  • map 映射
  • flatMap 平铺的映射,map和flatMap的区别等同于add和addAll的区别,flatMap是把多个流连成一个流,addAll是把多个集合连成一个集合
  • sorted 排序,默认排序
  • sorted(comparator)重写comparator方法,定制排序
  1. //模拟一个User对象
  2. class User {
  3. private Integer id; //主键
  4. private String name; //姓名
  5. private Integer age;//年龄
  6. private Double salary;//薪水
  7. public User(String name, Integer age, Double salary) {
  8. this.name = name;
  9. this.age = age;
  10. this.salary = salary;
  11. }
  12. //构造函数 方便测试
  13. public User(String name, int age, double salary) {
  14. this.name = name;
  15. this.age = age;
  16. this.salary = salary;
  17. }
  18. //后边要比对对象是否相同 需要重写equals和hasCode方法
  19. @Override
  20. public boolean equals(Object o) {
  21. if (this == o) return true;
  22. if (!(o instanceof User)) return false;
  23. User user = (User) o;
  24. return Objects.equals(getId(), user.getId()) &&
  25. Objects.equals(getName(), user.getName()) &&
  26. Objects.equals(getAge(), user.getAge()) &&
  27. Objects.equals(getSalary(), user.getSalary());
  28. }
  29. @Override
  30. public int hashCode() {
  31. return Objects.hash(getId(), getName(), getAge(), getSalary());
  32. }
  33. //toString 方便控制台输出查看
  34. @Override
  35. public String toString() {
  36. return "User{" +
  37. "id=" + id +
  38. ", name='" + name + '\'' +
  39. ", age=" + age +
  40. ", salary=" + salary +
  41. '}';
  42. }
  43. //省略get set方法
  44. }
  45. //准备数据
  46. List<User> list = new ArrayList<User>() {
  47. {
  48. add(new User("张三", 18, 6000));
  49. add(new User("张三", 18, 6000));
  50. add(new User("张三", 18, 6000));
  51. add(new User("李四", 20, 6000));
  52. add(new User("张三", 50, 8000));
  53. add(new User("编程指南", 30, 120000));
  54. //为测试速度 添加一千万数据
  55. // for (int i = 0; i < 1000 * 10000; i++) {
  56. // add(new User(UUID.randomUUID() + "", 30, 120000));
  57. // }
  58. }
  59. };
  60. Stream<User> stream = list.stream();
  61. @Test
  62. public void filter() {
  63. //过滤 filter
  64. stream.filter(user -> user.getAge() > 18).forEach(System.out::println);
  65. // Stream<T> filter(Predicate<? super T> predicate);
  66. //Predicate的方法 boolean test(T t);
  67. //输出如下:
  68. //User{id=0, name='李四', age=20, salary=6000.0}
  69. //User{id=0, name='张三', age=50, salary=8000.0}
  70. //User{id=0, name='编程指南', age=30, salary=120000.0}
  71. }
  72. //迭代 forEach
  73. @Test
  74. public void foreach() {
  75. stream.forEach(System.out::println);
  76. //输出
  77. //User{id=0, name='张三', age=18, salary=6000.0}
  78. //User{id=0, name='张三', age=18, salary=6000.0}
  79. //User{id=0, name='张三', age=18, salary=6000.0}
  80. //User{id=0, name='李四', age=20, salary=6000.0}
  81. //User{id=0, name='张三', age=50, salary=8000.0}
  82. //User{id=0, name='编程指南', age=30, salary=120000.0}
  83. //stream 会帮我们自动实现内部迭代 forEach
  84. //区别于外部迭代
  85. list.forEach(System.out::println);
  86. //其本质就是以下代码
  87. Iterator<User> iterator = list.iterator();
  88. while (iterator.hasNext()) {
  89. System.out.println(iterator.next());
  90. }
  91. //也就是
  92. for (User user : list) {
  93. System.out.println(user);
  94. }
  95. }
  96. //过滤 filter
  97. @Test
  98. public void test() {
  99. Stream<User> userStream = stream.filter(user -> {
  100. System.out.println("判断年龄……");
  101. return user.getAge() > 18;
  102. });
  103. //此时运行无任何输出
  104. //调用终止操作
  105. userStream.forEach(System.out::println);
  106. //输出如下:
  107. //判断年龄……
  108. //判断年龄……
  109. //判断年龄……
  110. //判断年龄……
  111. //User{id=0, name='李四', age=20, salary=6000.0}
  112. //判断年龄……
  113. //User{id=0, name='张三', age=50, salary=8000.0}
  114. //判断年龄……
  115. //User{id=0, name='编程指南', age=30, salary=120000.0}
  116. //说明: 中间操作不会自动执行,只有调用终止操作时,才会一次性执行 这个叫 惰性求值
  117. }
  118. //截断 limit
  119. @Test
  120. public void limit() {
  121. stream.limit(3).forEach(System.out::println);
  122. //输出如下:
  123. //User{id=0, name='张三', age=18, salary=6000.0}
  124. //User{id=0, name='张三', age=18, salary=6000.0}
  125. //User{id=0, name='张三', age=18, salary=6000.0}
  126. }
  127. //跳过 skip
  128. @Test
  129. public void skip() {
  130. stream.skip(3).forEach(System.out::println);
  131. //输出如下:
  132. //User{id=0, name='李四', age=20, salary=6000.0}
  133. //User{id=0, name='张三', age=50, salary=8000.0}
  134. //User{id=0, name='编程指南', age=30, salary=120000.0}
  135. }
  136. //去重 distinct
  137. @Test
  138. public void distinct() {
  139. //distinct 需要重写 equals 和 hasCode方法
  140. stream.distinct().forEach(System.out::println);
  141. //输出如下:
  142. //User{id=0, name='张三', age=18, salary=6000.0}
  143. //User{id=0, name='李四', age=20, salary=6000.0}
  144. //User{id=0, name='张三', age=50, salary=8000.0}
  145. //User{id=0, name='编程指南', age=30, salary=120000.0}
  146. }
  147. //映射 map
  148. @Test
  149. public void map() {
  150. //假如要提取用户名
  151. stream.map(User::getName)
  152. .forEach(System.out::println);
  153. //map方法接收一个Function函数:接收一个值,处理后返回一个新值
  154. //就是把每个对象映射成一个新对象
  155. // <R> Stream<R> map(Function<? super T, ? extends R> mapper);
  156. //@FunctionalInterface
  157. //public interface Function<T, R> {
  158. // R apply(T t);
  159. //}
  160. //输出如下:
  161. //张三
  162. //张三
  163. //张三
  164. //李四
  165. //张三
  166. //编程指南
  167. }
  168. //再来个映射测试 问每个人的薪水是多少
  169. @Test
  170. public void map1() {
  171. stream.map(user -> user.getName() + "的薪水是: " + user.getSalary())
  172. .forEach(System.out::println);
  173. //map方法接收一个Function函数:接收一个值,处理后返回一个新值
  174. //就是把每个对象映射成一个新对象
  175. // <R> Stream<R> map(Function<? super T, ? extends R> mapper);
  176. //@FunctionalInterface
  177. //public interface Function<T, R> {
  178. // R apply(T t);
  179. //}
  180. //输出如下:
  181. //张三的薪水是: 6000.0
  182. //张三的薪水是: 6000.0
  183. //张三的薪水是: 6000.0
  184. //李四的薪水是: 6000.0
  185. //张三的薪水是: 8000.0
  186. //编程指南的薪水是: 120000.0
  187. }
  188. //扁平化的映射 flatMap
  189. //就是把每一个值映射成一个流,然后把所有的流连在一起变成一个流
  190. @Test
  191. public void flatMap() {
  192. stream.flatMap(user -> getStream(user.getName()))
  193. .forEach(System.out::println);
  194. //<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
  195. //输出:
  196. //张
  197. //三
  198. //张
  199. //三
  200. //张
  201. //三
  202. //李
  203. //四
  204. //张
  205. //三
  206. //编
  207. //程
  208. //指
  209. //南
  210. // map 和flatMap 的区别
  211. // 等同于 add和addAll的区别
  212. // 例如 map ={aaaa,bbbb,cccc}
  213. // 那么flatMap = {a,a,a,a,b,b,b,b,c,c,c,c}
  214. //就是把多个流合并为一个流了, 联想list.addAll() 把多个list合并成一个list
  215. }
  216. private Stream<String> getStream(String str) {
  217. return Arrays.stream(str.split(""));
  218. }
  219. //排序:自然排序 sorted() 就是按照字典排序
  220. @Test
  221. public void sorted() {
  222. Stream.of(1, 4, 5, 5, 6, 2, 3).sorted()
  223. .forEach(System.out::print);
  224. //输出: 1234556
  225. }
  226. //排序:定制排序 sorted(Comparator<? super T> comparator) 就是定制 Comparator接口
  227. @Test
  228. public void sorted1() {
  229. Stream.of(1, 4, 5, 5, 6, 2, 3)
  230. //倒序排列
  231. .sorted((x, y) -> y > x ? 1 : -1)
  232. .forEach(System.out::print);
  233. //输出: 1234556
  234. }
  235. //排序例子: 先按薪资排,然后按年龄排
  236. @Test
  237. public void sorted2() {
  238. long now = System.currentTimeMillis();
  239. list.stream().sorted((o1, o2) -> {
  240. if (o1.getAge().equals(o2.getAge())) {
  241. return o1.getSalary().compareTo(o2.getSalary());
  242. } else {
  243. return o1.getAge().compareTo(o2.getAge());
  244. }
  245. }).close();
  246. System.out.println(System.currentTimeMillis() - now);
  247. //速度测试: 百万数据
  248. // parallelStream()是 37毫秒 51 36
  249. //stream() 是31毫秒 41 35
  250. //速度测试: 千万数据
  251. // parallelStream()是 32毫秒
  252. //stream() 是33毫秒
  253. }
  254. //对比一下速度
  255. @Test
  256. public void sorted3() {
  257. long now2 = System.currentTimeMillis();
  258. list.sort((o1, o2) -> {
  259. if (o1.getAge().equals(o2.getAge())) {
  260. return o1.getSalary().compareTo(o2.getSalary());
  261. } else {
  262. return o1.getAge().compareTo(o2.getAge());
  263. }
  264. });
  265. // for (User user : list) {
  266. // System.out.println(user);
  267. // }
  268. System.out.println(System.currentTimeMillis() - now2);
  269. //速度测试: 百万数据 67毫秒 73 61
  270. //千万数据: 192毫秒
  271. }

三、结束操作

  1. //是否都匹配 allMatch
  2. boolean allMatch = list.stream().allMatch(user -> user.getAge().equals(18));
  3. System.out.println(allMatch);//输出false 不是所有用户都18岁
  4. //至少有一个匹配 anyMatch
  5. boolean anyMatch = list.stream().anyMatch(user -> user.getAge() == 18);
  6. System.out.println(anyMatch);//输出true 有18岁的用户
  7. //都不匹配 noneMatch
  8. boolean noneMatch = list.stream().noneMatch(user -> user.getAge() == 18);
  9. System.out.println(noneMatch);//输出false 不是所有用户都不是18岁
  10. //查找第一个 findFirst
  11. Optional<User> first = list.stream()
  12. .sorted(Comparator.comparingDouble(User::getSalary))
  13. .findFirst();
  14. System.out.println(first);
  15. //Optional 可选的,用来封装对象的容器 目的是减少空指针 我们后边会细说
  16. //查找任意一个
  17. Optional<User> any = list.parallelStream()
  18. .filter(user -> user.getAge() > 18)
  19. .findAny();
  20. System.out.println(any.get());
  21. //查找最大值
  22. Optional<User> max = list.stream().max(Comparator.comparingDouble(User::getAge));
  23. System.out.println(max);
  24. // 查年龄最大的用户 Optional[User{id=null, name='张三', age=50, salary=8000.0}]
  25. //查最小的
  26. //查最少的工资是多少
  27. Optional<Double> min = list.stream().map(User::getSalary)
  28. .min(Double::compareTo);
  29. System.out.println(min);//输出6000
  30. //归约
  31. Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6,7,8,9,10);
  32. Integer reduce = stream.reduce(0, Integer::sum);
  33. System.out.println(reduce); //55
  34. // Optional<T> reduce(BinaryOperator<T> accumulator);
  35. //归约 计算工资的总和
  36. Optional<Double> reduce1 = list.stream().map(User::getSalary)
  37. .reduce(Double::sum);
  38. System.out.println(reduce1.get());//152000.0
  39. //收集
  40. list.stream()
  41. .map(User::getName)
  42. .collect(Collectors.toList())
  43. .forEach(System.out::println);
  44. // <R> R collect(Supplier<R> supplier,
  45. // BiConsumer<R, ? super T> accumulator,
  46. // BiConsumer<R, R> combiner);
  47. //输出:
  48. //张三
  49. //张三
  50. //张三
  51. //李四
  52. //张三
  53. //编程指南
  54. System.out.println("----------------");
  55. list.stream()
  56. .map(User::getName)
  57. .collect(Collectors.toSet())
  58. .forEach(System.out::println);
  59. //输出:
  60. //李四
  61. //张三
  62. //编程指南
  63. System.out.println("==============");
  64. LinkedHashSet<String> linkedHashSet = list.stream()
  65. .map(User::getName)
  66. .collect(Collectors.toCollection(LinkedHashSet::new));
  67. System.out.println(linkedHashSet);
  68. //输出: [张三, 李四, 编程指南]
  69. //计算平均值
  70. Long count = list.stream()
  71. .collect(Collectors.counting());
  72. //等同于
  73. // Long collect = list.stream().count();
  74. System.out.println(count);
  75. //计算工资的平均值
  76. Double avg = list.stream().collect(Collectors.averagingDouble(User::getSalary));
  77. System.out.println(avg);
  78. //激素那年龄总和
  79. Double sum = list.stream().collect(Collectors.summingDouble(User::getAge));
  80. //等同于
  81. // Double sum = list.stream().mapToDouble(User::getAge).sum();
  82. System.out.println(sum);
  83. //以上输出:
  84. //6
  85. //25333.333333333332
  86. //154.0
  87. System.out.println("-------分组----------");
  88. //分组
  89. //安年龄分组
  90. Map<Integer, List<User>> map = list.stream()
  91. .collect(Collectors.groupingBy(User::getAge));
  92. System.out.println(map);
  93. //{50=[User{id=null, name='张三', age=50, salary=8000.0}],
  94. // 18=[User{id=null, name='张三', age=18, salary=6000.0},
  95. // User{id=null, name='张三', age=18, salary=6000.0},
  96. // User{id=null, name='张三', age=18, salary=6000.0}],
  97. // 20=[User{id=null, name='李四', age=20, salary=6000.0}],
  98. // 30=[User{id=null, name='编程指南', age=30, salary=120000.0}]}
  99. //多级分组
  100. System.out.println("-------------多级分组-----------");
  101. Map<String, Map<String, List<User>>> map2 = list.stream()
  102. .collect(Collectors.groupingBy(User::getName,
  103. Collectors.groupingBy(user -> {
  104. if (user.getAge() <= 18) {
  105. return "青少年";
  106. } else if (user.getAge() <= 60) {
  107. return "中年";
  108. } else {
  109. return "老年";
  110. }
  111. })));
  112. System.out.println(map2);
  113. //输出:{李四={中年=[User{id=null, name='李四', age=20, salary=6000.0}]},
  114. // 张三={青少年=[User{id=null, name='张三', age=18, salary=6000.0},
  115. // User{id=null, name='张三', age=18, salary=6000.0},
  116. // User{id=null, name='张三', age=18, salary=6000.0}],
  117. // 中年=[User{id=null, name='张三', age=50, salary=8000.0}]},
  118. // 编程指南={中年=[User{id=null, name='编程指南', age=30, salary=120000.0}]}}
  119. //收集统计
  120. System.out.println("-----收集统计-----");
  121. LongSummaryStatistics statistics = list.stream()
  122. .collect(Collectors.summarizingLong(User::getAge));
  123. double average = statistics.getAverage();
  124. long count1 = statistics.getCount();
  125. long max1 = statistics.getMax();
  126. long min1 = statistics.getMin();
  127. long sum1 = statistics.getSum();
  128. //收集 连接
  129. String join = list.stream().map(User::getName).collect(Collectors.joining());
  130. System.out.println(join);
  131. //张三张三张三李四张三编程指南
  132. String join1 = list.stream().map(User::getName).collect(Collectors.joining(" , "));
  133. System.out.println(join1);
  134. //张三 , 张三 , 张三 , 李四 , 张三 , 编程指南

并行流和顺序流

  • 并行流 就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
  • Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。
  • Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与顺序流之间进行切换。
  • 并行流底层使用的是JDK7的Fork/Join框架实现的,相当于提供了一个种简便的使用方式。

了解 Fork/Join 框架

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总.
image.png

Fork/Join框架相对于多线程执行的优势是,可以更高效的利用cpu,采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能 。

举个例子: 计算0到一千亿数字求和
**
一、传统循环累积求和法

  1. //for循环累加
  2. @Test
  3. public void test2() {
  4. long start = System.currentTimeMillis();
  5. long sum = 0L;
  6. for (long i = 0L; i <= 10000000000L; i++) {
  7. sum += i;
  8. }
  9. System.out.println(sum);
  10. long end = System.currentTimeMillis();
  11. System.out.println("耗费的时间为: " + (end - start));
  12. //耗费的时间为: 2660
  13. }

二、ForkJoin框架计算

  1. package com.initit.java8.streamapi;
  2. import java.util.concurrent.RecursiveTask;
  3. /**
  4. * ForkJoin框架使用案例
  5. * 计算求和
  6. */
  7. public class ForkJoinCalculate extends RecursiveTask<Long> {
  8. private long start;
  9. private long end;
  10. private static final long THRESHOLD = 10000L; //临界值
  11. public ForkJoinCalculate(long start, long end) {
  12. this.start = start;
  13. this.end = end;
  14. }
  15. /**
  16. * 小于临界值10000的时候,累加求和
  17. * 大于临界值10000的时候,对半拆分任务,交给多个线程执行
  18. */
  19. @Override
  20. protected Long compute() {
  21. long length = end - start;
  22. if (length <= THRESHOLD) {
  23. long sum = 0;
  24. for (long i = start; i <= end; i++) {
  25. sum += i;
  26. }
  27. return sum;
  28. } else {
  29. long middle = (start + end) / 2;
  30. ForkJoinCalculate left = new ForkJoinCalculate(start, middle);
  31. left.fork(); //拆分,并将该子任务压入线程队列
  32. ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end);
  33. right.fork();
  34. return left.join() + right.join();
  35. }
  36. }
  37. }
  38. //测试
  39. @Test
  40. public void test1() {
  41. long start = System.currentTimeMillis();
  42. ForkJoinPool pool = new ForkJoinPool();
  43. ForkJoinTask<Long> task = new ForkJoinCalculate(0L, 10000000000L);
  44. long sum = pool.invoke(task);
  45. System.out.println(sum);
  46. long end = System.currentTimeMillis();
  47. System.out.println("耗费的时间为: " + (end - start));
  48. //耗费的时间为: 1145
  49. }

三、使用Stream API 并行流计算求和

  1. @Test
  2. public void test3() {
  3. long start = System.currentTimeMillis();
  4. Long sum = LongStream.rangeClosed(0L, 10000000000L)
  5. .parallel() //切换为并行流
  6. // .sequential() //切换为顺序流 串行
  7. .sum();
  8. System.out.println(sum);
  9. long end = System.currentTimeMillis();
  10. System.out.println("耗费的时间为: " + (end - start));
  11. //耗费的时间为: 755
  12. }
  • .parallel() //切换为并行流
  • .sequential() //切换为顺序流 串行

map-reduce思想

最后简单提一下map-reduce思想。

  • map 映射,数学概念 把A映射成B,A和B有某种对应关系
  • reduce 归约,数学概念,把一系列数据按照某种关系累积

起源于google,用来处理大量的网页数据(大数据计算),后hadoop框架借鉴改思想。
map-reduce是一种并行处理数据的架构模式,就是利用大量的计算机来并行计算数据。

打个比喻:

  • We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.
  • Now we get together and add our individual counts. That’s reduce.
  • 我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。
  • 现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。

知乎有位大神说:

  • 举个栗子,要斗地主了,需要从10副混一起的牌里找出一幅牌,10副牌分10堆给10个人去清,每个人分别把黑红梅方放一堆,这就是map
  • 然后安排4个人每人去清一种花色的牌,从中找出1到K,这就是reduce了

hadoop入门有一个统计单词出现次数的例子word count,我们简单分析一下,了解这种思想:

  1. MapReduce大体上分为六个步骤:input, split, map, shuffle, reduce, output。细节描述如下:
  2. 1. 输入(input):如给定一个文档,包含如下四行:
  3. Hello Java
  4. Hello C
  5. Hello Java
  6. Hello C++
  7. 2. 拆分(split):将上述文档中每一行的内容转换为key-value对,即:
  8. 0 - Hello Java
  9. 1 - Hello C
  10. 2 Hello Java
  11. 3 - Hello C++
  12. 3. 映射(map):将拆分之后的内容转换成新的key-value对,即:
  13. (Hello , 1)
  14. (Java , 1)
  15. (Hello , 1)
  16. (C , 1)
  17. (Hello , 1)
  18. (Java , 1)
  19. (Hello , 1)
  20. (C++ , 1)
  21. 4. 派发(shuffle):将key相同的扔到一起去,即:
  22. (Hello , 1)
  23. (Hello , 1)
  24. (Hello , 1)
  25. (Hello , 1)
  26. (Java , 1)
  27. (Java , 1)
  28. (C , 1)
  29. (C++ , 1)
  30. 注意:这一步需要移动数据,原来的数据可能在不同的datanode上,这一步过后,相同key的数据会被移动到同一台机器上。最终,它会返回一个list包含各种k-value对,即:
  31. { Hello: 1,1,1,1}
  32. {Java: 1,1}
  33. {C: 1}
  34. {C++: 1}
  35. 5. 缩减(reduce):把同一个key的结果加在一起。如:
  36. (Hello , 4)
  37. (Java , 2)
  38. (C , 1)
  39. (C++,1)
  40. 6. 输出(output): 输出缩减之后的所有结果。

数据流处理-Stream API - 图3

就是一种拆分和合并的思想,目的是使用多个计算资源(多台计算、多进程、多线程等等)并行计算,以提升计算效率,节省计算时间。是一种很好的架构模式,大数据计算框架Hadoop等都是采用map-reduce思想。