How to Unit Test a Code Snippet Running Inside an Executor Service, Instead of Waiting on Thread.sleep(time)

Unit test failure when testing code involving ScheduledExecutorService#scheduleAtFixedRate

You trigger the method SampleCaching#start yourself, which in turn tells the ScheduledExecutorService to call the calculate method with an initial delay of 0 seconds. That call happens on a separate thread. Meanwhile, your test code continues to run, and the next thing it does is verify that the populateMultimedia method was not called on your multimediaCache, and then the same for populateOne and populateTwo.
Whether this succeeds depends on how far the calculate method has progressed on the other thread. If it has already called the populateMultimedia method, your first verify will fail, and so would the others. If, on the other hand, it has not progressed that far, the first verify will pass, but the test may still fail on populateOne or populateTwo.

You either need to build in a synchronisation mechanism (e.g. java.util.concurrent.CountDownLatch) that your calculate method counts down at the end and that your test code awaits before verifying, or you need to put a reasonable delay between calling the start method and the verify calls.
The first option is intrusive because it changes the component that you are testing. You could consider creating a subclass of SampleCaching that overrides the calculate method, but that is again intrusive if your calculate method is private.
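The latch approach could look roughly like the sketch below. It is only an illustration: the class name LatchedCaching, the awaitFirstRun hook, and the counter that stands in for the mocked cache are all hypothetical and not taken from the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the class under test; all names are illustrative.
public class LatchedCaching {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Stands in for calls made on the mocked cache.
    private final AtomicInteger populateCalls = new AtomicInteger();
    // Released once the first calculate() run has finished.
    private final CountDownLatch firstRunDone = new CountDownLatch(1);

    public void start() {
        scheduler.scheduleAtFixedRate(this::calculate, 0, 1, TimeUnit.HOURS);
    }

    private void calculate() {
        try {
            populateCalls.incrementAndGet(); // e.g. multimediaCache.populateMultimedia()
        } finally {
            firstRunDone.countDown();
        }
    }

    // Test hook: blocks until the first run completes or the timeout expires.
    public boolean awaitFirstRun(long timeout, TimeUnit unit) throws InterruptedException {
        return firstRunDone.await(timeout, unit);
    }

    public int populateCalls() {
        return populateCalls.get();
    }

    public void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        LatchedCaching caching = new LatchedCaching();
        caching.start();
        boolean finished = caching.awaitFirstRun(5, TimeUnit.SECONDS);
        // Only verify after the latch has been released.
        System.out.println("finished=" + finished + ", calls=" + caching.populateCalls());
        caching.stop();
    }
}
```

The test waits only as long as the background run actually takes, and the timeout on await keeps a broken scheduler from hanging the test forever.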

Mockito: Verify if a method specified in any thread got executed?

Your test is running on one thread, and your code under test runs on a new thread.

This means that your test verification can run before the runJob method has been invoked.

At that point the test saying "Wanted but not invoked" is correct: the test code ran and checked whether the production method had run, it had not, and then the production code ran in the background (too late).

Ideally you should separate the control of threading from the logic in your app. Wrap the Thread creation in a factory (for example a ThreadFactory): in real code you pass one that starts an actual Thread, and in test code you pass an object that runs the code instantly (on the same thread).
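One way to sketch this separation is to inject a java.util.concurrent.Executor rather than a ThreadFactory, since an Executor can run a task on the calling thread. The class JobActivity and its methods below are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical class under test: it receives its Executor instead of creating threads itself.
public class JobActivity {
    private final Executor executor;
    private final List<String> processed = new ArrayList<>();

    public JobActivity(Executor executor) {
        this.executor = executor;
    }

    public void enact(String request) {
        // The threading decision is delegated to whatever Executor was injected.
        executor.execute(() -> runJob(request));
    }

    private void runJob(String request) {
        synchronized (processed) {
            processed.add(request);
        }
    }

    public List<String> processed() {
        synchronized (processed) {
            return new ArrayList<>(processed);
        }
    }

    public static void main(String[] args) {
        // Production wiring could pass a thread pool here instead.
        // Test wiring: Runnable::run executes each task on the calling thread.
        Executor sameThread = Runnable::run;
        JobActivity activity = new JobActivity(sameThread);
        activity.enact("request-1");
        System.out.println(activity.processed()); // prints [request-1]
    }
}
```

With the same-thread Executor, the job has already finished by the time enact returns, so a verification placed right after it is deterministic.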

Or (not recommended) you hack your test (this will help you understand):

void testVTService_Success() throws InterruptedException {
    when(xBuilder.buildInputPayload(any(Request.class)))
            .thenReturn(inputPayloadWithAllArguments);

    activity.enact(TestConstants.request);

    // Crude wait that gives the background thread time to call runJob
    Thread.sleep(TimeUnit.SECONDS.toMillis(10));

    verify(component, times(1)).runJob(any(XInput.class));
}

Now your test will always take 10 seconds, and it only passes if the production code completes execution within those 10 seconds.

This is not ideal. As I said originally, you would want to pull the Thread out of that method, pass some type of factory into the class, and pass a fake object in the test, thus avoiding the need to test multithreaded code.

Java - Run tasks in varying time intervals

It is possible to schedule tasks with ScheduledExecutorService combined with some logic. The schedule method takes a delay and a time unit in which to interpret it. You can declare a variable that holds the delay and increase it as you go.

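int timer = 1000; // delay in milliseconds
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
Runnable runnable = new Runnable() {
    public void run() {
        // Move the code you want to run here
    }
};
// Schedule the task repeatedly, doubling the delay each time.
// condition() is a placeholder for your own stop check.
// Each delay is measured from the moment schedule() is called.
while (!condition()) {
    for (int i = 0; i < 5; i++) {
        service.schedule(runnable, timer, TimeUnit.MILLISECONDS);
        timer *= 2;
    }
}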

Moving your code into the Runnable and then scheduling it in a for loop where the timer is increased should accomplish the effect you are going for. Hope that helps!
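A variation on the same idea, sketched under assumptions, is to let the task reschedule itself with a doubled delay after each run, so the next interval starts when the previous run ends. The class BackoffScheduler and all of its members are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a task that reschedules itself with a doubling delay.
public class BackoffScheduler {
    private final ScheduledExecutorService service =
            Executors.newSingleThreadScheduledExecutor();
    private final List<Long> delaysUsed = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;
    private final int maxRuns;

    public BackoffScheduler(int maxRuns) {
        this.maxRuns = maxRuns;
        this.done = new CountDownLatch(maxRuns);
    }

    public void start(long initialDelayMillis) {
        scheduleNext(initialDelayMillis, 1);
    }

    private void scheduleNext(long delayMillis, int runNumber) {
        service.schedule(() -> {
            delaysUsed.add(delayMillis); // the real work would go here
            done.countDown();
            if (runNumber < maxRuns) {
                scheduleNext(delayMillis * 2, runNumber + 1);
            } else {
                service.shutdown(); // no more runs, let the worker thread exit
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Blocks until all runs have happened or the timeout expires.
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    public List<Long> delaysUsed() {
        return delaysUsed;
    }

    public static void main(String[] args) throws InterruptedException {
        BackoffScheduler scheduler = new BackoffScheduler(4);
        scheduler.start(10);
        scheduler.await(5, TimeUnit.SECONDS);
        System.out.println(scheduler.delaysUsed()); // prints [10, 20, 40, 80]
    }
}
```

Because only one task is pending at any time, there is no need to compute all delays up front, and stopping is just a matter of not rescheduling.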

How to wait until a ThreadPoolExecutor's threads have exited?

Use a ThreadFactory that wraps each worker's Runnable, so that you can run code at the moment a pool thread exits.

ThreadFactory tf = runnable -> {
    return new Thread(() -> {
        try {
            runnable.run();
        } finally {
            System.out.println(Thread.currentThread().getName() + ": my thread exit");
        }
    });
};

ExecutorService svc = Executors.newFixedThreadPool(3, tf);
Future<?> f = svc.submit(() -> System.out.println(Thread.currentThread().getName() + ": some task"));
f.get();
svc.shutdown();

Thread.sleep(2000);
System.out.println("main done");
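If the goal is to wait for the worker threads without a fixed sleep, one possible sketch is to have the factory remember every Thread it creates and join them after shutdown. The class WorkerExitDemo and the method runAndJoin are hypothetical names, and the sketch assumes the tasks do not throw (otherwise the pool may create replacement threads later).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: join the pool's worker threads instead of sleeping.
public class WorkerExitDemo {

    // Runs taskCount no-op tasks, shuts the pool down, waits for every worker
    // thread to exit, and returns how many workers ran their exit hook.
    public static int runAndJoin(int poolSize, int taskCount) throws InterruptedException {
        List<Thread> created = new CopyOnWriteArrayList<>();
        AtomicInteger exits = new AtomicInteger();

        ThreadFactory tf = runnable -> {
            Thread t = new Thread(() -> {
                try {
                    runnable.run();
                } finally {
                    exits.incrementAndGet(); // runs as the worker thread is exiting
                }
            });
            created.add(t);
            return t;
        };

        ExecutorService svc = Executors.newFixedThreadPool(poolSize, tf);
        for (int i = 0; i < taskCount; i++) {
            svc.execute(() -> { });
        }
        svc.shutdown(); // idle workers exit once the queue is drained

        for (Thread t : created) {
            t.join(); // returns only after that worker thread has terminated
        }
        return exits.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("workers exited: " + runAndJoin(3, 3)); // prints workers exited: 3
        System.out.println("main done");
    }
}
```

Joining the threads is stricter than awaitTermination, which can return while a worker is still running the tail end of its exit code.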

How to resubmit thread to ExecutorService if it failed/finished

I would extend ThreadPoolExecutor and override protected void afterExecute(Runnable r, Throwable t). By default this method does nothing, but you could do something like:

import java.util.Map;
import java.util.concurrent.*;

public class RetryExecutor extends ThreadPoolExecutor {

    private final long maxRetries;
    // Number of retries already attempted per task
    private final Map<Runnable, Integer> retries = new ConcurrentHashMap<>();

    public RetryExecutor(int corePoolSize, int maximumPoolSize, long maxRetries,
                         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxRetries = maxRetries;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null && shouldRetry(r)) {
            retry(r);
        } else if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
                retries.remove(r); // completed normally, forget its retry count
            } catch (CancellationException | ExecutionException e) {
                // you should log the error
                if (shouldRetry(r)) {
                    retry(r);
                } else {
                    retries.remove(r);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset or catch it to reschedule
            }
        } else {
            retries.remove(r);
        }
    }

    private boolean shouldRetry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        return nbRetries < maxRetries;
    }

    private void retry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        retries.put(r, nbRetries + 1);
        this.execute(r);
    }

}

But with this, Futures are useless; it is more like fire and forget.

As @NikitataGorbatchevski pointed out, this won't work when you use a Callable. So here is a version that can handle both Runnable and Callable. Indeed, a FutureTask can't be run again once an error has occurred. (I reused the code for awaiting termination from FutureTask, and I am not so sure about it.)

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class RetryExecutor extends ThreadPoolExecutor {
    private final long maxRetries;
    // Number of retries already attempted per task
    private final Map<Runnable, Integer> retries = new ConcurrentHashMap<>();

    public RetryExecutor(int corePoolSize, int maximumPoolSize, long maxRetries,
                         long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxRetries = maxRetries;
    }

    // Wrap submitted tasks in a future that, unlike FutureTask, can be run again
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new RetryFutureTask<>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new RetryFutureTask<>(callable);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null && shouldRetry(r)) {
            retry(r);
        } else if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
                retries.remove(r); // completed normally, forget its retry count
            } catch (CancellationException | ExecutionException e) {
                // you should log the error
                if (shouldRetry(r)) {
                    retry(r);
                } else {
                    retries.remove(r);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        } else {
            retries.remove(r);
        }
    }

    private boolean shouldRetry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        return nbRetries < maxRetries;
    }

    private void retry(Runnable r) {
        final Integer nbRetries = retries.getOrDefault(r, 0);
        retries.put(r, nbRetries + 1);
        this.execute(r);
    }

    private static class RetryFutureTask<V> implements RunnableFuture<V> {
        private static final int NEW = 0;
        private static final int RUNNING = 1;
        private static final int ERROR = 2;
        private static final int FINISHED = 3;
        private static final int INTERRUPTED = 4;
        private final AtomicInteger state = new AtomicInteger(NEW);
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
        private final Callable<V> callable;
        private Exception error;
        private V result;

        public RetryFutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
        }

        public RetryFutureTask(Callable<V> callable) {
            this.callable = callable;
        }

        @Override
        public void run() {
            // If already running on another thread, do nothing
            if (!runner.compareAndSet(null, Thread.currentThread())) {
                return;
            }
            try {
                state.set(RUNNING);
                result = this.callable.call();
                state.compareAndSet(RUNNING, FINISHED);
            } catch (Exception e) {
                error = e;
                state.compareAndSet(RUNNING, ERROR);
            } finally {
                runner.set(null);
                finishCompletion(); // wake up threads blocked in get(), on success as well as on error
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (state.get() == RUNNING || state.get() == INTERRUPTED) {
                return false;
            }
            try {
                Thread t = runner.get();
                if (mayInterruptIfRunning && t != null) {
                    t.interrupt();
                }
            } finally {
                state.set(INTERRUPTED);
                finishCompletion();
            }
            return true;
        }

        @Override
        public boolean isCancelled() {
            return state.get() == INTERRUPTED;
        }

        @Override
        public boolean isDone() {
            return state.get() > RUNNING;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            if (state.get() <= RUNNING) {
                awaitDone(false, 0L);
            }
            return resolve();
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (state.get() <= RUNNING) {
                awaitDone(true, unit.toNanos(timeout));
                if (state.get() <= RUNNING) {
                    // Still not done after waiting: report the timeout instead of returning null
                    throw new TimeoutException();
                }
            }
            return resolve();
        }

        private V resolve() throws ExecutionException {
            if (state.get() == ERROR) {
                throw new ExecutionException(error);
            } else if (state.get() == INTERRUPTED) {
                // Future.get() reports a cancelled task with CancellationException
                throw new CancellationException();
            }
            return result;
        }

        private void finishCompletion() {
            for (WaitNode q; (q = waiters.get()) != null;) {
                if (waiters.compareAndSet(q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
        }

        private void awaitDone(boolean timed, long nanos) throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }

                int s = state.get();
                if (s > RUNNING) {
                    if (q != null)
                        q.thread = null;
                    return;
                } else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    // Push this node onto the head of the waiter stack
                    queued = waiters.compareAndSet(q.next = waiters.get(), q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return;
                    }
                    LockSupport.parkNanos(this, nanos);
                } else
                    LockSupport.park(this);
            }
        }

        private void removeWaiter(WaitNode node) {
            if (node != null) {
                node.thread = null;
                retry:
                for (;;) { // restart on removeWaiter race
                    for (WaitNode pred = null, q = waiters.get(), s; q != null; q = s) {
                        s = q.next;
                        if (q.thread != null)
                            pred = q;
                        else if (pred != null) {
                            pred.next = s;
                            if (pred.thread == null) // check for race
                                continue retry;
                        }
                        else if (!waiters.compareAndSet(q, s))
                            continue retry;
                    }
                    break;
                }
            }
        }

        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    }
}
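
As a simpler alternative to subclassing the executor (not part of the answer above, just a sketch under assumptions), the retry loop can live inside a wrapper Callable, so the Future returned by submit only completes once the task has succeeded or the retries are exhausted. The class RetryingCallable is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: retries the delegate inside call(), so the Future stays meaningful.
public class RetryingCallable<V> implements Callable<V> {
    private final Callable<V> delegate;
    private final int maxRetries;

    public RetryingCallable(Callable<V> delegate, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.delegate = delegate;
        this.maxRetries = maxRetries;
    }

    @Override
    public V call() throws Exception {
        Exception last = null;
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return delegate.call();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // do not retry after an interrupt
                throw ie;
            } catch (Exception e) {
                last = e; // you should log the error
            }
        }
        throw last; // every attempt failed, surface the last error
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        Callable<String> flaky = () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok after " + attempts.get() + " attempts";
        };

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(new RetryingCallable<>(flaky, 5));
            System.out.println(f.get()); // prints ok after 3 attempts
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is that a retrying task keeps occupying its worker thread between attempts, whereas the executor-based version puts the task back on the queue.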

