* [PATCH 01/20] index: support --rethread switch to fix old indices
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 02/20] v2: index forwards (via `git log --reverse') Eric Wong
` (18 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Versions of public-inbox older than 1.3.0 had subtly
different semantics around threading in some corner
cases. This switch (when combined with --reindex)
allows us to fix them by regenerating associations.
---
Documentation/public-inbox-index.pod | 23 +++++++--
lib/PublicInbox/OverIdx.pm | 76 ++++++++++++++++++++++++++--
lib/PublicInbox/SearchIdx.pm | 7 ++-
lib/PublicInbox/V2Writable.pm | 4 +-
script/public-inbox-index | 2 +-
t/v1reindex.t | 34 +++++++++++++
t/v2reindex.t | 45 ++++++++++++++++
7 files changed, 177 insertions(+), 14 deletions(-)
diff --git a/Documentation/public-inbox-index.pod b/Documentation/public-inbox-index.pod
index ff2e54867..08f2fbf45 100644
--- a/Documentation/public-inbox-index.pod
+++ b/Documentation/public-inbox-index.pod
@@ -68,12 +68,25 @@ Xapian database. Using this with C<--compact> or running
L<public-inbox-compact(1)> afterwards is recommended to
release free space.
-public-inbox protects writes to various indices with L<flock(2)>,
-so it is safe to reindex while L<public-inbox-watch(1)>,
-L<public-inbox-mda(1)> or L<public-inbox-learn(1)> run.
+public-inbox protects writes to various indices with
+L<flock(2)>, so it is safe to reindex (and rethread) while
+L<public-inbox-watch(1)>, L<public-inbox-mda(1)> or
+L<public-inbox-learn(1)> run.
-This does not touch the NNTP article number database or
-affect threading.
+This does not touch the NNTP article number database.
+It does not affect threading unless C<--rethread> is
+used.
+
+=item --rethread
+
+Regenerate internal THREADID and message thread associations
+when reindexing.
+
+This fixes some bugs in older versions of public-inbox. While
+it is possible to use this without C<--reindex>, it makes little
+sense to do so.
+
+Available in public-inbox 1.6.0 (PENDING).
=item --prune
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 5601e602c..c57be7243 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -17,6 +17,7 @@ use PublicInbox::MID qw/id_compress mids_for_index references/;
use PublicInbox::Smsg qw(subject_normalized);
use Compress::Zlib qw(compress);
use PublicInbox::Search;
+use Carp qw(croak);
sub dbh_new {
my ($self) = @_;
@@ -37,6 +38,13 @@ sub dbh_new {
$dbh;
}
+sub new {
+ my ($class, $f) = @_;
+ my $self = $class->SUPER::new($f);
+ $self->{min_tid} = 0;
+ $self;
+}
+
sub get_counter ($$) {
my ($dbh, $key) = @_;
my $sth = $dbh->prepare_cached(<<'', undef, 1);
@@ -164,8 +172,12 @@ sub _resolve_mid_to_tid {
my $cur_tid = $smsg->{tid};
if (defined $$tid) {
merge_threads($self, $$tid, $cur_tid);
- } else {
+ } elsif ($cur_tid > $self->{min_tid}) {
$$tid = $cur_tid;
+ } else { # rethreading, queue up dead ghosts
+ $$tid = next_tid($self);
+ my $num = $smsg->{num};
+ push(@{$self->{-ghosts_to_delete}}, $num) if $num < 0;
}
1;
}
@@ -175,7 +187,10 @@ sub resolve_mid_to_tid {
my ($self, $mid) = @_;
my $tid;
each_by_mid($self, $mid, ['tid'], \&_resolve_mid_to_tid, \$tid);
- defined $tid ? $tid : create_ghost($self, $mid);
+ if (my $del = delete $self->{-ghosts_to_delete}) {
+ delete_by_num($self, $_) for @$del;
+ }
+ $tid // create_ghost($self, $mid);
}
sub create_ghost {
@@ -221,7 +236,7 @@ sub link_refs {
merge_threads($self, $tid, $ptid);
}
} else {
- $tid = defined $old_tid ? $old_tid : next_tid($self);
+ $tid = $old_tid // next_tid($self);
}
$tid;
}
@@ -278,10 +293,17 @@ sub _add_over {
my $cur_tid = $smsg->{tid};
my $n = $smsg->{num};
die "num must not be zero for $mid" if !$n;
- $$old_tid = $cur_tid unless defined $$old_tid;
+ my $cur_valid = $cur_tid > $self->{min_tid};
+
if ($n > 0) { # regular mail
- merge_threads($self, $$old_tid, $cur_tid);
+ if ($cur_valid) {
+ $$old_tid //= $cur_tid;
+ merge_threads($self, $$old_tid, $cur_tid);
+ } else {
+ $$old_tid //= next_tid($self);
+ }
} elsif ($n < 0) { # ghost
+ $$old_tid //= $cur_valid ? $cur_tid : next_tid($self);
link_refs($self, $refs, $$old_tid);
delete_by_num($self, $n);
$$v++;
@@ -297,6 +319,7 @@ sub add_over {
begin_lazy($self);
delete_by_num($self, $num, \$old_tid);
+ $old_tid = undef if ($old_tid // 0) <= $self->{min_tid};
foreach my $mid (@$mids) {
my $v = 0;
each_by_mid($self, $mid, ['tid'], \&_add_over,
@@ -456,4 +479,47 @@ sub create {
$self->disconnect;
}
+sub rethread_prepare {
+ my ($self, $opt) = @_;
+ return unless $opt->{rethread};
+ begin_lazy($self);
+ my $min = $self->{min_tid} = get_counter($self->{dbh}, 'thread') // 0;
+ my $pr = $opt->{-progress};
+ $pr->("rethread min THREADID ".($min + 1)."\n") if $pr && $min;
+}
+
+# Finish a --rethread run: delete ghost rows (num < 0) in `over' whose
+# tid is below the {min_tid} recorded by rethread_prepare, reporting each
+# culled Message-ID and a final count through $opt->{-progress} (if set).
+# Does nothing unless $opt->{rethread} and $self->{txn} are both true.
+# Croaks if {min_tid} or {dbh} is missing.
+sub rethread_done {
+	my ($self, $opt) = @_;
+	return unless $opt->{rethread} && $self->{txn};
+	defined(my $min = $self->{min_tid}) or croak('BUG: no min_tid');
+	my $dbh = $self->{dbh} or croak('BUG: no dbh');
+	# NOTE(review): other checks in this file treat a tid as current only
+	# when it is > {min_tid}; `tid < ?' skips a ghost whose tid equals
+	# {min_tid} -- confirm whether that boundary is intended
+	my $rows = $dbh->selectall_arrayref(<<'', { Slice => {} }, $min);
+SELECT num,tid FROM over WHERE num < 0 AND tid < ?
+
+	my $show_id = $dbh->prepare('SELECT id FROM id2num WHERE num = ?');
+	my $show_mid = $dbh->prepare('SELECT mid FROM msgid WHERE id = ?');
+	my $pr = $opt->{-progress};
+	my $total = 0;
+	for my $r (@$rows) {
+		# look up the Message-ID(s) of this ghost purely for reporting
+		$show_id->execute($r->{num});
+		while (defined(my $id = $show_id->fetchrow_array)) {
+			$show_mid->execute($id);
+			my $mid = $show_mid->fetchrow_array;
+			if (!defined($mid)) {
+				warn <<EOF;
+E: ghost NUM=$r->{num} ID=$id THREADID=$r->{tid} has no Message-ID
+EOF
+				next;
+			}
+			$pr->(<<EOM) if $pr;
+I: ghost $r->{num} <$mid> THREADID=$r->{tid} culled
+EOM
+		}
+		delete_by_num($self, $r->{num});
+		# count every culled row; previously $total stayed 0 and the
+		# summary below could never be emitted
+		++$total;
+	}
+	$pr->("I: rethread culled $total ghosts\n") if $pr && $total;
+}
+
1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 831625090..e641ffd43 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -723,6 +723,7 @@ sub _index_sync {
my $pr = $opts->{-progress};
my $xdb = $self->begin_txn_lazy;
+ $self->{over}->rethread_prepare($opts);
my $mm = _msgmap_init($self);
do {
$xlog = undef; # stop previous git-log via SIGPIPE
@@ -761,12 +762,14 @@ sub _index_sync {
$xdb->set_metadata('last_commit', $newest);
}
}
+
+ $self->{over}->rethread_done($opts) if $newest; # all done
$self->commit_txn_lazy;
$git->cleanup;
$xdb = _xdb_release($self, $nr);
- # let another process do some work... <
+ # let another process do some work...
$pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
- if (!$newest) {
+ if (!$newest) { # more to come
$xdb = $self->begin_txn_lazy;
$dbh->begin_work if $dbh;
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 0582dd5e3..16556ddc2 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1308,6 +1308,7 @@ sub index_sync {
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
$self->idx_init($opt); # acquire lock
+ $self->{over}->rethread_prepare($opt);
my $sync = {
D => {}, # "$mid\0$chash" => $oid
unindex_range => {}, # EPOCH => oid_old..oid_new
@@ -1370,12 +1371,13 @@ sub index_sync {
my $pr = $sync->{-opt}->{-progress};
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $nr)) if $pr;
}
+ $self->{over}->rethread_done($opt);
# reindex does not pick up new changes, so we rerun w/o it:
if ($opt->{reindex}) {
my %again = %$opt;
$sync = undef;
- delete @again{qw(reindex -skip_lock)};
+ delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
}
}
diff --git a/script/public-inbox-index b/script/public-inbox-index
index 6217fb86c..2e1934b08 100755
--- a/script/public-inbox-index
+++ b/script/public-inbox-index
@@ -15,7 +15,7 @@ use PublicInbox::Xapcmd;
my $compact_opt;
my $opt = { quiet => -1, compact => 0, maxsize => undef };
-GetOptions($opt, qw(verbose|v+ reindex compact|c+ jobs|j=i prune
+GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune
indexlevel|L=s maxsize|max-size=s batchsize|batch-size=s))
or die "bad command-line args\n$usage";
die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
diff --git a/t/v1reindex.t b/t/v1reindex.t
index 9f23ef01e..8cb751881 100644
--- a/t/v1reindex.t
+++ b/t/v1reindex.t
@@ -11,6 +11,7 @@ require_git(2.6);
require_mods(qw(DBD::SQLite Search::Xapian));
use_ok 'PublicInbox::SearchIdx';
use_ok 'PublicInbox::Import';
+use_ok 'PublicInbox::OverIdx';
my ($inboxdir, $for_destroy) = tmpdir();
my $ibx_config = {
inboxdir => $inboxdir,
@@ -427,5 +428,38 @@ ok(!-d $xap, 'Xapian directories removed again');
], 'msgmap as expected' );
}
+{
+ my @warn;
+ local $SIG{__WARN__} = sub { push @warn, @_ };
+ my $ibx = PublicInbox::Inbox->new({ %$ibx_config });
+ my $f = $ibx->over->{dbh}->sqlite_db_filename;
+ my $over = PublicInbox::OverIdx->new($f);
+ my $dbh = $over->connect;
+ my $non_ghost_tids = sub {
+ $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+ };
+ my $before = $non_ghost_tids->();
+
+ # mess up threading:
+ my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+ my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+
+ my $rw = PublicInbox::SearchIdx->new($ibx, 1);
+ my @pr;
+ my $pr = sub { push @pr, @_ };
+ $rw->index_sync({reindex => 1, rethread => 1, -progress => $pr });
+ my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+ is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+ my $after = $non_ghost_tids->();
+ ok($after->[0]->[0] > $before->[-1]->[0],
+ 'all tids greater than before');
+ is(scalar @$after, scalar @$before, 'thread count unchanged');
+ is_deeply([], \@warn, 'no warnings');
+ # diag "@pr"; # XXX do we care?
+}
done_testing();
diff --git a/t/v2reindex.t b/t/v2reindex.t
index 77deffb4b..ea2b24e59 100644
--- a/t/v2reindex.t
+++ b/t/v2reindex.t
@@ -10,6 +10,7 @@ use PublicInbox::TestCommon;
require_git(2.6);
require_mods(qw(DBD::SQLite Search::Xapian));
use_ok 'PublicInbox::V2Writable';
+use_ok 'PublicInbox::OverIdx';
my ($inboxdir, $for_destroy) = tmpdir();
my $ibx_config = {
inboxdir => $inboxdir,
@@ -423,6 +424,46 @@ ok(!-d $xap, 'Xapian directories removed again');
], 'msgmap as expected' );
}
+my $check_rethread = sub {
+ my ($desc) = @_;
+ my @warn;
+ local $SIG{__WARN__} = sub { push @warn, @_ };
+ my %config = %$ibx_config;
+ my $ibx = PublicInbox::Inbox->new(\%config);
+ my $f = $ibx->over->{dbh}->sqlite_db_filename;
+ my $over = PublicInbox::OverIdx->new($f);
+ my $dbh = $over->connect;
+ my $non_ghost_tids = sub {
+ $dbh->selectall_arrayref(<<'');
+SELECT tid FROM over WHERE num > 0 ORDER BY tid ASC
+
+ };
+ my $before = $non_ghost_tids->();
+
+ # mess up threading:
+ my $tid = PublicInbox::OverIdx::get_counter($dbh, 'thread');
+ my $nr = $dbh->do('UPDATE over SET tid = ?', undef, $tid);
+ diag "messing up all threads with tid=$tid";
+
+ my $v2w = PublicInbox::V2Writable->new($ibx);
+ my @pr;
+ my $pr = sub { push @pr, @_ };
+ $v2w->index_sync({reindex => 1, rethread => 1, -progress => $pr});
+ # diag "@pr"; # nobody cares
+ is_deeply(\@warn, [], 'no warnings on reindex + rethread');
+
+ my @n = $dbh->selectrow_array(<<EOS, undef, $tid);
+SELECT COUNT(*) FROM over WHERE tid <= ?
+EOS
+ is_deeply(\@n, [ 0 ], 'rethread dropped old threadids');
+ my $after = $non_ghost_tids->();
+ ok($after->[0]->[0] > $before->[-1]->[0],
+ 'all tids greater than before');
+ is(scalar @$after, scalar @$before, 'thread count unchanged');
+};
+
+$check_rethread->('no-monster');
+
# A real example from linux-renesas-soc on lore where a 3-headed monster
# of a message has 3 sets of common headers. Another normal message
# previously existed with a single Message-ID that conflicts with one
@@ -497,4 +538,8 @@ EOF
is_deeply([values %uniq], [3], 'search on different subjects');
}
+# XXX: not deterministic when dealing with ambiguous messages, oh well
+$check_rethread->('3-headed-monster once');
+$check_rethread->('3-headed-monster twice');
+
done_testing();
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 02/20] v2: index forwards (via `git log --reverse')
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
2020-07-24 5:55 ` [PATCH 01/20] index: support --rethread switch to fix old indices Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 03/20] v2writable: introduce idx_stack Eric Wong
` (17 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Since we'll need to expose THREADID to JMAP and IMAP users,
index all messages in the order they were committed to ensure
our `tid' (thread ID) column ascends in mirrors the same way
it does in the source inbox.
This drastically simplifies our code but increases memory
usage of `git-log'. The next commit will bring memory use
back down at the expense of $TMPDIR usage.
---
MANIFEST | 1 -
lib/PublicInbox/MultiMidQueue.pm | 62 -------
lib/PublicInbox/V2Writable.pm | 279 +++++++++----------------------
3 files changed, 81 insertions(+), 261 deletions(-)
delete mode 100644 lib/PublicInbox/MultiMidQueue.pm
diff --git a/MANIFEST b/MANIFEST
index 963caad02..9d90c8c23 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -155,7 +155,6 @@ lib/PublicInbox/MboxGz.pm
lib/PublicInbox/MsgIter.pm
lib/PublicInbox/MsgTime.pm
lib/PublicInbox/Msgmap.pm
-lib/PublicInbox/MultiMidQueue.pm
lib/PublicInbox/NNTP.pm
lib/PublicInbox/NNTPD.pm
lib/PublicInbox/NNTPdeflate.pm
diff --git a/lib/PublicInbox/MultiMidQueue.pm b/lib/PublicInbox/MultiMidQueue.pm
deleted file mode 100644
index eb2ecf2f2..000000000
--- a/lib/PublicInbox/MultiMidQueue.pm
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (C) 2020 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# temporary queue for public-inbox-index to support multi-Message-ID
-# messages on mirrors of v2 inboxes
-package PublicInbox::MultiMidQueue;
-use strict;
-use SDBM_File; # part of Perl standard library
-use Fcntl qw(O_RDWR O_CREAT);
-use File::Temp 0.19 (); # 0.19 for ->newdir
-my %e = (
- freebsd => 0x100000,
- linux => 0x80000,
- netbsd => 0x400000,
- openbsd => 0x10000,
-);
-my $O_CLOEXEC = $e{$^O} // 0;
-
-sub new {
- my ($class) = @_;
- my $tmpdir = File::Temp->newdir('multi-mid-q-XXXXXX', TMPDIR => 1);
- my $base = $tmpdir->dirname . '/q';
- my %sdbm;
- my $flags = O_RDWR|O_CREAT;
- if (!tie(%sdbm, 'SDBM_File', $base, $flags|$O_CLOEXEC, 0600)) {
- if (!tie(%sdbm, 'SDBM_File', $base, $flags, 0600)) {
- die "could not tie ($base): $!";
- }
- $O_CLOEXEC = 0;
- }
-
- bless {
- cur => 1,
- min => 1,
- max => 0,
- sdbm => \%sdbm,
- tmpdir => $tmpdir,
- }, $class;
-}
-
-sub set_oid {
- my ($self, $i, $oid, $v2w) = @_;
- $self->{max} = $i if $i > $self->{max};
- $self->{min} = $i if $i < $self->{min};
- $self->{sdbm}->{$i} = "$oid\t$v2w->{autime}\t$v2w->{cotime}";
-}
-
-sub get_oid {
- my ($self, $i, $v2w) = @_;
- my $rec = $self->{sdbm}->{$i} or return;
- my ($oid, $autime, $cotime) = split(/\t/, $rec);
- $v2w->{autime} = $autime;
- $v2w->{cotime} = $cotime;
- $oid
-}
-
-sub push_oid {
- my ($self, $oid, $v2w) = @_;
- set_oid($self, $self->{cur}++, $oid, $v2w);
-}
-
-1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 16556ddc2..c04ea5d77 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -18,10 +18,10 @@ use PublicInbox::OverIdx;
use PublicInbox::Msgmap;
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::SearchIdx;
-use PublicInbox::MultiMidQueue;
use IO::Handle; # ->autoflush
use File::Temp qw(tempfile);
+my $x40 = qr/[a-f0-9]{40}/;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
@@ -862,18 +862,6 @@ sub atfork_child {
$self->{bnote}->[1];
}
-sub mark_deleted ($$$$) {
- my ($self, $sync, $git, $oid) = @_;
- return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
- my $msgref = $git->cat_file($oid);
- my $mime = PublicInbox::Eml->new($$msgref);
- my $mids = mids($mime->header_obj);
- my $chash = content_hash($mime);
- foreach my $mid (@$mids) {
- $sync->{D}->{"$mid\0$chash"} = $oid;
- }
-}
-
sub reindex_checkpoint ($$$) {
my ($self, $sync, $git) = @_;
@@ -891,107 +879,11 @@ sub reindex_checkpoint ($$$) {
$sync->{mm_tmp}->atfork_parent;
}
-# only for a few odd messages with multiple Message-IDs
-sub reindex_oid_m ($$$$;$) {
- my ($self, $sync, $git, $oid, $regen_num) = @_;
- $self->{current_info} = "multi_mid $oid";
- my ($num, $mid0, $len);
- my $msgref = $git->cat_file($oid, \$len);
- my $mime = PublicInbox::Eml->new($$msgref);
- my $mids = mids($mime->header_obj);
- my $chash = content_hash($mime);
- die "BUG: reindex_oid_m called for <=1 mids" if scalar(@$mids) <= 1;
-
- for my $mid (reverse @$mids) {
- delete($sync->{D}->{"$mid\0$chash"}) and
- die "BUG: reindex_oid should handle <$mid> delete";
- }
- my $over = $self->{over};
- for my $mid (reverse @$mids) {
- ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid);
- next unless defined $num;
- if (defined($regen_num) && $regen_num != $num) {
- die "BUG: regen(#$regen_num) != over(#$num)";
- }
- }
- unless (defined($num)) {
- for my $mid (reverse @$mids) {
- # is this a number we got before?
- my $n = $sync->{mm_tmp}->num_for($mid);
- next unless defined $n;
- next if defined($regen_num) && $regen_num != $n;
- ($num, $mid0) = ($n, $mid);
- last;
- }
- }
- if (defined($num)) {
- $sync->{mm_tmp}->num_delete($num);
- } elsif (defined $regen_num) {
- $num = $regen_num;
- for my $mid (reverse @$mids) {
- $self->{mm}->mid_set($num, $mid) == 1 or next;
- $mid0 = $mid;
- last;
- }
- unless (defined $mid0) {
- warn "E: cannot regen #$num\n";
- return;
- }
- } else { # fixup bugs in old mirrors on reindex
- for my $mid (reverse @$mids) {
- $num = $self->{mm}->mid_insert($mid);
- next unless defined $num;
- $mid0 = $mid;
- last;
- }
- if (defined $mid0) {
- if ($sync->{reindex}) {
- warn "reindex added #$num <$mid0>\n";
- }
- } else {
- warn "E: cannot find article #\n";
- return;
- }
- }
- $sync->{nr}++;
- my $smsg = bless {
- raw_bytes => $len,
- num => $num,
- blob => $oid,
- mid => $mid0,
- }, 'PublicInbox::Smsg';
- $smsg->populate($mime, $self);
- if (do_idx($self, $msgref, $mime, $smsg)) {
- reindex_checkpoint($self, $sync, $git);
- }
-}
-
-sub check_unindexed ($$$) {
- my ($self, $num, $mid0) = @_;
- my $unindexed = $self->{unindexed} // {};
- my $n = delete($unindexed->{$mid0});
- defined $n or return;
- if ($n != $num) {
- die "BUG: unindexed $n != $num <$mid0>\n";
- } else {
- $self->{mm}->mid_set($num, $mid0);
- }
-}
-
-sub multi_mid_q_push ($$$) {
- my ($self, $sync, $oid) = @_;
- my $multi_mid = $sync->{multi_mid} //= PublicInbox::MultiMidQueue->new;
- if ($sync->{reindex}) { # no regen on reindex
- $multi_mid->push_oid($oid, $self);
- } else {
- my $num = $sync->{regen}--;
- die "BUG: ran out of article numbers" if $num <= 0;
- $multi_mid->set_oid($num, $oid, $self);
- }
-}
-
sub reindex_oid ($$$$) {
my ($self, $sync, $git, $oid) = @_;
+ if (my $D = $sync->{D}) { # don't waste I/O on deletes
+ return if $D->{pack('H*', $oid)};
+ }
return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
my ($num, $mid0, $len);
my $msgref = $git->cat_file($oid, \$len);
@@ -1003,48 +895,57 @@ sub reindex_oid ($$$$) {
if (scalar(@$mids) == 0) {
warn "E: $oid has no Message-ID, skipping\n";
return;
- } elsif (scalar(@$mids) == 1) {
- my $mid = $mids->[0];
-
- # was the file previously marked as deleted?, skip if so
- if (delete($sync->{D}->{"$mid\0$chash"})) {
- if (!$sync->{reindex}) {
- $num = $sync->{regen}--;
- $self->{mm}->num_highwater($num);
- }
- return;
- }
+ }
- # is this a number we got before?
- $num = $sync->{mm_tmp}->num_for($mid);
+ # {unindexed} is unlikely
+ if ((my $unindexed = $self->{unindexed}) && scalar(@$mids) == 1) {
+ $num = delete($unindexed->{$mids->[0]});
if (defined $num) {
- $mid0 = $mid;
- check_unindexed($self, $num, $mid0);
- } else {
- $num = $sync->{regen}--;
- die "BUG: ran out of article numbers" if $num <= 0;
- if ($self->{mm}->mid_set($num, $mid) != 1) {
- warn "E: unable to assign $num => <$mid>\n";
- return;
- }
- $mid0 = $mid;
+ $mid0 = $mids->[0];
+ $self->{mm}->mid_set($num, $mid0);
+ delete($self->{unindexed}) if !keys(%$unindexed);
+ }
+ }
+ if (!defined($num)) { # reuse if reindexing (or duplicates)
+ my $over = $self->{over};
+ for my $mid (@$mids) {
+ ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid);
+ last if defined $num;
}
- } else { # multiple MIDs are a weird case:
- my $del = 0;
- for (@$mids) {
- $del += delete($sync->{D}->{"$_\0$chash"}) // 0;
+ }
+ $mid0 //= do { # is this a number we got before?
+ $num = $sync->{mm_tmp}->num_for($mids->[0]);
+ defined($num) ? $mids->[0] : undef;
+ };
+ if (!defined($num)) {
+ for (my $i = $#$mids; $i >= 1; $i--) {
+ $num = $sync->{mm_tmp}->num_for($mids->[$i]);
+ if (defined($num)) {
+ $mid0 = $mids->[$i];
+ last;
+ }
}
- if ($del) {
- unindex_oid_remote($self, $oid, $_) for @$mids;
- # do not delete from {mm_tmp}, since another
- # single-MID message may use it.
- } else { # handle them at the end:
- multi_mid_q_push($self, $sync, $oid);
+ }
+ if (defined($num)) {
+ $sync->{mm_tmp}->num_delete($num);
+ } else { # never seen
+ $num = $self->{mm}->mid_insert($mids->[0]);
+ if (defined($num)) {
+ $mid0 = $mids->[0];
+ } else { # rare, try the rest of them, backwards
+ for (my $i = $#$mids; $i >= 1; $i--) {
+ $num = $self->{mm}->mid_insert($mids->[$i]);
+ if (defined($num)) {
+ $mid0 = $mids->[$i];
+ last;
+ }
+ }
}
+ }
+ if (!defined($num)) {
+ warn "E: $oid <", join('> <', @$mids), "> is a duplicate\n";
return;
}
- $sync->{mm_tmp}->mid_delete($mid0) or
- die "failed to delete <$mid0> for article #$num\n";
$sync->{nr}++;
my $smsg = bless {
raw_bytes => $len,
@@ -1134,6 +1035,22 @@ $range
$range;
}
+# don't bump num_highwater on --reindex
+sub mark_deleted ($$$) {
+ my ($git, $sync, $range) = @_;
+ my $D = $sync->{D} //= {}; # pack("H*", $oid) => NR
+ my $fh = $git->popen(qw(log --raw --no-abbrev
+ --pretty=tformat:%H
+ --no-notes --no-color --no-renames
+ --diff-filter=AM), $range, '--', 'd');
+ while (<$fh>) {
+ if (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
+ $D->{pack('H*', $1)}++;
+ }
+ }
+ close $fh or die "git log failed: \$?=$?";
+}
+
sub sync_prepare ($$$) {
my ($self, $sync, $epoch_max) = @_;
my $pr = $sync->{-opt}->{-progress};
@@ -1144,7 +1061,7 @@ sub sync_prepare ($$$) {
# without {reindex}
my $reindex_heads = last_commits($self, $epoch_max) if $sync->{reindex};
- for (my $i = $epoch_max; $i >= 0; $i--) {
+ for my $i (0..$epoch_max) {
die 'BUG: already indexing!' if $self->{reindex_pipe};
my $git_dir = git_dir_n($self, $i);
-d $git_dir or next; # missing epochs are fine
@@ -1168,8 +1085,8 @@ sub sync_prepare ($$$) {
close $fh or die "git log failed: \$?=$?";
$pr->("$n\n") if $pr;
$regen_max += $n;
+ mark_deleted($git, $sync, $range) if $sync->{reindex};
}
-
return 0 if (!$regen_max && !keys(%{$self->{unindex_range}}));
# reindex should NOT see new commits anymore, if we do,
@@ -1203,10 +1120,8 @@ sub unindex_oid ($$$;$) {
my ($id, $prev);
while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
$gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob};
- 1; # continue
}
- my $n = scalar keys %gone;
- next unless $n;
+ my $n = scalar(keys(%gone)) or next;
if ($n > 1) {
warn "BUG: multiple articles linked to $oid\n",
join(',',sort keys %gone), "\n";
@@ -1222,7 +1137,6 @@ sub unindex_oid ($$$;$) {
}
}
-my $x40 = qr/[a-f0-9]{40}/;
sub unindex ($$$$) {
my ($self, $sync, $git, $unindex_range) = @_;
my $unindexed = $self->{unindexed} ||= {}; # $mid0 => $num
@@ -1276,22 +1190,29 @@ sub index_epoch ($$$) {
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("$i.git indexing $range\n");
}
-
- my @cmd = qw(log --raw -r --pretty=tformat:%H.%at.%ct
+ my @cmd = qw(log --reverse --raw -r --pretty=tformat:%H.%at.%ct
--no-notes --no-color --no-abbrev --no-renames);
my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range);
my $cmt;
+ my $D = $sync->{D};
while (<$fh>) {
chomp;
$self->{current_info} = "$i.git $_";
if (/\A($x40)\.([0-9]+)\.([0-9]+)$/o) {
- $cmt //= $1;
+ $cmt = $1;
$self->{autime} = $2;
$self->{cotime} = $3;
} elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
reindex_oid($self, $sync, $git, $1);
} elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
- mark_deleted($self, $sync, $git, $1);
+ # allow re-add if there was user error
+ my $oid = $1;
+ if ($D) {
+ my $oid_bin = pack('H*', $oid);
+ my $nr = --$D->{$oid_bin};
+ delete($D->{$oid_bin}) if $nr <= 0;
+ }
+ unindex_oid($self, $git, $oid);
}
}
close $fh or die "git log failed: \$?=$?";
@@ -1310,15 +1231,12 @@ sub index_sync {
$self->idx_init($opt); # acquire lock
$self->{over}->rethread_prepare($opt);
my $sync = {
- D => {}, # "$mid\0$chash" => $oid
unindex_range => {}, # EPOCH => oid_old..oid_new
reindex => $opt->{reindex},
-opt => $opt
};
$sync->{ranges} = sync_ranges($self, $sync, $epoch_max);
- $sync->{regen} = sync_prepare($self, $sync, $epoch_max);
-
- if ($sync->{regen}) {
+ if (sync_prepare($self, $sync, $epoch_max)) {
# tmp_clone seems to fail if inside a transaction, so
# we rollback here (because we opened {mm} for reading)
# Note: we do NOT rely on DBI transactions for atomicity;
@@ -1328,43 +1246,8 @@ sub index_sync {
$sync->{mm_tmp} = $self->{mm}->tmp_clone;
}
- # work backwards through history
- for (my $i = $epoch_max; $i >= 0; $i--) {
- index_epoch($self, $sync, $i);
- }
-
- # unindex is required for leftovers if "deletes" affect messages
- # in a previous fetch+index window:
- my $git;
- if (my @leftovers = values %{delete $sync->{D}}) {
- $git = $self->{-inbox}->git;
- for my $oid (@leftovers) {
- $self->{current_info} = "leftover $oid";
- unindex_oid($self, $git, $oid);
- }
- }
- if (my $multi_mid = delete $sync->{multi_mid}) {
- $git //= $self->{-inbox}->git;
- my $min = $multi_mid->{min};
- my $max = $multi_mid->{max};
- if ($sync->{reindex}) {
- # we may need to create new Message-IDs if mirrors
- # were initially indexed with old versions
- for (my $i = $max; $i >= $min; $i--) {
- my $oid;
- $oid = $multi_mid->get_oid($i, $self) or next;
- next unless defined $oid;
- reindex_oid_m($self, $sync, $git, $oid);
- }
- } else { # regen on initial index
- for my $num ($min..$max) {
- my $oid;
- $oid = $multi_mid->get_oid($num, $self) or next;
- reindex_oid_m($self, $sync, $git, $oid, $num);
- }
- }
- }
- $git->cleanup if $git;
+ # work forwards through history
+ index_epoch($self, $sync, $_) for (0..$epoch_max);
$self->done;
if (my $nr = $sync->{nr}) {
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 03/20] v2writable: introduce idx_stack
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
2020-07-24 5:55 ` [PATCH 01/20] index: support --rethread switch to fix old indices Eric Wong
2020-07-24 5:55 ` [PATCH 02/20] v2: index forwards (via `git log --reverse') Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 04/20] v2writable: index_sync: reduce fill_alternates calls Eric Wong
` (16 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
This avoids pinning a potentially large chunk of memory from
`git-log --reverse' into RAM (or triggering less predictable
swap behavior). Instead it uses a contiguous temporary file
with a fixed-size record for every blob we'll need to index.
---
MANIFEST | 2 +
lib/PublicInbox/IdxStack.pm | 52 ++++++++++++++++
lib/PublicInbox/V2Writable.pm | 114 ++++++++++++++++++++--------------
t/idx_stack.t | 56 +++++++++++++++++
4 files changed, 176 insertions(+), 48 deletions(-)
create mode 100644 lib/PublicInbox/IdxStack.pm
create mode 100644 t/idx_stack.t
diff --git a/MANIFEST b/MANIFEST
index 9d90c8c23..f46a0776d 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -138,6 +138,7 @@ lib/PublicInbox/IMAPD.pm
lib/PublicInbox/IMAPTracker.pm
lib/PublicInbox/IMAPdeflate.pm
lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IdxStack.pm
lib/PublicInbox/Import.pm
lib/PublicInbox/In2Tie.pm
lib/PublicInbox/Inbox.pm
@@ -277,6 +278,7 @@ t/httpd-https.t
t/httpd-unix.t
t/httpd.t
t/hval.t
+t/idx_stack.t
t/imap.t
t/imap_searchqp.t
t/imap_tracker.t
diff --git a/lib/PublicInbox/IdxStack.pm b/lib/PublicInbox/IdxStack.pm
new file mode 100644
index 000000000..b43b8064e
--- /dev/null
+++ b/lib/PublicInbox/IdxStack.pm
@@ -0,0 +1,52 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# temporary stack for public-inbox-index
+package PublicInbox::IdxStack;
+use v5.10.1;
+use strict;
+use Fcntl qw(:seek);
+use constant FMT => eval { pack('Q', 1) } ? 'A1QQH*' : 'A1IIH*';
+
+# start off in write-only mode
+sub new {
+ open(my $io, '+>', undef) or die "open: $!";
+ bless { wr => $io, latest_cmt => $_[1] }, __PACKAGE__
+}
+
+# file_char = [a|m]
+sub push_rec {
+ my ($self, $file_char, $at, $ct, $blob_oid) = @_;
+ my $rec = pack(FMT, $file_char, $at, $ct, $blob_oid);
+ $self->{rec_size} //= length($rec);
+ print { $self->{wr} } $rec or die "print: $!";
+ $self->{tot_size} += length($rec);
+}
+
+sub num_records {
+ my ($self) = @_;
+ $self->{rec_size} ? $self->{tot_size} / $self->{rec_size} : 0;
+}
+
+# switch into read-only mode and returns self
+sub read_prepare {
+ my ($self) = @_;
+ my $io = $self->{rd} = delete($self->{wr});
+ $io->flush or die "flush: $!";
+ $self;
+}
+
+sub pop_rec {
+ my ($self) = @_;
+ my $sz = $self->{rec_size} or return;
+ my $rec_pos = $self->{tot_size} -= $sz;
+ return if $rec_pos < 0;
+ my $io = $self->{rd};
+ seek($io, $rec_pos, SEEK_SET) or die "seek: $!";
+ my $r = read($io, my $buf, $sz);
+ defined($r) or die "read: $!";
+ $r == $sz or die "read($r != $sz)";
+ unpack(FMT, $buf);
+}
+
+1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c04ea5d77..04c91e5dd 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -8,6 +8,7 @@ use strict;
use v5.10.1;
use parent qw(PublicInbox::Lock);
use PublicInbox::SearchIdxShard;
+use PublicInbox::IdxStack;
use PublicInbox::Eml;
use PublicInbox::Git;
use PublicInbox::Import;
@@ -881,9 +882,6 @@ sub reindex_checkpoint ($$$) {
sub reindex_oid ($$$$) {
my ($self, $sync, $git, $oid) = @_;
- if (my $D = $sync->{D}) { # don't waste I/O on deletes
- return if $D->{pack('H*', $oid)};
- }
return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
my ($num, $mid0, $len);
my $msgref = $git->cat_file($oid, \$len);
@@ -1035,20 +1033,45 @@ $range
$range;
}
-# don't bump num_highwater on --reindex
-sub mark_deleted ($$$) {
+sub prepare_range_stack {
my ($git, $sync, $range) = @_;
- my $D = $sync->{D} //= {}; # pack("H*", $oid) => NR
- my $fh = $git->popen(qw(log --raw --no-abbrev
- --pretty=tformat:%H
- --no-notes --no-color --no-renames
- --diff-filter=AM), $range, '--', 'd');
+ # Don't bump num_highwater on --reindex by using {D}.
+ # We intentionally do NOT use {D} in the non-reindex case because
+ # we want NNTP article number gaps from unindexed messages to
+ # show up in mirrors, too.
+ my $D = $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
+
+ my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+ --no-notes --no-color --no-renames --no-abbrev),
+ $range);
+ my ($at, $ct, $stk);
while (<$fh>) {
- if (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
- $D->{pack('H*', $1)}++;
+ if (/\A([0-9]+)-([0-9]+)-($x40)$/o) {
+ ($at, $ct) = ($1 + 0, $2 + 0);
+ $stk //= PublicInbox::IdxStack->new($3);
+ } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
+ my $oid = $1;
+ if ($D) { # reindex case
+ $D->{pack('H*', $oid)}++;
+ } else { # non-reindex case:
+ $stk->push_rec('d', $at, $ct, $oid);
+ }
+ } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
+ my $oid = $1;
+ if ($D) {
+ my $oid_bin = pack('H*', $oid);
+ my $nr = --$D->{$oid_bin};
+ delete($D->{$oid_bin}) if $nr <= 0;
+
+ # nr < 0 (-1) means it never existed
+ $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+ } else {
+ $stk->push_rec('m', $at, $ct, $oid);
+ }
}
}
close $fh or die "git log failed: \$?=$?";
+ $stk ? $stk->read_prepare : undef;
}
sub sync_prepare ($$$) {
@@ -1061,7 +1084,7 @@ sub sync_prepare ($$$) {
# without {reindex}
my $reindex_heads = last_commits($self, $epoch_max) if $sync->{reindex};
- for my $i (0..$epoch_max) {
+ for (my $i = $epoch_max; $i >= 0; $i--) {
die 'BUG: already indexing!' if $self->{reindex_pipe};
my $git_dir = git_dir_n($self, $i);
-d $git_dir or next; # missing epochs are fine
@@ -1077,15 +1100,24 @@ sub sync_prepare ($$$) {
# can't use 'rev-list --count' if we use --diff-filter
$pr->("$i.git counting $range ... ") if $pr;
- my $n = 0;
- my $fh = $git->popen(qw(log --pretty=tformat:%H
- --no-notes --no-color --no-renames
- --diff-filter=AM), $range, '--', 'm');
- ++$n while <$fh>;
- close $fh or die "git log failed: \$?=$?";
- $pr->("$n\n") if $pr;
- $regen_max += $n;
- mark_deleted($git, $sync, $range) if $sync->{reindex};
+ my $stk = prepare_range_stack($git, $sync, $range);
+ my $nr = $stk ? $stk->num_records : 0;
+ $pr->("$nr\n") if $pr;
+ $sync->{stacks}->[$i] = $stk if $stk;
+ $regen_max += $nr;
+ }
+
+ # XXX this should not happen unless somebody bypasses checks in
+ # our code and blindly injects "d" file history into git repos
+ if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+ warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+ my $git = $self->{-inbox}->git;
+ for my $oid (@leftovers) {
+ $oid = unpack('H*', $oid);
+ $self->{current_info} = "leftover $oid";
+ unindex_oid($self, $git, $oid);
+ }
+ $git->cleanup;
}
return 0 if (!$regen_max && !keys(%{$self->{unindex_range}}));
@@ -1186,38 +1218,24 @@ sub index_epoch ($$$) {
if (my $unindex_range = delete $sync->{unindex_range}->{$i}) {
unindex($self, $sync, $git, $unindex_range);
}
- defined(my $range = $sync->{ranges}->[$i]) or return;
+ defined(my $stk = $sync->{stacks}->[$i]) or return;
+ $sync->{stacks}->[$i] = undef;
+ my $range = $sync->{ranges}->[$i];
if (my $pr = $sync->{-opt}->{-progress}) {
$pr->("$i.git indexing $range\n");
}
- my @cmd = qw(log --reverse --raw -r --pretty=tformat:%H.%at.%ct
- --no-notes --no-color --no-abbrev --no-renames);
- my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range);
- my $cmt;
- my $D = $sync->{D};
- while (<$fh>) {
- chomp;
- $self->{current_info} = "$i.git $_";
- if (/\A($x40)\.([0-9]+)\.([0-9]+)$/o) {
- $cmt = $1;
- $self->{autime} = $2;
- $self->{cotime} = $3;
- } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
- reindex_oid($self, $sync, $git, $1);
- } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
- # allow re-add if there was user error
- my $oid = $1;
- if ($D) {
- my $oid_bin = pack('H*', $oid);
- my $nr = --$D->{$oid_bin};
- delete($D->{$oid_bin}) if $nr <= 0;
- }
+ while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ $self->{current_info} = "$i.git $oid";
+ if ($f eq 'm') {
+ $self->{autime} = $at;
+ $self->{cotime} = $ct;
+ reindex_oid($self, $sync, $git, $oid);
+ } elsif ($f eq 'd') {
unindex_oid($self, $git, $oid);
}
}
- close $fh or die "git log failed: \$?=$?";
- delete @$self{qw(reindex_pipe autime cotime)};
- update_last_commit($self, $git, $i, $cmt) if defined $cmt;
+ delete @$self{qw(autime cotime)};
+ update_last_commit($self, $git, $i, $stk->{latest_cmt});
}
# public, called by public-inbox-index
diff --git a/t/idx_stack.t b/t/idx_stack.t
new file mode 100644
index 000000000..35aff37b7
--- /dev/null
+++ b/t/idx_stack.t
@@ -0,0 +1,56 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use Test::More;
+use_ok 'PublicInbox::IdxStack';
+my $oid_a = '03c21563cf15c241687966b5b2a3f37cdc193316';
+my $oid_b = '963caad026055ab9bcbe3ee9550247f9d8840feb';
+
+my $stk = PublicInbox::IdxStack->new;
+is($stk->read_prepare, $stk, 'nothing');
+is($stk->num_records, 0, 'no records');
+is($stk->pop_rec, undef, 'undef on empty');
+
+$stk = PublicInbox::IdxStack->new;
+$stk->push_rec('m', 1234, 5678, $oid_a);
+is($stk->read_prepare, $stk, 'read_prepare');
+is($stk->num_records, 1, 'num_records');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop once');
+is($stk->pop_rec, undef, 'undef on empty');
+
+$stk = PublicInbox::IdxStack->new;
+$stk->push_rec('m', 1234, 5678, $oid_a);
+$stk->push_rec('d', 1234, 5678, $oid_b);
+is($stk->read_prepare, $stk, 'read_prepare');
+is($stk->num_records, 2, 'num_records');
+is_deeply([$stk->pop_rec], ['d', 1234, 5678, $oid_b], 'pop');
+is_deeply([$stk->pop_rec], ['m', 1234, 5678, $oid_a], 'pop-pop');
+is($stk->pop_rec, undef, 'empty');
+
+SKIP: {
+ $stk = undef;
+ my $nr = $ENV{TEST_GIT_LOG} or skip 'TEST_GIT_LOG unset', 3;
+ open my $fh, '-|', qw(git log --pretty=tformat:%at.%ct.%H), "-$nr" or
+ die "git log: $!";
+ my @expect;
+ while (<$fh>) {
+ chomp;
+ my ($at, $ct, $H) = split(/\./);
+ $stk //= PublicInbox::IdxStack->new($H);
+ # not bothering to parse blobs here, just using commit OID
+ # as a blob OID since they're the same size + format
+ $stk->push_rec('m', $at + 0, $ct + 0, $H);
+ push(@expect, [ 'm', $at, $ct, $H ]);
+ }
+ $stk or skip('nothing from git log', 3);
+ is($stk->read_prepare, $stk, 'read_prepare');
+ is($stk->num_records, scalar(@expect), 'num_records matches expected');
+ my @result;
+ while (my @tmp = $stk->pop_rec) {
+ unshift @result, \@tmp;
+ }
+ is_deeply(\@result, \@expect, 'results match expected');
+}
+
+done_testing;
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 04/20] v2writable: index_sync: reduce fill_alternates calls
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (2 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 03/20] v2writable: introduce idx_stack Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 05/20] v2writable: move {autime} and {cotime} into $sync state Eric Wong
` (15 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Instead of doing fill_alternates for every epoch we're indexing,
just do it once at the start of index_sync invocation. This
will set us up for using a single "git cat-file" process for
indexing multiple epochs.
---
lib/PublicInbox/V2Writable.pm | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 04c91e5dd..c896dc0ed 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -682,10 +682,7 @@ sub fill_alternates ($$) {
my $pfx = "$self->{-inbox}->{inboxdir}/git";
my $all = "$self->{-inbox}->{inboxdir}/all.git";
-
- unless (-d $all) {
- PublicInbox::Import::init_bare($all);
- }
+ PublicInbox::Import::init_bare($all) unless -d $all;
my $info_dir = "$all/objects/info";
my $alt = "$info_dir/alternates";
my (%alt, $new);
@@ -695,7 +692,9 @@ sub fill_alternates ($$) {
$mode = (stat($fh))[2] & 07777;
# we assign a sort score to every alternate and favor
- # the newest (highest numbered) one when we
+ # the newest (highest numbered) one because loose objects
+ # require scanning epochs and only the latest epoch is
+ # expected to see loose objects
my $score;
my $other = 0; # in case admin adds non-epoch repos
%alt = map {;
@@ -1213,7 +1212,6 @@ sub index_epoch ($$$) {
my $git_dir = git_dir_n($self, $i);
die 'BUG: already reindexing!' if $self->{reindex_pipe};
-d $git_dir or return; # missing epochs are fine
- fill_alternates($self, $i);
my $git = PublicInbox::Git->new($git_dir);
if (my $unindex_range = delete $sync->{unindex_range}->{$i}) {
unindex($self, $sync, $git, $unindex_range);
@@ -1247,6 +1245,7 @@ sub index_sync {
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
$self->idx_init($opt); # acquire lock
+ fill_alternates($self, $epoch_max);
$self->{over}->rethread_prepare($opt);
my $sync = {
unindex_range => {}, # EPOCH => oid_old..oid_new
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 05/20] v2writable: move {autime} and {cotime} into $sync state
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (3 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 04/20] v2writable: index_sync: reduce fill_alternates calls Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 06/20] v2writable: allow >= 40 byte git object IDs Eric Wong
` (14 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
The V2Writable object may be long-lived, so it makes more
sense to put the {autime} and {cotime} fields into the
shorter-lived index_sync state.
---
lib/PublicInbox/Smsg.pm | 8 ++++----
lib/PublicInbox/V2Writable.pm | 12 ++++++------
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/lib/PublicInbox/Smsg.pm b/lib/PublicInbox/Smsg.pm
index 725d42062..aaf88f355 100644
--- a/lib/PublicInbox/Smsg.pm
+++ b/lib/PublicInbox/Smsg.pm
@@ -96,7 +96,7 @@ sub from_mitem {
# for Import and v1 non-SQLite WWW code paths
sub populate {
- my ($self, $hdr, $v2w) = @_;
+ my ($self, $hdr, $sync) = @_;
for my $f (qw(From To Cc Subject)) {
my @all = $hdr->header($f);
my $val = join(', ', @all);
@@ -117,9 +117,9 @@ sub populate {
}
$self->{$f} = $val if $val ne '';
}
- $v2w //= {};
- $self->{-ds} = [ my @ds = msg_datestamp($hdr, $v2w->{autime}) ];
- $self->{-ts} = [ my @ts = msg_timestamp($hdr, $v2w->{cotime}) ];
+ $sync //= {};
+ $self->{-ds} = [ my @ds = msg_datestamp($hdr, $sync->{autime}) ];
+ $self->{-ts} = [ my @ts = msg_timestamp($hdr, $sync->{cotime}) ];
$self->{ds} //= $ds[0]; # no zone
$self->{ts} //= $ts[0];
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c896dc0ed..4dc6880b4 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -564,8 +564,8 @@ W: $list
num => $smsg->{num},
mid => $smsg->{mid},
}, 'PublicInbox::Smsg';
- my $v2w = { autime => $smsg->{ds}, cotime => $smsg->{ts} };
- $new_smsg->populate($new_mime, $v2w);
+ my $sync = { autime => $smsg->{ds}, cotime => $smsg->{ts} };
+ $new_smsg->populate($new_mime, $sync);
do_idx($self, \$raw, $new_mime, $new_smsg);
}
$rewritten->{rewrites};
@@ -950,7 +950,7 @@ sub reindex_oid ($$$$) {
blob => $oid,
mid => $mid0,
}, 'PublicInbox::Smsg';
- $smsg->populate($mime, $self);
+ $smsg->populate($mime, $sync);
if (do_idx($self, $msgref, $mime, $smsg)) {
reindex_checkpoint($self, $sync, $git);
}
@@ -1225,14 +1225,14 @@ sub index_epoch ($$$) {
while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
$self->{current_info} = "$i.git $oid";
if ($f eq 'm') {
- $self->{autime} = $at;
- $self->{cotime} = $ct;
+ $sync->{autime} = $at;
+ $sync->{cotime} = $ct;
reindex_oid($self, $sync, $git, $oid);
} elsif ($f eq 'd') {
unindex_oid($self, $git, $oid);
}
}
- delete @$self{qw(autime cotime)};
+ delete @$sync{qw(autime cotime)};
update_last_commit($self, $git, $i, $stk->{latest_cmt});
}
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 06/20] v2writable: allow >= 40 byte git object IDs
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (4 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 05/20] v2writable: move {autime} and {cotime} into $sync state Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 07/20] v2writable: drop "EPOCH.git indexing $RANGE" progress Eric Wong
` (13 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Another step in slowly updating our code to support SHA-256 or
whatever other hash algorithms git may support in the future.
---
lib/PublicInbox/V2Writable.pm | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 4dc6880b4..50582266b 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -22,7 +22,7 @@ use PublicInbox::SearchIdx;
use IO::Handle; # ->autoflush
use File::Temp qw(tempfile);
-my $x40 = qr/[a-f0-9]{40}/;
+my $OID = qr/[a-f0-9]{40,}/;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
@@ -492,7 +492,7 @@ sub git_hash_raw ($$) {
local $/ = "\n";
chomp(my $oid = <$r>);
close $r or die "git hash-object failed: $?";
- $oid =~ /\A[a-f0-9]{40}\z/ or die "OID not expected: $oid";
+ $oid =~ /\A$OID\z/ or die "OID not expected: $oid";
$oid;
}
@@ -1045,17 +1045,17 @@ sub prepare_range_stack {
$range);
my ($at, $ct, $stk);
while (<$fh>) {
- if (/\A([0-9]+)-([0-9]+)-($x40)$/o) {
+ if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
($at, $ct) = ($1 + 0, $2 + 0);
$stk //= PublicInbox::IdxStack->new($3);
- } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) {
+ } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\td$/o) {
my $oid = $1;
if ($D) { # reindex case
$D->{pack('H*', $oid)}++;
} else { # non-reindex case:
$stk->push_rec('d', $at, $ct, $oid);
}
- } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) {
+ } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o) {
my $oid = $1;
if ($D) {
my $oid_bin = pack('H*', $oid);
@@ -1177,7 +1177,7 @@ sub unindex ($$$$) {
--no-notes --no-color --no-abbrev --no-renames);
my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range);
while (<$fh>) {
- /\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o or next;
+ /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next;
unindex_oid($self, $git, $1, $unindexed);
}
delete $self->{reindex_pipe};
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 07/20] v2writable: drop "EPOCH.git indexing $RANGE" progress
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (5 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 06/20] v2writable: allow >= 40 byte git object IDs Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 08/20] use consistent {ibx} field for writable code paths Eric Wong
` (12 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
It'll be one continuous range with IdxStack.
---
lib/PublicInbox/V2Writable.pm | 6 ------
1 file changed, 6 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 50582266b..2ff2fc259 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1095,8 +1095,6 @@ sub sync_prepare ($$$) {
next if $?; # new repo
my $range = log_range($self, $sync, $git, $i, $tip) or next;
- $sync->{ranges}->[$i] = $range;
-
# can't use 'rev-list --count' if we use --diff-filter
$pr->("$i.git counting $range ... ") if $pr;
my $stk = prepare_range_stack($git, $sync, $range);
@@ -1218,10 +1216,6 @@ sub index_epoch ($$$) {
}
defined(my $stk = $sync->{stacks}->[$i]) or return;
$sync->{stacks}->[$i] = undef;
- my $range = $sync->{ranges}->[$i];
- if (my $pr = $sync->{-opt}->{-progress}) {
- $pr->("$i.git indexing $range\n");
- }
while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
$self->{current_info} = "$i.git $oid";
if ($f eq 'm') {
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 08/20] use consistent {ibx} field for writable code paths
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (6 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 07/20] v2writable: drop "EPOCH.git indexing $RANGE" progress Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 09/20] search: avoid copying {inboxdir} Eric Wong
` (11 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
This is a step which makes our use of abbreviations more
consistent when referring to PublicInbox::Inbox objects.
We'll also be reducing the number of redundant fields
in SearchIdx and V2Writable code paths to make the
object graph easier to follow.
---
lib/PublicInbox/Import.pm | 6 ++--
lib/PublicInbox/SearchIdx.pm | 31 ++++++++++----------
lib/PublicInbox/SearchIdxShard.pm | 6 ++--
lib/PublicInbox/V2Writable.pm | 47 +++++++++++++++----------------
4 files changed, 44 insertions(+), 46 deletions(-)
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index d565b0a03..b50c662c7 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -35,7 +35,7 @@ sub new {
ident => "$name <$email>",
mark => 1,
ref => $ref,
- -inbox => $ibx,
+ ibx => $ibx,
path_type => '2/38', # or 'v2'
lock_path => "$git->{git_dir}/ssoma.lock", # v2 changes this
bytes_added => 0,
@@ -176,7 +176,7 @@ sub _update_git_info ($$) {
run_die([@cmd, qw(read-tree -m -v -i), $self->{ref}], $env);
}
run_die([@cmd, 'update-server-info']);
- my $ibx = $self->{-inbox};
+ my $ibx = $self->{ibx};
($ibx && $self->{path_type} eq '2/38') and eval {
require PublicInbox::SearchIdx;
my $s = PublicInbox::SearchIdx->new($ibx);
@@ -385,7 +385,7 @@ sub add {
# spam check:
if ($check_cb) {
- $mime = $check_cb->($mime, $self->{-inbox}) or return;
+ $mime = $check_cb->($mime, $self->{ibx}) or return;
}
my $blob = $self->{mark}++;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index e641ffd43..4b1b1736e 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -50,8 +50,7 @@ sub new {
$ibx = PublicInbox::InboxWritable->new($ibx);
my $self = bless {
inboxdir => $inboxdir,
- -inbox => $ibx,
- git => $ibx->git,
+ ibx => $ibx,
-altid => $altid,
ibx_ver => $version,
indexlevel => $indexlevel,
@@ -548,14 +547,14 @@ sub unindex_both { # git->cat_async callback
sub index_sync {
my ($self, $opts) = @_;
delete $self->{lock_path} if $opts->{-skip_lock};
- $self->{-inbox}->with_umask(\&_index_sync, $self, $opts);
+ $self->{ibx}->with_umask(\&_index_sync, $self, $opts);
}
-sub too_big ($$$) {
- my ($self, $git, $oid) = @_;
+sub too_big ($$) {
+ my ($self, $oid) = @_;
my $max_size = $self->{index_max_size} or return;
- my (undef, undef, $size) = $git->check($oid);
- die "E: bad $oid in $git->{git_dir}\n" if !defined($size);
+ my (undef, undef, $size) = $self->{ibx}->git->check($oid);
+ die "E: bad $oid in $self->{ibx}->{inboxdir}\n" if !defined($size);
return if $size <= $max_size;
warn "W: skipping $oid ($size > $max_size)\n";
1;
@@ -568,7 +567,7 @@ sub read_log {
my $h40 = $hex .'{40}';
my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
- my $git = $self->{git};
+ my $git = $self->{ibx}->git;
my $latest;
my $max = $BATCH_BYTES;
local $/ = "\n";
@@ -591,7 +590,7 @@ sub read_log {
}
next;
}
- next if too_big($self, $git, $blob);
+ next if too_big($self, $blob);
$git->cat_async($blob, \&index_both, { %$sync });
if ($max <= 0) {
$git->cat_async_wait;
@@ -600,7 +599,7 @@ sub read_log {
}
} elsif ($line =~ /$delmsg/o) {
my $blob = $1;
- $D{$blob} = 1 unless too_big($self, $git, $blob);
+ $D{$blob} = 1 unless too_big($self, $blob);
} elsif ($line =~ /^commit ($h40)/o) {
$latest = $1;
$newest ||= $latest;
@@ -621,7 +620,7 @@ sub read_log {
sub _git_log {
my ($self, $opts, $range) = @_;
- my $git = $self->{git};
+ my $git = $self->{ibx}->git;
if (index($range, '..') < 0) {
# don't show annoying git errors to users who run -index
@@ -681,7 +680,7 @@ sub is_ancestor ($$$) {
sub need_update ($$$) {
my ($self, $cur, $new) = @_;
- my $git = $self->{git};
+ my $git = $self->{ibx}->git;
return 1 if $cur && !is_ancestor($git, $cur, $new);
my $range = $cur eq '' ? $new : "$cur..$new";
chomp(my $n = $git->qx(qw(rev-list --count), $range));
@@ -701,7 +700,7 @@ sub _last_x_commit {
$lx = $lm;
}
# Use last_commit from msgmap if it is older or unset
- if (!$lm || ($lx && $lm && is_ancestor($self->{git}, $lm, $lx))) {
+ if (!$lm || ($lx && $lm && is_ancestor($self->{ibx}->git, $lm, $lx))) {
$lx = $lm;
}
$lx;
@@ -718,7 +717,7 @@ sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
my ($last_commit, $lx, $xlog);
- my $git = $self->{git};
+ my $git = $self->{ibx}->git;
$git->batch_prepare;
my $pr = $opts->{-progress};
@@ -830,7 +829,7 @@ sub _begin_txn {
sub begin_txn_lazy {
my ($self) = @_;
- $self->{-inbox}->with_umask(\&_begin_txn, $self) if !$self->{txn};
+ $self->{ibx}->with_umask(\&_begin_txn, $self) if !$self->{txn};
}
# store 'indexlevel=medium' in v2 shard=0 and v1 (only one shard)
@@ -860,7 +859,7 @@ sub _commit_txn {
sub commit_txn_lazy {
my ($self) = @_;
delete($self->{txn}) and
- $self->{-inbox}->with_umask(\&_commit_txn, $self);
+ $self->{ibx}->with_umask(\&_commit_txn, $self);
}
sub worker_done {
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 544268819..fd34e487b 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -11,14 +11,14 @@ use IO::Handle (); # autoflush
use PublicInbox::Eml;
sub new {
- my ($class, $v2writable, $shard) = @_;
- my $ibx = $v2writable->{-inbox};
+ my ($class, $v2w, $shard) = @_;
+ my $ibx = $v2w->{ibx};
my $self = $class->SUPER::new($ibx, 1, $shard);
# create the DB before forking:
$self->_xdb_acquire;
$self->set_indexlevel;
$self->_xdb_release;
- $self->spawn_worker($v2writable, $shard) if $v2writable->{parallel};
+ $self->spawn_worker($v2w, $shard) if $v2w->{parallel};
$self;
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 2ff2fc259..a1986a469 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -109,7 +109,7 @@ sub new {
my $xpfx = "$dir/xap" . PublicInbox::Search::SCHEMA_VERSION;
my $self = {
- -inbox => $v2ibx,
+ ibx => $v2ibx,
im => undef, # PublicInbox::Import
parallel => 1,
transact_bytes => 0,
@@ -149,7 +149,7 @@ sub init_inbox {
# mimics Import::add and wraps it for v2
sub add {
my ($self, $eml, $check_cb) = @_;
- $self->{-inbox}->with_umask(\&_add, $self, $eml, $check_cb);
+ $self->{ibx}->with_umask(\&_add, $self, $eml, $check_cb);
}
# indexes a message, returns true if checkpointing is needed
@@ -169,7 +169,7 @@ sub _add {
# spam check:
if ($check_cb) {
- $mime = $check_cb->($mime, $self->{-inbox}) or return;
+ $mime = $check_cb->($mime, $self->{ibx}) or return;
}
# All pipes (> $^F) known to Perl 5.6+ have FD_CLOEXEC set,
@@ -218,7 +218,7 @@ sub v2_num_for {
# AltId may pre-populate article numbers (e.g. X-Mail-Count
# or NNTP article number), use that article number if it's
# not in Over.
- my $altid = $self->{-inbox}->{altid};
+ my $altid = $self->{ibx}->{altid};
if ($altid && grep(/:file=msgmap\.sqlite3\z/, @$altid)) {
my $num = $self->{mm}->num_for($mid);
@@ -293,7 +293,7 @@ sub _idx_init { # with_umask callback
# Now that all subprocesses are up, we can open the FDs
# for SQLite:
my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
- "$self->{-inbox}->{inboxdir}/msgmap.sqlite3", 1);
+ "$self->{ibx}->{inboxdir}/msgmap.sqlite3", 1);
$mm->{dbh}->begin_work;
}
@@ -301,7 +301,7 @@ sub _idx_init { # with_umask callback
sub idx_init {
my ($self, $opt) = @_;
return if $self->{idx_shards};
- my $ibx = $self->{-inbox};
+ my $ibx = $self->{ibx};
# do not leak read-only FDs to child processes, we only have these
# FDs for duplicate detection so they should not be
@@ -329,7 +329,7 @@ sub idx_init {
sub _replace_oids ($$$) {
my ($self, $mime, $replace_map) = @_;
$self->done;
- my $pfx = "$self->{-inbox}->{inboxdir}/git";
+ my $pfx = "$self->{ibx}->{inboxdir}/git";
my $rewrites = []; # epoch => commit
my $max = $self->{epoch_max};
@@ -450,7 +450,7 @@ sub rewrite_internal ($$;$$$) {
# (retval[2]) is not part of the stable API shared with Import->remove
sub remove {
my ($self, $eml, $cmt_msg) = @_;
- my $r = $self->{-inbox}->with_umask(\&rewrite_internal,
+ my $r = $self->{ibx}->with_umask(\&rewrite_internal,
$self, $eml, $cmt_msg);
defined($r) && defined($r->[0]) ? @$r: undef;
}
@@ -458,7 +458,7 @@ sub remove {
sub _replace ($$;$$) {
my ($self, $old_eml, $new_eml, $sref) = @_;
my $arg = [ $self, $old_eml, undef, $new_eml, $sref ];
- my $rewritten = $self->{-inbox}->with_umask(\&rewrite_internal,
+ my $rewritten = $self->{ibx}->with_umask(\&rewrite_internal,
$self, $old_eml, undef, $new_eml, $sref) or return;
my $rewrites = $rewritten->{rewrites};
@@ -484,7 +484,7 @@ sub git_hash_raw ($$) {
my ($self, $raw) = @_;
# grab the expected OID we have to reindex:
pipe(my($in, $w)) or die "pipe: $!";
- my $git_dir = $self->{-inbox}->git->{git_dir};
+ my $git_dir = $self->{ibx}->git->{git_dir};
my $cmd = ['git', "--git-dir=$git_dir", qw(hash-object --stdin)];
my $r = popen_rd($cmd, undef, { 0 => $in });
print $w $$raw or die "print \$w: $!";
@@ -550,11 +550,11 @@ W: $list
}
# make sure we really got the OID:
- my ($blob, $type, $bytes) = $self->{-inbox}->git->check($expect_oid);
+ my ($blob, $type, $bytes) = $self->{ibx}->git->check($expect_oid);
$blob eq $expect_oid or die "BUG: $expect_oid not found after replace";
# don't leak FDs to Xapian:
- $self->{-inbox}->git->cleanup;
+ $self->{ibx}->git->cleanup;
# reindex modified messages:
for my $smsg (@$need_reindex) {
@@ -674,14 +674,14 @@ sub done {
my $nbytes = $self->{total_bytes};
$self->{total_bytes} = 0;
$self->lock_release(!!$nbytes) if $shards;
- $self->{-inbox}->git->cleanup;
+ $self->{ibx}->git->cleanup;
}
sub fill_alternates ($$) {
my ($self, $epoch) = @_;
- my $pfx = "$self->{-inbox}->{inboxdir}/git";
- my $all = "$self->{-inbox}->{inboxdir}/all.git";
+ my $pfx = "$self->{ibx}->{inboxdir}/git";
+ my $all = "$self->{ibx}->{inboxdir}/all.git";
PublicInbox::Import::init_bare($all) unless -d $all;
my $info_dir = "$all/objects/info";
my $alt = "$info_dir/alternates";
@@ -726,7 +726,7 @@ sub fill_alternates ($$) {
sub git_init {
my ($self, $epoch) = @_;
- my $git_dir = "$self->{-inbox}->{inboxdir}/git/$epoch.git";
+ my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
PublicInbox::Import::init_bare($git_dir);
my @cmd = (qw/git config/, "--file=$git_dir/config",
'include.path', '../../all.git/config');
@@ -738,7 +738,7 @@ sub git_init {
sub git_dir_latest {
my ($self, $max) = @_;
$$max = -1;
- my $pfx = "$self->{-inbox}->{inboxdir}/git";
+ my $pfx = "$self->{ibx}->{inboxdir}/git";
return unless -d $pfx;
my $latest;
opendir my $dh, $pfx or die "opendir $pfx: $!\n";
@@ -790,7 +790,7 @@ sub importer {
sub import_init {
my ($self, $git, $packed_bytes, $tmp) = @_;
- my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox});
+ my $im = PublicInbox::Import->new($git, undef, undef, $self->{ibx});
$im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR);
$im->{lock_path} = undef;
$im->{path_type} = 'v2';
@@ -823,8 +823,7 @@ sub get_blob ($$) {
return $msg if $msg;
}
# older message, should be in alternates
- my $ibx = $self->{-inbox};
- $ibx->msg_by_smsg($smsg);
+ $self->{ibx}->msg_by_smsg($smsg);
}
sub content_exists ($$$) {
@@ -881,7 +880,7 @@ sub reindex_checkpoint ($$$) {
sub reindex_oid ($$$$) {
my ($self, $sync, $git, $oid) = @_;
- return if PublicInbox::SearchIdx::too_big($self, $git, $oid);
+ return if PublicInbox::SearchIdx::too_big($self, $oid);
my ($num, $mid0, $len);
my $msgref = $git->cat_file($oid, \$len);
return if $len == 0; # purged
@@ -968,7 +967,7 @@ sub update_last_commit ($$$$) {
last_epoch_commit($self, $i, $cmt);
}
-sub git_dir_n ($$) { "$_[0]->{-inbox}->{inboxdir}/git/$_[1].git" }
+sub git_dir_n ($$) { "$_[0]->{ibx}->{inboxdir}/git/$_[1].git" }
sub last_commits ($$) {
my ($self, $epoch_max) = @_;
@@ -1077,7 +1076,7 @@ sub sync_prepare ($$$) {
my ($self, $sync, $epoch_max) = @_;
my $pr = $sync->{-opt}->{-progress};
my $regen_max = 0;
- my $head = $self->{-inbox}->{ref_head} || 'refs/heads/master';
+ my $head = $self->{ibx}->{ref_head} || 'refs/heads/master';
# reindex stops at the current heads and we later rerun index_sync
# without {reindex}
@@ -1108,7 +1107,7 @@ sub sync_prepare ($$$) {
# our code and blindly injects "d" file history into git repos
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
- my $git = $self->{-inbox}->git;
+ my $git = $self->{ibx}->git;
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
$self->{current_info} = "leftover $oid";
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 09/20] search: avoid copying {inboxdir}
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (7 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 08/20] use consistent {ibx} field for writable code paths Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 10/20] v2writable: use read-only PublicInbox::Git for cat_file Eric Wong
` (10 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Instead, storing {xpfx} (the directory prefix returned by xdir)
will allow us to avoid string concatenation in the read-only
path and save us a little hash entry space.
---
lib/PublicInbox/Search.pm | 25 +++++++++++++++----------
lib/PublicInbox/SearchIdx.pm | 5 +++--
2 files changed, 18 insertions(+), 12 deletions(-)
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 55eee41ca..4e08aed7a 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -164,15 +164,10 @@ chomp @HELP;
sub xdir ($;$) {
my ($self, $rdonly) = @_;
- if ($self->{ibx_ver} == 1) {
- "$self->{inboxdir}/public-inbox/xapian" . SCHEMA_VERSION;
- } else {
- my $dir = "$self->{inboxdir}/xap" . SCHEMA_VERSION;
- return $dir if $rdonly;
-
- my $shard = $self->{shard};
- defined $shard or die "shard not given";
- $dir .= "/$shard";
+ if ($rdonly || !defined($self->{shard})) {
+ $self->{xpfx};
+ } else { # v2 only:
+ "$self->{xpfx}/$self->{shard}";
}
}
@@ -220,14 +215,24 @@ sub xdb ($) {
};
}
+sub xpfx_init ($) {
+ my ($self) = @_;
+ if ($self->{ibx_ver} == 1) {
+ $self->{xpfx} .= '/public-inbox/xapian' . SCHEMA_VERSION;
+ } else {
+ $self->{xpfx} .= '/xap'.SCHEMA_VERSION;
+ }
+}
+
sub new {
my ($class, $ibx) = @_;
ref $ibx or die "BUG: expected PublicInbox::Inbox object: $ibx";
my $self = bless {
- inboxdir => $ibx->{inboxdir},
+ xpfx => $ibx->{inboxdir}, # for xpfx_init
altid => $ibx->{altid},
ibx_ver => $ibx->version,
}, $class;
+ xpfx_init($self);
my $dir = xdir($self, 1);
$self->{over_ro} = PublicInbox::Over->new("$dir/over.sqlite3");
$self;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 4b1b1736e..2d53b2d03 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -49,12 +49,13 @@ sub new {
}
$ibx = PublicInbox::InboxWritable->new($ibx);
my $self = bless {
- inboxdir => $inboxdir,
ibx => $ibx,
+ xpfx => $inboxdir, # for xpfx_init
-altid => $altid,
ibx_ver => $version,
indexlevel => $indexlevel,
}, $class;
+ $self->xpfx_init;
$self->{-set_indexlevel_once} = 1 if $indexlevel eq 'medium';
$ibx->umask_prepare;
if ($version == 1) {
@@ -371,7 +372,7 @@ sub _msgmap_init ($) {
die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1;
$self->{mm} //= eval {
require PublicInbox::Msgmap;
- PublicInbox::Msgmap->new($self->{inboxdir}, 1);
+ PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, 1);
};
}
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 10/20] v2writable: use read-only PublicInbox::Git for cat_file
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (8 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 09/20] search: avoid copying {inboxdir} Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 11/20] v2writable: get rid of {reindex_pipe} field Eric Wong
` (9 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
We can reduce the number of parameters we pass around on the stack
and make our read-write and read-only code paths more uniform.
---
lib/PublicInbox/V2Writable.pm | 32 ++++++++++++++------------------
1 file changed, 14 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index a1986a469..a85f9e1f8 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -861,16 +861,14 @@ sub atfork_child {
$self->{bnote}->[1];
}
-sub reindex_checkpoint ($$$) {
- my ($self, $sync, $git) = @_;
+sub reindex_checkpoint ($$) {
+ my ($self, $sync) = @_;
- $git->cleanup;
$sync->{mm_tmp}->atfork_prepare;
$self->done; # release lock
if (my $pr = $sync->{-opt}->{-progress}) {
- my ($bn) = (split('/', $git->{git_dir}))[-1];
- $pr->("$bn ".sprintf($sync->{-regen_fmt}, $sync->{nr}));
+ $pr->(sprintf($sync->{-regen_fmt}, $sync->{nr}));
}
# allow -watch or -mda to write...
@@ -878,11 +876,11 @@ sub reindex_checkpoint ($$$) {
$sync->{mm_tmp}->atfork_parent;
}
-sub reindex_oid ($$$$) {
- my ($self, $sync, $git, $oid) = @_;
+sub reindex_oid ($$$) {
+ my ($self, $sync, $oid) = @_;
return if PublicInbox::SearchIdx::too_big($self, $oid);
my ($num, $mid0, $len);
- my $msgref = $git->cat_file($oid, \$len);
+ my $msgref = $self->{ibx}->git->cat_file($oid, \$len);
return if $len == 0; # purged
my $mime = PublicInbox::Eml->new($$msgref);
my $mids = mids($mime->header_obj);
@@ -951,7 +949,7 @@ sub reindex_oid ($$$$) {
}, 'PublicInbox::Smsg';
$smsg->populate($mime, $sync);
if (do_idx($self, $msgref, $mime, $smsg)) {
- reindex_checkpoint($self, $sync, $git);
+ reindex_checkpoint($self, $sync);
}
}
@@ -1107,13 +1105,11 @@ sub sync_prepare ($$$) {
# our code and blindly injects "d" file history into git repos
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
- my $git = $self->{ibx}->git;
for my $oid (@leftovers) {
$oid = unpack('H*', $oid);
$self->{current_info} = "leftover $oid";
- unindex_oid($self, $git, $oid);
+ unindex_oid($self, $oid);
}
- $git->cleanup;
}
return 0 if (!$regen_max && !keys(%{$self->{unindex_range}}));
@@ -1135,10 +1131,10 @@ sub unindex_oid_remote ($$$) {
}
}
-sub unindex_oid ($$$;$) {
- my ($self, $git, $oid, $unindexed) = @_;
+sub unindex_oid ($$;$) {
+ my ($self, $oid, $unindexed) = @_;
my $mm = $self->{mm};
- my $msgref = $git->cat_file($oid);
+ my $msgref = $self->{ibx}->git->cat_file($oid);
my $mime = PublicInbox::Eml->new($msgref);
my $mids = mids($mime->header_obj);
$mime = $msgref = undef;
@@ -1175,7 +1171,7 @@ sub unindex ($$$$) {
my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range);
while (<$fh>) {
/\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next;
- unindex_oid($self, $git, $1, $unindexed);
+ unindex_oid($self, $1, $unindexed);
}
delete $self->{reindex_pipe};
close $fh or die "git log failed: \$?=$?";
@@ -1220,9 +1216,9 @@ sub index_epoch ($$$) {
if ($f eq 'm') {
$sync->{autime} = $at;
$sync->{cotime} = $ct;
- reindex_oid($self, $sync, $git, $oid);
+ reindex_oid($self, $sync, $oid);
} elsif ($f eq 'd') {
- unindex_oid($self, $git, $oid);
+ unindex_oid($self, $oid);
}
}
delete @$sync{qw(autime cotime)};
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 11/20] v2writable: get rid of {reindex_pipe} field
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (9 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 10/20] v2writable: use read-only PublicInbox::Git for cat_file Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 12/20] v2writable: clarify "epoch" comment Eric Wong
` (8 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
Since normal per-epoch indexing no longer holds a "git log"
process open, we don't need to worry about not sharing the
pipe with forked shards when we restart the indexer.
While we're in the area, better describe what `unindex' does,
since it's a rarely-used but necessary code path.
---
lib/PublicInbox/V2Writable.pm | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index a85f9e1f8..c59ead393 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -848,8 +848,6 @@ sub content_exists ($$$) {
sub atfork_child {
my ($self) = @_;
- my $fh = delete $self->{reindex_pipe};
- close $fh if $fh;
if (my $shards = $self->{idx_shards}) {
$_->atfork_child foreach @$shards;
}
@@ -1081,7 +1079,6 @@ sub sync_prepare ($$$) {
my $reindex_heads = last_commits($self, $epoch_max) if $sync->{reindex};
for (my $i = $epoch_max; $i >= 0; $i--) {
- die 'BUG: already indexing!' if $self->{reindex_pipe};
my $git_dir = git_dir_n($self, $i);
-d $git_dir or next; # missing epochs are fine
my $git = PublicInbox::Git->new($git_dir);
@@ -1161,6 +1158,8 @@ sub unindex_oid ($$;$) {
}
}
+# this is rare, it only happens when we get discontiguous history in
+# a mirror because the source used -purge or -edit
sub unindex ($$$$) {
my ($self, $sync, $git, $unindex_range) = @_;
my $unindexed = $self->{unindexed} ||= {}; # $mid0 => $num
@@ -1168,12 +1167,11 @@ sub unindex ($$$$) {
# order does not matter, here:
my @cmd = qw(log --raw -r
--no-notes --no-color --no-abbrev --no-renames);
- my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $unindex_range);
+ my $fh = $git->popen(@cmd, $unindex_range);
while (<$fh>) {
/\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next;
unindex_oid($self, $1, $unindexed);
}
- delete $self->{reindex_pipe};
close $fh or die "git log failed: \$?=$?";
return unless $sync->{-opt}->{prune};
@@ -1203,10 +1201,9 @@ sub index_epoch ($$$) {
my ($self, $sync, $i) = @_;
my $git_dir = git_dir_n($self, $i);
- die 'BUG: already reindexing!' if $self->{reindex_pipe};
-d $git_dir or return; # missing epochs are fine
my $git = PublicInbox::Git->new($git_dir);
- if (my $unindex_range = delete $sync->{unindex_range}->{$i}) {
+ if (my $unindex_range = delete $sync->{unindex_range}->{$i}) { # rare
unindex($self, $sync, $git, $unindex_range);
}
defined(my $stk = $sync->{stacks}->[$i]) or return;
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 12/20] v2writable: clarify "epoch" comment
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (10 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 11/20] v2writable: get rid of {reindex_pipe} field Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:55 ` [PATCH 13/20] xapcmd: set {from} properly for v1 inboxes Eric Wong
` (7 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
---
lib/PublicInbox/V2Writable.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c59ead393..13c1ad6f8 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -120,7 +120,7 @@ sub new {
lock_path => "$dir/inbox.lock",
# limit each git repo (epoch) to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
- last_commit => [], # git repo -> commit
+ last_commit => [], # git epoch -> commit
};
$self->{shards} = count_shards($self) || nproc_shards($creat);
$self->{index_max_size} = $v2ibx->{index_max_size};
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 13/20] xapcmd: set {from} properly for v1 inboxes
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (11 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 12/20] v2writable: clarify "epoch" comment Eric Wong
@ 2020-07-24 5:55 ` Eric Wong
2020-07-24 5:56 ` [PATCH 14/20] searchidx: rename _xdb_{acquire,release} => idx_ Eric Wong
` (6 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:55 UTC (permalink / raw)
To: meta
This was a bug. I'm not sure where it matters yet, but it
may matter in the future.
---
lib/PublicInbox/Xapcmd.pm | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index c04f935cd..4ee3fc791 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -257,8 +257,9 @@ sub run {
my $reindex; # v1:{ from => $x40 }, v2:{ from => [ $x40, $x40, .. ] } }
if (!$opt->{-coarse_lock}) {
- $reindex = $opt->{reindex} = {};
- $reindex->{from} = []; # per-epoch ranges
+ $reindex = $opt->{reindex} = { # per-epoch ranges for v2
+ from => $ibx->version == 1 ? '' : [],
+ };
require PublicInbox::SearchIdx;
PublicInbox::SearchIdx::load_xapian_writable();
}
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 14/20] searchidx: rename _xdb_{acquire,release} => idx_
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (12 preceding siblings ...)
2020-07-24 5:55 ` [PATCH 13/20] xapcmd: set {from} properly for v1 inboxes Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 15/20] searchidx: make v1 indexing closer to v2 Eric Wong
` (5 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
The "xdb" prefix was inaccurate since it's used by
indexlevel=basic, which is Xapian-free. The '_' (underscore)
prefix was also wrong for a method which is called across
package boundaries.
---
lib/PublicInbox/SearchIdx.pm | 12 ++++++------
lib/PublicInbox/SearchIdxShard.pm | 6 +++---
t/inbox_idle.t | 4 ++--
t/search.t | 4 ++--
4 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 2d53b2d03..89c716793 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -77,7 +77,7 @@ sub new {
sub need_xapian ($) { $_[0]->{indexlevel} =~ $xapianlevels }
-sub _xdb_release {
+sub idx_release {
my ($self, $wake) = @_;
if (need_xapian($self)) {
my $xdb = delete $self->{xdb} or croak 'not acquired';
@@ -101,7 +101,7 @@ sub load_xapian_writable () {
1;
}
-sub _xdb_acquire {
+sub idx_acquire {
my ($self) = @_;
my $flag;
my $dir = $self->xdir;
@@ -735,7 +735,7 @@ sub _index_sync {
$git->cleanup;
delete $self->{txn};
$xdb->cancel_transaction if $xdb;
- $xdb = _xdb_release($self);
+ $xdb = idx_release($self);
# ensure we leak no FDs to "git log" with Xapian <= 1.2
my $range = $lx eq '' ? $tip : "$lx..$tip";
@@ -766,7 +766,7 @@ sub _index_sync {
$self->{over}->rethread_done($opts) if $newest; # all done
$self->commit_txn_lazy;
$git->cleanup;
- $xdb = _xdb_release($self, $nr);
+ $xdb = idx_release($self, $nr);
# let another process do some work...
$pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
if (!$newest) { # more to come
@@ -805,7 +805,7 @@ sub remote_close {
$? == 0 or die ref($self)." pid:$pid exited with: $?";
} else {
die "transaction in progress $self\n" if $self->{txn};
- $self->_xdb_release if $self->{xdb};
+ idx_release($self) if $self->{xdb};
}
}
@@ -821,7 +821,7 @@ sub remote_remove {
sub _begin_txn {
my ($self) = @_;
- my $xdb = $self->{xdb} || $self->_xdb_acquire;
+ my $xdb = $self->{xdb} || idx_acquire($self);
$self->{over}->begin_lazy if $self->{over};
$xdb->begin_transaction if $xdb;
$self->{txn} = 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index fd34e487b..cb79f3dc9 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -15,9 +15,9 @@ sub new {
my $ibx = $v2w->{ibx};
my $self = $class->SUPER::new($ibx, 1, $shard);
# create the DB before forking:
- $self->_xdb_acquire;
+ $self->idx_acquire;
$self->set_indexlevel;
- $self->_xdb_release;
+ $self->idx_release;
$self->spawn_worker($v2w, $shard) if $v2w->{parallel};
$self;
}
@@ -56,7 +56,7 @@ sub shard_worker_loop ($$$$$) {
if ($line eq "commit\n") {
$self->commit_txn_lazy;
} elsif ($line eq "close\n") {
- $self->_xdb_release;
+ $self->idx_release;
} elsif ($line eq "barrier\n") {
$self->commit_txn_lazy;
# no need to lock < 512 bytes is atomic under POSIX
diff --git a/t/inbox_idle.t b/t/inbox_idle.t
index f754e0fcc..61287200d 100644
--- a/t/inbox_idle.t
+++ b/t/inbox_idle.t
@@ -28,9 +28,9 @@ for my $V (1, 2) {
my $im = $ibx->importer(0);
if ($V == 1) {
my $sidx = PublicInbox::SearchIdx->new($ibx, 1);
- $sidx->_xdb_acquire;
+ $sidx->idx_acquire;
$sidx->set_indexlevel;
- $sidx->_xdb_release; # allow watching on lockfile
+ $sidx->idx_release; # allow watching on lockfile
}
my $pi_config = PublicInbox::Config->new(\<<EOF);
publicinbox.inbox-idle.inboxdir=$inboxdir
diff --git a/t/search.t b/t/search.t
index aa6f94bf3..a75d944c3 100644
--- a/t/search.t
+++ b/t/search.t
@@ -21,8 +21,8 @@ ok($@, "exception raised on non-existent DB");
my $rw = PublicInbox::SearchIdx->new($ibx, 1);
$ibx->with_umask(sub {
- $rw->_xdb_acquire;
- $rw->_xdb_release;
+ $rw->idx_acquire;
+ $rw->idx_release;
});
$rw = undef;
my $ro = PublicInbox::Search->new($ibx);
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 15/20] searchidx: make v1 indexing closer to v2
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (13 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 14/20] searchidx: rename _xdb_{acquire,release} => idx_ Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 16/20] index+xcpdb: support --no-sync flag Eric Wong
` (4 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
We'll switch to using IdxStack here to ensure we get repeatable
results and ascending THREADIDs according to git chronology.
This means we'll need a two-pass reindex to index existing
messages before indexing new messages.
Since we no longer have a long-lived git-log process, we don't
have to worry about old Xapian referencing the git-log pipe
w/o FD_CLOEXEC, either.
---
lib/PublicInbox/SearchIdx.pm | 253 ++++++++++++++++-------------------
t/v1reindex.t | 2 +-
2 files changed, 113 insertions(+), 142 deletions(-)
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 89c716793..c57a7e164 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -14,6 +14,7 @@ use PublicInbox::Eml;
use PublicInbox::InboxWritable;
use PublicInbox::MID qw(mid_mime mids_for_index mids);
use PublicInbox::MsgIter;
+use PublicInbox::IdxStack;
use Carp qw(croak);
use POSIX qw(strftime);
use PublicInbox::OverIdx;
@@ -27,6 +28,10 @@ our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ?
use constant DEBUG => !!$ENV{DEBUG};
my $xapianlevels = qr/\A(?:full|medium)\z/;
+my $hex = '[a-f0-9]';
+my $OID = $hex .'{40,}';
+my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
+my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
sub new {
my ($class, $ibx, $creat, $shard) = @_;
@@ -385,7 +390,7 @@ sub add_message {
$smsg->{mid} //= $mids->[0]; # v1 compatibility
$smsg->{num} //= do { # v1
_msgmap_init($self);
- index_mm($self, $mime);
+ index_mm($self, $mime, $smsg->{blob}, $sync);
};
# v1 and tests only:
@@ -477,34 +482,20 @@ sub unindex_eml {
}
sub index_mm {
- my ($self, $mime) = @_;
- my $mid = mid_mime($mime);
+ my ($self, $mime, $oid, $sync) = @_;
+ my $mids = mids($mime);
my $mm = $self->{mm};
- my $num;
-
- if (defined $self->{regen_down}) {
- $num = $mm->num_for($mid) and return $num;
-
- while (($num = $self->{regen_down}--) > 0) {
- if ($mm->mid_set($num, $mid) != 0) {
- return $num;
- }
- }
- } elsif (defined $self->{regen_up}) {
- $num = $mm->num_for($mid) and return $num;
-
- # this is to fixup old bugs due to add-remove-add
- while (($num = ++$self->{regen_up})) {
- if ($mm->mid_set($num, $mid) != 0) {
- return $num;
- }
+ if ($sync->{reindex}) {
+ my $over = $self->{over};
+ for my $mid (@$mids) {
+ my ($num, undef) = $over->num_mid0_for_oid($oid, $mid);
+ return $num if defined $num;
}
+ $mm->num_for($mids->[0]) // $mm->mid_insert($mids->[0]);
+ } else {
+ # fallback to num_for since filters like RubyLang set the number
+ $mm->mid_insert($mids->[0]) // $mm->num_for($mids->[0]);
}
-
- $num = $mm->mid_insert($mid) and return $num;
-
- # fallback to num_for since filters like RubyLang set the number
- $mm->num_for($mid);
}
sub unindex_mm {
@@ -532,8 +523,8 @@ sub index_both { # git->cat_async callback
my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
my $self = $sync->{sidx};
my $eml = PublicInbox::Eml->new($bref);
- my $num = index_mm($self, $eml);
- $smsg->{num} = $num;
+ $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
+ die "E: could not generate NNTP article number for $oid";
add_message($self, $eml, $smsg, $sync);
}
@@ -549,6 +540,11 @@ sub index_sync {
my ($self, $opts) = @_;
delete $self->{lock_path} if $opts->{-skip_lock};
$self->{ibx}->with_umask(\&_index_sync, $self, $opts);
+ if ($opts->{reindex}) {
+ my %again = %$opts;
+ delete @again{qw(rethread reindex)};
+ index_sync($self, \%again);
+ }
}
sub too_big ($$) {
@@ -562,110 +558,87 @@ sub too_big ($$) {
}
# only for v1
-sub read_log {
- my ($self, $log, $batch_cb) = @_;
- my $hex = '[a-f0-9]';
- my $h40 = $hex .'{40}';
- my $addmsg = qr!^:000000 100644 \S+ ($h40) A\t${hex}{2}/${hex}{38}$!;
- my $delmsg = qr!^:100644 000000 ($h40) \S+ D\t${hex}{2}/${hex}{38}$!;
+sub process_stack {
+ my ($self, $stk, $sync, $batch_cb) = @_;
my $git = $self->{ibx}->git;
- my $latest;
my $max = $BATCH_BYTES;
- local $/ = "\n";
- my %D;
- my $line;
- my $newest;
my $nr = 0;
- my $sync = { sidx => $self, nr => \$nr, max => \$max };
- while (defined($line = <$log>)) {
- if ($line =~ /$addmsg/o) {
- my $blob = $1;
- if (delete $D{$blob}) {
- # make sure pending index writes are done
- # before writing to ->mm
- $git->cat_async_wait;
-
- if (defined $self->{regen_down}) {
- my $num = $self->{regen_down}--;
- $self->{mm}->num_highwater($num);
- }
- next;
- }
- next if too_big($self, $blob);
- $git->cat_async($blob, \&index_both, { %$sync });
+ $sync->{nr} = \$nr;
+ $sync->{max} = \$max;
+ $sync->{sidx} = $self;
+
+ if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
+ warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
+ for my $oid (@leftovers) {
+ $oid = unpack('H*', $oid);
+ $git->cat_async($oid, \&unindex_both, $self);
+ }
+ }
+ while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
+ if ($f eq 'm') {
+ $sync->{autime} = $at;
+ $sync->{cotime} = $ct;
+ next if too_big($self, $oid);
+ $git->cat_async($oid, \&index_both, { %$sync });
if ($max <= 0) {
$git->cat_async_wait;
$max = $BATCH_BYTES;
- $batch_cb->($nr, $latest);
+ $batch_cb->($nr);
}
- } elsif ($line =~ /$delmsg/o) {
- my $blob = $1;
- $D{$blob} = 1 unless too_big($self, $blob);
- } elsif ($line =~ /^commit ($h40)/o) {
- $latest = $1;
- $newest ||= $latest;
- } elsif ($line =~ /^author .*? ([0-9]+) [\-\+][0-9]+$/) {
- $sync->{autime} = $1;
- } elsif ($line =~ /^committer .*? ([0-9]+) [\-\+][0-9]+$/) {
- $sync->{cotime} = $1;
+ } elsif ($f eq 'd') {
+ $git->cat_async($oid, \&unindex_both, $self);
}
}
- close($log) or die "git log failed: \$?=$?";
- # get the leftovers
- foreach my $blob (keys %D) {
- $git->cat_async($blob, \&unindex_both, $self);
- }
$git->cat_async_wait;
- $batch_cb->($nr, $latest, $newest);
+ $batch_cb->($nr, $stk);
}
-sub _git_log {
- my ($self, $opts, $range) = @_;
+sub prepare_stack ($$$) {
+ my ($self, $sync, $range) = @_;
my $git = $self->{ibx}->git;
if (index($range, '..') < 0) {
# don't show annoying git errors to users who run -index
# on empty inboxes
$git->qx(qw(rev-parse -q --verify), "$range^0");
- if ($?) {
- open my $fh, '<', '/dev/null' or
- die "failed to open /dev/null: $!\n";
- return $fh;
- }
+ return PublicInbox::IdxStack->new->read_prepare if $?;
}
+ my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
# Count the new files so they can be added newest to oldest
# and still have numbers increasing from oldest to newest
- my $fcount = 0;
- my $pr = $opts->{-progress};
- $pr->("counting changes\n\t$range ... ") if $pr;
- # can't use 'rev-list --count' if we use --diff-filter
- my $fh = $git->popen(qw(log --pretty=tformat:%h
- --no-notes --no-color --no-renames
- --diff-filter=AM), $range);
- ++$fcount while <$fh>;
- close $fh or die "git log failed: \$?=$?";
- my $high = $self->{mm}->num_highwater;
- $pr->("$fcount\n") if $pr; # continue previous line
- $self->{ntodo} = $fcount;
-
- if (index($range, '..') < 0) {
- if ($high && $high == $fcount) {
- # fix up old bugs in full indexes which caused messages to
- # not appear in Msgmap
- $self->{regen_up} = $high;
- } else {
- # normal regen is for for fresh data
- $self->{regen_down} = $fcount;
- $self->{regen_down} += $high unless $opts->{reindex};
+ my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
+ --no-notes --no-color --no-renames --no-abbrev),
+ $range);
+ my ($at, $ct, $stk);
+ while (<$fh>) {
+ if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
+ ($at, $ct) = ($1 + 0, $2 + 0);
+ $stk //= PublicInbox::IdxStack->new($3);
+ } elsif (/$delmsg/) {
+ my $oid = $1;
+ if ($D) { # reindex case
+ $D->{pack('H*', $oid)}++;
+ } else { # non-reindex case:
+ $stk->push_rec('d', $at, $ct, $oid);
+ }
+ } elsif (/$addmsg/) {
+ my $oid = $1;
+ if ($D) {
+ my $oid_bin = pack('H*', $oid);
+ my $nr = --$D->{$oid_bin};
+ delete($D->{$oid_bin}) if $nr <= 0;
+
+ # nr < 0 (-1) means it never existed
+ $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
+ } else {
+ $stk->push_rec('m', $at, $ct, $oid);
+ }
}
- } else {
- # Give oldest messages the smallest numbers
- $self->{regen_down} = $high + $fcount;
}
-
- $git->popen(qw/log --pretty=raw --no-notes --no-color --no-renames
- --raw -r --no-abbrev/, $range);
+ close $fh or die "git log failed: \$?=$?";
+ $stk //= PublicInbox::IdxStack->new;
+ $stk->read_prepare;
}
# --is-ancestor requires git 1.8.0+
@@ -717,45 +690,43 @@ sub reindex_from ($$) {
sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
- my ($last_commit, $lx, $xlog);
my $git = $self->{ibx}->git;
$git->batch_prepare;
my $pr = $opts->{-progress};
-
+ my $sync = { reindex => $opts->{reindex} };
my $xdb = $self->begin_txn_lazy;
$self->{over}->rethread_prepare($opts);
my $mm = _msgmap_init($self);
- do {
- $xlog = undef; # stop previous git-log via SIGPIPE
- $last_commit = _last_x_commit($self, $mm);
- $lx = reindex_from($opts->{reindex}, $last_commit);
-
- $self->{over}->rollback_lazy;
- $self->{over}->disconnect;
- $git->cleanup;
- delete $self->{txn};
- $xdb->cancel_transaction if $xdb;
- $xdb = idx_release($self);
-
- # ensure we leak no FDs to "git log" with Xapian <= 1.2
- my $range = $lx eq '' ? $tip : "$lx..$tip";
- $xlog = _git_log($self, $opts, $range);
-
- $xdb = $self->begin_txn_lazy;
- } while (_last_x_commit($self, $mm) ne $last_commit);
+ if ($sync->{reindex}) {
+ my $last = $mm->last_commit;
+ if ($last) {
+ $tip = $last;
+ } else {
+ # somebody just blindly added --reindex when indexing
+ # for the first time, allow it:
+ undef $sync->{reindex};
+ }
+ }
+ my $last_commit = _last_x_commit($self, $mm);
+ my $lx = reindex_from($sync->{reindex}, $last_commit);
+ my $range = $lx eq '' ? $tip : "$lx..$tip";
+ $pr->("counting changes\n\t$range ... ") if $pr;
+ my $stk = prepare_stack($self, $sync, $range);
+ $sync->{ntodo} = $stk ? $stk->num_records : 0;
+ $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
- my $dbh = $mm->{dbh} if $mm;
+ my $dbh = $mm->{dbh};
my $batch_cb = sub {
- my ($nr, $commit, $newest) = @_;
- if ($dbh) {
- if ($newest) {
- my $cur = $mm->last_commit || '';
- if (need_update($self, $cur, $newest)) {
- $mm->last_commit($newest);
- }
+ my ($nr, $stk) = @_;
+ # latest_cmt may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : undef;
+ if ($newest) {
+ my $cur = $mm->last_commit || '';
+ if (need_update($self, $cur, $newest)) {
+ $mm->last_commit($newest);
}
- $dbh->commit;
}
+ $dbh->commit;
if ($newest && need_xapian($self)) {
my $cur = $xdb->get_metadata('last_commit');
if (need_update($self, $cur, $newest)) {
@@ -768,15 +739,15 @@ sub _index_sync {
$git->cleanup;
$xdb = idx_release($self, $nr);
# let another process do some work...
- $pr->("indexed $nr/$self->{ntodo}\n") if $pr && $nr;
- if (!$newest) { # more to come
+ $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
+ if (!$stk) { # more to come
$xdb = $self->begin_txn_lazy;
- $dbh->begin_work if $dbh;
+ $dbh->begin_work;
}
};
$dbh->begin_work;
- read_log($self, $xlog, $batch_cb);
+ process_stack($self, $stk, $sync, $batch_cb);
}
sub DESTROY {
diff --git a/t/v1reindex.t b/t/v1reindex.t
index 8cb751881..d70ed4b93 100644
--- a/t/v1reindex.t
+++ b/t/v1reindex.t
@@ -221,7 +221,7 @@ ok(!-d $xap, 'Xapian directories removed again');
$config{indexlevel} = 'medium';
my $ibx = PublicInbox::Inbox->new(\%config);
my $rw = PublicInbox::SearchIdx->new($ibx, 1);
- eval { $rw->index_sync };
+ eval { $rw->index_sync({reindex => 1}) };
is($@, '', 'no error from indexing');
is_deeply(\@warn, [], 'no warnings');
my $mset = $ibx->search->reopen->query('hello world', {mset=>1});
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 16/20] index+xcpdb: support --no-sync flag
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (14 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 15/20] searchidx: make v1 indexing closer to v2 Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 17/20] v2writable: share log2stack code with v1 Eric Wong
` (3 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
This allows us to speed up indexing operations to SQLite
and Xapian.
Unfortunately, it doesn't affect operations using
`xapian-compact' and the compactor API, since that doesn't seem
to support Xapian::DB_NO_SYNC, yet.
---
Documentation/public-inbox-index.pod | 7 +++++++
Documentation/public-inbox-xcpdb.pod | 6 ++++++
lib/PublicInbox/Msgmap.pm | 21 ++++++++++++---------
lib/PublicInbox/Over.pm | 1 +
lib/PublicInbox/OverIdx.pm | 2 +-
lib/PublicInbox/SearchIdx.pm | 9 ++++++++-
lib/PublicInbox/V2Writable.pm | 6 ++++--
lib/PublicInbox/Xapcmd.pm | 5 +++--
script/public-inbox-index | 5 +++--
script/public-inbox-xcpdb | 4 ++--
10 files changed, 47 insertions(+), 19 deletions(-)
diff --git a/Documentation/public-inbox-index.pod b/Documentation/public-inbox-index.pod
index 08f2fbf45..aeb1b3a39 100644
--- a/Documentation/public-inbox-index.pod
+++ b/Documentation/public-inbox-index.pod
@@ -113,6 +113,13 @@ below.
Available in public-inbox 1.6.0 (PENDING).
+=item --no-sync
+
+Disables L<fsync(2)> and L<fdatasync(2)> operations on SQLite
+and Xapian. This is only effective with Xapian 1.4+.
+
+Available in public-inbox 1.6.0 (PENDING).
+
=back
=head1 FILES
diff --git a/Documentation/public-inbox-xcpdb.pod b/Documentation/public-inbox-xcpdb.pod
index 149c8f78c..7fe1e5fe2 100644
--- a/Documentation/public-inbox-xcpdb.pod
+++ b/Documentation/public-inbox-xcpdb.pod
@@ -45,6 +45,12 @@ too many shards given the capabilities of the current hardware.
These options are passed directly to L<xapian-compact(1)> when
used with C<--compact>.
+=item --no-sync
+
+Disable L<fsync(2)> and L<fdatasync(2)>.
+
+Available in public-inbox 1.6.0 (PENDING).
+
=back
=head1 ENVIRONMENT
diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm
index 9d2ef0dc5..839ddf7ca 100644
--- a/lib/PublicInbox/Msgmap.pm
+++ b/lib/PublicInbox/Msgmap.pm
@@ -32,12 +32,11 @@ sub new_file {
my $self = bless { filename => $f }, $class;
my $dbh = $self->{dbh} = PublicInbox::Over::dbh_new($self, $rw);
if ($rw) {
- create_tables($dbh);
-
# TRUNCATE reduces I/O compared to the default (DELETE)
$dbh->do('PRAGMA journal_mode = TRUNCATE');
$dbh->begin_work;
+ create_tables($dbh);
$self->created_at(time) unless $self->created_at;
my $max = $self->max // 0;
@@ -51,12 +50,17 @@ sub new_file {
sub tmp_clone {
my ($self) = @_;
my ($fh, $fn) = tempfile('msgmap-XXXXXXXX', EXLOCK => 0, TMPDIR => 1);
- $self->{dbh}->sqlite_backup_to_file($fn);
- my $tmp = ref($self)->new_file($fn, 1);
- $tmp->{dbh}->do('PRAGMA synchronous = OFF');
- $tmp->{dbh}->do('PRAGMA journal_mode = MEMORY');
+ my $tmp;
+ if ($self->{dbh}->can('sqlite_backup_to_dbh')) {
+ $tmp = ref($self)->new_file($fn, 2);
+ $tmp->{dbh}->do('PRAGMA journal_mode = MEMORY');
+ $self->{dbh}->sqlite_backup_to_dbh($tmp->{dbh});
+ } else { # DBD::SQLite <= 1.61_01
+ $self->{dbh}->sqlite_backup_to_file($fn);
+ $tmp = ref($self)->new_file($fn, 2);
+ $tmp->{dbh}->do('PRAGMA journal_mode = MEMORY');
+ }
$tmp->{pid} = $$;
- close $fh or die "failed to close $fn: $!";
$tmp;
}
@@ -241,8 +245,7 @@ sub atfork_parent {
$self->{pid} or die 'BUG: not a temporary clone';
$self->{dbh} and die 'BUG: tmp_clone dbh not prepared for parent';
defined($self->{filename}) or die 'BUG: {filename} not defined';
- my $dbh = $self->{dbh} = PublicInbox::Over::dbh_new($self, 1);
- $dbh->do('PRAGMA synchronous = OFF');
+ $self->{dbh} = PublicInbox::Over::dbh_new($self, 2);
}
sub atfork_prepare {
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index e3f264564..f32743c05 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -40,6 +40,7 @@ sub dbh_new {
$st = pack('dd', $st[0], $st[1]);
} while ($st ne $self->{st} && $tries++ < 3);
warn "W: $f: .st_dev, .st_ino unstable\n" if $st ne $self->{st};
+ $dbh->do('PRAGMA synchronous = OFF') if ($rw // 0) > 1;
$dbh;
}
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index c57be7243..fcb450794 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -21,7 +21,7 @@ use Carp qw(croak);
sub dbh_new {
my ($self) = @_;
- my $dbh = $self->SUPER::dbh_new(1);
+ my $dbh = $self->SUPER::dbh_new($self->{-no_sync} ? 2 : 1);
# TRUNCATE reduces I/O compared to the default (DELETE)
# We do not use WAL since we're optimized for read-only ops,
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index c57a7e164..764257432 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -23,6 +23,7 @@ use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
my $X = \%PublicInbox::Search::X;
my ($DB_CREATE_OR_OPEN, $DB_OPEN);
+our $DB_NO_SYNC = 0;
our $BATCH_BYTES = defined($ENV{XAPIAN_FLUSH_THRESHOLD}) ?
0x7fffffff : 1_000_000;
use constant DEBUG => !!$ENV{DEBUG};
@@ -67,6 +68,7 @@ sub new {
$self->{lock_path} = "$inboxdir/ssoma.lock";
my $dir = $self->xdir;
$self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
+ $self->{over}->{-no_sync} = 1 if $ibx->{-no_sync};
$self->{index_max_size} = $ibx->{index_max_size};
} elsif ($version == 2) {
defined $shard or die "shard is required for v2\n";
@@ -103,6 +105,9 @@ sub load_xapian_writable () {
*sortable_serialise = $xap.'::sortable_serialise';
$DB_CREATE_OR_OPEN = eval($xap.'::DB_CREATE_OR_OPEN()');
$DB_OPEN = eval($xap.'::DB_OPEN()');
+ my $ver = (eval($xap.'::major_version()') << 16) |
+ (eval($xap.'::minor_version()') << 8);
+ $DB_NO_SYNC = 0x4 if $ver >= 0x10400;
1;
}
@@ -126,6 +131,7 @@ sub idx_acquire {
}
}
return unless defined $flag;
+ $flag |= $DB_NO_SYNC if $self->{ibx}->{-no_sync};
my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
if ($@) {
die "Failed opening $dir: ", $@;
@@ -377,7 +383,8 @@ sub _msgmap_init ($) {
die "BUG: _msgmap_init is only for v1\n" if $self->{ibx_ver} != 1;
$self->{mm} //= eval {
require PublicInbox::Msgmap;
- PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, 1);
+ my $rw = $self->{ibx}->{-no_sync} ? 2 : 1;
+ PublicInbox::Msgmap->new($self->{ibx}->{inboxdir}, $rw);
};
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 13c1ad6f8..3dc200956 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -116,12 +116,13 @@ sub new {
total_bytes => 0,
current_info => '',
xpfx => $xpfx,
- over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3", 1),
+ over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3"),
lock_path => "$dir/inbox.lock",
# limit each git repo (epoch) to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
last_commit => [], # git epoch -> commit
};
+ $self->{over}->{-no_sync} = 1 if $v2ibx->{-no_sync};
$self->{shards} = count_shards($self) || nproc_shards($creat);
$self->{index_max_size} = $v2ibx->{index_max_size};
bless $self, $class;
@@ -293,7 +294,8 @@ sub _idx_init { # with_umask callback
# Now that all subprocesses are up, we can open the FDs
# for SQLite:
my $mm = $self->{mm} = PublicInbox::Msgmap->new_file(
- "$self->{ibx}->{inboxdir}/msgmap.sqlite3", 1);
+ "$self->{ibx}->{inboxdir}/msgmap.sqlite3",
+ $self->{ibx}->{-no_sync} ? 2 : 1);
$mm->{dbh}->begin_work;
}
diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm
index 4ee3fc791..d6c069d75 100644
--- a/lib/PublicInbox/Xapcmd.pm
+++ b/lib/PublicInbox/Xapcmd.pm
@@ -412,10 +412,11 @@ sub cpdb ($$) {
# like copydatabase(1), be sure we don't overwrite anything in case
# of other bugs:
- my $creat = eval($PublicInbox::Search::Xap.'::DB_CREATE()');
+ my $flag = eval($PublicInbox::Search::Xap.'::DB_CREATE()');
die if $@;
my $XapianWritableDatabase = $PublicInbox::Search::X{WritableDatabase};
- my $dst = $XapianWritableDatabase->new($tmp, $creat);
+ $flag |= $PublicInbox::SearchIdx::DB_NO_SYNC if !$opt->{sync};
+ my $dst = $XapianWritableDatabase->new($tmp, $flag);
my $pr = $opt->{-progress};
my $pfx = $opt->{-progress_pfx} = progress_pfx($new);
my $pr_data = { pr => $pr, pfx => $pfx, nr => 0 } if $pr;
diff --git a/script/public-inbox-index b/script/public-inbox-index
index 2e1934b08..d5c7cae2b 100755
--- a/script/public-inbox-index
+++ b/script/public-inbox-index
@@ -14,8 +14,8 @@ PublicInbox::Admin::require_or_die('-index');
use PublicInbox::Xapcmd;
my $compact_opt;
-my $opt = { quiet => -1, compact => 0, maxsize => undef };
-GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune
+my $opt = { quiet => -1, compact => 0, maxsize => undef, sync => 1 };
+GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune sync!
indexlevel|L=s maxsize|max-size=s batchsize|batch-size=s))
or die "bad command-line args\n$usage";
die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
@@ -59,6 +59,7 @@ for my $ibx (@ibxs) {
if ($opt->{compact} >= 2) {
PublicInbox::Xapcmd::run($ibx, 'compact', $compact_opt);
}
+ $ibx->{-no_sync} = 1 if !$opt->{sync};
PublicInbox::Admin::index_inbox($ibx, undef, $opt);
PublicInbox::Xapcmd::run($ibx, 'compact', $compact_opt) if $compact_opt;
}
diff --git a/script/public-inbox-xcpdb b/script/public-inbox-xcpdb
index 2b9f032c5..fcd961488 100755
--- a/script/public-inbox-xcpdb
+++ b/script/public-inbox-xcpdb
@@ -8,8 +8,8 @@ use PublicInbox::Xapcmd;
use PublicInbox::Admin;
PublicInbox::Admin::require_or_die('-search');
my $usage = "Usage: public-inbox-xcpdb [--compact] INBOX_DIR\n";
-my $opt = {};
-my @opt = (qw(compact reshard|R=i), @PublicInbox::Xapcmd::COMPACT_OPT);
+my $opt = { sync => 1 };
+my @opt = (qw(sync! compact reshard|R=i), @PublicInbox::Xapcmd::COMPACT_OPT);
GetOptions($opt, @opt) or die "bad command-line args\n$usage";
my @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV) or die $usage;
foreach (@ibxs) {
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 17/20] v2writable: share log2stack code with v1
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (15 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 16/20] index+xcpdb: support --no-sync flag Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 18/20] searchidx: support async git check Eric Wong
` (2 subsequent siblings)
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
Another step in making v1 and v2 more similar.
---
lib/PublicInbox/SearchIdx.pm | 44 ++++++++++++++++++---------
lib/PublicInbox/V2Writable.pm | 57 ++++++-----------------------------
2 files changed, 38 insertions(+), 63 deletions(-)
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 764257432..4d2e0da92 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -9,7 +9,7 @@
package PublicInbox::SearchIdx;
use strict;
use v5.10.1;
-use parent qw(PublicInbox::Search PublicInbox::Lock);
+use parent qw(PublicInbox::Search PublicInbox::Lock Exporter);
use PublicInbox::Eml;
use PublicInbox::InboxWritable;
use PublicInbox::MID qw(mid_mime mids_for_index mids);
@@ -21,6 +21,7 @@ use PublicInbox::OverIdx;
use PublicInbox::Spawn qw(spawn);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
+our @EXPORT_OK = qw(too_big crlf_adjust log2stack is_ancestor);
my $X = \%PublicInbox::Search::X;
my ($DB_CREATE_OR_OPEN, $DB_OPEN);
our $DB_NO_SYNC = 0;
@@ -31,8 +32,6 @@ use constant DEBUG => !!$ENV{DEBUG};
my $xapianlevels = qr/\A(?:full|medium)\z/;
my $hex = '[a-f0-9]';
my $OID = $hex .'{40,}';
-my $addmsg = qr!^:000000 100644 \S+ ($OID) A\t${hex}{2}/${hex}{38}$!;
-my $delmsg = qr!^:100644 000000 ($OID) \S+ D\t${hex}{2}/${hex}{38}$!;
sub new {
my ($class, $ibx, $creat, $shard) = @_;
@@ -600,17 +599,18 @@ sub process_stack {
$batch_cb->($nr, $stk);
}
-sub prepare_stack ($$$) {
- my ($self, $sync, $range) = @_;
- my $git = $self->{ibx}->git;
-
- if (index($range, '..') < 0) {
- # don't show annoying git errors to users who run -index
- # on empty inboxes
- $git->qx(qw(rev-parse -q --verify), "$range^0");
- return PublicInbox::IdxStack->new->read_prepare if $?;
+sub log2stack ($$$$) {
+ my ($sync, $git, $range, $ibx) = @_;
+ my $D = $sync->{D}; # OID_BIN => NR (if reindexing, undef otherwise)
+ my ($add, $del);
+ if ($ibx->version == 1) {
+ my $path = $hex.'{2}/'.$hex.'{38}';
+ $add = qr!\A:000000 100644 \S+ ($OID) A\t$path$!;
+ $del = qr!\A:100644 000000 ($OID) \S+ D\t$path$!;
+ } else {
+ $del = qr!\A:\d{6} 100644 $OID ($OID) [AM]\td$!;
+ $add = qr!\A:\d{6} 100644 $OID ($OID) [AM]\tm$!;
}
- my $D = $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
# Count the new files so they can be added newest to oldest
# and still have numbers increasing from oldest to newest
@@ -622,14 +622,14 @@ sub prepare_stack ($$$) {
if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
($at, $ct) = ($1 + 0, $2 + 0);
$stk //= PublicInbox::IdxStack->new($3);
- } elsif (/$delmsg/) {
+ } elsif (/$del/) {
my $oid = $1;
if ($D) { # reindex case
$D->{pack('H*', $oid)}++;
} else { # non-reindex case:
$stk->push_rec('d', $at, $ct, $oid);
}
- } elsif (/$addmsg/) {
+ } elsif (/$add/) {
my $oid = $1;
if ($D) {
my $oid_bin = pack('H*', $oid);
@@ -648,6 +648,20 @@ sub prepare_stack ($$$) {
$stk->read_prepare;
}
+sub prepare_stack ($$$) {
+ my ($self, $sync, $range) = @_;
+ my $git = $self->{ibx}->git;
+
+ if (index($range, '..') < 0) {
+ # don't show annoying git errors to users who run -index
+ # on empty inboxes
+ $git->qx(qw(rev-parse -q --verify), "$range^0");
+ return PublicInbox::IdxStack->new->read_prepare if $?;
+ }
+ $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
+ log2stack($sync, $git, $range, $self->{ibx});
+}
+
# --is-ancestor requires git 1.8.0+
sub is_ancestor ($$$) {
my ($git, $cur, $tip) = @_;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 3dc200956..9a58a7a94 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -18,7 +18,7 @@ use PublicInbox::InboxWritable;
use PublicInbox::OverIdx;
use PublicInbox::Msgmap;
use PublicInbox::Spawn qw(spawn popen_rd);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdx qw(too_big log2stack crlf_adjust is_ancestor);
use IO::Handle; # ->autoflush
use File::Temp qw(tempfile);
@@ -156,8 +156,7 @@ sub add {
# indexes a message, returns true if checkpointing is needed
sub do_idx ($$$$) {
my ($self, $msgref, $mime, $smsg) = @_;
- $smsg->{bytes} = $smsg->{raw_bytes} +
- PublicInbox::SearchIdx::crlf_adjust($$msgref);
+ $smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
$self->{over}->add_overview($mime, $smsg);
my $idx = idx_shard($self, $smsg->{num} % $self->{shards});
$idx->index_raw($msgref, $mime, $smsg);
@@ -878,7 +877,7 @@ sub reindex_checkpoint ($$) {
sub reindex_oid ($$$) {
my ($self, $sync, $oid) = @_;
- return if PublicInbox::SearchIdx::too_big($self, $oid);
+ return if too_big($self, $oid);
my ($num, $mid0, $len);
my $msgref = $self->{ibx}->git->cat_file($oid, \$len);
return if $len == 0; # purged
@@ -976,8 +975,6 @@ sub last_commits ($$) {
$heads;
}
-*is_ancestor = *PublicInbox::SearchIdx::is_ancestor;
-
# returns a revision range for git-log(1)
sub log_range ($$$$$) {
my ($self, $sync, $git, $i, $tip) = @_;
@@ -1029,47 +1026,6 @@ $range
$range;
}
-sub prepare_range_stack {
- my ($git, $sync, $range) = @_;
- # Don't bump num_highwater on --reindex by using {D}.
- # We intentionally do NOT use {D} in the non-reindex case because
- # we want NNTP article number gaps from unindexed messages to
- # show up in mirrors, too.
- my $D = $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
-
- my $fh = $git->popen(qw(log --raw -r --pretty=tformat:%at-%ct-%H
- --no-notes --no-color --no-renames --no-abbrev),
- $range);
- my ($at, $ct, $stk);
- while (<$fh>) {
- if (/\A([0-9]+)-([0-9]+)-($OID)$/o) {
- ($at, $ct) = ($1 + 0, $2 + 0);
- $stk //= PublicInbox::IdxStack->new($3);
- } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\td$/o) {
- my $oid = $1;
- if ($D) { # reindex case
- $D->{pack('H*', $oid)}++;
- } else { # non-reindex case:
- $stk->push_rec('d', $at, $ct, $oid);
- }
- } elsif (/\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o) {
- my $oid = $1;
- if ($D) {
- my $oid_bin = pack('H*', $oid);
- my $nr = --$D->{$oid_bin};
- delete($D->{$oid_bin}) if $nr <= 0;
-
- # nr < 0 (-1) means it never existed
- $stk->push_rec('m', $at, $ct, $oid) if $nr < 0;
- } else {
- $stk->push_rec('m', $at, $ct, $oid);
- }
- }
- }
- close $fh or die "git log failed: \$?=$?";
- $stk ? $stk->read_prepare : undef;
-}
-
sub sync_prepare ($$$) {
my ($self, $sync, $epoch_max) = @_;
my $pr = $sync->{-opt}->{-progress};
@@ -1093,7 +1049,12 @@ sub sync_prepare ($$$) {
my $range = log_range($self, $sync, $git, $i, $tip) or next;
# can't use 'rev-list --count' if we use --diff-filter
$pr->("$i.git counting $range ... ") if $pr;
- my $stk = prepare_range_stack($git, $sync, $range);
+ # Don't bump num_highwater on --reindex by using {D}.
+ # We intentionally do NOT use {D} in the non-reindex case
+ # because we want NNTP article number gaps from unindexed
+ # messages to show up in mirrors, too.
+ $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
+ my $stk = log2stack($sync, $git, $range, $self->{ibx});
my $nr = $stk ? $stk->num_records : 0;
$pr->("$nr\n") if $pr;
$sync->{stacks}->[$i] = $stk if $stk;
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 18/20] searchidx: support async git check
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (16 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 17/20] v2writable: share log2stack code with v1 Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 19/20] searchidx: $batch_cb => v1_checkpoint Eric Wong
2020-07-24 5:56 ` [PATCH 20/20] v2writable: {unindexed} belongs in $sync state Eric Wong
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
This allows v1 indexing to run while the `cat-file --batch-check'
process is waiting on high-latency storage.
---
lib/PublicInbox/Git.pm | 72 +++++++++++++++++++++++++++++-------
lib/PublicInbox/SearchIdx.pm | 23 ++++++++++--
2 files changed, 78 insertions(+), 17 deletions(-)
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 265524ffa..ffc464eb3 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -231,26 +231,71 @@ sub cat_file {
$result->[0];
}
-sub check {
- my ($self, $obj) = @_;
- _bidi_pipe($self, qw(--batch-check in_c out_c pid_c err_c));
- print { $self->{out_c} } $obj, "\n" or fail($self, "write error: $!");
- my $rbuf = ''; # TODO: async + {chk_rbuf}
- chomp(my $line = my_readline($self->{in_c}, \$rbuf));
- my ($hex, $type, $size) = split(' ', $line);
-
- # Future versions of git.git may show 'ambiguous', but for now,
+sub check_async_step ($$) {
+ my ($self, $inflight_c) = @_;
+ die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3;
+ my ($req, $cb, $arg) = splice(@$inflight_c, 0, 3);
+ my $rbuf = delete($self->{rbuf_c}) // \(my $new = '');
+ chomp(my $line = my_readline($self->{in_c}, $rbuf));
+ my ($hex, $type, $size) = split(/ /, $line);
+
+ # Future versions of git.git may have type=ambiguous, but for now,
# we must handle 'dangling' below (and maybe some other oddball
# stuff):
# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
- return if $type eq 'missing' || $type eq 'ambiguous';
-
if ($hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop') {
- my $ret = my_read($self->{in_c}, \$rbuf, $type + 1);
+ my $ret = my_read($self->{in_c}, $rbuf, $type + 1);
fail($self, defined($ret) ? 'read EOF' : "read: $!") if !$ret;
- return;
}
+ eval { $cb->($hex, $type, $size, $arg, $self) };
+ warn "E: check($req) $@\n" if $@;
+ $self->{rbuf_c} = $rbuf if $$rbuf ne '';
+}
+
+sub check_async_wait ($) {
+ my ($self) = @_;
+ my $inflight_c = delete $self->{inflight_c} or return;
+ while (scalar(@$inflight_c)) {
+ check_async_step($self, $inflight_c);
+ }
+}
+sub check_async_begin ($) {
+ my ($self) = @_;
+ cleanup($self) if alternates_changed($self);
+ _bidi_pipe($self, qw(--batch-check in_c out_c pid_c err_c));
+ die 'BUG: already in async check' if $self->{inflight_c};
+ $self->{inflight_c} = [];
+}
+
+sub check_async ($$$$) {
+ my ($self, $oid, $cb, $arg) = @_;
+ my $inflight_c = $self->{inflight_c} // check_async_begin($self);
+ if (scalar(@$inflight_c) >= MAX_INFLIGHT) {
+ check_async_step($self, $inflight_c);
+ }
+ print { $self->{out_c} } $oid, "\n" or fail($self, "write error: $!");
+ push(@$inflight_c, $oid, $cb, $arg);
+}
+
+sub _check_cb { # check_async callback
+ my ($hex, $type, $size, $result) = @_;
+ @$result = ($hex, $type, $size);
+}
+
+sub check {
+ my ($self, $oid) = @_;
+ my $result = [];
+ check_async($self, $oid, \&_check_cb, $result);
+ check_async_wait($self);
+ my ($hex, $type, $size) = @$result;
+
+ # Future versions of git.git may show 'ambiguous', but for now,
+ # we must handle 'dangling' below (and maybe some other oddball
+ # stuff):
+ # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
+ return if $type eq 'missing' || $type eq 'ambiguous';
+ return if $hex eq 'dangling' || $hex eq 'notdir' || $hex eq 'loop';
($hex, $type, $size);
}
@@ -297,6 +342,7 @@ sub cleanup {
my ($self) = @_;
local $in_cleanup = 1;
delete $self->{async_cat};
+ check_async_wait($self);
cat_async_wait($self);
_destroy($self, qw(cat_rbuf in out pid));
_destroy($self, qw(chk_rbuf in_c out_c pid_c err_c));
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 4d2e0da92..39dc1f874 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -563,6 +563,16 @@ sub too_big ($$) {
1;
}
+sub ck_size { # check_async cb for -index --max-size=...
+ my ($oid, $type, $size, $arg, $git) = @_;
+ (($type // '') eq 'blob') or die "E: bad $oid in $git->{git_dir}";
+ if ($size <= $arg->{index_max_size}) {
+ $git->cat_async($oid, \&index_both, $arg);
+ } else {
+ warn "W: skipping $oid ($size > $arg->{index_max_size})\n";
+ }
+}
+
# only for v1
sub process_stack {
my ($self, $stk, $sync, $batch_cb) = @_;
@@ -580,13 +590,17 @@ sub process_stack {
$git->cat_async($oid, \&unindex_both, $self);
}
}
+ $sync->{index_max_size} = $self->{ibx}->{index_max_size};
while (my ($f, $at, $ct, $oid) = $stk->pop_rec) {
if ($f eq 'm') {
- $sync->{autime} = $at;
- $sync->{cotime} = $ct;
- next if too_big($self, $oid);
- $git->cat_async($oid, \&index_both, { %$sync });
+ my $arg = { %$sync, autime => $at, cotime => $ct };
+ if ($sync->{index_max_size}) {
+ $git->check_async($oid, \&ck_size, $arg);
+ } else {
+ $git->cat_async($oid, \&index_both, $arg);
+ }
if ($max <= 0) {
+ $git->check_async_wait;
$git->cat_async_wait;
$max = $BATCH_BYTES;
$batch_cb->($nr);
@@ -595,6 +609,7 @@ sub process_stack {
$git->cat_async($oid, \&unindex_both, $self);
}
}
+ $git->check_async_wait;
$git->cat_async_wait;
$batch_cb->($nr, $stk);
}
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 19/20] searchidx: $batch_cb => v1_checkpoint
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (17 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 18/20] searchidx: support async git check Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
2020-07-24 5:56 ` [PATCH 20/20] v2writable: {unindexed} belongs in $sync state Eric Wong
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
Another closure gone, and we may be able to share more
code with v2 in upcoming commits.
---
lib/PublicInbox/SearchIdx.pm | 90 ++++++++++++++++++------------------
1 file changed, 45 insertions(+), 45 deletions(-)
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 39dc1f874..fe089c8e8 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -573,9 +573,48 @@ sub ck_size { # check_async cb for -index --max-size=...
}
}
+sub v1_checkpoint ($$;$) {
+ my ($self, $sync, $stk) = @_;
+ $self->{ibx}->git->check_async_wait;
+ $self->{ibx}->git->cat_async_wait;
+
+ # latest_cmt may be undef
+ my $newest = $stk ? $stk->{latest_cmt} : undef;
+ if ($newest) {
+ my $cur = $self->{mm}->last_commit || '';
+ if (need_update($self, $cur, $newest)) {
+ $self->{mm}->last_commit($newest);
+ }
+ } else {
+ ${$sync->{max}} = $BATCH_BYTES;
+ }
+
+ $self->{mm}->{dbh}->commit;
+ if ($newest && need_xapian($self)) {
+ my $cur = $self->{xdb}->get_metadata('last_commit');
+ if (need_update($self, $cur, $newest)) {
+ $self->{xdb}->set_metadata('last_commit', $newest);
+ }
+ }
+
+ $self->{over}->rethread_done($sync->{-opt}) if $newest; # all done
+ commit_txn_lazy($self);
+ $self->{ibx}->git->cleanup;
+ my $nr = ${$sync->{nr}};
+ idx_release($self, $nr);
+ # let another process do some work...
+ if (my $pr = $sync->{-opt}->{-progress}) {
+ $pr->("indexed $nr/$sync->{ntodo}\n") if $nr;
+ }
+ if (!$stk) { # more to come
+ begin_txn_lazy($self);
+ $self->{mm}->{dbh}->begin_work;
+ }
+}
+
# only for v1
sub process_stack {
- my ($self, $stk, $sync, $batch_cb) = @_;
+ my ($self, $sync, $stk) = @_;
my $git = $self->{ibx}->git;
my $max = $BATCH_BYTES;
my $nr = 0;
@@ -583,6 +622,7 @@ sub process_stack {
$sync->{max} = \$max;
$sync->{sidx} = $self;
+ $self->{mm}->{dbh}->begin_work;
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
@@ -599,19 +639,12 @@ sub process_stack {
} else {
$git->cat_async($oid, \&index_both, $arg);
}
- if ($max <= 0) {
- $git->check_async_wait;
- $git->cat_async_wait;
- $max = $BATCH_BYTES;
- $batch_cb->($nr);
- }
+ v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
$git->cat_async($oid, \&unindex_both, $self);
}
}
- $git->check_async_wait;
- $git->cat_async_wait;
- $batch_cb->($nr, $stk);
+ v1_checkpoint($self, $sync, $stk);
}
sub log2stack ($$$$) {
@@ -729,7 +762,7 @@ sub _index_sync {
my $git = $self->{ibx}->git;
$git->batch_prepare;
my $pr = $opts->{-progress};
- my $sync = { reindex => $opts->{reindex} };
+ my $sync = { reindex => $opts->{reindex}, -opt => $opts };
my $xdb = $self->begin_txn_lazy;
$self->{over}->rethread_prepare($opts);
my $mm = _msgmap_init($self);
@@ -750,40 +783,7 @@ sub _index_sync {
my $stk = prepare_stack($self, $sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
-
- my $dbh = $mm->{dbh};
- my $batch_cb = sub {
- my ($nr, $stk) = @_;
- # latest_cmt may be undef
- my $newest = $stk ? $stk->{latest_cmt} : undef;
- if ($newest) {
- my $cur = $mm->last_commit || '';
- if (need_update($self, $cur, $newest)) {
- $mm->last_commit($newest);
- }
- }
- $dbh->commit;
- if ($newest && need_xapian($self)) {
- my $cur = $xdb->get_metadata('last_commit');
- if (need_update($self, $cur, $newest)) {
- $xdb->set_metadata('last_commit', $newest);
- }
- }
-
- $self->{over}->rethread_done($opts) if $newest; # all done
- $self->commit_txn_lazy;
- $git->cleanup;
- $xdb = idx_release($self, $nr);
- # let another process do some work...
- $pr->("indexed $nr/$sync->{ntodo}\n") if $pr && $nr;
- if (!$stk) { # more to come
- $xdb = $self->begin_txn_lazy;
- $dbh->begin_work;
- }
- };
-
- $dbh->begin_work;
- process_stack($self, $stk, $sync, $batch_cb);
+ process_stack($self, $sync, $stk);
}
sub DESTROY {
^ permalink raw reply related [flat|nested] 21+ messages in thread
* [PATCH 20/20] v2writable: {unindexed} belongs in $sync state
2020-07-24 5:55 [PATCH 00/20] indexing changes and new features Eric Wong
` (18 preceding siblings ...)
2020-07-24 5:56 ` [PATCH 19/20] searchidx: $batch_cb => v1_checkpoint Eric Wong
@ 2020-07-24 5:56 ` Eric Wong
19 siblings, 0 replies; 21+ messages in thread
From: Eric Wong @ 2020-07-24 5:56 UTC (permalink / raw)
To: meta
There's no reason for {unindexed} to persist beyond
an ->index_sync call.
---
lib/PublicInbox/V2Writable.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 9a58a7a94..f159d39c2 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -891,12 +891,12 @@ sub reindex_oid ($$$) {
}
# {unindexed} is unlikely
- if ((my $unindexed = $self->{unindexed}) && scalar(@$mids) == 1) {
+ if ((my $unindexed = $sync->{unindexed}) && scalar(@$mids) == 1) {
$num = delete($unindexed->{$mids->[0]});
if (defined $num) {
$mid0 = $mids->[0];
$self->{mm}->mid_set($num, $mid0);
- delete($self->{unindexed}) if !keys(%$unindexed);
+ delete($sync->{unindexed}) if !keys(%$unindexed);
}
}
if (!defined($num)) { # reuse if reindexing (or duplicates)
@@ -1125,7 +1125,7 @@ sub unindex_oid ($$;$) {
# a mirror because the source used -purge or -edit
sub unindex ($$$$) {
my ($self, $sync, $git, $unindex_range) = @_;
- my $unindexed = $self->{unindexed} ||= {}; # $mid0 => $num
+ my $unindexed = $sync->{unindexed} ||= {}; # $mid0 => $num
my $before = scalar keys %$unindexed;
# order does not matter, here:
my @cmd = qw(log --raw -r
^ permalink raw reply related [flat|nested] 21+ messages in thread