* [PATCH 0/7] more I/O + process reliability and cleanups
@ 2023-11-26 2:10 Eric Wong
2023-11-26 2:10 ` [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source Eric Wong
` (6 more replies)
0 siblings, 7 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:10 UTC (permalink / raw)
To: meta
6/7 ought to fix another hang in t/lei-q-save.t when writing to
v2 outputs.
Much of this stuff will be relevant to code search since Xapian
searches will be moved to C++ (if available) to support features
which aren't usable from Perl bindings and allow more
predictable performance anyways.
Eric Wong (7):
xap_helper_cxx: do not copy xap_helper.h source
xap_client: attach PID to the IO object
xap_client: pass arguments to top-level xap_helper
xap_helper: allow PI_NO_CXX to disable C++ in more places
git: move rbuf handling to PublicInbox::IO
git: improve coupling with {sock} and {inflight} fields
drop redundant calls to DS->Reset
lib/PublicInbox/CodeSearchIdx.pm | 11 +--
lib/PublicInbox/Daemon.pm | 1 -
lib/PublicInbox/Gcf2Client.pm | 7 +-
lib/PublicInbox/Git.pm | 138 ++++++++++++-------------------
lib/PublicInbox/GitAsyncCat.pm | 2 +-
lib/PublicInbox/IO.pm | 70 ++++++++++++++--
lib/PublicInbox/TestCommon.pm | 2 +-
lib/PublicInbox/Watch.pm | 6 +-
lib/PublicInbox/XapClient.pm | 9 +-
lib/PublicInbox/XapHelperCxx.pm | 11 +--
lib/PublicInbox/Xapcmd.pm | 6 +-
t/xap_helper.t | 5 +-
12 files changed, 145 insertions(+), 123 deletions(-)
^ permalink raw reply [flat|nested] 9+ messages in thread
* [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
@ 2023-11-26 2:10 ` Eric Wong
2023-11-26 2:11 ` [PATCH 2/7] xap_client: attach PID to the IO object Eric Wong
` (5 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:10 UTC (permalink / raw)
To: meta
No need to waste memory bandwidth when we can just rely on
the preprocessor to load the header.
---
lib/PublicInbox/XapHelperCxx.pm | 10 +++-------
1 file changed, 3 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index 9e819546..b1deb665 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -8,7 +8,7 @@
package PublicInbox::XapHelperCxx;
use v5.12;
use PublicInbox::Spawn qw(run_die run_qx which);
-use PublicInbox::IO qw(read_all try_cat write_file);
+use PublicInbox::IO qw(try_cat write_file);
use PublicInbox::Search;
use Fcntl qw(SEEK_SET);
use Config;
@@ -62,11 +62,7 @@ sub build () {
my ($prog) = ($bin =~ m!/([^/]+)\z!);
my $lk = PublicInbox::Lock->new("$dir/$prog.lock")->lock_for_scope;
open my $fh, '>', "$dir/$prog.cpp";
- for (@srcs) {
- say $fh qq(# line 1 "$_");
- open my $rfh, '<', $_;
- print $fh read_all($rfh);
- }
+ say $fh qq(# include "$_") for @srcs;
print $fh PublicInbox::Search::generate_cxx();
print $fh PublicInbox::CodeSearch::generate_cxx();
close $fh;
@@ -88,7 +84,7 @@ sub build () {
"$1-L$2 -Wl,-rpath=$2$3"/egsx;
my @xflags = split(' ', "$fl $xflags"); # ' ' awk-mode eats leading WS
my @cflags = grep(!/\A-(?:Wl|l|L)/, @xflags);
- run_die([$cxx, '-c', "$prog.cpp", @cflags]);
+ run_die([$cxx, '-c', "$prog.cpp", '-I', $srcpfx, @cflags]);
run_die([$cxx, '-o', "$prog.tmp", "$prog.o", @xflags]);
unlink "$prog.cpp", "$prog.o";
write_file '>', 'XFLAGS.tmp', $xflags, "\n";
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 2/7] xap_client: attach PID to the IO object
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26 2:10 ` [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 2:11 ` [PATCH 3/7] xap_client: pass arguments to top-level xap_helper Eric Wong
` (4 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
As with our popen_* uses, we can simplify callers by using
attach_pid to handle automatic reaping upon close.
---
lib/PublicInbox/CodeSearchIdx.pm | 10 ++--------
lib/PublicInbox/XapClient.pm | 4 +++-
t/xap_helper.t | 3 +--
3 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 81ca5cbc..3764f13e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -516,15 +516,9 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", 0);
}
-sub start_xhc () {
- my ($xhc, $pid) = PublicInbox::XapClient::start_helper("-j$NPROC");
- awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]);
- $xhc;
-}
-
sub dump_roots_start {
my ($self, $do_join) = @_;
- $XHC //= start_xhc;
+ $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC");
$do_join // die 'BUG: no $do_join';
progress($self, 'dumping IDs from coderepos');
local $self->{xdb};
@@ -577,7 +571,7 @@ EOM
sub dump_ibx_start {
my ($self, $do_join) = @_;
- $XHC //= start_xhc;
+ $XHC //= PublicInbox::XapClient::start_helper("-j$NPROC");
my ($sort_opt, $fold_opt);
pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE);
pipe(local $fold_opt->{0}, local $sort_opt->{1});
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index dda5e044..7737e30d 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -41,7 +41,9 @@ sub start_helper {
$cls.'::start(@ARGV)', '--' ];
}
my $pid = spawn($cmd, $env, { 0 => $in });
- ((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid);
+ my $self = bless { io => $sock, impl => $cls }, __PACKAGE__;
+ PublicInbox::IO::attach_pid($sock, $pid);
+ $self;
}
1;
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 9e0b234d..02e5ec7d 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -173,8 +173,7 @@ my @id2root;
my $ar;
for my $n (@NO_CXX) {
local $ENV{PI_NO_CXX} = $n;
- my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
- $ar = PublicInbox::AutoReap->new($pid);
+ my $xhc = PublicInbox::XapClient::start_helper('-j0');
pipe(my $err_r, my $err_w);
# git patch-id --stable <t/data/0001.patch | awk '{print $1}'
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 3/7] xap_client: pass arguments to top-level xap_helper
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26 2:10 ` [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source Eric Wong
2023-11-26 2:11 ` [PATCH 2/7] xap_client: attach PID to the IO object Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 2:11 ` [PATCH 4/7] xap_helper: allow PI_NO_CXX to disable C++ in more places Eric Wong
` (3 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
This ensures our tests actually test the -j0 and -j1 cases
properly.
---
lib/PublicInbox/XapClient.pm | 1 +
1 file changed, 1 insertion(+)
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index 7737e30d..1f9ddccc 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -40,6 +40,7 @@ sub start_helper {
$cmd = [$^X, ($^W ? ('-w') : ()), "-M$cls", '-e',
$cls.'::start(@ARGV)', '--' ];
}
+ push @$cmd, @argv;
my $pid = spawn($cmd, $env, { 0 => $in });
my $self = bless { io => $sock, impl => $cls }, __PACKAGE__;
PublicInbox::IO::attach_pid($sock, $pid);
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 4/7] xap_helper: allow PI_NO_CXX to disable C++ in more places
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
` (2 preceding siblings ...)
2023-11-26 2:11 ` [PATCH 3/7] xap_client: pass arguments to top-level xap_helper Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 2:11 ` [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Eric Wong
` (2 subsequent siblings)
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
This also reduces repetition in the setup code.
---
lib/PublicInbox/XapClient.pm | 4 +---
lib/PublicInbox/XapHelperCxx.pm | 1 +
t/xap_helper.t | 2 +-
3 files changed, 3 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index 1f9ddccc..4dcbbe5d 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -28,12 +28,10 @@ sub mkreq {
sub start_helper {
my @argv = @_;
socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0);
- require PublicInbox::XapHelperCxx;
my $cls = 'PublicInbox::XapHelperCxx';
my $env;
- my $cmd = eval { PublicInbox::XapHelperCxx::cmd() };
+ my $cmd = eval "require $cls; ${cls}::cmd()";
if ($@) { # fall back to Perl + XS|SWIG
- require PublicInbox::XapHelper;
$cls = 'PublicInbox::XapHelper';
# ensure the child process has the same @INC we do:
$env = { PERL5LIB => join(':', @INC) };
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index b1deb665..f421c7bc 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -110,6 +110,7 @@ sub check_build () {
# returns spawn arg
sub cmd {
+ die 'PI_NO_CXX set' if $ENV{PI_NO_CXX};
check_build();
my @cmd;
if (my $v = $ENV{VALGRIND}) {
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 02e5ec7d..e3abeded 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -149,7 +149,7 @@ unless ($ENV{TEST_XH_CXX_ONLY}) {
SKIP: {
require PublicInbox::XapHelperCxx;
my $cmd = eval { PublicInbox::XapHelperCxx::cmd() };
- skip "XapHelperCxx build: $@", 1 if $@ || $ENV{PI_NO_CXX};
+ skip "XapHelperCxx build: $@", 1 if $@;
@NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
my $ar = $test->(@$cmd, '-j0');
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 5/7] git: move rbuf handling to PublicInbox::IO
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
` (3 preceding siblings ...)
2023-11-26 2:11 ` [PATCH 4/7] xap_helper: allow PI_NO_CXX to disable C++ in more places Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 2:11 ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong
2023-11-26 2:11 ` [PATCH 7/7] drop redundant calls to DS->Reset Eric Wong
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
The long-term plan is to share non-blocking read buffering logic
with HTTP/NNTP/IMAP/POP3 and also XapClient.
---
lib/PublicInbox/Gcf2Client.pm | 1 -
lib/PublicInbox/Git.pm | 59 ++++++-----------------------------
lib/PublicInbox/IO.pm | 53 ++++++++++++++++++++++++++++++-
3 files changed, 61 insertions(+), 52 deletions(-)
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 48d8c5ac..19d77e32 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -21,7 +21,6 @@ use autodie qw(socketpair);
# pid.owner => process which spawned {pid}
# in => same as {sock}, for compatibility with PublicInbox::Git
# inflight => array (see PublicInbox::Git)
-# rbuf => scalarref, may be non-existent or empty
sub new {
my ($opt) = @_;
my $self = bless {}, __PACKAGE__;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index bef524aa..93736cf0 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,11 +14,11 @@ use autodie qw(socketpair read);
use POSIX ();
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
use File::Glob qw(bsd_glob GLOB_NOSORT);
use File::Spec ();
use PublicInbox::Spawn qw(spawn popen_rd run_qx which);
-use PublicInbox::IO qw(poll_in read_all try_cat);
+use PublicInbox::IO qw(read_all try_cat);
use PublicInbox::Tmpfile;
use Carp qw(croak carp);
use PublicInbox::SHA qw(sha_all);
@@ -166,43 +166,6 @@ sub _sock_cmd {
$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
}
-sub my_read ($$$) {
- my ($fh, $rbuf, $len) = @_;
- my $left = $len - length($$rbuf);
- my $r;
- while ($left > 0) {
- $r = sysread($fh, $$rbuf, $left, length($$rbuf));
- if ($r) {
- $left -= $r;
- } elsif (defined($r)) { # EOF
- return 0;
- } else {
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
- }
- my $no_pad = substr($$rbuf, 0, $len, '');
- \$no_pad;
-}
-
-sub my_readline ($$) {
- my ($fh, $rbuf) = @_;
- while (1) {
- if ((my $n = index($$rbuf, "\n")) >= 0) {
- return substr($$rbuf, 0, $n + 1, '');
- }
- my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
- # return whatever's left on EOF
- return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
-}
-
sub cat_async_retry ($$) {
my ($self, $old_inflight) = @_;
@@ -234,16 +197,15 @@ sub cat_async_step ($$) {
my ($self, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($self->{rbuf}) // \(my $new = '');
my ($bref, $oid, $type, $size);
- my $head = my_readline($self->{sock}, $rbuf);
+ my $head = $self->{sock}->my_readline;
my $cmd = ref($req) ? $$req : $req;
# ->fail may be called via Gcf2Client.pm
my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
($oid, $type, $size) = ($1, $2, $3 + 0);
unless ($info) { # --batch-command
- $bref = my_read($self->{sock}, $rbuf, $size + 1) or
+ $bref = $self->{sock}->my_bufread($size + 1) or
$self->fail(defined($bref) ?
'read EOF' : "read: $!");
chop($$bref) eq "\n" or
@@ -268,7 +230,6 @@ sub cat_async_step ($$) {
my $err = $! ? " ($!)" : '';
$self->fail("bad result from async cat-file: $head$err");
}
- $self->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->($bref, $oid, $type, $size, $arg) };
async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -312,17 +273,15 @@ sub check_async_step ($$) {
my ($ck, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
- chomp(my $line = my_readline($ck->{sock}, $rbuf));
+ chomp(my $line = $ck->{sock}->my_readline);
my ($hex, $type, $size) = split(/ /, $line);
# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
if ($hex eq 'dangling') {
- my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+ my $ret = $ck->{sock}->my_bufread($type + 1);
$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
}
- $ck->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->(undef, $hex, $type, $size, $arg) };
async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -643,8 +602,8 @@ sub event_step {
$self->cat_async_step($inflight);
return $self->close unless $self->{sock};
# don't loop here to keep things fair, but we must requeue
- # if there's already-read data in rbuf
- $self->requeue if exists($self->{rbuf});
+ # if there's already-read data in pi_io_rbuf
+ $self->requeue if $self->{sock}->has_rbuf;
}
}
@@ -670,7 +629,7 @@ sub close {
warn "E: (in abort) $req: $@" if $@;
}
}
- delete @$self{qw(-bc err_c inflight rbuf)};
+ delete @$self{qw(-bc err_c inflight)};
delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index 63ae3ef4..fcebac59 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -9,6 +9,7 @@ use PublicInbox::DS qw(awaitpid);
our @EXPORT_OK = qw(poll_in read_all try_cat write_file);
use Carp qw(croak);
use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
# don't autodie in top-level for Perl 5.16.3 (and maybe newer versions)
# we have our own ->close, so we scope autodie into each sub
@@ -18,7 +19,7 @@ sub waitcb { # awaitpid callback
$cb->($pid, @args) if $cb;
}
-sub attach_pid ($$;@) {
+sub attach_pid {
my ($io, $pid, @cb_arg) = @_;
bless $io, __PACKAGE__;
# we share $err (and not $self) with awaitpid to avoid a ref cycle
@@ -87,4 +88,54 @@ sub try_cat ($) {
read_all $fh;
}
+# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
+sub my_bufread {
+ my ($io, $len) = @_;
+ my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+ my $left = $len - length($$rbuf);
+ my $r;
+ while ($left > 0) {
+ $r = sysread($io, $$rbuf, $left, length($$rbuf));
+ if ($r) {
+ $left -= $r;
+ } elsif (defined($r)) { # EOF
+ return 0;
+ } else {
+ next if ($! == EAGAIN and poll_in($io));
+ next if $! == EINTR; # may be set by sysread or poll_in
+ return; # unrecoverable error
+ }
+ }
+ my $no_pad = substr($$rbuf, 0, $len, '');
+ delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+ \$no_pad;
+}
+
+# always uses "\n"
+sub my_readline {
+ my ($io) = @_;
+ my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+ while (1) {
+ if ((my $n = index($$rbuf, "\n")) >= 0) {
+ my $ret = substr($$rbuf, 0, $n + 1, '');
+ delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+ return $ret;
+ }
+ my $r = sysread($io, $$rbuf, 65536, length($$rbuf));
+ if (!defined($r)) {
+ next if ($! == EAGAIN and poll_in($io));
+ next if $! == EINTR; # may be set by sysread or poll_in
+ return; # unrecoverable error
+ } elsif ($r == 0) { # return whatever's left on EOF
+ delete(${*$io}{pi_io_rbuf});
+ return $$rbuf;
+ } # else { continue
+ }
+}
+
+sub has_rbuf {
+ my ($io) = @_;
+ defined(${*$io}{pi_io_rbuf});
+}
+
1;
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
` (4 preceding siblings ...)
2023-11-26 2:11 ` [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 2:11 ` [PATCH 7/7] drop redundant calls to DS->Reset Eric Wong
6 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
While the {inflight} array should be tied to the IO object even
more tightly, that's not an easy task with our current code. So
take some small steps by introducing a gcf_inflight helper to
validate the ownership of the process and to drain the inflight
array via the awaitpid callback.
This hopefully fixes problems with t/lei-q-save.t (still) hanging
occasionally on v2 outputs since git->cleanup/->DESTROY was getting
called in v2 shard workers.
---
lib/PublicInbox/Gcf2Client.pm | 6 ++-
lib/PublicInbox/Git.pm | 79 ++++++++++++++++++++--------------
lib/PublicInbox/GitAsyncCat.pm | 2 +-
lib/PublicInbox/IO.pm | 17 ++++++--
4 files changed, 65 insertions(+), 39 deletions(-)
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 19d77e32..07ff7dcb 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -31,15 +31,16 @@ sub new {
$opt->{0} = $opt->{1} = $s2;
my $cmd = [$^X, $^W ? ('-w') : (),
qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
- PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt));
$self->{inflight} = [];
+ PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt),
+ \&PublicInbox::Git::gcf_drain, $self->{inflight});
$self->{epwatch} = \undef; # for Git->cleanup
$self->SUPER::new($s1, EPOLLIN);
}
sub gcf2_async ($$$;$) {
my ($self, $req, $cb, $arg) = @_;
- my $inflight = $self->{inflight} or return $self->close;
+ my $inflight = $self->gcf_inflight or return;
PublicInbox::Git::write_all($self, $req, \&cat_async_step, $inflight);
push @$inflight, \$req, $cb, $arg; # ref prevents Git.pm retries
}
@@ -49,6 +50,7 @@ sub alternates_changed {}
no warnings 'once';
+*gcf_inflight = \&PublicInbox::Git::gcf_inflight; # for event_step
*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
*event_step = \&PublicInbox::Git::event_step;
*fail = \&PublicInbox::Git::fail;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 93736cf0..fe834210 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -140,6 +140,18 @@ sub last_check_err {
$buf;
}
+sub gcf_drain { # awaitpid cb
+ my ($pid, $inflight, $bc) = @_;
+ while (@$inflight) {
+ my ($req, $cb, $arg) = splice(@$inflight, 0, 3);
+ $req = $$req if ref($req);
+ $bc and $req =~ s/\A(?:contents|info) //;
+ $req =~ s/ .*//; # drop git_dir for Gcf2Client
+ eval { $cb->(undef, $req, undef, undef, $arg) };
+ warn "E: (in abort) $req: $@" if $@;
+ }
+}
+
sub _sock_cmd {
my ($self, $batch, $err_c) = @_;
$self->{sock} and Carp::confess('BUG: {sock} exists');
@@ -162,8 +174,11 @@ sub _sock_cmd {
$self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
$self->fail("tmpfile($id): $!");
}
+ my $inflight = []; # TODO consider moving this into the IO object
my $pid = spawn(\@cmd, undef, $opt);
- $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
+ $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid,
+ \&gcf_drain, $inflight, $self->{-bc});
+ $self->{inflight} = $inflight;
}
sub cat_async_retry ($$) {
@@ -171,8 +186,8 @@ sub cat_async_retry ($$) {
# {inflight} may be non-existent, but if it isn't we delete it
# here to prevent cleanup() from waiting:
- delete $self->{inflight};
- cleanup($self);
+ my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)};
+ $self->SUPER::close if $epwatch;
my $new_inflight = batch_prepare($self);
while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
@@ -180,13 +195,25 @@ sub cat_async_retry ($$) {
$oid = \$oid if !@$new_inflight; # to indicate oid retried
push @$new_inflight, $oid, $cb, $arg;
}
+ $sock->close if $sock; # only safe once old_inflight is empty
cat_async_step($self, $new_inflight); # take one step
}
+sub gcf_inflight ($) {
+ my ($self) = @_;
+ if ($self->{sock}) {
+ return $self->{inflight} if $self->{sock}->owner_pid == $$;
+ delete @$self{qw(sock inflight)};
+ } else {
+ $self->close;
+ }
+ undef;
+}
+
# returns true if prefetch is successful
sub async_prefetch {
my ($self, $oid, $cb, $arg) = @_;
- my $inflight = $self->{inflight} or return;
+ my $inflight = gcf_inflight($self) or return;
return if @$inflight;
substr($oid, 0, 0) = 'contents ' if $self->{-bc};
write_all($self, "$oid\n", \&cat_async_step, $inflight);
@@ -195,7 +222,7 @@ sub async_prefetch {
sub cat_async_step ($$) {
my ($self, $inflight) = @_;
- die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+ croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
my ($bref, $oid, $type, $size);
my $head = $self->{sock}->my_readline;
@@ -237,11 +264,8 @@ sub cat_async_step ($$) {
sub cat_async_wait ($) {
my ($self) = @_;
- return $self->close if !$self->{sock};
- my $inflight = $self->{inflight} or return;
- while (scalar(@$inflight)) {
- cat_async_step($self, $inflight);
- }
+ my $inflight = gcf_inflight($self) or return;
+ cat_async_step($self, $inflight) while (scalar(@$inflight));
}
sub batch_prepare ($) {
@@ -253,7 +277,6 @@ sub batch_prepare ($) {
} else {
_sock_cmd($self, 'batch');
}
- $self->{inflight} = [];
}
sub _cat_file_cb {
@@ -271,7 +294,7 @@ sub cat_file {
sub check_async_step ($$) {
my ($ck, $inflight) = @_;
- die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+ croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
chomp(my $line = $ck->{sock}->my_readline);
my ($hex, $type, $size) = split(/ /, $line);
@@ -291,8 +314,7 @@ sub check_async_wait ($) {
my ($self) = @_;
return cat_async_wait($self) if $self->{-bc};
my $ck = $self->{ck} or return;
- return $ck->close if !$ck->{sock};
- my $inflight = $ck->{inflight} or return;
+ my $inflight = gcf_inflight($ck) or return;
check_async_step($ck, $inflight) while (scalar(@$inflight));
}
@@ -312,7 +334,6 @@ sub check_async_begin ($) {
} else {
_sock_cmd($self = ck($self), 'batch-check', 1);
}
- $self->{inflight} = [];
}
sub write_all {
@@ -337,12 +358,13 @@ sub check_async ($$$$) {
my $inflight;
if ($self->{-bc}) { # likely as time goes on
batch_command:
- $inflight = $self->{inflight} // cat_async_begin($self);
+ $inflight = gcf_inflight($self) // cat_async_begin($self);
substr($oid, 0, 0) = 'info ';
write_all($self, "$oid\n", \&cat_async_step, $inflight);
} else { # accounts for git upgrades while we're running:
my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
- $inflight = $ck->{inflight} // check_async_begin($self);
+ $inflight = ($ck ? gcf_inflight($ck) : undef)
+ // check_async_begin($self);
goto batch_command if $self->{-bc};
write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
}
@@ -417,8 +439,8 @@ sub date_parse {
}
sub _active ($) {
- scalar(@{$_[0]->{inflight} // []}) ||
- ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+ scalar(@{gcf_inflight($_[0]) // []}) ||
+ ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []}))
}
# check_async and cat_async may trigger the other, so ensure they're
@@ -493,13 +515,13 @@ sub pub_urls {
sub cat_async_begin {
my ($self) = @_;
cleanup($self) if $self->alternates_changed;
- die 'BUG: already in async' if $self->{inflight};
+ die 'BUG: already in async' if gcf_inflight($self);
batch_prepare($self);
}
sub cat_async ($$$;$) {
my ($self, $oid, $cb, $arg) = @_;
- my $inflight = $self->{inflight} // cat_async_begin($self);
+ my $inflight = gcf_inflight($self) // cat_async_begin($self);
substr($oid, 0, 0) = 'contents ' if $self->{-bc};
write_all($self, $oid."\n", \&cat_async_step, $inflight);
push(@$inflight, $oid, $cb, $arg);
@@ -596,8 +618,7 @@ sub cleanup_if_unlinked {
sub event_step {
my ($self) = @_;
- $self->close if !$self->{sock}; # process died while requeued
- my $inflight = $self->{inflight};
+ my $inflight = gcf_inflight($self);
if ($inflight && @$inflight) {
$self->cat_async_step($inflight);
return $self->close unless $self->{sock};
@@ -619,18 +640,10 @@ sub watch_async ($) {
sub close {
my ($self) = @_;
- if (my $q = $self->{inflight}) { # abort inflight requests
- while (@$q) {
- my ($req, $cb, $arg) = splice(@$q, 0, 3);
- $req = $$req if ref($req);
- $self->{-bc} and $req =~ s/\A(?:contents|info) //;
- $req =~ s/ .*//; # drop git_dir for Gcf2Client
- eval { $cb->(undef, $req, undef, undef, $arg) };
- warn "E: (in abort) $req: $@" if $@;
- }
- }
+ my $sock = $self->{sock};
delete @$self{qw(-bc err_c inflight)};
delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+ $sock->close if $sock; # calls gcf_drain via awaitpid
}
package PublicInbox::GitCheck; # only for git <2.36
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index f8b2a9fc..09744b34 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -40,7 +40,7 @@ sub ibx_async_prefetch {
my ($ibx, $oid, $cb, $arg) = @_;
my $git = $ibx->git;
if (!defined($ibx->{topdir}) && $GCF2C) {
- if (!@{$GCF2C->{inflight} // []}) {
+ if (!@{$GCF2C->gcf_inflight // []}) {
$oid .= " $git->{git_dir}\n";
return $GCF2C->gcf2_async($oid, $cb, $arg); # true
}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index fcebac59..6593dcdf 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,8 +15,10 @@ use Errno qw(EINTR EAGAIN);
sub waitcb { # awaitpid callback
my ($pid, $errref, $cb, @args) = @_;
+ $errref //= \my $workaround_await_pids_clobbered;
$$errref = $?; # sets .cerr for _close
- $cb->($pid, @args) if $cb;
+ $cb->($pid, @args) if $cb; # may clobber $?
+ $? = $$errref;
}
sub attach_pid {
@@ -33,6 +35,11 @@ sub attached_pid {
${${*$io}{pi_io_reap} // []}[1];
}
+sub owner_pid {
+ my ($io) = @_;
+ ${${*$io}{pi_io_reap} // [-1]}[0];
+}
+
# caller cares about error result if they call close explicitly
# reap->[2] may be set before this is called via waitcb
sub close {
@@ -40,8 +47,12 @@ sub close {
my $ret = $io->SUPER::close;
my $reap = delete ${*$io}{pi_io_reap};
return $ret unless $reap && $reap->[0] == $$;
- ${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2]
- ($? = ${$reap->[2]}) ? '' : $ret;
+ if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously
+ $? = ${$reap->[2]};
+ } else { # wait synchronously
+ my $w = awaitpid($reap->[1]);
+ }
+ $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
}
sub DESTROY {
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH 7/7] drop redundant calls to DS->Reset
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
` (5 preceding siblings ...)
2023-11-26 2:11 ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong
@ 2023-11-26 2:11 ` Eric Wong
2023-11-26 21:08 ` [PATCH v2] " Eric Wong
6 siblings, 1 reply; 9+ messages in thread
From: Eric Wong @ 2023-11-26 2:11 UTC (permalink / raw)
To: meta
Reset gets called on END{} anyways to work around DBI lifetime
problems, so there's no need to call it near exit.
We'll also replace many calls to POSIX::_exit with the normal
`exit'. This ensures END{} gets called since all of our
destructors are fork-safe nowadays so POSIX::_exit is
unnecessary.
---
lib/PublicInbox/CodeSearchIdx.pm | 1 -
lib/PublicInbox/Daemon.pm | 1 -
lib/PublicInbox/IO.pm | 6 ++----
lib/PublicInbox/TestCommon.pm | 2 +-
lib/PublicInbox/Watch.pm | 6 +++---
lib/PublicInbox/Xapcmd.pm | 6 +++---
6 files changed, 9 insertions(+), 13 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 3764f13e..bb1d698b 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -1292,7 +1292,6 @@ sub cidx_run { # main entry point
local @PublicInbox::DS::post_loop_do = (\&shards_active);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
show_json($self);
}
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index f33f6f17..a2c1ed6e 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -703,7 +703,6 @@ sub run {
local %POST_ACCEPT;
daemon_loop();
- PublicInbox::DS->Reset;
# ->DESTROY runs when $for_destroy goes out-of-scope
}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index 6593dcdf..5654f3b0 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,10 +15,8 @@ use Errno qw(EINTR EAGAIN);
sub waitcb { # awaitpid callback
my ($pid, $errref, $cb, @args) = @_;
- $errref //= \my $workaround_await_pids_clobbered;
- $$errref = $?; # sets .cerr for _close
+ $$errref = $? if $errref; # sets .cerr for _close
$cb->($pid, @args) if $cb; # may clobber $?
- $? = $$errref;
}
sub attach_pid {
@@ -52,7 +50,7 @@ sub close {
} else { # wait synchronously
my $w = awaitpid($reap->[1]);
}
- $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
+ $? ? '' : $ret;
}
sub DESTROY {
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index 361a2356..9595da1a 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -579,7 +579,7 @@ sub start_script {
undef $tmp_mask;
if ($sub) {
_run_sub($sub, $key, \@argv);
- POSIX::_exit($? >> 8);
+ exit($? >> 8);
} else {
exec(key2script($key), @argv);
die "FAIL: ",join(' ', $key, @argv), ": $!\n";
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index b83a77eb..bef8187c 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -15,7 +15,7 @@ use PublicInbox::Spamcheck;
use PublicInbox::DS qw(now add_timer awaitpid);
use PublicInbox::MID qw(mids);
use PublicInbox::ContentHash qw(content_hash);
-use POSIX qw(_exit WNOHANG);
+use POSIX qw(WNOHANG);
sub compile_watchheaders ($) {
my ($ibx) = @_;
@@ -418,7 +418,7 @@ sub imap_idle_fork {
if ($pid == 0) {
watch_atfork_child($self);
watch_imap_idle_1($self, $uri, $intvl);
- _exit(0);
+ exit(0);
}
$self->{pids}->{$pid} = undef;
awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl);
@@ -480,7 +480,7 @@ sub poll_fetch_fork { # DS::add_timer callback
watch_atfork_child($self);
watch_imap_fetch_all($self, \@imap) if @imap;
watch_nntp_fetch_all($self, \@nntp) if @nntp;
- _exit(0);
+ exit(0);
}
$self->{pids}->{$pid} = undef;
awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris);
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 69f0af43..dd0fe6c5 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -10,7 +10,7 @@ use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx;
use File::Temp 0.19 (); # ->newdir
use File::Path qw(remove_tree);
-use POSIX qw(WNOHANG _exit);
+use POSIX qw(WNOHANG);
use PublicInbox::DS;
# support testing with dev versions of Xapian which installs
@@ -105,9 +105,9 @@ sub cb_spawn {
my ($cb, $args, $opt) = @_; # $cb = cpdb() or compact()
my $pid = PublicInbox::DS::do_fork;
return $pid if $pid > 0;
- $SIG{__DIE__} = sub { warn @_; _exit(1) }; # don't jump up stack
+ $SIG{__DIE__} = sub { warn @_; exit(1) }; # don't jump up stack
$cb->($args, $opt);
- _exit(0);
+ exit(0);
}
sub runnable_or_die ($) {
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [PATCH v2] drop redundant calls to DS->Reset
2023-11-26 2:11 ` [PATCH 7/7] drop redundant calls to DS->Reset Eric Wong
@ 2023-11-26 21:08 ` Eric Wong
0 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2023-11-26 21:08 UTC (permalink / raw)
To: meta
Reset gets called on END{} anyways to work around DBI lifetime
problems, so there's no need to call it near exit. We can't
replace calls to POSIX::_exit with `exit' to force END{} to
run just yet, as there are still some lingering destruction
ordering problems on newer DBI and/or Perls.
---
Dropping POSIX::_exit was too aggressive and caused segfaults
on my FreeBSD 13.2 VM.
lib/PublicInbox/CodeSearchIdx.pm | 1 -
lib/PublicInbox/Daemon.pm | 1 -
lib/PublicInbox/IO.pm | 6 ++----
3 files changed, 2 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 3764f13e..bb1d698b 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -1292,7 +1292,6 @@ sub cidx_run { # main entry point
local @PublicInbox::DS::post_loop_do = (\&shards_active);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
show_json($self);
}
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index f33f6f17..a2c1ed6e 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -703,7 +703,6 @@ sub run {
local %POST_ACCEPT;
daemon_loop();
- PublicInbox::DS->Reset;
# ->DESTROY runs when $for_destroy goes out-of-scope
}
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
index 6593dcdf..5654f3b0 100644
--- a/lib/PublicInbox/IO.pm
+++ b/lib/PublicInbox/IO.pm
@@ -15,10 +15,8 @@ use Errno qw(EINTR EAGAIN);
sub waitcb { # awaitpid callback
my ($pid, $errref, $cb, @args) = @_;
- $errref //= \my $workaround_await_pids_clobbered;
- $$errref = $?; # sets .cerr for _close
+ $$errref = $? if $errref; # sets .cerr for _close
$cb->($pid, @args) if $cb; # may clobber $?
- $? = $$errref;
}
sub attach_pid {
@@ -52,7 +50,7 @@ sub close {
} else { # wait synchronously
my $w = awaitpid($reap->[1]);
}
- $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?)
+ $? ? '' : $ret;
}
sub DESTROY {
^ permalink raw reply related [flat|nested] 9+ messages in thread
end of thread, other threads:[~2023-11-26 21:10 UTC | newest]
Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-11-26 2:10 [PATCH 0/7] more I/O + process reliability and cleanups Eric Wong
2023-11-26 2:10 ` [PATCH 1/7] xap_helper_cxx: do not copy xap_helper.h source Eric Wong
2023-11-26 2:11 ` [PATCH 2/7] xap_client: attach PID to the IO object Eric Wong
2023-11-26 2:11 ` [PATCH 3/7] xap_client: pass arguments to top-level xap_helper Eric Wong
2023-11-26 2:11 ` [PATCH 4/7] xap_helper: allow PI_NO_CXX to disable C++ in more places Eric Wong
2023-11-26 2:11 ` [PATCH 5/7] git: move rbuf handling to PublicInbox::IO Eric Wong
2023-11-26 2:11 ` [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Eric Wong
2023-11-26 2:11 ` [PATCH 7/7] drop redundant calls to DS->Reset Eric Wong
2023-11-26 21:08 ` [PATCH v2] " Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).