The DS::in_loop clobbering in MboxReader annoyed me and drove this series. I wanted to do it for the bidirectional pipes with "git cat-file --batch", too, but there seem to be some lifetime management issues which get hard to control with things like the waitpid(..,WNOHANG) call in GitAsyncCat. Maybe switching --batch + DS to use UNIX sockets can be done to save FDs (or I'm too brain-damaged to figure this out). But right now, all of our codebase is robust against children attempting to reap siblings (or PIDs of former siblings). Eric Wong (6): processpipe: allow synchronous close to set $? processpipe: lazy-require PublicInbox::DS for dwaitpid git: qx: waitpid synchronously via ProcessPipe->CLOSE import: switch to using ProcessPipe git: manifest_entry: use ProcessPipe via popen_rd qspawn: switch to ProcessPipe via popen_rd lib/PublicInbox/Git.pm | 20 +++++++++----- lib/PublicInbox/Import.pm | 23 +++++++--------- lib/PublicInbox/MboxReader.pm | 3 --- lib/PublicInbox/ProcessPipe.pm | 49 +++++++++++++++++++++------------- lib/PublicInbox/Qspawn.pm | 15 +++++------ lib/PublicInbox/Spawn.pm | 10 +++---- t/mbox_reader.t | 17 ++++++++++++ t/spawn.t | 38 ++++++++++++++++++++++++++ 8 files changed, 119 insertions(+), 56 deletions(-)
To get rid of the ugly $PublicInbox::DS::in_loop localization in MboxReader, we'll distinguish between ->CLOSE and ->DESTROY with ProcessPipe. If we end up closing via ->DESTROY, we'll assume the caller will want to deal with $? asynchronously via the event loop (or not even care about $?). If we hit ->CLOSE directly, we'll assume the caller called close() and wants to check $? synchronously. Note: wantarray doesn't seem to propagate into tied methods, otherwise I'd be relying on that. --- lib/PublicInbox/MboxReader.pm | 3 --- lib/PublicInbox/ProcessPipe.pm | 45 +++++++++++++++++++++------------- lib/PublicInbox/Spawn.pm | 10 ++++---- t/mbox_reader.t | 17 +++++++++++++ t/spawn.t | 38 ++++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 25 deletions(-) diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm index 1894756d..59ce4fb6 100644 --- a/lib/PublicInbox/MboxReader.pm +++ b/lib/PublicInbox/MboxReader.pm @@ -5,7 +5,6 @@ package PublicInbox::MboxReader; use strict; use v5.10.1; -use PublicInbox::DS (); # localize $in_loop for error detection :< use Data::Dumper; $Data::Dumper::Useqq = 1; # should've been the default, for bad data @@ -14,7 +13,6 @@ my $from_strict = sub _mbox_from { my ($mbfh, $from_re, $eml_cb, @arg) = @_; - local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; my @raw; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { @@ -75,7 +73,6 @@ sub _extract_hdr { sub _mbox_cl ($$$;@) { my ($mbfh, $uxs_from, $eml_cb, @arg) = @_; - local $PublicInbox::DS::in_loop; # disable dwaitpid my $buf = ''; while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) { if ($r == 0) { # detect "curl --fail" diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 336d5ac4..400a22f3 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -6,10 +6,12 @@ package PublicInbox::ProcessPipe; use strict; use v5.10.1; use PublicInbox::DS qw(dwaitpid); +use Carp 
qw(carp); sub TIEHANDLE { my ($class, $pid, $fh, $cb, $arg) = @_; - bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class; + bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg }, + $class; } sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } @@ -26,32 +28,41 @@ sub PRINT { print { $self->{fh} } @_; } -sub adjust_ret { # dwaitpid callback - my ($retref, $pid) = @_; - $$retref = '' if $? -} +sub FILENO { fileno($_[0]->{fh}) } -sub CLOSE { - my $fh = delete($_[0]->{fh}); - my $ret = defined $fh ? close($fh) : ''; - my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)}; - if (defined $pid) { - unless ($cb) { - $cb = \&adjust_ret; - $arg = \$ret; +sub _close ($;$) { + my ($self, $wait) = @_; + my $fh = delete $self->{fh}; + my $ret = defined($fh) ? close($fh) : ''; + my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)}; + return $ret unless defined($pid) && $self->{ppid} == $$; + if ($wait) { # caller cares about the exit status: + my $wp = waitpid($pid, 0); + if ($wp == $pid) { + $ret = '' if $?; + if ($cb) { + eval { $cb->($arg, $pid) }; + carp "E: cb(arg, $pid): $@" if $@; + } + } else { + carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?"; } + } else { # caller just undef-ed it, let event loop deal with it dwaitpid $pid, $cb, $arg; } $ret; } -sub FILENO { fileno($_[0]->{fh}) } +# if caller uses close(), assume they want to check $? immediately so +# we'll waitpid() synchronously. n.b. wantarray doesn't seem to +# propagate `undef' down to tied methods, otherwise I'd rely on that. +sub CLOSE { _close($_[0], 1) } +# if relying on DESTROY, assume the caller doesn't care about $? 
and +# we can let the event loop call waitpid() whenever it gets SIGCHLD sub DESTROY { - CLOSE(@_); + _close($_[0]); undef; } -sub pid { $_[0]->{pid} } - 1; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 1ee40503..762a0549 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -295,14 +295,14 @@ sub spawn ($;$$) { } sub popen_rd { - my ($cmd, $env, $opts) = @_; + my ($cmd, $env, $opt) = @_; pipe(my ($r, $w)) or die "pipe: $!\n"; - $opts ||= {}; - $opts->{1} = fileno($w); - my $pid = spawn($cmd, $env, $opts); + $opt ||= {}; + $opt->{1} = fileno($w); + my $pid = spawn($cmd, $env, $opt); return ($r, $pid) if wantarray; my $ret = gensym; - tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r; + tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)}; $ret; } diff --git a/t/mbox_reader.t b/t/mbox_reader.t index 53458ec2..4ea2ae29 100644 --- a/t/mbox_reader.t +++ b/t/mbox_reader.t @@ -72,4 +72,21 @@ for my $fmt (@mbox) { $check_fmt->($fmt) } s/\n/\r\n/sg for (values %raw); for my $fmt (@mbox) { $check_fmt->($fmt) } +SKIP: { + use PublicInbox::Spawn qw(popen_rd); + use Time::HiRes qw(alarm); + my $fh = popen_rd([ $^X, '-E', <<'' ]); +say "From x@y Fri Oct 2 00:00:00 1993"; +print "a: b\n\n", "x" x 70000, "\n\n"; +say "From x@y Fri Oct 2 00:00:00 2010"; +print "Final: bit\n\n", "Incomplete\n\n"; +exit 1 + + my @x; + eval { $reader->mboxrd($fh, sub { push @x, shift->as_string }) }; + like($@, qr/error closing mbox/, 'detects error reading from pipe'); + is(scalar(@x), 1, 'only saw one message'); + is(scalar(grep(/Final/, @x)), 0, 'no incomplete bit'); +} + done_testing; diff --git a/t/spawn.t b/t/spawn.t index 552bba33..d97e13a6 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -98,6 +98,44 @@ EOF isnt($?, 0, '$? 
set properly: '.$?); } +{ # ->CLOSE vs ->DESTROY waitpid caller distinction + my @c; + my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + ok(close($fh), '->CLOSE fired and successful'); + ok(scalar(@c), 'callback fired by ->CLOSE'); + ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS'); + + @c = (); + $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } }); + undef $fh; # ->DESTROY + ok(scalar(@c), 'callback fired by ->DESTROY'); + ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c), + 'callback not invoked by ProcessPipe'); +} + +{ # children don't wait on siblings + use POSIX qw(_exit); + pipe(my ($r, $w)) or BAIL_OUT $!; + my $cb = sub { warn "x=$$\n" }; + my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb }); + my $pp = tied *$fh; + my $pid = fork // BAIL_OUT $!; + local $SIG{__WARN__} = sub { _exit(1) }; + if ($pid == 0) { + local $SIG{__DIE__} = sub { _exit(2) }; + undef $fh; + _exit(0); + } + waitpid($pid, 0); + is($?, 0, 'forked process exited'); + my @w; + local $SIG{__WARN__} = sub { push @w, @_ }; + close $w; + close $fh; + is($?, 0, 'cat exited'); + is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner'); +} + SKIP: { eval { require BSD::Resource;
This saves over 20ms with scripts that only use PublicInbox::Spawn. --- lib/PublicInbox/ProcessPipe.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 400a22f3..e540dc22 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -5,7 +5,6 @@ package PublicInbox::ProcessPipe; use strict; use v5.10.1; -use PublicInbox::DS qw(dwaitpid); use Carp qw(carp); sub TIEHANDLE { @@ -48,7 +47,8 @@ sub _close ($;$) { carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?"; } } else { # caller just undef-ed it, let event loop deal with it - dwaitpid $pid, $cb, $arg; + require PublicInbox::DS; + PublicInbox::DS::dwaitpid($pid, $cb, $arg); } $ret; }
If we're using ->qx, we're operating synchronously anyways, so there's little point in relying on the event loop for waitpid. --- lib/PublicInbox/Git.pm | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index f7332bb6..cdd2b400 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -365,8 +365,17 @@ sub popen { sub qx { my $self = shift; my $fh = $self->popen(@_); - local $/ = wantarray ? "\n" : undef; - <$fh>; + if (wantarray) { + local $/ = "\n"; + my @ret = <$fh>; + close $fh; # caller should check $? + @ret; + } else { + local $/; + my $ret = <$fh>; + close $fh; # caller should check $? + $ret; + } } # check_async and cat_async may trigger the other, so ensure they're
This saves us a few lines of code, but also prevents misreaping by sibling processes. --- lib/PublicInbox/Import.pm | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 47a529ff..b5780d2b 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -54,9 +54,9 @@ sub new { sub gfi_start { my ($self) = @_; - return ($self->{in}, $self->{out}) if $self->{pid}; + return ($self->{in}, $self->{out}) if $self->{in}; - my ($in_r, $pid, $out_r, $out_w); + my ($in_r, $out_r, $out_w); pipe($out_r, $out_w) or die "pipe failed: $!"; $self->lock_acquire; @@ -72,13 +72,11 @@ sub gfi_start { chomp @t; $self->{-tree} = { map { $_ => 1 } @t }; } - my @cmd = ('git', "--git-dir=$git->{git_dir}", - qw(fast-import --quiet --done --date-format=raw)); - ($in_r, $pid) = popen_rd(\@cmd, undef, { 0 => $out_r }); + $in_r = $self->{in} = $git->popen(qw(fast-import + --quiet --done --date-format=raw), + undef, { 0 => $out_r }); $out_w->autoflush(1); - $self->{in} = $in_r; $self->{out} = $out_w; - $self->{pid} = $pid; $self->{nchg} = 0; }; if ($@) { @@ -162,14 +160,14 @@ sub check_remove_v1 { sub checkpoint { my ($self) = @_; - return unless $self->{pid}; + return unless $self->{in}; print { $self->{out} } "checkpoint\n" or wfail; undef; } sub progress { my ($self, $msg) = @_; - return unless $self->{pid}; + return unless $self->{in}; print { $self->{out} } "progress $msg\n" or wfail; readline($self->{in}) eq "progress $msg\n" or die "progress $msg not received\n"; @@ -218,7 +216,7 @@ sub barrier { # used for v2 sub get_mark { my ($self, $mark) = @_; - die "not active\n" unless $self->{pid}; + die "not active\n" unless $self->{in}; my ($r, $w) = $self->gfi_start; print $w "get-mark $mark\n" or wfail; defined(my $oid = <$r>) or die "get-mark failed, need git 2.6.0+\n"; @@ -481,10 +479,7 @@ sub done { eval { my $r = delete $self->{in} or die 'BUG: missing {in} when done'; print $w 
"done\n" or wfail; - my $pid = delete $self->{pid} or - die 'BUG: missing {pid} when done'; - waitpid($pid, 0) == $pid or die 'fast-import did not finish'; - $? == 0 or die "fast-import failed: $?"; + close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE }; my $wait_err = $@; my $nchg = delete $self->{nchg};
Only saves us one line of code, but that's better than nothing. --- lib/PublicInbox/Git.pm | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index cdd2b400..3d97300c 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -507,14 +507,13 @@ sub modified ($) { # templates/this--description in git.git sub manifest_entry { my ($self, $epoch, $default_desc) = @_; - my ($fh, $pid) = $self->popen('show-ref'); + my $fh = $self->popen('show-ref'); my $dig = Digest::SHA->new(1); while (read($fh, my $buf, 65536)) { $dig->add($buf); } - close $fh; - waitpid($pid, 0); - return if $?; # empty, uninitialized git repo + close $fh or return; # empty, uninitialized git repo + undef $fh; # for open, below my $git_dir = $self->{git_dir}; my $ent = { fingerprint => $dig->hexdigest,
ProcessPipe has a built-in mechanism to prevent siblings from reaping children. --- lib/PublicInbox/Qspawn.pm | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 68b71112..7e50a59a 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -28,7 +28,6 @@ package PublicInbox::Qspawn; use strict; use PublicInbox::Spawn qw(popen_rd); use PublicInbox::GzipFilter; -use PublicInbox::DS qw(dwaitpid); # doesn't need event loop # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -58,9 +57,9 @@ sub _do_spawn { $self->{cmd} = $o{quiet} ? undef : $cmd; eval { # popen_rd may die on EMFILE, ENFILE - ($self->{rpipe}, $self->{pid}) = popen_rd($cmd, $cmd_env, \%o); + $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o); - die "E: $!" unless defined($self->{pid}); + die "E: $!" unless defined($self->{rpipe}); $limiter->{running}++; $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM @@ -117,16 +116,14 @@ sub finalize ($$) { } } -# callback for dwaitpid +# callback for dwaitpid or ProcessPipe sub waitpid_err { finalize($_[0], child_err($?)) } sub finish ($;$) { my ($self, $err) = @_; - if (delete $self->{rpipe}) { - dwaitpid $self->{pid}, \&waitpid_err, $self; - } else { - finalize($self, $err); - } + my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err); + my PublicInbox::ProcessPipe $pp = tied *$tied_pp; + @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY } sub start ($$$) {