How to Make Threadpoolexecutor's Submit() Method Block If It Is Saturated

How to make ThreadPoolExecutor's submit() method block if it is saturated?

One of the possible solutions I've just found:

public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;

public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}

public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}

Are there any other solutions? I'd prefer something based on RejectedExecutionHandler since it seems like a standard way to handle such situations.

Java: ExecutorService that blocks on submission after a certain queue size

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}

@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}

}

Note that this only works for thread pool where corePoolSize==maxPoolSize so be careful there (see comments).

Fixed thread pool: pause until thread available?

Create a simple thread pool with available worker thread same as pool size.before submitting check if available thread is there then submit else wait on lock.

Also you can use Semaphore which will block until the acquire() will get some value.
Semaphore exanple:

Semaphore semaphore = new Semaphore(pool_size);

//critical section
semaphore.acquire();

...

semaphore.release();

Example of simple thread pool:

    private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

Submit method

   public boolean submit(Runnable runnable) {
if (runnable == null) {
return false;
}

synchronized (Lock) {

handoffPending = true;

// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
Lock.wait(500);
} catch (InterruptedException ignore) {
}
}

if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
Lock.notifyAll();

}

return true;
}

Override execute() in ThreadPoolExecutor

Depends on the ThreadPoolExecutor state and settings because not all task submissions pass through the BlockingQueue. Usually you just want to change the RejectedExecutionHandler of the ThreadPoolExecutor to the ThreadPoolExecutor.CallerRunsPolicy which will throttle submissions. If you really want to block on submit then you should use a CompletionService and call the 'take' method when you want to block. You can also create a subclass and use a Semaphore to block the execute method. See JDK-6648211 : Need for blocking ThreadPoolExecutor for more information.

Why ThreadPoolExecutor has BlockingQueue as its argument?

The problem occurs because you're task queue is too small and this is indicated by the documentation of the execute method:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

So the first problem is that you're setting your queue size to a very small number:

int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

And then you state "If [I] try 7th, 8th... task" then you would get a RejectedExecutionException because you're past the capacity of the queue. There are two ways to resolve your problem (I would recommend doing both):

  1. Increase the size of the queue.
  2. Catch the exception and re-try adding the task.

You should have something along the lines of this:

public void ExecuteTask(MyRunnableTask task) {
bool taskAdded = false;
while(!taskAdded) {
try {
executor.execute(task);
taskAdded = true;
} catch (RejectedExecutionException ex) {
taskAdded = false;
}
}
}

Now, to address your other questions...

Here then what is the role of BlockingQueue if it is missing the tasks?

The role of the BlockingQueue is to complete the Producer/Consumer pattern and if it's large enough, then you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size and catch the exception then retry executing the task.

Why cant we go for linkedlist?

A linked list is neither thread safe, nor is it blocking. The Producer/Consumer pattern tends to work best with a blocking queue.

Update

Please don't be offended by the following statements, I'm intentionally using more stringent language in order to put emphasis on the fact that your first assumption should never be that there is something wrong with the library you're using (unless you wrote the library yourself and you know that there is a specific problem in it)!

So let's put this concern to rest right now: neither the ThreadPoolExecutor nor the Java library are the problem here. It's entirely your (mis)use of the library that's causing the problem. Javmex has a great tutorial explaining the exact situation you're seeing.

There could be several reasons why you're filling up the queue faster than you're emptying it:

  1. The thread that's adding tasks for executing is adding them too fast.
  2. The tasks are taking too long to execute.
  3. Your queue is too small.
  4. Any combination of the above 3.

There are a bunch of other reasons too, but I think the above would be the most common.

I would give you a simple solution with an unbounded queue, but it would NOT resolve your (mis)use of the library. So before we go blaming the Java library, let's see a concise example that demonstrates the exact problem you're encountering.

Update 2.0

Here are a couple of other questions addressing the specific problem:

  1. ThreadPoolExecutor Block When Queue Is Full?
  2. How to make ThreadPoolExecutor's submit() method block if it is saturated?

Create and add Runnable only when one/more of the worker Thread is available..?

Why don't you set a limit to how many tasks can run concurrently. Like:

HashSet<Future> futures = new HashSet<>();
int concurrentTasks = 1000;

for (int ii=0; ii<100000000; ii++) {
while(concurrentTasks-- > 0 && ii<100000000) {
concurrentTasks.add(executor.submit(new Task(ii)));
}
Iterator<Future> it = concurrentTasks.iterator();
while(it.hasNext()) {
Future task = it.next();
if (task.isDone()) {
concurrentTasks++;
it.remove();
}
}
}


Related Topics



Leave a reply



Submit