unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 4/9] extindex: support --rethread and content bifurcation
Date: Tue, 15 Dec 2020 02:02:19 +0000	[thread overview]
Message-ID: <20201215020224.11739-5-e@80x24.org> (raw)
In-Reply-To: <20201215020224.11739-1-e@80x24.org>

--rethread is useful for recovering from threading bugs and
behaves just like it does with v1 and v2 inboxes.

This is in case our content deduplication logic changes for
whatever reason and causes previously merged messages to be
considered "different".  The reverse is not supported: this
won't merge and deduplicate messages which were previously
considered different, but v2 inboxes do not allow that, either.

In other words, this makes the --reindex and --rethread
switches of -extindex match the behavior of v2 -index.
---
 lib/PublicInbox/ExtSearchIdx.pm | 117 ++++++++++++++++++++++++++++++--
 lib/PublicInbox/Over.pm         |   3 +-
 t/extsearch.t                   |  71 +++++++++++++++++--
 3 files changed, 180 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ec86a7c0..c77fb197 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -395,6 +395,106 @@ DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
 	done($self);
 }
 
+sub _reindex_finalize ($$$) { # runs after every xref3 blob of a docid is read
+	my ($req, $smsg, $eml) = @_; # $smsg/$eml: highest-priority blob
+	my $sync = $req->{sync};
+	my $self = $sync->{self};
+	my $by_chash = $req->{by_chash}; # content_hash => [ Smsg, ... ]
+	my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes';
+	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+	my $docid = $smsg->{num} = $orig_smsg->{num}; # keep the existing docid
+	$self->{oidx}->add_overview($eml, $smsg); # may rethread
+	return if $nr == 1; # likely, all good
+
+	warn "W: #$docid split into $nr due to deduplication change\n";
+	my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
+	delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing";
+	for my $ary (values %$by_chash) { # content split off #$docid
+		for my $x (reverse @$ary) { # detach each copy from #$docid
+			my $n = $self->{oidx}->remove_xref3($docid, $x->{blob});
+			die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
+		}
+		my $x = $ary->[-1] // die "BUG: #$docid {by_chash} empty";
+		$x->{num} = delete($x->{xnum}) // die 'BUG: {xnum} unset';
+		my $ibx_id = delete($x->{ibx_id}) // die 'BUG: {ibx_id} unset';
+		my $pos = $sync->{id2pos}->{$ibx_id} // die "BUG: $ibx_id no pos";
+		my $ibx = $self->{ibx_list}->[$pos] //
+			die "BUG: ibx for $x->{blob} not mapped";
+		my $e = $ibx->over->get_art($x->{num}) // die
+			"BUG: $x->{blob} not in ${\$ibx->eidx_key}:$x->{num}\n";
+		$e->{blob} eq $x->{blob} or die "BUG: $x->{blob} != $e->{blob} ".
+			"(${\$ibx->eidx_key}:$e->{num})\n";
+		reindex_unseen($self, $sync, $ibx, $e); # re-add as its own message
+	}
+}
+
+sub _reindex_oid { # git->cat_async callback, one call per xref3 blob
+	my ($bref, $oid, $type, $size, $req) = @_; # $req from _reindex_smsg
+	my $sync = $req->{sync};
+	my $self = $sync->{self};
+	my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
+	my $expect_oid = $req->{xr3}->[$req->{ix}]->[2]; # [ibx_id, xnum, hex]
+	my $docid = $orig_smsg->{num};
+	if (is_bad_blob($oid, $type, $size, $expect_oid)) { # drop its xref3 row
+		my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
+		if ($remain == 0) {
+			warn "W: #$docid gone or corrupted\n";
+			$self->idx_shard($docid)->shard_remove($docid);
+		} elsif (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) {
+			$self->git->cat_async($next_oid, \&_reindex_oid, $req);
+		} else { # NOTE(review): drops any copies already in {by_chash}
+			warn "BUG: #$docid gone (UNEXPECTED)\n";
+			$self->idx_shard($docid)->shard_remove($docid);
+		}
+		return;
+	}
+	my $ci = $self->{current_info};
+	local $self->{current_info} = "$ci #$docid $oid";
+	my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+	$re_smsg->{bytes} = $size + crlf_adjust($$bref);
+	my $eml = PublicInbox::Eml->new($bref);
+	$re_smsg->populate($eml, { autime => $orig_smsg->{ds},
+				cotime => $orig_smsg->{ts} }); # reuse {ds}/{ts}
+	my $chash = content_hash($eml);
+	$re_smsg->{chash} = $chash;
+	$re_smsg->{xnum} = $req->{xr3}->[$req->{ix}]->[1]; # for _reindex_finalize
+	$re_smsg->{ibx_id} = $req->{xr3}->[$req->{ix}]->[0]; # ditto
+	push @{$req->{by_chash}->{$chash}}, $re_smsg; # group copies by content
+	if (my $next_oid = $req->{xr3}->[++$req->{ix}]->[2]) { # NOTE(review): autovivifies [] past end
+		$self->git->cat_async($next_oid, \&_reindex_oid, $req);
+	} else { # last $re_smsg is the highest priority xref3
+		local $self->{current_info} = "$ci #$docid";
+		_reindex_finalize($req, $re_smsg, $eml);
+	}
+}
+
+sub _reindex_smsg ($$$) { # reread every blob mapped to one over row
+	my ($self, $sync, $smsg) = @_;
+	my $docid = $smsg->{num};
+	my $xr3 = $self->{oidx}->get_xref3($docid, 1); # raw [ibx_id,xnum,oidbin]
+	if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this
+		warn <<"";
+BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex

+		$self->{oidx}->delete_by_num($docid);
+		$self->idx_shard($docid)->shard_remove($docid);
+		return;
+	}
+
+	# we sort {xr3} in the reverse order of {ibx_list} so we can
+	# hit the common case in _reindex_finalize without rereading
+	# from git (or holding multiple messages in memory).
+	my $id2pos = $sync->{id2pos}; # index in {ibx_list}
+	@$xr3 = sort {
+		$id2pos->{$b->[0]} <=> $id2pos->{$a->[0]}
+				||
+		$b->[1] <=> $a->[1] # break ties with {xnum}
+	} @$xr3;
+	@$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; # hex oid
+	my $req = { orig_smsg => $smsg, sync => $sync, xr3 => $xr3, ix => 0 };
+	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); # lowest priority first
+}
+
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
@@ -410,19 +510,29 @@ sub eidxq_process ($$) { # for reindexing
 		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
 		$pr->("Xapian indexing $min..$max (total=$tot)\n");
 	}
+	my %id2pos;
+	my $pos = 0;
+	$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+	$sync->{id2pos} = \%id2pos;
 
 	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
 	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
 	$iter->execute;
 	while (defined(my $docid = $iter->fetchrow_array)) {
-		$self->idx_shard($docid)->shard_reindex_docid($docid);
-		$del->execute($docid);
 		last if $sync->{quit};
+		if (my $smsg = $self->{oidx}->get_art($docid)) {
+			_reindex_smsg($self, $sync, $smsg);
+		} else {
+			warn "E: #$docid does not exist in over\n";
+		}
+		$del->execute($docid);
+
 		my $cur = ++${$sync->{nr}};
 
 		# shards flush on their own, just don't queue up too many
 		# deletes
 		if (($cur % 1000) == 0) {
+			$self->git->async_wait_all;
 			$self->{oidx}->commit_lazy;
 			$self->{oidx}->begin_lazy;
 			$pr->("reindexed $cur/$tot\n") if $pr;
@@ -430,9 +540,8 @@ sub eidxq_process ($$) { # for reindexing
 		# this is only for SIGUSR1, shards do their own accounting:
 		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
 	}
+	$self->git->async_wait_all;
 	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
-	$self->{oidx}->commit_lazy;
-	$self->{oidx}->begin_lazy;
 }
 
 sub _reindex_unseen { # git->cat_async callback
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 62709468..49ba180b 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -261,13 +261,14 @@ SELECT num,tid,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
 }
 
 sub get_xref3 {
-	my ($self, $num) = @_;
+	my ($self, $num, $raw) = @_;
 	my $dbh = dbh($self);
 	my $sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT ibx_id,xnum,oidbin FROM xref3 WHERE docid = ? ORDER BY ibx_id,xnum ASC
 
 	$sth->execute($num);
 	my $rows = $sth->fetchall_arrayref;
+	return $rows if $raw;
 	my $eidx_key_sth = $dbh->prepare_cached(<<'', undef, 1);
 SELECT eidx_key FROM inboxes WHERE ibx_id = ?
 
diff --git a/t/extsearch.t b/t/extsearch.t
index 4a8a9f49..fb31b0ab 100644
--- a/t/extsearch.t
+++ b/t/extsearch.t
@@ -6,12 +6,14 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use PublicInbox::Search;
+use PublicInbox::InboxWritable;
 use Fcntl qw(:seek);
 my $json = PublicInbox::Config::json() or plan skip_all => 'JSON missing';
 require_git(2.6);
 require_mods(qw(DBD::SQLite Search::Xapian));
 use_ok 'PublicInbox::ExtSearch';
 use_ok 'PublicInbox::ExtSearchIdx';
+use_ok 'PublicInbox::OverIdx';
 my $sock = tcp_server();
 my $host_port = $sock->sockhost . ':' . $sock->sockport;
 my ($home, $for_destroy) = tmpdir();
@@ -179,6 +181,8 @@ like($it[1]->get_document->get_data, qr/v1test/, 'docdata matched v1');
 
 my $cfg = PublicInbox::Config->new;
 my $schema_version = PublicInbox::Search::SCHEMA_VERSION();
+my $f = "$home/extindex/ei$schema_version/over.sqlite3";
+my $oidx = PublicInbox::OverIdx->new($f);
 if ('inject w/o indexing') {
 	use PublicInbox::Import;
 	my $v1ibx = $cfg->lookup_name('v1test');
@@ -232,8 +236,6 @@ if ('inject w/o indexing') {
 }
 
 if ('reindex catches missed messages') {
-	use PublicInbox::InboxWritable;
-	use PublicInbox::OverIdx;
 	my $v2ibx = $cfg->lookup_name('v2test');
 	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
 	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
@@ -242,8 +244,6 @@ if ('reindex catches missed messages') {
 	$im->done;
 	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
 	isnt($cmt_a, $cmt_b, 'v2 0.git HEAD updated');
-	my $f = "$home/extindex/ei$schema_version/over.sqlite3";
-	my $oidx = PublicInbox::OverIdx->new($f);
 	$oidx->dbh;
 	my $uv = $v2ibx->uidvalidity;
 	my $lc_key = "lc-v2:v2.example//$uv;0";
@@ -263,7 +263,7 @@ if ('reindex catches missed messages') {
 	is($oidx->max, $max + 1, '->max bumped');
 	is($oidx->eidx_meta($lc_key), $cmt_b, 'lc-v2 stays unchanged');
 	my @err = split(/^/, $err);
-	is(scalar(@err), 1, 'only one warning');
+	is(scalar(@err), 1, 'only one warning') or diag "err=$err";
 	like($err[0], qr/I: reindex_unseen/, 'got reindex_unseen message');
 	my $new = $oidx->get_art($max + 1);
 	is($new->{subject}, $eml->header('Subject'), 'new message added');
@@ -283,7 +283,7 @@ if ('reindex catches missed messages') {
 			$v2ibx->{inboxdir}], undef, $opt),
 			'--reindex for stale');
 	@err = split(/^/, $err);
-	is(scalar(@err), 1, 'only one warning');
+	is(scalar(@err), 1, 'only one warning') or diag "err=$err";
 	like($err[0], qr/\(#$new->{num}\): stale/, 'got stale message warning');
 	is($oidx->get_art($new->{num}), undef,
 		'stale message gone from over');
@@ -294,6 +294,65 @@ if ('reindex catches missed messages') {
 	is($mset->size, 0, 'stale mid gone Xapian');
 }
 
+if ('reindex catches content bifurcation') {
+	use PublicInbox::MID qw(mids);
+	my $v2ibx = $cfg->lookup_name('v2test');
+	my $im = PublicInbox::InboxWritable->new($v2ibx)->importer(0);
+	my $eml = eml_load('t/data/message_embed.eml');
+	my $cmt_a = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	$im->add($eml);
+	$im->done;
+	my $cmt_b = $v2ibx->mm->last_commit_xap($schema_version, 0);
+	my $uv = $v2ibx->uidvalidity;
+	my $lc_key = "lc-v2:v2.example//$uv;0";
+	$oidx->dbh;
+	is($oidx->eidx_meta($lc_key, $cmt_b), $cmt_a, # pretend $cmt_b is indexed
+		'update lc-v2 meta, old is as expected');
+	my $mid = mids($eml)->[0];
+	my $smsg = $v2ibx->over->next_by_mid($mid, \(my $id), \(my $prev));
+	my $oldmax = $oidx->max;
+	my $x3_orig = $oidx->get_xref3(3);
+	is(scalar(@$x3_orig), 1, '#3 has one xref');
+	$oidx->add_xref3(3, $smsg->{num}, $smsg->{blob}, 'v2.example');
+	my $x3 = $oidx->get_xref3(3);
+	is(scalar(@$x3), 2, 'injected xref3');
+	$oidx->commit_lazy;
+	my $opt = { 2 => \(my $err = '') };
+	ok(run_script([qw(-extindex --all), "$home/extindex"], undef, $opt),
+		'extindex --all is noop');
+	is($err, '', 'no warnings in index');
+	$oidx->dbh;
+	is($oidx->max, $oldmax, 'oidx->max unchanged');
+	$oidx->dbh_close;
+	ok(run_script([qw(-extindex --reindex --all), "$home/extindex"],
+		undef, $opt), 'extindex --reindex');
+	$oidx->dbh;
+	ok($oidx->max > $oldmax, 'oidx->max bumped');
+	like($err, qr/split into 2 due to deduplication change/,
+		'bifurcation noted');
+	my $added = $oidx->get_art($oidx->max);
+	is($added->{blob}, $smsg->{blob}, 'new blob indexed');
+	is_deeply($oidx->get_xref3($added->{num}),
+		["v2.example:$smsg->{num}:$smsg->{blob}"],
+		'xref3 corrected for bifurcated message');
+	is_deeply($oidx->get_xref3(3), $x3_orig, 'xref3 restored for #3');
+}
+
+if ('--reindex --rethread') { # expects every tid to move past the old MAX(tid)
+	my $before = $oidx->dbh->selectrow_array(<<''); # highest tid before
+SELECT MAX(tid) FROM over WHERE num > 0

+	my $opt = {};
+	ok(run_script([qw(-extindex --reindex --rethread --all),
+			"$home/extindex"], undef, $opt),
+			'--rethread');
+	my $after = $oidx->dbh->selectrow_array(<<''); # lowest tid after
+SELECT MIN(tid) FROM over WHERE num > 0

+	# actual rethread logic is identical to v1/v2 and tested elsewhere
+	ok($after > $before, '--rethread updates MIN(tid)');
+}
+
 if ('remove v1test and test gc') {
 	xsys([qw(git config --unset publicinbox.v1test.inboxdir)],
 		{ GIT_CONFIG => $cfg_path });

  parent reply	other threads:[~2020-12-15  2:02 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-11  3:37 [PATCH] extindex: preliminary --reindex support Eric Wong
2020-12-12 19:53 ` [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3 Eric Wong
2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
2020-12-15  2:02   ` [PATCH 2/9] extindex: delete stale messages from over.sqlite3 Eric Wong
2020-12-15  2:02   ` [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats Eric Wong
2020-12-15  2:02   ` Eric Wong [this message]
2020-12-15  2:02   ` [PATCH 5/9] extsearchidx: reindex works on Xapian, too Eric Wong
2020-12-15  2:02   ` [PATCH 6/9] extsearchidx: checkpoint releases locks Eric Wong
2020-12-15  2:02   ` [PATCH 7/9] extsearchidx: simplify reindex code paths Eric Wong
2020-12-15  2:02   ` [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly Eric Wong
2020-12-15  2:02   ` [PATCH 9/9] searchidxshard: simplify newline elimination Eric Wong
2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-16 23:04     ` [PATCH 10/9] extsearchidx: lock eidxq on full --reindex Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201215020224.11739-5-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).