Does Stream.forEach respect the encounter order of sequential streams?
Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does. This gap is crucial, as it allows the implementation flexibility to evolve. (Specification is declarative; implementation is imperative.) Overspecification is just as bad as underspecification.
When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. Your claimed implication that encounter order is never preserved is simply a wrong conclusion. (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)
Similarly, your implication of "that suggests forEach is intended to preserve order for sequential streams" because you saw an implementation that does so in some case is equally incorrect.
In both cases, it seems like you're just uncomfortable with the fact that the specification gives forEach a great deal of freedom. Specifically, it has the freedom to not preserve encounter order for sequential streams, even though that's what the implementation currently does, and further that it's kind of hard to imagine an implementation going out of its way to process sequential sources out of order. But that's what the spec says, and that's what it was intended to say.
That said, the wording of the comment about parallel streams is potentially confusing, because it is still possible to misinterpret it. The intent of calling out the parallel case explicitly here was pedagogical; the spec is still perfectly clear with that sentence removed entirely. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation. But, as you point out, the desire to treat the sequential case specially is still so powerful that it would be beneficial to clarify further.
Java, in which thread are sequential streams executed?
This all boils down to what is guaranteed based on the specification, and the fact that a current implementation may have additional behaviors beyond what is guaranteed.
Java Language Architect Brian Goetz made a relevant point regarding specifications in a related question:
Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.
[...]
When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)
This all means that even if the current implementation happens to have certain behavioral characteristics, they should not be relied upon, nor should they be assumed to stay the same in new versions of the library.
Sequential stream pipeline thread
In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?
Current stream implementations may or may not use the calling thread, and may use one or multiple threads. As none of this is specified by the API, this behavior should not be relied on.
forEach execution thread
In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?
While current implementations use the existing thread, this cannot be relied on, as the documentation states that the choice of thread is up to the implementation. In fact, there is no guarantee that different elements won't be processed by different threads, though that is not something the current stream implementation does either.
Per the API:
For any given element, the action may be performed at whatever time and in whatever thread the library chooses.
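One way to see the difference between observed and specified behavior is to record which threads actually run the action. This is a minimal sketch; the class ThreadObservation and the method threadsUsed are hypothetical names. On current OpenJDK builds it typically reports only the calling thread, but per the quoted Javadoc that is an observation, not a guarantee, which is why the names are gathered into a concurrent set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ThreadObservation {

    // Records the name of every thread that executes the forEach action.
    // A concurrent set is used because the spec allows any thread(s).
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements) // sequential by default
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Typically just the calling thread today; implementation behavior,
        // not a specified guarantee.
        System.out.println(threadsUsed(1_000));
    }
}
```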
Note that while the API calls out parallel streams specifically when discussing encounter order, Brian Goetz has explained that this was done to clarify the motivation for the behavior, not because any of the behavior is specific to parallel streams:
The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.
Synchronization using sequential streams
Do I have to use any synchronization when using sequential streams?
Current implementations will likely work since they use a single thread for the sequential stream's forEach method. However, as it is not guaranteed by the stream specification, it should not be relied on. Therefore, synchronization should be used as though the methods could be called by multiple threads.
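If a side-effecting forEach is kept anyway, one option is to write into a thread-safe container, so correctness does not hinge on the single-thread behavior of the current implementation. A minimal sketch; the class SafeSideEffects, the method longWords, and the choice of ConcurrentLinkedQueue are illustrative assumptions, and forEach still does not guarantee the order in which elements land in the queue.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

public class SafeSideEffects {

    // Accumulates matching words into a thread-safe queue, so the code stays
    // correct even if forEach invokes the action from several threads.
    static Queue<String> longWords(Stream<String> words, int minLength) {
        Queue<String> sink = new ConcurrentLinkedQueue<>();
        words.filter(w -> w.length() >= minLength)
             .forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "stream", "of", "words");
        System.out.println(longWords(input.stream(), 3));
    }
}
```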
That said, the stream documentation specifically recommends against using side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators:
Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.
As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
List<String> results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
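The Javadoc snippets above assume a pre-existing stream and pattern. A self-contained version of the side-effect-free variant might look like this; the RegexFilter class, the pattern, and the sample inputs are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RegexFilter {

    // Side-effect-free version of the Javadoc example: the collector, not a
    // shared ArrayList, builds the result, so it is also safe in parallel.
    static List<String> matching(Stream<String> stream, Pattern pattern) {
        return stream.filter(s -> pattern.matcher(s).matches())
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern digits = Pattern.compile("\\d+");
        List<String> input = List.of("42", "abc", "7", "x1");
        System.out.println(matching(input.stream(), digits));         // [42, 7]
        // toList() respects encounter order, so the parallel result matches.
        System.out.println(matching(input.parallelStream(), digits)); // [42, 7]
    }
}
```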
Does Stream.sorted().forEach() work as intended?
It's not guaranteed that the forEach terminal operation will process elements in encounter order, hence the Javadoc's "is explicitly nondeterministic". That said, under the current implementation it should process the elements of a sequential stream in the encounter order of the stream.
forEachOrdered is primarily for cases where you're using a parallel stream and want to respect the encounter order of the stream, if the stream has a defined encounter order.
Under the current implementation, using forEach or forEachOrdered on a sequential stream has the same effect, so it's a matter of preference.
As mentioned above, under the current implementation the forEach terminal operation should process the elements of a sequential stream in the encounter order of the stream, but since this is not stated in the Javadoc, it's safer to use forEachOrdered if you really care about the iteration order.
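Applied to the sorted() question, a minimal sketch (the class SortedThenOrdered and the method sortedVisitOrder are hypothetical names) records the visit order with forEachOrdered. Because forEachOrdered performs the action for one element before the next, a plain ArrayList is safe here even if the stream is parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SortedThenOrdered {

    // sorted() establishes an encounter order; forEachOrdered() is specified
    // to honor it, so the visit order is guaranteed rather than incidental.
    static List<String> sortedVisitOrder(Stream<String> stream) {
        List<String> visited = new ArrayList<>();
        stream.sorted().forEachOrdered(visited::add);
        return visited;
    }

    public static void main(String[] args) {
        // prints [apple, banana, cherry]
        System.out.println(sortedVisitOrder(Stream.of("cherry", "apple", "banana")));
    }
}
```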
forEach vs forEachOrdered in Java 8 Stream
Stream.of("AAA","BBB","CCC").parallel().forEach(s->System.out.println("Output:"+s));
Stream.of("AAA","BBB","CCC").parallel().forEachOrdered(s->System.out.println("Output:"+s));
The second line will always output
Output:AAA
Output:BBB
Output:CCC
whereas the first one is not guaranteed to, since the order is not kept. forEachOrdered will process the elements of the stream in the order specified by its source, regardless of whether the stream is sequential or parallel.
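The two lines above print to the console; collecting into lists makes the difference testable. A minimal sketch with hypothetical names: the forEach variant needs a synchronized list because its actions may run concurrently, and only its contents (not its order) are predictable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class ForEachVsOrdered {

    // forEach on a parallel stream: same elements, order not guaranteed.
    // A synchronized list is needed because actions may run concurrently.
    static List<String> viaForEach(Stream<String> stream) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        stream.parallel().forEach(out::add);
        return out;
    }

    // forEachOrdered: encounter order is respected even in parallel.
    static List<String> viaForEachOrdered(Stream<String> stream) {
        List<String> out = new ArrayList<>();
        stream.parallel().forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(viaForEach(Stream.of("AAA", "BBB", "CCC")));        // some permutation
        System.out.println(viaForEachOrdered(Stream.of("AAA", "BBB", "CCC"))); // [AAA, BBB, CCC]
    }
}
```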
Quoting from the forEach Javadoc:
The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.
Whereas the forEachOrdered Javadoc states:
Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.
What is the reason forEach in Java Streams API is unordered?
Defining a method forEach that would preserve order, and unordered that would break it, would complicate things IMO, simply because unordered does nothing more than set a flag in the Stream API internals, and that flag would have to be checked or enforced based on various conditions.
So let's say you would do:
someStream()
.unordered()
.forEach(System.out::println)
In this case, your proposal is to print the elements in no particular order, thus enforcing unordered here. But what if we did:
someSet().stream()
.unordered()
.forEach(System.out::println)
In this case, would you want unordered to be enforced? After all, the source of the stream is a Set, which typically has no order, so enforcing unordered here is just useless; but this means additional checks on the source of the stream internally. This can get quite tricky and complicated (as it already is, by the way).
To keep it simple, two methods were defined that clearly stipulate what they do; this is on par with, for example, findFirst vs findAny, or even Optional::isPresent and Optional::isEmpty (the latter added in Java 11).
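The findFirst vs findAny analogy can be made concrete. In this minimal sketch (class and method names are hypothetical), findFirst must return the first match in encounter order even on a parallel stream, while findAny may return any match.

```java
import java.util.List;
import java.util.Optional;

public class FirstVsAny {

    // findFirst respects encounter order, even on a parallel stream.
    static Optional<Integer> first(List<Integer> values) {
        return values.parallelStream().filter(v -> v % 2 == 0).findFirst();
    }

    // findAny may return any matching element; it is free to ignore order.
    static Optional<Integer> any(List<Integer> values) {
        return values.parallelStream().filter(v -> v % 2 == 0).findAny();
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(first(values)); // Optional[2]
        System.out.println(any(values));   // some even element
    }
}
```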
How to ensure order of processing in java8 streams?
You are asking the wrong question. You are asking about sequential vs. parallel, whereas you want to process items in order, so you have to ask about ordering. If you have an ordered stream and perform operations which guarantee to maintain the order, it doesn’t matter whether the stream is processed in parallel or sequentially; the implementation will maintain the order.
The ordered property is distinct from parallel vs. sequential. E.g. if you call stream() on a HashSet, the stream will be unordered, while calling stream() on a List returns an ordered stream. Note that you can call unordered() to release the ordering contract and potentially increase performance. Once the stream has no ordering, there is no way to reestablish the ordering. (The only way to turn an unordered stream into an ordered one is to call sorted; however, the resulting order is not necessarily the original order.)
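Whether a source declares an encounter order can be checked through its Spliterator's ORDERED characteristic. This is a minimal sketch with a hypothetical OrderedCharacteristic class; it assumes the stream has no intermediate operations, so its spliterator reflects the source. It shows that a List's stream is ordered whether sequential or parallel, while a HashSet's stream is not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

public class OrderedCharacteristic {

    // Reports whether the stream's spliterator declares an encounter order.
    // Consumes the stream, so call it on a fresh stream each time.
    static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>(List.of("a", "b", "c"));
        HashSet<String> set = new HashSet<>(list);
        System.out.println(isOrdered(list.stream()));         // true
        System.out.println(isOrdered(list.parallelStream())); // true: parallel does not remove ordering
        System.out.println(isOrdered(set.stream()));          // false
    }
}
```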
See also the “Ordering” section of the java.util.stream package documentation.
In order to ensure maintenance of ordering throughout an entire stream operation, you have to study the documentation of the stream’s source, all intermediate operations and the terminal operation for whether they maintain the order or not (or whether the source has an ordering in the first place).
This can be very subtle, e.g. Stream.iterate(T,UnaryOperator) creates an ordered stream while Stream.generate(Supplier) creates an unordered stream. Note that you also made a common mistake in your question, as forEach does not maintain the ordering. You have to use forEachOrdered if you want to process the stream’s elements in a guaranteed order.
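For example, because Stream.iterate produces an ordered stream, forEachOrdered visits its elements in generation order even when the pipeline is made parallel. A minimal sketch; the class IterateOrdered and the powers-of-two sequence are illustrative choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class IterateOrdered {

    // Stream.iterate produces an ordered stream, so forEachOrdered visits
    // elements in generation order even when the pipeline runs in parallel.
    static List<Integer> powersOfTwo(int count) {
        List<Integer> out = new ArrayList<>();
        Stream.iterate(1, n -> n * 2)
              .limit(count)
              .parallel()
              .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```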
So if your list in your question is indeed a java.util.List, its stream() method will return an ordered stream and filter will not change the ordering. So if you call list.stream().filter().forEachOrdered(), all elements will be processed sequentially in order, whereas for list.parallelStream().filter().forEachOrdered() the elements might be processed in parallel (e.g. by the filter) but the terminal action will still be called in order (which obviously will reduce the benefit of parallel execution).
If you, for example, use an operation like
List<…> result=inputList.parallelStream().map(…).filter(…).collect(Collectors.toList());
the entire operation might benefit from parallel execution but the resulting list will always be in the right order, regardless of whether you use a parallel or sequential stream.
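A runnable version of that pipeline needs concrete map and filter steps; the ones below (squaring, then keeping odd values) are hypothetical stand-ins for the elided ones, as is the ParallelCollectOrder class. The sketch shows the sequential and parallel runs producing the same ordered list.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelCollectOrder {

    // Hypothetical map/filter steps standing in for the elided ones.
    static List<Integer> squaresOfOdds(List<Integer> inputList, boolean parallel) {
        var stream = parallel ? inputList.parallelStream() : inputList.stream();
        return stream.map(x -> x * x)
                     .filter(x -> x % 2 == 1)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        // Both print [1, 9, 25, 49, 81]: toList() respects encounter order.
        System.out.println(squaresOfOdds(input, false));
        System.out.println(squaresOfOdds(input, true));
    }
}
```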
Pros and Cons of Using forEach and Stream
There are no advantages to using the second case, unless you have a parallel stream. There is a disadvantage, namely that Stream.forEach() isn't guaranteed to respect encounter order. A more accurate (but still unnecessary) equivalent would be Stream.forEachOrdered().
Does Stream.forEach() always work in parallel?
No, forEach() doesn't parallelize if the stream isn't parallel. I think he simplified the example for the sake of discussion.
As evidence, this code is inside the AbstractPipeline class's evaluate method (which is called from forEach):
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
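The isParallel() check in that snippet is public API: BaseStream.isParallel() reports whether a terminal operation, if executed now, would run in parallel. A minimal sketch with a hypothetical ParallelCheck class shows that a plain Stream.of pipeline is sequential unless parallel() is called.

```java
import java.util.stream.Stream;

public class ParallelCheck {

    // BaseStream.isParallel() reports whether a terminal operation executed
    // now (forEach included) would run in parallel.
    static String mode(Stream<?> stream) {
        return stream.isParallel() ? "parallel" : "sequential";
    }

    public static void main(String[] args) {
        System.out.println(mode(Stream.of("a", "b")));                         // sequential
        System.out.println(mode(Stream.of("a", "b").parallel()));              // parallel
        System.out.println(mode(Stream.of("a", "b").parallel().sequential())); // sequential
    }
}
```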