From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 91D091F512 for ; Tue, 21 Mar 2023 23:07:46 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1679440066; bh=MR+xP2ivsgotKOsw9f93JTpT6LtquUUVZCB8zvE8BPA=; h=From:To:Subject:Date:In-Reply-To:References:From; b=OguqoHuTW4Cvkl0flxlQi79tQDIWw4pb7t3hJ7xtS3GNgp6IeU4XyuYACtzArSS6A JyMc1cF54FZuLPzlp5oHfscBZe1jpzx1HFd7/1qd28r9HDEpZVgwq82+meB4r4SZuw N1qzFqfAqUr8aqNUcbD8SZWGaX4gOZXzvi+Kcb30= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 15/28] cindex: handle graceful shutdown by default Date: Tue, 21 Mar 2023 23:07:30 +0000 Message-Id: <20230321230743.3020032-15-e@80x24.org> In-Reply-To: <20230321230743.3020032-1-e@80x24.org> References: <20230321230701.3019936-1-e@80x24.org> <20230321230743.3020032-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: While individual Xapian shards are consistent due to the use of Xapian transactions, the data across shards still needs to be in a consistent state for our search to work. --- lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 1a472b64..82f90368 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -38,6 +38,8 @@ our ( $LIVE_JOBS, # integer $MY_SIG, # like %SIG $SIGSET, + $TXN_BYTES, # number of bytes in current shard transaction + $DO_QUIT, # signal number @RDONLY_SHARDS, # Xapian::Database @IDX_SHARDS # clones of self ); @@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid sub shard_index { # via wq_io_do my ($self, $git, $n, $roots) = @_; local $self->{current_info} = "$git->{git_dir} [$n]"; - my ($quit, $cmt); + my $cmt; local $self->{roots} = $roots; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; my $batch_bytes = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; - my $max = $batch_bytes; - my $set_quit = sub { $quit = shift }; - local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import' - local $SIG{QUIT} = $set_quit; - local $SIG{TERM} = $set_quit; - local $SIG{INT} = $set_quit; + # local-ized in parent before fork + $TXN_BYTES = $batch_bytes; local $self->{git} = $git; # for patchid my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); close $in or die "close: $!"; @@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do $self->begin_txn_lazy; while (defined($buf = <$rd>)) { chomp($buf); - $max -= length($buf); + $TXN_BYTES -= length($buf); @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); $/ = "\n"; add_commit($self, $cmt); - last if $quit; # likely SIGPIPE + last if $DO_QUIT; ++$nr; - if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) { + if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) { progress($self, "[$n] $nr"); $self->{xdb}->commit_transaction; - $max = $batch_bytes; + $TXN_BYTES = $batch_bytes; $self->{xdb}->begin_transaction; } $/ = $FS; } close($rd); - if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) { + if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || + ($? & 127) == POSIX::SIGPIPE))) { send($op_p, "shard_done $n", MSG_EOR); } else { warn "E: git @LOG_STDIN: \$?=$?\n"; @@ -254,7 +253,7 @@ sub need_reap { # post_loop_do sub cidx_reap ($$) { my ($self, $jobs) = @_; while (run_todo($self)) {} - local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs); + local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs); while (need_reap(undef, $jobs)) { PublicInbox::DS::event_loop($MY_SIG, $SIGSET); } @@ -263,7 +262,7 @@ sub cidx_reap ($$) { sub cidx_await_cb { # awaitpid cb my ($pid, $cb, $self, $git, @args) = @_; - return if !$LIVE; # premature shutdown + return if !$LIVE || $DO_QUIT; my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd'; PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC if ($?) { @@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) { # only care about --heads (branches) and --tags, and not even their names sub fp_start ($$$) { my ($self, $git, $prep_repo) = @_; - return if !$LIVE; # premature exit + return if !$LIVE || $DO_QUIT; cidx_reap($self, $LIVE_JOBS); open my $refs, '+>', undef or die "open: $!"; my $cmd = ['git', "--git-dir=$git->{git_dir}", @@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb sub ct_start ($$$) { my ($self, $git, $prep_repo) = @_; - return if !$LIVE; # premature exit + return if !$LIVE || $DO_QUIT; cidx_reap($self, $LIVE_JOBS); my $cmd = [ 'git', "--git-dir=$git->{git_dir}", qw[for-each-ref --sort=-committerdate @@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb # TODO: also index gitweb.owner and the full fingerprint for grokmirror? sub prep_repo ($$) { my ($self, $git) = @_; - return if !$LIVE || $git->{-cidx_err}; # premature exit + return if !$LIVE || $DO_QUIT || $git->{-cidx_err}; my $repo = $git->{-repo} // die 'BUG: no {-repo}'; if (!defined($repo->{ct})) { warn "W: $git->{git_dir} has no commits, skipping\n"; @@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb PublicInbox::DS::event_loop($MY_SIG, $SIGSET); my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS; die "E: $git->{git_dir} $n shards failed" if $n; + if ($DO_QUIT) { + commit_used_shards($self, $git, \%CONSUMERS); + progress($self, "$git->{git_dir}: done"); + return; + } $repo->{git_dir} = $git->{git_dir}; my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); if ($id > 0) { @@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb sub get_roots ($$) { my ($self, $git) = @_; - return if !$LIVE; # premature exit + return if !$LIVE || $DO_QUIT; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET) or die "seek: $!"; open my $roots, '+>', undef or die "open: $!"; @@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update @$dirs = grep { !$uniq{$_}++ } @$dirs; } +# SIG handlers: +sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() } +sub shard_usr1 { $TXN_BYTES = -1 } + sub cidx_init ($) { my ($self) = @_; my $dir = $self->{cidx_dir}; @@ -498,12 +506,13 @@ sub cidx_init ($) { } $self->lock_acquire; my @shards; + local $TXN_BYTES; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; $shard->idx_release; - $shard->wq_workers_start("shard[$n]", 1, undef, { + $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; @@ -533,6 +542,15 @@ sub shards_active { # post_loop_do scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } +# signal handlers +sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } + +sub parent_quit { + $DO_QUIT = $_[0]; + kill_shards(@_); + warn "# SIG$_[0] received, quitting...\n"; +} + sub cidx_run { # main entry point my ($self) = @_; local $self->{todo} = []; @@ -541,13 +559,15 @@ sub cidx_run { # main entry point my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; + local $DO_QUIT; local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; - my $cb = $SIG{__WARN__} || \&CORE::warn; local $MY_SIG = { CHLD => \&PublicInbox::DS::enqueue_reap, - INT => sub { exit }, + USR1 => \&kill_shards, }; + $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT); + my $cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $m = shift @_; $self->{current_info} eq '' or @@ -594,14 +614,19 @@ sub cidx_run { # main entry point sub ipc_atfork_child { my ($self) = @_; $self->SUPER::ipc_atfork_child; + $SIG{USR1} = \&shard_usr1; + $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; $_->wq_close for @$x; + undef; } sub shard_done_wait { # awaitpid cb via ipc_worker_reap my ($pid, $shard, $self) = @_; - delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset'; - return unless $?; + if ($? == 0) { # success + delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset'; + return; + } warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; ++$self->{shard_err} if defined($self->{shard_err}); }