* [PATCH 3/3] searchidx: consolidate checkpoint accounting
2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
2024-12-12 10:10 ` [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self Eric Wong
@ 2024-12-12 10:10 ` Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-12-12 10:10 UTC (permalink / raw)
To: meta
We can eliminate check_batch_limit() and checkpoint_due() from
extsearchidx in favor of update_checkpoint() in searchidx. We
can also get rid of the awkward scalar deref for setting the
{need_checkpoint} field.
The only behavioral difference is that the checkpoint interval is
standardized to 5s, so -extindex no longer uses 10s for its
checkpoints. In retrospect, 5s should work more nicely for
public-facing indices since they spend less time waiting on
writers, but it has the downside of potentially hurting writer
performance.
This is another step in the gradual shift away from the $sync
arg in favor of `local $self->{...}'.
---
lib/PublicInbox/ExtSearchIdx.pm | 43 ++++++++++-----------------------
lib/PublicInbox/SearchIdx.pm | 19 ++++++++-------
lib/PublicInbox/V2Writable.pm | 16 ++++--------
3 files changed, 28 insertions(+), 50 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index a172ba62..52d7c3b1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -122,13 +122,6 @@ sub attach_config {
$cfg->each_inbox(\&_ibx_attach, $self, $types);
}
-sub check_batch_limit ($) {
- my ($req) = @_;
- # set flag for PublicInbox::V2Writable::index_todo:
- update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and
- ${$req->{need_checkpoint}} = 1;
-}
-
sub bad_ibx_id ($$;$) {
my ($self, $ibx_id, $cb) = @_;
my $msg = "E: bad/stale ibx_id=#$ibx_id encountered";
@@ -262,7 +255,7 @@ sub index_unseen ($) {
$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
$new_smsg->{eidx_key} = $ibx->eidx_key;
$idx->index_eml($eml, $new_smsg);
- check_batch_limit($req);
+ update_checkpoint $self, $new_smsg->{bytes};
}
sub do_finalize ($) {
@@ -450,7 +443,7 @@ EOM
$oid = unpack('H*', $oid);
$r = $r ? 'unref' : 'remove';
warn "# $r #$docid $eidx_key $oid\n";
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
$x3_doc = $ibx_ck = undef;
reindex_checkpoint($self, $sync);
goto restart;
@@ -483,20 +476,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store
DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
warn "# eliminated $nr stale xref3 entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
# fixup from old bugs:
$nr = $self->{oidx}->dbh->do(<<'');
DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
warn "# eliminated $nr stale over entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
$nr = $self->{oidx}->dbh->do(<<'');
DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over)
warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
SELECT MIN(num) FROM over WHERE num > 0
@@ -517,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
}
$cur = $n + 1;
}
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
for my $idx (values %active_shards) {
$nr += $idx->ipc_do('nr_quiet_rm')
}
@@ -533,10 +526,8 @@ sub eidx_gc { # top-level entry point
$self->{cfg} or die "E: GC requires ->attach_config\n";
$opt->{-idx_gc} = 1;
local $self->{checkpoint_unlocks} = 1;
+ local $self->{need_checkpoint} = 0;
my $sync = {
- need_checkpoint => \(my $need_checkpoint),
- check_intvl => 10,
- next_check => now() + 10,
-opt => $opt,
self => $self,
};
@@ -585,7 +576,7 @@ sub _reindex_finalize ($$$) {
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
- check_batch_limit({ %$sync, new_smsg => $smsg });
+ update_checkpoint $self, $smsg->{bytes};
my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
my $stable = delete($by_chash->{$chash0}) //
die "BUG: $smsg->{blob} chash missing";
@@ -699,11 +690,6 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
}
-sub checkpoint_due ($) {
- my ($sync) = @_;
- ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
-}
-
sub host_ident () {
# I've copied FS images and only changed the hostname before,
# so prepend hostname. Use `state' since these a BOFH can change
@@ -830,7 +816,7 @@ restart:
$del->execute($docid);
++${$sync->{nr}};
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
$dbh = $del = $iter = undef;
reindex_checkpoint($self, $sync); # release lock
$dbh = $self->{oidx}->dbh;
@@ -922,9 +908,8 @@ sub _reindex_check_ibx ($$$) {
$beg = $msgs->[-1]->{num} + 1;
$end = $beg + $slice;
$end = $max if $end > $max;
- if (checkpoint_due($sync)) {
+ update_checkpoint $self and
reindex_checkpoint($self, $sync); # release lock
- }
($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
$usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
my $x3a = $self->{oidx}->dbh->selectall_arrayref(
@@ -1107,7 +1092,7 @@ EOS
# Message-IDs.
$self->git->async_wait_all;
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
undef $iter;
reindex_checkpoint($self, $sync);
goto dedupe_restart;
@@ -1133,10 +1118,8 @@ sub eidx_sync { # main entry point
};
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
$self->{oidx}->rethread_prepare($opt);
+ local $self->{need_checkpoint} = 0;
my $sync = {
- need_checkpoint => \(my $need_checkpoint),
- check_intvl => 10,
- next_check => now() + 10,
-opt => $opt,
# DO NOT SET {reindex} here, it's incompatible with reused
# V2Writable code, reindex is totally different here
@@ -1144,7 +1127,7 @@ sub eidx_sync { # main entry point
self => $self,
-regen_fmt => "%u/?\n",
};
- local $SIG{USR1} = sub { $need_checkpoint = 1 };
+ local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
my $quit = PublicInbox::SearchIdx::quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0d9acd20..8ac8cac3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -787,13 +787,15 @@ sub is_bad_blob ($$$$) {
$size == 0 ? 1 : 0; # size == 0 means purged
}
-sub update_checkpoint ($$) {
+# returns true if checkpoint is needed
+sub update_checkpoint ($;$) {
my ($self, $bytes) = @_;
- ($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and
- return 1;
+ my $nr = $self->{transact_bytes} += $bytes // 0;
+ $self->{need_checkpoint} // return; # must be defined via local
+ return ++$self->{need_checkpoint} if $nr >= $self->{batch_bytes};
my $now = now;
my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
- $now > $next;
+ $self->{need_checkpoint} += ($now > $next ? 1 : 0);
}
sub index_both { # git->cat_async callback
@@ -803,7 +805,7 @@ sub index_both { # git->cat_async callback
my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
$smsg->set_bytes($$bref, $size);
my $self = $sync->{sidx};
- ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes};
+ update_checkpoint $self, $smsg->{bytes};
local $self->{current_info} = "$self->{current_info}: $oid";
my $eml = PublicInbox::Eml->new($bref);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
@@ -860,7 +862,7 @@ sub check_size { # check_async cb for -index --max-size=...
sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
- ${$sync->{need_checkpoint}} = undef;
+ $self->{need_checkpoint} = 0;
# $newest may be undef
my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
@@ -910,7 +912,7 @@ sub process_stack {
my $nr = 0;
$sync->{nr} = \$nr;
$sync->{sidx} = $self;
- $sync->{need_checkpoint} = \(my $need_ckpt);
+ local $self->{need_checkpoint} = 0;
$sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
@@ -940,8 +942,7 @@ sub process_stack {
} elsif ($f eq 'd') {
$git->cat_async($oid, \&unindex_both, $arg);
}
- ${$sync->{need_checkpoint}} and
- v1_checkpoint $self, $sync;
+ v1_checkpoint $self, $sync if $self->{need_checkpoint};
}
v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 87118ec4..15945b35 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -22,7 +22,7 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die);
use PublicInbox::Search;
use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
update_checkpoint);
-use PublicInbox::DS qw(now);
+use PublicInbox::DS;
use IO::Handle; # ->autoflush
use POSIX ();
use Carp qw(confess);
@@ -703,7 +703,7 @@ sub reindex_checkpoint ($$) {
$self->git->async_wait_all;
$self->update_last_commit($sync);
- ${$sync->{need_checkpoint}} = 0;
+ $self->{need_checkpoint} = 0;
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
die 'BUG: {im} during reindex' if $self->{im};
@@ -719,9 +719,6 @@ sub reindex_checkpoint ($$) {
# allow -watch or -mda to write...
$self->idx_init($sync->{-opt}); # reacquire lock
- if (my $intvl = $sync->{check_intvl}) { # eidx
- $sync->{next_check} = now + $intvl;
- }
$mm_tmp->atfork_parent if $mm_tmp;
}
@@ -817,7 +814,7 @@ sub index_oid { # cat_async callback
}, 'PublicInbox::Smsg';
$smsg->populate($eml, $arg);
$smsg->set_bytes($$bref, $size);
- ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
+ do_idx $self, $eml, $smsg;
index_finalize($arg, 1);
}
@@ -1169,9 +1166,7 @@ sub index_todo ($$$) {
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
- if (${$sync->{need_checkpoint}}) {
- reindex_checkpoint($self, $sync);
- }
+ reindex_checkpoint($self, $sync) if $self->{need_checkpoint};
}
$all->async_wait_all;
$self->update_last_commit($sync, $stk);
@@ -1185,7 +1180,6 @@ sub xapian_only {
$self->idx_init($opt); # acquire lock
if (my $art_end = $self->{ibx}->mm->max) {
$sync //= {
- need_checkpoint => \(my $bool = 0),
-opt => $opt,
self => $self,
nr => \(my $nr = 0),
@@ -1214,6 +1208,7 @@ sub xapian_only {
sub index_sync {
my ($self, $opt) = @_;
$opt //= {};
+ local $self->{need_checkpoint} = 0;
return xapian_only($self, $opt) if $opt->{xapian_only};
my $epoch_max;
@@ -1238,7 +1233,6 @@ sub index_sync {
$self->{mg}->fill_alternates;
$self->{oidx}->rethread_prepare($opt);
my $sync = {
- need_checkpoint => \(my $bool = 0),
reindex => $opt->{reindex},
-opt => $opt,
self => $self,
^ permalink raw reply related [flat|nested] 4+ messages in thread