unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/4] lei q: outputs to Maildir and mbox* working
@ 2021-01-16 11:36 Eric Wong
  2021-01-16 11:36 ` [PATCH 1/4] lei_to_mail: prepare for worker offload Eric Wong
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

Only lightly-tested but this is the key "inspired by mairix"
part.  It's slow compared to mairix due to git storage and not
being able to use hardlinks, but git blob extraction will be
parallelizable.

Eric Wong (4):
  lei_to_mail: prepare for worker offload
  ipc: children don't kill on DESTROY, reduce FD sharing
  lei: q: results output to Maildir and mbox* working
  lei: pager: pass correct env in oneshot mode

 MANIFEST                       |   1 +
 lib/PublicInbox/IPC.pm         |  21 ++--
 lib/PublicInbox/LEI.pm         |  30 +++--
 lib/PublicInbox/LeiDedupe.pm   |   3 +-
 lib/PublicInbox/LeiOverview.pm |  60 ++++++----
 lib/PublicInbox/LeiQuery.pm    |  14 +--
 lib/PublicInbox/LeiToMail.pm   | 206 +++++++++++++++++++++------------
 lib/PublicInbox/LeiXSearch.pm  | 119 ++++++++++++++-----
 lib/PublicInbox/OpPipe.pm      |  41 +++++++
 t/lei.t                        |  20 ++++
 t/lei_to_mail.t                |  64 +++++-----
 11 files changed, 398 insertions(+), 181 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [PATCH 1/4] lei_to_mail: prepare for worker offload
  2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
@ 2021-01-16 11:36 ` Eric Wong
  2021-01-16 11:36 ` [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing Eric Wong
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

We'll be doing most of the work in forked off worker processes,
so ensure some of it is fork and serialization-friendly.
---
 lib/PublicInbox/LeiOverview.pm |  20 ++--
 lib/PublicInbox/LeiToMail.pm   | 175 +++++++++++++++++++--------------
 t/lei_to_mail.t                |  62 +++++++-----
 3 files changed, 147 insertions(+), 110 deletions(-)

diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index ef5f27c1..9846bc8a 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -25,7 +25,7 @@ sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 sub ovv_out_lk_init ($) {
 	my ($self) = @_;
 	$self->{tmp_lk_id} = "$self.$$";
-	my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX",
+	my $tmp = File::Temp->new("lei-ovv.dst.$$.lock-XXXXXX",
 					TMPDIR => 1, UNLINK => 0);
 	$self->{lock_path} = $tmp->filename;
 }
@@ -39,32 +39,32 @@ sub ovv_out_lk_cancel ($) {
 sub new {
 	my ($class, $lei) = @_;
 	my $opt = $lei->{opt};
-	my $out = $opt->{output} // '-';
-	$out = '/dev/stdout' if $out eq '-';
+	my $dst = $opt->{output} // '-';
+	$dst = '/dev/stdout' if $dst eq '-';
 
 	my $fmt = $opt->{'format'};
 	$fmt = lc($fmt) if defined $fmt;
-	if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+	if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
 		my $ofmt = lc $1;
 		$fmt //= $ofmt;
 		return $lei->fail(<<"") if $fmt ne $ofmt;
 --format=$fmt and --output=$ofmt conflict
 
 	}
-	$fmt //= 'json' if $out eq '/dev/stdout';
+	$fmt //= 'json' if $dst eq '/dev/stdout';
 	$fmt //= 'maildir'; # TODO
 
-	if (index($out, '://') < 0) { # not a URL, so assume path
-		 $out = File::Spec->canonpath($out);
+	if (index($dst, '://') < 0) { # not a URL, so assume path
+		 $dst = File::Spec->canonpath($dst);
 	} # else URL
 
-	my $self = bless { fmt => $fmt, out => $out }, $class;
+	my $self = bless { fmt => $fmt, dst => $dst }, $class;
 	my $json;
 	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
 		$json = $self->{json} = ref(PublicInbox::Config->json);
 	}
 	my ($isatty, $seekable);
-	if ($out eq '/dev/stdout') {
+	if ($dst eq '/dev/stdout') {
 		$isatty = -t $lei->{1};
 		$lei->start_pager if $isatty;
 		$opt->{pretty} //= $isatty;
@@ -78,7 +78,7 @@ sub new {
 	} elsif ($json) {
 		return $lei->fail('JSON formats only output to stdout');
 	} else {
-		return $lei->fail("TODO: $out -f $fmt");
+		return $lei->fail("TODO: $dst -f $fmt");
 	}
 	$self;
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4c65dce2..5d4b7978 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -146,8 +146,7 @@ sub reap_compress { # dwaitpid callback
 # { foo => '' } means "--foo" is passed to the command-line,
 # otherwise { foo => '--bar' } passes "--bar"
 our %zsfx2cmd = (
-	gz => [ qw(GZIP pigz gzip), {
-		rsyncable => '', threads => '-p' } ],
+	gz => [ qw(GZIP pigz gzip), { rsyncable => '', threads => '-p' } ],
 	bz2 => [ 'bzip2', {} ],
 	xz => [ 'xz', { threads => '-T' } ],
 	# XXX does anybody care for these?  I prefer zstd on entire FSes,
@@ -189,24 +188,23 @@ sub zsfx2cmd ($$$) {
 }
 
 sub compress_dst {
-	my ($out, $zsfx, $lei) = @_;
+	my ($self, $zsfx, $lei) = @_;
 	my $cmd = zsfx2cmd($zsfx, undef, $lei);
 	pipe(my ($r, $w)) or die "pipe: $!";
-	my $rdr = { 0 => $r, 1 => $out, 2 => $lei->{2} };
+	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
 	my $pid = spawn($cmd, $lei->{env}, $rdr);
 	$lei->{"pid.$pid"} = $cmd;
 	my $pp = gensym;
 	tie *$pp, 'PublicInbox::ProcessPipe', $pid, $w, \&reap_compress, $lei;
-	my $pipe_lk = ($lei->{opt}->{jobs} // 0) > 1 ?
-			PublicInbox::Lock->new_tmp($zsfx) : undef;
-	($pp, $pipe_lk);
+	$lei->{1} = $pp;
+	die 'BUG: unexpected {ovv}->{lock_path}' if $lei->{ovv}->{lock_path};
+	$lei->{ovv}->ovv_out_lk_init if ($lei->{opt}->{jobs} // 2) > 1;
 }
 
 sub decompress_src ($$$) {
 	my ($in, $zsfx, $lei) = @_;
 	my $cmd = zsfx2cmd($zsfx, 1, $lei);
-	my $rdr = { 0 => $in, 2 => $lei->{2} };
-	popen_rd($cmd, $lei->{env}, $rdr);
+	popen_rd($cmd, $lei->{env}, { 0 => $in, 2 => $lei->{2} });
 }
 
 sub dup_src ($) {
@@ -222,48 +220,22 @@ sub _augment { # MboxReader eml_cb
 	$lei->{dedupe}->is_dup($eml);
 }
 
-sub _mbox_write_cb ($$$$) {
-	my ($cls, $mbox, $dst, $lei) = @_;
-	my $m = "eml2$mbox";
-	my $eml2mbox = $cls->can($m) or die "$cls->$m missing";
-	my ($out, $pipe_lk, $seekable);
-	# XXX should we support /dev/stdout.gz ?
-	if ($dst eq '/dev/stdout') {
-		$out = $lei->{1};
-	} else { # TODO: mbox locking (but mairix doesn't...)
-		my $mode = -p $dst ? '>' : '+>>';
-		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
-			die "unlink $dst: $!" if $! != ENOENT;
-		}
-		open $out, $mode, $dst or die "open $dst: $!";
-		# Perl does SEEK_END even with O_APPEND :<
-		$seekable = seek($out, 0, SEEK_SET);
-		die "seek $dst: $!\n" if !$seekable && $! != ESPIPE;
-	}
-	my $jobs = $lei->{opt}->{jobs} // 0;
-	state $zsfx_allow = join('|', keys %zsfx2cmd);
-	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
-	my $write = $jobs > 1 && !$zsfx ? \&atomic_append : \&_print_full;
-	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
-	if ($lei->{opt}->{augment}) {
-		die "cannot augment $dst, not seekable\n" if !$seekable;
-		if (-s $out && $dedupe->prepare_dedupe) {
-			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-					dup_src($out);
-			PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei);
-		}
-		# maybe some systems don't honor O_APPEND, Perl does this:
-		seek($out, 0, SEEK_END) or die "seek $dst: $!";
-		$dedupe->pause_dedupe if $jobs; # are we forking?
-	}
-	$dedupe->prepare_dedupe if !$jobs;
-	($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx;
+sub _mbox_write_cb ($$) {
+	my ($self, $lei) = @_;
+	my $ovv = $lei->{ovv};
+	my $m = 'eml2'.$ovv->{fmt};
+	my $eml2mbox = $self->can($m) or die "$self->$m missing";
+	my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
+	$out->autoflush(1);
+	my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append;
+	my $dedupe = $lei->{dedupe};
+	$dedupe->prepare_dedupe;
 	sub { # for git_to_mail
 		my ($buf, $oid, $kw) = @_;
 		my $eml = PublicInbox::Eml->new($buf);
 		if (!$dedupe->is_dup($eml, $oid)) {
 			$buf = $eml2mbox->($eml, $kw);
-			my $lock = $pipe_lk->lock_for_scope if $pipe_lk;
+			my $lk = $ovv->lock_for_scope;
 			$write->($out, $buf);
 		}
 	}
@@ -313,17 +285,55 @@ sub _buf2maildir {
 	}
 }
 
-
 sub _maildir_write_cb ($$) {
-	my ($dst, $lei) = @_;
-	$dst .= '/' unless substr($dst, -1) eq '/';
-	my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei, $dst);
-	my $jobs = $lei->{opt}->{jobs} // 0;
+	my ($self, $lei) = @_;
+	my $dedupe = $lei->{dedupe};
+	$dedupe->prepare_dedupe;
+	my $dst = $lei->{ovv}->{dst};
+	sub { # for git_to_mail
+		my ($buf, $oid, $kw) = @_;
+		return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe;
+		my $eml = PublicInbox::Eml->new($$buf); # copy buf
+		return if $dedupe->is_dup($eml, $oid);
+		undef $eml;
+		_buf2maildir($dst, $buf, $oid, $kw);
+	}
+}
+
+sub write_cb { # returns a callback for git_to_mail
+	my ($self, $lei) = @_;
+	# _mbox_write_cb or _maildir_write_cb
+	my $m = "_$self->{base_type}_write_cb";
+	$self->$m($lei);
+}
+
+sub new {
+	my ($cls, $lei) = @_;
+	my $fmt = $lei->{ovv}->{fmt};
+	my $dst = $lei->{ovv}->{dst};
+	my $self = bless {}, $cls;
+	if ($fmt eq 'maildir') {
+		$self->{base_type} = 'maildir';
+		$lei->{ovv}->{dst} = $dst .= '/' if substr($dst, -1) ne '/';
+	} elsif (substr($fmt, 0, 4) eq 'mbox') {
+		$self->can("eml2$fmt") or die "bad mbox --format=$fmt\n";
+		$self->{base_type} = 'mbox';
+	} else {
+		die "bad mail --format=$fmt\n";
+	}
+	my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+	$self;
+}
+
+sub _prepare_maildir {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
 	if ($lei->{opt}->{augment}) {
+		my $dedupe = $lei->{dedupe};
 		if ($dedupe && $dedupe->prepare_dedupe) {
 			require PublicInbox::InboxWritable; # eml_from_path
 			_maildir_each_file($dst, \&_augment_file, $lei);
-			$dedupe->pause_dedupe if $jobs; # are we forking?
+			$dedupe->pause_dedupe;
 		}
 	} else { # clobber existing Maildir
 		_maildir_each_file($dst, \&_unlink);
@@ -332,32 +342,51 @@ sub _maildir_write_cb ($$) {
 		my $d = $dst.$x;
 		next if -d $d;
 		require File::Path;
-		if (!File::Path::mkpath($d) && !-d $d) {
-			die "failed to mkpath($d): $!\n";
-		}
-	}
-	$dedupe->prepare_dedupe if $dedupe && !$jobs;
-	sub { # for git_to_mail
-		my ($buf, $oid, $kw) = @_;
-		return _buf2maildir($dst, $buf, $oid, $kw) if !$dedupe;
-		my $eml = PublicInbox::Eml->new($$buf); # copy buf
-		return if $dedupe->is_dup($eml, $oid);
-		undef $eml;
-		_buf2maildir($dst, $buf, $oid, $kw);
+		File::Path::mkpath($d) or die "mkpath($d): $!";
+		-d $d or die "$d is not a directory";
 	}
 }
 
-sub write_cb { # returns a callback for git_to_mail
-	my ($cls, $dst, $lei) = @_;
-	require PublicInbox::LeiDedupe;
-	if ($dst =~ s!\A(mbox(?:rd|cl|cl2|o))?:!!) {
-		_mbox_write_cb($cls, $1, $dst, $lei);
-	} elsif ($dst =~ s!\A[Mm]aildir:!!) { # typically capitalized
-		_maildir_write_cb($dst, $lei);
+sub _prepare_mbox {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
+	my ($out, $seekable);
+	if ($dst eq '/dev/stdout') {
+		$out = $lei->{1};
 	} else {
-		undef;
+		my $mode = -p $dst ? '>' : '+>>';
+		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
+			$! == ENOENT or die "unlink($dst): $!";
+		}
+		open $out, $mode, $dst or die "open($dst): $!";
+		# Perl does SEEK_END even with O_APPEND :<
+		$seekable = seek($out, 0, SEEK_SET);
+		die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+		$lei->{1} = $out;
+	}
+	state $zsfx_allow = join('|', keys %zsfx2cmd);
+	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	my $dedupe = $lei->{dedupe};
+	if ($lei->{opt}->{augment}) {
+		die "cannot augment $dst, not seekable\n" if !$seekable;
+		if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+					dup_src($out);
+			my $fmt = $lei->{ovv}->{fmt};
+			require PublicInbox::MboxReader;
+			PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
+		}
+		# maybe some systems don't honor O_APPEND, Perl does this:
+		seek($out, 0, SEEK_END) or die "seek $dst: $!";
+		$dedupe->pause_dedupe if $dedupe;
 	}
-	# TODO: Maildir, MH, IMAP, JMAP ...
+	compress_dst($self, $zsfx, $lei) if $zsfx;
+}
+
+sub do_prepare {
+	my ($self, $lei) = @_;
+	my $m = "_prepare_$self->{base_type}";
+	$self->$m($lei);
 }
 
 1;
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index cb30fed5..d5beb3d2 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -11,6 +11,7 @@ use PublicInbox::Spawn qw(popen_rd which);
 use List::Util qw(shuffle);
 require_mods(qw(DBD::SQLite));
 require PublicInbox::MboxReader;
+require PublicInbox::LeiOverview;
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
@@ -80,8 +81,27 @@ blah
 EOM
 my $fn = "$tmpdir/x.mbox";
 my ($mbox) = shuffle(@MBOX); # pick one, shouldn't matter
+my $wcb_get = sub {
+	my ($fmt, $dst) = @_;
+	delete $lei->{dedupe};
+	$lei->{ovv} = bless {
+		fmt => $fmt,
+		dst => $dst
+	}, 'PublicInbox::LeiOverview';
+	my $l2m = PublicInbox::LeiToMail->new($lei);
+	SKIP: {
+		require_mods('Storable', 1);
+		my $dup = Storable::thaw(Storable::freeze($l2m));
+		is_deeply($dup, $l2m, "$fmt round-trips through storable");
+	}
+	$l2m->do_prepare($lei);
+	my $cb = $l2m->write_cb($lei);
+	delete $lei->{1};
+	$cb;
+};
+
 my $orig = do {
-	my $wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei);
+	my $wcb = $wcb_get->($mbox, $fn);
 	is(ref $wcb, 'CODE', 'write_cb returned callback');
 	ok(-f $fn && !-s _, 'empty file created');
 	$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
@@ -92,13 +112,12 @@ my $orig = do {
 	unlink $fn or BAIL_OUT $!;
 
 	local $lei->{opt} = { jobs => 2 };
-	$wcb = PublicInbox::LeiToMail->write_cb("$mbox:$fn", $lei);
+	$wcb = $wcb_get->($mbox, $fn);
 	ok(-f $fn && !-s _, 'truncated mbox destination');
-	$lei->{dedupe}->prepare_dedupe;
 	$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 	undef $wcb;
 	open $fh, '<', $fn or BAIL_OUT $!;
-	is($raw, do { local $/; <$fh> }, 'jobs > 1');
+	is(do { local $/; <$fh> }, $raw, 'jobs > 1');
 	$raw;
 };
 for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
@@ -109,8 +128,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		my $dc_cmd = eval { $zsfx2cmd->($zsfx, 1, $lei) };
 		ok($dc_cmd, "decompressor for .$zsfx");
 		my $f = "$fn.$zsfx";
-		my $dst = "$mbox:$f";
-		my $wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
+		my $wcb = $wcb_get->($mbox, $f);
 		$wcb->(\(my $dup = $buf), 'deadbeef', [ qw(seen) ]);
 		undef $wcb;
 		my $uncompressed = xqx([@$dc_cmd, $f]);
@@ -118,15 +136,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 
 		local $lei->{opt} = { jobs => 2 }; # for atomic writes
 		unlink $f or BAIL_OUT "unlink $!";
-		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
-		$lei->{dedupe}->prepare_dedupe;
+		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf), 'deadbeef', [ qw(seen) ]);
 		undef $wcb;
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
 		local $lei->{opt} = { augment => 1 };
-		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
-		$lei->{dedupe}->prepare_dedupe;
+		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\nx\n"), 'deadbeef', [ qw(seen) ]);
 		undef $wcb; # commit
 
@@ -138,8 +154,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		like($raw[0], qr/\nblah\n\z/s, "original preserved $zsfx");
 
 		local $lei->{opt} = { augment => 1, jobs => 2 };
-		$wcb = PublicInbox::LeiToMail->write_cb($dst, $lei);
-		$lei->{dedupe}->prepare_dedupe;
+		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\ny\n"), 'deadbeef', [ qw(seen) ]);
 		undef $wcb; # commit
 
@@ -155,7 +170,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 
 unlink $fn or BAIL_OUT $!;
 if ('default deduplication uses content_hash') {
-	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	my $wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\(my $x = $buf), 'deadbeef', []) for (1..2);
 	undef $wcb; # undef to commit changes
 	my $cmp = '';
@@ -164,7 +179,7 @@ if ('default deduplication uses content_hash') {
 	is($cmp, $buf, 'only one message written');
 
 	local $lei->{opt} = { augment => 1 };
-	$wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	$wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\($x = $buf . "\nx\n"), 'deadbeef', []) for (1..2);
 	undef $wcb; # undef to commit changes
 	open $fh, '<', $fn or BAIL_OUT $!;
@@ -178,7 +193,7 @@ if ('default deduplication uses content_hash') {
 { # stdout support
 	open my $tmp, '+>', undef or BAIL_OUT $!;
 	local $lei->{1} = $tmp;
-	my $wcb = PublicInbox::LeiToMail->write_cb("mboxrd:/dev/stdout", $lei);
+	my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
 	$wcb->(\(my $x = $buf), 'deadbeef', []);
 	undef $wcb; # commit
 	seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
@@ -192,7 +207,7 @@ SKIP: { # FIFO support
 	my $fn = "$tmpdir/fifo";
 	mkfifo($fn, 0600) or skip("mkfifo not supported: $!", 1);
 	my $cat = popen_rd([which('cat'), $fn]);
-	my $wcb = PublicInbox::LeiToMail->write_cb("mboxo:$fn", $lei);
+	my $wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\(my $x = $buf), 'deadbeef', []);
 	undef $wcb; # commit
 	my $cmp = '';
@@ -202,22 +217,17 @@ SKIP: { # FIFO support
 
 { # Maildir support
 	my $md = "$tmpdir/maildir/";
-	my $wcb = PublicInbox::LeiToMail->write_cb("Maildir:$md", $lei);
-	ok($wcb, 'got Maildir callback');
-	$lei->{dedupe}->prepare_dedupe;
+	my $wcb = $wcb_get->('maildir', $md);
+	is(ref($wcb), 'CODE', 'got Maildir callback');
 	$wcb->(\(my $x = $buf), 'badc0ffee', []);
-	undef $wcb; # commit
 
 	my @f;
 	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
-	is(scalar(@f), 1, 'wrote one file');
 	open my $fh, $f[0] or BAIL_OUT $!;
 	is(do { local $/; <$fh> }, $buf, 'wrote to Maildir');
 
-	$wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei);
-	$lei->{dedupe}->prepare_dedupe;
+	$wcb = $wcb_get->('maildir', $md);
 	$wcb->(\($x = $buf."\nx\n"), 'deadcafe', []);
-	undef $wcb; # commit
 
 	my @x = ();
 	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @x, shift });
@@ -227,11 +237,9 @@ SKIP: { # FIFO support
 	is(do { local $/; <$fh> }, $buf."\nx\n", 'wrote new file to Maildir');
 
 	local $lei->{opt}->{augment} = 1;
-	$wcb = PublicInbox::LeiToMail->write_cb("maildir:$md", $lei);
-	$lei->{dedupe}->prepare_dedupe;
+	$wcb = $wcb_get->('maildir', $md);
 	$wcb->(\($x = $buf."\ny\n"), 'deadcafe', []);
 	$wcb->(\($x = $buf."\ny\n"), 'b4dc0ffee', []); # skipped by dedupe
-	undef $wcb; # commit
 	@f = ();
 	PublicInbox::LeiToMail::_maildir_each_file($md, sub { push @f, shift });
 	is(scalar grep(/\A\Q$x[0]\E\z/, @f), 1, 'old file still there');

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing
  2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
  2021-01-16 11:36 ` [PATCH 1/4] lei_to_mail: prepare for worker offload Eric Wong
@ 2021-01-16 11:36 ` Eric Wong
  2021-01-16 11:36 ` [PATCH 3/4] lei: q: results output to Maildir and mbox* working Eric Wong
  2021-01-16 11:36 ` [PATCH 4/4] lei: pager: pass correct env in oneshot mode Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

Children should not be blindly killing siblings on ->DESTROY
since they're typically shorter-lived than parents.  We'll
also be more careful about on-stack variables and now we
can rely exclusively on delete ops to close FDs.

We also need to fix our SIGPIPE handling for the oneshot case
while fixing a typo for delete, so we write "!" to the EOF pipe
to ensure the parent oneshot process exits on the first worker
that hits SIGPIPE, rather than waiting for the last worker to
hit SIGPIPE.
---
 lib/PublicInbox/IPC.pm        | 21 +++++++++++----------
 lib/PublicInbox/LEI.pm        |  1 +
 lib/PublicInbox/LeiXSearch.pm | 11 +++++++++--
 3 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index fbc91f6f..78cb8400 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -104,11 +104,11 @@ sub ipc_worker_spawn {
 	pipe(my ($r_req, $w_req)) or die "pipe: $!";
 	pipe(my ($r_res, $w_res)) or die "pipe: $!";
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
-	my $parent = $$;
 	$self->ipc_atfork_prepare;
 	defined(my $pid = fork) or die "fork: $!";
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
+		delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
 		$w_req = $r_res = undef;
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
@@ -283,8 +283,7 @@ sub _wq_worker_start ($$) {
 	my $pid = fork // die "fork: $!";
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
-		close(delete $self->{-wq_s1});
-		delete $self->{qw(-wq_workers -wq_ppid)};
+		delete @$self{qw(-wq_s1 -wq_workers -wq_ppid)};
 		$SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
 		$SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
 		local $0 = $self->{-wq_ident};
@@ -306,16 +305,15 @@ sub wq_workers_start {
 	my ($self, $ident, $nr_workers, $oldset) = @_;
 	($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
 	return if $self->{-wq_s1}; # idempotent
-	my ($s1, $s2);
-	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+	$self->{-wq_s1} = $self->{-wq_s2} = undef;
+	socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, $SEQPACKET, 0) or
+		die "socketpair: $!";
 	$self->ipc_atfork_prepare;
 	$nr_workers //= 4;
 	$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
 	$self->{-wq_ident} = $ident;
-	$self->{-wq_s1} = $s1;
-	$self->{-wq_s2} = $s2;
 	_wq_worker_start($self, $sigset) for (1..$nr_workers);
 	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
 	$self->{-wq_ppid} = $$;
@@ -377,6 +375,7 @@ sub wq_close {
 	my $ppid = delete $self->{-wq_ppid} or return;
 	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
 	return if $ppid != $$; # can't reap siblings or parents
+	return (keys %$workers) if wantarray; # caller will reap
 	for my $pid (keys %$workers) {
 		dwaitpid($pid, \&ipc_worker_reap, $self);
 	}
@@ -391,9 +390,11 @@ sub wq_kill {
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
-	wq_kill($_[0]);
-	wq_close($_[0]);
-	ipc_worker_stop($_[0]);
+	my ($self) = @_;
+	my $ppid = $self->{-wq_ppid};
+	wq_kill($self) if $ppid && $ppid == $$;
+	wq_close($self);
+	ipc_worker_stop($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1f4a3082..5568904d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -287,6 +287,7 @@ sub atfork_child_wq {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
 		close($_) for (delete @$self{1..2});
+		syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
 		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
 	});
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 25ded544..8b70167c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::DS qw(dwaitpid);
 
 sub new {
 	my ($class) = @_;
@@ -181,8 +182,14 @@ sub do_query {
 		$lei_orig->{lxs} = $self;
 		$lei_orig->event_step_init;
 	} else {
-		$self->wq_close;
-		read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+		my @pids = $self->wq_close;
+		# wait for close($lei->{0})
+		if (read($eof_wait, my $buf, 1)) {
+			# if we get a SIGPIPE from one, kill the rest
+			kill('TERM', @pids) if $buf eq '!';
+		}
+		my $ipc_worker_reap = $self->can('ipc_worker_reap');
+		dwaitpid($_, $ipc_worker_reap, $self) for @pids;
 		query_done($lei_orig); # may SIGPIPE
 	}
 }

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 3/4] lei: q: results output to Maildir and mbox* working
  2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
  2021-01-16 11:36 ` [PATCH 1/4] lei_to_mail: prepare for worker offload Eric Wong
  2021-01-16 11:36 ` [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing Eric Wong
@ 2021-01-16 11:36 ` Eric Wong
  2021-01-16 11:36 ` [PATCH 4/4] lei: pager: pass correct env in oneshot mode Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

All the augment and deduplication stuff seems to be working
based on unit tests.  OpPipe is a nice general addition that
will probably make future state machines easier.
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LEI.pm         |  27 +++++---
 lib/PublicInbox/LeiDedupe.pm   |   3 +-
 lib/PublicInbox/LeiOverview.pm |  44 ++++++++----
 lib/PublicInbox/LeiQuery.pm    |  14 ++--
 lib/PublicInbox/LeiToMail.pm   |  89 ++++++++++++++++---------
 lib/PublicInbox/LeiXSearch.pm  | 118 ++++++++++++++++++++++++---------
 lib/PublicInbox/OpPipe.pm      |  41 ++++++++++++
 t/lei.t                        |  20 ++++++
 t/lei_to_mail.t                |   4 +-
 10 files changed, 266 insertions(+), 95 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

diff --git a/MANIFEST b/MANIFEST
index 0ebdaccc..0de1de4a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -193,6 +193,7 @@ lib/PublicInbox/NNTPD.pm
 lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/OnDestroy.pm
+lib/PublicInbox/OpPipe.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/ProcessPipe.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5568904d..f849c9df 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -256,7 +256,9 @@ sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
 sub out ($;@) { print { shift->{1} } @_ }
 
 sub err ($;@) {
-	print { shift->{2} } @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	my $self = shift;
+	my $err = $self->{2} // *STDERR{IO};
+	print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -270,8 +272,11 @@ sub fail ($$;$) {
 
 sub atfork_prepare_wq {
 	my ($self, $wq) = @_;
-	push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD,
-				grep { defined } @$self{qw(0 1 2 sock)}
+	my $tcafc = $wq->{-ipc_atfork_child_close};
+	push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
+	if (my $sock = $self->{sock}) {
+		push @$tcafc, @$self{qw(0 1 2)}, $sock;
+	}
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
@@ -286,7 +291,9 @@ sub atfork_child_wq {
 	PIPE => sub {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
-		close($_) for (delete @$self{1..2});
+		close(delete $self->{1});
+		# regular files and /dev/null (-c) won't trigger SIGPIPE
+		close(delete $self->{2}) unless (-f $self->{2} || -c _);
 		syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait
 		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
 	});
@@ -641,7 +648,7 @@ sub start_pager {
 	$new_env{MORE} = 'FRX' if $^O eq 'freebsd';
 	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-	my $pgr = [ undef, @$rdr{1, 2} ];
+	my $pgr = [ undef, @$rdr{1, 2}, $$ ];
 	if (my $sock = $self->{sock}) { # lei(1) process runs it
 		delete @new_env{keys %$env}; # only set iff unset
 		my $buf = "exec 1\0".$pager;
@@ -664,7 +671,7 @@ sub stop_pager {
 	# do not restore original stdout, just close it so we error out
 	close(delete($self->{1})) if $self->{1};
 	my $pid = $pgr->[0];
-	dwaitpid($pid, undef, $self->{sock}) if $pid;
+	dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -706,7 +713,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
 	my ($self) = @_;
 	delete $self->{lxs}; # stops LeiXSearch queries
-	$self->close; # PublicInbox::DS::close
+	$self->close if $self->{sock}; # PublicInbox::DS::close
 }
 
 # for long-running results
@@ -737,8 +744,10 @@ sub event_step {
 
 sub event_step_init {
 	my ($self) = @_;
-	$self->{sock}->blocking(0);
-	$self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+	if (my $sock = $self->{sock}) { # using DS->EventLoop
+		$sock->blocking(0);
+		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
+	}
 }
 
 sub noop {}
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 81754361..3f478aa4 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -89,8 +89,9 @@ sub true { 1 }
 sub dedupe_none ($) { (\&true, \&true) }
 
 sub new {
-	my ($cls, $lei, $dst) = @_;
+	my ($cls, $lei) = @_;
 	my $dd = $lei->{opt}->{dedupe} // 'content';
+	my $dst = $lei->{ovv}->{dst};
 
 	# allow "none" to bypass Eml->new if writing to directory:
 	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 9846bc8a..c0b423f6 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -15,6 +15,8 @@ use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
 use PublicInbox::Search qw(get_pct);
+use PublicInbox::LeiDedupe;
+use PublicInbox::LeiToMail;
 
 # cf. https://en.wikipedia.org/wiki/JSON_streaming
 my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
@@ -44,7 +46,7 @@ sub new {
 
 	my $fmt = $opt->{'format'};
 	$fmt = lc($fmt) if defined $fmt;
-	if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+	if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/
 		my $ofmt = lc $1;
 		$fmt //= $ofmt;
 		return $lei->fail(<<"") if $fmt ne $ofmt;
@@ -52,13 +54,14 @@ sub new {
 
 	}
 	$fmt //= 'json' if $dst eq '/dev/stdout';
-	$fmt //= 'maildir'; # TODO
+	$fmt //= 'maildir';
 
 	if (index($dst, '://') < 0) { # not a URL, so assume path
 		 $dst = File::Spec->canonpath($dst);
 	} # else URL
 
 	my $self = bless { fmt => $fmt, dst => $dst }, $class;
+	$lei->{ovv} = $self;
 	my $json;
 	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
 		$json = $self->{json} = ref(PublicInbox::Config->json);
@@ -75,11 +78,13 @@ sub new {
 		} else {
 			ovv_out_lk_init($self);
 		}
-	} elsif ($json) {
-		return $lei->fail('JSON formats only output to stdout');
-	} else {
-		return $lei->fail("TODO: $dst -f $fmt");
 	}
+	if (!$json) {
+		# default to the cheapest sort since MUA usually resorts
+		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+		$lei->{l2m} = PublicInbox::LeiToMail->new($lei);
+	}
+	$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
@@ -135,9 +140,13 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
 	my ($self, $lei) = @_;
+	if (my $git = delete $self->{git}) {
+		$git->async_wait_all;
+	}
 	if (my $bref = delete $lei->{ovv_buf}) {
+		my $out = $lei->{1} or return;
 		my $lk = $self->lock_for_scope;
-		print { $lei->{1} } $$bref;
+		print $out $$bref;
 	}
 }
 
@@ -167,17 +176,28 @@ sub _json_pretty {
 	qq{  "$k": }.$v;
 }
 
-sub ovv_each_smsg_cb {
-	my ($self, $lei) = @_;
+sub ovv_each_smsg_cb { # runs in wq worker usually
+	my ($self, $lei, $ibxish) = @_;
 	$lei->{ovv_buf} = \(my $buf = '');
 	delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
-	my $json = $self->{json}->new;
+	my $json;
 	$lei->{1}->autoflush(1);
-	if ($json) {
+	if (my $pkg = $self->{json}) {
+		$json = $pkg->new;
 		$json->utf8->canonical;
 		$json->ascii(1) if $lei->{opt}->{ascii};
 	}
-	if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+	if (my $l2m = $lei->{l2m}) {
+		my $wcb = $l2m->write_cb($lei);
+		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
+		$self->{git} = $git; # for ovv_atexit_child
+		my $g2m = $l2m->can('git_to_mail');
+		sub {
+			my ($smsg, $mitem) = @_;
+			my $kw = []; # TODO get from mitem
+			$git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]);
+		};
+	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
 		sub { # DIY prettiness :P
 			my ($smsg, $mitem) = @_;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 69d2f9a6..a80d5887 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -23,8 +23,6 @@ sub _vivify_external { # _externals_each callback
 # the main "lei q SEARCH_TERMS" method
 sub lei_q {
 	my ($self, @argv) = @_;
-	my $sto = $self->_lei_store(1);
-	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
 
 	# --local is enabled by default
@@ -32,7 +30,7 @@ sub lei_q {
 	my @srcs;
 	require PublicInbox::LeiXSearch;
 	require PublicInbox::LeiOverview;
-	require PublicInbox::LeiDedupe;
+	PublicInbox::Config->json;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -46,10 +44,10 @@ sub lei_q {
 	$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 		// $lxs->wq_workers($j);
 
-	unshift(@srcs, $sto->search) if $opt->{'local'};
 	# no forking workers after this
-	$self->{ovv} = PublicInbox::LeiOverview->new($self);
-	$self->{dd} = PublicInbox::LeiDedupe->new($self);
+	my $ovv = PublicInbox::LeiOverview->new($self) or return;
+	my $sto = $self->_lei_store(1);
+	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -69,12 +67,10 @@ sub lei_q {
 			die "unrecognized --sort=$sort\n";
 		}
 	}
-	# $self->out($json->encode(\%mset_opt));
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
-	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$self->{ovv}->ovv_begin($self);
+	$ovv->ovv_begin($self);
 	$lxs->do_query($self, \@srcs);
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5d4b7978..744f331d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -187,8 +187,9 @@ sub zsfx2cmd ($$$) {
 	\@cmd;
 }
 
-sub compress_dst {
-	my ($self, $zsfx, $lei) = @_;
+sub _post_augment_mbox { # open a compressor process
+	my ($self, $lei) = @_;
+	my $zsfx = $self->{zsfx} or return;
 	my $cmd = zsfx2cmd($zsfx, undef, $lei);
 	pipe(my ($r, $w)) or die "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} };
@@ -209,7 +210,9 @@ sub decompress_src ($$$) {
 
 sub dup_src ($) {
 	my ($in) = @_;
-	open my $dup, '+>>&', $in or die "dup: $!";
+	# fileno needed because wq_set_recv_modes only used ">&=" for {1}
+	# and Perl blindly trusts that to reject the '+' (readability flag)
+	open my $dup, '+>>&=', fileno($in) or die "dup: $!";
 	$dup;
 }
 
@@ -321,11 +324,13 @@ sub new {
 	} else {
 		die "bad mail --format=$fmt\n";
 	}
-	my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
+	$lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
-sub _prepare_maildir {
+sub _pre_augment_maildir {} # noop
+
+sub _do_augment_maildir {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
 	if ($lei->{opt}->{augment}) {
@@ -338,6 +343,11 @@ sub _prepare_maildir {
 	} else { # clobber existing Maildir
 		_maildir_each_file($dst, \&_unlink);
 	}
+}
+
+sub _post_augment_maildir {
+	my ($self, $lei) = @_;
+	my $dst = $lei->{ovv}->{dst};
 	for my $x (qw(tmp new cur)) {
 		my $d = $dst.$x;
 		next if -d $d;
@@ -347,45 +357,64 @@ sub _prepare_maildir {
 	}
 }
 
-sub _prepare_mbox {
+sub _pre_augment_mbox {
 	my ($self, $lei) = @_;
 	my $dst = $lei->{ovv}->{dst};
-	my ($out, $seekable);
-	if ($dst eq '/dev/stdout') {
-		$out = $lei->{1};
-	} else {
+	if ($dst ne '/dev/stdout') {
 		my $mode = -p $dst ? '>' : '+>>';
 		if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) {
 			$! == ENOENT or die "unlink($dst): $!";
 		}
-		open $out, $mode, $dst or die "open($dst): $!";
-		# Perl does SEEK_END even with O_APPEND :<
-		$seekable = seek($out, 0, SEEK_SET);
-		die "seek($dst): $!\n" if !$seekable && $! != ESPIPE;
+		open my $out, $mode, $dst or die "open($dst): $!";
 		$lei->{1} = $out;
 	}
+	# Perl does SEEK_END even with O_APPEND :<
+	$self->{seekable} = seek($lei->{1}, 0, SEEK_SET);
+	if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') {
+		die "seek($dst): $!\n";
+	}
 	state $zsfx_allow = join('|', keys %zsfx2cmd);
-	my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/);
+	($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/);
+}
+
+sub _do_augment_mbox {
+	my ($self, $lei) = @_;
+	return if !$lei->{opt}->{augment};
 	my $dedupe = $lei->{dedupe};
-	if ($lei->{opt}->{augment}) {
-		die "cannot augment $dst, not seekable\n" if !$seekable;
-		if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
-			my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
-					dup_src($out);
-			my $fmt = $lei->{ovv}->{fmt};
-			require PublicInbox::MboxReader;
-			PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
-		}
-		# maybe some systems don't honor O_APPEND, Perl does this:
-		seek($out, 0, SEEK_END) or die "seek $dst: $!";
-		$dedupe->pause_dedupe if $dedupe;
+	my $dst = $lei->{ovv}->{dst};
+	die "cannot augment $dst, not seekable\n" if !$self->{seekable};
+	my $out = $lei->{1};
+	if (-s $out && $dedupe && $dedupe->prepare_dedupe) {
+		my $zsfx = $self->{zsfx};
+		my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) :
+				dup_src($out);
+		my $fmt = $lei->{ovv}->{fmt};
+		require PublicInbox::MboxReader;
+		PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei);
 	}
-	compress_dst($self, $zsfx, $lei) if $zsfx;
+	# maybe some systems don't honor O_APPEND, Perl does this:
+	seek($out, 0, SEEK_END) or die "seek $dst: $!";
+	$dedupe->pause_dedupe if $dedupe;
+}
+
+sub pre_augment { # fast (1 disk seek), runs in main daemon
+	my ($self, $lei) = @_;
+	# _pre_augment_maildir, _pre_augment_mbox
+	my $m = "_pre_augment_$self->{base_type}";
+	$self->$m($lei);
+}
+
+sub do_augment { # slow, runs in wq worker
+	my ($self, $lei) = @_;
+	# _do_augment_maildir, _do_augment_mbox
+	my $m = "_do_augment_$self->{base_type}";
+	$self->$m($lei);
 }
 
-sub do_prepare {
+sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 	my ($self, $lei) = @_;
-	my $m = "_prepare_$self->{base_type}";
+	# _post_augment_maildir, _post_augment_mbox
+	my $m = "_post_augment_$self->{base_type}";
 	$self->$m($lei);
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 8b70167c..9563ad63 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -9,6 +9,10 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::OpPipe;
+use PublicInbox::Import;
+use File::Temp 0.19 (); # 0.19 for ->newdir
+use File::Spec ();
 
 sub new {
 	my ($class) = @_;
@@ -103,9 +107,9 @@ sub query_thread_mset { # for --thread
 	}
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -115,7 +119,7 @@ sub query_thread_mset { # for --thread
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
-				next if $dd->is_smsg_dup($smsg);
+				next if $dedupe->is_smsg_dup($smsg);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
 			}
@@ -132,65 +136,113 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	$self->attach_external($_) for @$srcs;
-	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self);
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
-			next if $dd->is_smsg_dup($smsg);
+			next if $dedupe->is_smsg_dup($smsg);
 			$each_smsg->($smsg, $it);
 		}
 	} while (_mset_more($mset, $mo));
 	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
-sub query_done { # PublicInbox::EOFpipe callback
+sub git {
+	my ($self) = @_;
+	my (%seen, @dirs);
+	my $tmp = File::Temp->newdir('lei_xsrch_git-XXXXXXXX', TMPDIR => 1);
+	for my $ibx (@{$self->{shard2ibx} // []}) {
+		my $d = File::Spec->canonpath($ibx->git->{git_dir});
+		$seen{$d} //= push @dirs, "$d/objects\n"
+	}
+	my $git_dir = $tmp->dirname;
+	PublicInbox::Import::init_bare($git_dir);
+	my $f = "$git_dir/objects/info/alternates";
+	open my $alt, '>', $f or die "open($f): $!";
+	print $alt @dirs or die "print $f: $!";
+	close $alt or die "close $f: $!";
+	my $git = PublicInbox::Git->new($git_dir);
+	$git->{-tmp} = $tmp;
+	$git;
+}
+
+sub query_done { # EOF callback
 	my ($lei) = @_;
 	$lei->{ovv}->ovv_end($lei);
 	$lei->dclose;
 }
 
-sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
-	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+sub start_query { # always runs in main (lei-daemon) process
+	my ($self, $io, $lei, $srcs) = @_;
+	if (my $l2m = $lei->{l2m}) {
+		$lei->{1} = $io->[1];
+		$l2m->post_augment($lei);
+		$io->[1] = delete $lei->{1};
+	}
 	my $remotes = $self->{remotes} // [];
-	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-	$io[0] = $qry_done; # don't need stdin
-
 	if ($lei->{opt}->{thread}) {
 		$lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
 		}
 	} else {
 		$lei->{-parallel} = scalar(@$remotes);
-		$self->wq_do('query_mset', \@io, $lei, $srcs);
+		$self->wq_do('query_mset', $io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@$remotes) {
-		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', $io, $lei, $rmt);
 	}
-	@io = ();
-	close $qry_done; # fully closed when children are done
-
-	# query_done will run when query_*mset close $qry_done
-	if ($lei_orig->{sock}) { # watch for client premature exit
-		require PublicInbox::EOFpipe;
-		PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
-		$lei_orig->{lxs} = $self;
-		$lei_orig->event_step_init;
+	close $io->[0]; # qry_status_wr
+	@$io = ();
+}
+
+sub query_prepare { # wq_do
+	my ($self, $lei) = @_;
+	my %sig = $lei->atfork_child_wq($self);
+	local @SIG{keys %sig} = values %sig;
+	if (my $l2m = $lei->{l2m}) {
+		eval { $l2m->do_augment($lei) };
+		return $lei->fail($@) if $@;
+	}
+	# trigger PublicInbox::OpPipe->event_step
+	my $qry_status_wr = $lei->{0} or
+		return $lei->fail('BUG: qry_status_wr missing');
+	$qry_status_wr->autoflush(1);
+	print $qry_status_wr '.' or # this should never fail...
+		return $lei->fail("BUG? print qry_status_wr: $!");
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	$io[0] = undef;
+	pipe(my $qry_status_rd, $io[0]) or die "pipe $!";
+
+	$lei_orig->{lxs} = $self;
+	$lei_orig->event_step_init; # wait for shutdowns
+	my $op_map = { '' => [ \&query_done, $lei_orig ] };
+	my $in_loop = exists $lei_orig->{sock};
+	my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+	if (my $l2m = $lei->{l2m}) {
+		$l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox
+		$io[1] = $lei_orig->{1};
+		$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
+		$self->wq_do('query_prepare', \@io, $lei);
+		$opp->event_step if !$in_loop;
 	} else {
+		start_query($self, \@io, $lei, $srcs);
+	}
+	unless ($in_loop) {
 		my @pids = $self->wq_close;
-		# wait for close($lei->{0})
-		if (read($eof_wait, my $buf, 1)) {
-			# if we get a SIGPIPE from one, kill the rest
-			kill('TERM', @pids) if $buf eq '!';
-		}
+		# for the $lei->atfork_child_wq PIPE handler:
+		$op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ];
+		$opp->event_step;
 		my $ipc_worker_reap = $self->can('ipc_worker_reap');
 		dwaitpid($_, $ipc_worker_reap, $self) for @pids;
-		query_done($lei_orig); # may SIGPIPE
 	}
 }
 
diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm
new file mode 100644
index 00000000..295a8aa5
--- /dev/null
+++ b/lib/PublicInbox/OpPipe.pm
@@ -0,0 +1,41 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# bytecode dispatch pipe, reads a byte, runs a sub
+# byte => [ sub, @operands ]
+package PublicInbox::OpPipe;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+sub new {
+	my ($cls, $rd, $op_map, $in_loop) = @_;
+	my $self = bless { sock => $rd, op_map => $op_map }, $cls;
+	# 1031: F_SETPIPE_SZ, 4096: page size
+	fcntl($rd, 1031, 4096) if $^O eq 'linux';
+	if ($in_loop) { # iff using DS->EventLoop
+		$rd->blocking(0);
+		$self->SUPER::new($rd, EPOLLIN);
+	}
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rd = $self->{sock};
+	my $byte;
+	until (defined(sysread($rd, $byte, 1))) {
+		return if $!{EAGAIN};
+		next if $!{EINTR};
+		die "read \$rd: $!";
+	}
+	my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'";
+	if ($byte eq '') { # close on EOF
+		$rd->blocking ? delete($self->{sock}) : $self->close;
+	}
+	my ($sub, @args) = @$op;
+	$sub->(@args);
+}
+
+1;
diff --git a/t/lei.t b/t/lei.t
index 2349dca4..c4692217 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -7,6 +7,7 @@ use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Config;
 use File::Path qw(rmtree);
+use Fcntl qw(SEEK_SET);
 require_git 2.6;
 require_mods(qw(json DBD::SQLite Search::Xapian));
 my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
@@ -188,6 +189,25 @@ my $test_external = sub {
 	# No double-quoting should be imposed on users on the CLI
 	$lei->('q', 's:use boolean prefix');
 	like($out, qr/search: use boolean prefix/, 'phrase search got result');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:use boolean prefix');
+	open my $mb, '<', "$home/mbox" or fail "no mbox: $!";
+	my @s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 1, '1 result in mbox');
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:see attachment');
+	is($err, '', 'no errors from augment');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	@s = grep(/^Subject:/, <$mb>);
+	is(scalar(@s), 2, '2 results in mbox');
+
+	$lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is($err, '', 'no errors on no results');
+	seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!";
+	my @s2 = grep(/^Subject:/, <$mb>);
+	is_deeply(\@s2, \@s, 'same 2 old results w/ --augment and bad search');
+
+	$lei->('q', '-o', "mboxcl2:$home/mbox", 's:nonexistent');
+	is(-s "$home/mbox", 0, 'clobber w/o --augment');
 };
 
 my $test_lei_common = sub {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index d5beb3d2..083e0df4 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -94,7 +94,9 @@ my $wcb_get = sub {
 		my $dup = Storable::thaw(Storable::freeze($l2m));
 		is_deeply($dup, $l2m, "$fmt round-trips through storable");
 	}
-	$l2m->do_prepare($lei);
+	$l2m->pre_augment($lei);
+	$l2m->do_augment($lei);
+	$l2m->post_augment($lei);
 	my $cb = $l2m->write_cb($lei);
 	delete $lei->{1};
 	$cb;

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 4/4] lei: pager: pass correct env in oneshot mode
  2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
                   ` (2 preceding siblings ...)
  2021-01-16 11:36 ` [PATCH 3/4] lei: q: results output to Maildir and mbox* working Eric Wong
@ 2021-01-16 11:36 ` Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2021-01-16 11:36 UTC (permalink / raw)
  To: meta

We want new environment variables when spawning the
pager from oneshot (non-daemon) mode.
---
 lib/PublicInbox/LEI.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f849c9df..56254c45 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -656,7 +656,7 @@ sub start_pager {
 		my $fds = [ map { fileno($_) } @$rdr{0..2} ];
 		$send_cmd->($sock, $fds, $buf, MSG_EOR);
 	} else {
-		$pgr->[0] = spawn([$pager], $env, $rdr);
+		$pgr->[0] = spawn([$pager], \%new_env, $rdr);
 	}
 	$self->{1} = $wpager;
 	$self->{2} = $wpager if -t $self->{2};

^ permalink raw reply related	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2021-01-16 11:36 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-16 11:36 [PATCH 0/4] lei q: outputs to Maildir and mbox* working Eric Wong
2021-01-16 11:36 ` [PATCH 1/4] lei_to_mail: prepare for worker offload Eric Wong
2021-01-16 11:36 ` [PATCH 2/4] ipc: children don't kill on DESTROY, reduce FD sharing Eric Wong
2021-01-16 11:36 ` [PATCH 3/4] lei: q: results output to Maildir and mbox* working Eric Wong
2021-01-16 11:36 ` [PATCH 4/4] lei: pager: pass correct env in oneshot mode Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).