Custom Thread Pool in Java 8 Parallel Stream

Custom thread pool in Java 8 parallel stream

There actually is a trick for executing a parallel operation in a specific fork-join pool: if you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork, whose documentation specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()."
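
One way to observe the effect of this trick is to record which threads process the stream elements. The following is only a sketch: the class CustomPoolDemo and the helper workerNames are hypothetical names, not part of the original answer. It relies on the same behavior of parallel streams, so treat the result as an observation on current JDKs rather than a guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolDemo {

    // Hypothetical helper: runs a parallel stream as a task inside a dedicated
    // pool and returns the names of the threads that processed elements.
    static Set<String> workerNames(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            // The terminal operation (forEach) runs inside the submitted task.
            Runnable job = () -> IntStream.range(0, 10_000).parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()));
            pool.submit(job).get();
            return names;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerNames(4));
    }
}
```

With the default thread factory, the recorded names are expected to belong to the dedicated pool (of the form ForkJoinPool-1-worker-N) rather than to ForkJoinPool.commonPool.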

How to specify ForkJoinPool for Java 8 parallel stream?

Streams are lazy; all work is done when you commence a terminal operation. In your case, the terminal operation is .collect(Collectors.toList()), which you call in the main thread on the result of get(). Therefore, the actual work will be done the same way as if you’ve constructed the entire stream in the main thread.
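
The laziness described above can be checked with a counter. This is a minimal sketch under assumed names (LazyStreamDemo and its helper are hypothetical): the mapping function is not called when the pipeline is built, only when the terminal operation runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {

    // Returns {map calls before the terminal operation, map calls after it}.
    static int[] mapCallsBeforeAndAfterCollect() {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);

        // Building the pipeline does not execute the lambda.
        Stream<Integer> pipeline = input.parallelStream().map(i -> {
            calls.incrementAndGet();
            return i * 10;
        });
        int before = calls.get();

        // The terminal operation triggers all the work.
        List<Integer> result = pipeline.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = mapCallsBeforeAndAfterCollect();
        System.out.println("before=" + counts[0] + ", after=" + counts[1]);
    }
}
```

Because the work starts at collect, the thread that calls collect decides where the pipeline runs, not the thread that assembled it.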

For your pool to have an effect, you have to move the terminal operation into the submitted task:

ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of swallowing the exception
        Thread.currentThread().interrupt();
    }
    return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);

We can also demonstrate the relevance of the terminal operation by constructing the stream in the main thread and only submitting the terminal operation to the pool:

Stream<Integer> stream = testList.parallelStream().map(item -> {
    try {
        // read from database
        Thread.sleep(1000);
        System.out.println("task" + item + ":" + Thread.currentThread());
    } catch (InterruptedException e) {
        // restore the interrupt flag instead of swallowing the exception
        Thread.currentThread().interrupt();
    }
    return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();

But you should keep in mind that this is undocumented behavior, which is not guaranteed. The actual answer must be that the Stream API in its current form, with no thread control (and no help for dealing with checked exceptions), is not suitable for parallel I/O operations.

Why does parallelStream use a ForkJoinPool, not a normal thread pool?

One important thing is that a ForkJoinPool can execute "normal" tasks (e.g. Runnable, Callable) as well, so it's not just meant to be used with recursively-created tasks.
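
As a small illustration of that point, the sketch below submits an ordinary Callable and an ordinary Runnable to a ForkJoinPool. The class and method names are hypothetical and chosen only for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;

class PlainTasksOnForkJoinPool {

    // Submits a plain Callable and returns its result.
    static int runCallable() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Callable<Integer> task = () -> 6 * 7;
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Submits a plain Runnable and reports whether it ran.
    static boolean runRunnable() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            AtomicBoolean ran = new AtomicBoolean(false);
            Runnable task = () -> ran.set(true);
            pool.submit(task).get(); // get() waits for completion
            return ran.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runCallable());
        System.out.println(runRunnable());
    }
}
```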

Another important point is that a ForkJoinPool has multiple task queues, one per worker thread, whereas a normal executor (e.g. ThreadPoolExecutor) has just one. This strongly affects which kinds of tasks each should run.

The smaller and more numerous the tasks a normal executor has to execute, the higher the synchronization overhead of distributing them to the workers. If most tasks are small, the workers access the shared task queue often, and contention on that queue grows.

This is where the ForkJoinPool shines with its multiple queues. Every worker takes tasks from its own queue, which most of the time requires no blocking synchronization. If its own queue is empty, it can steal a task from another worker, but from the other end of that worker's queue. This also rarely causes synchronization overhead, because work-stealing is supposed to be rather rare.

Now what does that have to do with parallel streams? The streams framework is designed to be easy to use. Parallel streams are meant for cases where you want to split work into many concurrent tasks easily, and where all tasks are rather small and simple. That is where the ForkJoinPool is the reasonable choice: it performs better on huge numbers of small tasks, and it can handle longer tasks as well if it has to.
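
To make the "many small tasks" pattern concrete, here is a minimal fork/join sketch that sums a range of integers by recursive splitting. The class RangeSum and the cutoff of 1,000 elements are assumptions made for this illustration; this is not how the streams implementation itself is written.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums the integers in [from, to).
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cutoff for sequential work
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: compute directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                      // queued on the current worker; an idle worker may steal it
        long rightSum = right.compute();  // keep working on the other half in this thread
        return left.join() + rightSum;
    }

    static long sum(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSum(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 1_000_000));
    }
}
```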

java 8 parallel stream with ForkJoinPool and ThreadLocal

I found the following solution, which works without changing any underlying code. The map method takes a functional interface, which I supply as a lambda expression. The lambda calls a preExecution hook to set the new tenantId in the current thread's ThreadLocal and cleans it up again in postExecution.

forkJoinPool.submit(() -> tables.stream()
        .parallel()
        .map(item -> {
            preExecution(tenantId); // set the tenant in this worker's ThreadLocal
            try {
                return validator.apply(item);
            } finally {
                postExecution(); // always clear the ThreadLocal again
            }
        })
        .filter(validationResult ->
                validationResult.getValidationMessages().size() > 0)
        .collect(Collectors.toList())).get();
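
A self-contained version of the same idea might look like the sketch below. Everything in it is a stand-in: TenantContextDemo, the TENANT thread-local and the validate method are hypothetical replacements for the tenant context and validator of the original code, which are not shown in the answer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class TenantContextDemo {
    // Stand-in for the per-thread tenant context of the original code.
    private static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    static void preExecution(String tenantId) {
        TENANT.set(tenantId);
    }

    static void postExecution() {
        TENANT.remove();
    }

    // Stand-in for validator.apply(item): reads the tenant from the ThreadLocal.
    static String validate(String table) {
        return TENANT.get() + ":" + table;
    }

    static List<String> validateAll(List<String> tables, String tenantId) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // An explicit Callable avoids any ambiguity between submit overloads.
            Callable<List<String>> job = () -> tables.parallelStream()
                    .map(table -> {
                        preExecution(tenantId);   // set context on whichever worker runs this element
                        try {
                            return validate(table);
                        } finally {
                            postExecution();      // clear it so nothing leaks to later tasks
                        }
                    })
                    .collect(Collectors.toList());
            return pool.submit(job).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(validateAll(Arrays.asList("a", "b", "c"), "t1"));
    }
}
```

Setting and clearing the value inside the lambda means it does not matter which worker thread picks up an element, since each invocation installs the context for itself.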

Why does stream parallel() not use all available threads?

Since the Stream implementation’s use of the Fork/Join pool is an implementation detail, the trick to force it to use a different Fork/Join pool is undocumented as well and seems to work by accident, i.e. there’s a hardcoded constant determining the actual parallelism, depending on the default pool’s parallelism. So using a different pool was not foreseen, originally.

However, it has been recognized that using a different pool with an inappropriate target parallelism is a bug, even if this trick is not documented, see JDK-8190974.

It has been fixed in Java 10 and backported to Java 8, update 222.

So a simple solution would be updating the Java version.

You may also change the default pool’s parallelism, e.g.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

before doing any Fork/Join activity.

But this may have unintended effects on other parallel operations.

How to limit number of parallel executions in ParallelStream?

I believe what you actually need is to limit the number of tasks executing concurrently. I see no need for a parallel stream here, since the java.util.concurrent package offers an easy solution: use an ExecutorService with a fixed thread pool of four threads instead.

Collection<Callable<Void>> callables = ...
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.invokeAll(callables);
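
A runnable sketch of this approach is shown below. The class BoundedExecutorDemo, the simulated 20 ms of work and the peak-concurrency bookkeeping are assumptions added purely to make the limit observable; they are not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedExecutorDemo {

    // Runs taskCount tasks on a fixed pool and returns the highest number
    // of tasks that were observed running at the same time.
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        List<Callable<Void>> callables = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            callables.add(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // simulated work
                } finally {
                    running.decrementAndGet();
                }
                return null;
            });
        }

        try {
            executorService.invokeAll(callables); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            executorService.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(4, 20));
    }
}
```

Since the pool never has more than poolSize threads, the reported peak cannot exceed that value.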

If you really want to use a custom thread pool with parallel streams, refer to this question: Custom thread pool in Java 8 parallel stream.

java how to create thread pool for stream operation

The code compiles and runs fine, once the code errors are fixed (str => s).

Common Pool

// Setup with dummy actions for testing which thread executes the action.
// Action is taken here to be a functional interface with a method String work(String s).
List<String> mylist = new ArrayList<>(Arrays.asList("1","2","3","4")); // holds the strings
List<Action> actions = new ArrayList<>(Arrays.asList(
    s -> { s += "x"; System.out.println(Thread.currentThread().getName() + ": " + s); return s; },
    s -> { s += "y"; System.out.println(Thread.currentThread().getName() + ": " + s); return s; }
));

// Using common pool
Stream<String> stream = mylist.parallelStream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);

Output

ForkJoinPool.commonPool-worker-7: 1x
ForkJoinPool.commonPool-worker-3: 2x
ForkJoinPool.commonPool-worker-3: 2y
main: 3x
ForkJoinPool.commonPool-worker-5: 4x
main: 3y
ForkJoinPool.commonPool-worker-7: 1y
ForkJoinPool.commonPool-worker-5: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

Custom Pool

ForkJoinPool customThreadPool = new ForkJoinPool(4);
ForkJoinTask<List<String>> task = customThreadPool.submit(
() -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));
System.out.println(task.get());

If the compiler complains as described in the question, you need to help it choose the correct overload of submit() by casting the lambda expression in the 3rd line:

        (Callable<List<String>>) () -> mylist.parallelStream().flatMap(s -> actions.stream().map(ac -> ac.work(s))).collect(Collectors.toList()));

Output

ForkJoinPool-1-worker-3: 3x
ForkJoinPool-1-worker-1: 1x
ForkJoinPool-1-worker-1: 1y
ForkJoinPool-1-worker-5: 2x
ForkJoinPool-1-worker-7: 4x
ForkJoinPool-1-worker-7: 4y
ForkJoinPool-1-worker-5: 2y
ForkJoinPool-1-worker-3: 3y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

Single Thread

Stream<String> stream = mylist.stream();
stream = stream.flatMap(s -> actions.stream().map(ac -> ac.work(s)));
List<String> r = stream.collect(Collectors.toList());
System.out.println(r);

Output

main: 1x
main: 1y
main: 2x
main: 2y
main: 3x
main: 3y
main: 4x
main: 4y
[1x, 1y, 2x, 2y, 3x, 3y, 4x, 4y]

