unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 8/9] extsearchidx: reindex releases over.sqlite3 handles properly
Date: Tue, 15 Dec 2020 02:02:23 +0000	[thread overview]
Message-ID: <20201215020224.11739-9-e@80x24.org> (raw)
In-Reply-To: <20201215020224.11739-1-e@80x24.org>

When checkpointing and yielding the lock to other processes,
we need to ensure any open DB statement handles are closed,
since they hold references to the DB and thus prevent its
FDs from being closed and unlocked.

And clean up some progress reporting while we're at it.
---
 lib/PublicInbox/ExtSearchIdx.pm | 147 +++++++++++++++++++++-----------
 lib/PublicInbox/V2Writable.pm   |   3 +
 2 files changed, 100 insertions(+), 50 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index d5295735..b5024823 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -29,6 +29,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
 use File::Spec;
+use PublicInbox::DS qw(now);
 use DBI qw(:sql_types); # SQL_BLOB
 
 sub new {
@@ -518,6 +519,11 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 	$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
 }
 
+sub checkpoint_due ($) {
+	my ($sync) = @_;
+	${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
+}
+
 sub eidxq_process ($$) { # for reindexing
 	my ($self, $sync) = @_;
 
@@ -531,13 +537,16 @@ sub eidxq_process ($$) { # for reindexing
 		my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
 		$pr->("Xapian indexing $min..$max (total=$tot)\n");
 	}
-	my %id2pos;
-	my $pos = 0;
-	$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
-	$sync->{id2pos} = \%id2pos;
-
-	my $del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
-	my $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
+	$sync->{id2pos} //= do {
+		my %id2pos;
+		my $pos = 0;
+		$id2pos{$_->{-ibx_id}} = $pos++ for @{$self->{ibx_list}};
+		\%id2pos;
+	};
+	my ($del, $iter);
+restart:
+	$del = $dbh->prepare('DELETE FROM eidxq WHERE docid = ?');
+	$iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
 	$iter->execute;
 	while (defined(my $docid = $iter->fetchrow_array)) {
 		last if $sync->{quit};
@@ -549,8 +558,12 @@ sub eidxq_process ($$) { # for reindexing
 		$del->execute($docid);
 		++${$sync->{nr}};
 
-		# this is only for SIGUSR1, shards do their own accounting:
-		reindex_checkpoint($self, $sync) if ${$sync->{need_checkpoint}};
+		if (checkpoint_due($sync)) {
+			$dbh = $del = $iter = undef;
+			reindex_checkpoint($self, $sync); # release lock
+			$dbh = $self->{oidx}->dbh;
+			goto restart;
+		}
 	}
 	$self->git->async_wait_all;
 	$pr->("reindexed ${$sync->{nr}}/$tot\n") if $pr;
@@ -589,16 +602,28 @@ sub reindex_unseen ($$$$) {
 sub _reindex_check_unseen ($$$) {
 	my ($self, $sync, $ibx) = @_;
 	my $ibx_id = $ibx->{-ibx_id};
-	my ($beg, $end) = (1, 1000);
+	my $slice = 1000;
+	my ($beg, $end) = (1, $slice);
 
 	# first, check if we missed any messages in target $ibx
-	my $inx3 = $self->{oidx}->dbh->prepare(<<'');
-SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
-
 	my $msgs;
+	my $pr = $sync->{-opt}->{-progress};
+	my $ekey = $ibx->eidx_key;
+	$sync->{-regen_fmt} = "$ekey checking unseen %u/".$ibx->over->max."\n";
+	${$sync->{nr}} = 0;
+
 	while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end)})) {
+		${$sync->{nr}} = $beg;
 		$beg = $msgs->[-1]->{num} + 1;
-		$end = $beg + 1000;
+		$end = $beg + $slice;
+		if (checkpoint_due($sync)) {
+			reindex_checkpoint($self, $sync); # release lock
+		}
+
+		my $inx3 = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT DISTINCT(docid) FROM xref3 WHERE
+ibx_id = ? AND xnum = ? AND oidbin = ?
+
 		for my $xsmsg (@$msgs) {
 			my $oidbin = pack('H*', $xsmsg->{blob});
 			$inx3->bind_param(1, $ibx_id);
@@ -623,49 +648,69 @@ SELECT DISTINCT(docid) FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
 sub _reindex_check_stale ($$$) {
 	my ($self, $sync, $ibx) = @_;
-
-	# now, check if there's stale xrefs
-	my $get_xnum = $self->{oidx}->dbh->prepare(<<'');
-SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? ORDER BY docid ASC
-
-	$get_xnum->execute($ibx->{-ibx_id});
-	my $del_xref3 = $self->{oidx}->dbh->prepare(<<'');
+	my $min = 0;
+	my $pr = $sync->{-opt}->{-progress};
+	my $fetching;
+	my $ekey = $ibx->eidx_key;
+	$sync->{-regen_fmt} =
+			"$ekey check stale/missing %u/".$ibx->over->max."\n";
+	${$sync->{nr}} = 0;
+	do {
+		if (checkpoint_due($sync)) {
+			reindex_checkpoint($self, $sync); # release lock
+		}
+		# now, check if there's stale xrefs
+		my $iter = $self->{oidx}->dbh->prepare_cached(<<'', undef, 1);
+SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND docid > ?
+ORDER BY docid,xnum ASC LIMIT 10000
+
+		$iter->execute($ibx->{-ibx_id}, $min);
+		$fetching = undef;
+
+		while (my ($docid, $xnum, $oidbin) = $iter->fetchrow_array) {
+			return if $sync->{quit};
+			${$sync->{nr}} = $xnum;
+
+			$fetching = $min = $docid;
+			my $smsg = $ibx->over->get_art($xnum);
+			my $oidhex = unpack('H*', $oidbin);
+			my $err;
+			if (!$smsg) {
+				$err = 'stale';
+			} elsif ($smsg->{blob} ne $oidhex) {
+				$err = "mismatch (!= $smsg->{blob})";
+			} else {
+				next; # likely, all good
+			}
+			# current_info already has eidx_key
+			warn "$xnum:$oidhex (#$docid): $err\n";
+			my $del = $self->{oidx}->dbh->prepare_cached(<<'');
 DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 
-	while (my ($docid, $xnum, $oidbin) = $get_xnum->fetchrow_array) {
-		last if $sync->{quit};
-		my $smsg = $ibx->over->get_art($xnum);
-		my $oidhex = unpack('H*', $oidbin);
-		my $err;
-		if (!$smsg) {
-			$err = 'stale';
-		} elsif ($smsg->{blob} ne $oidhex) {
-			$err = "mismatch (!= $smsg->{blob})";
-		} else {
-			next; # likely, all good
-		}
-		warn $ibx->eidx_key . ":$xnum:$oidhex (#$docid): $err\n";
-		$del_xref3->bind_param(1, $ibx->{-ibx_id});
-		$del_xref3->bind_param(2, $xnum);
-		$del_xref3->bind_param(3, $oidbin, SQL_BLOB);
-		$del_xref3->execute;
-
-		# get_xref3 over-fetches, but this is a rare path:
-		my $xr3 = $self->{oidx}->get_xref3($docid);
-		my $idx = $self->idx_shard($docid);
-		if (scalar(@$xr3) == 0) { # all gone
-			$self->{oidx}->delete_by_num($docid);
-			$self->{oidx}->eidxq_del($docid);
-			$idx->shard_remove($docid);
-		} else { # enqueue for reindex of remaining messages
-			$idx->shard_remove_eidx_info($docid, $ibx->eidx_key);
-			$self->{oidx}->eidxq_add($docid); # yes, add
+			$del->bind_param(1, $ibx->{-ibx_id});
+			$del->bind_param(2, $xnum);
+			$del->bind_param(3, $oidbin, SQL_BLOB);
+			$del->execute;
+
+			# get_xref3 over-fetches, but this is a rare path:
+			my $xr3 = $self->{oidx}->get_xref3($docid);
+			my $idx = $self->idx_shard($docid);
+			if (scalar(@$xr3) == 0) { # all gone
+				$self->{oidx}->delete_by_num($docid);
+				$self->{oidx}->eidxq_del($docid);
+				$idx->shard_remove($docid);
+			} else { # enqueue for reindex of remaining messages
+				$idx->shard_remove_eidx_info($docid,
+							$ibx->eidx_key);
+				$self->{oidx}->eidxq_add($docid); # yes, add
+			}
 		}
-	}
+	} while (defined $fetching);
 }
 
 sub _reindex_inbox ($$$) {
 	my ($self, $sync, $ibx) = @_;
+	local $self->{current_info} = $ibx->eidx_key;
 	_reindex_check_unseen($self, $sync, $ibx);
 	_reindex_check_stale($self, $sync, $ibx) unless $sync->{quit};
 	delete @$ibx{qw(over mm search git)}; # won't need these for a bit
@@ -694,6 +739,8 @@ sub eidx_sync { # main entry point
 	$self->{oidx}->rethread_prepare($opt);
 	my $sync = {
 		need_checkpoint => \(my $need_checkpoint = 0),
+		check_intvl => 10,
+		next_check => now() + 10,
 		-opt => $opt,
 		# DO NOT SET {reindex} here, it's incompatible with reused
 		# V2Writable code, reindex is totally different here
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 97dbf328..992305c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -896,6 +896,9 @@ sub reindex_checkpoint ($$) {
 
 	# allow -watch or -mda to write...
 	$self->idx_init($sync->{-opt}); # reacquire lock
+	if (my $intvl = $sync->{check_intvl}) { # eidx
+		$sync->{next_check} = PublicInbox::DS::now() + $intvl;
+	}
 	$mm_tmp->atfork_parent if $mm_tmp;
 }
 

  parent reply	other threads:[~2020-12-15  2:02 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-11  3:37 [PATCH] extindex: preliminary --reindex support Eric Wong
2020-12-12 19:53 ` [PATCH 2/1] extindex: reindex: drop stale rows from over.sqlite3 Eric Wong
2020-12-15  2:02 ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-15  2:02   ` [PATCH 1/9] extindex: preliminary " Eric Wong
2020-12-15  2:02   ` [PATCH 2/9] extindex: delete stale messages from over.sqlite3 Eric Wong
2020-12-15  2:02   ` [PATCH 3/9] over: sort xref3 by xnum if ibx_id repeats Eric Wong
2020-12-15  2:02   ` [PATCH 4/9] extindex: support --rethread and content bifurcation Eric Wong
2020-12-15  2:02   ` [PATCH 5/9] extsearchidx: reindex works on Xapian, too Eric Wong
2020-12-15  2:02   ` [PATCH 6/9] extsearchidx: checkpoint releases locks Eric Wong
2020-12-15  2:02   ` [PATCH 7/9] extsearchidx: simplify reindex code paths Eric Wong
2020-12-15  2:02   ` Eric Wong [this message]
2020-12-15  2:02   ` [PATCH 9/9] searchidxshard: simplify newline elimination Eric Wong
2020-12-16  6:40   ` [PATCH 0/9] extindex: --reindex support Eric Wong
2020-12-16 23:04     ` [PATCH 10/9] extsearchidx: lock eidxq on full --reindex Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201215020224.11739-9-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).