unofficial mirror of 
 help / color / mirror / Atom feed
* Anouncement stis-engine 0.1
@ 2021-08-31  0:08 Stefan Israelsson Tampe
  0 siblings, 0 replies; only message in thread
From: Stefan Israelsson Tampe @ 2021-08-31  0:08 UTC (permalink / raw)
  To: Guile User, guile-devel

Hi guilers!

I am now happy to announce the first version 0.1 stis-engine. With this
tool you can manage data streams with quite good throughput.

Consider a case with streaming objects over the network and allow to have a
prefix of the
stream where  you control what kind of serializer are used and if streamed
zipping is done.
Below is the server and client implementation in
module/fibers/stis-parser/q.scm. 1 400MB big
bytevector transports from the client to the server and back with about one
second. I did the transport over a zmq  thread communication link. We reach
quite high throughput as we only do logic in the header of the stream so we
can copy the buffers themselves when we can instead
of copying the bytes.

The sizes of the buffers is quite large as to make sure that the fiber
overhead is not too large
The main constraint is from copying bytes, just the serializer and
deserialiser used to copy an object would be 10x the speed. But this is for
simple data structures like bytevectors. Already a list of numbers instead
of bytevector the serialisation and deserialisation starts to dominate as
bytestream operations is essentially memmove memcpy and those are insanely

Some other examples are audio and image streams. you can glue stream
operations together
in guile and have close to C speed for large amount of data.

Here are the code:
;;We can make an abstractions as such:
(define-fpipe-construct #:scm (zmq->atom socket)
    (zmq->fpipe socket)                      ; read from the network to a
byte stream
    (let ((it)                                           ; pick up the
first byte (it1 ...) means the bytes prefix
            (opt?  (= 1 (logand it 1)))
            (text? (= 2 (logand it 2))))

        (fpipe-skip 1)                              ; skip the prefix

        (when opt?
            (uncompress-from-fpipe-to-fpipe))   ; id the stream was
compressed decompress

       (if text?                                                  ; if text
the transport is in cleartext e.g. scheme

(define-fpipe-construct #:scm (atom->zmq socket)
     (let ( it                                                     ;; This
picks a scheme object (header . payload)
           (header (car it))
           (opt?   (memq 'compress header))
           (text?  (memq 'text     header))
           (tag     (logior (if opt? 1 0) (if text? 2 0))))

       (if text?

       (when opt?
           (compress-from-fpipe-to-fpipe #:level 3))

       (fpipe-prepend tag)                       ; prepend the stream with
the tag ...

       (fpipe->zmq socket)))

;;Now we can make a server and a client out like so,

(define* (make-client address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REQ))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; here we construct the scheme pipeline using the abstraction
  (define-values (ch1 ch2)
     (atom->zmq socket)
     (zmq->atom socket)))

  (define action (fpipe-schemer ch1 ch2))
  (lambda* (message #:key (compress? #f))
    (cdr (action (cons (if compress? '(compress) '()) message)))))

;; the server:
(define* (run-server server-lambda address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REP))
  (define (lam x)
    (call-with-values server-lambda
      (lambda* (message #:key (compress? #f))
         (cons (if compress? '(compress) '()) message))))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; the pipeline using the abstractions
  (define-values (ch1 ch2)
          (zmq->atom socket)
          (fpipe-map server-lambda)
          (atom->zmq socket)))

  (define schemer (fpipe-schemer ch1 ch2))

   (lambda ()
       (let lp ()
           (schemer %fpipe-eof%)
      #:parallel? #f))

^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2021-08-31  0:08 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-08-31  0:08 Anouncement stis-engine 0.1 Stefan Israelsson Tampe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).