unofficial mirror of guile-user@gnu.org 
 help / color / mirror / Atom feed
* stis engine
@ 2021-08-24 22:25 Stefan Israelsson Tampe
  2021-08-24 22:31 ` Stefan Israelsson Tampe
  0 siblings, 1 reply; 2+ messages in thread
From: Stefan Israelsson Tampe @ 2021-08-24 22:25 UTC (permalink / raw)
  To: Guile User, guile-devel

[-- Attachment #1: Type: text/plain, Size: 4016 bytes --]

I have now made a fiber enables stream library called fpipe in my
stis-engine repository see

https://gitlab.com/tampe/stis-engine/-/tree/master/

The idea is to focus on a high performance byte streaming library on top of
wingo's fibers library and make heavy use of bytevector buffers. We will
also gp between a stream of scheme
values and these byte streams to seamlessly be able to integrate a good
overview of the data pipeline.

The following code uses a c-based serializer and deserializer of scheme
data structures
and allow for optional streamed compression and decompression and transport
it over ZeroMQ
networking which allow for thread/process/computer movement of data. The
end result is a way to create servers and clients.

It is instructive to show the code for the client and server pipelines is
constructed tp show off the fpipes library. This is not the final design
but moste components are done

Here is the client

(define* (make-client address ip-server? #key (context #f))
  ;; First we setup the zmq networking
  (define ctx    (if context context (make-zmq-context)))
  (define socket (zmq-socket context ZMQ_REQ))

  (if ip-server?
      (zmq-bind      socket address)
      (zmq-connect socket address))

  ;; we will define to fiber channels, channel in = ch1 and channel out =
ch2
  (define-values (ch1 ch2)

     ;; fpipe-construct is the general pipelining macro
    (fpipe-construct

      ;; this is a scheme condition that will match check a message bounded
to it
     (cond
      (#:scm it)

       ;; format of the matcher is (predicate . translatot) where if
predicate is true we will
       ;; push the message to the branching pipline this assumes a message
is the form
       ;; ((list-of-features) . message)
      (((memq 'compress (car it)) #:tr (cdr it))

       ;;  the c-based stremed serializer that integrates nicely with
fibers and streams
       ;;  the message transport is the form scm->bytesteam

       (mk-c-atom->fpipe)

       ;; the zlib compressor node will tarnsport as bytestream->bytestream
       compress-from-fpipe-to-fpipe

       ;; a bytestream->bytestream that will prepend a message with 1 to
indicate that the stream
       ;; has been compressed
       #:prepend #u8(1))

      ;;  if we do not have the compress feature then we will simply
generate the stream and
      ;; prepend a one e.g. not doing any compression
      (else
       (mk-c-atom->fpipe)
       #:prepend #u8(0)))

     ;; transport the message byetstream over the zmq socket this will
retrun in a scheme
      ;; stream where eof will survive as all control messages are and will
initiate the next
      ;; reading from the socket (when the request message has been fully
sent.
     (fpipe->zmq socket)

      ;; so here we get the return message
     (zmq->fpipe socket)

     ;; This is a bytestream cond and has no it part,
     (cond
       ;; We try to match the beginning of the bytestream message and if it
starts with 1
       ;; then we know that the reply message has been compressed
      ((#:match u8(1) #:skip 1)
       decompress-from-pipe-to-pipe)

      ;; else no compression.
      ((else #:skip 1)
       ))

     ;; the final step is to take the bytestream and make a scheme object
and put that
     ;; to the scheme stream and the pipe is finished
     (mk-fpipe->c-atom)))

   ;; fpipe-scheme takes a piplend from scm to scm and creates a function
of it.
   ;; each time the function is called with a scheme object we will send it
ot the server
   ;; from the return message create a scheme object that is returned from
the funciton
  (define action (fpipe-schemer ch1 ch2))

  ;; A little nicer interface and we are finished
  (lambda* (message #:key (compress? #f))
     (action (cons (if compress? '(compress) '()) message))))


;; SERVER
(define* (make-server server-lambda address ip-server? #key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))

[-- Attachment #2: Type: text/html, Size: 5168 bytes --]

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2021-08-24 22:31 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-08-24 22:25 stis engine Stefan Israelsson Tampe
2021-08-24 22:31 ` Stefan Israelsson Tampe

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).