Java 8 Stream with batch processing
Note! This solution reads the whole file before running the forEach.
You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:
Seq.seq(lazyFileStream)                 // Seq<String>
   .zipWithIndex()                      // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500)    // Map<Long, List<Tuple2<String, Long>>>
   .forEach((index, batch) -> {
       // each batch element is still a tuple; the original line is tuple.v1
       process(batch);
   });
Behind the scenes, zipWithIndex() is just:
static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}
... whereas groupBy() is API convenience for:
default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}
(Disclaimer: I work for the company behind jOOλ)
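As the note at the top says, groupBy() collects everything into a Map, so the whole input is consumed before the first batch is processed. If batches should be handled as they fill up, a plain-JDK loop over the stream's iterator is one option. The following is only a minimal sketch of that swapped-in technique, not jOOλ's API; LazyBatches and forEachBatch are hypothetical names, and it assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of jOOλ or the JDK.
final class LazyBatches {

    // Pulls elements from the stream's iterator and hands each batch to the
    // action as soon as it is full. A final, smaller batch is passed on too.
    static <T> void forEachBatch(Stream<T> stream, int batchSize, Consumer<List<T>> action) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Iterator<T> it = stream.iterator();
        List<T> batch = new ArrayList<>(batchSize);
        while (it.hasNext()) {
            batch.add(it.next());
            if (batch.size() == batchSize) {
                action.accept(batch);
                batch = new ArrayList<>(batchSize); // fresh list; the old one now belongs to the action
            }
        }
        if (!batch.isEmpty()) {
            action.accept(batch);
        }
    }
}
```

For example, LazyBatches.forEachBatch(lazyFileStream, 500, batch -> process(batch)) would call process once per 500 lines, plus once more for a smaller final batch if the line count is not a multiple of 500.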
Batch processing an ArrayList with LongStream using Java 8
map in LongStream is supposed to map an element of the LongStream to a long, not to a List. Use mapToObj:
LongStream.range(0, (data.size()+BATCH-1)/BATCH)
          .mapToObj(i -> data.subList((int)i*BATCH, (int)Math.min(data.size(), (i+1)*BATCH)))
          .forEach(batch -> process(batch));
Or:
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));
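To make the index arithmetic concrete, here is a small self-contained sketch of the IntStream variant. The class and method names (SubListBatches, batches) are made up for illustration. The expression (data.size() + batchSize - 1) / batchSize is integer ceiling division, so 7 elements with a batch size of 3 give 3 batches covering [0, 3), [3, 6) and [6, 7).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical wrapper around the snippet above, for illustration only.
final class SubListBatches {

    // Splits data into consecutive subList views of at most batchSize elements.
    static <T> List<List<T>> batches(List<T> data, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Ceiling division: number of batches needed to cover all elements.
        int batchCount = (data.size() + batchSize - 1) / batchSize;
        return IntStream.range(0, batchCount)
                .mapToObj(i -> data.subList(i * batchSize,
                        Math.min(data.size(), (i + 1) * batchSize)))
                .collect(Collectors.toList());
    }
}
```

The batches are subList views, so they share storage with data and should not be used after data has been structurally modified.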
Is there a common Java utility to break a list into batches?
Check out Lists.partition(java.util.List, int) from Google Guava:
Returns consecutive sublists of a list, each of the same size (the final list may be smaller). For example, partitioning a list containing [a, b, c, d, e] with a partition size of 3 yields [[a, b, c], [d, e]] -- an outer list containing two inner lists of three and two elements, all in the original order.
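If adding Guava is not an option, the behavior described in that quote can be approximated with the JDK alone. The sketch below is a hypothetical stand-in (PartitionView is an invented name), not Guava's implementation: a read-only list view whose i-th element is a subList of the source.

```java
import java.util.AbstractList;
import java.util.List;

// Hypothetical JDK-only stand-in for illustration; this is not Guava's source code.
final class PartitionView<T> extends AbstractList<List<T>> {
    private final List<T> source;
    private final int partitionSize;

    PartitionView(List<T> source, int partitionSize) {
        if (partitionSize <= 0) {
            throw new IllegalArgumentException("partitionSize must be positive");
        }
        this.source = source;
        this.partitionSize = partitionSize;
    }

    // Returns the index-th consecutive sublist; only the last one may be shorter.
    @Override
    public List<T> get(int index) {
        if (index < 0 || index >= size()) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        int start = index * partitionSize;
        int end = Math.min(start + partitionSize, source.size());
        return source.subList(start, end);
    }

    // Number of partitions, rounded up so a trailing partial partition is counted.
    @Override
    public int size() {
        return (source.size() + partitionSize - 1) / partitionSize;
    }
}
```

With the example from the quote, new PartitionView<>(Arrays.asList("a", "b", "c", "d", "e"), 3) prints as [[a, b, c], [d, e]].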
Java Stream: transform user input to batches
In general, this problem is awkward to solve with the Java 8 Stream API. First, splitting a stream into fixed-size batches is impossible for parallel streams: the source may divide the task at an unknown offset, so in the general case you cannot know the index of the current stream element until all the preceding subtasks have been processed (which defeats the whole idea of parallelization). Because the Stream API is designed to work identically in parallel and sequential modes, it simply has no method for combining stream elements into even batches. There are some third-party solutions that usually ignore the existence of parallel streams (like protonpack StreamUtils.windowed), but in general it's cleaner to generate batches from the very beginning instead of transforming the stream.
The second problem is that Java 8 has no ready-made facility for reading numbers (or at least tokens) from standard input as a Stream (you can only get lines using BufferedReader.lines()). It will be somewhat better in Java 9, as stream support is added to the Scanner class (see JDK-8072722), but currently you need some extra steps.
Finally, once you manage to create the Stream of number batches, you need to end it as soon as standard input ends. That's a job for the takeWhile operation, which will also appear only in Java 9 (see JDK-8071597).
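For readers already on Java 9 or later, these pieces can be combined without a third-party library. This is a rough sketch under that assumption; ScannerBatches, readBatch and batches are hypothetical names. It uses Stream.iterate to keep pulling batches from the Scanner and Stream.takeWhile to stop at the first empty batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Stream;

// Hypothetical helper for illustration; requires Java 9+ for Stream.takeWhile.
final class ScannerBatches {

    // Reads up to batchSize integers; returns an empty list once input is exhausted.
    static List<Integer> readBatch(Scanner sc, int batchSize) {
        List<Integer> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && sc.hasNext()) {
            batch.add(Integer.valueOf(sc.next()));
        }
        return batch;
    }

    // Produces batches in input order until the first empty batch signals end of input.
    static Stream<List<Integer>> batches(Scanner sc, int batchSize) {
        return Stream.iterate(readBatch(sc, batchSize), prev -> readBatch(sc, batchSize))
                .takeWhile(batch -> !batch.isEmpty());
    }
}
```

Note that the first batch is read as soon as batches() is called, because Stream.iterate takes its seed as an already computed value; later batches are read on demand. A call such as ScannerBatches.batches(new Scanner(System.in).useDelimiter("[,\\s]+"), 3).forEach(System.out::println) would mirror the examples that follow.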
I can offer a solution involving my StreamEx library, though I still don't like it much:
Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Iterable<String> iterable = () -> sc;
// Supplier which returns Lists of up to 3 numbers from System.in
Supplier<List<Integer>> triples = () -> StreamEx.of(iterable.spliterator())
        .map(Integer::valueOf).limit(3).toList();
StreamEx.generate(triples).takeWhile(list -> !list.isEmpty())
        // replace with your own stream operations
        // they will be executed as soon as three numbers are entered
        .forEach(System.out::println);
The main StreamEx feature used here is StreamEx.takeWhile, which is a backport of Java 9's Stream.takeWhile.
If you prefer using jOOL, it would be even simpler:
Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Supplier<List<Integer>> triples = () -> Seq.seq(sc).map(Integer::valueOf).limit(3).toList();
Seq.generate(triples).limitUntil(List::isEmpty)
        .forEach(System.out::println);
The idea is the same. Creating a Spliterator is unnecessary here, as jOOL has a Seq.seq(Iterator) method.
Finally, here's a protonpack solution. I personally don't like this library, but the solution looks quite short, so somebody might prefer it:
import static com.codepoetics.protonpack.StreamUtils.*;

Scanner sc = new Scanner(System.in).useDelimiter("[\n\r,\\s]+");
Stream<List<Integer>> stream = takeUntil(windowed(
        stream(() -> sc).map(Integer::valueOf), 3, 3), List::isEmpty);
stream.forEach(System.out::println);
The problem here is that, for some reason, it delays the processing of a batch until the next batch is formed. It also does not create the final batch if it has fewer than 3 elements. This problem is fixed in trunk, but the fix has not been released yet.