How do I implement task prioritization using an ExecutorService in Java 5?
At first blush it would seem you could define an interface for your tasks that extends Runnable
or Callable<T>
and Comparable
. Then wrap a ThreadPoolExecutor
with a PriorityBlockingQueue
as the queue, and only accept tasks that implement your interface.
Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor
, and override the submit()
methods. Refer to AbstractExecutorService
to see what the default ones look like; all they do is wrap the Runnable
or Callable
in a FutureTask
and execute()
it. I'd probably do this by writing a wrapper class that implements ExecutorService
and delegates to an anonymous inner ThreadPoolExecutor
. Wrap them in something that has your priority, so that your Comparator
can get at it.
Java Executors: how can I set task priority?
Currently the only concrete implementations of the Executor interface are the ThreadPoolExecutor and the ScheduledThreadpoolExecutor
Instead of using the utility / factory class Executors, you should create an instance using a constructor.
You can pass a BlockingQueue to the constructors of the ThreadPoolExecutor.
One of the implementations of the BlockingQueue, the PriorityBlockingQueue lets you pass a Comparator to a constructor, that way enabling you to decide the order of execution.
Testing PriorityBlockingQueue in ThreadPoolExecutor
The priority is only taken into account if the pool is fully busy and you submit several new tasks. If you define your pool with only one thread, you should get the expected output. In your example, all tasks get executed concurrently and which one finishes first is somewhat random.
By the way the linked implementation has a problem and throws an exception if your queue is full and you submit new tasks.
See below a working example of what you are trying to achieve (I have overriden newTaskFor
in a simplistic way, just to make it work - you might want to improve that part).
It prints: 1 2 3 4 5
.
public class Test {
public static void main(String[] args) {
PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
executorService.submit(getRunnable("1"), 1);
executorService.submit(getRunnable("3"), 3);
executorService.submit(getRunnable("2"), 2);
executorService.submit(getRunnable("5"), 5);
executorService.submit(getRunnable("4"), 4);
executorService.shutdown();
try {
executorService.awaitTermination(30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Runnable getRunnable(final String id) {
return new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(id);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
static class PriorityExecutor extends ThreadPoolExecutor {
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return (RunnableFuture<T>) callable;
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return (RunnableFuture<T>) runnable;
}
}
static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {
volatile int priority = 0;
public ComparableFutureTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
@Override
public int compareTo(ComparableFutureTask<T> o) {
return Integer.valueOf(priority).compareTo(o.priority);
}
}
}
Related Topics
Explanation of Generic <T Extends Comparable<? Super T>> in Collection.Sort/ Comparable Code
How to Run MySQL In-Memory for Junit Test Cases
Spring - How to Use Multiple Transaction Managers in the Same Application
Checking If Unlimited Cryptography Is Available
Extract Integer Part in String
Count Int Occurrences with Java8
Why Does Java Code with an Inner Class Generates a Third Someclass$1.Class File
Run Java File as Administrator with Full Privileges
How to Modify the Raw Xml Message of an Outbound Cxf Request
Java List Sorting: How to Keep a List Permantly Sorted Automatically Like Treemap
How to Disable 'X-Frame-Options' Response Header in Spring Security
Java 8: How to Work with Exception Throwing Methods in Streams