How to Ensure Order of Processing in Java8 Streams

How to ensure order of processing in java8 streams?

You are asking the wrong question. You are asking about sequential vs. parallel whereas you want to process items in order, so you have to ask about ordering. If you have an ordered stream and perform operations which guarantee to maintain the order, it doesn’t matter whether the stream is processed in parallel or sequential; the implementation will maintain the order.

The ordered property is distinct from parallel vs sequential. E.g. if you call stream() on a HashSet the stream will be unordered while calling stream() on a List returns an ordered stream. Note that you can call unordered() to release the ordering contract and potentially increase performance. Once the stream has no ordering there is no way to reestablish the ordering. (The only way to turn an unordered stream into an ordered is to call sorted, however, the resulting order is not necessarily the original order).

See also the “Ordering” section of the java.util.stream package documentation.

In order to ensure maintenance of ordering throughout an entire stream operation, you have to study the documentation of the stream’s source, all intermediate operations and the terminal operation for whether they maintain the order or not (or whether the source has an ordering in the first place).

This can be very subtle, e.g. Stream.iterate(T,UnaryOperator) creates an ordered stream while Stream.generate(Supplier) creates an unordered stream. Note that you also made a common mistake in your question as forEach does not maintain the ordering. You have to use forEachOrdered if you want to process the stream’s elements in a guaranteed order.

So if your list in your question is indeed a java.util.List, its stream() method will return an ordered stream and filter will not change the ordering. So if you call list.stream().filter() .forEachOrdered(), all elements will be processed sequentially in order, whereas for list.parallelStream().filter().forEachOrdered() the elements might be processed in parallel (e.g. by the filter) but the terminal action will still be called in order (which obviously will reduce the benefit of parallel execution).

If you, for example, use an operation like

List<…> result=inputList.parallelStream().map(…).filter(…).collect(Collectors.toList());

the entire operation might benefit from parallel execution but the resulting list will always be in the right order, regardless of whether you use a parallel or sequential stream.

Which operations preserve order

After some researching in the source code I summarized the following tables:

Taken from: Java streams - Part 6 - Spliterator

The following table showes which operations types are allowed to modify charactersiticts:

|                        | DISTICTS | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
| ---------------------- | -------- | ------ | ------- | ----- | --------------|
| source stream | Y | Y | Y | Y | N |
| intermediate operation | PCI | PCI | PCI | PC | PI |
| terminal operation | N | N | PC | N | PI |
  • Y - Allowed to have
  • P - May preserves
  • C - May clears.
  • I - May injects.
  • N - Not valid; Irelevant for the operation.

Taken from Java streams - Stream methods characteristics table

The following table showes which characteristics and flags each intermediate operation/terminal operation may turns on and off: (SHORT_CIRCUIT is relevent only in the context of StreamOpFlag flags)

Note: P (Preserve) flag is added to every cell except the ones with the C and I (Clear and Inject) flags.

|                  |  DISTINCT |  SORTED |  ORDERED |  SIZED |  SHORT_CIRCUIT |
| ---------------- | ----------| --------| ---------| -------| ---------------|
| filter | | | | C | |
| forEach | | | C | | |
| forEachOrdered | | | | | |
| allMatch | | | C | | I |
| distinct | I | | | C | |
| flatMap | C | C | | C | |
| anyMatch | | | C | | I |
| collect | | | | | |
| unOrdered | | | C | | |
| count | C | C | C | C | |
| findAny | | | C | | I |
| findFirst | | | | | I |
| flatMapToXXX | C | C | | C | |
| limit | | | | C | I |
| map | C | C | | | |
| mapToXXX | C | C | | | |
| max | | | | | |
| min | | | | | |
| noneMatch | | | C | | I |
| peek | | | | | |
| reduce | | | | | |
| skip | | | C | I | |
| sorted | | I | I | | |
| toArray | | | | | |
  • C - Clears.
  • I - Injects.

Preserve order in Java stream with collect

You’ll need to collect to a LinkedHashMap to get the expected result. Also, you don’t have to do another forEach separately from the stream pipeline after the groupingBy, just create a stream from the entrySet and map then collect to list.

return messages.stream()
.collect(groupingBy(Function.identity(), LinkedHashMap::new, summingInt(e -> 1)))
.entrySet()
.stream()
.map(e -> e.getKey()+(e.getValue() == 1 ? "" : " (" + e.getValue() +" times)"))
.collect(toList());

Encounter order preservation in java stream

The unordered nature of a source or the explicit releasing of the order contract via unordered() may affect all subsequent pipeline stages unless they introduce an order which can only happen with a sorted operation.

For stateless intermediate operations like filter and map, there is no difference anyway, but operations like skip, limit and distinct may exhibit a different behavior depending on whether the previous stream state was ordered or unordered. This answer shows an example of how distinct can be affected by a previous unordered().

Note that in principle, sorted, while introducing an order, may depend on the ordered state of previous stage, as it may use an unstable sort algorithm if the previous stream was unordered.

This answer provides a way to print the characteristics of a stream and evaluate how they change due to appending another operation.

When you chain a terminal operation, both, an unordered nature of the terminal operation itself or an unordered state of the last stage before the terminal operation may be sufficient to select an algorithm for the terminal operation that doesn’t try to preserve the order.

In principle, the unordered nature of the terminal operation could be used to affect previous stages, but since stateless intermediate operations are not affected anyway and skip, limit, distinct have to obey a previous ordered state, if present, the only operation that could be affected, is sorted that becomes obsolete if the subsequent operations don’t care about the order anyway.

In the current implementation, since Java 8 update 60, the unordered nature of a terminal operation does not affect the behavior of previous stages. This change was made, as in previous implemen­tations, it wrongly affected skip and limit. Loosing the opportunity to elide obsolete sorting steps was not considered a problem, as chaining sort with unordered subsequent operations, is a corner case. See this answer, including the comments, if you want to know more about the related discussion.

So for

list.stream() // List.stream() returns an ordered stream
.unordered() // releases order contract
.distinct() // for equal elements, it may pick an arbitrary one
.sorted() // re-introduces an order
.skip(1) // will skip the minimum element due to the order
.forEach(System.out::println); // may print the remaining elements in arbitrary order

there is not a single ordered or unordered behavior for the stream pipeline.

In contrast, with

hashSet.stream() // HashSet.stream() has no order (unless being a LinkedHashSet)
.filter(Objects::nonNull) // not affected by order
.distinct() // may use unorderedness, but has no effect anyway, as already distinct
.skip(1) // may skip an arbitrary element
.forEachOrdered(System.out::println); // would respect order if there was one

the entire pipeline runs unordered, just because the source is unordered. With an ordered source, it would be entirely ordered.

So the answer to “is the evaluation of the entire stream pipeline ORDER characteristic done by going through the characteristics of source, intermediate operations and terminal operation even before the start of the execution?” is, yes, this is done right before starting the actual processing, by selecting the appropriate algorithm for the pipeline stages, when there is a choice, but this process does not necessarily result in a single characteristic for the entire pipeline.

Order guarantees using streams and reducing chain of consumers

As Andreas pointed out, Consumer::andThen is an associative function and while the resulting consumer may have a different internal structure, it is still equivalent.

But let's debug it

public static void main(String[] args) {
performAllTasks(IntStream.range(0, 10)
.mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
private final Consumer<Object> first, second;
private final boolean leaf;
DebuggableConsumer(String name) {
this(x -> System.out.println(name), x -> {}, true);
}
DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
first = a; second = b;
leaf = l;
}
public void accept(Object t) {
first.accept(t);
second.accept(t);
}
@Override public Consumer<Object> andThen(Consumer<? super Object> after) {
return new DebuggableConsumer(this, after, false);
}
public @Override String toString() {
if(leaf) return first.toString();
return toString(new StringBuilder(200), 0, 0).toString();
}
private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
int myHandle = sb.length()-2;
sb.append(leaf? first: "combined").append('\n');
if(!leaf) {
int nPreS=sb.length();
((DebuggableConsumer)first).toString(
sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
nPreS=sb.length();
sb.append(sb, preS, preEnd);
int lastItemHandle=sb.length();
((DebuggableConsumer)second).toString(sb.append(" "), nPreS, sb.length());
sb.setCharAt(lastItemHandle, '\u2514');
}
if(myHandle>0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle+1, '\u2500');
}
return sb;
}
}

will print

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

whereas changing the reduction code to

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}

prints on my machine

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
├─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
└─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

illustrating the point of Andreas’ answer, but also highlighting an entirely different problem. You may max it out by using, e.g. IntStream.range(0, 100) in the example code.

The result of the parallel evaluation is actually better than the sequential evaluation, as the sequential evaluation creates an unbalanced tree. When accepting an arbitrary stream of consumers, this can be an actual performance issue or even lead to a StackOverflowError when trying to evaluate the resulting consumer.

For any nontrivial number of consumers, you actually want a balanced consumer tree, but using a parallel stream for that is not the right solution, as a) Consumer::andThen is a cheap operation with no real benefit from parallel evaluation and b) the balancing would depend on unrelated properties, like the nature of the stream source and the number of CPU cores, which determine when the reduction falls back to the sequential algorithm.

Of course, the simplest solution would be

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
consumers.forEachOrdered(c -> c.accept(data));
}

But when you want to construct a compound Consumer for re-using, you may use

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
if(consumerList.isEmpty()) return t -> {};
if(consumerList.size() == 1) return consumerList.get(0);
if(consumerList.size() < ITERATION_THRESHOLD)
return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
if(end-start>2) {
int mid=(start+end)>>>1;
return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
}
T t = l.get(start++);
if(start<end) t = f.apply(t, l.get(start));
assert start==end || start+1==end;
return t;
}

The code will provide a single Consumer just using a loop when the number of consumers exceeds a threshold. This is the simplest and most efficient solution for a larger number of consumers and in fact, you could drop all other approaches for the smaller numbers and still get a reasonable performance…

Note that this still doesn’t hinder parallel processing of the stream of consumers, if their construction really benefits from it.

How to preserve order of the elements in streams

You are requesting a parallel stream, hence, you can’t get a defined processing order, as parallel processing and ordered processing are a general contradiction.

If you want the elements to be processed in order, you have to do both, request a sequential stream and use a terminal operation that depends on the order:

public static void main(String[] args) {
m1().filter(e -> {
System.out.println(e);
return false;
}).findFirst();
}

private static LongStream m1() {
return StreamSupport.longStream(Arrays.spliterator(new long[] { 1, 2, 3, 4 }), false);
}

But note that you should avoid writing software which relies on the processing order. You should define your task in a way that the processing steps do not depend on the processing order, even if the final result may depend on the encounter order.

E.g., if you write

public static void main(String[] args) {
m1().filter(e -> {
System.out.println(e);
return e%2 == 0;
})
.findFirst()
.ifPresent(result -> System.out.println("final result: "+result));
}

private static LongStream m1() {
// note that you can have it much simpler:
return LongStream.of(1, 2, 3, 4 ).parallel();
}

the intermediate output may be in arbitrary order and there’s no guaranty whether or not you will see “3” and “4” in the output, but you have the guaranty that the last output will be “final result: 2”, as that’s the first matching element in encounter order. See also here.

How to get responses in the same order as the original list using parallelStream

The reason it can come out of order is because forEach is an unordered stream terminal operation. From the docs:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.

Given that, you can choose to use something like the forEachOrdered alternative or map and collect to preserve the order. Here's the map and collect version:

inputs.parallelStream()
.map(input -> processStringInput(input))
.collect(Collectors.toList());

There are more details about the difference between ordered vs unordered and sequential vs parallel streams in this answer: How to ensure order of processing in java8 streams?.



Related Topics



Leave a reply



Submit