From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.4 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, NUMERIC_HTTP_ADDR,WEIRD_PORT shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 3C0E41FA01 for ; Wed, 3 Mar 2021 13:48:58 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/4] lei q: import flags when clobbering/augmenting Maildirs Date: Wed, 3 Mar 2021 13:48:57 +0000 Message-Id: <20210303134857.7227-5-e@80x24.org> In-Reply-To: <20210303134857.7227-1-e@80x24.org> References: <20210303134857.7227-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Importing flags (keywords) on clobber/augment will eventually be supported for other mail stores, but Maildir is the easiest to test and support, so it is handled first here. This lets us avoid a situation where flag changes made in the destination get lost between search results. 
--- MANIFEST | 1 + lib/PublicInbox/ExtSearchIdx.pm | 1 + lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiQuery.pm | 5 +++- lib/PublicInbox/LeiSearch.pm | 47 +++++++++++++++++++++++++++++++++ lib/PublicInbox/LeiStore.pm | 27 +++++++++---------- lib/PublicInbox/LeiToMail.pm | 33 ++++++++++++++++++----- lib/PublicInbox/LeiXSearch.pm | 2 +- t/lei-q-kw.t | 33 +++++++++++++++++++++++ t/lei.t | 3 ++- t/lei_store.t | 10 ++++++- 11 files changed, 137 insertions(+), 27 deletions(-) create mode 100644 t/lei-q-kw.t diff --git a/MANIFEST b/MANIFEST index 5044e21c..8c9c86a0 100644 --- a/MANIFEST +++ b/MANIFEST @@ -375,6 +375,7 @@ t/lei-import-nntp.t t/lei-import.t t/lei-mirror.t t/lei-p2q.t +t/lei-q-kw.t t/lei-q-remote-import.t t/lei-q-thread.t t/lei.t diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index d0c9c2f7..a17e7579 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1128,5 +1128,6 @@ no warnings 'once'; *atfork_child = \&PublicInbox::V2Writable::atfork_child; *idx_shard = \&PublicInbox::V2Writable::idx_shard; *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; +*checkpoint = \&PublicInbox::V2Writable::checkpoint; 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 834e399f..1e5b04ca 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -113,7 +113,7 @@ our %CMD = ( # sorted in order of importance/use: qw(save-as=s output|mfolder|o=s format|f=s dedupe|d=s threads|t+ sort|s=s reverse|r offset=i remote! local! external! pretty include|I=s@ exclude=s@ only=s@ jobs|j=s globoff|g augment|a - import-remote! lock=s@ + import-remote! import-augment! 
lock=s@ alert=s@ mua=s no-torsocks torsocks=s verbose|v+ quiet|q C=s@), PublicInbox::LeiQuery::curl_opt(), opt_dash('limit|n=i', '[0-9]+') ], diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index b57d1cc5..c630d628 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -51,7 +51,10 @@ sub lei_q { # we'll allow "--only $LOCATION --local" my $sto = $self->_lei_store(1); my $lse = $sto->search; - $sto->write_prepare($self) if $opt->{'import-remote'} //= 1; + if (($opt->{'import-remote'} //= 1) | + ($opt->{'import-augment'} //= 1)) { + $sto->write_prepare($self); + } if ($opt->{'local'} //= scalar(@only) ? 0 : 1) { $lxs->prepare_external($lse); } diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm index 440bacf5..ceb3624b 100644 --- a/lib/PublicInbox/LeiSearch.pm +++ b/lib/PublicInbox/LeiSearch.pm @@ -1,11 +1,14 @@ # Copyright (C) 2020-2021 all contributors # License: AGPL-3.0+ +# read-only counterpart for PublicInbox::LeiStore package PublicInbox::LeiSearch; use strict; use v5.10.1; use parent qw(PublicInbox::ExtSearch); use PublicInbox::Search qw(xap_terms); +use PublicInbox::ContentHash qw(content_digest content_hash); +use PublicInbox::MID qw(mids mids_in); # get combined docid from over.num: # (not generic Xapian, only works with our sharding scheme) @@ -24,4 +27,48 @@ sub msg_keywords { wantarray ? 
sort(keys(%$kw)) : $kw; } +# when a message has no Message-IDs at all, this is needed for +# unsent Draft messages, at least +sub content_key ($) { + my ($eml) = @_; + my $dig = content_digest($eml); + my $chash = $dig->clone->digest; + my $mids = mids_in($eml, + qw(Message-ID X-Alt-Message-ID Resent-Message-ID)); + unless (@$mids) { + $eml->{-lei_fake_mid} = $mids->[0] = + PublicInbox::Import::digest2mid($dig, $eml); + } + ($chash, $mids); +} + +sub _cmp_1st { # git->cat_async callback + my ($bref, $oid, $type, $size, $cmp) = @_; # cmp: [chash, found, smsg] + return if defined($cmp->[1]->[0]); # $found->[0] + if (content_hash(PublicInbox::Eml->new($bref)) eq $cmp->[0]) { + push @{$cmp->[1]}, $cmp->[2]->{num}; + } +} + +# returns true if $eml is indexed by lei/store and keywords don't match +sub kw_changed { + my ($self, $eml, $new_kw_sorted) = @_; + my ($chash, $mids) = content_key($eml); + my $over = $self->over; + my $git = $self->git; + my $found = []; + for my $mid (@$mids) { + my ($id, $prev); + while (my $cur = $over->next_by_mid($mid, \$id, \$prev)) { + $git->cat_async($cur->{blob}, \&_cmp_1st, + [ $chash, $found, $cur ]); + last if scalar(@$found); + } + } + $git->cat_async_wait; + my $num = $found->[0] // return; + my @cur_kw = msg_keywords($self, $num); + join("\0", @$new_kw_sorted) eq join("\0", @cur_kw) ? 
0 : 1; +} + 1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 77601828..92c29100 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -14,8 +14,8 @@ use PublicInbox::ExtSearchIdx; use PublicInbox::Import; use PublicInbox::InboxWritable qw(eml_from_path); use PublicInbox::V2Writable; -use PublicInbox::ContentHash qw(content_hash content_digest); -use PublicInbox::MID qw(mids mids_in); +use PublicInbox::ContentHash qw(content_hash); +use PublicInbox::MID qw(mids); use PublicInbox::LeiSearch; use PublicInbox::MDA; use List::Util qw(max); @@ -104,25 +104,13 @@ sub eidx_init { $eidx; } -# when a message has no Message-IDs at all, this is needed for -# unsent Draft messages, at least -sub _fake_mid_for ($$) { - my ($eml, $dig) = @_; - my $mids = mids_in($eml, qw(X-Alt-Message-ID Resent-Message-ID)); - $eml->{-lei_fake_mid} = - $mids->[0] // PublicInbox::Import::digest2mid($dig, $eml); -} - sub _docids_for ($$) { my ($self, $eml) = @_; my %docids; - my $dig = content_digest($eml); - my $chash = $dig->clone->digest; + my ($chash, $mids) = PublicInbox::LeiSearch::content_key($eml); my $eidx = eidx_init($self); my $oidx = $eidx->{oidx}; my $im = $self->{im}; - my $mids = mids($eml); - $mids->[0] //= _fake_mid_for($eml, $dig); for my $mid (@$mids) { my ($id, $prev); while (my $cur = $oidx->next_by_mid($mid, \$id, \$prev)) { @@ -183,6 +171,7 @@ sub mbox_keywords { sort(keys %kw); } +# TODO: move this to MdirReader, maybe... # cf: https://cr.yp.to/proto/maildir.html my %c2kw = ('D' => 'draft', F => 'flagged', R => 'answered', S => 'seen'); sub maildir_keywords { @@ -230,6 +219,14 @@ sub set_eml_from_maildir { set_eml($self, $eml, $set_kw ? maildir_keywords($f) : ()); } +sub checkpoint { + my ($self, $wait) = @_; + if (my $im = $self->{im}) { + $wait ? 
$im->barrier : $im->checkpoint; + } + $self->{priv_eidx}->checkpoint($wait); +} + sub done { my ($self) = @_; my $err = ''; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 31b8aba8..3420b06e 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -267,8 +267,8 @@ sub _mbox_write_cb ($$) { } } -sub _augment_file { # maildir_each_eml cb - my ($f, undef, $eml, $lei, $mod, $shard) = @_; +sub _augment_or_unlink { # maildir_each_eml cb + my ($f, $kw, $eml, $lei, $lse, $mod, $shard, $unlink) = @_; if ($mod) { # can't get dirent.d_ino w/ pure Perl, so we extract the OID # if it looks like one: @@ -276,8 +276,16 @@ sub _augment_file { # maildir_each_eml cb $1 : sha256_hex($f); my $recno = hex(substr($hex, 0, 8)); return if ($recno % $mod) != $shard; + if ($lse) { + my $x = $lse->kw_changed($eml, $kw); + if ($x) { + $lei->{sto}->ipc_do('set_eml', $eml, @$kw); + } elsif (!defined($x)) { + # TODO: xkw + } + } } - _augment($eml, $lei); + $unlink ? 
unlink($f) : _augment($eml, $lei); } # maildir_each_file callback, \&CORE::unlink doesn't work with it @@ -419,20 +427,31 @@ sub _pre_augment_maildir { sub _do_augment_maildir { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; + my $lse = $lei->{sto}->search if $lei->{opt}->{'import-augment'}; + my ($mod, $shard) = @{$self->{shard_info} // []}; if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { - my ($mod, $shard) = @{$self->{shard_info} // []}; PublicInbox::MdirReader::maildir_each_eml($dst, - \&_augment_file, - $lei, $mod, $shard); + \&_augment_or_unlink, + $lei, $lse, $mod, $shard); $dedupe->pause_dedupe; } - } else { # clobber existing Maildir + } elsif ($lse) { + PublicInbox::MdirReader::maildir_each_eml($dst, + \&_augment_or_unlink, + $lei, $lse, $mod, $shard, 1); + } else {# clobber existing Maildir PublicInbox::MdirReader::maildir_each_file($dst, \&_unlink); } } +sub _post_augment_maildir { + my ($self, $lei) = @_; + $lei->{opt}->{'import-augment'} or return; + my $wait = $lei->{sto}->ipc_do('checkpoint', 1); +} + sub _augment_imap { # PublicInbox::NetReader::imap_each cb my ($url, $uid, $kw, $eml, $lei) = @_; _augment($eml, $lei); diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index dcc48806..45815180 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -204,7 +204,7 @@ sub query_mset { # non-parallel for non-"--threads" users sub each_remote_eml { # callback for MboxReader->mboxrd my ($eml, $self, $lei, $each_smsg) = @_; - $lei->{sto}->ipc_do('add_eml', $eml) if $lei->{sto}; # --import-remote + $lei->{sto}->ipc_do('add_eml', $eml) if $lei->{opt}->{'import-remote'}; my $smsg = bless {}, 'PublicInbox::Smsg'; $smsg->populate($eml); $smsg->parse_references($eml, mids($eml)); diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t new file mode 100644 index 00000000..97b2e08f --- /dev/null +++ b/t/lei-q-kw.t @@ -0,0 +1,33 @@ +#!perl -w +# Copyright (C) 2020-2021 all 
contributors +# License: AGPL-3.0+ +use strict; use v5.10.1; use PublicInbox::TestCommon; +test_lei(sub { +lei_ok(qw(import -F eml t/plack-qp.eml)); +my $o = "$ENV{HOME}/dst"; +lei_ok(qw(q -o), "maildir:$o", qw(m:qp@example.com)); +my @fn = glob("$o/cur/*:2,"); +scalar(@fn) == 1 or BAIL_OUT "wrote multiple or zero files: ".explain(\@fn); +rename($fn[0], "$fn[0]S") or BAIL_OUT "rename $!"; + +lei_ok(qw(q -o), "maildir:$o", qw(m:bogus-noresults@example.com)); +ok(!glob("$o/cur/*"), 'last result cleared after augment-import'); + +lei_ok(qw(q -o), "maildir:$o", qw(m:qp@example.com)); +@fn = glob("$o/cur/*:2,S"); +is(scalar(@fn), 1, "`seen' flag set on Maildir file"); + +# ensure --no-import-augment works +my $n = $fn[0]; +$n =~ s/,S\z/,RS/; +rename($fn[0], $n) or BAIL_OUT "rename $!"; +lei_ok(qw(q --no-import-augment -o), "maildir:$o", + qw(m:bogus-noresults@example.com)); +ok(!glob("$o/cur/*"), '--no-import-augment cleared destination'); +lei_ok(qw(q -o), "maildir:$o", qw(m:qp@example.com)); +@fn = glob("$o/cur/*:2,S"); +is(scalar(@fn), 1, "`seen' flag (but not `replied') set on Maildir file"); + +# TODO: other destination types +}); +done_testing; diff --git a/t/lei.t b/t/lei.t index ba179b39..74a775ca 100644 --- a/t/lei.t +++ b/t/lei.t @@ -138,7 +138,8 @@ SKIP: { lei(qw(q --only http://127.0.0.1:99999/bogus/ t:m)); is($? >> 8, 3, 'got curl exit for bogus URL'); lei(qw(q --only http://127.0.0.1:99999/bogus/ t:m -o), "$home/junk"); - is($? >> 8, 3, 'got curl exit for bogus URL with Maildir'); + is($? 
>> 8, 3, 'got curl exit for bogus URL with Maildir') or + diag $lei_err; is($lei_out, '', 'no output'); }; # /SKIP }; diff --git a/t/lei_store.t b/t/lei_store.t index e93fe779..1c3f7841 100644 --- a/t/lei_store.t +++ b/t/lei_store.t @@ -124,8 +124,16 @@ SKIP: { $ids = $sto->ipc_do('set_eml', $eml, qw(seen answered)); is_deeply($ids, [ $no_mid->{num} ], 'docid returned w/o mid w/o ipc'); $wait = $sto->ipc_do('done'); - @kw = $sto->search->msg_keywords($no_mid->{num}); + + my $lse = $sto->search; + @kw = $lse->msg_keywords($no_mid->{num}); is_deeply(\@kw, [qw(answered seen)], 'set changed kw w/o ipc'); + is($lse->kw_changed($eml, [qw(answered seen)]), 0, + 'kw_changed false when unchanged'); + is($lse->kw_changed($eml, [qw(answered seen flagged)]), 1, + 'kw_changed true when +flagged'); + is($lse->kw_changed(eml_load('t/plack-qp.eml'), ['seen']), undef, + 'kw_changed undef on unknown message'); } done_testing;