Why filter() After flatMap() Is "Not Completely" Lazy in Java Streams

Why filter() after flatMap() is not completely lazy in Java streams?

TL;DR, this has been addressed in JDK-8075939 and fixed in Java 10 (and backported to Java 8 in JDK-8225328).

Looking into the implementation (ReferencePipeline.java), we see the method

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

which will be invoked for the findFirst operation. The important detail is the sink.cancellationRequested() check, which allows ending the loop on the first match. Compare it to the flatMap implementation:

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                        // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}

The method for advancing one item ends up calling forEach on the sub-stream without any possibility of earlier termination, and the comment at the beginning of the flatMap method even mentions this missing feature.

Since this is more than just an optimization issue, as it implies that the code simply breaks when the sub-stream is infinite, I hope that the developers soon prove that they “can do better than this”…


To illustrate the implications, while Stream.iterate(0, i->i+1).findFirst() works as expected, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() will end up in an infinite loop.
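A minimal sketch of both pipelines, assuming a JDK without the fix (before 10, or before 8u222), where the second call never returns:

System.out.println(Stream.iterate(0, i -> i + 1).findFirst().get());   // prints 0

// On an affected JDK this never returns, because flatMap drains the
// infinite sub-stream before findFirst() gets a chance to cancel it.
System.out.println(
    Stream.of("").flatMap(x -> Stream.iterate(0, i -> i + 1)).findFirst().get());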

Regarding the specification, most of it can be found in the chapter “Stream operations and pipelines” of the package specification:

Intermediate operations return a new stream. They are always lazy;

… Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source. (This behavior becomes even more important when the input stream is infinite and not merely large.)

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

It’s clear that a short-circuiting operation doesn’t guarantee termination in finite time, e.g. when a filter doesn’t match any item the processing can’t complete, but an implementation which doesn’t support any termination in finite time, simply ignoring the short-circuiting nature of an operation, is far off the specification.
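As a small sketch of that “necessary, but not sufficient” point: findFirst() is short-circuiting, yet when the filter never matches, processing of an infinite source still cannot finish, even on a spec-conforming implementation.

Stream.iterate(0, i -> i + 1)
      .filter(i -> i < 0)   // never matches
      .findFirst();         // runs forever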

Is flatMap guaranteed to be lazy?

Under the current implementation, flatMap is eager: each inner stream is consumed in full, much as stateful intermediate operations (like sorted and distinct) consume their whole input. And it's very easy to prove:

int result = Stream.of(1)
        .flatMap(x -> Stream.generate(() -> ThreadLocalRandom.current().nextInt()))
        .findFirst()
        .get();

System.out.println(result);

This never finishes, as flatMap is computed eagerly. For your example:

urls.stream()
    .flatMap(url -> fetchDataFromInternet(url).stream())
    .filter(...)
    .findFirst()
    .get();

It means that for each url, flatMap will block all other operations that come after it, even if you only care about a single result. Suppose a single url makes fetchDataFromInternet(url) generate 10_000 lines; your findFirst will have to wait for all 10_000 to be computed, even though you only need one.
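To see this concretely, here is a hedged sketch that counts how many lines the first sub-stream actually produces before findFirst() returns; fetchDataFromInternet is the hypothetical method from above and the AtomicInteger counter is added purely for illustration. On an affected JDK the counter ends up at the full size of that sub-stream rather than 1:

AtomicInteger produced = new AtomicInteger();                 // illustration only

String first = urls.stream()
        .flatMap(url -> fetchDataFromInternet(url).stream()   // hypothetical data source
                .peek(line -> produced.incrementAndGet()))
        .filter(line -> !line.isEmpty())                      // placeholder predicate
        .findFirst()
        .get();

System.out.println(first + " (lines produced: " + produced.get() + ")");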

EDIT

This is fixed in Java 10, where we get our laziness back: see JDK-8075939

EDIT 2

This is fixed in Java 8 too (8u222): JDK-8225328

Are Java streams able to lazily reduce from map/filter conditions?

The actual term you’re asking about is short-circuiting:

Further, some operations are deemed short-circuiting operations. An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result. A terminal operation is short-circuiting if, when presented with infinite input, it may terminate in finite time. Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.

The term “lazy” only applies to intermediate operations and means that they only perform work when being requested by the terminal operation. This is always the case, so when you don’t chain a terminal operation, no intermediate operation will ever process any element.
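A quick sketch of that point: without a terminal operation, an intermediate operation never touches an element.

// No terminal operation: the map function never runs, nothing is printed.
Stream.of("a", "b", "c")
      .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); });

// With a terminal operation, the elements are actually pulled through the pipeline.
Stream.of("a", "b", "c")
      .map(s -> { System.out.println("map: " + s); return s.toUpperCase(); })
      .forEach(s -> { });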

Finding out whether a terminal operation is short-circuiting is rather easy. Go to the Stream API documentation and check whether the particular terminal operation’s documentation contains the sentence

This is a short-circuiting terminal operation.

allMatch has it; reduce does not.
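To make the difference concrete, a small sketch over an infinite IntStream source: allMatch can stop as soon as the predicate fails, while reduce would have to consume every element.

// Terminates: allMatch is short-circuiting and stops at the first element >= 10.
boolean allSmall = IntStream.iterate(0, i -> i + 1).allMatch(i -> i < 10);
System.out.println(allSmall);   // false

// Never terminates: reduce is not short-circuiting, so it would try to
// sum the whole infinite stream.
// int sum = IntStream.iterate(0, i -> i + 1).reduce(0, Integer::sum);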

This does not mean that such optimizations based on logic or algebra are impossible. But the responsibility lies with the JVM’s optimizer, which might do the same for loops. However, this requires inlining of all involved methods to be sure that this condition always applies and that there are no side effects which must be retained. This behavioral compatibility implies that even if the processing gets optimized away, a peek(System.out::println) would keep printing all elements as if they were processed. In practice, you should not expect such optimizations, as the Stream implementation code is too complex for the optimizer.

Java 8 Stream - Why is filter method not executing?

filter is an intermediate operation, which will be executed only if the Stream pipeline ends in a terminal operation.

For example :

Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
s.startsWith("b");
System.out.println("filter: " + s);
return true;
})
.forEach (System.out::println);

As it is, your filter method is useless, since it always returns true and thus performs no filtering; the result of startsWith is computed but simply discarded.
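For comparison, a sketch of what the filter was presumably meant to do: return the result of startsWith, so that only matching elements pass downstream.

Stream.of("d2", "a2", "b1", "b3", "c")
      .filter(s -> {
          System.out.println("filter: " + s);
          return s.startsWith("b");     // use the result instead of discarding it
      })
      .forEach(System.out::println);    // prints only b1 and b3 (after the filter traces)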

takeWhile() working differently with flatmap

This is a bug in JDK 9; see JDK-8193856:

takeWhile is incorrectly assuming that an upstream operation supports and honors cancellation, which unfortunately is not the case for flatMap.

Explanation

If the stream is ordered, takeWhile should show the expected behavior. This is not entirely the case in your code because you use forEach, which waives order. If you care about it, which you do in this example, you should use forEachOrdered instead. Funny thing: that doesn't change anything.

So maybe the stream isn't ordered in the first place? (In that case the behavior is ok.) If you create a temporary variable for the stream created from strArray and check whether it is ordered by executing the expression ((StatefulOp) stream).isOrdered(); at the breakpoint, you will find that it is indeed ordered:

String[][] strArray = {{"Sample1", "Sample2"}, {"Sample3", "Sample4", "Sample5"}};

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

That means that this is very likely an implementation error.

Into The Code

As others have suspected, I now also think that this might be connected to flatMap being eager. More precisely, both problems might have the same root cause.

Looking into the source of WhileOps, we can see these methods:

@Override
public void accept(T t) {
    if (take = predicate.test(t)) {
        downstream.accept(t);
    }
}

@Override
public boolean cancellationRequested() {
    return !take || downstream.cancellationRequested();
}

This code is used by takeWhile to check for a given stream element t whether the predicate is fulfilled:

  • If so, it passes the element on to the downstream operation, in this case System.out::println.
  • If not, it sets take to false, so when it is asked next time whether the pipeline should be canceled (i.e. it is done), it returns true.

This covers the takeWhile operation. The other thing you need to know is that forEachOrdered leads to the terminal operation executing the method ReferencePipeline::forEachWithCancel:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    boolean cancelled;
    do { } while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;
}

All this does is:

  1. check whether pipeline was canceled
  2. if not, advance the sink by one element
  3. stop if this was the last element

Looks promising, right?

Without flatMap

In the "good case" (without flatMap; your second example) forEachWithCancel directly operates on the WhileOp as sink and you can see how this plays out:

  • ReferencePipeline::forEachWithCancel does its loop:

    • WhileOps::accept is given each stream element
    • WhileOps::cancellationRequested is queried after each element
  • at some point "Sample4" fails the predicate and the stream is canceled

Yay!

With flatMap

In the "bad case" (with flatMap; your first example), forEachWithCancel operates on the flatMap operation, though, , which simply calls forEachRemaining on the ArraySpliterator for {"Sample3", "Sample4", "Sample5"}, which does this:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) {
    do { action.accept((T)a[i]); } while (++i < hi);
}

Ignoring all that hi and fence stuff, which is only used if the array processing is split for a parallel stream, this is a simple for loop, which passes each element to the takeWhile operation but never checks whether it is cancelled. It will hence eagerly plow through all elements in that "substream" before stopping, likely even through the rest of the stream.
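A hedged illustration of the consequence on an affected JDK 9 build: because the inner ArraySpliterator never checks for cancellation, takeWhile keeps testing the remaining elements of the second array, and since the accept method above reassigns take on every test, an element like "Sample5" that satisfies the predicate again may still be passed downstream.

Arrays.stream(strArray)
      .flatMap(Arrays::stream)
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEachOrdered(System.out::println);
// expected:                      Sample1 Sample2 Sample3
// may be seen on affected builds: Sample1 Sample2 Sample3 Sample5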

Java Streams - Buffering huge streams

This seems to be connected to the older issue “Why filter() after flatMap() is "not completely" lazy in Java streams?”. While that issue has been fixed for the Stream’s built-in operations, it seems to still exist when we try to iterate over a flat-mapped stream externally.

We can simplify the code to reproduce the problem to

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
      .flatMapToLong(x -> x)
      .iterator().hasNext();

Note that using a Spliterator is affected as well:

Stream.of(LongStream.range(0, Integer.MAX_VALUE))
      .flatMapToLong(x -> x)
      .spliterator()
      .tryAdvance((long l) -> System.out.println("first item: " + l));

Both try to buffer elements until ultimately bailing out with an OutOfMemoryError.

Since spliterator().forEachRemaining(…) does not seem to be affected, you could implement a solution which works for your use case of forEach, but it would be fragile, as it would still exhibit the problem for short-circuiting stream operations.

public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
    boolean parallel = stream.isParallel();
    Spliterator<T> source = stream.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<List<T>>(
                (source.estimateSize()+count-1)/count,
                source.characteristics()
                    & (Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
                    | Spliterator.NONNULL) {
            List<T> list;
            Consumer<T> c = t -> list.add(t);

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if(list == null) list = new ArrayList<>(count);
                if(!source.tryAdvance(c)) return false;
                do {} while(list.size() < count && source.tryAdvance(c));
                action.accept(list);
                list = null;
                return true;
            }

            @Override
            public void forEachRemaining(Consumer<? super List<T>> action) {
                source.forEachRemaining(t -> {
                    if(list == null) list = new ArrayList<>(count);
                    list.add(t);
                    if(list.size() == count) {
                        action.accept(list);
                        list = null;
                    }
                });
                if(list != null) {
                    action.accept(list);
                    list = null;
                }
            }
        }, parallel);
}
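A possible usage of the sketch above, assuming the goal is to process a large flat-mapped stream in chunks via forEach (the chunk size of 1000 is arbitrary); since the forEach path goes through forEachRemaining, it avoids the buffering problem described above:

buffer(Stream.of(LongStream.range(0, 10_000_000).boxed()).flatMap(s -> s), 1000)
    .forEach(chunk -> System.out.println("processing " + chunk.size() + " elements"));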

But note that Spliterator-based solutions are preferable in general, as they can carry additional information enabling optimizations and have lower iteration costs in a lot of use cases. So that is the way to go once this issue has been fixed in the JDK code.

As a workaround, you can use Stream.concat(…) to combine streams, but its documentation contains an explicit warning against combining too many streams at once:

Use caution when constructing streams from repeated concatenation. Accessing an element of a deeply concatenated stream can result in deep call chains, or even StackOverflowException [sic].

The throwable’s name has been corrected to StackOverflowError in Java 9’s documentation.
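A small sketch of that workaround for a fixed, small number of sources, using the primitive counterpart LongStream.concat to match the reproduction above (the ranges are arbitrary); unlike the flatMap variant, the first element can be obtained lazily:

LongStream s = LongStream.concat(
        LongStream.range(0, Integer.MAX_VALUE),
        LongStream.range(Integer.MAX_VALUE, 2L * Integer.MAX_VALUE));

System.out.println(s.iterator().next());   // prints 0 without buffering everything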


