unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH] extindex: preliminary --reindex support
@ 2020-12-11  3:37 Eric Wong
  2020-12-12 19:53 ` [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3 Eric Wong
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
  0 siblings, 2 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-11  3:37 UTC (permalink / raw)
  To: meta

--reindex allows us to catch missed and stale messages due to
-extindex vs -index races prior to commit 02b2fcc46f364b51
("extsearchidx: enforce -index before -extindex").

We'll also rely on reindex to internally deal with v1/v2 inbox
removals and partial-unindexing of messages which are only
removed from one inbox out of many.

This reindex design is completely different from how normal
v1/v2 inbox reindex operates due to extindex having multiple
histories to work with.  Instead of scanning git history, this
relies exclusively on comparing over.sqlite3 contents between
the v1/v2 inboxes and the extindex.

Changes to Xapian behavior also get picked up, now.  Xapian indexing
is handled by workers with minimal IPC to the parent process.
This results in more read I/O but fewer writes when dealing
with cross-posted messages.

Changes to $smsg->populate and --rethread still need further
work.
---
 lib/PublicInbox/ExtSearchIdx.pm   | 195 ++++++++++++++++++++++++++++--
 lib/PublicInbox/OverIdx.pm        |  23 ++++
 lib/PublicInbox/SearchIdx.pm      |  77 +++++++++++-
 lib/PublicInbox/SearchIdxShard.pm |  11 ++
 lib/PublicInbox/V2Writable.pm     |   6 +-
 t/extsearch.t                     |  62 +++++++++-
 6 files changed, 357 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 84449cb4..394a89d4 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
 	my (undef, $dir, $opt) = @_;
@@ -123,9 +124,11 @@ sub do_xpost ($$) {
 		my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
 							\$rm_eidx_info);
 		if ($nr == 0) {
+			$self->{oidx}->eidxq_del($docid);
 			$idx->shard_remove($docid);
 		} elsif ($rm_eidx_info) {
 			$idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+			$self->{oidx}->eidxq_add($docid); # yes, add
 		}
 	}
 }
@@ -168,7 +171,7 @@ sub do_finalize ($) {
 
 sub do_step ($) { # main iterator for adding messages to the index
 	my ($req) = @_;
-	my $self = $req->{self};
+	my $self = $req->{self} // die 'BUG: {self} missing';
 	while (1) {
 		if (my $next_arg = $req->{next_arg}) {
 			if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
@@ -311,7 +314,7 @@ sub _sync_inbox ($$$) {
 	$ibx->git->cleanup; # done with this inbox, now
 }
 
-sub unref_doc ($$$$) {
+sub gc_unref_doc ($$$$) {
 	my ($self, $ibx_id, $eidx_key, $docid) = @_;
 	my $dbh = $self->{oidx}->dbh;
 
@@ -326,15 +329,14 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ?
 DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
 
 	my $remain = $self->{oidx}->get_xref3($docid);
-	my $idx = $self->idx_shard($docid);
-	if (@$remain) {
+	if (scalar(@$remain)) {
+		$self->{oidx}->eidxq_add($docid); # enqueue for reindex
 		for my $oid (@oid) {
 			warn "I: unref #$docid $eidx_key $oid\n";
-			$idx->shard_remove_eidx_info($docid, $eidx_key);
 		}
 	} else {
 		warn "I: remove #$docid $eidx_key @oid\n";
-		$idx->shard_remove($docid);
+		$self->idx_shard($docid)->shard_remove($docid);
 	}
 }
 
@@ -356,7 +358,7 @@ sub eidx_gc {
 		warn "I: deleting messages for $eidx_key...\n";
 		$x3_doc->execute($ibx_id);
 		while (defined(my $docid = $x3_doc->fetchrow_array)) {
-			unref_doc($self, $ibx_id, $eidx_key, $docid);
+			gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
 		}
 		$dbh->prepare_cached(<<'')->execute($ibx_id);
 DELETE FROM inboxes WHERE ibx_id = ?
@@ -393,20 +395,187 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 	done($self);
 }
 
+sub eidxq_process ($$) { # for reindexing
+	my ($self, $sync) = @_;
+
+	$self->{oidx}->commit_lazy; # ensure shard workers can see it
+	$self->{oidx}->begin_lazy;
+	my $dbh = $self->{oidx}->dbh;
+	my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
+	${$sync->{nr}} = 0;
+	$sync->{-regen_fmt} = "%u/$tot\n";
+	my $pr = $sync->{-opt}->{-progress};
+	if ($pr) {
+		my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
+		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
+		$pr->("Xapian indexing $min..$max (total=$tot)\n");
+	}
+
+	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+	$iter->execute;
+	while (defined(my $docid = $iter->fetchrow_array)) {
+		$self->idx_shard($docid)->shard_reindex_docid($docid);
+		$del->execute($docid);
+		last if $sync->{quit};
+		my $cur = ++${$sync->{nr}};
+
+		# shards flush on their own, just don't queue up too many
+		# deletes
+		if (($cur % 1000) == 0) {
+			$self->{oidx}->commit_lazy;
+			$self->{oidx}->begin_lazy;
+			$pr->("reindexed $cur/$tot\n") if $pr;
+		}
+		# this is only for SIGUSR1, shards do their own accounting:
+		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+	}
+	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
+	$self->{oidx}->commit_lazy;
+	$self->{oidx}->begin_lazy;
+}
+
+sub _reindex_unseen { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $req) = @_;
+	return if is_bad_blob($oid, $type, $size, $req->{oid});
+	my $self = $req->{self} // die 'BUG: {self} unset';
+	local $self->{current_info} = "$self->{current_info} $oid";
+	my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
+	$new_smsg->{bytes} = $size + crlf_adjust($$bref);
+	my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
+	$req->{new_smsg} = $new_smsg;
+	$req->{chash} = content_hash($eml);
+	$req->{mids} = mids($eml); # do_step iterates through this
+	do_step($req); # enter the normal indexing flow
+}
+
+# --reindex may catch totally unseen messages, this handles them
+sub reindex_unseen ($$$$) {
+	my ($self, $sync, $ibx, $xsmsg) = @_;
+	my $req = {
+		%$sync, # has {self}
+		autime => $xsmsg->{ds},
+		cotime => $xsmsg->{ts},
+		oid => $xsmsg->{blob},
+		ibx => $ibx,
+		xnum => $xsmsg->{num},
+		# {mids} and {chash} will be filled in at _reindex_unseen
+	};
+	warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
+	$self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
+}
+
+sub _reindex_check_unseen ($$$) {
+	my ($self, $sync, $ibx) = @_;
+	my $ibx_id = $ibx->{-ibx_id};
+	my ($beg, $end) = (1, 1000);
+
+	# first, check if we missed any messages in target $ibx
+	my $inx3 = $self->{oidx}->dbh->prepare(<<'');
+SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+	my $msgs;
+	while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+		$beg = $msgs->[-1]->{num} + 1;
+		$end = $beg + 1000;
+		for my $xsmsg (@$msgs) {
+			my $oidbin = pack('H*', $xsmsg->{blob});
+			$inx3->bind_param(1, $ibx_id);
+			$inx3->bind_param(2, $xsmsg->{num});
+			$inx3->bind_param(3, $oidbin, SQL_BLOB);
+			$inx3->execute;
+			my $docids = $inx3->fetchall_arrayref;
+			# index messages which were totally missed
+			# the first time around ASAP:
+			if (scalar(@$docids) == 0) {
+				reindex_unseen($self, $sync, $ibx, $xsmsg);
+			} else { # already seen, reindex later
+				for my $r (@$docids) {
+					$self->{oidx}->eidxq_add($r->[0]);
+				}
+			}
+			last if $sync->{quit};
+		}
+		last if $sync->{quit};
+	}
+}
+
+sub _reindex_check_stale ($$$) {
+	my ($self, $sync, $ibx) = @_;
+
+	# now, check if there's stale xrefs
+	my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
+
+	$get_xnum->execute($ibx->{-ibx_id});
+	my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+	while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
+		last if $sync->{quit};
+		my $smsg = $ibx->over->get_art($xnum);
+		my $oidhex = unpack('H*', $oidbin);
+		my $err;
+		if (!$smsg) {
+			$err = 'stale';
+		} elsif ($smsg->{blob} ne $oidhex) {
+			$err = "mismatch (!= $smsg->{blob})";
+		} else {
+			next; # likely, all good
+		}
+		warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
+		$del_xref3->bind_param(1, $ibx->{-ibx_id});
+		$del_xref3->bind_param(2, $xnum);
+		$del_xref3->bind_param(3, $oidbin, SQL_BLOB);
+		$del_xref3->execute;
+
+		# get_xref3 over-fetches, but this is a rare path:
+		my $xr3 = $self->{oidx}->get_xref3($docid);
+		my $idx = $self->idx_shard($docid);
+		if (scalar(@$xr3) == 0) { # all gone
+			$self->{oidx}->eidxq_del($docid);
+			$idx->shard_remove($docid);
+		} else { # enqueue for reindex of remaining messages
+			$idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
+			$self->{oidx}->eidxq_add($docid); # yes, add
+		}
+	}
+}
+
+sub _reindex_inbox ($$$) {
+	my ($self, $sync, $ibx) = @_;
+	_reindex_check_unseen($self, $sync, $ibx);
+	_reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+	delete @$ibx{qw(over mm search git)}; # won't need these for a bit
+}
+
+sub eidx_reindex {
+	my ($self, $sync) = @_;
+
+	for my $ibx (@{$self->{ibx_list}}) {
+		_reindex_inbox($self, $sync, $ibx);
+		last if $sync->{quit};
+	}
+	$self->git->async_wait_all; # ensure eidxq gets filled completely
+	eidxq_process($self, $sync) unless $sync->{quit};
+}
+
 sub eidx_sync { # main entry point
 	my ($self, $opt) = @_;
-	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
-	$self->{oidx}->rethread_prepare($opt);
 
 	my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
 	local $self->{current_info} = '';
 	local $SIG{__WARN__} = sub {
 		$warn_cb->($self->{current_info}, ': ', @_);
 	};
+	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
 		need_checkpoint => \(my $need_checkpoint = 0),
-		reindex => $opt->{reindex},
 		-opt => $opt,
+		# DO NOT SET {reindex} here, it's incompatible with reused
+		# V2Writable code, reindex is totally different here
+		# compared to v1/v2 inboxes because we have multiple histories
 		self => $self,
 		-regen_fmt => "%u/?\n",
 	};
@@ -415,6 +584,10 @@ sub eidx_sync { # main entry point
 	local $SIG{QUIT} = $quit;
 	local $SIG{INT} = $quit;
 	local $SIG{TERM} = $quit;
+	for my $ibx (@{$self->{ibx_list}}) {
+		$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
+	}
+	eidx_reindex($self, $sync) if delete($opt->{reindex});
 
 	# don't use $_ here, it'll get clobbered by reindex_checkpoint
 	for my $ibx (@{$self->{ibx_list}}) {
@@ -422,6 +595,7 @@ sub eidx_sync { # main entry point
 		_sync_inbox($self, $sync, $ibx);
 	}
 	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
+	eidxq_process($self, $sync) unless $sync->{quit};
 
 	PublicInbox::V2Writable::done($self);
 }
@@ -522,5 +696,6 @@ no warnings 'once';
 *count_shards = \&PublicInbox::V2Writable::count_shards;
 *atfork_child = \&PublicInbox::V2Writable::atfork_child;
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
+*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 
 1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 38552247..4a39bf53 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -563,6 +563,15 @@ CREATE TABLE IF NOT EXISTS eidx_meta (
 	val VARCHAR(255) NOT NULL
 )
 
+		# A queue of current docids which need reindexing.
+		# eidxq persists across aborted -extindex invocations
+		# Currently used for "-extindex --reindex" for Xapian
+		# data, but may be used in more places down the line.
+		$dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS eidxq (
+	docid INTEGER PRIMARY KEY NOT NULL
+)
+
 		$dbh;
 	};
 }
@@ -661,4 +670,18 @@ UPDATE over SET ddd = ? WHERE num = ?
 	$sth->execute;
 }
 
+sub eidxq_add {
+	my ($self, $docid) = @_;
+	$self->dbh->prepare_cached(<<'')->execute($docid);
+INSERT OR IGNORE INTO eidxq (docid) VALUES (?)
+
+}
+
+sub eidxq_del {
+	my ($self, $docid) = @_;
+	$self->dbh->prepare_cached(<<'')->execute($docid);
+DELETE FROM eidxq WHERE docid = ?
+
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0fbe6560..cd8f4dd7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -352,8 +352,9 @@ sub index_ids ($$$$) {
 	index_list_id($self, $doc, $hdr);
 }
 
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
 	my ($self, $eml, $smsg, $mids) = @_;
+	$mids //= mids_for_index($eml);
 	my $doc = $X->{Document}->new;
 	add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
 	my @ds = gmtime($smsg->{ds});
@@ -396,6 +397,12 @@ sub add_xapian ($$$$) {
 			}
 		}
 	}
+	$doc;
+}
+
+sub add_xapian ($$$$) {
+	my ($self, $eml, $smsg, $mids) = @_;
+	my $doc = eml2doc($self, $eml, $smsg, $mids);
 	$self->{xdb}->replace_document($smsg->{num}, $doc);
 }
 
@@ -941,6 +948,10 @@ sub set_metadata_once {
 
 sub _commit_txn {
 	my ($self) = @_;
+	if (my $eidx = $self->{eidx}) {
+		$eidx->git->async_wait_all;
+		$eidx->{transact_bytes} = 0;
+	}
 	if (my $xdb = $self->{xdb}) {
 		set_metadata_once($self);
 		$xdb->commit_transaction;
@@ -997,4 +1008,68 @@ SELECT COUNT(*) FROM over WHERE num = ?
 	}
 }
 
+sub reindex_xap { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $ary) = @_;
+	my ($ibx_id, $oidhex, $req, $more) = @$ary;
+	my $self = $req->{self} // die 'BUG: {self} missing';
+	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+	my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
+			die "BUG: bad ibx_id=$ibx_id ($oid)";
+
+	my $docid = $req->{docid};
+	local $eidx->{current_info} = "#$docid $oid";
+	return if is_bad_blob($oid, $type, $size, $oidhex);
+	if (my $doc = $req->{doc}) { # modify existing doc
+		$req->{tg_isset} //= do { # for existing documents in {xdb}
+			term_generator($self)->set_document($doc);
+			1;
+		};
+		$doc->add_boolean_term('O'.$eidx_key);
+		index_list_id($self, $doc, PublicInbox::Eml->new($bref));
+	} else { # first time seeing this doc
+		my $smsg = $self->{eidx}->over->get_art($docid) //
+			die "BUG: #$docid ($oid) not in over";
+		$smsg->{bytes} = $size + crlf_adjust($$bref);
+		$smsg->{eidx_key} = $eidx_key;
+		my $eml = PublicInbox::Eml->new($bref);
+		$req->{doc} = eml2doc($self, $eml, $smsg);
+		$req->{tg_isset} = 1; # eml2doc calls $tg->set_document
+	}
+	return if $more;
+	my $doc = delete($req->{doc}) or return; # all bad blobs!
+	$eidx->{transact_bytes} += $size;
+	$self->{xdb}->replace_document($req->{docid}, $doc);
+}
+
+sub reindex_docid {
+	my ($self, $docid) = @_;
+	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+	my $eidx_key_for = $self->{-eidx_key_for} //= do {
+		my %eidx_key_for = map {
+			$_->[0] => $_->[1];
+		} @{$eidx->over->dbh->selectall_arrayref(<<'')};
+SELECT ibx_id,eidx_key FROM inboxes
+
+		\%eidx_key_for;
+	};
+
+	begin_txn_lazy($self);
+	my $doc = eval { $self->{xdb}->get_document($docid) };
+	my $req = { doc => $doc, self => $self, docid => $docid };
+	my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
+SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+
+	$sth->execute($docid);
+	my $rows = $sth->fetchall_arrayref;
+	while (my $row = shift(@$rows)) {
+		my ($ibx_id, $oidbin) = @$row;
+		my $oidhex = unpack('H*', $oidbin);
+		$eidx->git->cat_async($oidhex, \&reindex_xap,
+				[ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
+	}
+	if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
+		commit_txn_lazy($self);
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 579ed196..b6eef6bd 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -94,6 +94,8 @@ sub shard_worker_loop ($$$$$) {
 			my $over_fn = $1;
 			$over_fn =~ tr/\0/\n/;
 			$self->over_check(PublicInbox::Over->new($over_fn));
+		} elsif ($line =~ /\AE ([0-9]+)\n/) {
+			$self->reindex_docid($1 + 0);
 		} else {
 			chomp $line;
 			my $eidx_key;
@@ -223,4 +225,13 @@ sub shard_over_check {
 	}
 }
 
+sub shard_reindex_docid {
+	my ($self, $docid) = @_;
+	if (my $w = $self->{w}) {
+		print $w "E $docid\n" or die "failed to write to shard: $!";
+	} else {
+		$self->reindex_docid($docid);
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index bef3a67a..572eb418 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -275,13 +275,17 @@ sub _idx_init { # with_umask callback
 	$self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
 	$self->{batch_bytes} = $opt->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	$self->{batch_bytes} *= $self->{shards} if $self->{parallel};
 
 	# need to create all shards before initializing msgmap FD
 	# idx_shards must be visible to all forked processes
 	my $max = $self->{shards} - 1;
 	my $idx = $self->{idx_shards} = [];
 	push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+
+	# SearchIdxShard may do their own flushing, so don't scale
+	# until after forking
+	$self->{batch_bytes} *= $self->{shards} if $self->{parallel};
+
 	my $ibx = $self->{ibx} or return; # ExtIdxSearch
 
 	# Now that all subprocesses are up, we can open the FDs
diff --git a/t/extsearch.t b/t/extsearch.t
index 70a60b5a..f16ec0d1 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -5,6 +5,7 @@ use strict;
 use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
+use PublicInbox::Search;
 use Fcntl qw(:seek);
 my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
 require_git(2.6);
@@ -101,7 +102,7 @@ my $es = PublicInbox::ExtSearch->new("$home/extindex");
 	is(scalar(@$xref3), 2, 'only to entries');
 }
 
-{
+if ('inbox edited') {
 	my ($in, $out, $err);
 	$in = $out = $err = '';
 	my $opt = { 0 => \$in, 1 => \$out, 2 => \$err };
@@ -176,13 +177,13 @@ is(scalar(@it), 2, 'two inboxes');
 like($it[0]->get_document->get_data, qr/v2test/, 'docdata matched v2');
 like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
 
+my $cfg = PublicInbox::Config->new;
+my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
 if ('inject w/o indexing') {
 	use PublicInbox::Import;
-	use PublicInbox::Search;
-	my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
-	my $v1ibx = PublicInbox::Config->new->lookup_name('v1test');
+	my $v1ibx = $cfg->lookup_name('v1test');
 	my $last_v1_commit = $v1ibx->mm->last_commit;
-	my $v2ibx = PublicInbox::Config->new->lookup_name('v2test');
+	my $v2ibx = $cfg->lookup_name('v2test');
 	my $last_v2_commit = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	my $git0 = PublicInbox::Git->new("$v2ibx->{inboxdir}/git/0.git");
 	chomp(my $cmt = $git0->qx(qw(rev-parse HEAD^0)));
@@ -230,6 +231,57 @@ if ('inject w/o indexing') {
 	is($mset->size, 1, 'got v2 message');
 }
 
+if ('reindex catches missed messages') {
+	use PublicInbox::InboxWritable;
+	use PublicInbox::OverIdx;
+	my $v2ibx = $cfg->lookup_name('v2test');
+	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	my $eml = eml_load('t/data/0001.patch');
+	$im->add($eml);
+	$im->done;
+	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
+	my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+	my $oidx = PublicInbox::OverIdx->new($f);
+	$oidx->dbh;
+	my $uv = $v2ibx->uidvalidity;
+	my $lc_key = "lc-v2:v2.example//$uv;0";
+	is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a,
+		'update lc-v2 meta, old is as expected');
+	my $max = $oidx->max;
+	$oidx->dbh_close;
+	ok(run_script([qw(-extindex), "$home/extindex", $v2ibx->{inboxdir}]),
+		'-extindex noop');
+	is($oidx->max, $max, '->max unchanged');
+	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 unchanged');
+	$oidx->dbh_close;
+	my $opt = { 2 => \(my $err = '') };
+	ok(run_script([qw(-extindex --reindex), "$home/extindex",
+			$v2ibx->{inboxdir}], undef, $opt),
+			'--reindex for unseen');
+	is($oidx->max, $max + 1, '->max bumped');
+	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
+	my @err = split(/^/, $err);
+	is(scalar(@err), 1, 'only one warning');
+	like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
+	my $new = $oidx->get_art($max + 1);
+	is($new->{subject}, $eml->header('Subject'), 'new message added');
+
+	ok($im->remove($eml), 'remove new message from v2 inbox');
+	$im->done;
+	my $cmt_c = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	is($oidx->eidx_meta($lc_key, $cmt_c), $cmt_b,
+		'bump lc-v2 meta again to skip v2 remove');
+	$err = '';
+	ok(run_script([qw(-extindex --reindex), "$home/extindex",
+			$v2ibx->{inboxdir}], undef, $opt),
+			'--reindex for stale');
+	@err = split(/^/, $err);
+	is(scalar(@err), 1, 'only one warning');
+	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
+}
+
 if ('remove v1test and test gc') {
 	xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
 		{ GIT_CONFIG => $cfg_path });

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3
  2020-12-11  3:37 [PATCH] extindex: preliminary --reindex support Eric Wong
@ 2020-12-12 19:53 ` Eric Wong
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
  1 sibling, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-12 19:53 UTC (permalink / raw)
  To: meta

In addition to removing stale messages from Xapian, we must
also remove them from over.sqlite3.
---
 Still testing rethread/smsg stuff...

 lib/PublicInbox/ExtSearchIdx.pm |  1 +
 t/extsearch.t                   | 12 ++++++++++++
 2 files changed, 13 insertions(+)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 394a89d4..ec86a7c0 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -533,6 +533,7 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 		my $xr3 = $self->{oidx}->get_xref3($docid);
 		my $idx = $self->idx_shard($docid);
 		if (scalar(@$xr3) == 0) { # all gone
+			$self->{oidx}->delete_by_num($docid);
 			$self->{oidx}->eidxq_del($docid);
 			$idx->shard_remove($docid);
 		} else { # enqueue for reindex of remaining messages
diff --git a/t/extsearch.t b/t/extsearch.t
index f16ec0d1..4a8a9f49 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -268,18 +268,30 @@ if ('reindex catches missed messages') {
 	my $new = $oidx->get_art($max + 1);
 	is($new->{subject}, $eml->header('Subject'), 'new message added');
 
+	$es->{xdb}->reopen;
+	my $mset = $es->mset("mid:$new->{mid}");
+	is($mset->size, 1, 'previously unseen, now indexed in Xapian');
+
 	ok($im->remove($eml), 'remove new message from v2 inbox');
 	$im->done;
 	my $cmt_c = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	is($oidx->eidx_meta($lc_key, $cmt_c), $cmt_b,
 		'bump lc-v2 meta again to skip v2 remove');
 	$err = '';
+	$oidx->dbh_close;
 	ok(run_script([qw(-extindex --reindex), "$home/extindex",
 			$v2ibx->{inboxdir}], undef, $opt),
 			'--reindex for stale');
 	@err = split(/^/, $err);
 	is(scalar(@err), 1, 'only one warning');
 	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
+	is($oidx->get_art($new->{num}), undef,
+		'stale message gone from over');
+	is_deeply($oidx->get_xref3($new->{num}), [],
+		'stale message has no xref3');
+	$es->{xdb}->reopen;
+	$mset = $es->mset("mid:$new->{mid}");
+	is($mset->size, 0, 'stale mid gone Xapian');
 }
 
 if ('remove v1test and test gc') {

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 0/9] extindex: --reindex support
  2020-12-11  3:37 [PATCH] extindex: preliminary --reindex support Eric Wong
  2020-12-12 19:53 ` [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3 Eric Wong
@ 2020-12-15  2:02 ` Eric Wong
  2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
                     ` (9 more replies)
  1 sibling, 10 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

Patches 1 and 2 are resends; the rest have gone through a lot of
changes, and I'm probably ready to run this live on the
extindex which holds my lore mirror onion:
<http://rskvuqcfnfizkjg6h5jvovwb3wkikzcwskf54lfpymus6mxrzw67b5ad.onion/all/>

Eric Wong (9):
  extindex: preliminary --reindex support
  extindex: delete stale messages from over.sqlite3
  over: sort xref3 by xnum if ibx_id repeats
  extindex: support --rethread and content bifurcation
  extsearchidx: reindex works on Xapian, too
  extsearchidx: checkpoint releases locks
  extsearchidx: simplify reindex code paths
  extsearchidx: reindex releases over.sqlite3 handles properly
  searchidxshard: simplify newline elimination

 lib/PublicInbox/ExtSearchIdx.pm   | 369 +++++++++++++++++++++++++++++-
 lib/PublicInbox/Over.pm           |   5 +-
 lib/PublicInbox/OverIdx.pm        |  23 ++
 lib/PublicInbox/SearchIdx.pm      |  13 +-
 lib/PublicInbox/SearchIdxShard.pm |  20 +-
 lib/PublicInbox/V2Writable.pm     |  11 +-
 t/extsearch.t                     | 133 ++++++++++-
 7 files changed, 542 insertions(+), 32 deletions(-)

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [PATCH 1/9] extindex: preliminary --reindex support
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 2/9] extindex: delete stale messages from over.sqlite3 Eric Wong
                     ` (8 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

--reindex allows us to catch missed and stale messages due to
-extindex vs -index races prior to commit 02b2fcc46f364b51
("extsearchidx: enforce -index before -extindex").

We'll also rely on reindex to internally deal with v1/v2 inbox
removals and partial-unindexing of messages which are only
removed from one inbox out of many.

This reindex design is completely different from how normal
v1/v2 inbox reindex operates due to extindex having multiple
histories to work with.  Instead of scanning git history, this
relies exclusively on comparing over.sqlite3 contents between
the v1/v2 inboxes and the extindex.

Changes to Xapian behavior also get picked up, now.  Xapian indexing
is handled by workers with minimal IPC to the parent process.
This results in more read I/O but fewer writes when dealing
with cross-posted messages.

Changes to $smsg->populate and --rethread still need further
work.
---
 lib/PublicInbox/ExtSearchIdx.pm   | 195 ++++++++++++++++++++++++++++--
 lib/PublicInbox/OverIdx.pm        |  23 ++++
 lib/PublicInbox/SearchIdx.pm      |  77 +++++++++++-
 lib/PublicInbox/SearchIdxShard.pm |  11 ++
 lib/PublicInbox/V2Writable.pm     |   6 +-
 t/extsearch.t                     |  62 +++++++++-
 6 files changed, 357 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 84449cb4..394a89d4 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
 	my (undef, $dir, $opt) = @_;
@@ -123,9 +124,11 @@ sub do_xpost ($$) {
 		my $nr = $self->{oidx}->remove_xref3($docid, $oid, $eidx_key,
 							\$rm_eidx_info);
 		if ($nr == 0) {
+			$self->{oidx}->eidxq_del($docid);
 			$idx->shard_remove($docid);
 		} elsif ($rm_eidx_info) {
 			$idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+			$self->{oidx}->eidxq_add($docid); # yes, add
 		}
 	}
 }
@@ -168,7 +171,7 @@ sub do_finalize ($) {
 
 sub do_step ($) { # main iterator for adding messages to the index
 	my ($req) = @_;
-	my $self = $req->{self};
+	my $self = $req->{self} // die 'BUG: {self} missing';
 	while (1) {
 		if (my $next_arg = $req->{next_arg}) {
 			if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) {
@@ -311,7 +314,7 @@ sub _sync_inbox ($$$) {
 	$ibx->git->cleanup; # done with this inbox, now
 }
 
-sub unref_doc ($$$$) {
+sub gc_unref_doc ($$$$) {
 	my ($self, $ibx_id, $eidx_key, $docid) = @_;
 	my $dbh = $self->{oidx}->dbh;
 
@@ -326,15 +329,14 @@ SELECT oidbin FROM xref3 WHERE docid = ? AND ibx_id = ?
 DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
 
 	my $remain = $self->{oidx}->get_xref3($docid);
-	my $idx = $self->idx_shard($docid);
-	if (@$remain) {
+	if (scalar(@$remain)) {
+		$self->{oidx}->eidxq_add($docid); # enqueue for reindex
 		for my $oid (@oid) {
 			warn "I: unref #$docid $eidx_key $oid\n";
-			$idx->shard_remove_eidx_info($docid, $eidx_key);
 		}
 	} else {
 		warn "I: remove #$docid $eidx_key @oid\n";
-		$idx->shard_remove($docid);
+		$self->idx_shard($docid)->shard_remove($docid);
 	}
 }
 
@@ -356,7 +358,7 @@ sub eidx_gc {
 		warn "I: deleting messages for $eidx_key...\n";
 		$x3_doc->execute($ibx_id);
 		while (defined(my $docid = $x3_doc->fetchrow_array)) {
-			unref_doc($self, $ibx_id, $eidx_key, $docid);
+			gc_unref_doc($self, $ibx_id, $eidx_key, $docid);
 		}
 		$dbh->prepare_cached(<<'')->execute($ibx_id);
 DELETE FROM inboxes WHERE ibx_id = ?
@@ -393,20 +395,187 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 	done($self);
 }
 
+sub eidxq_process ($$) { # for reindexing
+	my ($self, $sync) = @_;
+
+	$self->{oidx}->commit_lazy; # ensure shard workers can see it
+	$self->{oidx}->begin_lazy;
+	my $dbh = $self->{oidx}->dbh;
+	my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
+	${$sync->{nr}} = 0;
+	$sync->{-regen_fmt} = "%u/$tot\n";
+	my $pr = $sync->{-opt}->{-progress};
+	if ($pr) {
+		my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
+		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
+		$pr->("Xapian indexing $min..$max (total=$tot)\n");
+	}
+
+	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+	$iter->execute;
+	while (defined(my $docid = $iter->fetchrow_array)) {
+		$self->idx_shard($docid)->shard_reindex_docid($docid);
+		$del->execute($docid);
+		last if $sync->{quit};
+		my $cur = ++${$sync->{nr}};
+
+		# shards flush on their own, just don't queue up too many
+		# deletes
+		if (($cur % 1000) == 0) {
+			$self->{oidx}->commit_lazy;
+			$self->{oidx}->begin_lazy;
+			$pr->("reindexed $cur/$tot\n") if $pr;
+		}
+		# this is only for SIGUSR1, shards do their own accounting:
+		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+	}
+	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
+	$self->{oidx}->commit_lazy;
+	$self->{oidx}->begin_lazy;
+}
+
+sub _reindex_unseen { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $req) = @_;
+	return if is_bad_blob($oid, $type, $size, $req->{oid});
+	my $self = $req->{self} // die 'BUG: {self} unset';
+	local $self->{current_info} = "$self->{current_info} $oid";
+	my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
+	$new_smsg->{bytes} = $size + crlf_adjust($$bref);
+	my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
+	$req->{new_smsg} = $new_smsg;
+	$req->{chash} = content_hash($eml);
+	$req->{mids} = mids($eml); # do_step iterates through this
+	do_step($req); # enter the normal indexing flow
+}
+
+# --reindex may catch totally unseen messages, this handles them
+sub reindex_unseen ($$$$) {
+	my ($self, $sync, $ibx, $xsmsg) = @_;
+	my $req = {
+		%$sync, # has {self}
+		autime => $xsmsg->{ds},
+		cotime => $xsmsg->{ts},
+		oid => $xsmsg->{blob},
+		ibx => $ibx,
+		xnum => $xsmsg->{num},
+		# {mids} and {chash} will be filled in at _reindex_unseen
+	};
+	warn "I: reindex_unseen ${\$ibx->eidx_key}:$req->{xnum}:$req->{oid}\n";
+	$self->git->cat_async($xsmsg->{blob}, \&_reindex_unseen, $req);
+}
+
+sub _reindex_check_unseen ($$$) {
+	my ($self, $sync, $ibx) = @_;
+	my $ibx_id = $ibx->{-ibx_id};
+	my ($beg, $end) = (1, 1000);
+
+	# first, check if we missed any messages in target $ibx
+	my $inx3 = $self->{oidx}->dbh->prepare(<<'');
+SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+	my $msgs;
+	while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+		$beg = $msgs->[-1]->{num} + 1;
+		$end = $beg + 1000;
+		for my $xsmsg (@$msgs) {
+			my $oidbin = pack('H*', $xsmsg->{blob});
+			$inx3->bind_param(1, $ibx_id);
+			$inx3->bind_param(2, $xsmsg->{num});
+			$inx3->bind_param(3, $oidbin, SQL_BLOB);
+			$inx3->execute;
+			my $docids = $inx3->fetchall_arrayref;
+			# index messages which were totally missed
+			# the first time around ASAP:
+			if (scalar(@$docids) == 0) {
+				reindex_unseen($self, $sync, $ibx, $xsmsg);
+			} else { # already seen, reindex later
+				for my $r (@$docids) {
+					$self->{oidx}->eidxq_add($r->[0]);
+				}
+			}
+			last if $sync->{quit};
+		}
+		last if $sync->{quit};
+	}
+}
+
+sub _reindex_check_stale ($$$) {
+	my ($self, $sync, $ibx) = @_;
+
+	# now, check if there's stale xrefs
+	my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
+
+	$get_xnum->execute($ibx->{-ibx_id});
+	my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
+
+	while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
+		last if $sync->{quit};
+		my $smsg = $ibx->over->get_art($xnum);
+		my $oidhex = unpack('H*', $oidbin);
+		my $err;
+		if (!$smsg) {
+			$err = 'stale';
+		} elsif ($smsg->{blob} ne $oidhex) {
+			$err = "mismatch (!= $smsg->{blob})";
+		} else {
+			next; # likely, all good
+		}
+		warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
+		$del_xref3->bind_param(1, $ibx->{-ibx_id});
+		$del_xref3->bind_param(2, $xnum);
+		$del_xref3->bind_param(3, $oidbin, SQL_BLOB);
+		$del_xref3->execute;
+
+		# get_xref3 over-fetches, but this is a rare path:
+		my $xr3 = $self->{oidx}->get_xref3($docid);
+		my $idx = $self->idx_shard($docid);
+		if (scalar(@$xr3) == 0) { # all gone
+			$self->{oidx}->eidxq_del($docid);
+			$idx->shard_remove($docid);
+		} else { # enqueue for reindex of remaining messages
+			$idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
+			$self->{oidx}->eidxq_add($docid); # yes, add
+		}
+	}
+}
+
+sub _reindex_inbox ($$$) {
+	my ($self, $sync, $ibx) = @_;
+	_reindex_check_unseen($self, $sync, $ibx);
+	_reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
+	delete @$ibx{qw(over mm search git)}; # won't need these for a bit
+}
+
+sub eidx_reindex {
+	my ($self, $sync) = @_;
+
+	for my $ibx (@{$self->{ibx_list}}) {
+		_reindex_inbox($self, $sync, $ibx);
+		last if $sync->{quit};
+	}
+	$self->git->async_wait_all; # ensure eidxq gets filled completely
+	eidxq_process($self, $sync) unless $sync->{quit};
+}
+
 sub eidx_sync { # main entry point
 	my ($self, $opt) = @_;
-	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
-	$self->{oidx}->rethread_prepare($opt);
 
 	my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
 	local $self->{current_info} = '';
 	local $SIG{__WARN__} = sub {
 		$warn_cb->($self->{current_info}, ': ', @_);
 	};
+	$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
+	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
 		need_checkpoint => \(my $need_checkpoint = 0),
-		reindex => $opt->{reindex},
 		-opt => $opt,
+		# DO NOT SET {reindex} here, it's incompatible with reused
+		# V2Writable code, reindex is totally different here
+		# compared to v1/v2 inboxes because we have multiple histories
 		self => $self,
 		-regen_fmt => "%u/?\n",
 	};
@@ -415,6 +584,10 @@ sub eidx_sync { # main entry point
 	local $SIG{QUIT} = $quit;
 	local $SIG{INT} = $quit;
 	local $SIG{TERM} = $quit;
+	for my $ibx (@{$self->{ibx_list}}) {
+		$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
+	}
+	eidx_reindex($self, $sync) if delete($opt->{reindex});
 
 	# don't use $_ here, it'll get clobbered by reindex_checkpoint
 	for my $ibx (@{$self->{ibx_list}}) {
@@ -422,6 +595,7 @@ sub eidx_sync { # main entry point
 		_sync_inbox($self, $sync, $ibx);
 	}
 	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
+	eidxq_process($self, $sync) unless $sync->{quit};
 
 	PublicInbox::V2Writable::done($self);
 }
@@ -522,5 +696,6 @@ no warnings 'once';
 *count_shards = \&PublicInbox::V2Writable::count_shards;
 *atfork_child = \&PublicInbox::V2Writable::atfork_child;
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
+*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 
 1;
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 38552247..4a39bf53 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -563,6 +563,15 @@ CREATE TABLE IF NOT EXISTS eidx_meta (
 	val VARCHAR(255) NOT NULL
 )
 
+		# A queue of current docids which need reindexing.
+		# eidxq persists across aborted -extindex invocations
+		# Currently used for "-extindex --reindex" for Xapian
+		# data, but may be used in more places down the line.
+		$dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS eidxq (
+	docid INTEGER PRIMARY KEY NOT NULL
+)
+
 		$dbh;
 	};
 }
@@ -661,4 +670,18 @@ UPDATE over SET ddd = ? WHERE num = ?
 	$sth->execute;
 }
 
+sub eidxq_add {
+	my ($self, $docid) = @_;
+	$self->dbh->prepare_cached(<<'')->execute($docid);
+INSERT OR IGNORE INTO eidxq (docid) VALUES (?)
+
+}
+
+sub eidxq_del {
+	my ($self, $docid) = @_;
+	$self->dbh->prepare_cached(<<'')->execute($docid);
+DELETE FROM eidxq WHERE docid = ?
+
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 0fbe6560..cd8f4dd7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -352,8 +352,9 @@ sub index_ids ($$$$) {
 	index_list_id($self, $doc, $hdr);
 }
 
-sub add_xapian ($$$$) {
+sub eml2doc ($$$;$) {
 	my ($self, $eml, $smsg, $mids) = @_;
+	$mids //= mids_for_index($eml);
 	my $doc = $X->{Document}->new;
 	add_val($doc, PublicInbox::Search::TS(), $smsg->{ts});
 	my @ds = gmtime($smsg->{ds});
@@ -396,6 +397,12 @@ sub add_xapian ($$$$) {
 			}
 		}
 	}
+	$doc;
+}
+
+sub add_xapian ($$$$) {
+	my ($self, $eml, $smsg, $mids) = @_;
+	my $doc = eml2doc($self, $eml, $smsg, $mids);
 	$self->{xdb}->replace_document($smsg->{num}, $doc);
 }
 
@@ -941,6 +948,10 @@ sub set_metadata_once {
 
 sub _commit_txn {
 	my ($self) = @_;
+	if (my $eidx = $self->{eidx}) {
+		$eidx->git->async_wait_all;
+		$eidx->{transact_bytes} = 0;
+	}
 	if (my $xdb = $self->{xdb}) {
 		set_metadata_once($self);
 		$xdb->commit_transaction;
@@ -997,4 +1008,68 @@ SELECT COUNT(*) FROM over WHERE num = ?
 	}
 }
 
+sub reindex_xap { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $ary) = @_;
+	my ($ibx_id, $oidhex, $req, $more) = @$ary;
+	my $self = $req->{self} // die 'BUG: {self} missing';
+	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+	my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
+			die "BUG: bad ibx_id=$ibx_id ($oid)";
+
+	my $docid = $req->{docid};
+	local $eidx->{current_info} = "#$docid $oid";
+	return if is_bad_blob($oid, $type, $size, $oidhex);
+	if (my $doc = $req->{doc}) { # modify existing doc
+		$req->{tg_isset} //= do { # for existing documents in {xdb}
+			term_generator($self)->set_document($doc);
+			1;
+		};
+		$doc->add_boolean_term('O'.$eidx_key);
+		index_list_id($self, $doc, PublicInbox::Eml->new($bref));
+	} else { # first time seeing this doc
+		my $smsg = $self->{eidx}->over->get_art($docid) //
+			die "BUG: #$docid ($oid) not in over";
+		$smsg->{bytes} = $size + crlf_adjust($$bref);
+		$smsg->{eidx_key} = $eidx_key;
+		my $eml = PublicInbox::Eml->new($bref);
+		$req->{doc} = eml2doc($self, $eml, $smsg);
+		$req->{tg_isset} = 1; # eml2doc calls $tg->set_document
+	}
+	return if $more;
+	my $doc = delete($req->{doc}) or return; # all bad blobs!
+	$eidx->{transact_bytes} += $size;
+	$self->{xdb}->replace_document($req->{docid}, $doc);
+}
+
+sub reindex_docid {
+	my ($self, $docid) = @_;
+	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
+	my $eidx_key_for = $self->{-eidx_key_for} //= do {
+		my %eidx_key_for = map {
+			$_->[0] => $_->[1];
+		} @{$eidx->over->dbh->selectall_arrayref(<<'')};
+SELECT ibx_id,eidx_key FROM inboxes
+
+		\%eidx_key_for;
+	};
+
+	begin_txn_lazy($self);
+	my $doc = eval { $self->{xdb}->get_document($docid) };
+	my $req = { doc => $doc, self => $self, docid => $docid };
+	my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
+SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+
+	$sth->execute($docid);
+	my $rows = $sth->fetchall_arrayref;
+	while (my $row = shift(@$rows)) {
+		my ($ibx_id, $oidbin) = @$row;
+		my $oidhex = unpack('H*', $oidbin);
+		$eidx->git->cat_async($oidhex, \&reindex_xap,
+				[ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
+	}
+	if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
+		commit_txn_lazy($self);
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 579ed196..b6eef6bd 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -94,6 +94,8 @@ sub shard_worker_loop ($$$$$) {
 			my $over_fn = $1;
 			$over_fn =~ tr/\0/\n/;
 			$self->over_check(PublicInbox::Over->new($over_fn));
+		} elsif ($line =~ /\AE ([0-9]+)\n/) {
+			$self->reindex_docid($1 + 0);
 		} else {
 			chomp $line;
 			my $eidx_key;
@@ -223,4 +225,13 @@ sub shard_over_check {
 	}
 }
 
+sub shard_reindex_docid {
+	my ($self, $docid) = @_;
+	if (my $w = $self->{w}) {
+		print $w "E $docid\n" or die "failed to write to shard: $!";
+	} else {
+		$self->reindex_docid($docid);
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index bef3a67a..572eb418 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -275,13 +275,17 @@ sub _idx_init { # with_umask callback
 	$self->{shards} = $nshards if $nshards && $nshards != $self->{shards};
 	$self->{batch_bytes} = $opt->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	$self->{batch_bytes} *= $self->{shards} if $self->{parallel};
 
 	# need to create all shards before initializing msgmap FD
 	# idx_shards must be visible to all forked processes
 	my $max = $self->{shards} - 1;
 	my $idx = $self->{idx_shards} = [];
 	push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+
+	# SearchIdxShard may do their own flushing, so don't scale
+	# until after forking
+	$self->{batch_bytes} *= $self->{shards} if $self->{parallel};
+
 	my $ibx = $self->{ibx} or return; # ExtIdxSearch
 
 	# Now that all subprocesses are up, we can open the FDs
diff --git a/t/extsearch.t b/t/extsearch.t
index 70a60b5a..f16ec0d1 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -5,6 +5,7 @@ use strict;
 use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
+use PublicInbox::Search;
 use Fcntl qw(:seek);
 my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
 require_git(2.6);
@@ -101,7 +102,7 @@ my $es = PublicInbox::ExtSearch->new("$home/extindex");
 	is(scalar(@$xref3), 2, 'only to entries');
 }
 
-{
+if ('inbox edited') {
 	my ($in, $out, $err);
 	$in = $out = $err = '';
 	my $opt = { 0 => \$in, 1 => \$out, 2 => \$err };
@@ -176,13 +177,13 @@ is(scalar(@it), 2, 'two inboxes');
 like($it[0]->get_document->get_data, qr/v2test/, 'docdata matched v2');
 like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
 
+my $cfg = PublicInbox::Config->new;
+my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
 if ('inject w/o indexing') {
 	use PublicInbox::Import;
-	use PublicInbox::Search;
-	my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
-	my $v1ibx = PublicInbox::Config->new->lookup_name('v1test');
+	my $v1ibx = $cfg->lookup_name('v1test');
 	my $last_v1_commit = $v1ibx->mm->last_commit;
-	my $v2ibx = PublicInbox::Config->new->lookup_name('v2test');
+	my $v2ibx = $cfg->lookup_name('v2test');
 	my $last_v2_commit = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	my $git0 = PublicInbox::Git->new("$v2ibx->{inboxdir}/git/0.git");
 	chomp(my $cmt = $git0->qx(qw(rev-parse HEAD^0)));
@@ -230,6 +231,57 @@ if ('inject w/o indexing') {
 	is($mset->size, 1, 'got v2 message');
 }
 
+if ('reindex catches missed messages') {
+	use PublicInbox::InboxWritable;
+	use PublicInbox::OverIdx;
+	my $v2ibx = $cfg->lookup_name('v2test');
+	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	my $eml = eml_load('t/data/0001.patch');
+	$im->add($eml);
+	$im->done;
+	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
+	my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+	my $oidx = PublicInbox::OverIdx->new($f);
+	$oidx->dbh;
+	my $uv = $v2ibx->uidvalidity;
+	my $lc_key = "lc-v2:v2.example//$uv;0";
+	is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a,
+		'update lc-v2 meta, old is as expected');
+	my $max = $oidx->max;
+	$oidx->dbh_close;
+	ok(run_script([qw(-extindex), "$home/extindex", $v2ibx->{inboxdir}]),
+		'-extindex noop');
+	is($oidx->max, $max, '->max unchanged');
+	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 unchanged');
+	$oidx->dbh_close;
+	my $opt = { 2 => \(my $err = '') };
+	ok(run_script([qw(-extindex --reindex), "$home/extindex",
+			$v2ibx->{inboxdir}], undef, $opt),
+			'--reindex for unseen');
+	is($oidx->max, $max + 1, '->max bumped');
+	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
+	my @err = split(/^/, $err);
+	is(scalar(@err), 1, 'only one warning');
+	like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
+	my $new = $oidx->get_art($max + 1);
+	is($new->{subject}, $eml->header('Subject'), 'new message added');
+
+	ok($im->remove($eml), 'remove new message from v2 inbox');
+	$im->done;
+	my $cmt_c = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	is($oidx->eidx_meta($lc_key, $cmt_c), $cmt_b,
+		'bump lc-v2 meta again to skip v2 remove');
+	$err = '';
+	ok(run_script([qw(-extindex --reindex), "$home/extindex",
+			$v2ibx->{inboxdir}], undef, $opt),
+			'--reindex for stale');
+	@err = split(/^/, $err);
+	is(scalar(@err), 1, 'only one warning');
+	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
+}
+
 if ('remove v1test and test gc') {
 	xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
 		{ GIT_CONFIG => $cfg_path });

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 2/9] extindex: delete stale messages from over.sqlite3
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
  2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats Eric Wong
                     ` (7 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

In addition to removing stale messages from Xapian, we must
also remove them from over.sqlite3.
---
 lib/PublicInbox/ExtSearchIdx.pm |  1 +
 t/extsearch.t                   | 12 ++++++++++++
 2 files changed, 13 insertions(+)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 394a89d4..ec86a7c0 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -533,6 +533,7 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 		my $xr3 = $self->{oidx}->get_xref3($docid);
 		my $idx = $self->idx_shard($docid);
 		if (scalar(@$xr3) == 0) { # all gone
+			$self->{oidx}->delete_by_num($docid);
 			$self->{oidx}->eidxq_del($docid);
 			$idx->shard_remove($docid);
 		} else { # enqueue for reindex of remaining messages
diff --git a/t/extsearch.t b/t/extsearch.t
index f16ec0d1..4a8a9f49 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -268,18 +268,30 @@ if ('reindex catches missed messages') {
 	my $new = $oidx->get_art($max + 1);
 	is($new->{subject}, $eml->header('Subject'), 'new message added');
 
+	$es->{xdb}->reopen;
+	my $mset = $es->mset("mid:$new->{mid}");
+	is($mset->size, 1, 'previously unseen, now indexed in Xapian');
+
 	ok($im->remove($eml), 'remove new message from v2 inbox');
 	$im->done;
 	my $cmt_c = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	is($oidx->eidx_meta($lc_key, $cmt_c), $cmt_b,
 		'bump lc-v2 meta again to skip v2 remove');
 	$err = '';
+	$oidx->dbh_close;
 	ok(run_script([qw(-extindex --reindex), "$home/extindex",
 			$v2ibx->{inboxdir}], undef, $opt),
 			'--reindex for stale');
 	@err = split(/^/, $err);
 	is(scalar(@err), 1, 'only one warning');
 	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
+	is($oidx->get_art($new->{num}), undef,
+		'stale message gone from over');
+	is_deeply($oidx->get_xref3($new->{num}), [],
+		'stale message has no xref3');
+	$es->{xdb}->reopen;
+	$mset = $es->mset("mid:$new->{mid}");
+	is($mset->size, 0, 'stale mid gone Xapian');
 }
 
 if ('remove v1test and test gc') {

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
  2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
  2020-12-15  2:02   ` [PATCH 2/9] extindex: delete stale messages from over.sqlite3 Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 4/9] extindex: support --rethread and content bifurcation Eric Wong
                     ` (6 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

While unlikely to happen, it may be possible for messages
from the same inbox to get indexed multiple times.  Provide
consistent results in this case for ease-of-testing.
---
 lib/PublicInbox/Over.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 51284601..62709468 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -264,7 +264,7 @@ sub get_xref3 {
 	my ($self, $num) = @_;
 	my $dbh = dbh($self);
 	my $sth = $dbh->prepare_cached(<<'', undef, 1);
-SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
+SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
 
 	$sth->execute($num);
 	my $rows = $sth->fetchall_arrayref;

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 4/9] extindex: support --rethread and content bifurcation
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (2 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 5/9] extsearchidx: reindex works on Xapian, too Eric Wong
                     ` (5 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

--rethread is useful for dealing with bugs and behaves
just like it does with current inboxes.

Content bifurcation handling covers the case where our content
deduplication logic changes for whatever reason and causes
previously merged messages to be considered "different".  As
with v2 inboxes, the reverse is not supported: messages which
were previously considered different will not be merged (and
thus deduplicated) by a reindex.

In other words, this makes the --reindex and --rethread
switches of -extindex match the behavior of v2 -index.
---
 lib/PublicInbox/ExtSearchIdx.pm | 117 ++++++++++++++++++++++++++++++--
 lib/PublicInbox/Over.pm         |   3 +-
 t/extsearch.t                   |  71 +++++++++++++++++--
 3 files changed, 180 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ec86a7c0..c77fb197 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 	done($self);
 }
 
+sub _reindex_finalize ($$$) {
+	my ($req, $smsg, $eml) = @_;
+	my $sync = $req->{sync};
+	my $self = $sync->{self};
+	my $by_chash = $req->{by_chash};
+	my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+	my $docid = $smsg->{num} = $orig_smsg->{num};
+	$self->{oidx}->add_overview($eml, $smsg); # may rethread
+	return if $nr == 1; # likely, all good
+
+	warn "W: #$docid split into $nr due to deduplication change\n";
+	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+	delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+	for my $ary (values %$by_chash) {
+		for my $x (reverse @$ary) {
+			my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
+			die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
+		}
+		my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+		$x->{num} = delete($x->{xnum}) // die '{xnum} unset';
+		my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
+		my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+		my $ibx = $self->{ibx_list}->[$pos] //
+			die "BUG: ibx for $x->{blob} not mapped";
+		my $e = $ibx->over->get_art($x->{num});
+		$e->{blob} eq $x->{blob} or die <<EOF;
+$x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
+EOF
+		reindex_unseen($self, $sync, $ibx, $e);
+	}
+}
+
+sub _reindex_oid { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $req) = @_;
+	my $sync = $req->{sync};
+	my $self = $sync->{self};
+	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+	my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+	my $docid = $orig_smsg->{num};
+	if (is_bad_blob($oid, $type, $size, $expect_oid)) {
+		my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
+		if ($remain == 0) {
+			warn "W: #$docid gone or corrupted\n";
+			$self->idx_shard($docid)->shard_remove($docid);
+		} elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+			$self->git->cat_async($next_oid, \&_reindex_oid, $req);
+		} else {
+			warn "BUG: #$docid gone (UNEXPECTED)\n";
+			$self->idx_shard($docid)->shard_remove($docid);
+		}
+		return;
+	}
+	my $ci = $self->{current_info};
+	local $self->{current_info} = "$ci #$docid $oid";
+	my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+	$re_smsg->{bytes} = $size + crlf_adjust($$bref);
+	my $eml = PublicInbox::Eml->new($bref);
+	$re_smsg->populate($eml, { autime => $orig_smsg->{ds},
+				cotime => $orig_smsg->{ts} });
+	my $chash = content_hash($eml);
+	$re_smsg->{chash} = $chash;
+	$re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
+	$re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+	push @{$req->{by_chash}->{$chash}}, $re_smsg;
+	if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+		$self->git->cat_async($next_oid, \&_reindex_oid, $req);
+	} else { # last $re_smsg is the highest priority xref3
+		local $self->{current_info} = "$ci #$docid";
+		_reindex_finalize($req, $re_smsg, $eml);
+	}
+}
+
+sub _reindex_smsg ($$$) {
+	my ($self, $sync, $smsg) = @_;
+	my $docid = $smsg->{num};
+	my $xr3 = $self->{oidx}->get_xref3($docid, 1);
+	if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+		warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
+
+		$self->{oidx}->delete_by_num($docid);
+		$self->idx_shard($docid)->shard_remove($docid);
+		return;
+	}
+
+	# we sort {xr3} in the reverse order of {ibx_list} so we can
+	# hit the common case in _reindex_finalize without rereading
+	# from git (or holding multiple messages in memory).
+	my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+	@$xr3 = sort {
+		$id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+				||
+		$b->[1] <=> $a->[1] # break ties with {xnum}
+	} @$xr3;
+	@$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
+	my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
+}
+
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
@@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing
 		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
 		$pr->("Xapian indexing $min..$max (total=$tot)\n");
 	}
+	my %id2pos;
+	my $pos = 0;
+	$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+	$sync->{id2pos} = \%id2pos;
 
 	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
 	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
 	$iter->execute;
 	while (defined(my $docid = $iter->fetchrow_array)) {
-		$self->idx_shard($docid)->shard_reindex_docid($docid);
-		$del->execute($docid);
 		last if $sync->{quit};
+		if (my $smsg = $self->{oidx}->get_art($docid)) {
+			_reindex_smsg($self, $sync, $smsg);
+		} else {
+			warn "E: #$docid does not exist in over\n";
+		}
+		$del->execute($docid);
+
 		my $cur = ++${$sync->{nr}};
 
 		# shards flush on their own, just don't queue up too many
 		# deletes
 		if (($cur % 1000) == 0) {
+			$self->git->async_wait_all;
 			$self->{oidx}->commit_lazy;
 			$self->{oidx}->begin_lazy;
 			$pr->("reindexed $cur/$tot\n") if $pr;
@@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing
 		# this is only for SIGUSR1, shards do their own accounting:
 		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
 	}
+	$self->git->async_wait_all;
 	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
-	$self->{oidx}->commit_lazy;
-	$self->{oidx}->begin_lazy;
 }
 
 sub _reindex_unseen { # git->cat_async callback
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 62709468..49ba180b 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -261,13 +261,14 @@ SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
 }
 
 sub get_xref3 {
-	my ($self, $num) = @_;
+	my ($self, $num, $raw) = @_;
 	my $dbh = dbh($self);
 	my $sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
 
 	$sth->execute($num);
 	my $rows = $sth->fetchall_arrayref;
+	return $rows if $raw;
 	my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT eidx_key FROM inboxes WHERE ibx_id = ?
 
diff --git a/t/extsearch.t b/t/extsearch.t
index 4a8a9f49..fb31b0ab 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -6,12 +6,14 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use PublicInbox::Search;
+use PublicInbox::InboxWritable;
 use Fcntl qw(:seek);
 my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
 require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::ExtSearch';
 use_ok 'PublicInbox::ExtSearchIdx';
+use_ok 'PublicInbox::OverIdx';
 my $sock = tcp_server();
 my $host_port = $sock->sockhost . ':' . $sock->sockport;
 my ($home, $for_destroy) = tmpdir();
@@ -179,6 +181,8 @@ like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
 
 my $cfg = PublicInbox::Config->new;
 my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
+my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+my $oidx = PublicInbox::OverIdx->new($f);
 if ('inject w/o indexing') {
 	use PublicInbox::Import;
 	my $v1ibx = $cfg->lookup_name('v1test');
@@ -232,8 +236,6 @@ if ('inject w/o indexing') {
 }
 
 if ('reindex catches missed messages') {
-	use PublicInbox::InboxWritable;
-	use PublicInbox::OverIdx;
 	my $v2ibx = $cfg->lookup_name('v2test');
 	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
 	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
@@ -242,8 +244,6 @@ if ('reindex catches missed messages') {
 	$im->done;
 	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
-	my $f = "$home/extindex/ei$schema_version/over.sqlite3";
-	my $oidx = PublicInbox::OverIdx->new($f);
 	$oidx->dbh;
 	my $uv = $v2ibx->uidvalidity;
 	my $lc_key = "lc-v2:v2.example//$uv;0";
@@ -263,7 +263,7 @@ if ('reindex catches missed messages') {
 	is($oidx->max, $max + 1, '->max bumped');
 	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
 	my @err = split(/^/, $err);
-	is(scalar(@err), 1, 'only one warning');
+	is(scalar(@err), 1, 'only one warning') or diag "err=$err";
 	like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
 	my $new = $oidx->get_art($max + 1);
 	is($new->{subject}, $eml->header('Subject'), 'new message added');
@@ -283,7 +283,7 @@ if ('reindex catches missed messages') {
 			$v2ibx->{inboxdir}], undef, $opt),
 			'--reindex for stale');
 	@err = split(/^/, $err);
-	is(scalar(@err), 1, 'only one warning');
+	is(scalar(@err), 1, 'only one warning') or diag "err=$err";
 	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
 	is($oidx->get_art($new->{num}), undef,
 		'stale message gone from over');
@@ -294,6 +294,65 @@ if ('reindex catches missed messages') {
 	is($mset->size, 0, 'stale mid gone Xapian');
 }
 
+if ('reindex catches content bifurcation') {
+	use PublicInbox::MID qw(mids);
+	my $v2ibx = $cfg->lookup_name('v2test');
+	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+	my $eml = eml_load('t/data/message_embed.eml');
+	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	$im->add($eml);
+	$im->done;
+	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	my $uv = $v2ibx->uidvalidity;
+	my $lc_key = "lc-v2:v2.example//$uv;0";
+	$oidx->dbh;
+	is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a,
+		'update lc-v2 meta, old is as expected');
+	my $mid = mids($eml)->[0];
+	my $smsg = $v2ibx->over->next_by_mid($mid, \(my $id), \(my $prev));
+	my $oldmax = $oidx->max;
+	my $x3_orig = $oidx->get_xref3(3);
+	is(scalar(@$x3_orig), 1, '#3 has one xref');
+	$oidx->add_xref3(3, $smsg->{num}, $smsg->{blob}, 'v2.example');
+	my $x3 = $oidx->get_xref3(3);
+	is(scalar(@$x3), 2, 'injected xref3');
+	$oidx->commit_lazy;
+	my $opt = { 2 => \(my $err = '') };
+	ok(run_script([qw(-extindex --all), "$home/extindex"], undef, $opt),
+		'extindex --all is noop');
+	is($err, '', 'no warnings in index');
+	$oidx->dbh;
+	is($oidx->max, $oldmax, 'oidx->max unchanged');
+	$oidx->dbh_close;
+	ok(run_script([qw(-extindex --reindex --all), "$home/extindex"],
+		undef, $opt), 'extindex --reindex');
+	$oidx->dbh;
+	ok($oidx->max > $oldmax, 'oidx->max bumped');
+	like($err, qr/split into 2 due to deduplication change/,
+		'bifurcation noted');
+	my $added = $oidx->get_art($oidx->max);
+	is($added->{blob}, $smsg->{blob}, 'new blob indexed');
+	is_deeply(["v2.example:$smsg->{num}:$smsg->{blob}"],
+		$oidx->get_xref3($added->{num}),
+		'xref3 corrected for bifurcated message');
+	is_deeply($oidx->get_xref3(3), $x3_orig, 'xref3 restored for #3');
+}
+
+if ('--reindex --rethread') {
+	my $before = $oidx->dbh->selectrow_array(<<'');
+SELECT MAX(tid) FROM over WHERE num > 0
+
+	my $opt = {};
+	ok(run_script([qw(-extindex --reindex --rethread --all),
+			"$home/extindex"], undef, $opt),
+			'--rethread');
+	my $after = $oidx->dbh->selectrow_array(<<'');
+SELECT MIN(tid) FROM over WHERE num > 0
+
+	# actual rethread logic is identical to v1/v2 and tested elsewhere
+	ok($after > $before, '--rethread updates MIN(tid)');
+}
+
 if ('remove v1test and test gc') {
 	xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
 		{ GIT_CONFIG => $cfg_path });

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 5/9] extsearchidx: reindex works on Xapian, too
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (3 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 4/9] extindex: support --rethread and content bifurcation Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 6/9] extsearchidx: checkpoint releases locks Eric Wong
                     ` (4 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

Instead of just working on over.sqlite3, we need to work on
the Xapian DBs as well.  While no changes to our Xapian use
have taken place recently, they could in the future and
--reindex exists to account for that.
---
 lib/PublicInbox/ExtSearchIdx.pm | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index c77fb197..f29a84e3 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -404,13 +404,18 @@ sub _reindex_finalize ($$$) {
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
 	my $docid = $smsg->{num} = $orig_smsg->{num};
 	$self->{oidx}->add_overview($eml, $smsg); # may rethread
-	return if $nr == 1; # likely, all good
-
+	$self->{transact_bytes} += $smsg->{bytes};
+	if ($nr == 1) { # likely, all good
+		$self->idx_shard($docid)->shard_reindex_docid($docid);
+		return;
+	}
 	warn "W: #$docid split into $nr due to deduplication change\n";
 	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
 	delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+	my @todo;
 	for my $ary (values %$by_chash) {
 		for my $x (reverse @$ary) {
+			warn "removing #$docid xref3 $x->{blob}\n";
 			my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
 			die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
 		}
@@ -424,6 +429,12 @@ sub _reindex_finalize ($$$) {
 		$e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
+		push @todo, $ibx, $e;
+	}
+	$self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
+	$self->{oidx}->begin_lazy;
+	$self->idx_shard($docid)->shard_reindex_docid($docid);
+	while (my ($ibx, $e) = splice(@todo, 0, 2)) {
 		reindex_unseen($self, $sync, $ibx, $e);
 	}
 }
@@ -531,11 +542,12 @@ sub eidxq_process ($$) { # for reindexing
 
 		# shards flush on their own, just don't queue up too many
 		# deletes
-		if (($cur % 1000) == 0) {
+		if ($self->{transact_bytes} >= $self->{batch_bytes}) {
 			$self->git->async_wait_all;
 			$self->{oidx}->commit_lazy;
 			$self->{oidx}->begin_lazy;
 			$pr->("reindexed $cur/$tot\n") if $pr;
+			$self->{transact_bytes} = 0;
 		}
 		# this is only for SIGUSR1, shards do their own accounting:
 		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 6/9] extsearchidx: checkpoint releases locks
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (4 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 5/9] extsearchidx: reindex works on Xapian, too Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 7/9] extsearchidx: simplify reindex code paths Eric Wong
                     ` (3 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

--reindex can take many hours or days; ensure we release
locks according to --batch-size so automated fetch+index
jobs can write new data to indices while we update old data.
---
 lib/PublicInbox/ExtSearchIdx.pm | 19 ++++++-------------
 lib/PublicInbox/V2Writable.pm   |  2 +-
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index f29a84e3..3b021a1b 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -404,7 +404,7 @@ sub _reindex_finalize ($$$) {
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
 	my $docid = $smsg->{num} = $orig_smsg->{num};
 	$self->{oidx}->add_overview($eml, $smsg); # may rethread
-	$self->{transact_bytes} += $smsg->{bytes};
+	check_batch_limit({ %$sync, new_smsg => $smsg });
 	if ($nr == 1) { # likely, all good
 		$self->idx_shard($docid)->shard_reindex_docid($docid);
 		return;
@@ -537,18 +537,8 @@ sub eidxq_process ($$) { # for reindexing
 			warn "E: #$docid does not exist in over\n";
 		}
 		$del->execute($docid);
+		++${$sync->{nr}};
 
-		my $cur = ++${$sync->{nr}};
-
-		# shards flush on their own, just don't queue up too many
-		# deletes
-		if ($self->{transact_bytes} >= $self->{batch_bytes}) {
-			$self->git->async_wait_all;
-			$self->{oidx}->commit_lazy;
-			$self->{oidx}->begin_lazy;
-			$pr->("reindexed $cur/$tot\n") if $pr;
-			$self->{transact_bytes} = 0;
-		}
 		# this is only for SIGUSR1, shards do their own accounting:
 		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
 	}
@@ -709,7 +699,10 @@ sub eidx_sync { # main entry point
 	for my $ibx (@{$self->{ibx_list}}) {
 		$ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key);
 	}
-	eidx_reindex($self, $sync) if delete($opt->{reindex});
+	if (delete($opt->{reindex})) {
+		$sync->{checkpoint_unlocks} = 1;
+		eidx_reindex($self, $sync);
+	}
 
 	# don't use $_ here, it'll get clobbered by reindex_checkpoint
 	for my $ibx (@{$self->{ibx_list}}) {
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 572eb418..97dbf328 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -884,7 +884,7 @@ sub reindex_checkpoint ($$) {
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
 	die 'BUG: {im} during reindex' if $self->{im};
-	if ($self->{ibx_map}) {
+	if ($self->{ibx_map} && !$sync->{checkpoint_unlocks}) {
 		checkpoint($self, 1); # no need to release lock on pure index
 	} else {
 		$self->done; # release lock

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 7/9] extsearchidx: simplify reindex code paths
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (5 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 6/9] extsearchidx: checkpoint releases locks Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly Eric Wong
                     ` (2 subsequent siblings)
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

Since we're inside a Xapian transaction, calling ->index_raw
followed by ->shard_add_eidx_info calls on the same docid
doesn't seem to hurt indexing performance.  It definitely
reduces FS read traffic and IPC from git at the cost of some
more IPC between the parent and workers.  Nevertheless, the code
and FD reductions seem worth it.
---
 lib/PublicInbox/ExtSearchIdx.pm   | 58 ++++++++++++++++------------
 lib/PublicInbox/SearchIdx.pm      | 64 -------------------------------
 lib/PublicInbox/SearchIdxShard.pm | 17 ++------
 3 files changed, 37 insertions(+), 102 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 3b021a1b..d5295735 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -149,7 +149,7 @@ sub index_unseen ($) {
 	my $oid = $new_smsg->{blob};
 	my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
 	$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-	$idx->index_raw(undef, $eml, $new_smsg, $ibx);
+	$idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
 	check_batch_limit($req);
 }
 
@@ -395,23 +395,39 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 	done($self);
 }
 
+sub _ibx_for ($$$) {
+	my ($self, $sync, $smsg) = @_;
+	my $ibx_id = delete($smsg->{ibx_id}) // die '{ibx_id} unset';
+	my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
+	$self->{ibx_list}->[$pos] // die "BUG: ibx for $smsg->{blob} not mapped"
+}
+
 sub _reindex_finalize ($$$) {
 	my ($req, $smsg, $eml) = @_;
 	my $sync = $req->{sync};
 	my $self = $sync->{self};
-	my $by_chash = $req->{by_chash};
+	my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}';
 	my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
 	my $docid = $smsg->{num} = $orig_smsg->{num};
 	$self->{oidx}->add_overview($eml, $smsg); # may rethread
 	check_batch_limit({ %$sync, new_smsg => $smsg });
-	if ($nr == 1) { # likely, all good
-		$self->idx_shard($docid)->shard_reindex_docid($docid);
-		return;
+	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+	my $stable = delete($by_chash->{$chash0}) //
+				die "BUG: $smsg->{blob} chash missing";
+	my $idx = $self->idx_shard($docid);
+	my $top_smsg = pop @$stable;
+	$top_smsg == $smsg or die 'BUG: top_smsg != smsg';
+	my $ibx = _ibx_for($self, $sync, $smsg);
+	$idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+	for my $x (reverse @$stable) {
+		$ibx = _ibx_for($self, $sync, $x);
+		my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
+		$idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
 	}
+	return if $nr == 1; # likely, all good
+
 	warn "W: #$docid split into $nr due to deduplication change\n";
-	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
-	delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
 	my @todo;
 	for my $ary (values %$by_chash) {
 		for my $x (reverse @$ary) {
@@ -419,21 +435,16 @@ sub _reindex_finalize ($$$) {
 			my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
 			die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
 		}
-		my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+		my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
 		$x->{num} = delete($x->{xnum}) // die '{xnum} unset';
-		my $ibx_id = delete($x->{ibx_id}) // die '{ibx_id} unset';
-		my $pos = $sync->{id2pos}->{$ibx_id} // die "$ibx_id no pos";
-		my $ibx = $self->{ibx_list}->[$pos] //
-			die "BUG: ibx for $x->{blob} not mapped";
+		$ibx = _ibx_for($self, $sync, $x);
 		my $e = $ibx->over->get_art($x->{num});
 		$e->{blob} eq $x->{blob} or die <<EOF;
 $x->{blob} != $e->{blob} (${\$ibx->eidx_key}:$e->{num});
 EOF
 		push @todo, $ibx, $e;
 	}
-	$self->{oidx}->commit_lazy; # ensure shard workers can see xref removals
-	$self->{oidx}->begin_lazy;
-	$self->idx_shard($docid)->shard_reindex_docid($docid);
+	undef $by_chash;
 	while (my ($ibx, $e) = splice(@todo, 0, 2)) {
 		reindex_unseen($self, $sync, $ibx, $e);
 	}
@@ -444,14 +455,14 @@ sub _reindex_oid { # git->cat_async callback
 	my $sync = $req->{sync};
 	my $self = $sync->{self};
 	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
-	my $expect_oid = $req->{xr3}->[$req->{ix}]->[2];
+	my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2];
 	my $docid = $orig_smsg->{num};
 	if (is_bad_blob($oid, $type, $size, $expect_oid)) {
 		my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
 		if ($remain == 0) {
 			warn "W: #$docid gone or corrupted\n";
 			$self->idx_shard($docid)->shard_remove($docid);
-		} elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+		} elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
 			$self->git->cat_async($next_oid, \&_reindex_oid, $req);
 		} else {
 			warn "BUG: #$docid gone (UNEXPECTED)\n";
@@ -468,10 +479,11 @@ sub _reindex_oid { # git->cat_async callback
 				cotime => $orig_smsg->{ts} });
 	my $chash = content_hash($eml);
 	$re_smsg->{chash} = $chash;
-	$re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1];
-	$re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0];
+	$re_smsg->{xnum} = $req->{xr3r}->[$req->{ix}]->[1];
+	$re_smsg->{ibx_id} = $req->{xr3r}->[$req->{ix}]->[0];
+	$re_smsg->{hdr} = $eml->header_obj;
 	push @{$req->{by_chash}->{$chash}}, $re_smsg;
-	if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+	if (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
 		$self->git->cat_async($next_oid, \&_reindex_oid, $req);
 	} else { # last $re_smsg is the highest priority xref3
 		local $self->{current_info} = "$ci #$docid";
@@ -492,7 +504,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 		return;
 	}
 
-	# we sort {xr3} in the reverse order of {ibx_list} so we can
+	# we sort {xr3r} in the reverse order of {ibx_list} so we can
 	# hit the common case in _reindex_finalize without rereading
 	# from git (or holding multiple messages in memory).
 	my $id2pos = $sync->{id2pos}; # index in {ibx_list}
@@ -502,15 +514,13 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 		$b->[1] <=> $a->[1] # break ties with {xnum}
 	} @$xr3;
 	@$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3;
-	my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+	my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 };
 	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
-	$self->{oidx}->commit_lazy; # ensure shard workers can see it
-	$self->{oidx}->begin_lazy;
 	my $dbh = $self->{oidx}->dbh;
 	my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
 	${$sync->{nr}} = 0;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index cd8f4dd7..c6d2a0e8 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -1008,68 +1008,4 @@ SELECT COUNT(*) FROM over WHERE num = ?
 	}
 }
 
-sub reindex_xap { # git->cat_async callback
-	my ($bref, $oid, $type, $size, $ary) = @_;
-	my ($ibx_id, $oidhex, $req, $more) = @$ary;
-	my $self = $req->{self} // die 'BUG: {self} missing';
-	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
-	my $eidx_key = $self->{-eidx_key_for}->{$ibx_id} //
-			die "BUG: bad ibx_id=$ibx_id ($oid)";
-
-	my $docid = $req->{docid};
-	local $eidx->{current_info} = "#$docid $oid";
-	return if is_bad_blob($oid, $type, $size, $oidhex);
-	if (my $doc = $req->{doc}) { # modify existing doc
-		$req->{tg_isset} //= do { # for existing documents in {xdb}
-			term_generator($self)->set_document($doc);
-			1;
-		};
-		$doc->add_boolean_term('O'.$eidx_key);
-		index_list_id($self, $doc, PublicInbox::Eml->new($bref));
-	} else { # first time seeing this doc
-		my $smsg = $self->{eidx}->over->get_art($docid) //
-			die "BUG: #$docid ($oid) not in over";
-		$smsg->{bytes} = $size + crlf_adjust($$bref);
-		$smsg->{eidx_key} = $eidx_key;
-		my $eml = PublicInbox::Eml->new($bref);
-		$req->{doc} = eml2doc($self, $eml, $smsg);
-		$req->{tg_isset} = 1; # eml2doc calls $tg->set_document
-	}
-	return if $more;
-	my $doc = delete($req->{doc}) or return; # all bad blobs!
-	$eidx->{transact_bytes} += $size;
-	$self->{xdb}->replace_document($req->{docid}, $doc);
-}
-
-sub reindex_docid {
-	my ($self, $docid) = @_;
-	my $eidx = $self->{eidx} // die 'BUG: {eidx} missing';
-	my $eidx_key_for = $self->{-eidx_key_for} //= do {
-		my %eidx_key_for = map {
-			$_->[0] => $_->[1];
-		} @{$eidx->over->dbh->selectall_arrayref(<<'')};
-SELECT ibx_id,eidx_key FROM inboxes
-
-		\%eidx_key_for;
-	};
-
-	begin_txn_lazy($self);
-	my $doc = eval { $self->{xdb}->get_document($docid) };
-	my $req = { doc => $doc, self => $self, docid => $docid };
-	my $sth = $eidx->over->dbh->prepare_cached(<<'', undef, 1);
-SELECT ibx_id,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id ASC
-
-	$sth->execute($docid);
-	my $rows = $sth->fetchall_arrayref;
-	while (my $row = shift(@$rows)) {
-		my ($ibx_id, $oidbin) = @$row;
-		my $oidhex = unpack('H*', $oidbin);
-		$eidx->git->cat_async($oidhex, \&reindex_xap,
-				[ $ibx_id, $oidhex, $req, scalar(@$rows) ]);
-	}
-	if ($eidx->{transact_bytes} >= $eidx->{batch_bytes}) {
-		commit_txn_lazy($self);
-	}
-}
-
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index b6eef6bd..ee00858b 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -94,8 +94,6 @@ sub shard_worker_loop ($$$$$) {
 			my $over_fn = $1;
 			$over_fn =~ tr/\0/\n/;
 			$self->over_check(PublicInbox::Over->new($over_fn));
-		} elsif ($line =~ /\AE ([0-9]+)\n/) {
-			$self->reindex_docid($1 + 0);
 		} else {
 			chomp $line;
 			my $eidx_key;
@@ -124,9 +122,9 @@ sub shard_worker_loop ($$$$$) {
 }
 
 sub index_raw {
-	my ($self, $msgref, $eml, $smsg, $ibx) = @_;
+	my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
 	if (my $w = $self->{w}) {
-		my @ekey = $ibx ? ('X='.$ibx->eidx_key."\0") : ();
+		my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : ();
 		$msgref //= \($eml->as_string);
 		$smsg->{raw_bytes} //= length($$msgref);
 		# mid must be last, it can contain spaces (but not LF)
@@ -140,7 +138,7 @@ sub index_raw {
 			$eml = PublicInbox::Eml->new($msgref);
 		}
 		$self->begin_txn_lazy;
-		$smsg->{eidx_key} = $ibx->eidx_key if $ibx;
+		$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
 		$self->add_message($eml, $smsg);
 	}
 }
@@ -225,13 +223,4 @@ sub shard_over_check {
 	}
 }
 
-sub shard_reindex_docid {
-	my ($self, $docid) = @_;
-	if (my $w = $self->{w}) {
-		print $w "E $docid\n" or die "failed to write to shard: $!";
-	} else {
-		$self->reindex_docid($docid);
-	}
-}
-
 1;

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (6 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 7/9] extsearchidx: simplify reindex code paths Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-15  2:02   ` [PATCH 9/9] searchidxshard: simplify newline elimination Eric Wong
  2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

When checkpointing and yielding the lock to other processes,
we need to ensure any open DB statement handles are closed,
since they hold references to DB FDs and prevent them from
being closed and unlocked.

And clean up some progress reporting while we're at it.
---
 lib/PublicInbox/ExtSearchIdx.pm | 147 +++++++++++++++++++++-----------
 lib/PublicInbox/V2Writable.pm   |   3 +
 2 files changed, 100 insertions(+), 50 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index d5295735..b5024823 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
@@ -518,6 +519,11 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
+sub checkpoint_due ($) {
+	my ($sync) = @_;
+	${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
+}
+
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
@@ -531,13 +537,16 @@ sub eidxq_process ($$) { # for reindexing
 		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
 		$pr->("Xapian indexing $min..$max (total=$tot)\n");
 	}
-	my %id2pos;
-	my $pos = 0;
-	$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
-	$sync->{id2pos} = \%id2pos;
-
-	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
-	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+	$sync->{id2pos} //= do {
+		my %id2pos;
+		my $pos = 0;
+		$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+		\%id2pos;
+	};
+	my ($del, $iter);
+restart:
+	$del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+	$iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
 	$iter->execute;
 	while (defined(my $docid = $iter->fetchrow_array)) {
 		last if $sync->{quit};
@@ -549,8 +558,12 @@ sub eidxq_process ($$) { # for reindexing
 		$del->execute($docid);
 		++${$sync->{nr}};
 
-		# this is only for SIGUSR1, shards do their own accounting:
-		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+		if (checkpoint_due($sync)) {
+			$dbh = $del = $iter = undef;
+			reindex_checkpoint($self, $sync); # release lock
+			$dbh = $self->{oidx}->dbh;
+			goto restart;
+		}
 	}
 	$self->git->async_wait_all;
 	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
@@ -589,16 +602,28 @@ sub reindex_unseen ($$$$) {
 sub _reindex_check_unseen ($$$) {
 	my ($self, $sync, $ibx) = @_;
 	my $ibx_id = $ibx->{-ibx_id};
-	my ($beg, $end) = (1, 1000);
+	my $slice = 1000;
+	my ($beg, $end) = (1, $slice);
 
 	# first, check if we missed any messages in target $ibx
-	my $inx3 = $self->{oidx}->dbh->prepare(<<'');
-SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
 	my $msgs;
+	my $pr = $sync->{-opt}->{-progress};
+	my $ekey = $ibx->eidx_key;
+	$sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+	${$sync->{nr}} = 0;
+
 	while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+		${$sync->{nr}} = $beg;
 		$beg = $msgs->[-1]->{num} + 1;
-		$end = $beg + 1000;
+		$end = $beg + $slice;
+		if (checkpoint_due($sync)) {
+			reindex_checkpoint($self, $sync); # release lock
+		}
+
+		my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT DISTINCT(docid) FROM xref3 WHERE
+ibx_id = ? AND xnum = ? AND oidbin = ?
+
 		for my $xsmsg (@$msgs) {
 			my $oidbin = pack('H*', $xsmsg->{blob});
 			$inx3->bind_param(1, $ibx_id);
@@ -623,49 +648,69 @@ SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_check_stale ($$$) {
 	my ($self, $sync, $ibx) = @_;
-
-	# now, check if there's stale xrefs
-	my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
-
-	$get_xnum->execute($ibx->{-ibx_id});
-	my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+	my $min = 0;
+	my $pr = $sync->{-opt}->{-progress};
+	my $fetching;
+	my $ekey = $ibx->eidx_key;
+	$sync->{-regen_fmt} =
+			"$ekey check stale/missing %u/".$ibx->over->max."\n";
+	${$sync->{nr}} = 0;
+	do {
+		if (checkpoint_due($sync)) {
+			reindex_checkpoint($self, $sync); # release lock
+		}
+		# now, check if there's stale xrefs
+		my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
+ORDER BY docid,xnum ASC LIMIT 10000
+
+		$iter->execute($ibx->{-ibx_id}, $min);
+		$fetching = undef;
+
+		while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
+			return if $sync->{quit};
+			${$sync->{nr}} = $xnum;
+
+			$fetching = $min = $docid;
+			my $smsg = $ibx->over->get_art($xnum);
+			my $oidhex = unpack('H*', $oidbin);
+			my $err;
+			if (!$smsg) {
+				$err = 'stale';
+			} elsif ($smsg->{blob} ne $oidhex) {
+				$err = "mismatch (!= $smsg->{blob})";
+			} else {
+				next; # likely, all good
+			}
+			# current_info already has eidx_key
+			warn "$xnum:$oidhex (#$docid): $err\n";
+			my $del = $self->{oidx}->dbh->prepare_cached(<<'');
 DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
-	while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
-		last if $sync->{quit};
-		my $smsg = $ibx->over->get_art($xnum);
-		my $oidhex = unpack('H*', $oidbin);
-		my $err;
-		if (!$smsg) {
-			$err = 'stale';
-		} elsif ($smsg->{blob} ne $oidhex) {
-			$err = "mismatch (!= $smsg->{blob})";
-		} else {
-			next; # likely, all good
-		}
-		warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
-		$del_xref3->bind_param(1, $ibx->{-ibx_id});
-		$del_xref3->bind_param(2, $xnum);
-		$del_xref3->bind_param(3, $oidbin, SQL_BLOB);
-		$del_xref3->execute;
-
-		# get_xref3 over-fetches, but this is a rare path:
-		my $xr3 = $self->{oidx}->get_xref3($docid);
-		my $idx = $self->idx_shard($docid);
-		if (scalar(@$xr3) == 0) { # all gone
-			$self->{oidx}->delete_by_num($docid);
-			$self->{oidx}->eidxq_del($docid);
-			$idx->shard_remove($docid);
-		} else { # enqueue for reindex of remaining messages
-			$idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
-			$self->{oidx}->eidxq_add($docid); # yes, add
+			$del->bind_param(1, $ibx->{-ibx_id});
+			$del->bind_param(2, $xnum);
+			$del->bind_param(3, $oidbin, SQL_BLOB);
+			$del->execute;
+
+			# get_xref3 over-fetches, but this is a rare path:
+			my $xr3 = $self->{oidx}->get_xref3($docid);
+			my $idx = $self->idx_shard($docid);
+			if (scalar(@$xr3) == 0) { # all gone
+				$self->{oidx}->delete_by_num($docid);
+				$self->{oidx}->eidxq_del($docid);
+				$idx->shard_remove($docid);
+			} else { # enqueue for reindex of remaining messages
+				$idx->shard_remove_eidx_info($docid,
+							$ibx->eidx_key);
+				$self->{oidx}->eidxq_add($docid); # yes, add
+			}
 		}
-	}
+	} while (defined $fetching);
 }
 
 sub _reindex_inbox ($$$) {
 	my ($self, $sync, $ibx) = @_;
+	local $self->{current_info} = $ibx->eidx_key;
 	_reindex_check_unseen($self, $sync, $ibx);
 	_reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
 	delete @$ibx{qw(over mm search git)}; # won't need these for a bit
@@ -694,6 +739,8 @@ sub eidx_sync { # main entry point
 	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
 		need_checkpoint => \(my $need_checkpoint = 0),
+		check_intvl => 10,
+		next_check => now() + 10,
 		-opt => $opt,
 		# DO NOT SET {reindex} here, it's incompatible with reused
 		# V2Writable code, reindex is totally different here
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 97dbf328..992305c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -896,6 +896,9 @@ sub reindex_checkpoint ($$) {
 
 	# allow -watch or -mda to write...
 	$self->idx_init($sync->{-opt}); # reacquire lock
+	if (my $intvl = $sync->{check_intvl}) { # eidx
+		$sync->{next_check} = PublicInbox::DS::now() + $intvl;
+	}
 	$mm_tmp->atfork_parent if $mm_tmp;
 }
 

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* [PATCH 9/9] searchidxshard: simplify newline elimination
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (7 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly Eric Wong
@ 2020-12-15  2:02   ` Eric Wong
  2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
  9 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-15  2:02 UTC (permalink / raw)
  To: meta

This overdue change fixes {current_info} to not inject a newline
into every warning message.

Simpler code helps us avoid bugs and the need to make
fixes like commit 44de182766037948d62bc2a8ba924de2264dd5fc
("searchidxshard: chomp $eidx_key from pipe").
---
 lib/PublicInbox/SearchIdxShard.pm | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index ee00858b..2e654769 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -69,33 +69,31 @@ sub shard_worker_loop ($$$$$) {
 	$0 = "shard[$shard]";
 	$self->begin_txn_lazy;
 	while (my $line = readline($r)) {
+		chomp $line;
 		$v2w->{current_info} = "[$shard] $line";
-		if ($line eq "commit\n") {
+		if ($line eq 'commit') {
 			$self->commit_txn_lazy;
-		} elsif ($line eq "close\n") {
+		} elsif ($line eq 'close') {
 			$self->idx_release;
-		} elsif ($line eq "barrier\n") {
+		} elsif ($line eq 'barrier') {
 			$self->commit_txn_lazy;
 			# no need to lock < 512 bytes is atomic under POSIX
 			print $bnote "barrier $shard\n" or
 					die "write failed for barrier $!\n";
-		} elsif ($line =~ /\AD ([0-9]+)\n\z/s) {
+		} elsif ($line =~ /\AD ([0-9]+)\z/s) {
 			$self->remove_by_docid($1 + 0);
 		} elsif ($line =~ s/\A\+X //) {
 			my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
-			chomp $eidx_key;
 			$self->add_eidx_info($docid, $eidx_key, eml($r, $len));
 		} elsif ($line =~ s/\A-X //) {
 			my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
-			chomp $eidx_key;
 			$self->remove_eidx_info($docid, $eidx_key,
 							eml($r, $len));
-		} elsif ($line =~ s/\AO ([^\n]+)\n//) {
+		} elsif ($line =~ s/\AO ([^\n]+)//) {
 			my $over_fn = $1;
 			$over_fn =~ tr/\0/\n/;
 			$self->over_check(PublicInbox::Over->new($over_fn));
 		} else {
-			chomp $line;
 			my $eidx_key;
 			if ($line =~ s/\AX=(.+)\0//) {
 				$eidx_key = $1;

^ permalink raw reply related	[flat|nested] 14+ messages in thread

* Re: [PATCH 0/9] extindex: --reindex support
  2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
                     ` (8 preceding siblings ...)
  2020-12-15  2:02   ` [PATCH 9/9] searchidxshard: simplify newline elimination Eric Wong
@ 2020-12-16  6:40   ` Eric Wong
  2020-12-16 23:04     ` [PATCH 10/9] extsearchidx: lock eidxq on full --reindex Eric Wong
  9 siblings, 1 reply; 14+ messages in thread
From: Eric Wong @ 2020-12-16  6:40 UTC (permalink / raw)
  To: meta

Eric Wong <e@80x24.org> wrote:
> Patches 1 and 2 are resends, the rest have gone through a lot of
> changes and I'm probably ready to run this live on the
> extindex which holds my lore mirror onion
> <http://rskvuqcfnfizkjg6h5jvovwb3wkikzcwskf54lfpymus6mxrzw67b5ad.onion/all/>

Putting eidxq in over.sqlite3 was a bad idea when multiple
-extindex processes may run :x   Nothing fatal or leading to
index corruption, just some stuff delayed or work needlessly
repeated.

^ permalink raw reply	[flat|nested] 14+ messages in thread

* [PATCH 10/9] extsearchidx: lock eidxq on full --reindex
  2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
@ 2020-12-16 23:04     ` Eric Wong
  0 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2020-12-16 23:04 UTC (permalink / raw)
  To: meta

Eric Wong <e@80x24.org> wrote:
> Putting eidxq in over.sqlite3 was a bad idea when multiple
> -extindex processes may run :x   Nothing fatal or leading to
> index corruption, just some stuff delayed or work needlessly
> repeated.

It's still nice to be able to resume interrupted runs, however.
Final patch in this series from burning SSD hell, I hope...

------------8<------------
Subject: [PATCH] extsearchidx: lock eidxq on full --reindex

Incremental indexing can use the `eidxq' reindexing queue for
handling deletes and resuming interrupted indexing.  Ensure
those incremental -extindex invocations do not steal (and
prematurely perform) work that an "-extindex --reindex"
invocation is handling.
---
 lib/PublicInbox/ExtSearchIdx.pm | 86 +++++++++++++++++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index b5024823..f492734a 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -18,6 +18,8 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::ExtSearch PublicInbox::Lock);
 use Carp qw(croak carp);
+use Sys::Hostname qw(hostname);
+use POSIX qw(strftime);
 use PublicInbox::Search;
 use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
 	is_bad_blob);
@@ -524,9 +526,86 @@ sub checkpoint_due ($) {
 	${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
 }
 
+sub host_ident () {
+	# I've copied FS images and only changed the hostname before,
+	# so prepend hostname.  Use `state' since a BOFH can change
+	# these while this process is running and we always want to be
+	# able to release locks taken by this process.
+	state $retval = hostname . '-' . do {
+		my $m; # machine-id(5) is systemd
+		if (open(my $fh, '<', '/etc/machine-id')) { $m = <$fh> }
+		# hostid(1) is in GNU coreutils, kern.hostid is FreeBSD
+		chomp($m ||= `hostid` || `sysctl -n kern.hostid`);
+		$m;
+	};
+}
+
+sub eidxq_release {
+	my ($self) = @_;
+	my $expect = delete($self->{-eidxq_locked}) or return;
+	my ($owner_pid, undef) = split(/-/, $expect);
+	return if $owner_pid != $$; # shards may fork
+	my $oidx = $self->{oidx};
+	$oidx->begin_lazy;
+	my $cur = $oidx->eidx_meta('eidxq_lock') // '';
+	if ($cur eq $expect) {
+		$oidx->eidx_meta('eidxq_lock', '');
+		return 1;
+	} elsif ($cur ne '') {
+		warn "E: eidxq_lock($expect) stolen by $cur\n";
+	} else {
+		warn "E: eidxq_lock($expect) released by another process\n";
+	}
+	undef;
+}
+
+sub DESTROY {
+	my ($self) = @_;
+	eidxq_release($self) and $self->{oidx}->commit_lazy;
+}
+
+sub _eidxq_take ($) {
+	my ($self) = @_;
+	my $val = "$$-${\time}-$>-".host_ident;
+	$self->{oidx}->eidx_meta('eidxq_lock', $val);
+	$self->{-eidxq_locked} = $val;
+}
+
+sub eidxq_lock_acquire ($) {
+	my ($self) = @_;
+	my $oidx = $self->{oidx};
+	$oidx->begin_lazy;
+	my $cur = $oidx->eidx_meta('eidxq_lock') || return _eidxq_take($self);
+	if (my $locked = $self->{-eidxq_locked}) { # be lazy
+		return $locked if $locked eq $cur;
+	}
+	my ($pid, $time, $euid, $ident) = split(/-/, $cur, 4);
+	my $t = strftime('%Y-%m-%d %k:%M:%S', gmtime($time));
+	if ($euid == $> && $ident eq host_ident) {
+		if (kill(0, $pid)) {
+			warn <<EOM; return;
+I: PID:$pid (re)indexing Xapian since $t, it will continue our work
+EOM
+		}
+		if ($!{ESRCH}) {
+			warn "I: eidxq_lock is stale ($cur), clobbering\n";
+			return _eidxq_take($self);
+		}
+		warn "E: kill(0, $pid) failed: $!\n"; # fall-through:
+	}
+	my $fn = $oidx->dbh->sqlite_db_filename;
+	warn <<EOF;
+W: PID:$pid, UID:$euid on $ident is indexing Xapian since $t
+W: If this is unexpected, delete `eidxq_lock' from the `eidx_meta' table:
+W:	sqlite3 $fn 'DELETE FROM eidx_meta WHERE key = "eidxq_lock"'
+EOF
+	undef;
+}
+
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
+	return unless eidxq_lock_acquire($self);
 	my $dbh = $self->{oidx}->dbh;
 	my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
 	${$sync->{nr}} = 0;
@@ -719,6 +798,12 @@ sub _reindex_inbox ($$$) {
 sub eidx_reindex {
 	my ($self, $sync) = @_;
 
+	# acquire eidxq_lock early because full reindex takes forever
+	# and incremental -extindex processes can run during our checkpoints
+	if (!eidxq_lock_acquire($self)) {
+		warn "E: aborting --reindex\n";
+		return;
+	}
 	for my $ibx (@{$self->{ibx_list}}) {
 		_reindex_inbox($self, $sync, $ibx);
 		last if $sync->{quit};
@@ -769,6 +854,7 @@ sub eidx_sync { # main entry point
 	$self->{oidx}->rethread_done($opt) unless $sync->{quit};
 	eidxq_process($self, $sync) unless $sync->{quit};
 
+	eidxq_release($self);
 	PublicInbox::V2Writable::done($self);
 }
 

^ permalink raw reply related	[flat|nested] 14+ messages in thread

end of thread, other threads:[~2020-12-16 23:04 UTC | newest]

Thread overview: 14+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-12-11  3:37 [PATCH] extindex: preliminary --reindex support Eric Wong
2020-12-12 19:53 ` [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3 Eric Wong
2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
2020-12-15  2:02   ` [PATCH 2/9] extindex: delete stale messages from over.sqlite3 Eric Wong
2020-12-15  2:02   ` [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats Eric Wong
2020-12-15  2:02   ` [PATCH 4/9] extindex: support --rethread and content bifurcation Eric Wong
2020-12-15  2:02   ` [PATCH 5/9] extsearchidx: reindex works on Xapian, too Eric Wong
2020-12-15  2:02   ` [PATCH 6/9] extsearchidx: checkpoint releases locks Eric Wong
2020-12-15  2:02   ` [PATCH 7/9] extsearchidx: simplify reindex code paths Eric Wong
2020-12-15  2:02   ` [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly Eric Wong
2020-12-15  2:02   ` [PATCH 9/9] searchidxshard: simplify newline elimination Eric Wong
2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-16 23:04     ` [PATCH 10/9] extsearchidx: lock eidxq on full --reindex Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).