fibers icon indicating copy to clipboard operation
fibers copied to clipboard

Fibers not running in parallel

Open ZelphirKaltstahl opened this issue 4 years ago • 9 comments

I have some code in which fibers do not perform their work in parallel for some reason:

(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))
    (let ([channel-receive (make-channel)]
          [scheduler (make-scheduler #:parallelism parallelism)])

      (call-with-new-thread
       (lambda ()
         (run-fibers
          (lambda ()
            (let loop ([index parallelism])
              (unless (zero? index)
                (spawn-fiber (lambda () (worker index channel-receive)))
                (loop (- index 1)))))
          #:scheduler scheduler)))

      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      channel-receive)))

This procedure initializes a pool of fibers. The number of fibers depends on the parallelism keyword argument. Later on these fibers receive some work on a channel and return their result on a channel.

My understanding is, that fibers of the same scheduler should be able to run in parallel, if the scheduler's parallelism keyword argument is > 1. For some reason however, this does not happen and the fibers run sequentially, one after the other, when I give them work.

Why do they not run in parallel?

Is it because the scheduler is run inside a call-with-new-thread? Does this limit the parallelism to 1? I need to run it in call-with-new-thread, because I need to return the channel-receive, which will be input to the procedure, which gives work to a thing I called work-distributor. If I cannot run a scheduler in call-with-new-thread, how can I run a scheduler without blocking until all the fibers have completed their work?

So far I have only guesses, why my code does not do computation in parallel.

For a complete example of sequentially running fibers, I will paste my whole code below. It is actually based on @amirouche 's thread pool for his babelia project (I currently cannot find it on Github any longer and do not know where to look for it.), only that I am trying to use fibers instead of threads to perform work, hoping to take advantage of work stealing and work sharing as well as running more lightweight than threads.

The code:

(define-module (fibers-pool))


(use-modules
 ;; FIFO queue, not functional, using mutation
 ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
 (ice-9 q)
 (ice-9 match)
 (ice-9 threads)
 (rnrs exceptions)
 (rnrs conditions)
 ;; fibers internals are needed for creating schedulers without running anything
 ;; in them immediately
 (fibers)
 (fibers channels)
 (fibers internal))


(define displayln
  (lambda (msg)
    (display msg)
    (newline)))


(define work-distributor
  (lambda (channel-receive)
    (let loop ([work-queue (make-q)]
               [worker-channels-queue (make-q)])
      (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")

      (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
      (displayln (q-length worker-channels-queue))

      (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
      (displayln (q-length work-queue))

      (match (pk 'work-distributor-received-msg (get-message channel-receive))
        [('worker-ready . channel-worker)
         (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
         ;; If there is no work for the ready worker, enqueue the worker,
         ;; otherwise give it work.
         (cond
          [(q-empty? work-queue)
           (enq! worker-channels-queue channel-worker)]
          [else
           (let ([some-work (deq! work-queue)])
             (put-message channel-worker (cons 'work some-work)))])
         (loop work-queue worker-channels-queue)]
        [('work . work)
         ;; ~work~ is always a pair of a thunk to be run and a return channel,
         ;; on which the result shall be put.

         ;; If there is no worker ready, enqueue the work, otherwise distribute
         ;; the work to a ready worker.
         (cond
          [(q-empty? worker-channels-queue)
           (enq! work-queue work)]
          [else
           (let ([channel-worker (deq! worker-channels-queue)])
             (put-message channel-worker (cons 'work work)))])
         (loop work-queue worker-channels-queue)]
        ;; On any other message raise a condition.
        [other
         (raise
          (condition
           (make-error)
           (make-message-condition "work-distributor received unrecognized message")
           (make-irritants-condition (list other))))]))))


(define worker
  (lambda (worker-index channel-receive)
    (let ([channel-worker (make-channel)])
      (displayln "[WORKER]: before worker message loop")
      (let loop ()
        ;; Report as ready. Give my own channel to the work-distributor to let
        ;; it send me work.
        (put-message channel-receive
                     (cons 'worker-ready
                           channel-worker))
        ;; Get messages sent to me by the distributor on my own channel.
        (match (pk 'worker-got-msg (get-message channel-worker))
          ;; If I receive work, do the work and return it on the channel-return.
          [('work . (thunk . channel-return))
           ;; Put the result on the return channel, so that anyone, who has the
           ;; a binding of the return channel, can access the result.
           (put-message channel-return (thunk))
           (loop)]
          ;; On any other message raise a condition.
          [other
           (raise
            (condition
             (make-error)
             (make-message-condition "worker received unrecognized message")
             (make-irritants-condition (list other))))])))))


(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))
    (let ([channel-receive (make-channel)]
          [scheduler (make-scheduler #:parallelism parallelism)])
      ;; start as many workers as are desired

      ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the
      ;; fibers in a non-blocking way. LOOKUP: How to start fibers without
      ;; waiting for them to finish?
      (call-with-new-thread
       (lambda ()
         (run-fibers
          (lambda ()
            (let loop ([index parallelism])
              (unless (zero? index)
                (display "[POOL INIT THREAD]: will spawn fiber ") (displayln index)
                (spawn-fiber (lambda () (worker index channel-receive)))
                ;; We do not need to spawn new fibers in the same scheduler later. The
                ;; fibers should stay alive for the whole duration the program is
                ;; running.
                (displayln "[POOL INIT THREAD]: fiber spawned")
                (loop (- index 1)))))
          #:scheduler scheduler)
         (displayln "[POOL INIT]: pool init thread returning")))
      (displayln "[POOL INIT]: will start work-distributor")
      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      ;; Return the channel for receiving work, so that the outside context can
      ;; make use of it when calling ~publish~ to publish work.
      channel-receive)))


(define publish
  (lambda (work-as-thunk channel-receive)
    ;; The result of the computation can be taken from ~channel-return~.
    (let ([channel-return (make-channel)])
      ;; Put work tagged as work on the receive channel of the work-distributor.
      (let ([work-message (cons 'work (cons work-as-thunk channel-return))])
        (display
         (simple-format
          #f "[PUBLISHER]: will publish the following work: ~a\n"
          work-message))
        (put-message channel-receive work-message))

      (displayln "[PUBLISHER]: work published")
      ;; Return the ~channel-return~, so that the outside context can get
      ;; results from it.
      channel-return)))


(define busy-work
  (lambda ()
    (let loop ([i 0])
      (cond
       [(< i 5e8) (loop (+ i 1))]
       [else i]))))


;; Try it!
(define c-rec (pool-initializer #:parallelism 2))
(define c-ret-2 (publish (lambda () (busy-work)) c-rec))
(define c-ret-1 (publish (lambda () (busy-work)) c-rec))
(get-message c-ret-2)
(get-message c-ret-1)

On my machine, this runs in sequence, rather than in parallel.

ZelphirKaltstahl avatar Jan 30 '20 21:01 ZelphirKaltstahl

@ZelphirKaltstahl Sorry, long story made short, I made all my repositories private on github.

The code you are referring to is:

;; Pool of workers that can be used to execute blocking operation in a
;; fibers application.
(define-module (babelia pool))

(import (ice-9 match))
(import (ice-9 q))
(import (ice-9 threads))
(import (srfi srfi-9))
(import (srfi srfi-1))
(import (fibers))
(import (fibers channels))
(import (fibers operations))
(import (babelia thread))
(import (babelia okvs ulid))
(import (babelia log))


(define %channel #f)

(define worker-count (- (current-processor-count) 1))

(define (worker channel)
  (parameterize ((thread-index (random-bytes 2)))
    (let ((worker (make-channel)))
      (let loop ()
        (put-message channel (cons 'worker worker))
        (let* ((work (get-message worker))
               (thunk (car work))
               (return (cdr work))
               ;; Execute thunk and send the returned value.  XXX: To be able
               ;; to keep track of jobs, the channel called `return`, is put
               ;; in itself.  See procedure pool-for-each-par-map.

               ;; TODO: add a call-with-values
               (out (thunk)))
          (put-message return (cons return out)))
        (loop)))))

(define (arbiter channel)
  (let ((works (make-q))
        (workers (make-q)))
    (let loop ((message (get-message channel)))
      (match message
        (('worker . worker)
         (if (q-empty? works)
             (enq! workers worker)
             (let ((work (deq! works)))
               (put-message worker work))))
        (('work . work)
         (if (q-empty? workers)
             (enq! works work)
             (let ((worker (deq! workers)))
               (put-message worker work))))
        (_ (raise (cons 'fuu message))))
      (loop (get-message channel)))))

(define-public (pool-init)
  (if %channel
      (error 'babelia "pool can not be initialized more than once")
      (let ((channel (make-channel)))
        (log-debug "pool init")
        (set! %channel channel)
        (let loop ((index worker-count))
          (unless (zero? index)
            (call-with-new-thread (lambda () (worker channel)))
            (loop (- index 1))))
        (arbiter channel))))

(define (publish thunk)
  (let ((return (make-channel)))
    (put-message %channel (cons 'work (cons thunk return)))
    return))

(define-public (pool-apply thunk)
  "Execute THUNK in a worker thread.

   Pause the calling fiber until the result is available."
  (cdr (get-message (publish thunk))))

(define (select channels)
  (perform-operation
   (apply choice-operation (map get-operation channels))))

;; TODO: Maybe add a timeout argument, in order to be able to display
;; a nicer error.
(define-public (pool-for-each-par-map sproc pproc lst)
  "For each item of LST execute (PPROC item) in a worker thread, and
   gather returned value with SPROC. SPROC is executed in the calling
   fiber.

   This a POSIX thread pool based n-for-each-par-map for fibers. It is
   somewhat equivalent to:

     (for-each SSPROC (map PPROC LST))

   But applications of PPROC happens in parallel and waiting for new
   values is not blocking the main thread."
  (let loop ((channels (map (lambda (item) (publish (lambda () (pproc item))))
                            lst)))
    (unless (null? channels)
      (match (select channels)
        ((channel . value)
         (sproc value)
         (loop (remove (lambda (x) (eq? x channel)) channels)))
        (else (raise 'fuuubar))))))

amirouche avatar Feb 07 '20 10:02 amirouche

In particular:

(pool-for-each-par-map sproc pproc lst)

Will only work if / when the count of jobs LST is known in advance. Bad things may happen if PPROC calls poll-for-each-par-map. I do not know.

Anyway, in your case, and based on your messages on the mailing list, correct me if I am wrong: you want to compute in parallel a graph of procedures, where some procedures PROCR depend on THUNKA and THUNKB both THUNKA and THUNKB are independant computation that can happen in parallel. In that case, you can rely on something similar to how pool-apply in the above snippet works. Here is the definition of pool-apply:

(define (pool-apply thunk)
  "Execute THUNK in a worker thread.

   Pause the calling fiber until the result is available."
  (cdr (get-message (publish thunk))))

You can not use that as-is because it will pause the calling fiber, without giving you the change to execute a parallel computation. So what you need to do is:

(define thunka-channel-result (publish THUNKA))
(define thunkb-channel-result (publish THUNKB))
(define result-thunka (get-message thunka-channel-result))
(define result-thunkb (get-message thunkb-channel-result))

And then possibly call pool-apply:

(pool-apply (lambda () (PROCR result-thunka result-thunkb)))

Maybe adding a gather procedure will be a good thing.

@ZelphirKaltstahl do you have a repository I can look at? Let me know what you think.

amirouche avatar Feb 07 '20 11:02 amirouche

Hi @amirouche !

I do have a copy of your code in a file here on my machine : ) Thanks nevertheless.

The code I posted is the code I derived from yours, by using fibers instead of threads for the workers. I want to create a repo for the pool. I guess it will be AGPL then, as your original code was AGPL as well, iirc. I will create a repo and link it here.

The question I have now is, why those fibers I am using as workers are not running in parallel. I did make the scheduler with #:parallelism 2, but it does not work in parallel for some reason. At the very bottom of the code I posted you can see, that I am already publishing some busy work twice, before I call the get-message procedure on the channels, to make the pool start working, before requesting the answer. get-message is blocking, just like put message. That much I understand.

Once I can get this running in parallel, I can make use of it in my decision tree parallelization attempt and avoid the apparently buggy parallel forms and finally have my decision tree algorithm run in parallel.

Anyway, in your case, and based on your messages on the mailing list, correct me if I am wrong: you want to compute in parallel a graph of procedures, where some procedures PROCR depend on THUNKA and THUNKB both THUNKA and THUNKB are independant computation that can happen in parallel. In that case, you can rely on something similar to how pool-apply in the above snippet works. Here is the definition of pool-apply:

Yes, I think that is what I want to do. Furthermore in those THUNKA and THUNKB new thunks can be started, as there may be more layers in the tree. For example:

main-thunk - spawns thunk-1 and thunk-2 thunk-1 spawns thunk-1-1 and thunk-1-2 thunk-2 spawns thunk-2-1 and thunk-2-2

Each of those thunks on the lowest level can again spawn some thunk. Spawning a thunk here means to publish work for the fibers to finish. This recursive character is important.

Repo added: https://notabug.org/ZelphirKaltstahl/guile-fibers-pool

ZelphirKaltstahl avatar Feb 08 '20 11:02 ZelphirKaltstahl

I realized, that my idea of running things recursively in parallel will not work with the code I wrote above. The case with 2 separate workers should still work in parallel according to my understanding of fibers. In the end I will probably end up spawning a new fiber for each task to be run in parallel and do away with the concept of a worker, which gets assigned new work and is not discarded. It seems impossible to use workers recursively, as they would block for child executions and not be available to receive and work on more work while waiting for a child execution to finish.

I could not yet think of a way to avoid spawning an arbitrary number of execution units (threads, processes, fibers, whatever) and still perform the arbitrarily recursive parallel execution. That is for not making use of some kind of global state somewhere.

It would still be great to learn, why the code I gave does not run in parallel, as it uses 2 fibers for 2 separate non-recursive tasks. There might be a bug or my understanding of how the fibers library works is simply wrong or I overlooked something that makes my code block even when only one of the workers is assigned work and there are 2 workers available.

ZelphirKaltstahl avatar Feb 19 '20 12:02 ZelphirKaltstahl

I republished https://github.com/amirouche/guile-babelia

@ZelphirKaltstahl did you make any progress?

amirouche avatar Jun 01 '22 11:06 amirouche

@amirouche I have not worked on this any further. I completely forgot about this issue. I did have a nagging feeling everytime I thought about fibers, that there was something I needed to learn about them.

I had a few small projects, where I used future instead (cannot link right now, because notabug is experiencing issues). I don't know how performant it is compared to fibers, but the manual does say, that they are for fine-grained parallelism, so I think I did not go too wrong in using them. What I did there was to split up a range of numbers into evenly sized chunks (as evenly sized as possible, at least) and then created multiple futures, for each chunk one, and then synchronized them all, merging their results. Typical map-reduce kind of thing. That worked quite well. But not all problems are easily expressed like that, as some might be recursive.

However, I still have not found a way (without introducing a global state wrapped in some mutex) to have recursive calls, each potentially spawning a fiber, but only as many as some maximum says. Well, except of course limiting the whole Guile process, which runs a program, so that the program itself does not need to impose the limits. But then I might spawn many futures, creating overhead, when I should only be spawning some workers, which are not discarded and receive work and deliver results, in order to reduce overhead.

I have also not yet found a way to build any worker pool of fibers.

I seem to remember, that you had some worker pool thing in guile-babelia going? Is that correct?

ZelphirKaltstahl avatar Jun 01 '22 23:06 ZelphirKaltstahl

I don't know how performant it is compared to fibers

It depends on the algorithm and its side effects:

  • Just mutating variables e.g., with integer procedure will consume only CPU; in that case a flow of execution that was started with run-fibers [I call it flow-spawn]. That fiber will not be paused. No other fiber will run in that POSIX thread, since that fiber doing computation never pause it will not be in the pool of fibers [also known as continuations], it will not be migrated to another POSIX thread: in guile-fibers as known as work-stealing. CPython -X dev will log the execution time between two pauses of a fiber, and issue a log warning when it is above a certain number of jiffies...

  • At this time, when a fiber call with an operation or directly the procedure sleep, in this case (sleep is under-the-wood an operation), then the fiber will pause. Another case is when a fiber does network input-output with tcp or udp ports, a fiber will pause. That means it only consumes memory in the pool of paused fibers, until its resume because "resume?" predicate returns #t.

  • In the future, there will be support for pausing fibers when they operate on durable disk. In 2022, guile-fibers will still ~pause~ block the whole fiber scheduler of the associated POSIX thread when calling POSIX read; the kernel pause the calling thread, that some time called blocking too in user space / green threads community; and no other fiber can run in that calling thread. On Linux there is something called iouring sometime spelled io_uring. Look at libiouring on GitHub (the maintainer is at facebook); and the recent convo at the board called lobsters.

  • If you do pure computation without pause, you will have one buzy cpu core (hopefully it is pinned to ease the Linux scheduler): there is no overhead to run your app inside run-fibers.

  • guile-fibers has also the ability to spawn fibers in POSIX threads that are not the main POSIX thread, with calls like:

(thread (run-fiber thunk))

The channels are bidirectional pipelines, on both sides they must be an operation that may run inside as a flow of execution after a (run-fiber (my-flow a-channel-is-shared) or from a thread: (thread (proc a-channel-is-shared)). Both sides can send, and recv any scheme objects: that is a great feature, because it avoids copies, hence use less memory, and speed up GC, hence the whole program.

Regarding the nesting of POSIX threads and fibers, given one-posix-thread-per-core, and three POSIX threads, with three fibers in each POSIX threads, it looks like:

[to be continued].

amirouche avatar Jun 02 '22 10:06 amirouche

not found a way (without introducing a global state wrapped in some mutex) to have recursive calls,

Have you tried a queue implementing producer-consumer pattern, see this python code, the todo variable is the queue, several flow of execution will fetch a message with todo.get(); with fibers that code looks in pseudo code:

(define queue (make-channel))

(run-fibers
  (spawn (lambda () (let ((msg (channel-receive queue))) (do-something-concurrently msg)))
  (for-each (lambda (msg) (channel-send queue msg)) (iota 999999999))))

Fibers will pill up messages, there is no overflow checks last time I checked, so you need to monitor the app memory behavior.

Channels use mutex and condition variable, and one of their advantage is that they avoid the need to use mutex and condvars.

amirouche avatar Jun 02 '22 14:06 amirouche

Hey, sorry, I don't know, how long it will take, until I will look at this project again. Might be soon, but also might be a year or two. So hopefully no one is waiting for anything and if anyone is, please don't =)

ZelphirKaltstahl avatar Jul 24 '22 14:07 ZelphirKaltstahl