From: Stefan Israelsson Tampe
Newsgroups: gmane.lisp.guile.user,gmane.lisp.guile.devel
Subject: Re: stis engine
Date: Wed, 25 Aug 2021 00:31:25 +0200
To: Guile User, guile-devel

;; Oops, I managed to send the message by accident without finishing it.

;; The server part is similar, so I will drop the actual pipeline and
;; highlight the difference: the server receives a message, streams it into
;; a Scheme object, calls server-lambda (below) with that object, and
;; finally replies to the client with the lambda's return value. We create
;; a loop in which the server waits for questions.
(define* (make-server server-lambda address ip-server? #:key (context #f))
  ;; ch1 and ch2 come from the pipeline, which is omitted here
  (define schemer (fpipe-schemer ch1 ch2))

  (spawn-fiber
   (lambda ()
     (let lp ()
       (schemer %fpipe-eof%)
       (lp)))))

Finally, the idea is to use it as

(define context (make-zmq-context))
(define address "")    ;; ZeroMQ address
(define ip-server? #t) ;; whether to use bind (#t) or connect (#f)

(make-server (lambda (x) (cons '() x)) address ip-server? #:context context)
(define client (make-client address ip-server? #:context context))

;; send a message that is not compressed
> (client "abc")
"abc"
> (length (client (iota 1000000) #:compress? #t))
1000000

in a run-fibers context.

On Wed, Aug 25, 2021 at 12:25 AM Stefan Israelsson Tampe <stefan.itampe@gmail.com> wrote:

> I have now made a fiber-enabled stream library called fpipe in my
> stis-engine repository, see
>
> https://gitlab.com/tampe/stis-engine/-/tree/master/
>
> The idea is to focus on a high-performance byte streaming library on top
> of wingo's fibers library and make heavy use of bytevector buffers. We will
> also go between a stream of Scheme values and these byte streams, so that
> the two integrate seamlessly and give a good overview of the data pipeline.
>
> The following code uses a C-based serializer and deserializer of Scheme
> data structures, allows for optional streamed compression and
> decompression, and transports the data over ZeroMQ networking, which
> allows for thread/process/computer movement of data.
> The end result is a way to create servers and clients.
>
> It is instructive to show how the code for the client and server pipelines
> is constructed, to show off the fpipe library. This is not the final design,
> but most components are done.
>
> Here is the client
>
> (define* (make-client address ip-server? #:key (context #f))
>   ;; First we set up the zmq networking
>   (define ctx (if context context (make-zmq-context)))
>   (define socket (zmq-socket ctx ZMQ_REQ))
>
>   (if ip-server?
>       (zmq-bind socket address)
>       (zmq-connect socket address))
>
>   ;; we will define two fiber channels, channel in = ch1 and channel out = ch2
>   (define-values (ch1 ch2)
>
>     ;; fpipe-construct is the general pipelining macro
>     (fpipe-construct
>
>      ;; this is a Scheme cond that matches against the message, which is
>      ;; bound to `it`
>      (cond
>       (#:scm it)
>
>       ;; the format of a matcher is (predicate . translator): if the
>       ;; predicate is true we push the translated message to the branch's
>       ;; pipeline. This assumes a message of the form
>       ;; ((list-of-features) . message)
>       (((memq 'compress (car it)) #:tr (cdr it))
>
>        ;; the C-based streamed serializer that integrates nicely with
>        ;; fibers and streams; the message transport is of the form
>        ;; scm->bytestream
>        (mk-c-atom->fpipe)
>
>        ;; the zlib compressor node transports bytestream->bytestream
>        compress-from-fpipe-to-fpipe
>
>        ;; a bytestream->bytestream node that prepends a 1 to the message to
>        ;; indicate that the stream has been compressed
>        #:prepend #u8(1))
>
>       ;; if we do not have the compress feature then we simply generate
>       ;; the stream and prepend a 0, i.e. no compression is done
>       (else
>        (mk-c-atom->fpipe)
>        #:prepend #u8(0)))
>
>      ;; transport the message bytestream over the zmq socket. This returns a
>      ;; Scheme stream in which eof survives, as all control messages do, and
>      ;; initiates the next read from the socket (once the request message
>      ;; has been fully sent).
>      (fpipe->zmq socket)
>
>      ;; so here we get the return message
>      (zmq->fpipe socket)
>
>      ;; This is a bytestream cond and has no `it` part
>      (cond
>       ;; We try to match the beginning of the bytestream message; if it
>       ;; starts with 1 then we know the reply message has been compressed
>       ((#:match #u8(1) #:skip 1)
>        decompress-from-pipe-to-pipe)
>
>       ;; else no compression
>       ((else #:skip 1)
>        ))
>
>      ;; the final step is to take the bytestream, make a Scheme object and
>      ;; put it on the Scheme stream, and the pipe is finished
>      (mk-fpipe->c-atom)))
>
>   ;; fpipe-schemer takes a pipeline from scm to scm and creates a function
>   ;; from it. Each time the function is called with a Scheme object we send
>   ;; it to the server, and from the return message create a Scheme object
>   ;; that is returned from the function
>   (define action (fpipe-schemer ch1 ch2))
>
>   ;; A little nicer interface and we are finished
>   (lambda* (message #:key (compress? #f))
>     (action (cons (if compress? '(compress) '()) message))))
>
>
> ;; SERVER
> (define* (make-server server-lambda address ip-server? #:key (context #f))
>
>   (define schemer (fpipe-schemer ch1 ch2))
>
>   (spawn-fiber
>    (lambda ()
>      (let lp ()
>        (schemer %fpipe-eof%)
>        (lp)))))
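
For readers without fpipe at hand, the one-byte compression flag that both pipelines rely on (#:prepend #u8(1) or #u8(0) when sending, #:match with #:skip 1 when receiving) can be sketched on whole bytevectors in plain Guile. This is only an illustration under stated assumptions, not code from stis-engine: prepend-flag, frame and unframe are hypothetical names, the compressor and decompressor are passed in as ordinary procedures, and the real library works on streams rather than on complete messages.

```scheme
(use-modules (rnrs bytevectors))

;; Hypothetical helper: a fresh bytevector holding FLAG followed by PAYLOAD.
(define (prepend-flag flag payload)
  (let* ((n (bytevector-length payload))
         (out (make-bytevector (+ n 1))))
    (bytevector-u8-set! out 0 flag)
    (bytevector-copy! payload 0 out 1 n)
    out))

;; Sending side: flag 1 means "compressed", flag 0 means "sent as-is".
;; COMPRESS is any bytevector -> bytevector procedure (an assumption here).
(define (frame payload compress? compress)
  (if compress?
      (prepend-flag 1 (compress payload))
      (prepend-flag 0 payload)))

;; Receiving side: look at the first byte, skip it, and decompress the rest
;; only when the flag is 1.
(define (unframe message decompress)
  (when (zero? (bytevector-length message))
    (error "unframe: empty message, no flag byte"))
  (let* ((n (- (bytevector-length message) 1))
         (body (make-bytevector n)))
    (bytevector-copy! message 1 body 0 n)
    (if (= (bytevector-u8-ref message 0) 1)
        (decompress body)
        body)))

;; Round trip without compression, using a do-nothing stand-in procedure.
(define (no-op bv) bv)
(utf8->string (unframe (frame (string->utf8 "abc") #f no-op) no-op))
;; => "abc"
```

Passing the procedures in keeps the sketch independent of any particular zlib binding; a real compressor and its inverse can be supplied in place of no-op.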