From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id E018D1F539 for ; Tue, 21 Mar 2023 23:07:47 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1679440067; bh=JIpcgtONAF0K1hrnXr28TF91TCLhwEKt12sJzvEP04A=; h=From:To:Subject:Date:In-Reply-To:References:From; b=EujYIomQ2NkWbqTmdZgoguBI4fGoWy7dGiCnK/iaqyxKhNJ3mRonnE7SxgcM/lVBF t+EW6qwx0+WZgOAwoaE7KL+s1McrJTemwjE492khoeTLjQgxIjhxLGT+Xzh9l3FqFH zL9sAwa6AyzR+2sYYKNxuJhjWnpIgDnfVSCWotCM= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 21/28] cindex: improve granularity of quit checks Date: Tue, 21 Mar 2023 23:07:36 +0000 Message-Id: <20230321230743.3020032-21-e@80x24.org> In-Reply-To: <20230321230743.3020032-1-e@80x24.org> References: <20230321230701.3019936-1-e@80x24.org> <20230321230743.3020032-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This fixes shutdown handling when shard_index() isn't running and ensures we can shut down the process more quickly. --- lib/PublicInbox/CodeSearchIdx.pm | 55 ++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index f0b506da..4f91e0b6 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do # local-ized in parent before fork $TXN_BYTES = $batch_bytes; local $self->{git} = $git; # for patchid + return if $DO_QUIT; my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); close $in or die "close: $!"; my $nr = 0; @@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do my $len; my $cmt = {}; local $/ = $FS; - my $buf = <$rd> // return; # leading $FS + my $buf = <$rd> // return close($rd); # leading $FS $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; $self->begin_txn_lazy; - while (defined($buf = <$rd>)) { + while (!$DO_QUIT && defined($buf = <$rd>)) { chomp($buf); $/ = "\n"; $len = length($buf); @@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do $TXN_BYTES = $batch_bytes - $len; } add_commit($self, $cmt); - last if $DO_QUIT; ++$nr; if ($TXN_BYTES <= 0) { cidx_ckpoint($self, "[$n] $nr"); @@ -298,6 +298,7 @@ sub run_todo ($) { sub need_reap { # post_loop_do my (undef, $jobs) = @_; + return if !$LIVE || $DO_QUIT; scalar(keys(%$LIVE)) > $jobs; } @@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback sub partition_refs ($$$) { my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin - my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); + my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; my ($seen, $nchange) = (0, 0); my @shard_in = map { @@ -421,7 +422,7 @@ sub partition_refs ($$$) { $fh; } @RDONLY_SHARDS; - while (defined(my $cmt = <$fh>)) { + while (defined(my $cmt = <$rfh>)) { chomp $cmt; my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { @@ -431,8 +432,13 @@ sub partition_refs ($$$) { ++$nchange; $seen = 0; } + if ($DO_QUIT) { + close($rfh); + return (); + } } - close($fh); + close($rfh); + return () if $DO_QUIT; if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { $self->{nchange} += $nchange; progress($self, "$git->{git_dir}: $nchange commits"); @@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do sub consumers_open { # post_loop_do my (undef, $consumers) = @_; + return if $DO_QUIT; scalar(grep { $_->{sock} } values %$consumers); } +sub wait_consumers ($$$) { + my ($self, $git, $consumers) = @_; + local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; + die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT; +} + sub commit_used_shards ($$$) { my ($self, $git, $consumers) = @_; local $self->{-shard_ok} = {}; @@ -466,15 +481,12 @@ sub commit_used_shards ($$$) { $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); $consumers->{$n} = $c; } - local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET); - my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; - die "E: $git->{git_dir} $n shards failed" if $n; + wait_consumers($self, $git, $consumers); } sub index_repo { # cidx_await cb my ($self, $git, $roots) = @_; - return if $git->{-cidx_err}; + return if $git->{-cidx_err} || $DO_QUIT; my $repo = delete $git->{-repo} or return; seek($roots, 0, SEEK_SET) or die "seek: $!"; chomp(my @roots = <$roots>); @@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb local $self->{current_info} = $git->{git_dir}; my @shard_in = partition_refs($self, $git, delete($repo->{refs})); local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1 - my %CONSUMERS; + my $consumers = {}; for my $n (0..$#shard_in) { -s $shard_in[$n] or next; + last if $DO_QUIT; my ($c, $p) = PublicInbox::PktOp->pair; $c->{ops}->{shard_done} = [ $self ]; $IDX_SHARDS[$n]->wq_io_do('shard_index', [ $shard_in[$n], $p->{op_p} ], $git, $n, \@roots); - $CONSUMERS{$n} = $c; + $consumers->{$n} = $c; } @shard_in = (); - local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET); - my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS; - die "E: $git->{git_dir} $n shards failed" if $n; + wait_consumers($self, $git, $consumers); if ($DO_QUIT) { - commit_used_shards($self, $git, \%CONSUMERS); + commit_used_shards($self, $git, $consumers); progress($self, "$git->{git_dir}: done"); return; } $repo->{git_dir} = $git->{git_dir}; my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); if ($id > 0) { - $CONSUMERS{$repo->{shard_n}} = undef; - commit_used_shards($self, $git, \%CONSUMERS); + $consumers->{$repo->{shard_n}} = undef; + commit_used_shards($self, $git, $consumers); progress($self, "$git->{git_dir}: done"); return run_todo($self); } @@ -585,6 +595,7 @@ sub scan_git_dirs ($) { $self, $git); fp_start($self, $git, $prep_repo); ct_start($self, $git, $prep_repo); + last if $DO_QUIT; } cidx_reap($self, 0); } @@ -674,8 +685,10 @@ sub ipc_atfork_child { sub shard_done_wait { # awaitpid cb via ipc_worker_reap my ($pid, $shard, $self) = @_; + my $quit_req = delete($shard->{-cidx_quit}); + return if $DO_QUIT || !$LIVE; if ($? == 0) { # success - delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset'; + $quit_req // warn 'BUG: {-cidx_quit} unset'; return; } warn "PID:$pid $shard->{shard} exited with \$?=$?\n";