diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9e557e4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# Created by .ignore support plugin (hsz.mobi) +### Java template +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar +*.idea + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + diff --git a/3-Lambda and Collections.md b/3-Lambda and Collections.md index 56d0839..7b0153a 100644 --- a/3-Lambda and Collections.md +++ b/3-Lambda and Collections.md @@ -6,7 +6,7 @@ 我们先从最熟悉的*Java集合框架(Java Collections Framework, JCF)*开始说起。 -为引入Lambda表达式,Java8新增了`java.util.funcion`包,里面包含常用的**函数接口**,这是Lambda表达式的基础,Java集合框架也新增部分接口,以便与Lambda表达式对接。 +为引入Lambda表达式,Java8新增了`java.util.function`包,里面包含常用的**函数接口**,这是Lambda表达式的基础,Java集合框架也新增部分接口,以便与Lambda表达式对接。 首先回顾一下Java集合框架的接口继承结构: @@ -390,4 +390,4 @@ return null; ## 总结 1. Java8为容器新增一些有用的方法,这些方法有些是为**完善原有功能**,有些是为**引入函数式编程**,学习和使用这些方法有助于我们写出更加简洁有效的代码. -2. **函数接口**虽然很多,但绝大多数时候我们根本不需要知道它们的名字,书写Lambda表达式时类型推断帮我们做了一切. \ No newline at end of file +2. **函数接口**虽然很多,但绝大多数时候我们根本不需要知道它们的名字,书写Lambda表达式时类型推断帮我们做了一切. diff --git a/6-Stream Pipelines.md b/6-Stream Pipelines.md index 6839966..35e5398 100644 --- a/6-Stream Pipelines.md +++ b/6-Stream Pipelines.md @@ -32,6 +32,54 @@ int longestStringLengthStartingWithA
Stream操作分类 | ||
中间操作(Intermediate operations) | 无状态(Stateless) | unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek() |
有状态(Stateful) | distinct() sorted() sorted() limit() skip() | |
结束操作(Terminal operations) | 非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() max() min() count() |
短路操作(short-circuiting) | anyMatch() allMatch() noneMatch() findFirst() findAny() |
方法名 | 作用 |
void begin(long size) | 开始遍历元素之前调用该方法,通知Sink做好准备。 |
void end() | 所有元素遍历完成之后调用,通知Sink没有更多的元素了。 |
boolean cancellationRequested() | 是否可以结束操作,可以让短路操作尽早结束。 |
void accept(T t) | 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。 |
This method should be overridden when the {@link #spliterator()}
+ * method cannot return a spliterator that is {@code IMMUTABLE},
+ * {@code CONCURRENT}, or late-binding. (See {@link #spliterator()}
+ * for details.)
+ *
+ * @implSpec
+ * The default implementation creates a parallel {@code Stream} from the
+ * collection's {@code Spliterator}.
+ *
+ * @return a possibly parallel {@code Stream} over the elements in this
+ * collection
+ * @since 1.8
+ */
+default Stream
+
+从运行结果里面我们可以很清楚的看到parallelStream同时使用了主线程和`ForkJoinPool.commonPool`创建的线程。
+值得说明的是这个运行结果并不是唯一的,实际运行的时候可能会得到多个结果,比如:
+
+
+
+甚至你的运行结果里面只有主线程。
+
+来源于java 8 实战的书籍的一段话:
+> 并行流内部使用了默认的`ForkJoinPool`(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由`Runtime.getRuntime().available- Processors()`得到的。 但是你可以通过系统属性`java.util.concurrent.ForkJoinPool.common. parallelism`来改变线程池大小,如下所示: `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");` 这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个 并行流指定这个值。一般而言,让`ForkJoinPool`的大小等于处理器数量是个不错的默认值, 除非你有很好的理由,否则我们强烈建议你不要修改它。
+
+```java
+// 设置全局并行流并发线程数
+System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
+System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
+System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
+System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
+```
+为什么两次的运行结果是一样的呢?上面刚刚说过了这是一个全局设置,`java.util.concurrent.ForkJoinPool.common.parallelism`是final类型的,整个JVM中只允许设置一次。既然默认的并发线程数不能反复修改,那怎么进行不同线程数量的并发测试呢?答案是:`引入ForkJoinPool`
+```java
+IntStream range = IntStream.range(1, 100000);
+// 传入parallelism
+new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();
+```
+因此,使用parallelStream时需要注意的一点是,**多个parallelStream之间默认使用的是同一个线程池**,所以IO操作尽量不要放进parallelStream中,否则会阻塞其他parallelStream。
+> Using a ForkJoinPool and submit for a parallel stream does not reliably use all threads. If you look at this ( [Parallel stream from a HashSet doesn't run in parallel](https://stackoverflow.com/questions/28985704/parallel-stream-from-a-hashset-doesnt-run-in-parallel) ) and this ( [Why does the parallel stream not use all the threads of the ForkJoinPool?](https://stackoverflow.com/questions/36947336/why-does-the-parallel-stream-not-use-all-the-threads-of-the-forkjoinpool) ), you'll see the reasoning.
+
+```java
+// 获取当前机器CPU处理器的数量
+System.out.println(Runtime.getRuntime().availableProcessors());// 输出 4
+// parallelStream默认的并发线程数
+System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 3
+```
+为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?文章的开始已经提过了。因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。
+这一点可以从源码中看出来:
+```java
+static final int MAX_CAP = 0x7fff; // max #workers - 1
+// 无参构造函数
+public ForkJoinPool() {
+ this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
+ defaultForkJoinWorkerThreadFactory, null, false);
+}bs-channel
+```
+
+## 从parallelStream认识[Fork/Join 框架](https://www.infoq.cn/article/fork-join-introduction/)
+Fork/Join 框架的核心是采用分治法的思想,将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
+- Fork/Join 的运行流程图
+
+
+简单地说就是大任务拆分成小任务,分别用不同线程去完成,然后把结果合并后返回。所以第一步是拆分,第二步是分开运算,第三步是合并。这三个步骤分别对应的就是Collector的*supplier*,*accumulator*和*combiner*。
+- 工作窃取算法
+Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的CPU,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
+
+
+## 使用parallelStream的利弊
+使用parallelStream的几个好处:
+1) 代码优雅,可以使用lambda表达式,原本几句代码现在一句可以搞定;
+2) 运用多核特性(forkAndJoin)并行处理,大幅提高效率。
+关于并行流和多线程的性能测试可以看一下下面的几篇博客:
+[并行流适用场景-CPU密集型](https://blog.csdn.net/larva_s/article/details/90403578)
+[提交订单性能优化系列之006-普通的Thread多线程改为Java8的parallelStream并发流](https://blog.csdn.net/blueskybluesoul/article/details/82817007)
+
+然而,任何事物都不是完美的,并行流也不例外,其中最明显的就是使用(parallel)Stream极其不便于代码的跟踪调试,此外并行流带来的不确定性也使得我们对它的使用变得格外谨慎。我们得去了解更多的并行流的相关知识来保证自己能够正确的使用这把双刃剑。
+
+parallelStream使用时需要注意的点:
+1) **parallelStream是线程不安全的;**
+```java
+List