How to Fix a Deadlock in Join() in Ruby

The simplest code to reproduce this issue is:

t = Thread.new { Thread.stop }
t.join # => exception in `join': deadlock detected (fatal)

Thread::stop → nil

Stops execution of the current thread, putting it into a “sleep”
state, and schedules execution of another thread.

Thread#join → thr

Thread#join(limit) → thr

The calling thread will suspend execution and run thr. Does not return
until thr exits or until limit seconds have passed. If the time limit
expires, nil will be returned, otherwise thr is returned.

As far as I understand, you call Thread#join without parameters on the thread and wait for it to exit, but the child thread calls Thread.stop and goes into the sleep state. This is a deadlock: the main thread waits for the child thread to exit, but the child thread is asleep and nothing is left to wake it.

If you call join with the limit parameter, join gives up after the timeout and returns nil, so the main thread is no longer blocked forever and no deadlock is reported. The child thread is not terminated by the timeout; it is still asleep and simply ends when the program exits:

t = Thread.new { Thread.stop }
t.join 1 # => Process finished with exit code 0
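
To see what the time limit actually does, here is a small sketch (the variable names are mine, and the 0.2-second limit is arbitrary). It suggests that join returns nil on timeout and that the child thread is still asleep afterwards, so it has to be woken or killed explicitly if you want it to finish:

```ruby
t = Thread.new { Thread.stop }

# join with a limit gives up after 0.2 s and returns nil instead of the thread.
result = t.join(0.2)

still_alive = t.alive?   # the child was not terminated by the timeout
status      = t.status   # "sleep" while it is parked in Thread.stop

# Wake the child so it can finish, then join without a limit.
t.run
t.join
final_status = t.status  # false once the thread has exited normally
```

The second join is safe here only because the thread has been woken first; without t.run it would hit the same deadlock as the original example.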

I would recommend exiting your worker threads once they have done their job, either with Thread.exit or by getting rid of the infinite loop so that the thread reaches the end of its block normally, for example:

if user_id.nil?
  raise StopIteration
end

# or
if user_id.nil?
  Thread.exit
end
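
As a fuller illustration of a worker that ends on its own, here is a minimal sketch. The jobs queue, the processed array, and the use of nil as a "no more work" marker are assumptions made for this example, not part of the original question:

```ruby
require 'thread'

jobs      = Queue.new
processed = []

worker = Thread.new do
  loop do
    user_id = jobs.pop        # blocks until something is pushed
    break if user_id.nil?     # nil is our "no more work" marker
    processed << user_id
  end
  # Falling off the end of the block terminates the thread normally.
end

[1, 2, 3].each { |id| jobs << id }
jobs << nil                   # tell the worker to finish
worker.join                   # returns because the worker really exits
```

Because the worker leaves its loop instead of sleeping forever, the unbounded join has something to wait for and returns.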

Deadlock in ThreadPool

OK, so the main problem with the implementation is: how do you make sure no signal is lost and avoid deadlocks?

In my experience, this is REALLY hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that Ruby implements an object called Queue (or SizedQueue) that should solve the problem. Here is my suggested implementation:

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            # Wait only while there is nothing to do, so that a signal sent
            # before this thread started waiting is not lost.
            @cv.wait(@mutex) while @running && @block.nil?
            block = get_block
            if block
              # Release the lock while the job runs so busy? and stop
              # are not blocked for the duration of the job.
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            # Report this worker as idle again.
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  # Reuse an idle worker if one is queued (blocking when the pool is full),
  # otherwise start a new one.
  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

And here is a simple test code:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
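
For comparison, here is a minimal sketch of a pool that relies on Queue alone, with no explicit mutex or condition variable. SimplePool and the :stop sentinel are hypothetical names chosen for this example; it is a simplification (fixed number of threads, no error handling), not a replacement for the class above:

```ruby
require 'thread'

# Hypothetical Queue-only pool: workers block in Queue#pop, so there is
# no signal that could be lost.
class SimplePool
  def initialize(size)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job arrives; :stop is our sentinel.
        while (job = @jobs.pop) != :stop
          job.call
        end
      end
    end
  end

  def process(&block)
    @jobs << block
  end

  def shutdown
    # One sentinel per worker, queued behind any pending jobs.
    @threads.size.times { @jobs << :stop }
    @threads.each(&:join)
  end
end

results = Queue.new
pool = SimplePool.new(4)
10.times { |i| pool.process { results << i * i } }
pool.shutdown
squares = Array.new(results.size) { results.pop }.sort
```

Since a pushed job stays in the queue until some worker pops it, it does not matter whether the worker started waiting before or after the job was submitted.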

How to execute threads in order in Ruby

I'm not a Ruby expert, but in every other language I have used, the name "condition variable" is a misnomer. For anything else that's called a "variable," we expect that if one thread changes it, some other thread can come along later and see that it was changed. That is not how condition variables work.

When thread A "notifies/signals" a condition variable, it will "wake up" some other thread that was already waiting, but if no other thread happened to be waiting at that moment, then the signal/notification does absolutely nothing at all.

Condition variables do not remember notifications.

Here's what I think could happen:

The t1 thread locks the mutex, and then sleeps.

The other three threads all start up, and all get blocked while awaiting the mutex.

The t1 thread returns from sleep(3), and it signals the condition variable. But, condition variables do not remember notifications. None of the other threads has been able to get to their wait(mutex) calls, because they're all still trying to get past mutex.synchronize. The notification is lost.

The t1 thread leaves the synchronized block, the other threads get in to their synchronized blocks, one-by-one, until all of them are awaiting signals.

Meanwhile, the main thread has been blocked in t1.join(). That call returns when the t1 thread ends, but then the main thread calls t2.join(). Now t2 is awaiting a signal, t3 is awaiting a signal, t4 is awaiting a signal, and the main thread is waiting for t2 to die.

No live threads remain, so nothing can ever send the signal.


Again, I'm not a Ruby expert, but in every other language, a thread that uses a condition variable to await some "condition" must do something like this:

# The mutex prevents other threads from modifying the "condition"
# (i.e., prevents them from modifying the `sharedData`.)
mutex.lock()

while ( sharedData.doesNotSatisfyTheCondition() ) {

# The `wait()` call _temporarily_ unlocks the mutex so that other
# threads may make the condition become true, but it's _guaranteed_
# to re-lock the mutex before it returns.
conditionVar.wait(mutex)
}

# At this point, the condition is _guaranteed_ to be true.
sharedData.doSomethingThatRequiresTheConditionToBeTrue()

mutex.unlock()

The most important thing going on here is, the caller does not wait if the condition already is true. If the condition already is true, then the notification probably already has happened. We missed it, and if we wait for it now, we may end up waiting forever.

The other important thing is that, after we have waited and received a notification, we check the condition again. Depending on the rules of the programming language, the operating system, and the architecture of the program, it may be possible for wait() to return prematurely.


Making the condition become true is simple:

mutex.lock()
sharedData.doSomethingThatMakesTheConditionTrue()
conditionVar.notify()
mutex.unlock()
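
Translated to Ruby, the same pattern might look like the sketch below. The ready flag, the log queue, and the variable names are assumptions for illustration. The notifier deliberately runs before the waiter exists, which is the ordering that loses the signal in the scenario described earlier:

```ruby
require 'thread'

mutex = Mutex.new
cond  = ConditionVariable.new
ready = false          # the shared "condition", protected by mutex
log   = Queue.new

# Notifier runs first: it sets the flag and signals while nobody is waiting.
# The signal itself is lost, but the flag remembers what happened.
mutex.synchronize do
  ready = true
  cond.signal
end

waiter = Thread.new do
  mutex.synchronize do
    # Check the condition before waiting, and re-check after every wake-up.
    cond.wait(mutex) until ready
    log << :proceeded
  end
end

waiter.join
```

Because the waiter tests ready before calling wait, it never blocks on a notification that has already been sent, and the join returns.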

What causes this deadlock in my Ruby `trap` block?

I looked through Ruby sources to see where that particular error is raised, and it's only ever raised when the current thread tries to acquire a lock, but that same lock is already taken by the current thread. This implies that locking is not re-entrant:

m = Mutex.new
m.lock
m.lock #=> same error as yours
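
If you want to inspect the failure rather than let it terminate the program, a small sketch like this one can help (the variable names are mine). On MRI the second lock is reported as a ThreadError, and the first lock is still held afterwards:

```ruby
m = Mutex.new
m.lock

error = begin
  m.lock            # second lock from the same thread
  nil
rescue ThreadError => e
  e                 # MRI reports recursive locking as a ThreadError
end

owned = m.owned?    # true: the first lock is still held by this thread
m.unlock
```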

Now at least we know what happens, but not yet why and where. The error message indicates that it happens during the call to puts. When puts is called, it finally ends up in io_binwrite. stdout is not synchronized, but it is buffered, so the condition in io_binwrite that sets up buffering is fulfilled on the first call, and a buffer plus a write lock for that buffer are set up. The write lock is important to guarantee the atomicity of writes to stdout: two threads writing to stdout simultaneously should not mix up each other's output. To demonstrate what I mean:

t1 = Thread.new { 100.times { print "aaaaa" } }
t2 = Thread.new { 100.times { print "bbbbb" } }
t1.join
t2.join

Although both threads take turns in writing to stdout, it will never happen that a single write is broken up - you will always have the full 5 a's or b's in sequence. That's what the write lock is there for.

Now what goes wrong in your case is a race condition on that write lock. The parent process loops and writes to stdout every second ("parent is working hard"). But the same thread also eventually executes the trap block and tries to write to stdout again ("Received a CHLD signal"). You can verify that it's really the same thread by adding #{Thread.current} to your puts statements. If those two events happen closely enough together, you have the same situation as in the first example: the same thread trying to obtain the same lock twice, which ultimately triggers the error.
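
One way to avoid the situation, sketched here under my own assumptions, is to keep the trap block free of I/O: the handler only records that the signal arrived, and the main loop does the printing. The example uses USR1 sent to the process itself so that it is self-contained; the original case involved CHLD. It assumes a Unix-like system:

```ruby
received = 0

# Keep the handler tiny: record the signal, do no I/O here.
Signal.trap("USR1") { received += 1 }

Process.kill("USR1", Process.pid)   # send ourselves the signal

# Main loop: do the printing outside the handler, so the thread never
# re-enters the stdout write lock from within a trap block.
deadline = Time.now + 2
sleep 0.01 until received > 0 || Time.now > deadline
puts "Received #{received} signal(s)" if received > 0
```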

Problem with Ruby threads

Most likely the code you're running is executed in another thread. That thread is then joined using Thread#join (meaning Ruby waits for it to finish before exiting the script). Calling Thread.stop in a thread that is also being joined is most likely the cause of the deadlock. Having said that, you should follow Stack Overflow's guidelines on how to ask questions properly; since you haven't done so, I've downvoted your question.

Joining a thread that calls Thread.stop can be done as follows:

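th = Thread.new do
  Thread.stop
end

# Wait until the thread has actually gone to sleep, then wake it up.
sleep 0.01 until th.status == 'sleep'
th.run
# Now the thread can run to completion, so join returns.
th.join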

It's not the cleanest way but it works. Also, if you want to actually terminate a thread you'll have to call Thread.exit instead.

How to fix a deadlock caused by open

You're not catching all exceptions here. When nothing is specified after rescue, you're catching only StandardError and its subclasses, and StandardError is not the root of the exception hierarchy.

If you want to make sure you're catching all exceptions and retry opening a URL (or whatever behavior you'd like), what you want to do is:

rescue Exception => error
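
The difference can be seen in a small sketch. LowLevelFailure, attempt, and attempt_all are hypothetical names made up for this example; the error class inherits from Exception directly, so it sits outside the StandardError branch:

```ruby
# Hypothetical error class outside the StandardError branch.
class LowLevelFailure < Exception; end

def attempt
  yield
  :ok
rescue                      # bare rescue: StandardError and subclasses only
  :rescued_by_bare
end

def attempt_all
  yield
  :ok
rescue Exception => error   # catches everything, including LowLevelFailure
  error.class
end

bare_standard = attempt { raise ArgumentError, "bad" }
bare_other = begin
  attempt { raise LowLevelFailure, "boom" }
rescue Exception
  :escaped_bare_rescue      # the bare rescue inside attempt did not catch it
end
all_other = attempt_all { raise LowLevelFailure, "boom" }
```

Keep in mind that rescue Exception also catches Interrupt and SystemExit, so it is usually worth re-raising those rather than retrying on them.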

