Does Stream.forEach Respect the Encounter Order of Sequential Streams

Does Stream.forEach respect the encounter order of sequential streams?

Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does. This gap is crucial, as it allows the implementation flexibility to evolve. (Specification is declarative; implementation is imperative.) Overspecification is just as bad as underspecification.

When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. Your claimed implication that encounter order is never preserved is simply a wrong conclusion. (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)

Similarly, your implication of "that suggests forEach is intended to preserve order for sequential streams" because you saw an implementation that does so in some case is equally incorrect.

In both cases, it seems like you're just uncomfortable with the fact that the specification gives forEach a great deal of freedom. Specifically, it has the freedom to not preserve encounter order for sequential streams, even though that's what the implementation currently does, and further that it's kind of hard to imagine an implementation going out of its way to process sequential sources out of order. But that's what the spec says, and that's what it was intended to say.

That said, the wording of the comment about parallel streams is potentially confusing, because it is still possible to misinterpret it. The intent of calling out the parallel case explicitly here was pedagogical; the spec is still perfectly clear with that sentence removed entirely. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation. But, as you point out, the desire to treat the sequential case specially is still so powerful that it would be beneficial to clarify further.

Java, in which thread are sequential streams executed?

This all boils down to what is guaranteed based on the specification, and the fact that a current implementation may have additional behaviors beyond what is guaranteed.

Java Language Architect Brian Goetz made a relevant point regarding specifications in a related question:

Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.

[...]

When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)

This all means that even if the current implementation happens to have certain behavioral characteristics, they should not be relied upon, nor should it be assumed that they will remain unchanged in future versions of the library.

Sequential stream pipeline thread

In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?

Current stream implementations may or may not use the calling thread, and may use one or multiple threads. As none of this is specified by the API, this behavior should not be relied on.

forEach execution thread

In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?

While current implementations use the existing thread, this cannot be relied on, as the documentation states that the choice of thread is up to the implementation. In fact, there are no guarantees that the elements aren't processed by different threads for different elements, though that is not something the current stream implementation does either.

Per the API:

For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

Note that while the API calls out parallel streams specifically when discussing encounter order, Brian Goetz explained that this was done to clarify the motivation for the behavior, not because the behavior is specific to parallel streams:

The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.
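To observe which threads actually run the action on a given JDK, a small probe like the following can help. It is only a sketch: the class and method names are made up for illustration, and whatever it prints describes the implementation it runs on, not anything the specification promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

class ForEachThreadProbe {
    // Returns the names of all threads that executed the forEach action.
    // A concurrent set is used because the spec allows any thread to run it.
    static Set<String> threadsUsed(Stream<String> stream) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        stream.forEach(s -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("caller:     " + Thread.currentThread().getName());
        System.out.println("sequential: " + threadsUsed(Stream.of("a", "b", "c")));
        System.out.println("parallel:   " + threadsUsed(Stream.of("a", "b", "c").parallel()));
    }
}
```

No expected output is shown on purpose, since the result may differ between library versions.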

Synchronization using sequential streams

Do I have to use any synchronization when using sequential streams?

Current implementations will likely work since they use a single thread for the sequential stream's forEach method. However, as it is not guaranteed by the stream specification, it should not be relied on. Therefore, synchronization should be used as though the methods could be called by multiple threads.

That said, the stream documentation specifically recommends against using side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators:

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

List<String> results =
    stream.filter(s -> pattern.matcher(s).matches())
          .collect(Collectors.toList());  // No side-effects!
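For readers who want to run the side-effect-free form, here is a self-contained sketch. The class name, the sample input, and the pattern `a.*` are assumptions chosen for illustration; only filter, collect, and Collectors.toList() come from the quoted documentation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class RegexCollectDemo {
    // Collects the strings that fully match the pattern, without mutating shared state.
    static List<String> matching(List<String> input, Pattern pattern) {
        return input.stream()
                    .filter(s -> pattern.matcher(s).matches())
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "banana", "avocado");
        System.out.println(matching(words, Pattern.compile("a.*"))); // [apple, avocado]
    }
}
```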

Does Stream.sorted().forEach() work as intended?

It's not guaranteed that the forEach terminal operation will process elements in encounter order, hence the Javadoc's wording that it "is explicitly nondeterministic". Under the current implementation, however, it should process the elements of a sequential stream in the encounter order of the stream.

forEachOrdered is primarily for cases where you're using a parallel stream and want to respect the encounter order of the stream if the stream has a defined encounter order.

Under the current implementation, using forEach or forEachOrdered on a sequential stream has the same effect, so in practice it's a matter of preference.

As mentioned above, under the current implementation the forEach terminal operation should process the elements of a sequential stream in encounter order, but since the Javadoc does not state this, it's better to err on the side of caution and use forEachOrdered if you really care about the iteration order.
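A minimal sketch of the cautious variant, with a made-up class name and sample data: sorted() establishes the encounter order, and forEachOrdered is the terminal operation documented to respect it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SortedForEachOrderedDemo {
    // Sorts the input and records the order in which the action sees the elements.
    static List<Integer> sortedInOrder(List<Integer> input) {
        List<Integer> seen = new ArrayList<>();
        input.stream()
             .sorted()
             .forEachOrdered(seen::add); // documented to follow encounter order
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(sortedInOrder(Arrays.asList(3, 1, 2))); // [1, 2, 3]
    }
}
```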

forEach vs forEachOrdered in Java 8 Stream

Stream.of("AAA","BBB","CCC").parallel().forEach(s->System.out.println("Output:"+s));
Stream.of("AAA","BBB","CCC").parallel().forEachOrdered(s->System.out.println("Output:"+s));

The second line will always output

Output:AAA
Output:BBB
Output:CCC

whereas the first one is not guaranteed to, since the order is not kept. forEachOrdered processes the elements of the stream in the order specified by its source, regardless of whether the stream is sequential or parallel.

Quoting from forEach Javadoc:

The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.

Whereas the forEachOrdered Javadoc states (emphasis mine):

Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.
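The difference can be checked programmatically rather than by reading console output. The sketch below uses hypothetical helper names and collects the elements into a synchronized list, purely so that the side effect is thread-safe in the parallel case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

class ParallelOrderDemo {
    // forEachOrdered: the recorded order matches the source order.
    static List<String> viaForEachOrdered() {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Stream.of("AAA", "BBB", "CCC").parallel().forEachOrdered(out::add);
        return new ArrayList<>(out);
    }

    // forEach: all elements are recorded, but in no guaranteed order.
    static List<String> viaForEach() {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        Stream.of("AAA", "BBB", "CCC").parallel().forEach(out::add);
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println("forEachOrdered: " + viaForEachOrdered()); // [AAA, BBB, CCC]
        System.out.println("forEach:        " + viaForEach());        // order may vary
    }
}
```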

What is the reason forEach in Java Streams API is unordered?

Defining a forEach that preserves order and an unordered that breaks it would complicate things, in my opinion, simply because unordered does nothing more than set a flag in the Stream API internals, and the flag check would then have to be performed or enforced based on some conditions.

So let's say you would do:

someStream()
    .unordered()
    .forEach(System.out::println);

In this case, your proposal is to not print elements in any order, thus enforcing unordered here. But what if we did:

someSet().stream()
    .unordered()
    .forEach(System.out::println);

In this case would you want unordered to be enforced? After all, the source of a stream is a Set, which has no order, so in this case, enforcing unordered is just useless; but this means additional tests on the source of the stream internally. This can get quite tricky and complicated (as it already is btw).

To make it simpler, two methods were defined that clearly stipulate what they will do; this is on par with, for example, findFirst vs findAny or even Optional::isPresent and Optional::isEmpty (added in Java 11).
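The findFirst vs findAny pairing mentioned above follows the same split between an ordered and an unconstrained operation. A short sketch, with an invented class name and sample list:

```java
import java.util.Arrays;
import java.util.List;

class FindFirstVsFindAny {
    static final List<String> LETTERS = Arrays.asList("a", "b", "c", "d");

    // findFirst respects encounter order, even on a parallel stream.
    static String first() {
        return LETTERS.parallelStream().findFirst().get();
    }

    // findAny may return any element; callers must not depend on which one.
    static String any() {
        return LETTERS.parallelStream().findAny().get();
    }

    public static void main(String[] args) {
        System.out.println("findFirst: " + first()); // a
        System.out.println("findAny:   " + any());   // some element of the list
    }
}
```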

How to ensure order of processing in java8 streams?

You are asking the wrong question. You are asking about sequential vs. parallel whereas you want to process items in order, so you have to ask about ordering. If you have an ordered stream and perform operations which guarantee to maintain the order, it doesn’t matter whether the stream is processed in parallel or sequential; the implementation will maintain the order.

The ordered property is distinct from parallel vs sequential. E.g. if you call stream() on a HashSet the stream will be unordered, while calling stream() on a List returns an ordered stream. Note that you can call unordered() to release the ordering contract and potentially increase performance. Once the stream has no ordering, there is no way to reestablish the original ordering. (The only way to turn an unordered stream into an ordered one is to call sorted; however, the resulting order is not necessarily the original order.)

See also the “Ordering” section of the java.util.stream package documentation.

In order to ensure maintenance of ordering throughout an entire stream operation, you have to study the documentation of the stream’s source, all intermediate operations and the terminal operation for whether they maintain the order or not (or whether the source has an ordering in the first place).

This can be very subtle, e.g. Stream.iterate(T,UnaryOperator) creates an ordered stream while Stream.generate(Supplier) creates an unordered stream. Note that you also made a common mistake in your question as forEach does not maintain the ordering. You have to use forEachOrdered if you want to process the stream’s elements in a guaranteed order.
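One way to inspect whether a stream is ordered is to ask its spliterator for the ORDERED characteristic. The helper below is a hypothetical sketch, not part of the Stream API, and calling spliterator() is itself a terminal operation, so each stream can be probed only once.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

class OrderedFlagDemo {
    // Reports whether the stream's spliterator advertises an encounter order.
    static boolean isOrdered(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        System.out.println("List:             " + isOrdered(list.stream()));                // true
        System.out.println("HashSet:          " + isOrdered(new HashSet<>(list).stream())); // false
        System.out.println("Stream.iterate:   " + isOrdered(Stream.iterate(1, i -> i + 1))); // true
        System.out.println("Stream.generate:  " + isOrdered(Stream.generate(() -> 1)));      // false
        System.out.println("List.unordered(): " + isOrdered(list.stream().unordered()));
    }
}
```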

So if the list in your question is indeed a java.util.List, its stream() method will return an ordered stream and filter will not change the ordering. So if you call list.stream().filter().forEachOrdered(), all elements will be processed sequentially in order, whereas for list.parallelStream().filter().forEachOrdered() the elements might be processed in parallel (e.g. by the filter) but the terminal action will still be called in order (which obviously will reduce the benefit of parallel execution).

If you, for example, use an operation like

List<…> result=inputList.parallelStream().map(…).filter(…).collect(Collectors.toList());

the entire operation might benefit from parallel execution but the resulting list will always be in the right order, regardless of whether you use a parallel or sequential stream.
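A concrete instance of such a pipeline, with the elided parts filled in by arbitrary illustrative choices (squaring and keeping even values); the class and method names are likewise invented:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelCollectOrderDemo {
    // Runs map and filter in parallel; collect still yields encounter order.
    static List<Integer> evenSquares(List<Integer> input) {
        return input.parallelStream()
                    .map(x -> x * x)
                    .filter(x -> x % 2 == 0)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        System.out.println(evenSquares(input)); // [4, 16, 36, 64, 100]
    }
}
```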

Pros and Cons of using forEach and Stream

There are no advantages of using the second case, unless you have a parallel stream. There is a disadvantage, namely that Stream.forEach() doesn't guarantee to respect encounter order. A more accurate (but still unnecessary) equivalent would be Stream.forEachOrdered().

Does Stream.forEach() always work in parallel?

No, forEach() doesn't parallelize if the stream isn't parallel. I think he simplified the example for the sake of discussion.

As evidence, this code is inside the AbstractPipeline class's evaluate method (which is called from forEach)

return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
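The flag that this branch consults is visible through the public isParallel() method, so it can be checked without reading the internals. A small sketch with an invented class name:

```java
import java.util.stream.Stream;

class IsParallelDemo {
    // A stream created by Stream.of is sequential by default.
    static boolean sequentialFlag() {
        return Stream.of(1, 2, 3).isParallel();
    }

    // parallel() switches the pipeline to parallel mode.
    static boolean parallelFlag() {
        return Stream.of(1, 2, 3).parallel().isParallel();
    }

    // The last call to parallel() or sequential() wins for the whole pipeline.
    static boolean backToSequential() {
        return Stream.of(1, 2, 3).parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(sequentialFlag());   // false
        System.out.println(parallelFlag());     // true
        System.out.println(backToSequential()); // false
    }
}
```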

