* [PATCH 01/26] limiter: split out from qspawn
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 02/26] spawn: support synchronous run_qx Eric Wong
` (24 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Giving the limiter its own PublicInbox::Limiter module is slightly
better organization, especially since `publicinboxLimiter' has its
own user-facing config section and knobs. I may also use it in
LeiMirror and CodeSearchIdx for process management.
---
MANIFEST | 1 +
lib/PublicInbox/Config.pm | 4 +--
lib/PublicInbox/GitHTTPBackend.pm | 3 +-
lib/PublicInbox/Inbox.pm | 4 +--
lib/PublicInbox/Limiter.pm | 47 +++++++++++++++++++++++++++++++
lib/PublicInbox/MailDiff.pm | 1 +
lib/PublicInbox/Qspawn.pm | 47 ++-----------------------------
t/qspawn.t | 3 +-
8 files changed, 60 insertions(+), 50 deletions(-)
create mode 100644 lib/PublicInbox/Limiter.pm
diff --git a/MANIFEST b/MANIFEST
index 791d91a7..dcce801c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -287,6 +287,7 @@ lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiViewText.pm
lib/PublicInbox/LeiWatch.pm
lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/Limiter.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/Lock.pm
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 15e0872e..d156b2d3 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -124,9 +124,9 @@ sub lookup_newsgroup {
sub limiter {
my ($self, $name) = @_;
$self->{-limiters}->{$name} //= do {
- require PublicInbox::Qspawn;
+ require PublicInbox::Limiter;
my $max = $self->{"publicinboxlimiter.$name.max"} || 1;
- my $limiter = PublicInbox::Qspawn::Limiter->new($max);
+ my $limiter = PublicInbox::Limiter->new($max);
$limiter->setup_rlimit($name, $self);
$limiter;
};
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 74432429..d69f5f8b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -9,13 +9,14 @@ use v5.10.1;
use Fcntl qw(:seek);
use IO::Handle; # ->flush
use HTTP::Date qw(time2str);
+use PublicInbox::Limiter;
use PublicInbox::Qspawn;
use PublicInbox::Tmpfile;
use PublicInbox::WwwStatic qw(r @NO_CACHE);
use Carp ();
# 32 is same as the git-daemon connection limit
-my $default_limiter = PublicInbox::Qspawn::Limiter->new(32);
+my $default_limiter = PublicInbox::Limiter->new(32);
# n.b. serving "description" and "cloneurl" should be innocuous enough to
# not cause problems. serving "config" might...
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 9afbb478..3dad7004 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -55,8 +55,8 @@ sub _set_limiter ($$$) {
my $val = $self->{$mkey} or return;
my $lim;
if ($val =~ /\A[0-9]+\z/) {
- require PublicInbox::Qspawn;
- $lim = PublicInbox::Qspawn::Limiter->new($val);
+ require PublicInbox::Limiter;
+ $lim = PublicInbox::Limiter->new($val);
} elsif ($val =~ /\A[a-z][a-z0-9]*\z/) {
$lim = $pi_cfg->limiter($val);
warn "$mkey limiter=$val not found\n" if !$lim;
diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
new file mode 100644
index 00000000..48a2b6a3
--- /dev/null
+++ b/lib/PublicInbox/Limiter.pm
@@ -0,0 +1,47 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::Limiter;
+use v5.12;
+use PublicInbox::Spawn;
+
+sub new {
+ my ($class, $max) = @_;
+ bless {
+ # 32 is same as the git-daemon connection limit
+ max => $max || 32,
+ running => 0,
+ run_queue => [],
+ # RLIMIT_CPU => undef,
+ # RLIMIT_DATA => undef,
+ # RLIMIT_CORE => undef,
+ }, $class;
+}
+
+sub setup_rlimit {
+ my ($self, $name, $cfg) = @_;
+ for my $rlim (@PublicInbox::Spawn::RLIMITS) {
+ my $k = lc($rlim);
+ $k =~ tr/_//d;
+ $k = "publicinboxlimiter.$name.$k";
+ my $v = $cfg->{$k} // next;
+ my @rlimit = split(/\s*,\s*/, $v);
+ if (scalar(@rlimit) == 1) {
+ push @rlimit, $rlimit[0];
+ } elsif (scalar(@rlimit) != 2) {
+ warn "could not parse $k: $v\n";
+ }
+ eval { require BSD::Resource };
+ if ($@) {
+ warn "BSD::Resource missing for $rlim";
+ next;
+ }
+ for my $i (0..$#rlimit) {
+ next if $rlimit[$i] ne 'INFINITY';
+ $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
+ }
+ $self->{$rlim} = \@rlimit;
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index 994c7851..c3ce9365 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -8,6 +8,7 @@ use PublicInbox::MsgIter qw(msg_part_text);
use PublicInbox::ViewDiff qw(flush_diff);
use PublicInbox::GitAsyncCat;
use PublicInbox::ContentDigestDbg;
+use PublicInbox::Qspawn;
sub write_part { # Eml->each_part callback
my ($ary, $self) = @_;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0e52617c..a4d78e49 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -29,6 +29,7 @@ use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
+use PublicInbox::Limiter;
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -183,7 +184,7 @@ sub psgi_qx {
$self->{qx_arg} = $qx_arg;
$self->{qx_fh} = $qx_fh;
$self->{qx_buf} = \$qx_buf;
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
start($self, $limiter, \&psgi_qx_start);
}
@@ -317,7 +318,7 @@ sub psgi_return {
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
$self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
@@ -334,46 +335,4 @@ sub psgi_return {
}
}
-package PublicInbox::Qspawn::Limiter;
-use v5.12;
-
-sub new {
- my ($class, $max) = @_;
- bless {
- # 32 is same as the git-daemon connection limit
- max => $max || 32,
- running => 0,
- run_queue => [],
- # RLIMIT_CPU => undef,
- # RLIMIT_DATA => undef,
- # RLIMIT_CORE => undef,
- }, $class;
-}
-
-sub setup_rlimit {
- my ($self, $name, $cfg) = @_;
- foreach my $rlim (@PublicInbox::Spawn::RLIMITS) {
- my $k = lc($rlim);
- $k =~ tr/_//d;
- $k = "publicinboxlimiter.$name.$k";
- defined(my $v = $cfg->{$k}) or next;
- my @rlimit = split(/\s*,\s*/, $v);
- if (scalar(@rlimit) == 1) {
- push @rlimit, $rlimit[0];
- } elsif (scalar(@rlimit) != 2) {
- warn "could not parse $k: $v\n";
- }
- eval { require BSD::Resource };
- if ($@) {
- warn "BSD::Resource missing for $rlim";
- next;
- }
- foreach my $i (0..$#rlimit) {
- next if $rlimit[$i] ne 'INFINITY';
- $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
- }
- $self->{$rlim} = \@rlimit;
- }
-}
-
1;
diff --git a/t/qspawn.t b/t/qspawn.t
index 224e20db..507f86a5 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -3,6 +3,7 @@
use v5.12;
use Test::More;
use_ok 'PublicInbox::Qspawn';
+use_ok 'PublicInbox::Limiter';
{
my $cmd = [qw(sh -c), 'echo >&2 err; echo out'];
@@ -23,7 +24,7 @@ sub finish_err ($) {
$qsp->{qsp_err} && ${$qsp->{qsp_err}};
}
-my $limiter = PublicInbox::Qspawn::Limiter->new(1);
+my $limiter = PublicInbox::Limiter->new(1);
{
my $x = PublicInbox::Qspawn->new([qw(true)]);
$x->{qsp_err} = \(my $err = '');
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 02/26] spawn: support synchronous run_qx
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
2023-10-25 0:29 ` [PATCH 01/26] limiter: split out from qspawn Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 03/26] psgi_qx: use a temporary file rather than pipe Eric Wong
` (23 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This is similar to `backtick` but supports all our existing spawn
functionality (chdir, env, rlimit, redirects, etc.). It also
supports SCALAR ref redirects for std{in,out,err}, like run_script
in our test suite.
These SCALAR ref redirects use the :utf8 layer by default, which is
probably fine for all current users.
---
lib/PublicInbox/Git.pm | 6 ++++
lib/PublicInbox/SearchIdx.pm | 19 ++++------
lib/PublicInbox/Spawn.pm | 69 ++++++++++++++++++++++++++----------
t/spawn.t | 13 ++++++-
4 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index a460d155..476dcf30 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -69,6 +69,7 @@ sub check_git_exe () {
$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
$EXE_ST = $st;
}
+ $GIT_EXE;
}
sub git_version {
@@ -422,6 +423,11 @@ sub async_err ($$$$$) {
$async_warn ? carp($msg) : $self->fail($msg);
}
+sub cmd {
+ my $self = shift;
+ [ $GIT_EXE // check_git_exe(), "--git-dir=$self->{git_dir}", @_ ]
+}
+
# $git->popen(qw(show f00)); # or
# $git->popen(qw(show f00), { GIT_CONFIG => ... }, { 2 => ... });
sub popen {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 8a571cfb..3c64c715 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -22,7 +22,7 @@ use POSIX qw(strftime);
use Fcntl qw(SEEK_SET);
use Time::Local qw(timegm);
use PublicInbox::OverIdx;
-use PublicInbox::Spawn qw(run_wait);
+use PublicInbox::Spawn qw(run_wait run_qx);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
use PublicInbox::Address;
@@ -351,23 +351,18 @@ sub index_diff ($$$) {
}
sub patch_id {
- my ($self) = @_; # $_[1] is the diff (may be huge)
- open(my $fh, '+>:utf8', undef) or die "open: $!";
- open(my $eh, '+>', undef) or die "open: $!";
- $fh->autoflush(1);
- print $fh $_[1] or die "print: $!";
- sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
- my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx(
- [qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh });
- seek($eh, 0, SEEK_SET) or die "seek: $!";
- while (<$eh>) { warn $_ }
+ my ($self, $sref) = @_;
+ my $git = ($self->{ibx} // $self->{eidx} // $self)->git;
+ my $opt = { 0 => $sref, 2 => \(my $err) };
+ my $id = run_qx($git->cmd(qw(patch-id --stable)), undef, $opt);
+ warn $err if $err;
$id =~ /\A([a-f0-9]{40,})/ ? $1 : undef;
}
sub index_body_text {
my ($self, $doc, $sref) = @_;
if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) {
- my $id = patch_id($self, $$sref);
+ my $id = patch_id($self, $sref);
$doc->add_term('XDFID'.$id) if defined($id);
}
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 106f5e01..1fa7a41f 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -22,8 +22,9 @@ use Fcntl qw(SEEK_SET);
use IO::Handle ();
use Carp qw(croak);
use PublicInbox::ProcessIO;
-our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use autodie qw(open pipe read seek sysseek truncate);
BEGIN {
my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
@@ -290,7 +291,6 @@ ALL_LIBC
undef $all_libc unless -d $inline_dir;
if (defined $all_libc) {
local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
- use autodie;
# CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
my $lk = PublicInbox::Lock->new($inline_dir.
'/.public-inbox.lock');
@@ -301,7 +301,7 @@ ALL_LIBC
open STDERR, '>&', $fh;
STDERR->autoflush(1);
STDOUT->autoflush(1);
- CORE::eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
+ eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
my $err = $@;
open(STDERR, '>&', $olderr);
open(STDOUT, '>&', $oldout);
@@ -332,26 +332,34 @@ sub which ($) {
}
sub spawn ($;$$) {
- my ($cmd, $env, $opts) = @_;
+ my ($cmd, $env, $opt) = @_;
my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
- my @env;
+ my (@env, @rdr);
my %env = (%ENV, $env ? %$env : ());
while (my ($k, $v) = each %env) {
push @env, "$k=$v" if defined($v);
}
- my $redir = [];
for my $child_fd (0..2) {
- my $parent_fd = $opts->{$child_fd};
- if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
- my $fd = fileno($parent_fd) //
- die "$parent_fd not an IO GLOB? $!";
- $parent_fd = $fd;
+ my $pfd = $opt->{$child_fd};
+ if ('SCALAR' eq ref($pfd)) {
+ open my $fh, '+>:utf8', undef;
+ $opt->{"fh.$child_fd"} = $fh;
+ if ($child_fd == 0) {
+ print $fh $$pfd;
+ $fh->flush or die "flush: $!";
+ sysseek($fh, 0, SEEK_SET);
+ }
+ $pfd = fileno($fh);
+ } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+ my $fd = fileno($pfd) //
+ die "$pfd not an IO GLOB? $!";
+ $pfd = $fd;
}
- $redir->[$child_fd] = $parent_fd // $child_fd;
+ $rdr[$child_fd] = $pfd // $child_fd;
}
my $rlim = [];
foreach my $l (@RLIMITS) {
- my $v = $opts->{$l} // next;
+ my $v = $opt->{$l} // next;
my $r = eval "require BSD::Resource; BSD::Resource::$l();";
unless (defined $r) {
warn "$l undefined by BSD::Resource: $@\n";
@@ -359,31 +367,41 @@ sub spawn ($;$$) {
}
push @$rlim, $r, @$v;
}
- my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
- my $pgid = $opts->{pgid} // -1;
- my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
+ my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+ my $pgid = $opt->{pgid} // -1;
+ my $pid = pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
$pid;
}
sub popen_rd {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
+ pipe(my $r, local $opt->{1});
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
}
sub popen_wr {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
+ pipe(local $opt->{0}, my $w);
$w->autoflush(1);
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
}
+sub read_out_err ($) {
+ my ($opt) = @_;
+ for my $fd (1, 2) { # read stdout/stderr
+ my $fh = delete($opt->{"fh.$fd"}) // next;
+ seek($fh, 0, SEEK_SET);
+ read($fh, ${$opt->{$fd}}, -s $fh, length(${$opt->{$fd}} // ''));
+ }
+}
+
sub run_wait ($;$$) {
my ($cmd, $env, $opt) = @_;
waitpid(spawn($cmd, $env, $opt), 0);
+ read_out_err($opt);
$?
}
@@ -392,4 +410,19 @@ sub run_die ($;$$) {
run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
}
+sub run_qx {
+ my ($cmd, $env, $opt) = @_;
+ my $fh = popen_rd($cmd, $env, $opt);
+ my @ret;
+ if (wantarray) {
+ @ret = <$fh>;
+ } else {
+ local $/;
+ $ret[0] = <$fh>;
+ }
+ close $fh; # caller should check $?
+ read_out_err($opt);
+ wantarray ? @ret : $ret[0];
+}
+
1;
diff --git a/t/spawn.t b/t/spawn.t
index 1af66bda..4b3baae4 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -3,7 +3,7 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use v5.12;
use Test::More;
-use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::Spawn qw(which spawn popen_rd run_qx);
require PublicInbox::Sigfd;
require PublicInbox::DS;
@@ -19,6 +19,17 @@ require PublicInbox::DS;
is($?, 0, 'true exited successfully');
}
+{
+ my $opt = { 0 => \'in', 2 => \(my $e) };
+ my $out = run_qx(['sh', '-c', 'echo e >&2; cat'], undef, $opt);
+ is($e, "e\n", 'captured stderr');
+ is($out, 'in', 'stdin read and stdout captured');
+ $opt->{0} = \"IN\n3\nLINES";
+ my @out = run_qx(['sh', '-c', 'echo E >&2; cat'], undef, $opt);
+ is($e, "e\nE\n", 'captured stderr appended to string');
+ is_deeply(\@out, [ "IN\n", "3\n", 'LINES' ], 'stdout array');
+}
+
SKIP: {
my $pid = spawn(['true'], undef, { pgid => 0 });
ok($pid, 'spawned process with new pgid');
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 03/26] psgi_qx: use a temporary file rather than pipe
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
2023-10-25 0:29 ` [PATCH 01/26] limiter: split out from qspawn Eric Wong
2023-10-25 0:29 ` [PATCH 02/26] spawn: support synchronous run_qx Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 04/26] www_coderepo: capture uses a flattened list Eric Wong
` (22 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
A pipe requires more context switches, more syscalls, and extra
code to deal with the unpredictable ordering of pipe EOF vs.
waitpid. Instead, use the new spawn/aspawn features, which
automatically slurp the output into a string.
---
MANIFEST | 1 +
lib/PublicInbox/Aspawn.pm | 34 ++++++++++++++
lib/PublicInbox/Qspawn.pm | 95 ++++++++++++++-------------------------
3 files changed, 68 insertions(+), 62 deletions(-)
create mode 100644 lib/PublicInbox/Aspawn.pm
diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
lib/PublicInbox/Admin.pm
lib/PublicInbox/AdminEdit.pm
lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..49f8651a
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,34 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# async system()/qx() which takes callback
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+ my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+ PublicInbox::Spawn::read_out_err($opt);
+ if ($? && !$opt->{quiet}) {
+ my ($status, $sig) = ($? >> 8, $? & 127);
+ my $msg = '';
+ $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+ $msg .= " status=$status" if $status;
+ $msg .= " signal=$sig" if $sig;
+ warn "E: @$cmd", $msg, "\n";
+ }
+ $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+ my ($cmd, $env, $opt, $cb, @args) = @_;
+ $opt->{1} //= \(my $out);
+ my $pid = spawn($cmd, $env, $opt);
+ awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+ awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my $err;
my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
- foreach my $k (@PublicInbox::Spawn::RLIMITS) {
- if (defined(my $rlimit = $limiter->{$k})) {
- $o{$k} = $rlimit;
- }
+ for my $k (@PublicInbox::Spawn::RLIMITS) {
+ $o{$k} = $limiter->{$k} // next;
}
$self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
- eval {
- # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
- $limiter->{running}++;
- $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
- };
+ $limiter->{running}++;
+ if ($start_cb) {
+ eval { # popen_rd may die on EMFILE, ENFILE
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+ \&waitpid_err, $self);
+ $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+ };
+ } else {
+ eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ warn "E: $@" if $@;
+ }
finish($self, $@) if $@;
}
-sub finalize ($) {
- my ($self) = @_;
+sub finalize ($;$) {
+ my ($self, $opt) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
}
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
- if ($qx_cb) {
- eval { $qx_cb->($qx_buf, $qx_arg) };
+ my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+ if ($qx_cb_arg) {
+ my $cb = shift @$qx_cb_arg;
+ eval { $cb->($opt->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
@@ -108,15 +110,20 @@ sub finalize ($) {
sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
sub waitpid_err { # callback for awaitpid
- my (undef, $self) = @_; # $_[0]: pid
+ my (undef, $self, $opt) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) {
+ if ($?) { # FIXME: redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self) if !$self->{rpipe};
+ finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+ my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+ waitpid_err($pid, $self, $opt);
}
sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
}
}
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my ($r, $buf);
-reread:
- $r = sysread($self->{rpipe}, $buf, 65536);
- if (!defined($r)) {
- return if $! == EAGAIN; # try again when notified
- goto reread if $! == EINTR;
- event_step($self, $!);
- } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
- $as->async_pass($self->{psgi_env}->{'psgix.io'},
- $self->{qx_fh}, \$buf);
- } elsif ($r) { # generic PSGI:
- print { $self->{qx_fh} } $buf;
- } else { # EOF
- event_step($self, undef);
- }
-}
-
-sub psgi_qx_start {
- my ($self) = @_;
- if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
- $self->{async} = $async->($self->{rpipe},
- \&psgi_qx_init_cb, $self, $self);
- # init_cb will call ->async_pass or ->close
- } else { # generic PSGI
- psgi_qx_init_cb($self) while $self->{qx_fh};
- }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{psgi_env} = $env;
- my $qx_buf = '';
- open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
- $self->{qx_cb} = $qx_cb;
- $self->{qx_arg} = $qx_arg;
- $self->{qx_fh} = $qx_fh;
- $self->{qx_buf} = \$qx_buf;
+ $self->{qx_cb_arg} = \@qx_cb_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
- start($self, $limiter, \&psgi_qx_start);
+ start($self, $limiter, undef);
}
# this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
my ($self, $err) = @_; # $err: $!
warn "psgi_{return,qx} $err" if defined($err);
finish($self);
- my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+ my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 04/26] www_coderepo: capture uses a flattened list
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (2 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 03/26] psgi_qx: use a temporary file rather than pipe Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 05/26] qspawn: psgi_return allows list for callback args Eric Wong
` (21 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
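We no longer need a multi-dimensional list to pass multiple
arguments to the psgi_qx callback. This simplifies usage
and reduces allocations.
The utf8_maybe() call is dropped as well, since psgi_qx output
now arrives via a temporary file opened with the :utf8 layer.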
---
lib/PublicInbox/WwwCoderepo.pm | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index e8c340b5..68c4c86d 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -178,9 +178,8 @@ EOM
}
sub capture { # psgi_qx callback to capture git-for-each-ref
- my ($bref, $arg) = @_; # arg = [ctx, key, OnDestroy(summary_END)]
- utf8_maybe($$bref);
- $arg->[0]->{qx_res}->{$arg->[1]} = $$bref;
+ my ($bref, $ctx, $key) = @_; # $_[3] = OnDestroy(summary_END)
+ $ctx->{qx_res}->{$key} = $$bref;
# summary_END may be called via OnDestroy $arg->[2]
}
@@ -220,8 +219,7 @@ sub summary ($$) {
my ($k, $cmd) = @$_;
my $qsp = PublicInbox::Qspawn->new($cmd, \%env, \%opt);
$qsp->{qsp_err} = $qsp_err;
- $qsp->psgi_qx($ctx->{env}, undef, \&capture,
- [$ctx, $k, $END]);
+ $qsp->psgi_qx($ctx->{env}, undef, \&capture, $ctx, $k, $END);
}
$tip //= 'HEAD';
my @try = ("$tip:README", "$tip:README.md"); # TODO: configurable
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 05/26] qspawn: psgi_return allows list for callback args
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (3 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 04/26] www_coderepo: capture uses a flattened list Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 06/26] qspawn: drop unused err arg for ->event_step Eric Wong
` (20 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This slightly simplifies our GitHTTPBackend wrapper.
We can also use shorter variable names to avoid wrapping some
lines.
---
lib/PublicInbox/GitHTTPBackend.pm | 6 +++---
lib/PublicInbox/Qspawn.pm | 19 ++++++++-----------
2 files changed, 11 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d69f5f8b..edbc0157 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -80,9 +80,9 @@ sub serve_dumb {
}
sub git_parse_hdr { # {parse_hdr} for Qspawn
- my ($r, $bref, $dumb_args) = @_;
+ my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
- $res->[0] == 403 ? serve_dumb(@$dumb_args) : $res;
+ $res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
}
# returns undef if 403 so it falls back to dumb HTTP
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, [$env, $git, $path]);
+ $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 59d5ed40..0f900691 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -174,16 +174,13 @@ sub rd_hdr ($) {
my ($self) = @_;
# typically used for reading CGI headers
# We also need to check EINTR for generic PSGI servers.
- my $ret;
- my $total_rd = 0;
- my $hdr_buf = $self->{hdr_buf};
- my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+ my ($ret, $total_rd);
+ my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
until (defined($ret)) {
- my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
- length($$hdr_buf));
+ my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
if (defined($r)) {
$total_rd += $r;
- eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
+ eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
if ($@) {
warn "parse_hdr: $@";
$ret = [ 500, [], [ "Internal error\n" ] ];
@@ -207,7 +204,7 @@ EOM
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
- my $r = rd_hdr($self) or return;
+ my $r = rd_hdr($self) or return; # incomplete
my $env = $self->{psgi_env};
my $filter;
@@ -277,7 +274,7 @@ sub psgi_return_start { # may run later, much later...
#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
-# $parse_hdr - Initial read function; often for parsing CGI header output.
+# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
# It will be given the return value of sysread from the pipe
# and a string ref of the current buffer. Returns an arrayref
# for PSGI responses. 2-element arrays in PSGI mean the
@@ -285,10 +282,10 @@ sub psgi_return_start { # may run later, much later...
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
sub psgi_return {
- my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
- $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
+ $self->{parse_hdr} = \@parse_hdr_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 06/26] qspawn: drop unused err arg for ->event_step
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (4 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 05/26] qspawn: psgi_return allows list for callback args Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 07/26] httpd/async: require IO arg Eric Wong
` (19 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
It's no longer needed since psgi_qx no longer uses a pipe; its
only user, psgi_qx_init_cb, passed $! and is gone.
---
lib/PublicInbox/Qspawn.pm | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0f900691..9a7e8734 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -163,8 +163,7 @@ sub psgi_qx {
# via PublicInbox::DS event loop OR via GetlineBody for generic
# PSGI servers.
sub event_step {
- my ($self, $err) = @_; # $err: $!
- warn "psgi_{return,qx} $err" if defined($err);
+ my ($self) = @_;
finish($self);
my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 07/26] httpd/async: require IO arg
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (5 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 06/26] qspawn: drop unused err arg for ->event_step Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 08/26] xt/check-run: call DS->Reset after all tests Eric Wong
` (18 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Callers that want to requeue can call PublicInbox::DS::requeue
directly instead of going through the convoluted undef-$io
argument handling in PublicInbox::HTTPD::Async->new.
---
lib/PublicInbox/HTTPD/Async.pm | 8 --------
lib/PublicInbox/MailDiff.pm | 7 +++----
lib/PublicInbox/SolverGit.pm | 10 +++-------
3 files changed, 6 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b73d0c4b..2e4d8baa 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -25,14 +25,6 @@ use PublicInbox::ProcessIONBF;
# bidirectional socket in the future.
sub new {
my ($class, $io, $cb, $arg, $end_obj) = @_;
-
- # no $io? call $cb at the top of the next event loop to
- # avoid recursion:
- unless (defined($io)) {
- PublicInbox::DS::requeue($cb ? $cb : $arg);
- die '$end_obj unsupported w/o $io' if $end_obj;
- return;
- }
my $self = bless {
cb => $cb, # initial read callback
arg => $arg, # arg for $cb
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index c3ce9365..908f223c 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -59,8 +59,7 @@ sub next_smsg ($) {
$ctx->write($ctx->_html_end);
return $ctx->close;
}
- my $async = $self->{ctx}->{env}->{'pi-httpd.async'};
- $async->(undef, undef, $self) if $async # PublicInbox::HTTPD::Async->new
+ PublicInbox::DS::requeue($self) if $ctx->{env}->{'pi-httpd.async'};
}
sub emit_msg_diff {
@@ -125,8 +124,8 @@ sub event_step {
sub begin_mail_diff {
my ($self) = @_;
- if (my $async = $self->{ctx}->{env}->{'pi-httpd.async'}) {
- $async->(undef, undef, $self); # PublicInbox::HTTPD::Async->new
+ if ($self->{ctx}->{env}->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{smsg};
}
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 5f317f51..23d4d3d1 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -386,12 +386,9 @@ sub event_step ($) {
}
sub next_step ($) {
- my ($self) = @_;
# if outside of public-inbox-httpd, caller is expected to be
# looping event_step, anyways
- my $async = $self->{psgi_env}->{'pi-httpd.async'} or return;
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ PublicInbox::DS::requeue($_[0]) if $_[0]->{psgi_env}->{'pi-httpd.async'}
}
sub mark_found ($$$) {
@@ -690,9 +687,8 @@ sub solve ($$$$$) {
$self->{found} = {}; # { abbr => [ ::Git, oid, type, size, $di ] }
dbg($self, "solving $oid_want ...");
- if (my $async = $env->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ if ($env->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{user_cb};
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 08/26] xt/check-run: call DS->Reset after all tests
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (6 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 07/26] httpd/async: require IO arg Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 09/26] qspawn: introduce new psgi_yield API Eric Wong
` (17 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This ensures reused processes get a clean start and
avoids surprises as we develop more code around the
DS event loop.
---
t/dir_idle.t | 1 -
t/fake_inotify.t | 2 --
xt/check-run.t | 2 ++
3 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/t/dir_idle.t b/t/dir_idle.t
index 35c800f9..14aad7a1 100644
--- a/t/dir_idle.t
+++ b/t/dir_idle.t
@@ -41,5 +41,4 @@ is(scalar(@x), 1, 'got an event') and
ok($x[0]->[0]->IN_DELETE_SELF || $x[0]->[0]->IN_MOVE_SELF,
'IN_DELETE_SELF set on move');
-PublicInbox::DS->Reset;
done_testing;
diff --git a/t/fake_inotify.t b/t/fake_inotify.t
index 56f64588..8221e092 100644
--- a/t/fake_inotify.t
+++ b/t/fake_inotify.t
@@ -48,6 +48,4 @@ is_deeply([map{ $_->fullname }@events], ["$tmpdir/new/tst"], 'unlink detected')
diag explain(\@events);
ok($events[0]->IN_DELETE, 'IN_DELETE set on unlink');
-PublicInbox::DS->Reset;
-
done_testing;
diff --git a/xt/check-run.t b/xt/check-run.t
index 6eefcb7d..cda839fe 100755
--- a/xt/check-run.t
+++ b/xt/check-run.t
@@ -14,6 +14,7 @@ use v5.12;
use IO::Handle; # ->autoflush
use PublicInbox::TestCommon;
use PublicInbox::Spawn;
+use PublicInbox::DS; # already loaded by Spawn via ProcessIO
use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev);
use Errno qw(EINTR);
use Fcntl qw(:seek);
@@ -187,6 +188,7 @@ my $start_worker = sub {
DIE "short read $r" if $r != UINT_SIZE;
my $t = unpack('I', $buf);
run_test($todo->[$t]);
+ PublicInbox::DS->Reset;
$tb->reset;
}
kill 'USR1', $producer if !$eof; # sets $eof in $producer
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 09/26] qspawn: introduce new psgi_yield API
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (7 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 08/26] xt/check-run: call DS->Reset after all tests Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 10/26] repo_atom: switch to psgi_yield Eric Wong
` (16 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This is intended to replace psgi_return and HTTPD/Async
entirely, hopefully making our code less convoluted while
maintaining the ability to handle slow clients on
memory-constrained systems.
This was made possible by the philosophy shift in commit 21a539a2df0c
(httpd/async: switch to buffering-as-fast-as-possible, 2019-06-28).
We'll still support generic PSGI via the `pull' model with a
GetlineResponse class which is similar to the old GetlineBody.
---
MANIFEST | 1 +
lib/PublicInbox/GetlineResponse.pm | 40 ++++++++++
lib/PublicInbox/GitHTTPBackend.pm | 4 +-
lib/PublicInbox/GzipFilter.pm | 3 +-
lib/PublicInbox/HTTP.pm | 8 +-
lib/PublicInbox/InputPipe.pm | 12 +--
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/Qspawn.pm | 119 ++++++++++++++++++++++++++++-
8 files changed, 176 insertions(+), 13 deletions(-)
create mode 100644 lib/PublicInbox/GetlineResponse.pm
diff --git a/MANIFEST b/MANIFEST
index f087621c..420b40a1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -204,6 +204,7 @@ lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/Gcf2.pm
lib/PublicInbox/Gcf2Client.pm
lib/PublicInbox/GetlineBody.pm
+lib/PublicInbox/GetlineResponse.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/GitAsyncCat.pm
lib/PublicInbox/GitCredential.pm
diff --git a/lib/PublicInbox/GetlineResponse.pm b/lib/PublicInbox/GetlineResponse.pm
new file mode 100644
index 00000000..290cce74
--- /dev/null
+++ b/lib/PublicInbox/GetlineResponse.pm
@@ -0,0 +1,40 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their
+# getline response bodies can be backpressure-aware for slow clients
+# This depends on rpipe being _blocking_ on getline.
+package PublicInbox::GetlineResponse;
+use v5.12;
+
+sub response {
+ my ($qsp) = @_;
+ my ($res, $rbuf);
+ do { # read header synchronously
+ sysread($qsp->{rpipe}, $rbuf, 65536);
+ $res = $qsp->parse_hdr_done($rbuf); # fills $bref
+ } until defined($res);
+ my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return;
+ my $self = $res->[2] = bless {
+ qsp => $qsp,
+ filter => $filter,
+ }, __PACKAGE__;
+ my ($bref) = @{delete $qsp->{yield_parse_hdr}};
+ $self->{rbuf} = $$bref if $$bref ne '';
+ $wcb->($res);
+}
+
+sub getline {
+ my ($self) = @_;
+ my $rpipe = $self->{qsp}->{rpipe} // do {
+ delete($self->{qsp})->finish;
+ return; # EOF was set on previous call
+ };
+ my $buf = delete($self->{rbuf}) // $rpipe->getline;
+ $buf // delete($self->{qsp}->{rpipe}); # set EOF for next call
+ $self->{filter} ? $self->{filter}->translate($buf) : $buf;
+}
+
+sub close {}
+
+1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
PublicInbox::WwwStatic::response($env, $h, $path, $type);
}
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+ $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index db8e8397..d6ecd5ba 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -123,9 +123,10 @@ sub http_out ($) {
};
}
+# returns undef if HTTP client disconnected, may return 0
+# because ->translate can return ''
sub write {
my $self = shift;
- # my $ret = bytes::length($_[1]); # XXX does anybody care?
http_out($self)->write($self->translate(@_));
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index ca162939..edc88fe8 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -455,11 +455,12 @@ sub next_step {
# They may be exposed to the PSGI application when the PSGI app
# returns a CODE ref for "push"-based responses
package PublicInbox::HTTP::Chunked;
-use strict;
+use v5.12;
sub write {
# ([$http], $buf) = @_;
- PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1])
+ PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]);
+ $_[0]->[0]->{sock} ? length($_[1]) : undef;
}
sub close {
@@ -468,12 +469,13 @@ sub close {
}
package PublicInbox::HTTP::Identity;
-use strict;
+use v5.12;
our @ISA = qw(PublicInbox::HTTP::Chunked);
sub write {
# ([$http], $buf) = @_;
PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]);
+ $_[0]->[0]->{sock} ? length($_[1]) : undef;
}
1;
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..f4d57e7d 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,16 @@ sub consume {
if ($@) { # regular file (but not w/ select|IO::Poll backends)
$self->{-need_rq} = 1;
$self->requeue;
- } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+ } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+ } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
$in->blocking(0);
} elsif (-t $in) { # isatty(3) can't use `_' stat cache
unblock_tty($self);
}
+ $self;
}
-sub close {
+sub close { # idempotent
my ($self) = @_;
if (my $t = delete($self->{restore_termios})) {
my $fd = fileno($self->{sock} // return);
@@ -60,16 +62,16 @@ sub event_step {
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
eval {
if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->{cb}->($self, @{$self->{args}}, $rbuf);
$self->requeue if $self->{-need_rq};
} elsif (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
+ $self->{cb}->($self, @{$self->{args}}, '');
$self->close
} elsif ($!{EAGAIN}) { # rely on EPOLLIN
} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
$self->requeue if $self->{-need_rq};
} else { # another error
- $self->{cb}->(@{$self->{args}}, undef);
+ $self->{cb}->($self, @{$self->{args}}, undef);
$self->close;
}
};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 56e4c001..7bc7b2dc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1567,7 +1567,7 @@ sub request_umask {
}
sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
- my ($lei, $cb) = @_; # $_[-1] = $rbuf
+ my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
$lei->{stdin_buf} .= $_[-1];
do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..203d8f41 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
+ \&waitpid_err, $self, \%o);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
@@ -126,6 +129,20 @@ sub wait_await { # run_await cb
waitpid_err($pid, $self, $opt);
}
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+ my ($self, $ipipe) = @_;
+ if (!defined($_[-1])) {
+ warn "error reading body: $!";
+ } elsif ($_[-1] eq '') { # normal EOF
+ $self->finish;
+ $self->{qfh}->close;
+ } elsif (defined($self->{qfh}->write($_[-1]))) {
+ return; # continue while HTTP client is reading our writes
+ } # else { # HTTP client disconnected
+ delete $self->{rpipe};
+ $ipipe->close;
+}
+
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
@@ -201,6 +218,39 @@ EOM
$ret;
}
+sub yield_pass {
+ my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+ if (ref($res) eq 'CODE') { # chain another command
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $res->($wcb);
+ $self->{passed} = 1;
+ return; # all done
+ }
+ confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+ my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+ pop(@$res) : delete($env->{'qspawn.filter'});
+ $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+ if (scalar(@$res) == 3) { # done early (likely error or static file)
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $wcb->($res); # all done
+ return;
+ }
+ scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+ return ($wcb, $filter) if !$ipipe; # generic PSGI
+ # streaming response
+ my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+ $qfh = $filter->attach($qfh) if $filter;
+ my ($bref) = @{delete $self->{yield_parse_hdr}};
+ $qfh->write($$bref) if $$bref ne '';
+ $self->{qfh} = $qfh; # keep $ipipe open
+}
+
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later...
}
}
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
+
+sub parse_hdr_done ($$) {
+ my ($self) = @_;
+ my $ret;
+ if (defined $_[-1]) {
+ my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+ $$bref .= $_[-1];
+ $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+ if ($@) {
+ carp "parse_hdr (@{$self->{cmd}}): $@\n";
+ $ret = r500();
+ } elsif (!$ret && $_[-1] eq '') {
+ carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ } else {
+ carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+ my ($ipipe, $self) = @_; # $_[-1] rbuf
+ if ($self->{qfh}) { # already streaming
+ yield_chunk($self, $ipipe, $_[-1]);
+ } elsif (my $res = parse_hdr_done($self, $_[-1])) {
+ yield_pass($self, $ipipe, $res);
+ } # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+ my ($self) = @_;
+ if ($self->{psgi_env}->{'pi-httpd.async'}) {
+ require PublicInbox::ProcessIONBF;
+ my $rpipe = $self->{rpipe};
+ PublicInbox::ProcessIONBF->replace($rpipe);
+ PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+ } else {
+ require PublicInbox::GetlineResponse;
+ PublicInbox::GetlineResponse::response($self);
+ }
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
@@ -302,4 +401,22 @@ sub psgi_return {
}
}
+sub psgi_yield {
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+ $self->{psgi_env} = $env;
+ $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+ # the caller already captured the PSGI write callback from
+ # the PSGI server, so we can call ->start, here:
+ $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+ # the caller will return this sub to the PSGI server, so
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
+ $env->{'qspawn.wcb'} = $_[0];
+ start($self, $limiter, \&_yield_start);
+ }
+}
+
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 10/26] repo_atom: switch to psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (8 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 09/26] qspawn: introduce new psgi_yield API Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 11/26] repo_snapshot: psgi_yield Eric Wong
` (15 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
To my pleasant surprise, it appears I've managed to make
psgi_yield a drop-in replacement for psgi_return...
---
lib/PublicInbox/RepoAtom.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm
index 79b76c12..b7179511 100644
--- a/lib/PublicInbox/RepoAtom.pm
+++ b/lib/PublicInbox/RepoAtom.pm
@@ -100,7 +100,7 @@ sub srv_tags_atom {
$ctx->{-feed_title} = "$ctx->{git}->{nick} tags";
my $qsp = PublicInbox::Qspawn->new(\@cmd);
$ctx->{-is_tag} = 1;
- $qsp->psgi_return($ctx->{env}, undef, \&atom_ok, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx);
}
sub srv_atom {
@@ -122,7 +122,7 @@ sub srv_atom {
push @cmd, $path if $path ne '';
my $qsp = PublicInbox::Qspawn->new(\@cmd, undef,
{ quiet => 1, 2 => $ctx->{lh} });
- $qsp->psgi_return($ctx->{env}, undef, \&atom_ok, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 11/26] repo_snapshot: psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (9 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 10/26] repo_atom: switch to psgi_yield Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 12/26] viewvcs: psgi_yield Eric Wong
` (14 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Another drop-in replacement for psgi_return.
---
lib/PublicInbox/RepoSnapshot.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/RepoSnapshot.pm b/lib/PublicInbox/RepoSnapshot.pm
index ebcbbd81..6b7441b0 100644
--- a/lib/PublicInbox/RepoSnapshot.pm
+++ b/lib/PublicInbox/RepoSnapshot.pm
@@ -58,7 +58,7 @@ sub ver_check { # git->check_async callback
"--git-dir=$ctx->{git}->{git_dir}", 'archive',
"--prefix=$ctx->{snap_pfx}/",
"--format=$ctx->{snap_fmt}", $treeish]);
- $qsp->psgi_return($ctx->{env}, undef, \&archive_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&archive_hdr, $ctx);
}
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 12/26] viewvcs: psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (10 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 11/26] repo_snapshot: psgi_yield Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 13/26] www_altid: switch to psgi_yield Eric Wong
` (13 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Another drop-in replacement, though I took the opportunity
to avoid unnecessarily bumping the refcount of $ctx->{env}.
---
lib/PublicInbox/ViewVCS.pm | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 86c46e69..6c588ddf 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -101,9 +101,8 @@ sub stream_large_blob ($$) {
my ($git, $oid, $type, $size, $di) = @$res;
my $cmd = ['git', "--git-dir=$git->{git_dir}", 'cat-file', $type, $oid];
my $qsp = PublicInbox::Qspawn->new($cmd);
- my $env = $ctx->{env};
- $env->{'qspawn.wcb'} = $ctx->{-wcb};
- $qsp->psgi_return($env, undef, \&stream_blob_parse_hdr, $ctx);
+ $ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb};
+ $qsp->psgi_yield($ctx->{env}, undef, \&stream_blob_parse_hdr, $ctx);
}
sub show_other_result ($$) { # future-proofing
@@ -341,7 +340,7 @@ sub show_patch ($$) {
my $qsp = PublicInbox::Qspawn->new(\@cmd);
$ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb};
$ctx->{patch_oid} = $oid;
- $qsp->psgi_return($ctx->{env}, undef, \&stream_patch_parse_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&stream_patch_parse_hdr, $ctx);
}
sub show_commit ($$) {
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 13/26] www_altid: switch to psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (11 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 12/26] viewvcs: psgi_yield Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 14/26] cgit: " Eric Wong
` (12 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Another drop-in replacement for psgi_return, this one notably
utilizing an unconditional qspawn.filter for gzip unlike the
others.
---
lib/PublicInbox/WwwAltId.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/WwwAltId.pm b/lib/PublicInbox/WwwAltId.pm
index 47056160..48520142 100644
--- a/lib/PublicInbox/WwwAltId.pm
+++ b/lib/PublicInbox/WwwAltId.pm
@@ -1,9 +1,9 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# dumps using the ".dump" command of sqlite3(1)
package PublicInbox::WwwAltId;
-use strict;
+use v5.12;
use PublicInbox::Qspawn;
use PublicInbox::WwwStream qw(html_oneshot);
use PublicInbox::AltId;
@@ -71,7 +71,7 @@ EOF
my $qsp = PublicInbox::Qspawn->new([$sqlite3, $fn], undef, { 0 => $r });
$ctx->{altid_pfx} = $altid_pfx;
$env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
- $qsp->psgi_return($env, undef, \&check_output, $ctx);
+ $qsp->psgi_yield($env, undef, \&check_output, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 14/26] cgit: switch to psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (12 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 13/26] www_altid: switch to psgi_yield Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 15/26] www_coderepo: use psgi_yield Eric Wong
` (11 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Another drop-in replacement using an alternate limiter.
---
lib/PublicInbox/Cgit.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/Cgit.pm b/lib/PublicInbox/Cgit.pm
index 4265cfb2..10cad57a 100644
--- a/lib/PublicInbox/Cgit.pm
+++ b/lib/PublicInbox/Cgit.pm
@@ -110,7 +110,7 @@ sub call {
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new($self->{cmd}, $cgi_env, $rdr);
my $limiter = $self->{pi_cfg}->limiter('-cgit');
- $qsp->psgi_return($env, $limiter, $parse_cgi_headers, $ctx);
+ $qsp->psgi_yield($env, $limiter, $parse_cgi_headers, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 15/26] www_coderepo: use psgi_yield
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (13 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 14/26] cgit: " Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 16/26] drop psgi_return, httpd/async and GetlineBody Eric Wong
` (10 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Yet another drop-in replacement for psgi_return.
---
lib/PublicInbox/WwwCoderepo.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index 68c4c86d..6e19fc02 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -277,7 +277,7 @@ sub refs_foo { # /$REPO/refs/{heads,tags} endpoints
$ctx->{-heads} = 1 if $pfx eq 'refs/heads';
my $qsp = PublicInbox::Qspawn->new([@EACH_REF, $pfx ],
{ GIT_DIR => $ctx->{git}->{git_dir} });
- $qsp->psgi_return($ctx->{env}, undef, \&_refs_parse_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&_refs_parse_hdr, $ctx);
}
sub srv { # endpoint called by PublicInbox::WWW
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 16/26] drop psgi_return, httpd/async and GetlineBody
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (14 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 15/26] www_coderepo: use psgi_yield Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 17/26] qspawn: use WwwStatic for fallbacks and error code Eric Wong
` (9 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Now that psgi_yield is used everywhere, the more complex
psgi_return and its helper bits can be removed.  We'll also fix
some outdated comments now that everything that used psgi_return has
switched to psgi_yield. GetlineResponse replaces GetlineBody
and does a better job of isolating generic PSGI-only code.
---
MANIFEST | 2 -
lib/PublicInbox/GetlineBody.pm | 46 ------------
lib/PublicInbox/GitHTTPBackend.pm | 6 +-
lib/PublicInbox/GzipFilter.pm | 2 +-
lib/PublicInbox/HTTPD.pm | 5 +-
lib/PublicInbox/HTTPD/Async.pm | 101 -------------------------
lib/PublicInbox/Qspawn.pm | 121 +-----------------------------
lib/PublicInbox/RepoAtom.pm | 2 +-
lib/PublicInbox/WwwCoderepo.pm | 2 +-
t/httpd-corner.psgi | 14 ++--
t/httpd-corner.t | 12 +--
11 files changed, 19 insertions(+), 294 deletions(-)
delete mode 100644 lib/PublicInbox/GetlineBody.pm
delete mode 100644 lib/PublicInbox/HTTPD/Async.pm
diff --git a/MANIFEST b/MANIFEST
index 420b40a1..3df48667 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -203,7 +203,6 @@ lib/PublicInbox/Filter/SubjectTag.pm
lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/Gcf2.pm
lib/PublicInbox/Gcf2Client.pm
-lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/GetlineResponse.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/GitAsyncCat.pm
@@ -212,7 +211,6 @@ lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/GzipFilter.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD.pm
-lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/HlMod.pm
lib/PublicInbox/Hval.pm
lib/PublicInbox/IMAP.pm
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
deleted file mode 100644
index 0e781224..00000000
--- a/lib/PublicInbox/GetlineBody.pm
+++ /dev/null
@@ -1,46 +0,0 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Wrap a pipe or file for PSGI streaming response bodies and calls the
-# end callback when the object goes out-of-scope.
-# This depends on rpipe being _blocking_ on getline.
-#
-# This is only used by generic PSGI servers and not public-inbox-httpd
-package PublicInbox::GetlineBody;
-use strict;
-use warnings;
-
-sub new {
- my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
- bless {
- rpipe => $rpipe,
- end => $end,
- end_arg => $end_arg,
- initial_buf => $buf,
- filter => $filter,
- }, $class;
-}
-
-# close should always be called after getline returns undef,
-# but a client aborting a connection can ruin our day; so lets
-# hope our underlying PSGI server does not leak references, here.
-sub DESTROY { $_[0]->close }
-
-sub getline {
- my ($self) = @_;
- my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
- my $buf = delete($self->{initial_buf}) // $rpipe->getline;
- delete($self->{rpipe}) unless defined $buf; # set EOF for next call
- if (my $filter = $self->{filter}) {
- $buf = $filter->translate($buf);
- }
- $buf;
-}
-
-sub close {
- my ($self) = @_;
- my ($end, $end_arg) = delete @$self{qw(end end_arg)};
- $end->($end_arg) if $end;
-}
-
-1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d7e0bced..7228555b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -145,16 +145,12 @@ sub parse_cgi_headers { # {parse_hdr} for Qspawn
}
}
- # fallback to WwwCoderepo if cgit 404s. Duplicating $ctx prevents
- # ->finalize from the current Qspawn from using qspawn.wcb.
- # This makes qspawn skip ->async_pass and causes
- # PublicInbox::HTTPD::Async::event_step to close shortly after
+ # fallback to WwwCoderepo if cgit 404s
if ($code == 404 && $ctx->{www} && !$ctx->{_coderepo_tried}++) {
my $wcb = delete $ctx->{env}->{'qspawn.wcb'};
$ctx->{env}->{'plack.skip-deflater'} = 1; # prevent 2x gzip
$ctx->{env}->{'qspawn.fallback'} = $code;
my $res = $ctx->{www}->coderepo->srv($ctx);
- # for ->psgi_return_init_cb
$ctx->{env}->{'qspawn.wcb'} = $wcb;
$res; # CODE or ARRAY ref
} else {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index d6ecd5ba..fc471ea2 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -93,7 +93,7 @@ sub gone { # what: search/over/mm
undef;
}
-# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+# for GetlineResponse (via Qspawn) when NOT using $env->{'pi-httpd.async'}
# Also used for ->getline callbacks
sub translate {
my $self = shift; # $_[1] => input
diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm
index bae7281b..6a6347d8 100644
--- a/lib/PublicInbox/HTTPD.pm
+++ b/lib/PublicInbox/HTTPD.pm
@@ -9,9 +9,6 @@ use strict;
use Plack::Util ();
use Plack::Builder;
use PublicInbox::HTTP;
-use PublicInbox::HTTPD::Async;
-
-sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) }
# we have a different env for ever listener socket for
# SERVER_NAME, SERVER_PORT and psgi.url_scheme
@@ -45,7 +42,7 @@ sub env_for ($$$) {
# this to limit git-http-backend(1) parallelism.
# We also check for the truthiness of this to
# detect when to use async paths for slow blobs
- 'pi-httpd.async' => \&pi_httpd_async,
+ 'pi-httpd.async' => 1,
'pi-httpd.app' => $self->{app},
'pi-httpd.warn_cb' => $self->{warn_cb},
}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
deleted file mode 100644
index 2e4d8baa..00000000
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# XXX This is a totally unstable API for public-inbox internal use only
-# This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
-# The name of this key is not even stable!
-# Currently intended for use with read-only pipes with expensive
-# processes such as git-http-backend(1), cgit(1)
-#
-# fields:
-# http: PublicInbox::HTTP ref
-# fh: PublicInbox::HTTP::{Identity,Chunked} ref (can ->write + ->close)
-# cb: initial read callback
-# arg: arg for {cb}
-# end_obj: CODE or object which responds to ->event_step when ->close is called
-package PublicInbox::HTTPD::Async;
-use v5.12;
-use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN);
-use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIONBF;
-
-# This is called via: $env->{'pi-httpd.async'}->()
-# $io is a read-only pipe ($rpipe) for now, but may be a
-# bidirectional socket in the future.
-sub new {
- my ($class, $io, $cb, $arg, $end_obj) = @_;
- my $self = bless {
- cb => $cb, # initial read callback
- arg => $arg, # arg for $cb
- end_obj => $end_obj, # like END{}, can ->event_step
- }, $class;
- PublicInbox::ProcessIONBF->replace($io);
- $self->SUPER::new($io, EPOLLIN);
-}
-
-sub event_step {
- my ($self) = @_;
- if (defined $self->{cb}) {
- # this may call async_pass when headers are done
- $self->{cb}->($self->{arg});
- } elsif (my $sock = $self->{sock}) {
- # $http may be undef if discarding body output from cgit on 404
- my $http = $self->{http} or return $self->close;
- # $self->{sock} is a read pipe for git-http-backend or cgit
- # and 65536 is the default Linux pipe size
- my $r = sysread($sock, my $buf, 65536);
- if ($r) {
- $self->{ofh}->write($buf); # may call $http->close
- # let other clients get some work done, too
- return if $http->{sock}; # !closed
-
- # else: fall through to close below...
- } elsif (!defined $r && $! == EAGAIN) {
- return; # EPOLLIN means we'll be notified
- }
-
- # Done! Error handling will happen in $self->{ofh}->close
- # called by end_obj->event_step handler
- delete $http->{forward};
- $self->close; # queues end_obj->event_step to be called
- } # else { # we may've been requeued but closed by $http
-}
-
-# once this is called, all data we read is passed to the
-# to the PublicInbox::HTTP instance ($http) via $ofh->write
-# $ofh is typically PublicInbox::HTTP::{Chunked,Identity}, but
-# may be PublicInbox::GzipFilter or $PublicInbox::Qspawn::qx_fh
-sub async_pass {
- my ($self, $http, $ofh, $bref) = @_;
- delete @$self{qw(cb arg)};
- # In case the client HTTP connection ($http) dies, it
- # will automatically close this ($self) object.
- $http->{forward} = $self;
-
- # write anything we overread when we were reading headers.
- # This is typically PublicInbox:HTTP::{chunked,identity}_wcb,
- # but may be PublicInbox::GzipFilter::write. PSGI requires
- # *_wcb methods respond to ->write (and ->close), not ->print
- $ofh->write($$bref);
-
- $self->{http} = $http;
- $self->{ofh} = $ofh;
-}
-
-# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step)
-sub close {
- my $self = $_[0];
- $self->SUPER::close; # DS::close
- delete @$self{qw(cb arg)};
-
- # we defer this to the next timer loop since close is deferred
- if (my $end_obj = delete $self->{end_obj}) {
- # this calls $end_obj->event_step
- # (likely PublicInbox::Qspawn::event_step,
- # NOT PublicInbox::HTTPD::Async::event_step)
- PublicInbox::DS::requeue($end_obj);
- }
-}
-
-1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 203d8f41..a6e1d58b 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -176,48 +176,6 @@ sub psgi_qx {
start($self, $limiter, undef);
}
-# this is called on pipe EOF to reap the process, may be called
-# via PublicInbox::DS event loop OR via GetlineBody for generic
-# PSGI servers.
-sub event_step {
- my ($self) = @_;
- finish($self);
- my $fh = delete $self->{qfh};
- $fh->close if $fh; # async-only (psgi_return)
-}
-
-sub rd_hdr ($) {
- my ($self) = @_;
- # typically used for reading CGI headers
- # We also need to check EINTR for generic PSGI servers.
- my ($ret, $total_rd);
- my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
- until (defined($ret)) {
- my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
- if (defined($r)) {
- $total_rd += $r;
- eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
- if ($@) {
- warn "parse_hdr: $@";
- $ret = [ 500, [], [ "Internal error\n" ] ];
- } elsif (!defined($ret) && !$r) {
- warn <<EOM;
-EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- } else {
- # caller should notify us when it's ready:
- return if $! == EAGAIN;
- next if $! == EINTR; # immediate retry
- warn "error reading header: $!";
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- }
- delete $self->{parse_hdr}; # done parsing headers
- $ret;
-}
-
sub yield_pass {
my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
my $env = $self->{psgi_env};
@@ -251,62 +209,6 @@ sub yield_pass {
$self->{qfh} = $qfh; # keep $ipipe open
}
-sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my $r = rd_hdr($self) or return; # incomplete
- my $env = $self->{psgi_env};
- my $filter;
-
- # this is for RepoAtom since that can fire after parse_cgi_headers
- if (ref($r) eq 'ARRAY' && blessed($r->[2]) && $r->[2]->can('attach')) {
- $filter = pop @$r;
- }
- $filter //= delete($env->{'qspawn.filter'}) // (ref($r) eq 'ARRAY' ?
- PublicInbox::GzipFilter::qsp_maybe($r->[1], $env) : undef);
-
- my $wcb = delete $env->{'qspawn.wcb'};
- my $async = delete $self->{async}; # PublicInbox::HTTPD::Async
- if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
- if ($async) { # calls rpipe->close && ->event_step
- $async->close; # PublicInbox::HTTPD::Async::close
- } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE
- delete($self->{rpipe})->close;
- event_step($self);
- }
- if (ref($r) eq 'ARRAY') { # error
- $wcb->($r)
- } elsif (ref($r) eq 'CODE') { # chain another command
- $r->($wcb);
- $self->{passed} = 1;
- }
- # else do nothing
- } elsif ($async) {
- # done reading headers, handoff to read body
- my $fh = $wcb->($r); # scalar @$r == 2
- $fh = $filter->attach($fh) if $filter;
- $self->{qfh} = $fh;
- $async->async_pass($env->{'psgix.io'}, $fh,
- delete($self->{hdr_buf}));
- } else { # for synchronous PSGI servers
- require PublicInbox::GetlineBody;
- my $buf = delete $self->{hdr_buf};
- $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
- \&event_step, $self, $$buf, $filter);
- $wcb->($r);
- }
-}
-
-sub psgi_return_start { # may run later, much later...
- my ($self) = @_;
- if (my $cb = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
- $self->{async} = $cb->($self->{rpipe},
- \&psgi_return_init_cb, $self, $self);
- } else { # generic PSGI
- psgi_return_init_cb($self) while $self->{parse_hdr};
- }
-}
-
sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
sub parse_hdr_done ($$) {
@@ -363,7 +265,7 @@ sub _yield_start { # may run later, much later...
# $env->{'qspawn.wcb'} - the write callback from the PSGI server
# optional, use this if you've already
# captured it elsewhere. If not given,
-# psgi_return will return an anonymous
+# psgi_yield will return an anonymous
# sub for the PSGI server to call
#
# $env->{'qspawn.filter'} - filter object, responds to ->attach for
@@ -379,27 +281,6 @@ sub _yield_start { # may run later, much later...
# body will be streamed, later, via writes (push-based) to
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
-sub psgi_return {
- my ($self, $env, $limiter, @parse_hdr_arg)= @_;
- $self->{psgi_env} = $env;
- $self->{hdr_buf} = \(my $hdr_buf = '');
- $self->{parse_hdr} = \@parse_hdr_arg;
- $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-
- # the caller already captured the PSGI write callback from
- # the PSGI server, so we can call ->start, here:
- $env->{'qspawn.wcb'} and
- return start($self, $limiter, \&psgi_return_start);
-
- # the caller will return this sub to the PSGI server, so
- # it can set the response callback (that is, for
- # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
- # but other HTTP servers are supported:
- sub {
- $env->{'qspawn.wcb'} = $_[0];
- start($self, $limiter, \&psgi_return_start);
- }
-}
sub psgi_yield {
my ($self, $env, $limiter, @parse_hdr_arg)= @_;
diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm
index b7179511..c1649d0a 100644
--- a/lib/PublicInbox/RepoAtom.pm
+++ b/lib/PublicInbox/RepoAtom.pm
@@ -40,7 +40,7 @@ EOM
# called by GzipFilter->close
sub zflush { $_[0]->SUPER::zflush('</feed>') }
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
sub translate {
my $self = shift;
my $rec = $_[0] // return $self->zflush; # getline
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index 6e19fc02..0eb4a2d6 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -230,7 +230,7 @@ sub summary ($$) {
# called by GzipFilter->close after translate
sub zflush { $_[0]->SUPER::zflush('</pre>', $_[0]->_html_end) }
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
sub translate {
my $ctx = shift;
my $rec = $_[0] // return zflush($ctx); # getline
diff --git a/t/httpd-corner.psgi b/t/httpd-corner.psgi
index 1e96d7b1..e29fd87b 100644
--- a/t/httpd-corner.psgi
+++ b/t/httpd-corner.psgi
@@ -92,34 +92,34 @@ my $app = sub {
my $rdr = { 2 => fileno($null) };
my $cmd = [qw(dd if=/dev/zero count=30 bs=1024k)];
my $qsp = PublicInbox::Qspawn->new($cmd, undef, $rdr);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
my ($r, $bref) = @_;
# make $rd_hdr retry sysread + $parse_hdr in Qspawn:
return until length($$bref) > 8000;
close $null;
[ 200, [ qw(Content-Type application/octet-stream) ]];
});
- } elsif ($path eq '/psgi-return-gzip') {
+ } elsif ($path eq '/psgi-yield-gzip') {
require PublicInbox::Qspawn;
require PublicInbox::GzipFilter;
my $cmd = [qw(echo hello world)];
my $qsp = PublicInbox::Qspawn->new($cmd);
$env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[ 200, [ qw(Content-Type application/octet-stream)]]
});
- } elsif ($path eq '/psgi-return-compressible') {
+ } elsif ($path eq '/psgi-yield-compressible') {
require PublicInbox::Qspawn;
my $cmd = [qw(echo goodbye world)];
my $qsp = PublicInbox::Qspawn->new($cmd);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[200, [qw(Content-Type text/plain)]]
});
- } elsif ($path eq '/psgi-return-enoent') {
+ } elsif ($path eq '/psgi-yield-enoent') {
require PublicInbox::Qspawn;
my $cmd = [ 'this-better-not-exist-in-PATH'.rand ];
my $qsp = PublicInbox::Qspawn->new($cmd);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[ 200, [ qw(Content-Type application/octet-stream)]]
});
} elsif ($path eq '/pid') {
diff --git a/t/httpd-corner.t b/t/httpd-corner.t
index aab3635c..2d2d1061 100644
--- a/t/httpd-corner.t
+++ b/t/httpd-corner.t
@@ -374,13 +374,13 @@ SKIP: {
is($non_zero, 0, 'read all zeros');
require_mods(@zmods, 4);
- my $buf = xqx([$curl, '-gsS', "$base/psgi-return-gzip"]);
+ my $buf = xqx([$curl, '-gsS', "$base/psgi-yield-gzip"]);
is($?, 0, 'curl succesful');
IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
is($out, "hello world\n");
my $curl_rdr = { 2 => \(my $curl_err = '') };
$buf = xqx([$curl, qw(-gsSv --compressed),
- "$base/psgi-return-compressible"], undef, $curl_rdr);
+ "$base/psgi-yield-compressible"], undef, $curl_rdr);
is($?, 0, 'curl --compressed successful');
is($buf, "goodbye world\n", 'gzipped response as expected');
like($curl_err, qr/\bContent-Encoding: gzip\b/,
@@ -388,8 +388,8 @@ SKIP: {
}
{
- my $conn = conn_for($sock, 'psgi_return ENOENT');
- print $conn "GET /psgi-return-enoent HTTP/1.1\r\n\r\n" or die;
+ my $conn = conn_for($sock, 'psgi_yield ENOENT');
+ print $conn "GET /psgi-yield-enoent HTTP/1.1\r\n\r\n" or die;
my $buf = '';
sysread($conn, $buf, 16384, length($buf)) until $buf =~ /\r\n\r\n/;
like($buf, qr!HTTP/1\.[01] 500\b!, 'got 500 error on ENOENT');
@@ -678,13 +678,13 @@ SKIP: {
my $app = require $psgi;
test_psgi($app, sub {
my ($cb) = @_;
- my $req = GET('http://example.com/psgi-return-gzip');
+ my $req = GET('http://example.com/psgi-yield-gzip');
my $res = $cb->($req);
my $buf = $res->content;
IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
is($out, "hello world\n", 'got expected output');
- $req = GET('http://example.com/psgi-return-enoent');
+ $req = GET('http://example.com/psgi-yield-enoent');
$res = $cb->($req);
is($res->code, 500, 'got error on ENOENT');
seek($tmperr, 0, SEEK_SET) or die;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 17/26] qspawn: use WwwStatic for fallbacks and error code
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (15 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 16/26] drop psgi_return, httpd/async and GetlineBody Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 18/26] qspawn: simplify internal argument passing Eric Wong
` (8 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This ensures we set directives to disable caching since
errors are always transient.
---
lib/PublicInbox/Qspawn.pm | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a6e1d58b..0bb02081 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -74,6 +74,11 @@ sub _do_spawn {
finish($self, $@) if $@;
}
+sub psgi_status_err { # Qspawn itself is useful w/o PSGI
+ require PublicInbox::WwwStatic;
+ PublicInbox::WwwStatic::r($_[0] // 500);
+}
+
sub finalize ($;$) {
my ($self, $opt) = @_;
@@ -104,9 +109,7 @@ sub finalize ($;$) {
return if $self->{passed}; # another command chained it
if (my $wcb = delete $env->{'qspawn.wcb'}) {
# have we started writing, yet?
- my $code = delete $env->{'qspawn.fallback'} // 500;
- require PublicInbox::WwwStatic;
- $wcb->(PublicInbox::WwwStatic::r($code));
+ $wcb->(psgi_status_err($env->{'qspawn.fallback'}));
}
}
@@ -209,8 +212,6 @@ sub yield_pass {
$self->{qfh} = $qfh; # keep $ipipe open
}
-sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
-
sub parse_hdr_done ($$) {
my ($self) = @_;
my $ret;
@@ -220,18 +221,18 @@ sub parse_hdr_done ($$) {
$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
if ($@) {
carp "parse_hdr (@{$self->{cmd}}): $@\n";
- $ret = r500();
+ $ret = psgi_status_err();
} elsif (!$ret && $_[-1] eq '') {
carp <<EOM;
EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
EOM
- $ret = r500();
+ $ret = psgi_status_err();
}
} else {
carp <<EOM;
E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
EOM
- $ret = r500();
+ $ret = psgi_status_err();
}
$ret; # undef if headers incomplete
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 18/26] qspawn: simplify internal argument passing
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (16 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 17/26] qspawn: use WwwStatic for fallbacks and error code Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 19/26] cidx_log_p: don't bother with F_SETPIPE_SZ Eric Wong
` (7 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Now that psgi_return is gone, we can further simplify our
internals to support only psgi_qx and psgi_yield. Internal
argument passing is reduced and we keep the command env and
redirects in the Qspawn object for as long as it's alive.
I wanted to get rid of finalize() entirely, but it seems
trickier to do when having to support generic PSGI.
---
lib/PublicInbox/Qspawn.pm | 50 +++++++++++++++++++--------------------
1 file changed, 24 insertions(+), 26 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0bb02081..a03e1b01 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -47,28 +47,27 @@ my $def_limiter;
# {qsp_err} is an optional error buffer callers may access themselves
sub new {
my ($class, $cmd, $cmd_env, $opt) = @_;
- bless { args => [ $cmd, $cmd_env, $opt ] }, $class;
+ bless { args => [ $cmd, $cmd_env, $opt ? { %$opt } : {} ] }, $class;
}
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
+ my ($cmd, $cmd_env, $opt) = @{$self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
for my $k (@PublicInbox::Spawn::RLIMITS) {
- $o{$k} = $limiter->{$k} // next;
+ $opt->{$k} = $limiter->{$k} // next;
}
- $self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
$limiter->{running}++;
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self, \%o);
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
+ \&waitpid_err, $self);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
- eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
warn "E: $@" if $@;
}
finish($self, $@) if $@;
@@ -79,8 +78,8 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI
PublicInbox::WwwStatic::r($_[0] // 500);
}
-sub finalize ($;$) {
- my ($self, $opt) = @_;
+sub finalize ($) {
+ my ($self) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
@@ -96,13 +95,13 @@ sub finalize ($;$) {
if (my $dst = $self->{qsp_err}) {
$$dst .= $$dst ? " $err" : "; $err";
}
- warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
+ warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
}
my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
if ($qx_cb_arg) {
my $cb = shift @$qx_cb_arg;
- eval { $cb->($opt->{1}, @$qx_cb_arg) };
+ eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
@@ -113,23 +112,21 @@ sub finalize ($;$) {
}
}
-sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
-
sub waitpid_err { # callback for awaitpid
- my (undef, $self, $opt) = @_; # $_[0]: pid
+ my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) { # FIXME: redundant
+ if ($?) { # XXX this may be redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self, $opt) if !$self->{rpipe};
+ finalize($self) if !$self->{rpipe};
}
sub wait_await { # run_await cb
my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
- waitpid_err($pid, $self, $opt);
+ waitpid_err($pid, $self);
}
sub yield_chunk { # $_[-1] is sysread buffer (or undef)
@@ -214,26 +211,24 @@ sub yield_pass {
sub parse_hdr_done ($$) {
my ($self) = @_;
- my $ret;
+ my ($ret, $err);
if (defined $_[-1]) {
my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
$$bref .= $_[-1];
$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
- if ($@) {
- carp "parse_hdr (@{$self->{cmd}}): $@\n";
+ if (($err = $@)) {
$ret = psgi_status_err();
} elsif (!$ret && $_[-1] eq '') {
- carp <<EOM;
-EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
+ $err = 'EOF';
$ret = psgi_status_err();
}
} else {
- carp <<EOM;
-E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
+ $err = "$!";
$ret = psgi_status_err();
}
+ carp <<EOM if $err;
+E: $err @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
+EOM
$ret; # undef if headers incomplete
}
@@ -301,4 +296,7 @@ sub psgi_yield {
}
}
+no warnings 'once';
+*DESTROY = \&finalize; # ->finalize is idempotent
+
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 19/26] cidx_log_p: don't bother with F_SETPIPE_SZ
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (17 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 18/26] qspawn: simplify internal argument passing Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 20/26] cindex: avoid awaitpid for popen Eric Wong
` (6 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
It doesn't help here since the bottleneck is Xapian indexing
(and not `git log -p'). This fcntl call was also in the way
of switching to ProcessIO.
---
lib/PublicInbox/CidxLogP.pm | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
index 34f7201d..ac4c1b37 100644
--- a/lib/PublicInbox/CidxLogP.pm
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -10,12 +10,11 @@
package PublicInbox::CidxLogP;
use v5.12;
use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
sub new {
my ($cls, $rd, $cidx, $git, $roots) = @_;
my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
- fcntl($rd, $F_SETPIPE_SZ, 1048576) if $F_SETPIPE_SZ;
$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 20/26] cindex: avoid awaitpid for popen
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (18 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 19/26] cidx_log_p: don't bother with F_SETPIPE_SZ Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 21/26] cindex: use timer for inits Eric Wong
` (5 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
We can pass the reap callback and its arguments directly to
popen_rd instead of calling awaitpid ourselves. This is another
step which may allow us to get rid of the wantarray forms of
popen_rd/popen_wr in the future.
---
lib/PublicInbox/CodeSearchIdx.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index c8e4c591..122bd4d4 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -266,9 +266,9 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
sysseek($in, 0, SEEK_SET);
- my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+ my $rd = popen_rd($git->cmd(@LOG_STDIN), undef, { 0 => $in },
+ \&cidx_reap_log, $self, $op_p);
close $in;
- awaitpid($pid, \&cidx_reap_log, $self, $op_p);
PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
# CidxLogP->event_step will call cidx_read_log_p once there's input
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 21/26] cindex: use timer for inits
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (19 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 20/26] cindex: avoid awaitpid for popen Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 22/26] cindex: start using run_await to simplify code Eric Wong
` (4 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
We'll need to be in the event loop to use run_await in parallel,
so we can't start processes outside of it. This change isn't
ideal, but it likely keeps the rest of our (hotter) code simpler.
---
lib/PublicInbox/CodeSearchIdx.pm | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 122bd4d4..b5ba03ea 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -1105,6 +1105,15 @@ sub show_roots { # for diagnostics
}
}
+sub do_inits { # called via PublicInbox::DS::add_timer
+ my ($self) = @_;
+ init_prune($self);
+ init_associate_postfork($self);
+ scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+ index_next($self) for (1..$max);
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
@@ -1183,18 +1192,14 @@ sub cidx_run { # main entry point
local $NPROC = PublicInbox::IPC::detect_nproc();
local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
- init_prune($self);
- init_associate_postfork($self);
- scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
- my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
- index_next($self) for (1..$max);
+ PublicInbox::DS::add_timer(0, \&do_inits, $self);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
PublicInbox::DS::enqueue_reap();
local @PublicInbox::DS::post_loop_do = (\&shards_active);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
show_roots($self) if $self->{-opt}->{'show-roots'} # for diagnostics
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 22/26] cindex: start using run_await to simplify code
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (20 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 21/26] cindex: use timer for inits Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 23/26] cindex: use run_await to read extensions.objectFormat Eric Wong
` (3 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This saves us some awaitpid calls. We can also start passing
hashref redirect elements directly to pipe and open perlops,
saving us the trouble of naming some variables.
---
lib/PublicInbox/CodeSearchIdx.pm | 80 ++++++++++++++------------------
1 file changed, 35 insertions(+), 45 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index b5ba03ea..c1ab569c 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,6 +55,7 @@ use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::Aspawn qw(run_await);
use Carp ();
use autodie qw(pipe open seek sysseek send);
our (
@@ -526,16 +527,15 @@ sub dump_roots_start {
for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
close $fh;
# dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
- pipe(my $sort_r, my $sort_w);
- pipe(my $fold_r, my $fold_w);
+ my ($sort_opt, $fold_opt);
+ pipe($sort_opt->{0}, my $sort_w);
+ pipe($fold_opt->{0}, $sort_opt->{1});
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
- open $fh, '>', $dst;
- my $env = { %$CMD_ENV, OFS => ' ' };
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
- my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
- awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
- awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
+ open $fold_opt->{1}, '>', $dst;
+ my $fold_env = { %$CMD_ENV, OFS => ' ' };
+ run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate);
+ run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $associate);
my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
'-m', assoc_max_init($self), $root2id, $QRY_STR);
for my $d ($self->shard_dirs) {
@@ -565,16 +565,14 @@ EOM
sub dump_ibx_start {
my ($self, $associate) = @_;
- pipe(my $sort_r, $DUMP_IBX_WPIPE);
- pipe(my $fold_r, my $fold_w);
+ my ($sort_opt, $fold_opt);
+ pipe($sort_opt->{0}, $DUMP_IBX_WPIPE);
+ pipe($fold_opt->{0}, $sort_opt->{1});
my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
# pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
- my $dst = "$TMPDIR/to_ibx_id";
- open my $fh, '>', $dst;
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
- my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
- awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
- awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(ibx)'], $associate);
+ open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_id";
+ run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate);
+ run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $associate);
}
sub index_next ($) {
@@ -872,8 +870,8 @@ sub prep_alternate_start {
awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
}
-sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
- my ($pid, $cmd, $run_on_destroy) = @_;
+sub cmd_done { # run_await cb for sort, xapian-delve, sed failures
+ my ($pid, $cmd, undef, undef, $run_on_destroy) = @_;
$? and die "@$cmd failed: \$?=$?";
# $run_on_destroy calls associate() or run_prune()
}
@@ -962,32 +960,28 @@ sub init_prune ($) {
comm => \@COMM, awk => \@AWK);
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
- pipe(my $sed_in, my $delve_out);
- pipe(my $sort_in, my $sed_out);
- my @sort_u = (@SORT, '-u');
- open(my $sort_out, '+>', "$TMPDIR/indexed_commits");
- my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
- $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
- awaitpid($pid, \&cmd_done, \@sed, $run_prune);
- $pid = spawn(\@delve, undef, { 1 => $delve_out });
- awaitpid($pid, \&cmd_done, \@delve, $run_prune);
+ my ($sort_opt, $sed_opt, $delve_opt);
+ pipe($sed_opt->{0}, $delve_opt->{1});
+ pipe($sort_opt->{0}, $sed_opt->{1});
+ open($sort_opt->{1}, '+>', "$TMPDIR/indexed_commits");
+ run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done, $run_prune);
+ run_await(\@sed, $CMD_ENV, $sed_opt, \&cmd_done, $run_prune);
+ run_await(\@delve, undef, $delve_opt, \&cmd_done, $run_prune);
@PRUNE_QUEUE = @{$self->{git_dirs}};
for (1..$LIVE_JOBS) {
prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
}
}
-sub dump_git_commits { # awaitpid cb
- my ($pid, $batch_out) = @_;
+sub dump_git_commits { # run_await cb
+ my ($pid, undef, undef, $batch_opt) = @_;
(defined($pid) && $?) and die "E: @PRUNE_BATCH: \$?=$?";
return if $DO_QUIT;
my ($hexlen) = keys(%ALT_FH) or return; # done
close(delete $ALT_FH{$hexlen});
$PRUNE_BATCH[1] = "--git-dir=$TMPDIR/hexlen$hexlen.git";
- $pid = spawn(\@PRUNE_BATCH, undef, { 1 => $batch_out });
- awaitpid($pid, \&dump_git_commits, $batch_out);
+ run_await(\@PRUNE_BATCH, undef, $batch_opt, \&dump_git_commits);
}
sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
@@ -999,18 +993,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
# git --git-dir=hexlen64.git cat-file \
# --batch-all-objects --batch-check
# ) | awk | sort | comm | cidx_read_comm()
- pipe(my $awk_in, my $batch_out);
- pipe(my $sort_in, my $awk_out);
- pipe(my $comm_in, my $sort_out);
- my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
- my @sort_u = (@SORT, '-u');
- my $sort_pid = spawn(\@sort_u, $CMD_ENV,
- { 0 => $sort_in, 1 => $sort_out });
- my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
- { 0 => $comm_in, -C => "$TMPDIR" });
- awaitpid($awk_pid, \&cmd_done, \@AWK);
- awaitpid($sort_pid, \&cmd_done, \@sort_u);
- awaitpid($comm_pid, \&cmd_done, \@COMM);
+ my ($awk_opt, $sort_opt, $batch_opt);
+ my $comm_opt = { -C => "$TMPDIR" };
+ pipe($awk_opt->{0}, $batch_opt->{1});
+ pipe($sort_opt->{0}, $awk_opt->{1});
+ pipe($comm_opt->{0}, $sort_opt->{1});
+ run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done);
+ run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done);
+ my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -1023,7 +1013,7 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
warn(sprintf(<<EOM, $git_ver)) if $git_ver lt v2.19;
W: git v2.19+ recommended for high-latency storage (have git v%vd)
EOM
- dump_git_commits(undef, $batch_out);
+ dump_git_commits(undef, undef, undef, $batch_opt);
}
sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 23/26] cindex: use run_await to read extensions.objectFormat
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (21 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 22/26] cindex: start using run_await to simplify code Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 24/26] cindex: drop XH_PID global Eric Wong
` (2 subsequent siblings)
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
This saves us the trouble of seeking ourselves by using existing
run_await functionality. We're also more robust since we now
only handle the result if the `git config' process exited without
a signal.
---
lib/PublicInbox/CodeSearchIdx.pm | 23 +++++++++--------------
1 file changed, 9 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index c1ab569c..68b47d02 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -829,20 +829,18 @@ sub prep_umask ($) {
}
}
-sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
- my ($pid, $objdir, $out, $run_prune) = @_;
- my $status = $? >> 8;
+sub prep_alternate_end { # run_await cb for config extensions.objectFormat
+ my ($pid, $cmd, undef, $opt, $objdir, $run_prune) = @_;
+ my ($status, $sig) = ($? >> 8, $? & 127);
my $next_dir = shift(@PRUNE_QUEUE);
prep_alternate_start($next_dir, $run_prune) if defined($next_dir);
my $fmt;
- if ($status == 1) { # unset, default is '' (SHA-1)
+ if (!$sig && $status == 1) { # unset, default is '' (SHA-1)
$fmt = 'sha1';
- } elsif ($status == 0) {
- seek($out, 0, SEEK_SET);
- chomp($fmt = <$out> // 'sha1');
- } else {
- return warn("git config \$?=$? for objdir=$objdir");
+ } elsif (!$sig && $status == 0) {
+ chomp($fmt = ${$opt->{1}} || 'sha1');
}
+ $fmt // return warn("git config \$?=$? for objdir=$objdir");
my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <<EOM;
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
@@ -850,8 +848,7 @@ EOM
require PublicInbox::Import;
my $git_dir = "$TMPDIR/hexlen$hexlen.git";
PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
- my $f = "$git_dir/objects/info/alternates";
- open $ALT_FH{$hexlen}, '>', $f;
+ open $ALT_FH{$hexlen}, '>', "$git_dir/objects/info/alternates";
}
say { $ALT_FH{$hexlen} } $objdir or die "say: $!";
}
@@ -865,9 +862,7 @@ sub prep_alternate_start {
}
my $cmd = [ 'git', "--git-dir=$git_dir",
qw(config extensions.objectFormat) ];
- open my $out, '+>', undef;
- my $pid = spawn($cmd, undef, { 1 => $out });
- awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
+ run_await($cmd, undef, undef, \&prep_alternate_end, $o, $run_prune);
}
sub cmd_done { # run_await cb for sort, xapian-delve, sed failures
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 24/26] cindex: drop XH_PID global
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (22 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 23/26] cindex: use run_await to read extensions.objectFormat Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 25/26] cindex: use run_await wrapper for git commands Eric Wong
2023-10-25 0:29 ` [PATCH 26/26] cindex: use sysread for generating fingerprint Eric Wong
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
We only need the PID locally to call awaitpid.
---
lib/PublicInbox/CodeSearchIdx.pm | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 68b47d02..2356164b 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -74,7 +74,6 @@ our (
$PRUNE_DONE, # marks off prune completions
$NCHANGE, # current number of changes
$NPROC,
- $XH_PID, # XapHelper PID
$XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
@@ -514,8 +513,8 @@ sub assoc_max_init ($) {
sub dump_roots_start {
my ($self, $associate) = @_;
- ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
- awaitpid($XH_PID, \&cmd_done, ['xap_helper', "-j$NPROC"]);
+ ($XHC, my $pid) = PublicInbox::XapClient::start_helper("-j$NPROC");
+ awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]);
$associate // die 'BUG: no $associate';
$TODO{associating} = 1; # keep shards_active() happy
progress($self, 'dumping IDs from coderepos');
@@ -1113,7 +1112,7 @@ sub cidx_run { # main entry point
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, $XH_PID, $XHC, @SORT);
+ @ID2ROOT, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 25/26] cindex: use run_await wrapper for git commands
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (23 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 24/26] cindex: drop XH_PID global Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 0:29 ` [PATCH 26/26] cindex: use sysread for generating fingerprint Eric Wong
25 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
Instead of keeping track of live processes ourselves in a hash
table, we'll rely on OnDestroy here to notify us of git command
completions.
---
lib/PublicInbox/CodeSearchIdx.pm | 101 +++++++++++++++----------------
1 file changed, 48 insertions(+), 53 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 2356164b..e17cba39 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -58,14 +58,14 @@ use PublicInbox::Compat qw(uniqstr);
use PublicInbox::Aspawn qw(run_await);
use Carp ();
use autodie qw(pipe open seek sysseek send);
+our $DO_QUIT = 15; # signal number
our (
- $LIVE, # pid => cmd
$LIVE_JOBS, # integer
+ $GITS_NR, # number of coderepos
$MY_SIG, # like %SIG
$SIGSET,
$TXN_BYTES, # number of bytes in current shard transaction
$BATCH_BYTES,
- $DO_QUIT, # signal number
@RDONLY_XDB, # Xapian::Database
@IDX_SHARDS, # clones of self
$MAX_SIZE,
@@ -359,39 +359,31 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
@ids;
}
-sub cidx_await_cb { # awaitpid cb
- my ($pid, $cb, $self, $git, @args) = @_;
- return if !$LIVE || $DO_QUIT;
- my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
- PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
- if ($?) {
- $git->{-cidx_err} = 1;
- return warn("@$cmd error: \$?=$?\n");
- }
- $cb->($self, $git, @args);
+sub _cb { # run_await cb
+ my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_;
+ return if $DO_QUIT;
+ ($git->{-cidx_err} = $?) ? warn("@$cmd error: \$?=$?\n") :
+ $cb->($opt, $self, $git, @arg);
}
-sub cidx_await ($$$$$@) {
- my ($pid, $cmd, $cb, $self, $git, @args) = @_;
- $LIVE->{$pid} = $cmd;
- awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
+sub run_git {
+ my ($cmd, $opt, $cb, $self, $git, @arg) = @_;
+ run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg)
}
# this is different from the grokmirror-compatible fingerprint since we
# only care about --heads (branches) and --tags, and not even their names
sub fp_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
open my $refs, '+>', undef;
- my $cmd = ['git', "--git-dir=$git->{git_dir}",
- qw(show-ref --heads --tags --hash)];
- my $pid = spawn($cmd, undef, { 1 => $refs });
$git->{-repo}->{refs} = $refs;
- cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
+ run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
+ \&fp_fini, $self, $git, $prep_repo);
}
-sub fp_fini { # cidx_await cb
- my ($self, $git, $prep_repo) = @_;
+sub fp_fini { # run_git cb
+ my (undef, $self, $git, $prep_repo) = @_;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
seek($refs, 0, SEEK_SET);
my $buf;
@@ -402,26 +394,23 @@ sub fp_fini { # cidx_await cb
sub ct_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE || $DO_QUIT;
- my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
- qw[for-each-ref --sort=-committerdate
+ return if $DO_QUIT;
+ run_git([ qw[for-each-ref --sort=-committerdate
--format=%(committerdate:raw) --count=1
- refs/heads/ refs/tags/] ];
- my ($rd, $pid) = popen_rd($cmd);
- cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
+ refs/heads/ refs/tags/] ], undef, # capture like qx
+ \&ct_fini, $self, $git, $prep_repo);
}
-sub ct_fini { # cidx_await cb
- my ($self, $git, $rd, $prep_repo) = @_;
- defined(my $ct = <$rd>) or return;
- $ct =~ s/\s+.*\z//s; # drop TZ + LF
+sub ct_fini { # run_git cb
+ my ($opt, $self, $git, $prep_repo) = @_;
+ my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
$git->{-repo}->{ct} = $ct + 0;
}
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
sub prep_repo ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
return index_next($self) if $git->{-cidx_err};
my $repo = $git->{-repo} // die 'BUG: no {-repo}';
if (!defined($repo->{ct})) {
@@ -578,9 +567,9 @@ sub index_next ($) {
my ($self) = @_;
return if $DO_QUIT;
if ($IDX_TODO && @$IDX_TODO) {
- index_repo($self, shift @$IDX_TODO);
+ index_repo(undef, $self, shift @$IDX_TODO);
} elsif ($GIT_TODO && @$GIT_TODO) {
- my $git = PublicInbox::Git->new(shift @$GIT_TODO);
+ my $git = shift @$GIT_TODO;
my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
$self, $git);
fp_start($self, $git, $prep_repo);
@@ -589,7 +578,8 @@ sub index_next ($) {
return if delete($TODO{dump_roots_start});
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
- undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
+ undef $DUMP_IBX_WPIPE; # done dumping inboxes
+ undef $XHC;
delete $TODO{associate};
}
# else: wait for shards_active (post_loop_do) callback
@@ -630,8 +620,8 @@ sub commit_shard { # OnDestroy cb
# shard_done fires when all shards are committed
}
-sub index_repo { # cidx_await cb
- my ($self, $git) = @_;
+sub index_repo { # run_git cb
+ my (undef, $self, $git) = @_;
return if $DO_QUIT;
return index_next($self) if $git->{-cidx_err};
return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
@@ -649,6 +639,7 @@ sub index_repo { # cidx_await cb
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
$repo->{git_dir} = $git->{git_dir};
my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
+ delete $git->{-cidx_gits_fini}; # may fire gits_fini
my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
$repo_ctx);
my ($c, $p) = PublicInbox::PktOp->pair;
@@ -667,15 +658,13 @@ sub index_repo { # cidx_await cb
sub get_roots ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
open my $roots_fh, '+>', undef;
- my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
- qw(rev-list --stdin --max-parents=0) ];
- my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
$git->{-repo}->{roots_fh} = $roots_fh;
- cidx_await($pid, $cmd, \&index_repo, $self, $git);
+ run_git([ qw(rev-list --stdin --max-parents=0) ],
+ { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git)
}
# for PublicInbox::SearchIdx::patch_id and with_umask
@@ -751,10 +740,18 @@ EOM
@shards;
}
+sub gits_fini {
+ undef $GITS_NR;
+ PublicInbox::DS::enqueue_reap(); # kick @post_loop_do
+}
+
sub scan_git_dirs ($) {
my ($self) = @_;
- my $n = @$GIT_TODO = @{$self->{git_dirs}};
- progress($self, "scanning $n code repositories...");
+ @$GIT_TODO = map { PublicInbox::Git->new($_) } @{$self->{git_dirs}};
+ $GITS_NR = @$GIT_TODO;
+ my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini);
+ $_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO;
+ progress($self, "scanning $GITS_NR code repositories...");
}
sub prune_init { # via wq_io_do in IDX_SHARDS
@@ -786,10 +783,10 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
sub shards_active { # post_loop_do
return if $DO_QUIT;
- return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
+ return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO) != 3;
return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
- return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
- return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
+ return 1 if $GITS_NR || scalar(@$IDX_TODO) || $REPO_CTX;
+ return 1 if @IBXQ || keys(%TODO);
for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
$s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -1105,14 +1102,13 @@ sub cidx_run { # main entry point
POSIX::SIGTSTP, POSIX::SIGCONT);
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
- local $LIVE = {};
local $PRUNE_DONE = [];
local $IDX_TODO = [];
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, $XHC, @SORT);
+ @ID2ROOT, $XHC, @SORT, $GITS_NR);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
@@ -1202,14 +1198,13 @@ sub ipc_atfork_child { # @IDX_SHARDS
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
my $quit_req = delete($shard->{-cidx_quit});
- return if $DO_QUIT || !$LIVE;
+ return if $DO_QUIT;
if ($? == 0) { # success
$quit_req // warn 'BUG: {-cidx_quit} unset';
} else {
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
++$self->{shard_err} if defined($self->{shard_err});
}
- PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
}
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 26/26] cindex: use sysread for generating fingerprint
2023-10-25 0:29 [PATCH 00/26] process management simplifications Eric Wong
` (24 preceding siblings ...)
2023-10-25 0:29 ` [PATCH 25/26] cindex: use run_await wrapper for git commands Eric Wong
@ 2023-10-25 0:29 ` Eric Wong
2023-10-25 6:33 ` [PATCH 27/26] lei_mirror+fetch: don't slurp `git show-ref' output Eric Wong
25 siblings, 1 reply; 28+ messages in thread
From: Eric Wong @ 2023-10-25 0:29 UTC (permalink / raw)
To: meta
We use sysseek for this file handle elsewhere (since it's passed
to `git rev-list --stdin' multiple times), and sysread ensures
we can use a larger read buffer than the tiny 8K BUFSIZ that Perl +
glibc is constrained to.
This also ensures we autodie on sysread failures, since the
autodie import for `read' was missing and we don't call `read'
anywhere else in this file.
---
lib/PublicInbox/CodeSearchIdx.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e17cba39..e31432b9 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -57,7 +57,7 @@ use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
use PublicInbox::Aspawn qw(run_await);
use Carp ();
-use autodie qw(pipe open seek sysseek send);
+use autodie qw(pipe open sysread seek sysseek send);
our $DO_QUIT = 15; # signal number
our (
$LIVE_JOBS, # integer
@@ -385,10 +385,10 @@ sub fp_start ($$$) {
sub fp_fini { # run_git cb
my (undef, $self, $git, $prep_repo) = @_;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
- seek($refs, 0, SEEK_SET);
+ sysseek($refs, 0, SEEK_SET);
my $buf;
my $dig = PublicInbox::SHA->new(256);
- while (read($refs, $buf, 65536)) { $dig->add($buf) }
+ while (sysread($refs, $buf, 65536)) { $dig->add($buf) }
$git->{-repo}->{fp} = $dig->hexdigest;
}
^ permalink raw reply related [flat|nested] 28+ messages in thread
* [PATCH 27/26] lei_mirror+fetch: don't slurp `git show-ref' output
2023-10-25 0:29 ` [PATCH 26/26] cindex: use sysread for generating fingerprint Eric Wong
@ 2023-10-25 6:33 ` Eric Wong
0 siblings, 0 replies; 28+ messages in thread
From: Eric Wong @ 2023-10-25 6:33 UTC (permalink / raw)
To: meta
While uncommon, some git repos have hundreds of thousands of
refs and slurping that output into memory can bloat the heap.
Introduce a sha_all sub in PublicInbox::SHA to loop until EOF
and rely on autodie for checking sysread errors.
---
lib/PublicInbox/CodeSearchIdx.pm | 7 ++-----
lib/PublicInbox/Fetch.pm | 4 ++--
lib/PublicInbox/Git.pm | 6 ++----
lib/PublicInbox/LeiMirror.pm | 14 +++++++-------
lib/PublicInbox/SHA.pm | 11 ++++++++++-
5 files changed, 23 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e31432b9..aeee37c0 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -45,7 +45,7 @@ use POSIX qw(WNOHANG SEEK_SET);
use File::Path ();
use File::Spec ();
use List::Util qw(max);
-use PublicInbox::SHA qw(sha256_hex);
+use PublicInbox::SHA qw(sha256_hex sha_all);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
use PublicInbox::Config qw(glob2re rel2abs_collapsed);
@@ -386,10 +386,7 @@ sub fp_fini { # run_git cb
my (undef, $self, $git, $prep_repo) = @_;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
- my $buf;
- my $dig = PublicInbox::SHA->new(256);
- while (sysread($refs, $buf, 65536)) { $dig->add($buf) }
- $git->{-repo}->{fp} = $dig->hexdigest;
+ $git->{-repo}->{fp} = sha_all(256, $refs)->hexdigest;
}
sub ct_start ($$$) {
diff --git a/lib/PublicInbox/Fetch.pm b/lib/PublicInbox/Fetch.pm
index 6e9b1e94..e41dd448 100644
--- a/lib/PublicInbox/Fetch.pm
+++ b/lib/PublicInbox/Fetch.pm
@@ -10,6 +10,7 @@ use PublicInbox::Admin;
use PublicInbox::LEI;
use PublicInbox::LeiCurl;
use PublicInbox::LeiMirror;
+use PublicInbox::SHA qw(sha_all);
use File::Temp ();
sub new { bless {}, __PACKAGE__ }
@@ -92,9 +93,8 @@ sub do_manifest ($$$) {
sub get_fingerprint2 {
my ($git_dir) = @_;
- require PublicInbox::SHA;
my $rd = popen_rd([qw(git show-ref)], undef, { -C => $git_dir });
- PublicInbox::SHA::sha256(do { local $/; <$rd> });
+ sha_all(256, $rd)->digest; # ignore show-ref errors
}
sub writable_dir ($) {
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 476dcf30..9c26d8bf 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -23,7 +23,7 @@ use PublicInbox::ProcessIONBF;
use PublicInbox::Tmpfile;
use IO::Poll qw(POLLIN);
use Carp qw(croak carp);
-use PublicInbox::SHA ();
+use PublicInbox::SHA qw(sha_all);
our %HEXLEN2SHA = (40 => 1, 64 => 256);
our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN read_all);
@@ -620,10 +620,8 @@ sub manifest_entry {
$ent->{reference} = $buf;
}
}
- my $dig = PublicInbox::SHA->new(1);
- while (CORE::read($sr, $buf, 65536)) { $dig->add($buf) }
+ $ent->{fingerprint} = sha_all(1, $sr)->hexdigest;
CORE::close $sr or return; # empty, uninitialized git repo
- $ent->{fingerprint} = $dig->hexdigest;
$ent->{modified} = modified(undef, $mod);
chomp($buf = <$own> // '');
utf8::decode($buf);
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 47fb767b..43e59e6c 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -19,10 +19,10 @@ use PublicInbox::Inbox;
use PublicInbox::Git qw(read_all);
use PublicInbox::LeiCurl;
use PublicInbox::OnDestroy;
-use PublicInbox::SHA qw(sha256_hex sha1_hex);
+use PublicInbox::SHA qw(sha256_hex sha_all);
use POSIX qw(strftime);
-use autodie qw(chdir chmod close open pipe readlink seek symlink sysopen
- truncate unlink);
+use autodie qw(chdir chmod close open pipe readlink
+ seek symlink sysopen sysseek truncate unlink);
our $LIVE; # pid => callback
our $FGRP_TODO; # objstore -> [[ to resume ], [ to clone ]]
@@ -533,10 +533,10 @@ sub fp_done {
}
return if !keep_going($self);
my $fh = delete $self->{-show_ref} // die 'BUG: no show-ref output';
- seek($fh, SEEK_SET, 0);
+ sysseek($fh, SEEK_SET, 0);
$self->{-ent} // die 'BUG: no -ent';
my $A = $self->{-ent}->{fingerprint} // die 'BUG: no fingerprint';
- my $B = sha1_hex(read_all($fh));
+ my $B = sha_all(1, $fh)->hexdigest;
return $cb->($self, @arg) if $A ne $B;
$self->{lei}->qerr("# $self->{-key} up-to-date");
}
@@ -730,10 +730,10 @@ sub up_fp_done {
my ($self) = @_;
return if !keep_going($self);
my $fh = delete $self->{-show_ref_up} // die 'BUG: no show-ref output';
- seek($fh, SEEK_SET, 0);
+ sysseek($fh, SEEK_SET, 0);
$self->{-ent} // die 'BUG: no -ent';
my $A = $self->{-ent}->{fingerprint} // die 'BUG: no fingerprint';
- my $B = sha1_hex(read_all($fh));
+ my $B = sha_all(1, $fh)->hexdigest;
return if $A eq $B;
$self->{-ent}->{fingerprint} = $B;
push @{$self->{chg}->{fp_mismatch}}, $self->{-key};
diff --git a/lib/PublicInbox/SHA.pm b/lib/PublicInbox/SHA.pm
index 81f62618..3fa8530e 100644
--- a/lib/PublicInbox/SHA.pm
+++ b/lib/PublicInbox/SHA.pm
@@ -12,7 +12,8 @@
package PublicInbox::SHA;
use v5.12;
require Exporter;
-our @EXPORT_OK = qw(sha1_hex sha256_hex sha256);
+our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all);
+use autodie qw(sysread);
our @ISA;
BEGIN {
@@ -55,4 +56,12 @@ EOM
}
} # /BEGIN
+
+sub sha_all ($$) {
+ my ($n, $fh) = @_;
+ my ($dig, $buf) = (PublicInbox::SHA->new($n));
+ while (sysread($fh, $buf, 65536)) { $dig->add($buf) }
+ $dig
+}
+
1;
^ permalink raw reply related [flat|nested] 28+ messages in thread