Ruby Semaphores?
Thanks to @x3ro for his link, which pointed me in the right direction. However, the implementation that Fukumoto gave does not work as-is (at least on Ruby 1.9.2), because Thread.critical isn't available there. Furthermore, my attempts to replace the Thread.critical calls with Thread.exclusive{} simply resulted in deadlocks. It turns out that there is a proposed Semaphore patch for Ruby (linked below) that solves the problem by replacing Thread.exclusive{} with Mutex#synchronize{}, among a few other tweaks.
http://redmine.ruby-lang.org/attachments/1109/final-semaphore.patch
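To illustrate the general idea of guarding a permit counter with Mutex#synchronize, here is a minimal sketch of a counting semaphore. It is not the code from the linked patch; the class name CountingSemaphore and its methods are hypothetical and chosen only for this example.

```ruby
# Hypothetical CountingSemaphore: an illustrative sketch, NOT the patch's code.
class CountingSemaphore
  def initialize(permits)
    @permits = permits
    @mutex = Mutex.new
    @available = ConditionVariable.new
  end

  # P / wait: block until a permit is free, then take it.
  def acquire
    @mutex.synchronize do
      # Re-check in a loop to guard against spurious wakeups.
      @available.wait(@mutex) while @permits.zero?
      @permits -= 1
    end
  end

  # V / signal: hand a permit back and wake one waiting thread.
  def release
    @mutex.synchronize do
      @permits += 1
      @available.signal
    end
  end

  # Run the block while holding one permit; always give it back.
  def synchronize
    acquire
    begin
      yield
    ensure
      release
    end
  end
end

sem = CountingSemaphore.new(2)
counter_lock = Mutex.new
running = 0
max_running = 0

threads = 6.times.map do
  Thread.new do
    sem.synchronize do
      counter_lock.synchronize do
        running += 1
        max_running = [max_running, running].max
      end
      sleep 0.01
      counter_lock.synchronize { running -= 1 }
    end
  end
end
threads.each(&:join)

puts "max concurrently running: #{max_running}"
```

With two permits, max_running should never exceed 2, however the six threads are scheduled.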
Ruby: Concurrent::Semaphore yields deadlock
The (apparent) deadlock is not directly caused by your usage of semaphores. Instead, you have two threads (both of which you are waiting on to finish) that are both blocked.
Your first thread is indeed waiting for the semaphore to become available.
However, the second thread is currently writing data to STDOUT, which in your case is also blocking. This usually happens when the process reading the STDOUT of your Ruby process (e.g. your terminal) is not quick enough to read all of the data. Once the pipe's buffer is full, writing to STDOUT blocks, so that thread is not live either.
This is detected by Thread#join, which results in the exception being thrown.
To resolve this issue, make sure that you are reading fast enough from your process's STDOUT. Once I did that, I could not reproduce the issue anymore.
For documentation purposes: I could consistently reproduce the issue described by the OP by running ruby ./semaphore.rb | ruby -e "sleep 30" with semaphore.rb containing the code shown in the question.
Ruby Return Within semaphore.synchronize Block
According to the documentation, the mutex is released once the block passed to synchronize finishes:
https://ruby-doc.org/core-2.5.0/Mutex.html#method-i-synchronize
To provide more evidence, since this answer was downvoted, here is the implementation of synchronize. I'm no expert in C, but from what I see there, unlocking is implemented via ensure, so the mutex will be unlocked when the block terminates, no matter whether it returned normally or was left via a jump:
https://github.com/ruby/ruby/blob/2cf3bd5bb2a7c4724e528577d37a883fe80a1122/thread_sync.c#L512
Quick modeling supports this as well:
https://repl.it/repls/FailingWearableCodewarrior
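A minimal sketch of that kind of check follows. It is not the contents of the linked repl; the method name early_return is hypothetical. It leaves the synchronize block once via return and once via an exception, then inspects Mutex#locked?.

```ruby
lock = Mutex.new

# Hypothetical helper: returns from the method while inside the block.
def early_return(lock)
  lock.synchronize do
    return :returned_early
  end
end

result = early_return(lock)
puts "result: #{result.inspect}"
puts "locked after return? #{lock.locked?}"

begin
  lock.synchronize { raise "boom" }
rescue RuntimeError
  puts "locked after exception? #{lock.locked?}"
end
```

In both cases the mutex reports that it is no longer locked, which matches the ensure-based implementation.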
Redis semaphore locks can't be released
An awesome bug.
The bug
I think it happens if you kill the process while it is doing an lpop on SEMAPHORE:test:AVAILABLE.
Most probably here: https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L67
To replicate it
NonBlockingRedis.new.flushall
release_and_lock('test');
NonBlockingRedis.new.lpop('SEMAPHORE:test:AVAILABLE')
Now initially you have:
SEMAPHORE:test:AVAILABLE 0
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
After the above code you get:
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
The code checks SEMAPHORE:test:EXISTS and then expects SEMAPHORE:test:AVAILABLE / SEMAPHORE:test:GRABBED to be present.
Solution
From my brief check, I don't think it is possible to make the gem work without a modification. I tried adding an expiration, but somehow the gem managed to disable the expiration for SEMAPHORE:test:EXISTS:
NonBlockingRedis.new.ttl('SEMAPHORE:test:EXISTS') # => -1, whereas it should have been e.g. 20 seconds and counting down
So maybe a fix would be:
class Redis
  class Semaphore
    def exists_or_create!
      token = @redis.getset(exists_key, EXISTS_TOKEN)
      if token.nil? || all_tokens.empty?
        create!
      else
        # Previous versions of redis-semaphore did not set `version_key`.
        # Make sure it's set now, so we can use it in future versions.
        if token == API_VERSION && @redis.get(version_key).nil?
          @redis.set(version_key, API_VERSION)
        end
        true
      end
    end
  end
end
Here, all_tokens is https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L120
I'll open a PR to the gem shortly (maybe) -> https://github.com/dv/redis-semaphore/pull/66
Note 1
Not sure how you use NonBlockingRedis, but it is not in use in Redis::Semaphore. You do lock(-1), which does an lpop in the code. Also, the code never calls your lock.
Random
Here is a helper to dump the keys
class Test
  def self.all
    r = NonBlockingRedis.new
    puts r.keys('*').map { |k|
      [
        k,
        ((r.hgetall(k) rescue r.get(k)) rescue r.lrange(k, 0, -1).join(' | '))
      ].join("\t\t")
    }
  end
end
> Test.all
SEMAPHORE:test:AVAILABLE 0
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
For completeness here is how it looks when you have grabbed the lock
SEMAPHORE:test:VERSION 1
SEMAPHORE:test:EXISTS 1
SEMAPHORE:test:GRABBED {"0"=>"1583672948.7168388"}
Javascript (Jest) tests on Semaphore CI ruby project
It looks like SemaphoreCI is not using the same version of Node.js (and therefore npm) as you. You can set that in the build settings like this:
nvm install v8.9.4
npm install
npm test
You can test it out in an SSH session.
How to share text file (or a mutex/semaphore) between Ruby and another language?
You should be able to accomplish what you want by locking "history.txt" with flock in both Ruby and C++ (this probably exists in many other languages as well, since it's a system call), although there do seem to be a few gotchas with this method.
Here is the code I used to test the method.
Here is the Ruby code:
File.open("history.txt", "r+") do |file|
  puts "before the lock"
  file.flock(File::LOCK_EX)
  puts "Locking until you press enter"
  gets
  puts file.gets
  file.flock(File::LOCK_UN)
end
Here is the C++ code:
#include <cstdio>
#include <iostream>
#include <sys/file.h>

int main()
{
    FILE *h = fopen("history.txt", "a"); // open the file for appending
    if (h == NULL) {
        perror("fopen");
        return 1;
    }
    std::cout << "Press enter to lock\n";
    std::cin.get();
    int hNum = fileno(h);          // get the file descriptor from the FILE*
    int rt = flock(hNum, LOCK_EX); // lock it down! (blocks until granted)
    std::cout << "Writing!" << rt << "\n";
    fprintf(h, "Shoop da woop!\n");
    std::cout << "Press enter to unlock\n";
    std::cin.get();
    fflush(h);                     // flush buffered output while the lock is still held
    rt = flock(hNum, LOCK_UN);
    fclose(h);
    return 0;
}
By running these two programs, you can confirm that the Ruby process blocks while the C++ process holds the lock on the file, and vice versa.
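On the Ruby side, one of the gotchas is forgetting to unlock when the code between lock and unlock raises. A minimal sketch of a wrapper that always flushes and unlocks is shown below. The helper name with_exclusive_lock is hypothetical, and a Tempfile stands in for history.txt so the example can be run anywhere.

```ruby
require 'tempfile'

# Hypothetical helper, named for illustration only.
def with_exclusive_lock(path)
  File.open(path, File::RDWR | File::CREAT) do |file|
    file.flock(File::LOCK_EX) # blocks until the lock is granted
    begin
      yield file
    ensure
      file.flush                # write buffered data while still locked
      file.flock(File::LOCK_UN)
    end
  end
end

history = Tempfile.new('history') # stand-in for history.txt
history.close

with_exclusive_lock(history.path) do |file|
  file.seek(0, IO::SEEK_END)      # append at the end of the file
  file.puts "entry written under lock"
end

contents = File.read(history.path)
puts contents
```

Note that flock is advisory: it only coordinates processes that also call flock on the same file.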
Ruby synchronisation: How to make threads work one after another in proper order?
Using Queue as a PV Semaphore
You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:
require 'thread'
...
sem = Queue.new
When a thread needs to wait, it calls Queue#deq:
# waiting thread
sem.deq
When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:
# another thread that wants to unblock the waiting thread
sem.enq :go
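Putting the two halves together, a small self-contained sketch looks like this. The events queue is an addition for this example only; it records the order in which things happened.

```ruby
sem = Queue.new
events = Queue.new # thread-safe log, used only for this illustration

waiter = Thread.new do
  sem.deq                    # P: blocks until something is pushed
  events << :waiter_resumed
end

events << :main_before_signal
sem.enq :go                  # V: unblocks the waiting thread
waiter.join

order = []
order << events.deq until events.empty?
p order
```

The waiter can only record its event after the main thread has pushed onto sem, so :main_before_signal always comes first.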
A Worker class
Here's a worker class that uses Queue to synchronize its start and stop:
class Worker
  def initialize(worker_number)
    @start = Queue.new
    Thread.new do
      @start.deq
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end

  def start
    @start.enq :start
  end

  def when_done(&block)
    @when_done = block
  end
end
When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.
When done, the thread will execute the block that was passed to #when_done. We'll see how this is used in just a moment.
Creating workers
First, let's make sure that if any threads raise an exception, we get to find out about it:
Thread.abort_on_exception = true
We'll need six workers:
workers = (1..6).map { |i| Worker.new(i) }
Telling each worker what to do when it's done
Here's where #when_done comes into play:
workers.each_cons(2) do |w1, w2|
w1.when_done { w2.start }
end
This takes each consecutive pair of workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify the main thread:
all_done = Queue.new
workers.last.when_done { all_done.enq :done }
Let's Go!
Now all that remains is to start the first thread:
workers.first.start
and wait for the last thread to finish:
all_done.deq
The output:
Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6
Synchronising thread startup
Wrap the wait in a while loop, so that the first thread does not wait at all if the second thread has already been spawned and has signalled:
def run_test
  mutex = Mutex.new
  cond = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads = []
  spawned = false
  t1_done = false
  t2_done = false
  threads << Thread.new do
    mutex.synchronize do
      while(!spawned) do
        cond.wait(mutex, 2)
      end
      raise 'timeout waiting for switch' if !t2_done
      # some work
      t1_done = true
      cond.signal
    end
  end
  threads << Thread.new do
    mutex.synchronize do
      spawned = true
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end
  threads.map(&:join)
end

50000.times { |x|
  puts x
  run_test
}
Alternatively, using a counting semaphore, we can assign some priorities to the threads:
require 'concurrent-ruby'
def run_test
  mutex = Mutex.new
  sync = Concurrent::Semaphore.new(0)
  cond = ConditionVariable.new
  cond_main = ConditionVariable.new
  threads = []
  t1_done = false
  t2_done = false
  threads << Thread.new do
    mutex.synchronize do
      sync.release(1)
      # this needs to happen first
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t2_done
      # some work
      t1_done = true
      cond.signal
    end
  end
  threads << Thread.new do
    sync.acquire(1)
    mutex.synchronize do
      cond.signal
      # some work
      t2_done = true
      cond.wait(mutex, 2)
      raise 'timeout waiting for switch' if !t1_done
    end
  end
  threads.map(&:join)
end

50000.times { |x|
  puts x
  run_test
}
I prefer the second solution as it allows you to control the order of your threads, though it feels a little dirtier.
As a curiosity, on Ruby 2.6, your code does not seem to raise exceptions (tested >10M runs).