unofficial mirror of guile-user@gnu.org 
* Not understanding spawn-fiber's parallel? keyword argument
@ 2020-01-26 12:36 Zelphir Kaltstahl
From: Zelphir Kaltstahl @ 2020-01-26 12:36 UTC (permalink / raw)
  To: guile-user

[-- Attachment #1: Type: text/plain, Size: 3196 bytes --]

Hi Guile Users!

I am experimenting with guile-fibers and hit some behavior that seems weird.

I have a spawn-fiber call inside a run-fibers call, which itself runs
inside call-with-new-thread, so that fibers are spawned and run without
blocking the rest of the program. The code is probably very similar to
amirouche's babelia thread pool (I currently get a 404 when trying to
visit it on GitHub and could not find it on GitLab either; are you
moving it elsewhere, @amirouche?), except that I am trying to use fibers
instead of threads:

~~~~~~~~8<~~~~~~~~8<~~~~~~~~
(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))
    (let ([channel-receive (make-channel)]
          [scheduler (make-scheduler #:parallelism parallelism)])
      ;; Start as many workers as are desired.
      ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the
      ;; fibers in a non-blocking way. LOOKUP: How to start fibers without
      ;; waiting for them to finish?
      (call-with-new-thread
       (lambda ()
         (run-fibers
          (lambda ()
            (let loop ([index parallelism])
              (unless (zero? index)
                (display "[POOL INIT THREAD]: will spawn fiber ") (displayln index)
                (spawn-fiber (lambda () (worker index channel-receive))
                             )  ; add #:parallel? #t keyword argument here
                ;; We do not need to spawn new fibers in the same scheduler later. The
                ;; fibers should stay alive for the whole duration the program is
                ;; running.
                (displayln "[POOL INIT THREAD]: fiber spawned")
                (loop (- index 1)))))
          #:scheduler scheduler)
         (displayln "[POOL INIT]: pool init thread returning")))
      (displayln "[POOL INIT]: will start work-distributor")
      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      ;; Return the channel for receiving work, so that the outside context can
      ;; make use of it when calling ~publish~ to publish work.
      channel-receive)))
~~~~~~~~>8~~~~~~~~>8~~~~~~~~
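
Stripped of the pool logic, the arrangement boils down to something like
the following minimal sketch. The names `requests`, `replies`,
`start-doubler-thread` and `ask` are made up for illustration and do not
appear in the attached code. The sketch assumes that channel operations
may be called from outside a fiber, which the attached code relies on as
well.

```scheme
(use-modules (ice-9 threads)
             (fibers)
             (fibers channels))

;; Hypothetical channels for talking to the fiber from the outside.
(define requests (make-channel))
(define replies (make-channel))

;; run-fibers blocks its calling thread, so give it a thread of its own.
(define (start-doubler-thread)
  (call-with-new-thread
   (lambda ()
     (run-fibers
      (lambda ()
        ;; The initial fiber loops forever, so run-fibers never returns
        ;; and the thread stays alive for the rest of the program.
        (let loop ()
          (put-message replies (* 2 (get-message requests)))
          (loop)))))))

;; Called from the main thread; blocks only until the fiber answers.
(define (ask n)
  (put-message requests n)
  (get-message replies))

(start-doubler-thread)
(display (ask 21))
(newline)
```

Here run-fibers creates its own scheduler, unlike the excerpt above,
which passes one in through #:scheduler.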

So there is that call to spawn-fiber. As written, it starts executing
whatever is defined in ~worker~. However, when I add the #:parallel? #t
keyword argument, the fiber never starts running. This is confusing.
After reading the fibers manual on this keyword argument several times,
I thought it would only change which scheduler (the given scheduler or
one of its peers) the fiber runs on, not whether it runs at all. Yet
with #:parallel? #t the fiber never runs.

Can anyone explain why this happens?
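
For comparison, here is a minimal sketch in which run-fibers creates the
scheduler itself instead of receiving one through #:scheduler. Going by
the fibers manual, run-fibers arranges for peer schedulers to run in
their own threads when it creates the scheduler on the caller's behalf,
so this is a setup in which fibers spawned with #:parallel? #t should
get to run. Whether that difference explains the behavior above is an
assumption, not something confirmed. The name `collect-parallel-results`
is made up for illustration.

```scheme
(use-modules (fibers)
             (fibers channels))

;; Hypothetical helper: spawn two fibers with #:parallel? #t and collect
;; one message from each of them.
(define (collect-parallel-results)
  (run-fibers
   (lambda ()
     (let ((results (make-channel)))
       (spawn-fiber (lambda () (put-message results 'a)) #:parallel? #t)
       (spawn-fiber (lambda () (put-message results 'b)) #:parallel? #t)
       ;; The arrival order is not fixed, so callers should not rely on it.
       (list (get-message results) (get-message results))))
   ;; No #:scheduler argument: run-fibers makes its own scheduler.
   #:parallelism 2))

(display (collect-parallel-results))
(newline)
```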

I am attaching the complete code.

There is also the question of why the fibers do not run in parallel, as
I would expect, given that I created the scheduler with #:parallelism 2.
Instead they run sequentially. But maybe that is a topic for another
e-mail.
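
One way to check whether two fibers actually overlap is to time the same
workload under #:parallelism 1 and #:parallelism 2. The following is
only a sketch: `count-to` and `timed-run` are made-up names, the loop
bound is far smaller than the one in busy-work, and run-fibers creates
its own scheduler here rather than receiving one. The measured times
depend on the machine, so no particular numbers are claimed.

```scheme
(use-modules (fibers)
             (fibers channels))

;; A small CPU-bound loop, similar in spirit to busy-work.
(define (count-to n)
  (let loop ((i 0))
    (if (< i n)
        (loop (+ i 1))
        i)))

;; Run two busy fibers and return the elapsed wall-clock seconds.
(define (timed-run parallelism)
  (let ((start (get-internal-real-time)))
    (run-fibers
     (lambda ()
       (let ((done (make-channel)))
         (spawn-fiber (lambda () (put-message done (count-to 20000000)))
                      #:parallel? #t)
         (spawn-fiber (lambda () (put-message done (count-to 20000000)))
                      #:parallel? #t)
         ;; Wait for both fibers before the initial fiber returns.
         (get-message done)
         (get-message done)))
     #:parallelism parallelism)
    (exact->inexact
     (/ (- (get-internal-real-time) start)
        internal-time-units-per-second))))

(display (list 'parallelism-1 (timed-run 1)
               'parallelism-2 (timed-run 2)))
(newline)
```

If the second number is clearly smaller than the first, the two fibers
ran at the same time for at least part of the run.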

Btw.: Thanks @amirouche! After a while of looking at your thread pool
code and drawing some diagrams to help me understand it, it seems
quite clever and just in the direction I probably need for a fibers
"thread pool".

Regards,
Zelphir


[-- Attachment #2: pool.scm --]
[-- Type: text/x-scheme, Size: 7617 bytes --]

(define-module (fibers-pool))


(use-modules
 ;; FIFO queue, not functional, using mutation
 ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
 (ice-9 q)
 (ice-9 match)
 (ice-9 threads)
 (rnrs exceptions)
 (rnrs conditions)
 ;; fibers internals are needed for creating schedulers without running anything
 ;; in them immediately
 (fibers)
 (fibers channels)
 (fibers internal))


(define displayln
  (lambda (msg)
    (display msg)
    (newline)))


(define work-distributor
  (lambda (channel-receive)
    ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor started")
    ;; (displayln "[WORK-DISTRIBUTOR]: starting work-distributor message loop")
    (let loop ([work-queue (make-q)]
               [worker-channels-queue (make-q)])
      (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")

      (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
      (displayln (q-length worker-channels-queue))

      (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
      (displayln (q-length work-queue))

      (match (pk 'work-distributor-received-msg (get-message channel-receive))
        [('worker-ready . channel-worker)
         (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
         ;; If there is no work for the ready worker, enqueue the worker,
         ;; otherwise give it work.
         (cond
          [(q-empty? work-queue)
           ;; (displayln "[WORK-DISTRIBUTOR]: work queue is empty")
           (enq! worker-channels-queue channel-worker)]
          [else
           ;; (displayln "[WORK-DISTRIBUTOR]: work queue has work")
           (let ([some-work (deq! work-queue)])
             ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor will put work on channel")
             (put-message channel-worker (cons 'work some-work))
             ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor did put work on channel")
             )])
         (loop work-queue worker-channels-queue)]
        [('work . work)
         ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor received work")
         ;; ~work~ is always a pair of a thunk to be run and a return channel,
         ;; on which the result shall be put.

         ;; If there is no worker ready, enqueue the work, otherwise distribute
         ;; the work to a ready worker.
         (cond
          [(q-empty? worker-channels-queue)
           ;; (displayln "[WORK-DISTRIBUTOR]: worker queue is empty")
           (enq! work-queue work)]
          [else
           ;; (displayln "[WORK-DISTRIBUTOR]: ready workers available")
           (let ([channel-worker (deq! worker-channels-queue)])
             ;; (displayln "[WORK-DISTRIBUTOR]: will put work on channel")
             (put-message channel-worker (cons 'work work))
             ;; (displayln "[WORK-DISTRIBUTOR]: did put work on channel")
             )])
         (loop work-queue worker-channels-queue)]
        ;; On any other message raise a condition.
        [other
         (raise
          (condition
           (make-error)
           (make-message-condition "work-distributor received unrecognized message")
           (make-irritants-condition (list other))))]))))


(define worker
  (lambda (worker-index channel-receive)
    (let ([channel-worker (make-channel)])
      (displayln "[WORKER]: before worker message loop")
      (let loop ()
        ;; Report as ready. Give my own channel to the work-distributor to let
        ;; it send me work.
        (put-message channel-receive
                     (cons 'worker-ready
                           channel-worker))
        ;; Get messages sent to me by the distributor on my own channel.
        (match (pk 'worker-got-msg (get-message channel-worker))
          ;; If I receive work, do the work and return it on the channel-return.
          [('work . (thunk . channel-return))
           ;; Put the result on the return channel, so that anyone who has a
           ;; binding of the return channel can access the result.
           (put-message channel-return (thunk))
           (loop)]
          ;; On any other message raise a condition.
          [other
           (raise
            (condition
             (make-error)
             (make-message-condition "worker received unrecognized message")
             (make-irritants-condition (list other))))])))))


(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))

    ;; (define run-fibers-in-scheduler

    ;; (displayln "[POOL INIT]: running pool-initializer")
    (let ([channel-receive (make-channel)]
          [scheduler (make-scheduler #:parallelism parallelism)])
      ;; start as many workers as are desired

      ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the
      ;; fibers in a non-blocking way. LOOKUP: How to start fibers without
      ;; waiting for them to finish?
      ;; (displayln "[POOL INIT]: will run-fibers with new thread")

      (call-with-new-thread
       (lambda ()
         ;; (displayln "[POOL INIT THREAD]: running")
         (run-fibers
          (lambda ()
            ;; (displayln "[POOL INIT THREAD]: will start some fibers")
            ;; (display "[POOL INIT THREAD]: parallelism is: ") (displayln parallelism)
            (let loop ([index parallelism])
              (unless (zero? index)
                ;; using fibers:
                ;; TODO: use created scheduler
                ;; (displayln "[POOL INIT THREAD]: there are more fibers to spawn")
                (display "[POOL INIT THREAD]: will spawn fiber ") (displayln index)
                (spawn-fiber (lambda () (worker index channel-receive)))
                ;; We do not need to spawn new fibers in the same scheduler later. The
                ;; fibers should stay alive for the whole duration the program is
                ;; running.
                (displayln "[POOL INIT THREAD]: fiber spawned")
                (loop (- index 1)))))
          #:scheduler scheduler)
         (displayln "[POOL INIT]: pool init thread returning")))
      (displayln "[POOL INIT]: will start work-distributor")
      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      ;; (displayln "[POOL INIT]: work-distributor is now running in new thread")
      ;; Return the channel for receiving work, so that the outside context can
      ;; make use of it when calling ~publish~ to publish work.
      ;; (displayln "[POOL INIT]: returning channel-receive")
      channel-receive)))


(define publish
  (lambda (work-as-thunk channel-receive)
    ;; The result of the computation can be taken from ~channel-return~.
    (let ([channel-return (make-channel)])
      ;; Put work tagged as work on the receive channel of the work-distributor.
      (let ([work-message (cons 'work (cons work-as-thunk channel-return))])
        (display
         (simple-format
          #f "[PUBLISHER]: will publish the following work: ~a\n"
          work-message))
        (put-message channel-receive work-message))

      (displayln "[PUBLISHER]: work published")
      ;; Return the ~channel-return~, so that the outside context can get
      ;; results from it.
      channel-return)))


(define busy-work
  (lambda ()
    (let loop ([i 0])
      (cond
       [(< i 5e8) (loop (+ i 1))]
       [else i]))))


(define c-rec (pool-initializer #:parallelism 2))
(define c-ret-2 (publish (lambda () (busy-work)) c-rec))
(define c-ret-1 (publish (lambda () (busy-work)) c-rec))
;; (get-message c-ret-2)
;; (get-message c-ret-1)
