unofficial mirror of guile-user@gnu.org 
 help / color / mirror / Atom feed
* [WIP PATCH] (for fibers): wait-until-port-readable-operation / wait-until-port-writable-operation
@ 2021-02-14 16:20 Maxime Devos
  0 siblings, 0 replies; only message in thread
From: Maxime Devos @ 2021-02-14 16:20 UTC (permalink / raw)
  To: Andy Wingo; +Cc: guile-user


[-- Attachment #1.1: Type: text/plain, Size: 1038 bytes --]

Hi Guilers,

This is a (buggy) patch to guile-fibers that adds
a ‘wait-until-port-readable-operation’ and ‘wait-until-port-writable’
that can be used to wait until a port (backed by a fd) becomes readable
or writable respectively.

Now I seem to have hit a problem: the first few tests (more to be written
later) pass just fine, but the following test never completes:

  ;; XXX why does this hang?  Shouldn't there
  ;; be a time-out?
  (assert-run-fibers-returns (#f)
			     (writable/timeout A))

(writable/timeout is a combination of wait-until-port-writable and
sleep-operation, that also retries a few times to allow for spurious
wakeups.)

Oddly, the same test but with readable/timeout completes just fine.

Any thoughts?  (Warning: I don't know what I'm doing; I'm not
familiar with relevant literature on operations, I'm just trying
to complete wingo's proposal).

Wild guess: maybe schedule-task-when-fd-writable prevents the fiber
from being resumed by other causes?

Greetings,
Maxime.

[-- Attachment #1.2: 0001-WIP-Implement-operations-for-waiting-for-readability.patch --]
[-- Type: text/x-patch, Size: 10557 bytes --]

From b8ac732543661b5ee7e2f38482f824b638664bd2 Mon Sep 17 00:00:00 2001
From: Maxime Devos <maximedevos@telenet.be>
Date: Sun, 14 Feb 2021 17:04:02 +0100
Subject: [PATCH] (WIP) Implement operations for waiting for readability /
 writability

* fibers/io-wakeup.scm
  (wait-until-port-readable-operation)
  (wait-until-port-writable-operation): new operations.
  (writable?): helper procedure for
  'wait-until-port-writable-operation'
* tests/io-wakeup.scm: test 'fibers/io-wakeup.scm'
* Makefile.am
  (SOURCES): compile 'fibers/io-wakeup.scm'.
  (TESTS): run 'tests/io-wakeup.scm'.
---
 Makefile.am          |   2 +
 fibers/io-wakeup.scm |  97 ++++++++++++++++++++++++++++++
 tests/io-wakeup.scm  | 139 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 238 insertions(+)
 create mode 100644 fibers/io-wakeup.scm
 create mode 100644 tests/io-wakeup.scm

diff --git a/Makefile.am b/Makefile.am
index e2db57e..0134255 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -33,6 +33,7 @@ SOURCES = \
 	fibers/deque.scm \
 	fibers/epoll.scm \
 	fibers/interrupts.scm \
+	fibers/io-wakeup.scm \
 	fibers/nameset.scm \
 	fibers/operations.scm \
 	fibers/posix-clocks.scm \
@@ -67,6 +68,7 @@ TESTS = \
 	tests/conditions.scm \
 	tests/channels.scm \
 	tests/foreign.scm \
+	tests/io-wakeup.scm \
 	tests/parameters.scm \
 	tests/preemption.scm \
 	tests/speedup.scm
diff --git a/fibers/io-wakeup.scm b/fibers/io-wakeup.scm
new file mode 100644
index 0000000..6a91abe
--- /dev/null
+++ b/fibers/io-wakeup.scm
@@ -0,0 +1,97 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016,2021 Free Software Foundation, Inc.
+;;;; Copyright (C) 2021 Maxime Devos
+;;;; 
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;; 
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;;;; Lesser General Public License for more details.
+;;;; 
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;; 
+
+(define-module (fibers io-wakeup)
+  #:use-module (fibers scheduler)
+  #:use-module (fibers operations)
+  #:use-module (ice-9 atomic)
+  #:use-module (ice-9 match)
+  #:use-module (ice-9 threads)
+  #:export (wait-until-port-readable-operation
+	    wait-until-port-writable-operation))
+;; Make sure (ice-9 port) is loaded before
+;; trying to load (ice-9 port internal).
+(use-modules (ice-9 ports))
+(use-modules (ice-9 ports internal))
+
+(define *poll-sched* (make-atomic-box #f))
+
+(define (poll-sched)
+  (or (atomic-box-ref *poll-sched*)
+      (let ((sched (make-scheduler)))
+        (cond
+         ((atomic-box-compare-and-swap! *poll-sched* #f sched))
+         (else
+          ;; FIXME: Would be nice to clean up this thread at some point.
+          (call-with-new-thread
+           (lambda ()
+             (define (finished?) #f)
+             (run-scheduler sched finished?)))
+          sched)))))
+
+;; These procedure are subject to spurious wakeups.
+
+(define (wait-until-port-readable-operation port)
+  "Make an operation that will succeed when PORT is readable."
+  (make-base-operation #f
+                       (lambda ()
+                         (and (char-ready? port)
+                              values))
+                       (lambda (flag sched resume)
+                         (define (commit)
+                           (match (atomic-box-compare-and-swap! flag 'W 'S)
+                             ('W (resume values))
+                             ('C (commit))
+                             ('S #f)))
+                         (if sched
+                             (schedule-task-when-fd-readable
+                              sched (port-read-wait-fd port) commit)
+                             (schedule-task
+                              (poll-sched)
+                              (lambda ()
+                                (perform-operation (wait-until-port-readable-operation port))
+                                (commit)))))))
+
+(define (writable? port)
+  "Test if PORT is writable."
+  (match (select #() (vector port) #() 0)
+    ((#() #() #()) #f)
+    ((#() #(_) #()) #t)))
+
+(define (wait-until-port-writable-operation port)
+  "Make an operation that will succeed when PORT is writable."
+  (make-base-operation #f
+                       (lambda ()
+                         (and (writable? port)
+                              values))
+                       (lambda (flag sched resume)
+                         (define (commit)
+                           (match (atomic-box-compare-and-swap! flag 'W 'S)
+                             ('W (resume values))
+                             ('C (commit))
+                             ('S #f)))
+                         (if sched
+                             (schedule-task-when-fd-writable
+                              sched (port-write-wait-fd port) commit)
+                             (schedule-task
+                              (poll-sched)
+                              (lambda ()
+                                (perform-operation (wait-until-port-writable-operation port))
+                                (commit)))))))
diff --git a/tests/io-wakeup.scm b/tests/io-wakeup.scm
new file mode 100644
index 0000000..ea8babe
--- /dev/null
+++ b/tests/io-wakeup.scm
@@ -0,0 +1,139 @@
+;; Fibers: cooperative, event-driven user-space threads.
+
+;;;; Copyright (C) 2016 Free Software Foundation, Inc.
+;;;; Copyright (C) 2021 Maxime Devos
+;;;;
+;;;; This library is free software; you can redistribute it and/or
+;;;; modify it under the terms of the GNU Lesser General Public
+;;;; License as published by the Free Software Foundation; either
+;;;; version 3 of the License, or (at your option) any later version.
+;;;;
+;;;; This library is distributed in the hope that it will be useful,
+;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;;;; Lesser General Public License for more details.
+;;;;
+;;;; You should have received a copy of the GNU Lesser General Public
+;;;; License along with this library; if not, write to the Free Software
+;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+;;;;
+
+(define-module (tests io-wakeup)
+  #:use-module (fibers)
+  #:use-module (fibers io-wakeup)
+  #:use-module (fibers operations)
+  #:use-module (fibers timers))
+
+(define failed? #f)
+
+(define-syntax-rule (assert-equal expected actual)
+  (let ((x expected))
+    (format #t "assert ~s equal to ~s: " 'actual x)
+    (force-output)
+    (let ((y actual))
+      (cond
+       ((equal? x y) (format #t "ok\n"))
+       (else
+        (format #t "no (got ~s)\n" y)
+        (set! failed? #t))))))
+
+(define-syntax-rule (assert-run-fibers-terminates exp)
+  (begin
+    (format #t "assert run-fibers on ~s terminates: " 'exp)
+    (force-output)
+    (let ((start (get-internal-real-time)))
+      (call-with-values (lambda () (run-fibers (lambda () exp)))
+        (lambda vals
+          (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start)
+                                      1.0 internal-time-units-per-second))
+          (apply values vals))))))
+
+(define-syntax-rule (assert-run-fibers-returns (expected ...) exp)
+  (begin
+    (call-with-values (lambda () (assert-run-fibers-terminates exp))
+      (lambda run-fiber-return-vals
+        (assert-equal '(expected ...) run-fiber-return-vals)))))
+
+(define* (with-timeout op #:key (seconds 0.05) (wrap values))
+  (choice-operation op
+                    (wrap-operation (sleep-operation seconds) wrap)))
+
+(define* (readable/timeout port #:key (allowed-spurious 5))
+  "Does waiting for readability not time-out?
+Allow @var{allowed-spurious} spurious wakeups."
+  (and (perform-operation
+	(with-timeout
+	 (wrap-operation (wait-until-port-readable-operation port)
+			 (lambda () #t))
+	 #:wrap (lambda () #f)))
+       (or (<= (pk 'spurious/readable allowed-spurious) 0)
+	   (readable/timeout port #:allowed-spurious
+			     (- allowed-spurious 1)))))
+
+(define* (writable/timeout port #:key (allowed-spurious 5))
+  "Does waiting for readability not time-out?
+Allow @var{allowed-spurious} spurious wakeups."
+  (and (perform-operation
+       (with-timeout
+	(wrap-operation (wait-until-port-writable-operation port)
+			(lambda () #t))
+	#:wrap (lambda () #f)))
+       (or (<= (pk 'spurious/writable allowed-spurious) 0)
+	   (writable/timeout port #:allowed-spurious
+			     (- allowed-spurious 1)))))
+
+;; Tests:
+;;  * wait-until-port-readable-operaton / wait-until-port-writable-operation
+;;    blocks if the port isn't ready for input / output.
+;;
+;;    This is tested with a pipe (read & write)
+;;    and a listening socket (read, or accept in this case).
+;;
+;;    Due to the possibility of spurious wakeups,
+;;    a limited few spurious wakeups are tolerated.
+;;
+;;  * these operations succeed if the port is ready for input / output.
+;;
+;;    These are again tested with a pipe and a listening socket
+;;
+;; Blocking is detected with a small time-out.
+
+(define (make-listening-socket)
+  (let ((server (socket PF_INET SOCK_DGRAM 0)))
+    (bind server AF_INET INADDR_LOOPBACK 0)
+    server))
+
+(let ((s (make-listening-socket)))
+  (assert-run-fibers-returns (#f)
+			     (readable/timeout s))
+  (assert-equal #f (readable/timeout s))
+  (close s))
+
+(define-syntax-rule (with-pipes (A B) exp exp* ...)
+  (let* ((pipes (pipe))
+	 (A (car pipes))
+	 (B (cdr pipes)))
+    exp exp* ...
+    (close A)
+    (close B)))
+
+(with-pipes (A B)
+  (setvbuf A 'none)
+  (setvbuf B 'none)
+  (assert-run-fibers-returns (#f)
+			     (readable/timeout A))
+  (assert-equal #f (readable/timeout A))
+
+  ;; XXX why does this hang?  Shouldn't there
+  ;; be a time-out?
+  (assert-run-fibers-returns (#f)
+			     (writable/timeout A))
+  (assert-equal #f (writable/timeout A))
+  
+  )
+
+(exit (if failed? 1 0))
+
+;; Local Variables:
+;; eval: (put 'with-pipes 'scheme-indent-function 1)
+;; End:
-- 
2.30.0


[-- Attachment #2: This is a digitally signed message part --]
[-- Type: application/pgp-signature, Size: 260 bytes --]

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2021-02-14 16:20 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-02-14 16:20 [WIP PATCH] (for fibers): wait-until-port-readable-operation / wait-until-port-writable-operation Maxime Devos

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).