Can You Split a Stream into Two Streams

Can you split a stream into two streams?

Not exactly. You can't get two Streams out of one; it doesn't make sense -- how would you iterate over one without having to generate the other at the same time? A stream can only be operated on once.
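
The single-use rule can be checked with a small sketch. The class and method names here are made up for illustration; the behavior shown (an IllegalStateException on the second terminal operation) is what the standard java.util.stream implementation does.

```java
import java.util.stream.Stream;

class SingleUseStreamSketch {

    /** Returns true if a second terminal operation on the same stream is rejected. */
    static boolean secondUseFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.forEach(x -> { });       // first terminal operation consumes the stream
        try {
            stream.forEach(x -> { });   // second traversal of the same stream object
            return false;
        } catch (IllegalStateException e) {
            return true;                // the stream has already been operated upon
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails());
    }
}
```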

However, if you just want to dump the elements into two lists, you could do:

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
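
For completeness, a self-contained version of that one-liner might look like the sketch below. The class name, the split method, and returning both lists as a pair are assumptions for illustration; heads and tails are plain ArrayLists, so this is only safe for a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class HeadsTailsSketch {

    /** Returns [heads, tails]: zeros go to heads, everything else to tails. */
    static List<List<Integer>> split(Stream<Integer> stream) {
        List<Integer> heads = new ArrayList<>();
        List<Integer> tails = new ArrayList<>();
        // ArrayList is not thread-safe, so do not call this with a parallel stream
        stream.forEach(x -> (x == 0 ? heads : tails).add(x));
        return List.of(heads, tails);
    }

    public static void main(String[] args) {
        System.out.println(split(Stream.of(0, 1, 0, 1, 1)));
    }
}
```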

Split stream into substreams with N elements

You can't split a Stream into two or more Streams easily and directly. The only way is the procedural one: collect the elements into a List, group them in pairs, and map each pair back to a Stream:

Stream<Integer> s = Stream.of(1,2,3,4,5);
List<Integer> list = s.collect(Collectors.toList());
int size = list.size();

List<List<Integer>> temp = new ArrayList<>();
List<Integer> temp2 = new ArrayList<>();

for (int i = 0; i < size; i++) {
    temp2.add(list.get(i));
    // close the current pair after every second element, or at the last element
    // (a single combined check avoids adding an empty list when size is even)
    if (i % 2 != 0 || i == size - 1) {
        temp.add(temp2);
        temp2 = new ArrayList<>();
    }
}
Stream<Stream<Integer>> stream = temp.stream().map(i -> i.stream());

As you can see, it's a long way round and not worth it. Wouldn't it be better to store the pairs in a List rather than in a Stream? The Stream API is meant for processing data, not for storing it.
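
If storing the groups in Lists is acceptable, a shorter variant is possible. This is a minimal sketch, not part of the answer above: the chunk helper and its parameter n are hypothetical names, and it assumes the elements are already collected in a List so that subList can be used.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ChunkSketch {

    /** Splits list into consecutive sublists of at most n elements (n must be positive). */
    static <T> List<List<T>> chunk(List<T> list, int n) {
        int chunks = (list.size() + n - 1) / n;   // round up so a shorter last chunk is kept
        return IntStream.range(0, chunks)
                .mapToObj(i -> list.subList(i * n, Math.min(list.size(), (i + 1) * n)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(chunk(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

Each sublist is a view on the original list, so calling stream() on one of them still gives a Stream per group when needed.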

How to split a stream into multiple stream based on certain criteria?

This isn't so hard if you think a bit differently: find the indexes of the negative values and take a subList between each pair of them. There is a quirk involving IntStream.of(-1) (I'll let you figure out why it is needed: try replacing it with the more intuitive IntStream.of(0) and see what happens). So, given an input like:

List<Integer> arr = List.of(1, 2, 0, -1, 5, 8, 9, -11, 7, 13);

You first find out the indexes:

int[] indexes = IntStream.concat(
        IntStream.of(-1),
        IntStream.concat(
            IntStream.range(0, arr.size())
                     .filter(x -> arr.get(x) < 0),
            IntStream.of(arr.size())))
    .toArray();

System.out.println(Arrays.toString(indexes));

This will give a result like: [-1, 3, 7, 10].

Thus just compute the subList between these:

IntStream.range(0, indexes.length - 1)
         .mapToObj(x -> arr.subList(indexes[x] + 1, indexes[x + 1]))
         .collect(Collectors.toList())
         .forEach(System.out::println);
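
Put together, the two steps could be wrapped as in the sketch below. The class and method names are made up for illustration. It also shows why the leading sentinel is -1: each subList starts at indexes[x] + 1, so -1 makes the first segment start at index 0, whereas a sentinel of 0 would make it start at index 1 and drop the first element.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SplitOnNegativesSketch {

    /** Splits arr at every negative value; the negative values themselves are dropped. */
    static List<List<Integer>> splitOnNegatives(List<Integer> arr) {
        // boundaries: sentinel -1, the index of each negative value, then arr.size()
        int[] indexes = IntStream.concat(
                IntStream.of(-1),
                IntStream.concat(
                        IntStream.range(0, arr.size()).filter(i -> arr.get(i) < 0),
                        IntStream.of(arr.size())))
                .toArray();
        // each segment runs from just after one boundary up to (excluding) the next
        return IntStream.range(0, indexes.length - 1)
                .mapToObj(x -> arr.subList(indexes[x] + 1, indexes[x + 1]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitOnNegatives(List.of(1, 2, 0, -1, 5, 8, 9, -11, 7, 13)));
    }
}
```

Note that two adjacent negative values, or a negative value at either end, produce an empty sublist; filter those out if they are not wanted.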

How to split a stream into two flows using GraphDSL

If you want to split data, then you need Partition rather than Broadcast.

They are both fan-out shapes, but whereas Broadcast sends each incoming message to all of its outputs, Partition lets you use custom logic to decide which output a message should go to... which is what I think you're trying to do.

The documentation is pretty good with code samples:
https://doc.akka.io/docs/akka/current/stream/operators/Partition.html

Split Java stream into two lazy streams without terminal operation

You can implement a custom Spliterator in order to achieve such behavior. We will split your streams into the common "source" and the different "consumers". The custom spliterator then forwards the elements from the source to each consumer. For this purpose, we will use a BlockingQueue (see this question).

Note that the difficult part here is not the spliterator/stream, but the syncing of the consumers around the queue, as the comments on your question already indicate. Still, however you implement the syncing, Spliterator helps to use streams with it.

import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

private static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T> sourceSpliterator;

    private BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    private AtomicInteger nextToTake = new AtomicInteger(0);
    private AtomicInteger processed = new AtomicInteger(0);

    // written by the source thread and read by the consumer threads, hence volatile
    private volatile boolean sourceDone;
    private int consumerCount;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();
        consumerCount = consumers.length;

        // each consumer runs in its own thread on its own forked stream
        for (int i = 0; i < consumers.length; i++)
        {
            int index = i;
            Consumer<Stream<T>> consumer = consumers[i];
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    consumer.accept(StreamSupport.stream(new ForkedConsumer(index), false));
                }
            }).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        // forward the next source element to the shared queue
        sourceDone = !sourceSpliterator.tryAdvance(queue::offer);
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private int index;

        private ForkedConsumer(int index)
        {
            super(Long.MAX_VALUE, 0);

            this.index = index;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // take next element when it's our turn (busy-wait)
            while (!nextToTake.compareAndSet(index, index + 1))
            {
            }
            T element;
            while ((element = queue.peek()) == null)
            {
                // re-check the queue after seeing sourceDone, so an element
                // offered just before the flag was set is not missed
                if (sourceDone && queue.isEmpty())
                {
                    // element is null, and there won't be any more, so "terminate" this sub stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(element);

            if (consumerCount == processed.incrementAndGet())
            {
                // all consumers have seen the element: start next round
                queue.poll();
                processed.set(0);
                nextToTake.set(0);
            }

            return true;
        }
    }
}

With the approach used, the consumers work on each element in parallel, but wait for each other before starting on the next element.

Known issue
If one of the consumers is "shorter" than the others (e.g. because it calls limit()) it will also stop the other consumers and leave the threads hanging.


Example

public static void sleep(long millis)
{
    try {
        Thread.sleep((long) (Math.random() * 30 + millis));
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
    }
}

streamForked(Stream.of("1", "2", "3", "4", "5"),
    source -> source.map(word -> { sleep(50); return "fast " + word; }).forEach(System.out::println),
    source -> source.map(word -> { sleep(300); return "slow " + word; }).forEach(System.out::println),
    source -> source.map(word -> { sleep(50); return "2fast " + word; }).forEach(System.out::println));

fast 1
2fast 1
slow 1
fast 2
2fast 2
slow 2
2fast 3
fast 3
slow 3
fast 4
2fast 4
slow 4
2fast 5
fast 5
slow 5

How to divide a stream by object fields into two sets?

You can indeed use partitioningBy. You can specify what to do with each partition in the second parameter.

var map = foos.stream().collect(Collectors.partitioningBy(
    foo -> foo.free >= 0, // assuming no 0
    // for each partition, map to id and collect to set
    Collectors.mapping(foo -> foo.id, Collectors.toSet())
));

map.get(true) will get you the set of ids with positive frees, and map.get(false) will get you the set of ids with negative frees.
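
As a runnable illustration, the snippet can be wrapped as below. The Foo class here is a hypothetical stand-in with public id and free fields (the question's real type is not shown), and idsBySign is a made-up method name.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class PartitionSketch {

    // Hypothetical stand-in for the question's Foo type.
    static final class Foo {
        final int id;
        final int free;

        Foo(int id, int free) {
            this.id = id;
            this.free = free;
        }
    }

    /** Maps true to the ids with free >= 0 and false to the ids with free < 0. */
    static Map<Boolean, Set<Integer>> idsBySign(List<Foo> foos) {
        return foos.stream().collect(Collectors.partitioningBy(
                foo -> foo.free >= 0,
                // for each partition, map to id and collect to set
                Collectors.mapping(foo -> foo.id, Collectors.toSet())));
    }

    public static void main(String[] args) {
        List<Foo> foos = List.of(new Foo(1, 5), new Foo(2, -3), new Foo(3, 7));
        System.out.println(idsBySign(foos));
    }
}
```

The map returned by partitioningBy always contains both the true and the false key, so neither get call returns null even when one side is empty.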

How to split a stream into multiple streams in sequence

You can iterate over N and split your stream in two at each iteration using partition:

import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';

const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);

function split(source, n) {
  const streams = [];
  let toSplit = source;
  for (let k = n; k > 0; --k) {
    const [stream, rest] = toSplit.pipe(
      partition((_, i) => i % k === 0)
    );
    streams.push(stream);
    toSplit = rest;
  }
  return streams;
}

const obs = split(source, 3);

const subscribe = merge(
  obs[0].pipe(map(val => `1: ${val}`)),
  obs[1].pipe(map(val => `2: ${val}`)),
  obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));

See this StackBlitz example.

Java split stream by predicate into stream of streams

Frederico's answer is probably the nicest way for this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer to a similar question, where I proposed using a custom iterator to create a chunked stream. This approach would also work on other streams that are not created by input readers.

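import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T> incoming;
    private Predicate<T> startOfNewEntry;
    private T nextLine;
    // true while nextLine holds a line that has not been returned yet;
    // a flag (rather than a null check) also terminates correctly when
    // the very last line starts a new entry
    private boolean hasNextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        // look one line ahead
        if (incoming.hasNext())
        {
            nextLine = incoming.next();
            hasNextLine = true;
        }
    }

    @Override
    public boolean hasNext()
    {
        return hasNextLine;
    }

    @Override
    public Stream<T> next()
    {
        if (!hasNextLine)
            throw new NoSuchElementException();

        List<T> nextEntrysLines = new ArrayList<>();
        nextEntrysLines.add(nextLine);
        hasNextLine = false;

        // collect lines until the next entry starts or the input ends
        while (incoming.hasNext())
        {
            nextLine = incoming.next();
            if (startOfNewEntry.test(nextLine))
            {
                hasNextLine = true; // keep this line as the start of the next entry
                break;
            }
            nextEntrysLines.add(nextLine);
        }

        return nextEntrysLines.stream();
    }
}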

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    " ...some log details",
                                    " ...some log details",
                                    "Start of log entry 2",
                                    " ...some log details",
                                    " ...some log details",
                                    "Start of log entry 3",
                                    " ...some log details",
                                    " ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
// ...some log details
// ...some log details
// ------------------
// Start of log entry 2
// ...some log details
// ...some log details
// ------------------
// Start of log entry 3
// ...some log details
// ...some log details

The iterator always looks one line ahead. As soon as that line is the beginning of a new entry, it wraps the previous entry in a stream and returns it as next. The factory method streamOf turns this iterator into a stream, to be used as in the example above.

I changed the split condition from a regex to a Predicate, so you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.

Note that I only tested it with the example data above, so I don't know how it would behave with more complicated, erroneous, or empty input.


