Does Ruby have an equivalent of Java's synchronized keyword?
It doesn't have the synchronized keyword, but you can get something very similar via the Monitor class. Here's an example from the Programming Ruby 1.8 book:
require 'monitor'
class Counter < Monitor
  attr_reader :count
  def initialize
    @count = 0
    super
  end
  def tick
    synchronize do
      @count += 1
    end
  end
end
c = Counter.new
t1 = Thread.new { 100_000.times { c.tick } }
t2 = Thread.new { 100_000.times { c.tick } }
t1.join; t2.join
c.count # => 200000
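If your class already inherits from something else, subclassing Monitor isn't an option. As far as I know, the standard monitor library also provides MonitorMixin for exactly this case. A minimal sketch, assuming a hypothetical Model superclass (SafeCounter is likewise just an illustrative name):

```ruby
require 'monitor'

# Hypothetical existing superclass: SafeCounter can't also inherit from Monitor.
class Model; end

class SafeCounter < Model
  include MonitorMixin   # adds synchronize (alias of mon_synchronize), etc.

  attr_reader :count

  def initialize
    super()              # MonitorMixin#initialize sets up the monitor state
    @count = 0
  end

  def tick
    synchronize { @count += 1 }
  end
end

c = SafeCounter.new
t1 = Thread.new { 100_000.times { c.tick } }
t2 = Thread.new { 100_000.times { c.tick } }
t1.join; t2.join
puts c.count # => 200000
```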
Are there any operations/methods in Ruby that are guaranteed/documented to be atomic?
I have very limited knowledge of this topic, but I will try to answer as best I can.
Jesse Storimer wrote a very good article series about concurrency. I highly recommend reading all three parts.
http://www.jstorimer.com/blogs/workingwithcode/8100871-nobody-understands-the-gil-part-2-implementation
The conclusion of part 2 is that, in MRI, the GIL guarantees that methods implemented natively in C are atomic.
The example you gave is actually more of a reentrancy problem than an atomicity problem. I don't know whether they are the same thing or how closely related they are.
As the article explains, Ruby differs from event-driven programming, where callbacks are synchronous: if you send the USR1 signal twice, the second handler runs only after the first one has finished, so you never lock the mutex twice.
Signal handling in Ruby, however, is asynchronous: if you send the signal twice, the second handler can interrupt the first. Because the first handler has already acquired the lock, the second handler's attempt to acquire the same lock raises an exception. I believe this problem is not Ruby-specific.
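The exception in that scenario comes from Ruby's Mutex not being reentrant: a thread that already holds a Mutex cannot lock it again. A small sketch that simulates the nested acquisition on a single thread (no real signals involved; reenter is a hypothetical helper) and compares it with Monitor, which is reentrant:

```ruby
require 'monitor'

# Acquire `lock`, then try to acquire it again from the same thread,
# the way a nested signal handler would. Returns the exception class
# raised, or nil if the nested acquisition succeeded.
def reenter(lock)
  lock.synchronize do
    lock.synchronize { }   # second acquisition on the same thread
  end
  nil
rescue ThreadError => e
  e.class
end

mutex_result   = reenter(Mutex.new)    # Mutex is not reentrant
monitor_result = reenter(Monitor.new)  # Monitor is reentrant
puts "Mutex:   #{mutex_result.inspect}"
puts "Monitor: #{monitor_result.inspect}"
```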
One way to solve this problem is to do the signal handling through a queue. Another solution is the "self-pipe" trick. Both methods are explained in this article by, again, the awesome Jesse Storimer:
http://rubysource.com/the-self-pipe-trick-explained/
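A minimal sketch of the self-pipe idea, assuming a Unix-like system where USR1 and USR2 exist: the trap handler only writes the signal name into a pipe, and ordinary code reads from the pipe and does the real work one signal at a time. As far as I know, MRI 2.0 and later raise ThreadError if you call Mutex#lock inside a trap handler at all, which is another reason to keep handlers this small. The handle_signal lambda and the Timeout wrapper are only there for the demo.

```ruby
require 'timeout'

reader, writer = IO.pipe

# Inside the trap we only write the signal's name to the pipe:
# no locks, no real work.
%w[USR1 USR2].each do |sig|
  Signal.trap(sig) { writer.puts(sig) }
end

handled = []
# Ordinary (non-trap) code: free to take locks, log, etc.
handle_signal = lambda { |sig| handled << sig }

Process.kill('USR1', Process.pid)
Process.kill('USR2', Process.pid)

# The "main loop": read one signal name at a time and handle it.
# Bounded to two signals (with a timeout) so the demo terminates.
Timeout.timeout(5) do
  2.times { handle_signal.call(reader.gets.chomp) }
end
puts handled.inspect
```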
So, to sum up:
For MRI 2.0.0, which I believe still has a GIL, Ruby only guarantees that native C methods are atomic.
JRuby runs on the JVM, so my guess is that all of its threading and locking mechanisms are implemented on top of the JVM.
Rubinius 1.2 also still has a GIL, so I believe it behaves the same as MRI. Rubinius 2.x removes the GIL, but I haven't had much experience with Rubinius, so I'm not entirely sure about it.
And to answer the question: if you are working with a multithreaded application in Ruby, the Mutex class ensures that a guarded block is executed by only one thread at a time.
Learning Ruby threading - trigger an event when thread finishes
The issue isn't really about "knowing when the thread finished", but rather about how to update a shared progress bar without race conditions.
To explain the problem: say you had a central ThreadList#progress_var variable and, as the last line of each thread, you incremented it with +=. This would introduce a race condition: += is a read followed by a write, so two threads can perform the operation at the same time and overwrite each other's results.
To get around this, the typical approach is to use a Mutex, which is an essential concept to understand if you're learning multithreading.
The actual implementation isn't that difficult:
# Mutex is part of Ruby core, so no require is needed
# (on Ruby 1.8 it came from require 'thread').
class ThreadList
  def initialize
    @semaphore = Mutex.new
    @progress_bar = 0
  end
  def increment_progress_bar(amount)
    @semaphore.synchronize do
      @progress_bar += amount
    end
  end
end
Because of the @semaphore.synchronize block, you can now safely call increment_progress_bar from multiple threads without the risk of a race condition.
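A usage sketch: ten threads each report progress as their last step. The attr_reader is an addition (not in the class above) so the result can be inspected, and the work each thread does is left as a placeholder.

```ruby
class ThreadList
  attr_reader :progress_bar   # added for inspection

  def initialize
    @semaphore = Mutex.new
    @progress_bar = 0
  end

  def increment_progress_bar(amount)
    @semaphore.synchronize do
      @progress_bar += amount
    end
  end
end

list = ThreadList.new
workers = 10.times.map do
  Thread.new do
    # ... the thread's real work would go here (placeholder) ...
    list.increment_progress_bar(10) # last line of each thread
  end
end
workers.each(&:join)
puts list.progress_bar # => 100
```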
Single thread still handles concurrency request?
Quoting the "Non blocking IOs/Reactor pattern" section of
http://merbist.com/2011/02/22/concurrency-in-ruby-explained/:
"This is the approach used by Twisted, EventMachine and Node.js. Ruby developers can use EventMachine or an EventMachine-based webserver like Thin, as well as EM clients/drivers, to make non-blocking async calls."
The heart of the matter is EventMachine.defer, which its documentation describes as:
  used for integrating blocking operations into EventMachine's control flow. The action of defer is to take the block specified in the first parameter (the "operation") and schedule it for asynchronous execution on an internal thread pool maintained by EventMachine. When the operation completes, it will pass the result computed by the block (if any) back to the EventMachine reactor. Then, EventMachine calls the block specified in the second parameter to defer (the "callback"), as part of its normal event handling loop. The result computed by the operation block is passed as a parameter to the callback. You may omit the callback parameter if you don't need to execute any code after the operation completes.
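To make the operation/callback split concrete without needing the eventmachine gem, here is a toy model of those semantics in plain Ruby. TinyReactor is hypothetical and is not how EventMachine is implemented: operations run on a small pool of threads, and their results are queued back so that every callback runs one at a time on the thread that calls run, much like callbacks on a reactor's event loop.

```ruby
# Toy model of defer(operation, callback); NOT EventMachine's code.
class TinyReactor
  def initialize(pool_size: 4)
    @jobs = Queue.new       # operations waiting for a pool thread
    @completed = Queue.new  # [callback, result] pairs waiting for the loop
    @pending = 0            # only touched by the thread that calls run
    @workers = Array.new(pool_size) do
      Thread.new do
        # A nil job is the shutdown signal.
        while (job = @jobs.pop)
          operation, callback = job
          @completed << [callback, operation.call]
        end
      end
    end
  end

  # Schedule `operation` on the pool; `callback` later gets its result.
  def defer(operation, callback = nil)
    @pending += 1
    @jobs << [operation, callback]
  end

  # The "event loop": runs callbacks sequentially on the calling thread
  # until every deferred operation has been handled.
  def run
    while @pending > 0
      callback, result = @completed.pop
      @pending -= 1
      callback&.call(result)
    end
  ensure
    @workers.size.times { @jobs << nil }  # stop the pool threads
    @workers.each(&:join)
  end
end

reactor = TinyReactor.new
results = []
[1, 2, 3].each do |n|
  reactor.defer(-> { n * n }, ->(square) { results << square })
end
reactor.run
p results.sort # => [1, 4, 9]
```

Because the callbacks all run on the reactor thread, results needs no lock. Error handling is omitted: if an operation raises, its result never reaches the loop and run will not complete normally.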
Essentially, in response to an HTTP request, the server runs the code you wrote by invoking the process method of the Connection class.
Have a look at the code in $GEM_HOME/gems/thin-1.6.2/lib/thin/connection.rb:
# Connection between the server and client.
# This class is instantiated by EventMachine on each new connection
# that is opened.
class Connection < EventMachine::Connection
  # Called when all data was received and the request
  # is ready to be processed.
  def process
    if threaded?
      @request.threaded = true
      EventMachine.defer(method(:pre_process), method(:post_process))
    else
      @request.threaded = false
      post_process(pre_process)
    end
  end
  # ...
end
Here is where a threaded connection invokes EventMachine.defer.
The reactor
To see where the EventMachine reactor is started, follow the initialization of the program.
Notice that all Sinatra applications and middleware ($GEM_HOME/gems/sinatra-1.4.5/base.rb) can be run as a self-hosted server using Thin, Puma, Mongrel, or WEBrick:
def run!(options = {}, &block)
  return if running?
  set options
  handler = detect_rack_handler
  # ...
end
The method detect_rack_handler returns the first Rack::Handler that can be loaded:
return Rack::Handler.get(server_name.to_s)
In our test we require thin, so it returns the Thin Rack handler and sets up a threaded server:
# Starts the server by running the Rack Handler.
def start_server(handler, server_settings, handler_name)
  handler.run(self, server_settings) do |server|
    # ...
    server.threaded = settings.threaded if server.respond_to? :threaded=
    # ...
  end
end
In $GEM_HOME/gems/thin-1.6.2/lib/thin/server.rb:
# Start the server and listen for connections.
def start
  raise ArgumentError, 'app required' unless @app
  log_info "Thin web server (v#{VERSION::STRING} codename #{VERSION::CODENAME})"
  # ...
  log_info "Listening on #{@backend}, CTRL+C to stop"
  @backend.start { setup_signals if @setup_signals }
end
In $GEM_HOME/gems/thin-1.6.2/lib/thin/backends/base.rb:
# Start the backend and connect it.
def start
  @stopping = false
  starter = proc do
    connect
    yield if block_given?
    @running = true
  end
  # Allow for early run up of eventmachine.
  if EventMachine.reactor_running?
    starter.call
  else
    @started_reactor = true
    EventMachine.run(&starter)
  end
end
Pure-Ruby concurrent Hash
Okay, now that you've specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual-consistency model in which each thread uses its own view of the hash if the master is in flux. A little trickery is required to make sure that storing all this information in the thread doesn't leak memory, but that is handled and tested: the process size does not grow when running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thought), but either of the two concepts below will meet your requirements.
It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby: in MRI the built-in Hash is sufficient.
module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args, &block)
  end
  class LocklessImpl
    def initialize
      @hash = {}
    end
    # Build the finalizer in a class method so the proc does not capture
    # the instance (a finalizer that references self keeps it alive forever).
    def self.finalizer(thread, key)
      proc { thread[:cash].delete(key) }
    end
    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      key = thread_key
      hash = thread[:cash][key]
      if hash
        hash
      else
        hash = thread[:cash][key] = {}
        ObjectSpace.define_finalizer(self, self.class.finalizer(thread, key))
        hash
      end
    end
    def thread_key
      [Thread.current.object_id, object_id]
    end
    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end
    def [](key)
      # check the master value
      #
      val = @hash[key]
      # someone else is either writing the key or it has never been set. we
      # need to invalidate our own copy in either case
      #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end
      # check our own thread local value
      #
      thread_val = thread_hash[key]
      # in this case someone else has written a value that we have never seen so
      # simply return it
      #
      if thread_val.nil?
        return(val.last)
      end
      # in this case there is a master *and* a thread local value, if the master
      # is newer nuke our own cached copy
      #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end
  class LockingImpl < ::Hash
    def initialize(*args, &block)
      # Sync_m comes from 'sync', which left the standard library in Ruby 2.7
      # (it is now the separate sync gem), so load it only when this class is used.
      require 'sync'
      super
      extend Sync_m
    end
    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end
    def [](key)
      sync(:SH){ super }
    end
    def []=(key, val)
      sync(:EX){ super }
    end
  end
end
if $0 == __FILE__
  iteration = 0
  loop do
    n = 42
    hash = Cash.new
    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }
    threads.map{|thread| thread.join}
    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
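Because sync is no longer part of the standard library in recent Rubies, a simpler (but coarser) alternative to LockingImpl is to guard an ordinary Hash with a single Mutex. This sketch uses one exclusive lock for both reads and writes, so unlike Sync_m's shared :SH mode, readers do not run in parallel. MutexHash is an illustrative name, and only the operations the test loop needs are implemented.

```ruby
# A Hash guarded by one Mutex: every read and write takes the same
# exclusive lock (no shared read mode as with Sync_m).
class MutexHash
  def initialize(*args, &block)
    @hash = Hash.new(*args, &block)
    @lock = Mutex.new
  end

  def [](key)
    @lock.synchronize { @hash[key] }
  end

  def []=(key, value)
    @lock.synchronize { @hash[key] = value }
  end

  def size
    @lock.synchronize { @hash.size }
  end
end

hash = MutexHash.new
threads = Array.new(10) do |t|
  Thread.new do
    42.times do |key|
      hash[[t, key]] = key
      raise "#{key}=nil" if hash[[t, key]].nil?
    end
  end
end
threads.each(&:join)
puts hash.size # => 420
```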
Concurrency and Mongoid
It sounds like you want MongoDB's findAndModify command, which lets you atomically retrieve and update a document.
Unfortunately, Mongoid doesn't appear to expose this part of the Mongo API, so it looks like you'll have to drop down to the driver level for this one bit:
battle = Battle.collection.find_and_modify(query: {oppenent: current_user._id, ...},
                                           update: {'$set' => {resolving: true}})
By default, the returned document does not include the modification; you can turn that on by passing {:new => true}.
The returned value is a raw hash; if my memory is correct, you can call Battle.instantiate(doc) to get a Battle object back.
Rails concurrence issue - how to use locks
I think the correct way to use a Ruby Mutex is to obtain a lock, run a code block, and then release the lock again, i.e. to call lock in combination with unlock:
@mutex = Mutex.new
# ...
def start
  @mutex.lock
  begin
    # ...
  ensure
    @mutex.unlock rescue nil
  end
end
Or use the synchronize method, which does the same lock/ensure/unlock for you:
@mutex = Mutex.new
# ...
def start
  @mutex.synchronize do
    # do something
  end
end
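If, instead of waiting for the lock, start should simply skip its work when another thread is already running it, Mutex#try_lock fits: it returns false immediately rather than blocking. A sketch with a hypothetical Worker class; the two queues in the usage code only exist to make the demo deterministic.

```ruby
class Worker
  attr_reader :runs

  def initialize
    @mutex = Mutex.new
    @runs = 0
  end

  # Returns true if this call did the work, false if another thread held the lock.
  def start
    return false unless @mutex.try_lock
    begin
      @runs += 1
      yield if block_given?   # stand-in for the real work
      true
    ensure
      @mutex.unlock
    end
  end
end

worker = Worker.new
inside = Queue.new
release = Queue.new
first = Thread.new { worker.start { inside << true; release.pop } }
inside.pop                    # the first thread now holds the lock
second_result = worker.start  # busy, so this call skips the work
release << true
first_result = first.value
puts "first: #{first_result}, second: #{second_result}, runs: #{worker.runs}"
```

Note that, like any Mutex, this only coordinates threads inside one process.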
An example can be found in the Rack middleware class Rack::Lock. But I am not sure it helps in your case, even if you use class variables like @@mutex, because a mutex/semaphore is not shared between different tasks running in distinct processes (I assume that the "tasks" are separate processes started by rake tasks). The Mutex class is useful for achieving thread safety: it implements a simple semaphore that can coordinate access to shared data from multiple concurrent threads within a single process. There is also a nice RailsCast about thread safety (unfortunately only behind a paywall).
In your case it may help to create a global lock flag in the database, or a global lock file in the filesystem with touch lock.txt, removing it again with rm lock.txt when the process is over. You can execute these shell commands from Ruby with Kernel.system or %x. If the file exists (File.exist?("lock.txt")), the update can be suspended.
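The touch/rm approach leaves a small window between checking for the file and creating it, in which two processes can both see "no lock". A swapped-in alternative, not what the answer above describes, is an advisory lock with File#flock, which the operating system releases automatically when the file is closed or the process dies. A sketch, assuming a local filesystem with BSD-style flock semantics (as on Linux and macOS, where two separate opens of the same file conflict even within one process); with_exclusive_lock is a hypothetical helper and the path is arbitrary.

```ruby
require 'tmpdir'

# Runs the block only if no one else holds the lock on `path`.
# Returns the block's value, or nil if the lock was already taken.
def with_exclusive_lock(path)
  File.open(path, File::RDWR | File::CREAT, 0o644) do |file|
    # LOCK_NB: fail immediately instead of waiting for the lock.
    return nil unless file.flock(File::LOCK_EX | File::LOCK_NB)
    yield
  end # closing the file releases the lock
end

path = File.join(Dir.tmpdir, "update_task.lock")
result = with_exclusive_lock(path) do
  # A second attempt while the first still holds the lock is refused,
  # as a concurrent rake task in another process would be.
  nested = with_exclusive_lock(path) { :should_not_run }
  [:updated, nested]
end
p result
```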
JS-style async/non-blocking callback execution with Ruby, without heavy machinery like threads?
Okay, after some fiddling with threads and studying contributions by apeiros and asQuirreL, I came up with a solution that suits me.
I'll show sample usage first, source code in the end.
Example 1: simple non-blocking execution
First, a JS example that I'm trying to mimic:
setTimeout( function() {
  console.log("world");
}, 0);
console.log("hello");
// Will print "hello" first, then "world".
Here's how I can do it with my tiny Ruby library:
# You wrap all your code into this...
Branch.new do
  # ...and you gain access to the `branch` method that accepts a block.
  # This block runs non-blockingly, just like in JS `setTimeout(callback, 0)`.
  branch { puts "world!" }
  print "Hello, "
end
# Will usually print "Hello, world!" (unlike the JS event loop, thread
# scheduling doesn't strictly guarantee this order).
Note that you don't have to take care of creating threads or waiting for them to finish. The only scaffolding required is the Branch.new { ... } wrapper.
Example 2: synchronizing threads with a mutex
Now we'll assume that we're working with some input and output shared among threads.
JS code I'm trying to reproduce with Ruby:
var
  results = [],
  rounds = 5;
for (var i = 1; i <= rounds; i++) {
  console.log("Starting thread #" + i + ".");
  // "Creating local scope"
  (function(local_i) {
    // Later "threads" get shorter delays (500 ms down to 100 ms),
    // so they finish in reverse order.
    setTimeout( function() {
      // "Assuming there's a time-consuming operation here."
      results.push(local_i);
      console.log("Thread #" + local_i + " has finished.");
      if (results.length === rounds)
        console.log("All " + rounds + " threads have completed! Bye!");
    }, (rounds + 1 - local_i) * 100);
  })(i);
}
console.log("All threads started!");
This code produces the following output:
Starting thread #1.
Starting thread #2.
Starting thread #3.
Starting thread #4.
Starting thread #5.
All threads started!
Thread #5 has finished.
Thread #4 has finished.
Thread #3 has finished.
Thread #2 has finished.
Thread #1 has finished.
All 5 threads have completed! Bye!
Notice that the callbacks finish in reverse order.
We're also going to assume that working with the results array may produce a race condition. In JS this is never an issue, because callbacks run one at a time, but in multithreaded Ruby it has to be addressed with a mutex.
Ruby equivalent of the above:
Branch.new do
  # Setting up an array to be filled with that many values.
  results = []
  rounds = 5
  # Running `branch` N times:
  1.upto(rounds) do |item|
    puts "Starting thread ##{item}."
    # The block passed to `branch` accepts a hash with mutexes
    # that you can use to synchronize threads.
    branch do |mutexes|
      # This imitates that the callback may take time to complete.
      # Threads will finish in reverse order.
      sleep((6.0 - item) / 10)
      # When you need a mutex, you simply request one from the hash.
      # For each unique key, a new mutex will be created lazily.
      mutexes[:array_and_output].synchronize do
        puts "Thread ##{item} has finished!"
        results.push item
        if results.size == rounds
          puts "All #{rounds} threads have completed! Bye!"
        end
      end
    end
  end
  puts "All threads started."
end
puts "All threads finished!"
Note that you don't have to take care of creating threads, waiting for them to finish, or creating mutexes and passing them into the block.
Example 3: delaying execution of the block
If you need the delay feature of setTimeout, you can do it like this.
JS:
setTimeout(function(){ console.log('Foo'); }, 2000);
Ruby:
branch(2) { puts 'Foo' }
Example 4: waiting for all threads to finish
With JS, there's no simple way to have the script wait for all threads to finish. You'll need an await/defer library for that.
But in Ruby it's possible, and Branch makes it even simpler. If you write code after the Branch.new {} wrapper, it will be executed after all branches within the wrapper have completed. You don't need to manually ensure that all threads have finished; Branch does that for you.
Branch.new do
  branch { sleep 10 }
  branch { sleep 5 }
  # This will be printed immediately
  puts "All threads started!"
end
# This will be printed after 10 seconds (the duration of the slowest branch).
puts "All threads finished!"
Consecutive Branch.new {} wrappers are executed sequentially.
Source
# (c) lolmaus (Andrey Mikhaylov), 2014
# MIT license http://choosealicense.com/licenses/mit/
class Branch
  # `mutexes` is currently unused; it is kept so calls like
  # `Branch.new 1 do ... end` keep working.
  def initialize(mutexes = 0, &block)
    @threads = []
    # Guards lazy mutex creation, so two threads asking for the same key
    # at the same moment can't end up with two different mutexes.
    @registry_lock = Mutex.new
    @mutexes = Hash.new do |hash, key|
      @registry_lock.synchronize { hash.fetch(key) { hash[key] = Mutex.new } }
    end
    # Executing the passed block within the context
    # of this class' instance.
    instance_eval(&block)
    # Waiting for all threads to finish
    @threads.each { |thr| thr.join }
  end
  # This method will be available within a block
  # passed to `Branch.new`.
  def branch(delay = false, &block)
    # Starting a new thread
    @threads << Thread.new do
      # Implementing the timeout functionality
      sleep delay if delay.is_a? Numeric
      # Executing the block passed to `branch`,
      # providing mutexes into the block.
      block.call @mutexes
    end
  end
end