Is This a Bug in Files.Lines(), or am I Misunderstanding Something About Parallel Streams

Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams?

Since the current state of the issue is quite the opposite of the earlier statements made here, it should be noted, that there is now an explicit statement by Brian Goetz about the back-propagation of the unordered characteristic past a skip operation is considered a bug. It’s also stated that it is now considered to have no back-propagation of the ordered-ness of a terminal operation at all.

There is also a related bug report, JDK-8129120 whose status is “fixed in Java 9” and it’s backported to Java 8, update 60

I did some tests with jdk1.8.0_60 and it seems that the implementation now indeed exhibits the more intuitive behavior.

Does a good use case exist for skip() on parallel streams?

The choice of sequential vs parallel is simply one of execution strategy. The option for parallelism exists so that, if the specifics of the problem (problem size, choice of stream operations, computational work per element, available processors, memory bandwidth, etc) permit, then a performance benefit may be gained by going parallel. Not all combinations of these specifics will admit a performance benefit (and some may even garner a penalty), so we leave it to the user to separately specify the operations from the execution strategy.

For operations like skip() or limit(), which are intrinsically tied to encounter order, it is indeed hard to extract a lot of parallelism, but it is possible; this generally occurs when the computational work per element (often called 'Q') is very high.

Such cases are probably rare (which might be your point); this doesn't make the combination of operation and execution mode "useless", simply of limited usefulness. But one doesn't design a API with multiple dimensions (operations, execution modes) based on the combinations that one can imagine is useful; assuming each combination has a sensible semantics (which it does in this case), it is best to allow all operations in all modes and let the users decide which is useful for them.

Why Hashmap.values().parallelStream() does not run in parallel while wrap them in ArrayList could work?

As explained in this answer, the issue is connected with the fact that the HashMap has a capacity potentially larger than its size and the actual values are distributed over the backing array based on their hash codes.

The splitting logic is basically the same for all array based spliterators, whether you stream over an array, an ArrayList, or a HashMap. To get balanced splits on a best-effort basis, each split will half the (index) range, but in case of HashMap, the number of actual elements within the range differs from the range size.

In principle, every range based spliterator can split down to single elements, however, the client code, i.e. the Stream API implementation, might not split so far. The decision for even attempting to split is driven by the expected number of elements and number of CPU cores.

Taking the following program

public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 2);

for(int depth: new int[] { 1, 2, Integer.MAX_VALUE }) {
System.out.println("With max depth: "+depth);
Tree<Spliterator<Map.Entry<String, Integer>>> spTree
= split(map.entrySet().spliterator(), depth);
Tree<String> valueTree = spTree.map(sp -> "estimated: "+sp.estimateSize()+" "
+StreamSupport.stream(sp, false).collect(Collectors.toList()));
System.out.println(valueTree);
}
}

private static <T> Tree<Spliterator<T>> split(Spliterator<T> sp, int depth) {
Spliterator<T> prefix = depth-- > 0? sp.trySplit(): null;
return prefix == null?
new Tree<>(sp): new Tree<>(null, split(prefix, depth), split(sp, depth));
}

public static class Tree<T> {
final T value;
List<Tree<T>> children;

public Tree(T value) {
this.value = value;
children = Collections.emptyList();
}
public Tree(T value, Tree<T>... ch) {
this.value = value;
children = Arrays.asList(ch);
}
public <U> Tree<U> map(Function<? super T, ? extends U> f) {
Tree<U> t = new Tree<>(value == null? null: f.apply(value));
if(!children.isEmpty()) {
t.children = new ArrayList<>(children.size());
for(Tree<T> ch: children) t.children.add(ch.map(f));
}
return t;
}
public @Override String toString() {
if(children.isEmpty()) return value == null? "": value.toString();
final StringBuilder sb = new StringBuilder(100);
toString(sb, 0, 0);
return sb.toString();
}
public void toString(StringBuilder sb, int preS, int preEnd) {
final int myHandle = sb.length() - 2;
sb.append(value == null? "": value).append('\n');
final int num = children.size() - 1;
if (num >= 0) {
if (num != 0) {
for (int ix = 0; ix < num; ix++) {
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
sb.append("\u2502 ");
int nPreE = sb.length();
children.get(ix).toString(sb, nPreS, nPreE);
}
}
int nPreS = sb.length();
sb.append(sb, preS, preEnd);
final int lastItemHandle = sb.length();
sb.append(" ");
int nPreE = sb.length();
children.get(num).toString(sb, nPreS, nPreE);
sb.setCharAt(lastItemHandle, '\u2514');
}
if (myHandle > 0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle + 1, '\u2500');
}
}
}

you will get:

With max depth: 1

├─estimated: 1 [a=1, b=2]
└─estimated: 1 []

With max depth: 2

├─
│ ├─estimated: 0 [a=1, b=2]
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []

With max depth: 2147483647

├─
│ ├─
│ │ ├─
│ │ │ ├─estimated: 0 []
│ │ │ └─estimated: 0 [a=1]
│ │ └─
│ │ ├─estimated: 0 [b=2]
│ │ └─estimated: 0 []
│ └─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─
│ │ ├─estimated: 0 []
│ │ └─estimated: 0 []
│ └─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─
│ ├─estimated: 0 []
│ └─estimated: 0 []
└─
├─estimated: 0 []
└─estimated: 0 []

On ideone

So, as said, the spliterator can split down to individual elements if we split deep enough, however, the estimated size of two elements does not suggest that it’s worth doing that. On each split, it will halve the estimate and while you might say that it’s wrong for the elements you’re interested in, it’s actually correct for most spliterators here, as when going down to the maximum level, most spliterators are representing an empty range and splitting them turns out to be a waste of resources.

As said in the other answer, the decision is about balancing the work of splitting (or preparation in general) and the expected work to parallelize, which the Stream implementation can’t know in advance. If you know in advance that the per-element workload will be very high, to justify more preparation work, you can use, e.g. new ArrayList<>(map.[keySet|entrySet|values]()) .parallelStream() to enforce balanced splits. Usually, the problem will be much smaller for larger maps anyway.

stream parallel skip - does the order of the chained stream methods make any difference?

You actually have two questions in one, the first being whether it makes a difference in writing stream.parallel().skip(1) or stream.skip(1).parallel(), the second being whether either or both will always skip the first element. See also “loaded question”.

The first answer is that it makes no difference, because specifying a .sequential() or .parallel() execution policy affects the entire Stream pipeline, regardless of where you place it in the call chain—of course, unless you specify multiple contradicting policies, in which case the last one wins.

So in either case you are requesting a parallel execution which might affect the outcome of the skip operation, which is subject of the second question.

The answer is not that simple. If the Stream has no defined encounter order in the first place, an arbitrary element might get skipped, which is a consequence of the fact that there is no “first” element, even if there might be an element you encounter first when iterating over the source.

If you have an ordered Stream, skip(1) should skip the first element, but this has been laid down only recently. As discussed in “Stream.skip behavior with unordered terminal operation”, chaining an unordered terminal operation had an effect on the skip operation in earlier implementations and there was some uncertainty of whether this could even be intentional, as visible in “Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams?”, which happens to be close to your code; apparently skipping the first line is a common case.

The final word is that the behavior of earlier JREs is a bug and skip(1) on an ordered stream should skip the first element, even when the stream pipeline is executed in parallel and the terminal operation is unordered. The associated bug report names jdk1.8.0_60 as first fixed version, which I could verify. So if you are using on older implementation, you might experience the Stream skipping different elements when using .parallel() and the unordered .forEach(…) terminal operation. It’s not contradicting if the implementation occasionally skips the expected element, that’s the unpredictability of multi-threading.

So the answer still is that stream.parallel().skip(1) and stream.skip(1).parallel() have the same behavior, even when being used in earlier versions, as both are equally unpredictable when being used with an unordered terminal operation like forEach. They should always skip the first element with ordered Streams and when being used with 1.8.0_60 or newer, they do.

How to properly close a variable amount of streams?

You could write a composite AutoCloseable for managing a dynamic amount of AutoCloseable:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
private final List<T> components= new ArrayList<>();

public void addComponent(T component) { components.add(component); }

public List<T> getComponents() { return components; }

@Override
public void close() throws Exception {
Exception e = null;
for (T component : components) {
try { component.close(); }
catch (Exception closeException) {
if (e == null) { e = closeException; }
else { e.addSuppressed(closeException); }
}
}
if (e != null) { throw e; }
}
}

and you could use it in your method:

private static void foo(String path, String... files) throws Exception {
try (CompositeAutoclosable<Stream<String>> streams
= new CompositeAutoclosable<Stream<String>>()) {
for (int i = 0; i < files.length; i++) {
final String file = files[i];
streams.addComponent(Files.lines(Paths.get(path, file))
.onClose(() -> System.out.println("Closed " + file)));
}
streams.getComponents().stream()
.parallel()
.flatMap(x -> x)
.distinct()
.sorted()
.limit(10)
.forEach(System.out::println);
}
}

When does a Java intermediate operation hit every element?

This is indeed not obvious because the actual reason is rather historical. The first release version of Java 8 had a Stream implementation which did consider the unordered nature of findAny, at least in some cases.

Not in the case of a skippable sorted but in more cases than correct. This has been discussed in

  • Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams? and
  • Stream.skip behavior with unordered terminal operation

In the latter Q&A we find a comment from Brian Goetz, one of the architects:

After some analysis we chose to back out back-propagation entirely. The only place where this pays off is optimizing away a sort; if you have a pipeline that sorts into an unordered terminal op, that's probably a user error anyway.

So the current state of the reference implementation is that the unordered nature of the terminal operation is not used for optimizing preceding intermediate operations.

This doesn’t imply that such optimization wouldn’t be valid; the example of skipping a sort when the subsequent operations do not care about the order is explicitly mentioned as valid, though Brian Goetz suggests that such a scenario might be a mistake on the developer’s side anyway.

I have a slightly different point of view, as I think it would be a useful scenario if you have Stream returning method, which may return a sorted Stream, while operations chained by the caller determine whether this sorting is relevant for the final result or not.

But whether such an optimization happens or not does not affect the correctness of the result. Note that this does not only apply to unordered operations.

When you have

Optional<Integer> res = IntStream.rangeClosed(1, 10)
.mapToObj(x -> {
System.out.println("first map: " + x);
return 2*x;
})
.sorted((a,b)-> {
System.out.println("sorting " + a + " : " + b);
return Integer.compare(a, b);
})
.map(x -> {
System.out.println("second map: " + x);
return 2*x;
})
.findFirst();

it would be perfectly valid to transform the operation to (the equivalent of)

Optional<Integer> res = IntStream.rangeClosed(1, 10)
.mapToObj(x -> {
System.out.println("first map: " + x);
return 2*x;
})
.min((a,b)-> {
System.out.println("sorting " + a + " : " + b);
return Integer.compare(a, b);
})
.map(x -> {
System.out.println("second map: " + x);
return 2*x;
});

which would be more efficient and still return the correct result for the terminal operation while the evaluation order of the first mapping function and the comparator is different.

But the correct result is all that matters. You should never assume a specific evaluation order at all. You don’t find a specification of the evaluation order in the manual, because it has been left out intentionally. The presence or absence of optimizations can change at any time. If you want a dramatic example, replace your findAny() with count() and compare your example’s behavior in Java 8 and Java 9 (or newer).

// run with Java 8 and Java 9+ and compare...
long count = IntStream.rangeClosed(1, 10)
.mapToObj(x -> {
System.out.println("first map: " + x);
return 2*x;
})
.sorted((a,b)-> {
System.out.println("sorting " + a + " : " + b);
return Integer.compare(a, b);
})
.map(x -> {
System.out.println("second map: " + x);
return 2*x;
})
.count();
System.out.println("found " + count + " elements");

Java 8 streams and concurrent writes

  1. ArrayList is not a synchronized collection, so yes it will cause a RACE condition. All the methods that change the state of the vector are synchronized so you did not find any problem over there.

  2. The write method of BufferedWriter is synchronized, so all the writes will be consistent across the threads. So individual write operation in the file will be thread safe. But you will need to explicitly handle synchronization to make it consistent across the threads.

Here is the code snippet of write method in Java 6.

public void write(String s, int off, int len) throws IOException {

synchronized (lock) {

ensureOpen();

int b = off, t = off + len;

while (b < t) {

int d = min(nChars - nextChar, t - b);
s.getChars(b, b + d, cb, nextChar);
b += d;
nextChar += d;

if (nextChar >= nChars)
flushBuffer();
}
}
}
}


Related Topics



Leave a reply



Submit