Ruby Threads and Variable

Ruby threads and variable

The block that is passed to Thread.new may actually begin at some point in the future, and by that time the value of i may have changed. In your case, they all have incremented up to 10 prior to when all the threads actually run.

To fix this, use the form of Thread.new that accepts a parameter, in addition to the block:

require 'thread'

def run(i)
puts i
end

while true
for i in 0..10
Thread.new(i) { |j| run(j) }
end
sleep(100)
end

This sets the block variable j to the value of i at the time new was called.

Ruby: How do I share a global variable amongst threads that are running an object.method

The problem is in the function start_threads. You called io.change(number) in that function, but the local variable io is not defined in that function. The consequence is that both threads died due to NameError.

You can change the start_threads function as this:

def start_threads(numbers, io)
numbers.each do |number|
$threads[number] = Thread.new {io.change(number)}
end
end

and call it like this:

start_threads(numbers, io)

ruby local thread variable access from other methods

This seems like you'll trip yourself up a lot. It might be better to initialize a new object for each thread.

class Tour
def self.destinations
threads = []

[:new_york, :london, :sydney].each do |city|
threads << Thread.new { Destination.new(city).go }
end

threads.each(&:join)
end
end

class Destination
attr_reader :location

def initialize(location)
@location = location
end

def go
puts "I am going to visit #{location}."
end
end

# Tour.destinations

Suggested reading: https://blog.engineyard.com/2011/a-modern-guide-to-threads

How can I change a var in a thread?

Thread safe local variables are best off handled slightly differently quick useful document

thr = Thread.new {
loop {
print Thread.current[:var]
sleep 5
}
}

thr[:var] = "meep\n"

Using Thread.current[:var] inside the thread, you seem to be able to set the variable hash and print it out as needed.

Ruby threads calling the same function with different arguments

The problem is the for-loop. In Ruby, it reuses a single variable.
So all blocks of the thread bodies access the same variable. An this variable is 6 at the end of the loop. The thread itself may start only after the loop has ended.

You can solve this by using the each-loops. They are more cleanly implemented, each loop variable exists on its own.

(1..num_threads).each do | thread_no |
puts "Creating thread no. "+thread_no.to_s
threads << Thread.new{test(thread_no)}
end

Unfortunately, for loops in ruby are a source of surprises. So it is best to always use each loops.

Addition:
You an also give Thread.new one or several parameters, and these parameters get passed into the thread body block. This way you can make sure that the block uses no vars outside it's own scope, so it also works with for-loops.

threads <<  Thread.new(thread_no){|n| test(n) }

Accessing a variable within a rails thread

UPDATED EDIT AT END: Shows working code. Main module unmodified except for debugging code. Note: I did experience the issue I already noted regarding the need to unsubscribe prior to termination.

The code looks correct. I'd like to see how you are instantiating it.

In config/application.rb, you probably have at least something like:

require 'ws_communication'
config.middleware.use WsCommunication

Then, in your JavaScript client, you should have something like this:

var ws = new WebSocket(uri);

Do you instantiate another instance of WsCommunication? That would set @clients to an empty array and could exhibit your symptoms. Something like this would be incorrect:

var ws = new WsCommunication;

It would help us if you would show the client and, perhaps, config/application.rb if this post does not help.

By the way, I agree with the comment that @clients should be protected by a mutex on any update, if not reads as well. It's a dynamic structure that could change at any time in an event-driven system. redis-mutex is a good option. (Hope that link is correct as Github seems to be throwing 500 errors on everything at the moment.)

You might also note that $redis.publish returns an integer value of the number of clients that received the message.

Finally, you might find that you need to ensure that your channel is unsubscribed before termination. I've had situations where I've ended up sending each message multiple, even many, times because of earlier subscriptions to the same channel that weren't cleaned up. Since you are subscribing to the channel within a thread, you will need to unsubscribe within that same thread or the process will just "hang" waiting for the right thread to magically appear. I handle that situation by setting an "unsubscribe" flag and then sending a message. Then, within the on.message block, I test for the unsubscribe flag and issue the unsubscribe there.

The module you provided, with only minor debugging modifications:

require 'faye/websocket'
require 'redis'

class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'

def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end

def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end

ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end

ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end

ws.rack_response
else
@app.call(env)
end
end
end

The test subscriber code I provided:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

ws = WebSocket::Client::Simple.connect url

ws.on :message do |msg|
puts msg
end

ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end

ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end

ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end

end

The test publisher code I provided. Publisher and Subscriber could easily be combined, as these are just tests:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end

@ws = WebSocket::Client::Simple.connect url

@ws.on :message do |msg|
puts msg
end

@ws.on :open do
puts "-- Publisher open"
end

@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end

@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end

def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end

A sample config.ru which runs all this at the rack middleware layer:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

This is Main. I stripped it down out of my running version so it might need tweaked if you use it:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

class Main < Sinatra::Base

env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end

get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end

Thread Safety: Class Variables in Ruby

Instance variables are not thread safe (and class variables are even less thread safe)

Example 2 and 3, both with instance variables, are equivalent, and they are NOT thread safe, like @VincentXie stated. However, here is a better example to demonstrate why they are not:

class Foo
def self.bar(message)
@bar ||= message
end
end

t1 = Thread.new do
puts "bar is #{Foo.bar('thread1')}"
end

t2 = Thread.new do
puts "bar is #{Foo.bar('thread2')}"
end

sleep 2

t1.join
t2.join

=> bar is thread1
=> bar is thread1

Because the instance variable is shared amongst all of the threads, like @VincentXie stated in his comment.

PS: Instance variables are sometimes referred to as "class instance variables", depending on the context in which they are used:

When self is a class, they are instance variables of classes(class
instance variables). When self is a object, they are instance
variables of objects(instance variables). - WindorC's answer to a question about this



Related Topics



Leave a reply



Submit