unofficial mirror of meta@public-inbox.org
* [PATCH 00/36] another round of lei stuff
@ 2020-12-31 13:51 Eric Wong
  2020-12-31 13:51 ` [PATCH 01/36] import: respect init.defaultBranch Eric Wong
                   ` (35 more replies)
  0 siblings, 36 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This is against lei branch @ commit
0c8106d44f317175e122744b43407bf067183175 in
https://public-inbox.org/public-inbox.git

Infrastructure for reading + writing local Maildirs and a
bunch of mbox formats is done (including gz/bz2/xz support)
and its usage should be familiar to mairix(1) users.

Infrastructure for deduplication + augmenting search results
is in place and tested.

Going to skip MH and MMDF for now.  IMAP/JMAP might happen
sooner, but deduplication needs low latency.

"extinbox" is renamed to "external".

Basic infrastructure like PublicInbox::IPC and SharedKV
should've been done and in use ages ago...  I look forward to
using them, at least.

Some DS safety fixes, since lei will use it in stranger ways
than current code does.

Bad enough we have messages with duplicate Message-IDs, lei will
need to deal with Unsent/Drafts messages w/o Message-IDs at all!

Eric Wong (36):
  import: respect init.defaultBranch
  lei_store: use per-machine refname as git HEAD
  revert "lei_store: use per-machine refname as git HEAD"
  lei_to_mail: initial implementation for writing mbox formats
  sharedkv: fork()-friendly key-value store
  sharedkv: split out index_values
  lei_to_mail: start atomic and compressed mbox writing
  mboxreader: new class for reading various mbox formats
  lei_to_mail: start --augment, dedupe, bz2 and xz
  lei: implement various deduplication strategies
  lei_to_mail: lazy-require LeiDedupe
  lei_to_mail: support for non-seekable outputs
  lei_to_mail: support Maildir, fix+test --augment
  ipc: generic IPC dispatch based on Storable
  ipc: support Sereal
  lei_store: add ->set_eml, ->add_eml can return smsg
  lei: rename "extinbox" => "external"
  mid: use defined-or with `push' for uniqueness check
  mid: hoist out mids_in sub
  lei_store: handle messages without Message-ID at all
  ipc: use shutdown(2), base atfork* callback
  lei_to_mail: unlink mboxes if not augmenting
  lei: add --mfolder as an option
  spawn: move run_die here from PublicInbox::Import
  init: remove embedded UnlinkMe package
  t/run.perl: avoid uninitialized var on incomplete test
  gcf2client: reap process on DESTROY
  lei_to_mail: open FIFOs O_WRONLY so we block
  searchidxshard: call DS->Reset at worker start
  t/ipc.t: test for references via `die'
  use PublicInbox::DS for dwaitpid
  syscall: SFD_NONBLOCK can be a constant, again
  lei: avoid Spawn package when starting daemon
  avoid calling waitpid from children in DESTROY
  ds: clobber $in_loop first at reset
  on_destroy: support PID owner guard

 MANIFEST                                      |  12 +-
 lib/PublicInbox/DS.pm                         |  42 +-
 lib/PublicInbox/DSKQXS.pm                     |   4 +-
 lib/PublicInbox/Daemon.pm                     |   4 +-
 lib/PublicInbox/Gcf2Client.pm                 |  18 +-
 lib/PublicInbox/Git.pm                        |   7 +-
 lib/PublicInbox/IPC.pm                        | 165 ++++++++
 lib/PublicInbox/Import.pm                     |  36 +-
 lib/PublicInbox/LEI.pm                        |  44 +--
 lib/PublicInbox/LeiDedupe.pm                  | 100 +++++
 .../{LeiExtinbox.pm => LeiExternal.pm}        |  18 +-
 lib/PublicInbox/LeiStore.pm                   |  32 +-
 lib/PublicInbox/LeiToMail.pm                  | 361 ++++++++++++++++++
 lib/PublicInbox/LeiXSearch.pm                 |   2 +-
 lib/PublicInbox/Lock.pm                       |  17 +-
 lib/PublicInbox/MID.pm                        |  15 +-
 lib/PublicInbox/MboxReader.pm                 | 127 ++++++
 lib/PublicInbox/OnDestroy.pm                  |   5 +
 lib/PublicInbox/OverIdx.pm                    |   2 +
 lib/PublicInbox/ProcessPipe.pm                |  34 +-
 lib/PublicInbox/Qspawn.pm                     |  43 +--
 lib/PublicInbox/SearchIdxShard.pm             |   1 +
 lib/PublicInbox/SharedKV.pm                   | 148 +++++++
 lib/PublicInbox/Sigfd.pm                      |   4 +-
 lib/PublicInbox/Smsg.pm                       |   6 +-
 lib/PublicInbox/Spawn.pm                      |   9 +-
 lib/PublicInbox/Syscall.pm                    |   4 +-
 lib/PublicInbox/TestCommon.pm                 |  25 +-
 lib/PublicInbox/V2Writable.pm                 |  10 +-
 script/lei                                    |  17 +-
 script/public-inbox-init                      |  32 +-
 script/public-inbox-watch                     |   4 +-
 t/convert-compact.t                           |   4 +-
 t/index-git-times.t                           |   3 +-
 t/ipc.t                                       |  80 ++++
 t/lei.t                                       |  22 +-
 t/lei_dedupe.t                                |  59 +++
 t/lei_store.t                                 |  47 ++-
 t/lei_to_mail.t                               | 246 ++++++++++++
 t/lei_xsearch.t                               |   2 +-
 t/mbox_reader.t                               |  75 ++++
 t/on_destroy.t                                |   9 +
 t/plack.t                                     |   4 +-
 t/run.perl                                    |   3 +-
 t/shared_kv.t                                 |  58 +++
 t/sigfd.t                                     |   6 +-
 46 files changed, 1755 insertions(+), 211 deletions(-)
 create mode 100644 lib/PublicInbox/IPC.pm
 create mode 100644 lib/PublicInbox/LeiDedupe.pm
 rename lib/PublicInbox/{LeiExtinbox.pm => LeiExternal.pm} (75%)
 create mode 100644 lib/PublicInbox/LeiToMail.pm
 create mode 100644 lib/PublicInbox/MboxReader.pm
 create mode 100644 lib/PublicInbox/SharedKV.pm
 create mode 100644 t/ipc.t
 create mode 100644 t/lei_dedupe.t
 create mode 100644 t/lei_to_mail.t
 create mode 100644 t/mbox_reader.t
 create mode 100644 t/shared_kv.t


^ permalink raw reply	[flat|nested] 37+ messages in thread

* [PATCH 01/36] import: respect init.defaultBranch
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 02/36] lei_store: use per-machine refname as git HEAD Eric Wong
                   ` (34 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This matches git v2.28.0+ behavior in case users prefer
a different name.
---
 lib/PublicInbox/Import.pm | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
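
For readers unfamiliar with init.defaultBranch, the lookup can be
sketched roughly as below.  This is a hypothetical sketch, not the
code in this patch: `head_ref_from_config' is an illustrative name,
it takes the raw output of `git config --global init.defaultBranch'
as a string instead of spawning git, and (unlike the patch, which
uses the configured value as-is) it assumes a short name such as
"main" should be qualified with "refs/heads/".

```perl
use strict;
use warnings;

# Hypothetical helper (not part of PublicInbox::Import): map the raw
# output of `git config --global init.defaultBranch' to a full refname.
sub head_ref_from_config {
	my ($raw) = @_;
	chomp(my $h = $raw // '');
	# unset or empty config: fall back to the historical default
	return 'refs/heads/master' if $h eq '';
	# assumption: qualify short names, keep full refnames as-is
	$h =~ m!\Arefs/heads/! ? $h : "refs/heads/$h";
}

print head_ref_from_config("main\n"), "\n";
print head_ref_from_config(undef), "\n";
```

Taking the config output as an argument keeps the fallback logic
testable without a git installation or a particular ~/.gitconfig.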

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 079afc5f..7258e848 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -19,13 +19,21 @@ use PublicInbox::MDA;
 use PublicInbox::Eml;
 use POSIX qw(strftime);
 
+sub default_branch () {
+	state $default_branch = do {
+		my $r = popen_rd([qw(git config --global init.defaultBranch)]);
+		chomp(my $h = <$r> // '');
+		$h eq '' ? 'refs/heads/master' : $h;
+	}
+}
+
 sub new {
 	# we can't change arg order, this is documented in POD
 	# and external projects may rely on it:
 	my ($class, $git, $name, $email, $ibx) = @_;
-	my $ref = 'refs/heads/master';
+	my $ref;
 	if ($ibx) {
-		$ref = $ibx->{ref_head} // 'refs/heads/master';
+		$ref = $ibx->{ref_head};
 		$name //= $ibx->{name};
 		$email //= $ibx->{-primary_address};
 		$git //= $ibx->git;
@@ -34,7 +42,7 @@ sub new {
 		git => $git,
 		ident => "$name <$email>",
 		mark => 1,
-		ref => $ref,
+		ref => $ref // default_branch,
 		ibx => $ibx,
 		path_type => '2/38', # or 'v2'
 		lock_path => "$git->{git_dir}/ssoma.lock", # v2 changes this
@@ -441,7 +449,7 @@ sub run_die ($;$$) {
 	$? == 0 or die join(' ', @$cmd) . " failed: $?\n";
 }
 
-my @INIT_FILES = ('HEAD' => "ref: refs/heads/master\n",
+my @INIT_FILES = ('HEAD' => undef, # filled in at runtime
 		'description' => <<EOD,
 Unnamed repository; edit this file 'description' to name the repository.
 EOD
@@ -459,6 +467,7 @@ sub init_bare {
 	$dir = $dir->{git}->{git_dir} if ref($dir);
 	require File::Path;
 	File::Path::mkpath([ map { "$dir/$_" } qw(objects/info refs/heads) ]);
+	$INIT_FILES[1] //= 'ref: '.default_branch."\n";
 	for (my $i = 0; $i < @INIT_FILES; $i++) {
 		my $f = $dir.'/'.$INIT_FILES[$i++];
 		next if -f $f;


* [PATCH 02/36] lei_store: use per-machine refname as git HEAD
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
  2020-12-31 13:51 ` [PATCH 01/36] import: respect init.defaultBranch Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 03/36] revert "lei_store: use per-machine refname as git HEAD" Eric Wong
                   ` (33 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

It may be helpful to identify the source of messages
and perhaps avoid conflicting history.

On the other hand, this may be a terrible idea for users who
move portable storage (e.g. USB sticks) across computers...
---
 lib/PublicInbox/Import.pm   | 10 ++++++----
 lib/PublicInbox/LeiStore.pm | 21 ++++++++++++++++++++-
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 7258e848..60cff9c2 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -463,16 +463,18 @@ EOD
 EOC
 
 sub init_bare {
-	my ($dir) = @_; # or self
+	my ($dir, $head) = @_; # or self
 	$dir = $dir->{git}->{git_dir} if ref($dir);
 	require File::Path;
 	File::Path::mkpath([ map { "$dir/$_" } qw(objects/info refs/heads) ]);
 	$INIT_FILES[1] //= 'ref: '.default_branch."\n";
-	for (my $i = 0; $i < @INIT_FILES; $i++) {
-		my $f = $dir.'/'.$INIT_FILES[$i++];
+	my @fn_contents = @INIT_FILES;
+	$fn_contents[1] = "ref: refs/heads/$head\n" if defined $head;
+	while (my ($fn, $contents) = splice(@fn_contents, 0, 2)) {
+		my $f = $dir.'/'.$fn;
 		next if -f $f;
 		open my $fh, '>', $f or die "open $f: $!";
-		print $fh $INIT_FILES[$i] or die "print $f: $!";
+		print $fh $contents or die "print $f: $!";
 		close $fh or die "close $f: $!";
 	}
 }
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 553adbc8..a17c7bab 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -60,6 +60,24 @@ sub git_ident ($) {
 		('lei user', 'x@example.com')
 }
 
+# We will support users combining storage across multiple machines
+# somehow.  Use per-machine refnames to make it easy-to-identify
+# where a message came from
+sub host_head () {
+	state $h = do {
+		my $x = PublicInbox::ExtSearchIdx::host_ident;
+		# Similar rules found in git.git/remote.c::valid_remote_nick
+		# and git.git/refs.c::check_refname_component
+		$x =~ s!(?:\.lock|/)+\z!!gs; # must not end with ".lock" or "/"
+		$x =~ tr/././s; # no dot-dot, collapse them
+		$x =~ s/@\{/\@-/gs;
+		$x =~ s/\A\./-/s;
+		# no "*", ":", "?", "[", "\", "^", "~", SP, TAB; "]" is OK
+		$x =~ tr^a-zA-Z0-9!"#$%&'()+,\-.;<=>@]_`{|}^-^c;
+		$x
+	};
+}
+
 sub importer {
 	my ($self) = @_;
 	my $max;
@@ -78,8 +96,8 @@ sub importer {
 	while (1) {
 		my $latest = "$pfx/$max.git";
 		my $old = -e $latest;
+		PublicInbox::Import::init_bare($latest, host_head);
 		my $git = PublicInbox::Git->new($latest);
-		PublicInbox::Import::init_bare({ git => $git });
 		$git->qx(qw(config core.sharedRepository 0600)) if !$old;
 		my $packed_bytes = $git->packed_bytes;
 		my $unpacked_bytes = $packed_bytes / $self->packing_factor;
@@ -92,6 +110,7 @@ sub importer {
 		$im->{bytes_added} = int($packed_bytes / $self->packing_factor);
 		$im->{lock_path} = undef;
 		$im->{path_type} = 'v2';
+		$im->{'ref'} = host_head;
 		return $im;
 	}
 }


* [PATCH 03/36] revert "lei_store: use per-machine refname as git HEAD"
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
  2020-12-31 13:51 ` [PATCH 01/36] import: respect init.defaultBranch Eric Wong
  2020-12-31 13:51 ` [PATCH 02/36] lei_store: use per-machine refname as git HEAD Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 04/36] lei_to_mail: initial implementation for writing mbox formats Eric Wong
                   ` (32 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

In retrospect, per-machine HEADs were a bad idea because users
of removable storage would be thrown off when moving storage
between different machines.

This is only a partial revert; the Import::init_bare change to
support alternate head names still exists because we may use it
for other reasons.
---
 lib/PublicInbox/LeiStore.pm | 21 +--------------------
 1 file changed, 1 insertion(+), 20 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index a17c7bab..f8383d5e 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -60,24 +60,6 @@ sub git_ident ($) {
 		('lei user', 'x@example.com')
 }
 
-# We will support users combining storage across multiple machines
-# somehow.  Use per-machine refnames to make it easy-to-identify
-# where a message came from
-sub host_head () {
-	state $h = do {
-		my $x = PublicInbox::ExtSearchIdx::host_ident;
-		# Similar rules found in git.git/remote.c::valid_remote_nick
-		# and git.git/refs.c::check_refname_component
-		$x =~ s!(?:\.lock|/)+\z!!gs; # must not end with ".lock" or "/"
-		$x =~ tr/././s; # no dot-dot, collapse them
-		$x =~ s/@\{/\@-/gs;
-		$x =~ s/\A\./-/s;
-		# no "*", ":", "?", "[", "\", "^", "~", SP, TAB; "]" is OK
-		$x =~ tr^a-zA-Z0-9!"#$%&'()+,\-.;<=>@]_`{|}^-^c;
-		$x
-	};
-}
-
 sub importer {
 	my ($self) = @_;
 	my $max;
@@ -96,7 +78,7 @@ sub importer {
 	while (1) {
 		my $latest = "$pfx/$max.git";
 		my $old = -e $latest;
-		PublicInbox::Import::init_bare($latest, host_head);
+		PublicInbox::Import::init_bare($latest);
 		my $git = PublicInbox::Git->new($latest);
 		$git->qx(qw(config core.sharedRepository 0600)) if !$old;
 		my $packed_bytes = $git->packed_bytes;
@@ -110,7 +92,6 @@ sub importer {
 		$im->{bytes_added} = int($packed_bytes / $self->packing_factor);
 		$im->{lock_path} = undef;
 		$im->{path_type} = 'v2';
-		$im->{'ref'} = host_head;
 		return $im;
 	}
 }


* [PATCH 04/36] lei_to_mail: initial implementation for writing mbox formats
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (2 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 03/36] revert "lei_store: use per-machine refname as git HEAD" Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 05/36] sharedkv: fork()-friendly key-value store Eric Wong
                   ` (31 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

No Maildir support yet, but it'll come.
---
 MANIFEST                     |   2 +
 lib/PublicInbox/LeiToMail.pm | 109 +++++++++++++++++++++++++++++++++++
 t/lei_to_mail.t              |  65 +++++++++++++++++++++
 3 files changed, 176 insertions(+)
 create mode 100644 lib/PublicInbox/LeiToMail.pm
 create mode 100644 t/lei_to_mail.t
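
The mbox variants written here differ mainly in how body lines
starting with "From " are escaped.  A minimal standalone sketch of
the two escaping rules follows; `escape_mboxrd' and `escape_mboxo'
are illustrative names and operate on plain strings, whereas the
patch works on PublicInbox::Eml internals.

```perl
use strict;
use warnings;

# mboxrd: add one ">" to any line that is "From " preceded by zero
# or more ">", so a reader can strip exactly one ">" to undo it
sub escape_mboxrd {
	my ($body) = @_;
	$body =~ s/^(>*From )/>$1/gm;
	$body;
}

# mboxo (and mboxcl): only bare "From " lines are escaped, so an
# original ">From " line can no longer be told apart afterwards
sub escape_mboxo {
	my ($body) = @_;
	$body =~ s/^From />From /gm;
	$body;
}

my $body = "From hell\n>From limbo\n";
print escape_mboxrd($body);
print escape_mboxo($body);
```

mboxcl2 skips escaping entirely and relies on the Content-Length
header to find message boundaries.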

diff --git a/MANIFEST b/MANIFEST
index a5ff81cf..12b67e95 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -165,6 +165,7 @@ lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiExtinbox.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
+lib/PublicInbox/LeiToMail.pm
 lib/PublicInbox/LeiXSearch.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
@@ -328,6 +329,7 @@ t/kqnotify.t
 t/lei-oneshot.t
 t/lei.t
 t/lei_store.t
+t/lei_to_mail.t
 t/lei_xsearch.t
 t/linkify.t
 t/main-bin/spamc
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
new file mode 100644
index 00000000..b0d4b664
--- /dev/null
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -0,0 +1,109 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Writes PublicInbox::Eml objects atomically to a mbox variant or Maildir
+package PublicInbox::LeiToMail;
+use strict;
+use v5.10.1;
+use PublicInbox::Eml;
+
+my %kw2char = ( # Maildir characters
+	draft => 'D',
+	flagged => 'F',
+	answered => 'R',
+	seen => 'S'
+);
+
+my %kw2status = (
+	flagged => [ 'X-Status' => 'F' ],
+	answered => [ 'X-Status' => 'A' ],
+	seen => [ 'Status' => 'R' ],
+	draft => [ 'X-Status' => 'T' ],
+);
+
+sub _mbox_hdr_buf ($$$) {
+	my ($eml, $type, $kw) = @_;
+	$eml->header_set($_) for (qw(Lines Bytes Content-Length));
+	my %hdr; # set Status, X-Status
+	for my $k (@$kw) {
+		if (my $ent = $kw2status{$k}) {
+			push @{$hdr{$ent->[0]}}, $ent->[1];
+		} else { # X-Label?
+			warn "TODO: keyword `$k' not supported for mbox\n";
+		}
+	}
+	while (my ($name, $chars) = each %hdr) {
+		$eml->header_set($name, join('', sort @$chars));
+	}
+	my $buf = delete $eml->{hdr};
+
+	# fixup old bug from import (pre-a0c07cba0e5d8b6a)
+	$$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
+
+	substr($$buf, 0, 0, # prepend From line
+		"From lei\@$type Thu Jan  1 00:00:00 1970$eml->{crlf}");
+	$buf;
+}
+
+sub write_in_full_atomic ($$) {
+	my ($fh, $buf) = @_;
+	defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
+	$w == length($$buf) or die "short write: $w != ".length($$buf);
+}
+
+sub eml2mboxrd ($;$) {
+	my ($eml, $kw) = @_;
+	my $buf = _mbox_hdr_buf($eml, 'mboxrd', $kw);
+	if (my $bdy = delete $eml->{bdy}) {
+		$$bdy =~ s/^(>*From )/>$1/gm;
+		$$buf .= $eml->{crlf};
+		substr($$bdy, 0, 0, $$buf); # prepend header
+		$buf = $bdy;
+	}
+	$$buf .= $eml->{crlf};
+	$buf;
+}
+
+sub eml2mboxo {
+	my ($eml, $kw) = @_;
+	my $buf = _mbox_hdr_buf($eml, 'mboxo', $kw);
+	if (my $bdy = delete $eml->{bdy}) {
+		$$bdy =~ s/^From />From /gm;
+		$$buf .= $eml->{crlf};
+		substr($$bdy, 0, 0, $$buf); # prepend header
+		$buf = $bdy;
+	}
+	$$buf .= $eml->{crlf};
+	$buf;
+}
+
+# mboxcl still escapes "From " lines
+sub eml2mboxcl {
+	my ($eml, $kw) = @_;
+	my $buf = _mbox_hdr_buf($eml, 'mboxcl', $kw);
+	my $crlf = $eml->{crlf};
+	if (my $bdy = delete $eml->{bdy}) {
+		$$bdy =~ s/^From />From /gm;
+		$$buf .= 'Content-Length: '.length($$bdy).$crlf.$crlf;
+		substr($$bdy, 0, 0, $$buf); # prepend header
+		$buf = $bdy;
+	}
+	$$buf .= $crlf;
+	$buf;
+}
+
+# mboxcl2 has no "From " escaping
+sub eml2mboxcl2 {
+	my ($eml, $kw) = @_;
+	my $buf = _mbox_hdr_buf($eml, 'mboxcl2', $kw);
+	my $crlf = $eml->{crlf};
+	if (my $bdy = delete $eml->{bdy}) {
+		$$buf .= 'Content-Length: '.length($$bdy).$crlf.$crlf;
+		substr($$bdy, 0, 0, $$buf); # prepend header
+		$buf = $bdy;
+	}
+	$$buf .= $crlf;
+	$buf;
+}
+
+1;
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
new file mode 100644
index 00000000..089a422e
--- /dev/null
+++ b/t/lei_to_mail.t
@@ -0,0 +1,65 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::Eml;
+use_ok 'PublicInbox::LeiToMail';
+my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
+my $noeol = "Subject: x\n\nFrom hell";
+my $crlf = $noeol;
+$crlf =~ s/\n/\r\n/g;
+my $kw = [qw(seen answered flagged)];
+for my $mbox (qw(mboxrd mboxo mboxcl mboxcl2)) {
+	my $m = "eml2$mbox";
+	my $cb = PublicInbox::LeiToMail->can($m);
+	my $s = $cb->(PublicInbox::Eml->new($from), $kw);
+	is(substr($$s, -1, 1), "\n", "trailing LF in normal $mbox");
+	my $eml = PublicInbox::Eml->new($s);
+	is($eml->header('Status'), 'R', "Status: set by $m");
+	is($eml->header('X-Status'), 'AF', "X-Status: set by $m");
+	if ($mbox eq 'mboxcl2') {
+		like($eml->body_raw, qr/^From /, "From not escaped $m");
+	} else {
+		like($eml->body_raw, qr/^>From /, "From escaped once by $m");
+	}
+	my @cl = $eml->header('Content-Length');
+	if ($mbox =~ /mboxcl/) {
+		is(scalar(@cl), 1, "$m only has one Content-Length header");
+		is($cl[0] + length("\n"),
+			length($eml->body_raw), "$m Content-Length matches");
+	} else {
+		is(scalar(@cl), 0, "$m clobbered Content-Length");
+	}
+	$s = $cb->(PublicInbox::Eml->new($noeol), $kw);
+	is(substr($$s, -1, 1), "\n",
+		"trailing LF added by $m when original lacks EOL");
+	$eml = PublicInbox::Eml->new($s);
+	if ($mbox eq 'mboxcl2') {
+		is($eml->body_raw, "From hell\n", "From not escaped by $m");
+	} else {
+		is($eml->body_raw, ">From hell\n", "From escaped once by $m");
+	}
+	$s = $cb->(PublicInbox::Eml->new($crlf), $kw);
+	is(substr($$s, -2, 2), "\r\n",
+		"trailing CRLF added $m by original lacks EOL");
+	$eml = PublicInbox::Eml->new($s);
+	if ($mbox eq 'mboxcl2') {
+		is($eml->body_raw, "From hell\r\n", "From not escaped by $m");
+	} else {
+		is($eml->body_raw, ">From hell\r\n", "From escaped once by $m");
+	}
+	if ($mbox =~ /mboxcl/) {
+		is($eml->header('Content-Length') + length("\r\n"),
+			length($eml->body_raw), "$m Content-Length matches");
+	} elsif ($mbox eq 'mboxrd') {
+		$s = $cb->($eml, $kw);
+		$eml = PublicInbox::Eml->new($s);
+		is($eml->body_raw,
+			">>From hell\r\n\r\n", "From escaped again by $m");
+	}
+}
+
+done_testing;


* [PATCH 05/36] sharedkv: fork()-friendly key-value store
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (3 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 04/36] lei_to_mail: initial implementation for writing mbox formats Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 06/36] sharedkv: split out index_values Eric Wong
                   ` (30 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This is intended for maintaining Maildir states and mbox message
deduplication, but may be useful for other purposes...
---
 MANIFEST                    |   2 +
 lib/PublicInbox/Lock.pm     |  10 ++-
 lib/PublicInbox/SharedKV.pm | 143 ++++++++++++++++++++++++++++++++++++
 t/shared_kv.t               |  57 ++++++++++++++
 4 files changed, 211 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/SharedKV.pm
 create mode 100644 t/shared_kv.t
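
The lock_for_scope idea (hold a flock until the returned object goes
out of scope) can be sketched with core Perl alone.  This is only an
illustration under assumptions: `My::ScopeGuard' is a hypothetical
stand-in for PublicInbox::OnDestroy, and the standalone
`lock_for_scope' below takes a path rather than being a method of
PublicInbox::Lock.

```perl
package My::ScopeGuard; # hypothetical stand-in for PublicInbox::OnDestroy
use strict;
use warnings;
sub new { my ($cls, $cb, @args) = @_; bless [ $cb, @args ], $cls }
sub DESTROY { my ($cb, @args) = @{$_[0]}; $cb->(@args) }

package main;
use strict;
use warnings;
use Fcntl qw(:flock);
use File::Temp ();

my $released = 0;

# Take an exclusive flock and return a guard which drops it when the
# guard is destroyed; the caller must keep the return value around.
sub lock_for_scope {
	my ($path) = @_;
	open(my $fh, '>>', $path) or die "open $path: $!";
	flock($fh, LOCK_EX) or die "flock $path: $!";
	My::ScopeGuard->new(sub {
		flock($fh, LOCK_UN) or die "unlock $path: $!";
		close($fh) or die "close $path: $!";
		$released++;
	});
}

my $tmp = File::Temp->new;
{
	my $lock = lock_for_scope($tmp->filename);
	# critical section: the flock is held here
}
# $lock went out of scope, so the guard has released the flock
print "released=$released\n";
```

Calling such a function in void context would destroy the guard
(and drop the lock) immediately, which is why the return value has
to be stored by the caller.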

diff --git a/MANIFEST b/MANIFEST
index 12b67e95..d32f064e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -200,6 +200,7 @@ lib/PublicInbox/SearchIdxShard.pm
 lib/PublicInbox/SearchQuery.pm
 lib/PublicInbox/SearchThread.pm
 lib/PublicInbox/SearchView.pm
+lib/PublicInbox/SharedKV.pm
 lib/PublicInbox/Sigfd.pm
 lib/PublicInbox/Smsg.pm
 lib/PublicInbox/SolverGit.pm
@@ -377,6 +378,7 @@ t/run.perl
 t/search-amsg.eml
 t/search-thr-index.t
 t/search.t
+t/shared_kv.t
 t/sigfd.t
 t/solve/0001-simple-mod.patch
 t/solve/0002-rename-with-modifications.patch
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index b2c8227f..7fd17745 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -4,9 +4,10 @@
 # Base class for per-inbox locking
 package PublicInbox::Lock;
 use strict;
-use warnings;
+use v5.10.1;
 use Fcntl qw(:flock :DEFAULT);
 use Carp qw(croak);
+use PublicInbox::OnDestroy;
 
 # we only acquire the flock if creating or reindexing;
 # PublicInbox::Import already has the lock on its own.
@@ -32,4 +33,11 @@ sub lock_release {
 	close $lockfh or croak "close $lock_path failed: $!\n";
 }
 
+# caller must use return value
+sub lock_for_scope {
+	my ($self) = @_;
+	$self->lock_acquire;
+	PublicInbox::OnDestroy->new(\&lock_release, $self);
+}
+
 1;
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
new file mode 100644
index 00000000..52a7424e
--- /dev/null
+++ b/lib/PublicInbox/SharedKV.pm
@@ -0,0 +1,143 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# fork()-friendly key-value store.  Will be used for making
+# augmenting Maildirs and mboxes less expensive, maybe.
+# We use flock(2) to avoid SQLite lock problems (busy timeouts, backoff)
+package PublicInbox::SharedKV;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::Lock);
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use DBI ();
+use PublicInbox::Spawn;
+
+sub dbh {
+	my ($self, $lock) = @_;
+	$self->{dbh} //= do {
+		my $f = $self->{filename};
+		$lock //= $self->lock_for_scope;
+		my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
+			AutoCommit => 1,
+			RaiseError => 1,
+			PrintError => 0,
+			sqlite_use_immediate_transaction => 1,
+			# no sqlite_unicode here, this is for binary data
+		});
+		my $opt = $self->{opt} // {};
+		$dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
+		$dbh->do('PRAGMA cache_size = '.($opt->{cache_size} || 80000));
+		$dbh->do('PRAGMA journal_mode = '.
+				($opt->{journal_mode} // 'WAL'));
+		$dbh->do(<<'');
+CREATE TABLE IF NOT EXISTS kv (
+	k VARBINARY PRIMARY KEY NOT NULL,
+	v VARBINARY NOT NULL,
+	UNIQUE (k)
+)
+
+		$dbh->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
+		$dbh;
+	}
+}
+
+sub new {
+	my ($cls, $dir, $base, $opt) = @_;
+	my $self = bless { opt => $opt }, $cls;
+	unless (defined $dir) {
+		$self->{tmp} = File::Temp->newdir('kv-XXXXXX', TMPDIR => 1);
+		$dir = $self->{tmp}->dirname;
+	}
+	-d $dir or mkdir($dir) or die "mkdir($dir): $!";
+	$base //= '';
+	my $f = $self->{filename} = "$dir/$base.sqlite3";
+	$self->{lock_path} = $opt->{lock_path} // "$dir/$base.flock";
+	unless (-f $f) {
+		open my $fh, '+>>', $f or die "failed to open $f: $!";
+		PublicInbox::Spawn::nodatacow_fd(fileno($fh));
+	}
+	$self;
+}
+
+sub set_maybe {
+	my ($self, $key, $val, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
+INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
+
+	$e == 0 ? undef : $e;
+}
+
+# caller calls sth->fetchrow_array
+sub each_kv_iter {
+	my ($self) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+SELECT k,v FROM kv
+
+	$sth->execute;
+	$sth
+}
+
+sub delete_by_val {
+	my ($self, $val, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	$self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
+DELETE FROM kv WHERE v = ?
+
+}
+
+sub replace_values {
+	my ($self, $oldval, $newval, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	$self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
+UPDATE kv SET v = ? WHERE v = ?
+
+}
+
+sub set {
+	my ($self, $key, $val) = @_;
+	if (defined $val) {
+		my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
+INSERT OR REPLACE INTO kv (k,v) VALUES (?,?)
+
+		$e == 0 ? undef : $e;
+	} else {
+		$self->{dbh}->prepare_cached(<<'')->execute($key);
+DELETE FROM kv WHERE k = ?
+
+	}
+}
+
+sub get {
+	my ($self, $key) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'', undef, 1);
+SELECT v FROM kv WHERE k = ?
+
+	$sth->execute($key);
+	$sth->fetchrow_array;
+}
+
+sub xchg {
+	my ($self, $key, $newval, $lock) = @_;
+	$lock //= $self->lock_for_scope;
+	my $oldval = get($self, $key);
+	if (defined $newval) {
+		set($self, $key, $newval);
+	} else {
+		$self->{dbh}->prepare_cached(<<'')->execute($key);
+DELETE FROM kv WHERE k = ?
+
+	}
+	$oldval;
+}
+
+sub count {
+	my ($self) = @_;
+	my $sth = $self->{dbh}->prepare_cached(<<'');
+SELECT COUNT(k) FROM kv
+
+	$sth->execute;
+	$sth->fetchrow_array;
+}
+
+1;
diff --git a/t/shared_kv.t b/t/shared_kv.t
new file mode 100644
index 00000000..4b727462
--- /dev/null
+++ b/t/shared_kv.t
@@ -0,0 +1,57 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use_ok 'PublicInbox::SharedKV';
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+my $skv = PublicInbox::SharedKV->new;
+opendir(my $dh, $tmpdir) or BAIL_OUT $!;
+my @ent = grep(!/\A\.\.?\z/, readdir($dh));
+is(scalar(@ent), 1, 'created a temporary dir');
+$skv->dbh;
+my $dead = "\xde\xad";
+my $beef = "\xbe\xef";
+my $cafe = "\xca\xfe";
+ok($skv->set($dead, $beef), 'set');
+is($skv->get($dead), $beef, 'get');
+ok($skv->set($dead, $beef), 'set idempotent');
+ok(!$skv->set_maybe($dead, $cafe), 'set_maybe ignores');
+ok($skv->set_maybe($cafe, $dead), 'set_maybe sets');
+is($skv->xchg($dead, $cafe), $beef, 'xchg');
+is($skv->get($dead), $cafe, 'get after xchg');
+is($skv->xchg($dead, undef), $cafe, 'xchg to undef');
+is($skv->get($dead), undef, 'get after xchg to undef');
+is($skv->get($cafe), $dead, 'get after set_maybe');
+is($skv->replace_values($dead, $cafe), 1, 'replaced one by value');
+is($skv->get($cafe), $cafe, 'value updated');
+is($skv->replace_values($dead, $cafe), 0, 'replaced none by value');
+is($skv->xchg($dead, $cafe), undef, 'xchg from undef');
+is($skv->count, 2, 'count works');
+
+my %seen;
+my $sth = $skv->each_kv_iter;
+while (my ($k, $v) = $sth->fetchrow_array) {
+	$seen{$k} = $v;
+}
+is($seen{$dead}, $cafe, '$dead has expected value');
+is($seen{$cafe}, $cafe, '$cafe has expected value');
+is(scalar keys %seen, 2, 'iterated through all');
+
+is($skv->replace_values($cafe, $dead), 2, 'replaced 2 by value');
+is($skv->delete_by_val('bogus'), 0, 'delete_by_val misses');
+is($skv->delete_by_val($dead), 2, 'delete_by_val hits');
+is($skv->delete_by_val($dead), 0, 'delete_by_val misses again');
+
+undef $skv;
+rewinddir($dh);
+@ent = grep(!/\A\.\.?\z/, readdir($dh));
+is(scalar(@ent), 0, 'temporary dir gone');
+undef $dh;
+$skv = PublicInbox::SharedKV->new("$tmpdir/dir", 'base');
+ok(-e "$tmpdir/dir/base.sqlite3", 'file created');
+
+done_testing;


* [PATCH 06/36] sharedkv: split out index_values
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (4 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 05/36] sharedkv: fork()-friendly key-value store Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 07/36] lei_to_mail: start atomic and compressed mbox writing Eric Wong
                   ` (29 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

In most cases, we won't need to index by value, so
don't waste cycles or space on it.
---
 lib/PublicInbox/SharedKV.pm | 7 ++++++-
 t/shared_kv.t               | 1 +
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 52a7424e..983952f5 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -36,7 +36,6 @@ CREATE TABLE IF NOT EXISTS kv (
 	UNIQUE (k)
 )
 
-		$dbh->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
 		$dbh;
 	}
 }
@@ -59,6 +58,12 @@ sub new {
 	$self;
 }
 
+sub index_values {
+	my ($self) = @_;
+	my $lock = $self->lock_for_scope;
+	$self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
+}
+
 sub set_maybe {
 	my ($self, $key, $val, $lock) = @_;
 	$lock //= $self->lock_for_scope;
diff --git a/t/shared_kv.t b/t/shared_kv.t
index 4b727462..ad901328 100644
--- a/t/shared_kv.t
+++ b/t/shared_kv.t
@@ -26,6 +26,7 @@ is($skv->get($dead), $cafe, 'get after xchg');
 is($skv->xchg($dead, undef), $cafe, 'xchg to undef');
 is($skv->get($dead), undef, 'get after xchg to undef');
 is($skv->get($cafe), $dead, 'get after set_maybe');
+ok($skv->index_values, 'index_values works');
 is($skv->replace_values($dead, $cafe), 1, 'replaced one by value');
 is($skv->get($cafe), $cafe, 'value updated');
 is($skv->replace_values($dead, $cafe), 0, 'replaced none by value');


* [PATCH 07/36] lei_to_mail: start atomic and compressed mbox writing
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (5 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 06/36] sharedkv: split out index_values Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 08/36] mboxreader: new class for reading various mbox formats Eric Wong
                   ` (28 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

We'll allow using multiple workers to write to a single
mbox (which could be compressed).  This can be done
safely with O_APPEND + syswrite for uncompressed files,
and by using a lock when piping to pigz/gzip/bzip2/xz.
---
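Illustration only, not part of the patch: a minimal sketch of the
O_APPEND + syswrite idea using core Perl.  The helper name
append_records and the record layout are made up for this note.  It
assumes a local POSIX filesystem where each append-mode write lands at
end-of-file as one unit, and it treats a short write as fatal, like
write_in_full does.

```perl
use strict;
use warnings;
use Fcntl qw(O_WRONLY O_APPEND O_CREAT);
use POSIX ();

# hypothetical helper: fork $nproc workers, each appending $nrec
# newline-terminated records to $path with one syswrite per record
sub append_records {
	my ($path, $nproc, $nrec) = @_;
	my @pids;
	for my $id (1..$nproc) {
		my $pid = fork;
		defined($pid) or die "fork: $!";
		if ($pid == 0) { # child: never return into the caller
			my $ok = eval {
				sysopen(my $fh, $path,
					O_WRONLY|O_APPEND|O_CREAT, 0600)
					or die "open $path: $!";
				for my $n (1..$nrec) {
					my $buf = "worker=$id rec=$n ".
						('x' x 200)."\n";
					# one write(2) per record, no stdio buffering
					defined(my $w = syswrite($fh, $buf))
						or die "write: $!";
					$w == length($buf)
						or die "short write: $w";
				}
				close($fh) or die "close: $!";
				1;
			};
			warn $@ unless $ok;
			POSIX::_exit($ok ? 0 : 1); # skip parent's destructors
		}
		push @pids, $pid;
	}
	for my $pid (@pids) { # reap every worker and check its status
		waitpid($pid, 0) == $pid or die "waitpid: $!";
		$? == 0 or die "worker $pid failed: \$?=$?";
	}
}
```

Each record should come out whole, though the order across workers is
unspecified.  Pipes only guarantee this for writes up to PIPE_BUF
bytes, which is why the compressed case takes a lock instead.
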
 lib/PublicInbox/LeiToMail.pm   | 99 ++++++++++++++++++++++++++++++++--
 lib/PublicInbox/ProcessPipe.pm | 21 ++++++--
 t/lei_to_mail.t                | 47 ++++++++++++++++
 3 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b0d4b664..ebb50c50 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -6,6 +6,12 @@ package PublicInbox::LeiToMail;
 use strict;
 use v5.10.1;
 use PublicInbox::Eml;
+use PublicInbox::Lock;
+use PublicInbox::ProcessPipe;
+use PublicInbox::Spawn qw(which spawn);
+use Symbol qw(gensym);
+use File::Temp ();
+use IO::Handle; # ->autoflush
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -45,10 +51,14 @@ sub _mbox_hdr_buf ($$$) {
 	$buf;
 }
 
-sub write_in_full_atomic ($$) {
-	my ($fh, $buf) = @_;
-	defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
-	$w == length($$buf) or die "short write: $w != ".length($$buf);
+sub write_in_full ($$$) {
+	my ($fh, $buf, $atomic) = @_;
+	if ($atomic) {
+		defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
+		$w == length($$buf) or die "short write: $w != ".length($$buf);
+	} else {
+		print $fh $$buf or die "print: $!";
+	}
 }
 
 sub eml2mboxrd ($;$) {
@@ -106,4 +116,85 @@ sub eml2mboxcl2 {
 	$buf;
 }
 
+sub mkmaildir ($) {
+	my ($maildir) = @_;
+	for (qw(new tmp cur)) {
+		my $d = "$maildir/$_";
+		next if -d $d;
+		require File::Path;
+		if (!File::Path::mkpath($d) && !-d $d) {
+			die "failed to mkpath($d): $!\n";
+		}
+	}
+}
+
+sub git_to_mail { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $arg) = @_;
+	if ($type ne 'blob') {
+		if ($type eq 'missing') {
+			warn "missing $oid\n";
+		} else {
+			warn "unexpected type=$type for $oid\n";
+		}
+	}
+	if ($size > 0) {
+		my ($write_cb, $kw) = @$arg;
+		$write_cb->($bref, $oid, $kw);
+	}
+}
+
+sub reap_compress { # dwaitpid callback
+	my ($lei, $pid) = @_;
+	my $cmd = delete $lei->{"pid.$pid"};
+	return if $? == 0;
+	$lei->fail("@$cmd failed", $? >> 8);
+}
+
+sub compress_dst {
+	my ($out, $sfx, $lei) = @_;
+	my $cmd = [];
+	if ($sfx eq 'gz') {
+		$cmd->[0] = which($lei->{env}->{GZIP} // 'pigz') //
+				which('gzip') //
+			die "pigz or gzip missing for $sfx\n";
+			# TODO: use IO::Compress::Gzip
+		push @$cmd, '-c'; # stdout
+		push @$cmd, '--rsyncable' if $lei->{opt}->{rsyncable};
+	} else {
+		die "TODO $sfx"
+	}
+	pipe(my ($r, $w)) or die "pipe: $!";
+	my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} };
+	my $pid = spawn($cmd, $lei->{env}, $rdr);
+	$lei->{"pid.$pid"} = $cmd;
+	my $pp = gensym;
+	tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei;
+	my $tmp = File::Temp->new("$sfx.lock-XXXXXX", TMPDIR => 1);
+	my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? bless({
+		lock_path => $tmp->filename,
+		tmp => $tmp
+	}, 'PublicInbox::Lock') : undef;
+	($pp, $pipe_lk);
+}
+
+sub write_cb {
+	my ($cls, $dst, $lei) = @_;
+	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
+		my $m = "eml2$1";
+		my $eml2mbox = $cls->can($m) or die "$cls->$m missing";
+		my ($out, $pipe_lk);
+		open $out, '>>', $dst or die "open $dst: $!";
+		my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1);
+		if ($dst =~ /\.(gz|bz2|xz)\z/) {
+			($out, $pipe_lk) = compress_dst($out, $1, $lei);
+		}
+		sub {
+			my ($buf, $oid, $kw) = @_;
+			$buf = $eml2mbox->(PublicInbox::Eml->new($buf), $kw);
+			my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
+			write_in_full($out, $buf, $atomic);
+		}
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 2ce7eb8f..c9234f42 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -4,28 +4,39 @@
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
 use strict;
-use warnings;
+use v5.10.1;
 
 sub TIEHANDLE {
-	my ($class, $pid, $fh) = @_;
-	bless { pid => $pid, fh => $fh }, $class;
+	my ($class, $pid, $fh, $cb, $arg) = @_;
+	bless { pid => $pid, fh => $fh, cb => $cb, arg => $arg }, $class;
 }
 
 sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
 
 sub READLINE { readline($_[0]->{fh}) }
 
+sub WRITE {
+	use bytes qw(length);
+	syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0);
+}
+
+sub PRINT {
+	my $self = shift;
+	print { $self->{fh} } @_;
+}
+
 sub CLOSE {
 	my $fh = delete($_[0]->{fh});
 	my $ret = defined $fh ? close($fh) : '';
-	my $pid = delete $_[0]->{pid};
+	my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
 	if (defined $pid) {
 		# PublicInbox::DS may not be loaded
-		eval { PublicInbox::DS::dwaitpid($pid, undef, undef) };
+		eval { PublicInbox::DS::dwaitpid($pid, $cb, $arg) };
 
 		if ($@) { # ok, not in the event loop, work synchronously
 			waitpid($pid, 0);
 			$ret = '' if $?;
+			$cb->($arg, $pid) if $cb;
 		}
 	}
 	$ret;
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 089a422e..231cf543 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -62,4 +62,51 @@ for my $mbox (qw(mboxrd mboxo mboxcl mboxcl2)) {
 	}
 }
 
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!;
+my $lei = { 2 => $err };
+my $buf = <<'EOM';
+From: x@example.com
+Subject: x
+
+blah
+EOM
+my $fn = "$tmpdir/x.mbox";
+my $orig = do {
+	my $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei);
+	is(ref $wcb, 'CODE', 'write_cb returned callback');
+	ok(-f $fn && !-s _, 'empty file created');
+	$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
+	undef $wcb;
+	open my $fh, '<', $fn or BAIL_OUT $!;
+	my $raw = do { local $/; <$fh> };
+	like($raw, qr/^blah\n/sm, 'wrote content');
+	unlink $fn or BAIL_OUT $!;
+
+	local $lei->{opt} = { jobs => 2 };
+	$wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei);
+	$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
+	undef $wcb;
+	open $fh, '<', $fn or BAIL_OUT $!;
+	is($raw, do { local $/; <$fh> }, 'jobs > 1');
+	$raw;
+};
+SKIP: {
+	use PublicInbox::Spawn qw(which);
+	my $gzip = which('gzip') or skip 'gzip not found', 1;
+	my $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei);
+	$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
+	undef $wcb;
+	my $uncompressed = xqx([$gzip, '-dc', "$fn.gz"]);
+	is($uncompressed, $orig, 'gzip works');
+
+	local $lei->{opt} = { jobs => 2 };
+	unlink "$fn.gz" or die "unlink $!";
+	$wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei);
+	$wcb->(\(my $dupe = $buf), 'deadbeef', [ qw(seen) ]);
+	undef $wcb;
+	is(xqx([$gzip, '-dc', "$fn.gz"]), $orig);
+}
+
 done_testing;

^ permalink raw reply related	[flat|nested] 37+ messages in thread

* [PATCH 08/36] mboxreader: new class for reading various mbox formats
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (6 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 07/36] lei_to_mail: start atomic and compressed mbox writing Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 09/36] lei_to_mail: start --augment, dedupe, bz2 and xz Eric Wong
                   ` (27 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This is only lightly-tested against stuff LeiToMail generates
and will need real-world tests to validate.
---
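Illustration only, not part of the patch: a small round-trip sketch of
the "From " quoting which mboxrd() and mboxo() undo.  The two reader
regexps are the ones passed to _mbox_from in this patch; the
writer-side helper mboxrd_escape is a hypothetical stand-in and is not
the LeiToMail code.

```perl
use strict;
use warnings;

# hypothetical writer-side helper: mboxrd adds one ">" to every
# body line matching /^>*From /, so the quoting is reversible
sub mboxrd_escape {
	my ($body) = @_;
	$body =~ s/^(>*From )/>$1/gm;
	$body;
}

# reader side, same regexp mboxrd() hands to _mbox_from
sub mboxrd_unescape {
	my ($body) = @_;
	$body =~ s/^>(>*From )/$1/gm;
	$body;
}

# reader side for mboxo: only a single ">From " is unquoted, so a
# line which was ">From " before writing cannot be recovered
sub mboxo_unescape {
	my ($body) = @_;
	$body =~ s/^>(From )/$1/gm;
	$body;
}
```

The asymmetry between the last two subs is the usual argument for
preferring mboxrd over mboxo when the choice is ours.
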
 MANIFEST                      |   2 +
 lib/PublicInbox/MboxReader.pm | 124 ++++++++++++++++++++++++++++++++++
 t/mbox_reader.t               |  76 +++++++++++++++++++++
 3 files changed, 202 insertions(+)
 create mode 100644 lib/PublicInbox/MboxReader.pm
 create mode 100644 t/mbox_reader.t

diff --git a/MANIFEST b/MANIFEST
index d32f064e..1fb1e181 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -176,6 +176,7 @@ lib/PublicInbox/MIME.pm
 lib/PublicInbox/ManifestJsGz.pm
 lib/PublicInbox/Mbox.pm
 lib/PublicInbox/MboxGz.pm
+lib/PublicInbox/MboxReader.pm
 lib/PublicInbox/MiscIdx.pm
 lib/PublicInbox/MiscSearch.pm
 lib/PublicInbox/MsgIter.pm
@@ -334,6 +335,7 @@ t/lei_to_mail.t
 t/lei_xsearch.t
 t/linkify.t
 t/main-bin/spamc
+t/mbox_reader.t
 t/mda-mime.eml
 t/mda.t
 t/mda_filter_rubylang.t
diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm
new file mode 100644
index 00000000..e1944aaf
--- /dev/null
+++ b/lib/PublicInbox/MboxReader.pm
@@ -0,0 +1,124 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# reader for mbox variants we support
+package PublicInbox::MboxReader;
+use strict;
+use v5.10.1;
+use Data::Dumper;
+$Data::Dumper::Useqq = 1; # should've been the default, for bad data
+
+my $from_strict =
+	qr/^From \S+ +\S+ \S+ +\S+ [^\n:]+:[^\n:]+:[^\n:]+ [^\n:]+\n/sm;
+
+sub _mbox_from {
+	my ($mbfh, $from_re, $eml_cb, @arg) = @_;
+	my $buf = '';
+	my @raw;
+	while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
+		if ($r == 0) { # close here to check for "curl --fail"
+			close($mbfh) or die "error closing mbox: \$?=$? $!";
+			@raw = ($buf);
+		} else {
+			@raw = split(/$from_strict/mos, $buf, -1);
+			next if scalar(@raw) == 0;
+			$buf = pop(@raw); # last bit may be incomplete
+		}
+		@raw = grep /[^ \t\r\n]/s, @raw; # skip empty messages
+		while (defined(my $raw = shift @raw)) {
+			$raw =~ s/\r?\n\z//s;
+			$raw =~ s/$from_re/$1/gms;
+			my $eml = PublicInbox::Eml->new(\$raw);
+			$eml_cb->($eml, @arg);
+		}
+		return if $r == 0; # EOF
+	}
+	die "error reading mboxo/mboxrd handle: $!";
+}
+
+sub mboxrd {
+	my (undef, $mbfh, $eml_cb, @arg) = @_;
+	_mbox_from($mbfh, qr/^>(>*From )/ms, $eml_cb, @arg);
+}
+
+sub mboxo {
+	my (undef, $mbfh, $eml_cb, @arg) = @_;
+	_mbox_from($mbfh, qr/^>(From )/ms, $eml_cb, @arg);
+}
+
+sub _cl_body {
+	my ($mbfh, $bref, $cl) = @_;
+	my $body = substr($$bref, 0, $cl, '');
+	my $need = $cl - length($body);
+	if ($need > 0) {
+		$mbfh or die "E: needed $need bytes after EOF";
+		defined(my $r = read($mbfh, $body, $need, length($body))) or
+			die "E: read error: $!\n";
+		$r == $need or die "E: read $r of $need bytes\n";
+	}
+	\$body;
+}
+
+sub _extract_hdr {
+	my ($ref) = @_;
+	if (index($$ref, "\r\n") < 0 && (my $pos = index($$ref, "\n\n")) >= 0) {
+		# likely on *nix
+		\substr($$ref, 0, $pos + 2, ''); # sv_chop on $$ref
+	} elsif ($$ref =~ /\r?\n\r?\n/s) {
+		\substr($$ref, 0, $+[0], ''); # sv_chop on $$ref
+	} else {
+		undef
+	}
+}
+
+sub _mbox_cl ($$$;@) {
+	my ($mbfh, $uxs_from, $eml_cb, @arg) = @_;
+	my $buf = '';
+	while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
+		if ($r == 0) { # detect "curl --fail"
+			close($mbfh) or
+				die "error closing mboxcl/mboxcl2: \$?=$? $!";
+			undef $mbfh;
+		}
+		while (my $hdr = _extract_hdr(\$buf)) {
+			$$hdr =~ s/\A[\r\n]*From [^\n]*\n//s or
+				die "E: no 'From ' line in:\n", Dumper($hdr);
+			my $eml = PublicInbox::Eml->new($hdr);
+			my @cl = $eml->header_raw('Content-Length');
+			my $n = scalar(@cl);
+			$n == 0 and die "E: Content-Length missing in:\n",
+					Dumper($eml->as_string);
+			$n == 1 or die "E: multiple ($n) Content-Length in:\n",
+					Dumper($eml->as_string);
+			$cl[0] =~ /\A[0-9]+\z/ or die
+				"E: Content-Length `$cl[0]' invalid\n",
+					Dumper($eml->as_string);
+			if (($eml->{bdy} = _cl_body($mbfh, \$buf, $cl[0]))) {
+				$uxs_from and
+					${$eml->{bdy}} =~ s/^>From /From /sgm;
+			}
+			$eml_cb->($eml, @arg);
+		}
+		if ($r == 0) {
+			$buf =~ /[^ \r\n\t]/ and
+				warn "W: leftover at end of mboxcl/mboxcl2:\n",
+					Dumper(\$buf);
+			return;
+		}
+	}
+	die "error reading mboxcl/mboxcl2 handle: $!";
+}
+
+sub mboxcl {
+	my (undef, $mbfh, $eml_cb, @arg) = @_;
+	_mbox_cl($mbfh, 1, $eml_cb, @arg);
+}
+
+sub mboxcl2 {
+	my (undef, $mbfh, $eml_cb, @arg) = @_;
+	_mbox_cl($mbfh, undef, $eml_cb, @arg);
+}
+
+sub new { bless \(my $x), __PACKAGE__ }
+
+1;
diff --git a/t/mbox_reader.t b/t/mbox_reader.t
new file mode 100644
index 00000000..9391dc24
--- /dev/null
+++ b/t/mbox_reader.t
@@ -0,0 +1,76 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use List::Util qw(shuffle);
+use PublicInbox::Eml;
+use Fcntl qw(SEEK_SET);
+require_ok 'PublicInbox::MboxReader';
+require_ok 'PublicInbox::LeiToMail';
+my %raw = (
+	hdr_only => "From: header-only\@example.com\n\n",
+	small_from => "From: small-from\@example.com\n\nFrom hell\n",
+	small => "From: small\@example.com\n\nfrom hell\n",
+	big_hdr_only => "From: big-header\@example.com\n" .
+		(('A: '.('a' x 72)."\n") x 1000)."\n",
+	big_body => "From: big-body\@example.com\n\n".
+		(('b: '.('b' x 72)."\n") x 1000) .
+		"From hell\n",
+	big_all => "From: big-all\@example.com\n".
+		(("A: ".('a' x 72)."\n") x 1000). "\n" .
+		(("b: ".('b' x 72)."\n") x 1000) .
+		"From hell\n",
+);
+
+if ($ENV{TEST_EXTRA}) {
+	for my $fn (glob('t/*.eml'), glob('t/*/*.{patch,eml}')) {
+		$raw{$fn} = eml_load($fn)->as_string;
+	}
+}
+
+my $reader = PublicInbox::MboxReader->new;
+my $write_in_full = PublicInbox::LeiToMail->can('write_in_full');
+my $check_fmt = sub {
+	my $fmt = shift;
+	my @order = shuffle(keys %raw);
+	my $eml2mbox = PublicInbox::LeiToMail->can("eml2$fmt");
+	open my $fh, '+>', undef or BAIL_OUT "open: $!";
+	for my $k (@order) {
+		my $eml = PublicInbox::Eml->new($raw{$k});
+		my $buf = $eml2mbox->($eml);
+		$write_in_full->($fh, $buf, undef);
+	}
+	seek($fh, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	$reader->$fmt($fh, sub {
+		my ($eml) = @_;
+		my $cur = shift @order;
+		my @cl = $eml->header_raw('Content-Length');
+		if ($fmt =~ /\Amboxcl/) {
+			is(scalar(@cl), 1, "Content-Length set $fmt $cur");
+			my $raw = $eml->body_raw;
+			my $adj = 0;
+			if ($fmt eq 'mboxcl') {
+				my @from = ($raw =~ /^(From )/smg);
+				$adj = scalar(@from);
+			}
+			is(length($raw), $cl[0] - $adj,
+				"Content-Length is correct $fmt $cur");
+			# clobber for ->as_string comparison below
+			$eml->header_set('Content-Length');
+		} else {
+			is(scalar(@cl), 0, "Content-Length unset $fmt $cur");
+		}
+		my $orig = PublicInbox::Eml->new($raw{$cur});
+		is($eml->as_string, $orig->as_string,
+			"read back original $fmt $cur");
+	});
+};
+my @mbox = qw(mboxrd mboxo mboxcl mboxcl2);
+for my $fmt (@mbox) { $check_fmt->($fmt) }
+s/\n/\r\n/sg for (values %raw);
+for my $fmt (@mbox) { $check_fmt->($fmt) }
+
+done_testing;

^ permalink raw reply related	[flat|nested] 37+ messages in thread

* [PATCH 09/36] lei_to_mail: start --augment, dedupe, bz2 and xz
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (7 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 08/36] mboxreader: new class for reading various mbox formats Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 10/36] lei: implement various deduplication strategies Eric Wong
                   ` (26 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

--augment will match the mairix(1) option of the same
name to augment existing search results.  We'll need
to implement deduplication for a better user experience.

mutt ships with compressed mbox support for bz2 and xz,
at least, so we'll support those out-of-the-box.
---
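Illustration only, not part of the patch: a sketch of how --augment
and deduplication fit together.  Existing messages prime a "seen" set,
then only unseen incoming messages get written.  All names here are
hypothetical; a per-process hash stands in for SharedKV->set_maybe
(so nothing is shared across fork), and SHA-256 of the raw message
stands in for content_hash, which is more selective about what it
hashes.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256);

# hypothetical stand-in for SharedKV->set_maybe: returns true only
# the first time a key is offered
sub make_seen {
	my %seen;
	sub { my ($key) = @_; !$seen{$key}++ };
}

# hypothetical filter: prime with what is already in the destination
# (the --augment case), then keep only messages not seen before
sub augment_filter {
	my ($existing, $incoming) = @_;
	my $set_maybe = make_seen();
	$set_maybe->(sha256($_)) for @$existing; # return value ignored
	grep { $set_maybe->(sha256($_)) } @$incoming;
}
```

Without --augment the destination is truncated, so the priming step
has nothing to read and only duplicates within the new results are
dropped.
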
 lib/PublicInbox/LeiToMail.pm  | 140 ++++++++++++++++++++++++++--------
 lib/PublicInbox/Lock.pm       |   7 ++
 lib/PublicInbox/MboxReader.pm |   3 +
 t/lei_to_mail.t               |  47 ++++++++----
 4 files changed, 150 insertions(+), 47 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index ebb50c50..294291b2 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -8,10 +8,12 @@ use v5.10.1;
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
-use PublicInbox::Spawn qw(which spawn);
+use PublicInbox::SharedKV;
+use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::ContentHash qw(content_hash);
 use Symbol qw(gensym);
-use File::Temp ();
 use IO::Handle; # ->autoflush
+use Fcntl qw(SEEK_SET);
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -150,51 +152,123 @@ sub reap_compress { # dwaitpid callback
 	$lei->fail("@$cmd failed", $? >> 8);
 }
 
-sub compress_dst {
-	my ($out, $sfx, $lei) = @_;
-	my $cmd = [];
-	if ($sfx eq 'gz') {
-		$cmd->[0] = which($lei->{env}->{GZIP} // 'pigz') //
-				which('gzip') //
-			die "pigz or gzip missing for $sfx\n";
-			# TODO: use IO::Compress::Gzip
-		push @$cmd, '-c'; # stdout
-		push @$cmd, '--rsyncable' if $lei->{opt}->{rsyncable};
-	} else {
-		die "TODO $sfx"
+# all of these support -c for stdout and -d for decompression,
+# mutt is commonly distributed with hooks for gz, bz2 and xz, at least
+# { foo => '' } means "--foo" is passed to the command-line,
+# otherwise { foo => '--bar' } passes "--bar"
+our %zsfx2cmd = (
+	gz => [ qw(GZIP pigz gzip), {
+		rsyncable => '', threads => '-p' } ],
+	bz2 => [ 'bzip2', {} ],
+	xz => [ 'xz', { threads => '-T' } ],
+	# XXX does anybody care for these?  I prefer zstd on entire FSes,
+	# so it's probably not necessary on a per-file basis
+	# zst => [ 'zstd', { -default => [ qw(-q) ], # it's noisy by default
+	#	rsyncable => '', threads => '-T' } ],
+	# zz => [ 'pigz', { -default => [ '--zlib' ],
+	#	rsyncable => '', threads => '-p' }],
+	# lzo => [ 'lzop', {} ],
+	# lzma => [ 'lzma', {} ],
+);
+
+sub zsfx2cmd ($$$) {
+	my ($zsfx, $decompress, $lei) = @_;
+	my $x = $zsfx2cmd{$zsfx} // die "no support for suffix=.$zsfx";
+	my @info = @$x;
+	my $cmd_opt = pop @info;
+	my @cmd = (undef, $decompress ? qw(-dc) : qw(-c));
+	for my $exe (@info) {
+		# I think respecting client's ENV{GZIP} is OK, not sure
+		# about ENV overrides for other, less-common compressors
+		if ($exe eq uc($exe)) {
+			$exe = $lei->{env}->{$exe} or next;
+		}
+		$cmd[0] = which($exe) and last;
+	}
+	$cmd[0] // die join(' or ', @info)." missing for .$zsfx";
+	# push @cmd, @{$cmd_opt->{-default}} if $cmd_opt->{-default};
+	for my $bool (qw(rsyncable)) {
+		my $switch = $cmd_opt->{$bool} // next;
+		push @cmd, '--'.($switch || $bool);
+	}
+	for my $key (qw(threads)) { # support compression level?
+		my $switch = $cmd_opt->{$key} // next;
+		my $val = $lei->{opt}->{$key} // next;
+		push @cmd, $switch, $val;
 	}
+	\@cmd;
+}
+
+sub compress_dst {
+	my ($out, $zsfx, $lei) = @_;
+	my $cmd = zsfx2cmd($zsfx, undef, $lei);
 	pipe(my ($r, $w)) or die "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} };
 	my $pid = spawn($cmd, $lei->{env}, $rdr);
 	$lei->{"pid.$pid"} = $cmd;
 	my $pp = gensym;
 	tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei;
-	my $tmp = File::Temp->new("$sfx.lock-XXXXXX", TMPDIR => 1);
-	my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ? bless({
-		lock_path => $tmp->filename,
-		tmp => $tmp
-	}, 'PublicInbox::Lock') : undef;
+	my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ?
+			PublicInbox::Lock->new_tmp($zsfx) : undef;
 	($pp, $pipe_lk);
 }
 
-sub write_cb {
-	my ($cls, $dst, $lei) = @_;
-	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
-		my $m = "eml2$1";
-		my $eml2mbox = $cls->can($m) or die "$cls->$m missing";
-		my ($out, $pipe_lk);
-		open $out, '>>', $dst or die "open $dst: $!";
-		my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1);
-		if ($dst =~ /\.(gz|bz2|xz)\z/) {
-			($out, $pipe_lk) = compress_dst($out, $1, $lei);
-		}
-		sub {
-			my ($buf, $oid, $kw) = @_;
-			$buf = $eml2mbox->(PublicInbox::Eml->new($buf), $kw);
+sub decompress_src ($$$) {
+	my ($in, $zsfx, $lei) = @_;
+	my $cmd = zsfx2cmd($zsfx, 1, $lei);
+	my $rdr = { 0 => $in, 2 => $lei->{2} };
+	popen_rd($cmd, $lei->{env}, $rdr);
+}
+
+sub dup_src ($) {
+	my ($in) = @_;
+	open my $dup, '+>>&', $in or die "dup: $!";
+	$dup;
+}
+
+# --augment existing output destination, without duplicating anything
+sub _augment { # MboxReader eml_cb
+	my ($eml, $lei) = @_;
+	$lei->{skv}->set_maybe(content_hash($eml), '');
+}
+
+sub _mbox_write_cb ($$$$) {
+	my ($cls, $mbox, $dst, $lei) = @_;
+	my $m = "eml2$mbox";
+	my $eml2mbox = $cls->can($m) or die "$cls->$m missing";
+	my ($out, $pipe_lk);
+	open $out, '+>>', $dst or die "open $dst: $!";
+	# Perl does SEEK_END even with O_APPEND :<
+	seek($out, 0, SEEK_SET) or die "seek $dst: $!";
+	my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1);
+	$lei->{skv} = PublicInbox::SharedKV->new;
+	$lei->{skv}->dbh;
+	state $zsfx_allow = join('|', keys %zsfx2cmd);
+	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	if ($lei->{opt}->{augment}) {
+		my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+				dup_src($out);
+		PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+	} else {
+		truncate($out, 0) or die "truncate $dst: $!";
+	}
+	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
+	sub {
+		my ($buf, $oid, $kw) = @_;
+		my $eml = PublicInbox::Eml->new($buf);
+		if ($lei->{skv}->set_maybe(content_hash($eml), '')) {
+			$buf = $eml2mbox->($eml, $kw);
 			my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
 			write_in_full($out, $buf, $atomic);
 		}
 	}
 }
 
+sub write_cb { # returns a callback for git_to_mail
+	my ($cls, $dst, $lei) = @_;
+	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
+		_mbox_write_cb($cls, $1, $dst, $lei);
+	}
+}
+
 1;
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index 7fd17745..f6eaa5ce 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -8,6 +8,7 @@ use v5.10.1;
 use Fcntl qw(:flock :DEFAULT);
 use Carp qw(croak);
 use PublicInbox::OnDestroy;
+use File::Temp ();
 
 # we only acquire the flock if creating or reindexing;
 # PublicInbox::Import already has the lock on its own.
@@ -40,4 +41,10 @@ sub lock_for_scope {
 	PublicInbox::OnDestroy->new(\&lock_release, $self);
 }
 
+sub new_tmp {
+	my ($cls, $ident) = @_;
+	my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1);
+	bless { lock_path => $tmp->filename, tmp => $tmp }, $cls;
+}
+
 1;
diff --git a/lib/PublicInbox/MboxReader.pm b/lib/PublicInbox/MboxReader.pm
index e1944aaf..ac0c0f52 100644
--- a/lib/PublicInbox/MboxReader.pm
+++ b/lib/PublicInbox/MboxReader.pm
@@ -5,6 +5,7 @@
 package PublicInbox::MboxReader;
 use strict;
 use v5.10.1;
+use PublicInbox::DS (); # localize $in_loop for error detection :<
 use Data::Dumper;
 $Data::Dumper::Useqq = 1; # should've been the default, for bad data
 
@@ -13,6 +14,7 @@ my $from_strict =
 
 sub _mbox_from {
 	my ($mbfh, $from_re, $eml_cb, @arg) = @_;
+	local $PublicInbox::DS::in_loop; # disable dwaitpid
 	my $buf = '';
 	my @raw;
 	while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
@@ -73,6 +75,7 @@ sub _extract_hdr {
 
 sub _mbox_cl ($$$;@) {
 	my ($mbfh, $uxs_from, $eml_cb, @arg) = @_;
+	local $PublicInbox::DS::in_loop; # disable dwaitpid
 	my $buf = '';
 	while (defined(my $r = read($mbfh, $buf, 65536, length($buf)))) {
 		if ($r == 0) { # detect "curl --fail"
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 231cf543..e4551e69 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -92,21 +92,40 @@ my $orig = do {
 	is($raw, do { local $/; <$fh> }, 'jobs > 1');
 	$raw;
 };
-SKIP: {
-	use PublicInbox::Spawn qw(which);
-	my $gzip = which('gzip') or skip 'gzip not found', 1;
-	my $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei);
-	$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
-	undef $wcb;
-	my $uncompressed = xqx([$gzip, '-dc', "$fn.gz"]);
-	is($uncompressed, $orig, 'gzip works');
+for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
+	my $zsfx2cmd = PublicInbox::LeiToMail->can('zsfx2cmd');
+	SKIP: {
+		my $cmd = eval { $zsfx2cmd->($zsfx, 0, $lei) };
+		skip $@, 3 if $@;
+		my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) };
+		ok($dc_cmd, "decompressor for .$zsfx");
+		my $f = "$fn.$zsfx";
+		my $dst = "mboxcl2:$f";
+		my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
+		undef $wcb;
+		my $uncompressed = xqx([@$dc_cmd, $f]);
+		is($uncompressed, $orig, "$zsfx works unlocked");
 
-	local $lei->{opt} = { jobs => 2 };
-	unlink "$fn.gz" or die "unlink $!";
-	$wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn.gz", $lei);
-	$wcb->(\(my $dupe = $buf), 'deadbeef', [ qw(seen) ]);
-	undef $wcb;
-	is(xqx([$gzip, '-dc', "$fn.gz"]), $orig);
+		local $lei->{opt} = { jobs => 2 }; # for atomic writes
+		unlink $f or BAIL_OUT "unlink $!";
+		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
+		undef $wcb;
+		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
+	}
+}
+
+unlink $fn or BAIL_OUT $!;
+if ('default deduplication uses content_hash') {
+	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	$wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2);
+	undef $wcb; # undef to commit changes
+	my $cmp = '';
+	open my $fh, '<', $fn or BAIL_OUT $!;
+	require PublicInbox::MboxReader;
+	PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= shift->as_string });
+	is($cmp, $buf, 'only one message written');
 }
 
 done_testing;

^ permalink raw reply related	[flat|nested] 37+ messages in thread

* [PATCH 10/36] lei: implement various deduplication strategies
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (8 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 09/36] lei_to_mail: start --augment, dedupe, bz2 and xz Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 11/36] lei_to_mail: lazy-require LeiDedupe Eric Wong
                   ` (25 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

For writing mboxes and Maildirs, users may wish to use
stricter or looser deduplication strategies.  This
gives them more control.
---
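Illustration only, not part of the patch: the "oid" strategy keys on
the git blob object ID, which _regen_oid recomputes when no OID is
supplied.  A minimal sketch of that computation follows, assuming
SHA-1 object IDs and that $raw holds octets rather than wide
characters.  The helper name git_blob_oid_hex is made up for this
note.

```perl
use strict;
use warnings;
use Digest::SHA;

# hypothetical helper: hex SHA-1 object ID git would assign to $raw
# when stored as a blob ("blob <size>\0" header, then the content)
sub git_blob_oid_hex {
	my ($raw) = @_;
	my $dig = Digest::SHA->new(1); # 1 selects SHA-1
	$dig->add('blob '.length($raw)."\0");
	$dig->add($raw);
	$dig->hexdigest;
}
```

For the empty string this gives the well-known empty blob ID,
e69de29bb2d1d6434b8b29ae775ad8c2e48c5391.  Any header a mail client
adds or rewrites changes the result, which is why _regen_oid strips
the @OID_IGNORE headers first and why this is the strictest strategy.
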
 MANIFEST                     |  2 +
 lib/PublicInbox/LEI.pm       |  2 +-
 lib/PublicInbox/LeiDedupe.pm | 96 ++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiToMail.pm | 26 +++++-----
 t/lei_dedupe.t               | 59 ++++++++++++++++++++++
 t/lei_to_mail.t              |  3 ++
 6 files changed, 176 insertions(+), 12 deletions(-)
 create mode 100644 lib/PublicInbox/LeiDedupe.pm
 create mode 100644 t/lei_dedupe.t

diff --git a/MANIFEST b/MANIFEST
index 1fb1e181..7ce2075e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,6 +162,7 @@ lib/PublicInbox/InboxWritable.pm
 lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
+lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExtinbox.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
@@ -330,6 +331,7 @@ t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei-oneshot.t
 t/lei.t
+t/lei_dedupe.t
 t/lei_store.t
 t/lei_to_mail.t
 t/lei_xsearch.t
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7002a1f7..9aa4d95a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -172,7 +172,7 @@ my %OPTDESC = (
 
 'type=s' => [ 'any|mid|git', 'disambiguate type' ],
 
-'dedupe|d=s' => ['STRAT|content|oid|mid',
+'dedupe|d=s' => ['STRAT|content|oid|mid|none',
 		'deduplication strategy'],
 'show	thread|t' => 'display entire thread a message belongs to',
 'q	thread|t' =>
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
new file mode 100644
index 00000000..c6eb7196
--- /dev/null
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -0,0 +1,96 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+package PublicInbox::LeiDedupe;
+use strict;
+use v5.10.1;
+use PublicInbox::SharedKV;
+use PublicInbox::ContentHash qw(content_hash);
+
+# n.b. mutt sets most of these headers; not sure about Bytes
+our @OID_IGNORE = qw(Status X-Status Content-Length Lines Bytes);
+
+# best-effort regeneration of OID when augmenting existing results
+sub _regen_oid ($) {
+	my ($eml) = @_;
+	my @stash; # stash away headers we shouldn't have in git
+	for my $k (@OID_IGNORE) {
+		my @v = $eml->header_raw($k) or next;
+		push @stash, [ $k, \@v ];
+		$eml->header_set($k); # restore below
+	}
+	my $dig = Digest::SHA->new(1); # XXX SHA256 later
+	my $buf = $eml->as_string;
+	$dig->add('blob '.length($buf)."\0");
+	$dig->add($buf);
+	undef $buf;
+
+	for my $kv (@stash) { # restore stashed headers
+		my ($k, $v) = @$kv; # $v is the array ref stashed above
+		$eml->header_set($k, @$v);
+	}
+	$dig->digest;
+}
+
+sub _oidbin ($) { defined($_[0]) ? pack('H*', $_[0]) : undef }
+
+# the paranoid option
+sub dedupe_oid () {
+	my $skv = PublicInbox::SharedKV->new;
+	($skv, sub { # may be called in a child process
+		my ($eml, $oid) = @_;
+		$skv->set_maybe(_oidbin($oid) // _regen_oid($eml), '');
+	});
+}
+
+# dangerous if there's duplicate messages with different Message-IDs
+sub dedupe_mid () {
+	my $skv = PublicInbox::SharedKV->new;
+	($skv, sub { # may be called in a child process
+		my ($eml, $oid) = @_;
+		# TODO: lei will support non-public messages w/o Message-ID
+		my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) //
+			content_hash($eml);
+		$skv->set_maybe($mid, '');
+	});
+}
+
+# our default deduplication strategy (used by v2, also)
+sub dedupe_content () {
+	my $skv = PublicInbox::SharedKV->new;
+	($skv, sub { # may be called in a child process
+		my ($eml) = @_; # oid = $_[1], ignored
+		$skv->set_maybe(content_hash($eml), '');
+	});
+}
+
+# no deduplication at all
+sub dedupe_none () { (undef, sub { 1 }) }
+
+sub new {
+	my ($cls, $lei) = @_;
+	my $dd = $lei->{opt}->{dedupe} // 'content';
+	my $dd_new = $cls->can("dedupe_$dd") //
+			die "unsupported dedupe strategy: $dd\n";
+	bless [ $dd_new->() ], $cls; # [ $skv, $cb ]
+}
+
+# returns true if the message was already seen according to the
+# deduplication strategy, returns false if unseen
+sub is_dup {
+	my ($self, $eml, $oid) = @_;
+	!$self->[1]->($eml, $oid);
+}
+
+sub prepare_dedupe {
+	my ($self) = @_;
+	my $skv = $self->[0];
+	$skv ? $skv->dbh : undef;
+}
+
+sub pause_dedupe {
+	my ($self) = @_;
+	my $skv = $self->[0];
+	delete($skv->{dbh}) if $skv;
+}
+
+1;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 294291b2..ead00d1a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -8,9 +8,8 @@ use v5.10.1;
 use PublicInbox::Eml;
 use PublicInbox::Lock;
 use PublicInbox::ProcessPipe;
-use PublicInbox::SharedKV;
 use PublicInbox::Spawn qw(which spawn popen_rd);
-use PublicInbox::ContentHash qw(content_hash);
+use PublicInbox::LeiDedupe;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET);
@@ -226,10 +225,11 @@ sub dup_src ($) {
 	$dup;
 }
 
-# --augment existing output destination, without duplicating anything
+# --augment existing output destination, with deduplication
 sub _augment { # MboxReader eml_cb
 	my ($eml, $lei) = @_;
-	$lei->{skv}->set_maybe(content_hash($eml), '');
+	# ignore return value, just populate the skv
+	$lei->{dedupe}->is_dup($eml);
 }
 
 sub _mbox_write_cb ($$$$) {
@@ -240,23 +240,27 @@ sub _mbox_write_cb ($$$$) {
 	open $out, '+>>', $dst or die "open $dst: $!";
 	# Perl does SEEK_END even with O_APPEND :<
 	seek($out, 0, SEEK_SET) or die "seek $dst: $!";
-	my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1);
-	$lei->{skv} = PublicInbox::SharedKV->new;
-	$lei->{skv}->dbh;
+	my $jobs = $lei->{opt}->{jobs} // 0;
+	my $atomic = $jobs > 1;
+	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
 	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
 	if ($lei->{opt}->{augment}) {
-		my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-				dup_src($out);
-		PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+		if (-s $out && $dedupe->prepare_dedupe) {
+			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+					dup_src($out);
+			PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+		}
+		$dedupe->pause_dedupe if $jobs; # are we forking?
 	} else {
 		truncate($out, 0) or die "truncate $dst: $!";
+		$dedupe->prepare_dedupe if !$jobs;
 	}
 	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
 	sub {
 		my ($buf, $oid, $kw) = @_;
 		my $eml = PublicInbox::Eml->new($buf);
-		if ($lei->{skv}->set_maybe(content_hash($eml), '')) {
+		if (!$lei->{dedupe}->is_dup($eml, $oid)) {
 			$buf = $eml2mbox->($eml, $kw);
 			my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
 			write_in_full($out, $buf, $atomic);
diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t
new file mode 100644
index 00000000..08f38aa0
--- /dev/null
+++ b/t/lei_dedupe.t
@@ -0,0 +1,59 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use PublicInbox::Eml;
+require_mods(qw(DBD::SQLite));
+use_ok 'PublicInbox::LeiDedupe';
+my $eml = eml_load('t/plack-qp.eml');
+my $mid = $eml->header_raw('Message-ID');
+my $different = eml_load('t/msg_iter-order.eml');
+$different->header_set('Message-ID', $mid);
+
+my $lei = { opt => { dedupe => 'none' } };
+my $dd = PublicInbox::LeiDedupe->new($lei);
+$dd->prepare_dedupe;
+ok(!$dd->is_dup($eml), '1st is_dup w/o dedupe');
+ok(!$dd->is_dup($eml), '2nd is_dup w/o dedupe');
+ok(!$dd->is_dup($different), 'different is_dup w/o dedupe');
+
+for my $strat (undef, 'content') {
+	$lei->{opt}->{dedupe} = $strat;
+	$dd = PublicInbox::LeiDedupe->new($lei);
+	$dd->prepare_dedupe;
+	my $desc = $strat // 'default';
+	ok(!$dd->is_dup($eml), "1st is_dup with $desc dedupe");
+	ok($dd->is_dup($eml), "2nd seen with $desc dedupe");
+	ok(!$dd->is_dup($different), "different is_dup with $desc dedupe");
+}
+$lei->{opt}->{dedupe} = 'bogus';
+eval { PublicInbox::LeiDedupe->new($lei) };
+like($@, qr/unsupported.*bogus/, 'died on bogus strategy');
+
+$lei->{opt}->{dedupe} = 'mid';
+$dd = PublicInbox::LeiDedupe->new($lei);
+$dd->prepare_dedupe;
+ok(!$dd->is_dup($eml), '1st is_dup with mid dedupe');
+ok($dd->is_dup($eml), '2nd seen with mid dedupe');
+ok($dd->is_dup($different), 'different seen with mid dedupe');
+
+$lei->{opt}->{dedupe} = 'oid';
+$dd = PublicInbox::LeiDedupe->new($lei);
+$dd->prepare_dedupe;
+
+# --augment won't have OIDs:
+ok(!$dd->is_dup($eml), '1st is_dup with oid dedupe (augment)');
+ok($dd->is_dup($eml), '2nd seen with oid dedupe (augment)');
+ok(!$dd->is_dup($different), 'different is_dup with oid dedupe (augment)');
+$different->header_set('Status', 'RO');
+ok($dd->is_dup($different), 'different seen with oid dedupe Status removed');
+
+ok(!$dd->is_dup($eml, '01d'), '1st is_dup with oid dedupe');
+ok($dd->is_dup($different, '01d'), 'different content ignored if oid matches');
+ok($dd->is_dup($eml, '01D'), 'case insensitive oid comparison :P');
+ok(!$dd->is_dup($eml, '01dbad'), 'different oid is not a dup');
+
+done_testing;
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index e4551e69..5be4e285 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -6,6 +6,7 @@ use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Eml;
+require_mods(qw(DBD::SQLite));
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
@@ -86,6 +87,7 @@ my $orig = do {
 
 	local $lei->{opt} = { jobs => 2 };
 	$wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei);
+	$lei->{dedupe}->prepare_dedupe;
 	$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 	undef $wcb;
 	open $fh, '<', $fn or BAIL_OUT $!;
@@ -110,6 +112,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		local $lei->{opt} = { jobs => 2 }; # for atomic writes
 		unlink $f or BAIL_OUT "unlink $!";
 		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		$lei->{dedupe}->prepare_dedupe;
 		$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 		undef $wcb;
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
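
A rough sketch of the strategy dispatch exercised by t/lei_dedupe.t
above, for readers who want the idea without SQLite.  Everything
here is an assumption for illustration: DedupeSketch is a
hypothetical package, messages are plain hashes instead of
PublicInbox::Eml objects, in-memory hashes stand in for SharedKV,
and SHA-256 of the body stands in for content_hash.  Only the
->can("dedupe_$name") lookup and the inverted is_dup return value
follow the patch.

```perl
package DedupeSketch; # hypothetical name, not PublicInbox::LeiDedupe
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);

# each dedupe_* constructor returns a callback which is
# true for unseen messages and false for seen ones
sub dedupe_none { sub { 1 } }

sub dedupe_content {
	my %seen; # stand-in for a shared key-value store
	sub { !$seen{sha256_hex($_[0]->{body})}++ };
}

sub dedupe_mid {
	my %seen;
	sub { !$seen{$_[0]->{mid}}++ };
}

sub new {
	my ($cls, $strategy) = @_;
	$strategy //= 'content';
	my $mk = $cls->can("dedupe_$strategy") //
		die "unsupported dedupe strategy: $strategy\n";
	bless { cb => $mk->() }, $cls;
}

# inverts the callback: true if the message was seen before
sub is_dup {
	my ($self, $msg) = @_;
	!$self->{cb}->($msg);
}

package main;

my $dd = DedupeSketch->new('mid');
my $m1 = { mid => '<1@example.com>', body => "hello\n" };
my $m2 = { mid => '<1@example.com>', body => "different\n" };
for my $msg ($m1, $m1, $m2) {
	print(($dd->is_dup($msg) ? 'dup' : 'new'), "\n");
}
```

With 'mid', the third message counts as a dup despite its different
body, which matches the "different seen with mid dedupe" test case.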


* [PATCH 11/36] lei_to_mail: lazy-require LeiDedupe
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (9 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 10/36] lei: implement various deduplication strategies Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 12/36] lei_to_mail: support for non-seekable outputs Eric Wong
                   ` (24 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

LeiDedupe requires SQLite, so load it lazily; we may want to be
able to test writing mail without DBI or SQLite down the line.
---
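For anybody unfamiliar with the idiom, a minimal sketch of lazy
loading in Perl.  "use" loads at compile time, while "require"
inside a sub defers loading until that code path runs.  The helper
names (sha_hex, have_mod) are made up for this example and
Digest::SHA merely stands in for an optional dependency; neither
is part of public-inbox.

```perl
use strict;
use warnings;

# load a module only when the code path needing it actually runs
sub sha_hex {
	my ($data) = @_;
	require Digest::SHA; # no-op after the first successful load
	Digest::SHA::sha1_hex($data);
}

# probe for an optional dependency without dying
sub have_mod {
	my ($mod) = @_;
	(my $file = "$mod.pm") =~ s!::!/!g; # Foo::Bar => Foo/Bar.pm
	eval { require $file; 1 } ? 1 : 0;
}

my $loaded = $INC{'Digest/SHA.pm'} ? 'yes' : 'no';
print "Digest::SHA loaded before first call: $loaded\n";
print sha_hex('abc'), "\n";
```

Callers which never reach sha_hex never pay for (or depend on)
the module, which is the property wanted for SQLite-free tests.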
 lib/PublicInbox/LeiToMail.pm | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index ead00d1a..1c0f3108 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -270,9 +270,11 @@ sub _mbox_write_cb ($$$$) {
 
 sub write_cb { # returns a callback for git_to_mail
 	my ($cls, $dst, $lei) = @_;
+	require PublicInbox::LeiDedupe;
 	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
 		_mbox_write_cb($cls, $1, $dst, $lei);
 	}
+	# TODO: Maildir, MH, IMAP, JMAP ...
 }
 
 1;


* [PATCH 12/36] lei_to_mail: support for non-seekable outputs
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (10 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 11/36] lei_to_mail: lazy-require LeiDedupe Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 13/36] lei_to_mail: support Maildir, fix+test --augment Eric Wong
                   ` (23 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Users may wish to pipe output to "git am", "spamc",
or similar, so we need to support those cases and
not bail out on lseek(2) or ftruncate(2) failures.
---
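The seekability probe boils down to something like the sketch
below: try lseek(2) and treat ESPIPE as "this is a pipe, FIFO or
socket", while any other error stays fatal.  probe_seekable is a
hypothetical helper name used only for this example, and the
anonymous tempfile and pipe are just convenient test subjects.

```perl
use strict;
use warnings;
use Fcntl qw(SEEK_SET);
use Errno qw(ESPIPE);

# returns 1 if $fh is seekable, 0 for pipes/FIFOs/sockets,
# and dies on any other seek error
sub probe_seekable {
	my ($fh, $name) = @_;
	return 1 if seek($fh, 0, SEEK_SET);
	die "seek $name: $!\n" if $! != ESPIPE;
	0;
}

pipe(my $r, my $w) or die "pipe: $!";
open(my $tmp, '+>', undef) or die "open: $!"; # anonymous tempfile
print "pipe seekable: ", probe_seekable($w, 'pipe'), "\n";
print "tmpfile seekable: ", probe_seekable($tmp, 'tmpfile'), "\n";
```

Callers can then skip truncate and --augment style rereads when
the probe returns 0, instead of bailing out.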
 lib/PublicInbox/LeiToMail.pm | 24 ++++++++++++++++--------
 t/lei_to_mail.t              | 29 ++++++++++++++++++++++++++++-
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1c0f3108..4476d84c 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -12,7 +12,7 @@ use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
-use Fcntl qw(SEEK_SET);
+use Fcntl qw(SEEK_SET SEEK_END);
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -236,26 +236,34 @@ sub _mbox_write_cb ($$$$) {
 	my ($cls, $mbox, $dst, $lei) = @_;
 	my $m = "eml2$mbox";
 	my $eml2mbox = $cls->can($m) or die "$cls->$m missing";
-	my ($out, $pipe_lk);
-	open $out, '+>>', $dst or die "open $dst: $!";
-	# Perl does SEEK_END even with O_APPEND :<
-	seek($out, 0, SEEK_SET) or die "seek $dst: $!";
+	my ($out, $pipe_lk, $seekable);
+	# XXX should we support /dev/stdout.gz ?
+	if ($dst eq '/dev/stdout') {
+		$out = $lei->{1};
+	} else { # TODO: mbox locking
+		open $out, '+>>', $dst or die "open $dst: $!";
+		# Perl does SEEK_END even with O_APPEND :<
+		$seekable = seek($out, 0, SEEK_SET);
+		die "seek $dst: $!\n" if !$seekable && !$!{ESPIPE};
+	}
 	my $jobs = $lei->{opt}->{jobs} // 0;
 	my $atomic = $jobs > 1;
 	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
 	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
 	if ($lei->{opt}->{augment}) {
-		if (-s $out && $dedupe->prepare_dedupe) {
+		if ($seekable && -s $out && $dedupe->prepare_dedupe) {
 			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
 					dup_src($out);
 			PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
+		} elsif ($seekable && !$atomic) {
+			seek($out, 0, SEEK_END) or die "seek: $!";
 		}
 		$dedupe->pause_dedupe if $jobs; # are we forking?
-	} else {
+	} elsif ($seekable) {
 		truncate($out, 0) or die "truncate $dst: $!";
-		$dedupe->prepare_dedupe if !$jobs;
 	}
+	$dedupe->prepare_dedupe if !$jobs;
 	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
 	sub {
 		my ($buf, $oid, $kw) = @_;
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 5be4e285..f3cc71ad 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -6,6 +6,7 @@ use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Eml;
+use Fcntl qw(SEEK_SET);
 require_mods(qw(DBD::SQLite));
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
@@ -120,15 +121,41 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 }
 
 unlink $fn or BAIL_OUT $!;
+require PublicInbox::MboxReader;
 if ('default deduplication uses content_hash') {
 	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
 	$wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2);
 	undef $wcb; # undef to commit changes
 	my $cmp = '';
 	open my $fh, '<', $fn or BAIL_OUT $!;
-	require PublicInbox::MboxReader;
 	PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= shift->as_string });
 	is($cmp, $buf, 'only one message written');
 }
 
+{ # stdout support
+	open my $tmp, '+>', undef or BAIL_OUT $!;
+	local $lei->{1} = $tmp;
+	my $wcb = PublicInbox::LeiToMail->write_cb("mboxrd:/dev/stdout", $lei);
+	$wcb->(\(my $x = $buf), 'deadbeef', []);
+	undef $wcb; # commit
+	seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
+	my $cmp = '';
+	PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= shift->as_string });
+	is($cmp, $buf, 'message written to stdout');
+}
+
+SKIP: { # FIFO support
+	use PublicInbox::Spawn qw(popen_rd which);
+	use POSIX qw(mkfifo);
+	my $fn = "$tmpdir/fifo";
+	mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1);
+	my $cat = popen_rd([which('cat'), $fn]);
+	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	$wcb->(\(my $x = $buf), 'deadbeef', []);
+	undef $wcb; # commit
+	my $cmp = '';
+	PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= shift->as_string });
+	is($cmp, $buf, 'message written to FIFO');
+}
+
 done_testing;


* [PATCH 13/36] lei_to_mail: support Maildir, fix+test --augment
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (11 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 12/36] lei_to_mail: support for non-seekable outputs Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 14/36] ipc: generic IPC dispatch based on Storable Eric Wong
                   ` (22 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Maildir should be plenty fine for short-lived output folders.
---
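A stripped-down sketch of the Maildir delivery dance used below:
create the file exclusively under tmp/, then link(2) it into new/
(no flags) or cur/ (flags present) and drop the tmp/ name.  The
deliver() helper and the four-entry keyword map are assumptions
for illustration; the retry-on-EEXIST loops from the real code
are left out to keep it short.

```perl
use strict;
use warnings;
use Fcntl qw(O_CREAT O_EXCL O_WRONLY);
use File::Path qw(make_path);
use File::Temp qw(tempdir);

# assumed subset of a keyword => Maildir flag character map
my %kw2char = (draft => 'D', flagged => 'F', answered => 'R', seen => 'S');

sub deliver {
	my ($md, $name, $bufref, $kw) = @_;
	my $flags = join('', sort(map { $kw2char{$_} // () } @$kw));
	my $tmp = "$md/tmp/$name";
	# O_EXCL ensures we never clobber somebody else's tmp file
	sysopen(my $fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) or
		die "open $tmp: $!";
	(print $fh $$bufref and close($fh)) or die "write $tmp: $!";
	# unflagged messages go to new/, flagged ones to cur/
	my $sub = $flags eq '' ? 'new' : 'cur';
	my $final = "$md/$sub/$name:2,$flags";
	link($tmp, $final) or die "link $tmp => $final: $!";
	unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
	$final;
}

my $md = tempdir(CLEANUP => 1);
make_path(map { "$md/$_" } qw(tmp new cur));
my $path = deliver($md, 'oid=deadbeef', \"Subject: x\n\nhi\n",
		[ qw(seen flagged) ]);
print "$path\n";
```

Readers scanning new/ and cur/ never see a partially-written
message since the final name only appears once the data is
fully written and closed.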
 lib/PublicInbox/LeiDedupe.pm |   6 +-
 lib/PublicInbox/LeiToMail.pm | 133 +++++++++++++++++++++++++++--------
 t/lei_to_mail.t              |  97 +++++++++++++++++++++++--
 t/mbox_reader.t              |   3 +-
 4 files changed, 199 insertions(+), 40 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index c6eb7196..34d8bc27 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -67,8 +67,12 @@ sub dedupe_content () {
 sub dedupe_none () { (undef, sub { 1 }) }
 
 sub new {
-	my ($cls, $lei) = @_;
+	my ($cls, $lei, $dst) = @_;
 	my $dd = $lei->{opt}->{dedupe} // 'content';
+
+	# allow "none" to bypass Eml->new if writing to directory:
+	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
+
 	my $dd_new = $cls->can("dedupe_$dd") //
 			die "unsupported dedupe strategy: $dd\n";
 	bless [ $dd_new->() ], $cls; # [ $skv, $cb ]
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4476d84c..0b2685b0 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -12,7 +12,8 @@ use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
-use Fcntl qw(SEEK_SET SEEK_END);
+use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
+use Errno qw(EEXIST ESPIPE);
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -52,14 +53,15 @@ sub _mbox_hdr_buf ($$$) {
 	$buf;
 }
 
-sub write_in_full ($$$) {
-	my ($fh, $buf, $atomic) = @_;
-	if ($atomic) {
-		defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
-		$w == length($$buf) or die "short write: $w != ".length($$buf);
-	} else {
-		print $fh $$buf or die "print: $!";
-	}
+sub atomic_append { # for on-disk destinations (O_APPEND, or O_EXCL)
+	my ($fh, $buf) = @_;
+	defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
+	$w == length($$buf) or die "short write: $w != ".length($$buf);
+}
+
+sub _print_full {
+	my ($fh, $buf) = @_;
+	print $fh $$buf or die "print: $!";
 }
 
 sub eml2mboxrd ($;$) {
@@ -117,18 +119,6 @@ sub eml2mboxcl2 {
 	$buf;
 }
 
-sub mkmaildir ($) {
-	my ($maildir) = @_;
-	for (qw(new tmp cur)) {
-		my $d = "$maildir/$_";
-		next if -d $d;
-		require File::Path;
-		if (!File::Path::mkpath($d) && !-d $d) {
-			die "failed to mkpath($d): $!\n";
-		}
-	}
-}
-
 sub git_to_mail { # git->cat_async callback
 	my ($bref, $oid, $type, $size, $arg) = @_;
 	if ($type ne 'blob') {
@@ -229,7 +219,7 @@ sub dup_src ($) {
 sub _augment { # MboxReader eml_cb
 	my ($eml, $lei) = @_;
 	# ignore return value, just populate the skv
-	$lei->{dedupe_cb}->is_dup($eml);
+	$lei->{dedupe}->is_dup($eml);
 }
 
 sub _mbox_write_cb ($$$$) {
@@ -244,36 +234,115 @@ sub _mbox_write_cb ($$$$) {
 		open $out, '+>>', $dst or die "open $dst: $!";
 		# Perl does SEEK_END even with O_APPEND :<
 		$seekable = seek($out, 0, SEEK_SET);
-		die "seek $dst: $!\n" if !$seekable && !$!{ESPIPE};
+		die "seek $dst: $!\n" if !$seekable && $! != ESPIPE;
 	}
 	my $jobs = $lei->{opt}->{jobs} // 0;
-	my $atomic = $jobs > 1;
-	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
 	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	my $write = $jobs > 1 && !$zsfx ? \&atomic_append : \&_print_full;
+	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	if ($lei->{opt}->{augment}) {
-		if ($seekable && -s $out && $dedupe->prepare_dedupe) {
+		die "cannot augment $dst, not seekable\n" if !$seekable;
+		if (-s $out && $dedupe->prepare_dedupe) {
 			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
 					dup_src($out);
 			PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
-		} elsif ($seekable && !$atomic) {
-			seek($out, 0, SEEK_END) or die "seek: $!";
 		}
+		# maybe some systems don't honor O_APPEND, Perl does this:
+		seek($out, 0, SEEK_END) or die "seek $dst: $!";
 		$dedupe->pause_dedupe if $jobs; # are we forking?
 	} elsif ($seekable) {
 		truncate($out, 0) or die "truncate $dst: $!";
 	}
 	$dedupe->prepare_dedupe if !$jobs;
 	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
-	sub {
+	sub { # for git_to_mail
 		my ($buf, $oid, $kw) = @_;
 		my $eml = PublicInbox::Eml->new($buf);
-		if (!$lei->{dedupe}->is_dup($eml, $oid)) {
+		if (!$dedupe->is_dup($eml, $oid)) {
 			$buf = $eml2mbox->($eml, $kw);
 			my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
-			write_in_full($out, $buf, $atomic);
+			$write->($out, $buf);
+		}
+	}
+}
+
+sub _maildir_each_file ($$;@) {
+	my ($dir, $cb, @arg) = @_;
+	for my $d (qw(new/ cur/)) {
+		my $pfx = $dir.$d;
+		opendir my $dh, $pfx or next;
+		while (defined(my $fn = readdir($dh))) {
+			$cb->($pfx.$fn, @arg) if $fn =~ /:2,[A-Za-z]*\z/;
+		}
+	}
+}
+
+sub _augment_file { # _maildir_each_file cb
+	my ($f, $lei) = @_;
+	my $eml = PublicInbox::InboxWritable::eml_from_path($f) or return;
+	_augment($eml, $lei);
+}
+
+# _maildir_each_file callback, \&CORE::unlink doesn't work with it
+sub _unlink { unlink($_[0]) }
+
+sub _buf2maildir {
+	my ($dst, $buf, $oid, $kw) = @_;
+	my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
+	my $rand = ''; # chosen by die roll :P
+	my ($tmp, $fh, $final);
+	do {
+		$tmp = $dst.'tmp/'.$rand."oid=$oid";
+	} while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
+		$! == EEXIST && ($rand = int(rand 0x7fffffff).','));
+	if (print $fh $$buf and close($fh)) {
+		$dst .= $sfx eq '' ? 'new/' : 'cur/';
+		$rand = '';
+		do {
+			$final = $dst.$rand."oid=$oid:2,$sfx";
+		} while (!link($tmp, $final) && $! == EEXIST &&
+			($rand = int(rand 0x7fffffff).','));
+		unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
+	} else {
+		my $err = $!;
+		unlink($tmp);
+		die "Error writing $oid to $dst: $err";
+	}
+}
+
+
+sub _maildir_write_cb ($$) {
+	my ($dst, $lei) = @_;
+	$dst .= '/' unless substr($dst, -1) eq '/';
+	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei, $dst);
+	my $jobs = $lei->{opt}->{jobs} // 0;
+	if ($lei->{opt}->{augment}) {
+		if ($dedupe && $dedupe->prepare_dedupe) {
+			require PublicInbox::InboxWritable; # eml_from_path
+			_maildir_each_file($dst, \&_augment_file, $lei);
+			$dedupe->pause_dedupe if $jobs; # are we forking?
+		}
+	} else { # clobber existing Maildir
+		_maildir_each_file($dst, \&_unlink);
+	}
+	for my $x (qw(tmp new cur)) {
+		my $d = $dst.$x;
+		next if -d $d;
+		require File::Path;
+		if (!File::Path::mkpath($d) && !-d $d) {
+			die "failed to mkpath($d): $!\n";
 		}
 	}
+	$dedupe->prepare_dedupe if $dedupe && !$jobs;
+	sub { # for git_to_mail
+		my ($buf, $oid, $kw) = @_;
+		return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe;
+		my $eml = PublicInbox::Eml->new($$buf); # copy buf
+		return if $dedupe->is_dup($eml, $oid);
+		undef $eml;
+		_buf2maildir($dst, $buf, $oid, $kw);
+	}
 }
 
 sub write_cb { # returns a callback for git_to_mail
@@ -281,6 +350,8 @@ sub write_cb { # returns a callback for git_to_mail
 	require PublicInbox::LeiDedupe;
 	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
 		_mbox_write_cb($cls, $1, $dst, $lei);
+	} elsif ($dst =~ s!\A[Mm]aildir:!!) { # typically capitalized
+		_maildir_write_cb($dst, $lei);
 	}
 	# TODO: Maildir, MH, IMAP, JMAP ...
 }
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index f3cc71ad..bd864754 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -7,14 +7,18 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Eml;
 use Fcntl qw(SEEK_SET);
+use PublicInbox::Spawn qw(popen_rd which);
+use List::Util qw(shuffle);
 require_mods(qw(DBD::SQLite));
+require PublicInbox::MboxReader;
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
 my $crlf = $noeol;
 $crlf =~ s/\n/\r\n/g;
 my $kw = [qw(seen answered flagged)];
-for my $mbox (qw(mboxrd mboxo mboxcl mboxcl2)) {
+my @MBOX = qw(mboxcl2 mboxrd mboxcl mboxo);
+for my $mbox (@MBOX) {
 	my $m = "eml2$mbox";
 	my $cb = PublicInbox::LeiToMail->can($m);
 	my $s = $cb->(PublicInbox::Eml->new($from), $kw);
@@ -75,8 +79,9 @@ Subject: x
 blah
 EOM
 my $fn = "$tmpdir/x.mbox";
+my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter
 my $orig = do {
-	my $wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei);
+	my $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei);
 	is(ref $wcb, 'CODE', 'write_cb returned callback');
 	ok(-f $fn && !-s _, 'empty file created');
 	$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
@@ -87,7 +92,8 @@ my $orig = do {
 	unlink $fn or BAIL_OUT $!;
 
 	local $lei->{opt} = { jobs => 2 };
-	$wcb = PublicInbox::LeiToMail->write_cb("mboxcl2:$fn", $lei);
+	$wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei);
+	ok(-f $fn && !-s _, 'truncated mbox destination');
 	$lei->{dedupe}->prepare_dedupe;
 	$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 	undef $wcb;
@@ -103,7 +109,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) };
 		ok($dc_cmd, "decompressor for .$zsfx");
 		my $f = "$fn.$zsfx";
-		my $dst = "mboxcl2:$f";
+		my $dst = "$mbox:$f";
 		my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
 		$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
 		undef $wcb;
@@ -117,11 +123,37 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 		undef $wcb;
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
+
+		local $lei->{opt} = { augment => 1 };
+		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		$lei->{dedupe}->prepare_dedupe;
+		$wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]);
+		undef $wcb; # commit
+
+		my $cat = popen_rd([@$dc_cmd, $f]);
+		my @raw;
+		PublicInbox::MboxReader->$mbox($cat,
+			sub { push @raw, shift->as_string });
+		like($raw[1], qr/\nblah\n\nx\n\z/s, "augmented $zsfx");
+		like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx");
+
+		local $lei->{opt} = { augment => 1, jobs => 2 };
+		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		$lei->{dedupe}->prepare_dedupe;
+		$wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]);
+		undef $wcb; # commit
+
+		my @raw3;
+		$cat = popen_rd([@$dc_cmd, $f]);
+		PublicInbox::MboxReader->$mbox($cat,
+			sub { push @raw3, shift->as_string });
+		my $y = pop @raw3;
+		is_deeply(\@raw3, \@raw, 'previous messages preserved');
+		like($y, qr/\nblah\n\ny\n\z/s, "augmented $zsfx (atomic)");
 	}
 }
 
 unlink $fn or BAIL_OUT $!;
-require PublicInbox::MboxReader;
 if ('default deduplication uses content_hash') {
 	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
 	$wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2);
@@ -130,6 +162,17 @@ if ('default deduplication uses content_hash') {
 	open my $fh, '<', $fn or BAIL_OUT $!;
 	PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= shift->as_string });
 	is($cmp, $buf, 'only one message written');
+
+	local $lei->{opt} = { augment => 1 };
+	$wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	$wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2);
+	undef $wcb; # undef to commit changes
+	open $fh, '<', $fn or BAIL_OUT $!;
+	my @x;
+	PublicInbox::MboxReader->mboxo($fh, sub { push @x, shift->as_string });
+	is(scalar(@x), 2, 'augmented mboxo');
+	is($x[0], $cmp, 'original message preserved');
+	is($x[1], $buf . "\nx\n", 'new message appended');
 }
 
 { # stdout support
@@ -145,7 +188,6 @@ if ('default deduplication uses content_hash') {
 }
 
 SKIP: { # FIFO support
-	use PublicInbox::Spawn qw(popen_rd which);
 	use POSIX qw(mkfifo);
 	my $fn = "$tmpdir/fifo";
 	mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1);
@@ -158,4 +200,47 @@ SKIP: { # FIFO support
 	is($cmp, $buf, 'message written to FIFO');
 }
 
+{ # Maildir support
+	my $md = "$tmpdir/maildir/";
+	my $wcb = PublicInbox::LeiToMail->write_cb("Maildir:$md", $lei);
+	ok($wcb, 'got Maildir callback');
+	$lei->{dedupe}->prepare_dedupe;
+	$wcb->(\(my $x = $buf), 'badc0ffee', []);
+	undef $wcb; # commit
+
+	my @f;
+	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
+	is(scalar(@f), 1, 'wrote one file');
+	open my $fh, $f[0] or BAIL_OUT $!;
+	is(do { local $/; <$fh> }, $buf, 'wrote to Maildir');
+
+	$wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei);
+	$lei->{dedupe}->prepare_dedupe;
+	$wcb->(\($x = $buf."\nx\n"), 'deadcafe', []);
+	undef $wcb; # commit
+
+	my @x = ();
+	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift });
+	is(scalar(@x), 1, 'wrote one new file');
+	ok(!-f $f[0], 'old file clobbered');
+	open $fh, $x[0] or BAIL_OUT $!;
+	is(do { local $/; <$fh> }, $buf."\nx\n", 'wrote new file to Maildir');
+
+	local $lei->{opt}->{augment} = 1;
+	$wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei);
+	$lei->{dedupe}->prepare_dedupe;
+	$wcb->(\($x = $buf."\ny\n"), 'deadcafe', []);
+	$wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe
+	undef $wcb; # commit
+	@f = ();
+	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
+	is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');
+	my @new = grep(!/\A\Q$x[0]\E\z/, @f);
+	is(scalar @new, 1, '1 new file written (b4dc0ffee skipped)');
+	open $fh, $x[0] or BAIL_OUT $!;
+	is(do { local $/; <$fh> }, $buf."\nx\n", 'old file untouched');
+	open $fh, $new[0] or BAIL_OUT $!;
+	is(do { local $/; <$fh> }, $buf."\ny\n", 'new file written');
+}
+
 done_testing;
diff --git a/t/mbox_reader.t b/t/mbox_reader.t
index 9391dc24..7d5a6ba5 100644
--- a/t/mbox_reader.t
+++ b/t/mbox_reader.t
@@ -32,7 +32,6 @@ if ($ENV{TEST_EXTRA}) {
 }
 
 my $reader = PublicInbox::MboxReader->new;
-my $write_in_full = PublicInbox::LeiToMail->can('write_in_full');
 my $check_fmt = sub {
 	my $fmt = shift;
 	my @order = shuffle(keys %raw);
@@ -41,7 +40,7 @@ my $check_fmt = sub {
 	for my $k (@order) {
 		my $eml = PublicInbox::Eml->new($raw{$k});
 		my $buf = $eml2mbox->($eml);
-		$write_in_full->($fh, $buf, undef);
+		print $fh $$buf or BAIL_OUT "print $!";
 	}
 	seek($fh, 0, SEEK_SET) or BAIL_OUT "seek: $!";
 	$reader->$fmt($fh, sub {


* [PATCH 14/36] ipc: generic IPC dispatch based on Storable
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (12 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 13/36] lei_to_mail: support Maildir, fix+test --augment Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 15/36] ipc: support Sereal Eric Wong
                   ` (21 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

I intend to use this with LeiStore when importing from multiple
slow sources at once (e.g. curl, IMAP).  This is because
over.sqlite3 can only have a single writer, and we'll have
several slow readers running in parallel.

Watch and SearchIdxShard should also be able to use this code
in the future, but it will be proven with LeiStore first.
---
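The wire format is just a decimal length, a newline, and a
Storable blob.  Below is a self-contained sketch of that framing
over a socketpair with one forked worker.  send_rec, get_rec and
spawn_worker are simplified stand-ins written for this note, and
the worker only sums numbers; signal handling, locking and the
wantarray dispatch of the real module are omitted.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM);
use Storable qw(freeze thaw);
use IO::Handle; # ->autoflush

sub send_rec {
	my ($sock, $ref) = @_;
	my $buf = freeze($ref);
	print $sock length($buf), "\n", $buf or die "print: $!";
}

sub get_rec {
	my ($sock) = @_;
	local $/ = "\n";
	defined(my $len = <$sock>) or return; # EOF
	chomp $len;
	my $r = read($sock, my $buf, $len) // die "read: $!";
	$r == $len or die "short read: $r != $len";
	thaw($buf);
}

sub spawn_worker {
	my ($s1, $s2);
	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or
		die "socketpair: $!";
	defined(my $pid = fork) or die "fork: $!";
	if ($pid == 0) { # worker: serve until the parent closes $s1
		close $s1;
		$s2->autoflush(1);
		while (my $req = get_rec($s2)) {
			my ($op, @args) = @$req;
			my $sum = 0;
			$sum += $_ for @args;
			send_rec($s2, [ $op, $sum, $$ ]);
		}
		exit 0;
	}
	close $s2;
	$s1->autoflush(1);
	($pid, $s1);
}

my ($pid, $sock) = spawn_worker();
send_rec($sock, [ 'sum', 1, 2, 3 ]);
my $res = get_rec($sock);
print "worker $res->[2] says $res->[0] = $res->[1]\n";
close $sock; # EOF ends the worker loop
waitpid($pid, 0);
```

A single worker owning the store serializes all writes, so any
number of slow readers can feed it without fighting over the
SQLite write lock.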
 MANIFEST                    |   2 +
 lib/PublicInbox/IPC.pm      | 129 ++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiStore.pm |   2 +-
 t/ipc.t                     |  67 +++++++++++++++++++
 t/lei_store.t               |   5 ++
 5 files changed, 204 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/IPC.pm
 create mode 100644 t/ipc.t

diff --git a/MANIFEST b/MANIFEST
index 7ce2075e..96ad52bf 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -153,6 +153,7 @@ lib/PublicInbox/IMAPD.pm
 lib/PublicInbox/IMAPTracker.pm
 lib/PublicInbox/IMAPdeflate.pm
 lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IPC.pm
 lib/PublicInbox/IdxStack.pm
 lib/PublicInbox/Import.pm
 lib/PublicInbox/In2Tie.pm
@@ -327,6 +328,7 @@ t/index-git-times.t
 t/indexlevels-mirror-v1.t
 t/indexlevels-mirror.t
 t/init.t
+t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei-oneshot.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
new file mode 100644
index 00000000..c04140ae
--- /dev/null
+++ b/lib/PublicInbox/IPC.pm
@@ -0,0 +1,129 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# base class for remote IPC calls, requires Storable
+# TODO: this ought to be usable in SearchIdxShard
+package PublicInbox::IPC;
+use strict;
+use v5.10.1;
+use Socket qw(AF_UNIX SOCK_STREAM);
+use Carp qw(confess croak);
+use PublicInbox::Sigfd;
+
+sub _get_rec ($) {
+	my ($sock) = @_;
+	local $/ = "\n";
+	defined(my $len = <$sock>) or return;
+	chop($len) eq "\n" or croak "no LF byte in $len";
+	defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
+	$r == $len or croak "short read: $r != $len";
+	thaw($buf);
+}
+
+sub _send_rec ($$) {
+	my ($sock, $ref) = @_;
+	my $buf = freeze($ref);
+	print $sock length($buf), "\n", $buf or croak "print: $!";
+}
+
+sub ipc_return ($$$) {
+	my ($s2, $ret, $exc) = @_;
+	_send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+}
+
+sub ipc_worker_loop ($$) {
+	my ($self, $s2) = @_;
+	$self->ipc_atfork_child if $self->can('ipc_atfork_child');
+	$s2->autoflush(1);
+	while (my $rec = _get_rec($s2)) {
+		my ($wantarray, $sub, @args) = @$rec;
+		if (!defined($wantarray)) { # no waiting if client doesn't care
+			eval { $self->$sub(@args) };
+			eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+		} elsif ($wantarray) {
+			my @ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \@ret, $@);
+		} else {
+			my $ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \$ret, $@);
+		}
+	}
+}
+
+sub ipc_worker_spawn ($$$) {
+	my ($self, $ident, $oldset) = @_;
+	eval { require Storable; Storable->import(qw(freeze thaw)); };
+	if ($@) {
+		state $w //= warn "Storable (part of Perl) missing: $@\n";
+		return;
+	}
+	my $pid = $self->{-ipc_worker_pid};
+	confess "BUG: already spawned PID:$pid" if $pid;
+	confess "BUG: already have worker socket" if $self->{-ipc_sock};
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+	my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+	defined($pid = fork) or die "fork: $!";
+	if ($pid == 0) {
+		undef $s1;
+		local $0 = $ident;
+		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+		PublicInbox::Sigfd::sig_setmask($oldset);
+		eval { ipc_worker_loop($self, $s2) };
+		die "worker $ident died: $@\n" if $@;
+		$self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+		exit;
+	}
+	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+	$s1->autoflush(1);
+	$self->{-ipc_sock} = $s1;
+	$self->{-ipc_worker_pid} = $pid;
+}
+
+sub ipc_reap_worker { # dwaitpid callback
+	my ($self, $pid) = @_;
+	warn "PID:$pid died with \$?=$?\n" if $?;
+}
+
+sub ipc_worker_stop {
+	my ($self) = @_;
+	my $pid;
+	if (delete $self->{-ipc_sock}) {
+		$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+	} else {
+		$pid = delete $self->{-ipc_worker_pid} and
+			die "unexpected PID:$pid";
+	}
+	return unless $pid;
+	eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+	if ($@) {
+		my $wp = waitpid($pid, 0);
+		$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
+		ipc_reap_worker($self, $pid);
+	}
+}
+
+# use this if we have multiple readers reading curl or "pigz -dc"
+# and writing to the same store
+sub ipc_lock_init {
+	my ($self, $f) = @_;
+	require PublicInbox::Lock;
+	$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
+}
+
+sub ipc_do {
+	my ($self, $sub, @args) = @_;
+	if (my $s1 = $self->{-ipc_sock}) {
+		my $ipc_lock = $self->{-ipc_lock};
+		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
+		_send_rec($s1, [ wantarray, $sub, @args ]);
+		return unless defined(wantarray);
+		my $ret = _get_rec($s1) // die "no response on $sub";
+		die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+		wantarray ? @$ret : $$ret;
+	} else {
+		$self->$sub(@args);
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index f8383d5e..2745c560 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -9,7 +9,7 @@
 package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdx qw(crlf_adjust);
 use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
diff --git a/t/ipc.t b/t/ipc.t
new file mode 100644
index 00000000..f9c4024b
--- /dev/null
+++ b/t/ipc.t
@@ -0,0 +1,67 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::IPC';
+state $once = eval <<'';
+package PublicInbox::IPC;
+use strict;
+sub test_array { qw(test array) }
+sub test_scalar { 'scalar' }
+sub test_scalarref { \'scalarref' }
+sub test_undef { undef }
+sub test_die { shift; die @_; 'unreachable' }
+sub test_pid { $$ }
+1;
+
+my $ipc = bless {}, 'PublicInbox::IPC';
+my @t = qw(array scalar scalarref undef);
+my $test = sub {
+	my $x = shift;
+	for my $type (@t) {
+		my $m = "test_$type";
+		my @ret = $ipc->ipc_do($m);
+		my @exp = $ipc->$m;
+		is_deeply(\@ret, \@exp, "wantarray $m $x");
+
+		$ipc->ipc_do($m);
+
+		my $ret = $ipc->ipc_do($m);
+		my $exp = $ipc->$m;
+		is_deeply($ret, $exp, "!wantarray $m $x");
+	}
+	my $ret = eval { $ipc->test_die('phail') };
+	my $exp = $@;
+	$ret = eval { $ipc->ipc_do('test_die', 'phail') };
+	my $err = $@;
+	my %lines;
+	for ($err, $exp) {
+		s/ line (\d+).*//s and $lines{$1}++;
+	}
+	is(scalar keys %lines, 1, 'line numbers match');
+	is((values %lines)[0], 2, '2 hits on same line number');
+	is($err, $exp, "$x die matches");
+	is($ret, undef, "$x die did not return");
+};
+$test->('local');
+
+SKIP: {
+	require_mods(qw(Storable), 16);
+	my $pid = $ipc->ipc_worker_spawn('test worker');
+	ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
+	defined($pid) or BAIL_OUT 'no spawn, no test';
+	is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	$test->('worker');
+	{
+		my ($tmp, $for_destroy) = tmpdir();
+		$ipc->ipc_lock_init("$tmp/lock");
+		is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	}
+	$ipc->ipc_worker_stop;
+	ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
+}
+$ipc->ipc_worker_stop; # idempotent
+done_testing;
diff --git a/t/lei_store.t b/t/lei_store.t
index 03ab5af6..a189f897 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -85,4 +85,9 @@ for my $parallel (0, 1) {
 	is_deeply(\@kw, [], 'set clobbers all');
 }
 
+SKIP: {
+	require_mods(qw(Storable), 1);
+	ok($lst->can('ipc_do'), 'ipc_do works if we have Storable');
+}
+
 done_testing;

* [PATCH 15/36] ipc: support Sereal
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (13 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 14/36] ipc: generic IPC dispatch based on Storable Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 16/36] lei_store: add ->set_eml, ->add_eml can return smsg Eric Wong
                   ` (20 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Some testing will be needed to see if it's worth the code
and maintenance overhead, but it seems easy-enough to get
working.
---
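Reader's note: a minimal standalone sketch of the "prefer Sereal, fall
back to Storable" idea, not the code in this patch.  The $freeze/$thaw
code refs and $backend are names made up for illustration; the patch
itself imports the sereal_*_with_object functions at BEGIN time instead
of calling methods.

```perl
use strict;
use warnings;

# Pick a serializer once at startup: Sereal if installed, else Storable.
my ($freeze, $thaw, $backend);
if (eval { require Sereal::Encoder; require Sereal::Decoder; 1 }) {
	my ($enc, $dec) = (Sereal::Encoder->new, Sereal::Decoder->new);
	$freeze = sub { $enc->encode($_[0]) };
	$thaw = sub { $dec->decode($_[0]) };
	$backend = 'Sereal';
} elsif (eval { require Storable; 1 }) {
	$freeze = \&Storable::freeze;
	$thaw = \&Storable::thaw;
	$backend = 'Storable';
} else {
	die "neither Sereal nor Storable is available\n";
}

# a request record shaped like the ones ipc_do sends: [ wantarray, sub, args ]
my $rec = [ 1, 'test_scalar', qw(a b) ];
my $copy = $thaw->($freeze->($rec));
print "$backend round-trip: @$copy\n";
```

Either backend should round-trip the array ref unchanged; only the
wire format differs.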
 lib/PublicInbox/IPC.pm | 29 ++++++++++++++++++++++++-----
 t/ipc.t                |  2 +-
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c04140ae..0baa218c 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -9,6 +9,29 @@ use v5.10.1;
 use Socket qw(AF_UNIX SOCK_STREAM);
 use Carp qw(confess croak);
 use PublicInbox::Sigfd;
+my ($enc, $dec);
+# ->import at BEGIN turns sereal_*_with_object into custom ops on 5.14+
+# and eliminates method call overhead
+BEGIN {
+	eval {
+		require Sereal::Encoder;
+		require Sereal::Decoder;
+		Sereal::Encoder->import('sereal_encode_with_object');
+		Sereal::Decoder->import('sereal_decode_with_object');
+		($enc, $dec) = (Sereal::Encoder->new, Sereal::Decoder->new);
+	};
+};
+
+if ($enc && $dec) { # should be custom ops
+	*freeze = sub ($) { sereal_encode_with_object $enc, $_[0] };
+	*thaw = sub ($) { sereal_decode_with_object $dec, $_[0], my $ret };
+} else {
+	eval { # some distros have Storable as a separate package from Perl
+		require Storable;
+		Storable->import(qw(freeze thaw));
+		$enc = 1;
+	} // warn("Storable (part of Perl) missing: $@\n");
+}
 
 sub _get_rec ($) {
 	my ($sock) = @_;
@@ -52,11 +75,7 @@ sub ipc_worker_loop ($$) {
 
 sub ipc_worker_spawn ($$$) {
 	my ($self, $ident, $oldset) = @_;
-	eval { require Storable; Storable->import(qw(freeze thaw)); };
-	if ($@) {
-		state $w //= warn "Storable (part of Perl) missing: $@\n";
-		return;
-	}
+	return unless $enc;
 	my $pid = $self->{-ipc_worker_pid};
 	confess "BUG: already spawned PID:$pid" if $pid;
 	confess "BUG: already have worker socket" if $self->{-ipc_sock};
diff --git a/t/ipc.t b/t/ipc.t
index f9c4024b..f3715e2c 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -49,7 +49,7 @@ my $test = sub {
 $test->('local');
 
 SKIP: {
-	require_mods(qw(Storable), 16);
+	require_mods(qw(Storable||Sereal), 16);
 	my $pid = $ipc->ipc_worker_spawn('test worker');
 	ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
 	defined($pid) or BAIL_OUT 'no spawn, no test';

* [PATCH 16/36] lei_store: add ->set_eml, ->add_eml can return smsg
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (14 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 15/36] ipc: support Sereal Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 17/36] lei: rename "extinbox" => "external" Eric Wong
                   ` (19 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Add a ->set_eml method which can be a useful fire-and-forget
way of either adding new files to store OR setting keywords
on them.

When seeing brand-new messages, add_eml can afford to return
more information in the smsg instead of just the OID.
---
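Reader's note: a rough sketch of the "add, or else set keywords"
chaining that ->set_eml relies on.  The in-memory %store and the
add_msg/set_keywords/set_msg subs are hypothetical stand-ins, not the
LeiStore API; the point is only that the add step returns undef for a
known message, so defined-or falls through to the keyword update.

```perl
use strict;
use warnings;

my (%store, $next_num); # hypothetical store: Message-ID => record

sub add_msg { # returns a record for new messages, undef for known ones
	my ($mid, @kw) = @_;
	return if $store{$mid};
	$store{$mid} = { num => ++$next_num, kw => [ sort @kw ] };
}

sub set_keywords { # replaces keywords on a known message, returns [ num ]
	my ($mid, @kw) = @_;
	my $rec = $store{$mid} or return;
	$rec->{kw} = [ sort @kw ];
	[ $rec->{num} ];
}

# fire-and-forget: the caller does not need to know if the message is new
sub set_msg { add_msg(@_) // set_keywords(@_) }

my $first = set_msg('<a@example>', 'seen');
my $again = set_msg('<a@example>', qw(seen answered));
print ref($first), " then [@$again]\n";
```

The first call returns the new record and the second returns an array
ref of matching numbers, mirroring the smsg vs. docid list distinction
the tests in this patch check for.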
 lib/PublicInbox/LeiStore.pm |  8 +++++++-
 t/lei_store.t               | 18 +++++++++++++++---
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 2745c560..43fddf6d 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -199,6 +199,7 @@ sub add_eml {
 			$idx->shard_add_eidx_info($docid, '.', $eml); # List-Id
 			$idx->shard_add_keywords($docid, @kw) if @kw;
 		}
+		\@docids;
 	} else {
 		$smsg->{num} = $oidx->adj_counter('eidx_docid', '+');
 		$oidx->add_overview($eml, $smsg);
@@ -206,8 +207,13 @@ sub add_eml {
 		my $idx = $eidx->idx_shard($smsg->{num});
 		$idx->index_raw($msgref, $eml, $smsg);
 		$idx->shard_add_keywords($smsg->{num}, @kw) if @kw;
+		$smsg;
 	}
-	$smsg->{blob}
+}
+
+sub set_eml {
+	my ($self, $eml, @kw) = @_;
+	add_eml($self, $eml, @kw) // set_eml_keywords($self, $eml, @kw);
 }
 
 sub done {
diff --git a/t/lei_store.t b/t/lei_store.t
index a189f897..bc0d66c2 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -14,8 +14,8 @@ my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
 my $store_dir = "$home/lst";
 my $lst = PublicInbox::LeiStore->new($store_dir, { creat => 1 });
 ok($lst, '->new');
-my $oid = $lst->add_eml(eml_load('t/data/0001.patch'));
-like($oid, qr/\A[0-9a-f]+\z/, 'add returned OID');
+my $smsg = $lst->add_eml(eml_load('t/data/0001.patch'));
+like($smsg->{blob}, qr/\A[0-9a-f]+\z/, 'add returned OID');
 my $eml = eml_load('t/data/0001.patch');
 is($lst->add_eml($eml), undef, 'idempotent');
 $lst->done;
@@ -37,7 +37,7 @@ is_deeply([$lst->maildir_keywords('/foo:2,RSZ')], ['answered', 'seen'],
 	my $es = $lst->search;
 	my $msgs = $es->over->query_xover(0, 1000);
 	is(scalar(@$msgs), 1, 'one message');
-	is($msgs->[0]->{blob}, $oid, 'blob matches');
+	is($msgs->[0]->{blob}, $smsg->{blob}, 'blob matches');
 	my $mset = $es->mset("mid:$msgs->[0]->{mid}");
 	is($mset->size, 1, 'search works');
 	is_deeply($es->mset_to_artnums($mset), [ $msgs->[0]->{num} ],
@@ -83,6 +83,18 @@ for my $parallel (0, 1) {
 	$lst->done;
 	@kw = $lst->search->msg_keywords($docids->[0]);
 	is_deeply(\@kw, [], 'set clobbers all');
+
+	my $set = eml_load('t/plack-qp.eml');
+	$set->header_set('Message-ID', "<set\@$parallel>");
+	my $ret = $lst->set_eml($set, 'seen');
+	is(ref $ret, 'PublicInbox::Smsg', 'initial returns smsg');
+	my $ids = $lst->set_eml($set, qw(seen));
+	is_deeply($ids, [ $ret->{num} ], 'set_eml idempotent');
+	$ids = $lst->set_eml($set, qw(seen answered));
+	is_deeply($ids, [ $ret->{num} ], 'set_eml to change kw');
+	$lst->done;
+	@kw = $lst->search->msg_keywords($ids->[0]);
+	is_deeply(\@kw, [qw(answered seen)], 'set changed kw');
 }
 
 SKIP: {

* [PATCH 17/36] lei: rename "extinbox" => "external"
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (15 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 16/36] lei_store: add ->set_eml, ->add_eml can return smsg Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 18/36] mid: use defined-or with `push' for uniqueness check Eric Wong
                   ` (18 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

The words "extinbox" and "extindex" are too similar and easy to
confuse with each other.  Rename "extinbox" to "external", since
these could be IMAP, JMAP or other non-public-inbox search APIs.

Link: https://public-inbox.org/meta/20201226112649.GB6226@dcvr/
---
 MANIFEST                                      |  2 +-
 lib/PublicInbox/LEI.pm                        | 12 +++++-----
 .../{LeiExtinbox.pm => LeiExternal.pm}        | 18 +++++++--------
 lib/PublicInbox/LeiXSearch.pm                 |  2 +-
 t/lei.t                                       | 22 +++++++++----------
 t/lei_xsearch.t                               |  2 +-
 6 files changed, 29 insertions(+), 29 deletions(-)
 rename lib/PublicInbox/{LeiExtinbox.pm => LeiExternal.pm} (75%)

diff --git a/MANIFEST b/MANIFEST
index 96ad52bf..6dc08f01 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -164,7 +164,7 @@ lib/PublicInbox/Isearch.pm
 lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
-lib/PublicInbox/LeiExtinbox.pm
+lib/PublicInbox/LeiExternal.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
 lib/PublicInbox/LeiToMail.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 9aa4d95a..f960aa72 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -8,7 +8,7 @@
 package PublicInbox::LEI;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::DS PublicInbox::LeiExtinbox);
+use parent qw(PublicInbox::DS PublicInbox::LeiExternal);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
 use Errno qw(EAGAIN ECONNREFUSED ENOENT);
@@ -70,19 +70,19 @@ sub _config_path ($) {
 our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
 	save-as=s output|o=s format|f=s dedupe|d=s thread|t augment|a
-	sort|s=s@ reverse|r offset=i remote local! extinbox!
+	sort|s=s@ reverse|r offset=i remote local! external!
 	since|after=s until|before=s), opt_dash('limit|n=i', '[0-9]+') ],
 
 'show' => [ 'MID|OID', 'show a given object (Message-ID or object ID)',
 	qw(type=s solve! format|f=s dedupe|d=s thread|t remote local!),
 	pass_through('git show') ],
 
-'add-extinbox' => [ 'URL_OR_PATHNAME',
+'add-external' => [ 'URL_OR_PATHNAME',
 	'add/set priority of a publicinbox|extindex for extra matches',
 	qw(boost=i quiet|q) ],
-'ls-extinbox' => [ '[FILTER...]', 'list publicinbox|extindex locations',
+'ls-external' => [ '[FILTER...]', 'list publicinbox|extindex locations',
 	qw(format|f=s z|0 local remote quiet|q) ],
-'forget-extinbox' => [ '{URL_OR_PATHNAME|--prune}',
+'forget-external' => [ '{URL_OR_PATHNAME|--prune}',
 	'exclude further results from a publicinbox|extindex',
 	qw(prune quiet|q) ],
 
@@ -189,7 +189,7 @@ my %OPTDESC = (
 'q	format|f=s' => [ 'OUT|maildir|mboxrd|mboxcl2|mboxcl|html|oid|json',
 		'specify output format, default depends on --output'],
 'ls-query	format|f=s' => $ls_format,
-'ls-extinbox	format|f=s' => $ls_format,
+'ls-external	format|f=s' => $ls_format,
 
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
 'offset=i' => ['OFF', 'search result offset (default: 0)'],
diff --git a/lib/PublicInbox/LeiExtinbox.pm b/lib/PublicInbox/LeiExternal.pm
similarity index 75%
rename from lib/PublicInbox/LeiExtinbox.pm
rename to lib/PublicInbox/LeiExternal.pm
index c2de7735..0378551a 100644
--- a/lib/PublicInbox/LeiExtinbox.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -1,22 +1,22 @@
 # Copyright (C) 2020 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# *-extinbox commands of lei
-package PublicInbox::LeiExtinbox;
+# *-external commands of lei
+package PublicInbox::LeiExternal;
 use strict;
 use v5.10.1;
 use parent qw(Exporter);
-our @EXPORT = qw(lei_ls_extinbox lei_add_extinbox lei_forget_extinbox);
+our @EXPORT = qw(lei_ls_external lei_add_external lei_forget_external);
 
-sub lei_ls_extinbox {
+sub lei_ls_external {
 	my ($self, @argv) = @_;
 	my $stor = $self->_lei_store(0);
 	my $cfg = $self->_lei_cfg(0);
 	my $out = $self->{1};
 	my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
 	my (%boost, @loc);
-	for my $sec (grep(/\Aextinbox\./, @{$cfg->{-section_order}})) {
-		my $loc = substr($sec, length('extinbox.'));
+	for my $sec (grep(/\Aexternal\./, @{$cfg->{-section_order}})) {
+		my $loc = substr($sec, length('external.'));
 		$boost{$loc} = $cfg->{"$sec.boost"};
 		push @loc, $loc;
 	}
@@ -28,14 +28,14 @@ sub lei_ls_extinbox {
 	}
 }
 
-sub lei_add_extinbox {
+sub lei_add_external {
 	my ($self, $url_or_dir) = @_;
 	my $cfg = $self->_lei_cfg(1);
 	if ($url_or_dir !~ m!\Ahttps?://!) {
 		$url_or_dir = File::Spec->canonpath($url_or_dir);
 	}
 	my $new_boost = $self->{opt}->{boost} // 0;
-	my $key = "extinbox.$url_or_dir.boost";
+	my $key = "external.$url_or_dir.boost";
 	my $cur_boost = $cfg->{$key};
 	return if defined($cur_boost) && $cur_boost == $new_boost; # idempotent
 	$self->lei_config($key, $new_boost);
@@ -44,7 +44,7 @@ sub lei_add_extinbox {
 	$stor->done;
 }
 
-sub lei_forget_extinbox {
+sub lei_forget_external {
 	# TODO
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1a81b14a..7d251afd 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -18,7 +18,7 @@ sub new {
 	}, $class
 }
 
-sub attach_extinbox {
+sub attach_external {
 	my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
 	if (!$ibxish->can('over')) {
 		push @{$self->{remotes}}, $ibxish
diff --git a/t/lei.t b/t/lei.t
index a95a0efc..764a7fe4 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -154,38 +154,38 @@ my $setup_publicinboxes = sub {
 	$seen || BAIL_OUT 'no imports';
 };
 
-my $test_extinbox = sub {
+my $test_external = sub {
 	$setup_publicinboxes->();
 	$cleanup->();
-	$lei->('ls-extinbox');
-	is($out.$err, '', 'ls-extinbox no output, yet');
+	$lei->('ls-external');
+	is($out.$err, '', 'ls-external no output, yet');
 	ok(!-e $config_file && !-e $store_dir,
-		'nothing created by ls-extinbox');
+		'nothing created by ls-external');
 
 	my $cfg = PublicInbox::Config->new;
 	$cfg->each_inbox(sub {
 		my ($ibx) = @_;
-		ok($lei->(qw(add-extinbox -q), $ibx->{inboxdir}),
-			'added extinbox');
+		ok($lei->(qw(add-external -q), $ibx->{inboxdir}),
+			'added external');
 		is($out.$err, '', 'no output');
 	});
 	ok(-s $config_file && -e $store_dir,
-		'add-extinbox created config + store');
+		'add-external created config + store');
 	my $lcfg = PublicInbox::Config->new($config_file);
 	$cfg->each_inbox(sub {
 		my ($ibx) = @_;
-		is($lcfg->{"extinbox.$ibx->{inboxdir}.boost"}, 0,
+		is($lcfg->{"external.$ibx->{inboxdir}.boost"}, 0,
 			"configured boost on $ibx->{name}");
 	});
-	$lei->('ls-extinbox');
-	like($out, qr/boost=0\n/s, 'ls-extinbox has output');
+	$lei->('ls-external');
+	like($out, qr/boost=0\n/s, 'ls-external has output');
 };
 
 my $test_lei_common = sub {
 	$test_help->();
 	$test_config->();
 	$test_init->();
-	$test_extinbox->();
+	$test_external->();
 };
 
 my $test_lei_oneshot = $ENV{TEST_LEI_ONESHOT};
diff --git a/t/lei_xsearch.t b/t/lei_xsearch.t
index c41213bd..178c3d37 100644
--- a/t/lei_xsearch.t
+++ b/t/lei_xsearch.t
@@ -49,7 +49,7 @@ $eidx->eidx_sync({fsync => 0});
 my $es = PublicInbox::ExtSearch->new("$home/eidx");
 my $lxs = PublicInbox::LeiXSearch->new;
 for my $ibxish (shuffle($es, @ibx)) {
-	$lxs->attach_extinbox($ibxish);
+	$lxs->attach_external($ibxish);
 }
 my $nr = $lxs->xdb->get_doccount;
 my $mset = $lxs->mset('d:19931002..19931003', { limit => $nr });

* [PATCH 18/36] mid: use defined-or with `push' for uniqueness check
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (16 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 17/36] lei: rename "extinbox" => "external" Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 19/36] mid: hoist out mids_in sub Eric Wong
                   ` (17 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

As shown recently in commit a05445fb400108e60ede7d377cf3b26a0392eb24
("config: config_fh_parse: micro-optimize"), relying on the
return value of `push' and the defined-or operator can avoid
modifying the hash value scalar with an increment.
---
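Reader's note: a small standalone sketch of the idiom, using a
hypothetical uniq_list sub rather than uniq_mids itself.  `push'
returns the new array length, which is always defined, and `//='
only evaluates its right-hand side when the hash slot is still
undefined.

```perl
use strict;
use warnings;

sub uniq_list {
	my (%seen, @ret);
	for my $v (@_) {
		# the first sighting stores the (defined) length and pushes;
		# later sightings skip the push entirely
		$seen{$v} //= push(@ret, $v);
	}
	\@ret;
}

my $u = uniq_list(qw(a b a c b));
print "@$u\n"; # prints "a b c"
```

Order of first appearance is preserved, same as with the `$seen{$v}++'
form, but the hash value is written only once per key.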
 lib/PublicInbox/MID.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/MID.pm b/lib/PublicInbox/MID.pm
index 5aeffb8c..601f4c9b 100644
--- a/lib/PublicInbox/MID.pm
+++ b/lib/PublicInbox/MID.pm
@@ -119,7 +119,7 @@ sub uniq_mids ($;$) {
 			warn "Message-ID: <$mid> too long, truncating\n";
 			$mid = substr($mid, 0, MAX_MID_SIZE);
 		}
-		push(@ret, $mid) unless $seen->{$mid}++;
+		$seen->{$mid} //= push(@ret, $mid);
 	}
 	\@ret;
 }

* [PATCH 19/36] mid: hoist out mids_in sub
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (17 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 18/36] mid: use defined-or with `push' for uniqueness check Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 20/36] lei_store: handle messages without Message-ID at all Eric Wong
                   ` (16 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

We'll be using it for Resent-Message-ID with lei, and possibly
other places.
---
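Reader's note: a simplified sketch of pulling IDs out of an arbitrary
list of headers.  The %hdr layout (header name => array ref of raw
values) and the ids_in sub are assumptions for illustration; the real
mids_in takes an Eml object and goes through extract_mids and
uniq_mids, which also handle cleaning and length limits.

```perl
use strict;
use warnings;

# hypothetical parsed headers: name => [ raw values ]
my %hdr = (
	'Message-ID' => [ '<a@example>' ],
	'X-Alt-Message-ID' => [ '<b@example> <a@example>' ],
);

sub ids_in {
	my ($hdr, @names) = @_;
	my (%seen, @ret);
	# missing headers contribute nothing
	for my $raw (map { @{$hdr->{$_} // []} } @names) {
		for my $id ($raw =~ /<([^>]+)>/g) {
			$seen{$id} //= push(@ret, $id);
		}
	}
	\@ret;
}

my $ids = ids_in(\%hdr, qw(Message-ID X-Alt-Message-ID Resent-Message-ID));
print "@$ids\n";
```

Passing the header names as arguments is what lets one helper serve
both the indexing case and the Resent-Message-ID case.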
 lib/PublicInbox/MID.pm | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/MID.pm b/lib/PublicInbox/MID.pm
index 601f4c9b..28739011 100644
--- a/lib/PublicInbox/MID.pm
+++ b/lib/PublicInbox/MID.pm
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use base qw/Exporter/;
 our @EXPORT_OK = qw(mid_clean id_compress mid2path mid_escape MID_ESC
-	mids references mids_for_index $MID_EXTRACT);
+	mids references mids_for_index mids_in $MID_EXTRACT);
 use URI::Escape qw(uri_escape_utf8);
 use Digest::SHA qw/sha1_hex/;
 require PublicInbox::Address;
@@ -73,14 +73,17 @@ sub mids ($) {
 	uniq_mids(extract_mids(@mids));
 }
 
+# for Resent-Message-ID and maybe others
+sub mids_in ($@) {
+	my ($eml, @headers) = @_;
+	uniq_mids(extract_mids(map { ($eml->header_raw($_)) } @headers));
+}
+
 # we allow searching on X-Alt-Message-ID since PublicInbox::NNTP uses them
 # to placate some clients, and we want to ensure NNTP-only clients can
 # import and index without relying on HTTP endpoints
 sub mids_for_index ($) {
-	my ($hdr) = @_;
-	my @mids = $hdr->header_raw('Message-ID');
-	my @alts = $hdr->header_raw('X-Alt-Message-ID');
-	uniq_mids(extract_mids(@mids, @alts));
+	mids_in($_[0], qw(Message-ID X-Alt-Message-ID));
 }
 
 # last References should be IRT, but some mail clients do things

* [PATCH 20/36] lei_store: handle messages without Message-ID at all
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (18 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 19/36] mid: hoist out mids_in sub Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 21/36] ipc: use shutdown(2), base atfork* callback Eric Wong
                   ` (15 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

For personal mail, unsent draft messages are a common source of
messages without Message-IDs.
---
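Reader's note: a sketch of deriving a stable placeholder ID from
message content when no Message-ID exists.  The placeholder_mid sub,
the header hash layout and the "@placeholder.invalid" suffix are all
made up here; the patch itself uses content_digest and
PublicInbox::Import::digest2mid, whose exact output format is not
shown in this thread.

```perl
use strict;
use warnings;
use Digest::SHA ();

# hypothetical helper: same content in, same ID out
sub placeholder_mid {
	my ($hdr, $body) = @_;
	my $dig = Digest::SHA->new(256);
	$dig->add("$_\0$hdr->{$_}\0") for sort keys %$hdr;
	$dig->add($body);
	# ->digest and ->hexdigest reset the object, so clone first
	# if $dig is still needed afterwards
	my $hex = $dig->clone->hexdigest;
	"$hex\@placeholder.invalid";
}

my %draft = (Subject => 'unsent', From => 'me@example');
my $id1 = placeholder_mid(\%draft, "hello\n");
my $id2 = placeholder_mid(\%draft, "hello\n");
my $id3 = placeholder_mid(\%draft, "hello again\n");
print $id1 eq $id2 ? "stable\n" : "unstable\n";
```

Because the ID depends only on content, importing the same draft twice
maps to the same entry, which is what makes the set_eml calls in the
test below idempotent for messages without a Message-ID.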
 lib/PublicInbox/LeiStore.pm | 20 ++++++++++++++++----
 lib/PublicInbox/OverIdx.pm  |  2 ++
 lib/PublicInbox/Smsg.pm     |  6 ++----
 t/lei_store.t               | 24 ++++++++++++++++++++++++
 4 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 43fddf6d..c8b9d75e 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -15,8 +15,8 @@ use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
 use PublicInbox::InboxWritable;
 use PublicInbox::V2Writable;
-use PublicInbox::ContentHash qw(content_hash);
-use PublicInbox::MID qw(mids);
+use PublicInbox::ContentHash qw(content_hash content_digest);
+use PublicInbox::MID qw(mids mids_in);
 use PublicInbox::LeiSearch;
 use List::Util qw(max);
 
@@ -107,14 +107,26 @@ sub eidx_init {
 	$eidx;
 }
 
+# when a message has no Message-IDs at all, this is needed for
+# unsent Draft messages, at least
+sub _fake_mid_for ($$) {
+	my ($eml, $dig) = @_;
+	my $mids = mids_in($eml, qw(X-Alt-Message-ID Resent-Message-ID));
+	$eml->{-lei_fake_mid} =
+		$mids->[0] // PublicInbox::Import::digest2mid($dig, $eml);
+}
+
 sub _docids_for ($$) {
 	my ($self, $eml) = @_;
 	my %docids;
-	my $chash = content_hash($eml);
+	my $dig = content_digest($eml);
+	my $chash = $dig->clone->digest;
 	my $eidx = eidx_init($self);
 	my $oidx = $eidx->{oidx};
 	my $im = $self->{im};
-	for my $mid (@{mids($eml)}) {
+	my $mids = mids($eml);
+	$mids->[0] //= _fake_mid_for($eml, $dig);
+	for my $mid (@$mids) {
 		my ($id, $prev);
 		while (my $cur = $oidx->next_by_mid($mid, \$id, \$prev)) {
 			my $oid = $cur->{blob};
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index bc2e3ef4..dad3966d 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -284,6 +284,8 @@ sub add_overview {
 	$smsg->{lines} = $eml->body_raw =~ tr!\n!\n!;
 	my $mids = mids_for_index($eml);
 	my $refs = parse_references($smsg, $eml, $mids);
+	$mids->[0] //= $smsg->{mid} //= $eml->{-lei_fake_mid};
+	$smsg->{mid} //= '';
 	my $subj = $smsg->{subject};
 	my $xpath;
 	if ($subj ne '') {
diff --git a/lib/PublicInbox/Smsg.pm b/lib/PublicInbox/Smsg.pm
index 14086538..9db10c64 100644
--- a/lib/PublicInbox/Smsg.pm
+++ b/lib/PublicInbox/Smsg.pm
@@ -69,7 +69,7 @@ sub psgi_cull ($) {
 	$self;
 }
 
-# for Import and v1 non-SQLite WWW code paths
+# used for v2, Import and v1 non-SQLite WWW code paths
 sub populate {
 	my ($self, $hdr, $sync) = @_;
 	for my $f (qw(From To Cc Subject)) {
@@ -100,9 +100,7 @@ sub populate {
 	$self->{-ts} = [ my @ts = msg_timestamp($hdr, $sync->{cotime}) ];
 	$self->{ds} //= $ds[0]; # no zone
 	$self->{ts} //= $ts[0];
-
-	# for v1 users w/o SQLite
-	$self->{mid} //= eval { mids($hdr)->[0] } // '';
+	$self->{mid} //= mids($hdr)->[0];
 }
 
 # no strftime, that is locale-dependent and not for RFC822
diff --git a/t/lei_store.t b/t/lei_store.t
index bc0d66c2..beb5a8c4 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -100,6 +100,30 @@ for my $parallel (0, 1) {
 SKIP: {
 	require_mods(qw(Storable), 1);
 	ok($lst->can('ipc_do'), 'ipc_do works if we have Storable');
+	$eml->header_set('Message-ID', '<ipc-test@example>');
+	my $pid = $lst->ipc_worker_spawn('lei-store');
+	ok($pid > 0, 'got a worker');
+	my $smsg = $lst->ipc_do('set_eml', $eml, qw(seen));
+	is(ref($smsg), 'PublicInbox::Smsg', 'set_eml works over ipc');
+	my $ids = $lst->ipc_do('set_eml', $eml, qw(seen));
+	is_deeply($ids, [ $smsg->{num} ], 'docid returned');
+
+	$eml->header_set('Message-ID');
+	my $no_mid = $lst->ipc_do('set_eml', $eml, qw(seen));
+	my $wait = $lst->ipc_do('done');
+	my @kw = $lst->search->msg_keywords($no_mid->{num});
+	is_deeply(\@kw, [qw(seen)], 'ipc set changed kw');
+
+	is(ref($no_mid), 'PublicInbox::Smsg', 'no mid works ipc');
+	$ids = $lst->ipc_do('set_eml', $eml, qw(seen));
+	is_deeply($ids, [ $no_mid->{num} ], 'docid returned w/o mid w/ ipc');
+	$lst->ipc_do('done');
+	$lst->ipc_worker_stop;
+	$ids = $lst->ipc_do('set_eml', $eml, qw(seen answered));
+	is_deeply($ids, [ $no_mid->{num} ], 'docid returned w/o mid w/o ipc');
+	$wait = $lst->ipc_do('done');
+	@kw = $lst->search->msg_keywords($no_mid->{num});
+	is_deeply(\@kw, [qw(answered seen)], 'set changed kw w/o ipc');
 }
 
 done_testing;

* [PATCH 21/36] ipc: use shutdown(2), base atfork* callback
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (19 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 20/36] lei_store: handle messages without Message-ID at all Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 22/36] lei_to_mail: unlink mboxes if not augmenting Eric Wong
                   ` (14 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

shutdown(2) on a socket can be preferable if there are multiple
forked processes writing to a single worker and we really want
to shut things down ASAP.

It may also be good to provide an ipc_worker_exit method which
subclasses can override if needed for graceful shutdown.  But we
won't need equivalents to atexit(3) since we can rely on DESTROY
handlers given this is Perl5.
---
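Reader's note: a minimal sketch of stopping a forked worker via
shutdown(2) on a socketpair, assuming a line-based protocol invented
for this example (the patch sends serialized records instead).  Unlike
close(2), shutdown(2) takes effect on the socket itself, so the worker
sees EOF even if other processes still hold a copy of the descriptor.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM SHUT_RDWR);
use IO::Handle; # ->autoflush

socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) { # worker: count requests until EOF
	close $s1 or die "close(\$s1): $!";
	my $n = 0;
	while (defined(my $line = <$s2>)) {
		$n++;
	}
	exit($n == 2 ? 0 : 1);
}
close $s2 or die "close(\$s2): $!";
$s1->autoflush(1);
print $s1 "job 1\n", "job 2\n" or die "print: $!";

# queued data is still delivered; the worker then reads EOF and exits
shutdown($s1, SHUT_RDWR) or die "shutdown: $!";
waitpid($pid, 0) == $pid or die "waitpid: $!";
my $status = $?;
print "worker exit status: $status\n";
```

A zero status here means the worker received both requests before
seeing EOF.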
 lib/PublicInbox/IPC.pm | 49 ++++++++++++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 0baa218c..ed10cf44 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -56,8 +56,6 @@ sub ipc_return ($$$) {
 
 sub ipc_worker_loop ($$) {
 	my ($self, $s2) = @_;
-	$self->ipc_atfork_child if $self->can('ipc_atfork_child');
-	$s2->autoflush(1);
 	while (my $rec = _get_rec($s2)) {
 		my ($wantarray, $sub, @args) = @$rec;
 		if (!defined($wantarray)) { # no waiting if client doesn't care
@@ -73,7 +71,7 @@ sub ipc_worker_loop ($$) {
 	}
 }
 
-sub ipc_worker_spawn ($$$) {
+sub ipc_worker_spawn {
 	my ($self, $ident, $oldset) = @_;
 	return unless $enc;
 	my $pid = $self->{-ipc_worker_pid};
@@ -82,43 +80,62 @@ sub ipc_worker_spawn ($$$) {
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
 	my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+	my $parent = $$;
+	$self->ipc_atfork_parent;
 	defined($pid = fork) or die "fork: $!";
 	if ($pid == 0) {
-		undef $s1;
-		local $0 = $ident;
+		eval { PublicInbox::DS->Reset };
+		$self->{-ipc_parent_pid} = $parent;
+		close $s1 or die "close(\$s1): $!";
+		$s2->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+		local $0 = $ident;
 		PublicInbox::Sigfd::sig_setmask($oldset);
+		$self->ipc_atfork_child;
 		eval { ipc_worker_loop($self, $s2) };
-		die "worker $ident died: $@\n" if $@;
-		$self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+		die "worker $ident PID:$$ died: $@\n" if $@;
 		exit;
 	}
 	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+	close $s2 or die "close(\$s2): $!";
 	$s1->autoflush(1);
 	$self->{-ipc_sock} = $s1;
 	$self->{-ipc_worker_pid} = $pid;
 }
 
-sub ipc_reap_worker { # dwaitpid callback
+sub ipc_worker_reap { # dwaitpid callback
 	my ($self, $pid) = @_;
 	warn "PID:$pid died with \$?=$?\n" if $?;
 }
 
+# for base class, override in subclasses
+sub ipc_atfork_parent {}
+sub ipc_atfork_child {}
+
+sub ipc_worker_exit {
+	my (undef, $code) = @_;
+	exit($code);
+}
+
 sub ipc_worker_stop {
 	my ($self) = @_;
 	my $pid;
-	if (delete $self->{-ipc_sock}) {
-		$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
-	} else {
+	my $s1 = delete $self->{-ipc_sock} or do {
 		$pid = delete $self->{-ipc_worker_pid} and
-			die "unexpected PID:$pid";
-	}
-	return unless $pid;
-	eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+			die "unexpected PID:$pid without ipc_sock";
+		return;
+	};
+	$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+	_send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
+	shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+	eval {
+		my $reap = $self->can('ipc_worker_reap');
+		PublicInbox::DS::dwaitpid($pid, $reap, $self);
+	};
 	if ($@) {
 		my $wp = waitpid($pid, 0);
 		$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
-		ipc_reap_worker($self, $pid);
+		$self->ipc_worker_reap($pid);
 	}
 }
 

* [PATCH 22/36] lei_to_mail: unlink mboxes if not augmenting
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (20 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 21/36] ipc: use shutdown(2), base atfork* callback Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 23/36] lei: add --mfolder as an --output alias Eric Wong
                   ` (13 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This matches mairix(1) behavior and may be safer if there are
concurrent readers on the existing mbox, especially since
we don't currently implement mbox locking (nor does mairix).
---
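Reader's note: a standalone sketch of "unlink unless augmenting",
using a hypothetical open_dst sub and a temporary directory.  Readers
that already have the old file open keep seeing the old contents after
the unlink, which is the safety argument above; truncating in place
would change the file underneath them.

```perl
use strict;
use warnings;
use Errno qw(ENOENT);
use File::Temp qw(tempdir);

sub open_dst {
	my ($dst, $augment) = @_;
	if (!$augment && !unlink($dst)) {
		# a missing file is fine, anything else is an error
		die "unlink $dst: $!" if $! != ENOENT;
	}
	open(my $fh, '>>', $dst) or die "open $dst: $!";
	$fh;
}

my $dir = tempdir(CLEANUP => 1);
my $dst = "$dir/out.mbox";
my @sizes;
for my $augment (0, 1, 1, 0) {
	my $fh = open_dst($dst, $augment);
	print $fh "x\n" or die "print: $!";
	close $fh or die "close: $!";
	push @sizes, -s $dst;
}
print "@sizes\n"; # prints "2 4 6 2"
```

The file grows while augmenting and starts over from a fresh inode
otherwise.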
 lib/PublicInbox/LeiToMail.pm | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 0b2685b0..5b80eb27 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -13,7 +13,7 @@ use PublicInbox::LeiDedupe;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Errno qw(EEXIST ESPIPE);
+use Errno qw(EEXIST ESPIPE ENOENT);
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -230,7 +230,10 @@ sub _mbox_write_cb ($$$$) {
 	# XXX should we support /dev/stdout.gz ?
 	if ($dst eq '/dev/stdout') {
 		$out = $lei->{1};
-	} else { # TODO: mbox locking
+	} else { # TODO: mbox locking (but mairix doesn't...)
+		if (!$lei->{opt}->{augment} && -f $dst and !unlink($dst)) {
+			die "unlink $dst: $!" if $! != ENOENT;
+		}
 		open $out, '+>>', $dst or die "open $dst: $!";
 		# Perl does SEEK_END even with O_APPEND :<
 		$seekable = seek($out, 0, SEEK_SET);
@@ -251,8 +254,6 @@ sub _mbox_write_cb ($$$$) {
 		# maybe some systems don't honor O_APPEND, Perl does this:
 		seek($out, 0, SEEK_END) or die "seek $dst: $!";
 		$dedupe->pause_dedupe if $jobs; # are we forking?
-	} elsif ($seekable) {
-		truncate($out, 0) or die "truncate $dst: $!";
 	}
 	$dedupe->prepare_dedupe if !$jobs;
 	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;

* [PATCH 23/36] lei: add --mfolder as an --output alias
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (21 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 22/36] lei_to_mail: unlink mboxes if not augmenting Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 24/36] spawn: move run_die here from PublicInbox::Import Eric Wong
                   ` (12 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This will be helpful for mairix users.
---
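Reader's note: a short sketch of how a Getopt::Long alias behaves,
using the same `output|mfolder|o=s' spec as the diff below.  The
argument values are made up.  With a hash as the destination, every
alias stores under the first (primary) name.

```perl
use strict;
use warnings;
use Getopt::Long qw(GetOptionsFromArray);

my @spec = ('output|mfolder|o=s');
my (%long, %alias, %short);
GetOptionsFromArray([ '--output=/tmp/a' ], \%long, @spec) or die;
GetOptionsFromArray([ '--mfolder=/tmp/b' ], \%alias, @spec) or die;
GetOptionsFromArray([ '-o', '/tmp/c' ], \%short, @spec) or die;

print "$long{output} $alias{output} $short{output}\n";
```

Code consuming the options only ever needs to look at the `output'
key, regardless of which spelling the user typed.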
 lib/PublicInbox/LEI.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f960aa72..bb77198e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -69,7 +69,7 @@ sub _config_path ($) {
 # command => [ positional_args, 1-line description, Getopt::Long option spec ]
 our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
-	save-as=s output|o=s format|f=s dedupe|d=s thread|t augment|a
+	save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
 	sort|s=s@ reverse|r offset=i remote local! external!
 	since|after=s until|before=s), opt_dash('limit|n=i', '[0-9]+') ],
 


* [PATCH 24/36] spawn: move run_die here from PublicInbox::Import
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (22 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 23/36] lei: add --mfolder as an --output alias Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 25/36] init: remove embedded UnlinkMe package Eric Wong
                   ` (11 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

It seems like a more logical place for it, but we'll favor the
newly-added xsys_e() in tests for BAIL_OUT use.
---
 lib/PublicInbox/Import.pm     |  9 +--------
 lib/PublicInbox/LEI.pm        |  5 ++---
 lib/PublicInbox/Spawn.pm      |  9 ++++++++-
 lib/PublicInbox/TestCommon.pm | 25 ++++++++++++++++++-------
 lib/PublicInbox/V2Writable.pm | 10 ++++------
 script/public-inbox-init      | 13 ++++++++-----
 t/convert-compact.t           |  4 ++--
 t/index-git-times.t           |  3 +--
 t/plack.t                     |  4 ++--
 9 files changed, 46 insertions(+), 36 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 60cff9c2..6f387b77 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -9,7 +9,7 @@ package PublicInbox::Import;
 use strict;
 use parent qw(PublicInbox::Lock);
 use v5.10.1;
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
 use PublicInbox::Smsg;
@@ -442,13 +442,6 @@ sub add {
 	$self->{tip} = ":$commit";
 }
 
-sub run_die ($;$$) {
-	my ($cmd, $env, $rdr) = @_;
-	my $pid = spawn($cmd, $env, $rdr);
-	waitpid($pid, 0) == $pid or die join(' ', @$cmd) .' did not finish';
-	$? == 0 or die join(' ', @$cmd) . " failed: $?\n";
-}
-
 my @INIT_FILES = ('HEAD' => undef, # filled in at runtime
 		'description' => <<EOD,
 Unnamed repository; edit this file 'description' to name the repository.
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index bb77198e..1ba9eff3 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -19,7 +19,7 @@ use PublicInbox::Config;
 use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn run_die);
 use PublicInbox::OnDestroy;
 use Text::Wrap qw(wrap);
 use File::Path qw(mkpath);
@@ -482,8 +482,7 @@ sub lei_config {
 	my $cfg = _lei_cfg($self, 1);
 	my $cmd = [ qw(git config -f), $cfg->{'-f'}, @argv ];
 	my %rdr = map { $_ => $self->{$_} } (0..2);
-	require PublicInbox::Import;
-	PublicInbox::Import::run_die($cmd, $env, \%rdr);
+	run_die($cmd, $env, \%rdr);
 }
 
 sub lei_init {
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index cb16fcf6..5e2495d2 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -19,7 +19,7 @@ use strict;
 use parent qw(Exporter);
 use Symbol qw(gensym);
 use PublicInbox::ProcessPipe;
-our @EXPORT_OK = qw/which spawn popen_rd nodatacow_dir/;
+our @EXPORT_OK = qw(which spawn popen_rd run_die nodatacow_dir);
 our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
 
 my $vfork_spawn = <<'VFORK_SPAWN';
@@ -306,4 +306,11 @@ sub popen_rd {
 	$ret;
 }
 
+sub run_die ($;$$) {
+	my ($cmd, $env, $rdr) = @_;
+	my $pid = spawn($cmd, $env, $rdr);
+	waitpid($pid, 0) == $pid or die "@$cmd did not finish";
+	$? == 0 or die "@$cmd failed: \$?=$?\n";
+}
+
 1;
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index 338e760c..b833984d 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -10,8 +10,14 @@ use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD :seek);
 use POSIX qw(dup2);
 use IO::Socket::INET;
 our @EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods
-	run_script start_script key2sub xsys xqx eml_load tick
+	run_script start_script key2sub xsys xsys_e xqx eml_load tick
 	have_xapian_compact);
+BEGIN {
+	require Test::More;
+	*BAIL_OUT = \&Test::More::BAIL_OUT;
+	*plan = \&Test::More::plan;
+	*skip = \&Test::More::skip;
+}
 
 sub eml_load ($) {
 	my ($path, $cb) = @_;
@@ -38,7 +44,7 @@ sub tcp_server () {
 		Type => Socket::SOCK_STREAM(),
 		Listen => 1024,
 		Blocking => 0,
-	) or Test::More::BAIL_OUT("failed to create TCP server: $!");
+	) or BAIL_OUT "failed to create TCP server: $!";
 }
 
 sub tcp_connect {
@@ -49,7 +55,7 @@ sub tcp_connect {
 		Type => Socket::SOCK_STREAM(),
 		PeerAddr => $addr,
 		%opt,
-	) or Test::More::BAIL_OUT("failed to connect to $addr: $!");
+	) or BAIL_OUT "failed to connect to $addr: $!";
 	$s->autoflush(1);
 	$s;
 }
@@ -64,8 +70,8 @@ sub require_git ($;$) {
 	my $cur_int = ($cur_maj << 24) | ($cur_min << 16) | ($cur_sub // 0);
 	if ($cur_int < $req_int) {
 		return 0 if $maybe;
-		Test::More::plan(skip_all =>
-			"git $req+ required, have $cur_maj.$cur_min.$cur_sub");
+		plan skip_all =>
+			"git $req+ required, have $cur_maj.$cur_min.$cur_sub";
 	}
 	1;
 }
@@ -113,8 +119,8 @@ sub require_mods {
 	}
 	return unless @need;
 	my $m = join(', ', @need)." missing for $0";
-	Test::More::skip($m, $maybe) if $maybe;
-	Test::More::plan(skip_all => $m)
+	skip($m, $maybe) if $maybe;
+	plan(skip_all => $m)
 }
 
 sub key2script ($) {
@@ -323,6 +329,11 @@ sub xsys {
 	$? >> 8
 }
 
+sub xsys_e { # like "/bin/sh -e"
+	xsys(@_) == 0 or
+		BAIL_OUT (ref $_[0] ? "@{$_[0]}" : "@_"). " failed \$?=$?"
+}
+
 # like `backtick` or qx{} op, but uses spawn() for env/rdr + vfork
 sub xqx {
 	my ($cmd, $env, $rdr) = @_;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index edb8ba57..5a8caa08 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -16,7 +16,7 @@ use PublicInbox::ContentHash qw(content_hash content_digest);
 use PublicInbox::InboxWritable;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size
 	is_bad_blob);
 use IO::Handle; # ->autoflush
@@ -745,9 +745,8 @@ sub git_init {
 	my ($self, $epoch) = @_;
 	my $git_dir = "$self->{ibx}->{inboxdir}/git/$epoch.git";
 	PublicInbox::Import::init_bare($git_dir);
-	my @cmd = (qw/git config/, "--file=$git_dir/config",
-			'include.path', '../../all.git/config');
-	PublicInbox::Import::run_die(\@cmd);
+	run_die([qw(git config), "--file=$git_dir/config",
+		qw(include.path ../../all.git/config)]);
 	fill_alternates($self, $epoch);
 	$git_dir
 }
@@ -1222,8 +1221,7 @@ sub unindex_todo ($$$) {
 	return if $before == $after;
 
 	# ensure any blob can not longer be accessed via dumb HTTP
-	PublicInbox::Import::run_die(['git',
-		"--git-dir=$unit->{git}->{git_dir}",
+	run_die(['git', "--git-dir=$unit->{git}->{git_dir}",
 		qw(-c gc.reflogExpire=now gc --prune=all --quiet)]);
 }
 
diff --git a/script/public-inbox-init b/script/public-inbox-init
index 7ac77830..85d14377 100755
--- a/script/public-inbox-init
+++ b/script/public-inbox-init
@@ -185,17 +185,20 @@ $ibx->init_inbox(0, $skip_epoch, $skip_artnum);
 # needed for git prior to v2.1.0
 umask(0077) if defined $perm;
 
+require PublicInbox::Spawn;
+PublicInbox::Spawn->import(qw(run_die));
+
 foreach my $addr (@address) {
 	next if $seen{lc($addr)};
-	PublicInbox::Import::run_die([@x, "--add", "$pfx.address", $addr]);
+	run_die([@x, "--add", "$pfx.address", $addr]);
 }
-PublicInbox::Import::run_die([@x, "$pfx.url", $http_url]);
-PublicInbox::Import::run_die([@x, "$pfx.inboxdir", $inboxdir]);
+run_die([@x, "$pfx.url", $http_url]);
+run_die([@x, "$pfx.inboxdir", $inboxdir]);
 
 if (defined($indexlevel)) {
-	PublicInbox::Import::run_die([@x, "$pfx.indexlevel", $indexlevel]);
+	run_die([@x, "$pfx.indexlevel", $indexlevel]);
 }
-PublicInbox::Import::run_die([@x, "$pfx.newsgroup", $ng]) if $ng ne '';
+run_die([@x, "$pfx.newsgroup", $ng]) if $ng ne '';
 
 # needed for git prior to v2.1.0
 if (defined $perm) {
diff --git a/t/convert-compact.t b/t/convert-compact.t
index e479476d..fe8a7ec2 100644
--- a/t/convert-compact.t
+++ b/t/convert-compact.t
@@ -21,8 +21,8 @@ my $ibx = {
 
 PublicInbox::Import::init_bare($ibx->{inboxdir});
 ok(umask(077), 'set restrictive umask');
-ok(PublicInbox::Import::run_die([qw(git) , "--git-dir=$ibx->{inboxdir}",
-	qw(config core.sharedRepository 0644)]), 'set sharedRepository');
+xsys_e(qw(git) , "--git-dir=$ibx->{inboxdir}",
+	qw(config core.sharedRepository 0644));
 $ibx = PublicInbox::Inbox->new($ibx);
 my $im = PublicInbox::Import->new($ibx->git, undef, undef, $ibx);
 my $mime = PublicInbox::Eml->new(<<'EOF');
diff --git a/t/index-git-times.t b/t/index-git-times.t
index f9869cfa..9b869f94 100644
--- a/t/index-git-times.t
+++ b/t/index-git-times.t
@@ -5,7 +5,6 @@ use strict;
 use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
-use PublicInbox::Import;
 use PublicInbox::Config;
 use PublicInbox::Admin;
 use File::Path qw(remove_tree);
@@ -48,7 +47,7 @@ EOF
 	print $w $data or die;
 	close $w or die;
 	my $cmd = ['git', "--git-dir=$v1dir", 'fast-import', '--quiet'];
-	PublicInbox::Import::run_die($cmd, undef, { 0 => $r });
+	xsys_e($cmd, undef, { 0 => $r });
 }
 
 run_script(['-index', '--skip-docdata', $v1dir]) or die 'v1 index failed';
diff --git a/t/plack.t b/t/plack.t
index 1fedf426..5d65cd05 100644
--- a/t/plack.t
+++ b/t/plack.t
@@ -21,8 +21,8 @@ ok(-f $psgi, "psgi example file found");
 my $pfx = 'http://example.com/test';
 ok(run_script(['-init', 'test', $inboxdir, "$pfx/", $addr]),
 	'initialized repo');
-PublicInbox::Import::run_die([qw(git config -f), $pi_config,
-	'publicinbox.test.newsgroup', 'inbox.test']);
+xsys_e(qw(git config -f), $pi_config,
+	qw(publicinbox.test.newsgroup inbox.test));
 open my $fh, '>', "$inboxdir/description" or die "open: $!\n";
 print $fh "test for public-inbox\n";
 close $fh or die "close: $!\n";


* [PATCH 25/36] init: remove embedded UnlinkMe package
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (23 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 24/36] spawn: move run_die here from PublicInbox::Import Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 26/36] t/run: avoid uninitialized var on incomplete test Eric Wong
                   ` (10 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

PublicInbox::OnDestroy can do the same thing
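
For readers unfamiliar with the idiom, here is a minimal sketch of a
scope guard that runs a callback when it is destroyed.  It assumes
nothing about the internals of PublicInbox::OnDestroy; the ScopeGuard
package and with_lockfile() are hypothetical names used only for
illustration:

```perl
use strict;
use warnings;

# Hypothetical stand-in for an OnDestroy-style guard: run a callback
# when the last reference to the guard goes away.
package ScopeGuard {
	sub new {
		my ($class, $cb) = @_;
		bless { cb => $cb }, $class;
	}

	sub DESTROY {
		my $cb = delete $_[0]->{cb};
		$cb->() if $cb;
	}
}

# Create a lock file, guard it, then drop the guard explicitly.
# Returns true if the lock file is gone afterwards.
sub with_lockfile {
	my ($lockfile) = @_;
	open(my $fh, '>', $lockfile) or die "open $lockfile: $!";
	my $guard = ScopeGuard->new(sub { unlink $lockfile });
	# ... work that may die() would go here ...
	undef $guard; # triggers ->DESTROY, like the patch does
	-e $lockfile ? 0 : 1;
}
```

If the guarded work dies instead, the guard still goes out of scope
and the callback runs, which is the point of replacing a hand-rolled
package with a generic one.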
---
 script/public-inbox-init | 19 +++----------------
 1 file changed, 3 insertions(+), 16 deletions(-)

diff --git a/script/public-inbox-init b/script/public-inbox-init
index 85d14377..693f5ca1 100755
--- a/script/public-inbox-init
+++ b/script/public-inbox-init
@@ -91,7 +91,8 @@ sysopen($lockfh, $lockfile, O_RDWR|O_CREAT|O_EXCL) or do {
 	warn "could not open config file: $lockfile: $!\n";
 	exit(255);
 };
-my $auto_unlink = UnlinkMe->new($lockfile);
+require PublicInbox::OnDestroy;
+my $auto_unlink = PublicInbox::OnDestroy->new(sub { unlink $lockfile });
 my ($perm, %seen);
 if (-e $pi_config) {
 	open(my $oh, '<', $pi_config) or die "unable to read $pi_config: $!\n";
@@ -208,18 +209,4 @@ if (defined $perm) {
 
 rename $pi_config_tmp, $pi_config or
 	die "failed to rename `$pi_config_tmp' to `$pi_config': $!\n";
-$auto_unlink->DESTROY;
-
-package UnlinkMe;
-use strict;
-
-sub new {
-	my ($klass, $file) = @_;
-	bless { file => $file }, $klass;
-}
-
-sub DESTROY {
-	my $f = delete($_[0]->{file});
-	unlink($f) if defined($f);
-}
-1;
+undef $auto_unlink; # trigger ->DESTROY


* [PATCH 26/36] t/run: avoid uninitialized var on incomplete test
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (24 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 25/36] init: remove embedded UnlinkMe package Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 27/36] gcf2client: reap process on DESTROY Eric Wong
                   ` (9 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Diagnosing an occasional FIFO failure in t/lei_to_mail.t...
---
 t/run.perl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/t/run.perl b/t/run.perl
index 1c7bcfc3..5c056356 100755
--- a/t/run.perl
+++ b/t/run.perl
@@ -71,7 +71,8 @@ sub test_status () {
 		my $skip = '';
 		if (open my $fh, '<', $log) {
 			my @not_ok = grep(!/^(?:ok |[ \t]*#)/ms, <$fh>);
-			pop @not_ok if $not_ok[-1] =~ /^[0-9]+\.\.[0-9]+$/;
+			my $last = $not_ok[-1] // '';
+			pop @not_ok if $last =~ /^[0-9]+\.\.[0-9]+$/;
 			my $pfx = "# $log: ";
 			print $OLDERR map { $pfx.$_ } @not_ok;
 			seek($fh, 0, SEEK_SET) or die "seek: $!";


* [PATCH 27/36] gcf2client: reap process on DESTROY
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (25 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 26/36] t/run: avoid uninitialized var on incomplete test Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 28/36] lei_to_mail: open FIFOs O_WRONLY so we block Eric Wong
                   ` (8 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

We don't want to leave it for the waitpid(-1, ...) call in Xapcmd to reap.
---
 lib/PublicInbox/Gcf2Client.pm | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index ab486de5..4bda5520 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -12,8 +12,8 @@ use PublicInbox::Syscall qw(EPOLLONESHOT);
 # fields:
 #	async_cat => GitAsyncCat ref (read-only pipe)
 #	sock => writable pipe to Gcf2::loop
-
-
+#	in => pipe we read from
+#	pid => PID of Gcf2::loop process
 sub new  {
 	my ($rdr) = @_;
 	my $self = bless {}, __PACKAGE__;
@@ -63,6 +63,22 @@ sub event_step {
 
 no warnings 'once';
 
+sub DESTROY {
+	my ($self) = @_;
+	my $pid = delete $self->{pid};
+	delete $self->{in};
+	return unless $pid;
+	eval {
+		PublicInbox::DS::dwaitpid($pid, undef, undef);
+		$self->close; # we're still in the event loop
+	};
+	if ($@) { # wait synchronously if not in event loop
+		my $sock = delete $self->{sock};
+		close $sock if $sock;
+		waitpid($pid, 0);
+	}
+}
+
 # used by GitAsyncCat
 *cat_async_step = \&PublicInbox::Git::cat_async_step;
 


* [PATCH 28/36] lei_to_mail: open FIFOs O_WRONLY so we block
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (26 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 27/36] gcf2client: reap process on DESTROY Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 29/36] searchidxshard: call DS->Reset at worker start Eric Wong
                   ` (7 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Opening a FIFO with O_RDWR always succeeds on Linux, which
causes the cat(1) process invoked by t/lei_to_mail.t to get
stuck.  Furthermore, O_APPEND makes no sense on FIFOs and
perhaps there's some kernel out there which will reject it.
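
The mode selection can be sketched in isolation as follows.
mbox_open_mode() is a hypothetical helper, not a function in
LeiToMail.pm; it only mirrors the `-p' test from the hunk below:

```perl
use strict;
use warnings;

# Choose an open() mode for an mbox destination:
#   '>'   => O_WRONLY|O_CREAT|O_TRUNC, used for FIFOs
#   '+>>' => O_RDWR|O_CREAT|O_APPEND, used for everything else
sub mbox_open_mode {
	my ($dst) = @_;
	-p $dst ? '>' : '+>>';
}
```

With '>', a blocking open() on a FIFO waits until a reader has
opened the other end, so the writer and the reader stay in step.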
---
 lib/PublicInbox/LeiToMail.pm | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5b80eb27..be338006 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -231,10 +231,11 @@ sub _mbox_write_cb ($$$$) {
 	if ($dst eq '/dev/stdout') {
 		$out = $lei->{1};
 	} else { # TODO: mbox locking (but mairix doesn't...)
-		if (!$lei->{opt}->{augment} && -f $dst and !unlink($dst)) {
+		my $mode = -p $dst ? '>' : '+>>';
+		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
 			die "unlink $dst: $!" if $! != ENOENT;
 		}
-		open $out, '+>>', $dst or die "open $dst: $!";
+		open $out, $mode, $dst or die "open $dst: $!";
 		# Perl does SEEK_END even with O_APPEND :<
 		$seekable = seek($out, 0, SEEK_SET);
 		die "seek $dst: $!\n" if !$seekable && $! != ESPIPE;


* [PATCH 29/36] searchidxshard: call DS->Reset at worker start
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (27 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 28/36] lei_to_mail: open FIFOs O_WRONLY so we block Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 30/36] t/ipc.t: test for references via `die' Eric Wong
                   ` (6 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

The daemon for the local email interface will be inside
the DS->EventLoop.  -watch currently doesn't trigger this
bug since it doesn't enable parallelism, but it may in
the future.
---
 lib/PublicInbox/SearchIdxShard.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 87b0bad6..a41477cd 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -34,6 +34,7 @@ sub spawn_worker {
 	my $pid = fork;
 	defined $pid or die "fork failed: $!\n";
 	if ($pid == 0) {
+		eval { PublicInbox::DS->Reset };
 		# these signals are localized in parent
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		PublicInbox::Sigfd::sig_setmask($oldset);


* [PATCH 30/36] t/ipc.t: test for references via `die'
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (28 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 29/36] searchidxshard: call DS->Reset at worker start Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 31/36] use PublicInbox::DS for dwaitpid Eric Wong
                   ` (5 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

We'll probably start using references as exceptions in
some places for more exact matching.
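
As a rough illustration of what "more exact matching" could look
like, the sketch below dispatches on the type of $@ rather than on a
string pattern.  My::Timeout and classify() are hypothetical names,
not part of public-inbox:

```perl
use strict;
use warnings;

# Hypothetical exception class, for illustration only.
package My::Timeout {
	sub new {
		my ($class, $secs) = @_;
		bless { secs => $secs }, $class;
	}
}

# Run $code and report what kind of exception (if any) it threw.
sub classify {
	my ($code) = @_;
	eval { $code->(); 1 } and return 'ok';
	my $err = $@;
	return 'timeout' if ref($err) eq 'My::Timeout';
	return 'array' if ref($err) eq 'ARRAY';
	'string';
}
```

Matching on ref($err) avoids the fragility of regexps against
error message text, which is why the IPC layer needs to pass
references through `die' intact.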
---
 t/ipc.t | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/t/ipc.t b/t/ipc.t
index f3715e2c..5ee45e63 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -45,6 +45,19 @@ my $test = sub {
 	is((values %lines)[0], 2, '2 hits on same line number');
 	is($err, $exp, "$x die matches");
 	is($ret, undef, "$x die did not return");
+
+	eval { $ipc->test_die(['arrayref']) };
+	$exp = $@;
+	$ret = eval { $ipc->ipc_do('test_die', ['arrayref']) };
+	$err = $@;
+	is_deeply($err, $exp, 'die with unblessed ref');
+	is(ref($err), 'ARRAY', 'got an array ref');
+
+	$exp = bless ['blessed'], 'PublicInbox::WTF';
+	$ret = eval { $ipc->ipc_do('test_die', $exp) };
+	$err = $@;
+	is_deeply($err, $exp, 'die with blessed ref');
+	is(ref($err), 'PublicInbox::WTF', 'got blessed ref');
 };
 $test->('local');
 


* [PATCH 31/36] use PublicInbox::DS for dwaitpid
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (29 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 30/36] t/ipc.t: test for references via `die' Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 32/36] syscall: SFD_NONBLOCK can be a constant, again Eric Wong
                   ` (4 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

This simplifies our code and provides a more consistent API for
error handling.  PublicInbox::DS can be loaded nowadays on all
*BSDs and Linux distros easily without extra packages to
install.

The downside is possibly increased startup time, but that's
probably not as big a problem with lei being a daemon
(and -mda possibly following suit).
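
A simplified sketch of the synchronous fallback is shown here for
orientation.  It only models the "not in the event loop" case;
wait_then_call() and exit_code_of_child() are hypothetical names and
not the DS.pm implementation:

```perl
use strict;
use warnings;
use POSIX ();

# Wait for $pid synchronously, then run the optional callback as
# $cb->($arg, $pid).  $? still holds the child's wait status while
# the callback runs.
sub wait_then_call {
	my ($pid, $cb, $arg) = @_;
	my $ret = waitpid($pid, 0);
	if ($ret == $pid) {
		if ($cb) {
			eval { $cb->($arg, $pid) };
			warn "E: callback for $pid failed: $@" if $@;
		}
	} else {
		warn "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
	}
}

# Fork a child which exits with $code, and return the exit code
# as observed by the callback.
sub exit_code_of_child {
	my ($code) = @_;
	my $pid = fork;
	defined $pid or die "fork: $!";
	if ($pid == 0) {
		POSIX::_exit($code); # skip END blocks in the child
	}
	my $status;
	wait_then_call($pid, sub { ${$_[0]} = $? >> 8 }, \$status);
	$status;
}
```

Having one entry point with the same callback signature in both the
event loop and the synchronous case is what lets callers drop their
own eval + waitpid fallbacks.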
---
 lib/PublicInbox/DS.pm          | 39 ++++++++++++++++++++----------
 lib/PublicInbox/Gcf2Client.pm  | 16 ++++---------
 lib/PublicInbox/Git.pm         |  6 ++---
 lib/PublicInbox/LEI.pm         |  4 ++--
 lib/PublicInbox/ProcessPipe.pm | 17 ++++++++------
 lib/PublicInbox/Qspawn.pm      | 43 ++++++++--------------------------
 6 files changed, 56 insertions(+), 69 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 97a6f6ef..01c9abd4 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -21,19 +21,19 @@
 #        (tmpio = [ GLOB, offset, [ length ] ])
 package PublicInbox::DS;
 use strict;
+use v5.10.1;
+use parent qw(Exporter);
 use bytes;
 use POSIX qw(WNOHANG);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
-use parent qw(Exporter);
-our @EXPORT_OK = qw(now msg_more);
-use 5.010_001;
 use Scalar::Util qw(blessed);
 use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(confess carp);
+our @EXPORT_OK = qw(now msg_more dwaitpid);
 
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
@@ -215,8 +215,13 @@ sub reap_pids {
 		my $ret = waitpid($pid, WNOHANG);
 		if ($ret == 0) {
 			push @$wait_pids, $ary; # autovivifies @$wait_pids
-		} elsif ($cb) {
-			eval { $cb->($arg, $pid) };
+		} elsif ($ret == $pid) {
+			if ($cb) {
+				eval { $cb->($arg, $pid) };
+				warn "E: dwaitpid($pid) in_loop: $@" if $@;
+			}
+		} else {
+			warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
 		}
 	}
 	# we may not be done, yet, and could've missed/masked a SIGCHLD:
@@ -608,13 +613,23 @@ sub shutdn ($) {
     }
 }
 
-# must be called with eval, PublicInbox::DS may not be loaded (see t/qspawn.t)
-sub dwaitpid ($$$) {
-	die "Not in EventLoop\n" unless $in_loop;
-	push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
-
-	# We could've just missed our SIGCHLD, cover it, here:
-	enqueue_reap();
+sub dwaitpid ($;$$) {
+	my ($pid, $cb, $arg) = @_;
+	if ($in_loop) {
+		push @$wait_pids, [ $pid, $cb, $arg ];
+		# We could've just missed our SIGCHLD, cover it, here:
+		enqueue_reap();
+	} else {
+		my $ret = waitpid($pid, 0);
+		if ($ret == $pid) {
+			if ($cb) {
+				eval { $cb->($arg, $pid) };
+				warn "E: dwaitpid($pid) !in_loop: $@" if $@;
+			}
+		} else {
+			warn "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
+		}
+	}
 }
 
 sub _run_later () {
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 4bda5520..10820852 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -9,6 +9,7 @@ use PublicInbox::Git;
 use PublicInbox::Spawn qw(popen_rd);
 use IO::Handle ();
 use PublicInbox::Syscall qw(EPOLLONESHOT);
+use PublicInbox::DS qw(dwaitpid);
 # fields:
 #	async_cat => GitAsyncCat ref (read-only pipe)
 #	sock => writable pipe to Gcf2::loop
@@ -65,18 +66,11 @@ no warnings 'once';
 
 sub DESTROY {
 	my ($self) = @_;
-	my $pid = delete $self->{pid};
 	delete $self->{in};
-	return unless $pid;
-	eval {
-		PublicInbox::DS::dwaitpid($pid, undef, undef);
-		$self->close; # we're still in the event loop
-	};
-	if ($@) { # wait synchronously if not in event loop
-		my $sock = delete $self->{sock};
-		close $sock if $sock;
-		waitpid($pid, 0);
-	}
+	# GitAsyncCat::event_step may reap us with WNOHANG, too
+	my $pid = delete $self->{pid} or return;
+	PublicInbox::DS->in_loop ? $self->close : delete($self->{sock});
+	dwaitpid $pid;
 }
 
 # used by GitAsyncCat
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 73dc7d3e..fdfe1269 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -21,6 +21,7 @@ use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak);
 use Digest::SHA ();
+use PublicInbox::DS qw(dwaitpid);
 our @EXPORT_OK = qw(git_unquote git_quote);
 our $PIPE_BUFSIZ = 65536; # Linux default
 our $in_cleanup;
@@ -326,10 +327,7 @@ sub _destroy {
 
 	# GitAsyncCat::event_step may delete {pid}
 	my $p = delete $self->{$pid} or return;
-
-	# PublicInbox::DS may not be loaded
-	eval { PublicInbox::DS::dwaitpid($p, undef, undef) };
-	waitpid($p, 0) if $@; # wait synchronously if not in event loop
+	dwaitpid $p;
 }
 
 sub cat_async_abort ($) {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1ba9eff3..7b7f45de 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -18,7 +18,7 @@ use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
 use PublicInbox::Sigfd;
-use PublicInbox::DS qw(now);
+use PublicInbox::DS qw(now dwaitpid);
 use PublicInbox::Spawn qw(spawn run_die);
 use PublicInbox::OnDestroy;
 use Text::Wrap qw(wrap);
@@ -604,7 +604,7 @@ sub lei_git { # support passing through random git commands
 	my ($self, @argv) = @_;
 	my %rdr = map { $_ => $self->{$_} } (0..2);
 	my $pid = spawn(['git', @argv], $self->{env}, \%rdr);
-	PublicInbox::DS::dwaitpid($pid, \&reap_exec, $self);
+	dwaitpid($pid, \&reap_exec, $self);
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index c9234f42..afbb048d 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -5,6 +5,7 @@
 package PublicInbox::ProcessPipe;
 use strict;
 use v5.10.1;
+use PublicInbox::DS qw(dwaitpid);
 
 sub TIEHANDLE {
 	my ($class, $pid, $fh, $cb, $arg) = @_;
@@ -25,19 +26,21 @@ sub PRINT {
 	print { $self->{fh} } @_;
 }
 
+sub adjust_ret { # dwaitpid callback
+	my ($retref, $pid) = @_;
+	$$retref = '' if $?
+}
+
 sub CLOSE {
 	my $fh = delete($_[0]->{fh});
 	my $ret = defined $fh ? close($fh) : '';
 	my ($pid, $cb, $arg) = delete @{$_[0]}{qw(pid cb arg)};
 	if (defined $pid) {
-		# PublicInbox::DS may not be loaded
-		eval { PublicInbox::DS::dwaitpid($pid, $cb, $arg) };
-
-		if ($@) { # ok, not in the event loop, work synchronously
-			waitpid($pid, 0);
-			$ret = '' if $?;
-			$cb->($arg, $pid) if $cb;
+		unless ($cb) {
+			$cb = \&adjust_ret;
+			$arg = \$ret;
 		}
+		dwaitpid $pid, $cb, $arg;
 	}
 	$ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 2aa2042a..5bbbb027 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -12,12 +12,13 @@
 # operate in.  This can be useful to ensure smaller inboxes can
 # be cloned while cloning of large inboxes is maxed out.
 #
-# This does not depend on PublicInbox::DS or any other external
-# scheduling mechanism, you just need to call start() and finish()
-# appropriately. However, public-inbox-httpd (which uses PublicInbox::DS)
-# will be able to schedule this based on readability of stdout from
-# the spawned process.  See GitHTTPBackend.pm and SolverGit.pm for
-# usage examples.  It does not depend on any form of threading.
+# This does not depend on the PublicInbox::DS->EventLoop or any
+# other external scheduling mechanism, you just need to call
+# start() and finish() appropriately. However, public-inbox-httpd
+# (which uses PublicInbox::DS)  will be able to schedule this
+# based on readability of stdout from the spawned process.
+# See GitHTTPBackend.pm and SolverGit.pm for usage examples.
+# It does not depend on any form of threading.
 #
 # This is useful for scheduling CGI execution of both long-lived
 # git-http-backend(1) process (for "git clone") as well as short-lived
@@ -27,6 +28,7 @@ package PublicInbox::Qspawn;
 use strict;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(dwaitpid); # doesn't need event loop
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -116,37 +118,12 @@ sub finalize ($$) {
 }
 
 # callback for dwaitpid
-sub waitpid_err ($$) {
-	my ($self, $pid) = @_;
-	my $xpid = delete $self->{pid};
-	my $err;
-	if (defined $pid) {
-		if ($pid > 0) { # success!
-			$err = child_err($?);
-		} elsif ($pid < 0) { # ??? does this happen in our case?
-			$err = "W: waitpid($xpid, 0) => $pid: $!";
-		} # else should not be called with pid == 0
-	}
-	finalize($self, $err);
-}
-
-sub do_waitpid ($) {
-	my ($self) = @_;
-	my $pid = $self->{pid};
-	# PublicInbox::DS may not be loaded
-	eval { PublicInbox::DS::dwaitpid($pid, \&waitpid_err, $self) };
-	# done if we're running in PublicInbox::DS::EventLoop
-	if ($@) {
-		# non public-inbox-{httpd,nntpd} callers may block:
-		my $ret = waitpid($pid, 0);
-		waitpid_err($self, $ret);
-	}
-}
+sub waitpid_err { finalize($_[0], child_err($?)) }
 
 sub finish ($;$) {
 	my ($self, $err) = @_;
 	if (delete $self->{rpipe}) {
-		do_waitpid($self);
+		dwaitpid $self->{pid}, \&waitpid_err, $self;
 	} else {
 		finalize($self, $err);
 	}


* [PATCH 32/36] syscall: SFD_NONBLOCK can be a constant, again
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (30 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 31/36] use PublicInbox::DS for dwaitpid Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 33/36] lei: avoid Spawn package when starting daemon Eric Wong
                   ` (3 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Since Perl exposes O_NONBLOCK as a constant, we can safely make
SFD_NONBLOCK a constant, too.  This is not the case for
SFD_CLOEXEC, since O_CLOEXEC is not exposed by Perl despite
being used internally in the interpreter.
---
 lib/PublicInbox/DSKQXS.pm  | 4 ++--
 lib/PublicInbox/Daemon.pm  | 4 ++--
 lib/PublicInbox/LEI.pm     | 4 ++--
 lib/PublicInbox/Sigfd.pm   | 4 ++--
 lib/PublicInbox/Syscall.pm | 4 ++--
 script/public-inbox-watch  | 4 ++--
 t/sigfd.t                  | 6 +++---
 7 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm
index aa2c9168..9a37e4ce 100644
--- a/lib/PublicInbox/DSKQXS.pm
+++ b/lib/PublicInbox/DSKQXS.pm
@@ -18,7 +18,7 @@ use Symbol qw(gensym);
 use IO::KQueue;
 use Errno qw(EAGAIN);
 use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET
-	EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL $SFD_NONBLOCK);
+	EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL SFD_NONBLOCK);
 our @EXPORT_OK = qw(epoll_ctl epoll_wait);
 
 sub EV_DISPATCH () { 0x0080 }
@@ -57,7 +57,7 @@ sub signalfd {
 sub TIEHANDLE { # similar to signalfd()
 	my ($class, $signo, $flags) = @_;
 	my $self = $class->new;
-	$self->{timeout} = ($flags & $SFD_NONBLOCK) ? 0 : -1;
+	$self->{timeout} = ($flags & SFD_NONBLOCK) ? 0 : -1;
 	my $kq = $self->{kq};
 	$kq->EV_SET($_, EVFILT_SIGNAL, EV_ADD) for @$signo;
 	$self;
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index bdf1dc45..f68337a0 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -16,7 +16,7 @@ sub SO_ACCEPTFILTER () { 0x1000 }
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
 use PublicInbox::DS qw(now);
-use PublicInbox::Syscall qw($SFD_NONBLOCK);
+use PublicInbox::Syscall qw(SFD_NONBLOCK);
 require PublicInbox::Listener;
 use PublicInbox::EOFpipe;
 use PublicInbox::Sigfd;
@@ -627,7 +627,7 @@ sub daemon_loop ($$$$) {
 		# this calls epoll_create:
 		PublicInbox::Listener->new($_, $tls_cb || $post_accept)
 	} @listeners;
-	my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
+	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
 	if (!$sigfd) {
 		# wake up every second to accept signals if we don't
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7b7f45de..03302f8a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -16,7 +16,7 @@ use POSIX ();
 use IO::Handle ();
 use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
-use PublicInbox::Syscall qw($SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now dwaitpid);
 use PublicInbox::Spawn qw(spawn run_die);
@@ -704,7 +704,7 @@ sub lazy_start {
 		USR1 => \&noop,
 		USR2 => \&noop,
 	};
-	my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
+	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
 	if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
 		PublicInbox::DS->SetLoopTimeout(5000);
diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm
index 5d61e630..bf91bb37 100644
--- a/lib/PublicInbox/Sigfd.pm
+++ b/lib/PublicInbox/Sigfd.pm
@@ -6,7 +6,7 @@
 package PublicInbox::Sigfd;
 use strict;
 use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET $SFD_NONBLOCK);
+use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
 use POSIX qw(:signal_h);
 use IO::Handle ();
 
@@ -33,7 +33,7 @@ sub new {
 	} else {
 		return; # wake up every second to check for signals
 	}
-	if ($flags & $SFD_NONBLOCK) { # it can go into the event loop
+	if ($flags & SFD_NONBLOCK) { # it can go into the event loop
 		$self->SUPER::new($io, EPOLLIN | EPOLLET);
 	} else { # master main loop
 		$self->{sock} = $io;
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index c403f78a..180ee2cc 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -22,7 +22,7 @@ our @EXPORT_OK = qw(epoll_ctl epoll_create epoll_wait
                   EPOLLIN EPOLLOUT EPOLLET
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
                   EPOLLONESHOT EPOLLEXCLUSIVE
-                  signalfd $SFD_NONBLOCK);
+                  signalfd SFD_NONBLOCK);
 our %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
                              EPOLLIN EPOLLOUT
                              EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
@@ -67,7 +67,7 @@ our (
      );
 
 my $SFD_CLOEXEC = 02000000; # Perl does not expose O_CLOEXEC
-our $SFD_NONBLOCK = O_NONBLOCK;
+sub SFD_NONBLOCK () { O_NONBLOCK }
 our $no_deprecated = 0;
 
 if ($^O eq "linux") {
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index 55183ef2..4fd6ad49 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -14,7 +14,7 @@ use PublicInbox::Watch;
 use PublicInbox::Config;
 use PublicInbox::DS;
 use PublicInbox::Sigfd;
-use PublicInbox::Syscall qw($SFD_NONBLOCK);
+use PublicInbox::Syscall qw(SFD_NONBLOCK);
 my $do_scan = 1;
 GetOptions('scan!' => \$do_scan, # undocumented, testing only
 	'help|h' => \(my $show_help)) or do { print STDERR $help; exit 1 };
@@ -57,7 +57,7 @@ if ($watch) {
 	# --no-scan is only intended for testing atm, undocumented.
 	PublicInbox::DS::requeue($scan) if $do_scan;
 
-	my $sigfd = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
+	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
 	if (!$sigfd) {
 		PublicInbox::Sigfd::sig_setmask($oldset);
diff --git a/t/sigfd.t b/t/sigfd.t
index 8daf3137..07120b64 100644
--- a/t/sigfd.t
+++ b/t/sigfd.t
@@ -4,7 +4,7 @@ use Test::More;
 use IO::Handle;
 use POSIX qw(:signal_h);
 use Errno qw(ENOSYS);
-use PublicInbox::Syscall qw($SFD_NONBLOCK);
+use PublicInbox::Syscall qw(SFD_NONBLOCK);
 require_ok 'PublicInbox::Sigfd';
 
 SKIP: {
@@ -42,8 +42,8 @@ SKIP: {
 		}
 		$sigfd = undef;
 
-		my $nbsig = PublicInbox::Sigfd->new($sig, $SFD_NONBLOCK);
-		ok($nbsig, 'Sigfd->new $SFD_NONBLOCK works');
+		my $nbsig = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
+		ok($nbsig, 'Sigfd->new SFD_NONBLOCK works');
 		is($nbsig->wait_once, undef, 'nonblocking ->wait_once');
 		ok($! == Errno::EAGAIN, 'got EAGAIN');
 		kill('HUP', $$) or die "kill $!";

* [PATCH 33/36] lei: avoid Spawn package when starting daemon
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (31 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 32/36] syscall: SFD_NONBLOCK can be a constant, again Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 34/36] avoid calling waitpid from children in DESTROY Eric Wong
                   ` (2 subsequent siblings)
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Spawn was designed to speed up process spawning inside
long-lived daemons with largish memory usage.  It does not help
for short-lived scripts which only exist to start and connect to
a daemon.

This change actually speeds up initial lei startup from
~190ms to ~140ms(!).  Normal usage once the daemon is running
is unaffected, at <20ms for help text.

While we're in the area, simplify Cwd error message generation,
too.
---
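A minimal sketch of the list-form piped open() which replaces the
Spawn call, using only core Perl.  The one-liner child below is a
hypothetical stand-in for the real lazy_start invocation.  close()
on a piped handle waits for the child and sets $?, so no separate
waitpid is needed:

```perl
use strict;
use warnings;

# List-form open() with '-|' forks and execs the command with its
# stdout connected to $child; no shell is involved.
open(my $child, '-|', $^X, '-e', 'print "starting\n"; exit 3')
	or die "popen: $!";

# Read until the child closes its end of the pipe (EOF).
my @lines = <$child>;

# close() reaps the child; it returns false on a non-zero exit.
my $ok = close($child);
my $status = $? >> 8;

print "read: $_" for @lines;
print "close ok: ", ($ok ? 1 : 0), ", exit status: $status\n";
```

Setting `local $ENV{PERL5LIB}' before the open(), as the patch does,
passes the environment to the child without PublicInbox::Spawn
having to be loaded.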
 lib/PublicInbox/LEI.pm | 10 +++++-----
 script/lei             | 17 ++++++-----------
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 03302f8a..b84e24ef 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -675,7 +675,7 @@ sub lazy_start {
 	require IO::FDPass;
 	require PublicInbox::Listener;
 	require PublicInbox::EOFpipe;
-	(-p STDOUT && -p STDERR) or die "E: stdout+stderr must be pipes\n";
+	(-p STDOUT) or die "E: stdout must be a pipe\n";
 	open(STDIN, '+<', '/dev/null') or die "redirect stdin failed: $!";
 	POSIX::setsid() > 0 or die "setsid: $!";
 	my $pid = fork // die "fork: $!";
@@ -740,10 +740,9 @@ sub lazy_start {
 		$n; # true: continue, false: stop
 	});
 
-	# STDIN was redirected to /dev/null above, closing STDOUT and
-	# STDERR will cause the calling `lei' client process to finish
-	# reading <$daemon> pipe.
-	open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
+	# STDIN was redirected to /dev/null above, closing STDERR and
+	# STDOUT will cause the calling `lei' client process to finish
+	# reading the <$daemon> pipe.
 	openlog($path, 'pid', 'user');
 	local $SIG{__WARN__} = sub { syslog('warning', "@_") };
 	my $owner_pid = $$;
@@ -751,6 +750,7 @@ sub lazy_start {
 		syslog('crit', "$@") if $@ && $$ == $owner_pid;
 	});
 	open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
+	open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
 	# $daemon pipe to `lei' closed, main loop begins:
 	PublicInbox::DS->EventLoop;
 	@$on_destroy = (); # cancel on_destroy if we get here
diff --git a/script/lei b/script/lei
index ceaf1e00..0457adfd 100755
--- a/script/lei
+++ b/script/lei
@@ -21,18 +21,13 @@ if (my ($sock, $pwd) = eval {
 	my $addr = pack_sockaddr_un($path);
 	socket(my $sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
 	unless (connect($sock, $addr)) { # start the daemon if not started
-		my $cmd = [ $^X, qw[-MPublicInbox::LEI
+		local $ENV{PERL5LIB} = join(':', @INC);
+		open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
 			-E PublicInbox::LEI::lazy_start(@ARGV)],
-			$path, $! + 0 ];
-		my $env = { PERL5LIB => join(':', @INC) };
-		pipe(my ($daemon, $w)) or die "pipe: $!";
-		my $opt = { 1 => $w, 2 => $w };
-		require PublicInbox::Spawn;
-		my $pid = PublicInbox::Spawn::spawn($cmd, $env, $opt);
-		$opt = $w = undef;
+			$path, $! + 0) or die "popen: $!";
 		while (<$daemon>) { warn $_ } # EOF when STDERR is redirected
-		waitpid($pid, 0) or warn <<"";
-lei-daemon could not start, PID:$pid exited with \$?=$?
+		close($daemon) or warn <<"";
+lei-daemon could not start, exited with \$?=$?
 
 		# try connecting again anyways, unlink+bind may be racy
 		unless (connect($sock, $addr)) {
@@ -43,8 +38,8 @@ Falling back to (slow) one-shot mode
 		}
 	}
 	require Cwd;
-	my $cwd = Cwd::fastcwd() // die "fastcwd(PWD=".($ENV{PWD}//'').": $!";
 	my $pwd = $ENV{PWD} // '';
+	my $cwd = Cwd::fastcwd() // die "fastcwd(PWD=$pwd): $!";
 	if ($pwd ne $cwd) { # prefer ENV{PWD} if it's a symlink to real cwd
 		my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!";
 		my @st_pwd = stat($pwd); # PWD invalid, use cwd

* [PATCH 34/36] avoid calling waitpid from children in DESTROY
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (32 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 33/36] lei: avoid Spawn package when starting daemon Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 35/36] ds: clobber $in_loop first at reset Eric Wong
  2020-12-31 13:51 ` [PATCH 36/36] on_destroy: support PID owner guard Eric Wong
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Objects with DESTROY callbacks are inherited by forked children,
so we must be careful not to invoke waitpid from children on
their sibling processes.  Only the process which spawned a child
can reap it.
---
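A minimal sketch of the owner-PID guard, assuming only core Perl.
ChildHandle and its fields are hypothetical names for illustration;
the patch applies the same idea to Gcf2Client and Git.  The object
records $$ at creation, and DESTROY only calls waitpid when it runs
in that same process:

```perl
use strict;
use warnings;
use POSIX ();

package ChildHandle; # hypothetical class, not part of public-inbox
our $reaped; # records what the owner reaped, for demonstration

sub new {
	my ($class, $pid) = @_;
	bless { pid => $pid, owner_pid => $$ }, $class;
}

sub DESTROY {
	my ($self) = @_;
	local ($!, $?); # do not clobber the caller's status variables
	my $pid = delete $self->{pid} or return;
	return if $$ != $self->{owner_pid}; # forked copy: not our child
	$reaped = waitpid($pid, 0);
}

package main;

my $worker = fork // die "fork: $!";
POSIX::_exit(0) if $worker == 0; # worker does nothing and exits
my $h = ChildHandle->new($worker);

my $sibling = fork // die "fork: $!";
if ($sibling == 0) {
	undef $h; # DESTROY runs here, but the guard makes it a no-op
	POSIX::_exit(0);
}
waitpid($sibling, 0);
undef $h; # DESTROY runs in the owner and reaps the worker
print "owner reaped worker\n" if $ChildHandle::reaped == $worker;
```

Without the guard, the sibling's waitpid would fail with ECHILD
since the worker is not its child.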
 lib/PublicInbox/DS.pm         | 4 ++--
 lib/PublicInbox/Gcf2Client.pm | 8 ++++++--
 lib/PublicInbox/Git.pm        | 3 ++-
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 01c9abd4..4f1558c7 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -624,10 +624,10 @@ sub dwaitpid ($;$$) {
 		if ($ret == $pid) {
 			if ($cb) {
 				eval { $cb->($arg, $pid) };
-				warn "E: dwaitpid($pid) !in_loop: $@" if $@;
+				carp "E: dwaitpid($pid) !in_loop: $@" if $@;
 			}
 		} else {
-			warn "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
+			carp "waitpid($pid, 0) = $ret, \$!=$!, \$?=$?";
 		}
 	}
 }
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 10820852..54957cf3 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -15,6 +15,7 @@ use PublicInbox::DS qw(dwaitpid);
 #	sock => writable pipe to Gcf2::loop
 #	in => pipe we read from
 #	pid => PID of Gcf2::loop process
+#	owner_pid => process which spawned {pid}
 sub new  {
 	my ($rdr) = @_;
 	my $self = bless {}, __PACKAGE__;
@@ -25,6 +26,7 @@ sub new  {
 	$rdr //= {};
 	$rdr->{0} = $out_r;
 	my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop()]];
+	$self->{owner_pid} = $$;
 	@$self{qw(in pid)} = popen_rd($cmd, $env, $rdr);
 	fcntl($out_w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
 	$out_w->autoflush(1);
@@ -69,8 +71,10 @@ sub DESTROY {
 	delete $self->{in};
 	# GitAsyncCat::event_step may reap us with WNOHANG, too
 	my $pid = delete $self->{pid} or return;
-	PublicInbox::DS->in_loop ? $self->close : delete($self->{sock});
-	dwaitpid $pid;
+	if ($$ == $self->{owner_pid}) {
+		PublicInbox::DS->in_loop ? $self->close : delete($self->{sock});
+		dwaitpid $pid;
+	}
 }
 
 # used by GitAsyncCat
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index fdfe1269..47928c55 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -126,6 +126,7 @@ sub _bidi_pipe {
 	}
 	my ($in_r, $p) = popen_rd(\@cmd, undef, $redir);
 	$self->{$pid} = $p;
+	$self->{"$pid.owner"} = $$;
 	$out_w->autoflush(1);
 	if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ
 		fcntl($out_w, 1031, 4096);
@@ -327,7 +328,7 @@ sub _destroy {
 
 	# GitAsyncCat::event_step may delete {pid}
 	my $p = delete $self->{$pid} or return;
-	dwaitpid $p;
+	dwaitpid($p) if $$ == $self->{"$pid.owner"};
 }
 
 sub cat_async_abort ($) {

* [PATCH 35/36] ds: clobber $in_loop first at reset
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (33 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 34/36] avoid calling waitpid from children in DESTROY Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  2020-12-31 13:51 ` [PATCH 36/36] on_destroy: support PID owner guard Eric Wong
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Clearing the other state may trigger DESTROY callbacks, so
clobber $in_loop first to ensure those callbacks see it unset.
---
 lib/PublicInbox/DS.pm | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 4f1558c7..8a560ae8 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -66,8 +66,9 @@ Reset all state
 
 =cut
 sub Reset {
+    $in_loop = undef; # first in case DESTROY callbacks use this
     %DescriptorMap = ();
-    $in_loop = $wait_pids = $later_queue = $reap_armed = undef;
+    $wait_pids = $later_queue = $reap_armed = undef;
     $EXPMAP = {};
     $nextq = $ToClose = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default

* [PATCH 36/36] on_destroy: support PID owner guard
  2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
                   ` (34 preceding siblings ...)
  2020-12-31 13:51 ` [PATCH 35/36] ds: clobber $in_loop first at reset Eric Wong
@ 2020-12-31 13:51 ` Eric Wong
  35 siblings, 0 replies; 37+ messages in thread
From: Eric Wong @ 2020-12-31 13:51 UTC (permalink / raw)
  To: meta

Since we'll be forking for Xapian indexing and maybe in other
places, a simple guard to ensure OnDestroy doesn't unexpectedly
unlink files (or similar) in a forked child is the safer option.
---
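A minimal sketch of the optional PID guard, assuming only core
Perl.  MyOnDestroy is a hypothetical stand-in whose DESTROY mirrors
the hunk below; its constructor is an assumption, since
PublicInbox::OnDestroy->new is not shown in this patch.  A leading
non-reference argument is treated as the owner PID:

```perl
use strict;
use warnings;

package MyOnDestroy; # hypothetical stand-in for illustration

sub new {
	shift; # drop the class name, keep (optional PID, callback, args)
	bless [ @_ ], __PACKAGE__;
}

sub DESTROY {
	my ($cb, @args) = @{$_[0]};
	if (!ref($cb)) { # leading non-ref argument is the owner PID
		return if $cb != $$; # some other process: do nothing
		$cb = shift @args;
	}
	$cb->(@args) if $cb;
}

package main;

my @log;
my $od = MyOnDestroy->new($$, sub { push @log, "owner: @_" }, 'a');
undef $od; # PID matches, callback fires

$od = MyOnDestroy->new($$ + 1, sub { push @log, 'never' });
undef $od; # PID mismatch, callback is skipped

$od = MyOnDestroy->new(sub { push @log, 'unguarded' });
undef $od; # no PID given, old behavior is preserved

print "$_\n" for @log;
```

Existing callers which pass a code reference first are unaffected,
which is why lock_for_scope can forward an optional PID through
@single_pid.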
 lib/PublicInbox/LEI.pm       | 5 ++---
 lib/PublicInbox/Lock.pm      | 4 ++--
 lib/PublicInbox/OnDestroy.pm | 5 +++++
 script/public-inbox-init     | 2 +-
 t/on_destroy.t               | 9 +++++++++
 5 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b84e24ef..4af85d49 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -745,9 +745,8 @@ sub lazy_start {
 	# reading the <$daemon> pipe.
 	openlog($path, 'pid', 'user');
 	local $SIG{__WARN__} = sub { syslog('warning', "@_") };
-	my $owner_pid = $$;
-	my $on_destroy = PublicInbox::OnDestroy->new(sub {
-		syslog('crit', "$@") if $@ && $$ == $owner_pid;
+	my $on_destroy = PublicInbox::OnDestroy->new($$, sub {
+		syslog('crit', "$@") if $@;
 	});
 	open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
 	open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index f6eaa5ce..1d0b4f9c 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -36,9 +36,9 @@ sub lock_release {
 
 # caller must use return value
 sub lock_for_scope {
-	my ($self) = @_;
+	my ($self, @single_pid) = @_;
 	$self->lock_acquire;
-	PublicInbox::OnDestroy->new(\&lock_release, $self);
+	PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
 }
 
 sub new_tmp {
diff --git a/lib/PublicInbox/OnDestroy.pm b/lib/PublicInbox/OnDestroy.pm
index 841f87d4..65ebd7dc 100644
--- a/lib/PublicInbox/OnDestroy.pm
+++ b/lib/PublicInbox/OnDestroy.pm
@@ -10,6 +10,11 @@ sub new {
 
 sub DESTROY {
 	my ($cb, @args) = @{$_[0]};
+	if (!ref($cb)) {
+		my $pid = $cb;
+		return if $pid != $$;
+		$cb = shift @args;
+	}
 	$cb->(@args) if $cb;
 }
 
diff --git a/script/public-inbox-init b/script/public-inbox-init
index 693f5ca1..222d0c60 100755
--- a/script/public-inbox-init
+++ b/script/public-inbox-init
@@ -92,7 +92,7 @@ sysopen($lockfh, $lockfile, O_RDWR|O_CREAT|O_EXCL) or do {
 	exit(255);
 };
 require PublicInbox::OnDestroy;
-my $auto_unlink = PublicInbox::OnDestroy->new(sub { unlink $lockfile });
+my $auto_unlink = PublicInbox::OnDestroy->new($$, sub { unlink $lockfile });
 my ($perm, %seen);
 if (-e $pi_config) {
 	open(my $oh, '<', $pi_config) or die "unable to read $pi_config: $!\n";
diff --git a/t/on_destroy.t b/t/on_destroy.t
index 8b85b48e..0de67d0b 100644
--- a/t/on_destroy.t
+++ b/t/on_destroy.t
@@ -16,6 +16,15 @@ $od = PublicInbox::OnDestroy->new(sub { @x = @_ }, qw(x y));
 undef $od;
 is_deeply(\@x, [ 'x', 'y' ], '2 args passed');
 
+open my $tmp, '+>>', undef or BAIL_OUT $!;
+$tmp->autoflush(1);
+$od = PublicInbox::OnDestroy->new(1, sub { print $tmp "$$ DESTROY\n" });
+undef $od;
+is(-s $tmp, 0, '$tmp is empty on pid mismatch');
+$od = PublicInbox::OnDestroy->new($$, sub { $tmp = $$ });
+undef $od;
+is($tmp, $$, '$tmp set to $$ by callback');
+
 if (my $nr = $ENV{TEST_LEAK_NR}) {
 	for (0..$nr) {
 		$od = PublicInbox::OnDestroy->new(sub { @x = @_ }, qw(x y));
