Java's Fork/Join vs ExecutorService - When to Use Which

Java's Fork/Join vs ExecutorService - when to use which?

Fork/join lets you easily run divide-and-conquer jobs, which you would have to implement manually if you wanted to run them on an ExecutorService. In practice, ExecutorService is usually used to process many independent requests (aka transactions) concurrently, and fork/join is used when you want to accelerate one coherent job.

ForkJoinPool scheduling vs ExecutorService

Suppose you have a very big array of ints and you want to add all of them. With an ExecutorService you might say: let's divide that array into, say, four chunks per thread. So if you have an array of 160 elements and 4 CPUs (one thread each), you create 4 * 4 = 16 chunks, each holding 160 / 16 = 10 ints. Create runnables/callables and submit them to an executor service (and of course think of a way to merge the results once they are done).

Now your hope is that each of the CPUs will take 4 of those tasks and work on them. Let's also suppose that some of the numbers are very complicated to add (of course they are not, but bear with me). It could turn out that 3 threads/CPUs are done with their work while one of them is still busy with its first chunk. No one wants that, of course, but it could happen. The bad thing is that you can't do anything about it.
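The fixed-chunk approach described above could look roughly like the sketch below. The class name ChunkedSum and the "four chunks per thread" factor are only illustrative assumptions; note that the chunking and the merging of partial results are both written by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: sums an int[] with a fixed number of pre-computed chunks.
final class ChunkedSum {

    static long sum(int[] data, int threads) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            int chunks = threads * 4;                            // assumption: four chunks per thread
            int chunkSize = (data.length + chunks - 1) / chunks; // rounded up
            List<Future<Long>> partials = new ArrayList<>();
            for (int start = 0; start < data.length; start += chunkSize) {
                final int from = start;
                final int to = Math.min(start + chunkSize, data.length);
                partials.add(executor.submit(() -> {
                    long s = 0;
                    for (int i = from; i < to; i++) {
                        s += data[i];
                    }
                    return s;
                }));
            }
            // Merging is manual: wait for every chunk and add up the partial sums.
            long total = 0;
            for (Future<Long> partial : partials) {
                total += partial.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] data = new int[160];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data, 4)); // prints 12880 (sum of 1..160)
    }
}
```

Once the chunks are submitted, their size is fixed; a thread that finishes early has no way to take over part of a chunk that another thread is still working on.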

What ForkJoinPool does instead is say: tell me how you want to split your task and give me the implementation for the minimal workload I have to do, and I'll take care of the rest. In the Stream API this is done with Spliterators, mainly with two methods: trySplit (which returns either null, meaning nothing can be split any more, or a new Spliterator, meaning a new chunk) and forEachRemaining, which processes the elements once the task can't be split any further. And this is where work stealing will help you.
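To make the two Spliterator methods concrete, here is a small sketch that drives them by hand. It runs sequentially on purpose (the Stream API would hand the split-off chunks to ForkJoinPool workers instead); the class name SpliteratorDemo and the minSize parameter are made up for illustration.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.IntConsumer;

// Hypothetical demo: split until the chunk is small enough, then process what remains.
final class SpliteratorDemo {

    static long sum(Spliterator.OfInt chunk, long minSize) {
        if (chunk.estimateSize() > minSize) {
            // trySplit() hands back a new chunk, or null if no further split is possible.
            Spliterator.OfInt prefix = chunk.trySplit();
            if (prefix != null) {
                return sum(prefix, minSize) + sum(chunk, minSize);
            }
        }
        // Minimal workload: consume whatever is left in this chunk.
        long[] total = {0};
        IntConsumer add = value -> total[0] += value;
        chunk.forEachRemaining(add);
        return total[0];
    }

    public static void main(String[] args) {
        int[] data = new int[160];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(Arrays.spliterator(data), 10)); // prints 12880
    }
}
```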

You say how your chunks are computed (usually by splitting in half) and what to do when you can't split any more. ForkJoinPool dispatches the first chunks to the threads, and when some of them become free because they are done with their own work, they can query the queues of other threads to see whether those still hold work. If they notice chunks in another thread's queue, they take them, split them on their own, and work on them. It can even turn out that they don't do all the work on those chunks themselves: some other thread can now query this thread's queue, notice that there is still work to do, and so on. This is far better, because when those 3 threads are free they can pick up other work, and all of them stay busy.
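As a rough sketch of that contract, the array sum could be written as a RecursiveTask: the code only states how to split (in half) and what to do below a threshold, and the pool decides which thread runs which piece. The class name ForkJoinSum and the THRESHOLD value are assumptions chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums data[from, to) by splitting in half until the range is small.
final class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10; // assumed minimal workload

    private final int[] data;
    private final int from;
    private final int to;

    ForkJoinSum(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long s = 0;
            for (int i = from; i < to; i++) {
                s += data[i];
            }
            return s;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(data, from, mid);
        ForkJoinSum right = new ForkJoinSum(data, mid, to);
        left.fork();                     // queued on this worker's deque; idle workers may steal it
        long rightSum = right.compute(); // keep working on the other half in this thread
        return left.join() + rightSum;
    }

    public static void main(String[] args) {
        int[] data = new int[160];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(pool.invoke(new ForkJoinSum(data, 0, data.length))); // prints 12880
        } finally {
            pool.shutdown();
        }
    }
}
```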


This example is a bit simplified, but it is not very far from reality. You just need a lot more chunks than CPUs/threads for work stealing to pay off; that is why trySplit usually needs a smart implementation and why you need lots of elements in the source of your stream.

Is it beneficial to use ForkJoinPool as usual ExecutorService

It will be effective if you use newWorkStealingPool

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism level.
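A minimal usage sketch, assuming you only need the plain ExecutorService interface: the pool returned by newWorkStealingPool is used exactly like any other executor. The class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: many small independent tasks on a work-stealing pool.
final class WorkStealingDemo {

    static int sumOfSquares(int n) throws Exception {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(() -> value * value);
            }
            int total = 0;
            // invokeAll blocks until every task has completed.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```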

The advantages are described in the ForkJoinPool documentation:

A ForkJoinPool provides the entry point for submissions from non-ForkJoinTask clients, as well as management and monitoring operations.

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist).

This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

Difference between ForkJoinPool and normal ExecutorService?

Although ForkJoinPool implements ExecutorService, it is conceptually different from 'normal' executors.

You can easily see the difference if your tasks spawn more tasks and wait for them to complete, e.g. by calling

executor.submit(new Task()).get(); // blocks this thread until the new task completes

In a normal executor service, waiting for other tasks to complete will block the current thread. There are two possible outcomes: If your executor service has a fixed number of threads, it might deadlock if the last running thread waits for another task to complete. If your executor dynamically creates new threads on demand, the number of threads might explode and you end up having thousands of threads which might cause starvation.

In contrast, the fork/join framework reuses the waiting thread in the meantime to execute other tasks, so it won't deadlock even though the number of threads is fixed:

new MyForkJoinTask().invoke();

So if you have a problem that you can solve recursively, consider using a ForkJoinPool, as you can easily implement one level of recursion as a ForkJoinTask.
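As an illustration, the classic recursive Fibonacci task below creates thousands of tasks that each wait for a subtask, yet it completes on a pool with a parallelism of only 2. The class name and the choice of Fibonacci are assumptions made for the sketch; it is a deliberately inefficient way to compute the number.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: each level of recursion forks one subtask and joins it.
final class Fibonacci extends RecursiveTask<Integer> {
    private final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci first = new Fibonacci(n - 1);
        first.fork();
        int second = new Fibonacci(n - 2).compute();
        // join() does not park the worker while runnable tasks are available;
        // the worker executes pending tasks in the meantime.
        return first.join() + second;
    }

    static int onSmallPool(int n) {
        ForkJoinPool pool = new ForkJoinPool(2); // only two worker threads
        try {
            return pool.invoke(new Fibonacci(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(onSmallPool(20)); // prints 6765
    }
}
```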

Just check the number of running threads in your examples.

Java Fork/Join pool, ExecutorService and CountDownLatch

After researching various multithreading frameworks for the past 3 months, I have found an answer to the question.

ExecutorService

It is simple and easy to use, with limited control. You can use it to:

  1. Start parallel independent tasks without waiting for them
  2. Wait for completion of all your tasks

I prefer this one when the number of Callable/Runnable tasks is small and tasks piling up in an unbounded queue will not exhaust memory or degrade the performance of the system.

It hides the low-level details of ThreadPoolExecutor. Unlike ThreadPoolExecutor, it does not let you tune other parameters (bounded queue, rejection handler, etc.) to fine-tune performance.

ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

It gives you more control. Apart from setting the minimum and maximum number of threads, you can set the queue size and make the BlockingQueue bounded.

You can supply your own thread factory if you need any of the following features:

  1. To set a more descriptive thread name
  2. To set thread daemon status
  3. To set thread priority

If your application is constrained by the number of pending Runnable/Callable tasks, use a bounded queue by setting its maximum capacity. For the case where the queue is full, you can define a RejectedExecutionHandler. Java provides four predefined rejection handler policies.

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.

  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.

  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.

  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
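The pieces above (bounded queue, custom thread factory, rejection policy) can be combined as in the following sketch. The pool sizes, the "worker-" name prefix, and the class name are assumptions picked so that the third task is reliably rejected and therefore executed by the submitting thread under CallerRunsPolicy.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: one worker, queue capacity 1, CallerRunsPolicy.
final class BoundedPoolDemo {

    // Returns, for task numbers 1..3, the name of the thread that ran the task.
    static Map<Integer, String> run() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "worker-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                factory,
                new ThreadPoolExecutor.CallerRunsPolicy());

        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);

        pool.execute(() -> { // task 1 occupies the only worker until released
            ranOn.put(1, Thread.currentThread().getName());
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> ranOn.put(2, Thread.currentThread().getName())); // fills the queue
        pool.execute(() -> ranOn.put(3, Thread.currentThread().getName())); // rejected: caller runs it

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return ranOn;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // task 3 is reported on the main thread
    }
}
```

Because the submitting thread is busy running the rejected task, it cannot submit more work in the meantime, which is the feedback effect the policy description mentions.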

CountDownLatch

CountDownLatch: This synchronizer allows a Java thread to wait until another set of threads has completed its tasks.

Use cases:

  1. Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism

  2. Wait for N threads to complete before starting to execute another code block

  3. Deadlock detection.
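The first two use cases can be sketched with two latches: a start gate that releases all threads at once, and a completion latch that the coordinating thread waits on. The names LatchDemo, startGate and done are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: start n threads together, then wait for all of them.
final class LatchDemo {

    static int runWorkers(int n) throws InterruptedException {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(n);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < n; i++) {
            new Thread(() -> {
                try {
                    startGate.await();          // all workers block here until the gate opens
                    finished.incrementAndGet(); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        startGate.countDown(); // release every worker at the same time
        done.await();          // wait for N threads to complete
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(5)); // prints 5
    }
}
```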

More details are listed in this article

ForkJoinPool

The ForkJoinPool is similar to the Java ExecutorService but with one difference: it makes it easy for tasks to split their work into smaller tasks, which are then submitted to the ForkJoinPool too. Task stealing happens when free worker threads take tasks from the queues of busy worker threads.

public ForkJoinPool(int parallelism,
ForkJoinPool.ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode)
Creates a ForkJoinPool with the given parameters.

Parameters:

parallelism - the parallelism level. For default value, use Runtime.availableProcessors().

factory - the factory for creating new threads. For default value, use
defaultForkJoinWorkerThreadFactory.

handler - the handler for internal worker threads that terminate due to unrecoverable errors

asyncMode - if true, establishes local first-in-first-out scheduling mode
for forked tasks that are never joined.
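A short sketch of calling this constructor with the documented defaults and asyncMode switched on; passing null as the handler simply means no custom UncaughtExceptionHandler is installed. The class and method names are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical factory for a pool intended for event-style tasks that are never joined.
final class AsyncPoolDemo {

    static ForkJoinPool newAsyncPool() {
        return new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),      // parallelism
                ForkJoinPool.defaultForkJoinWorkerThreadFactory, // factory
                null,                                            // handler: none
                true);                                           // asyncMode: local FIFO scheduling
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newAsyncPool();
        try {
            System.out.println(pool.getParallelism() + " " + pool.getAsyncMode());
        } finally {
            pool.shutdown();
        }
    }
}
```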

Regarding main query:

You can use ExecutorService.invokeAll(), CountDownLatch, or ForkJoinPool. These tools are complementary to each other; they differ in the granularity of control they give you over task execution, from high level to low level.

EDIT:

Have a look at related SE questions:

What are the advantages of using an ExecutorService?

Java's Fork/Join vs ExecutorService - when to use which?

What's the advantage of a Java-5 ThreadPoolExecutor over a Java-7 ForkJoinPool?

ThreadPool (TP) and ForkJoinPool (FJ) target different use cases. The main difference is the number of queues each executor employs, which determines what type of problem is better suited to which executor.

The FJ executor has n (aka the parallelism level) separate concurrent queues (deques), while the TP executor has only one concurrent queue (these queues/deques may be custom implementations that do not follow the JDK Collections API). As a result, in scenarios where a large number of (usually relatively short-running) tasks is generated, the FJ executor will perform better, because the independent queues minimize concurrent operations and infrequent steals help with load balancing. In TP, due to the single queue, there is a concurrent operation every time work is dequeued, so the queue acts as a relative bottleneck and limits performance.

In contrast, if there are relatively fewer long-running tasks the single queue in TP is no longer a bottleneck for performance. However, the n-independent queues and relatively frequent work-stealing attempts will now become a bottleneck in FJ as there can be possibly many futile attempts to steal work which add to overhead.

In addition, the work-stealing algorithm in FJ assumes that (older) tasks stolen from the deque will produce enough parallel tasks to reduce the number of steals. E.g. in quicksort or mergesort where older tasks equate to larger arrays, these tasks will generate more tasks and keep the queue non-empty and reduce the number of overall steals. If this is not the case in a given application then the frequent steal attempts again become a bottleneck. This is also noted in the javadoc for ForkJoinPool:

this class provides status check methods (for example getStealCount())
that are intended to aid in developing, tuning, and monitoring
fork/join applications.
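A sketch of how that monitoring hook might be used: run a divide-and-conquer task and read getStealCount() afterwards. The task, its threshold of 1,000 elements, and the pool size are all assumptions, and the steal count itself varies from run to run, so it is only printed, not predicted.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo: count processed elements and report the pool's steal count.
final class StealCountDemo {

    static final class Touch extends RecursiveAction {
        private final int from;
        private final int to;
        private final LongAdder counter;

        Touch(int from, int to, LongAdder counter) {
            this.from = from;
            this.to = to;
            this.counter = counter;
        }

        @Override
        protected void compute() {
            if (to - from <= 1_000) {
                counter.add(to - from); // stand-in for real per-element work
                return;
            }
            int mid = (from + to) >>> 1;
            invokeAll(new Touch(from, mid, counter), new Touch(mid, to, counter));
        }
    }

    // Returns { elements processed, steal count reported by the pool }.
    static long[] run(int size) {
        ForkJoinPool pool = new ForkJoinPool(4);
        LongAdder counter = new LongAdder();
        try {
            pool.invoke(new Touch(0, size, counter));
            return new long[] { counter.sum(), pool.getStealCount() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] result = run(1_000_000);
        System.out.println("processed=" + result[0] + " steals=" + result[1]);
    }
}
```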

Why does parallelStream use a ForkJoinPool, not a normal thread pool?

One important thing is that a ForkJoinPool can execute "normal" tasks (e.g. Runnable, Callable) as well, so it's not just meant to be used with recursively-created tasks.
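For instance, a plain Runnable and a plain Callable can be handed to a ForkJoinPool directly, as in this small sketch (class name and values are made up):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: ordinary tasks on a ForkJoinPool, no RecursiveTask involved.
final class PlainTasksOnForkJoin {

    static int run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            AtomicInteger sideEffect = new AtomicInteger();
            Runnable runnable = sideEffect::incrementAndGet;
            Callable<Integer> callable = () -> 41;

            pool.submit(runnable).join();           // wait for the Runnable to finish
            int value = pool.submit(callable).join(); // result of the Callable
            return value + sideEffect.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 42
    }
}
```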

Another (important) thing is that ForkJoinPool has multiple task queues, one for each worker thread, where a normal executor (e.g. ThreadPoolExecutor) has just one. This has a big impact on what kind of tasks each should run.

The smaller and the more tasks a normal executor has to execute, the higher is the overhead of synchronization for distributing tasks to the workers. If most of the tasks are small, the workers will access the internal task queue often, which leads to synchronization overhead.

Here's where the ForkJoinPool shines with its multiple queues. Every worker takes tasks from its own queue, which most of the time needs no blocking synchronization. If its queue is empty, it can steal a task from another worker, but from the other end of that worker's queue, which also rarely causes synchronization overhead because work stealing is supposed to be rather rare.

Now what does that have to do with parallel streams? The streams framework is designed to be easy to use. Parallel streams are meant for cases where you want to split something up into many concurrent tasks easily, where all tasks are rather small and simple. That is where the ForkJoinPool is the reasonable choice: it provides better performance on huge numbers of smaller tasks, and it can handle longer tasks as well if it has to.
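A one-line illustration of that ease of use: the stream below is split into many small ranges that are summed concurrently, without any explicit task or pool code. The class and method names are only for the example.

```java
import java.util.stream.LongStream;

// Hypothetical demo: a parallel stream does the splitting and merging for you.
final class ParallelStreamDemo {

    static long sumTo(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sumTo(1_000_000)); // prints 500000500000
    }
}
```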


