unofficial mirror of guile-devel@gnu.org 
 help / color / mirror / Atom feed
From: Stefan Israelsson Tampe <stefan.itampe@gmail.com>
To: Guile User <guile-user@gnu.org>, guile-devel <guile-devel@gnu.org>
Subject: Re: stis engine
Date: Wed, 25 Aug 2021 00:31:25 +0200	[thread overview]
Message-ID: <CAGua6m3b2Prj6u8d0sdrmeOdKUjVY2P_KM1d+LaTw_QF2SwxLQ@mail.gmail.com> (raw)
In-Reply-To: <CAGua6m2VHdcNa0wkxLxEPzVQYDXkU0FKgQF0JoWSf6ybiF8dKw@mail.gmail.com>

;; Oups I managed to send the message by accident without finishing it.
;; The server part is similar and I will drop the actual pipeline and
highlight the differenece
;; which is that we will get a message stream it to an scheme object call
server-.lambda below
;; with it and finally from that lambdas return value reply to the client
we will create a loop where
;; the server waits for questions

(define* (make-server server-lambda address ip-server? #key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))

Finally the idea is to use it as


(define context    (make-zmq-context))
(define address    "") ;; ZeroMQ address
(define ip-server? #t) ;; if we will use bind or connect

(make-server (lambda (x) (cons '() x)) context address ip-server?)

(define client (make-client context address ip-server?))
  ;; send a mesage that is not compressed
  > (client "abc")
  "abc"

  > (length (client (iota 1000000) #:compress? #t))
  1000000

in a run fibers context

On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe <
stefan.itampe@gmail.com> wrote:

> I have now made a fiber enables stream library called fpipe in my
> stis-engine repository see
>
> https://gitlab.com/tampe/stis-engine/-/tree/master/
>
> The idea is to focus on a high performance byte streaming library on top
> of wingo's fibers library and make heavy use of bytevector buffers. We will
> also gp between a stream of scheme
> values and these byte streams to seamlessly be able to integrate a good
> overview of the data pipeline.
>
> The following code uses a c-based serializer and deserializer of scheme
> data structures
> and allow for optional streamed compression and decompression and
> transport it over ZeroMQ
> networking which allow for thread/process/computer movement of data. The
> end result is a way to create servers and clients.
>
> It is instructive to show the code for the client and server pipelines is
> constructed tp show off the fpipes library. This is not the final design
> but moste components are done
>
> Here is the client
>
> (define* (make-client address ip-server? #key (context #f))
>   ;; First we setup the zmq networking
>   (define ctx    (if context context (make-zmq-context)))
>   (define socket (zmq-socket context ZMQ_REQ))
>
>   (if ip-server?
>       (zmq-bind      socket address)
>       (zmq-connect socket address))
>
>   ;; we will define to fiber channels, channel in = ch1 and channel out =
> ch2
>   (define-values (ch1 ch2)
>
>      ;; fpipe-construct is the general pipelining macro
>     (fpipe-construct
>
>       ;; this is a scheme condition that will match check a message
> bounded to it
>      (cond
>       (#:scm it)
>
>        ;; format of the matcher is (predicate . translatot) where if
> predicate is true we will
>        ;; push the message to the branching pipline this assumes a message
> is the form
>        ;; ((list-of-features) . message)
>       (((memq 'compress (car it)) #:tr (cdr it))
>
>        ;;  the c-based stremed serializer that integrates nicely with
> fibers and streams
>        ;;  the message transport is the form scm->bytesteam
>
>        (mk-c-atom->fpipe)
>
>        ;; the zlib compressor node will tarnsport as bytestream->bytestream
>        compress-from-fpipe-to-fpipe
>
>        ;; a bytestream->bytestream that will prepend a message with 1 to
> indicate that the stream
>        ;; has been compressed
>        #:prepend #u8(1))
>
>       ;;  if we do not have the compress feature then we will simply
> generate the stream and
>       ;; prepend a one e.g. not doing any compression
>       (else
>        (mk-c-atom->fpipe)
>        #:prepend #u8(0)))
>
>      ;; transport the message byetstream over the zmq socket this will
> retrun in a scheme
>       ;; stream where eof will survive as all control messages are and
> will initiate the next
>       ;; reading from the socket (when the request message has been fully
> sent.
>      (fpipe->zmq socket)
>
>       ;; so here we get the return message
>      (zmq->fpipe socket)
>
>      ;; This is a bytestream cond and has no it part,
>      (cond
>        ;; We try to match the beginning of the bytestream message and if
> it starts with 1
>        ;; then we know that the reply message has been compressed
>       ((#:match u8(1) #:skip 1)
>        decompress-from-pipe-to-pipe)
>
>       ;; else no compression.
>       ((else #:skip 1)
>        ))
>
>      ;; the final step is to take the bytestream and make a scheme object
> and put that
>      ;; to the scheme stream and the pipe is finished
>      (mk-fpipe->c-atom)))
>
>    ;; fpipe-scheme takes a piplend from scm to scm and creates a function
> of it.
>    ;; each time the function is called with a scheme object we will send
> it ot the server
>    ;; from the return message create a scheme object that is returned from
> the funciton
>   (define action (fpipe-schemer ch1 ch2))
>
>   ;; A little nicer interface and we are finished
>   (lambda* (message #:key (compress? #f))
>      (action (cons (if compress? '(compress) '()) message))))
>
>
> ;; SERVER
> (define* (make-server server-lambda address ip-server? #key (context #f))
>
>   (define schemer (fpipe-schemer ch1 ch2))
>
>   (spawn-fiber
>    (lambda ()
>      (let lp ()
>        (schemer %fpipe-eof%)
>        (lp)))))
>
>


      reply	other threads:[~2021-08-24 22:31 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-08-24 22:25 stis engine Stefan Israelsson Tampe
2021-08-24 22:31 ` Stefan Israelsson Tampe [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://www.gnu.org/software/guile/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=CAGua6m3b2Prj6u8d0sdrmeOdKUjVY2P_KM1d+LaTw_QF2SwxLQ@mail.gmail.com \
    --to=stefan.itampe@gmail.com \
    --cc=guile-devel@gnu.org \
    --cc=guile-user@gnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).