RejectedExecutionException inside single executor service

You wrote:

To be frank, I initially thought that the ExecutorService was GC-ed: reachability and scope are different things, and the GC is allowed to clear anything which is not reachable. But there is a Future<?> that will keep a strong reference to that service, so I excluded this.

But this is actually a very plausible scenario, which is described in JDK-8145304. In the bug report's example the ExecutorService is not held in a local variable, but a local variable does not prevent garbage collection per se.

Note that the exception message

Task java.util.concurrent.FutureTask@3148f668 rejected from  
java.util.concurrent.ThreadPoolExecutor@6e005dc9[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

supports this, as the state of ThreadPoolExecutor@6e005dc9 is specified as Terminated.

The assumption that futures hold a reference to their creating ExecutorService is wrong. The actual type depends on the service implementation, but for the common ones, it will be an instance of FutureTask which has no reference to an ExecutorService. It's also visible in the exception message that this applies to your case.

Even if it had a reference, the creator would be the actual ThreadPoolExecutor, but it is the wrapping FinalizableDelegatedExecutorService instance that gets garbage collected and calls shutdown() on the ThreadPoolExecutor instance. Thin wrappers are generally good candidates for premature garbage collection in optimized code, which simply bypasses the wrapper.

Note that while the bug report is still open, the problem is actually fixed in JDK 11. There, the base class of FinalizableDelegatedExecutorService, the class DelegatedExecutorService has an execute implementation that looks like this:

    public void execute(Runnable command) {
        try {
            e.execute(command);
        } finally { reachabilityFence(this); }
    }
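The same pattern can be applied to a thin wrapper of your own. The following is only a minimal sketch, assuming Java 9 or later (where java.lang.ref.Reference.reachabilityFence is available); the class FencedExecutor and its runOnce helper are hypothetical names made up for illustration and are not part of the JDK.

```java
import java.lang.ref.Reference;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical thin wrapper around an ExecutorService, for illustration only.
class FencedExecutor {
    private final ExecutorService delegate;

    FencedExecutor(ExecutorService delegate) {
        this.delegate = delegate;
    }

    void execute(Runnable command) {
        try {
            delegate.execute(command);
        } finally {
            // Keeps this wrapper strongly reachable until the delegate call has returned.
            Reference.reachabilityFence(this);
        }
    }

    // Submits one task through the wrapper and reports whether it ran.
    static boolean runOnce() {
        AtomicBoolean ran = new AtomicBoolean(false);
        FencedExecutor executor = new FencedExecutor(Executors.newFixedThreadPool(1));
        executor.execute(() -> ran.set(true));
        executor.delegate.shutdown(); // already queued tasks still run after shutdown()
        try {
            executor.delegate.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + runOnce());
    }
}
```

The fence does nothing at run time; it only tells the JIT and the garbage collector that the wrapper must be treated as reachable up to that point.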

RejectedExecutionException coming from single thread executor

See here: What could be the cause of RejectedExecutionException

As you can see from the error log, your ThreadPoolExecutor is Terminated.

Maybe this is what you want:

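    private void trigger(final Packet packet) {
        // reuse the single shared executor instead of creating one per call
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // some code here
            }
        });
    }

    // created once and kept strongly reachable for the lifetime of the owning object
    private final ExecutorService executor = Executors.newFixedThreadPool(10);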

EDIT: Reproduce the problem with:

    public static void main(String[] args) {
        final ExecutorTest et = new ExecutorTest();
        for (int i = 0; i < 50000; i++) {
            et.trigger(i);
        }
        System.out.println("Done");
    }

    private void trigger(int i) {
        try {
            // a new executor per call; nothing keeps a reference to it
            Executors.newSingleThreadExecutor().execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (final InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            });
        } catch (final Exception e) {
            System.out.println("Test " + i + " with " + Thread.activeCount());
            e.printStackTrace();
        }
    }

I can not get RejectedExecutionException with ExecutorService

There are two aspects involved when defining an executor.

  1. The number of threads that will be used by the executor. This puts a limit on the number of concurrent tasks the executor can run. This is what you are setting via Executors.newFixedThreadPool(5).
  2. The size of the underlying task submission queue. This limits the number of tasks the queue can hold before further submissions are rejected. The executor created by newFixedThreadPool uses an unbounded queue, therefore you do not get an exception.

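You can hit the limit by creating your own executor service as follows and submitting 11 tasks to it (5 to occupy all the threads, 5 to fill the underlying task queue, and 1 to overflow it). Note that with the CallerRunsPolicy handler used here, the overflowing task is run by the submitting thread instead of being rejected with an exception; pass ThreadPoolExecutor.AbortPolicy (the default handler) if you want a RejectedExecutionException.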

    new ThreadPoolExecutor(5,
            5,
            2000L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(5, true),
            new ThreadPoolExecutor.CallerRunsPolicy());
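To actually observe the exception, here is a minimal sketch that uses the same bounds (5 threads, queue capacity 5) but with AbortPolicy, and submits 11 tasks that all block on a latch so that none can finish early. The class name SaturationDemo and the method countRejections are hypothetical names chosen for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SaturationDemo {
    // Submits 11 blocking tasks to a saturated pool and returns how many were rejected.
    static int countRejections() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 2000L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5, true),
                new ThreadPoolExecutor.AbortPolicy()); // throws on rejection
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < 11; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await(); // keep the task busy until the test is over
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // 5 running + 5 queued: the 11th task does not fit
            }
        }
        release.countDown(); // let the accepted tasks finish
        executor.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections()); // prints "rejected = 1"
    }
}
```

The latch is only there to make the outcome independent of timing; with short-lived tasks, some of them may complete before the queue fills up and nothing is rejected.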

What could be the cause of RejectedExecutionException

From ThreadPoolExecutor JavaDoc (emphasis mine)

New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
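As a rough illustration of the first rejection cause and of a custom handler, the sketch below installs a handler that only counts rejections, shuts the executor down, and then tries to execute a task. The class name CountingHandlerDemo and the method rejectionsAfterShutdown are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CountingHandlerDemo {
    // Returns how often the handler was invoked for one task executed after shutdown().
    static int rejectionsAfterShutdown() {
        AtomicInteger rejections = new AtomicInteger();
        // Custom policy: count the rejection instead of throwing.
        RejectedExecutionHandler countingHandler =
                (task, pool) -> rejections.incrementAndGet();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), // unbounded, so saturation cannot be the cause
                countingHandler);
        executor.shutdown();
        // The executor is shut down, so this task goes to the handler and never runs.
        executor.execute(() -> System.out.println("never printed"));
        return rejections.get();
    }

    public static void main(String[] args) {
        System.out.println("rejections = " + rejectionsAfterShutdown()); // prints "rejections = 1"
    }
}
```

With the default AbortPolicy, the same execute call would throw a RejectedExecutionException instead.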

Presumably, therefore, reloading the WAR triggers a shutdown of the Executor. Try putting the relevant libraries in the WAR, so that Tomcat's ClassLoader has a better chance of correctly reloading your app.

ExecutorService and AtomicInteger : RejectedExecutionException

There is no need to use AtomicInteger here, since your Runnable lambda invocations are guaranteed to execute sequentially (by newSingleThreadExecutor). Also, if your Runnable lambda code were to take any time to execute (e.g. 2 ms), your main loop would queue up far more than the 10 tasks needed to hit your limit. You can see this happen if you add a 2 ms sleep inside your Runnable lambda, add a counter to your do/while loop, and print the value of the counter at the end to see how many Runnable instances you queued up.

Assuming that you wish to test this code with concurrent threads, you would need to replace the call to newSingleThreadExecutor with newFixedThreadPool. The approach your code takes is problematic when concurrent threads are being used. In the following code, I've switched to newFixedThreadPool, added a counter so we can see how many tasks are queued, and added two short pauses in your Runnable lambda, just to represent a small amount of work. When I executed this program, atomicInteger became greater than 13000 and the program crashed with java.lang.OutOfMemoryError: GC overhead limit exceeded. That is because your Runnable always adds 10 to atomicInteger regardless of its current value. The code also queues up more tasks than it needs. Here's the code with these small changes that illustrate the problem.

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int i = 0;
        do {
            executor.submit(() -> {
                pause(2); // simulates some small amount of work.
                System.out.println("atomicInt=" + atomicInteger.getAndAdd(10));
                pause(2); // simulates some small amount of work.
                if (atomicInteger.get() == 100) {
                    System.out.println("executor.shutdownNow()");
                    System.out.flush();
                    executor.shutdownNow();
                }
            });
            i++; // count every task that was queued
            if (atomicInteger.get() == 100) {
                break;
            }
        } while (true);
        System.out.println("final atomicInt=" + atomicInteger.get());
        System.out.println("final tasks queued=" + i);
    }

    public static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            // ignored for this demonstration
        }
    }

Here is a version that fixes the concurrency problems and moves the executor management out of the worker threads where it doesn't really belong:

    private static final int LIMIT = 100;
    private static final int INCREMENT = 10;

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // queue exactly as many tasks as are needed to reach the limit
        for (int i = 0; i < LIMIT / INCREMENT; i++) {
            executor.submit(() -> {
                pause(2);
                System.out.println("atomicInt=" + atomicInteger.getAndAdd(INCREMENT));
                System.out.flush();
                pause(2);
            });
        }
        // shut down from the submitting thread, not from inside a task
        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Executor not yet terminated");
            System.out.flush();
            pause(4);
        }
        System.out.println("final atomicInt=" + atomicInteger.get());
    }

    public static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            // ignored for this demonstration
        }
    }
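As a possible variation, the polling loop on isTerminated() could be replaced by a single awaitTermination call, which blocks until all queued tasks have finished or the timeout expires. This is only a sketch; the class name AwaitTerminationDemo, the method runAll, and the 10 second timeout are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AwaitTerminationDemo {
    private static final int LIMIT = 100;
    private static final int INCREMENT = 10;

    // Runs LIMIT / INCREMENT tasks on two threads and returns the final counter value.
    static int runAll() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger total = new AtomicInteger(0);
        for (int i = 0; i < LIMIT / INCREMENT; i++) {
            executor.execute(() -> total.addAndGet(INCREMENT));
        }
        executor.shutdown(); // no new tasks; queued ones still run
        try {
            // Block until the queued tasks are done instead of polling isTerminated().
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up on stragglers after the timeout
            }
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("final total=" + runAll()); // prints "final total=100"
    }
}
```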

ExecutorService throws RejectedExecutionException if only initialized with declaration

An executor will throw RejectedExecutionException if it has been shut down when you try to submit a new job for it to run. Your executor variable is declared static, which means all instances of the class will share the same variable. Is it possible that one instance of your class is shutting down the executor, and then another instance of the class is trying to schedule a new job?
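The scenario described above can be sketched as follows. The class SharedExecutorDemo and its methods are hypothetical and only stand in for your class; the point is that both instances use the same static executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class SharedExecutorDemo {
    // One executor shared by every instance of the class.
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1);

    // Shuts down the shared executor, which affects all instances.
    void close() {
        EXECUTOR.shutdown();
    }

    // Returns true if the job was accepted, false if the executor rejected it.
    boolean trySubmit(Runnable job) {
        try {
            EXECUTOR.submit(job);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // One instance closes the executor, then another instance tries to use it.
    static boolean secondInstanceAccepted() {
        SharedExecutorDemo first = new SharedExecutorDemo();
        SharedExecutorDemo second = new SharedExecutorDemo();
        first.close();
        return second.trySubmit(() -> { });
    }

    public static void main(String[] args) {
        System.out.println("accepted = " + secondInstanceAccepted()); // prints "accepted = false"
    }
}
```

If each instance really needs its own lifecycle, making the executor a non-static field is one way to avoid this interaction.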


