Deadlock in ThreadPool

OK, so the main problem with the implementation is: how do you make sure that no signal is lost while avoiding deadlocks?

In my experience, this is REALLY hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements a class called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation:

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            # Guard against a lost wakeup: only wait when no job is pending.
            @cv.wait(@mutex) if @running && @block.nil?
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the worker thread that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

And here is some simple test code:

tp = ThreadPool.new 500
(1..1000).each do |i|
  tp.process do
    (2..10).inject(1) { |memo, val| sleep(0.1); memo * val }
    print "Computation #{i} done. Nb of tasks: #{tp.size}\n"
  end
end
tp.shutdown

Why does this thread pool deadlock or run too many times?

You check the "done" count and only then acquire the lock. This allows multiple threads to be waiting on the lock at once; in particular, it is possible that no thread ever enters the second if body.

The other side of the problem is that, because you have all threads running all the time, the "last" thread may reach its exclusive section too early (before enough threads have run) or too late (because additional threads are still waiting at the mutex in the first loop).

To fix the first issue: since the second if block contains all of the same code as the first, you can collapse them into a single block that checks the count to see whether you have reached the end and should set the out value.

The second issue requires you to check m_num_comps_done a second time after acquiring the mutex.
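The pattern described above can be sketched like this (a minimal Python analogue; the counter, event, and task names are illustrative, not the asker's actual `m_num_comps_done` code — the point is that the counter is incremented and re-checked while the mutex is held):

```python
import threading

NUM_TASKS = 8

lock = threading.Lock()
num_done = 0
completions = 0          # how many times the completion step ran
all_done = threading.Event()

def task():
    global num_done, completions
    # ... per-thread computation would go here ...
    with lock:
        num_done += 1
        # Re-check the counter *while holding the lock*: exactly one
        # thread observes the final value, so the completion step can
        # neither be skipped nor run twice.
        if num_done == NUM_TASKS:
            completions += 1
            all_done.set()

threads = [threading.Thread(target=task) for _ in range(NUM_TASKS)]
for t in threads:
    t.start()
all_done.wait()
for t in threads:
    t.join()
print(num_done, completions)
```

Because the check happens under the lock, there is no window between reading the count and acting on it.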

Threadpool Deadlock: designing against or detecting

Your "more callbackish" answer seems like it would mostly be solved for you by the CompletionStage API in Java 8. The lack of a "runAfterAllAsync" method means you may have to do some external work (for example, combining the futures with `CompletableFuture.allOf`) to get something to happen after your group of three subtasks, but this is where I would start looking. This tutorial has an example that may be of some help.

Will this cause a deadlock, or is it a bad pattern?

This would be a safe way to spawn a thread from within a thread that was itself started by a ThreadPoolExecutor. It may not even be necessary, if ThreadPoolExecutor itself is thread-safe. The output shows how, in this case, there would be 10 concurrent threads (2 foo threads plus 2 × 4 bar threads).

from concurrent.futures import ThreadPoolExecutor
from time import sleep


BAR_THREADS = 4
FOO_THREADS = 2

def bar(_):
    print('Running bar')
    sleep(1)

def foo(_):
    print('Running foo')
    with ThreadPoolExecutor(max_workers=BAR_THREADS) as executor:
        executor.map(bar, range(BAR_THREADS))

with ThreadPoolExecutor(max_workers=FOO_THREADS) as executor:
    executor.map(foo, range(FOO_THREADS))

print('Done')

Output:

Running foo
Running foo
Running bar
Running bar
Running bar
Running bar
Running bar
Running bar
Running bar
Running bar
Done

Submitting tasks that depend on other tasks risks deadlock unless the pool is unbounded

"Pool is unbounded" means there is no limit to the number of threads in the pool.

Therefore, an unlimited number of tasks can execute concurrently.

Therefore, you cannot get into a deadlock where executing task T1 depends on task T2 which is unable to execute because there are no available threads.
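The bounded case can be demonstrated concretely (an illustrative Python sketch: the pool size, task names, and timeouts are made up, and the timeout exists only so the demonstration terminates instead of hanging forever):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# A deliberately tiny, bounded pool: one thread.
pool = ThreadPoolExecutor(max_workers=1)
deadlocked = False

def child():
    return "child done"

def parent():
    # The child is queued behind this task; with only one thread,
    # waiting on it here can never succeed -- a deadlock, escaped
    # below only because of the timeout.
    return pool.submit(child).result(timeout=2)

future = pool.submit(parent)
try:
    print(future.result(timeout=5))
except TimeoutError:
    deadlocked = True
    print("deadlock: child never got a thread")
pool.shutdown(wait=True)
```

With `max_workers=2` (or an unbounded pool) the same code completes normally, which is exactly the point: the deadlock depends on the bound, not on the task logic.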


With long-running tasks, the fewer threads that are available, the greater the risk that all of them become occupied by long-running tasks, leaving no thread to run the short tasks.

The pool needs to be large enough to accommodate the peak number of long tasks plus however many short tasks might be needed to satisfy dependencies. It seems better to have separate pools; they are easier to reason about.
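The separate-pools idea can be sketched briefly (pool sizes, task names, and sleep durations here are hypothetical):

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Dedicating a pool to each task class means the long tasks can occupy
# every thread in their own pool without starving the short tasks.
long_pool = ThreadPoolExecutor(max_workers=2)
short_pool = ThreadPoolExecutor(max_workers=2)

def long_task(i):
    time.sleep(0.5)
    return f"long {i}"

def short_task(i):
    return f"short {i}"

# Saturate the long pool first...
longs = [long_pool.submit(long_task, i) for i in range(4)]
# ...then submit short tasks to their own pool.
shorts = [short_pool.submit(short_task, i) for i in range(4)]

# The short tasks finish immediately, even though every long_pool
# thread is busy.
results = [f.result(timeout=0.5) for f in shorts]
print(results)

long_pool.shutdown(wait=True)
short_pool.shutdown(wait=True)
```

With a single shared pool of 2 threads, the same `shorts` would sit behind the long tasks and the 0.5 s timeout would fire.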


