diff --git a/guix/store.scm b/guix/store.scm index b15da5485..1ba22cf2d 100644 --- a/guix/store.scm +++ b/guix/store.scm @@ -40,6 +40,7 @@ #:use-module (ice-9 regex) #:use-module (ice-9 vlist) #:use-module (ice-9 popen) + #:use-module (ice-9 format) #:use-module (web uri) #:export (%daemon-socket-uri %gc-roots-directory @@ -322,7 +323,7 @@ (define-record-type (%make-nix-server socket major minor - buffer flush + buffer flush pending-rpcs ats-cache atts-cache) nix-server? (socket nix-server-socket) @@ -332,6 +333,10 @@ (buffer nix-server-output-port) ;output port (flush nix-server-flush-output) ;thunk + ;; List of pending 'add-text-to-store' RPC arguments. + (pending-rpcs nix-server-pending-rpcs + set-nix-server-pending-rpcs!) + ;; Caches. We keep them per-connection, because store paths build ;; during the session are temporary GC roots kept for the duration of ;; the session. @@ -509,7 +514,7 @@ for this connection will be pinned. Return a server object." (let ((conn (%make-nix-server port (protocol-major v) (protocol-minor v) - output flush + output flush '() (make-hash-table 100) (make-hash-table 100)))) (let loop ((done? (process-stderr conn))) @@ -521,8 +526,17 @@ for this connection will be pinned. Return a server object." (force-output (nix-server-output-port server)) ((nix-server-flush-output server))) +(define (flush-pending-rpcs server) + (let ((len (length (nix-server-pending-rpcs server)))) + (when (> len 0) + (pk 'flush-pending-rpcs len) + (add-data-to-store/multiple server + (reverse (nix-server-pending-rpcs server))) + (set-nix-server-pending-rpcs! server '())))) + (define (close-connection server) "Close the connection to SERVER." + (flush-pending-rpcs server) (close (nix-server-socket server))) (define-syntax-rule (with-store store exp ...) @@ -811,6 +825,8 @@ bytevector) as its internal buffer, and a thunk to flush this output port." docstring (let* ((s (nix-server-socket server)) (buffered (nix-server-output-port server))) + (unless (eq? 'name 'add-text-to-store) + (flush-pending-rpcs server)) (record-operation 'name) (write-int (operation-id name) buffered) (write-arg type arg buffered) @@ -822,6 +838,32 @@ bytevector) as its internal buffer, and a thunk to flush this output port." (or done? (loop (process-stderr server)))) (values (read-arg return s) ...)))))) + +(define-syntax operation-pipeline + (syntax-rules () + "Define a client-side RPC stub for the given operation." + ((_ (name (type arg) ...) docstring return ...) + (lambda (server arg-list) + docstring + (let* ((s (nix-server-socket server)) + (buffered (nix-server-output-port server))) + (record-operation 'name) + (for-each (match-lambda + ((arg ...) + (write-int (operation-id name) buffered) + (write-arg type arg buffered) + ...)) + arg-list) + (write-buffered-output server) + + (map (lambda (_) + ;; Loop until the server is done sending error output. + (let loop ((done? (process-stderr server))) + (or done? (loop (process-stderr server)))) + + (list (read-arg return s) ...)) + arg-list)))))) + (define-syntax-rule (define-operation (name args ...) docstring return ...) (define name @@ -856,6 +898,20 @@ string). Raise an error if no such path exists." "Return the info (hash, references, etc.) for PATH." path-info) +(define add-data-to-store/multiple + (operation-pipeline + (add-text-to-store (string name) (bytevector text) + (string-list references)) + #f + store-path)) + +(define (add-data-to-store/buffer server name bytes references) + (let ((pending (nix-server-pending-rpcs server))) + (set-nix-server-pending-rpcs! server + (cons (list name bytes references) + pending)) + (text-output-path name bytes references))) + (define add-data-to-store ;; A memoizing version of `add-to-store', to avoid repeated RPCs with ;; the very same arguments during a given session. @@ -871,7 +927,7 @@ path." (let* ((args `(,bytes ,name ,references)) (cache (nix-server-add-text-to-store-cache server))) (or (hash-ref cache args) - (let ((path (add-text-to-store server name bytes references))) + (let ((path (add-data-to-store/buffer server name bytes references))) (hash-set! cache args path) path)))))) @@ -1485,6 +1541,16 @@ the derivation called NAME with hash HASH." name (string-append name "-" output)))) +(define (text-output-path name bv references) + "Return an output path for NAME, with contents BV and the given REFERENCES. +The result is the same as that produced by 'add-data-to-store' with the same +arguments." + (store-path (string-append "text" + (string-join (sort references string +#include #include #include @@ -79,8 +80,7 @@ static void tunnelStderr(const unsigned char * buf, size_t count) /* Return true if the remote side has closed its end of the - connection, false otherwise. Should not be called on any socket on - which we expect input! */ + connection, false otherwise. */ static bool isFarSideClosed(int socket) { struct timeval timeout; @@ -95,17 +95,24 @@ static bool isFarSideClosed(int socket) if (!FD_ISSET(socket, &fds)) return false; - /* Destructive read to determine whether the select() marked the - socket as readable because there is actual input or because - we've reached EOF (i.e., a read of size 0 is available). */ - char c; - int rd; - if ((rd = read(socket, &c, 1)) > 0) - throw Error("EOF expected (protocol error?)"); - else if (rd == -1 && errno != ECONNRESET) - throw SysError("expected connection reset or EOF"); + /* Check whether whether 'select' marked the socket as readable because + there is actual input or because we've reached EOF (i.e., a read of + size 0 is available). */ + char c; int rd; + do { + rd = recv(socket, &c, sizeof c, MSG_PEEK); + } + while (rd == -1 && errno == EINTR); - return true; + if (rd == -1) { + if (errno == ECONNRESET) + /* Remote side is definitely closed. */ + return true; + else + throw SysError("while peeking client input"); + } + + return rd == 0; } @@ -136,9 +143,6 @@ static void sigPollHandler(int sigNo) const char * s = "SIGPOLL\n"; write(STDERR_FILENO, s, strlen(s)); } - } else { - const char * s = "spurious SIGPOLL\n"; - write(STDERR_FILENO, s, strlen(s)); } } catch (Error & e) { @@ -847,8 +851,8 @@ static void acceptConnection(int fdSocket) /* If we're on a TCP connection, disable Nagle's algorithm so that data is sent as soon as possible. */ - (void) setsockopt(remote, SOL_TCP, TCP_NODELAY, - &enabled, sizeof enabled); + // (void) setsockopt(remote, SOL_TCP, TCP_NODELAY, + // &enabled, sizeof enabled); #if defined(TCP_QUICKACK) /* Enable TCP quick-ack if applicable; this might help a little. */