From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D90971F51A for ; Sun, 26 Nov 2023 02:11:06 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1700964666; bh=UHzv0CFFa9xGlX9Y8t5ld/8RP5oaVySCybhHTblQ9HA=; h=From:To:Subject:Date:In-Reply-To:References:From; b=Vm6iMCdkAd0/aIYGlCQXfjbZBIV9pWYAlB8SxQLUgn1SKqghw1WwDkBS1082gfKF2 jk4QU2LprVUApn7VYF7LKfnX3Ksj3rBMzuk9VgTv9puZVXtQ2V5rRWGrOZpY+W62+y 40uehkkyjy3RvyHYmx23+zhMYHlQLNtdCq6BT0FY= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 6/7] git: improve coupling with {sock} and {inflight} fields Date: Sun, 26 Nov 2023 02:11:04 +0000 Message-ID: <20231126021105.408573-7-e@80x24.org> In-Reply-To: <20231126021105.408573-1-e@80x24.org> References: <20231126021105.408573-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: While the {inflight} array should be tied to the IO object even more tightly, that's not an easy task with our current code. So take some small steps by introducing a gcf_inflight helper to validate the ownership of the process and to drain the inflight array via the awaitpid callback. This hopefully fixes problems with t/lei-q-save.t (still) hanging occasionally on v2 outputs since git->cleanup/->DESTROY was getting called in v2 shard workers. 
--- lib/PublicInbox/Gcf2Client.pm | 6 ++- lib/PublicInbox/Git.pm | 79 ++++++++++++++++++++-------------- lib/PublicInbox/GitAsyncCat.pm | 2 +- lib/PublicInbox/IO.pm | 17 ++++++-- 4 files changed, 65 insertions(+), 39 deletions(-) diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index 19d77e32..07ff7dcb 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -31,15 +31,16 @@ sub new { $opt->{0} = $opt->{1} = $s2; my $cmd = [$^X, $^W ? ('-w') : (), qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; - PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt)); $self->{inflight} = []; + PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt), + \&PublicInbox::Git::gcf_drain, $self->{inflight}); $self->{epwatch} = \undef; # for Git->cleanup $self->SUPER::new($s1, EPOLLIN); } sub gcf2_async ($$$;$) { my ($self, $req, $cb, $arg) = @_; - my $inflight = $self->{inflight} or return $self->close; + my $inflight = $self->gcf_inflight or return; PublicInbox::Git::write_all($self, $req, \&cat_async_step, $inflight); push @$inflight, \$req, $cb, $arg; # ref prevents Git.pm retries } @@ -49,6 +50,7 @@ sub alternates_changed {} no warnings 'once'; +*gcf_inflight = \&PublicInbox::Git::gcf_inflight; # for event_step *cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step *event_step = \&PublicInbox::Git::event_step; *fail = \&PublicInbox::Git::fail; diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 93736cf0..fe834210 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -140,6 +140,18 @@ sub last_check_err { $buf; } +sub gcf_drain { # awaitpid cb + my ($pid, $inflight, $bc) = @_; + while (@$inflight) { + my ($req, $cb, $arg) = splice(@$inflight, 0, 3); + $req = $$req if ref($req); + $bc and $req =~ s/\A(?:contents|info) //; + $req =~ s/ .*//; # drop git_dir for Gcf2Client + eval { $cb->(undef, $req, undef, undef, $arg) }; + warn "E: (in abort) $req: $@" if $@; + } +} + sub _sock_cmd { 
my ($self, $batch, $err_c) = @_; $self->{sock} and Carp::confess('BUG: {sock} exists'); @@ -162,8 +174,11 @@ sub _sock_cmd { $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or $self->fail("tmpfile($id): $!"); } + my $inflight = []; # TODO consider moving this into the IO object my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid); + $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid, + \&gcf_drain, $inflight, $self->{-bc}); + $self->{inflight} = $inflight; } sub cat_async_retry ($$) { @@ -171,8 +186,8 @@ sub cat_async_retry ($$) { # {inflight} may be non-existent, but if it isn't we delete it # here to prevent cleanup() from waiting: - delete $self->{inflight}; - cleanup($self); + my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)}; + $self->SUPER::close if $epwatch; my $new_inflight = batch_prepare($self); while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) { @@ -180,13 +195,25 @@ sub cat_async_retry ($$) { $oid = \$oid if !@$new_inflight; # to indicate oid retried push @$new_inflight, $oid, $cb, $arg; } + $sock->close if $sock; # only safe once old_inflight is empty cat_async_step($self, $new_inflight); # take one step } +sub gcf_inflight ($) { + my ($self) = @_; + if ($self->{sock}) { + return $self->{inflight} if $self->{sock}->owner_pid == $$; + delete @$self{qw(sock inflight)}; + } else { + $self->close; + } + undef; +} + # returns true if prefetch is successful sub async_prefetch { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{inflight} or return; + my $inflight = gcf_inflight($self) or return; return if @$inflight; substr($oid, 0, 0) = 'contents ' if $self->{-bc}; write_all($self, "$oid\n", \&cat_async_step, $inflight); @@ -195,7 +222,7 @@ sub async_prefetch { sub cat_async_step ($$) { my ($self, $inflight) = @_; - die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; + croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 
2]; my ($bref, $oid, $type, $size); my $head = $self->{sock}->my_readline; @@ -237,11 +264,8 @@ sub cat_async_step ($$) { sub cat_async_wait ($) { my ($self) = @_; - return $self->close if !$self->{sock}; - my $inflight = $self->{inflight} or return; - while (scalar(@$inflight)) { - cat_async_step($self, $inflight); - } + my $inflight = gcf_inflight($self) or return; + cat_async_step($self, $inflight) while (scalar(@$inflight)); } sub batch_prepare ($) { @@ -253,7 +277,6 @@ sub batch_prepare ($) { } else { _sock_cmd($self, 'batch'); } - $self->{inflight} = []; } sub _cat_file_cb { @@ -271,7 +294,7 @@ sub cat_file { sub check_async_step ($$) { my ($ck, $inflight) = @_; - die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; + croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; chomp(my $line = $ck->{sock}->my_readline); my ($hex, $type, $size) = split(/ /, $line); @@ -291,8 +314,7 @@ sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; my $ck = $self->{ck} or return; - return $ck->close if !$ck->{sock}; - my $inflight = $ck->{inflight} or return; + my $inflight = gcf_inflight($ck) or return; check_async_step($ck, $inflight) while (scalar(@$inflight)); } @@ -312,7 +334,6 @@ sub check_async_begin ($) { } else { _sock_cmd($self = ck($self), 'batch-check', 1); } - $self->{inflight} = []; } sub write_all { @@ -337,12 +358,13 @@ sub check_async ($$$$) { my $inflight; if ($self->{-bc}) { # likely as time goes on batch_command: - $inflight = $self->{inflight} // cat_async_begin($self); + $inflight = gcf_inflight($self) // cat_async_begin($self); substr($oid, 0, 0) = 'info '; write_all($self, "$oid\n", \&cat_async_step, $inflight); } else { # accounts for git upgrades while we're running: my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin - $inflight = $ck->{inflight} // check_async_begin($self); + $inflight = ($ck ? 
gcf_inflight($ck) : undef) + // check_async_begin($self); goto batch_command if $self->{-bc}; write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight); } @@ -417,8 +439,8 @@ sub date_parse { } sub _active ($) { - scalar(@{$_[0]->{inflight} // []}) || - ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []})) + scalar(@{gcf_inflight($_[0]) // []}) || + ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []})) } # check_async and cat_async may trigger the other, so ensure they're @@ -493,13 +515,13 @@ sub pub_urls { sub cat_async_begin { my ($self) = @_; cleanup($self) if $self->alternates_changed; - die 'BUG: already in async' if $self->{inflight}; + die 'BUG: already in async' if gcf_inflight($self); batch_prepare($self); } sub cat_async ($$$;$) { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{inflight} // cat_async_begin($self); + my $inflight = gcf_inflight($self) // cat_async_begin($self); substr($oid, 0, 0) = 'contents ' if $self->{-bc}; write_all($self, $oid."\n", \&cat_async_step, $inflight); push(@$inflight, $oid, $cb, $arg); @@ -596,8 +618,7 @@ sub cleanup_if_unlinked { sub event_step { my ($self) = @_; - $self->close if !$self->{sock}; # process died while requeued - my $inflight = $self->{inflight}; + my $inflight = gcf_inflight($self); if ($inflight && @$inflight) { $self->cat_async_step($inflight); return $self->close unless $self->{sock}; @@ -619,18 +640,10 @@ sub watch_async ($) { sub close { my ($self) = @_; - if (my $q = $self->{inflight}) { # abort inflight requests - while (@$q) { - my ($req, $cb, $arg) = splice(@$q, 0, 3); - $req = $$req if ref($req); - $self->{-bc} and $req =~ s/\A(?:contents|info) //; - $req =~ s/ .*//; # drop git_dir for Gcf2Client - eval { $cb->(undef, $req, undef, undef, $arg) }; - warn "E: (in abort) $req: $@" if $@; - } - } + my $sock = $self->{sock}; delete @$self{qw(-bc err_c inflight)}; delete($self->{epwatch}) ? 
$self->SUPER::close : delete($self->{sock}); + $sock->close if $sock; # calls gcf_drain via awaitpid } package PublicInbox::GitCheck; # only for git <2.36 diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index f8b2a9fc..09744b34 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -40,7 +40,7 @@ sub ibx_async_prefetch { my ($ibx, $oid, $cb, $arg) = @_; my $git = $ibx->git; if (!defined($ibx->{topdir}) && $GCF2C) { - if (!@{$GCF2C->{inflight} // []}) { + if (!@{$GCF2C->gcf_inflight // []}) { $oid .= " $git->{git_dir}\n"; return $GCF2C->gcf2_async($oid, $cb, $arg); # true } diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm index fcebac59..6593dcdf 100644 --- a/lib/PublicInbox/IO.pm +++ b/lib/PublicInbox/IO.pm @@ -15,8 +15,10 @@ use Errno qw(EINTR EAGAIN); sub waitcb { # awaitpid callback my ($pid, $errref, $cb, @args) = @_; + $errref //= \my $workaround_await_pids_clobbered; $$errref = $?; # sets .cerr for _close - $cb->($pid, @args) if $cb; + $cb->($pid, @args) if $cb; # may clobber $? + $? = $$errref; } sub attach_pid { @@ -33,6 +35,11 @@ sub attached_pid { ${${*$io}{pi_io_reap} // []}[1]; } +sub owner_pid { + my ($io) = @_; + ${${*$io}{pi_io_reap} // [-1]}[0]; +} + # caller cares about error result if they call close explicitly # reap->[2] may be set before this is called via waitcb sub close { @@ -40,8 +47,12 @@ sub close { my $ret = $io->SUPER::close; my $reap = delete ${*$io}{pi_io_reap}; return $ret unless $reap && $reap->[0] == $$; - ${$reap->[2]} // (my $w = awaitpid($reap->[1])); # sets [2] - ($? = ${$reap->[2]}) ? '' : $ret; + if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously + $? = ${$reap->[2]}; + } else { # wait synchronously + my $w = awaitpid($reap->[1]); + } + $? ? '' : $ret; # use $?, AWAIT_PIDS may be cleared on ->Reset (FIXME?) } sub DESTROY {