Ruby Semaphores

Ruby Semaphores?

Thanks to @x3ro for his link; it pointed me in the right direction. However, with the implementation Fukumoto gave, Thread.critical isn't available (at least on Ruby 1.9.2). Furthermore, my attempts to replace the Thread.critical calls with Thread.exclusive{} simply resulted in deadlocks. It turns out there is a proposed Semaphore patch for Ruby (linked below) that solves the problem by replacing Thread.exclusive{} with Mutex#synchronize{}, among a few other tweaks.

http://redmine.ruby-lang.org/attachments/1109/final-semaphore.patch
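
For reference, the core idea behind the patch (waiting on a condition variable inside Mutex#synchronize instead of relying on Thread.critical) can be sketched in a few lines of plain Ruby. This is only an illustration of the approach, not the patch's actual code:

class SimpleSemaphore
  def initialize(count = 1)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Decrement the count, blocking while it is zero.
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) while @count <= 0
      @count -= 1
    end
  end

  # Increment the count and wake one waiting thread.
  def signal
    @mutex.synchronize do
      @count += 1
      @cond.signal
    end
  end
end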

Ruby: Concurrent::Semaphore yields deadlock

The (apparent) deadlock is not directly caused by your usage of semaphores. Instead, what is happening here is that you have two threads (both of which you are waiting to finish) which are both blocking.

Your first thread is indeed waiting for the semaphore to be available.

However, the second thread is currently writing data to STDOUT, which in your case also blocks. This usually happens when the process reading your Ruby process's STDOUT (e.g. your terminal) is not reading fast enough to keep up. Once the pipe's buffer is full, writing to STDOUT blocks, so that thread is not live either.

This is detected by Thread#join, resulting in the exception being raised.

To resolve this issue, just make sure that you are reading fast enough from your process's STDOUT. Once I did that, I could no longer reproduce the issue.

For documentation purposes: I could consistently reproduce the issue described by the OP by running ruby ./semaphore.rb | ruby -e "sleep 30", with semaphore.rb containing the code shown in the question.
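
To see this blocking in isolation, here is a small sketch using IO.pipe (the names are just for illustration). A writer thread stalls once the pipe buffer fills up, which is exactly what happens to a thread writing to a STDOUT that nobody reads:

reader, writer = IO.pipe

t = Thread.new do
  # Keeps writing until the pipe's buffer is full, then blocks,
  # because nothing ever reads from `reader`.
  loop { writer.write("x" * 1024) }
end

sleep 1
puts t.status  # => "sleep" (blocked on the write, neither finished nor dead)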

Ruby Return Within semaphore.synchronize Block

According to the documentation, the lock is released once the block passed to synchronize finishes:
https://ruby-doc.org/core-2.5.0/Mutex.html#method-i-synchronize

To provide more evidence, since this answer was downvoted, here's the implementation of synchronize. I'm no expert in C, but from what I can see, the unlocking is implemented in an ensure, so the mutex will be unlocked on block termination no matter whether the block returned or was left via a jump:
https://github.com/ruby/ruby/blob/2cf3bd5bb2a7c4724e528577d37a883fe80a1122/thread_sync.c#L512

Quick modeling supports this as well:
https://repl.it/repls/FailingWearableCodewarrior
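
A quick self-contained check (the method name here is made up for illustration) shows the same behaviour: returning from inside Mutex#synchronize still releases the lock:

MUTEX = Mutex.new

def fetch_value
  # Leaves the block via return; the ensure inside synchronize still unlocks.
  MUTEX.synchronize do
    return :early
  end
end

fetch_value
puts MUTEX.locked?  # => false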

Redis semaphore locks can't be released

An awesome bug.

The bug

I think it happens if you kill the process while it is doing an lpop on SEMAPHORE:test:AVAILABLE.

Most probably here https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L67

To replicate it:

NonBlockingRedis.new.flushall
release_and_lock('test');
NonBlockingRedis.new.lpop('SEMAPHORE:test:AVAILABLE')

Now initially you have:

SEMAPHORE:test:AVAILABLE    0
SEMAPHORE:test:VERSION      1
SEMAPHORE:test:EXISTS       1

After the above code you get:

SEMAPHORE:test:VERSION      1
SEMAPHORE:test:EXISTS       1

The code checks SEMAPHORE:test:EXISTS and then expects SEMAPHORE:test:AVAILABLE / SEMAPHORE:test:GRABBED to be there.

Solution

From my brief check, I don't think it is possible to make the gem work without a modification. I tried adding an expiration: option, but somehow that ended up disabling the expiration for SEMAPHORE:test:EXISTS.

NonBlockingRedis.new.ttl('SEMAPHORE:test:EXISTS') # => -1, but it should have been e.g. 20 seconds and counting down

So... maybe a fix would be:

class Redis
  class Semaphore
    def exists_or_create!
      token = @redis.getset(exists_key, EXISTS_TOKEN)

      if token.nil? || all_tokens.empty?
        create!
      else
        # Previous versions of redis-semaphore did not set `version_key`.
        # Make sure it's set now, so we can use it in future versions.
        if token == API_VERSION && @redis.get(version_key).nil?
          @redis.set(version_key, API_VERSION)
        end

        true
      end
    end
  end
end

The all_tokens method is https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L120
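
To make the condition concrete, here is a rough sanity check against the raw keys (the key names come from the dumps in this answer; treating all_tokens as "available plus grabbed" is my reading of the gem's internals, so take it as an assumption):

r = NonBlockingRedis.new

token     = r.getset('SEMAPHORE:test:EXISTS', '1')
available = r.lrange('SEMAPHORE:test:AVAILABLE', 0, -1)
grabbed   = r.hkeys('SEMAPHORE:test:GRABBED')

# Unpatched: a non-nil token means create! is skipped, even though both
# AVAILABLE and GRABBED are empty and the semaphore can never be acquired.
# Patched: the extra all_tokens.empty? check forces create! to run again.
puts "needs re-create: #{token.nil? || (available + grabbed).empty?}"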

I'll open a PR to the gem shortly -> https://github.com/dv/redis-semaphore/pull/66

Note 1

I'm not sure how you use NonBlockingRedis, but it is not used inside Redis::Semaphore. You call lock(-1), which in the gem's code does an lpop. Also, the gem's code never calls your lock method.

Random

Here is a helper to dump the keys

class Test
  def self.all
    r = NonBlockingRedis.new
    puts r.keys('*').map { |k|
      [
        k,
        ((r.hgetall(k) rescue r.get(k)) rescue r.lrange(k, 0, -1).join(' | '))
      ].join("\t\t")
    }
  end
end

> Test.all

SEMAPHORE:test:AVAILABLE    0
SEMAPHORE:test:VERSION      1
SEMAPHORE:test:EXISTS       1

For completeness, here is how it looks when you have grabbed the lock:

SEMAPHORE:test:VERSION      1
SEMAPHORE:test:EXISTS       1
SEMAPHORE:test:GRABBED      {"0"=>"1583672948.7168388"}

Javascript (Jest) tests on Semaphore CI ruby project

It looks like Semaphore CI is not using the same version of Node.js (and therefore npm) as you. But you can set that within the build settings like this:

nvm install v8.9.4
npm install
npm test

You can test it out over an SSH session.

How to share text file (or a mutex/semaphore) between Ruby and another language?

You should be able to accomplish what you want by placing locks on "history.txt" using flock in both Ruby and C++ (flock probably exists in many other languages as well, since it's a system call), although there do seem to be a few gotchas that may come up while using this method.

Here is the code I used to test the method.

Here is the Ruby code:

File.open("history.txt", "r+") do |file|
  puts "before the lock"
  file.flock(File::LOCK_EX)
  puts "Locking until you press enter"
  gets
  puts file.gets
  file.flock(File::LOCK_UN)
end

Here is the C++ code:

#include <iostream>
#include <cstdio>
#include <sys/file.h>

int main()
{
    FILE *h = fopen("history.txt", "a"); // open the file
    std::cout << "Press enter to lock\n";
    std::cin.get();
    int hNum = fileno(h);                // get the file descriptor from the FILE*
    int rt = flock(hNum, LOCK_EX);       // lock it down!
    std::cout << "Writing!" << rt << "\n";
    fprintf(h, "Shoop da woop!\n");
    std::cout << "Press enter to unlock\n";
    std::cin.get();
    fflush(h);                           // flush buffered data before releasing the lock
    rt = flock(hNum, LOCK_UN);
    fclose(h);
    return 0;
}

By running these two programs you can confirm that the Ruby process stops when the C++ process has locked the file, and vice versa.

Ruby synchronisation: How to make threads work one after another in proper order?

Using Queue as a PV Semaphore

You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:

require 'thread'
...
sem = Queue.new

When a thread needs to wait, it calls Queue#deq:

# waiting thread
sem.deq

When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:

# another thread that wants to unblock the waiting thread
sem.enq :go
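
Putting those two halves together, a tiny end-to-end sketch (variable names are just for illustration) looks like this:

require 'thread'

sem = Queue.new

waiter = Thread.new do
  puts "waiting..."
  sem.deq            # blocks until something is enqueued
  puts "released!"
end

sleep 0.1            # give the waiter time to block on the queue
sem.enq :go          # unblock the waiting thread
waiter.join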

A Worker class

Here's a worker class that uses Queue to synchronize its start and stop:

class Worker
  def initialize(worker_number)
    @start = Queue.new
    Thread.new do
      @start.deq
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end

  def start
    @start.enq :start
  end

  def when_done(&block)
    @when_done = block
  end
end

When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.

When done, the thread will execute the block that was passed to #when_done. We'll see how this is used in just a moment.

Creating workers

First, let's make sure that if any threads raise an exception, we get to find out about it:

Thread.abort_on_exception = true

We'll need six workers:

workers = (1..6).map { |i| Worker.new(i) }

Telling each worker what to do when it's done

Here's where #when_done comes into play:

workers.each_cons(2) do |w1, w2|
  w1.when_done { w2.start }
end

This takes each pair of adjacent workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify this thread:

all_done = Queue.new
workers.last.when_done { all_done.enq :done }

Let's Go!

Now all that remains is to start the first thread:

workers.first.start

and wait for the last thread to finish:

all_done.deq

The output:

Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6

Synchronising thread startup

Wrap the wait in a while loop so the first thread only stops waiting once the second thread has actually started (see more here):

def run_test
  mutex = Mutex.new
  cond = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads = []

  spawned = false

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      while !spawned
        cond.wait(mutex, 2)
      end
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    mutex.synchronize do
      spawned = true
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x|
  puts x
  run_test
}

Alternatively, using a counting semaphore, we can assign some priorities to the threads:

require 'concurrent-ruby'

def run_test
  mutex = Mutex.new
  sync = Concurrent::Semaphore.new(0)
  cond = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads = []

  t1_done = false
  t2_done = false

  threads << Thread.new do
    mutex.synchronize do
      sync.release(1)
      # this needs to happen first
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done

      # some work
      t1_done = true
      cond.signal
    end
  end

  threads << Thread.new do
    sync.acquire(1)
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end

  threads.map(&:join)
end

50000.times { |x|
  puts x
  run_test
}

I prefer the second solution as it allows you to control the order of your threads, though it feels a little dirtier.

As a curiosity, on Ruby 2.6, your code does not seem to raise exceptions (tested >10M runs).


