From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 735421F5AE for ; Sun, 23 May 2021 08:01:16 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] lei : set \Recent on non-empty mbox and Maildir Date: Sun, 23 May 2021 08:01:16 +0000 Message-Id: <20210523080116.15184-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Despite JMAP not supporting the equivalent of the IMAP \Recent flag, it is useful for "lei q --augment" and "lei up" users to be able to distinguish new results from old-but-unread messages in an mbox or Maildir. For mbox-family messages, we'll drop the "O" status flag when appending to mboxes, and we'll write to the "new" subdirectory of Maildirs. Behavior when writing to initially empty Maildirs and mboxes remains unchanged, since there's no need to distinguish between new and old results in the initial case. Having users wait for a rename(2) storm or a complete mbox rewrite hurts UX. With IMAP mailboxes, \Recent is already enforced by the IMAP server and IMAP clients have no way of changing it (*). (*) mutt uses the "Old" IMAP flag, which isn't part of RFC 3501; other MUAs may do similar things. 
--- lib/PublicInbox/LeiDedupe.pm | 6 +++++ lib/PublicInbox/LeiSavedSearch.pm | 7 ++++++ lib/PublicInbox/LeiToMail.pm | 42 +++++++++++++++++++++---------- lib/PublicInbox/LeiXSearch.pm | 15 +++++++++-- t/lei-q-kw.t | 13 ++++++++-- t/lei-q-save.t | 4 +-- t/lei_to_mail.t | 16 ++++++------ 7 files changed, 75 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 378f748e..ed52e417 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -127,4 +127,10 @@ sub pause_dedupe { delete($skv->{dbh}) if $skv; } +sub dedupe_nr { + my $skv = $_[0]->[0] or return undef; + my @n = $skv->count; + $n[0]; +} + 1; diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm index 01b987d1..48d252f1 100644 --- a/lib/PublicInbox/LeiSavedSearch.pm +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $! EOM } +# cf. LeiDedupe->dedupe_nr +sub dedupe_nr { + my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}'; + my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over'); + $n[0]; +} + no warnings 'once'; *nntp_url = \&cloneurl; *base_url = \&PublicInbox::Inbox::base_url; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index f3c03969..ad6b9439 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) { warn "# keyword `$k' not supported for mbox\n"; } } - # Messages are always 'O' (non-\Recent in IMAP), it saves - # MUAs the trouble of rewriting the mbox if no other - # changes are made. We put 'O' at the end (e.g. "Status: RO") - # to match mutt(1) output. - $eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O'); + # When writing to empty mboxes, messages are always 'O' + # (not-\Recent in IMAP), it saves MUAs the trouble of + # rewriting the mbox if no other changes are made. + # We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output. 
+ # We only set smsg->{-recent} if augmenting existing stores. + my $status = join('', sort(@{$hdr{Status}})); + $status .= 'O' unless $smsg->{-recent}; + $eml->header_set('Status', $status) if $status; if (my $chars = delete $hdr{'X-Status'}) { $eml->header_set('X-Status', join('', sort(@$chars))); } @@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; my $lse = $lei->{lse}; # may be undef + my $set_recent = $dedupe->dedupe_nr; sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; + $smsg->{-recent} = 1 if $set_recent; $buf = $eml2mbox->($eml, $smsg); if ($atomic_append) { atomic_append($lei, $buf); @@ -248,8 +253,8 @@ sub kw2suffix ($;@) { join('', sort(map { $kw2char{$_} // () } @$kw, @_)); } -sub _buf2maildir { - my ($dst, $buf, $smsg) = @_; +sub _buf2maildir ($$$$) { + my ($dst, $buf, $smsg, $dir) = @_; my $kw = $smsg->{kw} // []; my $rand = ''; # chosen by die roll :P my ($tmp, $fh, $base, $ok); @@ -260,11 +265,7 @@ sub _buf2maildir { } while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) && $!{EEXIST} && ($rand = _rand.',')); if ($ok && print $fh $$buf and close($fh)) { - # ignore new/ and write only to cur/, otherwise MUAs - # with R/W access to the Maildir will end up doing - # a mass rename which can take a while with thousands - # of messages. - $dst .= 'cur/'; + $dst .= $dir; # 'new/' or 'cur/' $rand = ''; do { $base = $rand.$common.':2,'.kw2suffix($kw); @@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) { my $lse = $lei->{lse}; # may be undef my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef; my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef; + + # Favor cur/ and only write to new/ when augmenting. This + # saves MUAs from having to do a mass rename when the initial + # search result set is huge. + my $dir = $dedupe && $dedupe->dedupe_nr ? 
'new/' : 'cur/'; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run @@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) { PublicInbox::Eml->new($$bref), $smsg); $lse->xsmsg_vmd($smsg) if $lse; - my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg); + my $n = _buf2maildir($dst, $bref // \($eml->as_string), + $smsg, $dir); $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } @@ -648,7 +655,16 @@ sub do_post_auth { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; } + my $au_peers = delete $self->{au_peers}; + if ($au_peers) { # wait for peer l2m to finish augmenting: + $au_peers->[1] = undef; + sysread($au_peers->[0], my $barrier1, 1); + } $self->{wcb} = $self->write_cb($lei); + if ($au_peers) { # wait for peer l2m to set write_cb + $au_peers->[3] = undef; + sysread($au_peers->[2], my $barrier2, 1); + } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e69a4edd..3482082d 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -482,11 +482,22 @@ sub do_query { if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { $lei->start_mua; } + my $F_SETPIPE_SZ = $^O eq 'linux' ? 
1031 : undef; + if ($l2m->{-wq_nr_workers} > 1 && + $l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) { + # setup two barriers to coordinate dedupe_nr + # between l2m workers + pipe(my ($a_r, $a_w)) or die "pipe: $!"; + fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + pipe(my ($b_r, $b_w)) or die "pipe: $!"; + fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + $l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ]; + } $l2m->wq_workers_start('lei2mail', undef, $lei->oldset, { lei => $lei }); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; - # 1031: F_SETPIPE_SZ - fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; + fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; + delete $l2m->{au_peers}; } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t index c00a0a43..074c573d 100644 --- a/t/lei-q-kw.t +++ b/t/lei-q-kw.t @@ -14,7 +14,6 @@ my $exp = { '' => eml_load('t/utf8.eml'), }; $exp->{''}->header_set('Status', 'RO'); -$exp->{''}->header_set('Status', 'O'); test_lei(sub { lei_ok(qw(import -F eml t/plack-qp.eml)); @@ -105,7 +104,17 @@ for my $sfx ('', '.gz') { my %res; PublicInbox::MboxReader->mboxrd($fh, sub { my ($eml) = @_; - $res{$eml->header_raw('Message-ID')} = $eml; + my $mid = $eml->header_raw('Message-ID'); + if ($mid eq '') { + is_deeply([$eml->header('Status')], [], + "no status $sfx"); + $eml->header_set('Status'); + } elsif ($mid eq '') { + is($eml->header('Status'), 'RO', 'status preserved'); + } else { + fail("unknown mid $mid"); + } + $res{$mid} = $eml; }); is_deeply(\%res, $exp, '--augment worked'); diff --git a/t/lei-q-save.t b/t/lei-q-save.t index 753d5b20..aed38a51 100644 --- a/t/lei-q-save.t +++ b/t/lei-q-save.t @@ -42,7 +42,7 @@ test_lei(sub { lei_ok qw(up -q md -C), $home; lei_ok qw(up -q . 
-C), "$home/md"; lei_ok qw(up -q), "/$home/md"; - my %after = map { $_ => 1 } glob("$home/md/cur/*"); + my %after = map { $_ => 1 } glob("$home/md/{new,cur}/*"); is(delete $after{(keys(%before))[0]}, 1, 'original message kept'); is(scalar(keys %after), 1, 'one new message added'); is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches'); @@ -155,7 +155,7 @@ test_lei(sub { $im->add(PublicInbox::Eml->new($diff)); $im->done; lei_ok('up', $o); - @m = glob("$o/cur/*"); + @m = glob("$o/{new,cur}/*"); is(scalar(@m), 2, 'got 2nd result due to different OID'); SKIP: { diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 32532a98..35904706 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -90,7 +90,7 @@ my $fn = "$tmpdir/x.mbox"; my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter my $wcb_get = sub { my ($fmt, $dst) = @_; - delete $lei->{dedupe}; + delete $lei->{dedupe}; # to be recreated $lei->{ovv} = bless { fmt => $fmt, dst => $dst @@ -119,13 +119,12 @@ my $orig = do { like($raw, qr/^blah\n/sm, 'wrote content'); unlink $fn or BAIL_OUT $!; - local $lei->{opt} = { jobs => 2 }; $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); $wcb->(\($dup = $buf), $deadbeef); $commit->($wcb); open $fh, '<', $fn or BAIL_OUT $!; - is(do { local $/; <$fh> }, $raw, 'jobs > 1'); + is(do { local $/; <$fh> }, $raw, 'wrote identical content'); $raw; }; @@ -158,21 +157,20 @@ for my $zsfx (qw(gz bz2 xz)) { ok($dc_cmd, "decompressor for .$zsfx"); my $f = "$fn.$zsfx"; my $wcb = $wcb_get->($mbox, $f); - $wcb->(\(my $dup = $buf), $deadbeef); + $wcb->(\(my $dup = $buf), { %$deadbeef }); $commit->($wcb); my $uncompressed = xqx([@$dc_cmd, $f]); is($uncompressed, $orig, "$zsfx works unlocked"); - local $lei->{opt} = { jobs => 2 }; # for atomic writes unlink $f or BAIL_OUT "unlink $!"; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf), $deadbeef); + $wcb->(\($dup = $buf), { %$deadbeef }); $commit->($wcb); is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with 
lock"); local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\nx\n"), $deadbeef); + $wcb->(\($dup = $buf . "\nx\n"), { %$deadbeef }); $commit->($wcb); my $cat = popen_rd([@$dc_cmd, $f]); @@ -182,9 +180,9 @@ for my $zsfx (qw(gz bz2 xz)) { like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx"); like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx"); - local $lei->{opt} = { augment => 1, jobs => 2 }; + local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); - $wcb->(\($dup = $buf . "\ny\n"), $deadbeef); + $wcb->(\($dup = $buf . "\ny\n"), { %$deadbeef }); $commit->($wcb); my @raw3;