How to Manage Ruby Threads So They Finish All Their Work

How do I manage ruby threads so they finish all their work?

If you modify spawn_thread_for to save a reference to your created Thread, then you can call Thread#join on the thread to wait for completion:

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

produces:

abxyzc

(Stolen from the ri Thread.new documentation. See the ri Thread.join documentation for some more details.)

So, if you amend spawn_thread_for to save the Thread references, you can join on them all:

(Untested, but ought to give the flavor)

# main thread
work_units = Queue.new # and fill the queue...

threads = []
10.downto(1) do
  threads << Thread.new do
    loop do
      w = work_units.pop
      Thread::exit() if w.nil?
      do_some_work(w)
    end
  end
end

# main thread continues while work threads devour work

threads.each(&:join)
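
For the worker loop above to terminate, each worker has to pop a nil at some point, because Queue#pop blocks on an empty queue rather than returning nil. Here is a minimal runnable sketch of that idea. It assumes a hypothetical do_some_work that just squares its input, and the worker count and the results queue are illustrative additions:

```ruby
# Sketch only: do_some_work is a hypothetical stand-in for the real job.
def do_some_work(w)
  w * w
end

work_units = Queue.new
results = Queue.new           # Queue is thread-safe, so workers can share it
worker_count = 4              # illustrative pool size

(1..20).each { |n| work_units << n }
worker_count.times { work_units << nil }   # one stop marker per worker

threads = Array.new(worker_count) do
  Thread.new do
    # pop blocks while the queue is empty; a nil ends the loop
    while (w = work_units.pop)
      results << do_some_work(w)
    end
  end
end

threads.each(&:join)          # returns once every worker has seen its nil

squares = []
squares << results.pop until results.empty?
puts squares.sort.inspect
```

Pushing the stop markers after the real work means they are only reached once the queue has been drained.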

How to wait for all threads to complete before executing next line

Use Thread#join, which waits for the thread to terminate.

To do that you need to keep references to the threads, so use map instead of each:

threads = all_hosts.map do |hostname|
  Thread.new {
    # commands
  }
end

threads.each(&:join)
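
If each thread also produces a result, Thread#value may be more convenient than join: it waits for the thread and then returns the value of its block. A small sketch, assuming a made-up host list and a sleep in place of the real commands:

```ruby
# Hypothetical host list; the block body stands in for the real commands.
all_hosts = %w[alpha beta gamma]

threads = all_hosts.map do |hostname|
  Thread.new do
    sleep 0.05                 # simulate slow work
    "#{hostname}: ok"          # the block's last value becomes Thread#value
  end
end

# Thread#value joins the thread, then returns what its block returned.
reports = threads.map(&:value)
puts reports
```

Because map preserves order, reports lines up with all_hosts regardless of which thread finished first.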

How to wait only for the first thread in an array to finish in Ruby?

For a thread-safe way to wait for something to happen in one of several threads, you can use a Queue.

queue = Queue.new
threads = []

threads << Thread.new { sleep 2; queue.push(nil) }
threads << Thread.new { sleep 50; queue.push(nil) }
threads << Thread.new { sleep 20; queue.push(nil) }

# wait for one
queue.pop

# clean up the rest
threads.each(&:kill).each(&:join)
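
If you also need to know which thread finished first, one option is to push an identifier instead of nil. A sketch with illustrative names and shortened delays:

```ruby
queue = Queue.new
delays = { fast: 0.05, slow: 5, medium: 2 }   # illustrative names and delays

threads = delays.map do |name, seconds|
  Thread.new do
    sleep seconds
    queue.push(name)
  end
end

winner = queue.pop                 # blocks until the first push arrives
threads.each(&:kill).each(&:join)  # stop the others, then reap them all
puts "first to finish: #{winner}"
```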

How to execute a fixed number of threads every n milliseconds?

Thread#join blocks until the thread finishes, so don't call it between batches. Instead, use sleep to pause the current thread for the required number of seconds.

5.times do |idx|
  10.times { Thread.new { something... } }
  sleep n / 1000.0
end
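
A runnable sketch of the same pattern, with placeholder work and illustrative values for n and the batch sizes. It keeps the thread references so that they can be joined once, after all batches have been launched. That final join is an addition here and does not slow down the batches themselves:

```ruby
n = 50                      # milliseconds between batches (illustrative)
batches = 3
per_batch = 4
done = Queue.new            # thread-safe collector for finished work
threads = []

batches.times do |batch|
  per_batch.times do |i|
    threads << Thread.new { done << [batch, i] }   # placeholder work
  end
  sleep n / 1000.0          # pauses the main thread only; workers keep running
end

threads.each(&:join)        # wait once, after every batch has been started
puts "completed #{done.size} tasks"
```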

How to asynchronously collect results from new threads created in real time in ruby

You can have each thread push its results onto a Queue, then your main thread can read from the Queue. Reading from a Queue is a blocking operation by default, so if there are no results, your code will block and wait on the read.

http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html

Here is an example:

require 'thread'

jobs = Queue.new
results = Queue.new

thread_pool = []
pool_size = 5

(1..pool_size).each do |i|
  thread_pool << Thread.new do
    loop do
      job = jobs.shift # blocks waiting for a task
      break if job == "!NO-MORE-JOBS!"

      # Otherwise, do job...
      puts "#{i}...."
      sleep rand(1..5) # Simulate the time it takes to do a job
      results << "thread#{i} finished #{job}" # Push some result from the job onto the Queue
      # Go back and get another task from the Queue
    end
  end
end

# All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
  'job1',
  'job2',
  'job3',
  'job4',
  'job5',
  'job6',
  'job7',
]

db_stuff.each do |job|
  jobs << job
end

# Threads are now attacking the Queue like hungry dogs.

pool_size.times do
  jobs << "!NO-MORE-JOBS!"
end

result_count = 0

loop do
  result = results.shift
  puts "result: #{result}"
  result_count += 1
  break if result_count == db_stuff.size # stop once every job has reported back
end
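
As an alternative to the sentinel string, Ruby 2.3 and later provide Queue#close: once a queue is closed and empty, pop returns nil instead of blocking. A shortened sketch of the same pool built on that, with the job names and pool size chosen only for illustration:

```ruby
jobs = Queue.new
results = Queue.new

workers = Array.new(3) do |i|
  Thread.new do
    # On a closed, empty queue, pop returns nil instead of blocking.
    while (job = jobs.pop)
      results << "worker#{i} finished #{job}"
    end
  end
end

%w[job1 job2 job3 job4 job5].each { |job| jobs << job }
jobs.close                 # no more jobs; idle workers wake up and exit

workers.each(&:join)
results.close              # lets the drain loop below end on nil
finished = []
while (line = results.pop)
  finished << line
end
puts finished
```

Items already in the queue are still delivered after close, so no job is lost.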

Ruby synchronisation: How to make threads work one after another in proper order?

Using Queue as a PV Semaphore

You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:

require 'thread'
...
sem = Queue.new

When a thread needs to wait, it calls Queue#deq:

# waiting thread
sem.deq

When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:

# another thread that wants to unblock the waiting thread
sem.enq :go
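
Put together, the two halves might look like the following sketch. The log queue is an addition for the demo, used only to record the order of events:

```ruby
sem = Queue.new
log = Queue.new             # thread-safe record of what happened, in order

waiter = Thread.new do
  sem.deq                   # blocks until something is enqueued
  log << :waiter_resumed
end

sleep 0.05                  # give the waiter time to block (demo only)
log << :signalling
sem.enq :go                 # unblocks the waiter
waiter.join

events = []
events << log.deq until log.empty?
puts events.inspect
```

The waiter cannot log anything until the main thread has enqueued :go, so the signal is always recorded first.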

A Worker class

Here's a worker class that uses Queue to synchronize its start and stop:

class Worker

  def initialize(worker_number)
    @start = Queue.new
    Thread.new do
      @start.deq
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end

  def start
    @start.enq :start
  end

  def when_done(&block)
    @when_done = block
  end

end

When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.

When done, the thread will execute the block that was passed to #when_done. We'll see how this is used in just a moment.

Creating workers

First, let's make sure that if any threads raise an exception, we get to find out about it:

Thread.abort_on_exception = true

We'll need six workers:

workers = (1..6).map { |i| Worker.new(i) }

Telling each worker what to do when it's done

Here's where #when_done comes into play:

workers.each_cons(2) do |w1, w2|
  w1.when_done { w2.start }
end

This takes each pair of adjacent workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify this thread:

all_done = Queue.new
workers.last.when_done { all_done.enq :done }

Let's Go!

Now all that remains is to start the first thread:

workers.first.start

and wait for the last thread to finish:

all_done.deq

The output:

Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6

How do I wait for both threads to finish and files to be ready for reading without polling?

Don't reinvent the wheel mate :) check out https://github.com/eventmachine/eventmachine (IO lib based on reactor pattern like node.js etc) or (perhaps preferably) https://github.com/celluloid/celluloid-io (IO lib based on actor pattern, better docs and active maintainers)

OPTION 1 - use EM or Celluloid to handle non-blocking sockets

EM and Celluloid are quite different: EM uses the reactor pattern ("same thing" as node.js, with a threadpool as a workaround for blocking calls) and Celluloid uses the actor pattern (an actor thread pool).

Both can do non-blocking IO to/from a lot of sockets and delegate work to a lot of threads, depending on how you go about it. Both libs are very robust, efficient and battle tested. EM has more history but seems to have fallen slightly out of maintenance (https://www.youtube.com/watch?v=mPDs-xQhPb0), while celluloid has a nicer API and a more active community (http://www.youtube.com/watch?v=KilbFPvLBaI).

Best advice I can give is to play with code samples that both projects provide and see what feels best. I'd go with celluloid for a new project, but that's a personal opinion - you may find that EM has more IO-related features (such as handling files, keyboard, unix sockets, ...)

OPTION 2 - use background job queues

I may have been misguided by the low level of your question :) Have you considered using some of the job queues available under ruby? There's a TON of decent and different options available, see https://www.ruby-toolbox.com/categories/Background_Jobs

OPTION 3 - DIY (not recommended)

There is a pure ruby implementation of EM. It uses IO selectables to handle sockets, so it offers a pattern for what you're trying to do. Check it out: https://github.com/eventmachine/eventmachine/blob/master/lib/em/pure_ruby.rb#L311 (see selectables handling).

However, given the amount of other options, hopefully you shouldn't need to resort to such low level coding.
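
To give a rough idea of what the DIY route involves, here is a minimal sketch built on IO.select from Ruby's core library. It is not taken from EM's source. Two pipes stand in for the sockets or files, and the writer threads and their delays are made up for the demo:

```ruby
# Two pipes stand in for the sockets or files being watched.
r1, w1 = IO.pipe
r2, w2 = IO.pipe

writers = [
  Thread.new { sleep 0.05; w1.write("first");  w1.close },
  Thread.new { sleep 0.10; w2.write("second"); w2.close }
]

pending = [r1, r2]
received = []

until pending.empty?
  # IO.select sleeps until at least one reader has data or reaches EOF.
  ready, = IO.select(pending)
  ready.each do |io|
    chunk = io.read_nonblock(1024, exception: false)
    if chunk.nil?                # nil means EOF: the writer closed its end
      io.close
      pending.delete(io)
    elsif chunk != :wait_readable
      received << chunk
    end
  end
end

writers.each(&:join)
puts received.inspect
```

The main thread never polls in a busy loop: it sleeps inside IO.select until the operating system reports that something is readable.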

ruby threading output

I thought when you run a ruby script, it waits until all threads/processes are done before terminating and returning?

Nope! From the documentation for Thread:

If we don't call thr.join before the main thread terminates, then all other threads including thr will be killed.

So you’ll need to join all of them:

threads = 10.times.map do
  Thread.new do
    puts 'Hello, Ruby!'
  end
end

threads.each(&:join)

Using Ruby threads, but not seeing a speed up

There are many different implementations of Ruby. The one most often referred to is MRI (Matz's Ruby Interpreter).

MRI has threads, but unfortunately it runs Ruby code on only one CPU core at a time. That means only one thread actually runs at any given moment.

If your threads had to wait for IO, there might be a speed-up, because while one thread waits, another can run. But your problem needs the CPU all the time.
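
The IO case can be illustrated with a rough sketch in which sleep stands in for waiting on IO (an assumption made for the demo; real IO calls behave similarly in that the waiting thread lets others run). The helper name fake_io is made up:

```ruby
require 'benchmark'

# sleep stands in for waiting on IO; a sleeping thread lets other threads run.
def fake_io
  sleep 0.2
end

serial = Benchmark.realtime { 4.times { fake_io } }
threaded = Benchmark.realtime do
  Array.new(4) { Thread.new { fake_io } }.each(&:join)
end

puts format('serial:   %.2fs', serial)
puts format('threaded: %.2fs', threaded)
```

On MRI the threaded run should take roughly as long as a single wait, whereas replacing fake_io with a pure computation would show little or no gain.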

I would suggest investigating another Ruby implementation, such as JRuby, for this kind of problem. JRuby has real threads that can run in parallel on multiple cores.

You may get a greater speed-up by changing your implementation. At the moment you recalculate every max_length over and over again. For example: the sequence length for n = 4 is 3. To calculate the length for n = 8, you take one step (n / 2), arrive at a current value of 4, and already know that n = 4 has length 3. Therefore length(8) = 1 + length(4) = 1 + 3 = 4. Example:

class CollatzSequence

  def initialize
    # The default block runs on a cache miss and stores the computed length.
    @lengths = Hash.new { |h, n| cache_length(h, n) }
  end

  def length(n)
    @lengths[n]
  end

  private

  def cache_length(h, n)
    if n <= 1
      h[n] = 1
    else
      next_in_sequence = n.even? ? (n / 2) : (n * 3 + 1)
      h[n] = 1 + h[next_in_sequence]
    end
  end

end

require 'benchmark'
sequencer = CollatzSequence.new

Benchmark.bm(10) do |bm|
  bm.report('not cached') { sequencer.length(837799) }
  bm.report('cache hit 1') { sequencer.length(837799) }
  bm.report('cache hit 2') { sequencer.length(837799 * 2) }
end

#                  user     system      total        real
# not cached   0.000000   0.000000   0.000000 (  0.001489)
# cache hit 1  0.000000   0.000000   0.000000 (  0.000007)
# cache hit 2  0.000000   0.000000   0.000000 (  0.000011)
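
The class above fills its cache recursively through the Hash default block. If recursion depth is a concern, the same caching idea can be written iteratively. The following is only a sketch, with the method name collatz_length chosen for illustration and n assumed to be a positive integer:

```ruby
# Iterative variant: walk forward until a cached value is found, then fill in
# the cache on the way back, so no recursion is involved.
# Assumes n >= 1.
def collatz_length(n, cache = { 1 => 1 })
  path = []
  until cache.key?(n)
    path << n
    n = n.even? ? n / 2 : 3 * n + 1
  end
  length = cache[n]
  path.reverse_each { |m| cache[m] = (length += 1) }
  length
end

cache = { 1 => 1 }          # share one cache across calls to benefit from it
puts collatz_length(4, cache)
puts collatz_length(8, cache)
```

Passing the same cache hash to every call is what makes later lookups cheap. With the default argument, each call starts from an empty cache.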

