Stream and Lazy Evaluation

Stream and lazy evaluation

It means that the filter is only applied during the terminal operation. Think of something like this:

public Stream<T> filter(Predicate<T> p) {
    this.filter = p; // just store it, don't apply it yet
    return this;     // in reality: return a new stream
}

public List<T> collect() {
    List<T> list = new ArrayList<>();
    for (T t : source) {
        if (filter.test(t)) list.add(t);
    }
    return list;
}

(That won't compile as-is and is a simplification of the real implementation, but the principle is there.)

Is Java 8 stream laziness useless in practice?

Your terminal operation, toArray(), perhaps supports your argument given that it requires all elements of the stream.

Some terminal operations don't. And for these, it would be a waste if streams weren't lazily executed. Two examples:

//example 1: print first element of 1000 after transformations
IntStream.range(0, 1000)
.peek(System.out::println)
.mapToObj(String::valueOf)
.peek(System.out::println)
.findFirst()
.ifPresent(System.out::println);

//example 2: check if any value has an even key
boolean valid = records.stream()
    .map(this::heavyConversion)
    .filter(this::checkWithWebService)
    .mapToInt(Record::getKey)
    .anyMatch(i -> i % 2 == 0);

The first stream will print:

0
0
0

That is, intermediate operations will be run on just one element. This is an important optimization. If the stream weren't lazy, all the peek() calls would have to run on all elements, which is absolutely unnecessary when you're interested in just one element. Intermediate operations can be expensive (as in the second example).

Short-circuiting terminal operations (which toArray is not) make this optimization possible.
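To see the point concretely, here is a small sketch of a short-circuiting terminal operation finishing on an infinite source, something that could not work without lazy evaluation:

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuit {

    // Laziness lets a short-circuiting terminal operation finish
    // even though the source is infinite.
    static Optional<Integer> firstSquareOver100() {
        return Stream.iterate(1, n -> n + 1)   // infinite: 1, 2, 3, ...
                     .map(n -> n * n)          // evaluated one element at a time
                     .filter(sq -> sq > 100)
                     .findFirst();             // stops the pipeline at 11 * 11 = 121
    }
}
```

If map ran eagerly over the whole source, this pipeline would never terminate; because elements are pulled one at a time, only eleven squares are ever computed.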

How does Lazy Evaluation and Eager Evaluation work in Java 8

Stateless streams are designed to handle a possibly infinite number of elements. For this to be possible, the stream cannot attempt to evaluate each operation on all of the elements at once.

Sequential streams always execute the pipeline one element at a time. Each element goes through the entire pipeline before the next element starts. This is what allows lazy evaluation to be effective. It allows the stream to short-circuit, such as with findFirst or allMatch. If all of the elements were processed at once at each stage, streams would not be able to handle infinite data sources, such as from Stream.iterate.

The output of your code demonstrates that the elements go through the pipeline one at a time:

Iterator Results:
Tested: 4 // starting 1st element
iter: 4 // ending 1st element
Tested: 5 // starting 2nd element (fails the filter)
Tested: 8 // starting 3rd element
iter: 8 // ending 3rd element
Tested: 7 // starting 4th element (fails the filter)
...
Last Statement
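The original code is not shown here, but a hypothetical reconstruction (the values 4, 5, 8, 7 and the even-number filter are assumed from the trace above) that logs the same sequence would look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class OnePass {

    // Hypothetical reconstruction of code producing a trace of this shape:
    // each element runs the filter and, if it passes, the terminal action,
    // before the next element starts.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of(4, 5, 8, 7)
              .filter(n -> { log.add("Tested: " + n); return n % 2 == 0; })
              .forEachOrdered(n -> log.add("iter: " + n));
        log.add("Last Statement");
        return log;
    }
}
```

Reading the log confirms the one-element-at-a-time order: "Tested: 4" is immediately followed by "iter: 4" before 5 is ever tested.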

These behaviors are explained more generally in the JavaDoc.

Java Stream API lazy evaluation internals

If I said the following: "Intermediate operations on a stream are not evaluated until the terminal operation is hit, which will actually perform them," would I be correct?

Yes.

I was wondering if it just returned itself and then kept track of the intermediate operations to perform as it went along.

No.

It is returning a new Stream object which is defined in terms of the previous Stream object.
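A quick way to observe this (a sketch; it relies on reference identity, which the Stream contract doesn't promise but which OpenJDK exhibits):

```java
import java.util.stream.Stream;

public class NewStage {

    // Each intermediate operation returns a new Stream object that wraps
    // the previous stage; it does not mutate and return the same stream.
    static boolean mapReturnsNewObject() {
        Stream<String> source = Stream.of("a", "b");
        Stream<String> mapped = source.map(String::toUpperCase);
        return source != mapped;   // distinct pipeline stages
    }
}
```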

Java 8 Streams Filter Intention of Lazy Evaluation

To compile, the second one should be

collectionOfThings.
stream().
filter(thing -> thing.condition1()).
filter(thing -> thing.condition2())

They're both equivalent. Sometimes one is more readable than the other.

An alternative way of writing the second one would be to use a method reference:

collectionOfThings
.stream()
.filter(Thing::condition1)
.filter(Thing::condition2)

Also note that the convention is to place the dot at the beginning of the line rather than the end, as you would write a bulleted list.
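A minimal sketch showing that the two forms select the same elements (plain integers stand in for the hypothetical Thing, and n > 2 / even stand in for condition1 / condition2):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterChain {

    // Two chained filters behave like one filter combining both
    // predicates with &&.
    static boolean equivalent(List<Integer> xs) {
        List<Integer> chained = xs.stream()
                                  .filter(n -> n > 2)
                                  .filter(n -> n % 2 == 0)
                                  .collect(Collectors.toList());
        List<Integer> combined = xs.stream()
                                   .filter(n -> n > 2 && n % 2 == 0)
                                   .collect(Collectors.toList());
        return chained.equals(combined);
    }
}
```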

Lazy evaluation of streams in Java: stream has already been operated upon or closed

The map call on line 3 sets the linkedOrConsumed flag on ls to true.
This causes the error on line 4.

Even though the operation hasn't occurred yet, Java apparently won't allow you to redefine what you are going to do with the stream.
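A minimal sketch reproducing that behavior (the line numbers above refer to the asker's code, which is not shown):

```java
import java.util.stream.Stream;

public class Reuse {

    // A second operation on the same stream object fails immediately,
    // even though no element has been evaluated yet.
    static boolean secondOperationThrows() {
        Stream<Integer> ls = Stream.of(1, 2, 3);
        ls.map(n -> n + 1);            // links ls to a downstream stage
        try {
            ls.filter(n -> n > 1);     // "stream has already been operated upon or closed"
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```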

Java Streams concat Streams with Supplier followed by distinct (lazy evaluation behavior)

Streams are lazy because intermediate operations are not evaluated until a terminal operation is invoked.

To the best of my understanding, the Stream API pushes each element through the whole pipeline until the terminal operation is applied, and only then streams the next element.

This also explains the situation here. Element "0" is streamed, but the terminal operation is not satisfied, so another element needs to be streamed. "1" is streamed next, and the terminal operation .anyMatch("1"::equals) is satisfied, so no more elements need to be streamed. distinct() is applied in between, though here it doesn't change the streamed elements.

So if another "0" were streamed after the first "0", it would not reach the terminal operation at all.

private void doConcat() {
    System.out.println(Stream.concat(buildStreamFromRange(0, 1000).get(),
                                     buildStreamFromRange(1000, 2000).get())
                             .distinct()
                             .peek(e -> System.out.println(e))
                             .anyMatch("1"::equals));
}
Try adding peek and streaming two "0" elements at the start: only one of them will pass through the pipeline and be printed by peek.

peek is meant for debugging, to see how the pipeline behaves when you are not sure, so use it to your advantage in the future.

A simpler example for future readers, showing how lazy operations in a stream work:

Stream.of("0","0","1","2","3","4")
.distinct()
.peek(a->System.out.println("after distinct: "+a))
.anyMatch("1"::equals);

Will print

after distinct: 0
after distinct: 1

First "0" goes until the terminal operation but does not satisfy it. Another element must be streamed.

Second "0" is filtered out by .distinct() and never reaches the terminal operation.

Since the terminal operation is not satisfied yet, next element is streamed.

"1" goes through terminal operation and satisfies it.

No more elements need to be streamed.

Trying to get lazy evaluation to work for infinite stream

The problem is caused by InfiniteStream.filter: it creates the tail filter as a lazy value but then accesses it immediately, which forces the value to be evaluated. This causes the whole stream to be evaluated, with the recursive calls blowing up the stack.

A lazy val delays the execution of the expression used to construct a value until it is first accessed. So you need to delay the call to tail.filter(predicate) until the user of the stream accesses the tail.

The easiest and most functional way to achieve this is to implement filter with a view. That is, filter returns a new stream that only filters the tail on demand.

For example:

import scala.annotation.tailrec

class FilterStream[+A] private (predicate: A => Boolean, stream: MyStream[A]) extends MyStream[A] {
  override def head: A = stream.head
  override def tail: MyStream[A] = FilterStream(predicate, stream.tail)
}

object FilterStream {
  def apply[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] =
    new FilterStream(predicate, dropWhile(a => !predicate(a), stream))

  // drops elements while the predicate holds
  @tailrec
  def dropWhile[A](predicate: A => Boolean, stream: MyStream[A]): MyStream[A] =
    if (stream.isEmpty || !predicate(stream.head)) stream
    else dropWhile(predicate, stream.tail)
}

Finally you should consider implementing an empty stream with its own type and object for many reasons but also so that you can terminate an infinite stream if your generator decides it wants to.

object Nil extends MyStream[Nothing] {
  override def head: Nothing = throw new NoSuchElementException("head of empty stream")
  override def tail: MyStream[Nothing] = throw new NoSuchElementException("tail of empty stream")
}

head and tail are inherently unsafe methods. Another improvement would be to use case classes to expose the shape of your stream, so that users can pattern match on it instead of having to call unsafe methods like head and tail.
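For readers coming from the Java questions above, the same delay-the-tail idea can be sketched in Java with a Supplier acting as the thunk (this is an illustrative sketch, not the question's code):

```java
import java.util.function.Supplier;

public class LazyTail {

    // Minimal cons-cell stream: the tail is a Supplier thunk,
    // evaluated only when tail() is called, never at construction time.
    static final class LStream {
        final int head;
        private final Supplier<LStream> tailThunk;

        LStream(int head, Supplier<LStream> tailThunk) {
            this.head = head;
            this.tailThunk = tailThunk;
        }

        LStream tail() { return tailThunk.get(); }  // forced here, on demand

        // Infinite stream of integers starting at n; safe because the
        // recursive call is deferred inside the lambda.
        static LStream from(int n) { return new LStream(n, () -> from(n + 1)); }
    }

    static int thirdElement() {
        return LStream.from(0).tail().tail().head;  // only three cells are built
    }
}
```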


