From 4036ce5de7bf3b98327010bbfbf75029f3d0b572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=96=87=E6=AD=A6?= Date: Fri, 8 Sep 2017 22:49:03 +0800 Subject: [PATCH] download: Report the progress asynchronously in another thread. * guix/utils.scm (): New record type. (call-with-progress-reporter): New procedure. * guix/build/download.scm (dump-port*, progress-reporter/file): New procedures. (ftp-fetch, http-fetch): Use them. (progress-proc): Remove procedure. * guix/scripts/substitute.scm (progress-report-port): Rewrite in terms of . (process-substitution): Adjust accordingly. --- guix/build/download.scm | 169 ++++++++++++++++++++++++++------------------ guix/scripts/substitute.scm | 54 +++++++------- guix/utils.scm | 32 ++++++++- 3 files changed, 159 insertions(+), 96 deletions(-) diff --git a/guix/build/download.scm b/guix/build/download.scm index bcf22663b..7c712ca94 100644 --- a/guix/build/download.scm +++ b/guix/build/download.scm @@ -27,6 +27,7 @@ #:use-module (guix base64) #:use-module (guix ftp-client) #:use-module (guix build utils) + #:use-module (guix utils) #:use-module (rnrs io ports) #:use-module (rnrs bytevectors) #:use-module (srfi srfi-1) @@ -36,6 +37,8 @@ #:autoload (ice-9 ftw) (scandir) #:use-module (ice-9 match) #:use-module (ice-9 format) + #:use-module (ice-9 atomic) + #:use-module (ice-9 threads) #:export (open-socket-for-uri open-connection-for-uri %x509-certificate-directory @@ -45,7 +48,7 @@ url-fetch byte-count->string current-terminal-columns - progress-proc + progress-reporter/file uri-abbreviation nar-uri-abbreviation store-path-abbreviation)) @@ -148,65 +151,92 @@ Otherwise return STORE-PATH." (define time-monotonic time-tai)) (else #t)) -(define* (progress-proc file size - #:optional (log-port (current-output-port)) - #:key (abbreviation basename)) - "Return a procedure to show the progress of FILE's download, which is SIZE -bytes long. The returned procedure is suitable for use as an argument to -`dump-port'. The progress report is written to LOG-PORT, with ABBREVIATION -used to shorten FILE for display." - ;; XXX: Because of this procedure is often not - ;; called as frequently as we'd like too; this is especially bad with Nginx - ;; on hydra.gnu.org, which returns whole nars as a single chunk. - (let ((start-time #f)) - (let-syntax ((with-elapsed-time - (syntax-rules () - ((_ elapsed body ...) - (let* ((now (current-time time-monotonic)) - (elapsed (and start-time - (duration->seconds - (time-difference now - start-time))))) - (unless start-time - (set! start-time now)) - body ...))))) - (if (number? size) - (lambda (transferred cont) - (with-elapsed-time elapsed - (let* ((% (* 100.0 (/ transferred size))) - (throughput (if elapsed - (/ transferred elapsed) - 0)) - (left (format #f " ~a ~a" - (abbreviation file) - (byte-count->string size))) - (right (format #f "~a/s ~a ~a~6,1f%" - (byte-count->string throughput) - (seconds->string elapsed) - (progress-bar %) %))) - (display "\r\x1b[K" log-port) - (display (string-pad-middle left right - (current-terminal-columns)) - log-port) - (flush-output-port log-port) - (cont)))) - (lambda (transferred cont) - (with-elapsed-time elapsed - (let* ((throughput (if elapsed - (/ transferred elapsed) - 0)) - (left (format #f " ~a" - (abbreviation file))) - (right (format #f "~a/s ~a | ~a transferred" - (byte-count->string throughput) - (seconds->string elapsed) - (byte-count->string transferred)))) - (display "\r\x1b[K" log-port) - (display (string-pad-middle left right - (current-terminal-columns)) - log-port) - (flush-output-port log-port) - (cont)))))))) + +;; TODO: replace '(@ (guix build utils) dump-port))'. +(define* (dump-port* in out + #:key (buffer-size 16384) + (reporter (make-progress-reporter noop noop noop))) + "Read as much data as possible from IN and write it to OUT, using chunks of +BUFFER-SIZE bytes. After each successful transfer of BUFFER-SIZE bytes or +less, report the total number of bytes transferred to the REPORTER, which +should be a object." + (define buffer + (make-bytevector buffer-size)) + + (call-with-progress-reporter reporter + (lambda (report) + (let loop ((total 0) + (bytes (get-bytevector-n! in buffer 0 buffer-size))) + (or (eof-object? bytes) + (let ((total (+ total bytes))) + (put-bytevector out buffer 0 bytes) + (report total) + (loop total (get-bytevector-n! in buffer 0 buffer-size)))))))) + +(define* (progress-reporter/file file size + #:optional (log-port (current-output-port)) + #:key (abbreviation basename)) + "Return a object to show the progress of FILE's download, +which is SIZE bytes long. The progress report is written to LOG-PORT, with +ABBREVIATION used to shorten FILE for display." + (let ((thread #f) + (%transferred (make-atomic-box 0))) + (define (report-progress) + "Continuously write the progress report to LOG-PORT." + (define start-time (current-time time-monotonic)) + (define (render) + (define transferred (atomic-box-ref %transferred)) + (define elapsed (duration->seconds + (time-difference + (current-time time-monotonic) + start-time))) + (if (number? size) + (let* ((% (* 100.0 (/ transferred size))) + (throughput (/ transferred elapsed)) + (left (format #f " ~a ~a" + (abbreviation file) + (byte-count->string size))) + (right (format #f "~a/s ~a ~a~6,1f%" + (byte-count->string throughput) + (seconds->string elapsed) + (progress-bar %) %))) + (display "\r\x1b[K" log-port) + (display (string-pad-middle left right + (current-terminal-columns)) + log-port) + (flush-output-port log-port)) + (let* ((throughput (/ transferred elapsed)) + (left (format #f " ~a" + (abbreviation file))) + (right (format #f "~a/s ~a | ~a transferred" + (byte-count->string throughput) + (seconds->string elapsed) + (byte-count->string transferred)))) + (display "\r\x1b[K" log-port) + (display (string-pad-middle left right + (current-terminal-columns)) + log-port) + (flush-output-port log-port)))) + + (dynamic-wind + noop + (lambda () + (let loop () + ;; Report the progress every 300ms. + (render) + (usleep 300000) + (loop))) + ;; And don't miss the last report. + render)) + + (progress-reporter + (start (lambda () + (set! thread (make-thread report-progress)))) + (report (lambda (value) + (atomic-box-set! %transferred value))) + (stop (lambda () + (cancel-thread thread) + (join-thread thread)))))) (define* (uri-abbreviation uri #:optional (max-length 42)) "If URI's string representation is larger than MAX-LENGTH, return an @@ -264,9 +294,10 @@ out if the connection could not be established in less than TIMEOUT seconds." (dirname (uri-path uri))))) (call-with-output-file file (lambda (out) - (dump-port in out - #:buffer-size %http-receive-buffer-size - #:progress (progress-proc (uri-abbreviation uri) size)))) + (dump-port* in out + #:buffer-size %http-receive-buffer-size + #:reporter (progress-reporter/file + (uri-abbreviation uri) size)))) (ftp-close conn)) (newline) @@ -755,10 +786,10 @@ certificates; otherwise simply ignore them." (lambda (p) (if (port? bv-or-port) (begin - (dump-port bv-or-port p - #:buffer-size %http-receive-buffer-size - #:progress (progress-proc (uri-abbreviation uri) - size)) + (dump-port* bv-or-port p + #:buffer-size %http-receive-buffer-size + #:reporter (progress-reporter/file + (uri-abbreviation uri) size)) (newline)) (put-bytevector p bv-or-port)))) file)) @@ -863,8 +894,8 @@ otherwise simply ignore them." hashes)) content-addressed-mirrors)) - ;; Make this unbuffered so 'progress-proc' works as expected. _IOLBF means - ;; '\n', not '\r', so it's not appropriate here. + ;; Make this unbuffered so 'progress-report/file' works as expected. _IOLBF + ;; means '\n', not '\r', so it's not appropriate here. (setvbuf (current-output-port) _IONBF) (setvbuf (current-error-port) _IOLBF) diff --git a/guix/scripts/substitute.scm b/guix/scripts/substitute.scm index 0d36997bc..f7d523fd6 100755 --- a/guix/scripts/substitute.scm +++ b/guix/scripts/substitute.scm @@ -34,7 +34,8 @@ #:use-module ((guix build utils) #:select (mkdir-p dump-port)) #:use-module ((guix build download) #:select (current-terminal-columns - progress-proc uri-abbreviation nar-uri-abbreviation + progress-reporter/file + uri-abbreviation nar-uri-abbreviation (open-connection-for-uri . guix:open-connection-for-uri) close-connection @@ -772,23 +773,25 @@ was found." (= (string-length file) 32))))) (narinfo-cache-directories directory))) -(define (progress-report-port report-progress port) - "Return a port that calls REPORT-PROGRESS every time something is read from -PORT. REPORT-PROGRESS is a two-argument procedure such as that returned by -`progress-proc'." - (define total 0) - (define (read! bv start count) - (let ((n (match (get-bytevector-n! port bv start count) - ((? eof-object?) 0) - (x x)))) - (set! total (+ total n)) - (report-progress total (const n)) - ;; XXX: We're not in control, so we always return anyway. - n)) - - (make-custom-binary-input-port "progress-port-proc" - read! #f #f - (cut close-connection port))) +(define (progress-report-port reporter port) + "Return a port that continuously reports the bytes read from PORT using +REPORTER, which should be a object." + (match reporter + (($ start report stop) + (let* ((total 0) + (read! (lambda (bv start count) + (let ((n (match (get-bytevector-n! port bv start count) + ((? eof-object?) 0) + (x x)))) + (set! total (+ total n)) + (report total) + n)))) + (start) + (make-custom-binary-input-port "progress-port-proc" + read! #f #f + (lambda () + (close-connection port) + (stop))))))) (define-syntax with-networking (syntax-rules () @@ -903,12 +906,11 @@ DESTINATION as a nar file. Verify the substitute against ACL." (dl-size (or download-size (and (equal? comp "none") (narinfo-size narinfo)))) - (progress (progress-proc (uri->string uri) - dl-size - (current-error-port) - #:abbreviation - nar-uri-abbreviation))) - (progress-report-port progress raw))) + (reporter (progress-reporter/file + (uri->string uri) dl-size + (current-error-port) + #:abbreviation nar-uri-abbreviation))) + (progress-report-port reporter raw))) ((input pids) (decompressed-port (and=> (narinfo-compression narinfo) string->symbol) @@ -916,8 +918,8 @@ DESTINATION as a nar file. Verify the substitute against ACL." ;; Unpack the Nar at INPUT into DESTINATION. (restore-file input destination) - ;; Skip a line after what 'progress-proc' printed, and another one to - ;; visually separate substitutions. + ;; Skip a line after what 'progress-reporter/file' printed, and another + ;; one to visually separate substitutions. (display "\n\n" (current-error-port)) (every (compose zero? cdr waitpid) pids)))) diff --git a/guix/utils.scm b/guix/utils.scm index ab43ed400..e986ccd4f 100644 --- a/guix/utils.scm +++ b/guix/utils.scm @@ -33,6 +33,7 @@ #:autoload (rnrs io ports) (make-custom-binary-input-port) #:use-module ((rnrs bytevectors) #:select (bytevector-u8-set!)) #:use-module (guix memoization) + #:use-module (guix records) #:use-module ((guix build utils) #:select (dump-port mkdir-p)) #:use-module ((guix build syscalls) #:select (mkdtemp! fdatasync)) #:use-module (ice-9 format) @@ -94,7 +95,13 @@ call-with-decompressed-port compressed-output-port call-with-compressed-output-port - canonical-newline-port)) + canonical-newline-port + + + progress-reporter + make-progress-reporter + progress-reporter? + call-with-progress-reporter)) ;;; @@ -747,3 +754,26 @@ a location object." `((line . ,(and=> (location-line loc) 1-)) (column . ,(location-column loc)) (filename . ,(location-file loc)))) + + +;;; +;;; Progress reporter. +;;; + +(define-record-type* + progress-reporter make-progress-reporter progress-reporter? + (start progress-reporter-start) ; thunk + (report progress-reporter-report) ; procedure + (stop progress-reporter-stop)) ; thunk + +(define (call-with-progress-reporter reporter proc) + "Start REPORTER for progress reporting, and call @code{(@var{proc} report)} +with the resulting report procedure. When @var{proc} returns, the REPORTER is +stopped." + (match reporter + (($ start report stop) + (dynamic-wind start (lambda () (proc report)) stop)))) + +;;; Local Variables: +;;; eval: (put 'call-with-progress-reporter 'scheme-indent-function 1) +;;; End: -- 2.13.3