Why filter() after flatMap() is not completely lazy in Java streams?
TL;DR: this has been addressed in JDK-8075939 and fixed in Java 10 (and backported to Java 8 in JDK-8225328).
Looking into the implementation (ReferencePipeline.java), we see the method [link]:
@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
which will be invoked for the findFirst operation. The special thing to note is the sink.cancellationRequested() check, which allows ending the loop after the first match. Compare this to [link]:
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
The method for advancing one item ends up calling forEach on the sub-stream without any possibility of earlier termination, and the comment at the beginning of the flatMap method even hints at this missing feature.
Since this is more than just an optimization matter, as it implies that the code simply breaks when the sub-stream is infinite, I hope that the developers soon prove that they “can do better than this”…
To illustrate the implications: while Stream.iterate(0, i->i+1).findFirst() works as expected, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() will end up in an infinite loop.
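A minimal, self-contained sketch of both cases (the class name is hypothetical; the noted behavior applies to JDKs before the JDK-8075939 fix):

import java.util.stream.Stream;

public class FlatMapLazinessDemo {
    public static void main(String[] args) {
        // Terminates immediately: findFirst short-circuits the infinite source.
        System.out.println(Stream.iterate(0, i -> i + 1).findFirst().get());

        // On affected JDKs (before Java 10, or Java 8 before 8u222) this call
        // never returns, because flatMap eagerly drains the infinite inner stream.
        System.out.println(Stream.of("")
                .flatMap(x -> Stream.iterate(0, i -> i + 1))
                .findFirst()
                .get());
    }
}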
Regarding the specification, most of it can be found in the
chapter “Stream operations and pipelines” of the package specification:
…
Intermediate operations return a new stream. They are always lazy;
…
… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)
…
Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.
It’s clear that a short-circuiting operation doesn’t guarantee termination in finite time, e.g. when a filter doesn’t match any item the processing can’t complete, but an implementation which doesn’t support any termination in finite time, by simply ignoring the short-circuiting nature of an operation, is far off the specification.
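One pragmatic workaround on affected JDKs, not taken from the answer above, is to keep the inner streams finite, e.g. with limit(…), so that the eager drain at least terminates (the bound of 1_000 is illustrative):

// The inner stream is bounded, so flatMap's eager consumption terminates
// and findFirst returns the first element as expected, albeit only after
// up to 1_000 inner elements have been computed.
int first = Stream.of("")
        .flatMap(x -> Stream.iterate(0, i -> i + 1).limit(1_000))
        .findFirst()
        .get();
System.out.println(first); // 0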
Is flatMap guaranteed to be lazy?
Under the current implementation, flatMap is eager; it behaves like the stateful intermediate operations (such as sorted and distinct). And it's very easy to prove:
int result = Stream.of(1)
.flatMap(x -> Stream.generate(() -> ThreadLocalRandom.current().nextInt()))
.findFirst()
.get();
System.out.println(result);
This never finishes, because flatMap is computed eagerly. For your example:
urls.stream()
.flatMap(url -> fetchDataFromInternet(url).stream())
.filter(...)
.findFirst()
.get();
It means that for each url, flatMap will block every operation that comes after it, even if you care about a single result. So suppose that from a single url your fetchDataFromInternet(url) generates 10_000 lines; your findFirst will have to wait for all 10_000 of them to be computed, even though you care about only one.
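To make that cost visible, here is a sketch that counts how many inner elements are actually produced; the IntStream generator stands in for fetchDataFromInternet(url), and the class name is hypothetical:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class EagerFlatMapCount {
    public static void main(String[] args) {
        AtomicInteger produced = new AtomicInteger();
        Stream.of("url")
              .flatMap(url -> IntStream.range(0, 10_000)   // stand-in for fetchDataFromInternet(url)
                                       .peek(i -> produced.incrementAndGet())
                                       .boxed())
              .findFirst()
              .get();
        // Prints 10000 on affected JDKs (eager flatMap); after the fix it
        // can be as low as 1, since findFirst cancels after one element.
        System.out.println("inner elements produced: " + produced.get());
    }
}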
EDIT
This is fixed in Java 10, where we get our laziness back: see JDK-8075939
EDIT 2
This is fixed in Java 8 too (8u222): JDK-8225328
Are Java streams able to lazily reduce from map/filter conditions?
The actual term you’re asking for is short-circuiting:
Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.
The term “lazy” only applies to intermediate operations and means that they only perform work when being requested by the terminal operation. This is always the case, so when you don’t chain a terminal operation, no intermediate operation will ever process any element.
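A tiny sketch of this behavior (the class name is hypothetical):

import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        // No terminal operation: the map lambda never runs, nothing is printed.
        Stream.of(1, 2, 3).map(i -> {
            System.out.println("mapped " + i);
            return i * 2;
        });

        // With a terminal operation the pipeline runs and prints three lines.
        Stream.of(1, 2, 3).map(i -> {
            System.out.println("mapped " + i);
            return i * 2;
        }).forEach(r -> { });
    }
}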
Finding out whether a terminal operation is short-circuiting is rather easy. Go to the Stream API documentation and check whether the particular terminal operation’s documentation contains the sentence
This is a short-circuiting terminal operation.
allMatch has it; reduce does not.
This does not mean that such optimizations based on logic or algebra are impossible. But the responsibility lies with the JVM’s optimizer, which might do the same for loops. However, this requires inlining of all involved methods to be sure that this condition always applies and that there are no side effects which must be retained. This behavioral compatibility implies that even if the processing were optimized away, a peek(System.out::println) would keep printing all elements as if they were processed. In practice, you should not expect such optimizations, as the Stream implementation code is too complex for the optimizer.
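As a concrete illustration of the allMatch/reduce difference on an infinite source (a sketch; the class name is hypothetical):

import java.util.stream.IntStream;

public class ShortCircuitDemo {
    public static void main(String[] args) {
        // allMatch is short-circuiting: it stops at the first counterexample.
        boolean allPositive = IntStream.iterate(1, i -> i - 1)  // 1, 0, -1, ...
                                       .allMatch(i -> i > 0);
        System.out.println(allPositive); // false, computed in finite time

        // reduce is not short-circuiting; uncommented, this would never finish:
        // int sum = IntStream.iterate(1, i -> i + 1).reduce(0, Integer::sum);
    }
}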
Java 8 Stream - Why is filter method not executing?
filter is an intermediate operation, which will be executed only if the Stream pipeline ends in a terminal operation.
For example :
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
s.startsWith("b");
System.out.println("filter: " + s);
return true;
})
.forEach (System.out::println);
As it is, your filter method is useless, since it always returns true and thus performs no filtering.
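For comparison, a sketch of the presumably intended predicate, which actually uses the test result (the class name is hypothetical):

import java.util.stream.Stream;

public class FilterFixed {
    public static void main(String[] args) {
        Stream.of("d2", "a2", "b1", "b3", "c")
              .filter(s -> {
                  System.out.println("filter: " + s);  // runs for every element
                  return s.startsWith("b");            // actually filter
              })
              .forEach(s -> System.out.println("forEach: " + s)); // b1, b3
    }
}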
takeWhile() working differently with flatMap
This is a bug in JDK 9 - from issue JDK-8193856:
takeWhile is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap.
Explanation
If the stream is ordered, takeWhile should show the expected behavior. This is not entirely the case in your code because you use forEach, which waives order. If you care about it, which you do in this example, you should use forEachOrdered instead. Funny thing: that doesn't change anything.
So maybe the stream isn't ordered in the first place? (In that case the behavior would be fine.) If you create a temporary variable for the stream created from strArray and check whether it is ordered by evaluating the expression ((StatefulOp) stream).isOrdered(); at the breakpoint, you will find that it is indeed ordered:
String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
Stream<String> stream = Arrays.stream(strArray)
.flatMap(indStream -> Arrays.stream(indStream))
.takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
// breakpoint here
System.out.println(stream);
That means that this is very likely an implementation error.
Into The Code
As others have suspected, I now also think that this might be connected to flatMap being eager. More precisely, both problems might have the same root cause.
Looking into the source of WhileOps, we can see these methods:
@Override
public void accept(T t) {
if (take = predicate.test(t)) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
This code is used by takeWhile to check for a given stream element t whether the predicate is fulfilled:
- If so, it passes the element on to the downstream operation, in this case System.out::println.
- If not, it sets take to false, so when it is asked next time whether the pipeline should be canceled (i.e. it is done), it returns true.
This covers the takeWhile operation. The other thing you need to know is that forEachOrdered leads to the terminal operation executing the method ReferencePipeline::forEachWithCancel:
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
boolean cancelled;
do { } while (
!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
All this does is:
- check whether the pipeline was canceled
- if not, advance the sink by one element
- stop if this was the last element
Looks promising, right?
Without flatMap
In the "good case" (without flatMap; your second example), forEachWithCancel directly operates on the WhileOp as sink, and you can see how this plays out:
- ReferencePipeline::forEachWithCancel does its loop:
- WhileOps::accept is given each stream element
- WhileOps::cancellationRequested is queried after each element
- at some point "Sample4" fails the predicate and the stream is canceled
Yay!
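The question's second example is not reproduced above; a minimal reconstruction of that flat, non-flatMap variant (array contents assumed, java.util.Arrays imported) behaves as described:

String[] flat = {"Sample1", "Sample2", "Sample3", "Sample4", "Sample5"};
Arrays.stream(flat)
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEachOrdered(System.out::println); // prints Sample1, Sample2, Sample3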
With flatMap
In the "bad case" (with flatMap; your first example), forEachWithCancel operates on the flatMap operation, though, which simply calls forEachRemaining on the ArraySpliterator for {"Sample3", "Sample4", "Sample5"}, which does this:
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
Ignoring all that hi and fence stuff, which is only used if the array processing is split for a parallel stream, this is a simple for loop which passes each element to the takeWhile operation, but never checks whether it is cancelled. It will hence eagerly plow through all elements in that "substream" before stopping, likely even through the rest of the stream.
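Putting it together, a self-contained demo of the bug; the expected output on affected JDK 9 builds follows from the analysis above (the class name is hypothetical):

import java.util.Arrays;

public class TakeWhileFlatMapBug {
    public static void main(String[] args) {
        String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};
        Arrays.stream(strArray)
              .flatMap(Arrays::stream)
              .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
              .forEachOrdered(System.out::println);
        // On affected JDK 9 builds this may also print "Sample5": the substream
        // is plowed through eagerly and accept() re-tests the predicate for each
        // element. After the JDK-8193856 fix it stops at "Sample3".
    }
}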
Java Streams - Buffering huge streams
This seems to be connected to the older issue “Why filter() after flatMap() is "not completely" lazy in Java streams?”. While that issue has been fixed for the Stream’s built-in operations, it seems to still exist when we try to iterate over a flat-mapped stream externally.
We can simplify the code to reproduce the problem to
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.iterator().hasNext();
Note that using a Spliterator is affected as well:
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.spliterator()
.tryAdvance((long l) -> System.out.println("first item: "+l));
Both try to buffer elements until ultimately bailing out with an OutOfMemoryError.
Since spliterator().forEachRemaining(…) seems not to be affected, you could implement a solution which works for your use case of forEach, but it would be fragile, as it would still exhibit the problem for short-circuiting stream operations.
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
            // estimated number of chunks; keep only the characteristics
            // that remain valid for lists of elements
            (source.estimateSize()+count-1)/count, source.characteristics()
            &(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
            | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                // fill the current chunk with up to count elements
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }
            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                // bulk path: hand over a chunk whenever it reaches count elements
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) { // emit the final, possibly smaller chunk
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
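A hypothetical usage of the buffer(…) helper above; the chunk size and range are illustrative, and forEach goes through the unaffected forEachRemaining path:

buffer(Stream.of(LongStream.range(0, 1_000_000))
           .flatMapToLong(x -> x)
           .boxed(), 1_000)
    .forEach(chunk -> System.out.println("chunk of " + chunk.size() + " elements"));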
But note that Spliterator-based solutions are preferable in general, as they support carrying additional information that enables optimizations and have lower iteration costs in a lot of use cases. So this is the way to go once this issue has been fixed in the JDK code.
As a workaround, you can use Stream.concat(…) to combine streams, but its documentation contains an explicit warning about not combining too many streams at once:
Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].
The throwable’s name has been corrected to StackOverflowError in Java 9’s documentation.
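For a small, fixed number of streams, the concat approach looks like this (a sketch; the ranges are illustrative):

// Combining two bounded streams without flatMap; fine for a handful of
// streams, but deeply nested concatenation risks a StackOverflowError.
LongStream combined = LongStream.concat(
        LongStream.range(0, 1_000),
        LongStream.range(1_000, 2_000));
combined.spliterator()
        .tryAdvance((long l) -> System.out.println("first item: " + l)); // stays lazy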