unofficial mirror of guile-devel@gnu.org 
* stis engine
@ 2021-08-24 22:25 Stefan Israelsson Tampe
  2021-08-24 22:31 ` Stefan Israelsson Tampe
  0 siblings, 1 reply; 2+ messages in thread
From: Stefan Israelsson Tampe @ 2021-08-24 22:25 UTC (permalink / raw)
  To: Guile User, guile-devel


I have now made a fiber-enabled stream library called fpipe in my
stis-engine repository, see

https://gitlab.com/tampe/stis-engine/-/tree/master/

The idea is to focus on a high-performance byte-streaming library on top of
wingo's fibers library that makes heavy use of bytevector buffers. We will
also go between streams of Scheme values and these byte streams, so that the
two integrate seamlessly and give a good overview of the data pipeline.

The following code uses a C-based serializer and deserializer of Scheme
data structures, allows for optional streamed compression and decompression,
and transports the data over ZeroMQ networking, which allows data to move
between threads, processes, and computers. The end result is a way to create
servers and clients.

It is instructive to show how the client and server pipelines are
constructed, to show off the fpipe library. This is not the final design,
but most components are done.

Here is the client

(define* (make-client address ip-server? #:key (context #f))
  ;; First we set up the zmq networking
  (define ctx    (if context context (make-zmq-context)))
  (define socket (zmq-socket ctx ZMQ_REQ))

  (if ip-server?
      (zmq-bind      socket address)
      (zmq-connect socket address))

  ;; we will define two fiber channels: channel in = ch1 and channel out = ch2
  (define-values (ch1 ch2)

     ;; fpipe-construct is the general pipelining macro
    (fpipe-construct

      ;; this is a Scheme cond that checks the message bound to the name it
     (cond
      (#:scm it)

       ;; the format of the matcher is (predicate . translator): if the
       ;; predicate is true we push the message to the branching pipeline.
       ;; This assumes a message of the form
       ;; ((list-of-features) . message)
      (((memq 'compress (car it)) #:tr (cdr it))

       ;; the C-based streamed serializer that integrates nicely with
       ;; fibers and streams;
       ;; the message transport is of the form scm->bytestream

       (mk-c-atom->fpipe)

       ;; the zlib compressor node transports as bytestream->bytestream
       compress-from-fpipe-to-fpipe

       ;; a bytestream->bytestream step that prepends the message with 1 to
       ;; indicate that the stream has been compressed
       #:prepend #u8(1))

      ;; if we do not have the compress feature then we simply generate the
      ;; stream and prepend a zero, i.e. we do not do any compression
      (else
       (mk-c-atom->fpipe)
       #:prepend #u8(0)))

     ;; transport the message bytestream over the zmq socket. This results
     ;; in a Scheme stream where eof survives, as all control messages do,
     ;; and initiates the next read from the socket (when the request
     ;; message has been fully sent).
     (fpipe->zmq socket)

      ;; so here we get the return message
     (zmq->fpipe socket)

     ;; This is a bytestream cond and has no it part.
     (cond
       ;; We try to match the beginning of the bytestream message; if it
       ;; starts with 1 then we know that the reply message has been compressed
      ((#:match #u8(1) #:skip 1)
       decompress-from-pipe-to-pipe)

      ;; else no compression.
      ((else #:skip 1)
       ))

     ;; the final step is to take the bytestream, make a Scheme object of it,
     ;; and put that on the Scheme stream; then the pipe is finished
     (mk-fpipe->c-atom)))

   ;; fpipe-schemer takes a pipeline from scm to scm and creates a function
   ;; of it. Each time the function is called with a Scheme object we send it
   ;; to the server and, from the return message, create a Scheme object that
   ;; is returned from the function
  (define action (fpipe-schemer ch1 ch2))

  ;; A little nicer interface and we are finished
  (lambda* (message #:key (compress? #f))
     (action (cons (if compress? '(compress) '()) message))))
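
The one-byte compression flag used by the two cond nodes above can be sketched
in plain Guile. This is only an illustration of the framing idea, not how fpipe
implements #:prepend, #:match or #:skip; prepend-flag and split-flag are
hypothetical names, and the sketch works on whole bytevectors rather than on
streams.

```scheme
(use-modules (rnrs bytevectors))

;; Hypothetical helper: return a new bytevector holding the flag byte
;; (0 or 1) followed by the payload.
(define (prepend-flag flag payload)
  (let* ((n   (bytevector-length payload))
         (out (make-bytevector (+ n 1))))
    (bytevector-u8-set! out 0 flag)
    ;; R6RS argument order: source, source-start, target, target-start, count
    (bytevector-copy! payload 0 out 1 n)
    out))

;; Hypothetical helper: return two values, a boolean telling whether the
;; flag byte is 1, and the payload with the flag byte skipped.
(define (split-flag framed)
  (let* ((n       (- (bytevector-length framed) 1))
         (payload (make-bytevector n)))
    (bytevector-copy! framed 1 payload 0 n)
    (values (= (bytevector-u8-ref framed 0) 1) payload)))
```

The receiver calls split-flag first and only decompresses the payload when the
first returned value is true, which mirrors the (#:match ... #:skip 1) and
(else #:skip 1) branches of the bytestream cond.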


;; SERVER
(define* (make-server server-lambda address ip-server? #:key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))


* Re: stis engine
  2021-08-24 22:25 stis engine Stefan Israelsson Tampe
@ 2021-08-24 22:31 ` Stefan Israelsson Tampe
  0 siblings, 0 replies; 2+ messages in thread
From: Stefan Israelsson Tampe @ 2021-08-24 22:31 UTC (permalink / raw)
  To: Guile User, guile-devel

;; Oops, I managed to send the message by accident without finishing it.
;; The server part is similar, so I will drop the actual pipeline and
;; highlight the difference: we get a message, stream it into a Scheme
;; object, call server-lambda (below) with it, and finally reply to the
;; client with that lambda's return value. We create a loop where the
;; server waits for questions.

(define* (make-server server-lambda address ip-server? #:key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))
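
The request/reply loop described in the comments above can be illustrated in
isolation. This is a minimal sketch under assumptions: it uses plain fibers
channels (run-fibers, spawn-fiber, make-channel, put-message and get-message
from wingo's fibers library) in place of the fpipe pipeline and the ZeroMQ
socket, and serve-forever and ask-once are hypothetical names that are not part
of stis-engine.

```scheme
(use-modules (fibers) (fibers channels))

;; Hypothetical helper: loop forever, answering each request read from
;; the channel in by applying server-lambda and writing the result to out.
(define (serve-forever in out server-lambda)
  (let lp ()
    (put-message out (server-lambda (get-message in)))
    (lp)))

;; Hypothetical helper: start the loop in its own fiber, send one request
;; through it, and return the reply.
(define (ask-once server-lambda request)
  (run-fibers
   (lambda ()
     (let ((in  (make-channel))
           (out (make-channel)))
       (spawn-fiber (lambda () (serve-forever in out server-lambda)))
       (put-message in request)
       (get-message out)))))
```

With the server lambda used further down, (ask-once (lambda (x) (cons '() x))
"abc") should evaluate to (() . "abc"), that is, an empty feature list paired
with the echoed message.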

Finally, the idea is to use it as follows,


(define context    (make-zmq-context))
(define address    "") ;; ZeroMQ address
(define ip-server? #t) ;; if we will use bind or connect

(make-server (lambda (x) (cons '() x)) address ip-server? #:context context)

(define client (make-client address ip-server? #:context context))
  ;; send a message that is not compressed
  > (client "abc")
  "abc"

  > (length (client (iota 1000000) #:compress? #t))
  1000000

in a run-fibers context.
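
The message envelope that the client wrapper builds, ((list-of-features) .
message), can be tried out without any networking. This is a minimal sketch,
assuming a stand-in action procedure that simply returns the envelope instead of
sending it through the pipeline; make-envelope-client and wants-compression?
are hypothetical names.

```scheme
;; Hypothetical helper: wrap an action procedure the same way make-client
;; wraps the pipeline function, tagging the message with a feature list.
(define (make-envelope-client action)
  (lambda* (message #:key (compress? #f))
    (action (cons (if compress? '(compress) '()) message))))

;; Hypothetical helper: the predicate the first cond branch applies to the
;; envelope, normalised to a boolean.
(define (wants-compression? envelope)
  (and (memq 'compress (car envelope)) #t))

;; Stand-in client whose action returns the envelope unchanged.
(define envelope-of (make-envelope-client (lambda (envelope) envelope)))
```

Here (envelope-of "abc") gives (() . "abc") and (envelope-of "abc" #:compress?
#t) gives ((compress) . "abc"), and wants-compression? is true only for the
second envelope.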

