From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 83E6F1F86C for ; Fri, 27 Nov 2020 21:33:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] *index: more consistent graceful shutdown checks Date: Fri, 27 Nov 2020 21:33:55 +0000 Message-Id: <20201127213355.26310-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: v1 and v2 inbox indexing now supports graceful shutdown checks just like ExtSearchIdx. Additionally, we'll now perform quit checks at the top of loops for consistency. Interaction with the --xapian-only and --sequential-shard options is a bit lacking, so we'll warn the user to use "--reindex --xapian-only" to fix it. --- lib/PublicInbox/ExtSearchIdx.pm | 12 ++++++------ lib/PublicInbox/SearchIdx.pm | 25 +++++++++++++++++++++---- lib/PublicInbox/V2Writable.pm | 37 +++++++++++++++++++++++++------------ script/public-inbox-index | 1 + 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 7ab0c4af..cf90c562 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -309,10 +309,11 @@ sub _sync_inbox ($$$) { warn "E: $ekey unsupported inbox version (v$v)\n"; return; } - unless ($sync->{quit}) { - index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []}; - $self->{midx}->index_ibx($ibx) unless $sync->{quit}; + for my $unit (@{delete($sync->{todo}) // []}) { + last if $sync->{quit}; + index_todo($self, $sync, $unit); } + $self->{midx}->index_ibx($ibx) unless $sync->{quit}; $ibx->git->cleanup; # done with this inbox, now } @@ -334,17 +335,16 @@ sub eidx_sync { # main entry point -regen_fmt =>
"%u/?\n", }; local $SIG{USR1} = sub { $need_checkpoint = 1 }; - my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; }; + my $quit = PublicInbox::SearchIdx::quit_cb($sync); local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; # don't use $_ here, it'll get clobbered by reindex_checkpoint for my $ibx (@{$self->{ibx_list}}) { - _sync_inbox($self, $sync, $ibx); last if $sync->{quit}; + _sync_inbox($self, $sync, $ibx); } - $self->{oidx}->rethread_done($opt) unless $sync->{quit}; PublicInbox::V2Writable::done($self); diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 18390602..d06c159b 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -632,10 +632,11 @@ sub index_sync { my ($self, $opt) = @_; delete $self->{lock_path} if $opt->{-skip_lock}; $self->with_umask(\&_index_sync, $self, $opt); - if ($opt->{reindex}) { + if ($opt->{reindex} && !$opt->{quit}) { my %again = %$opt; delete @again{qw(rethread reindex)}; index_sync($self, \%again); + $opt->{quit} = $again{quit}; # propagate to caller } } @@ -688,7 +689,7 @@ sub v1_checkpoint ($$;$) { if (my $pr = $sync->{-opt}->{-progress}) { $pr->("indexed $nr/$sync->{ntodo}\n") if $nr; } - if (!$stk) { # more to come + if (!$stk && !$sync->{quit}) { # more to come begin_txn_lazy($self); $self->{mm}->{dbh}->begin_work; } @@ -709,6 +710,7 @@ sub process_stack { if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." 
leftovers\n"); for my $oid (@leftovers) { + last if $sync->{quit}; $oid = unpack('H*', $oid); $git->cat_async($oid, \&unindex_both, $sync); } @@ -718,6 +720,7 @@ sub process_stack { } while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) { my $arg = { %$sync, cur_cmt => $cur_cmt }; + last if $sync->{quit}; if ($f eq 'm') { $arg->{autime} = $at; $arg->{cotime} = $ct; @@ -731,7 +734,7 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $arg); } } - v1_checkpoint($self, $sync, $stk); + v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk); } sub log2stack ($$$) { @@ -841,6 +844,16 @@ sub reindex_from ($$) { ref($reindex) eq 'HASH' ? $reindex->{from} : ''; } +sub quit_cb ($) { + my ($sync) = @_; + sub { + # we set {-opt}->{quit} too, so ->index_sync callers + # can abort multi-inbox loops this way + $sync->{quit} = $sync->{-opt}->{quit} = 1; + warn "gracefully quitting\n"; + } +} + # indexes all unindexed messages (v1 only) sub _index_sync { my ($self, $opt) = @_; @@ -850,6 +863,10 @@ sub _index_sync { $ibx->git->batch_prepare; my $pr = $opt->{-progress}; my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx }; + my $quit = quit_cb($sync); + local $SIG{QUIT} = $quit; + local $SIG{INT} = $quit; + local $SIG{TERM} = $quit; my $xdb = $self->begin_txn_lazy; $self->{oidx}->rethread_prepare($opt); my $mm = _msgmap_init($self); @@ -870,7 +887,7 @@ sub _index_sync { my $stk = prepare_stack($sync, $range); $sync->{ntodo} = $stk ? 
$stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - process_stack($self, $sync, $stk); + process_stack($self, $sync, $stk) if !$sync->{quit}; } sub DESTROY { diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index afba0220..c94623e1 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -1106,10 +1106,10 @@ sub sync_prepare ($$) { local $self->{current_info} = 'leftover '; my $unindex_oid = $self->can('unindex_oid'); for my $oid (@leftovers) { + last if $sync->{quit}; $oid = unpack('H*', $oid); my $req = { %$sync, oid => $oid }; $self->git->cat_async($oid, $unindex_oid, $req); - last if $sync->{quit}; } $self->git->cat_async_wait; } @@ -1233,6 +1233,7 @@ sub index_xap_step ($$$;$) { "$beg..$end (% $step)\n"); } for (my $num = $beg; $num <= $end; $num += $step) { + last if $sync->{quit}; my $smsg = $ibx->over->get_art($num) or next; $smsg->{self} = $self; $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg); @@ -1262,6 +1263,12 @@ sub index_todo ($$$) { local $sync->{latest_cmt} = \(my $latest_cmt); local $sync->{unit} = $unit; while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) { + if ($sync->{quit}) { + warn "waiting to quit...\n"; + $all->async_wait_all; + $self->update_last_commit($sync); + return; + } my $req = { %$sync, autime => $at, @@ -1278,12 +1285,6 @@ sub index_todo ($$$) { } elsif ($f eq 'd') { $all->cat_async($oid, $unindex_oid, $req); } - if ($sync->{quit}) { - warn "waiting to quit...\n"; - $all->async_wait_all; - $self->update_last_commit($sync); - return; - } if (${$sync->{need_checkpoint}}) { reindex_checkpoint($self, $sync); } @@ -1310,6 +1311,7 @@ sub xapian_only { if ($seq || !$self->{parallel}) { my $shard_end = $self->{shards} - 1; for my $i (0..$shard_end) { + last if $sync->{quit}; index_xap_step($self, $sync, $art_beg + $i); if ($i != $shard_end) { reindex_checkpoint($self, $sync); @@ -1350,7 +1352,7 @@ sub index_sync { ibx => $self->{ibx}, 
epoch_max => $epoch_max, }; - my $quit = sub { $sync->{quit} = 1 }; + my $quit = PublicInbox::SearchIdx::quit_cb($sync); local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; @@ -1381,14 +1383,21 @@ sub index_sync { $pr->('all.git '.sprintf($sync->{-regen_fmt}, $$nr)) if $pr; } + my $quit_warn; # deal with Xapian shards sequentially if ($seq && delete($sync->{mm_tmp})) { - $self->{ibx}->{indexlevel} = $idxlevel; - xapian_only($self, $opt, $sync, $art_beg); + if ($sync->{quit}) { + $quit_warn = 1; + } else { + $self->{ibx}->{indexlevel} = $idxlevel; + xapian_only($self, $opt, $sync, $art_beg); + $quit_warn = 1 if $sync->{quit}; + } } # --reindex on the command-line - if ($opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') { + if (!$sync->{quit} && $opt->{reindex} && + !ref($opt->{reindex}) && $idxlevel ne 'basic') { $self->lock_acquire; my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0); if (my $xdb = $s0->idx_acquire) { @@ -1400,12 +1409,16 @@ sub index_sync { } # reindex does not pick up new changes, so we rerun w/o it: - if ($opt->{reindex}) { + if ($opt->{reindex} && !$sync->{quit}) { my %again = %$opt; $sync = undef; delete @again{qw(rethread reindex -skip_lock)}; index_sync($self, \%again); + $opt->{quit} = $again{quit}; # propagate to caller } + warn < $v }; } PublicInbox::Admin::index_inbox($ibx, undef, $ibx_opt); + last if $ibx_opt->{quit}; if (my $copt = $opt->{compact_opt}) { local $copt->{jobs} = 0 if $ibx_opt->{sequential_shard}; PublicInbox::Xapcmd::run($ibx, 'compact', $copt);