unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 3/3] searchidx: consolidate checkpoint accounting
Date: Thu, 12 Dec 2024 10:10:40 +0000	[thread overview]
Message-ID: <20241212101040.1109774-4-e@80x24.org> (raw)
In-Reply-To: <20241212101040.1109774-1-e@80x24.org>

We can eliminate check_batch_limit() and checkpoint_due() from
extsearchidx in favor of update_checkpoint() in searchidx.  We
can also get rid of the awkward scalar deref for setting the
{need_checkpoint} field.

The only behavioral difference is that the checkpoint interval is
standardized to 5s, so -extindex no longer uses 10s for its
checkpoints.  In retrospect, 5s should work better for
public-facing indices since readers spend less time waiting on
writers, but it has the downside of potentially hurting writer
performance.

This is another step in the gradual shift away from the $sync
arg in favor of `local $self->{...}'.
---
 lib/PublicInbox/ExtSearchIdx.pm | 43 ++++++++++-----------------------
 lib/PublicInbox/SearchIdx.pm    | 19 ++++++++-------
 lib/PublicInbox/V2Writable.pm   | 16 ++++--------
 3 files changed, 28 insertions(+), 50 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index a172ba62..52d7c3b1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -122,13 +122,6 @@ sub attach_config {
 	$cfg->each_inbox(\&_ibx_attach, $self, $types);
 }
 
-sub check_batch_limit ($) {
-	my ($req) = @_;
-	# set flag for PublicInbox::V2Writable::index_todo:
-	update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and
-		${$req->{need_checkpoint}} = 1;
-}
-
 sub bad_ibx_id ($$;$) {
 	my ($self, $ibx_id, $cb) = @_;
 	my $msg = "E: bad/stale ibx_id=#$ibx_id encountered";
@@ -262,7 +255,7 @@ sub index_unseen ($) {
 	$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
 	$new_smsg->{eidx_key} = $ibx->eidx_key;
 	$idx->index_eml($eml, $new_smsg);
-	check_batch_limit($req);
+	update_checkpoint $self, $new_smsg->{bytes};
 }
 
 sub do_finalize ($) {
@@ -450,7 +443,7 @@ EOM
 			$oid = unpack('H*', $oid);
 			$r = $r ? 'unref' : 'remove';
 			warn "# $r #$docid $eidx_key $oid\n";
-			if (checkpoint_due($sync)) {
+			if (update_checkpoint $self) {
 				$x3_doc = $ibx_ck = undef;
 				reindex_checkpoint($self, $sync);
 				goto restart;
@@ -483,20 +476,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store
 DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 
 	warn "# eliminated $nr stale xref3 entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	# fixup from old bugs:
 	$nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
 
 	warn "# eliminated $nr stale over entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	$nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over)
 
 	warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
 SELECT MIN(num) FROM over WHERE num > 0
@@ -517,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
 			}
 			$cur = $n + 1;
 		}
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			for my $idx (values %active_shards) {
 				$nr += $idx->ipc_do('nr_quiet_rm')
 			}
@@ -533,10 +526,8 @@ sub eidx_gc { # top-level entry point
 	$self->{cfg} or die "E: GC requires ->attach_config\n";
 	$opt->{-idx_gc} = 1;
 	local $self->{checkpoint_unlocks} = 1;
+	local $self->{need_checkpoint} = 0;
 	my $sync = {
-		need_checkpoint => \(my $need_checkpoint),
-		check_intvl => 10,
-		next_check => now() + 10,
 		-opt => $opt,
 		self => $self,
 	};
@@ -585,7 +576,7 @@ sub _reindex_finalize ($$$) {
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
 	my $docid = $smsg->{num} = $orig_smsg->{num};
 	$self->{oidx}->add_overview($eml, $smsg); # may rethread
-	check_batch_limit({ %$sync, new_smsg => $smsg });
+	update_checkpoint $self, $smsg->{bytes};
 	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
 	my $stable = delete($by_chash->{$chash0}) //
 				die "BUG: $smsg->{blob} chash missing";
@@ -699,11 +690,6 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
-sub checkpoint_due ($) {
-	my ($sync) = @_;
-	${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
-}
-
 sub host_ident () {
 	# I've copied FS images and only changed the hostname before,
 	# so prepend hostname.  Use `state' since these a BOFH can change
@@ -830,7 +816,7 @@ restart:
 		$del->execute($docid);
 		++${$sync->{nr}};
 
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			$dbh = $del = $iter = undef;
 			reindex_checkpoint($self, $sync); # release lock
 			$dbh = $self->{oidx}->dbh;
@@ -922,9 +908,8 @@ sub _reindex_check_ibx ($$$) {
 		$beg = $msgs->[-1]->{num} + 1;
 		$end = $beg + $slice;
 		$end = $max if $end > $max;
-		if (checkpoint_due($sync)) {
+		update_checkpoint $self and
 			reindex_checkpoint($self, $sync); # release lock
-		}
 		($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
 		$usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
 		my $x3a = $self->{oidx}->dbh->selectall_arrayref(
@@ -1107,7 +1092,7 @@ EOS
 		# Message-IDs.
 		$self->git->async_wait_all;
 
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			undef $iter;
 			reindex_checkpoint($self, $sync);
 			goto dedupe_restart;
@@ -1133,10 +1118,8 @@ sub eidx_sync { # main entry point
 	};
 	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
 	$self->{oidx}->rethread_prepare($opt);
+	local $self->{need_checkpoint} = 0;
 	my $sync = {
-		need_checkpoint => \(my $need_checkpoint),
-		check_intvl => 10,
-		next_check => now() + 10,
 		-opt => $opt,
 		# DO NOT SET {reindex} here, it's incompatible with reused
 		# V2Writable code, reindex is totally different here
@@ -1144,7 +1127,7 @@ sub eidx_sync { # main entry point
 		self => $self,
 		-regen_fmt => "%u/?\n",
 	};
-	local $SIG{USR1} = sub { $need_checkpoint = 1 };
+	local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
 	my $quit = PublicInbox::SearchIdx::quit_cb($sync);
 	local $SIG{QUIT} = $quit;
 	local $SIG{INT} = $quit;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0d9acd20..8ac8cac3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -787,13 +787,15 @@ sub is_bad_blob ($$$$) {
 	$size == 0 ? 1 : 0; # size == 0 means purged
 }
 
-sub update_checkpoint ($$) {
+# returns true if checkpoint is needed
+sub update_checkpoint ($;$) {
 	my ($self, $bytes) = @_;
-	($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and
-		return 1;
+	my $nr = $self->{transact_bytes} += $bytes // 0;
+	$self->{need_checkpoint} // return; # must be defined via local
+	return ++$self->{need_checkpoint} if $nr >= $self->{batch_bytes};
 	my $now = now;
 	my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
-	$now > $next;
+	$self->{need_checkpoint} += ($now > $next ? 1 : 0);
 }
 
 sub index_both { # git->cat_async callback
@@ -803,7 +805,7 @@ sub index_both { # git->cat_async callback
 	my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
 	$smsg->set_bytes($$bref, $size);
 	my $self = $sync->{sidx};
-	${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes};
+	update_checkpoint $self, $smsg->{bytes};
 	local $self->{current_info} = "$self->{current_info}: $oid";
 	my $eml = PublicInbox::Eml->new($bref);
 	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
@@ -860,7 +862,7 @@ sub check_size { # check_async cb for -index --max-size=...
 sub v1_checkpoint ($$;$) {
 	my ($self, $sync, $stk) = @_;
 	$self->{ibx}->git->async_wait_all;
-	${$sync->{need_checkpoint}} = undef;
+	$self->{need_checkpoint} = 0;
 
 	# $newest may be undef
 	my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
@@ -910,7 +912,7 @@ sub process_stack {
 	my $nr = 0;
 	$sync->{nr} = \$nr;
 	$sync->{sidx} = $self;
-	$sync->{need_checkpoint} = \(my $need_ckpt);
+	local $self->{need_checkpoint} = 0;
 	$sync->{latest_cmt} = \(my $latest_cmt);
 
 	$self->{mm}->{dbh}->begin_work;
@@ -940,8 +942,7 @@ sub process_stack {
 		} elsif ($f eq 'd') {
 			$git->cat_async($oid, \&unindex_both, $arg);
 		}
-		${$sync->{need_checkpoint}} and
-			v1_checkpoint $self, $sync;
+		v1_checkpoint $self, $sync if $self->{need_checkpoint};
 	}
 	v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
 }
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 87118ec4..15945b35 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -22,7 +22,7 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
 	update_checkpoint);
-use PublicInbox::DS qw(now);
+use PublicInbox::DS;
 use IO::Handle; # ->autoflush
 use POSIX ();
 use Carp qw(confess);
@@ -703,7 +703,7 @@ sub reindex_checkpoint ($$) {
 
 	$self->git->async_wait_all;
 	$self->update_last_commit($sync);
-	${$sync->{need_checkpoint}} = 0;
+	$self->{need_checkpoint} = 0;
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
 	die 'BUG: {im} during reindex' if $self->{im};
@@ -719,9 +719,6 @@ sub reindex_checkpoint ($$) {
 
 	# allow -watch or -mda to write...
 	$self->idx_init($sync->{-opt}); # reacquire lock
-	if (my $intvl = $sync->{check_intvl}) { # eidx
-		$sync->{next_check} = now + $intvl;
-	}
 	$mm_tmp->atfork_parent if $mm_tmp;
 }
 
@@ -817,7 +814,7 @@ sub index_oid { # cat_async callback
 	}, 'PublicInbox::Smsg';
 	$smsg->populate($eml, $arg);
 	$smsg->set_bytes($$bref, $size);
-	${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
+	do_idx $self, $eml, $smsg;
 	index_finalize($arg, 1);
 }
 
@@ -1169,9 +1166,7 @@ sub index_todo ($$$) {
 		} elsif ($f eq 'd') {
 			$all->cat_async($oid, $unindex_oid, $req);
 		}
-		if (${$sync->{need_checkpoint}}) {
-			reindex_checkpoint($self, $sync);
-		}
+		reindex_checkpoint($self, $sync) if $self->{need_checkpoint};
 	}
 	$all->async_wait_all;
 	$self->update_last_commit($sync, $stk);
@@ -1185,7 +1180,6 @@ sub xapian_only {
 	$self->idx_init($opt); # acquire lock
 	if (my $art_end = $self->{ibx}->mm->max) {
 		$sync //= {
-			need_checkpoint => \(my $bool = 0),
 			-opt => $opt,
 			self => $self,
 			nr => \(my $nr = 0),
@@ -1214,6 +1208,7 @@ sub xapian_only {
 sub index_sync {
 	my ($self, $opt) = @_;
 	$opt //= {};
+	local $self->{need_checkpoint} = 0;
 	return xapian_only($self, $opt) if $opt->{xapian_only};
 
 	my $epoch_max;
@@ -1238,7 +1233,6 @@ sub index_sync {
 	$self->{mg}->fill_alternates;
 	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
-		need_checkpoint => \(my $bool = 0),
 		reindex => $opt->{reindex},
 		-opt => $opt,
 		self => $self,

      parent reply	other threads:[~2024-12-12 10:10 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
2024-12-12 10:10 ` [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self Eric Wong
2024-12-12 10:10 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20241212101040.1109774-4-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).