How to properly use Java Executor?
ExecutorService
ExecutorService executor=Executors.newFixedThreadPool(50);
It is simple and easy to use. It hides low level details of
ThreadPoolExecutor
.Prefer this one when number of
Callable/Runnable
tasks are small in number and piling of tasks in unbounded queue does not increase memory & degrade the performance of the system. If you haveCPU/Memory
constraints, useThreadPoolExecutor
with capacity constraints &RejectedExecutionHandler
to handle rejection of tasks.CountDownLatch
You have initialized
CountDownLatch
with a given count. This count is decremented by calls to thecountDown()
method. I am assuming that you are calling decrement in your Runnable task later. Threads waiting for this count to reach zero can call one of theawait()
methods. Callingawait()
blocks the thread until the count reaches zero. This class enables a java thread to wait until other set of threads completes their tasks.Use cases:
Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism
Wait N threads to completes before start execution
Deadlock detection.
Have a look at this article by Lokesh Gupta for more details.
ThreadPoolExecutor : It provides more control to finetune various thread pool parameters. If your application is constrained by number of active
Runnable/Callable
tasks, you should use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types ofRejectedExecutionHandler
policies.In the default
ThreadPoolExecutor.AbortPolicy
, the handler throws a runtime RejectedExecutionException upon rejection.In
ThreadPoolExecutor.CallerRunsPolicy
, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.In
ThreadPoolExecutor.DiscardPolicy
, a task that cannot be executed is simply dropped.In
ThreadPoolExecutor.DiscardOldestPolicy
, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)If you want to simulate
CountDownLatch
behaviour, you can useinvokeAll()
method.
One more mechanism you did not quote is ForkJoinPool
The
ForkJoinPool
was added to Java in Java 7. TheForkJoinPool
is similar to
the JavaExecutorService
but with one difference. TheForkJoinPool
makes it
easy for tasks to split their work up into smaller tasks which are then
submitted to theForkJoinPool
too. Task stealing happens inForkJoinPool
when free worker threads steal tasks from busy worker thread queue.Java 8 has introduced one more API in ExecutorService to create work stealing pool. You don't have to create
RecursiveTask
andRecursiveAction
but still can useForkJoinPool
.public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level.
By default, it will take number of CPU cores as parameter.
All these four mechanism are complimentary to each other. Depending on level of granularity you want to control, you have to chose right ones.
What is the right way to use Java executor?
Looks correct to me. In the past I have followed the suggested implementation from the Java 7 JavaDoc for ExecutorService for stopping it. You can get it fromt he Java 7 Javadoc but I provide it below for convenience. Edit it to fit your needs, for example you might want to pass the number of seconds to wait. The good thing about using a CountDownLatch is that by the time it is done waiting you know the ExecutorService will terminate right away. Also, you might want to add a timeout to your latch's await if needed in future real world cases. Also, put your latch.countDOwn() in a try's finally block when using in real world application.
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
How to correctly use ExecutorService to manage the number of concurrently running SwingWorkers?
That moment when you think: It was so obvious!
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 500; i++) {
SwingWorker<Boolean, Void> worker = new SwingWorker<Boolean, Void>() {
@Override
protected Boolean doInBackground() throws Exception {
System.out.println("One SwingWorker just ran!");
return true;
}
protected void done() {
boolean status;
try {
status = get();
} catch (InterruptedException e) {
// This is thrown if the thread's interrupted.
} catch (ExecutionException e) {
// This is thrown if we throw an exception
// from doInBackground.
}
}
};
executorService.submit(worker);
}
It works great!
How to correctly implement executor that runs multiple iterations and waits for all tasks to complete and successfully terminates after tasks are done
With reference to ThreadPoolExecutor documentation. The awaitTermination() method description reads:
Blocks until all tasks have completed execution after a shutdown request
While the shutdown() method descriptin reads
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted
Which indicates that awaitTermination() call is effective after a shutdown() call.
To solve the above problem, shutdown() needs to be called first and then awaitTermination()
NOTE: I have not personally tested this; however, John has, as mentioned in the comment of the original post and the mechanism works
ExecutorService's surprising performance break-even point --- rules of thumb?
- Using executors is about utilizing CPUs and / or CPU cores, so if you create a thread pool that utilizes the amount of CPUs at best, you have to have as many threads as CPUs / cores.
- You are right, creating new objects costs too much. So one way to reduce the expenses is to use batches. If you know the kind and amount of computations to do, you create batches. So think about thousand(s) computations done in one executed task. You create batches for each thread. As soon as the computation is done (java.util.concurrent.Future), you create the next batch. Even the creation of new batches can be done in parralel (4 CPUs -> 3 threads for computation, 1 thread for batch provisioning). In the end, you may end up with more throughput, but with higher memory demands (batches, provisioning).
Edit: I changed your example and I let it run on my little dual-core x200 laptop.
provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9
As you see in the source code, I took the batch provisioning and executor lifecycle out of the measurement, too. That's more fair compared to the other two methods.
See the results by yourself...
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecServicePerformance {
private static int count = 100000;
public static void main( String[] args ) throws InterruptedException {
final int cpus = Runtime.getRuntime().availableProcessors();
final ExecutorService es = Executors.newFixedThreadPool( cpus );
final Vector< Batch > batches = new Vector< Batch >( cpus );
final int batchComputations = count / cpus;
for ( int i = 0; i < cpus; i++ ) {
batches.add( new Batch( batchComputations ) );
}
System.out.println( "provisioned " + cpus + " batches to be executed" );
// warmup
simpleCompuation();
computationWithObjCreation();
computationWithObjCreationAndExecutors( es, batches );
long start = System.currentTimeMillis();
simpleCompuation();
long stop = System.currentTimeMillis();
System.out.println( "simpleCompuation:" + ( stop - start ) );
start = System.currentTimeMillis();
computationWithObjCreation();
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreation:" + ( stop - start ) );
// Executor
start = System.currentTimeMillis();
computationWithObjCreationAndExecutors( es, batches );
es.shutdown();
es.awaitTermination( 10, TimeUnit.SECONDS );
// Note: Executor#shutdown() and Executor#awaitTermination() requires
// some extra time. But the result should still be clear.
stop = System.currentTimeMillis();
System.out.println( "computationWithObjCreationAndExecutors:"
+ ( stop - start ) );
}
private static void computationWithObjCreation() {
for ( int i = 0; i < count; i++ ) {
new Runnable() {
@Override
public void run() {
double x = Math.random() * Math.random();
}
}.run();
}
}
private static void simpleCompuation() {
for ( int i = 0; i < count; i++ ) {
double x = Math.random() * Math.random();
}
}
private static void computationWithObjCreationAndExecutors(
ExecutorService es, List< Batch > batches )
throws InterruptedException {
for ( Batch batch : batches ) {
es.submit( batch );
}
}
private static class Batch implements Runnable {
private final int computations;
public Batch( final int computations ) {
this.computations = computations;
}
@Override
public void run() {
int countdown = computations;
while ( countdown-- > -1 ) {
double x = Math.random() * Math.random();
}
}
}
}
When should I use Executor over ExecutorService
Q: When would be a scenario when I would use Executor over ExecutorService?
A: When you know that you are never going to use any of the ExecutorService
methods.
There are few situations where you wouldn't need to manage the executor service in some way, there are few applications where you wouldn't need to manage the service.
(Hypothetically ... if you were creating your own task execution mechanism that didn't involve running the tasks in local threads, the management functions of ExecutorService
might be a poor fit.)
Q: Can I use
ExecutorService
everywhere?
A: Assuming that you instantiated an instance of ExecutorService
, yes.
Q: Is there a way to add an explicit shutdown for an Executor's execute or what is the general practice around it?
If you are asking if you can shut down the entire Executor
, no there isn't a way.
Executor
service is an interface. In the general sense, it could be implemented in a way that can't be shut down at all. But either way, if you don't know how the executor is implemented, then you don't know how to tell it to shut itself down.
(Sure ... if you know that it is likely to be a ExecutorService
, then you could type cast it to ExecutorService
and then call ExecutorService.shutdown()
.)
On the other hand, if you are asking if you can cancel a specific task submitted by Executor.execute
, the answer is also No.
But if you use the ExecutorService.submit
methods, you will get a Future
for each task. You may be able to use Future.cancel()
to stop the corresponding task from running or interrupt it if it is currently running.
How does the ExecutorService(int n) and Thread.activeCount() work?
A primary goal of the ExecutorService you have created is a fixed-size thread pool (in your case four threads). If you are seeing just four Thread instances created for your eight pieces of work, then it is working as designed.
You seem to think there should have been eight threads created. Imagine if you had submitted one million pieces of work; it would be a disaster if one million threads were created.
The abstraction allows you to control how many threads are used at one time, without regard to how many items there are to process. The ExecutorService deals with the complexity of reusing the four threads as many times as needed to process all of the items you pass to calls to execute
.
An analogy that might explain this is a bank. You created a bank with four tellers (the four threads in the thread pool) and eight customers (the eight calls to execute
) in line. When a teller finishes with a customer, the next customer in line gets serviced by that teller. You add someone to the queue with a call to execute
and the ExecutorService manages everything else. You control the number of threads (tellers) by how you initialize the ExecutorService (there are many different flavors that you can create).
Best way to shutdown ExecutorService in Java
But I am worried what if write something wrong the code in callable
task f.get() takes forever and the program will halt forever and never
exit.
That's a bug. You need to make sure that doesn't happen.
With codes above, I can make sure threads are closed after 10 seconds
No, you can't. Even shutdownNow()
doesn't actually guarantee that the executor threads are shut down (documentation):
There are no guarantees beyond best-effort attempts to stop processing
actively executing tasks. For example, typical implementations will
cancel via Thread.interrupt(), so any task that fails to respond to
interrupts may never terminate.
The ThreadPoolExecutor
tries to "shut down now" by interrupting all worker threads. You need to make sure that your tasks handle interrupts correctly.
Once your tasks stop correctly, you can estimate how long a shutdown should take based on your application and the tasks you're shutting down. Then you can do a graceful shutdown:
- Call
shutdown()
- Wait for an orderly shutdown for a reasonable amount of time using
awaitShutdown()
- If the executor is still running, call
shutdownNow()
and handle any outstanding tasks it returns.
Related Topics
How to Get the Current Date and Time of Your Timezone in Java
Counting the Number of Files in a Directory Using Java
How to Check for a Valid Url in Java
What Determines Kafka Consumer Offset
Deciphering Variable Information While Debugging Java
Eclipse Optimize Imports to Include Static Imports
How to Convert from Cmyk to Rgb in Java Correctly
How to Write an Arraylist of Strings into a Text File
What Is the Regex for "Any Positive Integer, Excluding 0"
@Transactional Method Called from Another Method Doesn't Obtain a Transaction
What Does "Atomic" Mean in Programming
Abstractmethoderror Using Uribuilder on Jax-Rs
Exception in Initializer Error in Java When Using Netbeans
How to Bundle Images in Jar File
Facebook Offline Access Step-By-Step
What Java 8 Stream.Collect Equivalents Are Available in the Standard Kotlin Library