Ruby threads and variable
The block passed to Thread.new may not start running until some point in the future, and by then the value of i may have changed. In your case, i had already been incremented to 10 before any of the threads actually ran.
To fix this, use the form of Thread.new that accepts a parameter in addition to the block:
require 'thread'
def run(i)
  puts i
end
while true
  for i in 0..10
    Thread.new(i) { |j| run(j) }
  end
  sleep(100)
end
This sets the block variable j to the value of i at the time new was called.
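To see the effect without relying on timing, here is a minimal sketch. The helper name values_seen_by_threads is made up for this illustration; it starts one thread per value and collects what each thread received through its block parameter.

```ruby
# Hypothetical helper (not from the question): returns the value each thread saw.
def values_seen_by_threads(range)
  threads = []
  for i in range
    # i is handed over as an argument, so the block gets its own copy in j
    # even though the for loop keeps reusing the same variable i.
    threads << Thread.new(i) { |j| j }
  end
  # Thread#value waits for the thread and returns the block's result.
  threads.map(&:value)
end

p values_seen_by_threads(0..10)
# prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Each thread reports the value that i had when Thread.new was called, regardless of when the thread actually got scheduled.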
Ruby: How do I share a global variable amongst threads that are running an object.method
The problem is in the function start_threads. It calls io.change(number), but the local variable io is not defined inside that function. As a result, both threads died with a NameError.
You can change the start_threads function like this:
def start_threads(numbers, io)
  numbers.each do |number|
    $threads[number] = Thread.new { io.change(number) }
  end
end
and call it like this:
start_threads(numbers, io)
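A failure like this is easy to miss because an exception inside a thread does not stop the main program; it only surfaces when you join the thread. The sketch below shows that behaviour. The method name error_raised_in_thread and the undefined name missing_io are invented for the illustration.

```ruby
# Hypothetical helper: returns the class of the error that killed the thread.
def error_raised_in_thread
  worker = Thread.new do
    # Silence the automatic stderr report so only the join below reports it.
    Thread.current.report_on_exception = false
    missing_io.change(1) # missing_io is deliberately undefined here
  end
  worker.join # join re-raises the exception that terminated the thread
  nil
rescue NameError => e
  e.class
end

p error_raised_in_thread
# prints NameError
```

Joining the threads you start (or checking them some other way) is what turns a thread that died quietly into an error you can actually see.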
ruby local thread variable access from other methods
This approach seems likely to trip you up. It might be better to initialize a new object for each thread.
class Tour
  def self.destinations
    threads = []
    [:new_york, :london, :sydney].each do |city|
      threads << Thread.new { Destination.new(city).go }
    end
    threads.each(&:join)
  end
end
class Destination
  attr_reader :location
  def initialize(location)
    @location = location
  end
  def go
    puts "I am going to visit #{location}."
  end
end
# Tour.destinations
Suggested reading: https://blog.engineyard.com/2011/a-modern-guide-to-threads
How can I change a var in a thread?
Thread-safe local variables are best handled slightly differently, through the thread's own storage:
thr = Thread.new {
  loop {
    print Thread.current[:var]
    sleep 5
  }
}
thr[:var] = "meep\n"
Because the thread reads Thread.current[:var], you can set that value from outside with thr[:var] = ... and the thread prints it out as needed.
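Here is a small self-contained sketch of the same idea that finishes instead of looping forever. The method name read_thread_local and the Queue used as a start signal are additions for this illustration, not part of the original answer.

```ruby
# Hypothetical helper: sets a value on a thread from outside and reads it inside.
def read_thread_local
  ready = Queue.new
  worker = Thread.new do
    ready.pop            # wait until the main thread has stored the value
    Thread.current[:var] # read from this thread's own storage
  end
  worker[:var] = "meep"  # set the value from outside the thread
  ready << :go
  worker.value           # waits for the thread and returns what it read
end

p read_thread_local
# prints "meep"
p Thread.current[:var]
# prints nil, because the main thread has its own separate storage
```

Note that Thread#[] is documented as fiber-local storage; if the code inside the thread uses fibers, Thread#thread_variable_get and Thread#thread_variable_set are the variants that are shared across all fibers of one thread.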
Ruby threads calling the same function with different arguments
The problem is the for loop. In Ruby, it reuses a single variable, so the blocks of all the thread bodies access the same variable. And this variable is 6 at the end of the loop. The thread itself may start only after the loop has ended.
You can solve this by using each loops. They are more cleanly implemented: each iteration gets its own loop variable.
(1..num_threads).each do |thread_no|
  puts "Creating thread no. " + thread_no.to_s
  threads << Thread.new { test(thread_no) }
end
Unfortunately, for loops in Ruby are a source of surprises, so it is best to always use each loops.
Addition:
You can also give Thread.new one or several parameters, and these parameters get passed into the thread body block. This way you can make sure that the block uses no variables outside its own scope, so it also works with for loops.
threads << Thread.new(thread_no){|n| test(n) }
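The difference between the two loop forms can be shown without any threads at all, which removes timing from the picture. This is a sketch with made-up method names; lambdas stand in for the thread bodies.

```ruby
# With for, the loop variable lives in the enclosing scope and is shared.
def captured_with_for
  procs = []
  for n in 1..3
    procs << -> { n }
  end
  procs.map(&:call) # every lambda sees the final value of n
end

# With each, n is a block parameter, so every iteration gets a fresh variable.
def captured_with_each
  procs = []
  (1..3).each { |n| procs << -> { n } }
  procs.map(&:call)
end

p captured_with_for
# prints [3, 3, 3]
p captured_with_each
# prints [1, 2, 3]
```

A thread body is a closure just like these lambdas, which is why threads started inside a for loop can all end up seeing the last value.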
Accessing a variable within a rails thread
UPDATED EDIT AT END: Shows working code. Main module unmodified except for debugging code. Note: I did experience the issue I already noted regarding the need to unsubscribe prior to termination.
The code looks correct. I'd like to see how you are instantiating it.
In config/application.rb, you probably have at least something like:
require 'ws_communication'
config.middleware.use WsCommunication
Then, in your JavaScript client, you should have something like this:
var ws = new WebSocket(uri);
Do you instantiate another instance of WsCommunication? That would set @clients to an empty array and could exhibit your symptoms. Something like this would be incorrect:
var ws = new WsCommunication;
It would help us if you would show the client and, perhaps, config/application.rb if this post does not help.
By the way, I agree with the comment that @clients should be protected by a mutex on any update, if not reads as well. It's a dynamic structure that could change at any time in an event-driven system. redis-mutex is a good option. (Hope that link is correct as Github seems to be throwing 500 errors on everything at the moment.)
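As a rough sketch of what protecting @clients could look like inside a single process, here is a small wrapper built on Ruby's built-in Mutex. This is deliberately not redis-mutex (which locks across processes); the class name ClientList, its methods, and the demo helper are all invented for this illustration.

```ruby
# Hypothetical wrapper around the @clients array, guarded by an in-process Mutex.
class ClientList
  def initialize
    @clients = []
    @lock = Mutex.new
  end

  def add(client)
    @lock.synchronize { @clients << client }
  end

  def remove(client)
    @lock.synchronize { @clients.delete(client) }
  end

  # Iterate over a snapshot so the callbacks run without holding the lock.
  def each(&block)
    snapshot = @lock.synchronize { @clients.dup }
    snapshot.each(&block)
  end

  def count
    @lock.synchronize { @clients.size }
  end
end

# Hypothetical demo: ten threads each register one hundred fake clients.
def demo_concurrent_adds
  list = ClientList.new
  threads = 10.times.map do |t|
    Thread.new { 100.times { |i| list.add("client-#{t}-#{i}") } }
  end
  threads.each(&:join)
  list.count
end

p demo_concurrent_adds
# prints 1000
```

In the middleware, the open and close handlers would call add and remove, and the subscriber thread would use each to send the message, so the list is never iterated while another thread is changing it.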
You might also note that $redis.publish returns an integer value of the number of clients that received the message.
Finally, you might find that you need to ensure that your channel is unsubscribed before termination. I've had situations where I've ended up sending each message multiple, even many, times because of earlier subscriptions to the same channel that weren't cleaned up. Since you are subscribing to the channel within a thread, you will need to unsubscribe within that same thread or the process will just "hang" waiting for the right thread to magically appear. I handle that situation by setting an "unsubscribe" flag and then sending a message. Then, within the on.message block, I test for the unsubscribe flag and issue the unsubscribe there.
The module you provided, with only minor debugging modifications:
require 'faye/websocket'
require 'redis'
class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'
  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end
  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end
      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end
      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end
      ws.rack_response
    else
      @app.call(env)
    end
  end
end
The test subscriber code I provided:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
  ws = WebSocket::Client::Simple.connect url
  ws.on :message do |msg|
    puts msg
  end
  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end
  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end
  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end
end
The test publisher code I provided. Publisher and Subscriber could easily be combined, as these are just tests:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end
  @ws = WebSocket::Client::Simple.connect url
  @ws.on :message do |msg|
    puts msg
  end
  @ws.on :open do
    puts "-- Publisher open"
  end
  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end
  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end
  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end
A sample config.ru which runs all this at the rack middleware layer:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
This is Main. I stripped it down from my running version, so it might need tweaking if you use it:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
  env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
  get "/" do
    erb :"index.html"
  end
  get "/assets/js/application.js" do
    content_type :js
    @scheme = env == "production" ? "wss://" : "ws://"
    erb :"application.js"
  end
end
Thread Safety: Class Variables in Ruby
Instance variables are not thread safe (and class variables are even less thread safe)
Example 2 and 3, both with instance variables, are equivalent, and they are NOT thread safe, like @VincentXie stated. However, here is a better example to demonstrate why they are not:
class Foo
  def self.bar(message)
    @bar ||= message
  end
end
t1 = Thread.new do
  puts "bar is #{Foo.bar('thread1')}"
end
t2 = Thread.new do
  puts "bar is #{Foo.bar('thread2')}"
end
sleep 2
t1.join
t2.join
=> bar is thread1
=> bar is thread1
Both threads print thread1 because the instance variable is shared amongst all of the threads, like @VincentXie stated in his comment.
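If the goal is only to make the check-and-set in ||= atomic, one option is to guard it with a Mutex. This is a sketch under that assumption; SafeFoo and the helper first_writer_wins are names made up for the illustration. The value is still shared between threads, it is just initialized exactly once.

```ruby
# Hypothetical variant of Foo whose lazy initialization is guarded by a Mutex.
class SafeFoo
  @lock = Mutex.new # class-level instance variable, created when the class loads

  def self.bar(message)
    # Only one thread at a time can run the check-and-assign.
    @lock.synchronize { @bar ||= message }
  end
end

# Hypothetical helper: returns how many distinct values twenty threads observed.
def first_writer_wins
  threads = 20.times.map { |n| Thread.new { SafeFoo.bar("thread#{n}") } }
  threads.map(&:value).uniq.size
end

p first_writer_wins
# prints 1
```

Whichever thread gets the lock first sets the value, and every other thread sees that same value. If each thread needs its own value instead, thread-local storage is the better fit.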
PS: Instance variables are sometimes referred to as "class instance variables", depending on the context in which they are used:
When self is a class, they are instance variables of classes (class instance variables). When self is an object, they are instance variables of objects (instance variables). - WindorC's answer to a question about this