I made this as someone who loves reading into OpenJDK source code, and obsesses over streaming and concurrency, but found the Java Streams source particularly difficult to grok in as much detail as I'd like. I'm hoping this document will make the implementation and performance characteristics a little more accessible to myself and others. I know this may be a moving target, especially with Gatherers now being introduced, but perhaps patching or versioning this document as the implementation evolves will be easier with some groundwork laid.
This document is intended to be intermediate- to expert-friendly, not necessarily beginner-friendly. I may not dwell too long on terminology or public abstractions that have been well-hashed elsewhere. I recommend other resources for that, such as the stream package doc, and this article series by Brian Goetz.
This document is current with JDK 24 [build 34], and consists of a single HTML file.
If you have substantive additions or corrections to propose, I have (hesitantly) created an issue tracker for this site. Or, if you'd like to work with me, you can reach me at danielaveryj@gmail.com.
Caution: Depending on implementation details is inherently risky! This document is not intended to mislead anyone into believing that subsequent (or prior) JDK versions will adhere to the same design or performance characteristics.
Note: In this doc I am using a slightly different definition of "eager" and "lazy" than the stream package doc, which uses "eager" and "lazy" to refer to roughly "when the stream (as a whole) executes", rather than "when specific operations on the stream execute (during the whole execution)". By the former definition, all intermediate operations are lazy. By the latter definition (which I extrapolated from AbstractPipeline.opEvaluateParallelLazy() and AbstractPipeline.opEvaluateParallel() - the latter being the implicit "eager" counterpart), stateful intermediate operations on a parallel stream may execute either eagerly or lazily. By both definitions, all existing terminal operations are eager, except for spliterator()/iterator().
Java Streams use data parallelism:
Data parallelism in a pipeline:

Pipeline     |                  Data
             |     ↓        ↓        ↓        ↓
  map()      |  Task D   Task E   Task F   Task G
  filter()   |     ↓        ↓        ↓        ↓
  flatMap()  |     Task B            Task C
  collect()  |        ↓                 ↓
             |            Task A
             |               ↓
Here, the pipeline consists of several operations, terminated by the first "eager" operation (in this case, "collect()"). The eager operation associates the data with a root task (think: thread), which splits the data and associates each split with a child task. Each child task repeats this process, until the data will split no more, indicating a leaf task. Each leaf task consumes its split of the data, applying the pipeline operations and accumulating each element into a result, according to the eager operation. Each parent task combines its children's results to form its own result, according to the eager operation. This continues until the root task has a result.
In parallel Java Streams, pipelines can have multiple eager operations, by requiring each intermediate eager operation to produce a new dataset as its result. This becomes input to the remaining pipeline, down to the next eager operation.
For data parallelism to achieve speedup, the data should be large enough - and/or the operations time-intensive enough - to offset the cost of the multitasking overhead (e.g. initial thread scheduling, and result-combining). However, note that in Java Streams, after any intermediate eager operation, the resultant data will be resident in memory (so "too large" is possible).
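For example, here is a minimal data-parallel pipeline. The range source splits recursively across fork-join tasks; each leaf task applies map()/filter() to its split and accumulates a partial list, and parent tasks combine child lists - mirroring the task tree above:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

List<Integer> result = IntStream.range(0, 1_000_000)
        .parallel()
        .map(i -> i * 2)
        .filter(i -> i % 3 == 0)
        .boxed()
        .collect(Collectors.toList());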
Data parallelism is typically contrasted with task parallelism:
Task parallelism in a pipeline:

Pipeline     |            Data
             |              ↓
  map()      |           Task C
             |              ↓
             |  buffer (C pushes, B polls)
             |              ↓
  filter()   |           Task B
             |              ↓
             |  buffer (B pushes, A polls)
             |              ↓
  flatMap()  |           Task A
  collect()  |              ↓
Here, the pipeline consists of several operations, optionally split up by buffers, and ending in a terminal operation (in this case, "collect()"). Tasks are created to produce or poll elements from upstream, apply pipeline operations, and consume or push elements downstream. The last task accumulates elements into a result, according to the terminal operation.
Buffers enable tasks to progress concurrently, but are typically bounded in size to constrain memory usage. If a buffer fills up, the upstream task must wait until it is not full; if a buffer empties, the downstream task must wait until it is not empty. In the case of small/temporary mismatches in upstream and downstream processing rate, a larger buffer spends memory to buy concurrency, enabling tasks to spend less time waiting on each other.
For task parallelism to achieve speedup, the operations should be time-intensive enough to offset the cost of the multitasking overhead (e.g. upstream task context-switching when downstream buffer is full, and downstream task context-switching when upstream buffer is empty). This "time intensity" typically comes from operations being IO-bound (and thus already having context-switching and downtime).
The "declarative" approach to task parallelism depicted here - interacting with a first-class "pipeline" object, with operations that abstract over tasks and buffers - has been modeled by e.g. the various Reactive Streams libraries, and Kotlin Flows. An "imperative" approach - interacting with the lower-level tasks (or actors/threads/processes) and buffers (or mailboxes/queues/channels) more directly - has been modeled by e.g. Akka's actors, Erlang's processes, and Go's goroutines + channels.
We can't really use data parallelism in an arbitrary task-parallel task, because in general we don't have the data in hand in order to split it - the data arrives incrementally over time. We can instead do something related, and route the data to one of several downstream tasks as it arrives - a kind of load-balancing operation. Note that pipeline topology need not be linear - it can diverge (fan-out) and even re-converge (fan-in).
We can use task parallelism in (leaf) data-parallel tasks. However, if the operations are CPU-bound (which Java Streams was traditionally intended for), then this does nothing but add overhead - the waiting on full/empty buffers and context-switching - without any IO downtime to offset it. That said, the new Gatherers.mapConcurrent() introduces a limited mechanism for IO-bound work that actually encapsulates a buffer, fan-out, and fan-in, in one operation.
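For a sense of its shape, a minimal sketch - fetchTitle here is a hypothetical stand-in for a blocking, IO-bound call:

import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Hypothetical stand-in for a blocking, IO-bound call
Function<String, String> fetchTitle = host -> "title of " + host;

// Up to 4 invocations of fetchTitle run concurrently (on virtual threads),
// while encounter order is preserved downstream.
List<String> titles = Stream.of("a.example", "b.example", "c.example")
        .gather(Gatherers.mapConcurrent(4, fetchTitle))
        .toList();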
A Java Stream is created from an initial spliterator, which yields elements of a dataset. In sequential execution, the initial spliterator is advanced into a consumer representing the entire stream pipeline. In parallel execution, the stream is divided into "segments", where each stateful intermediate operation ends a segment. These intermediate operations may be either "eager" or "lazy".
Eager operations are implemented as described by data parallelism above. They consume the most recent spliterator, and (if intermediate) buffer output elements into a new dataset and yield its spliterator. Eager operations cannot emit downstream before consuming the full upstream.
Lazy operations are implemented by wrapping the most recent spliterator in a new spliterator that implements the upstream pipeline - from just after the last segment ended, down to and including the operation itself. Lazy operations can emit downstream before consuming the full upstream.
Whether a given stateful intermediate operation executes eagerly or lazily often depends on characteristics of the stream (see the Stream operations summary and Stream planner).
The terminal operation ends the last segment. Almost all terminal operations are eager - they consume the most recent spliterator, rather than wrap it. The exceptions are the lazy terminal operations spliterator() and iterator() - they wrap (a supplier of) the most recent spliterator.
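For instance:

import java.util.Iterator;
import java.util.stream.Stream;

// iterator() wraps the pipeline rather than consuming it, so even an
// infinite source is fine: elements are pulled through map() on demand.
Iterator<Integer> it = Stream.iterate(0, i -> i + 1)
        .map(i -> i * i)
        .iterator();
int first = it.next();  // 0
int second = it.next(); // 1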
The below table contains a short description of each top-level class in java.util.stream, to get your bearings.
Note: The JDK goes out of its way to provide "primitive" stream types (i.e. IntStream, LongStream, DoubleStream) that meticulously avoid boxing their primitive elements, for maximum performance. This results in more complicated class structures designed to support both reference- and primitive-flavored streams.
// Primitive subclasses of reference variant:
class SomeClass {
// ...shared and reference-specific methods...
static class OfInt extends SomeClass { ... }
static class OfLong extends SomeClass { ... }
static class OfDouble extends SomeClass { ... }
}
// Reference and primitive subclasses of abstract parent:
abstract class SomeClass {
// ...shared methods...
static class OfRef extends SomeClass { ... }
static class OfInt extends SomeClass { ... }
static class OfLong extends SomeClass { ... }
static class OfDouble extends SomeClass { ... }
}
// Sometimes an additional class layer is added above the primitive specializations:
abstract class SomeClass {
// ...shared methods...
static class OfRef extends SomeClass { ... }
abstract static class OfPrimitive extends SomeClass { ... } // <-- See here
static class OfInt extends SomeClass.OfPrimitive { ... }
static class OfLong extends SomeClass.OfPrimitive { ... }
static class OfDouble extends SomeClass.OfPrimitive { ... }
}
// Sometimes classes are nested (as shown above), sometimes not
In this doc I try to focus on the "reference" stream type (i.e. Stream) and its supporting classes. The code in the primitive counterparts will be highly reminiscent, except for using primitive-specific subclasses and primitive-accepting/returning variants of methods to avoid boxing.
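For a quick sense of the difference:

import java.util.stream.IntStream;
import java.util.stream.Stream;

// Reference variant: elements are boxed Integers, unboxed by mapToInt()
int boxedSum = Stream.of(1, 2, 3).mapToInt(Integer::intValue).sum();
// Primitive variant: elements stay primitive ints throughout
int primitiveSum = IntStream.of(1, 2, 3).sum();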
Aside: What is "scrawl"?

Many of my detail notes are presented as a mix of pseudocode and prose, that I will refer to as "scrawl" (to dampen expectations). Scrawl is tailored for brevity, rather than precision. Some scrawl I have put more effort into, some less. This "flashcard"-esque style helps me build a mental map of the code, but requires reading between the lines. To give a sense of some of the obscurities used in scrawl:

I have included the scrawl just in case it is helpful, but it may not be, and you are not obligated to read it. In fact, I can only recommend reading scrawl alongside the actual source code (and a hot drink). Good luck, and have fun! :)
| | Top-level java.util.stream classes | High-level summary |
|---|---|---|
| Model | BaseStream (public) | Interface extended by all streams (reference and primitive), that encompasses only the broadest stroke of what a stream is to Java: Something that wraps an iterator/spliterator, and may be ordered/unordered, sequential/parallel, and closeable. |
| | Stream (public) | The familiar interface that declares available operations on streams. This is the "reference"-flavored variant; I have elided the primitive variants (IntStream, LongStream, DoubleStream) in this document for brevity, since their implementations differ only mildly. |
| | Sink | A Consumer extended with one-shot methods to inform that it is "about to be sent elements" and "done being sent elements", as well as a method to query if it "wants more elements?". Intermediate operations on streams make heavy use of "sink-wrapping" - taking a downstream sink and creating a new sink that adds its own state/handling for incoming elements, before (conditionally) passing elements downstream. |
| | PipelineHelper | An abstract class extended by AbstractPipeline, and declaring some of the core "heavy-hitting" internal methods for advancing elements through the stream. As AbstractPipeline is the only subclass, it's not clear that PipelineHelper is really needed, but it does have fewer type parameters to spell out compared to passing AbstractPipeline around. |
| | AbstractPipeline | Arguably the most important class in the package, extended by all the (reference and primitive) "pipeline" classes that implement the (reference and primitive) "stream" interfaces. AbstractPipeline defines the core methods for advancing elements through the stream, wrapping successive operation sinks, etc. AbstractPipelines are constructed to form a doubly-linked list of "stages" - each intermediate stream operation creates a new AbstractPipeline linked to the previous one, retaining access to stream characteristics and operation-specific behavior at every previous stage. Like many abstract classes in this package with reference- and primitive-flavored subclasses, AbstractPipeline declares and calls into several abstract methods that are implemented in subclasses, allowing for type specialization. |
| | ReferencePipeline | Extends AbstractPipeline to implement Stream, by implementing leftover specializable methods from AbstractPipeline, as well as the familiar stream operation methods (which mostly delegate to utility classes, except for the simpler stateless operations). ReferencePipeline itself is still abstract, and declares several static inner subclasses: Head, for the first pipeline stage (simply wrapping a spliterator), plus StatelessOp and StatefulOp, for intermediate operations. |
| | TerminalOp | Interface that defines how to evaluate a terminal operation, in sequential and parallel modes, given an upstream pipeline and spliterator. Most terminal operations use this interface, but there are a couple special cases that are implemented directly on AbstractPipeline: toArray()/toList() and spliterator()/iterator(). |
| | TerminalSink | A Sink that also extends Supplier. Used by terminal operations, which don't have a downstream sink, but do want to expose a result after the upstream is done sending elements. The TerminalSink interface is only used inside TerminalOps, and only if they have multiple TerminalSink implementations to abstract over. |
| | StreamShape | Enum of the 4 (1 reference and 3 primitive) stream variants. Though passed around widely, its usages (aside from assertions) ultimately boil down to selecting a type-specialized implementation in a handful of places. |
| | StreamOpFlag | Enum of stream characteristics, used to optimize various stream operations. Flags are initially derived from the Head spliterator characteristics, and may be set, cleared, or unaffected by each stream operation. Each pipeline stage tracks its own changes to the stream flags, so that the changes can be combined one-stage-at-a-time when the stream is evaluated. |
| | AbstractSpinedBuffer | Like several other abstract classes, exists to deduplicate common functionality from reference and primitive subclasses. Extended by SpinedBuffer itself (reference variant), and the primitive variants, which are static inner classes inside SpinedBuffer. |
| | SpinedBuffer | Basically an append-only list, backed by an array-of-arrays to avoid copy-on-resize. When a subarray ("chunk") fills up, a new one is allocated (with double the capacity) for subsequent elements. Copying only happens if the outer array ("spine") is filled with full chunks, at which point chunk references are copied to a new spine with double the (chunk) capacity. This data structure is used internally in places where elements must be collected into a buffer whose final size is not known upfront. |
| | Node | Basically a list, backed by a binary tree of Nodes. A "leaf" Node simply wraps an array/collection, while an "internal" Node concatenates two child Nodes. The shape of a node tree generally corresponds to the shape of a parallel computation tree (of fork-join tasks) that produced it: Leaf tasks produce leaf nodes, and parent tasks concatenate child nodes. "The use of Node within the stream framework is largely to avoid copying data unnecessarily during parallel operations." Inner interface Node.Builder extends Sink. |
| | AbstractTask | A ForkJoinTask (a CountedCompleter, specifically) used to implement the parallel-eager mode of most stateful or terminal stream operations. Each task is associated with a spliterator. When invoked, a task splits its spliterator (if possible and size is above a threshold) and passes the two halves to new child tasks, forming a parallel computation tree. If a task does not split, it is a leaf task, and consumes its spliterator in some way (left abstract) to produce a result, then tries to complete its parent. When both children complete, the parent completes, which is usually overridden to combine child results to produce its own result, before trying to complete its parent, and so on up the tree. When a task splits, it typically forks one child task (submitting it to the ForkJoinPool, to be executed when a thread is available) and "becomes" the other (continuing to split the child's spliterator, or performing the leaf action if the child is a leaf task). This avoids blocking the thread executing the task. |
| | AbstractShortCircuitTask | An AbstractTask extended with the ability to short-circuit compare-and-set the root task's result, plus a 'canceled' flag (which is checked at least once, when the task is first invoked), and the ability to cancel all tasks that come later in the root spliterator's encounter order. Used to implement the parallel-eager mode of short-circuiting stateful or terminal stream operations. |
| | Collector (public) | The (hopefully) familiar interface for expressing a custom, parallelizable, terminal reduction operation. A parallel computation tree would perform the operation by having each leaf task initialize an accumulation (via supplier()) and mutate it with incoming elements (via accumulator()), then have each parent task combine final child accumulations (via combiner()), and the root task transform the final combined accumulation (via finisher()). In sequential operation, the root task is a leaf task, and no combining is needed. Collectors can indicate by their Characteristics if they are "concurrent" (can use same accumulation in all tasks), "unordered" (can accumulate elements in any order), and/or use a no-op finisher(). This enables optimizations in some cases. |
| | Gatherer (public) | The interface for expressing a custom, optionally parallelizable, intermediate operation. A parallel computation tree would perform the operation by having each leaf task initialize some state (via initializer()) and optionally mutate it with incoming elements and/or emit downstream (via integrator()), then have each parent task combine final child state (via combiner()), and the root task use the final combined state to optionally emit more downstream. In sequential operation (a sequential Stream or a sequential Gatherer), no combining is needed. Gatherers can indicate by their arguments whether they are "stateless" (use a no-op initializer()), "greedy" (integrator never initiates cancellation - only propagates downstream cancellation), "sequential" (use a no-op combiner()), and/or use a no-op finisher(). This enables (or disables) optimizations in some cases. Two gatherers can be fused into one using Gatherer.andThen(). The resulting gatherer is stateless if both initial gatherers were stateless and greedy; greedy if both were greedy; sequential if either was sequential; and uses a no-op finisher() if both did. |
| | GathererOp | Implementation of gather() (not nested in a utility class, unlike other stateful ops). GathererOp extends ReferencePipeline directly, rather than StatelessOp or StatefulOp. This is presumably because gather() could theoretically be either a stateless or stateful intermediate operation, depending on the Gatherer. Currently it is conservatively always considered stateful. |
| Utility | StreamSupport (public) | Creates streams from spliterators or suppliers-of-spliterators, by instantiating the appropriate Head class. This is THE entry point for creating an initial stream - called by Stream.concat(), generate(), builder().build(), all Collection.stream() impls, Arrays.stream()... |
| | StreamSpliterators | Spliterators for Stream.spliterator(), skip()/limit(), distinct(), generate() |
| | Streams | Spliterators for Stream.concat(), range(), implementation for builder(), plus composeWithExceptions() (used by onClose()) and composedClose() (used by concat()) |
| | Nodes | A utility that implements "leaf" Nodes for wrapping arrays and collections, Node.Builders (that are themselves both Sinks and "leaf" Nodes - build() returns this) to accumulate a known or unknown amount of elements (using an array or SpinedBuffer, respectively), the "internal"/concat Node, and a few additional Node-related helper methods and tasks |
| | Collectors (public) | A (hopefully) familiar utility that defines a variety of common collectors, some of which accept a downstream collector and act as "wrapping" or "multilevel" collectors |
| | Gatherers (public) | A utility that defines a variety of (expected-to-be) common gatherers |
| | DistinctOps | Implementation of distinct() |
| | SliceOps | Implementation of skip(), limit() |
| | SortedOps | Implementation of sorted() |
| | WhileOps | Implementation of takeWhile(), dropWhile() |
| | FindOps | Implementation of findFirst(), findAny() |
| | ForEachOps | Implementation of forEach(), forEachOrdered() |
| | MatchOps | Implementation of anyMatch(), allMatch(), noneMatch() |
| | ReduceOps | Implementation of reduce(), collect(), count() |
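To make the Gatherer contract above concrete, here is a small sketch of a custom gatherer (names illustrative) - a sequential, stateful, greedy gatherer that emits a running sum:

import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Gatherer.ofSequential() supplies a no-op combiner (marking it sequential);
// Integrator.ofGreedy() marks the integrator as never initiating cancellation.
Gatherer<Integer, int[], Integer> runningSum = Gatherer.ofSequential(
        () -> new int[1],                         // initializer: per-task state
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            state[0] += element;                  // mutate state with each element
            return downstream.push(state[0]);     // emit; propagate downstream cancellation
        }));

List<Integer> sums = Stream.of(1, 2, 3).gather(runningSum).toList(); // [1, 3, 6]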
This is an outline of how a stream executes, from creation to consumption.
This list details where each stream op flag is read to optimize the stream.
Related usages:
The below table details how each stream operation affects stream op flags and memory usage in isolation.
Note: The "ΔFlags" below (mostly corresponding to AbstractPipeline.sourceOrOpFlags) represent the changes that one operation applies to the "cumulative" stream flags (mostly corresponding to AbstractPipeline.combinedFlags). For intermediate operations, these changes are applied AFTER the operation, to affect downstream operations. For terminal operations, these changes are applied BEFORE the operation, to affect the terminal operation itself (if it even bothers to use this mechanism).
Note: The "Memory Usage" columns below sometimes refer to what I call "amplifying" operations - operations that may increase the number of elements in the stream. These include flatMap(), mapMulti(), and gather(). Conversely, "de-amplifying" operations may decrease the number of elements in the stream. These include filter(), skip(), limit(), takeWhile(), and dropWhile() - as well as flatMap(), mapMulti(), and gather() again (since they may emit no output elements for an input element). I conservatively ignore the possible effect of de-amplifying operations when considering memory usage.
Intermediate Operations (ΔFlags combined AFTER operation)

| | Operation | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT | SIZE_ADJUSTING | Memory Usage (sequential) | Memory Usage (parallel) |
|---|---|---|---|---|---|---|---|---|---|
| Stateless | map() | CLEARS | CLEARS | | | | | Okay | Okay |
| | flatMap() | CLEARS | CLEARS | | CLEARS | | | Okay | Okay |
| | mapMulti() | CLEARS | CLEARS | | CLEARS | | | Okay | Okay |
| | filter() | | | | CLEARS | | | Okay | Okay |
| | peek() | | | | | | | Okay | Okay |
| | unordered() | | | CLEARS | | | | Okay | Okay |
| Stateful | skip() | | | | | | SETS | Okay | Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED), (b) Buffering risk if upstream contained amplifying operations |
| | limit() | | | | | SETS | SETS | Okay | Caution: (a) Eager if ORDERED and !(SIZED and source-spliterator is SUBSIZED), (b) Buffering risk if upstream contained amplifying operations |
| | takeWhile() | | | | CLEARS | SETS | | Okay | Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream contained amplifying operations |
| | dropWhile() | | | | CLEARS | | | Okay | Caution: (a) Eager if ORDERED, (b) Buffering risk if lazy AND upstream contained amplifying operations |
| | gather() | CLEARS | CLEARS | | CLEARS | SETS if gatherer is !greedy | | Caution: Memory usage depends on how the gatherer's state grows | Caution: (a) Eager unless followed by collect(), (b) Memory usage depends on how the gatherer's state grows, (c) Buffers elements from upstream until predecessor tasks finish if gatherer is sequential and greedy, (d) Buffering risk if gatherer is !greedy AND upstream contained amplifying operations |
| | distinct() | SETS | | | CLEARS | | | Caution: Accumulates into a Set if !DISTINCT and !SORTED | Caution: (a) Eager if !DISTINCT and ORDERED, (b) Accumulates into a Set if !DISTINCT and !ORDERED, (c) Buffering risk if lazy AND upstream contained amplifying operations AND downstream consumes elements one-at-a-time |
| | sorted() | | SETS if no comparator else CLEARS | SETS | | | | Caution: Accumulates elements into an array/List if !(SORTED and no comparator) | Warning: Always eager; Accumulates elements into an array/List |

Terminal Operations (ΔFlags combined BEFORE operation)

| Operation | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT | SIZE_ADJUSTING | Memory Usage (sequential) | Memory Usage (parallel) |
|---|---|---|---|---|---|---|---|---|
| min() | | | | | | | Okay | Okay |
| max() | | | | | | | Okay | Okay |
| findFirst() | | | | | SETS | | Okay | Okay |
| findAny() | | | CLEARS | | SETS | | Okay | Okay |
| anyMatch() | | | CLEARS | | SETS | | Okay | Okay |
| allMatch() | | | CLEARS | | SETS | | Okay | Okay |
| noneMatch() | | | CLEARS | | SETS | | Okay | Okay |
| count() | | | CLEARS | | | | Okay | Okay |
| forEach() | | | CLEARS | | | | Okay | Okay |
| forEachOrdered() | | | | | | | Okay | Warning: Buffers elements from upstream until predecessor tasks finish |
| toArray() | | | | | | | Warning: Accumulates elements into an array | Warning: Accumulates elements into an array |
| toList() | | | | | | | Warning: Accumulates elements into an array | Warning: Accumulates elements into an array |
| reduce() | | | | | | | Caution: Memory usage depends on how the reduction's accumulation grows | Caution: Memory usage depends on how the reduction's accumulation grows |
| collect() | | | CLEARS if collector is UNORDERED | | | | Caution: Memory usage depends on how the collector's accumulation grows | Caution: Memory usage depends on how the collector's accumulation grows |
| spliterator() | | | | | | | Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations | Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations |
| iterator() | | | | | | | Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations | Caution: (a) Upstream memory usage will be realized upon consumption, (b) Buffering risk if upstream contained amplifying operations |
Caution: Splitting a naive spliterator may cause buffering. In parallel streams, the first eager operation (stateful or terminal) can evoke heavy memory usage if the initial spliterator's trySplit() relies on buffering many/large elements. For instance, Spliterators.AbstractSpliterator and Spliterators.IteratorSpliterator (often used when converting an iterator to a spliterator) both split by instantiating an array and advancing elements into it, then returning a spliterator over the array. Each time the original spliterator splits, the capacity of the array it instantiates and fills is increased (up to a threshold). If the original spliterator covers sufficiently many/large elements, it can eventually run out of memory attempting to split.
Caution: Amplifying operations that feed into a pipeline-wrapping spliterator may cause buffering. Lazy operations necessarily wrap the upstream pipeline in a spliterator. When we initially call tryAdvance() on this upstream-wrapping spliterator (StreamSpliterators.WrappingSpliterator), it calls tryAdvance() on the incoming pipeline spliterator, passing a consumer that applies the upstream pipeline operations and ultimately pushes output elements into a buffer. This buffer exists because the pipeline may contain amplifying operations that can produce multiple output elements per input element from the incoming spliterator. The next time tryAdvance() is called on the upstream-wrapping spliterator, it will poll from the buffer if it is not empty. So, when we have an upstream-wrapping spliterator over large amplifying operations, and we consume it with tryAdvance(), we can actually use a lot of memory.
// The simplest example to demonstrate an OutOfMemoryError due to buffering is to use the spliterator()
// lazy terminal op to wrap an infinitely-amplifying upstream, and then tryAdvance() the spliterator:
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).spliterator().tryAdvance(_ -> {}); // Throws OOME
// But since parallel-lazy stateful ops also wrap the upstream in a spliterator,
// we can evoke this OOME a few other ways:
// | "large" amplifying operation | lazy operation that | operation that causes
// | (infinite in this case) | wraps upstream in a | tryAdvance() to be called
// v v spliterator v on the wrapping spliterator
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10) .findFirst(); // All
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10) .findFirst(); // of
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).findFirst(); // these
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).findFirst(); // throw
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct() .findFirst(); // OOME
// Above we used the short-circuiting findFirst() operation to ensure we call tryAdvance() instead
// of forEachRemaining() on the incoming spliterator. However, even non-short-circuiting operations
// like forEach() will still cause tryAdvance() to be called on the upstream-wrapping spliterator in
// most of these cases, since _that_ spliterator is wrapped in the lazy stateful op's spliterator,
// and most of those have a forEachRemaining() that calls tryAdvance() on the wrapped spliterator.
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10) .forEach(_ -> {}); // These
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10) .forEach(_ -> {}); // still
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).takeWhile(i -> i < 10).forEach(_ -> {}); // throw
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).dropWhile(i -> i < 10).forEach(_ -> {}); // OOME
// Except this one! StreamSpliterators.DistinctSpliterator.forEachRemaining()
// calls forEachRemaining() on the wrapped spliterator:
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).distinct() .forEach(_ -> {}); // No OOME; Runs forever
// There are also a few parallel-eager stateful ops that wrap the upstream in a spliterator
// before consuming it with tryAdvance().
// Here, we make unordered skip()/limit() parallel-eager in a way that still wraps the upstream
// spliterator, by following them with toArray()/toList().
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).skip(10).toArray(); // OOME (Okay, silly - toArray() was going to buffer everything anyway)
Stream.of(0).parallel().unordered().flatMap(_ -> Stream.generate(() -> 0)).limit(10).toArray(); // OOME (Less silly)
// Here, all we have to do is use a non-greedy (aka short-circuiting) gatherer.
// Regardless of the subsequent operations, the parallel gather() wraps the upstream in a spliterator,
// and the non-greedy gatherer forces gather() to consume that spliterator via tryAdvance().
Stream.of(0).parallel().flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).forEach(_ -> {}); // OOME
// We don't even have to be parallel if the gather() is followed by collect(),
// which again causes the implementation to wrap the upstream in a spliterator.
Stream.of(0).flatMap(_ -> Stream.generate(() -> 0)).gather(nonGreedyGatherer).collect(anyCollector); // OOME
// Note that currently, all existing gatherers provided by java.util.stream.Gatherers are greedy,
// but you can craft your own gatherers that are not.
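For reference, here is a hedged sketch of what the nonGreedyGatherer placeholder above could be - an identity gatherer whose integrator is built with Integrator.of() rather than Integrator.ofGreedy(), so the framework must assume it may short-circuit (and anyCollector could be e.g. Collectors.toList()):

import java.util.stream.Gatherer;

// Identity gatherer; non-greedy because the integrator is built with
// Integrator.of() instead of Integrator.ofGreedy().
Gatherer<Integer, Void, Integer> nonGreedyGatherer = Gatherer.of(
        Gatherer.Integrator.of((state, element, downstream) ->
                downstream.push(element)));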
Note: Though parallel forEachOrdered() and parallel gather() (with a sequential + greedy Gatherer) both buffer the upstream in leaf tasks, the actual amount buffered depends on the leaf spliterator size, the parallelism of the ForkJoinPool running the stream, and whether the upstream uses amplifying/de-amplifying operations. The buffering in these cases does not necessarily hold the entire upstream in memory at once.
This table simulates what will happen to stream op flags and memory usage across stream operations.
Try adding/changing stream operations and initial spliterator characteristics.
[Interactive planner table - columns: Stages | Spliterator Characteristics | ΔFlags | Flags | Memory Usage | Notes - beginning from a StreamSupport.stream(spliterator, …) stage]
(Dashed line) Indicates that a (parallel) stateful op evaluates lazily, by wrapping the incoming spliterator in a new spliterator that implements the pipeline since the last barrier, and becomes the new "incoming" spliterator downstream. A lazy barrier is also drawn for the terminal operations spliterator()/iterator(), since they likewise wrap the incoming spliterator.

(Solid line) Indicates that a (parallel) stateful op evaluates eagerly, by accumulating the upstream pipeline into a Node, whose spliterator becomes the new "incoming" spliterator downstream. An eager barrier is also drawn for (sequential) sorted(), since it must accumulate all elements from the upstream pipeline before emitting any downstream. (It is possible for (sequential) gather() to behave in this way too, but for now no barrier is drawn.) If a stateful op is fused to a toArray()/toList() terminal op, no barrier is drawn - the stateful op evaluates in parallel-eager mode to produce a Node, but that Node is converted to an array directly, rather than taking its spliterator (and toArray()/toList() are already eager). If gather() is fused to collect(), no barrier is drawn - the gather() no longer evaluates in parallel-eager mode to produce a Node, and instead emits directly into the collector's accumulator.

Spliterator Characteristics, ΔFlags, and Flags are all "outgoing" from the stage they are adjacent to. (The exception to this is eager terminal ops, which do not have an outgoing spliterator, and apply flag deltas BEFORE evaluation.) To help visualize stateful op barriers, they are drawn going under the stage, and above the rest of the row.
† When a stateful op is evaluated in parallel-eager mode, the implementation of the resultant Node (and its spliterator) can depend on details that this tool doesn't or can't model, such as the size of the input spliterator, whether/how that spliterator splits, and whether nodes are concatenated, truncated, etc. In these cases, I depict the intersection of all possible sets of spliterator characteristics as "known", and the remaining possible characteristics as "unknown (assumed absent)". In practice, this is mostly benign, but can occasionally lead to some mispredictions when the actual spliterator would have been SUBSIZED.
‡ Deltas may be depicted as "changed" for one of several reasons:
Legend

Spliterator Characteristics: DI = DISTINCT, SO = SORTED, OR = ORDERED, SI = SIZED, NO = NONNULL, IM = IMMUTABLE, CO = CONCURRENT, SU = SUBSIZED. Cell values: Y = characteristic is known present; (blank) = characteristic is known absent; ? = characteristic is unknown† (assumed absent).

ΔFlags: DI = DISTINCT, SO = SORTED, OR = ORDERED, SI = SIZED, SC = SHORT_CIRCUIT, SA = SIZE_ADJUSTING. Cell values: S = delta sets flag; C = delta clears flag; C/S = delta changes‡ (e.g. from "clears" to "sets"); (blank) = no delta.

Flags: DI = DISTINCT, SO = SORTED, OR = ORDERED, SI = SIZED, SC = SHORT_CIRCUIT, SA = SIZE_ADJUSTING. Cell values: Y = flag is known present; (blank) = flag is known absent.
This is a list of some interesting tidbits that didn't make it elsewhere.