From mboxrd@z Thu Jan 1 00:00:00 1970 Path: news.gmane.io!.POSTED.ciao.gmane.io!not-for-mail From: Zelphir Kaltstahl Newsgroups: gmane.lisp.guile.user Subject: Not understanding spawn-fiber's parallel? keyword argument Date: Sun, 26 Jan 2020 13:36:58 +0100 Message-ID: Mime-Version: 1.0 Content-Type: multipart/mixed; boundary="------------A8B0A7FC01575528F0AD714C" Injection-Info: ciao.gmane.io; posting-host="ciao.gmane.io:159.69.161.202"; logging-data="25290"; mail-complaints-to="usenet@ciao.gmane.io" User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Icedove/52.9.1 To: guile-user@gnu.org Original-X-From: guile-user-bounces+guile-user=m.gmane-mx.org@gnu.org Sun Jan 26 13:37:25 2020 Return-path: Envelope-to: guile-user@m.gmane-mx.org Original-Received: from lists.gnu.org ([209.51.188.17]) by ciao.gmane.io with esmtps (TLS1.2:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.92) (envelope-from ) id 1ivhAK-0006Vr-6H for guile-user@m.gmane-mx.org; Sun, 26 Jan 2020 13:37:24 +0100 Original-Received: from localhost ([::1]:33434 helo=lists1p.gnu.org) by lists.gnu.org with esmtp (Exim 4.90_1) (envelope-from ) id 1ivhAI-0006VD-Ma for guile-user@m.gmane-mx.org; Sun, 26 Jan 2020 07:37:22 -0500 Original-Received: from eggs.gnu.org ([2001:470:142:3::10]:54807) by lists.gnu.org with esmtp (Exim 4.90_1) (envelope-from ) id 1ivhA2-0006V6-LM for guile-user@gnu.org; Sun, 26 Jan 2020 07:37:08 -0500 Original-Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1ivhA0-00022x-II for guile-user@gnu.org; Sun, 26 Jan 2020 07:37:06 -0500 Original-Received: from mout01.posteo.de ([185.67.36.65]:50984) by eggs.gnu.org with esmtps (TLS1.0:DHE_RSA_AES_256_CBC_SHA1:32) (Exim 4.71) (envelope-from ) id 1ivh9z-00020Z-Tu for guile-user@gnu.org; Sun, 26 Jan 2020 07:37:04 -0500 Original-Received: from submission (posteo.de [89.146.220.130]) by mout01.posteo.de (Postfix) with ESMTPS id 4FFE6160063 for ; Sun, 26 Jan 2020 13:37:00 +0100 (CET) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=posteo.de; s=2017; t=1580042220; bh=kKMJ7cfHOwtAFOhwmVTGdqMpKUdOdDZa8BHlD98Z/0I=; h=To:From:Subject:Date:From; b=ipXvSHvO6Ck4S14a5HBtsFWeqhru1aLmYyEdeqEBbhk/FA5ePhGjNFxqEIcNDPA7b 1wPQJ7Hqb7D08CnD2WxTTOxyHhNK2g4Jk38S5fJnQRkYerWWH2+EX+Ogh7BzIFoT9W FrmG8O+kwyXRhO4CULaADtC0Ur3xvUoHqZUPnY/l1lV/ovYGclD+W7Zz/uQ2fVdmMp 3r3c2QSPRnlAnWEX9fHppYppKprQEMThCOpq2lYh/3YKep+v2DOLIqQ3/QBJv6yzUq BR3sSLGT0n2IuvLVMfxSx6Cid6N4WDcoV7M9DKJt/sGfrm4KYe51olXL07yDbetynh 3jsBTolxl+/pA== Original-Received: from customer (localhost [127.0.0.1]) by submission (posteo.de) with ESMTPSA id 485C7H5CRWz6tm8 for ; Sun, 26 Jan 2020 13:36:59 +0100 (CET) Content-Language: en-US X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.2.x-3.x [generic] X-Received-From: 185.67.36.65 X-BeenThere: guile-user@gnu.org X-Mailman-Version: 2.1.23 Precedence: list List-Id: General Guile related discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: guile-user-bounces+guile-user=m.gmane-mx.org@gnu.org Original-Sender: "guile-user" Xref: news.gmane.io gmane.lisp.guile.user:16095 Archived-At: This is a multi-part message in MIME format. --------------A8B0A7FC01575528F0AD714C Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit Hi Guile Users! I am experimenting with guile-fibers and hit some behavior that seems weird. Have a spawn-fiber call in a run-fibers call inside a call-with-new-thread, to spawn and run fibers without blocking the whole execution of the program. The code is probably very similar to amirouche's babelia thread pool (I get a 404 currently, when trying to visit it on Github and could not find it on Gitlab either, could it be you are moving it elsewhere @amirouche?), except, that I am trying to use fibers instead of threads: ~~~~~~~~8<~~~~~~~~8<~~~~~~~~ (define pool-initializer (lambda* (#:key (parallelism (current-processor-count))) (let ([channel-receive (make-channel)] [scheduler (make-scheduler #:parallelism parallelism)]) ;; Start as many workers as are desired. ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the ;; fibers in a non-blocking way. LOOKUP: How to start fibers without ;; waiting for them to finish? (call-with-new-thread (lambda () (run-fibers (lambda () (let loop ([index parallelism]) (unless (zero? index) (display "[POOL INIT THREAD]: will spawn fiber ") (displayln index) (spawn-fiber (lambda () (worker index channel-receive)) ) ; add #:parallel? #t keyword argument here ;; We do not need to spawn new fibers in the same scheduler later. The ;; fibers should stay alive for the whole duration the program is ;; running. (displayln "[POOL INIT THREAD]: fiber spawned") (loop (- index 1))))) #:scheduler scheduler) (displayln "[POOL INIT]: pool init thread returning") )) (displayln "[POOL INIT]: will start work-distributor") (call-with-new-thread (lambda () (work-distributor channel-receive))) ;; Return the channel for receiving work, so that the outside context can ;; make use of it when calling ~publish~ to publish work. channel-receive))) ~~~~~~~~>8~~~~~~~~>8~~~~~~~~ So there is that call to spawn-fiber. This call starts executing whatever is defined in ~worker~. However, when I add the #:parallel? #t keyword argument, the fiber never starts running. This is confusing. I thought (after reading the manual of fibers regarding this keyword argument multiple times) that it would only change what scheduler (scheduler or peer scheduler) the fiber is run on, but not whether it is run at all. It seems, that somehow when I add the keyword #:parallel? #t, it never is run. Can anyone explain, why this happens? I will send the complete code as attachment. There is also the question, why the fibers do not run in parallel, as they should, because I created the scheduler with #:parallelism 2. Instead they run sequentially. But maybe that makes for another e-mail later. Btw.: Thanks @amirouche! After a while of looking at your thread pool code and drawing some diagrams to help me understanding it, it seems quite clever and just in the direction I probably need for a fibers "thread pool". Regards, Zelphir --------------A8B0A7FC01575528F0AD714C Content-Type: text/x-scheme; name="pool.scm" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="pool.scm" (define-module (fibers-pool)) (use-modules ;; FIFO queue, not functional, using mutation ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html (ice-9 q) (ice-9 match) (ice-9 threads) (rnrs exceptions) (rnrs conditions) ;; fibers internals are needed for creating schedulers without running anything ;; in them immediately (fibers) (fibers channels) (fibers internal)) (define displayln (lambda (msg) (display msg) (newline))) (define work-distributor (lambda (channel-receive) ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor started") ;; (displayln "[WORK-DISTRIBUTOR]: starting work-distributor message loop") (let loop ([work-queue (make-q)] [worker-channels-queue (make-q)]) (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages") (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ") (displayln (q-length worker-channels-queue)) (display "[WORK-DISTRIBUTOR]: number of works in queue: ") (displayln (q-length work-queue)) (match (pk 'work-distributor-received-msg (get-message channel-receive)) [('worker-ready . channel-worker) (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel") ;; If there is no work for the ready worker, enqueue the worker, ;; otherwise give it work. (cond [(q-empty? work-queue) ;; (displayln "[WORK-DISTRIBUTOR]: work queue is empty") (enq! worker-channels-queue channel-worker)] [else ;; (displayln "[WORK-DISTRIBUTOR]: work queue has work") (let ([some-work (deq! work-queue)]) ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor will put work on channel") (put-message channel-worker (cons 'work some-work)) ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor did put work on channel") )]) (loop work-queue worker-channels-queue)] [('work . work) ;; (displayln "[WORK-DISTRIBUTOR]: work-distributor received work") ;; ~work~ is always a pair of a thunk to be run and a return channel, ;; on which the result shall be put. ;; If there is no worker ready, enqueue the work, otherwise distribute ;; the work to a ready worker. (cond [(q-empty? worker-channels-queue) ;; (displayln "[WORK-DISTRIBUTOR]: worker queue is empty") (enq! work-queue work)] [else ;; (displayln "[WORK-DISTRIBUTOR]: ready workers available") (let ([channel-worker (deq! worker-channels-queue)]) ;; (displayln "[WORK-DISTRIBUTOR]: will put work on channel") (put-message channel-worker (cons 'work work)) ;; (displayln "[WORK-DISTRIBUTOR]: did put work on channel") )]) (loop work-queue worker-channels-queue)] ;; On any other message raise a condition. [other (raise (condition (make-error) (make-message-condition "work-distributor received unrecognized message") (make-irritants-condition (list other))))])))) (define worker (lambda (worker-index channel-receive) (let ([channel-worker (make-channel)]) (displayln "[WORKER]: before worker message loop") (let loop () ;; Report as ready. Give my own channel to the work-distributor to let ;; it send me work. (put-message channel-receive (cons 'worker-ready channel-worker)) ;; Get messages sent to me by the distributor on my own channel. (match (pk 'worker-got-msg (get-message channel-worker)) ;; If I receive work, do the work and return it on the channel-return. [('work . (thunk . channel-return)) ;; Put the result on the return channel, so that anyone, who has the ;; a binding of the return channel, can access the result. (put-message channel-return (thunk)) (loop)] ;; On any other message raise a condition. [other (raise (condition (make-error) (make-message-condition "worker received unrecognized message") (make-irritants-condition (list other))))]))))) (define pool-initializer (lambda* (#:key (parallelism (current-processor-count))) ;; (define run-fibers-in-scheduler ;; (displayln "[POOL INIT]: runnning pool-initializer") (let ([channel-receive (make-channel)] [scheduler (make-scheduler #:parallelism parallelism)]) ;; start as many workers as are desired ;; TODO: PROBLEM: ~run-fibers~ blocks. So we need a new thread to run the ;; fibers in a non-blocking way. LOOKUP: How to start fibers without ;; waiting for them to finish? ;; (displayln "[POOL INIT]: will run-fibers with new thread") (call-with-new-thread (lambda () ;; (displayln "[POOL INIT THREAD]: running") (run-fibers (lambda () ;; (displayln "[POOL INIT THREAD]: will start some fibers") ;; (display "[POOL INIT THREAD]: parallelism is: ") (displayln parallelism) (let loop ([index parallelism]) (unless (zero? index) ;; using fibers: ;; TODO: use created scheduler ;; (displayln "[POOL INIT THREAD]: there are more fibers to spawn") (display "[POOL INIT THREAD]: will spawn fiber ") (displayln index) (spawn-fiber (lambda () (worker index channel-receive)) ) ;; We do not need to spawn new fibers in the same scheduler later. The ;; fibers should stay alive for the whole duration the program is ;; running. (displayln "[POOL INIT THREAD]: fiber spawned") (loop (- index 1))))) #:scheduler scheduler) (displayln "[POOL INIT]: pool init thread returning") )) (displayln "[POOL INIT]: will start work-distributor") (call-with-new-thread (lambda () (work-distributor channel-receive))) ;; (displayln "[POOL INIT]: work-distributor is now running in new thread") ;; Return the channel for receiving work, so that the outside context can ;; make use of it when calling ~publish~ to publish work. ;; (displayln "[POOL INIT]: returning channel-receive") channel-receive))) (define publish (lambda (work-as-thunk channel-receive) ;; The result of the computation can be taken from ~channel-return~. (let ([channel-return (make-channel)]) ;; Put work tagged as work on the receive channel of the work-distributor. (let ([work-message (cons 'work (cons work-as-thunk channel-return))]) (display (simple-format #f "[PUBLISHER]: will publish the following work: ~a\n" work-message)) (put-message channel-receive work-message)) (displayln "[PUBLISHER]: work published") ;; Return the ~channel-return~, so that the outside context can get ;; results from it. channel-return))) (define busy-work (lambda () (let loop ([i 0]) (cond [(< i 5e8) (loop (+ i 1))] [else i])))) (define c-rec (pool-initializer #:parallelism 2)) (define c-ret-2 (publish (lambda () (busy-work)) c-rec)) (define c-ret-1 (publish (lambda () (busy-work)) c-rec)) ;; (get-message c-ret-2) ;; (get-message c-ret-1) --------------A8B0A7FC01575528F0AD714C--