From: Stefan Israelsson Tampe
Newsgroups: gmane.lisp.guile.devel,gmane.lisp.guile.user
Subject: stis engine
Date: Wed, 25 Aug 2021 00:25:53 +0200
To: Guile User, guile-devel

I have now made a fiber-enabled stream library called fpipe in my
stis-engine repository, see

https://gitlab.com/tampe/stis-engine/-/tree/master/

The idea is to focus on a high-performance byte streaming library on top of
wingo's fibers library, making heavy use of bytevector buffers. We will also
go between a stream of Scheme values and these byte streams, so that both
fit seamlessly into one pipeline and give a good overview of the data flow.

The following code uses a C-based serializer and deserializer of Scheme data
structures, allows for optional streamed compression and decompression, and
transports the result over ZeroMQ networking, which allows data to move
between threads, processes and computers. The end result is a way to create
servers and clients.

It is instructive to show how the client and server pipelines are
constructed, to show off the fpipe library. This is not the final design,
but most components are done.

Here is the client

(define* (make-client address ip-server? #:key (context #f))
  ;; First we set up the zmq networking
  (define ctx    (if context context (make-zmq-context)))
  (define socket (zmq-socket ctx ZMQ_REQ))
  (if ip-server?
      (zmq-bind    socket address)
      (zmq-connect socket address))

  ;; we will define two fiber channels, channel in = ch1 and channel out = ch2
  (define-values (ch1 ch2)

    ;; fpipe-construct is the general pipelining macro
    (fpipe-construct

     ;; this is a scheme cond that checks a message, which is bound to it
     (cond
      (#:scm it)

      ;; the format of a matcher is (predicate . translator): if the
      ;; predicate is true we push the translated message to the branching
      ;; pipeline. This assumes a message of the form
      ;; ((list-of-features) . message)
      (((memq 'compress (car it)) #:tr (cdr it))

       ;; the C-based streamed serializer that integrates nicely with
       ;; fibers and streams; the message transport is of the form
       ;; scm->bytestream
       (mk-c-atom->fpipe)

       ;; the zlib compressor node transports bytestream->bytestream
       compress-from-fpipe-to-fpipe

       ;; a bytestream->bytestream node that prepends the message with 1
       ;; to indicate that the stream has been compressed
       #:prepend #u8(1))

      ;; if we do not have the compress feature then we simply generate
      ;; the stream and prepend a zero, i.e. no compression is done
      (else
       (mk-c-atom->fpipe)
       #:prepend #u8(0)))

     ;; send the message bytestream over the zmq socket. This results in a
     ;; scheme stream where eof survives, as all control messages do, and
     ;; initiates the next read from the socket (once the request message
     ;; has been fully sent).
     (fpipe->zmq socket)

     ;; so here we get the return message
     (zmq->fpipe socket)

     ;; This is a bytestream cond and has no it part.
     (cond
      ;; We try to match the beginning of the bytestream message, and if
      ;; it starts with 1 then we know that the reply message has been
      ;; compressed
      ((#:match #u8(1) #:skip 1)
       decompress-from-pipe-to-pipe)

      ;; else no compression.
      ((else #:skip 1)
       ))

     ;; the final step is to take the bytestream, make a scheme object and
     ;; put that on the scheme stream, and the pipe is finished
     (mk-fpipe->c-atom)))

  ;; fpipe-schemer takes a pipeline from scm to scm and creates a function
  ;; of it. Each time the function is called with a scheme object we send
  ;; it to the server, and from the return message we create a scheme
  ;; object that is returned from the function
  (define action (fpipe-schemer ch1 ch2))

  ;; A little nicer interface and we are finished
  (lambda* (message #:key (compress? #f))
    (action (cons (if compress? '(compress) '()) message))))

;; SERVER
(define* (make-server server-lambda address ip-server? #:key (context #f))

  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))
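
To make the flag-byte framing used by the two cond stages concrete, here is
a minimal sketch in plain Guile, with no fibers, zmq or zlib involved. None
of the names below are part of fpipe; they are hypothetical stand-ins chosen
for illustration. scm->bytes and bytes->scm use write and read where the
real pipeline uses the C serializer, and toy-codec merely flips bits where
the real pipeline would run the zlib nodes. Only the shape of the protocol
is taken from the client code: a message is ((list-of-features) . message),
and the byte stream starts with 1 if it was compressed and 0 otherwise.

```scheme
(use-modules (rnrs bytevectors))

;; Stand-in serializer (assumption: the real one is the C-based node).
(define (scm->bytes obj)
  (string->utf8
   (call-with-output-string (lambda (port) (write obj port)))))

(define (bytes->scm bv)
  (call-with-input-string (utf8->string bv) read))

;; Placeholder for compress/decompress: flips every byte, so applying it
;; twice gives back the input. This is NOT compression.
(define (toy-codec bv)
  (let* ((n   (bytevector-length bv))
         (out (make-bytevector n)))
    (let loop ((i 0))
      (if (< i n)
          (begin
            (bytevector-u8-set! out i
                                (logxor #xff (bytevector-u8-ref bv i)))
            (loop (+ i 1)))
          out))))

;; Prepend a single flag byte, like #:prepend #u8(1) or #u8(0).
(define (prepend-flag flag bv)
  (let* ((n   (bytevector-length bv))
         (out (make-bytevector (+ n 1))))
    (bytevector-u8-set! out 0 flag)
    (bytevector-copy! bv 0 out 1 n)
    out))

;; Drop the flag byte, like #:skip 1.
(define (skip-flag bv)
  (let* ((n   (- (bytevector-length bv) 1))
         (out (make-bytevector n)))
    (bytevector-copy! bv 1 out 0 n)
    out))

;; Sending side: branch on the feature list, then frame the payload.
(define (encode-message msg)
  (let ((payload (scm->bytes (cdr msg))))
    (if (memq 'compress (car msg))
        (prepend-flag 1 (toy-codec payload))
        (prepend-flag 0 payload))))

;; Receiving side: look at the first byte, skip it, undo the codec if set.
(define (decode-message bv)
  (let ((body (skip-flag bv)))
    (bytes->scm (if (= 1 (bytevector-u8-ref bv 0))
                    (toy-codec body)
                    body))))
```

With these definitions, (decode-message (encode-message (cons '(compress)
'(hello "world" 42)))) should give back a list equal to (hello "world" 42),
and the same holds with an empty feature list. The real pipeline does this
incrementally on streams rather than on whole bytevectors, which is the
point of fpipe; the sketch only shows the framing.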