* [PATCH 1/4] *index: checkpoints write last_commit metadata
2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
` (2 subsequent siblings)
3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
To: meta
This will set us up for supporting graceful shutdown
on -index without repeating any work.
---
lib/PublicInbox/ExtSearchIdx.pm | 12 ++++---
lib/PublicInbox/IdxStack.pm | 16 +++++++---
lib/PublicInbox/SearchIdx.pm | 56 ++++++++++++++++++---------------
lib/PublicInbox/V2Writable.pm | 28 ++++++++++++-----
t/idx_stack.t | 20 ++++++------
5 files changed, 81 insertions(+), 51 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7aaf8291..14ffdadb 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -168,6 +168,10 @@ sub do_finalize ($) {
# `d' message was already unindexed in the v1/v2 inboxes,
# so it's too noisy to warn, here.
}
+ # cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+ if (defined(my $cur_cmt = $req->{cur_cmt})) {
+ ${$req->{latest_cmt}} = $cur_cmt;
+ }
}
sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
}
sub update_last_commit { # overrides V2Writable
- my ($self, $sync, $unit, $latest_cmt) = @_;
- return unless defined $latest_cmt;
-
- $self->git->async_wait_all;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
my $ekey = $ibx->eidx_key;
my $uv = $ibx->uidvalidity;
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index ce75b46a..e7e10de9 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
use v5.10.1;
use strict;
use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
# start off in write-only mode
sub new {
@@ -16,9 +16,15 @@ sub new {
# file_char = [d|m]
sub push_rec {
- my ($self, $file_char, $at, $ct, $blob_oid) = @_;
- my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
- $self->{rec_size} //= length($rec);
+ my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+ my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+ $self->{unpack_fmt} //= do {
+ my $len = length($cmt_oid);
+ my $fmt = PACK_FMT;
+ $fmt =~ s/H\*/H$len/g;
+ $self->{rec_size} = length($rec);
+ $fmt;
+ };
print { $self->{wr} } $rec or die "print: $!";
$self->{tot_size} += length($rec);
}
@@ -46,7 +52,7 @@ sub pop_rec {
my $r = read($io, my $buf, $sz);
defined($r) or die "read: $!";
$r == $sz or die "read($r != $sz)";
- unpack(FMT, $buf);
+ unpack($self->{unpack_fmt}, $buf);
}
1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 662055c6..90d8c8b3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
+ my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+ ${$sync->{latest_cmt}} = $cur_cmt;
}
sub unindex_both { # git->cat_async callback
- my ($bref, $oid, $type, $size, $self) = @_;
- unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+ my ($bref, $oid, $type, $size, $sync) = @_;
+ unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+ # may be undef if leftover
+ if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+ ${$sync->{latest_cmt}} = $cur_cmt;
+ }
}
sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
- # latest_cmt may be undef
- my $newest = $stk ? $stk->{latest_cmt} : undef;
- if ($newest) {
+ # $newest may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ if (defined($newest)) {
my $cur = $self->{mm}->last_commit || '';
if (need_update($self, $cur, $newest)) {
$self->{mm}->last_commit($newest);
}
- } else {
- ${$sync->{max}} = $self->{batch_bytes};
}
+ ${$sync->{max}} = $self->{batch_bytes};
$self->{mm}->{dbh}->commit;
- if ($newest && need_xapian($self)) {
- my $xdb = $self->{xdb};
+ my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+ if ($newest && $xdb) {
my $cur = $xdb->get_metadata('last_commit');
if (need_update($self, $cur, $newest)) {
$xdb->set_metadata('last_commit', $newest);
}
-
+ }
+ if ($stk) { # all done if $stk is passed
# let SearchView know a full --reindex was done so it can
# generate ->has_threadid-dependent links
- if ($sync->{reindex} && !ref($sync->{reindex})) {
+ if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
my $n = $xdb->get_metadata('has_threadid');
$xdb->set_metadata('has_threadid', '1') if $n ne '1';
}
+ $self->{oidx}->rethread_done($sync->{-opt}); # all done
}
-
- $self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
commit_txn_lazy($self);
$sync->{ibx}->git->cleanup;
my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
$sync->{nr} = \$nr;
$sync->{max} = \$max;
$sync->{sidx} = $self;
+ $sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $sync);
}
}
if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
$sync->{index_oid} = \&index_both;
}
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+ my $arg = { %$sync, cur_cmt => $cur_cmt };
if ($f eq 'm') {
- my $arg = { %$sync, autime => $at, cotime => $ct };
+ $arg->{autime} = $at;
+ $arg->{cotime} = $ct;
if ($sync->{max_size}) {
$git->check_async($oid, \&check_size, $arg);
} else {
@@ -719,7 +727,7 @@ sub process_stack {
}
v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
- $git->cat_async($oid, \&unindex_both, $self);
+ $git->cat_async($oid, \&unindex_both, $arg);
}
}
v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
--no-notes --no-color --no-renames --no-abbrev),
$range);
- my ($at, $ct, $stk);
+ my ($at, $ct, $stk, $cmt);
while (<$fh>) {
if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
- ($at, $ct) = ($1 + 0, $2 + 0);
- $stk //= PublicInbox::IdxStack->new($3);
+ ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+ $stk //= PublicInbox::IdxStack->new($cmt);
} elsif (/$del/) {
my $oid = $1;
if ($D) { # reindex case
$D->{pack('H*', $oid)}++;
} else { # non-reindex case:
- $stk->push_rec('d', $at, $ct, $oid);
+ $stk->push_rec('d', $at, $ct, $oid, $cmt);
}
} elsif (/$add/) {
my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
my $oid_bin = pack('H*', $oid);
my $nr = --$D->{$oid_bin};
delete($D->{$oid_bin}) if $nr <= 0;
-
# nr < 0 (-1) means it never existed
- $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
- } else {
- $stk->push_rec('m', $at, $ct, $oid);
+ next if $nr >= 0;
}
+ $stk->push_rec('m', $at, $ct, $oid, $cmt);
}
}
close $fh or die "git log failed: \$?=$?";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 18f33655..87b76501 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
my ($self, $sync) = @_;
$self->git->async_wait_all;
+ $self->update_last_commit($sync);
${$sync->{need_checkpoint}} = 0;
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
if (do_idx($self, $bref, $eml, $smsg)) {
${$arg->{need_checkpoint}} = 1;
}
+ ${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
}
# only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
sub update_last_commit {
- my ($self, $sync, $unit, $cmt) = @_;
+ my ($self, $sync, $stk) = @_;
+ my $unit = $sync->{unit} // return;
+ my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+ defined($latest_cmt) or return;
my $last = last_epoch_commit($self, $unit->{epoch});
- if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
- my @cmd = (qw(rev-list --count), "$last..$cmt");
+ if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+ my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
chomp(my $n = $unit->{git}->qx(@cmd));
return if $n ne '' && $n == 0;
}
- last_epoch_commit($self, $unit->{epoch}, $cmt);
+ last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
}
sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
$pfx //= $unit->{git}->{git_dir};
}
local $self->{current_info} = "$pfx ";
- while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
- my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+ local $sync->{latest_cmt} = \(my $latest_cmt);
+ local $sync->{unit} = $unit;
+ while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+ my $req = {
+ %$sync,
+ autime => $at,
+ cotime => $ct,
+ oid => $oid,
+ cur_cmt => $cmt
+ };
if ($f eq 'm') {
if ($sync->{max_size}) {
$all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
}
}
$all->async_wait_all;
- $self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+ $self->update_last_commit($sync, $stk);
}
sub xapian_only {
diff --git a/t/idx_stack.t b/t/idx_stack.t
index 35aff37b..e0474fa4 100644
--- a/t/idx_stack.t
+++ b/t/idx_stack.t
@@ -6,6 +6,8 @@ use Test::More;
use_ok 'PublicInbox::IdxStack';
my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6';
+my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5';
my $stk = PublicInbox::IdxStack->new;
is($stk->read_prepare, $stk, 'nothing');
@@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records');
is($stk->pop_rec, undef, 'undef on empty');
$stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
is($stk->read_prepare, $stk, 'read_prepare');
is($stk->num_records, 1, 'num_records');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once');
is($stk->pop_rec, undef, 'undef on empty');
$stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
-$stk->push_rec('d', 1234, 5678, $oid_b);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
+$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b);
is($stk->read_prepare, $stk, 'read_prepare');
is($stk->num_records, 2, 'num_records');
-is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop');
is($stk->pop_rec, undef, 'empty');
SKIP: {
@@ -37,11 +39,11 @@ SKIP: {
while (<$fh>) {
chomp;
my ($at, $ct, $H) = split(/\./);
- $stk //= PublicInbox::IdxStack->new($H);
+ $stk //= PublicInbox::IdxStack->new;
# not bothering to parse blobs here, just using commit OID
# as a blob OID since they're the same size + format
- $stk->push_rec('m', $at + 0, $ct + 0, $H);
- push(@expect, [ 'm', $at, $ct, $H ]);
+ $stk->push_rec('m', $at + 0, $ct + 0, $H, $H);
+ push(@expect, [ 'm', $at, $ct, $H, $H ]);
}
$stk or skip('nothing from git log', 3);
is($stk->read_prepare, $stk, 'read_prepare');
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM
2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
` (2 preceding siblings ...)
2020-11-13 11:11 ` [PATCH 3/4] *index: discard sync->{todo} on iteration Eric Wong
@ 2020-11-13 11:11 ` Eric Wong
3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2020-11-13 11:11 UTC (permalink / raw)
To: meta
Just like the daemon processes, -extindex now supports graceful
shutdown via the same signals. This lets users avoid having to
repeat indexing messages when a power outage strikes during a
long (multi-hour/day) indexing run.
Per-inbox (v1/v2) -index graceful shutdowns are not supported
yet, but are planned for later.
---
lib/PublicInbox/ExtSearchIdx.pm | 7 ++++++-
lib/PublicInbox/IdxStack.pm | 2 ++
lib/PublicInbox/SearchIdxShard.pm | 6 ++++++
lib/PublicInbox/V2Writable.pm | 17 ++++++++++++++++-
4 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 6c09c460..91434b26 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -329,13 +329,18 @@ sub eidx_sync { # main entry point
-regen_fmt => "%u/?\n",
};
local $SIG{USR1} = sub { $need_checkpoint = 1 };
+ my $quit = sub { $sync->{quit} = 1; warn "gracefully quitting\n"; };
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
for my $ibx (@{$self->{ibx_list}}) {
_sync_inbox($self, $sync, $ibx);
+ last if $sync->{quit};
}
- $self->{oidx}->rethread_done($opt);
+ $self->{oidx}->rethread_done($opt) unless $sync->{quit};
PublicInbox::V2Writable::done($self);
}
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index e7e10de9..c55c5c36 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -11,6 +11,8 @@ use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
# start off in write-only mode
sub new {
open(my $io, '+>', undef) or die "open: $!";
+ # latest_cmt is still useful when the newest revision is a `d'(elete),
+ # otherwise we favor $sync->{latest_cmt} for checkpoints and {quit}
bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__
}
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 1333b305..875a9ec9 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -10,6 +10,7 @@ use parent qw(PublicInbox::SearchIdx);
use bytes qw(length);
use IO::Handle (); # autoflush
use PublicInbox::Eml;
+use PublicInbox::Sigfd;
sub new {
my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
@@ -29,9 +30,13 @@ sub spawn_worker {
my ($r, $w);
pipe($r, $w) or die "pipe failed: $!\n";
$w->autoflush(1);
+ my $oldset = PublicInbox::Sigfd::block_signals();
my $pid = fork;
defined $pid or die "fork failed: $!\n";
if ($pid == 0) {
+ # these signals are localized in parent
+ $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+ PublicInbox::Sigfd::sig_setmask($oldset);
my $bnote = $v2w->atfork_child;
close $w or die "failed to close: $!";
@@ -44,6 +49,7 @@ sub spawn_worker {
die "unexpected MM $self->{mm}" if $self->{mm};
exit;
}
+ PublicInbox::Sigfd::sig_setmask($oldset);
$self->{pid} = $pid;
$self->{w} = $w;
close $r or die "failed to close: $!";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 11cde627..5bac04a4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1090,6 +1090,7 @@ sub sync_prepare ($$) {
$unit->{stack} = $stk; # may be undef
unshift @{$sync->{todo}}, $unit;
$regen_max += $nr;
+ last if $sync->{quit};
}
# XXX this should not happen unless somebody bypasses checks in
@@ -1102,9 +1103,11 @@ sub sync_prepare ($$) {
$oid = unpack('H*', $oid);
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
+ last if $sync->{quit};
}
$self->git->cat_async_wait;
}
+ return 0 if $sync->{quit};
if (!$regen_max) {
$sync->{-regen_fmt} = "%u/?\n";
return 0;
@@ -1236,6 +1239,7 @@ sub index_xap_step ($$$;$) {
sub index_todo ($$$) {
my ($self, $sync, $unit) = @_;
+ return if $sync->{quit};
unindex_todo($self, $sync, $unit);
my $stk = delete($unit->{stack}) or return;
my $all = $self->git;
@@ -1268,6 +1272,12 @@ sub index_todo ($$$) {
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
+ if ($sync->{quit}) {
+ warn "waiting to quit...\n";
+ $all->async_wait_all;
+ $self->update_last_commit($sync);
+ return;
+ }
if (${$sync->{need_checkpoint}}) {
reindex_checkpoint($self, $sync);
}
@@ -1334,6 +1344,11 @@ sub index_sync {
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
+ my $quit = sub { $sync->{quit} = 1 };
+ local $SIG{QUIT} = $quit;
+ local $SIG{INT} = $quit;
+ local $SIG{TERM} = $quit;
+
if (sync_prepare($self, $sync)) {
# tmp_clone seems to fail if inside a transaction, so
# we rollback here (because we opened {mm} for reading)
@@ -1352,7 +1367,7 @@ sub index_sync {
}
# work forwards through history
index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
- $self->{oidx}->rethread_done($opt);
+ $self->{oidx}->rethread_done($opt) unless $sync->{quit};
$self->done;
if (my $nr = $sync->{nr}) {
^ permalink raw reply related [flat|nested] 7+ messages in thread