Partition a Java 8 Stream

An arbitrary source stream cannot be partitioned into fixed-size batches without breaking parallel processing. When a stream is processed in parallel, you may not know how many elements the first sub-task contains after the split, so you cannot form the partitions for the next sub-task until the first one has been fully processed.

However, it is possible to create a stream of partitions from a random-access List. This feature is available, for example, in my StreamEx library:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

Or, if you really want a stream of streams:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

If you don't want to depend on a third-party library, you can implement such an ofSubLists method manually:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

This implementation looks a little long, but it takes into account some corner cases, such as a list size close to Integer.MAX_VALUE.
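To make the behavior concrete, here is a minimal sketch that copies the ofSubLists method into a demo class and runs it on a small list. The class name SubListsDemo and the sample data are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class; only the ofSubLists logic comes from the answer.
class SubListsDemo {

    static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
        if (length <= 0)
            throw new IllegalArgumentException("length = " + length);
        int size = source.size();
        if (size <= 0)
            return Stream.empty();
        // Index of the last chunk. (size - 1) / length cannot overflow,
        // unlike the more common (size + length - 1) / length.
        int fullChunks = (size - 1) / length;
        return IntStream.range(0, fullChunks + 1).mapToObj(
            n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        List<List<Integer>> batches = ofSubLists(input, 3).collect(Collectors.toList());
        System.out.println(batches); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that each batch is a subList view of the source list, so no elements are copied.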


If you want a parallel-friendly solution for an unordered stream (so you don't care which stream elements are combined into a single batch), you can use a collector like this (thanks to @sibnick for the inspiration):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if (acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
        (acc1, acc2) -> {
            acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
            for (T t : acc2.cur) accumulator.accept(acc1, t);
            return acc1;
        }, acc -> {
            if (!acc.cur.isEmpty())
                downstream.accumulator().accept(acc.acc, acc.cur);
            return downstream.finisher().apply(acc.acc);
        }, Collector.Characteristics.UNORDERED);
}

Usage example:

List<List<Integer>> list = IntStream.range(0, 20)
    .boxed().parallel()
    .collect(unorderedBatches(3, Collectors.toList()));

Result:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

This collector is thread-safe and produces ordered batches for a sequential stream.
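The sequential and parallel behavior can be checked with a small sketch like the one below. The class name UnorderedBatchesDemo and the two helper methods are assumptions for illustration; the collector body is the one from the answer. The parallel result differs between runs, so the sketch only relies on the batch sizes, not on which elements end up together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class wrapping the unorderedBatches collector.
class UnorderedBatchesDemo {

    static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
            Collector<List<T>, A, R> downstream) {
        // Per-thread state: the batch being filled plus the downstream container.
        class Acc {
            List<T> cur = new ArrayList<>();
            A acc = downstream.supplier().get();
        }
        BiConsumer<Acc, T> accumulator = (acc, t) -> {
            acc.cur.add(t);
            if (acc.cur.size() == batchSize) {
                downstream.accumulator().accept(acc.acc, acc.cur);
                acc.cur = new ArrayList<>();
            }
        };
        return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                // Merge finished batches, then re-feed the leftover elements.
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for (T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if (!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
    }

    // Hypothetical helper: batches 0..n-1 on a sequential stream.
    static List<List<Integer>> sequentialBatches(int n, int batchSize) {
        return IntStream.range(0, n).boxed()
            .collect(unorderedBatches(batchSize, Collectors.toList()));
    }

    // Hypothetical helper: batches 0..n-1 on a parallel stream.
    static List<List<Integer>> parallelBatches(int n, int batchSize) {
        return IntStream.range(0, n).boxed().parallel()
            .collect(unorderedBatches(batchSize, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(sequentialBatches(7, 3)); // [[0, 1, 2], [3, 4, 5], [6]]
        // Always 7 batches for 20 elements; their contents vary between runs.
        System.out.println(parallelBatches(20, 3).size());
    }
}
```

Because the combiner pushes the leftover elements of the right-hand accumulator back through the accumulator, at most one batch in the final result is smaller than batchSize.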

If you want to apply an intermediate transformation for every batch, you may use the following version:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize,
        Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

For example, this way you can sum the numbers in every batch on the fly:

List<Integer> list = IntStream.range(0, 20)
    .boxed().parallel()
    .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
        Collectors.toList()));

Partitioning a Map in Java 8+

You can reduce each group using toMap (as a downstream collector):

Map<String, String> myMap = new HashMap<>();
myMap.put("d", "D");
myMap.put("c", "C");
myMap.put("b", "B");
myMap.put("A", "A");

List<String> myList = Arrays.asList("a", "b", "c");

Map<Boolean, Map<String, String>> result = myMap.entrySet()
    .stream()
    .collect(Collectors.partitioningBy(
        entry -> myList.contains(entry.getKey()),
        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)
    ));

And for this example, that produces {false={A=A, d=D}, true={b=B, c=C}}

Can I partition a stream and combine it with groupingBy?

I tried the following and somehow it worked.

Map<Boolean, Map<Object, List<Person>>> rr = persons.stream()
    .collect(Collectors.partitioningBy(p -> p.name.startsWith("P"),
        Collectors.groupingBy(p -> p.age > 20)));

The output was as expected

rr = {false={false=[Max, David]}, true={false=[Pam], true=[Peter, Pamela]}}

But I am not sure whether this is an efficient way to do it. Any suggestions?
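One possible suggestion, as a sketch only: since the inner classifier is also a boolean test, the inner groupingBy could be swapped for a second partitioningBy, which gives a typed Map<Boolean, ...> and always contains both keys. Either way, the nested collector makes a single pass over the stream. The Person class, the sample names and the ages below are assumptions made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; Person and its sample data are invented.
class NestedPartitionDemo {

    static class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Outer split: name starts with "P". Inner split: older than 20.
    static Map<Boolean, Map<Boolean, List<Person>>> partition(List<Person> persons) {
        return persons.stream().collect(
            Collectors.partitioningBy(p -> p.name.startsWith("P"),
                Collectors.partitioningBy(p -> p.age > 20)));
    }

    static List<Person> sample() {
        return Arrays.asList(
            new Person("Max", 18), new Person("Peter", 23),
            new Person("Pamela", 23), new Person("David", 12),
            new Person("Pam", 12));
    }

    public static void main(String[] args) {
        Map<Boolean, Map<Boolean, List<Person>>> rr = partition(sample());
        System.out.println(rr.get(true).get(true));  // [Peter, Pamela]
        System.out.println(rr.get(false).get(true)); // [] (key present, list empty)
    }
}
```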

Partition and map java streams in categories

You need a slightly different groupingBy:

stream.collect(Collectors.groupingBy(A::getCategory,
    Collectors.mapping(A::data, Collectors.toList())));
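A minimal runnable sketch of the same idea follows. The Item class, its getCategory and getData accessors and the sample values are hypothetical stand-ins for the asker's class A.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; Item stands in for the asker's class A.
class GroupAndMapDemo {

    static class Item {
        private final String category;
        private final String data;

        Item(String category, String data) {
            this.category = category;
            this.data = data;
        }

        String getCategory() { return category; }
        String getData() { return data; }
    }

    // Group by category, but keep only the data of each item in the lists.
    static Map<String, List<String>> dataByCategory(List<Item> items) {
        return items.stream().collect(
            Collectors.groupingBy(Item::getCategory,
                Collectors.mapping(Item::getData, Collectors.toList())));
    }

    static List<Item> sample() {
        return Arrays.asList(
            new Item("fruit", "apple"), new Item("veg", "leek"),
            new Item("fruit", "pear"));
    }

    public static void main(String[] args) {
        Map<String, List<String>> grouped = dataByCategory(sample());
        System.out.println(grouped.get("fruit")); // [apple, pear]
        System.out.println(grouped.get("veg"));   // [leek]
    }
}
```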

Java Stream: divide into two lists by boolean predicate

Collectors.partitioningBy:

Map<Boolean, List<Employee>> partitioned =
    listOfEmployees.stream().collect(
        Collectors.partitioningBy(Employee::isActive));

The resulting map contains two lists, corresponding to whether or not the predicate was matched:

List<Employee> activeEmployees = partitioned.get(true);
List<Employee> formerEmployees = partitioned.get(false);

There are a couple of reasons to use partitioningBy over groupingBy (as suggested by Juan Carlos Mendoza):

Firstly, the parameter of groupingBy is a Function<Employee, Boolean> (in this case), so it is possible to pass it a function that returns null, which would result in a NullPointerException being thrown by the collector. Whilst not documented explicitly, an exception is explicitly thrown for null keys, presumably because of the behavior of Map.computeIfAbsent ("If the function returns null no mapping is recorded"), meaning elements would otherwise be dropped silently from the output. partitioningBy uses a Predicate<Employee>, so it can only ever return 2 partitions. (Thanks to lczapski for pointing this out.)

Secondly, you get two lists (*) in the resulting map with partitioningBy; with groupingBy, you only get key/value pairs where elements map to the given key:

System.out.println(
    Stream.empty().collect(Collectors.partitioningBy(a -> false)));
// Output: {false=[], true=[]}

System.out.println(
    Stream.empty().collect(Collectors.groupingBy(a -> false)));
// Output: {}

(*) This behavior isn't documented in the Java 8 Javadoc, but the documentation was added in Java 9.
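Both points can be checked with a short sketch such as the following. The class name NullKeyDemo, the helper methods and the sample strings are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class for the two differences described above.
class NullKeyDemo {

    // Returns true if groupingBy throws NullPointerException for a null key.
    static boolean groupingByRejectsNullKey() {
        // Classifier that returns null for the empty string.
        Function<String, Boolean> classifier = s -> s.isEmpty() ? null : Boolean.TRUE;
        try {
            Stream.of("a", "").collect(Collectors.groupingBy(classifier));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // partitioningBy on an empty stream still yields both keys.
    static Map<Boolean, List<String>> partitionEmpty() {
        return Stream.<String>empty()
            .collect(Collectors.partitioningBy(String::isEmpty));
    }

    // groupingBy on an empty stream yields an empty map.
    static Map<Boolean, List<String>> groupEmpty() {
        return Stream.<String>empty()
            .collect(Collectors.groupingBy(String::isEmpty));
    }

    public static void main(String[] args) {
        System.out.println(groupingByRejectsNullKey()); // true
        System.out.println(partitionEmpty().keySet().size()); // 2
        System.out.println(groupEmpty().isEmpty()); // true
    }
}
```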

Java 8 partition list

I have tried my own solution with a custom-made Collector. I hope someone will find it useful, or help me improve it.

class PartitioningCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {

    private final int batchSize;
    // The current batch lives in the collector itself, not in the accumulation
    // container, so this collector is only safe for sequential streams.
    private final List<T> batch;

    public PartitioningCollector(int batchSize) {
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
    }

    @Override
    public Supplier<List<List<T>>> supplier() {
        return LinkedList::new;
    }

    @Override
    public BiConsumer<List<List<T>>, T> accumulator() {
        return (total, element) -> {
            batch.add(element);
            if (batch.size() >= batchSize) {
                total.add(new ArrayList<>(batch));
                batch.clear();
            }
        };
    }

    @Override
    public BinaryOperator<List<List<T>>> combiner() {
        return (left, right) -> {
            List<List<T>> result = new ArrayList<>();
            result.addAll(left);
            result.addAll(right);
            return result;
        };
    }

    @Override
    public Function<List<List<T>>, List<List<T>>> finisher() {
        return result -> {
            // Flush the last, possibly incomplete, batch.
            if (!batch.isEmpty()) {
                result.add(new ArrayList<>(batch));
                batch.clear();
            }
            return result;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Can you split a stream into two streams?

Not exactly. You can't get two Streams out of one; this doesn't make sense -- how would you iterate over one without needing to generate the other at the same time? A stream can only be operated over once.

However, if you want to dump the elements into lists or something similar, you could do:

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
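For completeness, here is a self-contained sketch of that one-liner. The class name SplitDemo, the split helper and the sample values are assumptions; heads and tails are plain ArrayLists, so the sketch is meant for sequential streams only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class around the forEach one-liner.
class SplitDemo {

    // Returns [heads, tails]: zeros go to heads, everything else to tails.
    // ArrayList is not thread-safe, so pass a sequential stream.
    static List<List<Integer>> split(Stream<Integer> stream) {
        List<Integer> heads = new ArrayList<>();
        List<Integer> tails = new ArrayList<>();
        stream.forEach(x -> (x == 0 ? heads : tails).add(x));
        return Arrays.asList(heads, tails);
    }

    public static void main(String[] args) {
        System.out.println(split(Stream.of(0, 1, 0, 1, 1))); // [[0, 0], [1, 1, 1]]
    }
}
```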

Java 8 partition data from a list to two separate list

First of all, your two conditions do not partition the original List, i.e. the second condition is not the negation of the first.

If you want a true partitioning, you can use Collectors.partitioningBy:

Map<Boolean, List<ProductHolder>> partition =
    holderList.stream()
        .collect(Collectors.partitioningBy(
            holder -> holder.getRecord().isX() || holder.getRecord().isY()));

The first partition (that satisfies the condition) will be obtained via partition.get(true), and the second via partition.get(false).

How to partition a list by predicate using java8?

As explained in @RealSkeptic's comment, a Predicate can return only two results: true and false. This means you would be able to split your data into only two groups.

What you need is some kind of Function that lets you determine a common result for elements that should be grouped together. In your case, that result could be the first character in lowercase (assuming that no string is empty, i.e. each has at least one character).

Now, with Collectors.groupingBy(function), you can group all elements into separate Lists and store them in a Map whose key is the common result used for grouping (such as the first character).

So your code can look like this:

Function<String, Character> firstChar = s -> Character.toLowerCase(s.charAt(0));

List<String> a = Arrays.asList("foo", "Abc", "bar", "baz", "aBc");
Map<Character, List<String>> collect = a.stream()
    .collect(Collectors.groupingBy(firstChar));

System.out.println(collect);

Output:

{a=[Abc, aBc], b=[bar, baz], f=[foo]}

Partition a Stream by a discriminator function

The solution requires us to define a custom Spliterator which can be used to construct the partitioned stream. We shall need to access the input stream through its own spliterator and wrap it into ours. The output stream is then constructed from our custom spliterator.

The following Spliterator will turn any Stream<E> into a Stream<List<E>> provided a Function<E, ?> as the discriminator function. Note that the input stream must be ordered for this operation to make sense.

import java.util.*;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Comparator.naturalOrder;

public class PartitionBySpliterator<E> extends AbstractSpliterator<List<E>> {
    private final Spliterator<E> spliterator;
    private final Function<? super E, ?> partitionBy;
    private HoldingConsumer<E> holder;
    private Comparator<List<E>> comparator;

    public PartitionBySpliterator(
        Spliterator<E> toWrap,
        Function<? super E, ?> partitionBy
    ) {
        super(Long.MAX_VALUE, toWrap.characteristics() & ~SIZED | NONNULL);
        this.spliterator = toWrap;
        this.partitionBy = partitionBy;
    }

    public static <E> Stream<List<E>> partitionBy(
        Function<E, ?> partitionBy, Stream<E> in
    ) {
        return StreamSupport.stream(
            new PartitionBySpliterator<>(in.spliterator(), partitionBy), false);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final HoldingConsumer<E> h;
        if (holder == null) {
            h = new HoldingConsumer<>();
            if (!spliterator.tryAdvance(h)) {
                return false;
            }
            holder = h;
        } else {
            h = holder;
        }
        final ArrayList<E> partition = new ArrayList<>();
        final Object partitionKey = partitionBy.apply(h.value);
        boolean didAdvance;
        do {
            partition.add(h.value);
        }
        while ((didAdvance = spliterator.tryAdvance(h))
            && Objects.equals(partitionBy.apply(h.value), partitionKey));
        if (!didAdvance) {
            holder = null;
        }
        action.accept(partition);
        return true;
    }

    static final class HoldingConsumer<T> implements Consumer<T> {
        T value;

        @Override
        public void accept(T value) {
            this.value = value;
        }
    }

    @Override
    public Comparator<? super List<E>> getComparator() {
        final Comparator<List<E>> c = this.comparator;
        return c != null ? c : (this.comparator = comparator());
    }

    private Comparator<List<E>> comparator() {
        @SuppressWarnings({"unchecked", "rawtypes"})
        final Comparator<? super E> innerComparator =
            Optional.ofNullable(spliterator.getComparator())
                .orElse((Comparator) naturalOrder());
        return (left, right) -> {
            final int c = innerComparator.compare(left.get(0), right.get(0));
            return c != 0 ? c : innerComparator.compare(
                left.get(left.size() - 1), right.get(right.size() - 1));
        };
    }
}
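To illustrate what the resulting partitions look like, here is a deliberately simplified sketch that applies the same rule (consecutive elements with an equal discriminator value form one partition) to a List with a plain loop. It is not the spliterator above and is not lazy; the class name ConsecutiveRunsDemo and the sample data are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical, eager, list-based illustration of partitioning by discriminator.
class ConsecutiveRunsDemo {

    static <E> List<List<E>> partitionBy(Function<? super E, ?> discriminator, List<E> in) {
        List<List<E>> result = new ArrayList<>();
        List<E> current = null;
        Object currentKey = null;
        for (E e : in) {
            Object key = discriminator.apply(e);
            // Start a new partition on the first element or when the key changes.
            if (current == null || !Objects.equals(key, currentKey)) {
                current = new ArrayList<>();
                result.add(current);
                currentKey = key;
            }
            current.add(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "cc", "dd", "e");
        // Partition consecutive words by their length.
        System.out.println(partitionBy((String s) -> s.length(), words));
        // [[a, b], [cc, dd], [e]]
    }
}
```

Note that "e" ends up in its own partition even though "a" and "b" have the same length: only adjacent elements are merged, which is why the input must be ordered for the operation to make sense.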

