unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/3] indexing cleanups
@ 2024-12-12 10:10 Eric Wong
  2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2024-12-12 10:10 UTC (permalink / raw)
  To: meta

A small step toward sharing more code between v1, v2, and extindex.

Eric Wong (3):
  searchidx: update_checkpoint: take bytes arg directly
  extindex: move {checkpoint_unlocks} to $self
  searchidx: consolidate checkpoint accounting

 lib/PublicInbox/ExtSearchIdx.pm | 51 +++++++++++----------------------
 lib/PublicInbox/SearchIdx.pm    | 21 +++++++-------
 lib/PublicInbox/V2Writable.pm   | 20 +++++--------
 3 files changed, 35 insertions(+), 57 deletions(-)

Diffstat looks good

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly
  2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
@ 2024-12-12 10:10 ` Eric Wong
  2024-12-12 10:10 ` [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self Eric Wong
  2024-12-12 10:10 ` [PATCH 3/3] searchidx: consolidate checkpoint accounting Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-12-12 10:10 UTC (permalink / raw)
  To: meta

Passing $smsg limits flexibility in case we reuse
update_checkpoint for deletes and/or commit search.
---
 lib/PublicInbox/ExtSearchIdx.pm | 2 +-
 lib/PublicInbox/SearchIdx.pm    | 6 +++---
 lib/PublicInbox/V2Writable.pm   | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index fe2f5d2e..1c2a9a26 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -125,7 +125,7 @@ sub attach_config {
 sub check_batch_limit ($) {
 	my ($req) = @_;
 	# set flag for PublicInbox::V2Writable::index_todo:
-	update_checkpoint $req->{self}, $req->{new_smsg} and
+	update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and
 		${$req->{need_checkpoint}} = 1;
 }
 
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 3a85f552..0d9acd20 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -788,8 +788,8 @@ sub is_bad_blob ($$$$) {
 }
 
 sub update_checkpoint ($$) {
-	my ($self, $smsg) = @_;
-	($self->{transact_bytes} += $smsg->{bytes}) >= $self->{batch_bytes} and
+	my ($self, $bytes) = @_;
+	($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and
 		return 1;
 	my $now = now;
 	my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
@@ -803,7 +803,7 @@ sub index_both { # git->cat_async callback
 	my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
 	$smsg->set_bytes($$bref, $size);
 	my $self = $sync->{sidx};
-	${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg;
+	${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes};
 	local $self->{current_info} = "$self->{current_info}: $oid";
 	my $eml = PublicInbox::Eml->new($bref);
 	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 5f3bfde5..c894b648 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -107,7 +107,7 @@ sub do_idx ($$$) {
 		my $idx = idx_shard($self, $smsg->{num});
 		$idx->index_eml($eml, $smsg);
 	}
-	update_checkpoint $self, $smsg;
+	update_checkpoint $self, $smsg->{bytes};
 }
 
 # returns undef on duplicate or spam

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self
  2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
  2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
@ 2024-12-12 10:10 ` Eric Wong
  2024-12-12 10:10 ` [PATCH 3/3] searchidx: consolidate checkpoint accounting Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-12-12 10:10 UTC (permalink / raw)
  To: meta

One small step towards eliminating the $sync structure.
---
 lib/PublicInbox/ExtSearchIdx.pm | 8 ++++----
 lib/PublicInbox/V2Writable.pm   | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 1c2a9a26..a172ba62 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -528,15 +528,15 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
 	warn "# eliminated $nr stale Xapian documents\n" if $nr != 0;
 }
 
-sub eidx_gc {
+sub eidx_gc { # top-level entry point
 	my ($self, $opt) = @_;
 	$self->{cfg} or die "E: GC requires ->attach_config\n";
 	$opt->{-idx_gc} = 1;
+	local $self->{checkpoint_unlocks} = 1;
 	my $sync = {
 		need_checkpoint => \(my $need_checkpoint),
 		check_intvl => 10,
 		next_check => now() + 10,
-		checkpoint_unlocks => 1,
 		-opt => $opt,
 		self => $self,
 	};
@@ -1159,11 +1159,11 @@ sub eidx_sync { # main entry point
 	}
 
 	if (my $msgids = delete($opt->{dedupe})) {
-		local $sync->{checkpoint_unlocks} = 1;
+		local $self->{checkpoint_unlocks} = 1;
 		eidx_dedupe($self, $sync, $msgids);
 	}
 	if (delete($opt->{reindex})) {
-		local $sync->{checkpoint_unlocks} = 1;
+		local $self->{checkpoint_unlocks} = 1;
 		eidx_reindex($self, $sync);
 	}
 
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c894b648..87118ec4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -707,7 +707,7 @@ sub reindex_checkpoint ($$) {
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
 	die 'BUG: {im} during reindex' if $self->{im};
-	if ($self->{ibx_map} && !$sync->{checkpoint_unlocks}) {
+	if ($self->{ibx_map} && !$self->{checkpoint_unlocks}) {
 		checkpoint($self, 1); # no need to release lock on pure index
 	} else {
 		$self->done; # release lock

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 3/3] searchidx: consolidate checkpoint accounting
  2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
  2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
  2024-12-12 10:10 ` [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self Eric Wong
@ 2024-12-12 10:10 ` Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-12-12 10:10 UTC (permalink / raw)
  To: meta

We can eliminate check_batch_limit() and checkpoint_due() from
extsearchidx in favor of update_checkpoint() in searchidx.  We
can also get rid of the awkward scalar deref for setting the
{need_checkpoint} field.

The only behavioral difference is that the checkpoint interval
is standardized to 5s, so -extindex no longer uses 10s for its
checkpoints.  In retrospect, 5s should work more nicely for
public-facing indices since they spend less time waiting on
writers, but it has the downside of potentially hurting writer
performance.

This is another step in the gradual shift away from the $sync
arg in favor of `local $self->{...}'.
---
 lib/PublicInbox/ExtSearchIdx.pm | 43 ++++++++++-----------------------
 lib/PublicInbox/SearchIdx.pm    | 19 ++++++++-------
 lib/PublicInbox/V2Writable.pm   | 16 ++++--------
 3 files changed, 28 insertions(+), 50 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index a172ba62..52d7c3b1 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -122,13 +122,6 @@ sub attach_config {
 	$cfg->each_inbox(\&_ibx_attach, $self, $types);
 }
 
-sub check_batch_limit ($) {
-	my ($req) = @_;
-	# set flag for PublicInbox::V2Writable::index_todo:
-	update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and
-		${$req->{need_checkpoint}} = 1;
-}
-
 sub bad_ibx_id ($$;$) {
 	my ($self, $ibx_id, $cb) = @_;
 	my $msg = "E: bad/stale ibx_id=#$ibx_id encountered";
@@ -262,7 +255,7 @@ sub index_unseen ($) {
 	$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
 	$new_smsg->{eidx_key} = $ibx->eidx_key;
 	$idx->index_eml($eml, $new_smsg);
-	check_batch_limit($req);
+	update_checkpoint $self, $new_smsg->{bytes};
 }
 
 sub do_finalize ($) {
@@ -450,7 +443,7 @@ EOM
 			$oid = unpack('H*', $oid);
 			$r = $r ? 'unref' : 'remove';
 			warn "# $r #$docid $eidx_key $oid\n";
-			if (checkpoint_due($sync)) {
+			if (update_checkpoint $self) {
 				$x3_doc = $ibx_ck = undef;
 				reindex_checkpoint($self, $sync);
 				goto restart;
@@ -483,20 +476,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store
 DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 
 	warn "# eliminated $nr stale xref3 entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	# fixup from old bugs:
 	$nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
 
 	warn "# eliminated $nr stale over entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	$nr = $self->{oidx}->dbh->do(<<'');
 DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over)
 
 	warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0;
-	reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+	reindex_checkpoint($self, $sync) if update_checkpoint $self;
 
 	my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
 SELECT MIN(num) FROM over WHERE num > 0
@@ -517,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000
 			}
 			$cur = $n + 1;
 		}
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			for my $idx (values %active_shards) {
 				$nr += $idx->ipc_do('nr_quiet_rm')
 			}
@@ -533,10 +526,8 @@ sub eidx_gc { # top-level entry point
 	$self->{cfg} or die "E: GC requires ->attach_config\n";
 	$opt->{-idx_gc} = 1;
 	local $self->{checkpoint_unlocks} = 1;
+	local $self->{need_checkpoint} = 0;
 	my $sync = {
-		need_checkpoint => \(my $need_checkpoint),
-		check_intvl => 10,
-		next_check => now() + 10,
 		-opt => $opt,
 		self => $self,
 	};
@@ -585,7 +576,7 @@ sub _reindex_finalize ($$$) {
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
 	my $docid = $smsg->{num} = $orig_smsg->{num};
 	$self->{oidx}->add_overview($eml, $smsg); # may rethread
-	check_batch_limit({ %$sync, new_smsg => $smsg });
+	update_checkpoint $self, $smsg->{bytes};
 	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
 	my $stable = delete($by_chash->{$chash0}) //
 				die "BUG: $smsg->{blob} chash missing";
@@ -699,11 +690,6 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
-sub checkpoint_due ($) {
-	my ($sync) = @_;
-	${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
-}
-
 sub host_ident () {
 	# I've copied FS images and only changed the hostname before,
 	# so prepend hostname.  Use `state' since these a BOFH can change
@@ -830,7 +816,7 @@ restart:
 		$del->execute($docid);
 		++${$sync->{nr}};
 
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			$dbh = $del = $iter = undef;
 			reindex_checkpoint($self, $sync); # release lock
 			$dbh = $self->{oidx}->dbh;
@@ -922,9 +908,8 @@ sub _reindex_check_ibx ($$$) {
 		$beg = $msgs->[-1]->{num} + 1;
 		$end = $beg + $slice;
 		$end = $max if $end > $max;
-		if (checkpoint_due($sync)) {
+		update_checkpoint $self and
 			reindex_checkpoint($self, $sync); # release lock
-		}
 		($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
 		$usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
 		my $x3a = $self->{oidx}->dbh->selectall_arrayref(
@@ -1107,7 +1092,7 @@ EOS
 		# Message-IDs.
 		$self->git->async_wait_all;
 
-		if (checkpoint_due($sync)) {
+		if (update_checkpoint $self) {
 			undef $iter;
 			reindex_checkpoint($self, $sync);
 			goto dedupe_restart;
@@ -1133,10 +1118,8 @@ sub eidx_sync { # main entry point
 	};
 	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
 	$self->{oidx}->rethread_prepare($opt);
+	local $self->{need_checkpoint} = 0;
 	my $sync = {
-		need_checkpoint => \(my $need_checkpoint),
-		check_intvl => 10,
-		next_check => now() + 10,
 		-opt => $opt,
 		# DO NOT SET {reindex} here, it's incompatible with reused
 		# V2Writable code, reindex is totally different here
@@ -1144,7 +1127,7 @@ sub eidx_sync { # main entry point
 		self => $self,
 		-regen_fmt => "%u/?\n",
 	};
-	local $SIG{USR1} = sub { $need_checkpoint = 1 };
+	local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
 	my $quit = PublicInbox::SearchIdx::quit_cb($sync);
 	local $SIG{QUIT} = $quit;
 	local $SIG{INT} = $quit;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0d9acd20..8ac8cac3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -787,13 +787,15 @@ sub is_bad_blob ($$$$) {
 	$size == 0 ? 1 : 0; # size == 0 means purged
 }
 
-sub update_checkpoint ($$) {
+# returns true if checkpoint is needed
+sub update_checkpoint ($;$) {
 	my ($self, $bytes) = @_;
-	($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and
-		return 1;
+	my $nr = $self->{transact_bytes} += $bytes // 0;
+	$self->{need_checkpoint} // return; # must be defined via local
+	return ++$self->{need_checkpoint} if $nr >= $self->{batch_bytes};
 	my $now = now;
 	my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
-	$now > $next;
+	$self->{need_checkpoint} += ($now > $next ? 1 : 0);
 }
 
 sub index_both { # git->cat_async callback
@@ -803,7 +805,7 @@ sub index_both { # git->cat_async callback
 	my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
 	$smsg->set_bytes($$bref, $size);
 	my $self = $sync->{sidx};
-	${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes};
+	update_checkpoint $self, $smsg->{bytes};
 	local $self->{current_info} = "$self->{current_info}: $oid";
 	my $eml = PublicInbox::Eml->new($bref);
 	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
@@ -860,7 +862,7 @@ sub check_size { # check_async cb for -index --max-size=...
 sub v1_checkpoint ($$;$) {
 	my ($self, $sync, $stk) = @_;
 	$self->{ibx}->git->async_wait_all;
-	${$sync->{need_checkpoint}} = undef;
+	$self->{need_checkpoint} = 0;
 
 	# $newest may be undef
 	my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
@@ -910,7 +912,7 @@ sub process_stack {
 	my $nr = 0;
 	$sync->{nr} = \$nr;
 	$sync->{sidx} = $self;
-	$sync->{need_checkpoint} = \(my $need_ckpt);
+	local $self->{need_checkpoint} = 0;
 	$sync->{latest_cmt} = \(my $latest_cmt);
 
 	$self->{mm}->{dbh}->begin_work;
@@ -940,8 +942,7 @@ sub process_stack {
 		} elsif ($f eq 'd') {
 			$git->cat_async($oid, \&unindex_both, $arg);
 		}
-		${$sync->{need_checkpoint}} and
-			v1_checkpoint $self, $sync;
+		v1_checkpoint $self, $sync if $self->{need_checkpoint};
 	}
 	v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
 }
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 87118ec4..15945b35 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -22,7 +22,7 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
 	update_checkpoint);
-use PublicInbox::DS qw(now);
+use PublicInbox::DS;
 use IO::Handle; # ->autoflush
 use POSIX ();
 use Carp qw(confess);
@@ -703,7 +703,7 @@ sub reindex_checkpoint ($$) {
 
 	$self->git->async_wait_all;
 	$self->update_last_commit($sync);
-	${$sync->{need_checkpoint}} = 0;
+	$self->{need_checkpoint} = 0;
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
 	die 'BUG: {im} during reindex' if $self->{im};
@@ -719,9 +719,6 @@ sub reindex_checkpoint ($$) {
 
 	# allow -watch or -mda to write...
 	$self->idx_init($sync->{-opt}); # reacquire lock
-	if (my $intvl = $sync->{check_intvl}) { # eidx
-		$sync->{next_check} = now + $intvl;
-	}
 	$mm_tmp->atfork_parent if $mm_tmp;
 }
 
@@ -817,7 +814,7 @@ sub index_oid { # cat_async callback
 	}, 'PublicInbox::Smsg';
 	$smsg->populate($eml, $arg);
 	$smsg->set_bytes($$bref, $size);
-	${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
+	do_idx $self, $eml, $smsg;
 	index_finalize($arg, 1);
 }
 
@@ -1169,9 +1166,7 @@ sub index_todo ($$$) {
 		} elsif ($f eq 'd') {
 			$all->cat_async($oid, $unindex_oid, $req);
 		}
-		if (${$sync->{need_checkpoint}}) {
-			reindex_checkpoint($self, $sync);
-		}
+		reindex_checkpoint($self, $sync) if $self->{need_checkpoint};
 	}
 	$all->async_wait_all;
 	$self->update_last_commit($sync, $stk);
@@ -1185,7 +1180,6 @@ sub xapian_only {
 	$self->idx_init($opt); # acquire lock
 	if (my $art_end = $self->{ibx}->mm->max) {
 		$sync //= {
-			need_checkpoint => \(my $bool = 0),
 			-opt => $opt,
 			self => $self,
 			nr => \(my $nr = 0),
@@ -1214,6 +1208,7 @@ sub xapian_only {
 sub index_sync {
 	my ($self, $opt) = @_;
 	$opt //= {};
+	local $self->{need_checkpoint} = 0;
 	return xapian_only($self, $opt) if $opt->{xapian_only};
 
 	my $epoch_max;
@@ -1238,7 +1233,6 @@ sub index_sync {
 	$self->{mg}->fill_alternates;
 	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
-		need_checkpoint => \(my $bool = 0),
 		reindex => $opt->{reindex},
 		-opt => $opt,
 		self => $self,

^ permalink raw reply related	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2024-12-12 10:10 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-12-12 10:10 [PATCH 0/3] indexing cleanups Eric Wong
2024-12-12 10:10 ` [PATCH 1/3] searchidx: update_checkpoint: take bytes arg directly Eric Wong
2024-12-12 10:10 ` [PATCH 2/3] extindex: move {checkpoint_unlocks} to $self Eric Wong
2024-12-12 10:10 ` [PATCH 3/3] searchidx: consolidate checkpoint accounting Eric Wong

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).