unofficial mirror of guile-user@gnu.org 
* Potluck - thread safe event loop with await semantics
@ 2016-02-16 21:45 Chris Vine
From: Chris Vine @ 2016-02-16 21:45 UTC
  To: guile-user

[-- Attachment #1: Type: text/plain, Size: 1079 bytes --]

Hi,

Here for potluck is a considerably improved version of the event loop
about which I posted a few months ago, and which I have spent some time
tidying up this week.

It features an a-sync procedure (in coroutines.scm) which can be used to
provide await semantics on asynchronous code (so as to remedy inversion
of control), and will work with callbacks for any event loop, including
the glib event loop wrapped by guile-gnome.  More to the point, it also
provides a thread safe event loop for guile (event-loop.scm) with
support for watches on ports/file descriptors, and now supports proper
timeouts, and permits events to be posted by other tasks.  This
includes tasks running on other threads, for which there is a helper
procedure a-sync-run-task-in-thread.

It would be nice to have a monotonic clock available for timeouts where
the system supports it, but guile does not provide that out of the box.
It would therefore be a separate exercise to wrap clock_gettime() with a
CLOCK_MONOTONIC argument to replace the use of the gettimeofday
procedure in event-loop.scm.
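
As a rough illustration of the wrapping exercise described above, the
following sketch calls clock_gettime() through Guile's (system foreign)
FFI.  It is not part of the attached code.  The name monotonic-timeofday
is hypothetical, and the sketch assumes Linux, where CLOCK_MONOTONIC is 1
and struct timespec consists of two C longs; on glibc older than 2.17
clock_gettime() lives in librt rather than libc, so the lookup would
need adjusting there.

```scheme
(use-modules (system foreign))

;; Assumption: the Linux value of CLOCK_MONOTONIC
(define CLOCK_MONOTONIC 1)

;; int clock_gettime(clockid_t clk_id, struct timespec *tp), looked up
;; in the symbols already loaded into the running process
(define %clock-gettime
  (pointer->procedure int
                      (dynamic-func "clock_gettime" (dynamic-link))
                      (list int '*)))

;; Hypothetical helper: returns a (secs . usecs) pair in the same shape
;; as gettimeofday, but read from the monotonic clock.
(define (monotonic-timeofday)
  ;; Assumption: struct timespec is { long tv_sec; long tv_nsec; }
  (let* ([ts (make-c-struct (list long long) (list 0 0))]
         [ret (%clock-gettime CLOCK_MONOTONIC ts)])
    (unless (zero? ret)
      (error "clock_gettime failed"))
    (let ([fields (parse-c-struct ts (list long long))])
      ;; convert nanoseconds to microseconds
      (cons (car fields) (quotient (cadr fields) 1000)))))
```

Because the result has the same (secs . usecs) shape as that of
gettimeofday, a procedure of this kind could in principle stand in for
the gettimeofday calls in _time-remaining and _get-abstime.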

Chris

[-- Attachment #2: coroutines.scm --]
[-- Type: text/x-scheme, Size: 7305 bytes --]

;; Copyright Chris Vine 2014 and 2016

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;; 
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;; 
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

(define-module (coroutines)
  #:use-module (event-loop)    ;; for event-post!
  #:use-module (ice-9 threads) ;; for call-with-new-thread
  #:export (make-iterator
	    make-coroutine
	    a-sync
	    a-sync-run-task-in-thread))

;; this procedure takes a generator procedure, namely a procedure
;; which has a 'yield' parameter for its first or only argument,
;; followed by such other arguments (other than the one for the
;; 'yield' parameter) as the generator procedure requires, and
;; constructs an iterator from them.  When the iterator is invoked, it
;; will begin executing the procedure unless and until the argument
;; comprising the yield procedure is called, which will cause the
;; iterator to suspend computation and instead return the value passed
;; to yield (yield is a procedure taking zero or one arguments).  If
;; invoked again, the iterator will resume computation at the point
;; where it last left off (the yield procedure returning the value, if
;; any, passed to the iterator on resuming).  When the generator
;; procedure has executed to the end, the iterator returns
;; 'stop-iteration.  This procedure
;; has some resemblance to call/ec, except that (i) instead of
;; executing the passed procedure immediately, it returns an iterator
;; which will do so, (ii) it is resumable, and (iii) the procedure to
;; be executed can receive starting arguments in addition to the
;; yield/break argument, to provide an alternative to binding them
;; with a lambda closure.  It is similar to ECMAScript generators and
;; python generators.
(define (make-iterator proc . args)
  (define tag (make-prompt-tag))
  (define send-back '())
  (define (thunk)
    (apply proc
	   (lambda* (#:optional val)
	     (abort-to-prompt tag val)
	     send-back)
	   args)
    ;; the generator procedure has returned - reset thunk to do
    ;; nothing except return 'stop-iteration and return
    ;; 'stop-iteration after this last call to proc
    (set! thunk (lambda () 'stop-iteration))
    'stop-iteration)
  (lambda* (#:optional send-arg)
    (set! send-back send-arg)
    (call-with-prompt tag
		      thunk
		      (lambda (cont ret)
			(set! thunk cont)
			ret))))
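
;; A minimal usage sketch of make-iterator, added for illustration.
;; The helper name _example-iterate-to is hypothetical: it is not
;; exported and is not part of the original library.  It builds an
;; iterator from a generator procedure which yields the integers from
;; 0 up to (but not including) 'limit', passed as a starting argument,
;; and collects the yielded values into a list until the iterator
;; returns 'stop-iteration.
(define (_example-iterate-to limit)
  (let ([iter (make-iterator (lambda (yield n)
                               (let loop ([i 0])
                                 (when (< i n)
                                   (yield i)
                                   (loop (1+ i)))))
                             limit)])
    (let collect ([val (iter)]
                  [acc '()])
      (if (eq? val 'stop-iteration)
          (reverse acc)
          (collect (iter) (cons val acc))))))
;; (_example-iterate-to 3) evaluates to (0 1 2)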

;; this procedure takes a generator procedure, namely a procedure
;; which has a 'yield' parameter for its first or only argument,
;; followed by such other arguments (other than the one for the
;; 'yield' parameter) as the generator procedure requires, and
;; constructs a coroutine.  It is similar to make-iterator, in that it
;; takes a generator procedure and returns a lambda object (a
;; coroutine) which when called will begin executing the generator
;; procedure unless and until the argument comprising the yield
;; procedure is called, which will cause computation to be suspended.
;; However unlike make-iterator, the resumption continuation generated
;; on yielding is returned by the coroutine when yielding rather than
;; being stored internally in an iterator, so there is no explicit
;; retained mutable state.  The return value of the coroutine
;; comprises two values: first the resumption continuation, and second
;; the value (if any) passed to 'yield' when called.  If the returned
;; resumption continuation is subsequently called again, computation
;; will be resumed at the point where it last left off (the yield
;; procedure returning the value, if any, passed to the continuation
;; on resuming) until it completes or it again calls the yield
;; procedure.
;;
;; Upon the generator procedure finally completing, the value to which
;; it evaluates is returned by the resumption continuation together
;; with a continuation value of #f.  This differs from the behaviour
;; of make-iterator, which returns 'stop-iteration when the generator
;; procedure finishes to completion and ignores its return value.
(define (make-coroutine proc . args)
  (define tag (make-prompt-tag))
  (define (abort-handler cont ret)
    (define* (resume #:optional arg)
      (call-with-prompt
       tag
       (lambda () (cont arg))
       abort-handler))
    (values resume ret))
  ;; 'arg' is ignored - it is provided only for consistency with the
  ;; interface of resume
  (lambda* (#:optional arg)
	   (call-with-prompt
	    tag
	    (lambda ()
	      (values #f
		      (apply proc
			     (lambda* (#:optional arg)
				      (abort-to-prompt tag arg))
			     args)))
	    abort-handler)))
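
;; A minimal usage sketch of make-coroutine, added for illustration.
;; The helper name _example-coroutine-results is hypothetical: it is
;; not exported and is not part of the original library.  It drives a
;; coroutine to completion by repeatedly calling the resumption
;; continuation which each step returns, collecting the yielded values
;; followed by the final return value of the generator procedure.  A
;; continuation value of #f signals that the generator has finished.
(define (_example-coroutine-results)
  (let ([co (make-coroutine (lambda (yield)
                              (yield 1)
                              (yield 2)
                              'finished))])
    (let loop ([step co]
               [acc '()])
      (call-with-values step
        (lambda (next val)
          (if next
              (loop next (cons val acc))
              (reverse (cons val acc))))))))
;; (_example-coroutine-results) evaluates to (1 2 finished)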
     
;; a-sync takes a waitable procedure (namely a procedure which takes
;; 'await' as its first parameter, which is a yield procedure obtained
;; by a call to make-iterator, and 'resume' as its second parameter,
;; which is an iterator constructed by make-iterator), followed by
;; such other arguments (if any) as the waitable procedure requires to
;; be passed on to it.  The 'resume' argument must only be called by
;; an asynchronous callback, and the 'await' argument must only be
;; called by the waitable procedure in order to block until the
;; callback is ready to let it resume.  When it unblocks, the 'await'
;; argument returns the value (if any) passed to 'resume' by the
;; callback.  The a-sync procedure must be called in the same thread
;; as that in which the event loop which will execute the callback
;; runs.
;;
;; There can be as many calls to 'await' and asynchronous callbacks in
;; any one waitable procedure as wanted, to enable composition of
;; asynchronous operations.  None of the code in the waitable
;; procedure should block on other things in the program, except by
;; calls to await (which do not in fact block, even though they appear
;; to do so).  a-sync must be called in the thread in which the
;; callback will execute, namely the main loop thread.
;;
;; This can be used with any event loop, including the glib event loop
;; provided by guile-gnome and so with gtk+ callbacks, and with the
;; event loop in the event-loop module.
(define (a-sync waitable . args)
  (letrec ([resume (make-iterator (lambda (await)
				  (apply waitable await resume args)))])
    (resume)))

;; this is a convenience procedure for use with the event loop in the
;; event-loop module, which will run thunk in its own thread and then
;; post the result to the main loop specified by the loop argument,
;; where it then applies resume (obtained from a call to a-sync) to
;; that result.  It is intended to be called in a waitable procedure.
;; It will normally be necessary to call event-loop-block! before
;; invoking this procedure.
(define (a-sync-run-task-in-thread thunk loop resume)
  (call-with-new-thread
   (lambda ()
     (let ([res (thunk)])
       (event-post! loop (lambda ()
			   (resume res)))))))

[-- Attachment #3: event-loop.scm --]
[-- Type: text/x-scheme, Size: 19945 bytes --]

;; Copyright Chris Vine 2014 and 2016

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;; 
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;; 
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

(define-module (event-loop)
  #:use-module (ice-9 q)        ;; for make-q, etc
  #:use-module (srfi srfi-1)    ;; for reduce, delete!, alist-delete!, delete-duplicates and assoc
  #:use-module (rnrs records syntactic)
  #:use-module (ice-9 threads)  ;; for with-mutex
  #:export (make-event-loop
	    event-loop-run!
	    event-loop-add-read-watch!
	    event-loop-add-write-watch!
	    event-loop-remove-watch!
	    event-loop-block!
	    event-loop-quit!
	    event-post!
	    timeout-post!
	    timeout-remove!))


(define-record-type (event-loop _make-event-loop event-loop?)
  (fields (immutable mutex _mutex-get)
	  (immutable q _q-get)
	  (mutable done _done-get _done-set!)
	  (mutable event-in _event-in-get _event-in-set!)
	  (mutable event-out _event-out-get _event-out-set!)
	  (mutable read-files _read-files-get _read-files-set!)
	  (mutable read-files-actions _read-files-actions-get _read-files-actions-set!)
	  (mutable write-files _write-files-get _write-files-set!)
	  (mutable write-files-actions _write-files-actions-get _write-files-actions-set!)
	  (mutable timeouts _timeouts-get _timeouts-set!)
	  (mutable current-timeout _current-timeout-get _current-timeout-set!)
	  (mutable block _block-get _block-set!)))

(define (make-event-loop)
  (let* ([event-pipe (pipe)]
	 [in (car event-pipe)]
	 [out (cdr event-pipe)])
    ;; the write end of the pipe needs to be set non-blocking so that
    ;; if the pipe fills and the event loop thread is also putting a
    ;; new event in the queue, an exception is thrown rather than a
    ;; deadlock arising
    (fcntl out F_SETFL (logior O_NONBLOCK
			       (fcntl out F_GETFL)))
    (_make-event-loop (make-mutex)
		      (make-q)
		      #f
		      in
		      out
		      '()
		      '()
		      '()
		      '()
		      '()
		      #f
		      #f)))

;; timeouts are kept as an unsorted list of timeout items.  Each
;; timeout item is a vector of four elements.  First, an absolute time
;; pair (say as generated by _get-abstime) representing the time it is
;; next due to fire; second, a gensym value representing its tag;
;; third, the timeout value in milliseconds from which it was
;; generated (so it can be used again if it is a repeating timeout);
;; and fourth, a callback thunk representing the action to be executed
;; when the timeout is due.  This procedure finds the next timeout
;; item due to fire, or #f if the list passed in is empty.
(define (_next-timeout timeouts)
  (reduce (lambda (elt previous)
	    (if (< (_time-remaining (vector-ref elt 0))
		   (_time-remaining (vector-ref previous 0)))
		elt
		previous))
	  #f
	  timeouts))

;; for an absolute time pair (secs . usecs), say as generated by
;; _get-abstime, this returns the number of seconds from the current
;; time until that absolute time, as a real number.  If the absolute
;; time is in the past, it returns a negative number: this is useful
;; behaviour as it will help the _next-timeout procedure pick the
;; oldest timeout amongst any which have already expired.  However,
;; callers must normalize a negative value to 0 before passing it to
;; select.
(define (_time-remaining abstime)
  (let ([curr (gettimeofday)])
    (let ([secs (- (car abstime) (car curr))]
	  [usecs (- (cdr abstime) (cdr curr))])
      (+ (exact->inexact secs) (/ (exact->inexact usecs) 1000000)))))

;; takes a timeout value in milliseconds and returns a (secs . usecs)
;; pair representing current time plus the timeout value as an
;; absolute time
(define (_get-abstime msecs)
  (let* ([curr (gettimeofday)]
	 [usec-tmp (round (+ (* msecs 1000) (cdr curr)))])
    (let ([secs (+ (car curr) (quotient usec-tmp 1000000))]
	  [usecs (remainder usec-tmp 1000000)])
    (cons secs usecs))))

(define (_process-timeouts el)
  ;; we don't need any mutexes here as we only access the timeouts and
  ;; current-timeout fields of an event loop object, and any
  ;; individual timeout item vectors, in the event loop thread
  (let ([current-timeout (_current-timeout-get el)])
    (when (and current-timeout
	       (<= (_time-remaining (vector-ref current-timeout 0)) 0))
      (if ((vector-ref current-timeout 3))
	  (vector-set! current-timeout 0
		       (_get-abstime (vector-ref current-timeout 2)))
	  (_timeouts-set! el (_filter-timeout! (_timeouts-get el) (vector-ref current-timeout 1))))
      (_current-timeout-set! el (_next-timeout (_timeouts-get el))))))

;; this returns a list of timeouts, with the tagged timeout removed
;; from the timeout list passed in
(define (_filter-timeout! timeouts tag)
  (let loop ([remaining timeouts]
	     [checked '()])
    (if (null? remaining)
	(begin
	  ;; it is probably too extreme to error here if the user
	  ;; requests to remove a timeout which no longer exists
	  (simple-format (current-error-port)
			 "Warning: timeout tag ~A not found in timeouts list in procedure _filter-timeout!\n"
			 tag)
	  (force-output (current-error-port))
	  timeouts)
	(let ([first (car remaining)])
	  (if (eq? (vector-ref first 1) tag)
	      (append (reverse checked) (cdr remaining))
	      (loop (cdr remaining) (cons first checked)))))))

;; for the purposes of the event loop, two files compare equal if
;; their file descriptors are the same, even if one is a port and one
;; is a file descriptor (or both are a file)
(define (_file-equal? file1 file2)
  (let ([fd1 (if (port? file1) (fileno file1) file1)]
	[fd2 (if (port? file2) (fileno file2) file2)])
    (= fd1 fd2)))

;; we don't need any mutexes here as we only access any of the
;; read-files, read-files-actions, write-files and write-files-actions
;; fields in the event loop thread.  This removes a given file watch
;; and its action from an event loop object.  If there is both a read
;; and write watch for the same file, both are removed.  A file
;; descriptor and a port with the same underlying file descriptor, or
;; two ports with the same underlying file descriptor, compare equal
;; for the purposes of removal.
(define (_remove-watch-impl! el file)
  (_read-files-set! el (delete! file (_read-files-get el) _file-equal?))
  (_write-files-set! el (delete! file (_write-files-get el) _file-equal?))
  (_read-files-actions-set! el (alist-delete! file (_read-files-actions-get el) _file-equal?))
  (_write-files-actions-set! el (alist-delete! file (_write-files-actions-get el) _file-equal?)))

;; the event loop runs in the thread which calls this procedure.  If
;; this is different from the thread which called make-event-loop,
;; external synchronization is required to ensure visibility.  Where
;; event-loop-quit! has been called, this procedure may be called
;; again to restart the same event loop.
(define (event-loop-run! el)
  (define mutex (_mutex-get el))
  (define q (_q-get el))
  (define event-in (_event-in-get el))
  (define event-fd (fileno event-in))

  ;; we don't need to use the mutex in this procedure except in
  ;; relation to q, _done-get and _block-get, as we only access the
  ;; current-timeout field of an event loop object, any individual
  ;; timeout item vectors, and any of the read-files,
  ;; read-files-actions, write-files and write-files-actions fields in
  ;; the event loop thread
  (let loop1 ()
    (_process-timeouts el)
    (when (not (and (null? (_read-files-get el))
		    (null? (_write-files-get el))
		    (null? (_timeouts-get el))
		    (with-mutex mutex (and (q-empty? q)
					   (not (_block-get el))))))
      ;; we provide local versions in order to take a consistent view
      ;; on each run, since we might remove items from the lists after
      ;; executing the callbacks
      (let ([read-files (_read-files-get el)]
	    [read-files-actions (_read-files-actions-get el)]
	    [write-files (_write-files-get el)]
	    [write-files-actions (_write-files-actions-get el)]
	    [current-timeout (_current-timeout-get el)])
	(let ([res (catch 'system-error
		     (lambda ()
		       (select (cons event-fd read-files)
			       write-files
			       (delete-duplicates (append read-files write-files) _file-equal?)
			       (and current-timeout
				    (let ([secs (_time-remaining (vector-ref current-timeout 0))])
				      (if (< secs 0) 0 secs)))))
		     (lambda args
		       (if (= EINTR (system-error-errno args))
			   '(() () ())
			   (apply throw args))))])
	  (for-each (lambda (elt)
		      (let ([action
			     (let ([item (assoc elt read-files-actions _file-equal?)])
			       (if item (cdr item) #f))])
			(if action
			    (when (not (action 'in))
			      (_remove-watch-impl! el elt))
			    (error "No action in event loop for read file: " elt))))
		    (delv event-fd (car res)))
	  (for-each (lambda (elt)
		      (let ([action
			     (let ([item (assoc elt write-files-actions _file-equal?)])
			       (if item (cdr item) #f))])
			(if action
			    (when (not (action 'out))
			      (_remove-watch-impl! el elt))
			    (error "No action in event loop for write file: " elt))))
		    (cadr res))
	  (for-each (lambda (elt)
		      (let ([action
			     (let ([item (assoc elt (append read-files-actions write-files-actions) _file-equal?)])
			       (if item (cdr item) #f))])
			(if action
			    (when (not (action 'excpt))
			      (_remove-watch-impl! el elt))
			    (error "No action in event loop for file: " elt))))
		    (caddr res))
	  (when (memv event-fd (car res))
	    (let loop2 ()
	      (let ([c (read-char event-in)])
		(if (eof-object? c)
		    (with-mutex mutex (_done-set! el #t))
		    (case c
		      [(#\x)
		       (let loop3 ()
			 ;; the strategy is to exhaust the entire
			 ;; event queue when #\x is in the pipe
			 ;; buffer.  This eliminates any concerns that
			 ;; events might go missing if the pipe fills
			 ;; up.
			 (let ([action (with-mutex mutex
					 (if (q-empty? q) #f (deq! q)))])
			   (when action
			     (action)
			     (when (not (with-mutex mutex (_done-get el)))
			       (loop3)))))
		       (when (and (char-ready? event-in)
				  (not (with-mutex mutex (_done-get el))))
			 (loop2))]
		      [else
		       (error "Invalid character in event pipe: " c)]))))))
	(if (not (with-mutex mutex (_done-get el)))
	    (loop1)
	    ;; clear out any stale events before returning and unblocking
	    (_event-loop-reset! el))))))


;; this procedure is only called in the event loop thread, by
;; event-loop-run!  The only things requiring protection by a mutex
;; are the q, done and event-out fields of the event loop object.
;; However, for consistency we deal with all operations on the event
;; pipe below via the mutex.
(define (_event-loop-reset! el)
  ;; the only foolproof way of vacating a unix pipe is to close it and
  ;; then create another one
  (with-mutex (_mutex-get el)
    ;; discard any EAGAIN exception when flushing the output buffer
    ;; of a fully filled pipe on closing
    (catch 'system-error
      (lambda ()
	(close-port (_event-in-get el))
	(close-port (_event-out-get el)))
      (lambda args
	(unless (= EAGAIN (system-error-errno args))
	  (apply throw args))))
      
    (let* ([event-pipe (pipe)]
	   [in (car event-pipe)]
	   [out (cdr event-pipe)])
      (fcntl out F_SETFL (logior O_NONBLOCK
				 (fcntl out F_GETFL)))
      (_event-in-set! el in)
      (_event-out-set! el out))
    (let ([q (_q-get el)])
      (let loop ()
	(when (not (q-empty? q))
	  (deq! q)
	  (loop))))
    (_done-set! el #f))
  (_read-files-set! el '())
  (_read-files-actions-set! el '())
  (_write-files-set! el '())
  (_write-files-actions-set! el '())
  (_timeouts-set! el '())
  (_current-timeout-set! el #f))

;; The 'proc' callback should take a single argument, and when called
;; this will be set to 'in or 'excpt.  The same port or file
;; descriptor can also be passed to event-loop-add-write-watch!, and if
;; so and the descriptor is also available for writing, the write
;; callback will also be called with its argument set to 'out.  If
;; there is already a read watch for the file passed, the old one will
;; be replaced by the new one.  If proc returns #f, the read watch
;; (and any write watch for the same file) will be removed from the
;; event loop, otherwise the watch will continue.  This is thread safe
;; - any thread may add a watch, and the callback will execute in the
;; event loop thread.  The file argument can be either a port or a
;; file descriptor.  If 'file' is a file descriptor, any port for the
;; descriptor is not referenced for garbage collection purposes - it
;; must remain valid while operations are carried out on the
;; descriptor.  If 'file' is a buffered port, buffering will be taken
;; into account in indicating whether a read can be made without
;; blocking (but on a buffered port, for efficiency purposes each read
;; operation in response to this watch should usually exhaust the
;; buffer by looping on char-ready?).
(define (event-loop-add-read-watch! el file proc)
  (event-post! el (lambda ()
		    (_read-files-set!
		     el
		     (cons file
			   (delete! file (_read-files-get el) _file-equal?)))
		    (_read-files-actions-set!
		     el
		     (acons file proc
			    (alist-delete! file (_read-files-actions-get el) _file-equal?))))))

;; The 'proc' callback should take a single argument, and when called
;; this will be set to 'out or 'excpt.  The same port or file
;; descriptor can also be passed to event-loop-add-read-watch!, and if
;; so and the descriptor is also available for reading or in error,
;; the read callback will also be called with its argument set to 'in
;; or 'excpt (if both a read and a write watch have been set for the
;; same file argument, and there is an exceptional condition, it is
;; the read watch procedure which will be called with 'excpt rather
;; than the write watch procedure).  If there is already a write watch
;; for the file passed, the old one will be replaced by the new one.
;; If proc returns #f, the write watch (and any read watch for the
;; same file) will be removed from the event loop, otherwise the watch
;; will continue.  This is thread safe - any thread may add a watch,
;; and the callback will execute in the event loop thread.  The file
;; argument can be either a port or a file descriptor.  If 'file' is a
;; file descriptor, any port for the descriptor is not referenced for
;; garbage collection purposes - it must remain valid while operations
;; are carried out on the descriptor.  If 'file' is a buffered port,
;; buffering will be taken into account in indicating whether a write
;; can be made without blocking.
(define (event-loop-add-write-watch! el file proc)
  (event-post! el (lambda ()
		    (_write-files-set!
		     el
		     (cons file
			   (delete! file (_write-files-get el) _file-equal?)))
		    (_write-files-actions-set!
		     el
		     (acons file proc
			    (alist-delete! file (_write-files-actions-get el) _file-equal?))))))

;; The file argument may be a port or a file descriptor, and this
;; removes any read and write watch previously entered for that port
;; or file descriptor.  This is thread safe - any thread may remove a
;; watch.  A file descriptor and a port with the same underlying file
;; descriptor compare equal for the purposes of removal.
(define (event-loop-remove-watch! el file)
  (event-post! el (lambda ()
		    (_remove-watch-impl! el file))))

;; The 'action' callback is a thunk.  This is thread safe - any thread
;; may post an event (that is its main purpose), and the action
;; callback will execute in the event loop thread.  Actions execute in
;; the order in which they were posted.  If an event is posted from a
;; worker thread, it will normally be necessary to call
;; event-loop-block! beforehand.
(define (event-post! el action)
  (with-mutex (_mutex-get el)
    (enq! (_q-get el) action)
    (let ([out (_event-out-get el)])
      ;; if the event pipe is full and an EAGAIN error arises, we
      ;; can just swallow it.  The only purpose of writing #\x is to
      ;; cause the select procedure to return and reloop to pick up
      ;; any new entries in the event queue.
      (catch 'system-error
	(lambda ()
	  (write-char #\x out)
	  (force-output out))
	(lambda args
	  (unless (= EAGAIN (system-error-errno args))
	    (apply throw args)))))))

;; this adds a timeout to the event loop.  The timeout will repeat
;; unless and until the passed-in callback returns #f or
;; timeout-remove! is called.  The passed-in callback must be a thunk.
;; This procedure returns a tag symbol to which timeout-remove! can be
;; applied.  It may be called by any thread.
(define (timeout-post! el msecs action)
  (let ([tag (gensym "timeout-")]
	[abstime (_get-abstime msecs)])
    (event-post! el
		 (lambda ()
		   (let ([new-timeouts (cons (vector abstime
						     tag
						     msecs
						     action)
					     (_timeouts-get el))])
		     (_timeouts-set! el new-timeouts)
		     (_current-timeout-set! el (_next-timeout new-timeouts)))))
    tag))

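;; this stops the timeout with the given tag from executing in the
;; event loop concerned.  It may be called by any thread.  The next
;; timeout due is recalculated afterwards, in case the timeout
;; removed was the current one.
(define (timeout-remove! el tag)
  (event-post! el
	       (lambda ()
		 (_timeouts-set! el (_filter-timeout! (_timeouts-get el) tag))
		 (_current-timeout-set! el (_next-timeout (_timeouts-get el))))))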
      
;; by default, upon there being no more watches, timeouts and posted
;; events for an event loop, event-loop-run! will return, which is
;; normally what you want with a single threaded program.  However,
;; this is undesirable where a worker thread is intended to post an
;; event to the main loop after it has reached a result, say via
;; a-sync-run-task-in-thread, because the main loop may have ended
;; before it posts.  Passing #t to the val argument of this procedure
;; will prevent that from happening, so that the event loop can only
;; be ended by calling event-loop-quit!.  To switch it back to
;; non-blocking mode, pass #f.  This is thread safe - any thread may
;; call this procedure.
(define (event-loop-block! el val)
  (with-mutex (_mutex-get el)
    (let ([old-val (_block-get el)])
      (_block-set! el (not (not val)))
      (when (and old-val (not val))
	;; if the event pipe is full and an EAGAIN error arises, we
	;; can just swallow it.  The only purpose of writing #\x is to
	;; cause the select procedure to return and reloop and then
	;; exit the event loop if there are no further events.
	(let ([out (_event-out-get el)])
	  (catch 'system-error
	    (lambda ()
	      (write-char #\x out)
	      (force-output out))
	    (lambda args
	      (unless (= EAGAIN (system-error-errno args))
		(apply throw args)))))))))

;; Causes the event loop to unblock.  Any events remaining in the
;; event loop will be discarded.  New events may be added after
;; event-loop-run! has returned, and event-loop-run! may then be
;; called again to process them.  This is thread safe - any thread may
;; call this procedure.
(define (event-loop-quit! el)
  (with-mutex (_mutex-get el)
    (_done-set! el #t)
    ;; if the event pipe is full and an EAGAIN error arises, we can
    ;; just swallow it.  The only purpose of writing #\x is to cause
    ;; the select procedure to return
    (let ([out (_event-out-get el)])
      (catch 'system-error
	(lambda ()
	  (write-char #\x out)
	  (force-output out))
	(lambda args
	  (unless (= EAGAIN (system-error-errno args))
	    (apply throw args)))))))

[-- Attachment #4: example.scm --]
[-- Type: text/x-scheme, Size: 3583 bytes --]

#!/usr/bin/env guile
!#

;; Copyright Chris Vine 2014 and 2016
;;
;; Permission is hereby granted, free of charge, to any person
;; obtaining a copy of this file (the "Software"), to deal in the
;; Software without restriction, including without limitation the
;; rights to use, copy, modify, merge, publish, distribute,
;; sublicense, and/or sell copies of the Software, and to permit
;; persons to whom the Software is furnished to do so, subject to the
;; following conditions:
;;
;; The above copyright notice and this permission notice shall be
;; included in all copies or substantial portions of the Software.
;;
;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
;; BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
;; ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
;; CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
;; SOFTWARE.


;; pick up our local modules
(add-to-load-path (dirname (current-filename)))
(use-modules (event-loop) (coroutines) (ice-9 threads))

(define main-loop (make-event-loop))
(event-post! main-loop
            (lambda ()
              (a-sync (lambda (await resume)
                       (display "In waitable procedure\n")
                       ;; NOTE: we don't try to do it below, but you cannot run two
                       ;; or more asynchronous tasks concurrently using the
                       ;; await/resume technique with the same await-resume pair
                       ;; without extra work, because the first call to 'await'
                       ;; will match the first callback which happens to call
                       ;; 'resume', and so on.  In such cases, 'resume' would need
                       ;; to return something like a key-value pair so that the
                       ;; result can be correctly identified.

                       ;; launch asynchronous task: let's pretend it's time
                       ;; consuming so we need to run it in a worker thread
                       ;; to avoid blocking any other events in the main loop
                       ;; (there aren't any in this example)
                       (a-sync-run-task-in-thread
                        (lambda ()
                          ;; do some work
                          (usleep 500000)
                          (display "In first async callback, work done\n")
                          ;; this is the result of our extensive computation
                          "Hello via async\n")
                        main-loop
                        resume)
                       (display "About to make first wait\n")
                       (display (string-append "Back in waitable procedure, and the callback says: " (await)))

                       ;; launch another asynchronous task, this time in the event loop thread
                       (event-post! main-loop (lambda ()
                                                (display "In second async callback\n")
                                                (event-loop-quit! main-loop)
                                                (resume)))
                       (display "About to make second wait\n")
                       (await)
                       (display "Quitting\n")))))

;; because we are running tasks in another thread
(event-loop-block! main-loop #t)

(event-loop-run! main-loop)
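
;; The event loop can be run again after event-loop-quit! has been
;; called (see the comments on event-loop-run! in event-loop.scm).  As
;; a further sketch, added for illustration and not part of the
;; original example, the code below awaits a one-shot timeout rather
;; than a worker thread.  The variable last-await-result is
;; hypothetical and only records what 'await' returned.  Blocking mode
;; is switched off first, so that event-loop-run! returns by itself
;; once the timeout has fired and nothing else is pending.
(define last-await-result #f)
(event-loop-block! main-loop #f)
(a-sync (lambda (await resume)
          (timeout-post! main-loop 200
                         (lambda ()
                           (resume "timeout fired\n")
                           ;; returning #f makes this a one-shot timeout
                           #f))
          (display "About to wait for a 200ms timeout\n")
          (set! last-await-result (await))
          (display (string-append "Back in waitable procedure: "
                                  last-await-result))))
(event-loop-run! main-loop)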
