How do I manage ruby threads so they finish all their work?
If you modify spawn_thread_for to save a reference to your created Thread, then you can call Thread#join on the thread to wait for completion:
x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...
produces:
abxyzc
(Stolen from the ri Thread.new documentation. See the ri Thread.join documentation for some more details.)
So, if you amend spawn_thread_for to save the Thread references, you can join on them all:
(Untested, but ought to give the flavor)
# main thread
work_units = Queue.new # and fill the queue...
threads = []
10.times do
threads << Thread.new do
loop do
w = work_units.pop
Thread.exit if w.nil?
do_some_work(w)
end
end
end
# main thread continues while work threads devour work
threads.size.times { work_units << nil } # one nil sentinel per thread so each worker exits
threads.each(&:join)
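A runnable version of that sketch, with a trivial stand-in for do_some_work and a small fixed batch of work units (both assumptions, just to give the flavor end to end):

```ruby
# Hypothetical stand-in for whatever real work each unit needs.
def do_some_work(w)
  sleep 0.01
end

work_units = Queue.new
20.times { |i| work_units << i } # fill the queue

threads = 10.times.map do
  Thread.new do
    loop do
      w = work_units.pop
      Thread.exit if w.nil? # nil is the "no more work" sentinel
      do_some_work(w)
    end
  end
end

# One nil sentinel per worker so every thread eventually exits...
threads.size.times { work_units << nil }

# ...and now join can actually return.
threads.each(&:join)
puts "all work done"
```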
How to wait for all threads to complete before executing next line
Use Thread#join, which waits for the thread to terminate. To do that you need to save the threads, so use map instead of each:
threads = all_hosts.map do |hostname|
Thread.new {
# commands
}
end
threads.each(&:join)
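If the per-host commands produce a result you care about, Thread#value is a convenient variant: it joins the thread and returns the value of its block. A sketch with a made-up host list and a placeholder in place of the real commands:

```ruby
all_hosts = %w[host1 host2 host3] # hypothetical host list

threads = all_hosts.map do |hostname|
  Thread.new do
    # stand-in for the real per-host commands
    "checked #{hostname}"
  end
end

# Thread#value joins the thread *and* returns its block's result
results = threads.map(&:value)
puts results
```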
How to wait only for the first thread in an array to finish in Ruby?
For a thread-safe way to wait for something to happen in one of several threads, you can use a Queue.
queue = Queue.new
threads = []
threads << Thread.new { sleep 2; queue.push(nil) }
threads << Thread.new { sleep 50; queue.push(nil) }
threads << Thread.new { sleep 20; queue.push(nil) }
# wait for one
queue.pop
# clean up the rest
threads.each(&:kill).each(&:join)
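If you also need to know which thread finished first, push an identifying value instead of nil; the first value popped tells you the winner. A small sketch (the symbols and sleep times are arbitrary):

```ruby
queue = Queue.new
threads = []
threads << Thread.new { sleep 0.2;  queue.push(:slow) }
threads << Thread.new { sleep 0.01; queue.push(:fast) }

winner = queue.pop # blocks until the first thread reports in
threads.each(&:kill).each(&:join) # clean up the rest
puts winner
```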
How to execute a fixed number of threads every n milliseconds?
Thread#join blocks until the thread finishes, so don't call it here. Use sleep to pause the current thread for a certain number of seconds.
5.times do
  10.times { Thread.new { do_something } } # do_something stands in for the real task
  sleep n / 1000.0 # pause n milliseconds between batches
end
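A runnable version of that loop, assuming n = 100 milliseconds and a short sleep as the placeholder task, and keeping references so the threads can be joined at the end:

```ruby
n = 100 # milliseconds between batches (assumed value)
all_threads = []

3.times do
  # launch a batch of 10 threads; each does a placeholder bit of work
  10.times { all_threads << Thread.new { sleep 0.01 } }
  # pause the main thread between batches; the workers keep running
  sleep n / 1000.0
end

all_threads.each(&:join)
puts "#{all_threads.size} threads finished"
```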
How to asynchronously collect results from new threads created in real time in ruby
You can have each thread push its results onto a Queue, then your main thread can read from the Queue. Reading from a Queue is a blocking operation by default, so if there are no results, your code will block and wait on the read.
http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
Here is an example:
require 'thread'
jobs = Queue.new
results = Queue.new
thread_pool = []
pool_size = 5
(1..pool_size).each do |i|
thread_pool << Thread.new do
loop do
job = jobs.shift #blocks waiting for a task
break if job == "!NO-MORE-JOBS!"
#Otherwise, do job...
puts "#{i}...."
sleep rand(1..5) #Simulate the time it takes to do a job
results << "thread#{i} finished #{job}" #Push some result from the job onto the Queue
#Go back and get another task from the Queue
end
end
end
#All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
'job1',
'job2',
'job3',
'job4',
'job5',
'job6',
'job7',
]
db_stuff.each do |job|
jobs << job
end
#Threads are now attacking the Queue like hungry dogs.
pool_size.times do
jobs << "!NO-MORE-JOBS!"
end
result_count = 0
loop do
result = results.shift
puts "result: #{result}"
result_count +=1
break if result_count == 7
end
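On Ruby 2.3+ you can skip the "!NO-MORE-JOBS!" sentinel entirely: Queue#close makes pop return nil once the queue is drained, so each worker can simply loop until it sees nil. A condensed sketch of the same pool:

```ruby
jobs    = Queue.new
results = Queue.new

pool = 5.times.map do |i|
  Thread.new do
    # pop returns nil once the queue is closed *and* empty
    while (job = jobs.pop)
      results << "thread#{i} finished #{job}"
    end
  end
end

7.times { |j| jobs << "job#{j + 1}" }
jobs.close # no sentinels needed

pool.each(&:join)
puts results.size
```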
Ruby synchronisation: How to make threads work one after another in proper order?
Using Queue as a PV Semaphore
You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:
require 'thread'
...
sem = Queue.new
When a thread needs to wait, it calls Queue#deq:
# waiting thread
sem.deq
When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:
# another thread that wants to unblock the waiting thread
sem.enq :go
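Put together, the two halves look like this (the :released return value and the short sleep are just for illustration):

```ruby
sem = Queue.new

waiter = Thread.new do
  sem.deq # blocks until some other thread signals
  :released
end

sleep 0.05 # give the waiter time to block first
sem.enq :go # signal: unblocks the waiting thread

puts waiter.value
```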
A Worker class
Here's a worker class that uses Queue to synchronize its start and stop:
class Worker
def initialize(worker_number)
@start = Queue.new
Thread.new do
@start.deq
puts "Thread #{worker_number}"
@when_done.call
end
end
def start
@start.enq :start
end
def when_done(&block)
@when_done = block
end
end
When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.
When done, the thread executes the block that was passed to #when_done. We'll see how this is used in just a moment.
Creating workers
First, let's make sure that if any threads raise an exception, we get to find out about it:
Thread.abort_on_exception = true
We'll need six workers:
workers = (1..6).map { |i| Worker.new(i) }
Telling each worker what to do when it's done
Here's where #when_done comes into play:
workers.each_cons(2) do |w1, w2|
w1.when_done { w2.start }
end
This takes each pair of workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify this thread:
all_done = Queue.new
workers.last.when_done { all_done.enq :done }
Let's Go!
Now all that remains is to start the first thread:
workers.first.start
and wait for the last thread to finish:
all_done.deq
The output:
Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6
How do I wait for both threads to finish and files to be ready for reading without polling?
Don't reinvent the wheel, mate :) Check out https://github.com/eventmachine/eventmachine (an IO lib based on the reactor pattern, like node.js etc.) or (perhaps preferably) https://github.com/celluloid/celluloid-io (an IO lib based on the actor pattern, with better docs and active maintainers).
OPTION 1 - use EM or Celluloid to handle non-blocking sockets
EM and Celluloid are quite different: EM follows the reactor pattern ("the same thing" as node.js, with a thread pool as a workaround for blocking calls), while Celluloid follows the actor pattern (an actor thread pool).
Both can do non-blocking IO to/from a lot of sockets and delegate work to many threads, depending on how you go about it. Both libs are very robust, efficient and battle-tested; EM has more history but seems to have fallen slightly out of maintenance (https://www.youtube.com/watch?v=mPDs-xQhPb0), while Celluloid has a nicer API and a more active community (http://www.youtube.com/watch?v=KilbFPvLBaI).
Best advice I can give is to play with code samples that both projects provide and see what feels best. I'd go with celluloid for a new project, but that's a personal opinion - you may find that EM has more IO-related features (such as handling files, keyboard, unix sockets, ...)
OPTION 2 - use background job queues
I may have been misled by the low-level nature of your question :) Have you considered using one of the job queues available for Ruby? There's a TON of decent and different options available; see https://www.ruby-toolbox.com/categories/Background_Jobs
OPTION 3 - DIY (not recommended)
There is a pure ruby implementation of EM, it uses IO selectables to handle sockets so it offers a pattern for what you're trying to do, check it out: https://github.com/eventmachine/eventmachine/blob/master/lib/em/pure_ruby.rb#L311 (see selectables handling).
However, given the amount of other options, hopefully you shouldn't need to resort to such low level coding.
ruby threading output
I thought when you run a ruby script, it waits until all threads/processes are done before terminating and returning?
Nope! From the documentation for Thread:
If we don't call thr.join before the main thread terminates, then all other threads including thr will be killed.
So you’ll need to join all of them:
threads = 10.times.map do
Thread.new do
puts 'Hello, Ruby!'
end
end
threads.each(&:join)
Using Ruby threads, but not seeing a speed up
There are many different implementations of Ruby. The most widely used is MRI.
MRI has threads, but unfortunately it uses only one CPU core at a time. That means only one thread actually runs at any given moment.
If your threads had to wait for IO, there could be a speed-up, because while one thread waits, another can catch up. But your problem needs the CPU all the time.
I would suggest investigating another Ruby implementation, like JRuby, for this kind of problem. JRuby has real parallel threads.
Perhaps you will get a greater speed-up if you change your implementation. At the moment you recalculate every max_length over and over again. For example: the sequence length for n = 4 is 3. If you calculate the length for n = 8, you do one step (n / 2) and then have a current of 4, and you already know that n = 4 has length = 3. Therefore length(8) = 1 + length(4) = 1 + 3 = 4. Example:
class CollatzSequence
def initialize
@lengths = Hash.new { |h, n| cache_length(h, n) }
end
def length(n)
@lengths[n]
end
private
def cache_length(h, n)
if n <= 1
h[n] = 1
else
next_in_sequence = n.even? ? (n / 2) : (n * 3 + 1)
h[n] = 1 + h[next_in_sequence]
end
end
end
require 'benchmark'
sequencer = CollatzSequence.new
Benchmark.bm(10) do |bm|
bm.report('not cached') { sequencer.length(837799) }
bm.report('cache hit 1') { sequencer.length(837799) }
bm.report('cache hit 2') { sequencer.length(837799 * 2) }
end
# user system total real
# not cached 0.000000 0.000000 0.000000 ( 0.001489)
# cache hit 1 0.000000 0.000000 0.000000 ( 0.000007)
# cache hit 2 0.000000 0.000000 0.000000 ( 0.000011)
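One caveat with the Hash-default trick: every uncached step adds a stack frame, so an extremely long chain could in principle raise SystemStackError. An iterative variant of the same memoization avoids that (a sketch; IterativeCollatz is a made-up name):

```ruby
class IterativeCollatz
  def initialize
    @lengths = { 1 => 1 } # sequence of 1 is just [1]
  end

  def length(n)
    chain = []
    # walk the sequence down until we hit a cached value
    until @lengths.key?(n)
      chain << n
      n = n.even? ? n / 2 : 3 * n + 1
    end
    len = @lengths[n]
    # fill the cache back up the chain, one step at a time
    chain.reverse_each do |m|
      len += 1
      @lengths[m] = len
    end
    len
  end
end

seq = IterativeCollatz.new
puts seq.length(27) # => 112
```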