Synchronized Method for Concurrency in Ruby

Does Ruby have an equivalent of Java's synchronized keyword?

It doesn't have a synchronized keyword, but you can get something very similar via the Monitor class. Here's an example from the Programming Ruby 1.8 book:

require 'monitor'

class Counter < Monitor
  attr_reader :count

  def initialize
    @count = 0
    super
  end

  def tick
    synchronize do
      @count += 1
    end
  end
end

c = Counter.new
t1 = Thread.new { 100_000.times { c.tick } }
t2 = Thread.new { 100_000.times { c.tick } }
t1.join; t2.join
c.count # => 200000
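If inheriting from Monitor is not an option because the class already has a superclass, the same standard library file also provides MonitorMixin. The following is only a minimal sketch of that variant; the class name SafeCounter and the method tick_twice are made up for illustration. It also shows that a monitor is re-entrant: a thread that already holds it may enter synchronize again.

```ruby
require 'monitor'

# Hypothetical class for illustration; only MonitorMixin comes from the stdlib.
class SafeCounter
  include MonitorMixin

  attr_reader :count

  def initialize
    super() # lets MonitorMixin set up its internal lock
    @count = 0
  end

  def tick
    synchronize { @count += 1 }
  end

  # Calls tick while already holding the monitor; this works because
  # monitors are re-entrant.
  def tick_twice
    synchronize do
      tick
      tick
    end
  end
end

counter = SafeCounter.new
threads = Array.new(4) { Thread.new { 1_000.times { counter.tick_twice } } }
threads.each(&:join)
puts counter.count
```

Four threads each call tick_twice 1,000 times, so the final count is 8000 regardless of how the threads interleave.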

Are there any operations/methods in Ruby that are guaranteed/documented to be atomic?

I have very limited knowledge of this topic, but I will try to answer as best I can.

Jesse Storimer wrote a very good series of articles about concurrency. I highly recommend reading all three parts.

http://www.jstorimer.com/blogs/workingwithcode/8100871-nobody-understands-the-gil-part-2-implementation

The conclusion of part 2 is that the GIL guarantees that methods implemented natively in C are atomic.

The example that you gave is actually more of a re-entrancy problem than an atomicity problem. I don't know whether the two are the same thing or how closely related they are.

As the article explains, Ruby is different from event-driven programming, where callbacks are synchronous: there, if you send the USR1 signal twice, the second handler runs after the first one has finished, so you never lock the mutex twice.

Signal handling in Ruby, however, is asynchronous: if you send the signal twice, the second handler interrupts the first. Because the first handler has already acquired the lock, the second handler raises an exception when it tries to acquire the same lock. I believe this problem is not specific to Ruby.
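The underlying rule can be reproduced without any signals. This is only a sketch of that rule, with illustrative variable names: a Mutex is not re-entrant, so a thread that already holds it and tries to lock it again gets a ThreadError instead of waiting forever.

```ruby
lock = Mutex.new
error = nil

lock.synchronize do
  begin
    # Same thread, same mutex: Ruby refuses instead of deadlocking.
    lock.synchronize { puts 'never reached' }
  rescue ThreadError => e
    error = e
  end
end

puts error.class
```

An interrupting signal handler that needs a lock already held by the code it interrupted ends up in the same situation.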

One way of solving this problem is to create a queue for signal handling. Another solution is a technique called the "self-pipe trick". Both are explained in this article, again by the awesome Jesse Storimer:

http://rubysource.com/the-self-pipe-trick-explained/
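As a rough sketch of the self-pipe idea (not the code from the article, and assuming a POSIX system where USR1 exists): the handler does nothing except write one byte to a pipe, and the main loop notices the byte and does the real work outside of the handler, where taking locks is safe. The variable names are illustrative.

```ruby
reader, writer = IO.pipe
handled = []

Signal.trap('USR1') do
  # Keep the handler tiny: one byte on the pipe wakes the main loop up.
  writer.write_nonblock('.')
end

# Simulate an incoming signal by sending USR1 to ourselves.
Process.kill('USR1', Process.pid)

# Main loop: wait (at most five seconds) until the pipe becomes readable,
# then do the real work outside of the signal handler.
if IO.select([reader], nil, nil, 5)
  reader.read_nonblock(16)
  handled << :usr1
end

Signal.trap('USR1', 'DEFAULT')
puts handled.inspect
```

A real program would run the IO.select call in a loop and usually watch its other sockets in the same call.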

So:
MRI 2.0.0, I believe, still has the GIL, so Ruby only guarantees that native C methods are atomic.

JRuby runs on the JVM, so my guess is that all of its threading and locking mechanisms are implemented on top of the JVM.

Rubinius 1.2 also still has the GIL, so I believe it behaves the same as MRI. Rubinius 2.x removed the GIL, however. I haven't had much experience with Rubinius, so I'm not entirely sure about it.

To answer the question: if you are working on a multithreaded application in Ruby, the Mutex class guarantees that a block is executed by only a single thread at a time.

Learning Ruby threading - trigger an event when thread finishes

The issue isn't really about "knowing when the thread finished", but rather, how can you update a shared progress bar without race conditions.

To explain the problem: say you had a central ThreadList#progress_var variable, and as the last line of each thread, you incremented it with +=. This would introduce a race condition because two threads can perform the operation at the same time (and could overwrite each other's results).

To get around this, the typical approach is to use a Mutex which is an essential concept to understand if you're learning multithreading.

The actual implementation isn't that difficult:

require 'thread' # only needed on Ruby 1.8; Mutex is built in on 1.9 and later

class ThreadList
  def initialize
    @semaphore = Mutex.new
    @progress_bar = 0
  end

  def increment_progress_bar(amount)
    @semaphore.synchronize do
      @progress_bar += amount
    end
  end
end

Because of that @semaphore.synchronize block, you can now safely call this increment_progress_bar method from threads, without the risk of race condition.
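A possible way to use this from worker threads is sketched below. The ProgressTracker class, its reports list and the sleep that stands in for real work are all hypothetical; the point is only that each thread reports progress as its last step and that the counter and the report are updated under the same lock.

```ruby
# Hypothetical helper for illustration.
class ProgressTracker
  attr_reader :reports

  def initialize(total)
    @total = total
    @done = 0
    @lock = Mutex.new
    @reports = []
  end

  # Safe to call from any thread: the counter and the report line are
  # updated together while the lock is held.
  def increment(amount = 1)
    @lock.synchronize do
      @done += amount
      @reports << "#{@done}/#{@total}"
    end
  end
end

tracker = ProgressTracker.new(10)
workers = Array.new(10) do
  Thread.new do
    sleep rand / 100 # stand-in for the real work
    tracker.increment # last line of the thread: report completion
  end
end
workers.each(&:join)
puts tracker.reports.last
```

Because every update happens inside the lock, the reports always read 1/10, 2/10 and so on up to 10/10, whichever thread happens to finish first.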

Single thread still handles concurrency request?

Quoting the chapter: "Non blocking IOs/Reactor pattern" in
http://merbist.com/2011/02/22/concurrency-in-ruby-explained/:
"this is the approach used by Twisted, EventMachine and Node.js. Ruby developers can use EventMachine or
an EventMachine based webserver like Thin as well as EM clients/drivers to make non blocking async calls."

The heart of the matter is EventMachine.defer:
"used for integrating blocking operations into EventMachine's control flow.
The action of defer is to take the block specified in the first parameter (the "operation")
and schedule it for asynchronous execution on an internal thread pool maintained by EventMachine.

When the operation completes, it will pass the result computed by the block (if any)
back to the EventMachine reactor.
Then, EventMachine calls the block specified in the second parameter to defer (the "callback"),
as part of its normal event handling loop.

The result computed by the operation block is passed as a parameter to the callback.
You may omit the callback parameter if you don't need to execute any code after the operation completes."
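The hand-off described above can be imitated with nothing but the standard library. The sketch below is not EventMachine's implementation: TinyReactor is a hypothetical stand-in, and it starts one thread per operation instead of keeping a pool. It only illustrates the flow: the operation runs on a background thread, its result travels back through a thread-safe queue, and the callback runs on the thread that drives the loop.

```ruby
# Hypothetical stand-in for a reactor; this is NOT EventMachine's code.
class TinyReactor
  def initialize
    @completed = Queue.new # thread-safe hand-off from workers to the loop
    @pending = 0
  end

  # Runs `operation` on a background thread and remembers `callback`.
  # Call this only from the thread that runs the loop.
  def defer(operation, callback = nil)
    @pending += 1
    Thread.new { @completed << [callback, operation.call] }
  end

  # Event loop: waits for finished operations and runs their callbacks here.
  def run
    while @pending > 0
      callback, result = @completed.pop
      @pending -= 1
      callback.call(result) if callback
    end
  end
end

loop_thread = Thread.current
reactor = TinyReactor.new
log = []

operation = -> { sleep 0.1; 21 * 2 } # pretend this is blocking I/O
callback = ->(result) { log << [result, Thread.current == loop_thread] }

reactor.defer(operation, callback)
reactor.run
p log
```

The second element of each log entry records that the callback ran on the loop's own thread, so callbacks never race with each other. An operation that raises would leave this simplified loop waiting forever; a real reactor has to handle that case.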

Essentially, in response to an HTTP request, the server executes the code that you wrote by invoking the process method in the Connection class.
Have a look at the code in $GEM_HOME/gems/thin-1.6.2/lib/thin/connection.rb:

# Connection between the server and client.
# This class is instanciated by EventMachine on each new connection
# that is opened.
class Connection < EventMachine::Connection
  # Called when all data was received and the request
  # is ready to be processed.
  def process
    if threaded?
      @request.threaded = true
      EventMachine.defer(method(:pre_process), method(:post_process))
    else
      @request.threaded = false
      post_process(pre_process)
    end
  end

This is where a threaded connection invokes EventMachine.defer.

The reactor

To see where the EventMachine reactor is activated, follow the program's initialization.
Note that any Sinatra application or middleware ($GEM_HOME/gems/sinatra-1.4.5/base.rb)
can run as a self-hosted server using Thin, Puma, Mongrel, or WEBrick.

def run!(options = {}, &block)
  return if running?
  set options
  handler = detect_rack_handler
  ....

The method detect_rack_handler returns the first Rack::Handler that loads successfully:

  return Rack::Handler.get(server_name.to_s)

In our test we require thin, so it returns a Thin Rack handler and sets up a threaded server:

# Starts the server by running the Rack Handler.
def start_server(handler, server_settings, handler_name)
  handler.run(self, server_settings) do |server|
    ....
    server.threaded = settings.threaded if server.respond_to? :threaded=

$GEM_HOME/gems/thin-1.6.2/lib/thin/server.rb

# Start the server and listen for connections.
def start
  raise ArgumentError, 'app required' unless @app

  log_info "Thin web server (v#{VERSION::STRING} codename #{VERSION::CODENAME})"
  ...
  log_info "Listening on #{@backend}, CTRL+C to stop"

  @backend.start { setup_signals if @setup_signals }
end

$GEM_HOME/gems/thin-1.6.2/lib/thin/backends/base.rb

# Start the backend and connect it.
def start
  @stopping = false
  starter = proc do
    connect
    yield if block_given?
    @running = true
  end

  # Allow for early run up of eventmachine.
  if EventMachine.reactor_running?
    starter.call
  else
    @started_reactor = true
    EventMachine.run(&starter)
  end
end

Pure-Ruby concurrent Hash

Okay, now that you've specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual consistency model in which each thread uses its own view of the hash if the master is in flux. A little trickery is required to make sure that storing all the information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.

It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby; in MRI the built-in Hash is sufficient.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
      # check the master value
      #
      val = @hash[key]

      # someone else is either writing the key or it has never been set. we
      # need to invalidate our own copy in either case
      #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

      # check our own thread local value
      #
      thread_val = thread_hash[key]

      # in this case someone else has written a value that we have never seen so
      # simply return it
      #
      if thread_val.nil?
        return(val.last)
      end

      # in this case there is a master *and* a thread local value, if the master
      # is newer juke our own cached copy
      #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end

if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
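The locking variant above relies on the sync library, which, to my knowledge, is no longer bundled with Ruby as of 2.7. The same idea can be sketched with a plain Mutex. This is not a drop-in replacement for the code above: MutexHash and its update method are hypothetical names, and a single exclusive lock replaces the shared/exclusive modes of Sync_m.

```ruby
# Hypothetical wrapper for illustration; one exclusive lock guards every access.
class MutexHash
  def initialize
    @hash = {}
    @lock = Mutex.new
  end

  def [](key)
    @lock.synchronize { @hash[key] }
  end

  def []=(key, val)
    @lock.synchronize { @hash[key] = val }
  end

  # Atomic read-modify-write, which separate [] and []= calls cannot provide.
  def update(key, default)
    @lock.synchronize { @hash[key] = yield(@hash.fetch(key, default)) }
  end
end

hash = MutexHash.new
threads = Array.new(10) do
  Thread.new { 1_000.times { hash.update(:hits, 0) { |n| n + 1 } } }
end
threads.each(&:join)
puts hash[:hits]
```

Ten threads each add 1,000 increments, so the stored value ends up at 10000. Note that the block passed to update runs while the lock is held, so it must not touch the same MutexHash again.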

Concurrency and Mongoid

Sounds like you want the MongoDB findAndModify command, which allows you to atomically retrieve and update a document.

Unfortunately Mongoid doesn't appear to expose this part of the MongoDB API, so it looks like you'll have to drop down to the driver level for this one bit:

battle = Battle.collection.find_and_modify(query: {oppenent: current_user._id, ...},
                                           update: {'$set' => {resolving: true}})

By default the returned object does not include the modification made, but you can turn this on if you want (pass {:new => true}).

The value returned is a raw hash; if my memory is correct, you can call Battle.instantiate(doc) to get a Battle object back.

Rails concurrence issue - how to use locks

I think the correct way to use a Ruby Mutex is to obtain a lock, run a code block, and then release the lock again, i.e. to call lock in combination with unlock...

@mutex = Mutex.new
..

def start
  @mutex.lock
  begin
    ..
  ensure
    @mutex.unlock rescue nil
  end
end

or to use the synchronize method:

@mutex = Mutex.new
..

def start
  @mutex.synchronize do
    # do something
  end
end

An example can be found in the Rack middleware class Rack::Lock. But I am not sure it helps in your case, even if you use class variables like @@mutex, because a mutex is not shared between different tasks running in distinct processes (I assume that the "tasks" are different processes started by rake tasks). The Mutex class is useful for achieving thread safety, since it implements a simple semaphore that can be used to coordinate access to shared data from multiple concurrent threads. There is also a nice RailsCast about thread safety (unfortunately only behind a paywall).

In your case it may help to create a global lock flag in the database, or to create a global lock file in the filesystem with touch lock.txt and to remove it again with rm lock.txt when the process is over. You can execute these shell commands from Ruby with Kernel.system or %x. If the file exists (File.exist?("lock.txt")), the update can be suspended.
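Checking for the file and creating it are two separate steps, so two processes can both pass the check before either creates the file. One way around that, sketched here as a swapped-in technique rather than the touch/rm approach itself, is to let the operating system arbitrate with File#flock. The helper name with_file_lock and the lock file path are assumptions made for the example.

```ruby
require 'tmpdir'

# Runs the block only if no other holder has the lock file locked.
# Returns true if the block ran, false if the lock was busy.
def with_file_lock(path)
  File.open(path, File::RDWR | File::CREAT, 0o644) do |file|
    # LOCK_NB makes flock return false immediately instead of waiting.
    return false unless file.flock(File::LOCK_EX | File::LOCK_NB)

    begin
      yield
      true
    ensure
      file.flock(File::LOCK_UN)
    end
  end
end

lock_path = File.join(Dir.tmpdir, "update-#{Process.pid}.lock")
nested = nil
ran = with_file_lock(lock_path) do
  # A second attempt while the lock is held is turned away.
  nested = with_file_lock(lock_path) { raise 'should not run' }
end
File.delete(lock_path)
puts ran, nested
```

Unlike a leftover lock.txt, an flock lock is released by the operating system when the process holding it exits, so a crashed task cannot block later runs.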

JS-style async/non-blocking callback execution with Ruby, without heavy machinery like threads?

Okay, after some fiddling with threads and studying contributions by apeiros and asQuirreL, I came up with a solution that suits me.

I'll show sample usage first and the source code at the end.

Example 1: simple non-blocking execution

First, a JS example that I'm trying to mimic:

setTimeout( function() {
  console.log("world");
}, 0);

console.log("hello");

// 'Will print "hello" first, then "world"'.

Here's how I can do it with my tiny Ruby library:

# You wrap all your code into this...
Branch.new do

  # ...and you gain access to the `branch` method that accepts a block.
  # This block runs non-blockingly, just like in JS `setTimeout(callback, 0)`.
  branch { puts "world!" }

  print "Hello, "

end

# Will print "Hello, world!"

Note how you don't have to take care of creating threads or waiting for them to finish. The only scaffolding required is the Branch.new { ... } wrapper.

Example 2: synchronizing threads with a mutex

Now we'll assume that we're working with some input and output shared among threads.

JS code I'm trying to reproduce with Ruby:

var
  results = [],
  rounds = 5;

for (var i = 1; i <= rounds; i++) {

  console.log("Starting thread #" + i + ".");

  // "Creating local scope"
  (function(local_i) {
    setTimeout( function() {

      // "Assuming there's a time-consuming operation here."

      results.push(local_i);
      console.log("Thread #" + local_i + " has finished.");

      if (results.length === rounds)
        console.log("All " + rounds + " threads have completed! Bye!");

    // Later callbacks get shorter delays, so they fire in reverse order
    // (with a delay of 0 they would fire in the order they were scheduled).
    }, (rounds + 1 - local_i) * 100);
  })(i);
}

console.log("All threads started!");

This code produces the following output:

Starting thread #1.
Starting thread #2.
Starting thread #3.
Starting thread #4.
Starting thread #5.
All threads started!
Thread #5 has finished.
Thread #4 has finished.
Thread #3 has finished.
Thread #2 has finished.
Thread #1 has finished.
All 5 threads have completed! Bye!

Notice that the callbacks finish in reverse order.

We're also going to assume that working with the results array may produce a race condition. In JS this is never an issue, but in multithreaded Ruby it has to be addressed with a mutex.

Ruby equivalent of the above:

Branch.new 1 do

  # Setting up an array to be filled with that many values.
  results = []
  rounds = 5

  # Running `branch` N times:
  1.upto(rounds) do |item|

    puts "Starting thread ##{item}."

    # The block passed to `branch` accepts a hash with mutexes
    # that you can use to synchronize threads.
    branch do |mutexes|

      # This imitates that the callback may take time to complete.
      # Threads will finish in reverse order.
      sleep((6.0 - item) / 10)

      # When you need a mutex, you simply request one from the hash.
      # For each unique key, a new mutex will be created lazily.
      mutexes[:array_and_output].synchronize do
        puts "Thread ##{item} has finished!"
        results.push item

        if results.size == rounds
          puts "All #{rounds} threads have completed! Bye!"
        end
      end
    end
  end

  puts "All threads started."
end

puts "All threads finished!"

Note how you don't have to take care of creating threads, waiting for them to finish, creating mutexes and passing them into the block.

Example 3: delaying execution of the block

If you need the delay feature of setTimeout, you can do it like this.

JS:

setTimeout(function(){ console.log('Foo'); }, 2000);

Ruby:

branch(2) { puts 'Foo' }

Example 4: waiting for all threads to finish

With JS, there's no simple way to have the script wait for all threads to finish. You'll need an await/defer library for that.

But in Ruby it's possible, and Branch makes it even simpler. If you write code after the Branch.new{} wrapper, it will be executed after all branches within the wrapper have been completed. You don't need to manually ensure that all threads have finished, Branch does that for you.

Branch.new do
  branch { sleep 10 }
  branch { sleep 5 }

  # This will be printed immediately
  puts "All threads started!"
end

# This will be printed after 10 seconds (the duration of the slowest branch).
puts "All threads finished!"

Sequential Branch.new{} wrappers will be executed sequentially.

Source

# (c) lolmaus (Andrey Mikhaylov), 2014
# MIT license http://choosealicense.com/licenses/mit/

class Branch
  def initialize(mutexes = 0, &block)
    @threads = []
    @mutexes = Hash.new { |hash, key| hash[key] = Mutex.new }

    # Executing the passed block within the context
    # of this class' instance.
    instance_eval &block

    # Waiting for all threads to finish
    @threads.each { |thr| thr.join }
  end

  # This method will be available within a block
  # passed to `Branch.new`.
  def branch(delay = false, &block)

    # Starting a new thread
    @threads << Thread.new do

      # Implementing the timeout functionality
      sleep delay if delay.is_a? Numeric

      # Executing the block passed to `branch`,
      # providing mutexes into the block.
      block.call @mutexes
    end
  end
end

