Celluloid Async Inside Ruby Blocks Does Not Work

Your main loop is dominating the actor/application's threads.

All your program is doing is spawning background tasks, but never running them. You need that sleep in the loop purely to give the background tasks a chance to run.

It is usually not a good idea to have an unconditional loop spawn an unbounded number of background tasks like you have here. There ought to be either a delay or a conditional statement in there; otherwise you just have an infinite loop spawning things that never get invoked.

Think about it like this: if you put puts "looping" just inside your loop, you will see "looping" printed over and over and over, while you never see "Running in the background".
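To make the thought experiment concrete without depending on the Celluloid gem, here is a minimal standard-library sketch. ToyActor, main_loop and drain are hypothetical names, and the mailbox is a plain array processed one message at a time, which is a simplification of how an actor handles calls. The "infinite" loop is bounded to three iterations so the sketch terminates; with a real infinite loop the first message would never return, and the queued on_background messages would never run at all.

```ruby
# Hypothetical toy actor (not Celluloid): one thread of control and a FIFO
# mailbox whose messages are processed strictly one at a time.
class ToyActor
  attr_reader :log

  def initialize
    @mailbox = []
    @log = []
  end

  # Like `async.foo`: only enqueues the call; nothing runs yet.
  def async(name)
    @mailbox << name
  end

  # Process queued messages in order until the mailbox is empty.
  def drain
    send(@mailbox.shift) until @mailbox.empty?
  end

  def main_loop
    3.times do            # stands in for an unconditional `loop`
      @log << "looping"
      async(:on_background)
    end
  end

  def on_background
    @log << "Running in the background"
  end
end

actor = ToyActor.new
actor.async(:main_loop)
actor.drain
puts actor.log
```

Every "looping" entry is logged before any "Running in the background" entry, because the queued messages only get their turn once main_loop has returned.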


Approach #1: Use every or after blocks.

The best way to fix this is not to use sleep inside a loop, but to use an after or every block, like this:

every(0.1) {
  on_background
}

Or best of all, if you want to make sure the process runs completely before running again, use after instead:

def run_method
  @running ||= false
  unless @running
    @running = true
    on_background
    @running = false
  end
  after(0.1) { run_method }
end

Using a loop with async is not a good idea unless there is some kind of flow control, or a blocking call such as @server.accept; otherwise it will just consume 100% of a CPU core for no good reason.
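The self-rescheduling idea behind run_method can be modeled in plain Ruby with a toy timer queue. ToyTimers and run_until are hypothetical and use a simulated clock so the run is deterministic; this is not Celluloid's timer implementation, only an illustration of why "do one unit of work, then schedule the next" leaves room for other tasks, where a spinning loop would not.

```ruby
# Hypothetical toy timer queue with a simulated clock (NOT Celluloid's API);
# it only models how an `after`-style callback can reschedule itself.
class ToyTimers
  attr_reader :now

  def initialize
    @now = 0.0
    @timers = [] # [fire_at, block] pairs
  end

  def after(delay, &block)
    @timers << [@now + delay, block]
  end

  # Fire due timers in deadline order until the next one is past `limit`.
  def run_until(limit)
    until @timers.empty?
      @timers.sort_by!(&:first)
      break if @timers.first.first > limit
      fire_at, block = @timers.shift
      @now = fire_at
      block.call
    end
  end
end

timers = ToyTimers.new
log = []

# One unit of work, then schedule the next run; nothing loops or spins.
run_method = lambda do
  log << [:work, timers.now.round(2)]
  timers.after(0.1) { run_method.call }
end

timers.after(0.0) { run_method.call }
timers.after(0.25) { log << [:other, timers.now.round(2)] } # some other task
timers.run_until(0.35)
p log
```

Because the next run is only scheduled after the current one finishes, the unrelated task at 0.25 slots in between two runs instead of waiting behind an endless loop.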

By the way, you can also use now_and_every and now_and_after; these run the block right away, then again after the amount of time you specify.

Using every is shown in this gist:

  • https://gist.github.com/digitalextremist/686f42e58a58b743142b

The ideal situation, in my opinion:

This is a rough but immediately usable example:

  • https://gist.github.com/digitalextremist/12fc824c6a4dbd94a9df

require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end

  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

This will be its output if ONE_AT_A_TIME is true:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

And this will be its output if ONE_AT_A_TIME is false:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

You need to be more "evented" than "threaded" to properly issue tasks and preserve scope and state, rather than issuing commands between threads/actors; that is what the every and after blocks provide. It is good practice either way, even if you didn't have a Global Interpreter Lock to deal with, because your example does not seem to involve a blocking process. If you had a blocking process, then by all means use an infinite loop. But since you would end up spawning an infinite number of background tasks before even one is processed, you need to either use a sleep, as your question started with, or use a different strategy altogether: every and after, which is how Celluloid itself encourages you to operate when it comes to handling data on sockets of any kind.


Approach #2: Use a recursive method call.

This just came up in the Google Group. The example code below will actually allow other tasks to execute, even though it is effectively an infinite loop.

  • https://groups.google.com/forum/#!topic/celluloid-ruby/xmkdrMQBGbY

This approach is less desirable because it will likely have more overhead, spawning a series of fibers.

def work
  # ...
  async.work
end
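A hypothetical toy actor in plain Ruby (not Celluloid's API) can show why the recursive version lets other tasks in: each call does one unit of work and re-enqueues itself behind whatever is already waiting in the mailbox. The recursion is bounded by a countdown purely so the sketch terminates.

```ruby
# Hypothetical toy actor (not Celluloid): a FIFO mailbox processed one
# message at a time on a single thread of control.
class ToyActor
  attr_reader :log

  def initialize(iterations)
    @mailbox = []
    @log = []
    @remaining = iterations # bounds the "infinite" recursion for the demo
  end

  def async(name)
    @mailbox << name
  end

  def drain
    send(@mailbox.shift) until @mailbox.empty?
  end

  # Instead of looping, do one unit of work and re-enqueue ourselves.
  def work
    @log << :work
    @remaining -= 1
    async(:work) if @remaining > 0
  end

  def other_task
    @log << :other
  end
end

actor = ToyActor.new(3)
actor.async(:work)
actor.async(:other_task) # arrives while `work` is still "looping"
actor.drain
p actor.log
```

The resulting order is work, other, work, work: the other task runs as soon as the first work message has finished, because the next work message is queued behind it.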

Question #2: Thread vs. Fiber behaviors.

The second question is why the following would work: loop { Thread.new { puts "Hello" } }

That spawns an unbounded number of threads. On MRI 1.9 and later, each Ruby Thread is backed by a native operating-system thread; the Global Interpreter Lock in the Ruby VM you are using only means that one thread at a time can execute Ruby code. (These are not green threads, which would be scheduled entirely inside the interpreter, as in MRI 1.8.) The VM passes the lock from thread to thread on its own, on a timer and whenever a thread blocks, so each Thread gets run without hesitation and without any cooperation from your loop. And in the case of the example, each Thread runs very quickly and then dies.
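A small standard-library sketch of that preemption, assuming MRI: a thread that spins forever without blocking does not stop another thread from being scheduled, because the VM switches threads on its own. The busy thread is killed at the end so the script exits.

```ruby
counter = 0

# A thread that never blocks and never yields voluntarily.
busy = Thread.new { loop { counter += 1 } }

# Another thread still gets scheduled: the VM preempts the busy thread,
# even though the GVL lets only one of them run Ruby code at a time.
other = Thread.new { "Hello" }
greeting = other.value # blocks the main thread until `other` finishes

busy.kill
busy.join
puts greeting
```

A Fiber gives no such guarantee: it only stops running when it explicitly yields.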

An async task, by contrast, runs in a Fiber. So this is what happens in the default case:

  1. Process starts.
  2. Actor instantiated.
  3. Method call invokes loop.
  4. Loop invokes async method.
  5. async method adds task to mailbox.
  6. Mailbox is not invoked, and loop continues.
  7. Another async task is added to the mailbox.
  8. This continues infinitely.

This happens because the method running the loop is itself executing inside a Fiber, which is never suspended (unless sleep is called!), so the tasks added to the mailbox never get to start Fibers of their own. A Fiber behaves differently than a Thread: it is cooperative, and only gives up control when it is explicitly suspended. This is a good piece of reference material discussing the differences:

  • https://blog.engineyard.com/2010/concurrency-real-and-imagined-in-mri-threads
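Here is a minimal sketch of that cooperative behavior using plain Ruby Fibers. The round_robin helper is a hypothetical, deliberately naive scheduler (not how Celluloid schedules tasks) that resumes each live fiber once per round. A fiber whose loop never calls Fiber.yield runs to completion before anything else gets a turn, while one that yields lets the other task interleave.

```ruby
# Hypothetical, deliberately naive scheduler (not Celluloid's): resume each
# live fiber once per round until every fiber has finished.
def round_robin(fibers)
  until fibers.empty?
    fibers.each(&:resume)
    fibers.select!(&:alive?)
  end
end

# A fiber whose loop never yields keeps control until the loop is done.
greedy_log = []
round_robin([
  Fiber.new { 3.times { |i| greedy_log << "loop #{i}" } },
  Fiber.new { greedy_log << "other task" }
])

# A fiber that yields after each iteration lets the other task in.
polite_log = []
round_robin([
  Fiber.new { 3.times { |i| polite_log << "loop #{i}"; Fiber.yield } },
  Fiber.new { polite_log << "other task" }
])

p greedy_log
p polite_log
```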

Question #3: Celluloid vs. Celluloid::ZMQ behavior.

The third question is why include Celluloid behaves differently than include Celluloid::ZMQ.

That's because Celluloid::ZMQ uses a reactor-based evented mailbox, whereas Celluloid uses a condition-variable-based mailbox.
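For illustration, this is roughly what a condition-variable-based mailbox looks like, sketched with Ruby's Mutex and ConditionVariable. CondVarMailbox is a hypothetical name and this is not Celluloid's actual mailbox code; the point is that a receiver blocks its thread in ConditionVariable#wait until a sender signals, whereas an evented mailbox is driven by a reactor instead.

```ruby
# Hypothetical minimal mailbox guarded by a Mutex + ConditionVariable
# (an illustration of the technique, not Celluloid's implementation).
class CondVarMailbox
  def initialize
    @messages = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def <<(message)
    @mutex.synchronize do
      @messages << message
      @cond.signal # wake one waiting receiver, if any
    end
    self
  end

  # Block the calling thread until a message is available.
  def receive
    @mutex.synchronize do
      # Re-check in a loop to guard against spurious wakeups.
      @cond.wait(@mutex) while @messages.empty?
      @messages.shift
    end
  end
end

mailbox = CondVarMailbox.new
consumer = Thread.new { Array.new(3) { mailbox.receive } }
%w[a b c].each { |m| mailbox << m }
received = consumer.value
p received
```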

Read more about pipelining and execution modes:

  • https://github.com/celluloid/celluloid/wiki/Pipelining-and-execution-modes

That is the difference between the two examples. If you have additional questions about how these mailboxes behave, feel free to post on the Google Group. The main dynamic you are facing is the unique way the GIL interacts with Fiber vs. Thread vs. Reactor behavior.

You can read more about the reactor-pattern here:

  • http://en.wikipedia.org/wiki/Reactor_pattern
  • Explanation of the "Reactor pattern"
  • What is the difference between event driven model and reactor pattern?
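The reactor pattern from those references comes down to one thread blocking in a single readiness call and dispatching to handlers for whichever sources are ready. A minimal sketch with IO.select and pipes follows; ToyReactor is hypothetical and much simpler than the 0MQ-based reactor Celluloid::ZMQ uses.

```ruby
# Hypothetical toy reactor (illustration only, not Celluloid::ZMQ's reactor):
# block in IO.select until some registered IO is ready, then dispatch.
class ToyReactor
  def initialize
    @handlers = {}
  end

  def register(io, &handler)
    @handlers[io] = handler
  end

  # Run one select/dispatch cycle; returns false if nothing became ready.
  def run_once(timeout = 1)
    ready, = IO.select(@handlers.keys, nil, nil, timeout)
    return false unless ready
    ready.each { |io| @handlers.fetch(io).call(io) }
    true
  end
end

log = []
r1, w1 = IO.pipe
r2, w2 = IO.pipe
w1.sync = w2.sync = true # unbuffered, so data reaches the pipe immediately

reactor = ToyReactor.new
reactor.register(r1) { |io| log << "r1: #{io.read_nonblock(100)}" }
reactor.register(r2) { |io| log << "r2: #{io.read_nonblock(100)}" }

w2.write("ready first")  # only r2 is readable, so only its handler runs
reactor.run_once
w1.write("ready second")
reactor.run_once
p log
```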

And see the specific reactor used by Celluloid::ZMQ here:

  • https://github.com/celluloid/celluloid-zmq/blob/master/lib/celluloid/zmq/reactor.rb

So what happens in the evented mailbox scenario is that when sleep is hit, that is a blocking call, which causes the reactor to move on to the next task in the mailbox.

But also, and this is unique to your situation, the specific reactor used by Celluloid::ZMQ relies on an external C library, specifically the 0MQ library. That reactor is external to your application and behaves differently than Celluloid::IO or Celluloid itself, which is also why the behavior differs from what you expected.

Multi-core Support Alternative

If maintaining state and scope is not important to you, and you use JRuby or Rubinius, which (unlike MRI with its Global Interpreter Lock) can run Ruby code on more than one operating-system thread at a time, you can instantiate more than one actor and issue async calls between actors concurrently.

But in my humble opinion, you would be much better served by a very high-frequency timer, such as 0.001, or 0.1 as in my example, which will seem instantaneous for all intents and purposes but also gives the actor thread plenty of time to switch fibers and run other tasks in the mailbox.

Can't run multithreading with Celluloid

Ah, I think I see.

Inside your loop, you are waiting for each future to complete, at the end of the loop -- which means you are waiting for one future to complete, before creating the next one.

TIMES.times do |i|
  # workers_pool.async.create_file(i) # doesn't happen either
  future = Celluloid::Future.new { FileWorker.new.create_file(i) }
  p future.value
end

Try changing it to this:

futures = []
TIMES.times do |i|
  futures << Celluloid::Future.new { FileWorker.new.create_file(i) }
end
futures.each { |f| p f.value }

In your version, consider the first iteration of the loop: you create a future, then call future.value, which waits for the future to complete. The future.value statement won't return until the future completes, and the loop won't move on to create another future until that statement returns. So you've effectively made it synchronous, by waiting on each future with value before creating the next.

Make sense?
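The same mistake can be reproduced with plain Ruby threads, using Thread#value as a stand-in for Celluloid::Future#value (an assumption for the sketch, so it runs without the gem). slow_task is a hypothetical replacement for FileWorker#create_file that just sleeps for 0.2 seconds.

```ruby
TIMES = 3

# Hypothetical stand-in for FileWorker#create_file: ~0.2 s of simulated I/O.
def slow_task(i)
  sleep 0.2
  i * 10
end

# Return [block result, elapsed seconds].
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

# Like the question: wait on each "future" before creating the next one.
serial, serial_time = timed do
  TIMES.times.map { |i| Thread.new { slow_task(i) }.value }
end

# Like the fix: create all "futures" first, then wait on each in turn.
parallel, parallel_time = timed do
  threads = TIMES.times.map { |i| Thread.new { slow_task(i) } }
  threads.map(&:value)
end

printf("serial: %p in %.2fs\n", serial, serial_time)
printf("parallel: %p in %.2fs\n", parallel, parallel_time)
```

On a typical run the first version takes about 0.6 seconds and the second about 0.2, because the sleeps only overlap once every thread has been started before any value is requested.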

Also, for short code blocks like this, it's way easier on potential SO answerers if you put the code directly in the question, properly indented to format as code, instead of linking out.

In general, if you are using a fairly widely used library like Celluloid, and finding it doesn't seem to do the main thing it's supposed to do -- the first guess should probably be a bug in your code, not that the library fundamentally doesn't work at all (someone else would have noticed before now!). A question title reflecting that, even just "Why doesn't my Celluloid code appear to work multi-threaded", might have gotten more favorable attention than a title suggesting Celluloid fundamentally does not work -- without any code in the question itself demonstrating it!

Celluloid 0.17.3 giving unexpected undefined method error

The undefined method error occurs because, in recent versions of the celluloid gem, actor methods are no longer called with a bang. Instead you call the method like this: n.async.assholify. So here is what the code should look like:

names = ['John', 'Tom', 'Harry']

names.each do |name|
  n = SomeClass.new name
  n.async.assholify # Instead of "n.assholify!"
end


For the "Celluloid 0.17.0 is running in BACKPORTED mode" warning, take a look at this wiki. Backported mode is the default, for a limited time. If you use require 'celluloid/current' instead of require 'celluloid', you should not see this warning.

Non blocking subprocess with Celluloid

You need to use defer, or a future if you want the return value.

defer (easy if you need no return value)

In your every(5) block, to use defer, wrap your popen call (including the exit status display call) with defer {}.

future (a bit more complicated)

There are several ways to implement this approach. My suggestion would be to move the contents of every(5) into its own method, and initialize an array @futures or similar when starting MyProcessLauncher, then use this inside the every(5) call:

@futures << future.new_method

Also, add a new loop to process your futures. After every(5), add a new handler (either another every loop, or an async call to a recursive processor method) to process your values, essentially doing this:

value = @futures.pop.value if @futures.any?

I'd ask a follow up question about futures if you have trouble and can't find examples of how to process futures. Otherwise, there you have two ways to handle popen calls without blocking.
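A rough standard-library analogue of the future approach, assuming plain threads stand in for Celluloid futures: each subprocess is started in its own thread, the handles are collected in an array, and the values are gathered later. launch is a hypothetical helper, and RbConfig.ruby is used as the child command only so the sketch runs anywhere Ruby does. It drains the array with shift (oldest first) rather than pop.

```ruby
require 'rbconfig'

# Hypothetical stand-in for the popen call in every(5): run a short Ruby child
# process, capture its output, and report its exit status.
def launch(code)
  output = IO.popen([RbConfig.ruby, '-e', code], &:read)
  [output, $?.exitstatus] # $? is thread-local, so it belongs to this child
end

futures = []

# "Future" style: start each subprocess in its own thread, keep the handle,
# and return immediately instead of blocking the caller.
futures << Thread.new { launch('print 42') }
futures << Thread.new { launch('exit 3') }

# Later (for example from another timer), collect the results, oldest first.
results = []
results << futures.shift.value while futures.any?
p results
```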

Celluloid Futures not faster than synchronous computation?

Are you using the standard ruby MRI interpreter?

If so, you won't get any speed-up for entirely CPU-bound tasks -- that is, tasks that aren't doing any I/O, but are entirely doing calculations in the CPU. Your 'test' task of 100000.times {|n| n} is indeed entirely CPU-bound.

The reason you won't get any speed-up through multi-threading for entirely CPU-bound tasks on MRI is that the MRI interpreter has a "Global Interpreter Lock" (GIL), which prevents more than one of your CPU cores from being used at once by the Ruby interpreter. Multi-threaded parallelism, like Celluloid gives you, can speed up CPU work only by running different threads on different CPU cores simultaneously, on a multi-core system like most systems are these days.

But in MRI, that's not possible. This is a limitation of the ruby MRI interpreter.

If you install JRuby and run your test under JRuby, you should see speed-up.

If your task involved some I/O (like making a database query, or waiting on a remote HTTP API, or doing significant amounts of file reading or writing), you could also see some speed-up under MRI. The more proportional time your task spends doing I/O, the more speed-up. This is because even though MRI doesn't allow threads to execute simultaneously on more than one CPU core, a thread waiting on I/O can still be switched out and another thread switched in to do work. Whereas if you weren't using threads, the program would just be sitting around waiting on the I/O doing no work.
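A quick way to see both effects on your own interpreter is to time simulated I/O (sleep, which releases the GVL) against a pure computation, serially and in four threads. The workloads are arbitrary stand-ins; on MRI the threaded I/O case should finish in roughly a quarter of the serial time, while the threaded CPU case should not be meaningfully faster. Actual timings depend on the interpreter and machine.

```ruby
# Time a block with a monotonic clock.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

io_task  = -> { sleep 0.2 }                  # stand-in for waiting on I/O
cpu_task = -> { 1_000_000.times { |n| n } }  # purely CPU-bound, like the question's test

io_serial    = elapsed { 4.times { io_task.call } }
io_threaded  = elapsed { Array.new(4) { Thread.new(&io_task) }.each(&:join) }
cpu_serial   = elapsed { 4.times { cpu_task.call } }
cpu_threaded = elapsed { Array.new(4) { Thread.new(&cpu_task) }.each(&:join) }

printf("I/O-bound: serial %.2fs, threaded %.2fs\n", io_serial, io_threaded)
printf("CPU-bound: serial %.2fs, threaded %.2fs\n", cpu_serial, cpu_threaded)
```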

If you google for "ruby GIL" you can find more discussions of the issue.

If you are really doing CPU-intensive work that could benefit from multi-threaded parallelism in a way that will significantly help your program, consider switching to JRuby.

And if you really do need multi-threaded parallelism, an alternative to Celluloid is using Futures or Promises from the concurrent-ruby gem. concurrent-ruby is generally simpler internally and lighter-weight than Celluloid. However, writing multi-threaded code can be tricky regardless of which tool you use. Even if you use Celluloid or concurrent-ruby to get better higher-level abstractions than working directly with threads, multi-threaded concurrency will require becoming familiar with some techniques for it, and some tricky debugging from time to time.

Understanding Celluloid Concurrency

Using your gists, I verified this issue can be reproduced in MRI 2.2.1 as well as JRuby 1.7.21 and Rubinius 2.5.8. The difference between server1.rb and server2.rb is the use of DisplayMessage and its message class method in the latter.


Use of sleep in DisplayMessage is out of Celluloid scope.

When sleep is used in server1.rb, it is actually Celluloid.sleep, but when used in server2.rb it is Kernel.sleep, which locks up the mailbox for Server until 60 seconds have passed. This prevents further method calls on that actor from being processed until the mailbox is processing messages (method calls on the actor) again.

There are three ways to resolve this:

  • Use a defer {} or future {} block.

  • Explicitly invoke Celluloid.sleep rather than sleep (if not explicitly invoked as Celluloid.sleep, sleep will end up calling Kernel.sleep, since DisplayMessage does not include Celluloid like Server does).

  • Bring the contents of DisplayMessage.message into handle_message as in server1.rb; or at least into Server, which is in Celluloid scope, and will use the correct sleep.
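The effect of a blocking call inside a message handler can be sketched without Celluloid by processing messages one at a time and recording when each one starts. handle_all is a hypothetical helper: with defer: false the slow part blocks the loop, the way Kernel.sleep blocks Server's mailbox; with defer: true it runs on another thread, loosely modeling what defer {} does for the actor.

```ruby
# Hypothetical helper: handle messages strictly one at a time, as an actor's
# mailbox does, and record how long after the start each message began.
def handle_all(messages, defer:)
  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  start = clock.call
  started_at = {}
  deferred = []

  messages.each do |msg|
    started_at[msg] = clock.call - start
    slow_part = -> { sleep 0.3 } # stands in for DisplayMessage.message
    if defer
      deferred << Thread.new(&slow_part) # runs elsewhere; the loop moves on
    else
      slow_part.call                     # blocks the loop, like Kernel.sleep
    end
  end

  deferred.each(&:join) # wait for deferred work only so the demo exits cleanly
  started_at
end

blocking  = handle_all(%i[first second], defer: false)
deferring = handle_all(%i[first second], defer: true)
printf("blocking:  second message started after %.2fs\n", blocking[:second])
printf("deferring: second message started after %.2fs\n", deferring[:second])
```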


The defer {} approach:

def handle_message(message)
  defer {
    DisplayMessage.message(message)
  }
end

The Celluloid.sleep approach:

class DisplayMessage
  def self.message(message)
    #de ...
    Celluloid.sleep 60
  end
end

Not truly a scope issue; it's about asynchrony.

To reiterate, the deeper issue is not the scope of sleep; that's why defer and future are my best recommendation. But to post something here that came out in my comments:

Using defer or future pushes a task that would otherwise tie up an actor onto another thread. If you use future, you can get the return value once the task is done; if you use defer, you can fire and forget.

But better yet, create another actor for tasks that tend to get tied up, and even pool that other actor... if defer or future don't work for you.

I'd be more than happy to answer follow-up questions brought up by this question; we have a very active mailing list, and IRC channel. Your generous bounties are commendable, but plenty of us would help purely to help you.


