Java 8 Stream with Batch Processing

Note: this solution reads the whole file into memory before running the forEach, because groupBy() collects every element into a Map first.

You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<Tuple2<String, Long>>>
   .forEach((index, batch) -> {
       process(batch);               // batch is a List<Tuple2<String, Long>>
   });

Behind the scenes, zipWithIndex() is just:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... whereas groupBy() is API convenience for:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Disclaimer: I work for the company behind jOOλ)
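
If reading the whole file up front is a concern (see the note at the top of this answer), the batching can also be done lazily with the plain JDK. The following is only a minimal sketch and is not part of jOOλ: the class name LazyBatches and the helper batched are hypothetical, and it assumes a sequential stream, in line with the single-threaded use-case above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class; not part of jOOλ or the JDK.
class LazyBatches {

    // Groups a sequential stream into lists of at most `size` elements,
    // pulling from the source only when the next batch is requested.
    static <T> Stream<List<T>> batched(Stream<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        final Iterator<T> it = source.iterator();

        Iterator<List<T>> batches = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!it.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && it.hasNext()) {
                    batch.add(it.next());
                }
                return batch;
            }
        };

        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(
                    batches, Spliterator.ORDERED | Spliterator.NONNULL),
                false)                  // false = sequential
            .onClose(source::close);    // closing the result closes the source
    }

    public static void main(String[] args) {
        List<List<Integer>> result = batched(Stream.of(1, 2, 3, 4, 5, 6, 7), 3)
            .collect(Collectors.toList());
        System.out.println(result); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

A lazy source such as Files.lines(path) could be passed in the same way; each batch is then built only when the downstream operation asks for it, so at most one batch is held by the helper at a time.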

Batch processing an ArrayList with LongStream using Java 8

map in LongStream is supposed to map an element of the LongStream to a long, not to a List.

Use mapToObj:

LongStream.range(0, (data.size()+BATCH-1)/BATCH)
          .mapToObj(i -> data.subList((int)i*BATCH, (int)Math.min(data.size(), (i+1)*BATCH)))
          .forEach(batch -> process(batch));

Or:

IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Is there a common Java utility to break a list into batches?

Check out Lists.partition(java.util.List, int) from Google Guava:

Returns consecutive sublists of a list, each of the same size (the final list may be smaller). For example, partitioning a list containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer list containing two inner lists of three and two elements, all in the original order.
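
The behaviour described in that excerpt can be approximated without Guava. The sketch below is not Guava's source: PartitionSketch and its partition method are hypothetical names, and it assumes the backing list is not structurally modified while the view is in use.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.List;

// Hypothetical JDK-only stand-in for the behaviour quoted above; not Guava's code.
class PartitionSketch {

    // Returns a read-only view of `list` as consecutive sublists of
    // `batchSize` elements; the last sublist may be shorter.
    static <T> List<List<T>> partition(final List<T> list, final int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new AbstractList<List<T>>() {
            @Override
            public List<T> get(int index) {
                if (index < 0 || index >= size()) {
                    throw new IndexOutOfBoundsException("index: " + index);
                }
                int from = index * batchSize;
                int to = Math.min(from + batchSize, list.size());
                return list.subList(from, to); // a view, not a copy
            }

            @Override
            public int size() {
                // Ceiling division: number of batches needed to cover the list.
                return (list.size() + batchSize - 1) / batchSize;
            }
        };
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(partition(data, 3)); // [[a, b, c], [d, e]]
    }
}
```

With Guava on the classpath, the equivalent call would simply be Lists.partition(data, 3).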

Java Stream: transform user input to batches

In general, this problem is awkward to solve with the Java 8 Stream API. First, splitting a stream into fixed-size batches is impossible for parallel streams: the source may divide the task at an unknown offset, so in the general case you cannot know the index of the current element until all previous subtasks have been processed, which defeats the purpose of parallelization. Because the Stream API is designed to work identically in parallel and sequential modes, it simply has no method for combining stream elements into even batches. Some third-party solutions exist, and they usually ignore parallel streams (for example, protonpack's StreamUtils.windowed), but in general it is cleaner to generate batches from the very beginning than to transform an existing stream.

The second problem is that Java 8 has no ready-made facility for reading numbers (or at least tokens) from standard input as a Stream; you can only get lines, using BufferedReader.lines(). This will improve somewhat in Java 9, which adds stream support to the Scanner class (see JDK-8072722), but for now some extra steps are needed.

Finally, once you have a Stream of number batches, you need to end it as soon as standard input ends. That is a job for the takeWhile operation, which will also appear only in Java 9 (see JDK-8071597).

I can offer a solution based on my StreamEx library, though I still don't like it much:

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Iterable<String> iterable = () -> sc;
// Supplier which returns Lists of up to 3 numbers from System.in
Supplier<List<Integer>> triples = () -> StreamEx.of(iterable.spliterator())
        .map(Integer::valueOf).limit(3).toList();
StreamEx.generate(triples).takeWhile(list -> !list.isEmpty())
        // replace with your own stream operations
        // they will be executed as soon as three numbers are entered
        .forEach(System.out::println);

The main StreamEx feature used here is StreamEx.takeWhile, a backport of Java 9's Stream.takeWhile.
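
On Java 9 or later, the same idea can be sketched with the JDK alone. This is an illustration under stated assumptions rather than a drop-in replacement: the class ScannerBatches and the method batches are hypothetical names, the Scanner reads from a String instead of System.in so the result is reproducible, and Stream.iterate is used in place of a generator because it yields an ordered stream, which keeps takeWhile deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical example class; requires Java 9+ for Stream.takeWhile.
class ScannerBatches {

    static Stream<List<Integer>> batches(Scanner sc, int size) {
        // Reads up to `size` numbers; returns an empty list once input is exhausted.
        Supplier<List<Integer>> next = () -> {
            List<Integer> batch = new ArrayList<>(size);
            while (batch.size() < size && sc.hasNext()) {
                batch.add(Integer.valueOf(sc.next()));
            }
            return batch;
        };
        // Note: the first batch (the seed) is read as soon as this method is called;
        // later batches are read only when the stream asks for them.
        return Stream.iterate(next.get(), previous -> next.get())
                     .takeWhile(batch -> !batch.isEmpty());
    }

    public static void main(String[] args) {
        Scanner sc = new Scanner("1 2,3\n4 5 6 7").useDelimiter("[\n\r,\\s]+");
        System.out.println(batches(sc, 3).collect(Collectors.toList()));
        // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Unlike the protonpack variant discussed further down, this sketch also emits a final batch that has fewer than three elements.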

If you prefer using jOOL, it would be even simpler:

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Supplier<List<Integer>> triples = () -> Seq.seq(sc).map(Integer::valueOf).limit(3).toList();
Seq.generate(triples).limitUntil(List::isEmpty)
   .forEach(System.out::println);

The idea is the same. Creating a Spliterator is unnecessary here because jOOL has a Seq.seq(Iterator) method.

Finally, here's a protonpack solution. I personally don't like this library, but the solution is quite short, so some may prefer it:

import static com.codepoetics.protonpack.StreamUtils.*;

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Stream<List<Integer>> stream = takeUntil(windowed(
        stream(() -> sc).map(Integer::valueOf), 3, 3), List::isEmpty);
stream.forEach(System.out::println);

The problem here is that, for some reason, it delays the processing of a batch until the next batch is formed. It also does not create the final batch if it has fewer than 3 elements. This problem is fixed in trunk but has not been released yet.


