Custom thread pool in Java 8 parallel stream
There is actually a trick for executing a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}
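One way to check whether the stream really stays in the custom pool is to ask, for every element, which pool hosts the current thread. The following is only a sketch: the class name PoolProbe and the method elementsOutside are made up for illustration, and the result relies on the same undocumented behaviour as the trick itself, so it should report 0 only on JVMs where the trick works.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class PoolProbe {
    /** Counts stream elements processed by a thread that does not belong to {@code pool}. */
    static int elementsOutside(ForkJoinPool pool, int elements) {
        AtomicInteger outside = new AtomicInteger();
        pool.submit(() ->
            IntStream.range(0, elements).parallel().forEach(i -> {
                // ForkJoinTask.getPool() returns the pool hosting the current thread,
                // or null if the thread is not a fork-join worker.
                if (ForkJoinTask.getPool() != pool) {
                    outside.incrementAndGet();
                }
            })
        ).join();
        return outside.get();
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(4);
        try {
            System.out.println("elements processed outside the custom pool: "
                    + elementsOutside(custom, 100_000));
        } finally {
            custom.shutdown();
        }
    }
}
```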
The trick is based on ForkJoinTask.fork, which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()."
Java parallelStream does not use expected number of threads
The parallelism setting of the Fork/Join pool determines the number of pool worker threads, but since the caller thread, e.g. the main thread, will work on the jobs too, there is always one more thread when using the common pool. That’s why the default setting of the common pool is “number of cores minus one” to get an actual number of working threads equal to the number of cores.
With your custom Fork/Join pool, the caller thread of the stream operation is already a worker thread of the pool, hence, utilizing it for processing jobs doesn’t increase the actual number of working threads.
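To see the numbers this answer talks about on your own machine, you can print the core count next to the parallelism of the common pool and of a custom pool. This is a small sketch; the class name ParallelismInfo and the helper customParallelism are invented for the example.

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismInfo {
    /** Creates a pool with the requested parallelism and reports what the pool says it uses. */
    static int customParallelism(int requested) {
        ForkJoinPool pool = new ForkJoinPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        // The caller thread is not counted here, although it also works on the stream.
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("custom pool parallelism: " + customParallelism(4));
    }
}
```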
It must be emphasized that the interaction between the Stream implementation and the Fork/Join pool is entirely unspecified, as the fact that streams use the Fork/Join framework under the hood is an implementation detail. There is no guarantee that changing the default pool’s properties has any effect on streams, or that calling stream operations from within a custom Fork/Join pool’s task will use that custom pool.
Java Parallel Stream always log same threadID
It depends on how many cores the CPU on your production machine has. You are using the common ForkJoinPool, whose default parallelism is the number of available processors minus one. So this thread pool probably has only one thread.
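A quick way to confirm this on the production machine is to record which threads actually process the elements. The sketch below uses invented names (ThreadsSeen, threadNames); the set it prints varies from run to run and from machine to machine. The common pool's parallelism can also be set with the system property java.util.concurrent.ForkJoinPool.common.parallelism, which has to be in place before the common pool is first used.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ThreadsSeen {
    /** Runs a parallel stream in the common pool and collects the names of all threads that took part. */
    static Set<String> threadNames(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("threads used: " + threadNames(100_000));
    }
}
```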
Usage of parallelStream() resulting in unclosed Threads?
Basically ... you are trying to solve a problem that isn't a problem. The common fork-join pool is a managed pool. The JVM takes care of it.
And if it really matters to you, the bad news is that there is probably nothing you can do about it. The common pool will ignore shutdown() and shutdownNow() calls. This is by design.
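You can observe this directly. The following sketch (class and method names are invented for the example) calls both shutdown methods on the common pool and then checks that the pool still reports itself as running and that a parallel stream still works.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolShutdown {
    /** Returns true if the common pool is still usable after shutdown attempts. */
    static boolean commonPoolSurvivesShutdown() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown();      // no effect on the common pool
        common.shutdownNow();   // no effect either
        int sum = IntStream.rangeClosed(1, 100).parallel().sum();
        return !common.isShutdown() && sum == 5050;
    }

    public static void main(String[] args) {
        System.out.println("common pool still usable: " + commonPoolSurvivesShutdown());
    }
}
```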
There is a trick which allows you to create a custom ForkJoinPool and run your stream in it. See Custom thread pool in Java 8 parallel stream. Once you are finished you can shut down the pool to make the threads go away.
But ... it is probably a bad idea. It is more efficient to reuse an existing pool or the common pool. Creating and destroying threads is expensive. Doing this repeatedly because you are repeatedly creating and destroying pools is inefficient.
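If you do go the custom-pool route, reusing one pool could look roughly like this. It is a sketch under assumptions: the names SharedStreamPool, run and countEvens are made up, the pool size of 4 is arbitrary, and it still depends on the undocumented behaviour of streams inside a fork-join task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class SharedStreamPool {
    // One long-lived pool, created once and reused for every stream run.
    private static final ForkJoinPool POOL = new ForkJoinPool(4);

    /** Runs the given task (typically a complete stream pipeline) inside the shared pool. */
    static <T> T run(Callable<T> streamTask) {
        return POOL.submit(streamTask).join();
    }

    /** Example workload: counts the even numbers in [0, upper). */
    static long countEvens(int upper) {
        return run(() -> IntStream.range(0, upper).parallel().filter(i -> i % 2 == 0).count());
    }

    public static void main(String[] args) {
        for (int round = 0; round < 3; round++) {
            System.out.println("round " + round + ": " + countEvens(1_000));
        }
        POOL.shutdown(); // only once, when the application no longer needs the pool
    }
}
```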
The common ForkJoinPool should not be viewed as a thread leak.
Java 8 parallel stream, blocking code possible?
An operation on a parallel stream is still blocking and will wait for all the threads it spawned to finish. These threads run asynchronously (they don't wait for a previous one to finish), but that doesn't mean your whole code starts behaving asynchronously!
If you're actually making asynchronous calls and working on the resulting CompletableFuture<T> in your forEach, you should instead make your terminal operation a reduce producing a single CompletableFuture<T>. Intermediate operations could be a peek or an identity map with side effects (both are frowned upon, but I don't know any best-practice solution). You would close the connection upon resolution of the single resulting CompletableFuture<T>.
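As a rough sketch of the reduce idea: each element is mapped to a CompletableFuture, and the futures are folded into a single one with thenCombine. The method callAsync is a hypothetical stand-in for your asynchronous call, and the final callback marks where closing the connection would go.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReduceFutures {
    // Hypothetical stand-in for an asynchronous call made per element.
    static CompletableFuture<Integer> callAsync(int item) {
        return CompletableFuture.supplyAsync(() -> item * 10);
    }

    /** Folds all per-element futures into one future holding the sum of their results. */
    static CompletableFuture<Integer> sumAsync(List<Integer> items) {
        return items.parallelStream()
                    .map(ReduceFutures::callAsync)
                    .reduce(CompletableFuture.completedFuture(0),
                            (a, b) -> a.thenCombine(b, Integer::sum));
    }

    public static void main(String[] args) {
        sumAsync(Arrays.asList(1, 2, 3, 4))
            // Runs once every asynchronous call has completed; close the connection here.
            .thenAccept(total -> System.out.println("total = " + total))
            .join();
    }
}
```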
If you're not, then your code looks good enough, as closeClientConnection() will only be executed once the parallel stream has been processed.
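The blocking nature of the terminal operation is easy to check: a counter incremented inside forEach has already reached its final value on the line after the call. The names below are invented for this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BlockingForEach {
    /** Returns how many elements had been processed at the moment forEach returned. */
    static int processedWhenForEachReturns(int n) {
        AtomicInteger done = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> done.incrementAndGet());
        // forEach has returned, so all n elements have been handled already.
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(processedWhenForEachReturns(1_000) + " of 1000 processed");
    }
}
```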
How to specify ForkJoinPool for Java 8 parallel stream?
Streams are lazy; all work is done when you commence a terminal operation. In your case, the terminal operation is .collect(Collectors.toList()), which you call in the main thread on the result of get(). Therefore, the actual work will be done the same way as if you’ve constructed the entire stream in the main thread.
For your pool to have an effect, you have to move the terminal operation into the submitted task:
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of silently swallowing it
        Thread.currentThread().interrupt();
    }
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:
Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of silently swallowing it
        Thread.currentThread().interrupt();
    }
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
But you should keep in mind that this is undocumented behavior, which is not guaranteed. The actual answer must be that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.
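If you need explicit thread control for blocking I/O, one documented alternative is a plain ExecutorService combined with CompletableFuture. Note that this swaps in a different technique from the pool trick above. In the sketch below, slowLookup is a hypothetical stand-in for the database read, and the pool size is whatever you pass in.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ExecutorIo {
    // Hypothetical stand-in for a blocking database read.
    static int slowLookup(int item) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item * 10;
    }

    /** Runs all lookups on a dedicated fixed-size pool and returns the results in input order. */
    static List<Integer> lookupAll(List<Integer> items, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Start every lookup first; joining inside the same lazy pipeline would serialize them.
            List<CompletableFuture<Integer>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> slowLookup(item), executor))
                .collect(Collectors.toList());
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(Arrays.asList(1, 2, 3, 4, 5), 5));
    }
}
```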
Nested parallel streams in Java
The thread pool behind parallel streams is the common pool, which you can get with ForkJoinPool.commonPool(). It usually uses NumberOfProcessors - 1 workers. To resolve dependencies like you've described, it's able to dynamically create additional workers if (some) current workers are blocked and a deadlock becomes possible.
However, this is not the answer for your case.
Tasks in a ForkJoinPool have two important capabilities:
- They can create subtasks and split the current task into smaller pieces (fork).
- They can wait for the subtasks (join).
When a thread executes such a task A and joins a subtask B, it doesn't just block while waiting for the subtask to finish; it executes another task C in the meantime. When C is finished, the thread comes back to A and checks whether B is finished. Note that B and C can be (and most likely are) the same task. If B is finished, then A has successfully waited for/joined it (non-blocking!). Check out this guide if the previous explanation is not clear.
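The fork and join steps described above can be sketched with a small RecursiveTask. The class RangeSum and its threshold of 1,000 are invented for this example; it sums the integers of a half-open range by splitting it in two until the pieces are small.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                      // fork: queue the left half so any worker can take it
        long rightSum = right.compute();  // keep working on the right half in this thread
        return left.join() + rightSum;    // join: the worker may run other tasks while it waits
    }

    public static void main(String[] args) {
        long total = ForkJoinPool.commonPool().invoke(new RangeSum(0, 1_000_000));
        System.out.println(total);
    }
}
```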
Now when you use a parallel stream, the range of the stream is split into tasks recursively until the tasks become so small that they can be executed sequentially more efficiently. Those tasks are put into a work queue (there is one for each worker) in the common pool. So, what IntStream.range(0, 100).parallel().forEach does is split up the range recursively until it's not worth it anymore. Each final task, or rather bunch of iterations, can be executed sequentially with the provided code in forEach. At this point the workers in the common pool can just execute those tasks until all are done and the stream can return. Note that the calling thread helps out with the execution by joining subtasks!
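The recursive splitting can be imitated by hand with the stream's Spliterator. This is only an illustration: the threshold of 25 is an assumption of this sketch, while the real stream implementation chooses its own leaf size, and the names SplitDemo and split are invented.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class SplitDemo {
    /**
     * Splits the chunk recursively until a piece holds at most {@code threshold} elements,
     * prints each leaf, and returns the total number of elements over all leaves.
     */
    static long split(Spliterator.OfInt chunk, long threshold, String indent) {
        Spliterator.OfInt prefix = chunk.estimateSize() > threshold ? chunk.trySplit() : null;
        if (prefix == null) {
            System.out.println(indent + "leaf with " + chunk.estimateSize() + " elements");
            return chunk.estimateSize();
        }
        // After trySplit(), 'prefix' covers the first part and 'chunk' the remainder.
        return split(prefix, threshold, indent + "  ") + split(chunk, threshold, indent + "  ");
    }

    public static void main(String[] args) {
        long total = split(IntStream.range(0, 100).spliterator(), 25, "");
        System.out.println("total elements: " + total);
    }
}
```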
Now each of those tasks uses a parallel stream itself in your case. The procedure is the same; split it up into smaller tasks and put those tasks into a work queue in the common pool. From the ForkJoinPool's perspective those are just additional tasks on top of the already present ones. The workers just keep executing/joining tasks until all are done and the outer stream can return.
This is what you see in the output: there is no deterministic behaviour and no fixed order. Also, a deadlock cannot occur because in the given use case no threads block.
You can check the explanation with the following code:
public static void main(String[] args) {
    IntStream.range(0, 10).parallel().forEach(i -> {
        IntStream.range(0, 10).parallel().forEach(j -> {
            for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
            System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
            for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
        });
    });
}
You should notice that the main thread is involved in the execution of the inner iterations, so it is not (!) blocked. The common pool workers just pick tasks one after another until all are finished.