How to Get the ThreadPoolExecutor to Increase Threads to Max Before Queueing

How to get the ThreadPoolExecutor to increase threads to max before queueing?

How can I work around this limitation in ThreadPoolExecutor, where the queue needs to be bounded and full before more threads will be started?

I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with ThreadPoolExecutor. It involves extending LinkedBlockingQueue to have it return false for queue.offer(...) when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the RejectedExecutionHandler will be called which does the put(...) into the queue.

It certainly is strange to write a queue where offer(...) can return false and put() never blocks, so that's the hack part. But this works well with the TPE's usage of the queue, so I don't see any problem with doing this.

Here's the code:

// imports needed by the snippet below
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            // have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

With this mechanism, when I submit tasks to the executor, the ThreadPoolExecutor will:

  1. Scale the number of threads up to the core size initially (here 1).
  2. Offer the task to the queue. If the queue is empty, the task is queued to be handled by the existing threads.
  3. If the queue already has 1 or more elements, offer(...) returns false.
  4. If false is returned, scale up the number of threads in the pool until they reach the max number (here 50).
  5. If the pool is at the max, call the RejectedExecutionHandler.
  6. The RejectedExecutionHandler then puts the task into the queue to be processed by the first available thread in FIFO order.

Although in my example code above, the queue is unbounded, you could also define it as a bounded queue. For example, if you add a capacity of 1000 to the LinkedBlockingQueue then it will:

  1. scale the threads up to max
  2. then queue up until it is full with 1000 tasks
  3. then block the caller until space becomes available in the queue.
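The bounded variant can be sketched as a small self-contained demo. This is only an illustration under my own assumptions: the class name BoundedScaleFirstDemo, the run(...) helper, and the numbers (max 4 threads, capacity 1000, 10 tasks) are made up for the example. Every task blocks on a latch so the pool and queue sizes can be read at a stable moment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class BoundedScaleFirstDemo {
    /** Submits blocked tasks and returns {pool size, queue size} observed before releasing them. */
    static int[] run(int maxThreads, int capacity, int tasks) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(capacity) {
            private static final long serialVersionUID = 1L;

            @Override
            public boolean offer(Runnable e) {
                // Accept only when nothing is waiting; otherwise ask the pool for a new thread.
                return size() == 0 && super.offer(e);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
                (r, executor) -> {
                    try {
                        // Pool is at max: really enqueue, blocking the caller if the queue is full.
                        executor.getQueue().put(r);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(ex);
                    }
                });
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep every worker busy until the sizes have been read
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] observed = { pool.getPoolSize(), queue.size() };
        release.countDown();
        pool.shutdown();
        return observed;
    }

    public static void main(String[] args) {
        int[] r = run(4, 1000, 10);
        System.out.println("threads=" + r[0] + ", queued=" + r[1]); // prints threads=4, queued=6
    }
}
```

With 10 blocked tasks, the first goes to the core thread, the second is queued, the next three start threads 2 to 4, and the remaining five go through the handler into the queue.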

Also, if you needed to use offer(...) in the RejectedExecutionHandler then you could use the offer(E, long, TimeUnit) method instead with Long.MAX_VALUE as the timeout.
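A handler using the timed offer might look roughly like this. The class name TimedOfferHandler, the helper method, and the choice of TimeUnit.NANOSECONDS are my assumptions. It relies on LinkedBlockingQueue implementing offer(E, long, TimeUnit) separately from the overridden offer(E).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical handler name; a sketch, not the original author's code.
class TimedOfferHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // The timed offer waits for space instead of returning false immediately.
            boolean queued = executor.getQueue().offer(r, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            if (!queued) {
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while queueing " + r, ex);
        }
    }

    /** Calls the handler directly and returns how many tasks ended up in the queue. */
    static int queuedAfterRejection() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        new TimedOfferHandler().rejectedExecution(() -> { }, pool);
        int queued = pool.getQueue().size();
        pool.shutdownNow(); // no worker was ever started, so the task is simply discarded
        return queued;
    }
}
```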

Warning:

If you expect tasks to be added to the executor after it has been shut down, then you may want to be smarter about throwing RejectedExecutionException out of our custom RejectedExecutionHandler when the executor service has been shut down. Thanks to @RaduToader for pointing this out.
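One possible way to be "smarter" here is to check before the put and re-check afterwards, removing the task again if a shutdown slipped in between. The class name ShutdownAwareHandler and its helper method are hypothetical, and this is only one reading of the warning above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical handler name; a sketch under the assumptions stated above.
class ShutdownAwareHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            // Rejected because of shutdown, not because the pool is at max: do not enqueue.
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
        }
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while queueing " + r, ex);
        }
        // Shutdown may have started between the check and the put; undo the put if we still can.
        if (executor.isShutdown() && executor.getQueue().remove(r)) {
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
        }
    }

    /** Returns true if a task submitted after shutdown is rejected and leaves the queue empty. */
    static boolean rejectsAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new ShutdownAwareHandler());
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException expected) {
            return pool.getQueue().isEmpty();
        }
    }
}
```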

Edit:

Another tweak to this answer could be to ask the TPE if there are idle threads and only enqueue the item if there are. You would have to make a true class for this and add an ourQueue.setThreadPoolExecutor(tpe); method on it.

Then your offer(...) method might look something like:

  1. Check to see if the tpe.getPoolSize() == tpe.getMaximumPoolSize() in which case just call super.offer(...).
  2. Else if tpe.getPoolSize() > tpe.getActiveCount() then call super.offer(...) since there seem to be idle threads.
  3. Otherwise return false to fork another thread.

Maybe this:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Note that the get methods on TPE are expensive since they access volatile fields or (in the case of getActiveCount()) lock the TPE and walk the thread-list. Also, there are race conditions here that may cause a task to be enqueued improperly or another thread forked when there was an idle thread.
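Put together as a "true class", the idea might look like the sketch below. The class name ScaleFirstQueue and the demo() helper are assumptions of mine. The demo waits for each of the first three tasks to start before submitting the next, so that getActiveCount() is stable and the race mentioned above does not affect the result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; only setThreadPoolExecutor(...) is suggested by the text above.
class ScaleFirstQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;
    private transient volatile ThreadPoolExecutor tpe;

    void setThreadPoolExecutor(ThreadPoolExecutor tpe) {
        this.tpe = tpe;
    }

    @Override
    public boolean offer(Runnable e) {
        ThreadPoolExecutor pool = tpe;
        if (pool == null) {
            return super.offer(e); // not wired up yet: behave like a normal queue
        }
        int poolSize = pool.getPoolSize();
        if (poolSize >= pool.getMaximumPoolSize() || poolSize > pool.getActiveCount()) {
            return super.offer(e); // at max, or an idle thread seems to exist: enqueue
        }
        return false; // everyone is busy and there is room to grow: fork another thread
    }

    /** Returns {pool size, queue size} after 5 blocked tasks on a pool with core 1, max 3. */
    static int[] demo() {
        ScaleFirstQueue queue = new ScaleFirstQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, queue);
        queue.setThreadPoolExecutor(pool);
        CountDownLatch release = new CountDownLatch(1);
        Semaphore started = new Semaphore(0);
        int[] observed = new int[2];
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    started.release();
                    try {
                        release.await();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
                if (i < 3) {
                    started.acquire(); // wait until the task is running before submitting the next
                }
            }
            observed[0] = pool.getPoolSize();
            observed[1] = queue.size();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return observed;
    }
}
```

With several submitting threads, a task can still be rejected if the pool reaches max between the check in offer(...) and the attempt to add a thread, so keeping the RejectedExecutionHandler that does the put(...) is probably a sensible safety net.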

ThreadPoolExecutor - Use threads before queue

I see this as more of a design issue than a threading-package issue.

One uses threading to either reduce latency or increase throughput. Given that you are creating 600 threads, this looks like a case of increasing throughput on a server. However, no modern server has 600 CPU cores, and you will suffer severely from context switching. It's easier and more efficient to have a fixed number of threads working on a set of queues.

If you really think your case is justified, simply create your own interface that wraps a standard thread pool and adds a bit of custom logic for launching work on a separate thread. However, I really doubt this would improve your system's performance.

Essentially, unless it is really, really justified, I don't think creating a new thread is a better solution than queuing in real-time systems.

Why does the maxPoolSize in the ThreadPoolExecutor do nothing?

Threads beyond corePoolSize are only created, up to maxPoolSize, once the queue is full. Until then, the limit is corePoolSize. With an unbounded queue the queue never fills, so maxPoolSize has no effect.

Reference: http://www.bigsoft.co.uk/blog/index.php/2009/11/27/rules-of-a-threadpoolexecutor-pool-size
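The rule above can be observed with a small experiment. The class name MaxPoolSizeDemo and the numbers (core 2, max 10, 15 tasks) are mine, and the tasks block on a latch so that the sizes are stable when read.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating the default queueing rule.
class MaxPoolSizeDemo {
    /** Returns {pool size, queue size} for a pool with core 2, max 10 and the given queue capacity. */
    static int[] measure(int queueCapacity, int tasks) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // block so that no worker drains the queue while we look
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] observed = { pool.getPoolSize(), queue.size() };
        release.countDown();
        pool.shutdown();
        return observed;
    }

    public static void main(String[] args) {
        int[] unbounded = measure(Integer.MAX_VALUE, 15); // same capacity as new LinkedBlockingQueue<>()
        int[] bounded = measure(5, 15);
        System.out.println("unbounded: threads=" + unbounded[0] + ", queued=" + unbounded[1]); // 2 and 13
        System.out.println("bounded:   threads=" + bounded[0] + ", queued=" + bounded[1]);     // 10 and 5
    }
}
```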

ThreadPoolExecutor's queuing behavior customizable to prefer new thread creation over queuing?

No way to get this exact behavior with a ThreadPoolExecutor.

But here are a couple of solutions:

  1. Consider,

    • If fewer than corePoolSize threads are running, a new thread will be created for every item queued until corePoolSize threads are running.

    • A new thread will only be created if the queue is full, and less than maximumPoolSize threads are running.

    So, wrap a ThreadPoolExecutor in a class which monitors how fast items are being queued. Then, change the core pool size to a higher value when many items are being submitted. This will cause a new thread to be created each time a new item is submitted.

    When the submission burst is done, the core pool size needs to be manually reduced again so the threads can naturally time out. If you're worried the busy burst could end abruptly, causing the manual method to fail, be sure to use allowCoreThreadTimeOut.

  2. Create a fixed thread pool, and call allowCoreThreadTimeOut(true)

    Unfortunately this uses more threads during low submission bursts, and stores no idle threads during zero traffic.

Use the 1st solution if you have the time, need, and inclination as it will handle a wider range of submission frequency and so is a better solution in terms of flexibility.

Otherwise use the 2nd solution.
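Both solutions can be sketched with the standard setters. Everything named here (PoolSizingSketch, timedOutPool, burstDemo, fixedDemo, the 30 second keep-alive) is an assumption for illustration. Burst detection is left to the caller rather than to a monitoring wrapper.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two solutions; names are illustrative only.
class PoolSizingSketch {
    /** Pool whose core threads may also time out (keep-alive must be > 0 for that). */
    static ThreadPoolExecutor timedOutPool(int core, int max) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /** Submits tasks that block on the latch and returns the resulting pool size. */
    static int submitBlocked(ThreadPoolExecutor pool, int tasks, CountDownLatch release) {
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return pool.getPoolSize();
    }

    /** Solution 1: raise the core size for a burst, lower it again afterwards. */
    static int burstDemo(boolean raiseCore) {
        ThreadPoolExecutor pool = timedOutPool(1, 4);
        CountDownLatch release = new CountDownLatch(1);
        if (raiseCore) {
            pool.setCorePoolSize(4); // burst detected: each submission now starts a thread
        }
        int used = submitBlocked(pool, 4, release);
        pool.setCorePoolSize(1);     // burst over: extra threads can time out once idle
        release.countDown();
        pool.shutdown();
        return used;
    }

    /** Solution 2: fixed-size pool (core == max) whose threads time out when idle. */
    static int fixedDemo() {
        ThreadPoolExecutor pool = timedOutPool(4, 4);
        CountDownLatch release = new CountDownLatch(1);
        int used = submitBlocked(pool, 4, release);
        release.countDown();
        pool.shutdown();
        return used;
    }
}
```

Without raising the core size, the four tasks share a single thread (three wait in the queue). With the raised core size, or with the fixed pool, each gets its own thread.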

Why can't the core threads of a thread pool in Java be reused in the initial phase?

This was actually requested already: JDK-6452337. A core libraries developer has noted:

I like this idea, but ThreadPoolExecutor is already complicated enough.

Keep in mind that corePoolSize is an essential part of ThreadPoolExecutor: it specifies the minimum number of workers that are kept alive, whether active or idle. Reaching this number naturally takes only a very short time. You set corePoolSize according to your needs, and it's expected that the workload will meet this number.

My assumption is that optimizing this "warm-up phase" – taking it for granted that this will actually increase efficiency – is not worth it. I can't quantify for you what additional complexity this optimization will bring, I'm not developing Java Core libraries, but I assume that it's not worth it.

You can think of it like this: the "warm-up phase" is constant, while the thread pool will run for an undefined amount of time. In an ideal world, the initial phase should take no time at all; the workload should be there as soon as you create the thread pool. So you are thinking about an optimization for something that is not the expected thread pool state.

The thread workers will have to be created at some point anyways. This optimization only delays the creation. Imagine you have a corePoolSize of 10, so there is the overhead of creating 10 threads at least. This overhead won't change if you do it later. Yes, resources are also taken later but here I'm asking if the thread pool is configured correctly in the first place: Is corePoolSize correct, does it meet the current workload?

Notice that ThreadPoolExecutor has methods like setCorePoolSize(int) and allowCoreThreadTimeOut(boolean) and more that allow you to configure the thread pool according to your needs.
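As one example of the "and more" above, ThreadPoolExecutor also offers prestartAllCoreThreads(), which removes the warm-up phase by creating the core workers up front. The class name PrestartDemo and its helper are hypothetical, and whether prestarting is worthwhile depends on your workload.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; prestartAllCoreThreads() itself is a standard ThreadPoolExecutor method.
class PrestartDemo {
    /** Returns {threads started by prestart, pool size afterwards}. */
    static int[] run(int coreThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(coreThreads, coreThreads,
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        int started = pool.prestartAllCoreThreads(); // core workers now wait idle for tasks
        int size = pool.getPoolSize();
        pool.shutdown();
        return new int[] { started, size };
    }
}
```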


