How Many Threads Are Spawned in Parallelstream in Java 8

Custom thread pool in Java 8 parallel stream

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

Java parallelStream does not use expected number of threads

The parallelism setting of the Fork/Join pool determines the number of pool worker threads, but since the caller thread, e.g. the main thread, will work on the jobs too, there is always one more thread when using the common pool. That’s why the default setting of the common pool is “number of cores minus one” to get an actual number of working threads equal to the number of cores.

With your custom Fork/Join pool, the caller thread of the stream operation is already a worker thread of the pool, hence, utilizing it for processing jobs doesn’t increase the actual number of working threads.

It must be emphasized that the interaction between the Stream implementation and the Fork/Join pool is entirely unspecified as the fact that streams use the Fork/Join framework under the hood is an implementation detail. There is no guaranty that changing the default pool’s properties has any effect on streams nor that calling stream operations from within a custom Fork/Join pool’s task will use that custom pool.

Java Parallel Stream always log same threadID

It depends on how many cores your CPU on production has. You are using common ForkJoinPool which is initialised by expression: number of available processors - 1. So probably this thread pool has only 1 thread.

Usage of parallelStream() resulting in unclosed Threads?

Basically ... you are trying to solve a problem that isn't a problem. The common fork-join pool is a managed pool. The JVM takes care of it.

And if it really matters to you, the bad news is that there is probably nothing you can do about it. The common pool will ignore shutdown() and shutdownNow() calls. This is by design.

There is a trick which allows you to create a custom ForkJoinPool and run your stream in it. See Custom thread pool in Java 8 parallel stream. Once you are finished you can shutdown the pool to make the threads go away.

But ... it is probably a bad idea. It is more efficient to reuse an existing pool or the common pool. Creating and destroying threads is expensive. Doing this repeatedly because you are repeatedly creating and destroying pools is inefficient.

The common ForkJoinPool should not be viewed as a thread leak.

java 8 parallel stream, blockingcode possible?

An operation on a ParallelStream is still blocking and will wait for all the threads it spawned to finish. These threads are executed asynchronously (they don't wait for a previous one to finish), but that doesn't mean your whole code starts behaving asynchronously !

If you're actually making asynchronous calls and working on the resulting CompletableFuture<T> in your forEach, you should instead make your terminal operation a reduce producing a single CompletableFuture<T>. Intermediate operations could be a peek or an identity map with side-effects (both are frowned upon, but I don't know any best-practice solution). You would close the connection upon resolve of the single resulting CompletableFuture<T>.

If you're not, then your code looks good enough, as the closeClientConnection() will only be executed once the ParallelStream has been processed.

How to specify ForkJoinPool for Java 8 parallel stream?

Streams are lazy; all work is done when you commence a terminal operation. In your case, the terminal operation is .collect(Collectors.toList()), which you call in the main thread on the result of get(). Therefore, the actual work will be done the same way as if you’ve constructed the entire stream in the main thread.

For your pool to have an effect, you have to move the terminal operation into the submitted task:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:

Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

But you should keep in mind that this is undocumented behavior, which is not guaranteed. The actual answer must be that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.

Nested parallel streams in Java

The thread pool behind parallel streams is the common pool, which you can get with ForkJoinPool.commonPool(). It usually uses NumberOfProcessors - 1 workers. To resolve dependencies like you've described, it's able to dynamically create additional workers if (some) current workers are blocked and a deadlock becomes possible.

However, this is not the answer for your case.

Tasks in a ForkJoinPool have two important functionalities:

  • They can create subtasks and split the current task into smaller pieces (fork).
  • They can wait for the subtasks (join).

When a thread executes such a task A and joins a subtask B, it doesn't just wait blocking for the subtask to finish its execution but executes another task C in the meantime. When C is finished, the thread comes back to A and checks if B is finished. Note that B and C can (and most likely are) the same task. If B is finished, then A has successfully waited for/joined it (non-blocking!). Check out this guide if the previous explanation is not clear.

Now when you use a parallel stream, the range of the stream is split into tasks recursively until the tasks become so small that they can be executed sequentially more efficiently. Those tasks are put into a work queue (there is one for each worker) in the common pool. So, what IntStream.range(0, 100).parallel().forEach does is splitting up the range recursively until it's not worth it anymore. Each final task, or rather bunch of iterations, can be executed sequentially with the provided code in forEach. At this point the workers in the common pool can just execute those tasks until all are done and the stream can return. Note that the calling thread helps out with the execution by joining subtasks!

Now each of those tasks uses a parallel stream itself in your case. The procedure is the same; split it up into smaller tasks and put those tasks into a work queue in the common pool. From the ForkJoinPool's perspective those are just additional tasks on top of the already present ones. The workers just keep executing/joining tasks until all are done and the outer stream can return.

This is what you see in the output: There is no deterministic behaviour, no fixed order. Also there cannot occur a deadlock because in the given use case there won't be blocking threads.

You can check the explanation with the following code:

    public static void main(String[] args) {
IntStream.range(0, 10).parallel().forEach(i -> {
IntStream.range(0, 10).parallel().forEach(j -> {
for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
});
});
}

You should notice that the main thread is involved in the execution of the inner iterations, so it is not (!) blocked. The common pool workers just pick tasks one after another until all are finished.



Related Topics



Leave a reply



Submit