How to Communicate with Threads in Ruby

How to communicate with threads in Ruby?

To send data to a thread, you can use Ruby's Queue class:

http://www.ruby-doc.org/stdlib-1.9.3/libdoc/thread/rdoc/Queue.html
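As a rough sketch of that idea (the doubling worker and the :done stop marker are illustrative assumptions, not something taken from the linked page), one thread can push values onto a Queue while another pops them:

```ruby
# Queue is built in on current Rubies; on very old ones such as 1.9,
# add `require "thread"` first.
inbox   = Queue.new   # main thread -> worker
results = Queue.new   # worker -> main thread

worker = Thread.new do
  # pop blocks while the queue is empty, so the worker sleeps until data arrives
  while (item = inbox.pop) != :done   # :done is a hypothetical stop marker
    results << item * 2
  end
end

[1, 2, 3].each { |n| inbox << n }
inbox << :done
worker.join

# Drain whatever the worker sent back.
doubled = []
doubled << results.pop until results.empty?
p doubled   # => [2, 4, 6]
```

Because the queue does its own locking, neither thread needs a Mutex here.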

How to call Object Methods while object is in a thread? Ruby (not rails)

When you write this:

Thread.new { ... }

The { ... } is a block that will be executed by the new thread. If you want the thread to do something interesting,
then you have to provide code (the ...) to do the interesting thing.


Typically, the original thread goes on to do something else concurrently with the new thread:

Thread.new { ... }
do_something_else()

Doing things concurrently (maybe even in parallel) is the whole point of multi-threading, after all.


Threads communicate by accessing shared objects, but it doesn't make sense for one thread to look at a shared object until it knows that the other thread has finished updating it. The simplest way to know that is to join() the other thread.

t = Thread.new { ... }
do_something_else()
t.join() # this simply waits until the thread has finished.

Now about those shared objects. It's especially easy in Ruby.

shared_object = Hash.new()
t = Thread.new {
  shared_object["a"] = ...
  shared_object["b"] = ...
  ...
}
do_something_else()
t.join()
# after the join() call returns, it's safe to look in shared_object
# to see what the other thread left for us.
do_something_with(shared_object["a"])
...
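For something you can paste into irb, here is a small runnable version of the same pattern. The values stored under "a" and "b" and the arithmetic standing in for do_something_else() are made up for illustration:

```ruby
shared_object = {}

t = Thread.new do
  # the new thread is the only writer while it runs
  shared_object["a"] = (1..10).sum
  shared_object["b"] = "done"
end

main_result = 2 + 2   # stand-in for do_something_else()
t.join                # wait until the thread has finished

puts shared_object["a"]   # prints 55
puts shared_object["b"]   # prints done

# If all you need is a single result, Thread#value joins the thread
# and returns the value of its block.
answer = Thread.new { 6 * 7 }.value
puts answer               # prints 42
```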

There's a whole other issue that arises if you need the main thread to access shared_object concurrently with the new thread (i.e., before it calls t.join()). Google for "race condition", or "locking", or "mutual exclusion", or "mutex" for more information about why that's tricky, and how to do it safely.
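A minimal sketch of the locking approach, assuming the two keys must always be seen in step with each other (that invariant is invented for the example): both threads touch shared_object only while holding the same Mutex.

```ruby
shared_object = { "a" => 0, "b" => 0 }
lock = Mutex.new

t = Thread.new do
  1_000.times do
    # update both keys as one unit so a reader never sees them out of step
    lock.synchronize do
      shared_object["a"] += 1
      shared_object["b"] += 1
    end
  end
end

# The main thread peeks while the other thread may still be running.
consistent = true
100.times do
  a, b = lock.synchronize { [shared_object["a"], shared_object["b"]] }
  consistent &&= (a == b)
end

t.join
p consistent      # => true
p shared_object   # => {"a"=>1000, "b"=>1000}
```

Without the lock, a read could land between the two increments and see "a" ahead of "b".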

Can I use class variables to communicate with threads?

No, it's not OK to update class variables from multiple threads as in your example.

Take a look at the explanation of why, and how to solve this using the Mutex class:

http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_threads.html
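Since the code from the question isn't shown here, the following is only a sketch with a hypothetical Tally class. It keeps the class variable but routes every read and write through a Mutex:

```ruby
# Tally is a made-up class for illustration.
class Tally
  @@count = 0
  @@lock  = Mutex.new

  def self.increment
    # read-modify-write happens under the lock, so no update is lost
    @@lock.synchronize { @@count += 1 }
  end

  def self.count
    @@lock.synchronize { @@count }
  end
end

threads = 5.times.map { Thread.new { 200.times { Tally.increment } } }
threads.each(&:join)
total = Tally.count
puts total   # prints 1000
```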

Ruby Threads and Websockets

EDITED

After our discussion in the comments, I decided to review the code I posted and write a small DDP encapsulation leveraging Iodine's Websocket Client (which I prefer, since I'm the author).

I have to admit, I really had fun thinking about this question.
Attached is simplified code for a Meteor connector using Iodine. It is truly basic and handles only three things: renewing the connection if it drops, completing the handshake, and answering ping-pongs.

To use this code together with the FFI workflow concept initiated in the first answer, use:

require 'stringio' # StringIO stands in for the FFI data below

# create the connection to the Meteor server
# and set up the callback for incoming messages:
meteor_ddp = IodineDDP.new('ws://chat.n-k.de/websocket') do |message|
  Iodine.debug "got message #{message}, it's a Hash"
end

# next, create a dedicated thread for the FFI,
# it will run until the FFI has finished
# or the application exits.
Thread.new do
  # initialize FFI interface
  data = StringIO.new "initialize FFI interface - StringIO will be our data for now"
  # imagine it takes time
  sleep 1
  # Meteor will respond to these with error messages
  (meteor_ddp << data.read(3)) && sleep(0.2) until data.eof?
  sleep 1
  Iodine.signal_exit
end

# it seems Meteor sends pings and since we already answer them in the
# class object, it should be enough...
# but we can do it too, if we want to:
Iodine.run_every(5) { meteor_ddp << {msg: :ping}.to_json }

As to the Meteor DDP connection class, this could probably be achieved like so:

require 'json' # JSON.parse and #to_json are used below
require 'iodine/client'

class IodineDDP
  attr_reader :session
  attr_reader :server_id

  def initialize url, &block
    @url = url
    @ddp_initialized = false
    @session = nil
    @server_id = nil
    @block = block
    @closed = false
    connect_websocket
  end

  def << message
    Iodine.debug "Writing message #{message}"
    ensure_connection
    @ws << message
  end
  alias :write :<<

  def close
    @closed = true
    @ws.on_close { nil }
    @ws.close
  end

  protected

  def on_message data
    # make sure the DDP handshake is complete
    return handshake data unless @ddp_initialized
    data = JSON.parse(data)
    Iodine.debug "Got message: #{data}"
    return write({msg: 'pong', id: data['id'] }.to_json) if data['msg'] == 'ping'
    return true if data['msg'] == 'pong'
    @block.call data
  end

  def on_close
    @ddp_initialized = false
    connect_websocket
  end

  def ensure_connection
    return true unless @ws.closed? || !@ddp_initialized
    raise 'This DDP instance was shutdown using `close`, it will not be renewed' if @closed
    raise 'DDP disconnected - not enough threads to ensure reconnection' if (@ws.closed? || !@ddp_initialized) && Iodine.threads == 1
    timeout = Iodine.time + 3
    # wait until the handshake completes or the 3 second timeout passes
    sleep 0.2 until @ddp_initialized || Iodine.time >= timeout
    raise 'DDP disconnected - reconnection timed-out.' if @ws.closed? || !@ddp_initialized
  end

  def connect_websocket
    @___on_message_proc ||= method(:on_message)
    @___on_close_proc ||= method(:on_close)
    # @___on_open_proc is never assigned in this class, so nil is passed for on_open
    @ws = ::Iodine::Http::WebsocketClient.connect(@url, on_message: @___on_message_proc, on_open: @___on_open_proc, on_close: @___on_close_proc)
    # inform
    Iodine.debug "initiating a new DDP connection to #{@url}"
    # start the DDP handshake
    handshake
  end

  def handshake last_message = nil
    raise 'Handshake failed because the websocket was closed or missing' if @ws.nil? || @ws.closed?
    unless last_message # this is the first message sent
      Iodine.debug "Meteor DDP handshake initiated."
      msg = {msg: "connect", version: "1", support: ["1"]}
      msg[:session] = @session if @session
      return(@ws << msg.to_json)
    end
    message = JSON.parse(last_message)
    raise "Meteor DDP connection error, requires version #{message['version']}: #{last_message}" if message['msg'] == 'failed'
    if message['msg'] == 'connected'
      # inform
      Iodine.debug "Meteor DDP handshake complete."
      @session = message['session']
      return @ddp_initialized = true
    else
      return @server_id = message['server_id'] if message['server_id']
      Iodine.error "Invalid handshake data - closing connection."
      close
    end
  end
end

# we need at least two threads for the IodineDDP#ensure_connection
Iodine.threads = 3

# # if we are inside a larger application, call:
# Iodine.force_start!

# # if we are on irb:
exit
# no need to write anything if this is the whole of the script

How to run a background thread in ruby?

Read the documentation for Thread.new (which is the same as Thread.start here).

Thread.start(commands) runs the commands method first, on the calling thread, and only then passes its return value to a thread (which then does nothing). It blocks because no thread has been started yet when gets is called. You want:

Thread.start { commands }

Here's a similar demo script that works just like you would expect:

def commands
  while gets.strip !~ /^exit$/i
    puts "Invalid command"
  end
  abort "Exiting the program"
end

Thread.start { commands }

loop do
  puts "Type exit:"
  sleep 2
end
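The difference between passing a method call as an argument and calling it inside the block can also be checked without reading from the keyboard. In this sketch, which_thread is a made-up helper that simply reports the thread it ran on:

```ruby
# which_thread is a hypothetical helper for this demonstration.
def which_thread
  Thread.current
end

main = Thread.current

# The argument is evaluated first, on the calling thread; the new thread
# only receives the already-computed value as a block parameter.
t1 = Thread.start(which_thread) { |value| value }
ran_on_main = t1.value.equal?(main)

# Inside a block, the method call itself happens on the new thread.
t2 = Thread.start { which_thread }
ran_on_new = t2.value.equal?(t2)

p ran_on_main   # => true
p ran_on_new    # => true
```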

Safely wake thread in Ruby

The easiest way to communicate with a thread is to use a Queue object. Queue is a thread-safe FIFO queue.

require "thread"

@queue = Queue.new

When the thread wants to block until signaled, it reads from the queue. The thread will stop while the queue is empty:

@queue.deq

To awaken the thread, write something into the queue:

@queue.enq :wakeup

Here, we just threw a symbol into the queue. But you can also write things into the queue that you want the thread to process. For example, if a thread is processing URLs, it can retrieve them from the queue:

loop do
  url = @queue.deq
  # process the url
end

And some other thread can add URLs to the queue:

@queue.enq "http://stackoverflow.com"
@queue.enq "http://meta.stackoverflow.com"
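Putting the pieces above together, here is one possible end-to-end sketch. Extracting the host name is just a stand-in for "process the url", and Queue#close (available since Ruby 2.3) is my own choice for telling the worker there is nothing more to come:

```ruby
require "uri"

queue = Queue.new
hosts = []

worker = Thread.new do
  # deq blocks while the queue is empty and returns nil once the queue
  # has been closed and drained, which ends the loop
  while (url = queue.deq)
    hosts << URI(url).host   # stand-in for real processing
  end
end

# At this point the worker is parked on the empty queue.
sleep 0.01 until worker.status == "sleep"

queue.enq "http://stackoverflow.com"        # wakes the worker
queue.enq "http://meta.stackoverflow.com"
queue.close                                 # no more items will arrive
worker.join

p hosts   # => ["stackoverflow.com", "meta.stackoverflow.com"]
```

The hosts array is read by the main thread only after join returns, so it needs no extra locking.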

