unofficial mirror of guile-user@gnu.org 
 help / color / mirror / Atom feed
From: Stefan Israelsson Tampe <stefan.itampe@gmail.com>
To: Guile User <guile-user@gnu.org>, guile-devel <guile-devel@gnu.org>
Subject: Anouncement stis-engine 0.1
Date: Tue, 31 Aug 2021 02:08:54 +0200	[thread overview]
Message-ID: <CAGua6m3Do739-eTOXMoQb9e1tiVf8UsNmGy2yskNaiDUTNva7g@mail.gmail.com> (raw)

Hi guilers!

I am now happy to announce the first version 0.1 stis-engine. With this
tool you can manage data streams with quite good throughput.

Example:
Consider a case with streaming objects over the network and allow to have a
prefix of the
stream where  you control what kind of serializer are used and if streamed
zipping is done.
Below is the server and client implementation in
module/fibers/stis-parser/q.scm. 1 400MB big
bytevector transports from the client to the server and back with about one
second. I did the transport over a zmq  thread communication link. We reach
quite high throughput as we only do logic in the header of the stream so we
can copy the buffers themselves when we can instead
of copying the bytes.

The sizes of the buffers is quite large as to make sure that the fiber
overhead is not too large
The main constraint is from copying bytes, just the serializer and
deserialiser used to copy an object would be 10x the speed. But this is for
simple data structures like bytevectors. Already a list of numbers instead
of bytevector the serialisation and deserialisation starts to dominate as
bytestream operations is essentially memmove memcpy and those are insanely
optimized.

Some other examples are audio and image streams. you can glue stream
operations together
in guile and have close to C speed for large amount of data.

Here are the code:
;;We can make an abstractions as such:
(define-fpipe-construct #:scm (zmq->atom socket)
    (zmq->fpipe socket)                      ; read from the network to a
byte stream
    (let ((it)                                           ; pick up the
first byte (it1 ...) means the bytes prefix
            (opt?  (= 1 (logand it 1)))
            (text? (= 2 (logand it 2))))

        (fpipe-skip 1)                              ; skip the prefix

        (when opt?
            (uncompress-from-fpipe-to-fpipe))   ; id the stream was
compressed decompress

       (if text?                                                  ; if text
the transport is in cleartext e.g. scheme
            (fpipe->scm)
            (fpipe->c-atom))))


(define-fpipe-construct #:scm (atom->zmq socket)
     (let ( it                                                     ;; This
picks a scheme object (header . payload)
           (header (car it))
           (opt?   (memq 'compress header))
           (text?  (memq 'text     header))
           (tag     (logior (if opt? 1 0) (if text? 2 0))))

       (if text?
           (scm->fpipe)
           (c-atom->fpipe))

       (when opt?
           (compress-from-fpipe-to-fpipe #:level 3))

       (fpipe-prepend tag)                       ; prepend the stream with
the tag ...

       (fpipe->zmq socket)))


;;Now we can make a server and a client out like so,

(define* (make-client address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REQ))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; here we construct the scheme pipeline using the abstraction
  (define-values (ch1 ch2)
    (fpipe-construct
     (atom->zmq socket)
     (zmq->atom socket)))

  (define action (fpipe-schemer ch1 ch2))
  (lambda* (message #:key (compress? #f))
    (cdr (action (cons (if compress? '(compress) '()) message)))))

;; the server:
(define* (run-server server-lambda address ip-server? #:key (context #f))
  (define ctx    (if context context (zmq-init)))
  (define socket (zmq-socket ctx ZMQ_REP))
  (define (lam x)
    (call-with-values server-lambda
      (lambda* (message #:key (compress? #f))
         (cons (if compress? '(compress) '()) message))))

  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; the pipeline using the abstractions
  (define-values (ch1 ch2)
      (fpipe-construct
          (zmq->atom socket)
          (fpipe-map server-lambda)
          (atom->zmq socket)))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
       (let lp ()
           (schemer %fpipe-eof%)
           (lp)))
      #:parallel? #f))


                 reply	other threads:[~2021-08-31  0:08 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://www.gnu.org/software/guile/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=CAGua6m3Do739-eTOXMoQb9e1tiVf8UsNmGy2yskNaiDUTNva7g@mail.gmail.com \
    --to=stefan.itampe@gmail.com \
    --cc=guile-devel@gnu.org \
    --cc=guile-user@gnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).