(use-modules (web uri) (web request) (web response) (web client) (web http) (srfi srfi-1) (ice-9 threads) (ice-9 match) (rnrs bytevectors) (srfi srfi-11) (srfi srfi-9) (srfi srfi-9 gnu) (srfi srfi-26) (gnutls) (ice-9 binary-ports) ((ice-9 ftw) #:select (scandir)) ((rnrs io ports) #:prefix rnrs-ports:)) (define* (call-with-streaming-http-request uri callback #:key (headers '())) (let* ((port (open-socket-for-uri uri)) (request (build-request uri #:method 'PUT #:version '(1 . 1) #:headers `((connection close) (Transfer-Encoding . "chunked") (Content-Type . "application/octet-stream") ,@headers) #:port port))) (set-port-encoding! port "ISO-8859-1") (let ((request (write-request request port))) (let ((chunked-output-port (make-chunked-output-port port #:buffering 128 #:keep-alive? #t))) ;; A SIGPIPE will kill Guile, so ignore it (sigaction SIGPIPE (lambda (arg) (simple-format (current-error-port) "warning: SIGPIPE\n"))) (set-port-encoding! chunked-output-port "ISO-8859-1") (callback chunked-output-port) (retry-gnutls-resource-temporarily-unavailable (lambda () (close-port chunked-output-port))) (display "\r\n" port) (force-output port)) (let ((response (read-response port))) (let ((body (read-response-body response))) (close-port port) (values response body)))))) (define (retry-gnutls-resource-temporarily-unavailable thunk) (catch 'gnutls-error thunk (lambda (key err proc . rest) (if (eq? error/again err) (begin (simple-format (current-error-port) "error/again\n") (sleep 1) (thunk)) (throw key (cons* err proc rest)))))) (define (start-thread thread-index) (call-with-new-thread (lambda () (for-each (lambda (request-index) (with-throw-handler #t (lambda () (call-with-streaming-http-request ;; The URL doesn't realy matter as the response to the ;; request doesn't matter. (peek (string->uri (if (= thread-index 1) "https://guix.cbaines.net/test" "https://www.cbaines.net/test"))) (lambda (port) (simple-format (current-error-port) "thread ~A making request\n" thread-index) (let* ((buffer-size 1024) (buffer (make-bytevector buffer-size))) (for-each (lambda (index) ;; (usleep 10) (retry-gnutls-resource-temporarily-unavailable (lambda () (put-bytevector port buffer 0 buffer-size)))) (iota 512)))))) (lambda (key . args) (simple-format #t "thread ~A: exception: ~A ~A\n" thread-index key args) (backtrace)))) (iota 2 1))))) ;; (define threads ;; (list (start-thread 1))) ;; (for-each join-thread threads) ;; (define threads ;; (list (start-thread 1))) ;; (for-each join-thread threads) ;; (define threads ;; (list (start-thread 1))) ;; (for-each join-thread threads) ;; (simple-format (current-error-port) ;; "\ntrying concurrent threads\n\n") (define threads (map start-thread (iota 2 1))) (for-each join-thread threads)