From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 704771F5B7 for ; Fri, 24 Jul 2020 05:56:07 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 02/20] v2: index forwards (via `git log --reverse') Date: Fri, 24 Jul 2020 05:55:48 +0000 Message-Id: <20200724055606.27332-3-e@yhbt.net> In-Reply-To: <20200724055606.27332-1-e@yhbt.net> References: <20200724055606.27332-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Since we'll need to expose THREADID to JMAP and IMAP users, index all messages in the order they were committed to ensure our `tid' (thread ID) column ascends in mirrors the same way it does in the source inbox. This drastically simplifies our code but increases memory usage of `git-log'. The next commit will bring memory use back down at the expense of $TMPDIR usage. 
--- MANIFEST | 1 - lib/PublicInbox/MultiMidQueue.pm | 62 ------- lib/PublicInbox/V2Writable.pm | 279 +++++++++---------------------- 3 files changed, 81 insertions(+), 261 deletions(-) delete mode 100644 lib/PublicInbox/MultiMidQueue.pm diff --git a/MANIFEST b/MANIFEST index 963caad02..9d90c8c23 100644 --- a/MANIFEST +++ b/MANIFEST @@ -155,7 +155,6 @@ lib/PublicInbox/MboxGz.pm lib/PublicInbox/MsgIter.pm lib/PublicInbox/MsgTime.pm lib/PublicInbox/Msgmap.pm -lib/PublicInbox/MultiMidQueue.pm lib/PublicInbox/NNTP.pm lib/PublicInbox/NNTPD.pm lib/PublicInbox/NNTPdeflate.pm diff --git a/lib/PublicInbox/MultiMidQueue.pm b/lib/PublicInbox/MultiMidQueue.pm deleted file mode 100644 index eb2ecf2f2..000000000 --- a/lib/PublicInbox/MultiMidQueue.pm +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (C) 2020 all contributors -# License: AGPL-3.0+ - -# temporary queue for public-inbox-index to support multi-Message-ID -# messages on mirrors of v2 inboxes -package PublicInbox::MultiMidQueue; -use strict; -use SDBM_File; # part of Perl standard library -use Fcntl qw(O_RDWR O_CREAT); -use File::Temp 0.19 (); # 0.19 for ->newdir -my %e = ( - freebsd => 0x100000, - linux => 0x80000, - netbsd => 0x400000, - openbsd => 0x10000, -); -my $O_CLOEXEC = $e{$^O} // 0; - -sub new { - my ($class) = @_; - my $tmpdir = File::Temp->newdir('multi-mid-q-XXXXXX', TMPDIR => 1); - my $base = $tmpdir->dirname . 
'/q'; - my %sdbm; - my $flags = O_RDWR|O_CREAT; - if (!tie(%sdbm, 'SDBM_File', $base, $flags|$O_CLOEXEC, 0600)) { - if (!tie(%sdbm, 'SDBM_File', $base, $flags, 0600)) { - die "could not tie ($base): $!"; - } - $O_CLOEXEC = 0; - } - - bless { - cur => 1, - min => 1, - max => 0, - sdbm => \%sdbm, - tmpdir => $tmpdir, - }, $class; -} - -sub set_oid { - my ($self, $i, $oid, $v2w) = @_; - $self->{max} = $i if $i > $self->{max}; - $self->{min} = $i if $i < $self->{min}; - $self->{sdbm}->{$i} = "$oid\t$v2w->{autime}\t$v2w->{cotime}"; -} - -sub get_oid { - my ($self, $i, $v2w) = @_; - my $rec = $self->{sdbm}->{$i} or return; - my ($oid, $autime, $cotime) = split(/\t/, $rec); - $v2w->{autime} = $autime; - $v2w->{cotime} = $cotime; - $oid -} - -sub push_oid { - my ($self, $oid, $v2w) = @_; - set_oid($self, $self->{cur}++, $oid, $v2w); -} - -1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 16556ddc2..c04ea5d77 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -18,10 +18,10 @@ use PublicInbox::OverIdx; use PublicInbox::Msgmap; use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::SearchIdx; -use PublicInbox::MultiMidQueue; use IO::Handle; # ->autoflush use File::Temp qw(tempfile); +my $x40 = qr/[a-f0-9]{40}/; # an estimate of the post-packed size to the raw uncompressed size my $PACKING_FACTOR = 0.4; @@ -862,18 +862,6 @@ sub atfork_child { $self->{bnote}->[1]; } -sub mark_deleted ($$$$) { - my ($self, $sync, $git, $oid) = @_; - return if PublicInbox::SearchIdx::too_big($self, $git, $oid); - my $msgref = $git->cat_file($oid); - my $mime = PublicInbox::Eml->new($$msgref); - my $mids = mids($mime->header_obj); - my $chash = content_hash($mime); - foreach my $mid (@$mids) { - $sync->{D}->{"$mid\0$chash"} = $oid; - } -} - sub reindex_checkpoint ($$$) { my ($self, $sync, $git) = @_; @@ -891,107 +879,11 @@ sub reindex_checkpoint ($$$) { $sync->{mm_tmp}->atfork_parent; } -# only for a few odd messages with 
multiple Message-IDs -sub reindex_oid_m ($$$$;$) { - my ($self, $sync, $git, $oid, $regen_num) = @_; - $self->{current_info} = "multi_mid $oid"; - my ($num, $mid0, $len); - my $msgref = $git->cat_file($oid, \$len); - my $mime = PublicInbox::Eml->new($$msgref); - my $mids = mids($mime->header_obj); - my $chash = content_hash($mime); - die "BUG: reindex_oid_m called for <=1 mids" if scalar(@$mids) <= 1; - - for my $mid (reverse @$mids) { - delete($sync->{D}->{"$mid\0$chash"}) and - die "BUG: reindex_oid should handle <$mid> delete"; - } - my $over = $self->{over}; - for my $mid (reverse @$mids) { - ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid); - next unless defined $num; - if (defined($regen_num) && $regen_num != $num) { - die "BUG: regen(#$regen_num) != over(#$num)"; - } - } - unless (defined($num)) { - for my $mid (reverse @$mids) { - # is this a number we got before? - my $n = $sync->{mm_tmp}->num_for($mid); - next unless defined $n; - next if defined($regen_num) && $regen_num != $n; - ($num, $mid0) = ($n, $mid); - last; - } - } - if (defined($num)) { - $sync->{mm_tmp}->num_delete($num); - } elsif (defined $regen_num) { - $num = $regen_num; - for my $mid (reverse @$mids) { - $self->{mm}->mid_set($num, $mid) == 1 or next; - $mid0 = $mid; - last; - } - unless (defined $mid0) { - warn "E: cannot regen #$num\n"; - return; - } - } else { # fixup bugs in old mirrors on reindex - for my $mid (reverse @$mids) { - $num = $self->{mm}->mid_insert($mid); - next unless defined $num; - $mid0 = $mid; - last; - } - if (defined $mid0) { - if ($sync->{reindex}) { - warn "reindex added #$num <$mid0>\n"; - } - } else { - warn "E: cannot find article #\n"; - return; - } - } - $sync->{nr}++; - my $smsg = bless { - raw_bytes => $len, - num => $num, - blob => $oid, - mid => $mid0, - }, 'PublicInbox::Smsg'; - $smsg->populate($mime, $self); - if (do_idx($self, $msgref, $mime, $smsg)) { - reindex_checkpoint($self, $sync, $git); - } -} - -sub check_unindexed ($$$) { - my ($self, $num, 
$mid0) = @_; - my $unindexed = $self->{unindexed} // {}; - my $n = delete($unindexed->{$mid0}); - defined $n or return; - if ($n != $num) { - die "BUG: unindexed $n != $num <$mid0>\n"; - } else { - $self->{mm}->mid_set($num, $mid0); - } -} - -sub multi_mid_q_push ($$$) { - my ($self, $sync, $oid) = @_; - my $multi_mid = $sync->{multi_mid} //= PublicInbox::MultiMidQueue->new; - if ($sync->{reindex}) { # no regen on reindex - $multi_mid->push_oid($oid, $self); - } else { - my $num = $sync->{regen}--; - die "BUG: ran out of article numbers" if $num <= 0; - $multi_mid->set_oid($num, $oid, $self); - } -} - sub reindex_oid ($$$$) { my ($self, $sync, $git, $oid) = @_; + if (my $D = $sync->{D}) { # don't waste I/O on deletes + return if $D->{pack('H*', $oid)}; + } return if PublicInbox::SearchIdx::too_big($self, $git, $oid); my ($num, $mid0, $len); my $msgref = $git->cat_file($oid, \$len); @@ -1003,48 +895,57 @@ sub reindex_oid ($$$$) { if (scalar(@$mids) == 0) { warn "E: $oid has no Message-ID, skipping\n"; return; - } elsif (scalar(@$mids) == 1) { - my $mid = $mids->[0]; - - # was the file previously marked as deleted?, skip if so - if (delete($sync->{D}->{"$mid\0$chash"})) { - if (!$sync->{reindex}) { - $num = $sync->{regen}--; - $self->{mm}->num_highwater($num); - } - return; - } + } - # is this a number we got before? 
- $num = $sync->{mm_tmp}->num_for($mid); + # {unindexed} is unlikely + if ((my $unindexed = $self->{unindexed}) && scalar(@$mids) == 1) { + $num = delete($unindexed->{$mids->[0]}); if (defined $num) { - $mid0 = $mid; - check_unindexed($self, $num, $mid0); - } else { - $num = $sync->{regen}--; - die "BUG: ran out of article numbers" if $num <= 0; - if ($self->{mm}->mid_set($num, $mid) != 1) { - warn "E: unable to assign $num => <$mid>\n"; - return; - } - $mid0 = $mid; + $mid0 = $mids->[0]; + $self->{mm}->mid_set($num, $mid0); + delete($self->{unindexed}) if !keys(%$unindexed); + } + } + if (!defined($num)) { # reuse if reindexing (or duplicates) + my $over = $self->{over}; + for my $mid (@$mids) { + ($num, $mid0) = $over->num_mid0_for_oid($oid, $mid); + last if defined $num; } - } else { # multiple MIDs are a weird case: - my $del = 0; - for (@$mids) { - $del += delete($sync->{D}->{"$_\0$chash"}) // 0; + } + $mid0 //= do { # is this a number we got before? + $num = $sync->{mm_tmp}->num_for($mids->[0]); + defined($num) ? $mids->[0] : undef; + }; + if (!defined($num)) { + for (my $i = $#$mids; $i >= 1; $i--) { + $num = $sync->{mm_tmp}->num_for($mids->[$i]); + if (defined($num)) { + $mid0 = $mids->[$i]; + last; + } } - if ($del) { - unindex_oid_remote($self, $oid, $_) for @$mids; - # do not delete from {mm_tmp}, since another - # single-MID message may use it. 
- } else { # handle them at the end: - multi_mid_q_push($self, $sync, $oid); + } + if (defined($num)) { + $sync->{mm_tmp}->num_delete($num); + } else { # never seen + $num = $self->{mm}->mid_insert($mids->[0]); + if (defined($num)) { + $mid0 = $mids->[0]; + } else { # rare, try the rest of them, backwards + for (my $i = $#$mids; $i >= 1; $i--) { + $num = $self->{mm}->mid_insert($mids->[$i]); + if (defined($num)) { + $mid0 = $mids->[$i]; + last; + } + } } + } + if (!defined($num)) { + warn "E: $oid <", join('> <', @$mids), "> is a duplicate\n"; return; } - $sync->{mm_tmp}->mid_delete($mid0) or - die "failed to delete <$mid0> for article #$num\n"; $sync->{nr}++; my $smsg = bless { raw_bytes => $len, @@ -1134,6 +1035,22 @@ $range $range; } +# don't bump num_highwater on --reindex +sub mark_deleted ($$$) { + my ($git, $sync, $range) = @_; + my $D = $sync->{D} //= {}; # pack("H*", $oid) => NR + my $fh = $git->popen(qw(log --raw --no-abbrev + --pretty=tformat:%H + --no-notes --no-color --no-renames + --diff-filter=AM), $range, '--', 'd'); + while (<$fh>) { + if (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) { + $D->{pack('H*', $1)}++; + } + } + close $fh or die "git log failed: \$?=$?"; +} + sub sync_prepare ($$$) { my ($self, $sync, $epoch_max) = @_; my $pr = $sync->{-opt}->{-progress}; @@ -1144,7 +1061,7 @@ sub sync_prepare ($$$) { # without {reindex} my $reindex_heads = last_commits($self, $epoch_max) if $sync->{reindex}; - for (my $i = $epoch_max; $i >= 0; $i--) { + for my $i (0..$epoch_max) { die 'BUG: already indexing!' 
if $self->{reindex_pipe}; my $git_dir = git_dir_n($self, $i); -d $git_dir or next; # missing epochs are fine @@ -1168,8 +1085,8 @@ sub sync_prepare ($$$) { close $fh or die "git log failed: \$?=$?"; $pr->("$n\n") if $pr; $regen_max += $n; + mark_deleted($git, $sync, $range) if $sync->{reindex}; } - return 0 if (!$regen_max && !keys(%{$self->{unindex_range}})); # reindex should NOT see new commits anymore, if we do, @@ -1203,10 +1120,8 @@ sub unindex_oid ($$$;$) { my ($id, $prev); while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) { $gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob}; - 1; # continue } - my $n = scalar keys %gone; - next unless $n; + my $n = scalar(keys(%gone)) or next; if ($n > 1) { warn "BUG: multiple articles linked to $oid\n", join(',',sort keys %gone), "\n"; @@ -1222,7 +1137,6 @@ sub unindex_oid ($$$;$) { } } -my $x40 = qr/[a-f0-9]{40}/; sub unindex ($$$$) { my ($self, $sync, $git, $unindex_range) = @_; my $unindexed = $self->{unindexed} ||= {}; # $mid0 => $num @@ -1276,22 +1190,29 @@ sub index_epoch ($$$) { if (my $pr = $sync->{-opt}->{-progress}) { $pr->("$i.git indexing $range\n"); } - - my @cmd = qw(log --raw -r --pretty=tformat:%H.%at.%ct + my @cmd = qw(log --reverse --raw -r --pretty=tformat:%H.%at.%ct --no-notes --no-color --no-abbrev --no-renames); my $fh = $self->{reindex_pipe} = $git->popen(@cmd, $range); my $cmt; + my $D = $sync->{D}; while (<$fh>) { chomp; $self->{current_info} = "$i.git $_"; if (/\A($x40)\.([0-9]+)\.([0-9]+)$/o) { - $cmt //= $1; + $cmt = $1; $self->{autime} = $2; $self->{cotime} = $3; } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\tm$/o) { reindex_oid($self, $sync, $git, $1); } elsif (/\A:\d{6} 100644 $x40 ($x40) [AM]\td$/o) { - mark_deleted($self, $sync, $git, $1); + # allow re-add if there was user error + my $oid = $1; + if ($D) { + my $oid_bin = pack('H*', $oid); + my $nr = --$D->{$oid_bin}; + delete($D->{$oid_bin}) if $nr <= 0; + } + unindex_oid($self, $git, $oid); } } close $fh or die "git log failed: 
\$?=$?"; @@ -1310,15 +1231,12 @@ sub index_sync { $self->idx_init($opt); # acquire lock $self->{over}->rethread_prepare($opt); my $sync = { - D => {}, # "$mid\0$chash" => $oid unindex_range => {}, # EPOCH => oid_old..oid_new reindex => $opt->{reindex}, -opt => $opt }; $sync->{ranges} = sync_ranges($self, $sync, $epoch_max); - $sync->{regen} = sync_prepare($self, $sync, $epoch_max); - - if ($sync->{regen}) { + if (sync_prepare($self, $sync, $epoch_max)) { # tmp_clone seems to fail if inside a transaction, so # we rollback here (because we opened {mm} for reading) # Note: we do NOT rely on DBI transactions for atomicity; @@ -1328,43 +1246,8 @@ sub index_sync { $sync->{mm_tmp} = $self->{mm}->tmp_clone; } - # work backwards through history - for (my $i = $epoch_max; $i >= 0; $i--) { - index_epoch($self, $sync, $i); - } - - # unindex is required for leftovers if "deletes" affect messages - # in a previous fetch+index window: - my $git; - if (my @leftovers = values %{delete $sync->{D}}) { - $git = $self->{-inbox}->git; - for my $oid (@leftovers) { - $self->{current_info} = "leftover $oid"; - unindex_oid($self, $git, $oid); - } - } - if (my $multi_mid = delete $sync->{multi_mid}) { - $git //= $self->{-inbox}->git; - my $min = $multi_mid->{min}; - my $max = $multi_mid->{max}; - if ($sync->{reindex}) { - # we may need to create new Message-IDs if mirrors - # were initially indexed with old versions - for (my $i = $max; $i >= $min; $i--) { - my $oid; - $oid = $multi_mid->get_oid($i, $self) or next; - next unless defined $oid; - reindex_oid_m($self, $sync, $git, $oid); - } - } else { # regen on initial index - for my $num ($min..$max) { - my $oid; - $oid = $multi_mid->get_oid($num, $self) or next; - reindex_oid_m($self, $sync, $git, $oid, $num); - } - } - } - $git->cleanup if $git; + # work forwards through history + index_epoch($self, $sync, $_) for (0..$epoch_max); $self->done; if (my $nr = $sync->{nr}) {