From b8ac732543661b5ee7e2f38482f824b638664bd2 Mon Sep 17 00:00:00 2001 From: Maxime Devos Date: Sun, 14 Feb 2021 17:04:02 +0100 Subject: [PATCH] (WIP) Implement operations for waiting for readability / writability * fibers/io-wakeup.scm (wait-until-port-readable-operation) (wait-until-port-writable-operation): new operations. (writable?): helper procedure for 'wait-until-port-writable-operation' * tests/io-wakeup.scm: test 'fibers/io-wakeup.scm' * Makefile.am (SOURCES): compile 'fibers/io-wakeup.scm'. (TESTS): run 'tests/io-wakeup.scm'. --- Makefile.am | 2 + fibers/io-wakeup.scm | 97 ++++++++++++++++++++++++++++++ tests/io-wakeup.scm | 139 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+) create mode 100644 fibers/io-wakeup.scm create mode 100644 tests/io-wakeup.scm diff --git a/Makefile.am b/Makefile.am index e2db57e..0134255 100644 --- a/Makefile.am +++ b/Makefile.am @@ -33,6 +33,7 @@ SOURCES = \ fibers/deque.scm \ fibers/epoll.scm \ fibers/interrupts.scm \ + fibers/io-wakeup.scm \ fibers/nameset.scm \ fibers/operations.scm \ fibers/posix-clocks.scm \ @@ -67,6 +68,7 @@ TESTS = \ tests/conditions.scm \ tests/channels.scm \ tests/foreign.scm \ + tests/io-wakeup.scm \ tests/parameters.scm \ tests/preemption.scm \ tests/speedup.scm diff --git a/fibers/io-wakeup.scm b/fibers/io-wakeup.scm new file mode 100644 index 0000000..6a91abe --- /dev/null +++ b/fibers/io-wakeup.scm @@ -0,0 +1,97 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016,2021 Free Software Foundation, Inc. +;;;; Copyright (C) 2021 Maxime Devos +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (fibers io-wakeup) + #:use-module (fibers scheduler) + #:use-module (fibers operations) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + #:export (wait-until-port-readable-operation + wait-until-port-writable-operation)) +;; Make sure (ice-9 port) is loaded before +;; trying to load (ice-9 port internal). +(use-modules (ice-9 ports)) +(use-modules (ice-9 ports internal)) + +(define *poll-sched* (make-atomic-box #f)) + +(define (poll-sched) + (or (atomic-box-ref *poll-sched*) + (let ((sched (make-scheduler))) + (cond + ((atomic-box-compare-and-swap! *poll-sched* #f sched)) + (else + ;; FIXME: Would be nice to clean up this thread at some point. + (call-with-new-thread + (lambda () + (define (finished?) #f) + (run-scheduler sched finished?))) + sched))))) + +;; These procedure are subject to spurious wakeups. + +(define (wait-until-port-readable-operation port) + "Make an operation that will succeed when PORT is readable." + (make-base-operation #f + (lambda () + (and (char-ready? port) + values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (if sched + (schedule-task-when-fd-readable + sched (port-read-wait-fd port) commit) + (schedule-task + (poll-sched) + (lambda () + (perform-operation (wait-until-port-readable-operation port)) + (commit))))))) + +(define (writable? port) + "Test if PORT is writable." + (match (select #() (vector port) #() 0) + ((#() #() #()) #f) + ((#() #(_) #()) #t))) + +(define (wait-until-port-writable-operation port) + "Make an operation that will succeed when PORT is writable." + (make-base-operation #f + (lambda () + (and (writable? port) + values)) + (lambda (flag sched resume) + (define (commit) + (match (atomic-box-compare-and-swap! flag 'W 'S) + ('W (resume values)) + ('C (commit)) + ('S #f))) + (if sched + (schedule-task-when-fd-writable + sched (port-write-wait-fd port) commit) + (schedule-task + (poll-sched) + (lambda () + (perform-operation (wait-until-port-writable-operation port)) + (commit))))))) diff --git a/tests/io-wakeup.scm b/tests/io-wakeup.scm new file mode 100644 index 0000000..ea8babe --- /dev/null +++ b/tests/io-wakeup.scm @@ -0,0 +1,139 @@ +;; Fibers: cooperative, event-driven user-space threads. + +;;;; Copyright (C) 2016 Free Software Foundation, Inc. +;;;; Copyright (C) 2021 Maxime Devos +;;;; +;;;; This library is free software; you can redistribute it and/or +;;;; modify it under the terms of the GNU Lesser General Public +;;;; License as published by the Free Software Foundation; either +;;;; version 3 of the License, or (at your option) any later version. +;;;; +;;;; This library is distributed in the hope that it will be useful, +;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;;;; Lesser General Public License for more details. +;;;; +;;;; You should have received a copy of the GNU Lesser General Public +;;;; License along with this library; if not, write to the Free Software +;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +;;;; + +(define-module (tests io-wakeup) + #:use-module (fibers) + #:use-module (fibers io-wakeup) + #:use-module (fibers operations) + #:use-module (fibers timers)) + +(define failed? #f) + +(define-syntax-rule (assert-equal expected actual) + (let ((x expected)) + (format #t "assert ~s equal to ~s: " 'actual x) + (force-output) + (let ((y actual)) + (cond + ((equal? x y) (format #t "ok\n")) + (else + (format #t "no (got ~s)\n" y) + (set! failed? #t)))))) + +(define-syntax-rule (assert-run-fibers-terminates exp) + (begin + (format #t "assert run-fibers on ~s terminates: " 'exp) + (force-output) + (let ((start (get-internal-real-time))) + (call-with-values (lambda () (run-fibers (lambda () exp))) + (lambda vals + (format #t "ok (~a s)\n" (/ (- (get-internal-real-time) start) + 1.0 internal-time-units-per-second)) + (apply values vals)))))) + +(define-syntax-rule (assert-run-fibers-returns (expected ...) exp) + (begin + (call-with-values (lambda () (assert-run-fibers-terminates exp)) + (lambda run-fiber-return-vals + (assert-equal '(expected ...) run-fiber-return-vals))))) + +(define* (with-timeout op #:key (seconds 0.05) (wrap values)) + (choice-operation op + (wrap-operation (sleep-operation seconds) wrap))) + +(define* (readable/timeout port #:key (allowed-spurious 5)) + "Does waiting for readability not time-out? +Allow @var{allowed-spurious} spurious wakeups." + (and (perform-operation + (with-timeout + (wrap-operation (wait-until-port-readable-operation port) + (lambda () #t)) + #:wrap (lambda () #f))) + (or (<= (pk 'spurious/readable allowed-spurious) 0) + (readable/timeout port #:allowed-spurious + (- allowed-spurious 1))))) + +(define* (writable/timeout port #:key (allowed-spurious 5)) + "Does waiting for readability not time-out? +Allow @var{allowed-spurious} spurious wakeups." + (and (perform-operation + (with-timeout + (wrap-operation (wait-until-port-writable-operation port) + (lambda () #t)) + #:wrap (lambda () #f))) + (or (<= (pk 'spurious/writable allowed-spurious) 0) + (writable/timeout port #:allowed-spurious + (- allowed-spurious 1))))) + +;; Tests: +;; * wait-until-port-readable-operaton / wait-until-port-writable-operation +;; blocks if the port isn't ready for input / output. +;; +;; This is tested with a pipe (read & write) +;; and a listening socket (read, or accept in this case). +;; +;; Due to the possibility of spurious wakeups, +;; a limited few spurious wakeups are tolerated. +;; +;; * these operations succeed if the port is ready for input / output. +;; +;; These are again tested with a pipe and a listening socket +;; +;; Blocking is detected with a small time-out. + +(define (make-listening-socket) + (let ((server (socket PF_INET SOCK_DGRAM 0))) + (bind server AF_INET INADDR_LOOPBACK 0) + server)) + +(let ((s (make-listening-socket))) + (assert-run-fibers-returns (#f) + (readable/timeout s)) + (assert-equal #f (readable/timeout s)) + (close s)) + +(define-syntax-rule (with-pipes (A B) exp exp* ...) + (let* ((pipes (pipe)) + (A (car pipes)) + (B (cdr pipes))) + exp exp* ... + (close A) + (close B))) + +(with-pipes (A B) + (setvbuf A 'none) + (setvbuf B 'none) + (assert-run-fibers-returns (#f) + (readable/timeout A)) + (assert-equal #f (readable/timeout A)) + + ;; XXX why does this hang? Shouldn't there + ;; be a time-out? + (assert-run-fibers-returns (#f) + (writable/timeout A)) + (assert-equal #f (writable/timeout A)) + + ) + +(exit (if failed? 1 0)) + +;; Local Variables: +;; eval: (put 'with-pipes 'scheme-indent-function 1) +;; End: -- 2.30.0