unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 1/4] *index: checkpoints write last_commit metadata
Date: Fri, 13 Nov 2020 11:11:41 +0000	[thread overview]
Message-ID: <20201113111144.23038-2-e@80x24.org> (raw)
In-Reply-To: <20201113111144.23038-1-e@80x24.org>

This will set us up for supporting graceful shutdown
on -index without repeating any work.
---
 lib/PublicInbox/ExtSearchIdx.pm | 12 ++++---
 lib/PublicInbox/IdxStack.pm     | 16 +++++++---
 lib/PublicInbox/SearchIdx.pm    | 56 ++++++++++++++++++---------------
 lib/PublicInbox/V2Writable.pm   | 28 ++++++++++++-----
 t/idx_stack.t                   | 20 ++++++------
 5 files changed, 81 insertions(+), 51 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 7aaf8291..14ffdadb 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -168,6 +168,10 @@ sub do_finalize ($) {
 		# `d' message was already unindexed in the v1/v2 inboxes,
 		# so it's too noisy to warn, here.
 	}
+	# cur_cmt may be undef for unindex_oid, set by V2Writable::index_todo
+	if (defined(my $cur_cmt = $req->{cur_cmt})) {
+		${$req->{latest_cmt}} = $cur_cmt;
+	}
 }
 
 sub do_step ($) { # main iterator for adding messages to the index
@@ -337,10 +341,10 @@ sub eidx_sync { # main entry point
 }
 
 sub update_last_commit { # overrides V2Writable
-	my ($self, $sync, $unit, $latest_cmt) = @_;
-	return unless defined $latest_cmt;
-
-	$self->git->async_wait_all;
+	my ($self, $sync, $stk) = @_;
+	my $unit = $sync->{unit} // return;
+	my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	defined($latest_cmt) or return;
 	my $ibx = $sync->{ibx} or die 'BUG: {ibx} missing';
 	my $ekey = $ibx->eidx_key;
 	my $uv = $ibx->uidvalidity;
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
index ce75b46a..e7e10de9 100644
--- a/lib/PublicInbox/IdxStack.pm
+++ b/lib/PublicInbox/IdxStack.pm
@@ -6,7 +6,7 @@ package PublicInbox::IdxStack;
 use v5.10.1;
 use strict;
 use Fcntl qw(:seek);
-use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+use constant PACK_FMT => eval { pack('Q', 1) } ? 'A1QQH*H*' : 'A1IIH*H*';
 
 # start off in write-only mode
 sub new {
@@ -16,9 +16,15 @@ sub new {
 
 # file_char = [d|m]
 sub push_rec {
-	my ($self, $file_char, $at, $ct, $blob_oid) = @_;
-	my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
-	$self->{rec_size} //= length($rec);
+	my ($self, $file_char, $at, $ct, $blob_oid, $cmt_oid) = @_;
+	my $rec = pack(PACK_FMT, $file_char, $at, $ct, $blob_oid, $cmt_oid);
+	$self->{unpack_fmt} //= do {
+		my $len = length($cmt_oid);
+		my $fmt = PACK_FMT;
+		$fmt =~ s/H\*/H$len/g;
+		$self->{rec_size} = length($rec);
+		$fmt;
+	};
 	print { $self->{wr} } $rec or die "print: $!";
 	$self->{tot_size} += length($rec);
 }
@@ -46,7 +52,7 @@ sub pop_rec {
 	my $r = read($io, my $buf, $sz);
 	defined($r) or die "read: $!";
 	$r == $sz or die "read($r != $sz)";
-	unpack(FMT, $buf);
+	unpack($self->{unpack_fmt}, $buf);
 }
 
 1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 662055c6..90d8c8b3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -608,11 +608,17 @@ sub index_both { # git->cat_async callback
 	$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
 		die "E: could not generate NNTP article number for $oid";
 	add_message($self, $eml, $smsg, $sync);
+	my $cur_cmt = $sync->{cur_cmt} // die 'BUG: {cur_cmt} missing';
+	${$sync->{latest_cmt}} = $cur_cmt;
 }
 
 sub unindex_both { # git->cat_async callback
-	my ($bref, $oid, $type, $size, $self) = @_;
-	unindex_eml($self, $oid, PublicInbox::Eml->new($bref));
+	my ($bref, $oid, $type, $size, $sync) = @_;
+	unindex_eml($sync->{sidx}, $oid, PublicInbox::Eml->new($bref));
+	# may be undef if leftover
+	if (defined(my $cur_cmt = $sync->{cur_cmt})) {
+		${$sync->{latest_cmt}} = $cur_cmt;
+	}
 }
 
 sub with_umask {
@@ -646,34 +652,33 @@ sub v1_checkpoint ($$;$) {
 	my ($self, $sync, $stk) = @_;
 	$self->{ibx}->git->async_wait_all;
 
-	# latest_cmt may be undef
-	my $newest = $stk ? $stk->{latest_cmt} : undef;
-	if ($newest) {
+	# $newest may be undef
+	my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	if (defined($newest)) {
 		my $cur = $self->{mm}->last_commit || '';
 		if (need_update($self, $cur, $newest)) {
 			$self->{mm}->last_commit($newest);
 		}
-	} else {
-		${$sync->{max}} = $self->{batch_bytes};
 	}
+	${$sync->{max}} = $self->{batch_bytes};
 
 	$self->{mm}->{dbh}->commit;
-	if ($newest && need_xapian($self)) {
-		my $xdb = $self->{xdb};
+	my $xdb = need_xapian($self) ? $self->{xdb} : undef;
+	if ($newest && $xdb) {
 		my $cur = $xdb->get_metadata('last_commit');
 		if (need_update($self, $cur, $newest)) {
 			$xdb->set_metadata('last_commit', $newest);
 		}
-
+	}
+	if ($stk) { # all done if $stk is passed
 		# let SearchView know a full --reindex was done so it can
 		# generate ->has_threadid-dependent links
-		if ($sync->{reindex} && !ref($sync->{reindex})) {
+		if ($xdb && $sync->{reindex} && !ref($sync->{reindex})) {
 			my $n = $xdb->get_metadata('has_threadid');
 			$xdb->set_metadata('has_threadid', '1') if $n ne '1';
 		}
+		$self->{oidx}->rethread_done($sync->{-opt}); # all done
 	}
-
-	$self->{oidx}->rethread_done($sync->{-opt}) if $newest; # all done
 	commit_txn_lazy($self);
 	$sync->{ibx}->git->cleanup;
 	my $nr = ${$sync->{nr}};
@@ -697,21 +702,24 @@ sub process_stack {
 	$sync->{nr} = \$nr;
 	$sync->{max} = \$max;
 	$sync->{sidx} = $self;
+	$sync->{latest_cmt} = \(my $latest_cmt);
 
 	$self->{mm}->{dbh}->begin_work;
 	if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
 		warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
 		for my $oid (@leftovers) {
 			$oid = unpack('H*', $oid);
-			$git->cat_async($oid, \&unindex_both, $self);
+			$git->cat_async($oid, \&unindex_both, $sync);
 		}
 	}
 	if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
 		$sync->{index_oid} = \&index_both;
 	}
-	while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+	while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
+		my $arg = { %$sync, cur_cmt => $cur_cmt };
 		if ($f eq 'm') {
-			my $arg = { %$sync, autime => $at, cotime => $ct };
+			$arg->{autime} = $at;
+			$arg->{cotime} = $ct;
 			if ($sync->{max_size}) {
 				$git->check_async($oid, \&check_size, $arg);
 			} else {
@@ -719,7 +727,7 @@ sub process_stack {
 			}
 			v1_checkpoint($self, $sync) if $max <= 0;
 		} elsif ($f eq 'd') {
-			$git->cat_async($oid, \&unindex_both, $self);
+			$git->cat_async($oid, \&unindex_both, $arg);
 		}
 	}
 	v1_checkpoint($self, $sync, $stk);
@@ -743,17 +751,17 @@ sub log2stack ($$$) {
 	my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
 				--no-notes --no-color --no-renames --no-abbrev),
 				$range);
-	my ($at, $ct, $stk);
+	my ($at, $ct, $stk, $cmt);
 	while (<$fh>) {
 		if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
-			($at, $ct) = ($1 + 0, $2 + 0);
-			$stk //= PublicInbox::IdxStack->new($3);
+			($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
+			$stk //= PublicInbox::IdxStack->new($cmt);
 		} elsif (/$del/) {
 			my $oid = $1;
 			if ($D) { # reindex case
 				$D->{pack('H*', $oid)}++;
 			} else { # non-reindex case:
-				$stk->push_rec('d', $at, $ct, $oid);
+				$stk->push_rec('d', $at, $ct, $oid, $cmt);
 			}
 		} elsif (/$add/) {
 			my $oid = $1;
@@ -761,12 +769,10 @@ sub log2stack ($$$) {
 				my $oid_bin = pack('H*', $oid);
 				my $nr = --$D->{$oid_bin};
 				delete($D->{$oid_bin}) if $nr <= 0;
-
 				# nr < 0 (-1) means it never existed
-				$stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
-			} else {
-				$stk->push_rec('m', $at, $ct, $oid);
+				next if $nr >= 0;
 			}
+			$stk->push_rec('m', $at, $ct, $oid, $cmt);
 		}
 	}
 	close $fh or die "git log failed: \$?=$?";
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 18f33655..87b76501 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,6 +861,7 @@ sub reindex_checkpoint ($$) {
 	my ($self, $sync) = @_;
 
 	$self->git->async_wait_all;
+	$self->update_last_commit($sync);
 	${$sync->{need_checkpoint}} = 0;
 	my $mm_tmp = $sync->{mm_tmp};
 	$mm_tmp->atfork_prepare if $mm_tmp;
@@ -955,19 +956,22 @@ sub index_oid { # cat_async callback
 	if (do_idx($self, $bref, $eml, $smsg)) {
 		${$arg->{need_checkpoint}} = 1;
 	}
+	${$arg->{latest_cmt}} = $arg->{cur_cmt} // die 'BUG: {cur_cmt} missing';
 }
 
 # only update last_commit for $i on reindex iff newer than current
-# $sync will be used by subclasses
 sub update_last_commit {
-	my ($self, $sync, $unit, $cmt) = @_;
+	my ($self, $sync, $stk) = @_;
+	my $unit = $sync->{unit} // return;
+	my $latest_cmt = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
+	defined($latest_cmt) or return;
 	my $last = last_epoch_commit($self, $unit->{epoch});
-	if (defined $last && is_ancestor($unit->{git}, $last, $cmt)) {
-		my @cmd = (qw(rev-list --count), "$last..$cmt");
+	if (defined $last && is_ancestor($unit->{git}, $last, $latest_cmt)) {
+		my @cmd = (qw(rev-list --count), "$last..$latest_cmt");
 		chomp(my $n = $unit->{git}->qx(@cmd));
 		return if $n ne '' && $n == 0;
 	}
-	last_epoch_commit($self, $unit->{epoch}, $cmt);
+	last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
 }
 
 sub last_commits {
@@ -1245,8 +1249,16 @@ sub index_todo ($$$) {
 		$pfx //= $unit->{git}->{git_dir};
 	}
 	local $self->{current_info} = "$pfx ";
-	while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
-		my $req = { %$sync, autime => $at, cotime => $ct, oid => $oid };
+	local $sync->{latest_cmt} = \(my $latest_cmt);
+	local $sync->{unit} = $unit;
+	while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
+		my $req = {
+			%$sync,
+			autime => $at,
+			cotime => $ct,
+			oid => $oid,
+			cur_cmt => $cmt
+		};
 		if ($f eq 'm') {
 			if ($sync->{max_size}) {
 				$all->check_async($oid, \&check_size, $req);
@@ -1261,7 +1273,7 @@ sub index_todo ($$$) {
 		}
 	}
 	$all->async_wait_all;
-	$self->update_last_commit($sync, $unit, $stk->{latest_cmt});
+	$self->update_last_commit($sync, $stk);
 }
 
 sub xapian_only {
diff --git a/t/idx_stack.t b/t/idx_stack.t
index 35aff37b..e0474fa4 100644
--- a/t/idx_stack.t
+++ b/t/idx_stack.t
@@ -6,6 +6,8 @@ use Test::More;
 use_ok 'PublicInbox::IdxStack';
 my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
 my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+my $cmt_a = 'df8e4a0612545d53672036641e9f076efc94c2f6';
+my $cmt_b = '3ba7c9fa4a083c439e768882c571c2026a981ca5';
 
 my $stk = PublicInbox::IdxStack->new;
 is($stk->read_prepare, $stk, 'nothing');
@@ -13,19 +15,19 @@ is($stk->num_records, 0, 'no records');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 1, 'num_records');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop once');
 is($stk->pop_rec, undef, 'undef on empty');
 
 $stk = PublicInbox::IdxStack->new;
-$stk->push_rec('m', 1234, 5678, $oid_a);
-$stk->push_rec('d', 1234, 5678, $oid_b);
+$stk->push_rec('m', 1234, 5678, $oid_a, $cmt_a);
+$stk->push_rec('d', 1234, 5678, $oid_b, $cmt_b);
 is($stk->read_prepare, $stk, 'read_prepare');
 is($stk->num_records, 2, 'num_records');
-is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
-is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b, $cmt_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a, $cmt_a], 'pop-pop');
 is($stk->pop_rec, undef, 'empty');
 
 SKIP: {
@@ -37,11 +39,11 @@ SKIP: {
 	while (<$fh>) {
 		chomp;
 		my ($at, $ct, $H) = split(/\./);
-		$stk //= PublicInbox::IdxStack->new($H);
+		$stk //= PublicInbox::IdxStack->new;
 		# not bothering to parse blobs here, just using commit OID
 		# as a blob OID since they're the same size + format
-		$stk->push_rec('m', $at + 0, $ct + 0, $H);
-		push(@expect, [ 'm', $at, $ct, $H ]);
+		$stk->push_rec('m', $at + 0, $ct + 0, $H, $H);
+		push(@expect, [ 'm', $at, $ct, $H, $H ]);
 	}
 	$stk or skip('nothing from git log', 3);
 	is($stk->read_prepare, $stk, 'read_prepare');

  reply	other threads:[~2020-11-13 11:11 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-11-13 11:11 [PATCH 0/4] extindex: checkpoints, graceful shutdown, cleanups Eric Wong
2020-11-13 11:11 ` Eric Wong [this message]
2020-11-13 11:11 ` [PATCH 2/4] *index: avoid per-epoch --batch-check processes Eric Wong
2020-11-13 12:38   ` Kyle Meyer
2020-11-15  3:03     ` Eric Wong
2020-11-13 11:11 ` [PATCH 3/4] *index: discard sync->{todo} on iteration Eric Wong
2020-11-13 11:11 ` [PATCH 4/4] extindex: support graceful shutdown via QUIT/INT/TERM Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201113111144.23038-2-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).