From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C4D722141C for ; Sun, 27 Jan 2019 04:03:42 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 03/14] qspawn|getlinebody: support streaming filters Date: Sun, 27 Jan 2019 04:03:30 +0000 Message-Id: <20190127040341.26107-4-e@80x24.org> In-Reply-To: <20190127040341.26107-1-e@80x24.org> References: <20190127040341.26107-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This is intended for wrapping "git show" and "git diff" processes in the future and to prevent it from monopolizing callers. This will us to better handle backpressure from gigantic commits. --- lib/PublicInbox/GetlineBody.pm | 16 +++++++++++++--- lib/PublicInbox/Qspawn.pm | 21 +++++++++++++++++++-- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm index ea07f3d..0a922fd 100644 --- a/lib/PublicInbox/GetlineBody.pm +++ b/lib/PublicInbox/GetlineBody.pm @@ -13,8 +13,13 @@ use strict; use warnings; sub new { - my ($class, $rpipe, $end, $buf) = @_; - bless { rpipe => $rpipe, end => $end, buf => $buf }, $class; + my ($class, $rpipe, $end, $buf, $filter) = @_; + bless { + rpipe => $rpipe, + end => $end, + buf => $buf, + filter => $filter || 0, + }, $class; } # close should always be called after getline returns undef, @@ -24,8 +29,13 @@ sub DESTROY { $_[0]->close } sub getline { my ($self) = @_; + my $filter = $self->{filter}; + return if $filter == -1; # last call was EOF + my $buf = delete $self->{buf}; # initial buffer - defined $buf ? $buf : $self->{rpipe}->getline; + $buf = $self->{rpipe}->getline unless defined $buf; + $self->{filter} = -1 unless defined $buf; # set EOF for next call + $filter ? $filter->($buf) : $buf; } sub close { diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index b80dac1..3247cd0 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -9,6 +9,7 @@ package PublicInbox::Qspawn; use strict; use warnings; use PublicInbox::Spawn qw(popen_rd); +require Plack::Util; my $def_limiter; sub new ($$$;) { @@ -60,11 +61,25 @@ sub start { } } +# create a filter for "push"-based streaming PSGI writes used by HTTPD::Async +sub filter_fh ($$) { + my ($fh, $filter) = @_; + Plack::Util::inline_object( + close => sub { + $fh->write($filter->(undef)); + $fh->close; + }, + write => sub { + $fh->write($filter->($_[0])); + }); +} + sub psgi_return { my ($self, $env, $limiter, $parse_hdr) = @_; my ($fh, $rpipe); my $end = sub { - if (my $err = $self->finish) { + my $err = $self->finish; + if ($err && !$env->{'qspawn.quiet'}) { $err = join(' ', @{$self->{args}->[0]}).": $err\n"; $env->{'psgi.errors'}->print($err); } @@ -84,6 +99,7 @@ sub psgi_return { my $cb = sub { my $r = $rd_hdr->() or return; $rd_hdr = undef; + my $filter = delete $env->{'qspawn.filter'}; if (scalar(@$r) == 3) { # error if ($async) { $async->close; # calls rpipe->close @@ -94,11 +110,12 @@ sub psgi_return { $res->($r); } elsif ($async) { $fh = $res->($r); # scalar @$r == 2 + $fh = filter_fh($fh, $filter) if $filter; $async->async_pass($env->{'psgix.io'}, $fh, \$buf); } else { # for synchronous PSGI servers require PublicInbox::GetlineBody; $r->[2] = PublicInbox::GetlineBody->new($rpipe, $end, - $buf); + $buf, $filter); $res->($r); } }; -- EW