Concurrent data structures and parallel streams when using collect
There will be no behavioral change when you use ConcurrentHashMap instead of HashMap with the three-arg collect method. To change the behavior, you need a Collector which reports the CONCURRENT characteristic, and there is no way to specify characteristics with the ad-hoc collector.
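A minimal sketch of that claim, assuming a boxed IntStream.range(0, 1000) in place of the question's integers stream (the class and counter names are made up for illustration): the three-arg collect calls the supplier once per partition whether it returns a HashMap or a ConcurrentHashMap, and merges the partial maps afterwards, so both variants take the same accumulate-then-merge path and yield equal maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;

class ThreeArgCollectDemo {

    // Count how often each supplier is invoked by collect.
    static final AtomicInteger hashMapSupplierCalls = new AtomicInteger();
    static final AtomicInteger concurrentMapSupplierCalls = new AtomicInteger();

    static Map<Integer, Boolean> collectInto(Supplier<Map<Integer, Boolean>> supplier) {
        return IntStream.range(0, 1000).boxed().parallel()
                .collect(supplier,
                        (map, value) -> map.put(value, false), // fill this partition's own container
                        Map::putAll);                          // merge two partial containers
    }

    static Map<Integer, Boolean> withHashMap() {
        hashMapSupplierCalls.set(0);
        return collectInto(() -> {
            hashMapSupplierCalls.incrementAndGet();
            return new HashMap<>();
        });
    }

    static Map<Integer, Boolean> withConcurrentHashMap() {
        concurrentMapSupplierCalls.set(0);
        return collectInto(() -> {
            concurrentMapSupplierCalls.incrementAndGet();
            return new ConcurrentHashMap<>();
        });
    }

    public static void main(String[] args) {
        Map<Integer, Boolean> a = withHashMap();
        Map<Integer, Boolean> b = withConcurrentHashMap();
        System.out.println("equal results: " + a.equals(b));
        // Typically both counts are greater than 1: one container per partition,
        // merged afterwards, regardless of the map type.
        System.out.println("HashMap supplier calls: " + hashMapSupplierCalls.get());
        System.out.println("ConcurrentHashMap supplier calls: " + concurrentMapSupplierCalls.get());
    }
}
```

The exact supplier counts depend on how the stream framework splits the range, so only the equality of the two results is something to rely on.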
Further, the operation must be unordered to enable a parallel collect operation where all threads accumulate into a single container. The operation may be unordered due to the stream properties, either intrinsically, e.g. when streaming over an unordered source like a HashSet, or explicitly via unordered(), e.g.
Map<Integer, Boolean> resultMap = integers.limit(1000)
    .unordered()
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1, m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT));
or due to the UNORDERED characteristic of the collector:
Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collector.of(
        () -> new ConcurrentHashMap<>(),
        (map, value) -> map.put(value, false),
        (m1, m2) -> { m1.putAll(m2); return m1; },
        Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED));
The latter is what you get when using the built-in collector:
Map<Integer, Boolean> resultMap = integers.limit(1000)
    .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE));
toConcurrentMap will always be CONCURRENT and UNORDERED and requires a ConcurrentMap when you use a map supplier, whereas toMap is never CONCURRENT, even if you provide a supplier which creates instances of a ConcurrentMap implementation.
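A sketch that makes the difference visible by counting supplier calls (same assumed IntStream.range(0, 1000) source; the four-argument overloads of toConcurrentMap and toMap are used only so the supplier can be instrumented, and the class and counter names are illustrative): on a parallel stream, toConcurrentMap's supplier runs exactly once, whereas toMap asks for a new ConcurrentHashMap per partition and merges them just as it would with a HashMap.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ToConcurrentMapDemo {

    static final AtomicInteger toConcurrentMapSupplierCalls = new AtomicInteger();
    static final AtomicInteger toMapSupplierCalls = new AtomicInteger();

    static ConcurrentMap<Integer, Boolean> viaToConcurrentMap() {
        toConcurrentMapSupplierCalls.set(0);
        Supplier<ConcurrentMap<Integer, Boolean>> supplier = () -> {
            toConcurrentMapSupplierCalls.incrementAndGet();
            return new ConcurrentHashMap<>();
        };
        // CONCURRENT + UNORDERED: all threads accumulate into the single map from the supplier.
        // The merge function (x, y) -> x is never invoked here because the keys are unique.
        return IntStream.range(0, 1000).boxed().parallel()
                .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE, (x, y) -> x, supplier));
    }

    static Map<Integer, Boolean> viaToMap() {
        toMapSupplierCalls.set(0);
        Supplier<Map<Integer, Boolean>> supplier = () -> {
            toMapSupplierCalls.incrementAndGet();
            return new ConcurrentHashMap<>();
        };
        // Not CONCURRENT: one map per partition, merged afterwards, even though each is a ConcurrentHashMap.
        return IntStream.range(0, 1000).boxed().parallel()
                .collect(Collectors.toMap(Function.identity(), i -> Boolean.FALSE, (x, y) -> x, supplier));
    }

    public static void main(String[] args) {
        viaToConcurrentMap();
        viaToMap();
        System.out.println("toConcurrentMap supplier calls: " + toConcurrentMapSupplierCalls.get());
        System.out.println("toMap supplier calls: " + toMapSupplierCalls.get());
    }
}
```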
Java 8 Parallel Stream Concurrent Grouping
You can do that by using the of static factory method from Collector:
Map<String, Set<Person>> groupBy = persons.parallelStream()
    .collect(Collector.of(
        ConcurrentHashMap::new,
        (map, person) -> {
            map.computeIfAbsent(person.name, k -> new HashSet<>()).add(person);
            map.computeIfAbsent(person.uid, k -> new HashSet<>()).add(person);
            map.computeIfAbsent(person.phone, k -> new HashSet<>()).add(person);
        },
        (a, b) -> {
            b.forEach((key, set) -> a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set));
            return a;
        }
    ));
As Holger suggested in the comments, the following approach can be preferred over the one above:
Map<String, Set<Person>> groupBy = persons.parallelStream()
    .collect(HashMap::new, (m, p) -> {
        m.computeIfAbsent(p.name, k -> new HashSet<>()).add(p);
        m.computeIfAbsent(p.uid, k -> new HashSet<>()).add(p);
        m.computeIfAbsent(p.phone, k -> new HashSet<>()).add(p);
    }, (a, b) -> b.forEach((key, set) ->
        a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set)));
It uses the overloaded three-arg collect method, which acts identically to my suggested statement above.
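A self-contained version of the three-arg variant, assuming a hypothetical Person class with String fields name, uid and phone (the question's actual type is not shown, and the sample data is invented): every person is indexed under each of its three keys, and the combiner unions the sets of keys that appear in both partial maps.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class GroupByDemo {

    // Hypothetical stand-in for the question's Person type.
    static final class Person {
        final String name;
        final String uid;
        final String phone;

        Person(String name, String uid, String phone) {
            this.name = name;
            this.uid = uid;
            this.phone = phone;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    // Indexes every person under its name, uid and phone.
    static Map<String, Set<Person>> groupByAllKeys(List<Person> persons) {
        return persons.parallelStream()
                .collect(HashMap::new,
                        (m, p) -> {
                            m.computeIfAbsent(p.name, k -> new HashSet<>()).add(p);
                            m.computeIfAbsent(p.uid, k -> new HashSet<>()).add(p);
                            m.computeIfAbsent(p.phone, k -> new HashSet<>()).add(p);
                        },
                        // union the sets of keys present in both partial maps
                        (a, b) -> b.forEach((key, set) ->
                                a.computeIfAbsent(key, k -> new HashSet<>()).addAll(set)));
    }

    public static void main(String[] args) {
        List<Person> persons = Arrays.asList(
                new Person("Alice", "u1", "555-0100"),
                new Person("Bob", "u2", "555-0100"),
                new Person("Carol", "u3", "555-0199"));
        Map<String, Set<Person>> groupBy = groupByAllKeys(persons);
        System.out.println("persons sharing 555-0100: " + groupBy.get("555-0100"));
        System.out.println("distinct keys: " + groupBy.size());
    }
}
```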
Parallel streams, collectors and thread safety
Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?
It is safe to use a non-concurrent collector in a collect operation of a parallel stream.
In the specification of the Collector interface, in the section with half a dozen bullet points, is this:
For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.
This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.
I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.
The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function: it creates a fresh intermediate collection for each partition of the input, so each thread accumulates into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
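A sketch in the spirit of that package-documentation example (the class, method and counter names are illustrative): a parallel stream collected through the three-arg collect into plain ArrayLists. Each partition gets its own list from the supplier, so no list is ever touched by two threads at once, and because the combiner appends the right partial list to the left one, the result is also in encounter order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;

class NonConcurrentCollectDemo {

    static final AtomicInteger containersCreated = new AtomicInteger();

    // Collects 0..n-1 from a parallel stream into non-thread-safe ArrayLists.
    static List<Integer> collectIntoArrayLists(int n) {
        containersCreated.set(0);
        Supplier<ArrayList<Integer>> newList = () -> {
            containersCreated.incrementAndGet(); // one fresh ArrayList per partition
            return new ArrayList<>();
        };
        return IntStream.range(0, n).boxed().parallel()
                .collect(newList,
                        ArrayList::add,     // each container is filled by a single thread
                        ArrayList::addAll); // partial lists are merged after accumulation
    }

    // True if the list is exactly 0, 1, ..., size-1.
    static boolean matchesRange(List<Integer> list) {
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i) != i) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> result = collectIntoArrayLists(10_000);
        System.out.println("size: " + result.size() + ", in encounter order: " + matchesRange(result));
        System.out.println("intermediate ArrayLists created: " + containersCreated.get());
    }
}
```

By contrast, calling list::add on one shared ArrayList from parallel().forEach(...) would be a data race; the collect form avoids that without any locking.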
Confusion about Characteristics.UNORDERED in Java 8 in action book
That is simply wrong. Even adding the CONCURRENT characteristic here is wrong, as you would need a thread-safe data structure in the Supplier.
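For reference, a hand-written list collector of the kind under discussion. This is a sketch, not the book's listing: the class name ToListCollector and every detail below are assumptions. Since the supplier hands out a plain ArrayList, IDENTITY_FINISH is the only characteristic it can safely report. With CONCURRENT, an unordered parallel stream would let several threads call add on one shared ArrayList.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ToListCollector<T> implements Collector<T, List<T>, List<T>> {

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new; // a fresh, non-thread-safe ArrayList per partition
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> {
            left.addAll(right); // left partition first, so encounter order is kept
            return left;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        // Not CONCURRENT: the container is a plain ArrayList.
        // Not UNORDERED: the combiner preserves encounter order, and callers may rely on it.
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(0, 1000).boxed().parallel()
                .collect(new ToListCollector<Integer>());
        System.out.println(numbers.size() + " elements, first " + numbers.get(0)
                + ", last " + numbers.get(numbers.size() - 1));
    }
}
```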
Is it safe to stream a concurrent collection while it is modified externally?
From the documentation in the link you have provided:
For most data sources, preventing interference means ensuring that the
data source is not modified at all during the execution of the stream
pipeline. The notable exception to this are streams whose sources are
concurrent collections, which are specifically designed to handle
concurrent modification. Concurrent stream sources are those whose
Spliterator reports the CONCURRENT characteristic
From the documentation of Spliterator (its CONCURRENT characteristic):
static final int CONCURRENT
Characteristic value signifying that the element source may be safely
concurrently modified (allowing additions, replacements, and/or
removals) by multiple threads without external synchronization. If so,
the Spliterator is expected to have a documented policy concerning the
impact of modifications during traversal.
This is the implementation of the stream method in the Collection interface (which is not overridden in ConcurrentLinkedQueue):
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
So as long as ConcurrentLinkedQueue uses a CONCURRENT Spliterator (which it does), I would assume you can safely iterate your ConcurrentLinkedQueue using stream().
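A small check of that assumption (class and method names are illustrative): ConcurrentLinkedQueue's spliterator reports CONCURRENT while ArrayList's does not, and a sequential stream over the queue runs to completion while a second thread appends to it. The traversal is weakly consistent, so the number of even values counted depends on how many late additions the stream happens to see; only its range (5,000 to 10,000 here) is predictable.

```java
import java.util.ArrayList;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

class ConcurrentSourceDemo {

    static boolean reportsConcurrent(Iterable<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.CONCURRENT);
    }

    // Streams the queue while another thread keeps appending to it.
    static long countEvensWhileModified() {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 10_000; i++) {
            queue.add(i);
        }
        Thread writer = new Thread(() -> {
            for (int i = 10_000; i < 20_000; i++) {
                queue.add(i);
            }
        });
        writer.start();
        // No ConcurrentModificationException: the queue's spliterator is weakly consistent,
        // so elements added during traversal may or may not be included.
        long evens = queue.stream().filter(x -> x % 2 == 0).count();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return evens;
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentLinkedQueue reports CONCURRENT: "
                + reportsConcurrent(new ConcurrentLinkedQueue<Integer>()));
        System.out.println("ArrayList reports CONCURRENT: " + reportsConcurrent(new ArrayList<Integer>()));
        System.out.println("even numbers seen: " + countEvensWhileModified());
    }
}
```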
What is the difference between Collectors.toConcurrentMap and converting a Map to ConcurrentHashMap via Collectors.toMap supplier option?
There is a difference between them when dealing with parallel streams.
toMap -> is a non-concurrent collector
toConcurrentMap -> is a concurrent collector (this can be seen from their characteristics).
The difference is that toMap will create multiple intermediate results and then merge them together (the Supplier of such a Collector will be called multiple times), while toConcurrentMap will create a single result and each thread will throw results at it (the Supplier of such a Collector will be called only once).
Why is this important? This deals with insertion order (if that matters).
toMap will insert values into the resulting Map in encounter order by merging multiple intermediate results (the Supplier of that collector is called multiple times, as is the Combiner).
toConcurrentMap will collect elements in any (undefined) order by throwing all elements at a common result container (ConcurrentHashMap in this case). The Supplier is called only once, the Accumulator many times, and the Combiner never.
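To illustrate the ordering point, a sketch that feeds keys in descending encounter order (n-1 down to 0) through a parallel stream; the class name and the use of a LinkedHashMap supplier are illustrative choices. Because toMap merges the per-partition maps left to right, a LinkedHashMap ends up with its keys in encounter order. toConcurrentMap gives no such guarantee, and its key order simply reflects ConcurrentHashMap's own iteration order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class InsertionOrderDemo {

    // Keys arrive in descending encounter order: n-1, n-2, ..., 0.
    static Stream<Integer> descending(int n) {
        return IntStream.range(0, n).map(i -> n - 1 - i).boxed().parallel();
    }

    // toMap merges partial maps left to right, so a LinkedHashMap keeps encounter order.
    static List<Integer> keysViaToMap(int n) {
        Supplier<Map<Integer, Boolean>> linked = LinkedHashMap::new;
        Map<Integer, Boolean> map = descending(n)
                .collect(Collectors.toMap(Function.identity(), i -> Boolean.FALSE, (x, y) -> x, linked));
        return new ArrayList<>(map.keySet());
    }

    // toConcurrentMap makes no ordering promise; iteration order is whatever the map yields.
    static List<Integer> keysViaToConcurrentMap(int n) {
        ConcurrentMap<Integer, Boolean> map = descending(n)
                .collect(Collectors.toConcurrentMap(Function.identity(), i -> Boolean.FALSE));
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        System.out.println("toMap (LinkedHashMap), first keys: " + keysViaToMap(1000).subList(0, 5));
        System.out.println("toConcurrentMap, first keys: " + keysViaToConcurrentMap(1000).subList(0, 5));
    }
}
```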
The small caveat here is that for a CONCURRENT collector to not invoke the merger, either the stream has to have the UNORDERED flag, via an explicit unordered() call or because the source of the stream is not ordered (a HashSet, for example), or the collector itself has to report UNORDERED, as toConcurrentMap does.
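A sketch of that caveat using a custom collector that reports CONCURRENT but deliberately not UNORDERED (the class, method and counter names are illustrative). On an ordered parallel stream it falls back to one container per partition plus merging. After unordered(), the supplier is called exactly once and all threads share that single ConcurrentHashMap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class UnorderedCaveatDemo {

    // A collector that reports CONCURRENT but, deliberately, not UNORDERED.
    static Collector<Integer, ConcurrentMap<Integer, Boolean>, ConcurrentMap<Integer, Boolean>>
            concurrentOnly(AtomicInteger supplierCalls) {
        Supplier<ConcurrentMap<Integer, Boolean>> supplier = () -> {
            supplierCalls.incrementAndGet();
            return new ConcurrentHashMap<>();
        };
        BiConsumer<ConcurrentMap<Integer, Boolean>, Integer> accumulator = (map, value) -> map.put(value, false);
        BinaryOperator<ConcurrentMap<Integer, Boolean>> combiner = (m1, m2) -> {
            m1.putAll(m2);
            return m1;
        };
        return Collector.of(supplier, accumulator, combiner, Collector.Characteristics.CONCURRENT);
    }

    // Returns how many containers the collect operation created.
    static int supplierCallsFor(boolean unordered) {
        AtomicInteger calls = new AtomicInteger();
        Stream<Integer> stream = IntStream.range(0, 1000).boxed().parallel();
        if (unordered) {
            stream = stream.unordered(); // clears the ORDERED flag, enabling the shared-container path
        }
        ConcurrentMap<Integer, Boolean> result = stream.collect(concurrentOnly(calls));
        if (result.size() != 1000) {
            throw new IllegalStateException("unexpected size " + result.size());
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("ordered parallel stream, supplier calls: " + supplierCallsFor(false));
        System.out.println("unordered parallel stream, supplier calls: " + supplierCallsFor(true));
    }
}
```

The ordered count is usually greater than 1, but its exact value depends on how the stream is split.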