unofficial mirror of guile-devel@gnu.org 
 help / color / mirror / Atom feed
* ice-9 thread-pool
@ 2012-02-06 19:14 Andy Wingo
  2012-02-06 22:55 ` Ludovic Courtès
  2012-02-08  4:03 ` Nala Ginrut
  0 siblings, 2 replies; 5+ messages in thread
From: Andy Wingo @ 2012-02-06 19:14 UTC (permalink / raw)
  To: guile-devel

[-- Attachment #1: Type: text/plain, Size: 97 bytes --]

Hi,

Related to my previous mail, here is a thread pool implementation.
Comments welcome.

Andy


[-- Attachment #2: ice-9 thread-pool --]
[-- Type: text/plain, Size: 5727 bytes --]

;;; Thread pools

;; Copyright (C)  2010, 2011, 2012 Free Software Foundation, Inc.

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA

;;; Commentary:
;;;
;;; A simple thread pool implementation.
;;;
;;; Code:

(define-module (ice-9 thread-pool)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (ice-9 threads)
  #:export (make-thread-pool
            start-thread-pool!
            pause-thread-pool!
            stop-thread-pool!))


(define-record-type <worker>
  (%make-worker mutex condvar thread handler state)
  worker?
  (mutex worker-mutex)
  (condvar worker-condvar)
  (thread worker-thread set-worker-thread!)
  (handler worker-handler)
  (state worker-state set-worker-state!))

(define (make-worker handler)
  (%make-worker (make-mutex) (make-condition-variable) #f handler 'stopped))

(define* (worker-wait worker #:optional time)
  (if time
      (wait-condition-variable (worker-condvar worker)
                               (worker-mutex worker)
                               time)
      (wait-condition-variable (worker-condvar worker)
                               (worker-mutex worker))))

(define (worker-stopping? worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         #f)
        ((pausing)
         (set-worker-state! worker 'paused)
         (signal-condition-variable (worker-condvar worker))
         (lp))
        ((paused)
         (worker-wait worker)
         (lp))
        ((stopping)
         #t)
        (else
         (error "bad state" worker))))))

(define (worker-loop worker)
  (let lp ()
    (or (worker-stopping? worker)
        (begin
          ((worker-handler worker))
          (lp)))))

(define (pause-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         (set-worker-state! worker 'pausing)
         (lp))
        ((pausing)
         #f)
        ((paused)
         #t)
        ((stopping)
         (error "attempt to go stopping -> pausing" worker))
        ((stopped)
         (set-worker-state! worker 'pausing)
         (set-worker-thread! worker
                             (make-thread worker-loop worker))
         (lp))
        (else
         (error "bad state" worker))))))

(define* (wait-for-paused worker #:optional time)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((paused)
         #t)
        (else
         (and (worker-wait worker time)
              (lp)))))))

(define (start-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((running)
         #t)
        ((stopping)
         (error "attempt to go stopping -> running" worker))
        (else
         (set-worker-state! worker 'running)
         (if (worker-thread worker)
             (signal-condition-variable (worker-condvar worker))
             (set-worker-thread! worker (make-thread worker-loop worker)))
         (lp))))))

(define (stop-worker worker)
  (with-mutex (worker-mutex worker)
    (let lp ()
      (case (worker-state worker)
        ((stopped)
         #t)
        ((stopping)
         #f)
        (else
         (set-worker-state! worker 'stopping)
         (lp))))))

(define* (wait-for-stopped worker #:optional time #:key cancel?)
  (let ((thread
         (with-mutex (worker-mutex worker)
           (cond
            ((eq? (worker-state worker) 'stopped)
             #f)
            ((thread-exited? (worker-thread worker))
             (set-worker-thread! worker #f)
             (set-worker-state! worker 'stopped)
             #f)
            (else
             (worker-thread worker))))))
    (or (not thread)
        (begin
          (if time
              (join-thread (worker-thread worker) time)
              (join-thread (worker-thread worker)))
          (cond
           ((thread-exited? thread)
            (wait-for-stopped worker))
           (cancel?
            (cancel-thread (worker-thread worker))
            ;; Assume it works.
            (join-thread (worker-thread worker))
            (wait-for-stopped worker))
           (else
            #f))))))

(define-record-type <thread-pool>
  (%make-thread-pool size workers)
  thread-pool?
  (size thread-pool-size)
  (workers thread-pool-workers))

;; Create a thread pool, and bring it to the "paused" state.
;;
(define (make-thread-pool size handler)
  (let ((pool (%make-thread-pool size
                                 (map (lambda (i) (make-worker handler))
                                      (iota size)))))
    (pause-thread-pool! pool)
    pool))

(define (pause-thread-pool! pool)
  (for-each pause-worker (thread-pool-workers pool))
  (for-each wait-for-paused (thread-pool-workers pool)))

(define (start-thread-pool! pool)
  (for-each start-worker (thread-pool-workers pool)))

(define (stop-thread-pool! pool)
  (for-each stop-worker (thread-pool-workers pool))
  (for-each wait-for-stopped (thread-pool-workers pool)))

[-- Attachment #3: Type: text/plain, Size: 26 bytes --]


-- 
http://wingolog.org/

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2012-02-08 13:39 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2012-02-06 19:14 ice-9 thread-pool Andy Wingo
2012-02-06 22:55 ` Ludovic Courtès
2012-02-08  4:03 ` Nala Ginrut
2012-02-08  9:47   ` Andy Wingo
2012-02-08 13:39     ` Ludovic Courtès

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).