ScheduledExecutorService Exception Handling

ScheduledExecutorService Exception handling

You should use the ScheduledFuture object returned by scheduler.scheduleWithFixedDelay(...), like so:

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> handle =
    scheduler.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            throw new RuntimeException("foo");
        }
    }, 1, 10, TimeUnit.SECONDS);

// Create and Start an exception handler thread
// pass the "handle" object to the thread
// Inside the handler thread do :
....
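try {
    handle.get();
} catch (ExecutionException e) {
    // getCause() returns a Throwable: the exception thrown by the task
    Throwable rootException = e.getCause();
} catch (InterruptedException e) {
    // get() can also be interrupted while waiting
    Thread.currentThread().interrupt();
}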
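
The fragments above can be put together into a small runnable sketch. The class and method names (FutureGetDemo, runAndReportFailure) are made up for illustration, the delays are shortened, and for brevity the calling thread plays the role of the handler thread:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
final class FutureGetDemo {

    // Schedules a task that always fails and returns the message of the
    // exception recovered through ScheduledFuture.get().
    static String runAndReportFailure() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(() -> {
                throw new RuntimeException("foo");
            }, 10, 100, TimeUnit.MILLISECONDS);
            try {
                // Blocks until the periodic task fails or is cancelled.
                handle.get();
                return null;
            } catch (ExecutionException e) {
                // The task's own exception is the cause.
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task failed with: " + runAndReportFailure());
    }
}
```

In a real application the get() call would live in its own thread or executor task, as described above, so that the main flow is not blocked.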

Exception Handling ScheduledExecutorService

I was encountering some exceptions by which the further execution
stops.

That is the expected behavior of ScheduledExecutorService.scheduleAtFixedRate() according to the specification:

If any execution of the task encounters an exception, subsequent
executions are suppressed.

About your need:

I want to catch the exception for a systematic shutdown of my
application.

Should I handle the exception using a third thread that monitors both
futures and handles the Exception or is there any other better way?

Handling the returned future with ScheduledFuture.get() looks right.
According to the ScheduledExecutorService.scheduleAtFixedRate() specification:

Otherwise, the task will only terminate via cancellation or
termination of the executor.

So you don't even need to create a new scheduled future.

Just run two parallel tasks (with an ExecutorService, or two threads is also possible) that each wait on get() of one Future and stop the application if an exception is thrown in the task:

Future<?> futureA = ses1.scheduleAtFixedRate(..); // for thread 1
Future<?> futureB = ses2.scheduleAtFixedRate(..); // for thread 2
submitAndStopTheApplicationIfFail(futureA);
submitAndStopTheApplicationIfFail(futureB);

public void submitAndStopTheApplicationIfFail(Future<?> future) {
    executor.submit(() -> {
        try {
            future.get();
        } catch (InterruptedException e) {
            // stop the application
        } catch (ExecutionException e) {
            // stop the application
        }
    });
}

How to test ScheduledExecutorService exception handling in Junit?

I think you are going down the wrong rabbit hole here. Meaning: when you check the javadoc for the method you are using, you find:

Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically. (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.)

In other words: you are asking how to test something that is guaranteed to work by the Java system library you are using. And in that sense you are wasting your time.

You might rather spend time improving your code to make it easier to test. If your class received an ExecutorService object (instead of creating one for itself), you could pass in a same-thread executor for your unit tests. All of a sudden, your unit tests can run on one thread, which makes the whole testing a lot easier, as it allows you to get rid of the sleep statements in your tests. (Those sleep statements are much more of a problem than the chance that threads are not restarted, although the system library guarantees that they are.)
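
As a rough sketch of that idea: the class below receives its executor through the constructor, so a test can hand it a same-thread executor. UpdateRunner, triggerUpdate and the log format are invented for this illustration, and the narrower Executor interface stands in for ExecutorService to keep the example free of third-party libraries:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Hypothetical class under test: it is given an executor instead of creating one.
final class UpdateRunner {
    private final Executor executor;
    private final List<String> log = new CopyOnWriteArrayList<>();

    UpdateRunner(Executor executor) {
        this.executor = executor;
    }

    // Runs the update on the injected executor and records the outcome.
    void triggerUpdate(Runnable update) {
        executor.execute(() -> {
            try {
                update.run();
                log.add("ok");
            } catch (RuntimeException e) {
                log.add("failed: " + e.getMessage());
            }
        });
    }

    List<String> log() {
        return new ArrayList<>(log);
    }
}

final class UpdateRunnerTestSketch {
    static List<String> exercise() {
        // Same-thread executor: each task runs immediately on the caller's thread,
        // so the test needs no sleep statements.
        Executor sameThread = Runnable::run;
        UpdateRunner runner = new UpdateRunner(sameThread);
        runner.triggerUpdate(() -> { });
        runner.triggerUpdate(() -> { throw new IllegalStateException("provider down"); });
        return runner.log();
    }
}
```

Because everything runs synchronously, the results are available as soon as triggerUpdate returns, and assertions can follow directly.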

Beyond that: your runnable is already written in a way that should guarantee that threads running this code never die (of course, it is questionable to catch Throwable). But in order to test that, I guess you only need another "test provider" where update() throws any kind of exception.

Unhandled exceptions with Java scheduled executors

The Javadoc of both scheduleAtFixedRate and scheduleWithFixedDelay says "If any execution of the task encounters an exception, subsequent executions are suppressed." I don't find that to be exactly crystal clear, but it seems to be saying that if your run method throws any kind of exception, then the scheduler will effectively drop that task. Any other tasks running via that scheduler should not be affected. It shouldn't be hard to test what it actually does...
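
One way to check this is a small experiment along the following lines. This is only a sketch: SuppressionDemo and countRuns are illustrative names, and the run counts and delays are arbitrary. One task throws on its third run while a second task on the same scheduler keeps going:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for observing the "subsequent executions are suppressed" rule.
final class SuppressionDemo {

    // Returns {runs of the failing task, runs of the healthy task}.
    static int[] countRuns() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger failingRuns = new AtomicInteger();
        AtomicInteger healthyRuns = new AtomicInteger();
        CountDownLatch tenHealthyRuns = new CountDownLatch(10);
        try {
            // Throws on its third run; the scheduler should drop it afterwards.
            scheduler.scheduleWithFixedDelay(() -> {
                if (failingRuns.incrementAndGet() == 3) {
                    throw new IllegalStateException("boom");
                }
            }, 0, 20, TimeUnit.MILLISECONDS);
            // Never throws; should be unaffected by the other task's failure.
            scheduler.scheduleWithFixedDelay(() -> {
                healthyRuns.incrementAndGet();
                tenHealthyRuns.countDown();
            }, 0, 20, TimeUnit.MILLISECONDS);
            tenHealthyRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new int[] { failingRuns.get(), healthyRuns.get() };
    }

    public static void main(String[] args) {
        int[] runs = countRuns();
        System.out.println("failing task runs: " + runs[0] + ", healthy task runs: " + runs[1]);
    }
}
```

The failing task stops at three runs, while the healthy task reaches at least ten, which matches the reading of the Javadoc given above.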

Cancellation of the task may not necessarily be a bad thing. If the run method throws a RuntimeException, it's probably got a bug somewhere, and the state of the system is unknown. But at minimum I would advise catching RuntimeException in your run method, and logging the full stack trace at SEVERE. You may want to then rethrow to cancel the task, depending on the circumstances. But either way you'll need the logging to have a fighting chance of working out what went wrong.
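
A minimal sketch of that advice, assuming java.util.logging is the logging framework (the mention of SEVERE suggests it) and using a made-up wrapper class LoggingTask:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: logs any RuntimeException with its stack trace, then rethrows.
final class LoggingTask implements Runnable {
    private static final Logger LOG = Logger.getLogger(LoggingTask.class.getName());
    private final Runnable delegate;

    LoggingTask(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException e) {
            // Passing the exception as the last argument logs the full stack trace.
            LOG.log(Level.SEVERE, "Scheduled task failed", e);
            // Rethrowing lets the scheduler suppress further executions.
            // Drop this line if the task should keep running instead.
            throw e;
        }
    }
}
```

It would be scheduled like any other Runnable, for example scheduler.scheduleWithFixedDelay(new LoggingTask(task), 1, 10, TimeUnit.SECONDS), where task is your own Runnable.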

How to handle errors in a repeated task using ScheduledExecutorService?

Based on a few of the helpful comments above here's the gist of my current code - a few q's remain within so would welcome any further comments:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledTask implements Runnable {
    // Configurable values
    private static final int CORE_THREAD_POOL_SIZE = 1;
    private static final int INITIAL_DELAY_MS = 0;
    private static final int INTERVAL_MS = 1000;

    private final ScheduledExecutorService scheduledExecutorService =
        Executors.newScheduledThreadPool(ScheduledTask.CORE_THREAD_POOL_SIZE);

    private ScheduledFuture<?> scheduledFuture;

    public void run() {
        try {
            try {
                // Do stuff
            } catch (RecoverableCheckedException rce) { // E.g. SAXException
                // Log and handle appropriately
            }
        } catch (UnrecoverableCheckedException uce) { // E.g. ParserConfigurationException
            // Not 100% happy with this. It means the caller would need to call
            // getCause() to get the real Exception in this case. But other
            // RuntimeExceptions wouldn't be wrapped. Could consider catching
            // and wrapping all RuntimeExceptions but I like that even less!
            throw new RuntimeException(uce);
        }
    }

    public boolean isScheduling() {
        return (this.scheduledFuture != null)
            && (!this.scheduledFuture.isDone());
    }

    // May not be needed but provided in case this class is shared.
    public boolean isShutdown() {
        return scheduledExecutorService.isShutdown();
    }

    public void start() {
        // If the Executor Service has already been shutdown, would expect
        // a RejectedExecutionException to be thrown here(?) Not sure what
        // would happen if this method were called when isScheduling() is
        // true?
        this.scheduledFuture =
            this.scheduledExecutorService.scheduleWithFixedDelay(
                this,
                ScheduledTask.INITIAL_DELAY_MS,
                ScheduledTask.INTERVAL_MS,
                TimeUnit.MILLISECONDS);
    }

    // To be called once at the very end - e.g. on program termination.
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }
}
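
The two open questions in the start() comment can be probed with a small experiment. The sketch below uses invented names (StartQuestionsDemo and its two methods) and does not touch the class above. It checks what a default scheduled pool does when a task is scheduled after shutdown, and whether scheduling twice yields two independent tasks:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, separate from ScheduledTask.
final class StartQuestionsDemo {

    // True if scheduling on an already shut down executor is rejected.
    static boolean rejectedAfterShutdown() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        ses.shutdown();
        try {
            ses.scheduleWithFixedDelay(() -> { }, 0, 1000, TimeUnit.MILLISECONDS);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // True if a second schedule call adds a second, independent periodic task.
    static boolean secondStartAddsSecondTask() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<?> first = ses.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.HOURS);
            ScheduledFuture<?> second = ses.scheduleWithFixedDelay(() -> { }, 1, 1, TimeUnit.HOURS);
            return first != second && !first.isDone() && !second.isDone();
        } finally {
            ses.shutdownNow();
        }
    }
}
```

Both methods return true: the default handler throws RejectedExecutionException after shutdown, and a second call schedules a second task. In ScheduledTask that would mean calling start() while isScheduling() is true overwrites the stored future and leaves the first task running without a handle, so guarding start() with an isScheduling() check may be worth considering.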

ScheduledExecutorService: How do I catch its exceptions?

You can use this Constructor to inject your own implementation of a RejectedExecutionHandler:

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)

So instead of using executor = Executors.newScheduledThreadPool(1); you would write:

executor = new ScheduledThreadPoolExecutor(1, new MyRejectedHandler());

where MyRejectedHandler would be an instance of your implementation of the interface RejectedExecutionHandler.

I used this once to handle congestion.

There are also predefined "policies":

  • ThreadPoolExecutor.AbortPolicy (the default)
  • ThreadPoolExecutor.CallerRunsPolicy (runs the task on the thread that called execute, blocking it, of course)
  • ThreadPoolExecutor.DiscardPolicy (discards the task that has been rejected)
  • ThreadPoolExecutor.DiscardOldestPolicy (discards the oldest task in the queue and then retries the rejected task)

You would set them like this:

executor = new ScheduledThreadPoolExecutor(1, new ThreadPoolExecutor.DiscardOldestPolicy());
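
For illustration, a custom handler could look like the sketch below. CountingRejectedHandler and RejectedHandlerDemo are hypothetical names, and the handler only counts rejections instead of doing anything useful. Note that the handler is invoked when the executor refuses to accept a task (here, because it has been shut down), not when a task throws inside run():

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler that just counts rejected tasks.
final class CountingRejectedHandler implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();
    }
}

final class RejectedHandlerDemo {

    // Returns how many tasks the handler saw after the executor was shut down.
    static int rejectionsAfterShutdown() {
        CountingRejectedHandler handler = new CountingRejectedHandler();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, handler);
        executor.shutdown();
        Runnable noop = () -> { };
        // The executor no longer accepts work, so this call goes to the handler
        // instead of throwing RejectedExecutionException.
        executor.schedule(noop, 1, TimeUnit.SECONDS);
        return handler.rejected.get();
    }
}
```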

Do I really have to mess with overriding for that?

Not sure what you mean by that. But if you thought you had to extend ThreadPoolExecutor, then in most cases: no. That family of classes is IMHO so neatly designed and complete that you nearly never have the urge to do that. It is almost like a construction kit which allows for a vast variety of behaviors. The most invasive implementation I had to do once was to inject my own ThreadFactory (for thread naming) and a RejectedExecutionHandler that removed optional low-prio tasks from the queue on congestion.

Other Exceptions

If you want to catch exceptions that arise inside the run method, then I suggest you enclose its body in try/catch, which is OK for a first shot. I tend to narrow try/catch blocks down to where they are really needed. This enables you to handle them on the executing thread of the pool.
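
A minimal sketch of the try/catch approach, with invented names (CatchInsideRunDemo, runsDespiteFailures) and a task that deliberately fails on every run. Because the exception never leaves run(), the scheduler keeps executing the task:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: the task handles its own exceptions on the pool thread.
final class CatchInsideRunDemo {

    // Returns how often the task ran even though every run failed internally.
    static int runsDespiteFailures() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch fiveRuns = new CountDownLatch(5);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                try {
                    runs.incrementAndGet();
                    throw new IllegalStateException("fails every time");
                } catch (RuntimeException e) {
                    // Handled here, on the executing thread; real code should log this.
                } finally {
                    fiveRuns.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            fiveRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```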

Exception handling with multiple futures from ScheduledExecutorService

You can do it with a Listener or Observer-Like pattern:

interface IFutureListener {
    void onException(Throwable t);
}

final IFutureListener errHandler = new IFutureListener() {
    @Override public void onException(Throwable t) {
        // shutdown Service here
    }
};
// ...
Future<?> handle = scheduler.scheduleWithFixedDelay(new Runnable() {
    final IFutureListener callback = errHandler;
    int i = 0; // assumed run counter; not declared in the original snippet
    public void run() {
        try {
            System.out.println("No exception!");
            if (i > 2) {
                System.out.println("Exception!");
                throw new RuntimeException("foo");
            }
            i++;
        } catch (Exception ex) {
            callback.onException(ex);
        }
    }
}, 0, 500, TimeUnit.MILLISECONDS);

You still might need some tweaks to this, but this is the gist of it.

Guava's ListenableFuture, as @dimo414 writes in a comment, will give you something similar. But if you do not want to, or are not allowed to, use third-party libraries, this is a way you can implement it yourself.

Thanks to @efekctive: I also suggest logging exceptions. They should almost never just be swallowed silently unless you know exactly what you are doing.


