How to Properly Use Java Executor

How to properly use Java Executor?

  1. ExecutorService

    ExecutorService executor=Executors.newFixedThreadPool(50);

    It is simple and easy to use. It hides low level details of ThreadPoolExecutor.

    Prefer this one when number of Callable/Runnable tasks are small in number and piling of tasks in unbounded queue does not increase memory & degrade the performance of the system. If you have CPU/Memory constraints, use ThreadPoolExecutor with capacity constraints & RejectedExecutionHandler to handle rejection of tasks.

  2. CountDownLatch

    You have initialized CountDownLatch with a given count. This count is decremented by calls to the countDown() method. I am assuming that you are calling decrement in your Runnable task later. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero. This class enables a java thread to wait until other set of threads completes their tasks.

    Use cases:

    1. Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism

    2. Wait N threads to completes before start execution

    3. Deadlock detection.

      Have a look at this article by Lokesh Gupta for more details.

  3. ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active Runnable/Callable tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of RejectedExecutionHandler policies.

    1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.

    2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.

    3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.

    4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

      If you want to simulate CountDownLatch behaviour, you can use invokeAll() method.

  4. One more mechanism you did not quote is ForkJoinPool

    The ForkJoinPool was added to Java in Java 7. The ForkJoinPool is similar to
    the Java ExecutorService but with one difference. The ForkJoinPool makes it
    easy for tasks to split their work up into smaller tasks which are then
    submitted to the ForkJoinPool too. Task stealing happens in ForkJoinPool when free worker threads steal tasks from busy worker thread queue.

    Java 8 has introduced one more API in ExecutorService to create work stealing pool. You don't have to create RecursiveTask and RecursiveAction but still can use ForkJoinPool.

      public static ExecutorService newWorkStealingPool()

    Creates a work-stealing thread pool using all available processors as its target parallelism level.

    By default, it will take number of CPU cores as parameter.

All these four mechanism are complimentary to each other. Depending on level of granularity you want to control, you have to chose right ones.

What is the right way to use Java executor?

Looks correct to me. In the past I have followed the suggested implementation from the Java 7 JavaDoc for ExecutorService for stopping it. You can get it fromt he Java 7 Javadoc but I provide it below for convenience. Edit it to fit your needs, for example you might want to pass the number of seconds to wait. The good thing about using a CountDownLatch is that by the time it is done waiting you know the ExecutorService will terminate right away. Also, you might want to add a timeout to your latch's await if needed in future real world cases. Also, put your latch.countDOwn() in a try's finally block when using in real world application.

 void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

How to correctly use ExecutorService to manage the number of concurrently running SwingWorkers?

That moment when you think: It was so obvious!

ExecutorService executorService = Executors.newFixedThreadPool(20);

for (int i = 0; i < 500; i++) {

SwingWorker<Boolean, Void> worker = new SwingWorker<Boolean, Void>() {
@Override
protected Boolean doInBackground() throws Exception {

System.out.println("One SwingWorker just ran!");
return true;
}

protected void done() {

boolean status;
try {

status = get();

} catch (InterruptedException e) {
// This is thrown if the thread's interrupted.
} catch (ExecutionException e) {
// This is thrown if we throw an exception
// from doInBackground.
}
}

};

executorService.submit(worker);
}


It works great!

How to correctly implement executor that runs multiple iterations and waits for all tasks to complete and successfully terminates after tasks are done

With reference to ThreadPoolExecutor documentation. The awaitTermination() method description reads:

Blocks until all tasks have completed execution after a shutdown request

While the shutdown() method descriptin reads

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted

Which indicates that awaitTermination() call is effective after a shutdown() call.

To solve the above problem, shutdown() needs to be called first and then awaitTermination()

NOTE: I have not personally tested this; however, John has, as mentioned in the comment of the original post and the mechanism works

ExecutorService's surprising performance break-even point --- rules of thumb?

  1. Using executors is about utilizing CPUs and / or CPU cores, so if you create a thread pool that utilizes the amount of CPUs at best, you have to have as many threads as CPUs / cores.
  2. You are right, creating new objects costs too much. So one way to reduce the expenses is to use batches. If you know the kind and amount of computations to do, you create batches. So think about thousand(s) computations done in one executed task. You create batches for each thread. As soon as the computation is done (java.util.concurrent.Future), you create the next batch. Even the creation of new batches can be done in parralel (4 CPUs -> 3 threads for computation, 1 thread for batch provisioning). In the end, you may end up with more throughput, but with higher memory demands (batches, provisioning).

Edit: I changed your example and I let it run on my little dual-core x200 laptop.

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

As you see in the source code, I took the batch provisioning and executor lifecycle out of the measurement, too. That's more fair compared to the other two methods.

See the results by yourself...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

private static int count = 100000;

public static void main( String[] args ) throws InterruptedException {

final int cpus = Runtime.getRuntime().availableProcessors();

final ExecutorService es = Executors.newFixedThreadPool( cpus );

final Vector< Batch > batches = new Vector< Batch >( cpus );

final int batchComputations = count / cpus;

for ( int i = 0; i < cpus; i++ ) {
batches.add( new Batch( batchComputations ) );
}

System.out.println( "provisioned " + cpus + " batches to be executed" );

// warmup
simpleCompuation();
computationWithObjCreation();
computationWithObjCreationAndExecutors( es, batches );

long start = System.currentTimeMillis();
simpleCompuation();
long stop = System.currentTimeMillis();
System.out.println( "simpleCompuation:" + ( stop - start ) );

start = System.currentTimeMillis();
computationWithObjCreation();
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreation:" + ( stop - start ) );

// Executor

start = System.currentTimeMillis();
computationWithObjCreationAndExecutors( es, batches );
es.shutdown();
es.awaitTermination( 10, TimeUnit.SECONDS );
// Note: Executor#shutdown() and Executor#awaitTermination() requires
// some extra time. But the result should still be clear.
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreationAndExecutors:"
+ ( stop - start ) );
}

private static void computationWithObjCreation() {

for ( int i = 0; i < count; i++ ) {
new Runnable() {

@Override
public void run() {

double x = Math.random() * Math.random();
}

}.run();
}

}

private static void simpleCompuation() {

for ( int i = 0; i < count; i++ ) {
double x = Math.random() * Math.random();
}

}

private static void computationWithObjCreationAndExecutors(
ExecutorService es, List< Batch > batches )
throws InterruptedException {

for ( Batch batch : batches ) {
es.submit( batch );
}

}

private static class Batch implements Runnable {

private final int computations;

public Batch( final int computations ) {

this.computations = computations;
}

@Override
public void run() {

int countdown = computations;
while ( countdown-- > -1 ) {
double x = Math.random() * Math.random();
}
}
}
}

When should I use Executor over ExecutorService

Q: When would be a scenario when I would use Executor over ExecutorService?

A: When you know that you are never going to use any of the ExecutorService methods.

There are few situations where you wouldn't need to manage the executor service in some way, there are few applications where you wouldn't need to manage the service.

(Hypothetically ... if you were creating your own task execution mechanism that didn't involve running the tasks in local threads, the management functions of ExecutorService might be a poor fit.)



Q: Can I use ExecutorService everywhere?

A: Assuming that you instantiated an instance of ExecutorService, yes.



Q: Is there a way to add an explicit shutdown for an Executor's execute or what is the general practice around it?

If you are asking if you can shut down the entire Executor, no there isn't a way.

Executor service is an interface. In the general sense, it could be implemented in a way that can't be shut down at all. But either way, if you don't know how the executor is implemented, then you don't know how to tell it to shut itself down.

(Sure ... if you know that it is likely to be a ExecutorService, then you could type cast it to ExecutorService and then call ExecutorService.shutdown().)

On the other hand, if you are asking if you can cancel a specific task submitted by Executor.execute, the answer is also No.

But if you use the ExecutorService.submit methods, you will get a Future for each task. You may be able to use Future.cancel() to stop the corresponding task from running or interrupt it if it is currently running.

How does the ExecutorService(int n) and Thread.activeCount() work?

A primary goal of the ExecutorService you have created is a fixed-size thread pool (in your case four threads). If you are seeing just four Thread instances created for your eight pieces of work, then it is working as designed.

You seem to think there should have been eight threads created. Imagine if you had submitted one million pieces of work; it would be a disaster if one million threads were created.

The abstraction allows you to control how many threads are used at one time, without regard to how many items there are to process. The ExecutorService deals with the complexity of reusing the four threads as many times as needed to process all of the items you pass to calls to execute.

An analogy that might explain this is a bank. You created a bank with four tellers (the four threads in the thread pool) and eight customers (the eight calls to execute) in line. When a teller finishes with a customer, the next customer in line gets serviced by that teller. You add someone to the queue with a call to execute and the ExecutorService manages everything else. You control the number of threads (tellers) by how you initialize the ExecutorService (there are many different flavors that you can create).

Best way to shutdown ExecutorService in Java

But I am worried what if write something wrong the code in callable
task f.get() takes forever and the program will halt forever and never
exit.

That's a bug. You need to make sure that doesn't happen.

With codes above, I can make sure threads are closed after 10 seconds

No, you can't. Even shutdownNow() doesn't actually guarantee that the executor threads are shut down (documentation):

There are no guarantees beyond best-effort attempts to stop processing
actively executing tasks. For example, typical implementations will
cancel via Thread.interrupt(), so any task that fails to respond to
interrupts may never terminate.

The ThreadPoolExecutor tries to "shut down now" by interrupting all worker threads. You need to make sure that your tasks handle interrupts correctly.

Once your tasks stop correctly, you can estimate how long a shutdown should take based on your application and the tasks you're shutting down. Then you can do a graceful shutdown:

  • Call shutdown()
  • Wait for an orderly shutdown for a reasonable amount of time using awaitShutdown()
  • If the executor is still running, call shutdownNow() and handle any outstanding tasks it returns.


Related Topics



Leave a reply



Submit