# The Java Streams Parallel

Published: 01-30-2025, Last Updated: 01-30-2025

I made this as someone who loves reading into OpenJDK source code and obsesses over streaming and concurrency, but found the Java Streams source particularly difficult to grok in as much detail as I'd like. I'm hoping this document will make the implementation and performance characteristics a little more accessible to myself and others. I know this may be a moving target, especially with Gatherers now being introduced, but perhaps patching or versioning this document as the implementation evolves will be easier with some groundwork laid.

This document is intended to be intermediate- to expert-friendly, not necessarily beginner-friendly. I may not dwell too long on terminology or public abstractions that have been well-hashed elsewhere. I recommend other resources for that, such as the stream package doc, and this article series by Brian Goetz.

This document is current with JDK 24 [build 34], and consists of a single HTML file.

If you have substantive additions or corrections to propose, I have (hesitantly) created an issue tracker for this site. Or, if you'd like to work with me, you can reach me at danielaveryj@gmail.com.

Caution: Depending on implementation details is inherently risky! This document is not intended to mislead anyone into believing that subsequent (or prior) JDK versions will adhere to the same design or performance characteristics.



# Stream concurrency summary

Note: In this doc I am using a slightly different definition of "eager" and "lazy" than the stream package doc, which uses "eager" and "lazy" to refer to roughly "when the stream (as a whole) executes", rather than "when specific operations on the stream execute (during the whole execution)". By the former definition, all intermediate operations are lazy. By the latter definition (which I extrapolated from AbstractPipeline.opEvaluateParallelLazy() and AbstractPipeline.opEvaluateParallel()[Eager?]), stateful intermediate operations on a parallel stream may execute either eagerly or lazily. By both definitions, all existing terminal operations are eager, except for spliterator()/iterator().

Java Streams use data parallelism:

Data parallelism in a pipeline

[Figure: a root task (Task A) splits the data between two child tasks (Tasks B and C), which split again between leaf tasks (Tasks D, E, F, G); each leaf task applies the pipeline operations (map(), filter(), flatMap(), collect()) to its own split of the data.]
Here, the pipeline consists of several operations, terminated by the first "eager" operation (in this case, "collect()"). The eager operation associates the data with a root task (think: thread), which splits the data and associates each split with a child task. Each child task repeats this process, until the data can be split no further, indicating a leaf task. Each leaf task consumes its split of the data, applying the pipeline operations and accumulating each element into a result, according to the eager operation. Each parent task combines its children's results to form its own result, according to the eager operation. This continues until the root task has a result.
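For a feel of this shape from the outside (a minimal sketch using the public API, not the JDK's internals), a parallel reduce() maps directly onto it: the accumulator runs in leaf tasks, and the combiner merges child results in parent tasks:

```java
import java.util.List;
import java.util.stream.IntStream;

public class DataParallelismDemo {
    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 1_000_000).boxed().toList();

        long sum = data.parallelStream()
                .reduce(0L,
                        (acc, i) -> acc + i, // accumulator: leaf tasks fold their split
                        Long::sum);          // combiner: parent tasks merge child results
        System.out.println(sum);             // 500000500000
    }
}
```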

In parallel Java Streams, pipelines can have multiple eager operations, by requiring each intermediate eager operation to produce a new dataset as its result. This becomes input to the remaining pipeline, down to the next eager operation.

For data parallelism to achieve speedup, the data should be large enough - and/or the operations time-intensive enough - to offset the cost of the multitasking overhead (e.g. initial thread scheduling, and result-combining). However, note that in Java Streams, after any intermediate eager operation, the resultant data will be resident in memory (so "too large" is possible).

Data parallelism is typically contrasted with task parallelism:

Task parallelism in a pipeline

[Figure: the pipeline operations are divided among tasks connected by buffers. Task C applies map() and pushes into a buffer that Task B polls; Task B applies filter() and pushes into a buffer that Task A polls; Task A applies flatMap() and collect().]
Here, the pipeline consists of several operations, optionally split up by buffers, and ending in a terminal operation (in this case, "collect()"). Tasks are created to produce or poll elements from upstream, apply pipeline operations, and consume or push elements downstream. The last task accumulates elements into a result, according to the terminal operation.

Buffers enable tasks to progress concurrently, but are typically bounded in size to constrain memory usage. If a buffer fills up, the upstream task must wait until it is not full; if a buffer empties, the downstream task must wait until it is not empty. In the case of small/temporary mismatches in upstream and downstream processing rate, a larger buffer spends memory to buy concurrency, enabling tasks to spend less time waiting on each other.
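To make the buffer mechanics concrete, here is a minimal imperative sketch (plain threads and a bounded queue, unrelated to the Java Streams implementation; EOS is a hypothetical end-of-stream sentinel) where put() blocks while the buffer is full and take() blocks while it is empty:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(64); // bounded buffer
        final int EOS = Integer.MIN_VALUE; // hypothetical end-of-stream sentinel

        Thread upstream = new Thread(() -> {
            try {
                for (int i = 0; i < 1_000; i++) {
                    buffer.put(i * 2);                      // "map()"; blocks while buffer is full
                }
                buffer.put(EOS);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        Thread downstream = new Thread(() -> {
            long sum = 0;
            try {
                for (int i; (i = buffer.take()) != EOS; ) { // blocks while buffer is empty
                    if (i % 3 == 0) sum += i;               // "filter()" + accumulate
                }
                System.out.println(sum);
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        upstream.start();
        downstream.start();
        upstream.join();
        downstream.join();
    }
}
```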

For task parallelism to achieve speedup, the operations should be time-intensive enough to offset the cost of the multitasking overhead (e.g. upstream task context-switching when downstream buffer is full, and downstream task context-switching when upstream buffer is empty). This "time intensity" typically comes from operations being IO-bound (and thus already having context-switching and downtime).

The "declarative" approach to task parallelism depicted here - interacting with a first-class "pipeline" object, with operations that abstract over tasks and buffers - has been modeled by e.g. the various Reactive Streams libraries, and Kotlin Flows. An "imperative" approach - interacting with the lower-level tasks (or actors/threads/processes) and buffers (or mailboxes/queues/channels) more directly - has been modeled by e.g. Akka's actors, Erlang's processes, and Go's goroutines + channels.

## Combining data parallelism and task parallelism

We can't really use data parallelism in an arbitrary task-parallel task, because in general we don't have the data in hand to split it. The data arrives incrementally over time. We can instead do something related, and route the data to one of several downstream tasks as it arrives - a kind of load-balancing operation. Note that pipeline topology need not be linear - it can diverge (fan-out) and even re-converge (fan-in).

We can use task parallelism in (leaf) data-parallel tasks. However, if the operations are CPU-bound, which Java Streams was traditionally intended for, then this does nothing but add the devastating IO overhead of waiting on full/empty buffers and context-switching. That said, the new Gatherers.mapConcurrent() introduces a limited mechanism for IO-bound work, that actually encapsulates a buffer, fan-out, and fan-in, in one operation.
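For example (a sketch, where fetchQuote() is a hypothetical IO-bound call), Gatherers.mapConcurrent() runs the mapper on up to maxConcurrency virtual threads while preserving encounter order:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class MapConcurrentDemo {
    // Hypothetical IO-bound call, standing in for an HTTP request, DB query, etc.
    static String fetchQuote(int id) {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "quote-" + id;
    }

    public static void main(String[] args) {
        List<String> quotes = IntStream.range(0, 20).boxed()
                .gather(Gatherers.mapConcurrent(4, MapConcurrentDemo::fetchQuote)) // <= 4 in flight
                .toList();
        System.out.println(quotes); // results in encounter order
    }
}
```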

## Eager and lazy operations

A Java Stream is created from an initial spliterator, which yields elements of a dataset. In sequential execution, the initial spliterator is advanced into a consumer representing the entire stream pipeline. In parallel execution, the stream is divided into "segments", where each stateful intermediate operation ends a segment. These intermediate operations may be either "eager" or "lazy".

Eager operations are implemented as described by data parallelism above. They consume the most recent spliterator, and (if intermediate) buffer output elements into a new dataset and yield its spliterator. Eager operations cannot emit downstream before consuming the full upstream.

Lazy operations are implemented by wrapping the most recent spliterator in a new spliterator that implements the upstream pipeline - from just after the last segment ended, down to and including the operation itself. Lazy operations can emit downstream before consuming the full upstream.

Whether a given stateful intermediate operation executes eagerly or lazily often depends on characteristics of the stream (see the Stream operations summary and Stream planner).

The terminal operation ends the last segment. Almost all terminal operations are eager - they consume the most recent spliterator, rather than wrap it. The exceptions are the lazy terminal operations spliterator() and iterator() - they wrap (a supplier of) the most recent spliterator.
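The laziness of iterator() is easy to observe: nothing upstream runs until the iterator is actually advanced:

```java
import java.util.Iterator;
import java.util.stream.Stream;

public class LazyTerminalDemo {
    public static void main(String[] args) {
        Iterator<Integer> it = Stream.of(1, 2, 3)
                .peek(i -> System.out.println("upstream saw " + i))
                .iterator();                  // lazy: prints nothing yet
        System.out.println("iterator created");
        System.out.println(it.next());        // now prints "upstream saw 1", then "1"
    }
}
```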


# Stream package classes summary

The below table contains a short description of each top-level class in java.util.stream, to get your bearings.

Note: The JDK goes out of its way to provide "primitive" stream types (i.e. IntStream, LongStream, DoubleStream) that meticulously avoid boxing their primitive elements, for maximum performance. This results in more complicated class structures designed to support both reference- and primitive-flavored streams.

See example class structurings:
// Primitive subclasses of reference variant:
class SomeClass {
    // ...shared and reference-specific methods...
    static class OfInt extends SomeClass { ... }
    static class OfLong extends SomeClass { ... }
    static class OfDouble extends SomeClass { ... }
}

// Reference and primitive subclasses of abstract parent:
abstract class SomeClass {
    // ...shared methods...
    static class OfRef extends SomeClass { ... }
    static class OfInt extends SomeClass { ... }
    static class OfLong extends SomeClass { ... }
    static class OfDouble extends SomeClass { ... }
}

// Sometimes an additional class layer is added above the primitive specializations:
abstract class SomeClass {
    // ...shared methods...
    static class OfRef extends SomeClass { ... }
    abstract static class OfPrimitive extends SomeClass { ... } // <-- See here
    static class OfInt extends SomeClass.OfPrimitive { ... }
    static class OfLong extends SomeClass.OfPrimitive { ... }
    static class OfDouble extends SomeClass.OfPrimitive { ... }
}

// Sometimes classes are nested (as shown above), sometimes not

In this doc I try to focus on the "reference" stream type (i.e. Stream) and its supporting classes. The code in the primitive counterparts will be highly reminiscent, except for using primitive-specific subclasses and primitive-accepting/returning variants of methods to avoid boxing.

## Aside: What is "scrawl"?

Many of my detail notes are presented as a mix of pseudocode and prose, that I will refer to as "scrawl" (to dampen expectations). Scrawl is tailored for brevity, rather than precision. Some scrawl I have put more effort into, some less. This "flashcard"-esque style helps me build a mental map of the code, but requires reading between the lines. To give a sense of some of the obscurities used in scrawl: `_` stands in for elided arguments or expressions; `x |> f(_)` pipes the result on the left into the call on the right; a bare `throw` means the method just throws; and `//--` marks a declaration whose definition lives elsewhere or is elided.

I have included the scrawl just in case it is helpful, but it may not be, and you are not obligated to read it. In fact, I can only recommend reading scrawl alongside the actual source code (and a hot drink). Good luck, and have fun! :)

Top-level java.util.stream classes, with high-level summaries:

Model

BaseStream (public)
scrawl
interface BaseStream<T, Self>
    extends AutoCloseable
{
    Iterator<T> iterator();
    Spliterator<T> spliterator();
    boolean isParallel();
    Self parallel();
    Self sequential();
    Self unordered();
    Self onClose(Runnable handler);
    void close();
}
Interface extended by all streams (reference and primitive), that encompasses only the broadest stroke of what a stream is to Java: Something that wraps an iterator/spliterator, and may be ordered/unordered, sequential/parallel, and closeable.
Stream (public)
scrawl
interface Stream<T>
    extends BaseStream<T, Stream<T>>
{
    // declarations of familiar methods, some with default impls
}
The familiar interface that declares available operations on streams. This is the "reference"-flavored variant; I have elided the primitive variants (IntStream, LongStream, DoubleStream) in this document for brevity, since their implementations differ only mildly.
Sink
scrawl
interface Sink<T> extends Consumer<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() { return false; }
    default void accept(int value) { throw }
    default void accept(long value) { throw }
    default void accept(double value) { throw }
    void accept(T value);

    interface OfInt
        extends Sink<Integer>, IntConsumer
    ...

    abstract static class ChainedReference<T, E_OUT>
        implements Sink<T>
    {
        Sink<? super E_OUT> downstream;
        void begin(long size) { downstream.begin(size); }
        void end() { downstream.end(); }
        boolean cancellationRequested() { return downstream.cancellationRequested(); }
    }

    abstract static class ChainedInt<E_OUT>
        implements Sink.OfInt
    ...
}
A Consumer extended with one-shot methods to inform that it is "about to be sent elements" and "done being sent elements", as well as a method to query if it "wants more elements?". Intermediate operations on streams make heavy use of "sink-wrapping" - taking a downstream sink and creating a new sink that adds its own state/handling for incoming elements, before (conditionally) passing elements downstream.
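Here is a minimal sketch of that sink-wrapping pattern, using a stand-in interface (the real Sink is package-private): a map()-like stage wraps the downstream sink and transforms each element before passing it down:

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Stand-in for java.util.stream.Sink, which is package-private.
interface MySink<T> extends Consumer<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() { return false; }
}

// A map()-like sink: adds its own handling, then passes elements downstream.
class MapSink<T, R> implements MySink<T> {
    private final MySink<? super R> downstream;
    private final Function<? super T, ? extends R> mapper;

    MapSink(MySink<? super R> downstream, Function<? super T, ? extends R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override public void begin(long size) { downstream.begin(size); } // mapping preserves size
    @Override public void end() { downstream.end(); }
    @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); }
    @Override public void accept(T t) { downstream.accept(mapper.apply(t)); } // transform, pass down
}
```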
PipelineHelper
scrawl
abstract class PipelineHelper<P_OUT> {
    abstract StreamShape                     getSourceShape();
    abstract int                             getStreamAndOpFlags();
    abstract <P_IN> long                     exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
    abstract <P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
    abstract <P_IN> void                     copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
    abstract <P_IN> boolean                  copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
    abstract <P_IN> Sink<P_IN>               wrapSink(Sink<P_OUT> sink);
    abstract <P_IN> Spliterator<P_OUT>       wrapSpliterator(Spliterator<P_IN> spliterator);
    abstract <P_IN> Node<P_OUT>              evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
    abstract Node.Builder<P_OUT>             makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
}
An abstract class extended by AbstractPipeline, and declaring some of the core "heavy-hitting" internal methods for advancing elements through the stream. As AbstractPipeline is the only subclass, it's not clear that PipelineHelper is really needed, but it does have fewer type parameters to spell out compared to passing AbstractPipeline around.
AbstractPipeline
scrawl
abstract class AbstractPipeline<E_IN, E_OUT, Self>
    extends PipelineHelper<E_OUT>
    implements BaseStream<E_OUT, Self>
{
    final AbstractPipeline sourceStage;
    final AbstractPipeline previousStage;
    final int              sourceOrOpFlags;
    AbstractPipeline       nextStage;
    int                    depth;
    int                    combinedFlags;
    Spliterator<?>         sourceSpliterator;
    boolean                linkedOrConsumed;
    Runnable               sourceCloseAction;
    boolean                parallel;

    // From PipelineHelper:
    StreamShape                     getSourceShape() {
                                      go back stages until p.depth==0, then p.getOutputShape()
                                    }
    int                             getStreamAndOpFlags() {
                                      combinedFlags
                                    }
    <P_IN> long                     exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
                                      size = spliterator.getExactSizeIfKnown() if StreamOpFlag.SIZED else -1
                                      if size != -1 && StreamOpFlag.SIZE_ADJUSTING && !isParallel(), size = stage.exactOutputSize(size) for stages 
                                    }
    <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
                                      copyInto(wrapSink(sink), spliterator); return sink;
                                      // Commonly called by TerminalOp.evaluateSequential() and
                                      // AbstractTask.doLeaf() (by TerminalOp.evaluateParallel(), AbstractPipeline.opEvaluateParallel[Lazy]())
                                    }
    <P_IN> void                     copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
                                      copyIntoWithCancel(_) if SHORT_CIRCUIT else wrappedSink.begin(_); spliterator.forEachRemaining(_); _.end();
                                    }
    <P_IN> boolean                  copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
                                      go back stages until p.depth==0, then wrappedSink.begin(_); p.forEachWithCancel(_); wrappedSink.end();
                                    }
    <P_IN> Sink<P_IN>               wrapSink(Sink<E_OUT> sink) {
                                      sink = AbstractPipeline.opWrapSink(sink) for this+previous stages until depth==0
                                      // Called by wrapAndCopyInto(_) (by terminal ops and stateful+parallel ops)
                                      // Called by the WrappingSpliterator returned from wrap(_) (by wrapSpliterator(_))
                                      // Called by a few other places where wrapAndCopyInto(_) was less convenient
                                    }
    <P_IN> Spliterator<E_OUT>       wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
                                      sourceSpliterator if depth==0 else wrap(this, () -> sourceSpliterator, isParallel())
                                      // Called by overrides of opEvaluateParallelLazy() (DistinctOps, WhileOps, SliceOps, GathererOp)
                                    }
    <P_IN> Node<E_OUT>              evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) {
                                      evaluateToNode(_) if isParallel() else wrapAndCopyInto(makeNodeBuilder(_), spliterator).build()
                                      // Called by evaluateToArrayNode() (by toArray())
                                    }
    //--Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator);


    // From BaseStream:
    //--Iterator<T> iterator()
    Spliterator<E_OUT>              spliterator() {
                                      this == sourceStage && sourceSpliterator != null -> sourceSpliterator
                                      this == sourceStage && sourceSupplier != null -> lazySpliterator(sourceSupplier)
                                      else -> wrap(this, () -> sourceSpliterator(0), isParallel())
                                    }
    boolean                         isParallel() {
                                      sourceStage.parallel
                                    }
    Self                            parallel() {
                                      sourceStage.parallel=true; return this
                                    }
    Self                            sequential() {
                                      sourceStage.parallel=false; return this
                                    }
    //--Self unordered()
    Self                            onClose(Runnable handler) {
                                      sourceStage.sourceCloseAction = Streams.composeWithExceptions(existing, handler); return this
                                    }
    void                            close() {
                                      sourceStage.sourceCloseAction.run()
                                    }

    // New declarations:
    <R> R                           evaluate(TerminalOp<E_OUT, R> terminalOp) {
                                      terminalOp.evaluate[Parallel|Sequential](this, sourceSpliterator(terminalOp.getOpFlags()))
                                      // Called by all terminal ops except toArray()/toList()
                                    }
    Node<E_OUT>                     evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
                                      if isParallel() && previousStage != null && opIsStateful(), depth=0; opEvaluateParallel(_);
                                      else evaluate(sourceSpliterator(0), true, generator)
                                      // Called by toArray()/toList()
                                    }
    Spliterator<E_OUT>              sourceStageSpliterator() {
                                      only called on source stage, returns source[Spliterator|Supplier.get()], sets linkedOrConsumed
                                      // Called by Head.forEach[Ordered]() when !isParallel()
                                    }
    int                             getStreamFlags() {
                                      StreamOpFlag.toStreamFlags(combinedFlags)
                                    }
    Spliterator<?>                  sourceSpliterator(int terminalFlags) {
                                      gets sourceStage.source[Spliterator|Supplier.get()]
                                      if isParallel() && hasAnyStateful(), finds each p where p.opIsStateful()
                                        spliterator = p.opEvaluateParallelLazy(previousStage, spliterator)
                                        overrides depth! (0 if opIsStateful else ++) and combined flags on each p
                                      returns spliterator
                                      // Called by evaluate() (both overloads), and thus by all terminal ops
                                    }
    long                            exactOutputSize(long previousSize) {
                                      previousSize // overridden by SliceOps (skip()/limit())
                                    }
    boolean                         isOrdered() {
                                      StreamOpFlag.ORDERED.isKnown(combinedFlags)
                                    }

    abstract StreamShape               getOutputShape();
    abstract <P_IN> Node<E_OUT>        evaluateToNode(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, boolean flatn, IntFunction<E_OUT[]> g);
    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel);
    abstract Spliterator<E_OUT>        lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
    abstract boolean                   forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);

    // Overridden in StatelessOp/StatefulOp + subclasses
    abstract boolean                   opIsStateful();
    abstract Sink<E_IN>                opWrapSink(int flags, Sink<E_OUT> sink);

    // Overridden in StatefulOp subclasses
    <P_IN> Node<E_OUT>                 opEvaluateParallel(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, IntFunction<E_OUT[]> g) {
                                         throw
                                         // Called by evaluateToArrayNode(_) (by toArray()/toList()) when isParallel() && opIsStateful()
                                         // Called by default opEvaluateParallelLazy() 
                                       }
    <P_IN> Spliterator<E_OUT>          opEvaluateParallelLazy(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator) {
                                         opEvaluateParallel(helper, spliterator, Nodes.castingArray()).spliterator() // ie not lazy by default
                                         // Called by sourceSpliterator(_) (by evaluate(), all terminal ops) when isParallel() && hasAnyStateful()
                                       }
}
Arguably the most important class in the package, extended by all the (reference and primitive) "pipeline" classes that implement the (reference and primitive) "stream" interfaces. AbstractPipeline defines the core methods for advancing elements through the stream, wrapping successive operation sinks, etc. AbstractPipelines are constructed to form a doubly-linked list of "stages" - each intermediate stream operation creates a new AbstractPipeline linked to the previous one, retaining access to stream characteristics and operation-specific behavior at every previous stage. Like many abstract classes in this package with reference- and primitive-flavored subclasses, AbstractPipeline declares and calls into several abstract methods that are implemented in subclasses, allowing for type specialization.
ReferencePipeline
scrawl
abstract class ReferencePipeline<P_IN, P_OUT>
    extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
    implements Stream<P_OUT>
{
    // Overrides of inherited abstract methods that were declared purely to enable primitive specialization
    // Plus definitions of the familiar stream methods

    // From PipelineHelper:
    Node.Builder<E_OUT>       makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator) {
                                Nodes.builder(exactSizeIfKnown, generator)
                                // Called by evaluate(split, flatn, gen) (by evaluateToArrayNode(), toArray()) if !isParallel()
                              }

    // From AbstractPipeline:
    StreamShape               getOutputShape() {
                                StreamShape.REFERENCE
                              }
    <P_IN> Node<E_OUT>        evaluateToNode(PipelineHelper<E_OUT> ph, Spliterator<P_IN> s, boolean flatn, IntFunction<E_OUT[]> g) {
                                Nodes.collect(ph, s, flatn, g)
                                // Called by evaluate(split, flatn, gen) if isParallel() (by evaluateToArrayNode(), toArray())
                              }
    <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel) {
                                new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel)
                                // Called by spliterator() and wrapSpliterator() (by opEvaluateParallelLazy() - WhileOps, DistinctOps, SliceOps,
                                // by sourceSpliterator(_) when isParallel() && hasAnyStateful(), by evaluate(), all terminal ops)
                              }
    Spliterator<E_OUT>        lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier) {
                                new StreamSpliterators.DelegatingSpliterator<>(supplier)
                                // Called by spliterator() when this == sourceStage && sourceSupplier != null
                              }
    boolean                   forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink) {
                                do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink))
                                // Called by copyIntoWithCancel()
                              }
    //--boolean opIsStateful();
    //--Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

    // From BaseStream:
    Iterator<T>               iterator() {
                                Spliterators.iterator(spliterator())
                              }
    Self                      unordered() {
                                new StatelessOp w/ StreamOpFlag.NOT_ORDERED, opWrapSink(sink) -> sink
                              }

    // Head is instantiated in StreamSupport.stream(), which is THE method for creating an initial stream from a spliterator
    // Called by concat(), generate(), builder().build(), all Collection.stream() impls, Arrays.stream()...

    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }

    // StatelessOp is used directly by unordered(), filter(), map(), flatMap(), mapMulti(), peek().
    // Common strategy is to make an anonymous subclass of StatelessOp, and inside
    // override opWrapSink() to return an anonymous subclass of Sink.ChainedReference.

    abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }

    // StatefulOp is used indirectly by distinct() (DistinctOps), sorted() (SortedOps), limit()/skip() (SliceOps), takeWhile()/dropWhile() (WhileOps).
    // Common strategy is to make an anonymous subclass of StatefulOp, and inside
    // override opWrapSink() to return an anonymous subclass of Sink.ChainedReference,
    // and override other relevant methods on StatefulOp as well.

    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> { ... }
}
Extends AbstractPipeline to implement Stream, by implementing leftover specializable methods from AbstractPipeline, as well as the familiar stream operation methods (which mostly delegate to utility classes, except for the simpler stateless operations). ReferencePipeline itself is still abstract, and declares several static inner subclasses: Head, for the first pipeline stage (simply wrapping a spliterator), plus StatelessOp and StatefulOp, for intermediate operations.
TerminalOp
scrawl
interface TerminalOp<E_IN, R> {
    default StreamShape inputShape() { return StreamShape.REFERENCE; }
    default int getOpFlags() { return 0; }
    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) { evaluateSequential(_) }
    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator);
}
Interface that defines how to evaluate a terminal operation, in sequential and parallel modes, given an upstream pipeline and spliterator. Most terminal operations use this interface, but there are a couple special cases that are implemented directly on AbstractPipeline: toArray()/toList() and spliterator()/iterator().
TerminalSink
scrawl
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
A Sink that also extends Supplier. Used by terminal operations, which don't have a downstream sink, but do want to expose a result after the upstream is done sending elements. The TerminalSink interface is only used inside TerminalOps, and only if they have multiple TerminalSink implementations to abstract over.
StreamShape
scrawl
enum StreamShape { REFERENCE, INT_VALUE, LONG_VALUE, DOUBLE_VALUE; }
Enum of the 4 (1 reference and 3 primitive) stream variants. Though passed around widely, its usages (aside from assertions) ultimately boil down to selecting a type-specialized implementation in a handful of places.
StreamOpFlag
scrawl
enum StreamOpFlag {
    DISTINCT, SORTED, ORDERED, SIZED, SHORT_CIRCUIT, SIZE_ADJUSTING;
    enum Type { SPLITERATOR, STREAM, OP, TERMINAL_OP, UPSTREAM_TERMINAL_OP; }
    // other masking + combining stuff
}
Enum of stream characteristics, used to optimize various stream operations. Flags are initially derived from the Head spliterator characteristics, and may be set, cleared, or unaffected by each stream operation. Each pipeline stage tracks its own changes to the stream flags, so that the changes can be combined one-stage-at-a-time when the stream is evaluated.
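One observable consequence of this flag machinery (see the sorted() scrawl in the Stream operations summary): a TreeSet's spliterator reports SORTED with natural ordering, so the SORTED stream flag is set and a later sorted() stage passes elements straight through:

```java
import java.util.Set;
import java.util.TreeSet;

public class SortedFlagDemo {
    public static void main(String[] args) {
        Set<Integer> set = new TreeSet<>(Set.of(3, 1, 2));
        // The TreeSet spliterator reports SORTED (natural ordering), so
        // sorted() wraps no sorting sink here - it is effectively a no-op stage.
        set.stream().sorted().forEach(System.out::println); // 1 2 3
    }
}
```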
AbstractSpinedBuffer
scrawl
abstract class AbstractSpinedBuffer {
    int    initialChunkPower;
    int    elementIndex;
    int    spineIndex;
    long[] priorElementCount;

    boolean       isEmpty() { ... }
    long          count() { ... }
    int           chunkSize(int n) { ... }
    abstract void clear();
}
Like several other abstract classes, exists to deduplicate common functionality from reference and primitive subclasses. Extended by SpinedBuffer itself (reference variant), and the primitive variants, which are static inner classes inside SpinedBuffer.
SpinedBuffer
scrawl
class SpinedBuffer<E>
    extends AbstractSpinedBuffer
    implements Consumer<E>, Iterable<E>
{
    E[]   curChunk;
    E[][] spine;

    long           capacity() { ... }
    void           inflateSpine() { ... }
    void           ensureCapacity(long targetSize) { ... }
    void           increaseCapacity() { ... }
    E              get(long index) { ... }
    void           copyInto(E[] array, int offset) { ... }
    E[]            asArray(IntFunction<E[]> arrayFactory) { ... }
    void           clear() { ... }
    Iterator<E>    iterator() { ... }
    void           forEach(Consumer<? super E> consumer) { ... }
    void           accept(E e) { ... }
    String         toString() { ... }
    Spliterator<E> spliterator() { ... }

    abstract static class OfPrimitive<E, T_ARR, T_CONS>
        extends AbstractSpinedBuffer
        implements Iterable<E>

    static class OfInt
        extends SpinedBuffer.OfPrimitive<Integer, int[], IntConsumer>
        implements IntConsumer
    ...
}
Basically an append-only list, backed by an array-of-arrays to avoid copy-on-resize. When a subarray ("chunk") fills up, a new one is allocated (with double the capacity) for subsequent elements. Copying only happens if the outer array ("spine") is filled with full chunks, at which point chunk references are copied to a new spine with double the (chunk) capacity. This data structure is used internally in places where elements must be collected into a buffer whose final size is not known upfront.
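A minimal sketch of the idea (the real SpinedBuffer also tracks prior element counts per chunk, to support indexed access and splitting):

```java
import java.util.Arrays;

// Append-only buffer: chunks double in size; only chunk *references* are
// ever copied (when the spine itself fills up), never the elements.
class MiniSpinedBuffer<E> {
    private Object[][] spine = new Object[8][];
    private Object[] curChunk = new Object[16];
    private int spineIndex = 0, elementIndex = 0;
    { spine[0] = curChunk; }

    void accept(E e) {
        if (elementIndex == curChunk.length) {                  // current chunk is full
            if (spineIndex + 1 == spine.length) {               // spine is full of full chunks:
                spine = Arrays.copyOf(spine, spine.length * 2); // copy chunk references only
            }
            curChunk = new Object[curChunk.length * 2];         // new chunk, double capacity
            spine[++spineIndex] = curChunk;
            elementIndex = 0;
        }
        curChunk[elementIndex++] = e;
    }
}
```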
Node
scrawl
interface Node<T> {
    Spliterator<T>      spliterator();
    void                forEach(Consumer<? super T> consumer);
    default             int getChildCount() { return 0; }
    default             Node<T> getChild(int i) { throw }
    default             Node<T> truncate(long from, long to, IntFunction<T[]> generator) {
                          nb = Nodes.builder(size = to-from, generator)  // size is known -> FixedNodeBuilder
                          nb.begin(size); advance spliterator() into nb between from and to; nb.end();
                          nb.build()
                        }
    T[]                 asArray(IntFunction<T[]> generator);
    void                copyInto(T[] array, int offset);
    default StreamShape getShape() { return StreamShape.REFERENCE; }
    long                count();

    interface Builder<T>
        extends Sink<T>
    {
        Node<T> build();

        interface OfInt
            extends Node.Builder<Integer>, Sink.OfInt
        {
            Node.OfInt build();
        }
        ...
    }

    interface OfPrimitive<T, ...>
        extends Node<T>
    { ... }

    interface OfInt
        extends OfPrimitive<Integer, ...>
    ...
}
Basically a list, backed by a binary tree of Nodes. A "leaf" Node simply wraps an array/collection, while an "internal" Node concatenates two child Nodes. The shape of a node tree generally corresponds to the shape of a parallel computation tree (of fork-join tasks) that produced it: Leaf tasks produce leaf nodes, and parent tasks concatenate child nodes. "The use of Node within the stream framework is largely to avoid copying data unnecessarily during parallel operations." Inner interface Node.Builder extends Sink.
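A minimal sketch of the leaf/concat structure (hypothetical types; the real Node also supports spliterators, truncation, and array copying):

```java
import java.util.function.Consumer;

interface MyNode<T> {
    long count();
    void forEach(Consumer<? super T> action);
}

// Leaf node: wraps an existing array without copying.
record LeafNode<T>(T[] array) implements MyNode<T> {
    public long count() { return array.length; }
    public void forEach(Consumer<? super T> action) {
        for (T t : array) action.accept(t);
    }
}

// Internal node: concatenates two children without copying, mirroring how a
// parent task concatenates the nodes produced by its child tasks.
record ConcNode<T>(MyNode<T> left, MyNode<T> right) implements MyNode<T> {
    public long count() { return left.count() + right.count(); }
    public void forEach(Consumer<? super T> action) {
        left.forEach(action); // encounter order: left subtree first
        right.forEach(action);
    }
}
```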
AbstractTask
scrawl
abstract class AbstractTask<P_IN, P_OUT, R, Self>
    extends CountedCompleter<R>
{
    PipelineHelper<P_OUT> helper;
    Spliterator<P_IN>     spliterator;
    long                  targetSize;
    Self                  leftChild;
    Self                  rightChild;
    R                     localResult;

    static int  getLeafTarget() { ... }
    static long suggestTargetSize(long sizeEstimate) { ... }

    // From CountedCompleter:
    void          compute() { ... }
    R             getRawResult() { ... }
    void          setRawResult(R result) { ... }
    void          onCompletion(CountedCompleter<?> caller) { ... }

    // New:
    long          getTargetSize(long sizeEstimate) { ... }
    R             getLocalResult() { ... }
    void          setLocalResult(R result) { ... }
    boolean       isLeaf() { ... }
    boolean       isRoot() { ... }
    Self          getParent() { ... }
    boolean       isLeftmostNode() { ... }
    abstract Self makeChild(Spliterator<P_IN> spliterator);
    abstract R    doLeaf();
}
A ForkJoinTask (a CountedCompleter, specifically) used to implement the parallel-eager mode of most stateful or terminal stream operations. Each task is associated with a spliterator. When invoked, a task splits its spliterator (if possible and size is above a threshold) and passes the two halves to new child tasks, forming a parallel computation tree. If a task does not split, it is a leaf task, and consumes its spliterator in some way (left abstract) to produce a result, then tries to complete its parent. When both children complete, the parent completes; onCompletion() is usually overridden so that the parent combines its children's results into its own result, before trying to complete its own parent, and so on up the tree. When a task splits, it typically fork():s one child task (submits it to the ForkJoinPool, to be executed when a thread is available), and "becomes" the other (continues splitting the child's spliterator, or performs the leaf action if the child is a leaf task). This avoids blocking the thread executing the task.
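A minimal sketch of the split/fork/become pattern using a plain CountedCompleter (summing integers; the real AbstractTask adds sizing heuristics, cancellation hooks, etc.):

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.stream.IntStream;

// Split while large enough, fork the left child, "become" the right child,
// and combine child results on completion.
class SumTask extends CountedCompleter<Long> {
    private final Spliterator<Integer> spliterator;
    private final long targetSize;
    private SumTask leftChild, rightChild;
    private long result;

    SumTask(SumTask parent, Spliterator<Integer> spliterator, long targetSize) {
        super(parent);
        this.spliterator = spliterator;
        this.targetSize = targetSize;
    }

    @Override
    public void compute() {
        Spliterator<Integer> rightSplit = spliterator, leftSplit;
        SumTask task = this;
        while (rightSplit.estimateSize() > targetSize
                && (leftSplit = rightSplit.trySplit()) != null) {
            task.setPendingCount(1);                      // one child completion to wait for
            task.leftChild = new SumTask(task, leftSplit, targetSize);
            task.rightChild = new SumTask(task, rightSplit, targetSize);
            task.leftChild.fork();                        // schedule left on the pool
            task = task.rightChild;                       // continue as ("become") the right
        }
        SumTask leaf = task;
        rightSplit.forEachRemaining(i -> leaf.result += i); // leaf: consume the split
        leaf.tryComplete();                                 // propagate completion up the tree
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        if (leftChild != null) {                          // non-leaf: combine child results
            result = leftChild.result + rightChild.result;
        }
    }

    @Override
    public Long getRawResult() { return result; }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 1_000_000).boxed().toList();
        long sum = new SumTask(null, data.spliterator(), data.size() / 16).invoke();
        System.out.println(sum); // 500000500000
    }
}
```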
AbstractShortCircuitTask
scrawl
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, Self>
    extends AbstractTask<P_IN, P_OUT, R, Self>
{
    AtomicReference<R> sharedResult;
    volatile boolean   canceled;

    // From AbstractTask:
    void       compute() { ... }
    R          getRawResult() { ... }
    R          getLocalResult() { ... }
    void       setLocalResult(R result) { ... }

    // New:
    void       shortCircuit(R result) { ... }
    void       cancel() { ... }
    boolean    taskCanceled() { ... }
    void       cancelLaterNodes() { ... }
    abstract R getEmptyResult();
}
An AbstractTask extended with the ability to short-circuit compare-and-set the root task's result, plus a 'canceled' flag (which is checked at least once, when the task is first invoked), and the ability to cancel all tasks that come later in the root spliterator's encounter order. Used to implement the parallel-eager mode of short-circuiting stateful or terminal stream operations.
Collector (public): The (hopefully) familiar interface for expressing a custom, parallelizable, terminal reduction operation. A parallel computation tree would perform the operation by having each leaf task initialize an accumulation (via supplier()) and mutate it with incoming elements (via accumulator()), then have each parent task combine final child accumulations (via combiner()), and the root task transform the final combined accumulation (via finisher()). In sequential operation, the root task is a leaf task, and no combining is needed. Collectors can indicate by their Characteristics if they are "concurrent" (can use same accumulation in all tasks), "unordered" (can accumulate elements in any order), and/or use a no-op finisher(). This enables optimizations in some cases.
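As a sketch (Collectors.averagingInt() already exists; this hand-rolled version just makes the four functions visible):

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class AveragingDemo {
    public static void main(String[] args) {
        Collector<Integer, long[], Double> averaging = Collector.of(
                () -> new long[2],                                   // supplier: one accumulation per leaf task
                (acc, i) -> { acc[0] += i; acc[1]++; },              // accumulator: leaf mutates its accumulation
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; }, // combiner: parent merges children
                acc -> acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]  // finisher: root transforms
        );

        double avg = IntStream.rangeClosed(1, 101).boxed().parallel().collect(averaging);
        System.out.println(avg); // 51.0
    }
}
```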
Gatherer (public): The interface for expressing a custom, optionally parallelizable, intermediate operation. A parallel computation tree would perform the operation by having each leaf task initialize some state (via initializer()) and optionally mutate it with incoming elements and/or emit downstream (via integrator()), then have each parent task combine final child state (via combiner()), and the root task use the final combined state to optionally emit more downstream. In sequential operation (a sequential Stream or a sequential Gatherer), no combining is needed. Gatherers can indicate by their arguments whether they are "stateless" (use a no-op initializer()), "greedy" (integrator never initiates cancellation - only propagates downstream cancellation), "sequential" (use a no-op combiner()), and/or use a no-op finisher(). This enables (or disables) optimizations in some cases. Two gatherers can be fused into one using Gatherer.andThen(). The resulting gatherer is stateless if both initial gatherers were stateless and greedy; greedy if both were greedy; sequential if either was sequential; and uses a no-op finisher() if both did.
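As a sketch, a stateful, greedy, sequential "running sum" gatherer (no combiner, so a parallel stream would fall back to the Hybrid strategy noted in the gather() scrawl later):

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class RunningSumDemo {
    // Stateful, greedy, sequential gatherer: emits prefix sums.
    static Gatherer<Integer, ?, Long> runningSum() {
        return Gatherer.ofSequential(
                () -> new long[1],                        // initializer: per-evaluation state
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    state[0] += element;                  // integrate element into state
                    return downstream.push(state[0]);     // emit; returning false would cancel upstream
                })
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4).gather(runningSum()).toList()); // [1, 3, 6, 10]
    }
}
```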
GathererOp: Implementation of gather() (not nested in a utility class, unlike other stateful ops). GathererOp extends ReferencePipeline directly, rather than StatelessOp or StatefulOp. This is presumably because gather() could theoretically be either a stateless or stateful intermediate operation, depending on the Gatherer. Currently it is conservatively always considered stateful.
Utility

StreamSupport (public): Creates streams from spliterators or suppliers-of-spliterators, by instantiating the appropriate Head class. This is THE entry point for creating an initial stream - called by Stream.concat(), generate(), builder().build(), all Collection.stream() impls, Arrays.stream()...
StreamSpliterators: Spliterators for Stream.spliterator(), skip()/limit(), distinct(), generate()
Streams: Spliterators for Stream.concat(), range(), implementation for builder(), plus composeWithExceptions() (used by onClose()) and composedClose() (used by concat())
Nodes
scrawl
class Nodes {
    static <T> IntFunction<T[]> castingArray() { ... }
    static <T> Node<T> emptyNode(StreamShape shape) { ... }
    static <T> Node<T> conc(StreamShape shape, Node<T> left, Node<T> right) { ... }
    static <T> Node<T> node(T[] array) { ... }
    static <T> Node<T> node(Collection<T> c) { ... }
    static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) { ... }
    static <T> Node.Builder<T> builder() { ... }
    static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> ph, Spliterator<P_IN> split, boolean fltn, IntFunction<P_OUT[]> gen) { ... }
    static <T> Node<T> flatten(Node<T> node, IntFunction<T[]> generator) { ... }

    // Primitive variants (int shown)
    static Node.OfInt node(int[] array) { ... }
    static Node.Builder.OfInt intBuilder(long exactSizeIfKnown) { ... }
    static Node.Builder.OfInt intBuilder() { ... }
    static <P_IN> Node.OfInt collectInt(PipelineHelper<Integer> ph, Spliterator<P_IN> split, boolean fltn) { ... }
    static Node.OfInt flattenInt(Node.OfInt node) { ... }
    ...

    EmptyNode               (leaf node that is empty)
    ArrayNode               (leaf node around a pre-existing array)
    CollectionNode          (leaf node around a pre-existing collection)
    FixedNodeBuilder        (leaf node that accumulates elements up to a known size)
    SpinedNodeBuilder       (leaf node that accumulates elements up to an unknown size)
    ConcNode                (branch node around left and right nodes)
    InternalNodeSpliterator (used by ConcNode)
    ToArrayTask             (task that parallel consumes a node/tree into a single array)
    SizedCollectorTask      (task that parallel consumes spliterator splits into a single array)
    CollectorTask           (task that parallel consumes spliterator splits into a tree of nodes)
    ...all of these classes have ref+primitive specializations
}
A utility that implements "leaf" Nodes for wrapping arrays and collections, Node.Builders (that are themselves both Sinks and "leaf" Nodes - build() returns this) to accumulate a known or unknown amount of elements (using an array or SpinedBuffer, respectively), the "internal"/concat Node, and a few additional Node-related helper methods and tasks
Collectors (public): A (hopefully) familiar utility that defines a variety of common collectors, some of which accept a downstream collector and act as "wrapping" or "multilevel" collectors
Gatherers (public): A utility that defines a variety of (expected-to-be) common gatherers
DistinctOps: Implementation of distinct()
SliceOps: Implementation of skip(), limit()
SortedOps: Implementation of sorted()
WhileOps: Implementation of takeWhile(), dropWhile()
FindOps: Implementation of findFirst(), findAny()
ForEachOps: Implementation of forEach(), forEachOrdered()
MatchOps: Implementation of anyMatch(), allMatch(), noneMatch()
ReduceOps: Implementation of reduce(), collect(), count()


# Stream lifecycle summary

This is an outline of how a stream executes, from creation to consumption.

  1. StreamSupport.stream() instantiates ReferencePipeline.Head from spliterator, or spliterator-supplier and characteristics.
  2. Intermediate ops instantiate a subclass of ReferencePipeline.StatelessOp or ReferencePipeline.StatefulOp (new: or GathererOp), with the previous pipeline as parent.
    • The subclass overrides opWrapSink(), and if op is stateful overrides opEvaluateParallel() and optionally opEvaluateParallelLazy().
    • opWrapSink() is the "sequential" implementation that describes whether/how to propagate received elements to a downstream sink. This method is always used for stateless ops (which leave the "opEvaluateParallel" methods unimplemented), and only used for stateful ops if the pipeline is sequential.
  3. Terminal ops (except toArray(gen), spliterator(), and derivatives) instantiate a subclass of TerminalOp, and call pipeline.evaluate(terminalOp) to consume the stream and produce a result.
    1. This constructs a sourceSpliterator(), which will initially be the Head spliterator, and if parallel will be replaced with the spliterator result of calling opEvaluateParallelLazy() on each stateful op in the pipeline, in order.
      • By default, opEvaluateParallelLazy() calls opEvaluateParallel(), which by default throws. Stateful ops must at least override opEvaluateParallel().
      • opEvaluateParallel() produces a Node. Different implementations take different approaches:
        • WhileOps instantiates and invokes a custom subclass of Abstract[ShortCircuit]Task to produce a Node.
        • SortedOps evaluate():s the pipeline to an array, calls Arrays.parallelSort(), and wraps the result in a Node.
        • SliceOps sometimes instantiates a custom task, and sometimes instantiates a custom wrapping spliterator and consumes it with Nodes.collect().
        • DistinctOps instantiates a terminal op (ReduceOp or ForEachOp) to evaluateParallel(), and wraps the result in a Node.
      • opEvaluateParallelLazy() instantiates a custom wrapping spliterator if possible, else essentially does opEvaluateParallel().spliterator().
    2. Then it returns either terminalOp.evaluateSequential() or terminalOp.evaluateParallel(), passing the pipeline and spliterator.
      • evaluateSequential():
        • Typically implemented by instantiating a custom TerminalSink, then calling pipeline.wrapAndCopyInto(sink, spliterator).get().
        • wrapAndCopyInto() first wraps the sink via sink = opWrapSink(sink) on each pipeline stage, in reverse order. Then begin():s the sink, advances the spliterator into it (checking cancellation before each advance if the pipeline is short-circuiting), then end():s the sink.
      • evaluateParallel():
        • Typically implemented by instantiating and invoking a custom ForkJoinTask - may extend Abstract[ShortCircuit]Task, or even CountedCompleter directly. The TerminalOp itself is typically passed to this task and used in leaf tasks to implement behavior similar to evaluateSequential() - instantiating a custom TerminalSink, wrapping it, and advancing a spliterator into it to get a task result.
  4. The remaining terminal ops (toArray(gen), spliterator(), and derivatives) are implemented directly on AbstractPipeline:
    • toArray(gen):
      • Calls evaluateToArrayNode()...
        • which calls opEvaluateParallel() (if parallel and last op was stateful), OR evaluate()...
          • which calls wrapAndCopyInto(makeNodeBuilder(), sourceSpliterator).build() (if sequential), OR evaluateToNode() (if parallel)...
            • which consumes the sourceSpliterator with Nodes.collect()
    • spliterator():
      • if Head, returns the original spliterator if present, or the spliterator-supplier wrapped in a delegating spliterator.
      • else, wraps a supplier of sourceSpliterator() in a spliterator, that itself wraps the (remaining) pipeline around a buffering sink.
        • Advancing this wrapped spliterator will first look for the next element in the buffer. If the buffer is empty, the wrapped spliterator will advance the source spliterator into the wrapped sink (which may buffer multiple outputs), until either the buffer is not empty or the source spliterator is exhausted.
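The wrapped-sink chain described under evaluateSequential() above is observable from the outside: in a sequential stream, each element traverses the entire chain before the spliterator advances to the next element:

```java
import java.util.stream.Stream;

public class SinkChainDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3)
                .peek(i -> System.out.println("A" + i))
                .map(i -> i * 10)
                .peek(i -> System.out.println("B" + i))
                .forEach(i -> System.out.println("C" + i));
        // Prints A1 B10 C10 A2 B20 C20 A3 B30 C30 - depth-first per element,
        // because copyInto() advances the source spliterator into the wrapped sink.
    }
}
```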

# Stream op flags usages

This list details where each stream op flag is read to optimize the stream.

Related usages:


# Stream operations summary

The below table details how each stream operation affects stream op flags and memory usage in isolation.

Note: The "ΔFlags" below (mostly corresponding to AbstractPipeline.sourceOrOpFlags) represent the changes that one operation applies to the "cumulative" stream flags (mostly corresponding to AbstractPipeline.combinedFlags). For intermediate operations, these changes are applied AFTER the operation, to affect downstream operations. For terminal operations, these changes are applied BEFORE the operation, to affect the terminal operation itself (if it even bothers to use this mechanism).

Note: The "Memory Usage" columns below sometimes refer to what I call "amplifying" operations - operations that may increase the number of elements in the stream. These include flatMap(), mapMulti(), and gather(). Conversely, "de-amplifying" operations may decrease the number of elements in the stream. These include filter(), skip(), limit(), takeWhile(), and dropWhile() - as well as flatMap(), mapMulti(), and gather() again (since they may emit no output elements for an input element). I conservatively ignore the possible effect of de-amplifying operations when considering memory usage.

Intermediate Operations

Each operation below lists its ΔFlags (over DISTINCT, SORTED, ORDERED, SIZED, SHORT_CIRCUIT, SIZE_ADJUSTING; combined AFTER the operation) and its Memory usage (sequential and parallel).

Stateless

map()
scrawl
map(mapper) -> StatelessOp {
    NOT_SORTED | NOT_DISTINCT
    opWrapSink(flags, sink) {
        apply mapper and emit transformed element downstream
        cancellationRequested() = downstream.cancellationRequested()
    }
}
ΔFlags: CLEARS DISTINCT, CLEARS SORTED. Memory usage: sequential Okay; parallel Okay.
flatMap()
scrawl
flatMap(mapper) -> StatelessOp {
    NOT_SORTED | NOT_DISTINCT | NOT_SIZED
    opWrapSink(flags, sink) {
        apply mapper and emit stream of elements downstream (until downstream cancels, if short-circuiting)
        cancellationRequested() = cancel || (cancel |= sink.cancellationRequested())
    }
}
ΔFlags: CLEARS DISTINCT, CLEARS SORTED, CLEARS SIZED. Memory usage: sequential Okay; parallel Okay.
mapMulti()
scrawl
mapMulti(mapper) -> StatelessOp {
    NOT_SORTED | NOT_DISTINCT | NOT_SIZED
    opWrapSink(flags, sink) {
        apply mapper to emit multiple elements downstream
        cancellationRequested() = downstream.cancellationRequested()
    }
}
ΔFlags: CLEARS DISTINCT, CLEARS SORTED, CLEARS SIZED. Memory usage: sequential Okay; parallel Okay.
filter()
scrawl
filter(predicate) -> StatelessOp {
    NOT_SIZED
    opWrapSink(flags, sink) {
        emit element downstream if passes predicate
        cancellationRequested() = downstream.cancellationRequested()
    }
}
ΔFlags: CLEARS SIZED. Memory usage: sequential Okay; parallel Okay.
peek()
scrawl
peek(action) -> StatelessOp {
    0
    opWrapSink(flags, sink) {
        apply action then emit same element downstream
        cancellationRequested() = downstream.cancellationRequested()
    }
}
ΔFlags: none. Memory usage: sequential Okay; parallel Okay.
unordered()
scrawl
unordered() -> StatelessOp {
    NOT_ORDERED
    opWrapSink(flags, sink) {
        sink // No-op
        cancellationRequested() = downstream.cancellationRequested()
    }
}
ΔFlags: CLEARS ORDERED. Memory usage: sequential Okay; parallel Okay.
Stateful

skip()
scrawl
skip(skip)   -> SliceOps.makeRef(this, skip, -1) -> StatefulOp {
limit(limit) -> SliceOps.makeRef(this, 0, limit) -> StatefulOp {
    IS_SIZE_ADJUSTING | ((limit != -1) ? IS_SHORT_CIRCUIT : 0)
    opWrapSink(flags, sink) {
        ignore first skip elements, then emit next limit elements
        // cancellationRequested() = reached limit || downstream.cancellationRequested()
    }
    opEvaluateParallel(ph, split, gen) {
        if size > 0 && SUBSIZED: StreamSpliterators.SliceSpliterator.OfRef(_) |> Nodes.collect(_)
        if !ORDERED: StreamSpliterators.UnorderedSliceSpliterator.OfRef(_) |> Nodes.collect(_)
        else: new SliceTask(_).invoke()
    }
    opEvaluateParallelLazy(ph, split) {
        if size > 0 && SUBSIZED: ph.wrapSpliterator(split) |> StreamSpliterators.SliceSpliterator.OfRef(_) {
            skips until start of slice (skip), then advances one-at-a-time until end of slice (limit)
            trySplit drops out-of-range splits to try splitting again
        }
        if !ORDERED: StreamSpliterators.UnorderedSliceSpliterator.OfRef(_) {
            there's an atomic long of permits, initially set to (limit >= 0 ? skip + limit : skip); permits monotonically decreases
            splitting is disabled if there are no permits left*
            if there are permits remaining, we optimistically read+buffer data from spliterator (tryAdvance <= 1, forEachRemaining <= chunkSize)
            then we request to acquire permits up to buffer size, reducing permits, receiving n <= request (n < request -> skipping or limiting)
            if > 0, we pass that many elements from the buffer to the action
        }
        else: SliceTask(_).invoke().spliterator() {
            each leaf copies its split into a wrapped node-builder
            onCompletion, non-leaf tasks concat child nodes, and root task truncates its node to the slice
            onCompletion, non-leaf non-root tasks cancel later tasks if their size + size of completed tasks to the left >= skip + limit
        }
    }
}
ΔFlags: SETS SIZE_ADJUSTING. Memory usage: sequential Okay; parallel Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED), (b) Buffering risk if upstream contained amplifying operations.
limit()

ΔFlags: SETS SHORT_CIRCUIT, SETS SIZE_ADJUSTING. Memory usage: sequential Okay; parallel Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED), (b) Buffering risk if upstream contained amplifying operations.
takeWhile()
scrawl
takeWhile(predicate) -> WhileOps.makeTakeWhileRef(this, predicate) -> StatefulOp {
    NOT_SIZED | IS_SHORT_CIRCUIT
    opWrapSink(flags, sink) {
        // take until predicate fails
        // cancellationRequested() = predicate failed || downstream.cancellationRequested()
    }
    opEvaluateParallel(ph, split, gen) {
        TakeWhileTask(_).invoke() {
            each leaf copies its split into a wrapped node-builder, with cancellation (if cancellationRequested, cancel later tasks)
            onCompletion, non-leaf tasks adopt left child node if ordered and left short-circuited, else concat left and right nodes
        }
    }
    opEvaluateParallelLazy(ph, split) {
        if ORDERED: opEvaluateParallel(_).spliterator()
        else: UnorderedWhileSpliterator.OfRef.Taking(_) {
            // Emits elements until predicate fails, or cancelled (predicate failed in another split; checked ~1/64 steps)
            // Splitting is disabled if cancelled
        }
    }
}
ΔFlags: CLEARS SIZED, SETS SHORT_CIRCUIT. Memory usage: sequential Okay; parallel Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream contained amplifying operations.
dropWhile()
scrawl
dropWhile(predicate) -> WhileOps.makeDropWhileRef(this, predicate) -> StatefulOp {
    NOT_SIZED
    opWrapSink(flags, sink) {
        // drop until predicate passes (may retain and count dropped elements if used from DropWhileTask)
        // cancellationRequested() = downstream.cancellationRequested()
    }
    opEvaluateParallel(ph, split, gen) {
        DropWhileTask(_).invoke() {
            each leaf copies its split into a wrapped node-builder
            if unordered, the op sink drops elements (until the predicate fails) as it goes (no attempt to cancel others' dropping once failed)
            else, the op sink emits and counts "dropped" elements (into the node builder)
            onCompletion, non-leaf tasks concat child nodes, and if ordered use the drop counts to truncate the concat node from the left
            (ie, if a left node's drop count < its node count, subsequent node drop counts are ignored and "dropped" elements are preserved)
        }
    }
    opEvaluateParallelLazy(ph, split) {
        if ORDERED: opEvaluateParallel(_).spliterator()
        else: UnorderedWhileSpliterator.OfRef.Dropping(_) {
            // Drops elements until predicate fails, or cancelled (predicate failed in another split; checked ~1/64 steps)
        }
    }
}
ΔFlags: CLEARS SIZED. Memory usage: sequential Okay; parallel Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream contained amplifying operations.
gather()
scrawl
    gather(gatherer) -> GathererOp.of(this, gatherer) -> GathererOp {
        NOT_SORTED | NOT_DISTINCT | NOT_SIZED | (integrator is greedy ? 0 : IS_SHORT_CIRCUIT)
        opWrapSink(flags, sink) {
            initialize state, then integrate elements, then emit finishing elements
            stop emitting elements if integrator or downstream cancels
            cancellationRequested() = integrator returned false or downstream requested cancellation
        }
        opEvaluateParallel(ph, split, gen) {
            evaluate(wrappedSpliterator, _, NodeBuilder collector parts)
        }
        opEvaluateParallelLazy(ph, split) {
            opEvaluateParallel(_).spliterator()
        }
        collect(_) {
            // collect() is overridden to avoid gather() accumulating into a Node first
            // (since GathererOp reports that it is a stateful op)
            evaluate(wrappedSpliterator, _, collector parts)
        }
        evaluate(_) {
            Sequential {
                // Used directly if stream is sequential, else used indirectly by Hybrid/Parallel
                initialize gatherer + collector state, integrate elements into collector accumulator,
                then emit finishing elements and finish collector
                stop emitting elements if integrator cancels
            }
            Hybrid {
                // Used if stream is parallel and gatherer is sequential (default combiner)
                like parallel forEachOrdered(), but only buffers the upstream if gatherer is greedy
                makes no use of the gatherer's or collector's combiners
                cancels later tasks if !greedy and the integrator short-circuits
            }
            Parallel {
                // Used if stream is parallel and gatherer is not sequential (non-default combiner)
                parent tasks use the gatherer's and collector's combiners to merge children's state
                cancels later tasks if !greedy and the integrator short-circuits
            }
        }
    }
gather(): ΔFlags: CLEARS DISTINCT, CLEARS SORTED, CLEARS SIZED, SETS SHORT_CIRCUIT if gatherer is !greedy; Memory (sequential): Caution: Memory usage depends on how the gatherer's state grows; Memory (parallel): Caution: (a) Eager unless followed by collect(), (b) Memory usage depends on how the gatherer's state grows, (c) Buffers elements from upstream until predecessor tasks finish if gatherer is sequential and greedy, (d) Buffering risk if gatherer is !greedy AND upstream contained amplifying operations
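
To make the greedy/sequential distinctions concrete, here is a sketch of my own (takeN is a hypothetical example, not a JDK API) of a sequential, non-greedy gatherer; a gatherer like this could stand in for the nonGreedyGatherer used in the examples further below. ofSequential() keeps the default combiner, so a parallel stream falls back to the Hybrid strategy; building the integrator with Integrator.of() (rather than Integrator.ofGreedy()) marks it non-greedy, so the op reports IS_SHORT_CIRCUIT:

static <T> Gatherer<T, ?, T> takeN(long n) {  // hypothetical example, not a JDK API
    return Gatherer.<T, long[], T>ofSequential(
        () -> new long[1],  // state: number of elements pushed so far
        Gatherer.Integrator.of((state, element, downstream) -> {
            if (state[0] >= n) return false;  // short-circuit: stop integrating upstream
            state[0]++;
            return downstream.push(element);  // false once downstream rejects more elements
        }));
}

Stream.of(1, 2, 3, 4).gather(takeN(2)).toList();  // [1, 2]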
distinct()
scrawl
distinct() -> DistinctOps.makeRef(this) -> StatefulOp {
    IS_DISTINCT | NOT_SIZED
    opWrapSink(flags, sink) {
        if DISTINCT: sink // No-op
        if SORTED: store last emitted element, emit next element if not equal
        else: store set of emitted elements, emit next element if not in set
        // cancellationRequested() = downstream.cancellationRequested()
    }
    opEvaluateParallel(ph, split, gen) {
        if DISTINCT: ph.evaluate(split, false, gen)
        if ORDERED: ReduceOps.makeRef(<into LinkedHashSet>).evaluateParallel(_) |> Nodes.node(<set>)
        else: ForEachOps.makeRef(<into ConcurrentHashMap>).evaluateParallel(_) |> Nodes.node(<map keys>)
    }
    opEvaluateParallelLazy(ph, split) {
        if DISTINCT: ph.wrapSpliterator(split) // No-op
        if ORDERED: ReduceOps.makeRef(<into LinkedHashSet>).evaluateParallel(_) |> Nodes.node(<set>).spliterator()
        else: StreamSpliterators.DistinctSpliterator(ph.wrapSpliterator(split)) {
            // Emits elements if not present in a ConcurrentHashMap, shared by all splits
        }
    }
}
distinct(): ΔFlags: SETS DISTINCT, CLEARS SIZED; Memory (sequential): Caution: Accumulates into a Set if !DISTINCT and !SORTED; Memory (parallel): Caution: (a) Eager if !DISTINCT and ORDERED, (b) Accumulates into a Set if !DISTINCT and !ORDERED, (c) Buffering risk if lazy AND upstream contained amplifying operations AND downstream consumes elements one-at-a-time
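
A visible consequence (my own example): ordered parallel distinct() pays for a stable, first-occurrence-wins result via the eager LinkedHashSet reduction, while unordered() switches to the shared ConcurrentHashMap and gives up both guarantees:

Stream.of(3, 1, 3, 2, 1).parallel().distinct().toList();              // always [3, 1, 2]
Stream.of(3, 1, 3, 2, 1).parallel().unordered().distinct().toList();  // {1, 2, 3} in some order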
sorted()
scrawl
sorted([cmp]) -> SortedOps.makeRef(this[, cmp]) -> StatefulOp {
    IS_ORDERED | (isNaturalSort ? IS_SORTED : NOT_SORTED)
    opWrapSink(flags, sink) {
        if SORTED && isNaturalSort: sink // No-op
        if SIZED: SizedRefSortingSink(sink, cmp) {
            // begin() by making an array, accept() into the array, end() by sorting and emitting all downstream
            // cancellationRequested() = false, but records that call happened
            //  - indicates that pipeline is short-circuiting, so end() should poll downstream.cancellationRequested() after each emission
        }
        else: RefSortingSink(sink, cmp) {
            // begin() by making a list, accept() into the list, end() by sorting and emitting all downstream
            // cancellationRequested() = false, but records that call happened
            //  - indicates that pipeline is short-circuiting, so end() should poll downstream.cancellationRequested() after each emission
        }
    }
    opEvaluateParallel(ph, split, gen) {
        if SORTED && isNaturalSort: ph.evaluate(split, false, gen)
        else: ph.evaluate(split, true, gen).asArray(gen) |> Arrays.parallelSort(_) |> Nodes.node(_)
    }
    //--opEvaluateParallelLazy(ph, split);
}
sorted(): ΔFlags: SETS SORTED if no comparator else CLEARS SORTED, SETS ORDERED; Memory (sequential): Caution: Accumulates elements into an array/List if !(SORTED and no comparator); Memory (parallel): Warning: Always eager; Accumulates elements into an array/List
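
Since the SORTED flag only tracks natural ordering (my own example): a redundant natural-order sort becomes a pass-through sink, but a redundant comparator sort does not, even with an identical comparator:

Stream.of(2, 1, 3).sorted().sorted().toList();  // second sorted() is a no-op sink

Stream.of(2, 1, 3).sorted(Comparator.reverseOrder())
                  .sorted(Comparator.reverseOrder()).toList();  // second sort buffers and sorts again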
Terminal Operations: ΔFlags (combined BEFORE operation) over DISTINCT, SORTED, ORDERED, SIZED, SHORT_CIRCUIT, SIZE_ADJUSTING; Memory Usage (sequential, parallel)
min()
scrawl
min(cmp) -> reduce(BinaryOperator.minBy(cmp))
max(cmp) -> reduce(BinaryOperator.maxBy(cmp))
min(): no flag deltas; Memory (sequential): Okay; Memory (parallel): Okay
max(): no flag deltas; Memory (sequential): Okay; Memory (parallel): Okay
findFirst()
scrawl
findFirst() -> evaluate(FindOps.makeRef(true))  -> TerminalOp {
findAny()   -> evaluate(FindOps.makeRef(false)) -> TerminalOp {
    getOpFlags() {
        IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : NOT_ORDERED)
    }
    evaluateSequential(ph, split) {
        ph.wrapAndCopyInto(FindSink(), split).get() ?: emptyValue
    }
    evaluateParallel(ph, split) {
        mustFindFirst = ORDERED, FindTask(_).invoke() {
            each leaf copies into a FindSink()
            if it finds a result:
                if op is findAny or node is leftmost: short-circuit shared/root result
                else: cancel later tasks (prevent further splitting/forking)
            onCompletion, non-leaf tasks adopt the first present result of their children (if any, and cancel later tasks)
        }
    }
    FindSink {
        accept(value) {
            if !hasValue: hasValue = true, this.value = value
        }
        get() {
            hasValue ? Optional.of(value) : null
        }
        cancellationRequested() {
            hasValue
        }
    }
}
findFirst(): ΔFlags: SETS SHORT_CIRCUIT; Memory (sequential): Okay; Memory (parallel): Okay
findAny(): ΔFlags: CLEARS ORDERED, SETS SHORT_CIRCUIT; Memory (sequential): Okay; Memory (parallel): Okay
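
In short (my own example): findFirst() keeps ORDERED, so parallel evaluation must return the leftmost result; findAny() clears it and returns whichever split's result wins:

IntStream.range(0, 1_000_000).parallel().filter(i -> i % 2 == 1).findFirst().getAsInt();  // always 1
IntStream.range(0, 1_000_000).parallel().filter(i -> i % 2 == 1).findAny().getAsInt();    // any odd number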
anyMatch()
scrawl
anyMatch(predicate)  -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY))  -> TerminalOp {
allMatch(predicate)  -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL))  -> TerminalOp {
noneMatch(predicate) -> evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE)) -> TerminalOp {
    getOpFlags() {
        IS_SHORT_CIRCUIT | NOT_ORDERED
    }
    evaluateSequential(ph, split) {
        ph.wrapAndCopyInto(MatchSink(), split).getAndClearState()
    }
    evaluateParallel(ph, split) {
        MatchTask(_).invoke() {
            doLeaf():
                boolean b = ph.wrapAndCopyInto(MatchSink(), split).getAndClearState()
                if (b == matchKind.shortCircuitResult) shortCircuit(b)
            getEmptyResult(): !matchKind.shortCircuitResult
        }
    }

    MatchSink {
        boolean stop, value = !matchKind.shortCircuitResult

        accept(t) {
            if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
                stop = true;
                value = matchKind.shortCircuitResult;
            }
        }
        getAndClearState() { // returns primitive boolean, avoiding the boxing of TerminalSink.get()
            value
        }
        cancellationRequested() {
            stop
        }
    }
}
anyMatch(): ΔFlags: CLEARS ORDERED, SETS SHORT_CIRCUIT; Memory (sequential): Okay; Memory (parallel): Okay
allMatch(): ΔFlags: CLEARS ORDERED, SETS SHORT_CIRCUIT; Memory (sequential): Okay; Memory (parallel): Okay
noneMatch(): ΔFlags: CLEARS ORDERED, SETS SHORT_CIRCUIT; Memory (sequential): Okay; Memory (parallel): Okay
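
Because MatchSink's cancellationRequested() trips as soon as the answer is decided, match ops can terminate on unbounded input (my own example):

IntStream.iterate(0, i -> i + 1).anyMatch(i -> i > 100);  // true; stops at the first match
IntStream.iterate(0, i -> i + 1).allMatch(i -> i < 100);  // false; stops at the first counterexample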
count()
scrawl
count() -> evaluate(ReduceOps.makeRefCounting()) -> TerminalOp {
    getOpFlags() {
        NOT_ORDERED
    }
    evaluateSequential(ph, split) {
        if size is known: size
        else: ph.wrapAndCopyInto(CountingSink(), split)
    }
    evaluateParallel(ph, split) {
        if size is known: size
        else: ReduceTask(this, ph, split).invoke().get()
    }

    CountingSink {
        begin(size)    { count = 0 }
        accept(t)      { count++ }
        combine(other) { count += other.count }
        get()          { count }
        //--cancellationRequested()
    }
}
count(): ΔFlags: CLEARS ORDERED; Memory (sequential): Okay; Memory (parallel): Okay
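
The "if size is known" branch is observable (and called out in Stream.count()'s API note): if SIZED survives to the terminal op, the pipeline may not execute at all:

// peek() preserves SIZED, so count() answers from the known size -- prints nothing:
Stream.of(1, 2, 3).peek(System.out::println).count();  // 3

// filter() clears SIZED, forcing a traversal -- prints 1, 2, 3:
Stream.of(1, 2, 3).peek(System.out::println).filter(i -> true).count();  // 3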
forEach()
scrawl
forEach(action)        -> evaluate(ForEachOps.makeRef(action, false)) -> TerminalOp, TerminalSink {
forEachOrdered(action) -> evaluate(ForEachOps.makeRef(action, true))  -> TerminalOp, TerminalSink {
    getOpFlags() {
        ordered ? 0 : NOT_ORDERED
    }
    evaluateSequential(ph, split) {
        ph.wrapAndCopyInto(this, split).get()
    }
    evaluateParallel(ph, split) {
        if (ordered): ForEachOrderedTask(_).invoke() {
            if a leaf task is blocked by a predecessor, it copies its spliterator into a wrapped node-builder (runs the pipeline above forEachOrdered)
            onCompletion, the node (if created, else spliterator) is forEach-d with the action
        }
        else: ForEachTask(_).invoke() {
            each leaf task copies its split into the (pre-wrapped) sink
        }
    }

    // From TerminalSink:
    accept(t) {
        action.accept(t)
    }
    get() {
        null
    }
    //--cancellationRequested()
}
forEach(): ΔFlags: CLEARS ORDERED; Memory (sequential): Okay; Memory (parallel): Okay
forEachOrdered(): no flag deltas; Memory (sequential): Okay; Memory (parallel): Warning: Buffers elements from upstream until predecessor tasks finish
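
The difference is easy to see (my own example):

IntStream.range(0, 10).parallel().forEach(System.out::println);         // prints in no particular order
IntStream.range(0, 10).parallel().forEachOrdered(System.out::println);  // prints 0 through 9, buffering in leaf tasks as above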
toArray()
scrawl
toArray(gen) -> Nodes.flatten(evaluateToArrayNode(gen), gen).asArray(gen)
toArray() -> toArray(Object[]::new)
toList() -> unmodifiableListAroundArray(toArray())
toArray(): no flag deltas; Memory (sequential): Warning: Accumulates elements into an array; Memory (parallel): Warning: Accumulates elements into an array
toList(): no flag deltas; Memory (sequential): Warning: Accumulates elements into an array; Memory (parallel): Warning: Accumulates elements into an array
reduce()
scrawl
reduce(_) -> evaluate(ReduceOps.makeRef(_)) -> TerminalOp {
    getOpFlags() {
        0
    }
    evaluateSequential(ph, split) {
        ph.wrapAndCopyInto(ReducingSink(), split)
    }
    evaluateParallel(ph, split) {
        ReduceTask(this, ph, split).invoke().get()
    }

    ReducingSink {
        begin(size)    { state = seed }
        accept(t)      { state = reducer.apply(state, t) }
        combine(other) { state = combiner.apply(state, other.state) }
        get()          { state }
        //--cancellationRequested()
    }

    ReduceTask {
        each leaf task does result = ph.wrapAndCopyInto(op.makeSink(), split)
        onCompletion, non-leaf tasks do leftResult.combine(rightResult), result = leftResult
    }
}
reduce(): no flag deltas; Memory (sequential and parallel): Caution: Memory usage depends on how the reduction's accumulation grows
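
One subtlety visible in ReducingSink: begin() sets every leaf task's state to the same seed, so the three-arg reduce() requires a seed that is an identity for the combiner. With a non-identity seed (my own example), the seed is folded in once per leaf task:

Stream.of(1, 2, 3, 4).reduce(100, Integer::sum, Integer::sum);             // 110
Stream.of(1, 2, 3, 4).parallel().reduce(100, Integer::sum, Integer::sum);  // e.g. 410 with four leaf tasks; varies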
collect()
scrawl
collect(_) -> {
    if isParallel() && collector is CONCURRENT && (!ORDERED || collector is UNORDERED):
        container = supplier.get(), forEach(u -> accumulator.accept(container, u))
    else: container = evaluate(ReduceOps.makeRef(collector)) -> TerminalOp {
        getOpFlags() {
            collector is UNORDERED ? NOT_ORDERED : 0
        }
        evaluateSequential(ph, split) {
            ph.wrapAndCopyInto(ReducingSink(), split)
        }
        evaluateParallel(ph, split) {
            ReduceTask(this, ph, split).invoke().get()
        }

        ReducingSink {
            begin(size)    { state = supplier.get() }
            accept(t)      { accumulator.accept(state, t) }
            combine(other) { state = combiner.apply(state, other.state) }
            get()          { state }
            //--cancellationRequested()
        }
    }
    collector is IDENTITY_FINISH ? container : finisher.apply(container)
}
collect(): ΔFlags: CLEARS ORDERED if collector is UNORDERED; Memory (sequential and parallel): Caution: Memory usage depends on how the collector's accumulation grows
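
The concurrent bypass above can be exercised with a CONCURRENT + UNORDERED collector (my own example): even though range() is ORDERED, the collector is UNORDERED, so the stream accumulates into one shared container via forEach(), with no combining step:

// ConcurrentMap<Integer, List<Integer>> keyed by i % 10:
IntStream.range(0, 100).boxed().parallel().collect(Collectors.groupingByConcurrent(i -> i % 10));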
spliterator()
scrawl
spliterator() {
    this == sourceStage && sourceSpliterator != null -> sourceSpliterator
    this == sourceStage && sourceSupplier != null -> lazySpliterator(sourceSupplier)
    else -> wrap(this, () -> sourceSpliterator(0), isParallel())
}
spliterator(): no flag deltas; Memory (sequential and parallel): Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations
iterator()
scrawl
Spliterators.iterator(spliterator())
iterator(): no flag deltas; Memory (sequential and parallel): Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations

Caution: Splitting a naive spliterator may cause buffering. In parallel streams, the first eager operation (stateful or terminal) can evoke heavy memory usage if the initial spliterator's trySplit() relies on buffering many/large elements. For instance, Spliterators.AbstractSpliterator and Spliterators.IteratorSpliterator (often used when converting an iterator to a spliterator) both split by instantiating an array and advancing elements into it, then returning a spliterator over the array. Each time the original spliterator splits, the capacity of the array it instantiates and fills is increased (up to a threshold). If the original spliterator covers sufficiently many/large elements, it can eventually run out of memory attempting to split.

Caution: Amplifying operations that feed into a pipeline-wrapping spliterator may cause buffering. Lazy operations necessarily wrap the upstream pipeline in a spliterator. When we initially call tryAdvance() on this upstream-wrapping spliterator (StreamSpliterators.WrappingSpliterator), it calls tryAdvance() on the incoming pipeline spliterator, passing a consumer that applies the upstream pipeline operations and ultimately pushes output elements into a buffer. The buffer exists because the pipeline may contain amplifying operations that can produce multiple output elements per input element from the incoming spliterator. The next time tryAdvance() is called on the upstream-wrapping spliterator, it polls from the buffer if the buffer is not empty. So, when we have an upstream-wrapping spliterator over large amplifying operations and consume it with tryAdvance(), we can actually use a lot of memory.

See examples
// The simplest example to demonstrate an OutOfMemoryError due to buffering is to use the spliterator()
// lazy terminal op to wrap an infinitely-amplifying upstream, and then tryAdvance() the spliterator:
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).spliterator().tryAdvance(_ -> {});  // Throws OOME

// But since parallel-lazy stateful ops also wrap the upstream in a spliterator,
// we can evoke this OOME a few other ways:
//                                 |  "large" amplifying operation        |  lazy operation that |  operation that causes
//                                 |  (infinite in this case)             |  wraps upstream in a |  tryAdvance() to be called
//                                 v                                      v  spliterator         v  on the wrapping spliterator
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10)              .findFirst();  // All
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10)             .findFirst();  // of
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).findFirst();  // these
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).findFirst();  // throw
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct()            .findFirst();  // OOME

// Above we used the short-circuiting findFirst() operation to ensure we call tryAdvance() instead
// of forEachRemaining() on the incoming spliterator. However, even non-short-circuiting operations
// like forEach() will still cause tryAdvance() to be called on the upstream-wrapping spliterator in
// most of these cases, since _that_ spliterator is wrapped in the lazy stateful op's spliterator,
// and most of those have a forEachRemaining() that calls tryAdvance() on the wrapped spliterator.
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10)              .forEach(_ -> {});  // These
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10)             .forEach(_ -> {});  // still
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).forEach(_ -> {});  // throw
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).forEach(_ -> {});  // OOME

// Except this one! StreamSpliterators.DistinctSpliterator.forEachRemaining()
// calls forEachRemaining() on the wrapped spliterator:
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct()            .forEach(_ -> {});  // No OOME; Runs forever

// There are also a few parallel-eager stateful ops that wrap the upstream in a spliterator
// before consuming it with tryAdvance().

// Here, we make unordered skip()/limit() parallel-eager in a way that still wraps the upstream
// spliterator, by following them with toArray()/toList().
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10).toArray();   // OOME (Okay, silly - toArray() was going to buffer everything anyway)
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10).toArray();  // OOME (Less silly)

// Here, all we have to do is use a non-greedy (aka short-circuiting) gatherer.
// Regardless of the subsequent operations, the parallel gather() wraps the upstream in a spliterator,
// and the non-greedy gatherer forces gather() to consume that spliterator via tryAdvance().
Stream.of(0).parallel().flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).forEach(_ -> {});  // OOME

// We don't even have to be parallel if the gather() is followed by collect(),
// which again causes the implementation to wrap the upstream in a spliterator.
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).collect(anyCollector);  // OOME

// Note that currently, all existing gatherers provided by java.util.stream.Gatherers are greedy,
// but you can craft your own gatherers that are not.

Note: Though parallel forEachOrdered() and parallel gather() (with a sequential + greedy Gatherer) both buffer the upstream in leaf tasks, the actual amount buffered depends on the leaf spliterator size, the parallelism of the ForkJoinPool running the stream, and whether the upstream uses amplifying/de-amplifying operations. The buffering in these cases does not necessarily hold the entire upstream in memory at once.


# Stream planner

This table simulates what will happen to stream op flags and memory usage across stream operations.

Try adding/changing stream operations and initial spliterator characteristics.

[Interactive planner table: each row is a pipeline stage, starting from StreamSupport.stream(spliterator, …), with columns for Spliterator Characteristics (DI SO OR SI NO IM CO SU), ΔFlags (DI SO OR SI SC SA), Flags (DI SO OR SI SC SA), Memory Usage, and Notes.]

Legend

lazy barrier: (Dashed line) Indicates that a (parallel) stateful op evaluates lazily, by wrapping the incoming spliterator in a new spliterator that implements the pipeline since the last barrier, and becomes the new "incoming" spliterator downstream. A lazy barrier is also drawn for the terminal operations spliterator()/iterator(), since they likewise wrap the incoming spliterator.

eager barrier: (Solid line) Indicates that a (parallel) stateful op evaluates eagerly, by accumulating the upstream pipeline into a Node, whose spliterator becomes the new "incoming" spliterator downstream. An eager barrier is also drawn for (sequential) sorted(), since it must accumulate all elements from the upstream pipeline before emitting any downstream. (It is possible for (sequential) gather() to behave in this way too, but for now no barrier is drawn.) If a stateful op is fused to a toArray()/toList() terminal op, no barrier is drawn - the stateful op evaluates in parallel-eager mode to produce a Node, but that Node is converted to an array directly, rather than taking its spliterator (and toArray()/toList() are already eager). If gather() is fused to collect(), no barrier is drawn - the gather() no longer evaluates in parallel-eager mode to produce a Node, and instead emits directly into the collector's accumulator.

Spliterator Characteristics, ΔFlags, and Flags are all "outgoing" from the stage they are adjacent to. (The exception to this is eager terminal ops, which do not have an outgoing spliterator, and apply flag deltas BEFORE evaluation.) To help visualize stateful op barriers, they are drawn going under the stage, and above the rest of the row.

Spliterator Characteristics:
    DI DISTINCT, SO SORTED, OR ORDERED, SI SIZED, NO NONNULL, IM IMMUTABLE, CO CONCURRENT, SU SUBSIZED
    ?       Characteristic is unknown† (assumed absent)
    (blank) Characteristic is known absent
    Y       Characteristic is known present

ΔFlags:
    DI DISTINCT, SO SORTED, OR ORDERED, SI SIZED, SC SHORT_CIRCUIT, SA SIZE_ADJUSTING
    (blank) No delta
    S       Delta sets flag
    C       Delta clears flag
    C/S     Delta changes‡ (e.g. from "clears" to "sets")

Flags:
    DI DISTINCT, SO SORTED, OR ORDERED, SI SIZED, SC SHORT_CIRCUIT, SA SIZE_ADJUSTING
    (blank) Flag is known absent
    Y       Flag is known present

† When a stateful op is evaluated in parallel-eager mode, the implementation of the resultant Node (and its spliterator) can depend on details that this tool doesn't or can't model, such as the size of the input spliterator, whether/how that spliterator splits, and whether nodes are concatenated, truncated, etc. In these cases, I depict the intersection of all possible sets of spliterator characteristics as "known", and the remaining possible characteristics as "unknown (assumed absent)". In practice, this is mostly benign, but can occasionally lead to some mispredictions when the actual spliterator would have been SUBSIZED.

‡ Deltas may be depicted as "changed" for one of several reasons:

  1. After AbstractPipeline.sourceSpliterator() evaluates a stateful op (in parallel mode), it overwrites the SIZED delta based on the resultant spliterator, and removes the SHORT_CIRCUIT delta (if any).
  2. For fused gather() ops, the actual implementation removes the first gather() stage entirely, and fuses its Gatherer to the Gatherer of the second gather() stage. I chose to depict this as removing the deltas from the first stage (and displaying its Memory Usage as N/A).
  3. For the niche case of a gather() fused to a collect() with an UNORDERED collector, the collect() no longer clears the ORDERED flag. I chose to depict this as removing the delta.


# Additional tangents

This is a list of some interesting tidbits that didn't make it elsewhere.