unofficial mirror of guile-user@gnu.org 
 help / color / mirror / Atom feed
* Yet Another Event Loop for Guile
@ 2016-03-02 19:41 Amirouche Boubekki
  0 siblings, 0 replies; only message in thread
From: Amirouche Boubekki @ 2016-03-02 19:41 UTC (permalink / raw
  To: Guile User

[-- Attachment #1: Type: text/plain, Size: 4604 bytes --]

Héllo,


A while back I've written an event loop trying to understand how both
asyncio and 8sync work. The result of this work is attached to this
email.

The result of this study is much shorter than any other event loop,
so it might be the easiest way to jump into the train to learn the 
basics
of event loop mish mash.

It only implements receiving and sending bytes without blocking using
select.

It doesn't support the following:

- sleep a certain amount of time
- call-when-idle
- call later with a time
- any kind of multi-thread operation

It's only a study draft implementation of an event loop and as matter
of fact I'm not sure which procedure is blocking which is not. So,
I've only implemented asynchronous variants of recv and send. There is
also an implementation of write and read but I think those are
blocking anyway.

Now some code. The client is implemented like so:

```
(define (client)
   (define socket (make-client-socket 12345))
   (write/ (list 0 "héllo") socket)
   (write/ (list 42 "world") socket)
   (close socket)
   (loop-stop!))
```

Look there is no callback, but it's asynchronous! How?

Here ``write/`` is asynchronous variant of ``write`` which doesn't
block.  Instead of actually calling write. It register a procedure
(with a continuation) to be called when the socket is ready.

It's implemented with the following code:

```
(define-public (write/ message socket)
   (abort-to-prompt 'loop  ;; Abort to the event loop
                    (lambda (cc)  ;; [*] And call this with the 
continuation `cc`
                      (loop-add-writer socket ;; register this callback 
to be called
                                              ;; when the socket is 
ready.
                                       ;; [#] When the socket is ready 
write message
                                       ;; and return using continuation 
`cc`
                                       ;; given by `abort-to-prompt`
                                       (lambda () (cc (write message 
socket)))))))
```

Before looking at the core of the event loop, let's look
at the outer procedure, that makes the event loop run forever:

```
(define-public (loop-run-forever)
   (let* ((loop (fluid-ref *loop*)))
     (loop-running! loop #true)
     (while (loop-running loop)
       (call-with-prompt 'loop
         loop-run-once
         ;; This is the prompt handler which calls the above lambda with 
[*] mark
         (lambda (cc callback) (callback cc))))))
```

When `write/` abort to the prompt `'loop`, it execute the handler which
only pass the continuation the the callback registred by `write/` which
register a procedure to be called when the socket is ready. And run 
again
the same loop, which probably does nothing because the socket is not 
ready
yet. Otherwise said, just after `write/` does register it's callback, 
the
event loop takes back the responsability to do what the program is meant
to do.

The procedure `loop-run-once` has the responsibility to run the 
callbacks
registered against select using `loop-add-writer`:

```
(define (loop-run-once)
   (let ((loop (fluid-ref *loop*))
         (readers (hash-table-keys (loop-readers loop)))
         (writers (hash-table-keys (loop-writers loop))))
     ;; first select ready ports
     (match (select readers writers '() 0)
       ((to-read to-write _)
        (for-each call-read-callback to-read)
        (for-each call-write-callback to-write)))
     ;; execute tasks
     (while (not (empty? (loop-tasks loop)))
       ((pop! (loop-tasks loop))))))
```

Here `call-write-callback` will *only* call the lambda marked [#] at
some point and remove it from the waiting procedures. Once [#] is
finished it returns to where it was registred ie. where `write/` is 
called.
And the event loop basically loose the control over the flow of the 
program
until the next async call or the *calling* procedures returns.

The server looks very similar. It's not event a echo server. Anyway,
here is it's code for symmetry sake:

```
(define (server)
   (let* ((sock (make-server-socket 12345))
          (client (car (accept/ sock))))
     (pk (read/ client))
     (pk (read/ client))
     (close client)
     (loop-stop!)))
```

There is two obvious things for me to take away in this implementation:

- The underlying event loop as a clear visible interface (which is not
   presented in the above)
- There is *no future*, prompt replace the use of futures done in 
asyncio
- There is no need for coroutines of any kind, since 
prompts/continuations
   do the job as-is


HTH,


Amirouche ~ amz3 ~ http://www.hyperdev.fr

[-- Attachment #2: async.scm --]
[-- Type: text/plain, Size: 6447 bytes --]

;; Copyright (C) 2015 Amirouche Boubekki <amirouche@hypermove.net>

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA
(define-module (async))

(use-modules (rnrs bytevectors))
(use-modules (srfi srfi-69)) ;; hash-table

(use-modules (ice-9 q))
(use-modules (ice-9 match))
(use-modules (ice-9 rdelim))
(use-modules (ice-9 binary-ports))

(use-modules (srfi srfi-9))


;; srfi-queue :o)
(define make-queue make-q)
(define push! enq!)
(define pop! deq!)
(define empty? q-empty?)


(define-record-type <loop>
  (make-loop running tasks readers writers)
  loop?
  (running loop-running loop-running!)
  (tasks loop-tasks loop-tasks!)
  (readers loop-readers loop-readers!)
  (writers loop-writers loop-writers!))


(define loop (make-loop #false (make-queue) (make-hash-table) (make-hash-table)))

(define-public *loop* (make-fluid loop))

(define-public (loop-call-later thunk) ;; support delay
  (let ((loop (fluid-ref *loop*)))
    (push! (loop-tasks loop) thunk)
    #true))
  
(define (loop-add-reader port callback)
  (let* ((loop (fluid-ref *loop*))
         (readers (loop-readers loop))
         (queue (hash-table-ref readers port make-queue)))
    (push! queue callback)
    (hash-table-set! readers port queue)))

(define (loop-add-writer port callback)
  (let* ((loop (fluid-ref *loop*))
         (writers (loop-writers loop))
         (queue (hash-table-ref writers port make-queue)))
    (push! queue callback)
    (hash-table-set! writers port queue)))

(define (call-read-callback port)
  (let* ((loop (fluid-ref *loop*))
         (queue (hash-table-ref (loop-readers loop) port)))
    (when (and queue (not (empty? queue)))
      (let ((callback (pop! queue)))
      (when (empty? queue)
        (hash-table-delete! (loop-readers loop) port))
      (callback)))))

(define (call-write-callback port)
  (let* ((loop (fluid-ref *loop*))
         (queue (hash-table-ref (loop-writers loop) port)))
    (when (and queue (not (empty? queue)))
      (let ((callback (pop! queue)))
        (when (empty? queue)
          (hash-table-delete! (loop-writers loop) port))
        (callback)))))

(define (loop-run-once)
  (let ((loop (fluid-ref *loop*))
        (readers (hash-table-keys (loop-readers loop)))
        (writers (hash-table-keys (loop-writers loop))))
    ;; first select ready ports
    (match (select readers writers '() 0) ;; FIXME: replace 0 with time for next task
      ((to-read to-write _)
       (for-each call-read-callback to-read)
       (for-each call-write-callback to-write)))
    ;; execute tasks
    (while (not (empty? (loop-tasks loop)))
      ((pop! (loop-tasks loop))))))

(define-public (loop-run-forever)
  (let* ((loop (fluid-ref *loop*)))
    (loop-running! loop #true)
    (while (loop-running loop)
      (call-with-prompt 'loop
        loop-run-once
        (lambda (cc callback) (callback cc))))))

(define-public (loop-running?)
  (let ((loop (fluid-ref *loop*)))
    (loop-running loop)))

(define-public (loop-stop!)
  (let ((loop (fluid-ref *loop*)))
    (loop-running! loop #false)))

;; non blocking accept

(define-public (accept/ port)
  (abort-to-prompt 'loop (lambda (cc) (loop-add-reader port (lambda () (cc (accept port)))))))

;;; recv/ and send/

;; recv/

(define (%recv/ port)
  (let next ((out '()))
    (if (char-ready? port)
        (let ((byte (get-u8 port)))
          (if (eof-object? byte)
              (reverse out)
              (next (cons byte out))))
        (reverse out))))

(define-public (recv/ port)
  (abort-to-prompt 'loop (lambda (cc) (loop-add-reader port (lambda () (cc (%recv/ port)))))))

(define (bv-drop bv count)
  (u8-list->bytevector (list-tail (bytevector->u8-list bv) count)))

;; send/

(define (%send/ port message cc)
  (let loop ((message message))
    (let* ((count (send port message))
           (message (bv-drop message count)))
      (if (eq? (bytevector-length message) 0)
          (cc)
          (loop-add-writer port (lambda () (loop message)))))))

(define-public (send/ port message)
  (abort-to-prompt 'loop (lambda (cc) (loop-add-writer port (lambda () (%send/ port message cc))))))

;; XXX: write replacement for those blocking procedures

(define-public (read/ socket)
  (abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (read socket)))))))

(define-public (write/ message socket)
  (abort-to-prompt 'loop  ;; Abort to the event loop 
                   (lambda (cc)  ;; and call this with the continuation `cc`
                     (loop-add-writer socket ;; register this callback to be called
                                             ;; when the socket is ready.
                                      ;; When the socket is ready write message
                                      ;; and return the using continuation cc
                                      ;; given by the abort
                                      (lambda () (cc (write message socket)))))))

;; (define-public (display/ message socket)
;;   (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (display message socket)))))))

;; (define-public (read-line/ socket)
;;   (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (read-line socket)))))))

;;; basic TCP sockets

(define make-socket socket)

(define-public (make-client-socket port)
  (let ((socket (make-socket PF_INET SOCK_STREAM 0)))
    (connect socket AF_INET INADDR_LOOPBACK port)
    (fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
    socket))

(define-public (make-server-socket port)
  (let ((socket (make-socket PF_INET SOCK_STREAM 0)))
    (bind socket (make-socket-address AF_INET INADDR_ANY port))
    (listen socket 128)
    (fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
    socket))

[-- Attachment #3: server.scm --]
[-- Type: text/plain, Size: 377 bytes --]

(define-module (server))

(use-modules (rnrs bytevectors))

(use-modules (async))


(setlocale LC_ALL "")

;; very simple server socket helper

(define (server)
  (let* ((sock (make-server-socket 12345))
         (client (car (accept/ sock))))
    (pk (read/ client))
    (pk (read/ client))
    (close client)
    (loop-stop!)))


(loop-call-later server)

(loop-run-forever)

[-- Attachment #4: client.scm --]
[-- Type: text/plain, Size: 477 bytes --]

(define-module (client))

(use-modules (rnrs bytevectors))

(use-modules (async))


;; (setlocale LC_ALL "")


(define (scm->string scm)
  (call-with-output-string
    (lambda (port)
      (write scm port))))

(define scm->bv (compose string->utf8 scm->string))  

(define (client)
  (define socket (make-client-socket 12345))
  (write/ (list 0 "héllo") socket)
  (write/ (list 42 "world") socket)
  (close socket)
  (loop-stop!))

(loop-call-later client)
(loop-run-forever)

^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2016-03-02 19:41 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2016-03-02 19:41 Yet Another Event Loop for Guile Amirouche Boubekki

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).