Executorservice That Interrupts Tasks After a Timeout

ExecutorService that interrupts tasks after a timeout

You can use a ScheduledExecutorService for this. First you would submit it only once to begin immediately and retain the future that is created. After that you can submit a new task that would cancel the retained future after some period of time.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
final Future handler = executor.submit(new Callable(){ ... });
executor.schedule(new Runnable(){
public void run(){
handler.cancel();
}
}, 10000, TimeUnit.MILLISECONDS);

This will execute your handler (main functionality to be interrupted) for 10 seconds, then will cancel (i.e. interrupt) that specific task.

ScheduledExecutorService that interrupts after a timeout

Does this help?Task begins every 10 seconds,and it takes 5 seconds to be done,you will get a InterruptedException when timedout(3 seconds).

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Worker implements Runnable {
ListeningExecutorService listeningExecutorService;
ScheduledExecutorService scheduledExecutorService;
Runnable task;

public Worker(ListeningExecutorService listeningExecutorService, ScheduledExecutorService scheduledExecutorService, Runnable task) {
this.listeningExecutorService = listeningExecutorService;
this.scheduledExecutorService = scheduledExecutorService;
this.task = task;
}

@Override
public void run() {
ListenableFuture future = listeningExecutorService.submit(task);
Futures.withTimeout(future, 3, TimeUnit.SECONDS, scheduledExecutorService);
}

public static void main(String[] args) {
ListeningExecutorService listeningExecutorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
Worker worker = new Worker(listeningExecutorService, scheduledExecutorService, new Runnable() {
@Override
public void run() {
System.out.println("Now begin: " + new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Now end: " + new Date());
}
});
scheduledExecutorService.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS);
}
}

Executor Service - timeout of thread

my callable will interrupt after the specified time(timeout) has passed

Not true. The task will continue to execute, instead you will have a null string after the timeout.

If you want to cancel it:

  timeout.cancel(true) //Timeout timeout = new Timeout();

P.S. As you have it right now this interrupt will have no effect what so ever. You are not checking it in any way.

For example this code takes into account interrupts:

    private static final class MyCallable implements Callable<String>{

@Override
public String call() throws Exception {
StringBuilder builder = new StringBuilder();
try{
for(int i=0;i<Integer.MAX_VALUE;++i){
builder.append("a");
Thread.sleep(100);
}
}catch(InterruptedException e){
System.out.println("Thread was interrupted");
}
return builder.toString();
}
}

And then:

        ExecutorService service = Executors.newFixedThreadPool(1);
MyCallable myCallable = new MyCallable();
Future<String> futureResult = service.submit(myCallable);
String result = null;
try{
result = futureResult.get(1000, TimeUnit.MILLISECONDS);
}catch(TimeoutException e){
System.out.println("No response after one second");
futureResult.cancel(true);
}
service.shutdown();

Create a timeout for a set of tasks assigned to a threadpool java

You can use the ExecutorService.invokeAll() to run a collection of tasks with a timeout. After the method finishes (finished work or timed-out) you will have to check all the futures to see if they were canceled (because of the timeout) or finished. If they are finished you will have to check that they finished because the work was done, not because of an exception (when you call Future.get).

The code may look like this:

    final ExecutorService service = Executors.newCachedThreadPool();
final List<Future<Double>> futures = service.invokeAll(tasks, 2, TimeUnit.SECONDS);
final List<CallableTask> tasks = Arrays.asList(new CallableTask(1, TimeUnit.SECONDS),
new CallableTask(1, TimeUnit.HOURS), new CallableTask(100, TimeUnit.MILLISECONDS),
new CallableTask(50, TimeUnit.SECONDS));

for (Future<Double> result : futures) {
if (!result.isCancelled()) {
try {
System.out.println("Result: " + result.get());
} catch (ExecutionException e) {
// Task wasn't completed because of exception, may be required to handle this case
}
}
}

In my case, CallableTask is a Callable implementation and it's used to make the code more simple since all the tasks submitted are the same. You can use the same approach to simplify your code.

I've added a sample of how the CallableTask looks like:

    public class CallableTask implements Callable<Double> {

private static AtomicInteger count = new AtomicInteger(0);
private final int timeout;
private final TimeUnit timeUnit;
private final int taskNumber = count.incrementAndGet();

public CallableTask(int timeout, TimeUnit timeUnit) {
this.timeout = timeout;
this.timeUnit = timeUnit;
}

@Override
public Double call() {
System.out.println("Starting task " + taskNumber);
try {
timeUnit.sleep(timeout);
} catch (InterruptedException e) {
System.out.println("Task interrupted: " + taskNumber);
Thread.currentThread().interrupt();
return null;
}
System.out.println("Ending task " + taskNumber);
return Math.random();
}
}

InvokeAll timeout interrupt

ExecutorService was designed to support cancellation of tasks that respond to interruption. So, the proper way to do this is by making your Callable implementations notice that they've been interrupted.

If your task is purely computational, or if it loops frequently, calling a third-party library, this is easy. Somewhere in your task, you'll have a loop that looks something like:

while (!Thread.interrupted()) {
/* Keep going */
}

Of course, you might test other task-specific conditions in your loop too, but you should be testing the interrupted state of the current thread.

Calling interrupted() will clear the interrupted status. Calling Thread.currentThread().isInterrupted() will preserve the interrupted status. For worker threads in an ExecutorService, keeping the interrupted status doesn't matter too much—the service is interrupting its own threads, and when its thread returns from your task, the service clears the status without looking at it. But, it's a good habit to adopt, so that your code will work well if it is incorporated in a task. To re-assert the interrupted status after catching an InterruptedException or detecting it with Thread.interrupted(), simply call Thread.currentThread().interrupt().

If the task is a long-running call to a third-party library that doesn't support interruption, or a blocking call to an uninterruptible I/O operation, your "task" is much more difficult. You may be out of luck altogether, or you might be able to find a kludge to abort the operation asynchronously.

For example, if you create a Socket, and pass it (or its streams) to your task, you could asynchronously close the socket. If invokeAll times out, the caller would need additional code to close the socket.



Related Topics



Leave a reply



Submit