Pure-Ruby Concurrent Hash

Okay, now that you have specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual-consistency model in which each thread uses its own view of the hash if the master is in flux. A little trickery is required to make sure that storing all this information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.

It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby; in MRI the built-in Hash is sufficient.

module Cash
  # Build a hash suited to the current interpreter: one of the
  # implementations below on JRuby, the built-in Hash everywhere else.
  # Set CASH_IMPL to LocklessImpl or LockingImpl to choose between them.
  #
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args, &block)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    # built in a class method so that the proc does not close over the
    # instance, which would keep it from ever being collected
    #
    def self.finalizer(cache, key)
      proc { cache.delete(key) }
    end

    # this thread's private view of the hash
    #
    def thread_hash
      thread = Thread.current
      cache = (thread[:cash] ||= {})
      key = thread_key
      hash = cache[key]
      return hash if hash

      # first access from this thread: create the view and make sure it is
      # dropped from the thread's cache once this object is collected
      #
      ObjectSpace.define_finalizer(self, LocklessImpl.finalizer(cache, key))
      cache[key] = {}
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
      # check the master value
      #
      val = @hash[key]

      # someone else is either writing the key or it has never been set. we
      # need to invalidate our own copy in either case
      #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return thread_val ? thread_val.last : nil
      end

      # check our own thread local value
      #
      thread_val = thread_hash[key]

      # in this case someone else has written a value that we have never seen so
      # simply return it
      #
      return val.last if thread_val.nil?

      # in this case there is a master *and* a thread local value, if the master
      # is newer nuke our own cached copy
      #
      if val.first > thread_val.first
        thread_hash.delete(key)
        val.last
      else
        thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    # on recent Rubies sync is no longer bundled and has to be installed
    # as a gem
    #
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    # shared lock for reads
    #
    def [](key)
      sync(:SH){ super }
    end

    # exclusive lock for writes
    #
    def []=(key, val)
      sync(:EX){ super }
    end
  end
end

if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.each{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

Ruby: In-memory Hash-based and thread-safe buffer?

In general, the way that you provide access to a global value in a thread-safe manner is to use the built-in Mutex class:

$buffer = {}
$bufflock = Mutex.new

threads = (0..2).map do |i|
  Thread.new do
    puts "Starting Thread #{i}"
    3.times do
      puts "Thread #{i} got: #{$buffer[:foo].inspect}"
      $bufflock.synchronize{ $buffer[:foo] = ($buffer[:foo] || 1) * (i+1) }
      sleep rand
    end
    puts "Ending Thread #{i}"
  end
end
threads.each{ |t| t.join } # Wait for all threads to complete

#=> Starting Thread 0
#=> Thread 0 got: nil
#=> Starting Thread 1
#=> Thread 1 got: 1
#=> Starting Thread 2
#=> Thread 2 got: 2
#=> Thread 1 got: 6
#=> Thread 1 got: 12
#=> Ending Thread 1
#=> Thread 0 got: 24
#=> Thread 2 got: 24
#=> Thread 0 got: 72
#=> Thread 2 got: 72
#=> Ending Thread 0
#=> Ending Thread 2

Code inside a Mutex#synchronize block runs under mutual exclusion: only one thread at a time can hold $bufflock, so a second thread cannot enter the block until the thread currently inside it has finished.
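
To avoid globals, the same idea can be packaged in a small class that owns both the hash and its Mutex. This is only a sketch: the SynchronizedBuffer name and its methods are made up for illustration and are not part of Ruby's standard library.

```ruby
# Hypothetical wrapper (not a stdlib class): a Hash guarded by one Mutex.
class SynchronizedBuffer
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def [](key)
    @lock.synchronize { @data[key] }
  end

  def []=(key, value)
    @lock.synchronize { @data[key] = value }
  end

  # Read-modify-write in a single critical section: yields the current
  # value (or the default) and stores whatever the block returns.
  def update(key, default = nil)
    @lock.synchronize { @data[key] = yield(@data.fetch(key, default)) }
  end
end

buffer = SynchronizedBuffer.new
threads = Array.new(4) do
  Thread.new { 1000.times { buffer.update(:hits, 0) { |n| n + 1 } } }
end
threads.each(&:join)
puts buffer[:hits]  # 4 threads x 1000 increments, none lost
```

Note that update touches @data directly rather than calling [] and []=, because Mutex is not reentrant and locking it twice from the same thread raises an error.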

See also: Pure-Ruby concurrent Hash

Fast Thread-Safe Ruby Hash with strong read bias

I would suggest a wrapper which protects the Hash with a read-write lock. I couldn't find a pre-built Ruby read-write lock implementation (of course JRuby users can use java.util.concurrent.locks.ReentrantReadWriteLock), so I built one. You can see it at:

https://github.com/alexdowad/showcase/blob/master/ruby-threads/read_write_lock.rb

Two other people and I have tested it on MRI 1.9.2, MRI 1.9.3, and JRuby. It seems to be working correctly (though I still want to do more thorough testing). It has a built-in test script; if you have a multi-core machine, please download it, try running it, and let me know the results! As far as performance goes, it trounces Mutex in situations with a read bias. Even in situations with 80-90% writes, it still seems a bit faster than using a Mutex.

I am also planning to do a Ruby port of Java's ConcurrentHashMap.
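
The wrapper idea can be sketched as follows. The ReadWriteLock below is a deliberately simple one built from Mutex and ConditionVariable; it is not the implementation from the link above, and it makes no attempt to match its performance or to prevent writer starvation. The class and method names (ReadWriteLock, ReadBiasedHash, with_read_lock, with_write_lock) are invented for this example.

```ruby
# Simplified read-write lock (illustrative only): many readers OR one writer.
class ReadWriteLock
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @readers = 0
    @writer = false
  end

  def with_read_lock
    @mutex.synchronize do
      @cond.wait(@mutex) while @writer      # readers only wait for a writer
      @readers += 1
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @readers -= 1
        @cond.broadcast if @readers.zero?   # wake any waiting writer
      end
    end
  end

  def with_write_lock
    @mutex.synchronize do
      @cond.wait(@mutex) while @writer || @readers > 0
      @writer = true
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @writer = false
        @cond.broadcast                     # wake waiting readers and writers
      end
    end
  end
end

# Hypothetical Hash wrapper: reads share the lock, writes take it exclusively.
class ReadBiasedHash
  def initialize
    @hash = {}
    @lock = ReadWriteLock.new
  end

  def [](key)
    @lock.with_read_lock { @hash[key] }
  end

  def []=(key, value)
    @lock.with_write_lock { @hash[key] = value }
  end
end

cache = ReadBiasedHash.new
writers = Array.new(2) { |w| Thread.new { 100.times { |i| cache[[w, i]] = i } } }
readers = Array.new(8) { Thread.new { 100.times { |i| cache[[0, i]] } } }
(writers + readers).each(&:join)
puts cache[[1, 99]]
```

Readers never block each other here, which is where the benefit over a plain Mutex comes from when reads dominate.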

Necessity of the locks while working with concurrent hash map

Let's look at the operations inside public void put(String some).

  1. map.put(counter++, tmp);
  2. sum.getAndAdd(tmp);

Now let's look at the individual parts.

  1. counter is a volatile variable. So it only provides memory visibility but not atomicity. Since counter++ is a compound operation, you need a lock to achieve atomicity.

  2. map.put(key, value) is atomic since it is a ConcurrentHashMap.

  3. sum.getAndAdd(tmp) is atomic since it is an AtomicInteger.

As you can see, every operation except counter++ is atomic. However, you are combining all these operations to implement a single piece of functionality. To make that functionality atomic as a whole, you need a lock. This avoids surprising side effects when threads interleave between the individual atomic operations.

So you need a lock because counter++ is not atomic and you want to combine a few atomic operations to achieve some functionality (assuming you want this to be atomic).
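
The same reasoning applies outside Java. As a rough Ruby analogue (the original question is about Java, and the Recorder class and its fields here are hypothetical), a single Mutex makes the whole put step atomic, so the counter, the map, and the sum can never be observed out of step with each other:

```ruby
# Hypothetical Ruby analogue of the Java put(): three updates, one lock.
class Recorder
  attr_reader :map, :sum

  def initialize
    @lock = Mutex.new
    @counter = 0
    @map = {}
    @sum = 0
  end

  def put(value)
    @lock.synchronize do
      @map[@counter] = value  # uses the current counter as the key
      @counter += 1           # compound read-modify-write, safe under the lock
      @sum += value
    end
  end
end

recorder = Recorder.new
threads = Array.new(4) { Thread.new { 500.times { recorder.put(2) } } }
threads.each(&:join)
puts recorder.map.size  # one entry per put, no key used twice
puts recorder.sum
```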

Is `hash` on a number consistent within a single ruby process?

Yes, it is guaranteed to be the same within one Ruby process. Hash relies on this fact to guarantee unique keys. Otherwise long-lived processes could end up with duplicate hash keys when an object gets a new hash value.

Regarding implementation and memory locations, the implementation of hash is not guaranteed by Ruby. The only requirement is:

This function must have the property that a.eql?(b) implies a.hash == b.hash.

In YARV, for example, many objects do not exist in memory at all (e.g. Fixnums) and are simply identified by a special object ID. x.object_id == x * 2 + 1 for any fixnum.
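
A quick way to see the contract in practice is to define eql? and hash together on a small class. The Point class below is purely illustrative, and the object_id check only runs on MRI, since that encoding is an implementation detail.

```ruby
# Illustrative value class: eql? and hash are derived from the same fields,
# so a.eql?(b) implies a.hash == b.hash.
class Point
  attr_reader :x, :y

  def initialize(x, y)
    @x = x
    @y = y
  end

  def eql?(other)
    other.is_a?(Point) && x == other.x && y == other.y
  end
  alias_method :==, :eql?

  def hash
    [x, y].hash
  end
end

a = Point.new(1, 2)
b = Point.new(1, 2)
lookup = { a => "found" }

puts a.hash == b.hash    # equal objects hash equally
puts lookup[b]           # so b finds the entry stored under a
puts 42.hash == 42.hash  # stable for the lifetime of the process

# MRI-only implementation detail for small integers
puts 5.object_id == 5 * 2 + 1 if RUBY_ENGINE == "ruby"
```

The value of hash may differ from one process to the next, so it should not be persisted or compared across runs.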

Document Server: Handling Concurrent Saves

My suggestion would be something like your first one. When the first user (Bob) opens the document, he acquires a lock so that other users can only read the current document. If he saves the document while he is using it, he keeps the lock. Only when he exits the document is it unlocked so that other people can edit it.

If the second user (Kate) opens the document while Bob has the lock on it, Kate will get a message saying the document is uneditable, but she can read it until the lock has been released.

So what happens when Bob acquires the lock, maybe saves the document once or twice but then exits the application leaving the lock hanging?

As you said yourself, requiring the client that holds the lock to send pings at a certain frequency is probably the best option. If you don't get a ping from the client for a set amount of time, you can assume that his client is not responding anymore. If this is a web application, you can use JavaScript for the pings. The document, in its last saved state, then releases its lock and Kate can acquire it.

A ping can contain the name of the document that the client has a lock on, and the server can calculate when the last ping for that document was received.
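
The lock-with-heartbeat scheme can be sketched as a small in-memory registry. Everything here is hypothetical: the DocumentLocks class, its method names, and the 30-second timeout are placeholders, and a real server would also need persistence and authentication. The current time is passed in explicitly so that expiry is easy to follow.

```ruby
# Hypothetical in-memory lock table: one holder per document, kept alive by pings.
class DocumentLocks
  Lock = Struct.new(:user, :last_ping)

  def initialize(timeout: 30)
    @timeout = timeout  # seconds without a ping before a lock counts as stale
    @locks = {}
    @mutex = Mutex.new
  end

  # Returns true if user now holds the lock on doc, false if someone else does.
  def acquire(doc, user, now: Time.now)
    @mutex.synchronize do
      lock = @locks[doc]
      if lock.nil? || lock.user == user || stale?(lock, now)
        @locks[doc] = Lock.new(user, now)
        true
      else
        false
      end
    end
  end

  # Heartbeat from the client; only the current holder can refresh the lock.
  def ping(doc, user, now: Time.now)
    @mutex.synchronize do
      lock = @locks[doc]
      return false unless lock && lock.user == user && !stale?(lock, now)
      lock.last_ping = now
      true
    end
  end

  # Explicit unlock when the holder closes the document.
  def release(doc, user)
    @mutex.synchronize do
      @locks.delete(doc) if @locks[doc] && @locks[doc].user == user
    end
  end

  private

  def stale?(lock, now)
    now - lock.last_ping > @timeout
  end
end

locks = DocumentLocks.new(timeout: 30)
t0 = Time.at(0)
puts locks.acquire("report.doc", "bob", now: t0)        # Bob opens the document
puts locks.acquire("report.doc", "kate", now: t0 + 10)  # Kate is refused: read-only
puts locks.ping("report.doc", "bob", now: t0 + 20)      # Bob's client is still alive
puts locks.acquire("report.doc", "kate", now: t0 + 60)  # 40s without a ping: Kate takes over
```

Expiry is checked lazily, when someone next asks for the lock, so no background timer is needed in this sketch.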


