From d351c0a5ecde62e63368bec0e1f15108495a1a71 Mon Sep 17 00:00:00 2001 From: Rutger van Beusekom Date: Mon, 2 Mar 2020 10:38:57 +0100 Subject: [PATCH] Add pipeline procedure. * libguile/posix.c (scm_open_process): Remove. (scm_piped_process): Add to replace open_process. * module/ice-9/popen.scm (pipe->fdes): Add to convert pipe pair to fdes pair. (open-process): Add open-process for backwards compatibility. (pipeline): Add to implement a pipeline using piped-process. --- doc/ref/posix.texi | 27 +++++++++++++++ libguile/posix.c | 66 ++++++++++--------------------------- module/ice-9/popen.scm | 46 +++++++++++++++++++++++++- test-suite/tests/popen.test | 37 ++++++++++++++++++++- 4 files changed, 126 insertions(+), 50 deletions(-) diff --git a/doc/ref/posix.texi b/doc/ref/posix.texi index 2c85f803a..d10f6531e 100644 --- a/doc/ref/posix.texi +++ b/doc/ref/posix.texi @@ -2370,6 +2370,33 @@ processes, and a system-wide limit on the number of pipes, so pipes should be closed explicitly when no longer needed, rather than letting the garbage collector pick them up at some later time. +@findex pipeline +@deffn (Scheme Procedure) pipeline commands +Execute a @code{pipeline} of @var{commands} -- where each command is a +list of a program and its arguments as strings -- returning an input +port to the end of the pipeline, an output port to the beginning of the +pipeline and a list of PIDs of the processes executing the @var{commands}. + +@example +(let ((commands '(("git" "ls-files") + ("tar" "-cf-" "-T-") + ("sha1sum" "-"))) + (pipe-fail? (compose not + zero? + status:exit-val + cdr + waitpid))) + (receive (from to pids) (pipeline commands) + (let* ((sha1 (read-delimited " " from)) + (index (list-index pipe-fail? (reverse pids)))) + (close to) + (close from) + (if (not index) sha1 + (string-append "pipeline failed in command: " + (string-join (list-ref commands index))))))) +@result{} "52f99d234503fca8c84ef94b1005a3a28d8b3bc1" +@end example +@end deffn @node Networking @subsection Networking diff --git a/libguile/posix.c b/libguile/posix.c index a1520abc4..dac4197e9 100644 --- a/libguile/posix.c +++ b/libguile/posix.c @@ -1368,10 +1368,9 @@ start_child (const char *exec_file, char **exec_argv, #ifdef HAVE_START_CHILD static SCM -scm_open_process (SCM mode, SCM prog, SCM args) -#define FUNC_NAME "open-process" +scm_piped_process (SCM prog, SCM args, SCM from, SCM to) +#define FUNC_NAME "piped-process" { - long mode_bits; int reading, writing; int c2p[2]; /* Child to parent. */ int p2c[2]; /* Parent to child. */ @@ -1379,44 +1378,27 @@ scm_open_process (SCM mode, SCM prog, SCM args) int pid; char *exec_file; char **exec_argv; - SCM read_port = SCM_BOOL_F, write_port = SCM_BOOL_F; exec_file = scm_to_locale_string (prog); exec_argv = scm_i_allocate_string_pointers (scm_cons (prog, args)); - mode_bits = scm_i_mode_bits (mode); - reading = mode_bits & SCM_RDNG; - writing = mode_bits & SCM_WRTNG; + reading = scm_is_pair (from); + writing = scm_is_pair (to); if (reading) { - if (pipe (c2p)) - { - int errno_save = errno; - free (exec_file); - errno = errno_save; - SCM_SYSERROR; - } + c2p[0] = scm_to_int (scm_car (from)); + c2p[1] = scm_to_int (scm_cdr (from)); out = c2p[1]; } - + if (writing) { - if (pipe (p2c)) - { - int errno_save = errno; - free (exec_file); - if (reading) - { - close (c2p[0]); - close (c2p[1]); - } - errno = errno_save; - SCM_SYSERROR; - } + p2c[0] = scm_to_int (scm_car (to)); + p2c[1] = scm_to_int (scm_cdr (to)); in = p2c[0]; } - + { SCM port; @@ -1449,23 +1431,12 @@ scm_open_process (SCM mode, SCM prog, SCM args) SCM_SYSERROR; } - /* There is no sense in catching errors on close(). */ if (reading) - { - close (c2p[1]); - read_port = scm_i_fdes_to_port (c2p[0], scm_mode_bits ("r0"), - sym_read_pipe, - SCM_FPORT_OPTION_NOT_SEEKABLE); - } + close (c2p[1]); if (writing) - { - close (p2c[0]); - write_port = scm_i_fdes_to_port (p2c[1], scm_mode_bits ("w0"), - sym_write_pipe, - SCM_FPORT_OPTION_NOT_SEEKABLE); - } + close (p2c[0]); - return scm_values_3 (read_port, write_port, scm_from_int (pid)); + return scm_from_int (pid); } #undef FUNC_NAME @@ -1510,8 +1481,8 @@ SCM_DEFINE (scm_system_star, "system*", 0, 0, 1, "Example: (system* \"echo\" \"foo\" \"bar\")") #define FUNC_NAME s_scm_system_star { - SCM prog, res; - int pid, status, wait_result; + SCM prog, pid; + int status, wait_result; if (scm_is_null (args)) SCM_WRONG_NUM_ARGS (); @@ -1529,9 +1500,8 @@ SCM_DEFINE (scm_system_star, "system*", 0, 0, 1, SCM_UNDEFINED); #endif - res = scm_open_process (scm_nullstr, prog, args); - pid = scm_to_int (scm_c_value_ref (res, 2)); - SCM_SYSCALL (wait_result = waitpid (pid, &status, 0)); + pid = scm_piped_process (prog, args, SCM_UNDEFINED, SCM_UNDEFINED); + SCM_SYSCALL (wait_result = waitpid (scm_to_int (pid), &status, 0)); if (wait_result == -1) SCM_SYSERROR; @@ -2371,7 +2341,7 @@ SCM_DEFINE (scm_gethostname, "gethostname", 0, 0, 0, static void scm_init_popen (void) { - scm_c_define_gsubr ("open-process", 2, 0, 1, scm_open_process); + scm_c_define_gsubr ("piped-process", 2, 2, 0, scm_piped_process); } #endif /* HAVE_START_CHILD */ diff --git a/module/ice-9/popen.scm b/module/ice-9/popen.scm index 2afe45701..5ab93f275 100644 --- a/module/ice-9/popen.scm +++ b/module/ice-9/popen.scm @@ -22,9 +22,10 @@ #:use-module (rnrs bytevectors) #:use-module (ice-9 binary-ports) #:use-module (ice-9 threads) + #:use-module (srfi srfi-1) #:use-module (srfi srfi-9) #:export (port/pid-table open-pipe* open-pipe close-pipe open-input-pipe - open-output-pipe open-input-output-pipe)) + open-output-pipe open-input-output-pipe pipeline)) (eval-when (expand load eval) (load-extension (string-append "libguile-" (effective-version)) @@ -84,6 +85,28 @@ (define port/pid-table (make-weak-key-hash-table)) (define port/pid-table-mutex (make-mutex)) +(define (pipe->fdes) + (let ((p (pipe))) + (cons (port->fdes (car p)) + (port->fdes (cdr p))))) + +(define (open-process mode command . args) + "Backwards compatible implementation of the former procedure in +libguile/posix.c (scm_open_process) replaced by +scm_piped_process. Executes the program @var{command} with optional +arguments @var{args} (all strings) in a subprocess. A port to the +process (based on pipes) is created and returned. @var{mode} specifies +whether an input, an output or an input-output port to the process is +created: it should be the value of @code{OPEN_READ}, @code{OPEN_WRITE} +or @code{OPEN_BOTH}." + (let* ((from (and (or (string=? mode OPEN_READ) + (string=? mode OPEN_BOTH)) (pipe->fdes))) + (to (and (or (string=? mode OPEN_WRITE) + (string=? mode OPEN_BOTH)) (pipe->fdes))) + (pid (piped-process command args from to))) + (values (and from (fdes->inport (car from))) + (and to (fdes->outport (cdr to))) pid))) + (define (open-pipe* mode command . args) "Executes the program @var{command} with optional arguments @var{args} (all strings) in a subprocess. @@ -176,3 +199,24 @@ information on how to interpret this value." "Equivalent to @code{open-pipe} with mode @code{OPEN_BOTH}" (open-pipe command OPEN_BOTH)) +(define (pipeline commands) + "Execute a pipeline of @var(commands) -- where each command is a list of a +program and its arguments as strings -- returning an input port to the +end of the pipeline, an output port to the beginning of the pipeline and +a list of PIDs of the processes executing the @var(commands)." + (let* ((to (pipe->fdes)) + (pipes (map (lambda _ (pipe->fdes)) commands)) + (pipeline (fold (lambda (from proc prev) + (let* ((to (car prev)) + (pids (cdr prev)) + (pid (piped-process (car proc) + (cdr proc) + from + to))) + (cons from (cons pid pids)))) + `(,to) + pipes + commands)) + (from (car pipeline)) + (pids (cdr pipeline))) + (values (fdes->inport (car from)) (fdes->outport (cdr to)) pids))) diff --git a/test-suite/tests/popen.test b/test-suite/tests/popen.test index 2c0877484..c780de9a7 100644 --- a/test-suite/tests/popen.test +++ b/test-suite/tests/popen.test @@ -17,7 +17,10 @@ ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA (define-module (test-suite test-ice-9-popen) - #:use-module (test-suite lib)) + #:use-module (test-suite lib) + #:use-module (ice-9 receive) + #:use-module (ice-9 rdelim)) + ;; read from PORT until eof is reached, return what's read as a string (define (read-string-to-eof port) @@ -211,3 +214,35 @@ exec 2>~a; read REPLY" (let ((st (close-pipe (open-output-pipe "exit 1")))) (and (status:exit-val st) (= 1 (status:exit-val st))))))) + + +;; +;; pipeline related tests +;; + +(pass-if "open-process" + (receive (from to pid) + ((@@ (ice-9 popen) open-process) OPEN_BOTH "rev") + (display "dlrow olleh" to) (close to) + (and (equal? "hello world" (read-string from)) + (= 0 (status:exit-val (cdr (waitpid pid))))))) + +(pass-if "piped-process" + (= 42 (status:exit-val + (cdr (waitpid ((@@ (ice-9 popen) piped-process) + "./meta/guile" '("-c" "(exit 42)"))))))) + +(pass-if "piped-process: with output" + (let* ((p (pipe)) + (pid ((@@ (ice-9 popen) piped-process) "echo" '("foo" "bar") + (cons (port->fdes (car p)) + (port->fdes (cdr p)))))) + + (and (equal? "foo bar\n" (read-string (car p))) + (= 0 (status:exit-val (cdr (waitpid pid))))))) + +(pass-if "pipeline" + (receive (from to pids) + (pipeline '(("echo" "dlrow olleh") ("rev"))) + (and (string=? "hello world\n" (read-string from)) + (equal? '(0 0) (map (compose status:exit-val cdr waitpid) pids))))) -- 2.25.1