@@ -32,6 +32,54 @@ int longestStringLengthStartingWithA
32
32
<table width =" 600 " ><tr ><td colspan =" 3 " align =" center " border =" 0 " >Stream操作分类</td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >中间操作(Intermediate operations)</td ><td >无状态(Stateless)</td ><td >unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td ></tr ><tr ><td >有状态(Stateful)</td ><td >distinct() sorted() sorted() limit() skip() </td ></tr ><tr ><td rowspan =" 2 " border =" 1 " >结束操作(Terminal operations)</td ><td >非短路操作</td ><td >forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td ></tr ><tr ><td >短路操作(short-circuiting)</td ><td >anyMatch() allMatch() noneMatch() findFirst() findAny()</td ></tr ></table >
33
33
34
34
Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(* Stateless* )和有状态的(* Stateful* ),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如* 找到第一个满足条件的元素* 。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
35
+ 为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。
36
+ ``` Java
37
+ IntStream . range(1 , 10 )
38
+ .peek(x - > System . out. print(" \n A" + x))
39
+ .limit(3 )
40
+ .peek(x - > System . out. print(" B" + x))
41
+ .forEach(x - > System . out. print(" C" + x));
42
+ ```
43
+ 输出为:
44
+ A1B1C1
45
+ A2B2C2
46
+ A3B3C3
47
+ 中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的forEach操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,...,直到最初的第一步。
48
+ 第一次forEach执行的时候,会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A1B1C1
49
+ 第二次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A2B2C2
50
+
51
+ ...
52
+ 当第四次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,到limit的时候,发现limit(3)这个job已经完成,这里就相当于循环里面的break操作,跳出来终止循环。
53
+
54
+ 再来看第二段代码:
55
+
56
+ ``` Java
57
+ IntStream . range(1 , 10 )
58
+ .peek(x - > System . out. print(" \n A" + x))
59
+ .skip(6 )
60
+ .peek(x - > System . out. print(" B" + x))
61
+ .forEach(x - > System . out. print(" C" + x));
62
+ ```
63
+ 输出为:
64
+ A1
65
+ A2
66
+ A3
67
+ A4
68
+ A5
69
+ A6
70
+ A7B7C7
71
+ A8B8C8
72
+ A9B9C9
73
+ 第一次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,因为执行到skip,这个操作的意思就是跳过,下面的都不要执行了,也就是就相当于循环里面的continue,结束本次循环。输出:A1
74
+
75
+ 第二次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第二次skip,结束本次循环。输出:A2
76
+
77
+ ...
78
+
79
+ 第七次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第七次skip,已经大于6了,它已经执行完了skip(6)的job了。这次skip就直接跳过,继续执行下面的操作。输出:A7B7C7
80
+
81
+ ...直到循环结束。
82
+
35
83
36
84
## 一种直白的实现方式
37
85
@@ -89,7 +137,7 @@ Stream流水线组织结构示意图如下:
89
137
90
138
<table width =" 600px " ><tr ><td align =" center " >方法名</td ><td align =" center " >作用</td ></tr ><tr ><td >void begin(long size)</td ><td >开始遍历元素之前调用该方法,通知Sink做好准备。</td ></tr ><tr ><td >void end()</td ><td >所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td ></tr ><tr ><td >boolean cancellationRequested()</td ><td >是否可以结束操作,可以让短路操作尽早结束。</td ></tr ><tr ><td >void accept(T t)</td ><td >遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。</td ></tr ></table >
91
139
92
- 有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的` accept() ` 方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的` begin() ` 和` end() ` 方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个乘放结果的容器 ,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,` Sink.cancellationRequested() ` 也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回* true* ,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。** 实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法 ** 。
140
+ 有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的` accept() ` 方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的` begin() ` 和` end() ` 方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个盛放结果的容器 ,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,` Sink.cancellationRequested() ` 也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回* true* ,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。** 实际上Stream API内部实现的的本质,就是如何重写Sink的这四个接口方法 ** 。
93
141
94
142
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的Sink.accept()方法流程是这样的:
95
143
@@ -163,7 +211,7 @@ class RefSortingSink<T> extends AbstractRefSortingSink<T> {
163
211
```
164
212
165
213
上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
166
- 1 . 首先beging ()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
214
+ 1 . 首先begin ()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
167
215
2 . 之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
168
216
3 . 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
169
217
4 . 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。
@@ -245,4 +293,4 @@ $ java -version
245
293
java version " 1.8.0_101"
246
294
Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
247
295
Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
248
- ```
296
+ ```
0 commit comments