* [PATCH 00/12] improve process reaping
From: Eric Wong @ 2023-01-17 7:18 UTC
To: meta
dwaitpid was implemented under the assumption that our code could
eventually run on a multithreaded Perl 5. Since the threads(3perl)
manpage officially discourages the use of threads, that assumption
proved false. This series saves syscalls and slightly improves the
ergonomics of our internal APIs, data structures, and code.
Eric Wong (12):
ipc: remove {-reap_async} field
t/solver_git.t: fix test message
qspawn: drop {psgi_env} deref
ds: introduce awaitpid, switch ProcessPipe users
git|gcf2: switch to awaitpid
watch: switch to awaitpid
watch: simplify internal data structures
eofpipe: drop {arg} support for now
watch: IMAP and NNTP polling can use the same interval
ipc: drop unused $args from ->ipc_worker_stop
ipc+lei: switch to awaitpid
ds: drop dwaitpid, switch to waitpid(-1)
Documentation/technical/ds.txt | 2 +-
lib/PublicInbox/DS.pm | 69 ++++++++++++------------
lib/PublicInbox/Daemon.pm | 2 +-
lib/PublicInbox/EOFpipe.pm | 10 ++--
lib/PublicInbox/Gcf2Client.pm | 5 +-
lib/PublicInbox/Git.pm | 10 ++--
lib/PublicInbox/IPC.pm | 39 +++++++-------
lib/PublicInbox/LEI.pm | 8 ++-
lib/PublicInbox/LeiConvert.pm | 2 +-
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiMirror.pm | 7 ++-
lib/PublicInbox/LeiStore.pm | 7 ++-
lib/PublicInbox/LeiToMail.pm | 11 ++--
lib/PublicInbox/LeiUp.pm | 5 +-
lib/PublicInbox/LeiXSearch.pm | 9 ++--
lib/PublicInbox/ProcessPipe.pm | 42 +++++++--------
lib/PublicInbox/Qspawn.pm | 61 ++++++++++-----------
lib/PublicInbox/Spawn.pm | 6 +--
lib/PublicInbox/Watch.pm | 96 ++++++++++++----------------------
script/public-inbox-clone | 2 +-
t/solver_git.t | 2 +-
t/spawn.t | 12 +++--
22 files changed, 186 insertions(+), 223 deletions(-)
* [PATCH 01/12] ipc: remove {-reap_async} field
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
We can just test for {-reap_do} instead, which saves us a few bytes.
---
lib/PublicInbox/IPC.pm | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 74862673..671ad5d5 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -150,7 +150,6 @@ sub ipc_worker_reap { # dwaitpid callback
sub wq_wait_async {
my ($self, $cb, @uargs) = @_;
local $PublicInbox::DS::in_loop = 1;
- $self->{-reap_async} = 1;
$self->{-reap_do} = $cb;
my @pids = keys %{$self->{-wq_workers}};
dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
@@ -350,7 +349,7 @@ sub wq_do {
sub prepare_nonblock {
($_[0]->{-wq_s1} // die 'BUG: no {-wq_s1}')->blocking(0);
- $_[0]->{-reap_async} or die 'BUG: {-reap_async} needed for nonblock';
+ $_[0]->{-reap_do} or die 'BUG: {-reap_do} needed for nonblock';
require PublicInbox::WQBlocked;
}
@@ -424,11 +423,11 @@ sub wq_workers_start {
sub wq_close {
my ($self) = @_;
if (my $wqb = delete $self->{wqb}) {
- $self->{-reap_async} or die 'BUG: {-reap_async} unset';
+ $self->{-reap_do} or die 'BUG: {-reap_do} unset';
$wqb->enq_close;
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
- return if $self->{-reap_async};
+ return if $self->{-reap_do};
my @pids = keys %{$self->{-wq_workers}};
dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
}
* [PATCH 02/12] t/solver_git.t: fix test message
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
---
t/solver_git.t | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/t/solver_git.t b/t/solver_git.t
index 8faa7309..06d75816 100644
--- a/t/solver_git.t
+++ b/t/solver_git.t
@@ -293,7 +293,7 @@ EOF
is($res->code, 404, 'failure with null OID');
$res = $cb->(GET("/$name/$non_existent/s/"));
- is($res->code, 404, 'failure with null OID');
+ is($res->code, 404, 'failure with non-existent OID');
$res = $cb->(GET("/$name/$v1_0_0_tag/s/"));
is($res->code, 200, 'shows commit (unabbreviated)');
* [PATCH 03/12] qspawn: drop {psgi_env} deref
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
We never use the assigned $env variable; the subsequent warning
message accesses $self->{psgi_env}->{REQUEST_URI} directly.
---
lib/PublicInbox/Qspawn.pm | 1 -
1 file changed, 1 deletion(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0044c1f6..779b703a 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -212,7 +212,6 @@ sub rd_hdr ($) {
$ret = [ 500, [], [ "Internal error\n" ] ];
} elsif (!defined($ret) && !$r) {
my $cmd = $self->{cmd} // [ '(?)' ];
- my $env = $self->{psgi_env};
warn <<EOM;
EOF parsing headers from @$cmd ($self->{psgi_env}->{REQUEST_URI})
EOM
* [PATCH 04/12] ds: introduce awaitpid, switch ProcessPipe users
2023-01-18 2:10 ` [PATCH 13/12] qspawn: use ->DESTROY to force ->finalize Eric Wong
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
awaitpid is the new API that will eventually replace dwaitpid.
It allows callback handlers to be registered early, right after
spawning. Once dwaitpid is gone, it will be able to make fewer
waitpid calls.
Our earlier avoidance of waitpid(-1) was driven by the belief
that threads might eventually become relevant for Perl 5, but
that's extremely unlikely at this stage. I will still introduce
optional threads via C, but they definitely won't be spawning or
reaping processes.
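For illustration, a minimal Perl sketch of reaping with waitpid(-1, WNOHANG) and dispatching to per-PID callbacks. The %callbacks registry and the short polling sleep are hypothetical stand-ins for the event loop and its SIGCHLD wakeup. As the comment in reap_pids warns, waitpid(-1) can also reap children that backticks or system() are waiting for, so this pattern is only safe when nothing else in the process spawns children behind the event loop's back.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG _exit);

my %callbacks;    # pid => callback; hypothetical stand-in for a registry
my %status;       # pid => exit code, recorded by the callbacks

for my $code (1 .. 3) {
    my $pid = fork // die "fork: $!";
    _exit($code) if $pid == 0;    # child: exit at once with a known code
    $callbacks{$pid} = sub { $status{$_[0]} = $? >> 8 };
}

# waitpid(-1, WNOHANG) returns whichever child has exited, so we need
# not call waitpid once per registered PID on every pass.
while (%callbacks) {
    my $pid = waitpid(-1, WNOHANG);
    if ($pid > 0) {
        my $cb = delete $callbacks{$pid} or next;    # not ours: ignore
        $cb->($pid);
    } elsif ($pid == 0) {
        select(undef, undef, undef, 0.01);    # a real loop would wait for SIGCHLD
    } else {
        die "waitpid: $!";    # -1: no children left to wait for
    }
}
```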
Callback arguments are reordered (PID first) so that multiple
arguments can be passed flattened, more naturally. The previous
API (allowing only a single argument, as influenced by
pthread_create(3)) was more tedious since it required packing
multiple arguments into yet another array.
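A minimal sketch of the PID-first callback convention, assuming a registry shaped like $AWAIT_PIDS (pid => [ $callback, @args ]). sketch_awaitpid and sketch_reap are hypothetical names, not the PublicInbox::DS functions; the reap step mirrors the per-PID WNOHANG loop and is polled here instead of being driven by SIGCHLD.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG _exit);

# Hypothetical registry shaped like $AWAIT_PIDS: pid => [ $callback, @args ]
my %AWAIT;

sub sketch_awaitpid {    # hypothetical, not PublicInbox::DS::awaitpid
    my ($pid, @cb_args) = @_;
    $AWAIT{$pid} = \@cb_args;
}

sub sketch_reap {        # one non-blocking waitpid per registered PID
    for my $pid (keys %AWAIT) {
        my $wpid = waitpid($pid, WNOHANG);
        next if $wpid == 0;                      # still running
        my ($cb, @args) = @{ delete $AWAIT{$pid} };
        next if $wpid != $pid || !$cb;           # -1 (error) or no callback
        eval { $cb->($pid, @args) };             # PID first, then flat args
        warn "E: callback for PID $pid: $@" if $@;
    }
}

# Usage: extra arguments are passed flat, with no wrapper array
my @got;
my $pid = fork // die "fork: $!";
_exit(7) if $pid == 0;
sketch_awaitpid($pid, sub { my ($p, @a) = @_; @got = ($p, $? >> 8, @a) },
    'x', 'y');
while (%AWAIT) { sketch_reap(); select(undef, undef, undef, 0.01) }
```

Since the child may exit before sketch_awaitpid runs, it simply stays a zombie until the next reap pass; registering early does not lose the exit status.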
---
lib/PublicInbox/DS.pm | 43 +++++++++++++++++++++---
lib/PublicInbox/LeiToMail.pm | 4 +--
lib/PublicInbox/ProcessPipe.pm | 42 ++++++++++++------------
lib/PublicInbox/Qspawn.pm | 60 ++++++++++++++++++----------------
lib/PublicInbox/Spawn.pm | 6 ++--
t/spawn.t | 12 ++++---
6 files changed, 104 insertions(+), 63 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e4629e97..9563a1cb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
use PublicInbox::Tmpfile;
use Errno qw(EAGAIN EINVAL);
use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
my %Stack;
my $nextq; # queue for next_tick
my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
my $reap_armed;
my $ToClose; # sockets to close when event loop is done
our (
@@ -74,11 +75,11 @@ sub Reset {
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
for my $q (@q) { @$q = () }
- $wait_pids = $nextq = $ToClose = undef;
+ $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
$ep_io = undef; # closes real $Epoll FD
$Epoll = undef; # may call DSKQXS::DESTROY
} while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
- $ToClose || keys(%DescriptorMap) ||
+ $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
$PostLoopCallback || keys(%UniqTimer));
$reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
$oldset;
}
+sub await_cb ($;@) {
+ my ($pid, @cb_args) = @_;
+ my $cb = shift @cb_args or return;
+ eval { $cb->($pid, @cb_args) };
+ warn "E: awaitpid($pid): $@" if $@;
+}
+
# We can't use waitpid(-1) safely here since it can hit ``, system(),
# and other things. So we scan the $wait_pids list, which is hopefully
# not too big. We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
sub reap_pids {
$reap_armed = undef;
- my $tmp = $wait_pids or return;
+ my $tmp = $wait_pids // [];
$wait_pids = undef;
$Stack{reap_runq} = $tmp;
my $oldset = block_signals();
+
+ # old API
foreach my $ary (@$tmp) {
my ($pid, $cb, $arg) = @$ary;
my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
}
}
+
+ # new API TODO: convert to waitpid(-1) in the future as long
+ # as we don't use threads
+ for my $pid (keys %$AWAIT_PIDS) {
+ my $wpid = waitpid($pid, WNOHANG) // next;
+ my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+ await_cb($pid, @$cb_args);
+ }
sig_setmask($oldset);
delete $Stack{reap_runq};
}
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
}
}
+sub awaitpid {
+ my ($pid, @cb_args) = @_;
+ $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+ # provide synchronous API
+ if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+ my $ret = waitpid($pid, 0) // -2;
+ if ($ret == $pid) {
+ my $cb_args = delete $AWAIT_PIDS->{$pid};
+ @cb_args = @$cb_args if !@cb_args && $cb_args;
+ await_cb($pid, @cb_args);
+ return $ret;
+ }
+ }
+ # We could've just missed our SIGCHLD, cover it, here:
+ enqueue_reap() if $in_loop;
+}
+
1;
=head1 AUTHORS (Danga::Socket)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b58e2652..1528165a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback
$self->{lei}->fail("$@ (oid=$oid)") if $@;
}
-sub reap_compress { # dwaitpid callback
- my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+ my ($pid, $lei) = @_;
my $cmd = delete $lei->{"pid.$pid"};
return if $? == 0;
$lei->fail("@$cmd failed", $? >> 8);
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 97e9c268..068631c6 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,16 +1,25 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# a tied handle for auto reaping of children tied to a pipe, see perltie(1)
package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+ my ($pid, $err_ref, $cb, @args) = @_;
+ $$err_ref = $?; # sets >{pp_chld_err} for _close
+ $cb->($pid, @args) if $cb;
+}
sub TIEHANDLE {
- my ($class, $pid, $fh, $cb, $arg) = @_;
- bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
- $class;
+ my ($cls, $pid, $fh, @cb_arg) = @_;
+ my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+ # we share $err (and not $self) with awaitpid to avoid a ref cycle
+ $self->{pp_chld_err} = \(my $err);
+ awaitpid($pid, \&waitcb, \$err, @cb_arg);
+ $self;
}
sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
@@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) }
sub _close ($;$) {
my ($self, $wait) = @_;
- my $fh = delete $self->{fh};
+ my ($fh, $pid) = delete(@$self{qw(fh pid)});
my $ret = defined($fh) ? close($fh) : '';
- my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
return $ret unless defined($pid) && $self->{ppid} == $$;
if ($wait) { # caller cares about the exit status:
- my $wp = waitpid($pid, 0);
- if ($wp == $pid) {
- $ret = '' if $?;
- if ($cb) {
- eval { $cb->($arg, $pid) };
- carp "E: cb(arg, $pid): $@" if $@;
- }
- } else {
- carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
- }
- } else { # caller just undef-ed it, let event loop deal with it
- require PublicInbox::DS;
- PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+ # synchronous wait via defined(wantarray) on awaitpid:
+ defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+ ($? = ${$self->{pp_chld_err}}) and $ret = '';
+ } else {
+ awaitpid($pid); # depends on $in_loop or not
}
$ret;
}
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 779b703a..02357dbf 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -28,6 +28,7 @@ package PublicInbox::Qspawn;
use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
use Scalar::Util qw(blessed);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -57,35 +58,21 @@ sub _do_spawn {
}
}
$self->{cmd} = $o{quiet} ? undef : $cmd;
+ $o{cb_arg} = [ \&waitpid_err, $self ];
eval {
# popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
- die "E: $!" unless defined($self->{rpipe});
-
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
$limiter->{running}++;
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
finish($self, $@) if $@;
}
-sub child_err ($) {
- my ($child_error) = @_; # typically $?
- my $exitstatus = ($child_error >> 8) or return;
- my $sig = $child_error & 127;
- my $msg = "exit status=$exitstatus";
- $msg .= " signal=$sig" if $sig;
- $msg;
-}
-
-sub finalize ($$) {
- my ($self, $err) = @_;
-
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+ my ($self) = @_;
- # done, spawn whatever's in the queue
- my $limiter = $self->{limiter};
+ # process is done, spawn whatever's in the queue
+ my $limiter = delete $self->{limiter} or return;
my $running = --$limiter->{running};
if ($running < $limiter->{max}) {
@@ -93,14 +80,16 @@ sub finalize ($$) {
_do_spawn(@$next, $limiter);
}
}
-
- if ($err) {
+ if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
if (my $dst = $self->{qsp_err}) {
$$dst .= $$dst ? " $err" : "; $err";
}
warn "@{$self->{cmd}}: $err" if $self->{cmd};
}
+
+ my ($env, $qx_cb, $qx_arg, $qx_buf) =
+ delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
if ($qx_cb) {
eval { $qx_cb->($qx_buf, $qx_arg) };
return unless $@;
@@ -115,14 +104,28 @@ sub finalize ($$) {
}
}
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+ my (undef, $self) = @_; # $_[0]: pid
+ $self->{_err} = ''; # for defined check in ->finish
+ if ($?) {
+ my $status = $? >> 8;
+ my $sig = $? & 127;
+ $self->{_err} .= "exit status=$status";
+ $self->{_err} .= " signal=$sig" if $sig;
+ }
+ finalize($self) if !$self->{rpipe};
+}
sub finish ($;$) {
my ($self, $err) = @_;
- my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
- my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
- @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+ $self->{_err} //= $err; # only for $@
+
+ # we can safely finalize if pipe was closed before, or if
+ # {_err} is defined by waitpid_err. Deleting {rpipe} will
+ # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+ # but it may not fire right away if inside the event loop.
+ my $closed_before = !delete($self->{rpipe});
+ finalize($self) if $closed_before || defined($self->{_err});
}
sub start ($$$) {
@@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
if ($async) { # calls rpipe->close && ->event_step
$async->close; # PublicInbox::HTTPD::Async::close
- } else { # generic PSGI:
+ } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
delete($self->{rpipe})->close;
event_step($self);
- waitpid_err($self);
}
if (ref($r) eq 'ARRAY') { # error
$wcb->($r)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7f61d8db..826ee508 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -365,9 +365,9 @@ sub popen_rd {
$opt->{1} = fileno($w);
my $pid = spawn($cmd, $env, $opt);
return ($r, $pid) if wantarray;
- my $ret = gensym;
- tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
- $ret;
+ my $s = gensym;
+ tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+ $s;
}
sub run_die ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 5fc99a2a..c22cfcfc 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
{ # ->CLOSE vs ->DESTROY waitpid caller distinction
my @c;
- my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
ok(close($fh), '->CLOSE fired and successful');
ok(scalar(@c), 'callback fired by ->CLOSE');
ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
@c = ();
- $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+ $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
undef $fh; # ->DESTROY
ok(scalar(@c), 'callback fired by ->DESTROY');
ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
@@ -156,8 +156,9 @@ EOF
{ # children don't wait on siblings
use POSIX qw(_exit);
pipe(my ($r, $w)) or BAIL_OUT $!;
- my $cb = sub { warn "x=$$\n" };
- my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+ my @arg;
+ my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+ my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
my $pp = tied *$fh;
my $pid = fork // BAIL_OUT $!;
local $SIG{__WARN__} = sub { _exit(1) };
@@ -173,6 +174,9 @@ EOF
close $w;
close $fh;
is($?, 0, 'cat exited');
+ is(scalar(@arg), 2, 'callback got args');
+ is($arg[1], 'hi', 'passed arg');
+ like($arg[0], qr/\A\d+\z/, 'PID');
is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
}
* [PATCH 05/12] git|gcf2: switch to awaitpid
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
This is a trivial change compared to Qspawn in the previous
commit.
---
lib/PublicInbox/Gcf2Client.pm | 5 +++--
lib/PublicInbox/Git.pm | 10 +++++-----
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 09c3aa06..a49e2aad 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# connects public-inbox processes to PublicInbox::Gcf2::loop()
@@ -10,6 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
use PublicInbox::Spawn qw(spawn);
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::DS qw(awaitpid);
# fields:
# sock => socket to Gcf2::loop
# The rest of these fields are compatible with what PublicInbox::Git
@@ -30,7 +31,7 @@ sub new {
$rdr->{0} = $rdr->{1} = $s2;
my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
$self->{'pid.owner'} = $$;
- $self->{pid} = spawn($cmd, $env, $rdr);
+ awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef);
$s1->blocking(0);
$self->{inflight} = [];
$self->{in} = $s1;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 96627daa..c9ed48be 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -21,7 +21,7 @@ use PublicInbox::Tmpfile;
use IO::Poll qw(POLLIN);
use Carp qw(croak carp);
use Digest::SHA ();
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
our @EXPORT_OK = qw(git_unquote git_quote);
our $PIPE_BUFSIZ = 65536; # Linux default
our $in_cleanup;
@@ -138,7 +138,7 @@ sub _bidi_pipe {
$rdr->{2} = $fh;
}
my ($in_r, $p) = popen_rd(\@cmd, undef, $rdr);
- $self->{$pid} = $p;
+ awaitpid($self->{$pid} = $p, undef);
$self->{"$pid.owner"} = $$;
$out_w->autoflush(1);
if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ
@@ -357,9 +357,9 @@ sub _destroy {
delete @$self{($rbuf, $in, $out)};
delete $self->{$err} if $err; # `err_c'
- # GitAsyncCat::event_step may delete {pid}
- my $p = delete $self->{$pid} or return;
- dwaitpid($p) if $$ == $self->{"$pid.owner"};
+ # GitAsyncCat::event_step may delete {$pid}
+ my $p = delete($self->{$pid}) // return;
+ awaitpid($p) if $$ == $self->{"$pid.owner"};
}
sub async_abort ($) {
* [PATCH 06/12] watch: switch to awaitpid
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
-watch relies on our event_loop anyway, and awaitpid lets us
avoid the extra overhead of EOFpipe (a pipe per forked child).
Add an extra {quit} check in imap_idle_fork while we're at it.
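For comparison, a minimal sketch of the EOF-pipe technique being replaced: the child keeps the write end of a pipe, and the parent learns the child is gone when a read returns EOF, then calls waitpid. This sketch uses a blocking sysread for brevity, whereas EOFpipe registers the read end with the event loop; either way each child costs a pipe (two file descriptors) on top of the waitpid call.

```perl
use strict;
use warnings;
use POSIX qw(_exit);

pipe(my ($r, $w)) or die "pipe: $!";
my $pid = fork // die "fork: $!";
if ($pid == 0) {    # child: keep only the write end
    close $r;
    # ... work would happen here; exiting closes $w implicitly ...
    _exit(0);
}
close $w;           # parent: keep only the read end

# EOF means every copy of the write end is closed, i.e. the child is
# exiting (EOFpipe waits for this via the event loop, not a blocking read)
my $n = sysread($r, my $buf, 1);
die "sysread: $!" unless defined $n;

# Blocking waitpid: EOF may be seen a moment before the child is reapable
my $reaped = waitpid($pid, 0);
```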
---
lib/PublicInbox/Watch.pm | 48 +++++++++++++---------------------------
1 file changed, 15 insertions(+), 33 deletions(-)
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 082ecfb9..57985083 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
#
# ref: https://cr.yp.to/proto/maildir.html
@@ -12,10 +12,9 @@ use PublicInbox::MdirReader;
use PublicInbox::NetReader;
use PublicInbox::Filter::Base qw(REJECT);
use PublicInbox::Spamcheck;
-use PublicInbox::DS qw(now add_timer);
+use PublicInbox::DS qw(now add_timer awaitpid);
use PublicInbox::MID qw(mids);
use PublicInbox::ContentHash qw(content_hash);
-use PublicInbox::EOFpipe;
use POSIX qw(_exit WNOHANG);
sub compile_watchheaders ($) {
@@ -244,14 +243,13 @@ sub quit_done ($) {
return unless $self->{quit};
# don't have reliable wakeups, keep signalling
- my $done = 1;
+ my $live = 0;
for (qw(idle_pids poll_pids)) {
my $pids = $self->{$_} or next;
- for (keys %$pids) {
- $done = undef if kill('QUIT', $_);
- }
+ $live += grep { kill('QUIT', $_) } keys %$pids;
}
- $done;
+ add_timer(0.01, \&quit_done, $self) if $live;
+ $live == 0;
}
sub quit {
@@ -400,8 +398,8 @@ sub imap_idle_requeue { # DS::add_timer callback
event_step($self);
}
-sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
- my ($self, $pid) = @_;
+sub imap_idle_reap { # awaitpid callback
+ my ($pid, $self) = @_;
my $uri_intvl = delete $self->{idle_pids}->{$pid} or
die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
@@ -411,33 +409,21 @@ sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
add_timer(60, \&imap_idle_requeue, $self, $uri_intvl);
}
-sub reap { # callback for EOFpipe
- my ($pid, $cb, $self) = @{$_[0]};
- my $ret = waitpid($pid, 0);
- if ($ret == $pid) {
- $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
- } else {
- warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
- }
-}
-
sub imap_idle_fork ($$) {
my ($self, $uri_intvl) = @_;
+ return if $self->{quit};
my ($uri, $intvl) = @$uri_intvl;
- pipe(my ($r, $w)) or die "pipe: $!";
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
- close $r;
watch_atfork_child($self);
watch_imap_idle_1($self, $uri, $intvl);
- close $w;
_exit(0);
}
$self->{idle_pids}->{$pid} = $uri_intvl;
- PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
+ awaitpid($pid, \&imap_idle_reap, $self);
}
sub event_step {
@@ -486,30 +472,26 @@ sub watch_nntp_fetch_all ($$) {
sub poll_fetch_fork { # DS::add_timer callback
my ($self, $intvl, $uris) = @_;
return if $self->{quit};
- pipe(my ($r, $w)) or die "pipe: $!";
watch_atfork_parent($self);
my $seed = rand(0xffffffff);
- my $pid = fork;
- if (defined($pid) && $pid == 0) {
+ my $pid = fork // die "fork: $!";
+ if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
- close $r;
watch_atfork_child($self);
if ($uris->[0]->scheme =~ m!\Aimaps?!i) {
watch_imap_fetch_all($self, $uris);
} else {
watch_nntp_fetch_all($self, $uris);
}
- close $w;
_exit(0);
}
- die "fork: $!" unless defined $pid;
$self->{poll_pids}->{$pid} = [ $intvl, $uris ];
- PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
+ awaitpid($pid, \&poll_fetch_reap, $self);
}
-sub poll_fetch_reap {
- my ($self, $pid) = @_;
+sub poll_fetch_reap { # awaitpid callback
+ my ($pid, $self) = @_;
my $intvl_uris = delete $self->{poll_pids}->{$pid} or
die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
return if $self->{quit};
* [PATCH 07/12] watch: simplify internal data structures
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
We can flatten arrays and stop distinguishing between PID types
(idle_pids vs. poll_pids) now that more of that logic, including
argument passing, is offloaded to awaitpid.
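A small sketch of consuming a flattened pair list two elements at a time with splice, as event_step now does with {idle_todo}. The URIs are made up, and pushing onto @started stands in for calling imap_idle_fork. A list assignment in boolean context yields the number of right-hand elements, so a pair with an undef interval still counts as 2 and keeps the loop going; this is why watch_imap_init now pushes $intvl even when it is undef rather than `$intvl // ()`, which would misalign the pairs.

```perl
use strict;
use warnings;

# Flattened pairs: (uri1, intvl1, uri2, intvl2), hypothetical values
my @idle_todo = ('imaps://a.example/INBOX', 300, 'imap://b.example/INBOX', undef);
my @started;    # stand-in for what imap_idle_fork would receive

# A list assignment in boolean context yields the number of elements on
# the right, so (uri, undef) still counts as 2; an empty splice ends it.
while (my ($uri, $intvl) = splice(@idle_todo, 0, 2)) {
    push @started, [ $uri, $intvl ];
}
```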
---
lib/PublicInbox/Watch.pm | 49 ++++++++++++++++------------------------
1 file changed, 19 insertions(+), 30 deletions(-)
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 57985083..66b0c8b1 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -243,11 +243,7 @@ sub quit_done ($) {
return unless $self->{quit};
# don't have reliable wakeups, keep signalling
- my $live = 0;
- for (qw(idle_pids poll_pids)) {
- my $pids = $self->{$_} or next;
- $live += grep { kill('QUIT', $_) } keys %$pids;
- }
+ my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}};
add_timer(0.01, \&quit_done, $self) if $live;
$live == 0;
}
@@ -379,8 +375,7 @@ sub watch_imap_idle_1 ($$$) {
sub watch_atfork_child ($) {
my ($self) = @_;
- delete $self->{idle_pids};
- delete $self->{poll_pids};
+ delete $self->{pids};
delete $self->{opendirs};
PublicInbox::DS->Reset;
my $sig = delete $self->{sig};
@@ -392,27 +387,23 @@ sub watch_atfork_child ($) {
sub watch_atfork_parent ($) { _done_for_now($_[0]) }
sub imap_idle_requeue { # DS::add_timer callback
- my ($self, $uri_intvl) = @_;
+ my ($self, $uri, $intvl) = @_;
return if $self->{quit};
- push @{$self->{idle_todo}}, $uri_intvl;
+ push @{$self->{idle_todo}}, $uri, $intvl;
event_step($self);
}
sub imap_idle_reap { # awaitpid callback
- my ($pid, $self) = @_;
- my $uri_intvl = delete $self->{idle_pids}->{$pid} or
- die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
-
- my ($uri, $intvl) = @$uri_intvl;
+ my ($pid, $self, $uri, $intvl) = @_;
+ delete $self->{pids}->{$pid};
return if $self->{quit};
warn "W: PID=$pid on $uri died: \$?=$?\n" if $?;
- add_timer(60, \&imap_idle_requeue, $self, $uri_intvl);
+ add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl);
}
-sub imap_idle_fork ($$) {
- my ($self, $uri_intvl) = @_;
+sub imap_idle_fork {
+ my ($self, $uri, $intvl) = @_;
return if $self->{quit};
- my ($uri, $intvl) = @$uri_intvl;
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
@@ -422,8 +413,8 @@ sub imap_idle_fork ($$) {
watch_imap_idle_1($self, $uri, $intvl);
_exit(0);
}
- $self->{idle_pids}->{$pid} = $uri_intvl;
- awaitpid($pid, \&imap_idle_reap, $self);
+ $self->{pids}->{$pid} = undef;
+ awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl);
}
sub event_step {
@@ -433,8 +424,8 @@ sub event_step {
if ($idle_todo && @$idle_todo) {
watch_atfork_parent($self);
eval {
- while (my $uri_intvl = shift(@$idle_todo)) {
- imap_idle_fork($self, $uri_intvl);
+ while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) {
+ imap_idle_fork($self, $uri, $intvl);
}
};
die $@ if $@;
@@ -486,16 +477,14 @@ sub poll_fetch_fork { # DS::add_timer callback
}
_exit(0);
}
- $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
- awaitpid($pid, \&poll_fetch_reap, $self);
+ $self->{pids}->{$pid} = undef;
+ awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris);
}
sub poll_fetch_reap { # awaitpid callback
- my ($pid, $self) = @_;
- my $intvl_uris = delete $self->{poll_pids}->{$pid} or
- die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+ my ($pid, $self, $intvl, $uris) = @_;
+ delete $self->{pids}->{$pid};
return if $self->{quit};
- my ($intvl, $uris) = @$intvl_uris;
if ($?) {
warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris;
}
@@ -506,14 +495,14 @@ sub poll_fetch_reap { # awaitpid callback
sub watch_imap_init ($$) {
my ($self, $poll) = @_;
my $mics = PublicInbox::NetReader::imap_common_init($self);
- my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
+ my $idle = []; # [ uri1, intvl1, uri2, intvl2 ]
for my $uri (@{$self->{imap_order}}) {
my $sec = uri_section($uri);
my $mic = $mics->{$sec};
my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval};
if ($mic->has_capability('IDLE') && !$intvl) {
$intvl = $self->{cfg_opt}->{$sec}->{idleInterval};
- push @$idle, [ $uri, $intvl // () ];
+ push @$idle, $uri, $intvl;
} else {
push @{$poll->{$intvl || 120}}, $uri;
}
* [PATCH 08/12] eofpipe: drop {arg} support for now
From: Eric Wong @ 2023-01-17 7:19 UTC
To: meta
The only remaining user of EOFpipe (worker_quit in Daemon.pm)
passes no args, so avoid wasting a hash slot on it. If we need
args again in the future, EOFpipe will allow an array of args
instead.
---
lib/PublicInbox/Daemon.pm | 2 +-
lib/PublicInbox/EOFpipe.pm | 10 +++++-----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index ee746f05..17e799ca 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -693,7 +693,7 @@ sub daemon_loop ($) {
if ($worker_processes > 0) {
$refresh->(); # preload by default
my $fh = master_loop(); # returns if in child process
- PublicInbox::EOFpipe->new($fh, \&worker_quit, undef);
+ PublicInbox::EOFpipe->new($fh, \&worker_quit);
} else {
reopen_logs();
$set_user->() if $set_user;
diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm
index e537e2aa..628e9366 100644
--- a/lib/PublicInbox/EOFpipe.pm
+++ b/lib/PublicInbox/EOFpipe.pm
@@ -1,14 +1,14 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
package PublicInbox::EOFpipe;
-use strict;
+use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
sub new {
- my (undef, $rd, $cb, $arg) = @_;
- my $self = bless { cb => $cb, arg => $arg }, __PACKAGE__;
+ my (undef, $rd, $cb) = @_;
+ my $self = bless { cb => $cb }, __PACKAGE__;
# 1031: F_SETPIPE_SZ, 4096: page size
fcntl($rd, 1031, 4096) if $^O eq 'linux';
$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
@@ -17,7 +17,7 @@ sub new {
sub event_step {
my ($self) = @_;
if ($self->do_read(my $buf, 1) == 0) { # auto-closed
- $self->{cb}->($self->{arg});
+ $self->{cb}->();
}
}
* [PATCH 09/12] watch: IMAP and NNTP polling can use the same interval
2023-01-17 7:18 [PATCH 00/12] improve process reaping Eric Wong
` (7 preceding siblings ...)
2023-01-17 7:19 ` [PATCH 08/12] eofpipe: drop {arg} support for now Eric Wong
@ 2023-01-17 7:19 ` Eric Wong
2023-01-17 7:19 ` [PATCH 10/12] ipc: drop unused $args from ->ipc_worker_stop Eric Wong
` (2 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2023-01-17 7:19 UTC (permalink / raw)
To: meta
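An obvious error :x  URIs sharing a pollInterval are grouped into
one list, but poll_fetch_fork only checked the scheme of the first
URI, so a list mixing IMAP and NNTP URIs was handed entirely to one
fetcher.  Split the list by scheme so each fetcher only gets the
URIs it understands.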
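Here is a standalone sketch of the partitioning trick used in poll_fetch_fork. Plain strings stand in for the URI objects, so a regex replaces ->scheme, and the example URIs are made up.

```perl
use v5.12;
use warnings;

# Hypothetical URIs that share one pollInterval.
my @uris = qw(imaps://a.example/INBOX nntps://b.example/inbox.test
	imap://c.example/lists nntp://d.example/inbox.other);

my @nntp;
# grep keeps IMAP URIs; for the rest, push() returns the new (positive)
# length of @nntp, so "push(...) < 0" is always false and the URI is
# diverted into @nntp instead of being kept in @imap.
my @imap = grep {
	m!\Aimaps?://!i ? 1 : (push(@nntp, $_) < 0)
} @uris;

print "imap: @imap\nnntp: @nntp\n";
```

An explicit loop with two pushes would be equivalent. The grep form returns the IMAP list directly and relies on push() returning the new, always-positive array length.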
---
lib/PublicInbox/Watch.pm | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 66b0c8b1..90d82d21 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -464,17 +464,18 @@ sub poll_fetch_fork { # DS::add_timer callback
my ($self, $intvl, $uris) = @_;
return if $self->{quit};
watch_atfork_parent($self);
+ my @nntp;
+ my @imap = grep { # push() always returns > 0
+ $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0)
+ } @$uris;
my $seed = rand(0xffffffff);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
srand($seed);
eval { Net::SSLeay::randomize() };
watch_atfork_child($self);
- if ($uris->[0]->scheme =~ m!\Aimaps?!i) {
- watch_imap_fetch_all($self, $uris);
- } else {
- watch_nntp_fetch_all($self, $uris);
- }
+ watch_imap_fetch_all($self, \@imap) if @imap;
+ watch_nntp_fetch_all($self, \@nntp) if @nntp;
_exit(0);
}
$self->{pids}->{$pid} = undef;
* [PATCH 10/12] ipc: drop unused $args from ->ipc_worker_stop
2023-01-17 7:18 [PATCH 00/12] improve process reaping Eric Wong
` (8 preceding siblings ...)
2023-01-17 7:19 ` [PATCH 09/12] watch: IMAP and NNTP polling can use the same interval Eric Wong
@ 2023-01-17 7:19 ` Eric Wong
2023-01-17 7:19 ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
2023-01-17 7:19 ` [PATCH 12/12] ds: drop dwaitpid, switch to waitpid(-1) Eric Wong
11 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2023-01-17 7:19 UTC (permalink / raw)
To: meta
It's not used anywhere, and removing it simplifies the next commit.
---
lib/PublicInbox/IPC.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 671ad5d5..34e40118 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -169,7 +169,7 @@ sub ipc_atfork_child {
# idempotent, can be called regardless of whether worker is active or not
sub ipc_worker_stop {
- my ($self, $args) = @_;
+ my ($self) = @_;
my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
if (!$w_req && !$r_res) {
@@ -180,7 +180,7 @@ sub ipc_worker_stop {
$w_req = $r_res = undef;
return if $$ != $ppid;
- dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
+ dwaitpid($pid, \&ipc_worker_reap, [$self]);
}
# use this if we have multiple readers reading curl or "pigz -dc"
* [PATCH 11/12] ipc+lei: switch to awaitpid
2023-01-17 7:18 [PATCH 00/12] improve process reaping Eric Wong
` (9 preceding siblings ...)
2023-01-17 7:19 ` [PATCH 10/12] ipc: drop unused $args from ->ipc_worker_stop Eric Wong
@ 2023-01-17 7:19 ` Eric Wong
2023-01-17 7:19 ` [PATCH 12/12] ds: drop dwaitpid, switch to waitpid(-1) Eric Wong
11 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2023-01-17 7:19 UTC (permalink / raw)
To: meta
This avoids awkwardly stuffing an arrayref into callbacks
which expect multiple arguments: callbacks now receive
($pid, @args) instead of (\@args, $pid).  IPC->awaitpid_init
now allows pre-registering callbacks before spawning workers.
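Below is a rough model of the new callback convention. It uses standalone subs, with a plain hashref standing in for the IPC object and a string standing in for the lei handle, so it is not the real IPC.pm. awaitpid_init stores [ $cb, @args ] up front, and each reaped worker results in $cb->($pid, $self, @args).

```perl
use v5.12;
use warnings;

# Store [ $cb, @args ] before any worker is spawned (cf. IPC->awaitpid_init).
sub awaitpid_init {
	my ($self, @cb_args) = @_;
	$self->{-reap_do} = \@cb_args;
}

# What runs for each reaped worker: the callback gets ($pid, $self, @args)
# directly, with no arrayref to unpack.
sub ipc_worker_reap {
	my ($pid, $self) = @_;
	delete $self->{-wq_workers}->{$pid};
	my $cb_args = $self->{-reap_do} or return;
	my ($cb, @args) = @$cb_args;
	$cb->($pid, $self, @args);
}

my @log;
my $wq = { -wq_workers => { 101 => 1, 102 => 1 } }; # fake worker PIDs
awaitpid_init($wq, sub {
	my ($pid, $q, $who) = @_;
	push @log, "$pid:$who";
}, 'fake-lei'); # hypothetical stand-in for the $lei object

# pretend the event loop reaped both workers
ipc_worker_reap($_, $wq) for sort keys %{$wq->{-wq_workers}};
print "$_\n" for @log;
```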
---
lib/PublicInbox/IPC.pm | 30 ++++++++++++++----------------
lib/PublicInbox/LEI.pm | 8 +++-----
lib/PublicInbox/LeiConvert.pm | 2 +-
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiMirror.pm | 7 +++----
lib/PublicInbox/LeiStore.pm | 7 +++----
lib/PublicInbox/LeiToMail.pm | 7 +++----
lib/PublicInbox/LeiUp.pm | 5 ++---
lib/PublicInbox/LeiXSearch.pm | 9 ++++-----
script/public-inbox-clone | 2 +-
10 files changed, 35 insertions(+), 44 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 34e40118..edc5ba64 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -12,7 +12,7 @@ use strict;
use v5.10.1;
use parent qw(Exporter);
use Carp qw(croak);
-use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
use PublicInbox::OnDestroy;
use PublicInbox::WQWorker;
@@ -133,26 +133,26 @@ sub ipc_worker_spawn {
$self->{-ipc_req} = $w_req;
$self->{-ipc_res} = $r_res;
$self->{-ipc_ppid} = $$;
+ awaitpid($pid, \&ipc_worker_reap, $self);
$self->{-ipc_pid} = $pid;
}
-sub ipc_worker_reap { # dwaitpid callback
- my ($args, $pid) = @_;
- my ($self, @uargs) = @$args;
+sub ipc_worker_reap { # awaitpid callback
+ my ($pid, $self) = @_;
delete $self->{-wq_workers}->{$pid};
- return $self->{-reap_do}->($args, $pid) if $self->{-reap_do};
+ if (my $cb_args = $self->{-reap_do}) {
+ return $cb_args->[0]->($pid, $self, @$cb_args[1..$#$cb_args]);
+ }
return if !$?;
my $s = $? & 127;
# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
warn "$self->{-wq_ident} PID:$pid died \$?=$?\n" if $s != 15 && $s != 13
}
-sub wq_wait_async {
- my ($self, $cb, @uargs) = @_;
- local $PublicInbox::DS::in_loop = 1;
- $self->{-reap_do} = $cb;
- my @pids = keys %{$self->{-wq_workers}};
- dwaitpid($_, \&ipc_worker_reap, [ $self, @uargs ]) for @pids;
+# register wait workers
+sub awaitpid_init {
+ my ($self, @cb_args) = @_;
+ $self->{-reap_do} = \@cb_args;
}
# for base class, override in sub classes
@@ -178,9 +178,7 @@ sub ipc_worker_stop {
}
die 'no PID with IPC pipes' unless $pid;
$w_req = $r_res = undef;
-
- return if $$ != $ppid;
- dwaitpid($pid, \&ipc_worker_reap, [$self]);
+ awaitpid($pid) if $$ == $ppid; # for non-event loop
}
# use this if we have multiple readers reading curl or "pigz -dc"
@@ -397,6 +395,7 @@ sub _wq_worker_start ($$$$) {
undef $end; # trigger exit
} else {
$self->{-wq_workers}->{$pid} = $bcast1;
+ awaitpid($pid, \&ipc_worker_reap, $self);
}
}
@@ -428,8 +427,7 @@ sub wq_close {
}
delete @$self{qw(-wq_s1 -wq_s2)} or return;
return if $self->{-reap_do};
- my @pids = keys %{$self->{-wq_workers}};
- dwaitpid($_, \&ipc_worker_reap, [ $self ]) for @pids;
+ awaitpid($_) for keys %{$self->{-wq_workers}};
}
sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b78d70de..6ad42111 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -18,7 +18,6 @@ use IO::Handle ();
use Fcntl qw(SEEK_SET);
use PublicInbox::Config;
use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::DS qw(dwaitpid);
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::Lock;
use PublicInbox::Eml;
@@ -644,12 +643,12 @@ sub workers_start {
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
$flds->{lei} = $lei;
+ $wq->awaitpid_init($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
- $wq->wq_wait_async($wq->can('_wq_done_wait') // \&wq_done_wait, $lei);
($op_c, $ops);
}
@@ -1391,9 +1390,8 @@ sub DESTROY {
# preserve $? for ->fail or ->x_it code
}
-sub wq_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
local $current_lei = $lei;
my $err_type = $lei->{-err_type};
$? and $lei->child_error($?,
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 59af40de..1acd4558 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -30,7 +30,7 @@ sub input_maildir_cb {
sub process_inputs { # via wq_do
my ($self) = @_;
- local $PublicInbox::DS::in_loop = 0; # force synchronous dwaitpid
+ local $PublicInbox::DS::in_loop = 0; # force synchronous awaitpid
$self->SUPER::process_inputs;
my $lei = $self->{lei};
delete $lei->{1};
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index a1dcc907..c258f824 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -177,7 +177,7 @@ sub input_path_url {
$mbl->{fh} =
PublicInbox::MboxReader::zsfxcat($in, $zsfx, $lei);
}
- local $PublicInbox::DS::in_loop = 0 if $zsfx; # dwaitpid
+ local $PublicInbox::DS::in_loop = 0 if $zsfx; # awaitpid
$self->input_fh($ifmt, $mbl->{fh}, $input, @args);
} elsif (-d _ && (-d "$input/cur" || -d "$input/new")) {
return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 87abf88c..abf66315 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,9 +31,8 @@ sub keep_going ($) {
$_[0]->{lei}->{opt}->{'keep-going'});
}
-sub _wq_done_wait { # dwaitpid callback (via wq_eof)
- my ($arg, $pid) = @_;
- my ($mrr, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via wq_eof / IPC->awaitpid_init)
+ my ($pid, $mrr, $lei) = @_;
if ($?) {
$lei->child_error($?);
} elsif (!$lei->{child_error}) {
@@ -236,7 +235,7 @@ sub index_cloned_inbox {
my ($k) = ($sw =~ /\A([\w-]+)/);
$opt->{$k} = $lei->{opt}->{$k};
}
- # force synchronous dwaitpid for v2:
+ # force synchronous awaitpid for v2:
local $PublicInbox::DS::in_loop = 0;
my $cfg = PublicInbox::Config->new(undef, $lei->{2});
my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 57f0e013..0ecf1388 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -604,9 +604,8 @@ sub recv_and_run {
$self->SUPER::recv_and_run(@args);
}
-sub _sto_atexit { # dwaitpid callback
- my ($args, $pid) = @_;
- my $self = $args->[0];
+sub _sto_atexit { # awaitpid cb (via awaitpid_init)
+ my ($pid, $sto) = @_;
warn "lei/store PID:$pid died \$?=$?\n" if $?;
}
@@ -621,12 +620,12 @@ sub write_prepare {
# Mail we import into lei are private, so headers filtered out
# by -mda for public mail are not appropriate
local @PublicInbox::MDA::BAD_HEADERS = ();
+ $self->awaitpid_init(\&_sto_atexit); # outlives $lei
$self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
lei => $lei,
-err_wr => $w,
to_close => [ $r ],
});
- $self->wq_wait_async(\&_sto_atexit); # outlives $lei
require PublicInbox::LeiStoreErr;
PublicInbox::LeiStoreErr->new($r, $lei);
}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1528165a..6a4554e7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -652,9 +652,8 @@ sub _do_augment_mbox {
$dedupe->pause_dedupe if $dedupe;
}
-sub v2w_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($v2w, $lei) = @$arg;
+sub v2w_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $v2w, $lei) = @_;
$lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
}
@@ -680,8 +679,8 @@ sub _pre_augment_v2 {
PublicInbox::InboxWritable->new($ibx, @creat);
$ibx->init_inbox if @creat;
my $v2w = $ibx->importer;
+ $v2w->awaitpid_init(\&v2w_done_wait, $lei);
$v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei});
- $v2w->wq_wait_async(\&v2w_done_wait, $lei);
$lei->{v2w} = $v2w;
return if !$lei->{opt}->{shared};
my $d = "$lei->{ale}->{git}->{git_dir}/objects";
diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm
index 49917339..3e92242e 100644
--- a/lib/PublicInbox/LeiUp.pm
+++ b/lib/PublicInbox/LeiUp.pm
@@ -165,9 +165,8 @@ sub _complete_up { # lei__complete hook
map { $match_cb->($_) } PublicInbox::LeiSavedSearch::list($lei);
}
-sub _wq_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub _wq_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
$lei->child_error($?, 'auth failure') if $?
}
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 730df1f7..f9aa870e 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -400,9 +400,8 @@ sub query_remote_mboxrd {
sub git { $_[0]->{git} // die 'BUG: git uninitialized' }
-sub xsearch_done_wait { # dwaitpid callback
- my ($arg, $pid) = @_;
- my ($wq, $lei) = @$arg;
+sub xsearch_done_wait { # awaitpid cb (via awaitpid_init)
+ my ($pid, $wq, $lei) = @_;
return if !$?;
my $s = $? & 127;
return $lei->child_error($?) if $s == 13 || $s == 15;
@@ -573,16 +572,16 @@ sub do_query {
fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
}
+ $l2m->awaitpid_init(\&xsearch_done_wait, $lei);
$l2m->wq_workers_start('lei2mail', undef,
$lei->oldset, { lei => $lei });
- $l2m->wq_wait_async(\&xsearch_done_wait, $lei);
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
delete $l2m->{au_peers};
}
+ $self->awaitpid_init(\&xsearch_done_wait, $lei);
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
- $self->wq_wait_async(\&xsearch_done_wait, $lei);
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
diff --git a/script/public-inbox-clone b/script/public-inbox-clone
index e93ac37b..598979bc 100755
--- a/script/public-inbox-clone
+++ b/script/public-inbox-clone
@@ -62,5 +62,5 @@ my $mrr = bless {
$? = 0;
$mrr->do_mirror;
-$mrr->can('_wq_done_wait')->([$mrr, $lei], $$);
+$mrr->can('_wq_done_wait')->($$, $mrr, $lei);
exit(($lei->{child_error} // 0) >> 8);
* [PATCH 12/12] ds: drop dwaitpid, switch to waitpid(-1)
2023-01-17 7:18 [PATCH 00/12] improve process reaping Eric Wong
` (10 preceding siblings ...)
2023-01-17 7:19 ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
@ 2023-01-17 7:19 ` Eric Wong
11 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2023-01-17 7:19 UTC (permalink / raw)
To: meta
With no remaining users, we can drop dwaitpid and switch
awaitpid to rely on a single waitpid(-1, WNOHANG) loop, saving
syscalls since we no longer call waitpid once per tracked PID.
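Here is a minimal model of the resulting awaitpid/reap_pids pair. It assumes a single-threaded process, as the new comment in DS.pm requires. It keeps the same shape (pid => [ $cb, @args ], or 0 to reap without a callback). However, it omits signal blocking and the event loop, so the demo polls reap_pids itself. It is a sketch, not the real PublicInbox::DS code.

```perl
use v5.12;
use warnings;
use POSIX qw(WNOHANG);

my %AWAIT_PIDS; # pid => [ $cb, @args ], or 0 for "reap only"

sub awaitpid {
	my ($pid, @cb_args) = @_;
	$AWAIT_PIDS{$pid} //= @cb_args ? \@cb_args : 0;
}

sub reap_pids {
	while (1) {
		my $pid = waitpid(-1, WNOHANG);
		last if $pid <= 0; # 0: children still running, -1: none left
		if (defined(my $cb_args = delete $AWAIT_PIDS{$pid})) {
			next unless $cb_args; # registered without a callback
			my ($cb, @args) = @$cb_args;
			$cb->($pid, @args); # $? holds this child's wait status
		} else {
			warn "W: reaped unknown PID=$pid: \$?=$?\n";
		}
	}
}

my %status;
for my $code (0, 3) {
	my $pid = fork // die "fork: $!";
	if ($pid == 0) { exit $code } # child
	awaitpid($pid, sub { my ($p, $tag) = @_; $status{$tag} = $? >> 8 },
		"exit$code");
}
# No event loop here, so poll until both callbacks have run.
until (keys(%status) == 2) {
	reap_pids();
	select(undef, undef, undef, 0.01);
}
print "$_ => $status{$_}\n" for sort keys %status;
```

The trade-off in the patch's own comment applies here as well. waitpid(-1) reaps any child, so other code waiting on a specific PID at the same time (e.g. a thread running system() or ``) could lose its child.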
---
Documentation/technical/ds.txt | 2 +-
lib/PublicInbox/DS.pm | 68 +++++++---------------------------
2 files changed, 15 insertions(+), 55 deletions(-)
diff --git a/Documentation/technical/ds.txt b/Documentation/technical/ds.txt
index 5a1655a1..89cc05af 100644
--- a/Documentation/technical/ds.txt
+++ b/Documentation/technical/ds.txt
@@ -81,7 +81,7 @@ New features
* IO::Socket::SSL support (for NNTPS, STARTTLS+NNTP, HTTPS)
-* dwaitpid (waitpid wrapper) support for reaping dead children
+* awaitpid (waitpid wrapper) support for reaping dead children
* reliable signal wakeups are supported via signalfd on Linux,
EVFILT_SIGNAL on *BSDs via IO::KQueue.
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 9563a1cb..c849f515 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,10 @@ use PublicInbox::Syscall qw(:epoll);
use PublicInbox::Tmpfile;
use Errno qw(EAGAIN EINVAL);
use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer);
my %Stack;
my $nextq; # queue for next_tick
-my $wait_pids; # list of [ pid, callback, callback_arg ]
my $AWAIT_PIDS; # pid => [ $callback, @args ]
my $reap_armed;
my $ToClose; # sockets to close when event loop is done
@@ -75,11 +74,11 @@ sub Reset {
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
for my $q (@q) { @$q = () }
- $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
+ $AWAIT_PIDS = $nextq = $ToClose = undef;
$ep_io = undef; # closes real $Epoll FD
$Epoll = undef; # may call DSKQXS::DESTROY
- } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
- $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
+ } while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS ||
+ $ToClose || keys(%DescriptorMap) ||
$PostLoopCallback || keys(%UniqTimer));
$reap_armed = undef;
@@ -209,43 +208,23 @@ sub await_cb ($;@) {
warn "E: awaitpid($pid): $@" if $@;
}
-# We can't use waitpid(-1) safely here since it can hit ``, system(),
-# and other things. So we scan the $wait_pids list, which is hopefully
-# not too big. We keep $wait_pids small by not calling dwaitpid()
-# until we've hit EOF when reading the stdout of the child.
-
+# This relies on our Perl process being single-threaded, or at least
+# no threads are spawning and waiting on processes (``, system(), etc...)
+# Threads are officially discouraged by the Perl5 team, and I expect
+# that to remain the case.
sub reap_pids {
$reap_armed = undef;
- my $tmp = $wait_pids // [];
- $wait_pids = undef;
- $Stack{reap_runq} = $tmp;
my $oldset = block_signals();
-
- # old API
- foreach my $ary (@$tmp) {
- my ($pid, $cb, $arg) = @$ary;
- my $ret = waitpid($pid, WNOHANG);
- if ($ret == 0) {
- push @$wait_pids, $ary; # autovivifies @$wait_pids
- } elsif ($ret == $pid) {
- if ($cb) {
- eval { $cb->($arg, $pid) };
- warn "E: dwaitpid($pid) in_loop: $@" if $@;
- }
+ while (1) {
+ my $pid = waitpid(-1, WNOHANG) // last;
+ last if $pid <= 0;
+ if (defined(my $cb_args = delete $AWAIT_PIDS->{$pid})) {
+ await_cb($pid, @$cb_args) if $cb_args;
} else {
- warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
+ warn "W: reaped unknown PID=$pid: \$?=$?\n";
}
}
-
- # new API TODO: convert to waitpid(-1) in the future as long
- # as we don't use threads
- for my $pid (keys %$AWAIT_PIDS) {
- my $wpid = waitpid($pid, WNOHANG) // next;
- my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
- await_cb($pid, @$cb_args);
- }
sig_setmask($oldset);
- delete $Stack{reap_runq};
}
# reentrant SIGCHLD handler (since reap_pids is not reentrant)
@@ -719,25 +698,6 @@ sub long_response ($$;@) {
undef;
}
-sub dwaitpid ($;$$) {
- my ($pid, $cb, $arg) = @_;
- if ($in_loop) {
- push @$wait_pids, [ $pid, $cb, $arg ];
- # We could've just missed our SIGCHLD, cover it, here:
- enqueue_reap();
- } else {
- my $ret = waitpid($pid, 0);
- if ($ret == $pid) {
- if ($cb) {
- eval { $cb->($arg, $pid) };
- carp "E: dwaitpid($pid) !in_loop: $@" if $@;
- }
- } else {
- carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
- }
- }
-}
-
sub awaitpid {
my ($pid, @cb_args) = @_;
$AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
* [PATCH 13/12] qspawn: use ->DESTROY to force ->finalize
2023-01-17 7:19 ` [PATCH 04/12] ds: introduce awaitpid, switch ProcessPipe users Eric Wong
@ 2023-01-18 2:10 ` Eric Wong
0 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2023-01-18 2:10 UTC (permalink / raw)
To: meta
There are apparently a few places where we do not call ->finalize
or ->finish and leave dangling limiter slots occupied. I can't
reproduce this easily, so it's likely in error-handling paths.
I already made ->finalize idempotent when switching to awaitpid
since I wanted to rely entirely on DESTROY. However, DESTROY
doesn't always fire soon enough (even after the client has already
seen a response). Still, using DESTROY as a fallback seems reasonable.
This does the minimum to ensure the limiter is freed up on
process exit, but ensuring a finish/finalize call always happens
is the goal.
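Below is a small sketch of the idempotent-finalize-plus-DESTROY pattern. Demo::Job and the $running counter are hypothetical stand-ins for Qspawn and its limiter, not the real code.

```perl
use v5.12;
use warnings;

package Demo::Job;
my $running = 0; # hypothetical stand-in for a limiter's occupied slots

sub new { $running++; bless({ done => 0 }, shift) }

sub finalize {
	my ($self) = @_;
	return if $self->{done}++; # idempotent: only the first call acts
	$running--;
}

# fallback for error paths that never reach ->finalize/->finish
sub DESTROY { finalize($_[0]) }

package main;

{
	my $ok = Demo::Job->new;
	$ok->finalize; # normal path
	# leaving scope triggers DESTROY, which is now a no-op
}
{
	my $leaky = Demo::Job->new; # error path: finalize never called
}
print "slots in use: $running\n";
```

As the message notes, DESTROY timing is not guaranteed to be prompt, so explicit ->finalize calls remain the primary path.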
---
lib/PublicInbox/Qspawn.pm | 2 ++
1 file changed, 2 insertions(+)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 02357dbf..78afe718 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -104,6 +104,8 @@ sub finalize ($) {
}
}
+sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
+
sub waitpid_err { # callback for awaitpid
my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
Thread overview: 14+ messages
2023-01-17 7:18 [PATCH 00/12] improve process reaping Eric Wong
2023-01-17 7:19 ` [PATCH 01/12] ipc: remove {-reap_async} field Eric Wong
2023-01-17 7:19 ` [PATCH 02/12] t/solver_git.t: fix test message Eric Wong
2023-01-17 7:19 ` [PATCH 03/12] qspawn: drop {psgi_env} deref Eric Wong
2023-01-17 7:19 ` [PATCH 04/12] ds: introduce awaitpid, switch ProcessPipe users Eric Wong
2023-01-18 2:10 ` [PATCH 13/12] qspawn: use ->DESTROY to force ->finalize Eric Wong
2023-01-17 7:19 ` [PATCH 05/12] git|gcf2: switch to awaitpid Eric Wong
2023-01-17 7:19 ` [PATCH 06/12] watch: " Eric Wong
2023-01-17 7:19 ` [PATCH 07/12] watch: simplify internal data structures Eric Wong
2023-01-17 7:19 ` [PATCH 08/12] eofpipe: drop {arg} support for now Eric Wong
2023-01-17 7:19 ` [PATCH 09/12] watch: IMAP and NNTP polling can use the same interval Eric Wong
2023-01-17 7:19 ` [PATCH 10/12] ipc: drop unused $args from ->ipc_worker_stop Eric Wong
2023-01-17 7:19 ` [PATCH 11/12] ipc+lei: switch to awaitpid Eric Wong
2023-01-17 7:19 ` [PATCH 12/12] ds: drop dwaitpid, switch to waitpid(-1) Eric Wong