Why Should I Use Concurrent Characteristic in Parallel Stream with Collect

Concurrent data structures and parallel streams when using collect

There will be no behavioral change when you use a ConcurrentHashMap instead of a HashMap with the three-arg collect method. To change the behavior, you need a Collector which reports the CONCURRENT characteristic, and there is no way to specify characteristics with the ad-hoc collector.
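A minimal sketch of that point (the stream source and sizes here are made up for illustration): with the three-arg collect there is no way to report CONCURRENT, so swapping in a ConcurrentHashMap supplier changes nothing about how the reduction runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class AdHocCollect {
    public static void main(String[] args) {
        // The runtime still creates one map per worker thread and merges the
        // partial maps afterwards, exactly as it would with HashMap::new.
        Map<Integer, Boolean> result = IntStream.range(0, 1000).parallel().boxed()
            .collect(ConcurrentHashMap::new,
                     (map, value) -> map.put(value, false),
                     Map::putAll);
        System.out.println(result.size()); // 1000
    }
}
```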

Further, the operation must be unordered to enable a parallel collect operation where all threads accumulate into a single container. The operation may be unordered due to the stream properties, either intrinsically, e.g. when streaming over an unordered source like a HashSet, or explicitly via unordered(), e.g.

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .unordered()
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1, m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT));

or due to the UNORDERED characteristic of the collector:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1, m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED));

The latter is what you get when using the builtin collector:

Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE));

toConcurrentMap is always CONCURRENT and UNORDERED and requires a ConcurrentMap when you pass a map supplier, whereas toMap is never CONCURRENT, even if you provide a supplier which creates instances of a ConcurrentMap implementation.
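You can verify this directly by inspecting the characteristics() of the two built-in collectors, a quick sketch:

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.function.Function;

public class CharacteristicsDemo {
    public static void main(String[] args) {
        var toMap = Collectors.toMap(Function.identity(), i -> i);
        var toConcurrentMap = Collectors.toConcurrentMap(Function.identity(), i -> i);
        // toMap never reports CONCURRENT...
        System.out.println(toMap.characteristics()
            .contains(Collector.Characteristics.CONCURRENT));   // false
        // ...while toConcurrentMap reports both CONCURRENT and UNORDERED
        System.out.println(toConcurrentMap.characteristics()
            .contains(Collector.Characteristics.CONCURRENT));   // true
        System.out.println(toConcurrentMap.characteristics()
            .contains(Collector.Characteristics.UNORDERED));    // true
    }
}
```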

Java 8 Parallel Stream Concurrent Grouping

You can do that by using the of static factory method from Collector:

Map<String, Set<Person>> groupBy = persons.parallelStream()
    .collect(Collector.of(
        ConcurrentHashMap::new,
        (map, person) -> {
            map.computeIfAbsent(person.name, k -> new HashSet<>()).add(person);
            map.computeIfAbsent(person.uid, k -> new HashSet<>()).add(person);
            map.computeIfAbsent(person.phone, k -> new HashSet<>()).add(person);
        },
        (a, b) -> {
            b.forEach((key, set) -> a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set));
            return a;
        }
    ));

As Holger suggested in the comments, the following approach may be preferable to the one above:

Map<String, Set<Person>> groupBy = persons.parallelStream()
    .collect(HashMap::new, (m, p) -> {
        m.computeIfAbsent(p.name, k -> new HashSet<>()).add(p);
        m.computeIfAbsent(p.uid, k -> new HashSet<>()).add(p);
        m.computeIfAbsent(p.phone, k -> new HashSet<>()).add(p);
    }, (a, b) -> b.forEach((key, set) ->
        a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set)));

It uses the overloaded collect method, which behaves identically to my suggested statement above.
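A runnable sketch of that grouping, assuming a hypothetical Person record with name, uid, and phone components (the original snippets access public fields instead; the record accessors are my stand-in):

```java
import java.util.*;

public class GroupDemo {
    // hypothetical Person type standing in for the one used in the question
    record Person(String name, String uid, String phone) {}

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person("alice", "u1", "555-1"),
            new Person("alice", "u2", "555-2"));

        // non-concurrent three-arg collect: each thread fills its own HashMap,
        // and the partial maps are merged key by key in the combiner
        Map<String, Set<Person>> groupBy = persons.parallelStream()
            .collect(HashMap::new, (m, p) -> {
                m.computeIfAbsent(p.name(), k -> new HashSet<>()).add(p);
                m.computeIfAbsent(p.uid(), k -> new HashSet<>()).add(p);
                m.computeIfAbsent(p.phone(), k -> new HashSet<>()).add(p);
            }, (a, b) -> b.forEach((key, set) ->
                a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set)));

        System.out.println(groupBy.get("alice").size()); // 2
        System.out.println(groupBy.get("u1").size());    // 1
    }
}
```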

Parallel streams, collectors and thread safety

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

It is safe to use a non-concurrent collector in a collect operation of a parallel stream.

In the specification of the Collector interface, in the section with half a dozen bullet points, is this:

For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and that combining happens only after accumulation is complete.

This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.

I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.

The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
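That mechanism can be seen with the plainest possible non-thread-safe container, a sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class ParallelToList {
    public static void main(String[] args) {
        // ArrayList is not thread-safe, yet this is safe: each worker thread
        // accumulates into its own list created by the supplier, and the
        // partial lists are merged pairwise after accumulation is complete.
        List<Integer> result = IntStream.range(0, 10_000)
            .parallel()
            .boxed()
            .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

        System.out.println(result.size()); // 10000
        System.out.println(result.get(0)); // 0 (encounter order is preserved)
    }
}
```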

Confusion about Characteristics.UNORDERED in Java 8 in action book

That is simply wrong. Even adding CONCURRENT characteristic here is wrong, as you would need a thread safe data structure in the Supplier.

Is it safe to stream a concurrent collection while it is modified externally?

From the documentation in link you have provided

For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.

From the documentation of Spliterator (its CONCURRENT characteristic):

static final int CONCURRENT

Characteristic value signifying that the element source may be safely concurrently modified (allowing additions, replacements, and/or removals) by multiple threads without external synchronization. If so, the Spliterator is expected to have a documented policy concerning the impact of modifications during traversal.

This is the implementation of the stream method from the Collection interface (which is not overridden in ConcurrentLinkedQueue):

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

So as long as ConcurrentLinkedQueue uses a CONCURRENT Spliterator (which it does), I would assume you can safely iterate your ConcurrentLinkedQueue using stream().
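You can check that claim at runtime by asking the spliterator for its characteristics, a small sketch contrasting it with a non-concurrent source:

```java
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentSourceDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        queue.add(1);
        queue.add(2);

        // ConcurrentLinkedQueue's spliterator reports CONCURRENT: traversal is
        // weakly consistent and tolerates concurrent modification
        System.out.println(queue.spliterator()
            .hasCharacteristics(Spliterator.CONCURRENT)); // true

        // an ArrayList's spliterator does not; modifying it during a stream
        // risks a ConcurrentModificationException instead
        System.out.println(new ArrayList<>(queue).spliterator()
            .hasCharacteristics(Spliterator.CONCURRENT)); // false
    }
}
```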

What is the difference between Collectors.toConcurrentMap and converting a Map to ConcurrentHashMap via Collectors.toMap supplier option?

There is a difference between them when dealing with parallel streams.

toMap -> is a non-concurrent collector.

toConcurrentMap -> is a concurrent collector (this can be seen from their characteristics).

The difference is that toMap will create multiple intermediate results and then merge them together (the Supplier of such a Collector will be called multiple times), while toConcurrentMap will create a single result container that every thread throws results at (the Supplier of such a Collector will be called only once).

Why is this important? This deals with insertion order (if that matters).

toMap will insert values into the resulting Map in encounter order, by merging multiple intermediate results (the Supplier of that collector is called multiple times, as is the Combiner).

toConcurrentMap will collect elements in an arbitrary (undefined) order, by throwing all elements at a common result container (a ConcurrentHashMap in this case). The Supplier is called only once, the Accumulator many times, and the Combiner never.

The small caveat here is that for a CONCURRENT collector to skip invoking the merger, the operation must be unordered: either the stream has the UNORDERED flag (set via an explicit unordered() call, or because the stream source is unordered, a Set for example), or the collector itself reports the UNORDERED characteristic.
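The encounter-order difference can be observed directly, a sketch (the LinkedHashMap supplier is my addition, chosen because it makes insertion order visible):

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MapOrderDemo {
    public static void main(String[] args) {
        // toMap merges intermediate maps in encounter order, so with a
        // LinkedHashMap supplier the iteration order matches the stream order
        // even when running in parallel
        Map<Integer, Integer> ordered = IntStream.range(0, 100).parallel().boxed()
            .collect(Collectors.toMap(Function.identity(), i -> i * 2,
                     (a, b) -> a, LinkedHashMap::new));
        System.out.println(ordered.keySet().iterator().next()); // 0

        // toConcurrentMap throws everything at one shared ConcurrentHashMap;
        // the content is the same but iteration order is undefined
        Map<Integer, Integer> unordered = IntStream.range(0, 100).parallel().boxed()
            .collect(Collectors.toConcurrentMap(Function.identity(), i -> i * 2));
        System.out.println(unordered.size()); // 100
    }
}
```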


