unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH] lei <q|up>: set \Recent on non-empty mbox and Maildir
@ 2021-05-23  8:01 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2021-05-23  8:01 UTC (permalink / raw)
  To: meta

Despite JMAP not supporting the equivalent of the IMAP \Recent
flag, it is useful for "lei q --augment" and "lei up" users to
be able to distinguish new results from old-but-unread messages
in an mbox or Maildir.

For the mbox family of formats, we'll drop the "O" status flag
when appending to existing mboxes; for Maildirs, we'll write new
results to the "new" subdirectory instead of "cur".

Behavior when writing to initially empty Maildirs and mboxes
remains unchanged, since there's no need to distinguish between
new and old results in the initial case.  Marking every initial
result as new would make MUAs perform a rename(2) storm (Maildir)
or a complete mbox rewrite, and making users wait for that hurts UX.

With IMAP mailboxes, \Recent is already enforced by the IMAP
server, and IMAP clients have no way of changing it(*).

(*) mutt uses the "Old" IMAP flag, which isn't part of RFC 3501;
    other MUAs may do similar things.
---
 lib/PublicInbox/LeiDedupe.pm      |  6 +++++
 lib/PublicInbox/LeiSavedSearch.pm |  7 ++++++
 lib/PublicInbox/LeiToMail.pm      | 42 +++++++++++++++++++++----------
 lib/PublicInbox/LeiXSearch.pm     | 15 +++++++++--
 t/lei-q-kw.t                      | 13 ++++++++--
 t/lei-q-save.t                    |  4 +--
 t/lei_to_mail.t                   | 16 ++++++------
 7 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 378f748e..ed52e417 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -127,4 +127,10 @@ sub pause_dedupe {
 	delete($skv->{dbh}) if $skv;
 }
 
+sub dedupe_nr {
+	my $skv = $_[0]->[0] or return undef;
+	my @n = $skv->count;
+	$n[0];
+}
+
 1;
diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm
index 01b987d1..48d252f1 100644
--- a/lib/PublicInbox/LeiSavedSearch.pm
+++ b/lib/PublicInbox/LeiSavedSearch.pm
@@ -309,6 +309,13 @@ E: rename($dir_old, $dir_new) error: $!
 EOM
 }
 
+# cf. LeiDedupe->dedupe_nr
+sub dedupe_nr {
+	my $oidx = $_[0]->{oidx} // die 'BUG: no {oidx}';
+	my @n = $oidx->{dbh}->selectrow_array('SELECT COUNT(*) FROM over');
+	$n[0];
+}
+
 no warnings 'once';
 *nntp_url = \&cloneurl;
 *base_url = \&PublicInbox::Inbox::base_url;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f3c03969..ad6b9439 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -41,11 +41,14 @@ sub _mbox_hdr_buf ($$$) {
 			warn "# keyword `$k' not supported for mbox\n";
 		}
 	}
-	# Messages are always 'O' (non-\Recent in IMAP), it saves
-	# MUAs the trouble of rewriting the mbox if no other
-	# changes are made.  We put 'O' at the end (e.g. "Status: RO")
-	# to match mutt(1) output.
-	$eml->header_set('Status', join('', sort(@{$hdr{Status}})). 'O');
+	# When writing to empty mboxes, messages are always 'O'
+	# (not-\Recent in IMAP), it saves MUAs the trouble of
+	# rewriting the mbox if no other changes are made.
+	# We put 'O' at the end (e.g. "Status: RO") to match mutt(1) output.
+	# We only set smsg->{-recent} if augmenting existing stores.
+	my $status = join('', sort(@{$hdr{Status}}));
+	$status .= 'O' unless $smsg->{-recent};
+	$eml->header_set('Status', $status) if $status;
 	if (my $chars = delete $hdr{'X-Status'}) {
 		$eml->header_set('X-Status', join('', sort(@$chars)));
 	}
@@ -196,11 +199,13 @@ sub _mbox_write_cb ($$) {
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe;
 	my $lse = $lei->{lse}; # may be undef
+	my $set_recent = $dedupe->dedupe_nr;
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
 		$eml //= PublicInbox::Eml->new($buf);
 		return if $dedupe->is_dup($eml, $smsg);
 		$lse->xsmsg_vmd($smsg) if $lse;
+		$smsg->{-recent} = 1 if $set_recent;
 		$buf = $eml2mbox->($eml, $smsg);
 		if ($atomic_append) {
 			atomic_append($lei, $buf);
@@ -248,8 +253,8 @@ sub kw2suffix ($;@) {
 	join('', sort(map { $kw2char{$_} // () } @$kw, @_));
 }
 
-sub _buf2maildir {
-	my ($dst, $buf, $smsg) = @_;
+sub _buf2maildir ($$$$) {
+	my ($dst, $buf, $smsg, $dir) = @_;
 	my $kw = $smsg->{kw} // [];
 	my $rand = ''; # chosen by die roll :P
 	my ($tmp, $fh, $base, $ok);
@@ -260,11 +265,7 @@ sub _buf2maildir {
 	} while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
 		$!{EEXIST} && ($rand = _rand.','));
 	if ($ok && print $fh $$buf and close($fh)) {
-		# ignore new/ and write only to cur/, otherwise MUAs
-		# with R/W access to the Maildir will end up doing
-		# a mass rename which can take a while with thousands
-		# of messages.
-		$dst .= 'cur/';
+		$dst .= $dir; # 'new/' or 'cur/'
 		$rand = '';
 		do {
 			$base = $rand.$common.':2,'.kw2suffix($kw);
@@ -289,6 +290,11 @@ sub _maildir_write_cb ($$) {
 	my $lse = $lei->{lse}; # may be undef
 	my $sto = $lei->{opt}->{'mail-sync'} ? $lei->{sto} : undef;
 	my $out = $sto ? 'maildir:'.$lei->rel2abs($dst) : undef;
+
+	# Favor cur/ and only write to new/ when augmenting.  This
+	# saves MUAs from having to do a mass rename when the initial
+	# search result set is huge.
+	my $dir = $dedupe && $dedupe->dedupe_nr ? 'new/' : 'cur/';
 	sub { # for git_to_mail
 		my ($bref, $smsg, $eml) = @_;
 		$dst // return $lei->fail; # dst may be undef-ed in last run
@@ -296,7 +302,8 @@ sub _maildir_write_cb ($$) {
 						PublicInbox::Eml->new($$bref),
 						$smsg);
 		$lse->xsmsg_vmd($smsg) if $lse;
-		my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg);
+		my $n = _buf2maildir($dst, $bref // \($eml->as_string),
+					$smsg, $dir);
 		$sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto;
 		++$lei->{-nr_write};
 	}
@@ -648,7 +655,16 @@ sub do_post_auth {
 		$lei->{1} = $zpipe->[1];
 		close $zpipe->[0];
 	}
+	my $au_peers = delete $self->{au_peers};
+	if ($au_peers) { # wait for peer l2m to finish augmenting:
+		$au_peers->[1] = undef;
+		sysread($au_peers->[0], my $barrier1, 1);
+	}
 	$self->{wcb} = $self->write_cb($lei);
+	if ($au_peers) { # wait for peer l2m to set write_cb
+		$au_peers->[3] = undef;
+		sysread($au_peers->[2], my $barrier2, 1);
+	}
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69a4edd..3482082d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -482,11 +482,22 @@ sub do_query {
 		if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
 			$lei->start_mua;
 		}
+		my $F_SETPIPE_SZ = $^O eq 'linux' ? 1031 : undef;
+		if ($l2m->{-wq_nr_workers} > 1 &&
+				$l2m->{base_type} =~ /\A(?:maildir|mbox)\z/) {
+			# setup two barriers to coordinate dedupe_nr
+			# between l2m workers
+			pipe(my ($a_r, $a_w)) or die "pipe: $!";
+			fcntl($a_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+			pipe(my ($b_r, $b_w)) or die "pipe: $!";
+			fcntl($b_r, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+			$l2m->{au_peers} = [ $a_r, $a_w, $b_r, $b_w ];
+		}
 		$l2m->wq_workers_start('lei2mail', undef,
 					$lei->oldset, { lei => $lei });
 		pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
-		# 1031: F_SETPIPE_SZ
-		fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux';
+		fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
+		delete $l2m->{au_peers};
 	}
 	$self->wq_workers_start('lei_xsearch', undef,
 				$lei->oldset, { lei => $lei });
diff --git a/t/lei-q-kw.t b/t/lei-q-kw.t
index c00a0a43..074c573d 100644
--- a/t/lei-q-kw.t
+++ b/t/lei-q-kw.t
@@ -14,7 +14,6 @@ my $exp = {
 	'<testmessage@example.com>' => eml_load('t/utf8.eml'),
 };
 $exp->{'<qp@example.com>'}->header_set('Status', 'RO');
-$exp->{'<testmessage@example.com>'}->header_set('Status', 'O');
 
 test_lei(sub {
 lei_ok(qw(import -F eml t/plack-qp.eml));
@@ -105,7 +104,17 @@ for my $sfx ('', '.gz') {
 	my %res;
 	PublicInbox::MboxReader->mboxrd($fh, sub {
 		my ($eml) = @_;
-		$res{$eml->header_raw('Message-ID')} = $eml;
+		my $mid = $eml->header_raw('Message-ID');
+		if ($mid eq '<testmessage@example.com>') {
+			is_deeply([$eml->header('Status')], [],
+				"no status $sfx");
+			$eml->header_set('Status');
+		} elsif ($mid eq '<qp@example.com>') {
+			is($eml->header('Status'), 'RO', 'status preserved');
+		} else {
+			fail("unknown mid $mid");
+		}
+		$res{$mid} = $eml;
 	});
 	is_deeply(\%res, $exp, '--augment worked');
 
diff --git a/t/lei-q-save.t b/t/lei-q-save.t
index 753d5b20..aed38a51 100644
--- a/t/lei-q-save.t
+++ b/t/lei-q-save.t
@@ -42,7 +42,7 @@ test_lei(sub {
 	lei_ok qw(up -q md -C), $home;
 	lei_ok qw(up -q . -C), "$home/md";
 	lei_ok qw(up -q), "/$home/md";
-	my %after = map { $_ => 1 } glob("$home/md/cur/*");
+	my %after = map { $_ => 1 } glob("$home/md/{new,cur}/*");
 	is(delete $after{(keys(%before))[0]}, 1, 'original message kept');
 	is(scalar(keys %after), 1, 'one new message added');
 	is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches');
@@ -155,7 +155,7 @@ test_lei(sub {
 	$im->add(PublicInbox::Eml->new($diff));
 	$im->done;
 	lei_ok('up', $o);
-	@m = glob("$o/cur/*");
+	@m = glob("$o/{new,cur}/*");
 	is(scalar(@m), 2, 'got 2nd result due to different OID');
 
 	SKIP: {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 32532a98..35904706 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -90,7 +90,7 @@ my $fn = "$tmpdir/x.mbox";
 my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter
 my $wcb_get = sub {
 	my ($fmt, $dst) = @_;
-	delete $lei->{dedupe};
+	delete $lei->{dedupe}; # to be recreated
 	$lei->{ovv} = bless {
 		fmt => $fmt,
 		dst => $dst
@@ -119,13 +119,12 @@ my $orig = do {
 	like($raw, qr/^blah\n/sm, 'wrote content');
 	unlink $fn or BAIL_OUT $!;
 
-	local $lei->{opt} = { jobs => 2 };
 	$wcb = $wcb_get->($mbox, $fn);
 	ok(-f $fn && !-s _, 'truncated mbox destination');
 	$wcb->(\($dup = $buf), $deadbeef);
 	$commit->($wcb);
 	open $fh, '<', $fn or BAIL_OUT $!;
-	is(do { local $/; <$fh> }, $raw, 'jobs > 1');
+	is(do { local $/; <$fh> }, $raw, 'wrote identical content');
 	$raw;
 };
 
@@ -158,21 +157,20 @@ for my $zsfx (qw(gz bz2 xz)) {
 		ok($dc_cmd, "decompressor for .$zsfx");
 		my $f = "$fn.$zsfx";
 		my $wcb = $wcb_get->($mbox, $f);
-		$wcb->(\(my $dup = $buf), $deadbeef);
+		$wcb->(\(my $dup = $buf), { %$deadbeef });
 		$commit->($wcb);
 		my $uncompressed = xqx([@$dc_cmd, $f]);
 		is($uncompressed, $orig, "$zsfx works unlocked");
 
-		local $lei->{opt} = { jobs => 2 }; # for atomic writes
 		unlink $f or BAIL_OUT "unlink $!";
 		$wcb = $wcb_get->($mbox, $f);
-		$wcb->(\($dup = $buf), $deadbeef);
+		$wcb->(\($dup = $buf), { %$deadbeef });
 		$commit->($wcb);
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
 		local $lei->{opt} = { augment => 1 };
 		$wcb = $wcb_get->($mbox, $f);
-		$wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
+		$wcb->(\($dup = $buf . "\nx\n"), { %$deadbeef });
 		$commit->($wcb);
 
 		my $cat = popen_rd([@$dc_cmd, $f]);
@@ -182,9 +180,9 @@ for my $zsfx (qw(gz bz2 xz)) {
 		like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx");
 		like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx");
 
-		local $lei->{opt} = { augment => 1, jobs => 2 };
+		local $lei->{opt} = { augment => 1 };
 		$wcb = $wcb_get->($mbox, $f);
-		$wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
+		$wcb->(\($dup = $buf . "\ny\n"), { %$deadbeef });
 		$commit->($wcb);
 
 		my @raw3;

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2021-05-23  8:01 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-05-23  8:01 [PATCH] lei <q|up>: set \Recent on non-empty mbox and Maildir Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).