Java 8 Streams - Collect VS Reduce


reduce is a "fold" operation; it applies a binary operator to each element in the stream, where the first argument to the operator is the return value of the previous application and the second argument is the current stream element.

collect is an aggregation operation where a "collection" is created and each element is "added" to that collection. The collections produced for different parts of the stream are then merged together.

The document you linked gives the reason for having two different approaches:

If we wanted to take a stream of strings and concatenate them into a
single long string, we could achieve this with ordinary reduction:

 String concatenated = strings.reduce("", String::concat);

We would get the desired result, and it would even work in parallel.
However, we might not be happy about the performance! Such an
implementation would do a great deal of string copying, and the run
time would be O(n^2) in the number of characters. A more performant
approach would be to accumulate the results into a StringBuilder,
which is a mutable container for accumulating strings. We can use the
same technique to parallelize mutable reduction as we do with ordinary
reduction.

So the point is that the parallelisation is the same in both cases, but in the reduce case we apply the function to the stream elements themselves, while in the collect case we apply the function to a mutable container.
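The StringBuilder approach the Javadoc describes can be sketched with the three-argument collect, which parallelizes the same way while mutating per-thread containers:

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConcatDemo {
    public static void main(String[] args) {
        // Mutable reduction: each thread gets its own StringBuilder from the
        // supplier, elements are appended into it by the accumulator, and the
        // partial builders are merged by the combiner. This avoids the O(n^2)
        // string copying of reduce("", String::concat).
        String concatenated = Stream.of("a", "b", "c")
                .collect(StringBuilder::new,    // supplier
                         StringBuilder::append, // accumulator
                         StringBuilder::append) // combiner
                .toString();
        System.out.println(concatenated); // abc

        // The predefined collector does the same thing:
        String joined = Stream.of("a", "b", "c").collect(Collectors.joining());
        System.out.println(joined); // abc
    }
}
```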

Java 8 stream, how to break in reduce or in collect without throwing runtime exception?

Given an array as input, it seems to me you're looking for something like this:

int stateStream(int[] arr) {
    return IntStream.range(0, arr.length - 1)
            .filter(i -> arr[i + 1] - arr[i] > 1) // your condition
            .mapToObj(i -> arr[i])
            .findFirst() // first such occurrence
            .map(i -> i + 1) // add 1 to the point where the check actually failed
            .orElse(0); // some default value
}
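Wrapped in a runnable sketch with sample arrays (my reading of the gap semantics: it returns arr[i] + 1 at the first gap larger than 1, i.e. the first missing value):

```java
import java.util.stream.IntStream;

public class GapDemo {
    static int stateStream(int[] arr) {
        return IntStream.range(0, arr.length - 1)
                .filter(i -> arr[i + 1] - arr[i] > 1) // gap bigger than 1
                .mapToObj(i -> arr[i])
                .findFirst()                          // first such occurrence
                .map(i -> i + 1)                      // value after the gap start
                .orElse(0);                           // default when no gap
    }

    public static void main(String[] args) {
        System.out.println(stateStream(new int[]{1, 2, 4, 5})); // 3: first missing value
        System.out.println(stateStream(new int[]{1, 2, 3}));    // 0: no gap
    }
}
```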

or, from scratch, while you convert it to a sorted and filtered list of values:

int stateStream(int[] arr) {
    List<Integer> list = Arrays.stream(arr)
            .boxed().sorted()
            .filter(value -> value > 0)
            .collect(Collectors.toList());
    return IntStream.range(0, list.size() - 1)
            .filter(i -> list.get(i + 1) - list.get(i) > 1)
            .mapToObj(list::get)
            .findFirst()
            .map(i -> i + 1)
            .orElse(0);
}

Java 8 collect vs reduce

Basically, the question reduces to this: BiConsumer is a functional interface whose function is declared like this:

void accept(T t, U u)

You've given it a method reference with the correct parameters, but the wrong return type:

public MutableContainer reduce(long param) {
    return new MutableContainer(param);
}

[The T parameter is actually the this object when reduce is called, since reduce is an instance method and not a static method. That's why the parameters are correct.] The return type is MutableContainer and not void, however. So the question is, why does the compiler accept it?

Intuitively, I think this is because a method reference is, more or less, equivalent to an anonymous class that looks like this:

new BiConsumer<MutableContainer, Long>() {
    @Override
    public void accept(MutableContainer t, Long u) {
        t.reduce(u);
    }
}

Note that t.reduce(u) will return a result. However, the result is discarded. Since it's OK to call a method with a result and discard the result, I think that, by extension, that's why it's OK to use a method reference where the method returns the result, for a functional interface whose method returns void.

Legalistically, I believe the reason is in JLS 15.12.2.5. This section is difficult and I don't fully understand it, but somewhere in this section it says

If e is an exact method reference expression ... R2 is
void.

where, if I read it correctly, R2 is the result type of a functional interface method. I think this is the clause that allows a non-void method reference to be used where a void method reference is expected.

(Edit: As Ismail pointed out in the comments, JLS 15.13.2 may be the correct clause here; it talks about a method reference being congruent with a function type, and one of the conditions for this is that the result of the function type is void.)

Anyway, that should hopefully explain why it compiles. Of course, the compiler can't always tell when you're doing something that will produce incorrect results.
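The same void-compatibility can be seen with a standard-library method: StringBuilder.append returns StringBuilder, yet the reference fits BiConsumer, whose accept method returns void:

```java
import java.util.function.BiConsumer;

public class VoidCompatDemo {
    public static void main(String[] args) {
        // append(String) returns StringBuilder, but BiConsumer.accept is void.
        // The compiler accepts the reference and simply discards the result.
        BiConsumer<StringBuilder, String> appender = StringBuilder::append;

        StringBuilder sb = new StringBuilder("foo");
        appender.accept(sb, "bar");
        System.out.println(sb); // foobar
    }
}
```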

Java 8 streams - use reduce with alternative accumulator return type

For mutable reduction you should use collect, not reduce:

PdfPTable myTable =
    Stream.of(Labels.ITEM_NAME, Labels.QUANTITY, Labels.PRICE)
          .map(s -> new PdfPCell(new Phrase(s)))
          .collect(() -> new PdfPTable(new float[]{ 100.0f }),                  // supplier
                   (table, cell) -> table.addCell(cell),                        // accumulator
                   (table1, table2) -> table1.addAllCells(table2.getCells()));  // combiner

For the combiner, I made some assumptions on methods which may or may not exist in your PdfPTable class, but that's the general idea.
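The same three-argument collect shape with a standard collection class, for comparison (the PdfPTable methods above were assumed, but ArrayList's are real):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class CollectShapeDemo {
    public static void main(String[] args) {
        List<String> cells = Stream.of("ITEM_NAME", "QUANTITY", "PRICE")
                .collect(ArrayList::new,      // supplier: fresh container per thread
                         ArrayList::add,      // accumulator: add one element
                         ArrayList::addAll);  // combiner: merge two containers
        System.out.println(cells); // [ITEM_NAME, QUANTITY, PRICE]
    }
}
```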

Surprising performance differences between Stream.reduce() and Stream.collect()

Basically, you are measuring the initial overhead of code that is executed for the first time. Not only has the optimizer not done any work yet, you are also measuring the overhead of loading, verifying, and initializing the classes.

So it’s no wonder that the evaluation times decrease as each evaluation can reuse classes already loaded for the previous evaluation. Running all three evaluations in a loop or even just changing the order will give you an entirely different picture.

The only predictable result is that the simple recursive evaluation will have the smallest initial overhead as it doesn’t require loading of the Stream API classes.


If you run the code multiple times, or better, use a sophisticated benchmark tool, I guess you will get results similar to mine, where reduce clearly outperforms collect and is indeed faster than the single threaded approach.

The reason collect is slower is that you are using it entirely wrong. The Supplier is queried once per thread to get a distinct container, so the accumulator function does not need any additional synchronization. But it is important that the combiner function does its work correctly, joining the result containers of the different threads into a single result.

A correct approach would be:

BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
    .collect(() -> new BigInteger[]{ ONE },             // supplier
             (a, v) -> a[0] = a[0].multiply(v),         // accumulator
             (a, b) -> a[0] = a[0].multiply(b[0]));     // combiner

On my system, its performance is on par with the reduce approach. Since using an array as a mutable container can't change the immutable nature of BigInteger, there is no advantage in using collect here; using reduce is straightforward and, as said, has equivalent performance when both methods are used correctly.


By the way, I don’t get why so many programmers try to create self-referential lambda expressions. The straightforward way to write a recursive function is still a method:

static BigInteger fac(long x) {
    return x == 0 ? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac = StartMe::fac;

(Though in your code, you don’t need the Function<Long, BigInteger> at all, just call fac(long) directly).


As a final note, both Stream.iterate and Stream.limit are really bad for parallel execution. Using a stream with a predictable size and independent operations will outperform your solutions significantly:

BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
.mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);

Reduce a collection to a single object of another type with streams

Did you mean:

collection.stream()
          .filter(e -> e.key.equals("2"))
          .findFirst()
          .orElse(null); // or any default value

You can even throw an exception:

collection.stream()
          .filter(e -> e.key.equals("2"))
          .findFirst()
          .orElseThrow(() -> new IllegalArgumentException("No data found"));

Collect/reduce on a stream when no combine function exists

I would just use a loop over the elements of the collection:

Foo result = new Foo();
for (int ele : collection) {
    result = result.foo(ele);
}

This is easy to read, of similar verbosity to the streams approach, and you don't have to worry about its behaviour changing unexpectedly because of dark voodoo in the Streams library.
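A minimal runnable version of this fold pattern, with a hypothetical immutable Foo stand-in (the real Foo and its foo method are not shown in the question):

```java
import java.util.Arrays;
import java.util.List;

public class LoopFoldDemo {
    // Hypothetical immutable accumulator standing in for Foo.
    static class Foo {
        final int sum;
        Foo() { this(0); }
        Foo(int sum) { this.sum = sum; }
        Foo foo(int ele) { return new Foo(sum + ele); } // returns a new Foo
    }

    public static void main(String[] args) {
        List<Integer> collection = Arrays.asList(1, 2, 3, 4);

        // The plain loop: no combiner needed, order is obvious.
        Foo result = new Foo();
        for (int ele : collection) {
            result = result.foo(ele);
        }
        System.out.println(result.sum); // 10
    }
}
```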

Java 8 Streams reduce remove duplicates keeping the most recent entry

Well, I am just going to put my comment here in the shape of an answer:

 yourList.stream()
         .collect(Collectors.toMap(
                 EmployeeContract::getId,
                 Function.identity(),
                 BinaryOperator.maxBy(Comparator.comparing(EmployeeContract::getDate))))
         .values();

This will give you a Collection instead of a List, if you really care about this.
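If you do need a List, copying the values view is enough. A runnable sketch with a hypothetical minimal Contract class standing in for EmployeeContract:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LatestContractDemo {
    // Hypothetical stand-in for EmployeeContract, just to make the sketch runnable.
    static class Contract {
        final int id;
        final LocalDate date;
        Contract(int id, LocalDate date) { this.id = id; this.date = date; }
        int getId() { return id; }
        LocalDate getDate() { return date; }
    }

    // Same toMap collector as above, with the values view copied into a List.
    static List<Contract> latestPerId(List<Contract> contracts) {
        return new ArrayList<>(contracts.stream()
                .collect(Collectors.toMap(
                        Contract::getId,
                        Function.identity(),
                        BinaryOperator.maxBy(Comparator.comparing(Contract::getDate))))
                .values());
    }

    public static void main(String[] args) {
        List<Contract> latest = latestPerId(Arrays.asList(
                new Contract(1, LocalDate.of(2020, 1, 1)),
                new Contract(1, LocalDate.of(2021, 1, 1)),   // most recent for id 1
                new Contract(2, LocalDate.of(2019, 6, 1))));
        latest.forEach(c -> System.out.println(c.getId() + " " + c.getDate()));
    }
}
```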

Reduce vs Collect method on Parallel Streams vs Serial Streams

To begin, the given program is modified to see what happens inside each scenario.

import java.util.Set;
import java.util.TreeSet;

public class ReduceVsCollect {
    public static void main(String[] args) {
        Set<String> set = new TreeSet<>(Set.of("b", "c", "d"));
        parallelReduceA_Append_B(set);
        parallelCollectA_Append_B(set);
        parallelReduceB_Append_A(set);
        parallelCollectB_Append_A(set);
    }

    private static void parallelReduceA_Append_B(Set<String> set) {
        System.out.println("-".repeat(80));
        System.out.println("Start parallelReduceA_Append_B");
        StringBuilder identity = new StringBuilder();
        System.out.println("identity hash: " + System.identityHashCode(identity));
        StringBuilder result = set
                .parallelStream()
                .map(s -> new StringBuilder(s))
                .reduce(identity,
                        (a, b) -> {
                            System.out.println(String.format("called accumulator, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            return a.append(b);
                        },
                        (a, b) -> {
                            System.out.println(String.format("called combiner, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            return a.append(b);
                        }
                );
        System.out.println("Result: " + result);
    }

    private static void parallelCollectA_Append_B(Set<String> set) {
        System.out.println("-".repeat(80));
        System.out.println("Start parallelCollectA_Append_B");
        StringBuilder result = set
                .parallelStream()
                .map(s -> new StringBuilder(s))
                .collect(StringBuilder::new,
                        (a, b) -> {
                            System.out.println(String.format("called accumulator, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            a.append(b);
                        },
                        (a, b) -> {
                            System.out.println(String.format("called combiner, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            a.append(b);
                        });
        System.out.println("Result: " + result);
    }

    private static void parallelReduceB_Append_A(Set<String> set) {
        System.out.println("-".repeat(80));
        System.out.println("Start parallelReduceB_Append_A");
        StringBuilder identity = new StringBuilder();
        System.out.println("identity hash: " + System.identityHashCode(identity));
        StringBuilder result = set
                .parallelStream()
                .map(s -> new StringBuilder(s))
                .reduce(identity,
                        (a, b) -> {
                            System.out.println(String.format("called accumulator, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            return b.append(a);
                        },
                        (a, b) -> {
                            System.out.println(String.format("called combiner, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            return b.append(a);
                        });
        System.out.println("Result: " + result);
    }

    private static void parallelCollectB_Append_A(Set<String> set) {
        System.out.println("-".repeat(80));
        System.out.println("Start parallelCollectB_Append_A");
        StringBuilder result = set
                .parallelStream()
                .map(s -> new StringBuilder(s))
                .collect(StringBuilder::new,
                        (a, b) -> {
                            System.out.println(String.format("called accumulator, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            b.append(a);
                        },
                        (a, b) -> {
                            System.out.println(String.format("called combiner, a hash:%s, a:%s, b:%s", System.identityHashCode(a), a, b));
                            b.append(a);
                        });
        System.out.println("Result: " + result);
    }
}

Result

--------------------------------------------------------------------------------
Start parallelReduceA_Append_B
identity hash: 1642360923
called accumulator, a hash:1642360923, a:, b:c
called accumulator, a hash:1642360923, a:, b:b
called accumulator, a hash:1642360923, a:cb, b:d
called combiner, a hash:1642360923, a:cbd, b:cbd
Result: cbdcbd
--------------------------------------------------------------------------------
Start parallelCollectA_Append_B
called accumulator, a hash:1940447180, a:, b:c
called accumulator, a hash:572416449, a:, b:b
called accumulator, a hash:1940447180, a:c, b:d
called combiner, a hash:572416449, a:b, b:cd
Result: bcd
--------------------------------------------------------------------------------
Start parallelReduceB_Append_A
identity hash: 245565335
called accumulator, a hash:245565335, a:, b:c
called accumulator, a hash:245565335, a:, b:b
called accumulator, a hash:476402209, a:c, b:d
called combiner, a hash:1490180672, a:b, b:dc
Result: dcb
--------------------------------------------------------------------------------
Start parallelCollectB_Append_A
called accumulator, a hash:358699161, a:, b:c
called accumulator, a hash:1802696844, a:, b:b
called accumulator, a hash:358699161, a:, b:d
called combiner, a hash:1802696844, a:, b:
Result:

Explanation of different scenario:

1. Reduce with a.append(b) results in "cbdcbd" (in the run above):

The repeated sequence is due to using a mutable identity, new StringBuilder(), in reduce (refer to this answer). The value of identity keeps changing as the accumulator is called. All intermediate results are also stored in identity, so when the combiner is called, the sequence appears twice.

2. Collect with a.append(b) results in "bcd":

As the Javadoc states,

Performs a mutable reduction operation on the elements of this stream. A mutable reduction is one in which the reduced value is a mutable result container, such as an ArrayList, and elements are incorporated by updating the state of the result rather than by replacing the result

Hence this is a valid scenario in which to use StringBuilder.

3. Reduce with b.append(a) results in "dcb" (in the run above):

This time identity is not modified, because the parameters are swapped; the intermediate results are stored in the StringBuilders created in .map(s -> new StringBuilder(s)), hence no repeated sequence appears.

4. Collect with b.append(a) results in "":

Refer to the pseudocode of collect,

     R result = supplier.get();
     for (T element : this stream)
         accumulator.accept(result, element);
     return result;

In the accumulator we call b.append(a), which means result (bound to a) is never modified, so it keeps its initial value from new StringBuilder().
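The identity mutation behind scenarios 1 and 3 can be shown in isolation, even on a sequential stream: reduce expects an immutable identity, and appending into it means the "identity" and the result end up being the same mutated object (minimal sketch):

```java
import java.util.stream.Stream;

public class IdentityMutationDemo {
    public static void main(String[] args) {
        StringBuilder identity = new StringBuilder();

        // Sequential reduce "works", but only by mutating the identity:
        StringBuilder result = Stream.of("b", "c", "d")
                .reduce(identity, StringBuilder::append, StringBuilder::append);

        System.out.println(result);             // bcd
        System.out.println(identity);           // bcd -- the identity was mutated!
        System.out.println(result == identity); // true: same object
    }
}
```

In parallel, every thread starts from this one shared StringBuilder, which is what produces the doubled and reordered results above.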


