From 94898f67e1dca6152c434ff50e860691ce813018 Mon Sep 17 00:00:00 2001 From: Mathieu Othacehe Date: Wed, 2 Dec 2020 11:13:33 +0100 Subject: [PATCH] Add remote build support. * src/cuirass/remote.scm: New file. * src/cuirass/remote-server.scm: New file. * src/cuirass/remote-worker.scm: New file. * bin/remote-server.in: New file. * bin/remote-worker.in: New file. * Makefile.am (bin_SCRIPTS): Add new binaries, (dist_pkgmodule_DATA): add new files, (EXTRA_DIST): add new binaries, (bin/remote-server, bin/remote-worker): new targets. * .gitignore: Add new binaries. * bin/cuirass.in (%options): Add "--build-remote" option, (show-help): document it, (main): honor it. * src/cuirass/base.scm (with-build-offload-thread): New macro, (%build-remote?, %build-offload-channel): new parameters, (make-build-offload-thread): new procedure, (build-derivations/offload): new procedure, (restart-builds): use it to offload builds when %build-remote? is set, (build-packages): ditto. --- .gitignore | 2 + Makefile.am | 16 +- bin/cuirass.in | 162 ++++++----- bin/remote-server.in | 29 ++ bin/remote-worker.in | 29 ++ src/cuirass/base.scm | 65 ++++- src/cuirass/remote-server.scm | 518 ++++++++++++++++++++++++++++++++++ src/cuirass/remote-worker.scm | 286 +++++++++++++++++++ src/cuirass/remote.scm | 292 +++++++++++++++++++ 9 files changed, 1318 insertions(+), 81 deletions(-) create mode 100644 bin/remote-server.in create mode 100644 bin/remote-worker.in create mode 100644 src/cuirass/remote-server.scm create mode 100644 src/cuirass/remote-worker.scm create mode 100644 src/cuirass/remote.scm diff --git a/.gitignore b/.gitignore index beabf29..7cd0e1f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,8 @@ /bin/cuirass /bin/cuirass-send-events /bin/evaluate +/bin/remote-server +/bin/remote-worker /build-aux/config.guess /build-aux/config.sub /build-aux/install-sh diff --git a/Makefile.am b/Makefile.am index 17a73f0..270c0ed 100644 --- a/Makefile.am +++ b/Makefile.am @@ -22,7 +22,13 @@ # You should have received a copy of the GNU General Public License # along with Cuirass. If not, see . -bin_SCRIPTS = bin/cuirass bin/cuirass-send-events bin/evaluate +bin_SCRIPTS = \ + bin/cuirass \ + bin/cuirass-send-events \ + bin/evaluate \ + bin/remote-server \ + bin/remote-worker + noinst_SCRIPTS = pre-inst-env guilesitedir = $(datarootdir)/guile/site/@GUILE_EFFECTIVE_VERSION@ @@ -48,6 +54,9 @@ dist_pkgmodule_DATA = \ src/cuirass/http.scm \ src/cuirass/logging.scm \ src/cuirass/metrics.scm \ + src/cuirass/remote.scm \ + src/cuirass/remote-server.scm \ + src/cuirass/remote-worker.scm \ src/cuirass/send-events.scm \ src/cuirass/ui.scm \ src/cuirass/utils.scm \ @@ -166,6 +175,8 @@ EXTRA_DIST = \ bin/cuirass.in \ bin/cuirass-send-events.in \ bin/evaluate.in \ + bin/remote-server.in \ + bin/remote-worker.in \ bootstrap \ build-aux/guix.scm \ src/cuirass/config.scm.in \ @@ -226,6 +237,9 @@ generate_file = \ bin/cuirass: $(srcdir)/bin/cuirass.in bin/cuirass-send-events: $(srcdir)/bin/cuirass-send-events.in bin/evaluate: $(srcdir)/bin/evaluate.in +bin/remote-server: $(srcdir)/bin/remote-server.in +bin/remote-worker: $(srcdir)/bin/remote-worker.in + $(bin_SCRIPTS): Makefile $(generate_file); chmod +x $@ src/cuirass/config.scm: $(srcdir)/src/cuirass/config.scm.in Makefile diff --git a/bin/cuirass.in b/bin/cuirass.in index aef4a65..ac9811c 100644 --- a/bin/cuirass.in +++ b/bin/cuirass.in @@ -57,6 +57,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" --listen=HOST Listen on the network interface for HOST -I, --interval=N Wait N seconds between each poll --log-queries=FILE Log SQL queries in FILE. + --build-remote Use the remote build mechanism --use-substitutes Allow usage of pre-built substitutes --record-events Record events for distribution --threads=N Use up to N kernel threads @@ -74,6 +75,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" (port (single-char #\p) (value #t)) (listen (value #t)) (interval (single-char #\I) (value #t)) + (build-remote (value #f)) (use-substitutes (value #f)) (threads (value #t)) (fallback (value #f)) @@ -100,6 +102,7 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" (%package-database (option-ref opts 'database (%package-database))) (%package-cachedir (option-ref opts 'cache-directory (%package-cachedir))) + (%build-remote? (option-ref opts 'build-remote #f)) (%use-substitutes? (option-ref opts 'use-substitutes #f)) (%fallback? (option-ref opts 'fallback #f)) (%record-events? (option-ref opts 'record-events #f)) @@ -141,84 +144,87 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" (lambda () (with-database (with-queue-writer-worker - (and specfile - (let ((new-specs (save-module-excursion - (lambda () - (set-current-module (make-user-module '())) - (primitive-load specfile))))) - (for-each db-add-specification new-specs))) - - (when queries-file - (log-message "Enable SQL query logging.") - (db-log-queries queries-file)) - - (if one-shot? - (process-specs (db-get-specifications)) - (let ((exit-channel (make-channel))) - (start-watchdog) - (if (option-ref opts 'web #f) - (begin - (spawn-fiber - (essential-task - 'web exit-channel - (lambda () - (run-cuirass-server #:host host #:port port))) - #:parallel? #t) - - (spawn-fiber - (essential-task - 'monitor exit-channel - (lambda () - (while #t - (log-monitoring-stats) - (sleep 600)))))) - - (begin - (clear-build-queue) - - ;; If Cuirass was stopped during an evaluation, - ;; abort it. Builds that were not registered - ;; during this evaluation will be registered - ;; during the next evaluation. - (db-abort-pending-evaluations) - - ;; First off, restart builds that had not - ;; completed or were not even started on a - ;; previous run. - (spawn-fiber - (essential-task - 'restart-builds exit-channel - (lambda () - (restart-builds)))) - - (spawn-fiber - (essential-task - 'build exit-channel - (lambda () - (while #t - (process-specs (db-get-specifications)) - (log-message - "next evaluation in ~a seconds" interval) - (sleep interval))))) - - (spawn-fiber - (essential-task - 'metrics exit-channel - (lambda () - (while #t - (with-time-logging - "Metrics update" - (db-update-metrics)) - (sleep 3600))))) - - (spawn-fiber - (essential-task - 'monitor exit-channel - (lambda () - (while #t - (log-monitoring-stats) - (sleep 600))))))) - (primitive-exit (get-message exit-channel))))))) + (with-build-offload-thread + (and specfile + (let ((new-specs (save-module-excursion + (lambda () + (set-current-module + (make-user-module '())) + (primitive-load specfile))))) + (for-each db-add-specification new-specs))) + + (when queries-file + (log-message "Enable SQL query logging.") + (db-log-queries queries-file)) + + (if one-shot? + (process-specs (db-get-specifications)) + (let ((exit-channel (make-channel))) + (start-watchdog) + (if (option-ref opts 'web #f) + (begin + (spawn-fiber + (essential-task + 'web exit-channel + (lambda () + (run-cuirass-server #:host host + #:port port))) + #:parallel? #t) + + (spawn-fiber + (essential-task + 'monitor exit-channel + (lambda () + (while #t + (log-monitoring-stats) + (sleep 600)))))) + + (begin + (clear-build-queue) + + ;; If Cuirass was stopped during an evaluation, + ;; abort it. Builds that were not registered + ;; during this evaluation will be registered + ;; during the next evaluation. + (db-abort-pending-evaluations) + + ;; First off, restart builds that had not + ;; completed or were not even started on a + ;; previous run. + (spawn-fiber + (essential-task + 'restart-builds exit-channel + (lambda () + (restart-builds)))) + + (spawn-fiber + (essential-task + 'build exit-channel + (lambda () + (while #t + (process-specs (db-get-specifications)) + (log-message + "next evaluation in ~a seconds" interval) + (sleep interval))))) + + (spawn-fiber + (essential-task + 'metrics exit-channel + (lambda () + (while #t + (with-time-logging + "Metrics update" + (db-update-metrics)) + (sleep 3600))))) + + (spawn-fiber + (essential-task + 'monitor exit-channel + (lambda () + (while #t + (log-monitoring-stats) + (sleep 600))))))) + (primitive-exit (get-message exit-channel)))))))) ;; Most of our code is I/O so preemption doesn't matter much (it ;; could help while we're doing SQL requests, for instance, but it diff --git a/bin/remote-server.in b/bin/remote-server.in new file mode 100644 index 0000000..6425d51 --- /dev/null +++ b/bin/remote-server.in @@ -0,0 +1,29 @@ +#!/bin/sh +# -*- scheme -*- +# @configure_input@ +#GUILE_LOAD_PATH="@PACKAGE_LOAD_PATH@${GUILE_LOAD_PATH:+:}$GUILE_LOAD_PATH" +#GUILE_LOAD_COMPILED_PATH="@PACKAGE_LOAD_COMPILED_PATH@${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH" +exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" +!# +;;; remote-server.in -- Remote build server. +;;; Copyright © 2020 Mathieu Othacehe +;;; +;;; This file is part of Cuirass. +;;; +;;; Cuirass is free software: you can redistribute it and/or modify +;;; it under the terms of the GNU General Public License as published by +;;; the Free Software Foundation, either version 3 of the License, or +;;; (at your option) any later version. +;;; +;;; Cuirass is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with Cuirass. If not, see . + +(use-modules (cuirass remote-server)) + +(define* (main #:optional (args (command-line))) + (remote-server (cdr args))) diff --git a/bin/remote-worker.in b/bin/remote-worker.in new file mode 100644 index 0000000..8a3830c --- /dev/null +++ b/bin/remote-worker.in @@ -0,0 +1,29 @@ +#!/bin/sh +# -*- scheme -*- +# @configure_input@ +#GUILE_LOAD_PATH="@PACKAGE_LOAD_PATH@${GUILE_LOAD_PATH:+:}$GUILE_LOAD_PATH" +#GUILE_LOAD_COMPILED_PATH="@PACKAGE_LOAD_COMPILED_PATH@${GUILE_LOAD_COMPILED_PATH:+:}$GUILE_LOAD_COMPILED_PATH" +exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@" +!# +;;; remote-worker.in -- Remote build worker. +;;; Copyright © 2020 Mathieu Othacehe +;;; +;;; This file is part of Cuirass. +;;; +;;; Cuirass is free software: you can redistribute it and/or modify +;;; it under the terms of the GNU General Public License as published by +;;; the Free Software Foundation, either version 3 of the License, or +;;; (at your option) any later version. +;;; +;;; Cuirass is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with Cuirass. If not, see . + +(use-modules (cuirass remote-worker)) + +(define* (main #:optional (args (command-line))) + (remote-worker (cdr args))) diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm index c3ce900..c526f5c 100644 --- a/src/cuirass/base.scm +++ b/src/cuirass/base.scm @@ -22,8 +22,10 @@ (define-module (cuirass base) #:use-module (fibers) + #:use-module (fibers channels) #:use-module (cuirass logging) #:use-module (cuirass database) + #:use-module (cuirass remote) #:use-module (cuirass utils) #:use-module ((cuirass config) #:select (%localstatedir)) #:use-module (gnu packages) @@ -36,9 +38,13 @@ #:use-module ((guix config) #:select (%state-directory)) #:use-module (git) #:use-module (ice-9 binary-ports) + #:use-module ((ice-9 suspendable-ports) + #:select (current-read-waiter + current-write-waiter)) #:use-module (ice-9 format) #:use-module (ice-9 match) #:use-module (ice-9 popen) + #:use-module (ice-9 ports internal) #:use-module (ice-9 rdelim) #:use-module (ice-9 receive) #:use-module (ice-9 regex) @@ -65,11 +71,13 @@ prepare-git process-specs evaluation-log-file + with-build-offload-thread ;; Parameters. %package-cachedir %gc-root-directory %gc-root-ttl + %build-remote? %use-substitutes? %fallback?)) @@ -102,6 +110,10 @@ (define time-monotonic time-tai)) (else #t)) +(define %build-remote? + ;; Define whether to use the remote build mechanism. + (make-parameter #f)) + (define %use-substitutes? ;; Define whether to use substitutes (make-parameter #f)) @@ -110,6 +122,10 @@ ;; Define whether to fall back to building when the substituter fails. (make-parameter #f)) +(define %build-offload-channel + ;; Channel to communicate with the remote build server. + (make-parameter #f)) + (define %package-cachedir ;; Define to location of cache directory of this package. (make-parameter (or (getenv "CUIRASS_CACHEDIR") @@ -436,6 +452,39 @@ Essentially this procedure inverts the inversion-of-control that (raise c)) (x x))))))) +(define (make-build-offload-thread) + "Return a channel used to offload builds by communicating with the remote +build server in a separate thread. The spawned thread also polls for build +events sent by the remote server and calls HANDLE-BUILD-EVENT to register them +in the database." + (let ((channel (make-channel))) + (call-with-new-thread + (lambda () + (parameterize (((@@ (fibers internal) current-fiber) #f) + (current-read-waiter (lambda (port) + (port-poll port "r"))) + (current-write-waiter (lambda (port) + (port-poll port "w")))) + (let ((socket (remote-build-socket))) + (let loop () + (remote-build-poll socket handle-build-event) + (match (get-message-with-timeout channel + #:seconds 1 + #:retry? #f) + ((drvs . systems) + (remote-build socket drvs systems)) + ('timeout #f)) + (loop)))))) + channel)) + +(define-syntax-rule (with-build-offload-thread body ...) + (parameterize ((%build-offload-channel + (make-build-offload-thread))) + body ...)) + +(define (build-derivations/offload drvs systems) + (put-message (%build-offload-channel) (cons drvs systems))) + ;;; ;;; Building packages. @@ -641,7 +690,14 @@ started)." ;; Those in VALID can be restarted. If some of them were built in the ;; meantime behind our back, that's fine: 'spawn-builds' will DTRT. (log-message "restarting ~a pending builds" (length valid)) - (spawn-builds store valid) + (if (%build-remote?) + (let* ((builds (map db-get-build valid)) + (systems (map (cut assq-ref <> #:system) builds))) + ;; The system could by read from the store by the remote build + ;; server using the derivation name, but it is far less expensive + ;; to read it from the database. + (build-derivations/offload valid systems)) + (spawn-builds store valid)) (log-message "done with restarted builds")))) (define (create-build-outputs build product-specs) @@ -690,7 +746,12 @@ by PRODUCT-SPECS." (db-set-evaluation-status eval-id (evaluation-status succeeded)) - (spawn-builds store derivations) + (if (%build-remote?) + (let* ((builds (map db-get-build derivations)) + (systems (map (cut assq-ref <> #:system) builds))) + ;; See the comment above regarding system read. + (build-derivations/offload derivations systems)) + (spawn-builds store derivations)) (let* ((results (filter-map (cut db-get-build <>) derivations)) (status (map (cut assq-ref <> #:status) results)) diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm new file mode 100644 index 0000000..6217918 --- /dev/null +++ b/src/cuirass/remote-server.scm @@ -0,0 +1,518 @@ +;;; remote-server.scm -- Remote build server. +;;; Copyright © 2020 Mathieu Othacehe +;;; +;;; This file is part of Cuirass. +;;; +;;; GNU Guix is free software; you can redistribute it and/or modify it +;;; under the terms of the GNU General Public License as published by +;;; the Free Software Foundation; either version 3 of the License, or (at +;;; your option) any later version. +;;; +;;; GNU Guix is distributed in the hope that it will be useful, but +;;; WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with GNU Guix. If not, see . + +(define-module (cuirass remote-server) + #:use-module (cuirass base) + #:use-module (cuirass remote) + #:use-module (gcrypt pk-crypto) + #:use-module (guix avahi) + #:use-module (guix base32) + #:use-module (guix base64) + #:use-module (guix config) + #:use-module (guix derivations) + #:use-module (guix records) + #:use-module (guix packages) + #:use-module (guix pki) + #:use-module (guix scripts) + #:use-module (guix store) + #:use-module (guix ui) + #:use-module (guix workers) + #:use-module (guix build download) + #:use-module (guix build syscalls) + #:use-module (gcrypt hash) + #:use-module (gcrypt pk-crypto) + #:use-module (simple-zmq) + #:use-module (rnrs bytevectors) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-26) + #:use-module (srfi srfi-37) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (ice-9 q) + #:use-module (ice-9 rdelim) + #:use-module (ice-9 regex) + #:use-module (ice-9 threads) + + #:export (remote-server)) + +;; Indicate if the process has to be stopped. +(define %stop-process? + (make-atomic-box #f)) + +;; Whether to add build items to the store. +(define %add-to-store? + (make-parameter #f)) + +(define %cache-directory + (make-parameter #f)) + +(define %private-key + (make-parameter #f)) + +(define %public-key + (make-parameter #f)) + +(define service-name + "Cuirass remote server") + +(define (show-help) + (format #t (G_ "Usage: remote-server [OPTION]... +Start a remote build server.\n")) + (display (G_ " + -a, --add-to-store register built items to the store")) + (display (G_ " + -b, --backend-port=PORT listen worker connections on PORT")) + (display (G_ " + -p, --publish-port=PORT publish substitutes on PORT")) + (display (G_ " + -c, --cache=DIRECTORY cache built items to DIRECTORY")) + (display (G_ " + --public-key=FILE use FILE as the public key for signatures")) + (display (G_ " + --private-key=FILE use FILE as the private key for signatures")) + (newline) + (display (G_ " + -h, --help display this help and exit")) + (display (G_ " + -V, --version display version information and exit")) + (newline) + (show-bug-report-information)) + +(define %options + (list (option '(#\h "help") #f #f + (lambda _ + (show-help) + (exit 0))) + (option '(#\V "version") #f #f + (lambda _ + (show-version-and-exit "guix publish"))) + (option '(#\a "add-to-store") #t #f + (lambda (opt name arg result) + (alist-cons 'add-to-store? arg result))) + (option '(#\b "backend-port") #t #f + (lambda (opt name arg result) + (alist-cons 'backend-port (string->number* arg) result))) + (option '(#\p "publish-port") #t #f + (lambda (opt name arg result) + (alist-cons 'publish-port (string->number* arg) result))) + (option '(#\c "cache") #t #f + (lambda (opt name arg result) + (alist-cons 'cache arg result))) + (option '("public-key") #t #f + (lambda (opt name arg result) + (alist-cons 'public-key-file arg result))) + (option '("private-key") #t #f + (lambda (opt name arg result) + (alist-cons 'private-key-file arg result))))) + +(define %default-options + `((backend-port . 5555) + (publish-port . 5556) + (public-key-file . ,%public-key-file) + (private-key-file . ,%private-key-file))) + + +;;; +;;; Build workers. +;;; + +(define %workers + ;; Set of connected workers. + (make-hash-table)) + +(define %build-queues + ;; Builds request queue. + (map (lambda (system) + (cons system (make-q))) + %supported-systems)) + +(define (find-system-queues systems) + "Return the list of build queues for SYSTEMS that are not empty." + (filter-map (match-lambda + ((system . queue) + (and (member system systems) + (not (q-empty? queue)) + queue))) + %build-queues)) + +(define (build-available? name) + "Return #t if there is some available work for the worker with the given +NAME and #f otherwise." + (let* ((worker (hash-ref %workers name)) + (systems (worker-systems worker)) + (queues (find-system-queues systems))) + (not (null? queues)))) + +(define (pop-random-build name) + "Pop randomly and return a build from all the build queues with available +work for the worker with the given NAME." + (define (random-queue queues) + (list-ref queues (random (length queues)))) + + (let* ((worker (hash-ref %workers name)) + (systems (worker-systems worker)) + (queues (find-system-queues systems))) + (q-pop! (random-queue queues)))) + +(define* (read-client-exp client exp) + "Read the given EXP sent by CLIENT." + (catch 'system-error + (lambda () + (match (zmq-read-message exp) + (('build ('drv drv) ('system system)) + (let ((system (or system + (derivation-system + (read-derivation-from-file drv))))) + ;; Push the derivation to the matching queue according to the + ;; targeted system. Also save the client ID in the queue to be able + ;; to send it build events later on. + (q-push! (assoc-ref %build-queues system) + (list client drv)))))) + (const #f))) + +(define* (read-worker-exp exp #:key reply-worker) + "Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can +be used to reply to the worker." + (match (zmq-read-message exp) + (('worker-ready worker) + (let* ((worker* (sexp->worker worker)) + (name (worker-name worker*))) + (info (G_ "Worker `~a' is ready.~%") name) + (hash-set! %workers name worker*))) + (('worker-request-work name) + (if (build-available? name) + (match (pop-random-build name) + ((client drv) + (reply-worker client (zmq-build-request-message drv)))) + (reply-worker + (zmq-empty-delimiter) + (zmq-no-build-message)))))) + + +;;; +;;; Fetch workers. +;;; + +(define (zmq-fetch-workers-endpoint) + "inproc://fetch-workers") + +(define (zmq-fetch-worker-socket) + "Return a socket used to communicate with the fetch workers." + (let ((socket (zmq-create-socket %zmq-context ZMQ_DEALER)) + (endpoint (zmq-fetch-workers-endpoint))) + (zmq-connect socket endpoint) + socket)) + +(define (strip-store-prefix file) + ; Given a file name like "/gnu/store/…-foo-1.2/bin/foo", return + ;; "/bin/foo". + (let* ((len (string-length %store-directory)) + (base (string-drop file (+ 1 len)))) + (match (string-index base #\/) + (#f base) + (index (string-drop base index))))) + +(define (publish-nar-url publish-url store-hash) + "Return the URL of STORE-HASH nar substitute on PUBLISH-URL." + (format #f "~a/nar/gzip/~a" publish-url store-hash)) + +(define (publish-narinfo-url publish-url store-hash) + "Return the URL of STORE-HASH narinfo file on PUBLISH-URL." + (let ((hash (and=> (string-index store-hash #\-) + (cut string-take store-hash <>)))) + (format #f "~a/~a.narinfo" publish-url hash))) + +(define (nar-path cache-directory output) + "Return the path of the NAR file for OUTPUT in CACHE-DIRECTORY." + (string-append cache-directory "/" (basename output) ".nar")) + +(define (narinfo-path cache-directory output) + "Return the path of the NARINFO file for OUTPUT in CACHE-DIRECTORY." + (string-append cache-directory "/" (basename output) ".narinfo")) + +(define* (sign-narinfo! narinfo) + "Edit the given NARINFO file to replace the worker signature by the remote +build server signature." + (define (signed-string s) + (let* ((hash (bytevector->hash-data (sha256 (string->utf8 s)) + #:key-type (key-type (%public-key))))) + (signature-sexp hash (%private-key) (%public-key)))) + + (define base64-encode-string + (compose base64-encode string->utf8)) + + (define lines + (call-with-input-file narinfo + (lambda (port) + (let loop ((line (read-line port)) + (lines '())) + (if (eof-object? line) + (reverse lines) + (loop (read-line port) + (cons line lines))))))) + (let* ((lines + (filter (lambda (line) + (not (string-match "^Signature:" line))) + lines)) + (info (format #f "~a~%" (string-join lines "\n"))) + (signature (base64-encode-string + (canonical-sexp->string (signed-string info))))) + (call-with-output-file narinfo + (lambda (port) + (format port "~aSignature: 1;~a;~a~%" + info (gethostname) signature))))) + +(define (download-nar cache-directory outputs url) + "Download in CACHE-DIRECTORY the OUTPUTS from the substitute server at URL." + (for-each + (lambda (output) + (let* ((path (derivation-output-path output)) + (store-hash (strip-store-prefix path)) + (nar-file (nar-path cache-directory store-hash)) + (narinfo-file (narinfo-path cache-directory store-hash)) + (nar-url (publish-nar-url url store-hash)) + (narinfo-url (publish-narinfo-url url store-hash))) + (unless (file-exists? nar-file) + (url-fetch nar-url nar-file)) + + (unless (file-exists? narinfo-file) + (url-fetch narinfo-url narinfo-file) + (sign-narinfo! narinfo-file)))) + outputs)) + +(define (add-to-store outputs url) + "Add the OUTPUTS that are available from the substitute server at URL to the +store." + (with-store store + (for-each (lambda (output) + (add-substitute-url store url) + (ensure-path store output)) + (map derivation-output-path outputs)))) + +(define (need-fetching? message) + "Return #t if the received MESSAGE implies that some output fetching is +required and #f otherwise." + (match (zmq-read-message message) + (('build-succeeded ('drv drv) ('url url)) + #t) + (else #f))) + +(define* (run-fetch message #:key reply) + "Read MESSAGE and download the corresponding build outputs. If +%CACHE-DIRECTORY is set, download the matching NAR and NARINFO files in this +directory. If %ADD-TO-STORE? is set, add the build outputs to the store. + +REPLY is procedure used to forward MESSAGE to the client once the build +outputs are downloaded." + (define (build-outputs drv) + (catch 'system-error + (lambda () + (map (match-lambda + ((output-name . output) + output)) + (derivation-outputs + (read-derivation-from-file drv)))) + (const '()))) + + (match (zmq-read-message message) + (('build-succeeded ('drv drv) ('url url)) + (let ((outputs (build-outputs drv))) + (when (%add-to-store?) + (add-to-store outputs url)) + (when (%cache-directory) + (download-nar (%cache-directory) outputs url)) + (reply message))))) + +(define (start-fetch-worker name) + "Start a fetch worker thread with the given NAME. This worker takes care of +downloading build outputs. It communicates with the remote server using a ZMQ +socket." + (define (reply socket client) + (lambda (message) + (zmq-send-msg-parts-bytevector + socket + (list client (zmq-empty-delimiter) (string->bv message))))) + + (call-with-new-thread + (lambda () + (set-thread-name name) + (let ((socket (zmq-fetch-worker-socket))) + (let loop () + (match (zmq-get-msg-parts-bytevector socket '()) + ((client empty rest) + (let ((message (bv->string rest))) + (run-fetch (bv->string rest) + #:reply (reply socket client))))) + (loop)))))) + + +;;; +;;; ZMQ connection. +;;; + +(define %zmq-context + (zmq-create-context)) + +(define (zmq-backend-endpoint backend-port) + "Return a ZMQ endpoint string allowing TCP connections on BACKEND-PORT from +all network interfaces." + (string-append "tcp://*:" (number->string backend-port))) + +(define (zmq-start-proxy backend-port) + "This procedure starts a proxy between client connections from the IPC +frontend to the workers connected through the TCP backend." + (define (socket-ready? items socket) + (find (lambda (item) + (eq? (poll-item-socket item) socket)) + items)) + + (let* ((client-socket + (zmq-create-socket %zmq-context ZMQ_ROUTER)) + (build-socket + (zmq-create-socket %zmq-context ZMQ_ROUTER)) + (fetch-socket + (zmq-create-socket %zmq-context ZMQ_DEALER)) + (poll-items (list + (poll-item client-socket ZMQ_POLLIN) + (poll-item build-socket ZMQ_POLLIN) + (poll-item fetch-socket ZMQ_POLLIN)))) + + (zmq-bind-socket client-socket (zmq-frontend-endpoint)) + (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port)) + (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint)) + + ;; Change frontend socket permissions. + (chmod (zmq-frontend-socket-name) #o666) + + ;; Do not use the built-in zmq-proxy as we want to edit the envelope of + ;; frontend messages before forwarding them to the backend. + (let loop () + (let ((items (zmq-poll* poll-items))) + ;; CLIENT -> REMOTE-SERVER. + (when (zmq-socket-ready? items client-socket) + (match (zmq-get-msg-parts-bytevector client-socket) + ((client empty rest) + (read-client-exp client (bv->string rest))))) + ;; BUILD-WORKER -> REMOTE-SERVER. + (when (zmq-socket-ready? items build-socket) + (match (zmq-get-msg-parts-bytevector build-socket) + ((worker empty rest) + (let ((reply-worker + (lambda (client message) + (zmq-send-msg-parts-bytevector + build-socket + (list worker + (zmq-empty-delimiter) + client + (zmq-empty-delimiter) + (string->bv message)))))) + (read-worker-exp (bv->string rest) + #:reply-worker reply-worker))) + ((worker empty client empty rest) + (let ((message (list client (zmq-empty-delimiter) rest))) + (if (need-fetching? (bv->string rest)) + (zmq-send-msg-parts-bytevector fetch-socket message) + (zmq-send-msg-parts-bytevector client-socket message)))))) + ;; FETCH-WORKER -> REMOTE-SERVER. + (when (zmq-socket-ready? items fetch-socket) + (let ((msg (zmq-get-msg-parts-bytevector fetch-socket))) + (zmq-send-msg-parts-bytevector client-socket msg))) + + (loop))))) + + +;;; +;;; Entry point. +;;; + +;; The PID of the publish process. +(define %publish-pid + (make-atomic-box #f)) + +;; The thread running the Avahi publish service. +(define %avahi-thread + (make-atomic-box #f)) + +(define (signal-handler) + "Catch SIGINT to stop the Avahi event loop and the publish process before +exiting." + (sigaction SIGINT + (lambda (signum) + (let ((publish-pid (atomic-box-ref %publish-pid)) + (avahi-thread (atomic-box-ref %avahi-thread))) + (atomic-box-set! %stop-process? #t) + + (and publish-pid + (begin + (kill publish-pid SIGHUP) + (waitpid publish-pid))) + + (and avahi-thread + (join-thread avahi-thread)) + + (exit 1))))) + +(define (remote-server args) + (signal-handler) + + (with-error-handling + (let* ((opts (args-fold* args %options + (lambda (opt name arg result) + (leave (G_ "~A: unrecognized option~%") name)) + (lambda (arg result) + (leave (G_ "~A: extraneous argument~%") arg)) + %default-options)) + (add-to-store? (assoc-ref opts 'add-to-store?)) + (backend-port (assoc-ref opts 'backend-port)) + (publish-port (assoc-ref opts 'publish-port)) + (cache (assoc-ref opts 'cache)) + (public-key + (read-file-sexp + (assoc-ref opts 'public-key-file))) + (private-key + (read-file-sexp + (assoc-ref opts 'private-key-file)))) + + (parameterize ((%add-to-store? add-to-store?) + (%cache-directory cache) + (%public-key public-key) + (%private-key private-key)) + + (atomic-box-set! + %publish-pid + (publish-server publish-port + #:public-key public-key + #:private-key private-key)) + + (atomic-box-set! + %avahi-thread + (avahi-publish-service-thread + service-name + #:type remote-server-service-type + #:port backend-port + #:stop-loop? (lambda () + (atomic-box-ref %stop-process?)) + #:txt (list (string-append "publish=" + (number->string publish-port))))) + + (for-each (lambda (number) + (start-fetch-worker + (string-append "fetch-worker-" (number->string number)))) + (iota 4)) + + (zmq-start-proxy backend-port))))) diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm new file mode 100644 index 0000000..c253ee5 --- /dev/null +++ b/src/cuirass/remote-worker.scm @@ -0,0 +1,286 @@ +;;; remote-worker.scm -- Remote build worker. +;;; Copyright © 2020 Mathieu Othacehe +;;; +;;; This file is part of Cuirass. +;;; +;;; GNU Guix is free software; you can redistribute it and/or modify it +;;; under the terms of the GNU General Public License as published by +;;; the Free Software Foundation; either version 3 of the License, or (at +;;; your option) any later version. +;;; +;;; GNU Guix is distributed in the hope that it will be useful, but +;;; WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with GNU Guix. If not, see . + +(define-module (cuirass remote-worker) + #:use-module (cuirass remote) + #:use-module (gcrypt pk-crypto) + #:use-module (guix) + #:use-module (guix avahi) + #:use-module (guix config) + #:use-module (guix diagnostics) + #:use-module (guix pki) + #:use-module (guix records) + #:use-module (guix scripts) + #:use-module (guix ui) + #:use-module (guix build syscalls) + #:use-module (guix scripts publish) + #:use-module (simple-zmq) + #:use-module (rnrs bytevectors) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-26) + #:use-module (srfi srfi-34) + #:use-module (srfi srfi-37) + #:use-module (ice-9 atomic) + #:use-module (ice-9 match) + #:use-module (ice-9 threads) + + #:export (remote-worker)) + +;; Indicate if the process has to be stopped. +(define %stop-process? + (make-atomic-box #f)) + +(define (show-help) + (format #t (G_ "Usage: remote-worker [OPTION]... +Start a remote build worker.\n")) + (display (G_ " + -w, --workers=COUNT start COUNT parallel workers")) + (display (G_ " + -p, --publish-port=PORT publish substitutes on PORT")) + (display (G_ " + --public-key=FILE use FILE as the public key for signatures")) + (display (G_ " + --private-key=FILE use FILE as the private key for signatures")) + (newline) + (display (G_ " + -h, --help display this help and exit")) + (display (G_ " + -V, --version display version information and exit")) + (newline) + (show-bug-report-information)) + +(define %options + (list (option '(#\h "help") #f #f + (lambda _ + (show-help) + (exit 0))) + (option '(#\V "version") #f #f + (lambda _ + (show-version-and-exit "guix publish"))) + (option '(#\w "workers") #t #f + (lambda (opt name arg result) + (alist-cons 'workers (string->number* arg) result))) + (option '(#\p "publish-port") #t #f + (lambda (opt name arg result) + (alist-cons 'publish-port (string->number* arg) result))) + (option '("public-key") #t #f + (lambda (opt name arg result) + (alist-cons 'public-key-file arg result))) + (option '("private-key") #t #f + (lambda (opt name arg result) + (alist-cons 'private-key-file arg result))))) + +(define %default-options + `((workers . 1) + (publish-port . 5558) + (public-key-file . ,%public-key-file) + (private-key-file . ,%private-key-file))) + + +;;; +;;; ZMQ connection. +;;; + +(define %zmq-context + (zmq-create-context)) + +(define (zmq-backend-endpoint address port) + "Return a ZMQ endpoint identifying the build server available by TCP at +ADDRESS and PORT." + (string-append "tcp://" address ":" (number->string port))) + +(define (zmq-dealer-socket) + "The ZMQ socket to communicate with the worker threads." + (zmq-create-socket %zmq-context ZMQ_DEALER)) + + +;;; +;;; Worker. +;;; + +;; The port of the local publish server. +(define %local-publish-port + (make-atomic-box #f)) + +(define (server-publish-url address port) + "Return the server publish url at ADDRESS and PORT." + (string-append "http://" address ":" (number->string port))) + +(define (service-txt->publish-port txt) + "Parse the service TXT record and return the server publish port." + (define (parse-txt) + (fold (lambda (param params) + (match (string-split param #\=) + ((key value) + (cons (cons (string->symbol key) value) + params)))) + '() + txt)) + + (let ((params (parse-txt))) + (string->number (assq-ref params 'publish)))) + +(define (service->publish-url service) + "Return the URL of the publish server corresponding to the service with the +given NAME." + (let* ((address (avahi-service-address service)) + (txt (avahi-service-txt service)) + (publish-port + (service-txt->publish-port txt))) + (server-publish-url address publish-port))) + +(define (service->local-publish-url service) + "Return the URL of the local publish server." + (let* ((local-address (avahi-service-local-address service)) + (port (atomic-box-ref %local-publish-port))) + (server-publish-url local-address port))) + +(define* (run-build drv service #:key reply) + "Build DRV and send messages upon build start, failure or completion to the +build server identified by SERVICE-NAME using the REPLY procedure. + +The publish server of the build server is added to the list of the store +substitutes-urls. This way derivations that are not present on the worker can +still be substituted." + (with-store store + (let ((publish-url (service->publish-url service)) + (local-publish-url (service->local-publish-url service))) + (add-substitute-url store publish-url) + (reply (zmq-build-started-message drv)) + (guard (c ((store-protocol-error? c) + (info (G_ "Derivation `~a' build failed: ~a~%") + drv (store-protocol-error-message c)) + (reply (zmq-build-failed-message drv)))) + (if (build-derivations store (list drv)) + (reply (zmq-build-succeeded-message drv local-publish-url)) + (reply (zmq-build-failed-message drv))))))) + +(define* (run-command command service #:key reply) + "Run COMMAND. SERVICE-NAME is the name of the build server that sent the +command. REPLY is a procedure that can be used to reply to this server." + (match (zmq-read-message command) + (('build ('drv drv) ('system system)) + (info (G_ "Building `~a' derivation.~%") drv) + (run-build drv service #:reply reply)) + (('no-build) + #t))) + +(define (start-worker worker service) + "Start a worker thread named NAME, reading commands from the DEALER socket +and executing them. The worker can reply on the same socket." + (define (reply socket client) + (lambda (message) + (zmq-send-msg-parts-bytevector + socket + (list (zmq-empty-delimiter) client + (zmq-empty-delimiter) (string->bv message))))) + + (define (ready socket) + (zmq-send-msg-parts-bytevector + socket + (list (make-bytevector 0) + (string->bv + (zmq-worker-ready-message (worker->sexp worker)))))) + + (define (request-work socket) + (let ((name (worker-name worker))) + (zmq-send-msg-parts-bytevector + socket + (list (make-bytevector 0) + (string->bv (zmq-worker-request-work-message name)))))) + + (call-with-new-thread + (lambda () + (set-thread-name (worker-name worker)) + (let* ((socket (zmq-dealer-socket)) + (address (avahi-service-address service)) + (port (avahi-service-port service)) + (endpoint (zmq-backend-endpoint address port))) + (zmq-connect socket endpoint) + (ready socket) + (let loop () + (request-work socket) + (match (zmq-get-msg-parts-bytevector socket '()) + ((empty client empty command) + (run-command (bv->string command) service + #:reply (reply socket client)))) + (sleep 1) + (loop)))))) + + +;;; +;;; Entry point. +;;; + +;; The PID of the publish process. +(define %publish-pid + (make-atomic-box #f)) + +(define (signal-handler) + "Catch SIGINT to stop the Avahi event loop and the publish process before +exiting." + (sigaction SIGINT + (lambda (signum) + (let ((publish-pid (atomic-box-ref %publish-pid))) + (atomic-box-set! %stop-process? #t) + + (and publish-pid + (begin + (kill publish-pid SIGHUP) + (waitpid publish-pid))) + + (exit 1))))) + +(define (remote-worker args) + (with-error-handling + (let* ((opts (args-fold* args %options + (lambda (opt name arg result) + (leave (G_ "~A: unrecognized option~%") name)) + (lambda (arg result) + (leave (G_ "~A: extraneous argument~%") arg)) + %default-options)) + (workers (assoc-ref opts 'workers)) + (publish-port (assoc-ref opts 'publish-port)) + (public-key + (read-file-sexp + (assoc-ref opts 'public-key-file))) + (private-key + (read-file-sexp + (assoc-ref opts 'private-key-file)))) + + (atomic-box-set! %local-publish-port publish-port) + + (atomic-box-set! + %publish-pid + (publish-server publish-port + #:public-key public-key + #:private-key private-key)) + + (avahi-browse-service-thread + (lambda (action service) + (case action + ((new-service) + (for-each (lambda (n) + (start-worker (worker + (name (generate-worker-name)) + (systems '("x86_64-linux"))) + service)) + (iota workers))))) + #:types (list remote-server-service-type) + #:stop-loop? (lambda () + (atomic-box-ref %stop-process?)))))) diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm new file mode 100644 index 0000000..7a71391 --- /dev/null +++ b/src/cuirass/remote.scm @@ -0,0 +1,292 @@ +;;; remote.scm -- Build on remote machines. +;;; Copyright © 2020 Mathieu Othacehe +;;; +;;; This file is part of Cuirass. +;;; +;;; GNU Guix is free software; you can redistribute it and/or modify it +;;; under the terms of the GNU General Public License as published by +;;; the Free Software Foundation; either version 3 of the License, or (at +;;; your option) any later version. +;;; +;;; GNU Guix is distributed in the hope that it will be useful, but +;;; WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU General Public License +;;; along with GNU Guix. If not, see . + +(define-module (cuirass remote) + #:use-module (guix config) + #:use-module (guix derivations) + #:use-module (guix records) + #:use-module (guix store) + #:use-module (guix ui) + #:use-module (guix build download) + #:use-module ((guix build utils) #:select (mkdir-p)) + #:use-module (guix scripts publish) + #:use-module (simple-zmq) + #:use-module (rnrs bytevectors) + #:use-module (srfi srfi-1) + #:use-module (srfi srfi-26) + #:use-module (ice-9 match) + #:use-module (ice-9 rdelim) + #:export (worker + worker? + worker-name + worker-systems + worker->sexp + sexp->worker + generate-worker-name + + publish-server + add-substitute-url + + zmq-frontend-socket-name + zmq-frontend-endpoint + zmq-poll* + zmq-socket-ready? + zmq-empty-delimiter + + zmq-build-request-message + zmq-no-build-message + zmq-build-started-message + zmq-build-failed-message + zmq-build-succeeded-message + zmq-worker-ready-message + zmq-worker-request-work-message + zmq-read-message + + remote-server-service-type + remote-build-socket + remote-build + remote-build-poll)) + + +;;; +;;; Workers. +;;; + +(define-record-type* + worker make-worker + worker? + (name worker-name) + (systems worker-systems)) + +(define (worker->sexp worker) + "Return an sexp describing WORKER." + (let ((name (worker-name worker)) + (systems (worker-systems worker))) + `(worker + (name ,name) + (systems ,systems)))) + +(define (sexp->worker sexp) + "Turn SEXP, an sexp as returned by 'worker->sexp', into a record." + (match sexp + (('worker ('name name) ('systems systems)) + (worker + (name name) + (systems systems))))) + + +(define %seed + (seed->random-state + (logxor (getpid) (car (gettimeofday))))) + +(define (integer->alphanumeric-char n) + "Map N, an integer in the [0..62] range, to an alphanumeric character." + (cond ((< n 10) + (integer->char (+ (char->integer #\0) n))) + ((< n 36) + (integer->char (+ (char->integer #\A) (- n 10)))) + ((< n 62) + (integer->char (+ (char->integer #\a) (- n 36)))) + (else + (error "integer out of bounds" n)))) + +(define (random-string len) + "Compute a random string of size LEN where each character is alphanumeric." + (let loop ((chars '()) + (len len)) + (if (zero? len) + (list->string chars) + (let ((n (random 62 %seed))) + (loop (cons (integer->alphanumeric-char n) chars) + (- len 1)))))) + +(define (generate-worker-name) + "Return the service name of the server." + (string-append (gethostname) "-" (random-string 4))) + + +;;; +;;; Store publishing. +;;; + +(define (add-substitute-url store url) + "Add URL to the list of STORE substitutes-urls." + (set-build-options store + #:use-substitutes? #t + #:fallback? #f + #:keep-going? #t + #:print-build-trace #t + #:build-verbosity 1 + #:substitute-urls + (cons url %default-substitute-urls))) + +(define* (publish-server port + #:key + public-key + private-key) + "This procedure starts a publishing server listening on PORT in a new +process and returns the pid of the forked process. Use PUBLIC-KEY and +PRIVATE-KEY to sign narinfos." + (match (primitive-fork) + (0 + (parameterize ((%public-key public-key) + (%private-key private-key)) + (with-store store + (let* ((address (make-socket-address AF_INET INADDR_ANY 0)) + (socket-address + (make-socket-address (sockaddr:fam address) + (sockaddr:addr address) + port)) + (socket (open-server-socket socket-address))) + (run-publish-server socket store + #:compressions + (list %default-gzip-compression)))))) + (pid pid))) + + +;;; +;;; ZMQ. +;;; + +(define %zmq-context + (zmq-create-context)) + +(define (zmq-frontend-socket-name) + "Return the name of the ZMQ frontend socket." + (string-append %state-directory "/remote-build-socket")) + +(define (zmq-frontend-endpoint) + "Return a ZMQ endpoint allowing client connections using the IPC transport." + (string-append "ipc://" (zmq-frontend-socket-name))) + +(define (EINTR-safe proc) + "Return a variant of PROC that catches EINTR 'zmq-error' exceptions and +retries a call to PROC." + (define (safe . args) + (catch 'zmq-error + (lambda () + (apply proc args)) + (lambda (key errno . rest) + (if (= errno EINTR) + (apply safe args) + (apply throw key errno rest))))) + + safe) + +(define zmq-poll* + ;; Return a variant of ZMQ-POLL that catches EINTR errors. + (EINTR-safe zmq-poll)) + +(define (zmq-socket-ready? items socket) + "Return #t if the given SOCKET is part of ITEMS, a list returned by a +'zmq-poll' call, return #f otherwise." + (find (lambda (item) + (eq? (poll-item-socket item) socket)) + items)) + +(define (zmq-read-message msg) + (call-with-input-string msg read)) + +(define (zmq-empty-delimiter) + "Return an empty ZMQ delimiter used to format message envelopes." + (make-bytevector 0)) + +;; ZMQ Messages. +(define* (zmq-build-request-message drv #:optional system) + "Return a message requesting the build of DRV for SYSTEM." + (format #f "~s" `(build (drv ,drv) (system ,system)))) + +(define (zmq-no-build-message) + "Return a message that indicates that no builds are available." + (format #f "~s" `(no-build))) + +(define (zmq-build-started-message drv) + "Return a message that indicates that the build of DRV has started." + (format #f "~s" `(build-started (drv ,drv)))) + +(define (zmq-build-failed-message drv) + "Return a message that indicates that the build of DRV has failed." + (format #f "~s" `(build-failed (drv ,drv)))) + +(define (zmq-build-succeeded-message drv url) + "Return a message that indicates that the build of DRV is done." + (format #f "~s" `(build-succeeded (drv ,drv) (url ,url)))) + +(define (zmq-worker-ready-message worker) + "Return a message that indicates that WORKER is ready." + (format #f "~s" `(worker-ready ,worker))) + +(define (zmq-worker-request-work-message name) + "Return a message that indicates that WORKER is requesting work." + (format #f "~s" `(worker-request-work ,name))) + + +;;; +;;; Remote builds. +;;; + +(define remote-server-service-type + "_remote-server._tcp") + +(define (remote-build-socket) + "Return a socket used to communicate with the remote build server." + (let ((socket (zmq-create-socket %zmq-context ZMQ_DEALER)) + (endpoint (zmq-frontend-endpoint))) + (zmq-connect socket endpoint) + socket)) + +(define* (remote-build socket drvs systems) + "Builds DRVS using the remote build mechanism. A build command is sent on +SOCKET to the build server for each derivation. + +SYSTEMS is a list describing the systems of each derivations in the DRVS list. +It is used for performance reasons, so that the remote server doesn't need to +call 'read-derivation-from-file' for each derivation, which can be an +expensive operation." + (for-each + (lambda (drv system) + ;; We need to prefix the command with an empty delimiter + ;; because the DEALER socket is connected to a ROUTER + ;; socket. See "zmq-start-proxy" procedure. + (zmq-send-msg-parts-bytevector + socket + (list (make-bytevector 0) + (string->bv (zmq-build-request-message drv system))))) + drvs systems)) + +(define* (remote-build-poll socket event-proc + #:key + (timeout 1000)) + "Poll SOCKET for messages and call EVENT-PROC each time a build event is +received, return if no event occured for TIMEOUT milliseconds." + (define (parse-result result) + (match (zmq-read-message result) + (('build-started ('drv drv)) + (event-proc (list 'build-started drv))) + (('build-succeeded ('drv drv) ('url url)) + (event-proc (list 'build-succeeded drv))) + (('build-failed ('drv drv)) + (event-proc (list 'build-failed drv))))) + + (let* ((poll-items (list + (poll-item socket ZMQ_POLLIN))) + (items (zmq-poll* poll-items timeout))) + (when (zmq-socket-ready? items socket) + (match (zmq-get-msg-parts-bytevector socket '()) + ((empty result) + (parse-result (bv->string result))))))) -- 2.29.2