Stream 基础概念
-
A sequence of elements supporting sequential and parallel aggregate operations.
-
A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation
-
Streams are lazy
关键类
了解了上述的概念,要分析 parallel stream 作用原理,就要从terminal operation 源码入手。
/**
* Stream 定义的接口可以作为源码阅读的切入口
*
* Returns whether this stream, if a terminal operation were to be executed,
* would execute in parallel. 注意:terminal operation executed
* @see java.util.stream.BaseStream#isParallel
*/
boolean isParallel();
-
java.util.stream.AbstractPipeline the core implementations of the Stream interface
-
java.util.stream.TerminalOp
-
java.util.stream.ForEachOps.ForEachOp
-
java.util.stream.ReduceOps.ReduceOp
parallel stream
/**
* 通过TerminalOp 发生真正的调用,是否并行,在此处确定
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @see java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
return isParallel() ? terminalOp.evaluateParallel(...) : terminalOp.evaluateSequential(...);
}
实现-ForEachOp
// 这样的Terminal Operation
persons.parallelStream().map(person -> person.name).forEach(System.out::println)
如何实现并发调用,只需要关注evaluateParallel
方法。
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
class ForEachTask<S, T> extends ForkJoinTask<T>
至此,我们就知道并发是依靠ForkJoinTask 实现的。
实现-ReduceOp
// 这样的Terminal Operation
persons.parallelStream().map(person -> person.name).collect(Collectors.joining(";"))
ForkJoinPool
分治算法。主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果。
工作窃取算法。避免工作线程由于拆分了任务之后的join等待过程。
ForkJoinTask
A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations. –简单的说,通过线程池,提升并发性能的一种 fork/join实现。
In the most typical usages, a fork-join pair act like a call (fork) and return (join) from a parallel recursive function.
常用配置
-
java.util.concurrent.ForkJoinPool.common.parallelism
-
java.util.concurrent.ForkJoinPool.common.threadFactory
来源: java.util.concurrent.ForkJoinPool#makeCommonPool