# The Java Streams Parallel
Published: 01-30-2025, Last Updated: 02-01-2025
I made this as someone who loves reading into OpenJDK source code, and obsesses over streaming
and concurrency, but found the Java Streams source particularly difficult to grok in as much detail
as I'd like. I'm hoping this document will make the implementation and performance characteristics
a little more accessible to myself and others. I know this may be a moving target, especially with
Gatherers now being introduced, but perhaps patching or versioning this document as the
implementation evolves will be easier with some groundwork laid.
This document is intended to be intermediate- to expert-friendly, not necessarily beginner-friendly.
I may not dwell too long on terminology or public abstractions that have been well-hashed elsewhere. I
recommend other resources for that, such as the stream package doc,
and this article series by Brian Goetz.
This document is current with
JDK 24 [build 34],
and consists of a single html file.
If you have substantive additions or corrections to propose, I have (hesitantly) created an
issue tracker for this site. Or, if
you'd like to work with me, you can reach me at danielaveryj@gmail.com.
Caution: Depending on implementation details is inherently
risky! This document is not intended to mislead anyone into believing that subsequent (or prior) JDK
versions will adhere to the same design or performance characteristics.
# Stream concurrency summary
Note: In this doc I am using a slightly different definition of "eager" and "lazy" than the
stream package doc,
which uses "eager" and "lazy" to refer to roughly "when the stream (as a whole)
executes", rather than "when specific operations on the stream execute (during the whole execution)".
By the former definition, all intermediate operations are lazy. By the latter definition (which I
extrapolated from AbstractPipeline.opEvaluateParallelLazy() and AbstractPipeline.opEvaluateParallel()[Eager?]),
stateful intermediate operations on a parallel stream may execute either eagerly or lazily. By both
definitions, all existing terminal operations are eager, except for spliterator()/iterator().
Java Streams use data parallelism:
[Figure: Data parallelism in a pipeline. The data is split among leaf tasks D, E, F, and G, each of which runs the whole pipeline (map(), filter(), flatMap(), collect()) on its own split. Tasks B and C combine the leaf results, and root Task A combines the results of B and C.]
Here, the pipeline consists of several operations, terminated by the first "eager" operation (in this
case, "collect()"). The eager operation associates the data with a root task (think: thread), which splits
the data and associates each split with a child task. Each child task repeats this process, until the data
will split no more, indicating a leaf task. Each leaf task consumes its split of the data, applying the
pipeline operations and accumulating each element into a result, according to the eager operation. Each
parent task combines its children's results to form its own result, according to the eager operation. This
continues until the root task has a result.
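The split/apply/combine shape described above can be sketched with a plain RecursiveTask. This is only an illustration, not how the JDK implements it: the class name DataParallelSketch, the LEAF_SIZE threshold, and the hard-coded "map, filter, sum" pipeline are all assumptions made for the example.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of data parallelism; not the JDK's implementation.
class DataParallelSketch {
    static final long LEAF_SIZE = 1_000; // assumed splitting threshold

    // Each task owns one split of the data.
    static final class SplitTask extends RecursiveTask<Long> {
        private final Spliterator<Integer> data;

        SplitTask(Spliterator<Integer> data) { this.data = data; }

        @Override
        protected Long compute() {
            // Try to split off a prefix; a null result means "will split no more".
            Spliterator<Integer> prefix = data.estimateSize() > LEAF_SIZE ? data.trySplit() : null;
            if (prefix != null) {
                SplitTask left = new SplitTask(prefix);
                left.fork();                                 // child task for one split
                long right = new SplitTask(data).compute();  // other split, run inline
                return left.join() + right;                  // parent combines child results
            }
            // Leaf task: apply the pipeline operations and accumulate a result.
            long[] sum = {0};
            data.forEachRemaining(x -> {
                long mapped = x * 2L;                        // "map()"
                if (mapped % 3 == 0) sum[0] += mapped;       // "filter()", then accumulate
            });
            return sum[0];
        }
    }

    static long parallelSum(List<Integer> source) {
        return ForkJoinPool.commonPool().invoke(new SplitTask(source.spliterator()));
    }

    // The same pipeline as a sequential stream, for comparison.
    static long sequentialSum(List<Integer> source) {
        return source.stream().mapToLong(x -> x * 2L).filter(m -> m % 3 == 0).sum();
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 10_000).boxed().collect(Collectors.toList());
        System.out.println(parallelSum(source) == sequentialSum(source));
    }
}
```

This sketch blocks in join() for simplicity, which the real stream tasks avoid.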
In parallel Java Streams, pipelines can have multiple eager operations, by requiring each intermediate
eager operation to produce a new dataset as its result. This becomes input to the remaining pipeline, down
to the next eager operation.
For data parallelism to achieve speedup, the data should be large enough - and/or the operations
time-intensive enough - to offset the cost of the multitasking overhead (e.g. initial thread scheduling, and
result-combining). However, note that in Java Streams, after any intermediate eager operation, the resultant
data will be resident in memory (so "too large" is possible).
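To make the "intermediate eager operation" idea concrete, here is a small experiment. It assumes that sorted() executes eagerly on a parallel stream, which matches the implementation described in this document but is not something the public API promises. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical experiment: observes sorted() acting as a barrier in a parallel stream.
class EagerBarrierDemo {

    // Returns the smallest number of elements that had passed the upstream peek()
    // at any moment the downstream peek() ran.
    static int minUpstreamSeenByDownstream(int n) {
        List<Integer> data = IntStream.range(0, n).boxed()
                .collect(Collectors.toCollection(ArrayList::new));
        Collections.shuffle(data, new Random(42));

        AtomicInteger upstreamCount = new AtomicInteger();
        AtomicInteger minSeen = new AtomicInteger(Integer.MAX_VALUE);

        data.parallelStream()
                .peek(x -> upstreamCount.incrementAndGet())   // before the eager operation
                .sorted()                                     // assumed eager in parallel
                .peek(x -> minSeen.accumulateAndGet(upstreamCount.get(), Math::min))
                .forEach(x -> { });
        return minSeen.get();
    }

    public static void main(String[] args) {
        int n = 10_000;
        // If sorted() is a full barrier, downstream never runs before all n elements went upstream.
        System.out.println(minUpstreamSeenByDownstream(n) == n);
    }
}
```

If the barrier holds, the whole sorted dataset is sitting in memory before the first downstream element is processed, which is the memory cost noted above.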
Data parallelism is typically contrasted with task parallelism:
[Figure: Task parallelism in a pipeline. Task C runs map() and pushes into a buffer that Task B polls; Task B runs filter() and pushes into a buffer that Task A polls; Task A runs flatMap() and collect().]
Here, the pipeline consists of several operations, optionally split up by buffers, and ending in a
terminal operation (in this case, "collect()"). Tasks are created to produce or poll elements from upstream,
apply pipeline operations, and consume or push elements downstream. The last task accumulates elements into
a result, according to the terminal operation.
Buffers enable tasks to progress concurrently, but are typically bounded in size to constrain memory
usage. If a buffer fills up, the upstream task must wait until it is not full; if a buffer empties, the
downstream task must wait until it is not empty. In the case of small/temporary mismatches in upstream and
downstream processing rate, a larger buffer spends memory to buy concurrency, enabling tasks to spend less
time waiting on each other.
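A minimal sketch of this arrangement, using plain threads and bounded queues, might look like the following. Nothing here comes from a particular library's pipeline API: the class name, the use of Optional.empty() as an end-of-stream marker, and the fixed map/filter steps are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of task parallelism with bounded buffers.
class TaskParallelSketch {

    static List<Integer> run(List<Integer> source, int bufferSize) {
        // Bounded buffers: put() waits while full, take() waits while empty.
        BlockingQueue<Optional<Integer>> cToB = new ArrayBlockingQueue<>(bufferSize);
        BlockingQueue<Optional<Integer>> bToA = new ArrayBlockingQueue<>(bufferSize);

        // Task C: produce from the source, apply "map()", push downstream.
        Thread taskC = new Thread(() -> {
            try {
                for (Integer x : source) cToB.put(Optional.of(x * 2));
                cToB.put(Optional.empty()); // end-of-stream marker (an assumption of this sketch)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Task B: poll from upstream, apply "filter()", push downstream.
        Thread taskB = new Thread(() -> {
            try {
                Optional<Integer> next;
                while ((next = cToB.take()).isPresent()) {
                    if (next.get() % 3 == 0) bToA.put(next);
                }
                bToA.put(Optional.empty());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        taskC.start();
        taskB.start();

        // Task A (the calling thread): poll from upstream and accumulate the result.
        List<Integer> result = new ArrayList<>();
        try {
            Optional<Integer> next;
            while ((next = bToA.take()).isPresent()) result.add(next.get());
            taskC.join();
            taskB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining the pipeline", e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5, 6), 2));
    }
}
```

A larger bufferSize lets the upstream task run further ahead before it has to wait, at the cost of holding more elements in memory.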
For task parallelism to achieve speedup, the operations should be time-intensive enough to offset the
cost of the multitasking overhead (e.g. upstream task context-switching when downstream buffer is full, and
downstream task context-switching when upstream buffer is empty). This "time intensity" typically comes from
operations being IO-bound (and thus already having context-switching and downtime).
The "declarative" approach to task parallelism depicted here - interacting with a first-class "pipeline"
object, with operations that abstract over tasks and buffers - has been modeled by e.g. the various Reactive
Streams libraries, and Kotlin Flows. An "imperative" approach - interacting with the lower-level tasks (or
actors/threads/processes) and buffers (or mailboxes/queues/channels) more directly - has been modeled by
e.g. Akka's actors, Erlang's processes, and Go's goroutines + channels.
## Combining data parallelism and task parallelism
We can't really use data parallelism in an arbitrary task-parallel task, because in general we don't have
the data in hand in order to split it. The data arrives incrementally over time. We can instead do something
related, and route the data to one of several downstream tasks as it arrives - a kind of load-balancing
operation. Note that pipeline topology need not be linear - it can diverge (fan-out) and even re-converge
(fan-in).
We can use task parallelism in (leaf) data-parallel tasks. However, if the operations are CPU-bound,
which Java Streams was traditionally intended for, then this does nothing but add the overhead of
waiting on full/empty buffers and context-switching. That said, the new
Gatherers.mapConcurrent() introduces a limited mechanism for IO-bound work that encapsulates a
buffer, fan-out, and fan-in in one operation.
## Eager and lazy operations
A Java Stream is created from an initial spliterator, which yields elements of a dataset. In sequential
execution, the initial spliterator is advanced into a consumer representing the entire stream pipeline. In
parallel execution, the stream is divided into "segments", where each stateful intermediate operation ends
a segment. These intermediate operations may be either "eager" or "lazy".
Eager operations are implemented as described by data parallelism above. They consume the most
recent spliterator, and (if intermediate) buffer output elements into a new dataset and yield its
spliterator. Eager operations cannot emit downstream before consuming the full upstream.
Lazy operations are implemented by wrapping the most recent spliterator in a new spliterator that
implements the upstream pipeline - from just after the last segment ended, down to and including the
operation itself. Lazy operations can emit downstream before consuming the full upstream.
Whether a given stateful intermediate operation executes eagerly or lazily often depends on
characteristics of the stream (see the Stream operations summary
and Stream planner).
The terminal operation ends the last segment. Almost all terminal operations are eager - they consume the
most recent spliterator, rather than wrap it. The exceptions are the lazy terminal operations spliterator()
and iterator() - they wrap (a supplier of) the most recent spliterator.
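The laziness of spliterator() as a terminal operation can be observed from the outside. The sketch below uses only public API; the class name and the counting peek() are just scaffolding for the experiment. How many elements a single tryAdvance() pulls is an implementation detail, so the code only checks that nothing runs before the first advance and that the first advance does not drain the source.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical experiment: spliterator() wraps the pipeline instead of consuming it.
class LazyTerminalDemo {

    // Returns { elements seen before the first tryAdvance, elements seen after it }.
    static int[] countsAroundFirstAdvance() {
        AtomicInteger seen = new AtomicInteger();
        Spliterator<Integer> wrapped = Stream.of(1, 2, 3, 4)
                .peek(x -> seen.incrementAndGet())
                .spliterator();              // terminal, but nothing has executed yet

        int before = seen.get();
        wrapped.tryAdvance(x -> { });        // pulls only as much upstream as it needs
        int after = seen.get();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = countsAroundFirstAdvance();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```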
# Stream package classes summary
The below table contains a short description of each top-level class in java.util.stream, to get your bearings.
Note: The JDK goes out of its way to provide "primitive" stream types (i.e. IntStream, LongStream,
DoubleStream) that meticulously avoid boxing their primitive elements, for maximum performance. This results
in more complicated class structures designed to support both reference- and primitive-flavored streams.
See example class structurings
class SomeClass {
static class OfInt extends SomeClass { ... }
static class OfLong extends SomeClass { ... }
static class OfDouble extends SomeClass { ... }
}
abstract class SomeClass {
static class OfRef extends SomeClass { ... }
static class OfInt extends SomeClass { ... }
static class OfLong extends SomeClass { ... }
static class OfDouble extends SomeClass { ... }
}
abstract class SomeClass {
static class OfRef extends SomeClass { ... }
abstract static class OfPrimitive extends SomeClass { ... }
static class OfInt extends SomeClass.OfPrimitive { ... }
static class OfLong extends SomeClass.OfPrimitive { ... }
static class OfDouble extends SomeClass.OfPrimitive { ... }
}
In this doc I try to focus on the "reference" stream type (i.e. Stream) and its supporting classes. The
code in the primitive counterparts will be highly reminiscent, except for using primitive-specific subclasses
and primitive-accepting/returning variants of methods to avoid boxing.
Aside: What is "scrawl"?
Many of my detail notes are presented as a mix of pseudocode and prose, that I will refer
to as "scrawl" (to dampen expectations). Scrawl is tailored for brevity, rather than precision.
Some scrawl I have put more effort into, some less. This "flashcard"-esque style helps me build
a mental map of the code, but requires reading between the lines. To give a sense of some of the
obscurities used in scrawl:
- "expr1 -> expr2" means "(something in) expr1 calls into or evaluates to expr2", OR, "if (expr1) { return expr2; }"
- "expr1 ?: expr2" means "the value of expr1 if it is not null, else the value of expr2"
- "expr1 |> expr2" means "the value of expr1 is used in expr2"
- "_" means "something goes here, perhaps an argument list or parameter list, but I didn't care to spell it out"
- "if condition: do1, do2; else do3" means "if (condition) { do1; do2; } else { do3; }" (idk why I do this, thankfully it's infrequent)
- I often elide keywords like "new", "return", "class"...
- I pretend StreamOpFlags (and some other constants) are global booleans
- I summarize some classes (eg custom tasks and spliterators) in a { block } immediately after they are used
- I replace recursive generics with "Self", eg: "BaseStream<T, U extends BaseStream<T, U>>" becomes "BaseStream<T, Self>"
- That is, when I'm not omitting generics, or types, entirely
I have included the scrawl just in case it is helpful, but it may not be, and you are not
obligated to read it. In fact, I can only recommend reading scrawl alongside the actual source
code (and a hot drink). Good luck, and have fun! :)
| Top-level java.util.stream classes |
High-level summary |
| Model |
BaseStream (public) |
scrawl
interface BaseStream<T, Self>
extends AutoCloseable
{
Iterator<T> iterator();
Spliterator<T> spliterator();
boolean isParallel();
Self parallel();
Self sequential();
Self unordered();
Self onClose(Runnable handler);
void close();
}
|
Interface extended by all streams (reference and primitive),
that encompasses only the broadest stroke of what a stream is to Java: Something that
wraps an iterator/spliterator, and may be ordered/unordered, sequential/parallel, and
closeable. |
| Stream (public) |
scrawl
interface Stream<T>
extends BaseStream<T, Stream<T>>
{
// declarations of familiar methods, some with default impls
}
|
The familiar interface that declares available operations on
streams. This is the "reference"-flavored variant; I have elided the primitive variants
(IntStream, LongStream, DoubleStream) in this document for brevity, since their
implementations differ only mildly. |
| Sink |
scrawl
interface Sink<T> extends Consumer<T> {
default void begin(long size) {}
default void end() {}
default boolean cancellationRequested() { return false; }
default void accept(int value) { throw }
default void accept(long value) { throw }
default void accept(double value) { throw }
void accept(T value);
interface OfInt
extends Sink<Integer>, IntConsumer
...
abstract static class ChainedReference<T, E_OUT>
implements Sink<T>
{
Sink<? super E_OUT> downstream;
void begin(long size) { downstream.begin(size); }
void end() { downstream.end(); }
boolean cancellationRequested() { downstream.cancellationRequested(); }
}
abstract static class ChainedInt<E_OUT>
implements Sink.OfInt
...
}
|
A Consumer extended with one-shot methods to inform that it is
"about to be sent elements" and "done being sent elements", as well as a method to query
if it "wants more elements?". Intermediate operations on streams make heavy use of
"sink-wrapping" - taking a downstream sink and creating a new sink that adds its own
state/handling for incoming elements, before (conditionally) passing elements downstream. |
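Since Sink is package-private, the sink-wrapping idea is easiest to try with a stand-in. The sketch below defines a hypothetical MiniSink and Chained (loosely mirroring Sink and Sink.ChainedReference from the scrawl above) and wraps a terminal sink with a filter and then a map, back to front. All names are invented for the example, and the JDK's own sinks do more than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-ins for the package-private Sink types; not JDK code.
class SinkWrappingSketch {

    interface MiniSink<T> extends Consumer<T> {
        default void begin(long size) { }
        default void end() { }
        default boolean cancellationRequested() { return false; }
    }

    // Forwards the lifecycle calls downstream; subclasses decide what accept() does.
    abstract static class Chained<T, OUT> implements MiniSink<T> {
        final MiniSink<? super OUT> downstream;
        Chained(MiniSink<? super OUT> downstream) { this.downstream = downstream; }
        @Override public void begin(long size) { downstream.begin(size); }
        @Override public void end() { downstream.end(); }
        @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); }
    }

    static <T, R> MiniSink<T> map(Function<? super T, ? extends R> mapper, MiniSink<? super R> next) {
        return new Chained<T, R>(next) {
            @Override public void accept(T t) { downstream.accept(mapper.apply(t)); }
        };
    }

    static <T> MiniSink<T> filter(Predicate<? super T> predicate, MiniSink<? super T> next) {
        return new Chained<T, T>(next) {
            @Override public void begin(long size) { downstream.begin(-1); } // size is no longer known
            @Override public void accept(T t) { if (predicate.test(t)) downstream.accept(t); }
        };
    }

    // Pipeline order is map -> filter -> collect, so wrapping starts from the terminal sink.
    static List<Integer> run(List<Integer> source) {
        List<Integer> out = new ArrayList<>();
        MiniSink<Integer> terminal = out::add;
        MiniSink<Integer> filtering = SinkWrappingSketch.<Integer>filter(x -> x % 3 == 0, terminal);
        MiniSink<Integer> mapping = SinkWrappingSketch.<Integer, Integer>map(x -> x * 2, filtering);

        mapping.begin(source.size());
        for (Integer x : source) {
            if (mapping.cancellationRequested()) break;
            mapping.accept(x);
        }
        mapping.end();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5, 6)));
    }
}
```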
| PipelineHelper |
scrawl
abstract class PipelineHelper<P_OUT> {
abstract StreamShape getSourceShape();
abstract int getStreamAndOpFlags();
abstract <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
abstract <P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
abstract <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
abstract <P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
abstract <P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
abstract <P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
}
|
An abstract class extended by AbstractPipeline, and declaring
some of the core "heavy-hitting" internal methods for advancing elements through the
stream. As AbstractPipeline is the only subclass, it's not clear that PipelineHelper is
really needed, but it does have fewer type parameters to spell out compared to passing
AbstractPipeline around. |
| AbstractPipeline |
scrawl
abstract class AbstractPipeline<E_IN, E_OUT, Self>
extends PipelineHelper<E_OUT>
implements BaseStream<E_OUT, Self>
{
final AbstractPipeline sourceStage;
final AbstractPipeline previousStage;
final int sourceOrOpFlags;
AbstractPipeline nextStage;
int depth;
int combinedFlags;
Spliterator<?> sourceSpliterator;
boolean linkedOrConsumed;
Runnable sourceCloseAction;
boolean parallel;
// From PipelineHelper:
StreamShape getSourceShape() {
go back stages until p.depth==0, then p.getOutputShape()
}
int getStreamAndOpFlags() {
combinedFlags
}
<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
size = spliterator.getExactSizeIfKnown() if StreamOpFlag.SIZED else -1
if size != -1 && StreamOpFlag.SIZE_ADJUSTING && !isParallel(), size = stage.exactOutputSize(size) for stages
}
<P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(sink), spliterator); return sink;
// Commonly called by TerminalOp.evaluateSequential() and
// AbstractTask.doLeaf() (by TerminalOp.evaluateParallel(), AbstractPipeline.opEvaluateParallel[Lazy]())
}
<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
copyIntoWithCancel(_) if SHORT_CIRCUIT else wrappedSink.begin(_); spliterator.forEachRemaining(_); _.end();
}
<P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
go back stages until p.depth==0, then wrappedSink.begin(_); p.forEachWithCancel(_); wrappedSink.end();
}
<P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
sink = AbstractPipeline.opWrapSink(sink) for this+previous stages until depth==0
// Called by wrapAndCopyInto(_) (by terminal ops and stateful+parallel ops)
// Called by the WrappingSpliterator returned from wrap(_) (by wrapSpliterator(_))
// Called by a few other places where wrapAndCopyInto(_) was less convenient
}
<P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
sourceSpliterator if depth==0 else wrap(this, () -> sourceSpliterator, isParallel())
// Called by overrides of opEvaluateParallelLazy() (DistinctOps, WhileOps, SliceOps, GathererOp)
}
<P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) {
evaluateToNode(_) if isParallel() else wrapAndCopyInto(makeNodeBuilder(_), spliterator).build()
// Called by evaluateToArrayNode() (by toArray())
}
//--Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator);
// From BaseStream:
//--Iterator<T> iterator()
Spliterator<E_OUT> spliterator() {
this == sourceStage && sourceSpliterator != null -> sourceSpliterator
this == sourceStage && sourceSupplier != null -> lazySpliterator(sourceSupplier)
else -> wrap(this, () -> sourceSpliterator(0), isParallel())
}
boolean isParallel() {
sourceStage.parallel
}
Self parallel() {
sourceStage.parallel=true; return this
}
Self sequential() {
sourceStage.parallel=false; return this
}
//--Self unordered()
Self onClose(Runnable handler) {
sourceStage.sourceCloseAction = Streams.composeWithExceptions(existing, handler); return this
}
void close() {
sourceStage.sourceCloseAction.run()
}
// New declarations:
<R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
terminalOp.evaluate[Parallel|Sequential](this, sourceSpliterator(terminalOp.getOpFlags()))
// Called by all terminal ops except toArray()/toList()
}
Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
if isParallel() && previousStage != null && opIsStateful(), depth=0; opEvaluateParallel(_);
else evaluate(sourceSpliterator(0), true, generator)
// Called by toArray()/toList()
}
Spliterator<E_OUT> sourceStageSpliterator() {
only called on source stage, returns source[Spliterator|Supplier.get()], sets linkedOrConsumed
// Called by Head.forEach[Ordered]() when !isParallel()
}
int getStreamFlags() {
StreamOpFlag.toStreamFlags(combinedFlags)
}
Spliterator<?> sourceSpliterator(int terminalFlags) {
gets sourceStage.source[Spliterator|Supplier.get()]
if isParallel() && hasAnyStateful(), finds each p where p.opIsStateful()
spliterator = p.opEvaluateParallelLazy(previousStage, spliterator)
overrides depth! (0 if opIsStateful else ++) and combined flags on each p
returns spliterator
// Called by evaluate() (both overloads), and thus by all terminal ops
}
long exactOutputSize(long previousSize) {
previousSize // overridden by SliceOps (skip()/limit())
}
boolean isOrdered() {
StreamOpFlag.ORDERED.isKnown(combinedFlags)
}
abstract StreamShape getOutputShape();
abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, boolean flatn, IntFunction<E_OUT[]> g);
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel);
abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
// Overridden in StatelessOp/StatefulOp + subclasses
abstract boolean opIsStateful();
abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
// Overridden in StatefulOp subclasses
<P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, IntFunction<E_OUT[]> g) {
throw
// Called by evaluateToArrayNode(_) (by toArray()/toList()) when isParallel() && opIsStateful()
// Called by default opEvaluateParallelLazy()
}
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator) {
opEvaluateParallel(helper, spliterator, Nodes.castingArray()).spliterator() // ie not lazy by default
// Called by sourceSpliterator(_) (by evaluate(), all terminal ops) when isParallel() && hasAnyStateful()
}
}
|
Arguably the most important class in the package, extended by
all the (reference and primitive) "pipeline" classes that implement the (reference and
primitive) "stream" interfaces. AbstractPipeline defines the core methods for advancing
elements through the stream, wrapping successive operation sinks, etc. AbstractPipelines
are constructed to form a doubly-linked list of "stages" - each intermediate stream
operation creates a new AbstractPipeline linked to the previous one, retaining access to
stream characteristics and operation-specific behavior at every previous stage. Like
many abstract classes in this package with reference- and primitive-flavored subclasses,
AbstractPipeline declares and calls into several abstract methods that are implemented
in subclasses, allowing for type specialization. |
| ReferencePipeline |
scrawl
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT>
{
// Overrides of inherited abstract methods that were declared purely to enable primitive specialization
// Plus definitions of the familiar stream methods
// From PipelineHelper:
Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator) {
Nodes.builder(exactSizeIfKnown, generator)
// Called by evaluate(split, flatn, gen) (by evaluateToArrayNode(), toArray()) if !isParallel()
}
// From AbstractPipeline:
StreamShape getOutputShape() {
StreamShape.REFERENCE
}
<P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, boolean flatn, IntFunction<E_OUT[]> g) {
Nodes.collect(ph, s, flatn, g)
// Called by evaluate(split, flatn, gen) if isParallel() (by evaluateToArrayNode(), toArray())
}
<P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel) {
new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel)
// Called by spliterator() and wrapSpliterator() (by opEvaluateParallelLazy() - WhileOps, DistinctOps, SliceOps,
// by sourceSpliterator(_) when isParallel() && hasAnyStateful(), by evaluate(), all terminal ops)
}
Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier) {
new StreamSpliterators.DelegatingSpliterator<>(supplier)
// Called by spliterator() when this == sourceStage && sourceSupplier != null
}
boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink) {
do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink))
// Called by copyIntoWithCancel()
}
//--boolean opIsStateful();
//--Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
// From BaseStream:
Iterator<T> iterator() {
Spliterators.iterator(spliterator())
}
Self unordered() {
new StatelessOp w/ StreamOpFlag.NOT_ORDERED, opWrapSink(sink) -> sink
}
// Head is instantiated in StreamSupport.stream(), which is THE method for creating an initial stream from a spliterator
// Called by concat(), generate(), builder().build(), all Collection.stream() impls, Arrays.stream()...
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }
// StatelessOp is used directly by unordered(), filter(), map(), flatMap(), mapMulti(), peek().
// Common strategy is to make an anonymous subclass of StatelessOp, and inside
// override opWrapSink() to return an anonymous subclass of Sink.ChainedReference.
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }
// StatefulOp is used indirectly by distinct() (DistinctOps), sorted() (SortedOps), limit()/skip() (SliceOps), takeWhile()/dropWhile() (WhileOps).
// Common strategy is to make an anonymous subclass of StatefulOp, and inside
// override opWrapSink() to return an anonymous subclass of Sink.ChainedReference,
// and override other relevant methods on StatefulOp as well.
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }
}
|
Extends AbstractPipeline to implement Stream, by implementing
leftover specializable methods from AbstractPipeline, as well as the familiar stream
operation methods (which mostly delegate to utility classes, except for the simpler
stateless operations). ReferencePipeline itself is still abstract, and declares several
static inner subclasses: Head, for the first pipeline stage (simply wrapping a
spliterator), plus StatelessOp and StatefulOp, for intermediate operations. |
| TerminalOp |
scrawl
interface TerminalOp<E_IN, R> {
default StreamShape inputShape() { return StreamShape.REFERENCE; }
default int getOpFlags() { return 0; }
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) { evaluateSequential(_) }
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator);
}
|
Interface that defines how to evaluate a terminal operation,
in sequential and parallel modes, given an upstream pipeline and spliterator. Most
terminal operations use this interface, but there are a couple special cases that are
implemented directly on AbstractPipeline: toArray()/toList() and spliterator()/iterator(). |
| TerminalSink |
scrawl
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
|
A Sink that also extends Supplier. Used by terminal operations,
which don't have a downstream sink, but do want to expose a result after the upstream is
done sending elements. The TerminalSink interface is only used inside TerminalOps, and
only if they have multiple TerminalSink implementations to abstract over. |
| StreamShape |
scrawl
enum StreamShape { REFERENCE, INT_VALUE, LONG_VALUE, DOUBLE_VALUE; }
|
Enum of the 4 (1 reference and 3 primitive) stream variants.
Though passed around widely, its usages (aside from assertions) ultimately boil down to
selecting a type-specialized implementation in a handful of places. |
| StreamOpFlag |
scrawl
enum StreamOpFlag {
DISTINCT, SORTED, ORDERED, SIZED, SHORT_CIRCUIT, SIZE_ADJUSTING;
enum Type { SPLITERATOR, STREAM, OP, TERMINAL_OP, UPSTREAM_TERMINAL_OP; }
// other masking + combining stuff
}
|
Enum of stream characteristics, used to optimize various stream
operations. Flags are initially derived from the Head spliterator characteristics, and
may be set, cleared, or unaffected by each stream operation. Each pipeline stage tracks
its own changes to the stream flags, so that the changes can be combined
one-stage-at-a-time when the stream is evaluated. |
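StreamOpFlag itself is internal, but some of its effects are visible through the characteristics of the spliterator a stream hands back. The sketch below uses only public API to watch a few flags being preserved, cleared, and set. The class and method names are made up, and the observed values reflect the current implementation rather than a documented guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

// Hypothetical experiment: observing stream flags via Spliterator characteristics.
class StreamFlagsDemo {

    static boolean[] observe() {
        List<Integer> data = new ArrayList<>(Arrays.asList(3, 1, 2));
        return new boolean[] {
            // Derived from the source spliterator (an ArrayList reports SIZED).
            data.stream().spliterator().hasCharacteristics(Spliterator.SIZED),
            // map() leaves the size flag unaffected.
            data.stream().map(x -> x + 1).spliterator().hasCharacteristics(Spliterator.SIZED),
            // filter() clears it, since the output size is no longer known.
            data.stream().filter(x -> x > 1).spliterator().hasCharacteristics(Spliterator.SIZED),
            // sorted() sets SORTED.
            data.stream().sorted().spliterator().hasCharacteristics(Spliterator.SORTED),
            // distinct() sets DISTINCT.
            data.stream().distinct().spliterator().hasCharacteristics(Spliterator.DISTINCT)
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe()));
    }
}
```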
| AbstractSpinedBuffer |
scrawl
abstract class AbstractSpinedBuffer {
int initialChunkPower;
int elementIndex;
int spineIndex;
long[] priorElementCount;
boolean isEmpty() { ... }
long count() { ... }
int chunkSize(int n) { ... }
abstract void clear();
}
|
Like several other abstract classes, exists to deduplicate
common functionality from reference and primitive subclasses. Extended by SpinedBuffer
itself (reference variant), and the primitive variants, which are static inner classes
inside SpinedBuffer. |
| SpinedBuffer |
scrawl
class SpinedBuffer<E>
extends AbstractSpinedBuffer
implements Consumer<E>, Iterable<E>
{
E[] curChunk;
E[][] spine;
long capacity() { ... }
void inflateSpine() { ... }
void ensureCapacity(long targetSize) { ... }
void increaseCapacity() { ... }
E get(long index) { ... }
void copyInto(E[] array, int offset) { ... }
E[] asArray(IntFunction<E[]> arrayFactory) { ... }
void clear() { ... }
Iterator<E> iterator() { ... }
void forEach(Consumer<? super E> consumer) { ... }
void accept(E e) { ... }
String toString() { ... }
Spliterator<E> spliterator() { ... }
abstract static class OfPrimitive<E, T_ARR, T_CONS>
extends AbstractSpinedBuffer
implements Iterable<E>
static class OfInt
extends SpinedBuffer.OfPrimitive<Integer, int[], IntConsumer>
implements IntConsumer
...
}
|
Basically an append-only list, backed by an array-of-arrays to
avoid copy-on-resize. When a subarray ("chunk") fills up, a new one is allocated (with
double the capacity) for subsequent elements. Copying only happens if the outer array
("spine") is filled with full chunks, at which point chunk references are copied to a
new spine with double the (chunk) capacity. This data structure is used internally in
places where elements must be collected into a buffer whose final size is not known
upfront. |
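The array-of-arrays idea can be sketched in a few lines. MiniSpinedBuffer below is a hypothetical, simplified version written from the description above: the initial sizes are arbitrary, and the real SpinedBuffer differs in its chunk-sizing details and offers much more API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified take on a spined buffer; not the JDK's SpinedBuffer.
class MiniSpinedBuffer<E> implements Consumer<E> {
    private static final int FIRST_CHUNK_SIZE = 4;  // assumed for the example
    private Object[][] spine = new Object[2][];     // outer array of chunk references
    private int spineIndex = 0;                     // index of the chunk being filled
    private int elementIndex = 0;                   // next free slot in that chunk
    private long count = 0;

    MiniSpinedBuffer() { spine[0] = new Object[FIRST_CHUNK_SIZE]; }

    @Override
    public void accept(E e) {
        if (elementIndex == spine[spineIndex].length) {
            if (spineIndex + 1 == spine.length) {
                // Spine is full of full chunks: copy chunk references only, never elements.
                spine = Arrays.copyOf(spine, spine.length * 2);
            }
            // Current chunk is full: allocate the next one with double the capacity.
            spine[spineIndex + 1] = new Object[spine[spineIndex].length * 2];
            spineIndex++;
            elementIndex = 0;
        }
        spine[spineIndex][elementIndex++] = e;
        count++;
    }

    long count() { return count; }

    int chunkCount() { return spineIndex + 1; }

    @SuppressWarnings("unchecked")
    void forEach(Consumer<? super E> consumer) {
        for (int i = 0; i < spineIndex; i++) {               // full chunks
            for (Object o : spine[i]) consumer.accept((E) o);
        }
        for (int j = 0; j < elementIndex; j++) {             // partially filled last chunk
            consumer.accept((E) spine[spineIndex][j]);
        }
    }

    List<E> toList() {
        List<E> out = new ArrayList<>();
        forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        MiniSpinedBuffer<Integer> buffer = new MiniSpinedBuffer<>();
        for (int i = 0; i < 100; i++) buffer.accept(i);
        System.out.println(buffer.count() + " elements in " + buffer.chunkCount() + " chunks");
    }
}
```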
| Node |
scrawl
interface Node<T> {
Spliterator<T> spliterator();
void forEach(Consumer<? super T> consumer);
default int getChildCount() { return 0; }
default Node<T> getChild(int i) { throw }
default Node<T> truncate(long from, long to, IntFunction<T[]> generator) {
nb = Nodes.builder(size = to-from, generator) // size is known -> FixedNodeBuilder
nb.begin(size); advance spliterator() into nb between from and to; nb.end();
nb.build()
}
T[] asArray(IntFunction<T[]> generator);
void copyInto(T[] array, int offset);
default StreamShape getShape() { return StreamShape.REFERENCE; }
long count();
interface Builder<T>
extends Sink<T>
{
Node<T> build();
interface OfInt
extends Node.Builder<Integer>, Sink.OfInt
{
Node.OfInt build();
}
...
}
interface OfPrimitive<T, ...>
extends Node<T>
{ ... }
interface OfInt
extends OfPrimitive<Integer, ...>
...
}
|
Basically a list, backed by a binary tree of Nodes. A "leaf" Node simply
wraps an array/collection, while an "internal" Node concatenates two child Nodes. The shape of a
node tree generally corresponds to the shape of a parallel computation tree (of fork-join tasks)
that produced it: Leaf tasks produce leaf nodes, and parent tasks concatenate child nodes. "The
use of Node within the stream framework is largely to avoid copying data unnecessarily during
parallel operations." Inner interface Node.Builder extends Sink. |
| AbstractTask |
scrawl
abstract class AbstractTask<P_IN, P_OUT, R, Self>
extends CountedCompleter<R>
{
PipelineHelper<P_OUT> helper;
Spliterator<P_IN> spliterator;
long targetSize;
Self leftChild;
Self rightChild;
R localResult;
static int getLeafTarget() { ... }
static long suggestTargetSize(long sizeEstimate) { ... }
// From CountedCompleter:
void compute() { ... }
R getRawResult() { ... }
void setRawResult(R result) { ... }
void onCompletion(CountedCompleter<?> caller) { ... }
// New:
long getTargetSize(long sizeEstimate) { ... }
R getLocalResult() { ... }
void setLocalResult(R result) { ... }
boolean isLeaf() { ... }
boolean isRoot() { ... }
Self getParent() { ... }
boolean isLeftmostNode() { ... }
abstract Self makeChild(Spliterator<P_IN> spliterator);
abstract R doLeaf();
}
|
A ForkJoinTask (a CountedCompleter, specifically) used to implement the
parallel-eager mode of most stateful or terminal stream operations. Each task is associated
with a spliterator. When invoked, a task splits its spliterator (if possible and size is above a
threshold) and passes the two halves to new child tasks, forming a parallel computation tree. If
a task does not split, it is a leaf task, and consumes its spliterator in some way (left
abstract) to produce a result, then tries to complete its parent. When both children complete,
the parent completes, which is usually overridden to combine child results to produce its own
result, before trying to complete its parent, and so on up the tree. When a task splits, it
typically fork()s one child task (submits it to the ForkJoinPool, to be executed when a thread
is available), and "becomes" the other (continues splitting the child's spliterator, or performs
the leaf action if child is a leaf task). This avoids blocking the thread executing the task. |
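The "fork one child, become the other" pattern can be sketched directly on CountedCompleter. This is a loose imitation of the shape described above, not a copy of AbstractTask: the class names, the LEAF_SIZE threshold, and the summing leaf action are assumptions, and the sketch always forks the left child.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration of a CountedCompleter task tree; not the JDK's AbstractTask.
class CountedCompleterSketch {
    static final long LEAF_SIZE = 1_000; // assumed splitting threshold

    static final class SumTask extends CountedCompleter<Long> {
        private final Spliterator<Integer> spliterator;
        private SumTask leftChild, rightChild;
        private long localResult;

        SumTask(SumTask parent, Spliterator<Integer> spliterator) {
            super(parent);
            this.spliterator = spliterator;
        }

        @Override
        public void compute() {
            Spliterator<Integer> rs = spliterator, ls;
            SumTask task = this;
            while (rs.estimateSize() > LEAF_SIZE && (ls = rs.trySplit()) != null) {
                task.leftChild = new SumTask(task, ls);
                task.rightChild = new SumTask(task, rs);
                task.setPendingCount(1);   // two children: one pending completion plus the final one
                task.leftChild.fork();     // submit one child to the pool
                task = task.rightChild;    // "become" the other child, without blocking
            }
            // Leaf action: consume this task's split.
            long[] sum = {0};
            rs.forEachRemaining(x -> sum[0] += x);
            task.localResult = sum[0];
            task.tryComplete();            // may complete the parent, and so on up the tree
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (leftChild != null) {       // parent task: combine child results
                localResult = leftChild.localResult + rightChild.localResult;
            }
        }

        @Override
        public Long getRawResult() { return localResult; }
    }

    static long sum(List<Integer> source) {
        return new SumTask(null, source.spliterator()).invoke();
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 10_000).boxed().collect(Collectors.toList());
        System.out.println(sum(source));
    }
}
```

No task ever calls join(): a parent's combining step runs in whichever thread completes its last child.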
| AbstractShortCircuitTask |
scrawl
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, Self>
extends AbstractTask<P_IN, P_OUT, R, Self>
{
AtomicReference<R> sharedResult;
volatile boolean canceled;
// From AbstractTask:
void compute() { ... }
R getRawResult() { ... }
R getLocalResult() { ... }
void setLocalResult(R result) { ... }
// New:
void shortCircuit(R result) { ... }
void cancel() { ... }
boolean taskCanceled() { ... }
void cancelLaterNodes() { ... }
abstract R getEmptyResult();
}
|
An AbstractTask extended with the ability to short-circuit (by
compare-and-setting the root task's result), plus a 'canceled' flag (which is checked at least
once, when the task is first invoked), and the ability to cancel all tasks that come
later in the root spliterator's encounter order. Used to implement the parallel-eager
mode of short-circuiting stateful or terminal stream operations. |
| Collector (public) |
|
The (hopefully) familiar interface for expressing a custom,
parallelizable, terminal reduction operation. A parallel computation tree would perform the
operation by having each leaf task initialize an accumulation (via supplier()) and mutate it
with incoming elements (via accumulator()), then have each parent task combine final child
accumulations (via combiner()), and the root task transform the final combined accumulation
(via finisher()). In sequential operation, the root task is a leaf task, and no combining is
needed. Collectors can indicate by their Characteristics if they are "concurrent" (can use same
accumulation in all tasks), "unordered" (can accumulate elements in any order), and/or use a
no-op finisher(). This enables optimizations in some cases. |
| Gatherer (public) |
|
The interface for expressing a custom, optionally parallelizable,
intermediate operation. A parallel computation tree would perform the operation by having each
leaf task initialize some state (via initializer()) and optionally mutate it with incoming
elements and/or emit downstream (via integrator()), then have each parent task combine final
child state (via combiner()), and the root task use the final combined state to optionally emit
more downstream. In sequential operation (a sequential Stream or a sequential Gatherer), no
combining is needed. Gatherers can indicate by their arguments whether they are "stateless" (use
a no-op initializer()), "greedy" (integrator never initiates cancellation - only propagates
downstream cancellation), "sequential" (use a no-op combiner()), and/or use a no-op finisher().
This enables (or disables) optimizations in some cases. Two gatherers can be fused into one
using Gatherer.andThen(). The resulting gatherer is stateless if both initial gatherers were
stateless and greedy; greedy if both were greedy; sequential if either was sequential; and uses
a no-op finisher() if both did. |
| GathererOp |
|
Implementation of gather() (not nested in a utility class, unlike other
stateful ops). GathererOp extends ReferencePipeline directly, rather than StatelessOp or
StatefulOp. This is presumably because gather() could theoretically be either a stateless or
stateful intermediate operation, depending on the Gatherer. Currently it is conservatively
always considered stateful. |
| Utility |
StreamSupport (public) |
|
Creates streams from spliterators or suppliers-of-spliterators,
by instantiating the appropriate Head class. This is THE entry point for creating an
initial stream - called by Stream.concat(), generate(), builder().build(), all
Collection.stream() impls, Arrays.stream()... |
| StreamSpliterators |
Spliterators for Stream.spliterator(), skip()/limit(), distinct(), generate() |
| Streams |
Spliterators for Stream.concat(), range(), implementation for builder(),
plus composeWithExceptions() (used by onClose()) and composedClose() (used by concat()) |
| Nodes |
scrawl
class Nodes {
static <T> IntFunction<T[]> castingArray() { ... }
static <T> Node<T> emptyNode(StreamShape shape) { ... }
static <T> Node<T> conc(StreamShape shape, Node<T> left, Node<T> right) { ... }
static <T> Node<T> node(T[] array) { ... }
static <T> Node<T> node(Collection<T> c) { ... }
static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) { ... }
static <T> Node.Builder<T> builder() { ... }
static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> ph, Spliterator<P_IN> split, boolean fltn, IntFunction<P_OUT[]> gen) { ... }
static <T> Node<T> flatten(Node<T> node, IntFunction<T[]> generator) { ... }
// Primitive variants (int shown)
static Node.OfInt node(int[] array) { ... }
static Node.Builder.OfInt intBuilder(long exactSizeIfKnown) { ... }
static Node.Builder.OfInt intBuilder() { ... }
static <P_IN> Node.OfInt collectInt(PipelineHelper<Integer> ph, Spliterator<P_IN> split, boolean fltn) { ... }
static Node.OfInt flattenInt(Node.OfInt node) { ... }
...
EmptyNode (leaf node that is empty)
ArrayNode (leaf node around a pre-existing array)
CollectionNode (leaf node around a pre-existing collection)
FixedNodeBuilder (leaf node that accumulates elements up to a known size)
SpinedNodeBuilder (leaf node that accumulates elements up to an unknown size)
ConcNode (branch node around left and right nodes)
InternalNodeSpliterator (used by ConcNode)
ToArrayTask (task that parallel consumes a node/tree into a single array)
SizedCollectorTask (task that parallel consumes spliterator splits into a single array)
CollectorTask (task that parallel consumes spliterator splits into a tree of nodes)
...all of these classes have ref+primitive specializations
}
|
A utility that implements "leaf" Nodes for wrapping arrays and
collections, Node.Builders (which are themselves both Sinks and "leaf" Nodes - build() returns
this) that accumulate a known or unknown number of elements (using an array or SpinedBuffer,
respectively), the "internal"/concat Node, and a few additional Node-related helper methods and
tasks |
| Collectors (public) |
|
A (hopefully) familiar utility that defines a variety of
common collectors, some of which accept a downstream collector and act as "wrapping"
or "multilevel" collectors |
| Gatherers (public) |
A utility that defines a variety of (expected-to-be) common gatherers |
| DistinctOps |
Implementation of distinct() |
| SliceOps |
Implementation of skip(), limit() |
| SortedOps |
Implementation of sorted() |
| WhileOps |
Implementation of takeWhile(), dropWhile() |
| FindOps |
Implementation of findFirst(), findAny() |
| ForEachOps |
Implementation of forEach(), forEachOrdered() |
| MatchOps |
Implementation of anyMatch(), allMatch(), noneMatch() |
| ReduceOps |
Implementation of reduce(), collect(), count() |
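To make the AbstractTask row above a little more concrete, here is a minimal sketch of the "fork one child, become the other" pattern on a bare CountedCompleter. The names SumTaskSketch, SumTask and parallelSum, and the fixed targetSize argument, are made up for illustration; this is not JDK code. It always forks the left child, which is a simplification and should not be read as a description of what AbstractTask.compute() actually does.

```java
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class SumTaskSketch {
    // Hypothetical task, loosely shaped like AbstractTask; not JDK code.
    static final class SumTask extends CountedCompleter<Long> {
        final Spliterator.OfInt spliterator;
        final long targetSize;          // stop splitting at or below this size estimate
        SumTask leftChild, rightChild;  // both null for leaf tasks
        long localResult;

        SumTask(SumTask parent, Spliterator.OfInt spliterator, long targetSize) {
            super(parent);
            this.spliterator = spliterator;
            this.targetSize = targetSize;
        }

        @Override
        public void compute() {
            SumTask task = this;
            Spliterator.OfInt rs = spliterator, ls;
            while (rs.estimateSize() > targetSize && (ls = rs.trySplit()) != null) {
                task.leftChild = new SumTask(task, ls, targetSize);
                task.rightChild = new SumTask(task, rs, targetSize);
                // Two children will arrive: the first decrements this to 0,
                // the second triggers task.onCompletion().
                task.setPendingCount(1);
                task.leftChild.fork();   // hand the left half to the pool
                task = task.rightChild;  // "become" the right child, without blocking
            }
            // Leaf action: consume what is left of the spliterator.
            long[] sum = {0L};
            rs.forEachRemaining((IntConsumer) v -> sum[0] += v);
            task.localResult = sum[0];
            task.tryComplete();
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (leftChild != null) {  // non-leaf: combine child results
                localResult = leftChild.localResult + rightChild.localResult;
            }
        }

        @Override
        public Long getRawResult() {
            return localResult;
        }
    }

    static long parallelSum(int fromInclusive, int toInclusive, long targetSize) {
        Spliterator.OfInt split = IntStream.rangeClosed(fromInclusive, toInclusive).spliterator();
        return new SumTask(null, split, targetSize).invoke();
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(1, 100_000, 1_000));
    }
}
```

invoke() runs the root task on the calling thread and returns getRawResult() once the whole tree has completed, while forked children run on the common ForkJoinPool.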
# Stream lifecycle summary ↩
This is an outline of how a stream executes, from creation to consumption.
- StreamSupport.stream() instantiates ReferencePipeline.Head from spliterator, or
spliterator-supplier and characteristics.
Intermediate ops instantiate a subclass of ReferencePipeline.StatelessOp or
ReferencePipeline.StatefulOp (new: or GathererOp), with the previous pipeline as parent.
- The subclass overrides opWrapSink(), and if op is stateful overrides opEvaluateParallel() and
optionally opEvaluateParallelLazy().
- opWrapSink() is the "sequential" implementation that describes whether/how to propagate
received elements to a downstream sink. This method is always used for stateless ops
(which leave the "opEvaluateParallel" methods unimplemented), and only used for stateful ops if
the pipeline is sequential.
Terminal ops (except toArray(gen), spliterator(), and derivatives) instantiate a subclass of
TerminalOp, and call pipeline.evaluate(terminalOp) to consume the stream and produce a result.
This constructs a sourceSpliterator(), which will initially be the Head spliterator,
and if parallel will be replaced with the spliterator result of calling opEvaluateParallelLazy()
on each stateful op in the pipeline, in order.
- By default, opEvaluateParallelLazy() calls opEvaluateParallel(), which by default throws. Stateful ops must at least override opEvaluateParallel().
opEvaluateParallel() produces a Node. Different implementations take different approaches:
- WhileOps instantiates and invokes a custom subclass of Abstract[ShortCircuit]Task to produce a Node.
- SortedOps evaluate():s the pipeline to an array, calls Arrays.parallelSort(), and wraps the result in a Node.
- SliceOps sometimes instantiates a custom task, and sometimes instantiates a custom wrapping spliterator and consumes it with Nodes.collect().
- DistinctOps instantiates a terminal op (ReduceOp or ForEachOp) to evaluateParallel(), and wraps the result in a Node.
- opEvaluateParallelLazy() instantiates a custom wrapping spliterator if possible, else essentially does opEvaluateParallel().spliterator().
Then it returns either terminalOp.evaluateSequential() or terminalOp.evaluateParallel(),
passing the pipeline and spliterator.
evaluateSequential():
- Typically implemented by instantiating a custom TerminalSink, then calling pipeline.wrapAndCopyInto(sink, spliterator).get().
- wrapAndCopyInto() first wraps the sink via sink = opWrapSink(sink) on each pipeline stage, in reverse order. Then begin():s the sink,
advances the spliterator into it (checking cancellation before each advance if the pipeline is short-circuiting), then end():s the sink.
evaluateParallel():
- Typically implemented by instantiating and invoking a custom ForkJoinTask - may extend Abstract[ShortCircuit]Task, or even
CountedCompleter directly. The TerminalOp itself is typically passed to this task and used in leaf tasks to implement behavior
similar to evaluateSequential() - instantiating a custom TerminalSink, wrapping it, and advancing a spliterator into it to get a task result.
The remaining terminal ops (toArray(gen), spliterator(), and derivatives) are implemented
directly on AbstractPipeline:
toArray(gen):
- Calls evaluateToArrayNode()...
- which calls opEvaluateParallel() (if parallel and last op was stateful), OR evaluate()...
- which calls wrapAndCopyInto(makeNodeBuilder(), sourceSpliterator).build() (if sequential), OR evaluateToNode() (if parallel)...
- which consumes the sourceSpliterator with Nodes.collect()
spliterator():
- if Head, returns the original spliterator if present, or the spliterator-supplier wrapped in a delegating spliterator.
- else, wraps a supplier of sourceSpliterator() in a spliterator, that itself wraps the (remaining) pipeline around a buffering sink.
- Advancing this wrapped spliterator will first look for the next element in the buffer. If the buffer is empty, the wrapped spliterator will
advance the source spliterator into the wrapped sink (which may buffer multiple outputs), until either the buffer is not empty or the source
spliterator is exhausted.
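The buffering behavior of the non-Head spliterator() case can be observed from the outside. The sketch below (class and method names are mine) counts how many elements have passed through the pipeline after a given number of tryAdvance() calls on the wrapping spliterator. It assumes the buffering works as outlined above; the exact counts are implementation details and may differ on other JDK versions.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SpliteratorBufferingDemo {
    /** Returns how many elements passed through peek() after the given number of tryAdvance() calls. */
    static int pushedAfterAdvances(int advances) {
        AtomicInteger pushed = new AtomicInteger();
        Spliterator<Integer> split = Stream.of(1, 2)
                .flatMap(x -> Stream.of(x, x * 10, x * 100)) // amplifying: 1 input -> 3 outputs
                .peek(x -> pushed.incrementAndGet())         // counts elements reaching the buffer
                .spliterator();                              // wraps the pipeline around a buffering sink
        for (int i = 0; i < advances; i++) {
            split.tryAdvance(x -> { });
        }
        return pushed.get();
    }

    public static void main(String[] args) {
        System.out.println(pushedAfterAdvances(1)); // one source element has been pushed through flatMap
        System.out.println(pushedAfterAdvances(4)); // buffer drained, so a second source element was pushed
    }
}
```

A single tryAdvance() hands back one element, but all three outputs of the first flatMap() call have already been produced and buffered. The next two calls are served from the buffer, and only the fourth advances the source spliterator again.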
# Stream op flags usages ↩
This list details where each stream op flag is read to optimize the stream.
DISTINCT
- read by DistinctOps opWrapSink, opEvaluateParallel[Lazy] to determine if we can no-op
SORTED
- read by SortedOps opWrapSink, opEvaluateParallel to determine if we can no-op
- read by DistinctOps opWrapSink to determine if we can use a cheap "lastSeen" reference instead of a full Set
ORDERED
- read by DistinctOps opEvaluateParallel[Lazy] to determine if order is needed (after failing (DISTINCT))
- if so, reduce into a LinkedHashSet, instead of forEach-ing into a ConcurrentHashMap (eager) or using DistinctSpliterator (lazy)
- read by SliceOps opEvaluateParallel[Lazy] to determine if order is needed (after failing (size > 0 && SUBSIZED))
- if not, use UnorderedSliceSpliterator (lazy) and collect to Node (eager)
- read by WhileOps opEvaluateParallelLazy to determine if order is needed
- if yes, use opEvaluateParallel().spliterator() -> TakeWhileTask or DropWhileTask (instead of UnorderedWhileSpliterator)
- used in tasks: indicates to drop nodes after first short-circuit node (in TakeWhileTask), or retain+truncate skipped elements (in DropWhileTask)
- read by FindOps.evaluateParallel() to determine if task mustFindFirst (recovering whether findFirst() or findAny() created the op)
- read by ReferencePipeline.unordered() to no-op if not ordered, and by collect() as an acceptable alternative to collector UNORDERED (to use forEach)
SIZED
- read by SortedOps opWrapSink to pre-size array (if !SORTED)
- read by AbstractPipeline.exactOutputSizeIfKnown() to check if output size can be derived from spliterator exact size
SHORT_CIRCUIT
- read by AbstractPipeline.isShortCircuitingPipeline() - checked by flatMap() to determine how to advance inner stream (allMatch vs forEach)
- read+cleared by stateful ops in AbstractPipeline.sourceSpliterator()
- read by AbstractPipeline.copyInto() to determine if slower copyIntoWithCancel() is needed
- read by ForEachTask to stop splitting if shared sink cancelled
SIZE_ADJUSTING
- read by AbstractPipeline.exactOutputSizeIfKnown() to check if output size can be derived from spliterator exact size + known adjustments
- SIZE_ADJUSTING basically preserves SIZED when there are known fixed-amount adjustments, but requires calculating the adjusted size
Related usages:
AbstractPipeline.exactOutputSizeIfKnown(spliterator)
- read by SliceOps opEvaluateParallel[Lazy] as part of checking if pipeline is sized (to use SliceSpliterator)
- and in root leaf task to (theoretically) pre-size Node
- read by WhileOps DropWhileTask on non-root leaf task to (theoretically) pre-size Node builder
- read by ForEachOps ForEachOrderedTask to pre-size Node builder (if needed)
- read by ReduceOps for count() evaluate[Sequential|Parallel] to bypass all processing if size is known
- read by StreamSpliterators.AbstractWrappingSpliterator.getExactSizeIfKnown()
- read by AbstractPipeline.evaluate(split, fltn, gen) to pre-size Node builder in sequential() case
- read by Nodes.collect*(ph, split, fltn[, gen]) to pre-size array in parallel() case
- and in called CollectorTask to pre-size leaf Node builder
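The flags themselves are internal, but a few of their effects are visible through public API. The sketch below (class and method names are mine) checks the characteristics reported by stream.spliterator(), which are derived from the combined flags, and shows count() skipping the pipeline when the size is known. It assumes JDK 9 or later for the count() behavior; the results are implementation details rather than guarantees.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class StreamFlagsDemo {
    /** Reports whether the stream's spliterator advertises the given characteristic. */
    static boolean has(Stream<?> stream, int characteristic) {
        return stream.spliterator().hasCharacteristics(characteristic);
    }

    /** Returns how many elements reached peek() while count() ran. */
    static int peekedDuringCount(boolean addFilter) {
        AtomicInteger peeked = new AtomicInteger();
        Stream<Integer> s = Stream.of(1, 2, 3).peek(x -> peeked.incrementAndGet());
        if (addFilter) {
            s = s.filter(x -> true); // clears SIZED, so count() has to traverse
        }
        s.count();
        return peeked.get();
    }

    public static void main(String[] args) {
        System.out.println(has(Stream.of(3, 1, 2).map(x -> x), Spliterator.SIZED));            // map() keeps SIZED
        System.out.println(has(Stream.of(3, 1, 2).filter(x -> true), Spliterator.SIZED));      // filter() clears it
        System.out.println(has(Stream.of(3, 1, 2).sorted(), Spliterator.SORTED));              // sorted() sets SORTED
        System.out.println(has(Stream.of(3, 1, 2).distinct().map(x -> x), Spliterator.DISTINCT)); // map() clears DISTINCT
        System.out.println(has(Stream.of(3, 1, 2).unordered(), Spliterator.ORDERED));          // unordered() clears ORDERED
        System.out.println(peekedDuringCount(false)); // size known: pipeline bypassed
        System.out.println(peekedDuringCount(true));  // size unknown: pipeline traversed
    }
}
```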
# Stream operations summary ↩
The below table details how each stream operation affects stream op flags and memory usage in isolation.
Note: The "ΔFlags" below (mostly corresponding to AbstractPipeline.sourceOrOpFlags)
represent the changes that one operation applies to the "cumulative" stream flags (mostly corresponding to
AbstractPipeline.combinedFlags). For intermediate operations, these changes are applied AFTER the operation,
to affect downstream operations. For terminal operations, these changes are applied BEFORE the operation, to
affect the terminal operation itself (if it even bothers to use this mechanism).
Note: The "Memory Usage" columns below sometimes refer to what I call "amplifying" operations -
operations that may increase the number of elements in the stream. These include flatMap(), mapMulti(),
and gather(). Conversely, "de-amplifying" operations may decrease the number of elements in the stream.
These include filter(), skip(), limit(), takeWhile(), and dropWhile() - as well as flatMap(), mapMulti(),
and gather() again (since they may emit no output elements for an input element). I conservatively ignore
the possible effect of de-amplifying operations when considering memory usage.
| Intermediate Operations |
ΔFlags (combined AFTER operation) |
Memory Usage |
| DISTINCT |
SORTED |
ORDERED |
SIZED |
SHORT_CIRCUIT |
SIZE_ADJUSTING |
sequential |
parallel |
| Stateless |
map() |
scrawl
map(mapper) -> StatelessOp {
NOT_SORTED | NOT_DISTINCT
opWrapSink(flags, sink) {
apply mapper and emit transformed element downstream
cancellationRequested() = downstream.cancellationRequested()
}
}
|
CLEARS |
CLEARS |
|
|
|
|
Okay |
Okay |
| flatMap() |
scrawl
flatMap(mapper) -> StatelessOp {
NOT_SORTED | NOT_DISTINCT | NOT_SIZED
opWrapSink(flags, sink) {
apply mapper and emit stream of elements downstream (until downstream cancels, if short-circuiting)
cancellationRequested() = cancel || (cancel |= sink.cancellationRequested())
}
}
|
CLEARS |
CLEARS |
|
CLEARS |
|
|
Okay |
Okay |
| mapMulti() |
scrawl
mapMulti(mapper) -> StatelessOp {
NOT_SORTED | NOT_DISTINCT | NOT_SIZED
opWrapSink(flags, sink) {
apply mapper to emit multiple elements downstream
cancellationRequested() = downstream.cancellationRequested()
}
}
|
CLEARS |
CLEARS |
|
CLEARS |
|
|
Okay |
Okay |
| filter() |
scrawl
filter(predicate) -> StatelessOp {
NOT_SIZED
opWrapSink(flags, sink) {
emit element downstream if passes predicate
cancellationRequested() = downstream.cancellationRequested()
}
}
|
|
|
|
CLEARS |
|
|
Okay |
Okay |
| peek() |
scrawl
peek(action) -> StatelessOp {
0
opWrapSink(flags, sink) {
apply action then emit same element downstream
cancellationRequested() = downstream.cancellationRequested()
}
}
|
|
|
|
|
|
|
Okay |
Okay |
| unordered() |
scrawl
unordered() -> StatelessOp {
NOT_ORDERED
opWrapSink(flags, sink) {
sink // No-op
cancellationRequested() = downstream.cancellationRequested()
}
}
|
|
|
CLEARS |
|
|
|
Okay |
Okay |
| Stateful |
skip() |
scrawl
skip(skip) -> SliceOps.makeRef(this, skip, -1) -> StatefulOp {
limit(limit) -> SliceOps.makeRef(this, 0, limit) -> StatefulOp {
IS_SIZE_ADJUSTING | ((limit != -1) ? IS_SHORT_CIRCUIT : 0)
opWrapSink(flags, sink) {
ignore first skip elements, then emit next limit elements
// cancellationRequested() = reached limit || downstream.cancellationRequested()
}
opEvaluateParallel(ph, split, gen) {
if size > 0 && SUBSIZED: StreamSpliterators.SliceSpliterator.OfRef(_) |> Nodes.collect(_)
if !ORDERED: StreamSpliterators.UnorderedSliceSpliterator.OfRef(_) |> Nodes.collect(_)
else: new SliceTask(_).invoke()
}
opEvaluateParallelLazy(ph, split) {
if size > 0 && SUBSIZED: ph.wrapSpliterator(split) |> StreamSpliterators.SliceSpliterator.OfRef(_) {
skips until start of slice (skip), then advances one-at-a-time until end of slice (limit)
trySplit drops out-of-range splits to try splitting again
}
if !ORDERED: StreamSpliterators.UnorderedSliceSpliterator.OfRef(_) {
there's an atomic long of permits, initially set to (limit >= 0 ? skip + limit : skip); permits monotonically decreases
splitting is disabled if there are no permits left*
if there are permits remaining, we optimistically read+buffer data from spliterator (tryAdvance <= 1, forEachRemaining <= chunkSize)
then we request to acquire permits up to buffer size, reducing permits, receiving n <= request (n < request -> skipping or limiting)
if n > 0, we pass that many elements from the buffer to the action
}
else: SliceTask(_).invoke().spliterator() {
each leaf copies its split into a wrapped node-builder
onCompletion, non-leaf tasks concat child nodes, and root task truncates its node to the slice
onCompletion, non-leaf non-root tasks cancel later tasks if their size + size of completed tasks to the left >= skip + limit
}
}
}
|
|
|
|
|
|
SETS |
Okay |
Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED),
(b) Buffering risk if upstream contained amplifying operations |
| limit() |
|
|
|
|
SETS |
SETS |
Okay |
Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED),
(b) Buffering risk if upstream contained amplifying operations |
| takeWhile() |
scrawl
takeWhile(predicate) -> WhileOps.makeTakeWhileRef(this, predicate) -> StatefulOp {
NOT_SIZED | IS_SHORT_CIRCUIT
opWrapSink(flags, sink) {
// take until predicate fails
// cancellationRequested() = predicate failed || downstream.cancellationRequested()
}
opEvaluateParallel(ph, split, gen) {
TakeWhileTask(_).invoke() {
each leaf copies its split into a wrapped node-builder, with cancellation (if cancellationRequested, cancel later tasks)
onCompletion, non-leaf tasks adopt left child node if ordered and left short-circuited, else concat left and right nodes
}
}
opEvaluateParallelLazy(ph, split) {
if ORDERED: opEvaluateParallel(_).spliterator()
else: UnorderedWhileSpliterator.OfRef.Taking(_) {
// Emits elements until predicate fails, or cancelled (predicate failed in another split; checked ~1/64 steps)
// Splitting is disabled if cancelled
}
}
}
|
|
|
|
CLEARS |
SETS |
|
Okay |
Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream
contained amplifying operations |
| dropWhile() |
scrawl
dropWhile(predicate) -> WhileOps.makeDropWhileRef(this, predicate) -> StatefulOp {
NOT_SIZED
opWrapSink(flags, sink) {
// drop until predicate passes (may retain and count dropped elements if used from DropWhileTask)
// cancellationRequested() = downstream.cancellationRequested()
}
opEvaluateParallel(ph, split, gen) {
DropWhileTask(_).invoke() {
each leaf copies its split into a wrapped node-builder
if unordered, the op sink drops elements (until the predicate fails) as it goes (no attempt to cancel others' dropping once failed)
else, the op sink emits and counts "dropped" elements (into the node builder)
onCompletion, non-leaf tasks concat child nodes, and if ordered use the drop counts to truncate the concat node from the left
(i.e., if a left node's drop count < its node count, subsequent node drop counts are ignored and "dropped" elements are preserved)
}
}
opEvaluateParallelLazy(ph, split) {
if ORDERED: opEvaluateParallel(_).spliterator()
else: UnorderedWhileSpliterator.OfRef.Dropping(_) {
// Drops elements until predicate fails, or cancelled (predicate failed in another split; checked ~1/64 steps)
}
}
}
|
|
|
|
CLEARS |
|
|
Okay |
Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream
contained amplifying operations |
| gather() |
scrawl
gather(gatherer) -> GathererOp.of(this, gatherer) -> GathererOp {
NOT_SORTED | NOT_DISTINCT | NOT_SIZED | (integrator is greedy ? 0 : IS_SHORT_CIRCUIT)
opWrapSink(flags, sink) {
initialize state, then integrate elements, then emit finishing elements
stop emitting elements if integrator or downstream cancels
cancellationRequested() = integrator returned false or downstream requested cancellation
}
opEvaluateParallel(ph, split, gen) {
evaluate(wrappedSpliterator, _, NodeBuilder collector parts)
}
opEvaluateParallelLazy(ph, split) {
opEvaluateParallel(_).spliterator()
}
collect(_) {
// collect() is overridden to avoid gather() accumulating into a Node first
// (since GathererOp reports that it is a stateful op)
evaluate(wrappedSpliterator, _, collector parts)
}
evaluate(_) {
Sequential {
// Used directly if stream is sequential, else used indirectly by Hybrid/Parallel
initialize gatherer + collector state, integrate elements into collector accumulator,
then emit finishing elements and finish collector
stop emitting elements if integrator cancels
}
Hybrid {
// Used if stream is parallel and gatherer is sequential (default combiner)
like parallel forEachOrdered(), but only buffers the upstream if gatherer is greedy
makes no use of the gatherer's or collector's combiners
cancels later tasks if !greedy and the integrator short-circuits
}
Parallel {
// Used if stream is parallel and gatherer is not sequential (non-default combiner)
parent tasks use the gatherer's and collector's combiners to merge children's state
cancels later tasks if !greedy and the integrator short-circuits
}
}
}
|
CLEARS |
CLEARS |
|
CLEARS |
SETS if gatherer is !greedy |
|
Caution: Memory usage depends on how the gatherer's state grows |
Caution: (a) Eager unless followed by collect(), (b) Memory usage depends on
how the gatherer's state grows, (c) Buffers elements from upstream until predecessor tasks finish
if gatherer is sequential and greedy, (d) Buffering risk if gatherer is !greedy AND upstream
contained amplifying operations |
| distinct() |
scrawl
distinct() -> DistinctOps.makeRef(this) -> StatefulOp {
IS_DISTINCT | NOT_SIZED
opWrapSink(flags, sink) {
if DISTINCT: sink // No-op
if SORTED: store last emitted element, emit next element if not equal
else: store set of emitted elements, emit next element if not in set
// cancellationRequested() = downstream.cancellationRequested()
}
opEvaluateParallel(ph, split, gen) {
if DISTINCT: ph.evaluate(split, false, gen)
if ORDERED: ReduceOps.makeRef(<into LinkedHashSet>).evaluateParallel(_) |> Nodes.node(<set>)
else: ForEachOps.makeRef(<into ConcurrentHashMap>).evaluateParallel(_) |> Nodes.node(<map keys>)
}
opEvaluateParallelLazy(ph, split) {
if DISTINCT: ph.wrapSpliterator(split) // No-op
if ORDERED: ReduceOps.makeRef(<into LinkedHashSet>).evaluateParallel(_) |> Nodes.node(<set>).spliterator()
else: StreamSpliterators.DistinctSpliterator(ph.wrapSpliterator(split)) {
// Emits elements if not present in a ConcurrentHashMap, shared by all splits
}
}
}
|
SETS |
|
|
CLEARS |
|
|
Caution: Accumulates into a Set if !DISTINCT and !SORTED |
Caution: (a) Eager if !DISTINCT and ORDERED, (b) Accumulates into a Set if
!DISTINCT and !ORDERED, (c) Buffering risk if lazy AND upstream contained amplifying operations
AND downstream consumes elements one-at-a-time |
| sorted() |
scrawl
sorted([cmp]) -> SortedOps.makeRef(this[, cmp]) -> StatefulOp {
IS_ORDERED | (isNaturalSort ? IS_SORTED : NOT_SORTED)
opWrapSink(flags, sink) {
if SORTED && isNaturalSort: sink // No-op
if SIZED: SizedRefSortingSink(sink, cmp) {
// begin() by making an array, accept() into the array, end() by sorting and emitting all downstream
// cancellationRequested() = false, but records that call happened
// - indicates that pipeline is short-circuiting, so end() should poll downstream.cancellationRequested() after each emission
}
else: RefSortingSink(sink, cmp) {
// begin() by making a list, accept() into the list, end() by sorting and emitting all downstream
// cancellationRequested() = false, but records that call happened
// - indicates that pipeline is short-circuiting, so end() should poll downstream.cancellationRequested() after each emission
}
}
opEvaluateParallel(ph, split, gen) {
if SORTED && isNaturalSort: ph.evaluate(split, false, gen)
else: ph.evaluate(split, true, gen).asArray(gen) |> Arrays.parallelSort(_) |> Nodes.node(_)
}
//--opEvaluateParallelLazy(ph, split);
}
|
|
SETS if no comparator else CLEARS |
SETS |
|
|
|
Caution: Accumulates elements into an array/List if !(SORTED and no comparator) |
Warning: Always eager; Accumulates elements into an array/List |
| Terminal Operations |
ΔFlags (combined BEFORE operation) |
Memory Usage |
| DISTINCT |
SORTED |
ORDERED |
SIZED |
SHORT_CIRCUIT |
SIZE_ADJUSTING |
sequential |
parallel |
| |
min() |
scrawl
min(cmp) -> reduce(BinaryOperator.minBy(cmp))
max(cmp) -> reduce(BinaryOperator.maxBy(cmp))
|
|
|
|
|
|
|
Okay |
Okay |
| max() |
|
|
|
|
|
|
Okay |
Okay |
| findFirst() |
scrawl
findFirst() -> evaluate(FindOps.makeRef(true)) -> TerminalOp {
findAny() -> evaluate(FindOps.makeRef(false)) -> TerminalOp {
getOpFlags() {
IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : NOT_ORDERED)
}
evaluateSequential(ph, split) {
ph.wrapAndCopyInto(FindSink(), split).get() ?: emptyValue
}
evaluateParallel(ph, split) {
mustFindFirst = ORDERED, FindTask(_).invoke() {
each leaf copies into a FindSink()
if it finds a result:
if op is findAny or node is leftmost: short-circuit shared/root result
else: cancel later tasks (prevent further splitting/forking)
onCompletion, non-leaf tasks adopt the first present result of their children (if any, and cancel later tasks)
}
}
FindSink {
accept(value) {
if !hasValue: hasValue = true, this.value = value
}
get() {
hasValue ? Optional.of(value) : null
}
cancellationRequested() {
hasValue
}
}
}
|
|
|
|
|
SETS |
|
Okay |
Okay |
| findAny() |
|
|
CLEARS |
|
SETS |
|
Okay |
Okay |
| anyMatch() |
scrawl
anyMatch(predicate) -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY)) -> TerminalOp {
allMatch(predicate) -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL)) -> TerminalOp {
noneMatch(predicate) -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)) -> TerminalOp {
getOpFlags() {
IS_SHORT_CIRCUIT | NOT_ORDERED
}
evaluateSequential(ph, split) {
ph.wrapAndCopyInto(MatchSink(), split).getAndClearState()
}
evaluateParallel(ph, split) {
MatchTask(_).invoke() {
doLeaf():
boolean b = ph.wrapAndCopyInto(MatchSink(), split).getAndClearState()
if (b == matchKind.shortCircuitResult) shortCircuit(b)
getEmptyResult(): !matchKind.shortCircuitResult
}
}
MatchSink {
boolean stop, value = !matchKind.shortCircuitResult
accept(t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
getAndClearState() { // returns boolean; avoids boxing like TerminalSink.get()
value
}
cancellationRequested() {
stop
}
}
}
|
|
|
CLEARS |
|
SETS |
|
Okay |
Okay |
| allMatch() |
|
|
CLEARS |
|
SETS |
|
Okay |
Okay |
| noneMatch() |
|
|
CLEARS |
|
SETS |
|
Okay |
Okay |
| count() |
scrawl
count() -> evaluate(ReduceOps.makeRefCounting()) -> TerminalOp {
getOpFlags() {
NOT_ORDERED
}
evaluateSequential(ph, split) {
if size is known: size
else: ph.wrapAndCopyInto(CountingSink(), split)
}
evaluateParallel(ph, split) {
if size is known: size
else: ReduceTask(this, ph, split).invoke().get()
}
CountingSink {
begin(size) { count = 0 }
accept(t) { count++ }
combine(other) { count += other.count }
get() { count }
//--cancellationRequested()
}
}
|
|
|
CLEARS |
|
|
|
Okay |
Okay |
| forEach() |
scrawl
forEach(action) -> evaluate(ForEachOps.makeRef(action, false)) -> TerminalOp, TerminalSink {
forEachOrdered(action) -> evaluate(ForEachOps.makeRef(action, true)) -> TerminalOp, TerminalSink {
getOpFlags() {
ordered ? 0 : NOT_ORDERED
}
evaluateSequential(ph, split) {
ph.wrapAndCopyInto(this, split).get()
}
evaluateParallel(ph, split) {
if (ordered): ForEachOrderedTask(_).invoke() {
if a leaf task is blocked by a predecessor, it copies its spliterator into a wrapped nodebuilder (runs the pipeline above forEachOrdered)
onCompletion, the node (if created, else spliterator) is forEach-d with the action
}
else: ForEachTask(_).invoke() {
each leaf task copies its split into the (pre-wrapped) sink
}
}
// From TerminalSink:
accept(t) {
action.accept(t)
}
get() {
null
}
//--cancellationRequested()
}
|
|
|
CLEARS |
|
|
|
Okay |
Okay |
| forEachOrdered() |
|
|
|
|
|
|
Okay |
Warning: Buffers elements from upstream until predecessor tasks finish |
| toArray() |
scrawl
toArray(gen) -> Nodes.flatten(evaluateToArrayNode(gen), gen).asArray(gen)
toArray() -> toArray(Object[]::new)
toList() -> unmodifiableListAroundArray(toArray())
|
|
|
|
|
|
|
Warning: Accumulates elements into an array |
Warning: Accumulates elements into an array |
| toList() |
|
|
|
|
|
|
Warning: Accumulates elements into an array |
Warning: Accumulates elements into an array |
| reduce() |
scrawl
reduce(_) -> evaluate(ReduceOps.makeRef(_)) -> TerminalOp {
getOpFlags() {
0
}
evaluateSequential(ph, split) {
ph.wrapAndCopyInto(ReducingSink(), split)
}
evaluateParallel(ph, split) {
ReduceTask(this, ph, split).invoke().get()
}
ReducingSink {
begin(size) { state = seed }
accept(t) { state = reducer.apply(state, t) }
combine(other) { state = combiner.apply(state, other.state) }
get() { state }
//--cancellationRequested()
}
ReduceTask {
each leaf task does result = ph.wrapAndCopyInto(op.makeSink(), split)
onCompletion, non-leaf tasks do leftResult.combine(rightResult), result = leftResult
}
}
|
|
|
|
|
|
|
Caution: Memory usage depends on how the reduction's accumulation grows |
Caution: Memory usage depends on how the reduction's accumulation grows |
| collect() |
scrawl
collect(_) -> {
if isParallel() && collector is CONCURRENT && (!ORDERED || collector is UNORDERED):
container = supplier.get(), forEach(u -> accumulator.accept(container, u))
else: container = evaluate(ReduceOps.makeRef(collector)) -> TerminalOp {
getOpFlags() {
collector is UNORDERED ? NOT_ORDERED : 0
}
evaluateSequential(ph, split) {
ph.wrapAndCopyInto(ReducingSink(), split)
}
evaluateParallel(ph, split) {
ReduceTask(this, ph, split).invoke().get()
}
ReducingSink {
begin(size) { state = supplier.get() }
accept(t) { accumulator.accept(state, t) }
combine(other) { state = combiner.apply(state, other.state) }
get() { state }
//--cancellationRequested()
}
}
collector is IDENTITY_FINISH ? container : finisher.apply(container)
}
|
|
|
CLEARS if collector is UNORDERED |
|
|
|
Caution: Memory usage depends on how the collector's accumulation grows |
Caution: Memory usage depends on how the collector's accumulation grows |
| spliterator() |
scrawl
spliterator() {
this == sourceStage && sourceSpliterator != null -> sourceSpliterator
this == sourceStage && sourceSupplier != null -> lazySpliterator(sourceSupplier)
else -> wrap(this, () -> sourceSpliterator(0), isParallel())
}
|
|
|
|
|
|
|
Caution: (a) Upstream memory usage will be realized upon
consumption, (b) Buffering risk if upstream contained amplifying operations AND spliterator is
consumed one-at-a-time |
Caution: (a) Upstream memory usage will be realized upon consumption, (b)
Buffering risk if upstream contained amplifying operations AND spliterator is consumed
one-at-a-time |
| iterator() |
scrawl
Spliterators.iterator(spliterator())
|
|
|
|
|
|
|
Caution: (a) Upstream memory usage will be realized upon
consumption, (b) Buffering risk if upstream contained amplifying operations AND iterator is
consumed one-at-a-time |
Caution: (a) Upstream memory usage will be realized upon consumption, (b)
Buffering risk if upstream contained amplifying operations AND iterator is consumed
one-at-a-time |
Caution: Splitting a naive spliterator may cause buffering. In parallel
streams, the first eager operation (stateful or terminal) can evoke heavy memory usage if the initial
spliterator's trySplit() relies on buffering many/large elements. For instance,
Spliterators.AbstractSpliterator and Spliterators.IteratorSpliterator (often used when converting an
iterator to a spliterator) both split by instantiating an array and advancing elements into it, then
returning a spliterator over the array. Each time the original spliterator splits, the capacity of the array
it instantiates and fills is increased (up to a threshold). If the original spliterator covers sufficiently
many/large elements, it can eventually run out of memory attempting to split.
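As a minimal sketch of the copying behavior described above, the following wraps a counting iterator with Spliterators.spliteratorUnknownSize() and splits it twice. The class and method names are hypothetical. It assumes only that each trySplit() drains a batch of elements from the iterator into the returned prefix, and that the batch grows between splits; exact batch sizes are an implementation detail and are deliberately not hard-coded.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;

final class IteratorSplitDemo {
    /** Returns {elements pulled by the 1st split, size of 1st prefix, size of 2nd prefix}. */
    static long[] splitTwice() {
        AtomicLong pulled = new AtomicLong();
        // An endless iterator that counts how many elements have been pulled from it.
        Iterator<Long> endless = new Iterator<Long>() {
            @Override public boolean hasNext() { return true; }
            @Override public Long next() { return pulled.incrementAndGet(); }
        };
        Spliterator<Long> source =
            Spliterators.spliteratorUnknownSize(endless, Spliterator.ORDERED);

        // Each trySplit() advances the iterator into an array and returns a
        // spliterator over that array; nothing has been consumed downstream yet.
        Spliterator<Long> firstPrefix = source.trySplit();
        long pulledByFirstSplit = pulled.get();
        Spliterator<Long> secondPrefix = source.trySplit();

        return new long[] { pulledByFirstSplit, count(firstPrefix), count(secondPrefix) };
    }

    /** Counts the elements actually held by a spliterator by draining it. */
    private static <T> long count(Spliterator<T> spliterator) {
        long[] n = {0};
        spliterator.forEachRemaining(e -> n[0]++);
        return n[0];
    }

    public static void main(String[] args) {
        long[] r = splitTwice();
        System.out.println("pulled by first split: " + r[0]);
        System.out.println("first prefix size:     " + r[1]);
        System.out.println("second prefix size:    " + r[2]);
    }
}
```

If the elements were large objects rather than boxed longs, every one of those pulled elements would be live in memory as soon as the split returned.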
Caution: Amplifying operations that feed into a pipeline-wrapping
spliterator may cause buffering. Lazy operations necessarily wrap the upstream pipeline in a spliterator.
When we initially call tryAdvance() on this upstream-wrapping spliterator (StreamSpliterators.WrappingSpliterator),
it calls tryAdvance() on the incoming pipeline spliterator, passing a consumer that applies the upstream
pipeline operations and ultimately pushes output elements into a buffer. The buffer is needed because the pipeline
may contain amplifying operations that can produce multiple output elements per input element from the
incoming spliterator. The next time tryAdvance() is called on the upstream-wrapping spliterator, it will
poll from the buffer if it is not empty. So, in the case that we have an upstream-wrapping spliterator over
large amplifying operations, and we consume it with tryAdvance(), we can actually use a lot of memory.
See examples
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).spliterator().tryAdvance(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10).findFirst();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10).findFirst();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).findFirst();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).findFirst();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct().findFirst();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10).forEach(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10).forEach(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).forEach(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).forEach(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct().forEach(_ -> {});
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10).toArray();
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10).toArray();
Stream.of(0).parallel().flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).forEach(_ -> {});
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).collect(anyCollector);
Note: Though parallel forEachOrdered() and parallel gather() (with a sequential + greedy Gatherer)
both buffer the upstream in leaf tasks, the actual amount buffered depends on the leaf spliterator size, the
parallelism of the ForkJoinPool running the stream, and whether the upstream uses amplifying/de-amplifying
operations. The buffering in these cases does not necessarily hold the entire upstream in memory at once.
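The tryAdvance() buffering described in the caution above can be observed safely by swapping the infinite inner stream for a finite one. This is a rough sketch with hypothetical names: it counts how many inner elements are produced by a single tryAdvance() on the pipeline-wrapping spliterator. A named lambda parameter is used instead of `_` only so the sketch also compiles on older JDKs.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class WrappingBufferDemo {
    static final int INNER_SIZE = 1_000;

    /** Returns how many inner elements were produced by one tryAdvance() call. */
    static int producedByOneAdvance() {
        AtomicInteger produced = new AtomicInteger();
        Spliterator<Integer> wrapping = Stream.of(0)
            // Amplifying op: one outer element becomes INNER_SIZE inner elements.
            .flatMap(ignored -> IntStream.range(0, INNER_SIZE).boxed()
                .peek(i -> produced.incrementAndGet()))
            .spliterator();

        // We only ask for one element, but the whole inner stream is pushed
        // into the spliterator's buffer before the first element is handed over.
        wrapping.tryAdvance(first -> {});
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println("inner elements produced: " + producedByOneAdvance());
    }
}
```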
# Stream planner ↩
This table simulates what will happen to stream op flags and memory usage across stream operations.
Try adding/changing stream operations and initial spliterator characteristics.
Legend
lazy barrier: (Dashed line) Indicates that a (parallel)
stateful op evaluates lazily, by wrapping the incoming spliterator in a new spliterator that implements
the pipeline since the last barrier, and becomes the new "incoming" spliterator downstream. A lazy
barrier is also drawn for the terminal operations spliterator()/iterator(), since they likewise wrap the
incoming spliterator.
eager barrier: (Solid line) Indicates that a (parallel)
stateful op evaluates eagerly, by accumulating the upstream pipeline into a Node, whose spliterator
becomes the new "incoming" spliterator downstream. An eager barrier is also drawn for (sequential)
sorted(), since it must accumulate all elements from the upstream pipeline before emitting any
downstream. (It is possible for (sequential) gather() to behave in this way too, but for now no barrier
is drawn.) If a stateful op is fused to a toArray()/toList() terminal op, no barrier is drawn - the
stateful op evaluates in parallel-eager mode to produce a Node, but that Node is converted to an array
directly, rather than taking its spliterator (and toArray()/toList() are already eager). If gather() is
fused to collect(), no barrier is drawn - the gather() no longer evaluates in parallel-eager mode to
produce a Node, and instead emits directly into the collector's accumulator.
Spliterator Characteristics, ΔFlags, and Flags are all "outgoing" from the stage they are
adjacent to. (The exception to this is eager terminal ops, which do not have an outgoing spliterator,
and apply flag deltas BEFORE evaluation.) At parallel stateful op barriers, the outgoing spliterator is
a new spliterator - I chose to depict the barrier going under the stage, and above the rest of the row.
| Spliterator Characteristics | | ΔFlags | | Flags | |
| DI | DISTINCT | DI | DISTINCT | DI | DISTINCT |
| SO | SORTED | SO | SORTED | SO | SORTED |
| OR | ORDERED | OR | ORDERED | OR | ORDERED |
| SI | SIZED | SI | SIZED | SI | SIZED |
| NO | NONNULL | SC | SHORT_CIRCUIT | SC | SHORT_CIRCUIT |
| IM | IMMUTABLE | SA | SIZE_ADJUSTING | SA | SIZE_ADJUSTING |
| CO | CONCURRENT | (blank) | No delta | (blank) | Flag is known absent |
| SU | SUBSIZED | S | Delta sets flag | Y | Flag is known present |
| ? | Characteristic is unknown† (assumed absent) | C | Delta clears flag | | |
| (blank) | Characteristic is known absent | C/S | Delta changes‡ (e.g. from "clears" to "sets") | | |
| Y | Characteristic is known present | | | | |
† When a stateful op is evaluated in parallel-eager mode, the implementation of the resultant Node
(and its spliterator) can depend on details that this tool doesn't or can't model, such as the size of
the input spliterator, whether/how that spliterator splits, and whether nodes are concatenated,
truncated, etc. In these cases, I depict the intersection of all possible sets of spliterator characteristics as
"known", and the remaining possible characteristics as "unknown (assumed absent)". In practice, this is
mostly benign, but can occasionally lead to some mispredictions when the actual spliterator would have
been SUBSIZED.
‡ Deltas may be depicted as "changed" for one of several reasons:
- After AbstractPipeline.sourceSpliterator() evaluates a stateful op (in parallel mode), it
overwrites the SIZED delta based on the resultant spliterator, and removes the SHORT_CIRCUIT
delta (if any).
- For fused gather() ops, the actual implementation removes the first gather() stage entirely,
and fuses its Gatherer to the Gatherer of the second gather() stage. I chose to depict this as
removing the deltas from the first stage (and displaying its Memory Usage as N/A).
- For the niche case of a gather() fused to a collect() with an UNORDERED collector, the
collect() no longer clears the ORDERED flag. I chose to depict this as removing the delta.
# Additional tangents ↩
This is a list of some interesting tidbits that didn't make it elsewhere.
- Spliterators can share state, Collectors (and Gatherers) can combine state, but neither can do both
- Spliterators can share state across tasks by passing their state into child splits when they
are created in trySplit(). This is used to great effect in existing stateful operations'
parallel-lazy implementations.
- Collectors have no mechanism to create shared state - each leaf task initializes its own
local state by invoking the Collector's supplier(). But unlike Spliterators, Collectors can
combine child tasks' state in the parent task, via the combiner().
- There is an exception: calling collect() with a Collector that is CONCURRENT + UNORDERED will
invoke the supplier() once and share the state among all tasks; but then tasks get no
local state, and the combiner() is unused.
- Additionally, Collectors can share state if that state is stored on the Collector
itself; but this breaks the usual reusability of Collectors.
- Stated differently: a Spliterator can coordinate "on the way down" (when forking tasks) to
share state; a Collector can coordinate "on the way up" (when joining tasks) to combine state;
neither can do both.
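The difference between per-task state and the CONCURRENT + UNORDERED exception can be observed by counting supplier() and combiner() invocations. The sketch below uses a hypothetical collector built with Collector.of() over a concurrent set, run once with no characteristics and once with CONCURRENT + UNORDERED. How many leaf tasks the first run creates depends on the ForkJoinPool, so only the second run's counts are predictable.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

final class CollectorStateDemo {
    /** Collects 0..9999 in parallel; returns {supplier calls, combiner calls, result size}. */
    static int[] run(Collector.Characteristics... characteristics) {
        AtomicInteger supplierCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();
        Collector<Integer, Set<Integer>, Set<Integer>> collector = Collector.of(
            () -> {
                supplierCalls.incrementAndGet();
                // Thread-safe container, so sharing it across tasks is legal.
                return ConcurrentHashMap.<Integer>newKeySet();
            },
            Set::add,
            (left, right) -> {
                combinerCalls.incrementAndGet();
                left.addAll(right);
                return left;
            },
            characteristics);
        Set<Integer> result = IntStream.range(0, 10_000).boxed().parallel().collect(collector);
        return new int[] { supplierCalls.get(), combinerCalls.get(), result.size() };
    }

    public static void main(String[] args) {
        int[] perTask = run();
        int[] shared = run(Collector.Characteristics.CONCURRENT,
                           Collector.Characteristics.UNORDERED);
        System.out.println("per-task: suppliers=" + perTask[0] + " combines=" + perTask[1]);
        System.out.println("shared:   suppliers=" + shared[0] + " combines=" + shared[1]);
    }
}
```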
- How would existing intermediate operations behave if implemented as gatherers (especially under parallelism)?
- gather() itself is currently always eager in parallel streams (unless followed by collect()).
- The only way to avoid excessive eager barriers would be to fuse adjacent gather()
operations... However, this tends to lose parallelism (a fused Gatherer is sequential if ANY
constituent Gatherer is sequential) and greediness (a fused Gatherer is greedy only if ALL
constituent Gatherers are greedy).
- There would always be an eager barrier before the terminal operation, unless the
terminal operation was toArray()/toList() (which still stores all elements) or collect()
(which needn't store all elements, but cannot short-circuit).
- Stateless ops would use an integrator and possibly a finisher, plus a no-op initializer and a
no-op combiner (but non-default, to avoid forcing sequential).
- Stateful ops would use an integrator and possibly a finisher, plus a custom initializer, and
ideally a custom combiner, or the default combiner (forcing sequential) if needed.
- Unfortunately, like Collectors, Gatherers provide no way to share state across tasks
(without sacrificing the reusability of the Gatherer).
- Gatherers also have no way of signaling cancellation during the "combine" step, nor any
way of reading or selectively writing stream op flags to optimize themselves and subsequent
operations. And they do not have access to the task's spliterator characteristics (e.g. its
size, if known) when initializing task state.
- Altogether, this means that of the existing stateful ops:
- skip()/limit()/takeWhile()/dropWhile() would need to be sequential gatherers.
The current parallel-eager implementations rely heavily on shared state (and
limit()/takeWhile() can also cancel during the "combine" step).
- distinct() could be parallel, using a LinkedHashSet for the state, and emitting
elements in the finisher. This actually corresponds to the current distinct()
behavior when the stream is ORDERED (and !DISTINCT). However, for the other cases, a
gatherer cannot detect (or signal) that the stream is DISTINCT (to no-op), and can't
share a ConcurrentHashMap among tasks even if it could detect that the stream is
!ORDERED.
- sorted() could be parallel, using an ArrayList for the state, and sorting +
emitting elements in the finisher. This corresponds to the current sorted() behavior
when the stream is !SORTED, and assuming no size information is known.
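To make the distinct() case above concrete, here is a rough sketch of an order-preserving distinct written as a parallel-capable Gatherer. It needs a JDK where java.util.stream.Gatherer is final (JDK 24+). The name orderedDistinct() is hypothetical, and this is not how the built-in distinct() is implemented; it only mirrors the LinkedHashSet-plus-finisher shape described in the list.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class DistinctGathererDemo {
    /** Hypothetical gatherer: buffers distinct elements per task, emits them at the end. */
    static <T> Gatherer<T, ?, T> orderedDistinct() {
        return Gatherer.<T, LinkedHashSet<T>, T>of(
            // Initializer: each task gets its own insertion-ordered set.
            LinkedHashSet::new,
            // Integrator: only records elements; never emits, never short-circuits.
            Gatherer.Integrator.ofGreedy((seen, element, downstream) -> {
                seen.add(element);
                return true;
            }),
            // Combiner: the left task's elements come first, preserving encounter order.
            (left, right) -> {
                left.addAll(right);
                return left;
            },
            // Finisher: emit everything, stopping if downstream rejects further input.
            (seen, downstream) -> {
                for (T element : seen) {
                    if (!downstream.push(element)) {
                        break;
                    }
                }
            });
    }

    static List<Integer> demo() {
        return Stream.of(3, 1, 3, 2, 1, 2).parallel().gather(orderedDistinct()).toList();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the gatherer has no way to learn whether the stream is already DISTINCT or is unordered, so it always pays for the full LinkedHashSet.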
- We can implement all existing eager terminal operations in terms of Gatherers
- The general strategy is to fuse gather() to collect(), then do everything in the gatherer
and emit nothing to the collector, except for one element when the gatherer finishes. We could
use a custom collector whose result is the first/only element it accumulates, or we could just
use Collectors.toList() and extract the first element.
- Most operations would even retain similar parallel performance characteristics. The
short-circuiting terminal ops (findFirst()/findAny()/anyMatch()/allMatch()/noneMatch()) would
leverage the Integrator's ability to short-circuit and cancel later tasks. The sequential
terminal ops (findFirst()/forEachOrdered()) would leverage Gatherer.defaultCombiner() to run
sequentially while still running the upstream in parallel. The reduction terminal ops
(reduce()/collect()/min()/max()) would leverage the gatherer's combiner() to run in their usual,
parallel way. The main shortfalls are: collect() would miss out on its CONCURRENT + UNORDERED
optimization (where it initializes state once and shares among all tasks), and terminal ops that
make use of stream size information (count()/toArray()/toList()) would not have that available
to optimize.
- We can implement custom intermediate operations without Gatherers (by design)
- To implement an intermediate lazy operation, we can invoke the spliterator() terminal
operation on a Stream, then wrap that spliterator in a custom spliterator implementing
our operation, and pass the resultant spliterator to StreamSupport.stream().
- To implement an intermediate eager operation, we can use the overload of
StreamSupport.stream() that accepts a spliterator-supplier, rather than a spliterator. This
supplier is not invoked until a terminal operation on the Stream (excluding the lazy
spliterator()/iterator() terminal operations, which do not invoke the supplier until the
resultant spliterator/iterator is operated on). If we invoke a terminal operation on one Stream
(to collect it to a dataset) inside another Stream's spliterator-supplier, we get behavior like
an eager operation. The former Stream is not consumed until the latter is consumed.
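Both approaches above can be sketched in a few lines. The operations everyOther() and reversed() are hypothetical examples, not JDK methods: the first wraps the upstream spliterator (lazy), the second collects the upstream inside a spliterator-supplier (eager, but deferred until a terminal operation runs).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class CustomOpsDemo {
    /** Lazy op: keeps every other element, pulling from upstream on demand. */
    static <T> Stream<T> everyOther(Stream<T> upstream) {
        boolean parallel = upstream.isParallel();
        Spliterator<T> source = upstream.spliterator();
        Spliterator<T> wrapped = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, source.characteristics() & Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!source.tryAdvance(action)) {
                    return false;                     // upstream exhausted
                }
                source.tryAdvance(skipped -> {});     // discard the next element
                return true;
            }
        };
        return StreamSupport.stream(wrapped, parallel).onClose(upstream::close);
    }

    /** Eager op: collects the whole upstream, then emits it in reverse. */
    static <T> Stream<T> reversed(Stream<T> upstream) {
        return StreamSupport.<T>stream(() -> {
            // Runs only when the returned stream's terminal operation starts.
            List<T> all = upstream.collect(Collectors.toCollection(ArrayList::new));
            Collections.reverse(all);
            return all.spliterator();
        }, Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED,
           upstream.isParallel()).onClose(upstream::close);
    }

    /** Returns {upstream elements seen before the terminal op, seen after it}. */
    static int[] reversedIsDeferred() {
        AtomicInteger seen = new AtomicInteger();
        Stream<Integer> result =
            reversed(Stream.of(1, 2, 3).peek(e -> seen.incrementAndGet()));
        int before = seen.get();
        result.forEach(e -> {});
        return new int[] { before, seen.get() };
    }

    public static void main(String[] args) {
        System.out.println(everyOther(Stream.of(1, 2, 3, 4, 5)).collect(Collectors.toList()));
        System.out.println(reversed(Stream.of(1, 2, 3)).collect(Collectors.toList()));
    }
}
```

Because everyOther() extends Spliterators.AbstractSpliterator, it inherits the array-copying trySplit() discussed earlier in this document, so it is only a reasonable starting point for sequential use.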
- peek() is the only intermediate operation that does not affect stream flags
- This is why the Stream planner uses peek() as the default new
operation.
- This can arguably serve a useful purpose, because the operation still marks the previous
stream object as linked, so further operations on it will throw. A no-op peek() allows e.g. a
callee that intends to consume the stream in a deferred callback to "take ownership" of it, so
that the caller fails fast if it attempts to use the stream after the callee returns. However,
this usage is not entirely without consequence: In particular, the inserted peek() can prevent
operation fusion between an otherwise-adjacent stateful op + toArray()/toList(), or gather() +
gather()/collect().
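A minimal sketch of the "take ownership" idea, with a hypothetical callee consumeLater() that links the caller's stream via a no-op peek() and defers the actual consumption:

```java
import java.util.stream.Stream;

final class StreamOwnershipDemo {
    /** Hypothetical callee: links the stream now, consumes it later. */
    static Runnable consumeLater(Stream<String> stream) {
        // The no-op peek() creates a new stage, marking `stream` as linked.
        Stream<String> owned = stream.peek(e -> {});
        return () -> owned.forEach(e -> {});
    }

    /** Returns true if the caller's attempt to reuse the stream fails fast. */
    static boolean callerReuseFails() {
        Stream<String> stream = Stream.of("a", "b");
        Runnable deferred = consumeLater(stream);
        boolean failed;
        try {
            stream.count();          // the caller no longer "owns" the stream
            failed = false;
        } catch (IllegalStateException expected) {
            failed = true;
        }
        deferred.run();              // the callee's deferred consumption still works
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("caller reuse failed fast: " + callerReuseFails());
    }
}
```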
- flatMap() closes each Stream returned from its behavioral parameter
- This is documented in its javadoc, and can arguably serve a useful purpose for implicit
resource management, e.g.: Stream.of(0).flatMap(_ -> getResourceStream()). However, this
approach cannot make use of characteristic info from the inner stream - e.g. here, the outer
stream is one element and does not usefully parallelize. The parallelism of the inner stream
is ignored - flatMap() must force it to be sequential() before calling forEach() to emit its
elements downstream (or allMatch(), if the downstream can short-circuit). Additionally, this
approach introduces a possibly-large amplifying op, which can play into a buffering risk.
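The closing behavior is easy to confirm with onClose() handlers on the inner streams. This small sketch (hypothetical class name) counts how many inner streams were closed by the time the terminal operation returns:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class FlatMapCloseDemo {
    /** Returns the number of inner streams closed by flatMap(). */
    static int closedInnerStreams() {
        AtomicInteger closed = new AtomicInteger();
        Stream.of(1, 2, 3)
            // Each inner stream registers a close handler, standing in for a resource.
            .flatMap(i -> Stream.of(i, i).onClose(closed::incrementAndGet))
            .forEach(e -> {});
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("inner streams closed: " + closedInnerStreams());
    }
}
```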
- mapMulti()'s behavioral parameter cannot detect downstream cancellation
- This is clear from its signature, but may be worth a mention, as a large amplifying mapMulti()
can waste time on useless work if the downstream is short-circuiting. This does not pose any
additional buffering risk, as a cancelled downstream will simply discard new incoming elements.
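The wasted work can be made visible by counting emissions inside mapMulti()'s behavioral parameter while a short-circuiting terminal operation runs downstream. A rough sketch (hypothetical class name, requires JDK 16+ for mapMulti()):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class MapMultiCancelDemo {
    static final int AMPLIFICATION = 1_000;

    /** Returns {value found by findFirst(), number of elements mapMulti() emitted}. */
    static int[] run() {
        AtomicInteger emitted = new AtomicInteger();
        Optional<Integer> first = Stream.of(0)
            .<Integer>mapMulti((ignored, sink) -> {
                // The sink is a plain Consumer: there is no way to ask it
                // whether downstream still wants more elements.
                for (int i = 0; i < AMPLIFICATION; i++) {
                    emitted.incrementAndGet();
                    sink.accept(i);
                }
            })
            .findFirst();
        return new int[] { first.orElseThrow(IllegalStateException::new), emitted.get() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("found " + r[0] + " after " + r[1] + " emissions");
    }
}
```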