From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D50461F677 for ; Thu, 2 Nov 2023 09:35:42 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698917742; bh=g7iXxlF3gClbNLXzbHaCQDAV2db0ZYRpBf3NeOr+1Oc=; h=From:To:Subject:Date:In-Reply-To:References:From; b=k3Y/+9Kq7Vo1MQ5pWtotoGQNH5AzOduO6rcMloJO/XAr66zNJiN+1LVQ/lwzZtoOO r8j7HGUyQckmsq0BpDLNTSY6m0klx99U0DBvy3QJGp9pJuVcq+UnzZ4WhALtBM3uWr cyb4uDdvaAV4/L3IWKMWFf8msEiP/0F07bzh9PN8= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 08/14] replace ProcessIO with untied PublicInbox::IO Date: Thu, 2 Nov 2023 09:35:33 +0000 Message-Id: <20231102093539.2067470-9-e@80x24.org> In-Reply-To: <20231102093539.2067470-1-e@80x24.org> References: <20231102093539.2067470-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This fixes two major problems with the use of tie for filehandles: * no way to do fcntl, stat, etc. calls directly on the tied handle, forcing callers to use the `tied' perlop to access the underlying IO::Handle * needing separate classes to handle blocking and non-blocking I/O As a result, Git->cleanup_if_unlinked, InputPipe->consume, and Qspawn->_yield_start have fewer bizarre bits and we can call `$io->blocking(0)' directly instead of `(tied *$io)->{fh}->blocking(0)' Having a PublicInbox::IO class will also allow us to support custom read buffering which allows inspecting the current state. 
--- MANIFEST | 3 +- lib/PublicInbox/Gcf2Client.pm | 7 ++- lib/PublicInbox/Git.pm | 8 ++-- lib/PublicInbox/IO.pm | 54 ++++++++++++++++++++++++ lib/PublicInbox/Import.pm | 4 +- lib/PublicInbox/InputPipe.pm | 1 - lib/PublicInbox/LeiToMail.pm | 5 +-- lib/PublicInbox/ProcessIO.pm | 75 --------------------------------- lib/PublicInbox/ProcessIONBF.pm | 25 ----------- lib/PublicInbox/Qspawn.pm | 5 +-- lib/PublicInbox/Spawn.pm | 6 +-- t/spawn.t | 26 ++++++------ xt/check-run.t | 2 +- 13 files changed, 85 insertions(+), 136 deletions(-) create mode 100644 lib/PublicInbox/IO.pm delete mode 100644 lib/PublicInbox/ProcessIO.pm delete mode 100644 lib/PublicInbox/ProcessIONBF.pm diff --git a/MANIFEST b/MANIFEST index 3df48667..479c09de 100644 --- a/MANIFEST +++ b/MANIFEST @@ -218,6 +218,7 @@ lib/PublicInbox/IMAPClient.pm lib/PublicInbox/IMAPD.pm lib/PublicInbox/IMAPTracker.pm lib/PublicInbox/IMAPsearchqp.pm +lib/PublicInbox/IO.pm lib/PublicInbox/IPC.pm lib/PublicInbox/IdxStack.pm lib/PublicInbox/Import.pm @@ -319,8 +320,6 @@ lib/PublicInbox/OverIdx.pm lib/PublicInbox/POP3.pm lib/PublicInbox/POP3D.pm lib/PublicInbox/PktOp.pm -lib/PublicInbox/ProcessIO.pm -lib/PublicInbox/ProcessIONBF.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm lib/PublicInbox/RepoAtom.pm diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index f63a0335..5220c474 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::ProcessIO; +use PublicInbox::IO; use autodie qw(socketpair); # fields: @@ -32,11 +32,10 @@ sub new { $opt->{0} = $opt->{1} = $s2; my $cmd = [$^X, $^W ? 
('-w') : (), qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; - my $pid = spawn($cmd, $env, $opt); - my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1); + PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt)); $self->{inflight} = []; $self->{epwatch} = \undef; # for Git->cleanup - $self->SUPER::new($sock, EPOLLIN); + $self->SUPER::new($s1, EPOLLIN); } sub gcf2_async ($$$;$) { diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 3dac32be..d00f576e 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -18,7 +18,7 @@ use Errno qw(EINTR EAGAIN); use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use PublicInbox::Spawn qw(spawn popen_rd run_qx which); -use PublicInbox::ProcessIONBF; +use PublicInbox::IO; use PublicInbox::Tmpfile; use IO::Poll qw(POLLIN); use Carp qw(croak carp); @@ -146,6 +146,7 @@ sub _sock_cmd { my ($self, $batch, $err_c) = @_; $self->{sock} and Carp::confess('BUG: {sock} exists'); socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0); + $s1->blocking(0); my $opt = { pgid => 0, 0 => $s2, 1 => $s2 }; my $gd = $self->{git_dir}; if ($gd =~ s!/([^/]+/[^/]+)\z!/!) { @@ -164,7 +165,7 @@ sub _sock_cmd { $self->fail("tmpfile($id): $!"); } my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1); + $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid); } sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } @@ -638,8 +639,7 @@ sub cleanup_if_unlinked { my $ret = 0; for my $obj ($self, ($self->{ck} // ())) { my $sock = $obj->{sock} // next; - my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF - my $pid = $p->{pid} // next; + my $pid = $sock->attached_pid // next; open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1); while (<$fh>) { # n.b. 
we do not restart for unlinked multi-pack-index diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm new file mode 100644 index 00000000..63850a52 --- /dev/null +++ b/lib/PublicInbox/IO.pm @@ -0,0 +1,54 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# supports reaping of children tied to a pipe or socket +package PublicInbox::IO; +use v5.12; +use parent qw(IO::Handle); +use PublicInbox::DS qw(awaitpid); + +# TODO: this can probably be the new home for read_all, try_cat +# and maybe even buffered read/readline... + +sub waitcb { # awaitpid callback + my ($pid, $errref, $cb, @args) = @_; + $$errref = $?; # sets .cerr for _close + $cb->($pid, @args) if $cb; +} + +sub attach_pid ($$;@) { + my ($io, $pid, @cb_arg) = @_; + bless $io, __PACKAGE__; + # we share $err (and not $self) with awaitpid to avoid a ref cycle + ${*$io}{pi_io_reap} = [ $$, $pid, \(my $err) ]; + awaitpid($pid, \&waitcb, \$err, @cb_arg); + $io; +} + +sub attached_pid { + my ($io) = @_; + ${${*$io}{pi_io_reap} // []}[1]; +} + +# caller cares about error result if they call close explicitly +# reap->[2] may be set before this is called via waitcb +sub close { + my ($io) = @_; + my $ret = $io->SUPER::close; + my $reap = delete ${*$io}{pi_io_reap}; + return $ret unless $reap && $reap->[0] == $$; + ${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2] + ($? = ${$reap->[2]}) ? 
'' : $ret; +} + +sub DESTROY { + my ($io) = @_; + my $reap = delete ${*$io}{pi_io_reap}; + if ($reap && $reap->[0] == $$) { + $io->SUPER::close; + awaitpid($reap->[1]); + } + $io->SUPER::DESTROY; +} + +1; diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index e12a56e8..dfba34b9 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -16,7 +16,7 @@ use PublicInbox::MsgTime qw(msg_datestamp); use PublicInbox::ContentHash qw(content_digest); use PublicInbox::MDA; use PublicInbox::Eml; -use PublicInbox::ProcessIO; +use PublicInbox::IO; use POSIX qw(strftime); use autodie qw(read close socketpair); use Carp qw(croak); @@ -77,7 +77,7 @@ sub gfi_start { --quiet --done --date-format=raw) ]; my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 }); $self->{nchg} = 0; - $self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io); + $self->{io} = PublicInbox::IO::attach_pid($io, $pid); }; if ($@) { $self->lock_release; diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm index f4d57e7d..232f20e8 100644 --- a/lib/PublicInbox/InputPipe.pm +++ b/lib/PublicInbox/InputPipe.pm @@ -39,7 +39,6 @@ sub consume { if ($@) { # regular file (but not w/ select|IO::Poll backends) $self->{-need_rq} = 1; $self->requeue; - } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes $in->blocking(0); } elsif (-t $in) { # isatty(3) can't use `_' stat cache diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index ead60b38..b07c2c90 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -7,7 +7,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; -use PublicInbox::ProcessIO; +use PublicInbox::IO; use PublicInbox::Spawn qw(spawn); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); @@ -160,8 +160,7 @@ sub _post_augment_mbox { # open a compressor process from top-level 
lei-daemon my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei); my ($r, $w) = @{delete $lei->{zpipe}}; my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 }; - my $pid = spawn($cmd, undef, $rdr); - $lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w, + $lei->{1} = PublicInbox::IO::attach_pid($w, spawn($cmd, undef, $rdr), \&reap_compress, $lei, $cmd, $lei->{1}); } diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm deleted file mode 100644 index ea5d3e6c..00000000 --- a/lib/PublicInbox/ProcessIO.pm +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ - -# a tied handle for auto reaping of children tied to a pipe or socket, -# see perltie(1) for details. -package PublicInbox::ProcessIO; -use v5.12; -use PublicInbox::DS qw(awaitpid); -use Symbol qw(gensym); -use bytes qw(length); - -sub maybe_new { - my ($cls, $pid, $fh, @cb_arg) = @_; - return ($fh, $pid) if wantarray; - my $s = gensym; - tie *$s, $cls, $pid, $fh, @cb_arg; - $s; -} - -sub waitcb { # awaitpid callback - my ($pid, $err_ref, $cb, @args) = @_; - $$err_ref = $?; # sets >{pp_chld_err} for _close - $cb->($pid, @args) if $cb; -} - -sub TIEHANDLE { - my ($cls, $pid, $fh, @cb_arg) = @_; - my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls; - # we share $err (and not $self) with awaitpid to avoid a ref cycle - $self->{pp_chld_err} = \(my $err); - awaitpid($pid, \&waitcb, \$err, @cb_arg); - $self; -} - -# for IO::Uncompress::Gunzip and PublicInbox::LeiRediff -sub BINMODE { @_ == 1 ? 
binmode($_[0]->{fh}) : binmode($_[0]->{fh}, $_[1]) } - -sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) } - -sub READLINE { readline($_[0]->{fh}) } - -sub WRITE { syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0) } - -sub PRINT { print { $_[0]->{fh} } @_[1..$#_] } - -sub FILENO { fileno($_[0]->{fh}) } - -sub _close ($;$) { - my ($self, $wait) = @_; - my ($fh, $pid) = delete(@$self{qw(fh pid)}); - my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = ''); - return $ret unless defined($pid) && $self->{ppid} == $$; - if ($wait) { # caller cares about the exit status: - # synchronous wait via defined(wantarray) on awaitpid: - defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid); - ($? = ${$self->{pp_chld_err}}) and $ret = ''; - } else { - awaitpid($pid); # depends on $in_loop or not - } - $ret; -} - -# if caller uses close(), assume they want to check $? immediately so -# we'll waitpid() synchronously. n.b. wantarray doesn't seem to -# propagate `undef' down to tied methods, otherwise I'd rely on that. -sub CLOSE { _close($_[0], 1) } - -# if relying on DESTROY, assume the caller doesn't care about $? 
and -# we can let the event loop call waitpid() whenever it gets SIGCHLD -sub DESTROY { - _close($_[0]); - undef; -} - -1; diff --git a/lib/PublicInbox/ProcessIONBF.pm b/lib/PublicInbox/ProcessIONBF.pm deleted file mode 100644 index 490e200a..00000000 --- a/lib/PublicInbox/ProcessIONBF.pm +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ - -# used to support unbuffered partial reads -package PublicInbox::ProcessIONBF; -use v5.12; -use parent qw(PublicInbox::ProcessIO); -use IO::Handle; # ->blocking - -sub new { - my ($cls, $pid, $fh, @cb_arg) = @_; - $fh->blocking(0) // die "$fh->blocking(0): $!"; - my $io = $cls->SUPER::maybe_new($pid, $fh, @cb_arg); -} - -sub replace { - my ($cls, $orig) = @_; - my $pio = tied *$orig; # ProcessIO - $pio->{fh}->blocking(0) // die "$pio->{fh}->blocking(0): $!"; - bless $pio, $cls; -} - -sub READ { sysread($_[0]->{fh}, $_[1], $_[2], $_[3] // 0) } - -1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index a03e1b01..0bf857c6 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -149,7 +149,7 @@ sub finish ($;$) { # we can safely finalize if pipe was closed before, or if # {_err} is defined by waitpid_err. Deleting {rpipe} will - # trigger PublicInbox::ProcessIO::DESTROY -> waitpid_err, + # trigger PublicInbox::IO::DESTROY -> waitpid_err, # but it may not fire right away if inside the event loop. my $closed_before = !delete($self->{rpipe}); finalize($self) if $closed_before || defined($self->{_err}); @@ -244,9 +244,8 @@ sub ipipe_cb { # InputPipe callback sub _yield_start { # may run later, much later... 
my ($self) = @_; if ($self->{psgi_env}->{'pi-httpd.async'}) { - require PublicInbox::ProcessIONBF; my $rpipe = $self->{rpipe}; - PublicInbox::ProcessIONBF->replace($rpipe); + $rpipe->blocking(0); PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self); } else { require PublicInbox::GetlineResponse; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index b4f37bea..d3b7ef6f 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -19,7 +19,7 @@ use PublicInbox::Lock; use Fcntl qw(SEEK_SET); use IO::Handle (); use Carp qw(croak); -use PublicInbox::ProcessIO; +use PublicInbox::IO; our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx); our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA); use autodie qw(open pipe seek sysseek truncate); @@ -377,7 +377,7 @@ sub popen_rd { my ($cmd, $env, $opt, @cb_arg) = @_; pipe(my $r, local $opt->{1}); my $pid = spawn($cmd, $env, $opt); - PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg); + wantarray ? ($r, $pid) : PublicInbox::IO::attach_pid($r, $pid, @cb_arg) } sub popen_wr { @@ -385,7 +385,7 @@ sub popen_wr { pipe(local $opt->{0}, my $w); $w->autoflush(1); my $pid = spawn($cmd, $env, $opt); - PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg) + wantarray ? 
($w, $pid) : PublicInbox::IO::attach_pid($w, $pid, @cb_arg) } sub read_out_err ($) { diff --git a/t/spawn.t b/t/spawn.t index 938a2e5e..3479b6b3 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -107,34 +107,35 @@ EOF { my $fh = popen_rd([qw(echo hello)]); - ok(fileno($fh) >= 0, 'tied fileno works'); + ok(fileno($fh) >= 0, 'fileno works'); my $l = <$fh>; - is($l, "hello\n", 'tied readline works'); + is($l, "hello\n", 'readline works'); $l = <$fh>; - ok(!$l, 'tied readline works for EOF'); + ok(!$l, 'readline works for EOF'); } { my $fh = popen_rd([qw(printf foo\nbar)]); - ok(fileno($fh) >= 0, 'tied fileno works'); - my $tfh = (tied *$fh)->{fh}; - is($tfh->blocking(0), 1, '->blocking was true'); - is($tfh->blocking, 0, '->blocking is false'); - is($tfh->blocking(1), 0, '->blocking was true'); - is($tfh->blocking, 1, '->blocking is true'); + ok(fileno($fh) >= 0, 'fileno works'); + is($fh->blocking(0), 1, '->blocking was true'); + is($fh->blocking, 0, '->blocking is false'); + is($fh->blocking(1), 0, '->blocking was true'); + is($fh->blocking, 1, '->blocking is true'); my @line = <$fh>; is_deeply(\@line, [ "foo\n", 'bar' ], 'wantarray works on readline'); } { my $fh = popen_rd([qw(echo hello)]); + like($fh->attached_pid, qr/\A[0-9]+\z/, 'have a PID'); my $buf; is(sysread($fh, $buf, 6), 6, 'sysread got 6 bytes'); - is($buf, "hello\n", 'tied gets works'); + is($buf, "hello\n", 'sysread works'); is(sysread($fh, $buf, 6), 0, 'sysread got EOF'); $? = 1; ok($fh->close, 'close succeeds'); is($?, 0, '$? 
set properly'); + is($fh->attached_pid, undef, 'attached_pid cleared after close'); } { @@ -160,8 +161,8 @@ EOF $fh = popen_rd(['true'], undef, undef, sub { @c = caller }); undef $fh; # ->DESTROY ok(scalar(@c), 'callback fired by ->DESTROY'); - ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c), - 'callback not invoked by ProcessIO'); + ok(grep(!m[/PublicInbox/IO\.pm\z], @c), + 'callback not invoked by PublicInbox::IO'); } { # children don't wait on siblings @@ -170,7 +171,6 @@ EOF my @arg; my $fh = popen_rd(['cat'], undef, { 0 => $r }, sub { @arg = @_; warn "x=$$\n" }, 'hi'); - my $pp = tied *$fh; my $pid = fork // BAIL_OUT $!; local $SIG{__WARN__} = sub { _exit(1) }; if ($pid == 0) { diff --git a/xt/check-run.t b/xt/check-run.t index cda839fe..d12b925d 100755 --- a/xt/check-run.t +++ b/xt/check-run.t @@ -14,7 +14,7 @@ use v5.12; use IO::Handle; # ->autoflush use PublicInbox::TestCommon; use PublicInbox::Spawn; -use PublicInbox::DS; # already loaded by Spawn via ProcessIO +use PublicInbox::DS; # already loaded by Spawn via PublicInbox::IO use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev); use Errno qw(EINTR); use Fcntl qw(:seek);