How to Implement a Practical Fiber Scheduler

How to implement a practical fiber scheduler?

You'd need to multiplex io operations into an event based interface(select/poll), so you can leverage the OS to do the waiting, while still being able to schedule other fibers. select/poll have a timeout argument - for fibers that want to sleep, you can create a priority queue that uses that option of select/poll to emulate a sleep call.

Trying to serve fibers that does blocking operations (call read/write/sleep etc). directly won't work unless you schedule each fiber in a native thread - which kind of beats the purpose.

See http://swtch.com/libtask/ for a working implementation.

Implementing preemptive microthreads using signal handlers and setjmp/longjmp

The solution is to use sigsetjmp / siglongjmp, intstead of setjmp/longjmp. sig* versions preserve signal masks :)

Fibers use cases


what if my problem is counting words in a huge file? ..., can fibers help somehow?

No.

every doc actually uses async I/O as an example

Async I/O is the problem that threads originally were meant to solve back when multi-CPU systems had not yet escaped from the laboratory. Threads were an alternate way to structure a program that had to wait for input from several different, non-synchronized sources and, had to respond to those inputs in a timely fashion.

Depending on how they were implemented, threads back in those days could be anywhere on a scale from "mostly the same as" to "completely identical with" what we call "green threads" or "fibers" today.

When multi-CPU systems hit the market, threading was seen as a natural and obvious way to exploit the parallel processing capabilities.

Why do we need fibers

Fibers are something you will probably never use directly in application-level code. They are a flow-control primitive which you can use to build other abstractions, which you then use in higher-level code.

Probably the #1 use of fibers in Ruby is to implement Enumerators, which are a core Ruby class in Ruby 1.9. These are incredibly useful.

In Ruby 1.9, if you call almost any iterator method on the core classes, without passing a block, it will return an Enumerator.

irb(main):001:0> [1,2,3].reverse_each
=> #<Enumerator: [1, 2, 3]:reverse_each>
irb(main):002:0> "abc".chars
=> #<Enumerator: "abc":chars>
irb(main):003:0> 1.upto(10)
=> #<Enumerator: 1:upto(10)>

These Enumerators are Enumerable objects, and their each methods yield the elements which would have been yielded by the original iterator method, had it been called with a block. In the example I just gave, the Enumerator returned by reverse_each has a each method which yields 3,2,1. The Enumerator returned by chars yields "c","b","a" (and so on). BUT, unlike the original iterator method, the Enumerator can also return the elements one by one if you call next on it repeatedly:

irb(main):001:0> e = "abc".chars
=> #<Enumerator: "abc":chars>
irb(main):002:0> e.next
=> "a"
irb(main):003:0> e.next
=> "b"
irb(main):004:0> e.next
=> "c"

You may have heard of "internal iterators" and "external iterators" (a good description of both is given in the "Gang of Four" Design Patterns book). The above example shows that Enumerators can be used to turn an internal iterator into an external one.

This is one way to make your own enumerators:

class SomeClass
def an_iterator
# note the 'return enum_for...' pattern; it's very useful
# enum_for is an Object method
# so even for iterators which don't return an Enumerator when called
# with no block, you can easily get one by calling 'enum_for'
return enum_for(:an_iterator) if not block_given?
yield 1
yield 2
yield 3
end
end

Let's try it:

e = SomeClass.new.an_iterator
e.next # => 1
e.next # => 2
e.next # => 3

Wait a minute... does anything seem strange there? You wrote the yield statements in an_iterator as straight-line code, but the Enumerator can run them one at a time. In between calls to next, the execution of an_iterator is "frozen". Each time you call next, it continues running down to the following yield statement, and then "freezes" again.

Can you guess how this is implemented? The Enumerator wraps the call to an_iterator in a fiber, and passes a block which suspends the fiber. So every time an_iterator yields to the block, the fiber which it is running on is suspended, and execution continues on the main thread. Next time you call next, it passes control to the fiber, the block returns, and an_iterator continues where it left off.

It would be instructive to think of what would be required to do this without fibers. EVERY class which wanted to provide both internal and external iterators would have to contain explicit code to keep track of state between calls to next. Each call to next would have to check that state, and update it before returning a value. With fibers, we can automatically convert any internal iterator to an external one.

This doesn't have to do with fibers persay, but let me mention one more thing you can do with Enumerators: they allow you to apply higher-order Enumerable methods to other iterators other than each. Think about it: normally all the Enumerable methods, including map, select, include?, inject, and so on, all work on the elements yielded by each. But what if an object has other iterators other than each?

irb(main):001:0> "Hello".chars.select { |c| c =~ /[A-Z]/ }
=> ["H"]
irb(main):002:0> "Hello".bytes.sort
=> [72, 101, 108, 108, 111]

Calling the iterator with no block returns an Enumerator, and then you can call other Enumerable methods on that.

Getting back to fibers, have you used the take method from Enumerable?

class InfiniteSeries
include Enumerable
def each
i = 0
loop { yield(i += 1) }
end
end

If anything calls that each method, it looks like it should never return, right? Check this out:

InfiniteSeries.new.take(10) # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

I don't know if this uses fibers under the hood, but it could. Fibers can be used to implement infinite lists and lazy evaluation of a series. For an example of some lazy methods defined with Enumerators, I have defined some here: https://github.com/alexdowad/showcase/blob/master/ruby-core/collections.rb

You can also build a general-purpose coroutine facility using fibers. I've never used coroutines in any of my programs yet, but it's a good concept to know.

I hope this gives you some idea of the possibilities. As I said at the beginning, fibers are a low-level flow-control primitive. They make it possible to maintain multiple control-flow "positions" within your program (like different "bookmarks" in the pages of a book) and switch between them as desired. Since arbitrary code can run in a fiber, you can call into 3rd-party code on a fiber, and then "freeze" it and continue doing something else when it calls back into code you control.

Imagine something like this: you are writing a server program which will service many clients. A complete interaction with a client involves going through a series of steps, but each connection is transient, and you have to remember state for each client between connections. (Sound like web programming?)

Rather than explicitly storing that state, and checking it each time a client connects (to see what the next "step" they have to do is), you could maintain a fiber for each client. After identifying the client, you would retrieve their fiber and re-start it. Then at the end of each connection, you would suspend the fiber and store it again. This way, you could write straight-line code to implement all the logic for a complete interaction, including all the steps (just as you naturally would if your program was made to run locally).

I'm sure there's many reasons why such a thing may not be practical (at least for now), but again I'm just trying to show you some of the possibilities. Who knows; once you get the concept, you may come up with some totally new application which no-one else has thought of yet!

C setjmp.h and ucontext.h, which is better?

setjmp is portable (ISO C89 and C99) and ucontext (obsolescent in SUSv3 and removed from SUSv4/POSIX 2008) is not. However ucontext was considerably more powerful in specification. In practice, if you used nasty hacks with setjmp/longjmp and signal handlers and alternate signal handling stacks, you could make these just about as powerful as ucontext, but they were not "portable".

Neither should be used for multithreading. For that purpose POSIX threads (pthread functions). I have several reasons for saying this:

  • If you're writing threaded code, you might as well make it actually run concurrently. We're hitting the speed limits of non-parallel computing and future machines will be more and more parallel, so take advantage of that.
  • ucontext was removed from the standards and might not be supported in future OS's (or even some present ones?)
  • Rolling your own threads cannot be made transparent to library code you might want to use. It might break library code that makes reasonable assumptions about concurrency, locking, etc. As long as your multithreading is cooperative rather than async-signal-based there are probably not too many issues like this, but once you've gotten this deep into nonportable hacks things can get very fragile.
  • ...and probably some more reasons I can't think of right now. :-)

Which would be better for concurrent tasks on node.js? Fibers? Web-workers? or Threads?

Node has a completely different paradigm and once it is correctly captured, it is easier to see this different way of solving problems. You never need multiple threads in a Node application(1) because you have a different way of doing the same thing. You create multiple processes; but it is very very different than, for example how Apache Web Server's Prefork mpm does.

For now, let's think that we have just one CPU core and we will develop an application (in Node's way) to do some work. Our job is to process a big file running over its contents byte-by-byte. The best way for our software is to start the work from the beginning of the file, follow it byte-by-byte to the end.

-- Hey, Hasan, I suppose you are either a newbie or very old school from my Grandfather's time!!! Why don't you create some threads and make it much faster?

-- Oh, we have only one CPU core.

-- So what? Create some threads man, make it faster!

-- It does not work like that. If I create threads I will be making it slower. Because I will be adding a lot of overhead to the system for switching between threads, trying to give them a just amount of time, and inside my process, trying to communicate between these threads. In addition to all these facts, I will also have to think about how I will divide a single job into multiple pieces that can be done in parallel.

-- Okay okay, I see you are poor. Let's use my computer, it has 32 cores!

-- Wow, you are awesome my dear friend, thank you very much. I appreciate it!

Then we turn back to work. Now we have 32 cpu cores thanks to our rich friend. Rules we have to abide have just changed. Now we want to utilize all this wealth we are given.

To use multiple cores, we need to find a way to divide our work into pieces that we can handle in parallel. If it was not Node, we would use threads for this; 32 threads, one for each cpu core. However, since we have Node, we will create 32 Node processes.

Threads can be a good alternative to Node processes, maybe even a better way; but only in a specific kind of job where the work is already defined and we have complete control over how to handle it. Other than this, for every other kind of problem where the job comes from outside in a way we do not have control over and we want to answer as quickly as possible, Node's way is unarguably superior.

-- Hey, Hasan, are you still working single-threaded? What is wrong with you, man? I have just provided you what you wanted. You have no excuses anymore. Create threads, make it run faster.

-- I have divided the work into pieces and every process will work on one of these pieces in parallel.

-- Why don't you create threads?

-- Sorry, I don't think it is usable. You can take your computer if you want?

-- No okay, I am cool, I just don't understand why you don't use threads?

-- Thank you for the computer. :) I already divided the work into pieces and I create processes to work on these pieces in parallel. All the CPU cores will be fully utilized. I could do this with threads instead of processes; but Node has this way and my boss Parth Thakkar wants me to use Node.

-- Okay, let me know if you need another computer. :p

If I create 33 processes, instead of 32, the operating system's scheduler will be pausing a thread, start the other one, pause it after some cycles, start the other one again... This is unnecessary overhead. I do not want it. In fact, on a system with 32 cores, I wouldn't even want to create exactly 32 processes, 31 can be nicer. Because it is not just my application that will work on this system. Leaving a little room for other things can be good, especially if we have 32 rooms.

I believe we are on the same page now about fully utilizing processors for CPU-intensive tasks.

-- Hmm, Hasan, I am sorry for mocking you a little. I believe I understand you better now. But there is still something I need an explanation for: What is all the buzz about running hundreds of threads? I read everywhere that threads are much faster to create and dumb than forking processes? You fork processes instead of threads and you think it is the highest you would get with Node. Then is Node not appropriate for this kind of work?

-- No worries, I am cool, too. Everybody says these things so I think I am used to hearing them.

-- So? Node is not good for this?

-- Node is perfectly good for this even though threads can be good too. As for thread/process creation overhead; on things that you repeat a lot, every millisecond counts. However, I create only 32 processes and it will take a tiny amount of time. It will happen only once. It will not make any difference.

-- When do I want to create thousands of threads, then?

-- You never want to create thousands of threads. However, on a system that is doing work that comes from outside, like a web server processing HTTP requests; if you are using a thread for each request, you will be creating a lot of threads, many of them.

-- Node is different, though? Right?

-- Yes, exactly. This is where Node really shines. Like a thread is much lighter than a process, a function call is much lighter than a thread. Node calls functions, instead of creating threads. In the example of a web server, every incoming request causes a function call.

-- Hmm, interesting; but you can only run one function at the same time if you are not using multiple threads. How can this work when a lot of requests arrive at the web server at the same time?

-- You are perfectly right about how functions run, one at a time, never two in parallel. I mean in a single process, only one scope of code is running at a time. The OS Scheduler does not come and pause this function and switch to another one, unless it pauses the process to give time to another process, not another thread in our process. (2)

-- Then how can a process handle 2 requests at a time?

-- A process can handle tens of thousands of requests at a time as long as our system has enough resources (RAM, Network, etc.). How those functions run is THE KEY DIFFERENCE.

-- Hmm, should I be excited now?

-- Maybe :) Node runs a loop over a queue. In this queue are our jobs, i.e, the calls we started to process incoming requests. The most important point here is the way we design our functions to run. Instead of starting to process a request and making the caller wait until we finish the job, we quickly end our function after doing an acceptable amount of work. When we come to a point where we need to wait for another component to do some work and return us a value, instead of waiting for that, we simply finish our function adding the rest of work to the queue.

-- It sounds too complex?

-- No no, I might sound complex; but the system itself is very simple and it makes perfect sense.

Now I want to stop citing the dialogue between these two developers and finish my answer after a last quick example of how these functions work.

In this way, we are doing what OS Scheduler would normally do. We pause our work at some point and let other function calls (like other threads in a multi-threaded environment) run until we get our turn again. This is much better than leaving the work to OS Scheduler which tries to give just time to every thread on system. We know what we are doing much better than OS Scheduler does and we are expected to stop when we should stop.

Below is a simple example where we open a file and read it to do some work on the data.

Synchronous Way:

Open File
Repeat This:
Read Some
Do the work

Asynchronous Way:

Open File and Do this when it is ready: // Our function returns
Repeat this:
Read Some and when it is ready: // Returns again
Do some work

As you see, our function asks the system to open a file and does not wait for it to be opened. It finishes itself by providing next steps after file is ready. When we return, Node runs other function calls on the queue. After running over all the functions, the event loop moves to next turn...

In summary, Node has a completely different paradigm than multi-threaded development; but this does not mean that it lacks things. For a synchronous job (where we can decide the order and way of processing), it works as well as multi-threaded parallelism. For a job that comes from outside like requests to a server, it simply is superior.


(1) Unless you are building libraries in other languages like C/C++ in which case you still do not create threads for dividing jobs. For this kind of work you have two threads one of which will continue communication with Node while the other does the real work.

(2) In fact, every Node process has multiple threads for the same reasons I mentioned in the first footnote. However this is no way like 1000 threads doing similar works. Those extra threads are for things like to accept IO events and to handle inter-process messaging.

UPDATE (As reply to a good question in comments)

@Mark, thank you for the constructive criticism. In Node's paradigm, you should never have functions that takes too long to process unless all other calls in the queue are designed to be run one after another. In case of computationally expensive tasks, if we look at the picture in complete, we see that this is not a question of "Should we use threads or processes?" but a question of "How can we divide these tasks in a well balanced manner into sub-tasks that we can run them in parallel employing multiple CPU cores on the system?" Let's say we will process 400 video files on a system with 8 cores. If we want to process one file at a time, then we need a system that will process different parts of the same file in which case, maybe, a multi-threaded single-process system will be easier to build and even more efficient. We can still use Node for this by running multiple processes and passing messages between them when state-sharing/communication is necessary. As I said before, a multi-process approach with Node is as well as a multi-threaded approach in this kind of tasks; but not more than that. Again, as I told before, the situation that Node shines is when we have these tasks coming as input to system from multiple sources since keeping many connections concurrently is much lighter in Node compared to a thread-per-connection or process-per-connection system.

As for setTimeout(...,0) calls; sometimes giving a break during a time consuming task to allow calls in the queue have their share of processing can be required. Dividing tasks in different ways can save you from these; but still, this is not really a hack, it is just the way event queues work. Also, using process.nextTick for this aim is much better since when you use setTimeout, calculation and checks of the time passed will be necessary while process.nextTick is simply what we really want: "Hey task, go back to end of the queue, you have used your share!"

How to make monix fixed rate Scheduler continue upon failure

You can always leverage scala.util.Try for that or simple try-catch blocks. On any failure case, you just log and move on. You can even have failure retry strategy as below.

import scala.util._

def taskExceptionProne() = ???

var failures = 0
val maxRetries = 10

scheduler.scheduleAtFixedRate(5.milliseconds, 2.milliseconds) {
Try(taskExceptionProne) match {
Success(result) =>
//do something with the result
failures = 0
println("Task executed.")
Failure(throwable) =>
if (failures>=maxRetries) throw throwable else {
failures = failures + 1
//log failure
println(throwable.getMessage)
}
}
}


Related Topics



Leave a reply



Submit