unofficial mirror of meta@public-inbox.org
* [PATCH 00/22] lei query overview views
@ 2021-01-10 12:14 Eric Wong
  2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
                   ` (21 more replies)
  0 siblings, 22 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:14 UTC
  To: meta

Usage summary:

	lei add-external /path/to/v1-or-v2-inbox
	lei add-external /path/to/another-inbox-or-ext-index
			# URLs aren't supported, yet :<

	lei q SEARCH TERMS GO HERE... # pager should open with JSON output

For faster startup time than what Inline::C can give:

	apt-get install libsocket-msghdr-perl # Socket::MsgHdr

Having neither Inline::C nor Socket::MsgHdr means parallel
queries won't work.

I went back-and-forth on a bunch of things but ultimately gave
up trying to support IO::FDPass since it got too fragile and
difficult to test with the work-queue distribution.

The pager runs from the client process (if using Socket::MsgHdr
or Inline::C), now.  It took a fair amount of work from my slow
brain to get pager shutdown to be instantaneous, though queries
which haven't output anything aren't easily interruptible...

The wq_* IPC stuff will be reused in the normal read-only
WWW/IMAP search at some point, too.

Eric Wong (22):
  lei query + pagination sorta working
  lei q: deduplicate smsg
  ds: block signals when reaping
  ipc: add support for asynchronous callbacks
  cmd_ipc: send FDs with buffer payload
  ipc: avoid excessive evals
  ipc: work queue support via SOCK_SEQPACKET
  ipc: eliminate ipc_worker_stop method
  ipc: wq: support dynamic worker count change
  ipc: drop -ipc_parent_pid field
  ipc: DESTROY and wq_workers methods
  lei: rename $w to $wpager for warning message
  lei: fix oneshot TTY detection by passing STD*{GLOB}
  lei: query: ensure pager exit is instantaneous
  ipc: start supporting sending/receiving more than 3 FDs
  ipc: fix IO::FDPass use with a worker limit of 1
  ipc: drop unused fields, default sighandlers for wq
  lei: get rid of client {pid} field
  lei: fork + FD cleanup
  lei: run pager in client script
  lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
  lei: query: restore JSON output overview

 MANIFEST                        |   4 +
 lib/PublicInbox/CmdIPC4.pm      |  36 ++++
 lib/PublicInbox/DS.pm           |  16 +-
 lib/PublicInbox/Daemon.pm       |  10 +-
 lib/PublicInbox/ExtSearchIdx.pm |   4 +-
 lib/PublicInbox/IPC.pm          | 280 ++++++++++++++++++++++++++++----
 lib/PublicInbox/LEI.pm          | 180 +++++++++++++-------
 lib/PublicInbox/LeiDedupe.pm    |  29 +++-
 lib/PublicInbox/LeiExternal.pm  |  33 ++--
 lib/PublicInbox/LeiOverview.pm  | 188 +++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm     |  92 +++++++++++
 lib/PublicInbox/LeiStore.pm     |   2 +-
 lib/PublicInbox/LeiToMail.pm    |   2 +
 lib/PublicInbox/LeiXSearch.pm   | 118 +++++++++++++-
 lib/PublicInbox/Search.pm       |  10 +-
 lib/PublicInbox/SearchView.pm   |  10 +-
 lib/PublicInbox/Sigfd.pm        |  12 +-
 lib/PublicInbox/Spawn.pm        |  85 ++++++----
 lib/PublicInbox/Watch.pm        |   8 +-
 script/lei                      |  76 +++++----
 script/public-inbox-watch       |   4 +-
 t/cmd_ipc.t                     |  82 ++++++++++
 t/ipc.t                         | 115 ++++++++++++-
 t/lei.t                         |  31 +++-
 t/lei_dedupe.t                  |  14 ++
 t/lei_xsearch.t                 |   5 +
 t/spawn.t                       |  33 +---
 27 files changed, 1233 insertions(+), 246 deletions(-)
 create mode 100644 lib/PublicInbox/CmdIPC4.pm
 create mode 100644 lib/PublicInbox/LeiOverview.pm
 create mode 100644 lib/PublicInbox/LeiQuery.pm
 create mode 100644 t/cmd_ipc.t


* [PATCH 01/22] lei query + pagination sorta working
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
@ 2021-01-10 12:14 ` Eric Wong
  2021-01-10 12:14 ` [PATCH 02/22] lei q: deduplicate smsg Eric Wong
                   ` (20 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:14 UTC
  To: meta

Parallelism and interactivity with pager + SIGPIPE need work;
but results are shown and phrase search works without shell
users having to apply Xapian quoting rules on top of standard
shell quoting.
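
A minimal standalone sketch of the quoting rule, for illustration only.
The name argv_to_qstr is hypothetical; the patch inlines this same
expression in lei_q().  Any argument containing whitespace becomes a
Xapian phrase, and a leading "prefix:" stays outside the quotes:

```perl
use strict;
use warnings;

# Hypothetical helper name; the patch inlines this logic in lei_q().
sub argv_to_qstr {
	my @argv = @_; # copy, since s/// below modifies each element
	join(' ', map {;
		# An argument with whitespace is turned into a Xapian phrase.
		# A leading "prefix:" (e.g. "s:") is kept outside the quotes.
		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
	} @argv);
}

# prints: s:"use boolean prefix" d:20210101..
print argv_to_qstr('s:use boolean prefix', 'd:20210101..'), "\n";
```

So both `lei q s:"use boolean prefix"` and `lei q "s:use boolean prefix"`
reach Xapian as the same phrase query once the shell has removed its
own quotes.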
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LEI.pm         |  12 +--
 lib/PublicInbox/LeiExternal.pm |  33 ++++---
 lib/PublicInbox/LeiQuery.pm    | 176 +++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiStore.pm    |   2 +-
 lib/PublicInbox/LeiToMail.pm   |   2 +
 lib/PublicInbox/LeiXSearch.pm  |  22 ++++-
 lib/PublicInbox/Search.pm      |  10 +-
 lib/PublicInbox/SearchView.pm  |  10 +-
 t/lei.t                        |  11 ++-
 t/lei_xsearch.t                |   5 +
 11 files changed, 250 insertions(+), 34 deletions(-)
 create mode 100644 lib/PublicInbox/LeiQuery.pm

diff --git a/MANIFEST b/MANIFEST
index 6dc08f01..609160dd 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -165,6 +165,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
 lib/PublicInbox/LeiToMail.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 9c3308ad..a5658e6d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -8,7 +8,8 @@
 package PublicInbox::LEI;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::DS PublicInbox::LeiExternal);
+use parent qw(PublicInbox::DS PublicInbox::LeiExternal
+	PublicInbox::LeiQuery);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
 use Errno qw(EAGAIN ECONNREFUSED ENOENT);
@@ -80,7 +81,7 @@ sub _config_path ($) {
 our %CMD = ( # sorted in order of importance/use:
 'q' => [ 'SEARCH_TERMS...', 'search for messages matching terms', qw(
 	save-as=s output|mfolder|o=s format|f=s dedupe|d=s thread|t augment|a
-	sort|s=s@ reverse|r offset=i remote local! external!
+	sort|s=s reverse|r offset=i remote local! external! pretty
 	since|after=s until|before=s), opt_dash('limit|n=i', '[0-9]+') ],
 
 'show' => [ 'MID|OID', 'show a given object (Message-ID or object ID)',
@@ -202,8 +203,9 @@ my %OPTDESC = (
 'limit|n=i@' => ['NUM', 'limit on number of matches (default: 10000)' ],
 'offset=i' => ['OFF', 'search result offset (default: 0)'],
 
-'sort|s=s@' => [ 'VAL|internaldate,date,relevance,docid',
+'sort|s=s' => [ 'VAL|received,relevance,docid',
 		"order of results `--output'-dependent"],
+'reverse|r' => [ 'reverse search results' ], # like sort(1)
 
 'boost=i' => 'increase/decrease priority of results (default: 0)',
 
@@ -469,10 +471,6 @@ sub lei_show {
 	my ($self, @argv) = @_;
 }
 
-sub lei_query {
-	my ($self, @argv) = @_;
-}
-
 sub lei_mark {
 	my ($self, @argv) = @_;
 }
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index 4facd451..64faf5a0 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -8,24 +8,35 @@ use v5.10.1;
 use parent qw(Exporter);
 our @EXPORT = qw(lei_ls_external lei_add_external lei_forget_external);
 
-sub lei_ls_external {
-	my ($self, @argv) = @_;
-	my $stor = $self->_lei_store(0);
+sub _externals_each {
+	my ($self, $cb, @arg) = @_;
 	my $cfg = $self->_lei_cfg(0);
-	my $out = $self->{1};
-	my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
-	my (%boost, @loc);
+	my %boost;
 	for my $sec (grep(/\Aexternal\./, @{$cfg->{-section_order}})) {
 		my $loc = substr($sec, length('external.'));
 		$boost{$loc} = $cfg->{"$sec.boost"};
-		push @loc, $loc;
 	}
-	use sort 'stable';
+	return \%boost if !wantarray && !$cb;
+
 	# highest boost first, but stable for alphabetic tie break
-	for (sort { $boost{$b} <=> $boost{$a} } sort keys %boost) {
-		# TODO: use miscidx and show docid so forget/set is easier
-		print $out $_, $OFS, 'boost=', $boost{$_}, $ORS;
+	use sort 'stable';
+	my @order = sort { $boost{$b} <=> $boost{$a} } sort keys %boost;
+	return @order if !$cb;
+	for my $loc (@order) {
+		$cb->(@arg, $loc, $boost{$loc});
 	}
+	@order; # scalar or array
+}
+
+sub lei_ls_external {
+	my ($self, @argv) = @_;
+	my $stor = $self->_lei_store(0);
+	my $out = $self->{1};
+	my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
+	$self->_externals_each(sub {
+		my ($loc, $boost_val) = @_;
+		print $out $loc, $OFS, 'boost=', $boost_val, $ORS;
+	});
 }
 
 sub lei_add_external {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
new file mode 100644
index 00000000..d14da1bc
--- /dev/null
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -0,0 +1,176 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# handles lei <q|ls-query|rm-query|mv-query> commands
+package PublicInbox::LeiQuery;
+use strict;
+use v5.10.1;
+use PublicInbox::MID qw($MID_EXTRACT);
+use POSIX qw(strftime);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Search qw(get_pct);
+
+sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+
+# prepares an smsg for JSON
+sub _smsg_unbless ($) {
+	my ($smsg) = @_;
+
+	delete @$smsg{qw(lines bytes)};
+	$smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+	$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+
+	if (my $r = delete $smsg->{references}) {
+		$smsg->{references} = [
+				map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+	}
+	if (my $m = delete($smsg->{mid})) {
+		$smsg->{'m'} = "<$m>";
+	}
+	# XXX breaking to/cc, into structured arrays or tables which
+	# distinguish "$phrase <$address>" causes pretty printing JSON
+	# to take up too much vertical space.  I can't get either
+	# Cpanel::JSON::XS or JSON::XS or jq(1) to only indent when
+	# wrapping is necessary, rather than blindly indenting and
+	# adding vertical space everywhere.
+	for my $f (qw(from to cc)) {
+		my $v = delete $smsg->{$f} or next;
+		$smsg->{substr($f, 0, 1)} = $v;
+	}
+	$smsg->{'s'} = delete $smsg->{subject};
+	# can we be bothered to parse From/To/Cc into arrays?
+	scalar { %$smsg }; # unbless
+}
+
+sub _vivify_external { # _externals_each callback
+	my ($src, $dir) = @_;
+	if (-f "$dir/ei.lock") {
+		require PublicInbox::ExtSearch;
+		push @$src, PublicInbox::ExtSearch->new($dir);
+	} elsif (-f "$dir/inbox.lock" || -d "$dir/public-inbox") { # v2, v1
+		require PublicInbox::Inbox;
+		push @$src, bless { inboxdir => $dir }, 'PublicInbox::Inbox';
+	} else {
+		warn "W: ignoring $dir, unable to determine type\n";
+	}
+}
+
+# the main "lei q SEARCH_TERMS" method
+sub lei_q {
+	my ($self, @argv) = @_;
+	my $sto = $self->_lei_store(1);
+	my $cfg = $self->_lei_cfg(1);
+	my $opt = $self->{opt};
+	my $qstr = join(' ', map {;
+		# Consider spaces in argv to be for phrase search in Xapian.
+		# In other words, the users should need only care about
+		# normal shell quotes and not have to learn Xapian quoting.
+		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+	} @argv);
+	$opt->{limit} //= 10000;
+	my $lxs;
+
+	# --local is enabled by default
+	my @src = $opt->{'local'} ? ($sto->search) : ();
+
+	# --external is enabled by default, but allow --no-external
+	if ($opt->{external} // 1) {
+		$self->_externals_each(\&_vivify_external, \@src);
+		# {tid} is not unique between indices, so we have to search
+		# each src individually
+		if (!$opt->{thread}) {
+			require PublicInbox::LeiXSearch;
+			my $lxs = PublicInbox::LeiXSearch->new;
+			# local is always first
+			$lxs->attach_external($_) for @src;
+			@src = ($lxs);
+		}
+	}
+	my $out = $self->{output} // '-';
+	$out = 'json:/dev/stdout' if $out eq '-';
+	my $isatty = -t $self->{1};
+	$self->start_pager if $isatty;
+	my $json = substr($out, 0, 5) eq 'json:' ?
+		ref(PublicInbox::Config->json)->new : undef;
+	if ($json) {
+		if ($opt->{pretty} //= $isatty) {
+			$json->pretty(1)->space_before(0);
+			$json->indent_length($opt->{indent} // 2);
+		}
+		$json->utf8; # avoid Wide character in print warnings
+		$json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
+		$json->canonical;
+	}
+
+	# src: LeiXSearch || LeiSearch || Inbox
+	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
+	delete $mset_opt{limit} if $opt->{limit} < 0;
+	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+	if (defined(my $sort = $opt->{'sort'})) {
+		if ($sort eq 'relevance') {
+			$mset_opt{relevance} = 1;
+		} elsif ($sort eq 'docid') {
+			$mset_opt{relevance} = $mset_opt{asc} ? -1 : -2;
+		} elsif ($sort =~ /\Areceived(?:-?[aA]t)?\z/) {
+			# the default
+		} else {
+			die "unrecognized --sort=$sort\n";
+		}
+	}
+	# $self->out($json->encode(\%mset_opt));
+	# descending docid order
+	$mset_opt{relevance} //= -2 if $opt->{thread};
+	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
+
+	# even w/o pretty, do the equivalent of a --pretty=oneline
+	# output so "lei q SEARCH_TERMS | wc -l" can be useful:
+	my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
+	my $buf;
+
+	# we can generate too many records to hold in RAM, so we stream
+	# and fake a JSON array starting here:
+	$self->out('[') if $json;
+	my $emit_cb = sub {
+		my ($smsg) = @_;
+		delete @$smsg{qw(tid num)}; # only makes sense if single src
+		chomp($buf = $json->encode(_smsg_unbless($smsg)));
+	};
+	for my $src (@src) {
+		my $srch = $src->search;
+		my $over = $src->over;
+		my $smsg_for = $src->can('smsg_for'); # LeiXSearch
+		my $mo = { %mset_opt };
+		my $mset = $srch->mset($qstr, $mo);
+		my $ctx = {};
+		if ($smsg_for) {
+			for my $it ($mset->items) {
+				my $smsg = $smsg_for->($srch, $it) or next;
+				$self->out($buf .= $ORS) if defined $buf;
+				$smsg->{relevance} = get_pct($it);
+				$emit_cb->($smsg);
+			}
+		} else { # --thread
+			my $ids = $srch->mset_to_artnums($mset, $mo);
+			$ctx->{ids} = $ids;
+			my $i = 0;
+			my %n2p = map {
+				($ids->[$i++], get_pct($_));
+			} $mset->items;
+			undef $mset;
+			while ($over && $over->expand_thread($ctx)) {
+				for my $n (@{$ctx->{xids}}) {
+					my $t = $over->get_art($n) or next;
+					if (my $p = delete $n2p{$t->{num}}) {
+						$t->{relevance} = $p;
+					}
+					$self->out($buf .= $ORS);
+					$emit_cb->($t);
+				}
+				@{$ctx->{xids}} = ();
+			}
+		}
+	}
+	$self->out($buf .= "]\n"); # done
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 7cda7e44..a7d7d953 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -23,7 +23,7 @@ sub new {
 	my (undef, $dir, $opt) = @_;
 	my $eidx = PublicInbox::ExtSearchIdx->new($dir, $opt);
 	my $self = bless { priv_eidx => $eidx }, __PACKAGE__;
-	eidx_init($self) if $opt->{creat};
+	eidx_init($self)->done if $opt->{creat};
 	$self;
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 851c015b..4c65dce2 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -354,6 +354,8 @@ sub write_cb { # returns a callback for git_to_mail
 		_mbox_write_cb($cls, $1, $dst, $lei);
 	} elsif ($dst =~ s!\A[Mm]aildir:!!) { # typically capitalized
 		_maildir_write_cb($dst, $lei);
+	} else {
+		undef;
 	}
 	# TODO: Maildir, MH, IMAP, JMAP ...
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 33e9c413..b670bc2f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -20,9 +20,16 @@ sub new {
 
 sub attach_external {
 	my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox
-	if (!$ibxish->can('over')) {
-		push @{$self->{remotes}}, $ibxish
+
+	if (!$ibxish->can('over') || !$ibxish->over) {
+		return push(@{$self->{remotes}}, $ibxish)
 	}
+	my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+	my $srch = $ibxish->search or
+		return warn("$desc not indexed for Xapian\n");
+	my @shards = $srch->xdb_shards_flat or
+		return warn("$desc has no Xapian shards\n");
+
 	if (delete $self->{xdb}) { # XXX: do we need this?
 		# clobber existing {xdb} if amending
 		my $expect = delete $self->{nshard};
@@ -41,13 +48,18 @@ sub attach_external {
 		$nr == $expect or die
 			"BUG: reloaded $nr shards, expected $expect"
 	}
-	my @shards = $ibxish->search->xdb_shards_flat;
 	push @{$self->{shards_flat}}, @shards;
 	push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
 }
 
+# returns a list of local inboxes (or count in scalar context)
+sub locals {
+	my %uniq = map {; "$_" => $_ } @{$_[0]->{shard2ibx} // []};
+	values %uniq;
+}
+
 # called by PublicInbox::Search::xdb
-sub xdb_shards_flat { @{$_[0]->{shards_flat}} }
+sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
 
 # like over->get_art
 sub smsg_for {
@@ -69,4 +81,6 @@ sub recent {
 	$self->mset($qstr //= 'bytes:1..', $opt);
 }
 
+sub over {}
+
 1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 0bdf6fc6..7f68ee01 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -6,7 +6,7 @@
 package PublicInbox::Search;
 use strict;
 use parent qw(Exporter);
-our @EXPORT_OK = qw(retry_reopen int_val);
+our @EXPORT_OK = qw(retry_reopen int_val get_pct);
 use List::Util qw(max);
 
 # values for searching, changing the numeric value breaks
@@ -424,4 +424,12 @@ sub int_val ($$) {
 	sortable_unserialise($val) + 0; # PV => IV conversion
 }
 
+sub get_pct ($) { # mset item
+	# Capped at "99%" since "100%" takes an extra column in the
+	# thread skeleton view.  <xapian/mset.h> says the value isn't
+	# very meaningful, anyways.
+	my $n = $_[0]->get_percent;
+	$n > 99 ? 99 : $n;
+}
+
 1;
diff --git a/lib/PublicInbox/SearchView.pm b/lib/PublicInbox/SearchView.pm
index 6b36f795..d50d3cf6 100644
--- a/lib/PublicInbox/SearchView.pm
+++ b/lib/PublicInbox/SearchView.pm
@@ -14,7 +14,7 @@ use PublicInbox::WwwAtomStream;
 use PublicInbox::WwwStream qw(html_oneshot);
 use PublicInbox::SearchThread;
 use PublicInbox::SearchQuery;
-use PublicInbox::Search;
+use PublicInbox::Search qw(get_pct);
 my %rmap_inc;
 
 sub mbox_results {
@@ -276,14 +276,6 @@ sub sort_relevance {
 	} @{$_[0]} ]
 }
 
-sub get_pct ($) {
-	# Capped at "99%" since "100%" takes an extra column in the
-	# thread skeleton view.  <xapian/mset.h> says the value isn't
-	# very meaningful, anyways.
-	my $n = $_[0]->get_percent;
-	$n > 99 ? 99 : $n;
-}
-
 sub mset_thread {
 	my ($ctx, $mset, $q) = @_;
 	my $ibx = $ctx->{ibx};
diff --git a/t/lei.t b/t/lei.t
index 6d47e307..72c50308 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -122,7 +122,7 @@ my $setup_publicinboxes = sub {
 	return if $done eq $home;
 	use PublicInbox::InboxWritable;
 	for my $V (1, 2) {
-		run_script([qw(-init -Lmedium), "-V$V", "t$V",
+		run_script([qw(-init), "-V$V", "t$V",
 				'--newsgroup', "t.$V",
 				"$home/t$V", "http://example.com/t$V",
 				"t$V\@example.com" ]) or BAIL_OUT "init v$V";
@@ -175,6 +175,15 @@ my $test_external = sub {
 	});
 	$lei->('ls-external');
 	like($out, qr/boost=0\n/s, 'ls-external has output');
+
+	# note, on a Bourne shell users should be able to use either:
+	#	s:"use boolean prefix"
+	#	"s:use boolean prefix"
+	# or use single quotes, it should not matter.  Users only need
+	# to know shell quoting rules, not Xapian quoting rules.
+	# No double-quoting should be imposed on users on the CLI
+	$lei->('q', 's:use boolean prefix');
+	like($out, qr/search: use boolean prefix/, 'phrase search got result');
 };
 
 my $test_lei_common = sub {
diff --git a/t/lei_xsearch.t b/t/lei_xsearch.t
index 3774b4c1..8b03c1f2 100644
--- a/t/lei_xsearch.t
+++ b/t/lei_xsearch.t
@@ -70,4 +70,9 @@ my $max = max(map { $_->{docid} } @msgs);
 is($lxs->smsg_for(($mset->items)[0])->{docid}, $max,
 	'got highest docid');
 
+my @ibxish = $lxs->locals;
+is(scalar(@ibxish), scalar(@ibx) + 1, 'got locals back');
+is($lxs->search, $lxs, '->search works');
+is($lxs->over, undef, '->over fails');
+
 done_testing;


* [PATCH 02/22] lei q: deduplicate smsg
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
  2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
@ 2021-01-10 12:14 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 03/22] ds: block signals when reaping Eric Wong
                   ` (19 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:14 UTC
  To: meta

We don't want duplicate messages in results overviews, either.
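
A rough sketch of the content-based variant, assuming plain hashrefs in
place of PublicInbox::Smsg objects and an in-memory %seen hash in place
of PublicInbox::SharedKV (both are stand-ins for illustration).  The
digest covers the same overview fields as smsg_hash() in this patch;
defaulting undefined fields to '' is an addition here to keep the
sketch warning-free:

```perl
use strict;
use warnings;
use Digest::SHA ();

# Same field list as smsg_hash() in the patch; undef fields become ''
# (an assumption of this sketch, not something the patch does).
sub smsg_hash {
	my ($smsg) = @_;
	my $x = join("\0", map {; $_ // '' }
			@$smsg{qw(from to cc ds subject references mid)});
	utf8::encode($x); # Digest::SHA wants octets, not wide characters
	Digest::SHA->new(256)->add($x)->digest;
}

my %seen; # stand-in for PublicInbox::SharedKV
sub is_smsg_dup { $seen{smsg_hash($_[0])}++ ? 1 : 0 }

my %msg = (from => 'a@example.com', ds => 1610280840,
	subject => 'hello', mid => 'x@example.com');
for my $src (qw(inbox-a inbox-b)) { # same message found in two sources
	print "$src: ", (is_smsg_dup({ %msg }) ? 'duplicate' : 'new'), "\n";
}
```

The second source reports a duplicate because every hashed field
matches, regardless of which index the result came from.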
---
 lib/PublicInbox/LeiDedupe.pm | 29 ++++++++++++++++++++++++++++-
 lib/PublicInbox/LeiQuery.pm  |  5 +++++
 t/lei_dedupe.t               | 14 ++++++++++++++
 3 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index c4e5dffb..58eee533 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -33,12 +33,24 @@ sub _regen_oid ($) {
 
 sub _oidbin ($) { defined($_[0]) ? pack('H*', $_[0]) : undef }
 
+sub smsg_hash ($) {
+	my ($smsg) = @_;
+	my $dig = Digest::SHA->new(256);
+	my $x = join("\0", @$smsg{qw(from to cc ds subject references mid)});
+	utf8::encode($x);
+	$dig->add($x);
+	$dig->digest;
+}
+
 # the paranoid option
 sub dedupe_oid () {
 	my $skv = PublicInbox::SharedKV->new;
 	($skv, sub { # may be called in a child process
 		my ($eml, $oid) = @_;
 		$skv->set_maybe(_oidbin($oid) // _regen_oid($eml), '');
+	}, sub {
+		my ($smsg) = @_;
+		$skv->set_maybe(_oidbin($smsg->{blob}), '');
 	});
 }
 
@@ -51,6 +63,12 @@ sub dedupe_mid () {
 		my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) //
 			content_hash($eml);
 		$skv->set_maybe($mid, '');
+	}, sub {
+		my ($smsg) = @_;
+		my $mid = $smsg->{mid};
+		$mid = undef if $mid eq '';
+		$mid //= smsg_hash($smsg) // _oidbin($smsg->{blob});
+		$skv->set_maybe($mid, '');
 	});
 }
 
@@ -60,11 +78,15 @@ sub dedupe_content () {
 	($skv, sub { # may be called in a child process
 		my ($eml) = @_; # oid = $_[1], ignored
 		$skv->set_maybe(content_hash($eml), '');
+	}, sub {
+		my ($smsg) = @_;
+		$skv->set_maybe(smsg_hash($smsg), '');
 	});
 }
 
 # no deduplication at all
-sub dedupe_none () { (undef, sub { 1 }) }
+sub true { 1 }
+sub dedupe_none () { (undef, \&true, \&true) }
 
 sub new {
 	my ($cls, $lei, $dst) = @_;
@@ -85,6 +107,11 @@ sub is_dup {
 	!$self->[1]->($eml, $oid);
 }
 
+sub is_smsg_dup {
+	my ($self, $smsg) = @_;
+	!$self->[2]->($smsg);
+}
+
 sub prepare_dedupe {
 	my ($self) = @_;
 	my $skv = $self->[0];
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index d14da1bc..f69dccad 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -69,6 +69,8 @@ sub lei_q {
 	} @argv);
 	$opt->{limit} //= 10000;
 	my $lxs;
+	require PublicInbox::LeiDedupe;
+	my $dd = PublicInbox::LeiDedupe->new($self);
 
 	# --local is enabled by default
 	my @src = $opt->{'local'} ? ($sto->search) : ();
@@ -135,6 +137,7 @@ sub lei_q {
 		delete @$smsg{qw(tid num)}; # only makes sense if single src
 		chomp($buf = $json->encode(_smsg_unbless($smsg)));
 	};
+	$dd->prepare_dedupe;
 	for my $src (@src) {
 		my $srch = $src->search;
 		my $over = $src->over;
@@ -145,6 +148,7 @@ sub lei_q {
 		if ($smsg_for) {
 			for my $it ($mset->items) {
 				my $smsg = $smsg_for->($srch, $it) or next;
+				next if $dd->is_smsg_dup($smsg);
 				$self->out($buf .= $ORS) if defined $buf;
 				$smsg->{relevance} = get_pct($it);
 				$emit_cb->($smsg);
@@ -160,6 +164,7 @@ sub lei_q {
 			while ($over && $over->expand_thread($ctx)) {
 				for my $n (@{$ctx->{xids}}) {
 					my $t = $over->get_art($n) or next;
+					next if $dd->is_smsg_dup($t);
 					if (my $p = delete $n2p{$t->{num}}) {
 						$t->{relevance} = $p;
 					}
diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t
index b5e2b8f9..6e971b9b 100644
--- a/t/lei_dedupe.t
+++ b/t/lei_dedupe.t
@@ -6,12 +6,16 @@ use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
 use PublicInbox::Eml;
+use PublicInbox::Smsg;
 require_mods(qw(DBD::SQLite));
 use_ok 'PublicInbox::LeiDedupe';
 my $eml = eml_load('t/plack-qp.eml');
 my $mid = $eml->header_raw('Message-ID');
 my $different = eml_load('t/msg_iter-order.eml');
 $different->header_set('Message-ID', $mid);
+my $smsg = bless { ds => time }, 'PublicInbox::Smsg';
+$smsg->populate($eml);
+$smsg->{$_} //= '' for (qw(to cc references)) ;
 
 my $lei = { opt => { dedupe => 'none' } };
 my $dd = PublicInbox::LeiDedupe->new($lei);
@@ -19,6 +23,8 @@ $dd->prepare_dedupe;
 ok(!$dd->is_dup($eml), '1st is_dup w/o dedupe');
 ok(!$dd->is_dup($eml), '2nd is_dup w/o dedupe');
 ok(!$dd->is_dup($different), 'different is_dup w/o dedupe');
+ok(!$dd->is_smsg_dup($smsg), 'smsg dedupe none 1');
+ok(!$dd->is_smsg_dup($smsg), 'smsg dedupe none 2');
 
 for my $strat (undef, 'content') {
 	$lei->{opt}->{dedupe} = $strat;
@@ -28,6 +34,8 @@ for my $strat (undef, 'content') {
 	ok(!$dd->is_dup($eml), "1st is_dup with $desc dedupe");
 	ok($dd->is_dup($eml), "2nd seen with $desc dedupe");
 	ok(!$dd->is_dup($different), "different is_dup with $desc dedupe");
+	ok(!$dd->is_smsg_dup($smsg), "is_smsg_dup pass w/ $desc dedupe");
+	ok($dd->is_smsg_dup($smsg), "is_smsg_dup reject w/ $desc dedupe");
 }
 $lei->{opt}->{dedupe} = 'bogus';
 eval { PublicInbox::LeiDedupe->new($lei) };
@@ -39,6 +47,8 @@ $dd->prepare_dedupe;
 ok(!$dd->is_dup($eml), '1st is_dup with mid dedupe');
 ok($dd->is_dup($eml), '2nd seen with mid dedupe');
 ok($dd->is_dup($different), 'different seen with mid dedupe');
+ok(!$dd->is_smsg_dup($smsg), 'smsg mid dedupe pass');
+ok($dd->is_smsg_dup($smsg), 'smsg mid dedupe reject');
 
 $lei->{opt}->{dedupe} = 'oid';
 $dd = PublicInbox::LeiDedupe->new($lei);
@@ -56,4 +66,8 @@ ok($dd->is_dup($different, '01d'), 'different content ignored if oid matches');
 ok($dd->is_dup($eml, '01D'), 'case insensitive oid comparison :P');
 ok(!$dd->is_dup($eml, '01dbad'), 'case insensitive oid comparison :P');
 
+$smsg->{blob} = 'dead';
+ok(!$dd->is_smsg_dup($smsg), 'smsg dedupe pass');
+ok($dd->is_smsg_dup($smsg), 'smsg dedupe reject');
+
 done_testing;


* [PATCH 03/22] ds: block signals when reaping
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
  2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
  2021-01-10 12:14 ` [PATCH 02/22] lei q: deduplicate smsg Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 04/22] ipc: add support for asynchronous callbacks Eric Wong
                   ` (18 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC
  To: meta

This lets us call dwaitpid long before a process exits
and not have to wait around for it.

This is advantageous for lei where we can run dwaitpid on the
pager as soon as we spawn it, instead of waiting for a client
socket to go away on DESTROY.
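
A standalone sketch of the save/block/restore pattern this patch moves
into PublicInbox::DS.  The two helpers mirror sig_setmask() and
block_signals() from the diff; the fork and the blocking waitpid() in
the demo are only there to keep the example short, whereas reap_pids()
uses WNOHANG inside the event loop:

```perl
use strict;
use warnings;
use POSIX qw(sigprocmask SIG_SETMASK);

# Set the process signal mask; the optional 2nd arg receives the old mask.
sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }

# Block every signal and return the previous mask for a later restore.
sub block_signals {
	my $oldset = POSIX::SigSet->new;
	my $newset = POSIX::SigSet->new;
	$newset->fillset or die "fillset: $!";
	sig_setmask($newset, $oldset);
	$oldset;
}

defined(my $pid = fork) or die "fork: $!";
exit(0) if $pid == 0; # child exits immediately

my $oldset = block_signals(); # no handler can run while we reap
my $ret = waitpid($pid, 0); # blocking only to keep the demo short
sig_setmask($oldset); # restore whatever mask was in place before
print "reaped PID $ret, \$?=$?\n";
```

Restoring the saved mask instead of unblocking everything keeps this
safe to call from code which already runs with signals blocked.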
---
 lib/PublicInbox/DS.pm           | 16 +++++++++++++---
 lib/PublicInbox/Daemon.pm       | 10 +++++-----
 lib/PublicInbox/ExtSearchIdx.pm |  4 ++--
 lib/PublicInbox/IPC.pm          |  6 +++---
 lib/PublicInbox/LEI.pm          | 10 ++++------
 lib/PublicInbox/Sigfd.pm        | 12 +-----------
 lib/PublicInbox/Watch.pm        |  8 ++++----
 script/public-inbox-watch       |  4 ++--
 t/spawn.t                       |  4 ++--
 9 files changed, 36 insertions(+), 38 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index 8a560ae8..40994fd4 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -24,7 +24,7 @@ use strict;
 use v5.10.1;
 use parent qw(Exporter);
 use bytes;
-use POSIX qw(WNOHANG);
+use POSIX qw(WNOHANG sigprocmask SIG_SETMASK);
 use IO::Handle qw();
 use Fcntl qw(SEEK_SET :DEFAULT O_APPEND);
 use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
@@ -202,6 +202,16 @@ sub RunTimers {
     ($LoopTimeout < 0 || $LoopTimeout >= $timeout) ? $timeout : $LoopTimeout;
 }
 
+sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
+
+sub block_signals () {
+	my $oldset = POSIX::SigSet->new;
+	my $newset = POSIX::SigSet->new;
+	$newset->fillset or die "fillset: $!";
+	sig_setmask($newset, $oldset);
+	$oldset;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -211,6 +221,7 @@ sub reap_pids {
 	$reap_armed = undef;
 	my $tmp = $wait_pids or return;
 	$wait_pids = undef;
+	my $oldset = block_signals();
 	foreach my $ary (@$tmp) {
 		my ($pid, $cb, $arg) = @$ary;
 		my $ret = waitpid($pid, WNOHANG);
@@ -225,8 +236,7 @@ sub reap_pids {
 			warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
 		}
 	}
-	# we may not be done, yet, and could've missed/masked a SIGCHLD:
-	$reap_armed //= requeue(\&reap_pids) if $wait_pids;
+	sig_setmask($oldset);
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 4dcb5fb6..4b738b7c 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -77,7 +77,7 @@ sub accept_tls_opt ($) {
 sub daemon_prepare ($) {
 	my ($default_listen) = @_;
 	my $listener_names = {}; # sockname => IO::Handle
-	$oldset = PublicInbox::Sigfd::block_signals();
+	$oldset = PublicInbox::DS::block_signals();
 	@CMD = ($0, @ARGV);
 	my ($prog) = ($CMD[0] =~ m!([^/]+)\z!g);
 	my $help = <<EOF;
@@ -515,7 +515,7 @@ EOF
 	};
 	my $sigfd = PublicInbox::Sigfd->new($sig, 0);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
-	PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+	PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
 	while (1) { # main loop
 		my $n = scalar keys %pids;
 		unless (@listeners) {
@@ -531,7 +531,7 @@ EOF
 		}
 		my $want = $worker_processes - 1;
 		if ($n <= $want) {
-			PublicInbox::Sigfd::block_signals() if !$sigfd;
+			PublicInbox::DS::block_signals() if !$sigfd;
 			for my $i ($n..$want) {
 				my $pid = fork;
 				if (!defined $pid) {
@@ -544,7 +544,7 @@ EOF
 					$pids{$pid} = $i;
 				}
 			}
-			PublicInbox::Sigfd::sig_setmask($oldset) if !$sigfd;
+			PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
 		}
 
 		if ($sigfd) { # Linux and IO::KQueue users:
@@ -632,7 +632,7 @@ sub daemon_loop ($$$$) {
 	if (!$sigfd) {
 		# wake up every second to accept signals if we don't
 		# have signalfd or IO::KQueue:
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		PublicInbox::DS->SetLoopTimeout(1000);
 	}
 	PublicInbox::DS->EventLoop;
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index e6c21866..85959a95 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -1090,7 +1090,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
 	$pr->("performing initial scan ...\n") if $pr;
 	my $sync = eidx_sync($self, $opt); # initial sync
 	return if $sync->{quit};
-	my $oldset = PublicInbox::Sigfd::block_signals();
+	my $oldset = PublicInbox::DS::block_signals();
 	local $self->{current_info} = '';
 	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $SIG{__WARN__} = sub { $cb->($self->{current_info}, ': ', @_) };
@@ -1108,7 +1108,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
 	if (!$sigfd) {
 		# wake up every second to accept signals if we don't
 		# have signalfd or IO::KQueue:
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		PublicInbox::DS->SetLoopTimeout(1000);
 	}
 	PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c1f6f920..81623fc0 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -81,7 +81,7 @@ sub ipc_worker_spawn {
 	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
 	pipe(my ($r_req, $w_req)) or die "pipe: $!";
 	pipe(my ($r_res, $w_res)) or die "pipe: $!";
-	my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	my $parent = $$;
 	$self->ipc_atfork_parent;
 	defined(my $pid = fork) or die "fork: $!";
@@ -92,13 +92,13 @@ sub ipc_worker_spawn {
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { ipc_worker_loop($self, $r_req, $w_res) };
 		die "worker $ident PID:$$ died: $@\n" if $@;
 		exit;
 	}
-	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
 	$r_req = $w_res = undef;
 	$w_req->autoflush(1);
 	$self->{-ipc_req} = $w_req;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a5658e6d..12e227d2 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -607,7 +607,8 @@ sub start_pager {
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
 	$self->{1} = $w;
 	$self->{2} = $w if -t $self->{2};
-	$self->{'pager.pid'} = spawn([$pager], $env, $rdr);
+	my $pid = spawn([$pager], $env, $rdr);
+	dwaitpid($pid, undef, $self->{sock});
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
 }
 
@@ -689,7 +690,7 @@ sub lazy_start {
 	my @st = stat($path) or die "stat($path): $!";
 	my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
 	pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-	my $oldset = PublicInbox::Sigfd::block_signals();
+	my $oldset = PublicInbox::DS::block_signals();
 	if ($nfd == 1) {
 		require IO::FDPass;
 		$recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
@@ -736,7 +737,7 @@ sub lazy_start {
 	} else {
 		# wake up every second to accept signals if we don't
 		# have signalfd or IO::KQueue:
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		PublicInbox::DS->SetLoopTimeout(1000);
 	}
 	PublicInbox::DS->SetPostLoopCallback(sub {
@@ -801,9 +802,6 @@ sub oneshot {
 sub DESTROY {
 	my ($self) = @_;
 	$self->{1}->autoflush(1);
-	if (my $pid = delete $self->{'pager.pid'}) {
-		dwaitpid($pid, undef, $self->{sock});
-	}
 }
 
 1;
diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm
index db0bf523..a4d1b3bb 100644
--- a/lib/PublicInbox/Sigfd.pm
+++ b/lib/PublicInbox/Sigfd.pm
@@ -7,7 +7,7 @@ package PublicInbox::Sigfd;
 use strict;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET SFD_NONBLOCK);
-use POSIX qw(:signal_h);
+use POSIX ();
 use IO::Handle ();
 
 # returns a coderef to unblock signals if neither signalfd or kqueue
@@ -63,14 +63,4 @@ sub event_step {
 	while (wait_once($_[0])) {} # non-blocking
 }
 
-sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" }
-
-sub block_signals () {
-	my $oldset = POSIX::SigSet->new;
-	my $newset = POSIX::SigSet->new;
-	$newset->fillset or die "fillset: $!";
-	sig_setmask($newset, $oldset);
-	$oldset;
-}
-
 1;
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index c39ce1a7..9a729140 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -583,13 +583,13 @@ sub watch_atfork_child ($) {
 	delete $self->{opendirs};
 	PublicInbox::DS->Reset;
 	%SIG = (%SIG, %{$self->{sig}}, CHLD => 'DEFAULT');
-	PublicInbox::Sigfd::sig_setmask($self->{oldset});
+	PublicInbox::DS::sig_setmask($self->{oldset});
 }
 
 sub watch_atfork_parent ($) {
 	my ($self) = @_;
 	_done_for_now($self);
-	PublicInbox::Sigfd::block_signals();
+	PublicInbox::DS::block_signals();
 }
 
 sub imap_idle_requeue ($) { # DS::add_timer callback
@@ -648,7 +648,7 @@ sub event_step {
 				imap_idle_fork($self, $url_intvl);
 			}
 		};
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		die $@ if $@;
 	}
 	fs_scan_step($self) if $self->{mdre};
@@ -716,7 +716,7 @@ sub poll_fetch_fork ($) { # DS::add_timer callback
 		close $w;
 		_exit(0);
 	}
-	PublicInbox::Sigfd::sig_setmask($oldset);
+	PublicInbox::DS::sig_setmask($oldset);
 	die "fork: $!"  unless defined $pid;
 	$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
 	PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index 9ada9c3b..10c7cd6f 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -19,7 +19,7 @@ my $do_scan = 1;
 GetOptions('scan!' => \$do_scan, # undocumented, testing only
 	'help|h' => \(my $show_help)) or do { print STDERR $help; exit 1 };
 if ($show_help) { print $help; exit 0 };
-my $oldset = PublicInbox::Sigfd::block_signals();
+my $oldset = PublicInbox::DS::block_signals();
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
 local $0 = $0; # local since this script may be eval-ed
@@ -60,7 +60,7 @@ if ($watch) {
 	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
 	if (!$sigfd) {
-		PublicInbox::Sigfd::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($oldset);
 		PublicInbox::DS->SetLoopTimeout(1000);
 	}
 	$watch->watch($sig, $oldset) while ($watch);
diff --git a/t/spawn.t b/t/spawn.t
index 891a3702..558afc28 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -61,7 +61,7 @@ elsif ($pid > 0) {
 	select(undef, undef, undef, 0.01) while 1;
 }
 EOF
-	my $oldset = PublicInbox::Sigfd::block_signals();
+	my $oldset = PublicInbox::DS::block_signals();
 	my $rd = popen_rd([$^X, '-e', $script]);
 	diag 'waiting for child to reap grandchild...';
 	chomp(my $line = readline($rd));
@@ -70,7 +70,7 @@ EOF
 	ok(kill('CHLD', $pid), 'sent SIGCHLD to child');
 	is(readline($rd), "HI\n", '$SIG{CHLD} works in child');
 	ok(close $rd, 'popen_rd close works');
-	PublicInbox::Sigfd::sig_setmask($oldset);
+	PublicInbox::DS::sig_setmask($oldset);
 }
 
 {


* [PATCH 04/22] ipc: add support for asynchronous callbacks
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (2 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 03/22] ds: block signals when reaping Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
                   ` (17 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Similar to git->cat_async, this will let us deal with responses
asynchronously, as well as being able to mix synchronous and
asynchronous code transparently (though perhaps not optimally).
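
The in-flight bookkeeping described above can be sketched outside of
PublicInbox::IPC. This is a minimal illustration under stated
assumptions, not the code in this patch: pack_rec, get_rec, async_do,
async_wait and MAX_INFLIGHT are hypothetical names, Storable stands in
for whichever serializer IPC.pm selects, and a fixed 512-byte budget
stands in for PIPE_BUF. The worker simply doubles each number it
receives.

```perl
use strict;
use warnings;
use IO::Handle ();
use Storable qw(freeze thaw);
use constant MAX_INFLIGHT => 512; # assumed budget; the patch uses PIPE_BUF

# Frame a record as "<length>\n<frozen bytes>", like _pack_rec in the patch
sub pack_rec { my $buf = freeze($_[0]); length($buf) . "\n" . $buf }

# Hypothetical reader for that framing; returns undef on EOF
sub get_rec {
	my ($r) = @_;
	defined(my $len = <$r>) or return;
	chomp $len;
	my $n = read($r, my $buf, $len);
	defined($n) && $n == $len or die "short read";
	thaw($buf);
}

pipe(my ($r_req, $w_req)) or die "pipe: $!";
pipe(my ($r_res, $w_res)) or die "pipe: $!";
defined(my $pid = fork) or die "fork: $!";
if ($pid == 0) { # worker: answer each request in order
	close $w_req;
	close $r_res;
	$w_res->autoflush(1);
	while (my $req = get_rec($r_req)) {
		print $w_res pack_rec([ $req->[0] * 2 ]) or die "print: $!";
	}
	exit 0;
}
close $r_req;
close $w_res;
$w_req->autoflush(1);

my (@inflight, @results); # @inflight holds (request size, callback) pairs
my $inflight_bytes = 0;

# Read up to $max responses and run their callbacks; -1 drains everything
sub async_wait {
	my ($max) = @_;
	while (my ($bytes, $cb) = splice(@inflight, 0, 2)) {
		my $ret = get_rec($r_res) // die "no response";
		$inflight_bytes -= $bytes;
		$cb->($ret);
		return if --$max == 0;
	}
}

# Queue a request, first draining responses if the budget would overflow
sub async_do {
	my ($arg, $cb) = @_;
	my $rec = pack_rec([ $arg ]);
	async_wait(1) while @inflight &&
		($inflight_bytes + length($rec)) > MAX_INFLIGHT;
	print $w_req $rec or die "print: $!";
	$inflight_bytes += length($rec);
	push @inflight, length($rec), $cb;
}

async_do($_, sub { push @results, $_[0]->[0] }) for (1..100);
async_wait(-1);
close $w_req; # EOF lets the worker exit
waitpid($pid, 0);
```

Bounding the unanswered request bytes is what keeps the parent from
blocking on a full request pipe while the worker waits for its
responses to be read. The sketch also stops draining once nothing is
in flight, so a single oversized record is written rather than waited
on forever.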
---
 lib/PublicInbox/IPC.pm | 53 +++++++++++++++++++++++++++++++++++++++---
 t/ipc.t                | 25 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 81623fc0..7dc8ec6a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -8,6 +8,8 @@ use strict;
 use v5.10.1;
 use Carp qw(confess croak);
 use PublicInbox::Sigfd;
+use POSIX ();
+use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
 # and eliminate method call overhead
@@ -41,10 +43,15 @@ sub _get_rec ($) {
 	thaw($buf);
 }
 
+sub _pack_rec ($) {
+	my ($ref) = @_;
+	my $buf = freeze($ref);
+	length($buf) . "\n" . $buf;
+}
+
 sub _send_rec ($$) {
 	my ($w, $ref) = @_;
-	my $buf = freeze($ref);
-	print $w length($buf), "\n", $buf or croak "print: $!";
+	print $w _pack_rec($ref) or croak "print: $!";
 }
 
 sub ipc_return ($$$) {
@@ -156,6 +163,21 @@ sub ipc_lock_init {
 	$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
 }
 
+sub ipc_async_wait ($$) {
+	my ($self, $max) = @_; # max == -1 to wait for all
+	my $aif = $self->{-async_inflight} or return;
+	my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
+	while (my ($sub, $bytes, $cb, $cb_arg) = splice(@$aif, 0, 4)) {
+		my $ret = _get_rec($r_res) //
+			die "no response on $sub (req.size=$bytes)";
+		$self->{-async_inflight_bytes} -= $bytes;
+
+		eval { $cb->($cb_arg, $ret) };
+		warn "E: $sub callback error: $@\n" if $@;
+		return if --$max == 0;
+	}
+}
+
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
 	my ($self, $sub, @args) = @_;
@@ -163,7 +185,8 @@ sub ipc_do {
 		my $ipc_lock = $self->{-ipc_lock};
 		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
 		if (defined(wantarray)) {
-			my $r_res = $self->{-ipc_res} or die 'no ipc_res';
+			my $r_res = $self->{-ipc_res} or die 'BUG: no ipc_res';
+			ipc_async_wait($self, -1);
 			_send_rec($w_req, [ wantarray, $sub, @args ]);
 			my $ret = _get_rec($r_res) // die "no response on $sub";
 			die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
@@ -176,6 +199,30 @@ sub ipc_do {
 	}
 }
 
+sub ipc_async {
+	my ($self, $sub, $sub_args, $cb, $cb_arg) = @_;
+	if (my $w_req = $self->{-ipc_req}) { # run in worker
+		my $rec = _pack_rec([ 1, $sub, @$sub_args ]);
+		my $cur_bytes = \($self->{-async_inflight_bytes} //= 0);
+		while (($$cur_bytes + length($rec)) > PIPE_BUF) {
+			ipc_async_wait($self, 1);
+		}
+		my $ipc_lock = $self->{-ipc_lock};
+		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
+		print $w_req $rec or croak "print: $!";
+		$$cur_bytes += length($rec);
+		push @{$self->{-async_inflight}},
+				$sub, length($rec), $cb, $cb_arg;
+	} else {
+		my $ret = [ eval { $self->$sub(@$sub_args) } ];
+		if (my $exc = $@) {
+			$ret = ( bless(\$exc, 'PublicInbox::IPC::Die') );
+		}
+		eval { $cb->($cb_arg, $ret) };
+		warn "E: $sub callback error: $@\n" if $@;
+	}
+}
+
 # needed when there's multiple IPC workers and the parent forking
 # causes newer siblings to inherit older siblings sockets
 sub ipc_sibling_atfork_child {
diff --git a/t/ipc.t b/t/ipc.t
index 0efc5394..400fb768 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -21,6 +21,7 @@ my $ipc = bless {}, 'PublicInbox::IPC';
 my @t = qw(array scalar scalarref undef);
 my $test = sub {
 	my $x = shift;
+	my @res;
 	for my $type (@t) {
 		my $m = "test_$type";
 		my @ret = $ipc->ipc_do($m);
@@ -29,10 +30,34 @@ my $test = sub {
 
 		$ipc->ipc_do($m);
 
+		$ipc->ipc_async($m, [], sub { push @res, \@_ }, \$m);
+
 		my $ret = $ipc->ipc_do($m);
 		my $exp = $ipc->$m;
 		is_deeply($ret, $exp, "!wantarray $m $x");
+
+		is_deeply(\@res, [ [ \$m, \@exp ] ], "async $m $x");
+		@res = ();
 	}
+	$ipc->ipc_async_wait(-1);
+	is_deeply(\@res, [], 'no leftover results');
+	$ipc->ipc_async('test_die', ['die test'],
+			sub { push @res, \@_  }, 'die arg');
+	$ipc->ipc_async_wait(1);
+	is(scalar(@res), 1, 'only one result');
+	is(scalar(@{$res[0]}), 2, 'result has 2-element array');
+	is($res[0]->[0], 'die arg', 'got async die arg '.$x);
+	is(ref($res[0]->[1]), 'PublicInbox::IPC::Die',
+		"exception type $x");
+	{
+		my $nr = PublicInbox::IPC::PIPE_BUF();
+		my $count = 0;
+		my $cb = sub { ++$count };
+		$ipc->ipc_async('test_undef', [], $cb) for (1..$nr);
+		$ipc->ipc_async_wait(-1);
+		is($count, $nr, "$x async runs w/o deadlock");
+	}
+
 	my $ret = eval { $ipc->test_die('phail') };
 	my $exp = $@;
 	$ret = eval { $ipc->ipc_do('test_die', 'phail') };


* [PATCH 05/22] cmd_ipc: send FDs with buffer payload
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (3 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 04/22] ipc: add support for asynchronous callbacks Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 06/22] ipc: avoid excessive evals Eric Wong
                   ` (16 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

As another step in syscall reduction, we'll support
transferring 3 FDs and a buffer with a single sendmsg/recvmsg
syscall using Socket::MsgHdr if available.

Beyond script/lei itself, this will be used for internal IPC
between search backends (perhaps with SOCK_SEQPACKET).  There's
a chance this could make it to the public-facing daemons, too.

This adds an optional dependency on the Socket::MsgHdr package,
available as libsocket-msghdr-perl on Debian-based distros
(but not CentOS 7.x and FreeBSD 11.x, at least).

Our Inline::C version in PublicInbox::Spawn remains the last
choice for script/lei due to the high startup time, and
IO::FDPass remains supported for non-Debian distros.

Since the socket name prefix changes from 3 to 4, we'll also
take this opportunity to make the argv+env buffer transfer less
error-prone by relying on argc instead of designated delimiters.
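
The argc-based layout can be tried in isolation. The following is a
minimal sketch, assuming the buffer format shown in the LEI.pm and
script/lei hunks of this patch; pack_req and unpack_req are
hypothetical helper names and do not exist in the tree.

```perl
use strict;
use warnings;

# Build "$pid\0$argc\0ARG...\0K=V...\0\0" like the new script/lei
sub pack_req {
	my ($pid, $argv, $env) = @_;
	my $buf = join("\0", $pid, scalar(@$argv), @$argv);
	$buf .= "\0$_=$env->{$_}" for sort keys %$env;
	$buf . "\0\0";
}

# Parse it like accept_dispatch; returns an empty list if truncated
sub unpack_req {
	my ($buf) = @_;
	return if length($buf) < 2;
	return if substr($buf, -2, 2, '') ne "\0\0"; # strip terminator
	# limit of -1 keeps trailing empty fields (e.g. an empty last arg)
	my ($pid, $argc, @argv) = split(/\0/, $buf, -1);
	# everything past the first $argc fields is a KEY=VALUE pair
	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
	($pid + 0, \@argv, \%env);
}

my $buf = pack_req(1234, [ 'q', '', 's:a]b' ], { PWD => '/tmp', A => 'x=y' });
my ($pid, $argv, $env) = unpack_req($buf);
```

Because the receiver is told how many fields belong to argv, an empty
argument or one containing "]" needs no special handling, and a
missing "\0\0" terminator is reported as truncation instead of being
parsed.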
---
 MANIFEST                   |  3 ++
 lib/PublicInbox/CmdIPC1.pm | 30 +++++++++++++
 lib/PublicInbox/CmdIPC4.pm | 34 ++++++++++++++
 lib/PublicInbox/LEI.pm     | 45 ++++++++++---------
 lib/PublicInbox/Spawn.pm   | 45 +++++++++++--------
 script/lei                 | 53 +++++++++++-----------
 t/cmd_ipc.t                | 90 ++++++++++++++++++++++++++++++++++++++
 t/lei.t                    | 22 +++++++---
 t/spawn.t                  | 29 ------------
 9 files changed, 250 insertions(+), 101 deletions(-)
 create mode 100644 lib/PublicInbox/CmdIPC1.pm
 create mode 100644 lib/PublicInbox/CmdIPC4.pm
 create mode 100644 t/cmd_ipc.t

diff --git a/MANIFEST b/MANIFEST
index 609160dd..62c14cd2 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -109,6 +109,8 @@ lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CmdIPC1.pm
+lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CompressNoop.pm
 lib/PublicInbox/Config.pm
 lib/PublicInbox/ConfigIter.pm
@@ -275,6 +277,7 @@ t/altid.t
 t/altid_v2.t
 t/cgi.t
 t/check-www-inbox.perl
+t/cmd_ipc.t
 t/config.t
 t/config_limiter.t
 t/content_hash.t
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
new file mode 100644
index 00000000..0eed8bed
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
+# 2nd choice for lei(1) front-end and 3rd choice for lei internals
+package PublicInbox::CmdIPC1;
+use strict;
+use v5.10.1;
+BEGIN { eval {
+require IO::FDPass; # XS, available in all major distros
+no warnings 'once';
+
+*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+	for (1..3) {
+		IO::FDPass::send(fileno($_[0]), $_[$_]) or
+					die "IO::FDPass::send: $!";
+	}
+	send($_[0], $_[4], $_[5]) or die "send $!";
+};
+
+*recv_cmd1 = sub ($$$) {
+	my ($s, undef, $len) = @_;
+	my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+	recv($s, $_[1], $len, 0) // die "recv: $!";
+	length($_[1]) == 0 ? () : @fds;
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
new file mode 100644
index 00000000..90fca62d
--- /dev/null
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# callers should use PublicInbox::CmdIPC4->can('send_cmd4') (or recv_cmd4)
+# first choice for script/lei front-end and 2nd choice for lei backend
+# libsocket-msghdr-perl is in Debian but not many other distros as of 2021.
+package PublicInbox::CmdIPC4;
+use strict;
+use v5.10.1;
+use Socket qw(SOL_SOCKET SCM_RIGHTS);
+BEGIN { eval {
+require Socket::MsgHdr; # XS
+no warnings 'once';
+
+# 3 FDs per-sendmsg(2) + buffer
+*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
+	my $mh = Socket::MsgHdr->new(buf => $_[4]);
+	$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
+	Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+};
+
+*recv_cmd4 = sub ($$$) {
+	my ($s, undef, $len) = @_; # $_[1] = destination buffer
+	my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
+	my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // die "recvmsg: $!";
+	$_[1] = $mh->buf;
+	return () if $r == 0;
+	my (undef, undef, $data) = $mh->cmsghdr;
+	unpack('iii', $data);
+};
+
+} } # /eval /BEGIN
+
+1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 12e227d2..1f4ed0f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -26,7 +26,7 @@ use Text::Wrap qw(wrap);
 use File::Path qw(mkpath);
 use File::Spec;
 our $quit = \&CORE::exit;
-my $recv_3fds;
+my $recv_cmd;
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
@@ -619,8 +619,9 @@ sub accept_dispatch { # Listener {post_accept} callback
 	my $self = bless { sock => $sock }, __PACKAGE__;
 	vec(my $rin = '', fileno($sock), 1) = 1;
 	# `say $sock' triggers "die" in lei(1)
+	my $buf;
 	if (select(my $rout = $rin, undef, undef, 1)) {
-		my @fds = $recv_3fds->(fileno($sock));
+		my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
 		if (scalar(@fds) == 3) {
 			my $i = 0;
 			for my $rdr (qw(<&= >&= >&=)) {
@@ -633,7 +634,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 				}
 			}
 		} else {
-			say $sock "recv_3fds failed: $!";
+			say $sock "recv_cmd failed: $!";
 			return;
 		}
 	} else {
@@ -641,20 +642,20 @@ sub accept_dispatch { # Listener {post_accept} callback
 		return;
 	}
 	$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
-	# $ARGV_STR = join("]\0[", @ARGV);
-	# $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV);
-	# $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0";
-	my ($client_pid, $argv, $env) = do {
-		local $/ = "\0\0\0"; # yes, 3 NULs at EOL, not 2
-		chomp(my $line = <$sock>);
-		split(/\0\0>/, $line, 3);
-	};
-	my %env = map { split(/=/, $_, 2) } split(/\0/, $env);
+	# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
+	# $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
+	if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
+		say $sock "request command truncated";
+		return;
+	}
+	my ($client_pid, $argc, @argv) = split(/\0/, $buf, -1);
+	undef $buf;
+	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
 	if (chdir($env{PWD})) {
 		local %ENV = %env;
 		$self->{env} = \%env;
-		$self->{pid} = $client_pid;
-		eval { dispatch($self, split(/\]\0\[/, $argv)) };
+		$self->{pid} = $client_pid + 0;
+		eval { dispatch($self, @argv) };
 		say $sock $@ if $@;
 	} else {
 		say $sock "chdir($env{PWD}): $!"; # implicit close
@@ -692,13 +693,17 @@ sub lazy_start {
 	pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
 	my $oldset = PublicInbox::DS::block_signals();
 	if ($nfd == 1) {
-		require IO::FDPass;
-		$recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) };
-	} elsif ($nfd == 3) {
-		$recv_3fds = PublicInbox::Spawn->can('recv_3fds');
+		require PublicInbox::CmdIPC1;
+		$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+	} elsif ($nfd == 4) {
+		$recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
+			require PublicInbox::CmdIPC4;
+			PublicInbox::CmdIPC4->can('recv_cmd4');
+		};
 	}
-	$recv_3fds or die
-		"IO::FDPass missing or Inline::C not installed/configured\n";
+	$recv_cmd or die <<"";
+(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+
 	require PublicInbox::Listener;
 	require PublicInbox::EOFpipe;
 	(-p STDOUT) or die "E: stdout must be a pipe\n";
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index cd94ba96..7d0d9597 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -201,6 +201,8 @@ void nodatacow_dir(const char *dir)
 }
 SET_NODATACOW
 
+# last choice for script/lei, 1st choice for lei internals
+# compatible with PublicInbox::CmdIPC4
 my $fdpass = <<'FDPASS';
 #include <sys/types.h>
 #include <sys/uio.h>
@@ -213,16 +215,23 @@ union my_cmsg {
 	char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
 };
 
-int send_3fds(int sockfd, int infd, int outfd, int errfd)
+int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
 {
 	struct msghdr msg = { 0 };
 	struct iovec iov;
 	union my_cmsg cmsg = { 0 };
 	int *fdp;
 	size_t i;
+	STRLEN dlen = 0;
 
-	iov.iov_base = &msg.msg_namelen; /* whatever */
-	iov.iov_len = 1;
+	if (SvOK(data)) {
+		iov.iov_base = SvPV(data, dlen);
+		iov.iov_len = dlen;
+	}
+	if (!dlen) { /* must be non-zero */
+		iov.iov_base = &msg.msg_namelen; /* whatever */
+		iov.iov_len = 1;
+	}
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
 	msg.msg_control = &cmsg.hdr;
@@ -232,38 +241,38 @@ int send_3fds(int sockfd, int infd, int outfd, int errfd)
 	cmsg.hdr.cmsg_type = SCM_RIGHTS;
 	cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
 	fdp = (int *)CMSG_DATA(&cmsg.hdr);
-	*fdp++ = infd;
-	*fdp++ = outfd;
-	*fdp++ = errfd;
-	return sendmsg(sockfd, &msg, 0) >= 0;
+	*fdp++ = in;
+	*fdp++ = out;
+	*fdp++ = err;
+	return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
 }
 
-void recv_3fds(int sockfd)
+void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 {
 	union my_cmsg cmsg = { 0 };
 	struct msghdr msg = { 0 };
 	struct iovec iov;
 	size_t i;
 	Inline_Stack_Vars;
+	Inline_Stack_Reset;
 
-	iov.iov_base = &msg.msg_namelen; /* whatever */
-	iov.iov_len = 1;
+	if (!SvOK(buf))
+		sv_setpvn(buf, "", 0);
+	iov.iov_base = SvGROW(buf, n + 1);
+	iov.iov_len = n;
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
 	msg.msg_control = &cmsg.hdr;
 	msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
 
-	if (recvmsg(sockfd, &msg, 0) <= 0)
-		return;
-
-	errno = EDOM;
-	Inline_Stack_Reset;
-	if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+	i = recvmsg(PerlIO_fileno(s), &msg, 0);
+	if ((ssize_t)i < 0) /* i is size_t; compare as signed */
+		croak("recvmsg: %s", strerror(errno));
+	SvCUR_set(buf, i);
+	if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
 			cmsg.hdr.cmsg_type == SCM_RIGHTS &&
 			cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
 		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-		size_t i;
-
 		for (i = 0; i < 3; i++)
 			Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
 	}
diff --git a/script/lei b/script/lei
index 2ea98da4..d954b9eb 100755
--- a/script/lei
+++ b/script/lei
@@ -4,17 +4,20 @@
 use strict;
 use v5.10.1;
 use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-my ($send_3fds, $nfd);
-if (my ($sock, $pwd) = eval {
-	$send_3fds = eval {
-		require IO::FDPass;
-		$nfd = 1; # 1 FD per-sendmsg
-		sub { IO::FDPass::send($_[0], $_[$_]) for (1..3) }
-	} // do {
-		require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
-		$nfd = 3; # 3 FDs per-sendmsg(2)
-		PublicInbox::Spawn->can('send_3fds');
-	} // die "IO::FDPass missing or Inline::C not installed/configured\n";
+use PublicInbox::CmdIPC4;
+my $narg = 4;
+my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
+	require PublicInbox::CmdIPC1; # 2nd choice
+	$narg = 1;
+	PublicInbox::CmdIPC1->can('send_cmd1');
+} // do {
+	require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
+	$narg = 4;
+	PublicInbox::Spawn->can('send_cmd4');
+};
+
+my ($sock, $pwd);
+if ($send_cmd && eval {
 	my $path = do {
 		my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
 		if ($runtime_dir eq '/lei') {
@@ -25,29 +28,27 @@ if (my ($sock, $pwd) = eval {
 			require File::Path;
 			File::Path::mkpath($runtime_dir, 0, 0700);
 		}
-		"$runtime_dir/$nfd.sock";
+		"$runtime_dir/$narg.sock";
 	};
 	my $addr = pack_sockaddr_un($path);
-	socket(my $sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+	socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
 	unless (connect($sock, $addr)) { # start the daemon if not started
 		local $ENV{PERL5LIB} = join(':', @INC);
 		open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
 			-E PublicInbox::LEI::lazy_start(@ARGV)],
-			$path, $! + 0, $nfd) or die "popen: $!";
+			$path, $! + 0, $narg) or die "popen: $!";
 		while (<$daemon>) { warn $_ } # EOF when STDERR is redirected
 		close($daemon) or warn <<"";
 lei-daemon could not start, exited with \$?=$?
 
 		# try connecting again anyways, unlink+bind may be racy
-		unless (connect($sock, $addr)) {
-			die <<"";
+		connect($sock, $addr) or die <<"";
 connect($path): $! (after attempted daemon start)
 Falling back to (slow) one-shot mode
 
-		}
 	}
 	require Cwd;
-	my $pwd = $ENV{PWD} // '';
+	$pwd = $ENV{PWD} // '';
 	my $cwd = Cwd::fastcwd() // die "fastcwd(PWD=$pwd): $!";
 	if ($pwd ne $cwd) { # prefer ENV{PWD} if it's a symlink to real cwd
 		my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!";
@@ -58,23 +59,21 @@ Falling back to (slow) one-shot mode
 	} else {
 		$pwd = $cwd;
 	}
-	($sock, $pwd);
-}) { # IO::FDPass, $sock, $pwd are all available:
+	1;
+}) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available:
 	local $ENV{PWD} = $pwd;
-	my $buf = "$$\0\0>" . join("]\0[", @ARGV) . "\0\0>";
-	while (my ($k, $v) = each %ENV) { $buf .= "$k=$v\0" }
+	my $buf = join("\0", $$, scalar(@ARGV), @ARGV);
+	while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
 	$buf .= "\0\0";
 	select $sock;
 	$| = 1; # unbuffer selected $sock
-	$send_3fds->(fileno($sock), 0, 1, 2);
-	print $sock $buf or die "print(sock, buf): $!";
+	$send_cmd->($sock, 0, 1, 2, $buf, 0);
 	while ($buf = <$sock>) {
 		$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
 		die $buf;
 	}
-} else { # for systems lacking IO::FDPass
-	# don't warn about IO::FDPass since it's not commonly installed
-	warn $@ if $@ && index($@, 'IO::FDPass') < 0;
+} else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C
+	warn $@ if $@;
 	require PublicInbox::LEI;
 	PublicInbox::LEI::oneshot(__PACKAGE__);
 }
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
new file mode 100644
index 00000000..b9f4d128
--- /dev/null
+++ b/t/cmd_ipc.t
@@ -0,0 +1,90 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use Socket qw(AF_UNIX SOCK_STREAM MSG_EOR);
+pipe(my ($r, $w)) or BAIL_OUT;
+my ($send, $recv);
+require_ok 'PublicInbox::Spawn';
+my $SOCK_SEQPACKET = eval { Socket::SOCK_SEQPACKET() } // undef;
+
+my $do_test = sub { SKIP: {
+	my ($type, $flag, $desc) = @_;
+	defined $type or skip 'SOCK_SEQPACKET missing', 7;
+	my ($s1, $s2);
+	my $src = 'some payload' x 40;
+	socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+	$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+	my (@fds) = $recv->($s2, my $buf, length($src) + 1);
+	is($buf, $src, 'got buffer payload '.$desc);
+	my ($r1, $w1, $s1a);
+	my $opens = sub {
+		ok(open($r1, '<&=', $fds[0]), 'opened received $r');
+		ok(open($w1, '>&=', $fds[1]), 'opened received $w');
+		ok(open($s1a, '+>&=', $fds[2]), 'opened received $s1');
+	};
+	$opens->();
+	my @exp = stat $r;
+	my @cur = stat $r1;
+	is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$r dev/ino matches');
+	@exp = stat $w;
+	@cur = stat $w1;
+	is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$w dev/ino matches');
+	@exp = stat $s1;
+	@cur = stat $s1a;
+	is("$exp[0]\0$exp[1]", "$cur[0]\0$cur[1]", '$s1 dev/ino matches');
+	if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
+		$r1 = $w1 = $s1a = undef;
+		$src = (',' x 1023) . '-' .('.' x 1024);
+		$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+		(@fds) = $recv->($s2, $buf, 1024);
+		is($buf, (',' x 1023) . '-', 'silently truncated buf');
+		$opens->();
+		$r1 = $w1 = $s1a = undef;
+		close $s1;
+		@fds = $recv->($s2, $buf, length($src) + 1);
+		is_deeply(\@fds, [], "no FDs on EOF $desc");
+		is($buf, '', "buffer cleared on EOF ($desc)");
+
+	}
+} };
+
+my $send_ic = PublicInbox::Spawn->can('send_cmd4');
+my $recv_ic = PublicInbox::Spawn->can('recv_cmd4');
+SKIP: {
+	($send_ic && $recv_ic) or skip 'Inline::C not installed/enabled', 12;
+	$send = $send_ic;
+	$recv = $recv_ic;
+	$do_test->(SOCK_STREAM, 0, 'Inline::C stream');
+	$do_test->($SOCK_SEQPACKET, MSG_EOR, 'Inline::C seqpacket');
+}
+
+SKIP: {
+	require_mods('Socket::MsgHdr', 13);
+	require_ok 'PublicInbox::CmdIPC4';
+	$send = PublicInbox::CmdIPC4->can('send_cmd4');
+	$recv = PublicInbox::CmdIPC4->can('recv_cmd4');
+	$do_test->(SOCK_STREAM, 0, 'MsgHdr stream');
+	$do_test->($SOCK_SEQPACKET, MSG_EOR, 'MsgHdr seqpacket');
+	SKIP: {
+		($send_ic && $recv_ic) or
+			skip 'Inline::C not installed/enabled', 12;
+		$recv = $recv_ic;
+		$do_test->(SOCK_STREAM, 0, 'Inline::C -> MsgHdr stream');
+		$do_test->($SOCK_SEQPACKET, 0, 'Inline::C -> MsgHdr seqpacket');
+	}
+}
+
+SKIP: {
+	require_mods('IO::FDPass', 13);
+	require_ok 'PublicInbox::CmdIPC1';
+	$send = PublicInbox::CmdIPC1->can('send_cmd1');
+	$recv = PublicInbox::CmdIPC1->can('recv_cmd1');
+	$do_test->(SOCK_STREAM, 0, 'IO::FDPass stream');
+	$do_test->($SOCK_SEQPACKET, MSG_EOR, 'IO::FDPass seqpacket');
+}
+
+done_testing;
diff --git a/t/lei.t b/t/lei.t
index 72c50308..992800a5 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -10,6 +10,8 @@ use File::Path qw(rmtree);
 require_git 2.6;
 require_mods(qw(json DBD::SQLite Search::Xapian));
 my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') };
+my ($home, $for_destroy) = tmpdir();
+my $err_filter;
 my $lei = sub {
 	my ($cmd, $env, $xopt) = @_;
 	$out = $err = '';
@@ -17,10 +19,12 @@ my $lei = sub {
 		($env, $xopt) = grep { (!defined) || ref } @_;
 		$cmd = [ grep { defined && !ref } @_ ];
 	}
-	run_script(['lei', @$cmd], $env, $xopt // $opt);
+	my $res = run_script(['lei', @$cmd], $env, $xopt // $opt);
+	$err_filter and
+		$err = join('', grep(!/$err_filter/, split(/^/m, $err)));
+	$res;
 };
 
-my ($home, $for_destroy) = tmpdir();
 delete local $ENV{XDG_DATA_HOME};
 delete local $ENV{XDG_CONFIG_HOME};
 local $ENV{GIT_COMMITTER_EMAIL} = 'lei@example.com';
@@ -195,18 +199,22 @@ my $test_lei_common = sub {
 
 if ($ENV{TEST_LEI_ONESHOT}) {
 	require_ok 'PublicInbox::LEI';
-	# force sun_path[108] overflow, "IO::FDPass" avoids warning
-	local $ENV{XDG_RUNTIME_DIR} = "$home/IO::FDPass".('.sun_path' x 108);
+	# force sun_path[108] overflow, ($lei->() filters out this path)
+	my $xrd = "$home/1shot-test".('.sun_path' x 108);
+	local $ENV{XDG_RUNTIME_DIR} = $xrd;
+	$err_filter = qr!\Q$xrd!;
 	$test_lei_common->();
 }
 
 SKIP: { # real socket
 	require_mods(qw(Cwd), my $nr = 105);
-	my $nfd = eval { require IO::FDPass; 1 } // do {
+	my $nfd = eval { require Socket::MsgHdr; 4 } //
+			eval { require IO::FDPass; 1 } // do {
 		require PublicInbox::Spawn;
-		PublicInbox::Spawn->can('send_3fds') ? 3 : undef;
+		PublicInbox::Spawn->can('send_cmd4') ? 4 : undef;
 	} //
-	skip 'IO::FDPass missing or Inline::C not installed/configured', $nr;
+	skip 'Socket::MsgHdr, IO::FDPass or Inline::C missing or unconfigured',
+		$nr;
 
 	local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
 	my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
diff --git a/t/spawn.t b/t/spawn.t
index 558afc28..0eed79bb 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -5,35 +5,6 @@ use warnings;
 use Test::More;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::Sigfd;
-use Socket qw(AF_UNIX SOCK_STREAM);
-
-SKIP: {
-	my $recv_3fds = PublicInbox::Spawn->can('recv_3fds');
-	my $send_3fds = PublicInbox::Spawn->can('send_3fds');
-	skip 'Inline::C not enabled', 3 unless $send_3fds && $recv_3fds;
-	my ($s1, $s2);
-	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or BAIL_OUT $!;
-	pipe(my ($r, $w)) or BAIL_OUT $!;
-	my @orig = ($r, $w, $s2);
-	my @fd = map { fileno($_) } @orig;
-	ok($send_3fds->(fileno($s1), $fd[0], $fd[1], $fd[2]),
-		'FDs sent');
-	my (@fds) = $recv_3fds->(fileno($s2));
-	is(scalar(@fds), 3, 'got 3 fds');
-	use Data::Dumper; diag Dumper(\@fds);
-	is(scalar(grep(/\A\d+\z/, @fds)), 3, 'all valid FDs');
-	my $i = 0;
-	my @cmp = map {
-		open my $new, $_, shift(@fds) or BAIL_OUT "open $! $i => $_";
-		($new, shift(@orig), $i++);
-	} (qw(<&= >&= +<&=));
-	while (my ($new, $old, $fd) = splice(@cmp, 0, 3)) {
-		my @new = stat($new);
-		my @old = stat($old);
-		is("$old[0]\0$old[1]", "$new[0]\0$new[1]",
-			"device/inode matches on received FD:$fd");
-	}
-}
 
 {
 	my $true = which('true');


* [PATCH 06/22] ipc: avoid excessive evals
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (4 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
                   ` (15 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

We should not need an eval for warning with our code base.
Nowadays, dwaitpid() automatically does the right thing
regardless of whether we're in the event loop, so no eval
is needed there, either.
---
 lib/PublicInbox/IPC.pm | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 7dc8ec6a..5082f110 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -69,7 +69,7 @@ sub ipc_worker_loop ($$$) {
 		# this is the overwhelmingly likely case
 		if (!defined($wantarray)) {
 			eval { $self->$sub(@args) };
-			eval { warn "$$ die: $@ (from nowait $sub)\n" } if $@;
+			warn "$$ die: $@ (from nowait $sub)\n" if $@;
 		} elsif ($wantarray) {
 			my @ret = eval { $self->$sub(@args) };
 			ipc_return($w_res, \@ret, $@);
@@ -144,15 +144,7 @@ sub ipc_worker_stop {
 
 	# allow any sibling to send ipc_worker_exit, but siblings can't wait
 	return if $$ != $ppid;
-	eval {
-		my $reap = $self->can('ipc_worker_reap');
-		PublicInbox::DS::dwaitpid($pid, $reap, $self);
-	};
-	if ($@) {
-		my $wp = waitpid($pid, 0);
-		$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
-		$self->ipc_worker_reap($pid);
-	}
+	PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"


* [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (5 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 06/22] ipc: avoid excessive evals Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 08/22] ipc: eliminate ipc_worker_stop method Eric Wong
                   ` (14 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

This will allow any number of younger sibling processes to
communicate with older siblings directly without relying on a
mediator process.  This is intended to be useful for
distributing search work across multiple workers without caring
which worker hits it (we only care about shard members).

And any request sent with this will be able to hit any worker
without locking on our part.

Unix stream sockets with a listener were also considered, but
binding to a file on the FS may confuse users given there's
already a socket path for lei(1).  Linux-only abstract or
autobind sockets were rejected due to lack of portability.

SOCK_SEQPACKET via socketpair(2) was chosen since it's POSIX
2008 and available on FreeBSD 9+ in addition to Linux, and
doesn't require filesystem access.
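
As a rough illustration of the record-oriented behaviour relied
on here (a standalone sketch, not code from this series;
seqpacket_roundtrip is a hypothetical helper name), each send()
on a SOCK_SEQPACKET socketpair should arrive as exactly one
recv(), so no length framing is needed:

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);

# Hypothetical helper: send each message as one record, then read
# the records back one recv() at a time.
sub seqpacket_roundtrip {
	my (@msgs) = @_;
	socketpair(my $s1, my $s2, AF_UNIX, SOCK_SEQPACKET, 0)
		or die "socketpair: $!";
	for my $m (@msgs) {
		defined(send($s1, $m, MSG_EOR)) or die "send: $!";
	}
	my @got;
	for (@msgs) {
		# one recv() returns at most one record
		defined(recv($s2, my $buf, 65536, 0)) or die "recv: $!";
		push @got, $buf;
	}
	@got;
}
```

This assumes a platform where AF_UNIX supports SOCK_SEQPACKET
(e.g. Linux or FreeBSD 9+); socketpair() will fail elsewhere.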
---
 lib/PublicInbox/IPC.pm | 106 +++++++++++++++++++++++++++++++++++++++--
 t/ipc.t                |  66 +++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 5082f110..27ea90de 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -1,14 +1,16 @@
 # Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# base class for remote IPC calls, requires Storable
-# TODO: this ought to be usable in SearchIdxShard
+# base class for remote IPC calls and workqueues, requires Storable or Sereal
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
 use Carp qw(confess croak);
-use PublicInbox::Sigfd;
+use PublicInbox::DS qw(dwaitpid);
+use PublicInbox::Spawn;
 use POSIX ();
+use Socket qw(AF_UNIX MSG_EOR);
+my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
@@ -34,6 +36,17 @@ if ($enc && $dec) { # should be custom ops
 	} // warn("Storable (part of Perl) missing: $@\n");
 }
 
+my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
+my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
+	require PublicInbox::CmdIPC4;
+	$recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
+	PublicInbox::CmdIPC4->can('send_cmd4');
+} // do {
+	require PublicInbox::CmdIPC1;
+	$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+	PublicInbox::CmdIPC1->can('send_cmd1');
+};
+
 sub _get_rec ($) {
 	my ($r) = @_;
 	defined(my $len = <$r>) or return;
@@ -144,7 +157,7 @@ sub ipc_worker_stop {
 
 	# allow any sibling to send ipc_worker_exit, but siblings can't wait
 	return if $$ != $ppid;
-	PublicInbox::DS::dwaitpid($pid, \&ipc_worker_reap, $self);
+	dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
@@ -224,4 +237,89 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
+sub wq_worker_loop ($$) {
+	my ($self, $s2) = @_;
+	my $buf;
+	my $len = $self->{wq_req_len} // (4096 * 33);
+	my ($rec, $sub, @args);
+	while (1) {
+		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
+		my $i = 0;
+		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+		for my $fd (@fds) {
+			my $mode = shift(@m);
+			if (open(my $fh, $mode, $fd)) {
+				$self->{$i++} = $fh;
+			} else {
+				die "$$ open($mode$fd) (FD:$i): $!";
+			}
+		}
+		# Sereal dies, Storable returns undef
+		$rec = thaw($buf) //
+			die "thaw error on buffer of size:".length($buf);
+		($sub, @args) = @$rec;
+		eval { $self->$sub(@args) };
+		warn "$$ wq_worker: $@" if $@;
+		delete @$self{0, 1, 2};
+	}
+}
+
+sub wq_do { # always async
+	my ($self, $sub, $in, $out, $err, @args) = @_;
+	if (my $s1 = $self->{-wq_seq}) { # run in worker
+		$_ = fileno($_) for ($in, $out, $err);
+		$send_cmd->($s1, $in, $out, $err,
+				freeze([$sub, @args]), MSG_EOR);
+	} else {
+		@$self{0, 1, 2} = ($in, $out, $err);
+		eval { $self->$sub(@args) };
+		warn "wq_do: $@" if $@;
+		delete @$self{0, 1, 2};
+	}
+}
+
+# starts workqueue workers if Sereal or Storable is installed
+sub wq_workers_start {
+	my ($self, $ident, $nr_workers, $oldset) = @_;
+	($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
+	return if $self->{-wq_seq}; # idempotent
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
+	$self->ipc_atfork_parent;
+	$nr_workers //= 4;
+	$self->{-wq_workers} = {};
+	for my $i (0..($nr_workers - 1)) {
+		defined(my $pid = fork) or die "fork: $!";
+		if ($pid == 0) {
+			eval { PublicInbox::DS->Reset };
+			$s1 = undef;
+			$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+			local $0 = $ident."[$i]";
+			PublicInbox::DS::sig_setmask($oldset);
+			my $on_destroy = $self->ipc_atfork_child;
+			eval { wq_worker_loop($self, $s2) };
+			die "worker $ident PID:$$ died: $@\n" if $@;
+			exit;
+		} else {
+			$self->{-wq_workers}->{$pid} = $i;
+		}
+	}
+	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
+	$s2 = undef;
+	$self->{-wq_seq} = $s1;
+	$self->{-wq_ppid} = $$;
+}
+
+sub wq_close {
+	my ($self) = @_;
+	delete $self->{-wq_seq} or return;
+	my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
+	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
+	return if $ppid != $$; # can't reap siblings or parents
+	for my $pid (keys %$workers) {
+		dwaitpid($pid, \&ipc_worker_reap, $self);
+	}
+}
+
 1;
diff --git a/t/ipc.t b/t/ipc.t
index 400fb768..f09f76ef 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -5,6 +5,7 @@ use strict;
 use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
+use Fcntl qw(SEEK_SET);
 require_ok 'PublicInbox::IPC';
 state $once = eval <<'';
 package PublicInbox::IPC;
@@ -15,6 +16,13 @@ sub test_scalarref { \'scalarref' }
 sub test_undef { undef }
 sub test_die { shift; die @_; 'unreachable' }
 sub test_pid { $$ }
+sub test_write_each_fd {
+	my ($self, @args) = @_;
+	for my $fd (0..2) {
+		print { $self->{$fd} } "i=$fd $$ ", @args, "\n";
+		$self->{$fd}->flush;
+	}
+}
 1;
 
 my $ipc = bless {}, 'PublicInbox::IPC';
@@ -102,4 +110,62 @@ SKIP: {
 	ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
 }
 $ipc->ipc_worker_stop; # idempotent
+
+# work queues
+$ipc->{wq_open_modes} = [qw( >&= >&= >&= )];
+pipe(my ($ra, $wa)) or BAIL_OUT $!;
+pipe(my ($rb, $wb)) or BAIL_OUT $!;
+pipe(my ($rc, $wc)) or BAIL_OUT $!;
+open my $warn, '+>', undef or BAIL_OUT;
+$warn->autoflush(0);
+local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
+my @ppids;
+for my $t ('local', 'worker', 'worker again') {
+	$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+	my $i = 0;
+	for my $fh ($ra, $rb, $rc) {
+		my $buf = readline($fh);
+		is(chop($buf), "\n", "trailing CR ($t)");
+		like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
+		$i++;
+	}
+	$ipc->wq_do('test_die', $wa, $wb, $wc);
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
+}
+
+# wq_do works across fork (siblings can feed)
+SKIP: {
+	skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0];
+	is_deeply(\@ppids, [$$, undef, undef],
+		'parent pid returned in wq_workers_start');
+	my $pid = fork // BAIL_OUT $!;
+	if ($pid == 0) {
+		use POSIX qw(_exit);
+		$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+		_exit(0);
+	} else {
+		my $i = 0;
+		my ($wpid, @rest) = keys %{$ipc->{-wq_workers}};
+		is(scalar(@rest), 0, 'only one worker');
+		for my $fh ($ra, $rb, $rc) {
+			my $buf = readline($fh);
+			is(chop($buf), "\n", "trailing CR #$i");
+			like($buf, qr/^i=$i $wpid $pid\z/,
+				'got expected from sibling');
+			$i++;
+		}
+		is(waitpid($pid, 0), $pid, 'waitpid complete');
+		is($?, 0, 'child wq producer exited');
+	}
+}
+
+$ipc->wq_close;
+seek($warn, 0, SEEK_SET) or BAIL_OUT;
+my @warn = <$warn>;
+is(scalar(@warn), 3, 'warned 3 times');
+like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
+is($warn[2], $warn[1], 'worker did not die');
+
 done_testing;


* [PATCH 08/22] ipc: eliminate ipc_worker_stop method
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (6 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 09/22] ipc: wq: support dynamic worker count change Eric Wong
                   ` (13 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

We can just EOF the pipe instead, and rely on per-class
error handling to deal with uncommitted transactions and
whatnot.
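
Stopping a worker purely by EOF can be sketched in isolation as
follows.  This is a standalone illustration, not the IPC.pm
code; run_worker_until_eof is a hypothetical helper, and the
exit status carries the line count only for demonstration:

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: fork a worker which reads lines until EOF,
# then exits with the number of lines it saw (must fit in 0..255).
sub run_worker_until_eof {
	my (@lines) = @_;
	pipe(my ($r, $w)) or die "pipe: $!";
	my $pid = fork // die "fork: $!";
	if ($pid == 0) { # worker
		close $w; # or the worker would never see EOF
		my $n = 0;
		while (defined(my $l = <$r>)) { $n++ }
		POSIX::_exit($n); # EOF reached, no exit request needed
	}
	close $r;
	print $w "$_\n" for @lines;
	close $w or die "close: $!"; # EOF is the stop signal
	waitpid($pid, 0) == $pid or die "waitpid: $!";
	$? >> 8;
}
```

The parent never sends an explicit "exit" record; closing its
write end is enough for the worker loop to terminate.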
---
 lib/PublicInbox/IPC.pm | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 27ea90de..0c5205c1 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -136,12 +136,6 @@ sub ipc_worker_reap { # dwaitpid callback
 sub ipc_atfork_parent {}
 sub ipc_atfork_child {}
 
-# should only be called inside the worker process
-sub ipc_worker_exit {
-	my (undef, $code) = @_;
-	exit($code);
-}
-
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
 	my ($self) = @_;
@@ -152,10 +146,8 @@ sub ipc_worker_stop {
 		return; # idempotent
 	}
 	die 'no PID with IPC pipes' unless $pid;
-	_send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]);
 	$w_req = $r_res = undef;
 
-	# allow any sibling to send ipc_worker_exit, but siblings can't wait
 	return if $$ != $ppid;
 	dwaitpid($pid, \&ipc_worker_reap, $self);
 }


* [PATCH 09/22] ipc: wq: support dynamic worker count change
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (7 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 08/22] ipc: eliminate ipc_worker_stop method Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 10/22] ipc: drop -ipc_parent_pid field Eric Wong
                   ` (12 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Increasing or decreasing the worker count will be useful in
some situations.
---
 lib/PublicInbox/IPC.pm | 99 ++++++++++++++++++++++++++++++------------
 t/ipc.t                |  9 ++++
 2 files changed, 81 insertions(+), 27 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 0c5205c1..5bca3627 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
-use POSIX ();
+use POSIX qw(WNOHANG);
 use Socket qw(AF_UNIX MSG_EOR);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
@@ -112,7 +112,7 @@ sub ipc_worker_spawn {
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
-		PublicInbox::DS::sig_setmask($oldset);
+		PublicInbox::DS::sig_setmask($sigset);
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { ipc_worker_loop($self, $r_req, $w_res) };
 		die "worker $ident PID:$$ died: $@\n" if $@;
@@ -229,12 +229,13 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub wq_worker_loop ($$) {
-	my ($self, $s2) = @_;
+sub wq_worker_loop ($) {
+	my ($self) = @_;
 	my $buf;
 	my $len = $self->{wq_req_len} // (4096 * 33);
 	my ($rec, $sub, @args);
-	while (1) {
+	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+	until ($self->{-wq_quit}) {
 		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
 		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
@@ -258,7 +259,7 @@ sub wq_worker_loop ($$) {
 
 sub wq_do { # always async
 	my ($self, $sub, $in, $out, $err, @args) = @_;
-	if (my $s1 = $self->{-wq_seq}) { # run in worker
+	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		$_ = fileno($_) for ($in, $out, $err);
 		$send_cmd->($s1, $in, $out, $err,
 				freeze([$sub, @args]), MSG_EOR);
@@ -270,42 +271,86 @@ sub wq_do { # always async
 	}
 }
 
+sub _wq_worker_start ($$) {
+	my ($self, $oldset) = @_;
+	my $pid = fork // die "fork: $!";
+	if ($pid == 0) {
+		eval { PublicInbox::DS->Reset };
+		close(delete $self->{-wq_s1});
+		delete $self->{qw(-wq_workers -wq_quit)};
+		my $quit = sub { $self->{-wq_quit} = 1 };
+		$SIG{$_} = $quit for (qw(TERM INT QUIT));
+		$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
+		local $0 = $self->{-wq_ident};
+		PublicInbox::DS::sig_setmask($oldset);
+		my $on_destroy = $self->ipc_atfork_child;
+		eval { wq_worker_loop($self) };
+		die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
+		exit;
+	} else {
+		$self->{-wq_workers}->{$pid} = \undef;
+	}
+}
+
 # starts workqueue workers if Sereal or Storable is installed
 sub wq_workers_start {
 	my ($self, $ident, $nr_workers, $oldset) = @_;
 	($enc && $send_cmd && $recv_cmd && defined($SEQPACKET)) or return;
-	return if $self->{-wq_seq}; # idempotent
+	return if $self->{-wq_s1}; # idempotent
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
-	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->ipc_atfork_parent;
 	$nr_workers //= 4;
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
-	for my $i (0..($nr_workers - 1)) {
-		defined(my $pid = fork) or die "fork: $!";
-		if ($pid == 0) {
-			eval { PublicInbox::DS->Reset };
-			$s1 = undef;
-			$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
-			local $0 = $ident."[$i]";
-			PublicInbox::DS::sig_setmask($oldset);
-			my $on_destroy = $self->ipc_atfork_child;
-			eval { wq_worker_loop($self, $s2) };
-			die "worker $ident PID:$$ died: $@\n" if $@;
-			exit;
-		} else {
-			$self->{-wq_workers}->{$pid} = $i;
-		}
-	}
+	$self->{-wq_ident} = $ident;
+	$self->{-wq_s1} = $s1;
+	$self->{-wq_s2} = $s2;
+	_wq_worker_start($self, $sigset) for (1..$nr_workers);
 	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
-	$s2 = undef;
-	$self->{-wq_seq} = $s1;
 	$self->{-wq_ppid} = $$;
 }
 
+sub wq_worker_incr { # SIGTTIN handler
+	my ($self, $oldset) = @_;
+	$self->{-wq_s2} or return;
+	$self->ipc_atfork_parent;
+	my $sigset = $oldset // PublicInbox::DS::block_signals();
+	_wq_worker_start($self, $sigset);
+	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
+}
+
+sub wq_exit { # wakes up wq_worker_decr_wait
+	send($_[0]->{-wq_s2}, $$, MSG_EOR) // die "$$ send: $!";
+	exit;
+}
+
+sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
+	my ($self) = @_;
+	my $workers = $self->{-wq_workers} or return;
+	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
+	$self->wq_do('wq_exit', $s2, $s2, $s2);
+	$self->{-wq_exit_pending}++;
+	# caller must call wq_worker_decr_wait in main loop
+}
+
+sub wq_worker_decr_wait {
+	my ($self, $timeout) = @_;
+	return if $self->{-wq_ppid} != $$; # can't reap siblings or parents
+	my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1';
+	vec(my $rin = '', fileno($s1), 1) = 1;
+	select(my $rout = $rin, undef, undef, $timeout) or
+		croak 'timed out waiting for wq_exit';
+	recv($s1, my $pid, 64, 0) // croak "recv: $!";
+	my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
+	delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
+	$self->{-wq_exit_pending}--;
+	dwaitpid($pid, \&ipc_worker_reap, $self);
+}
+
 sub wq_close {
 	my ($self) = @_;
-	delete $self->{-wq_seq} or return;
+	delete @$self{qw(-wq_s1 -wq_s2)} or return;
 	my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
 	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
 	return if $ppid != $$; # can't reap siblings or parents
diff --git a/t/ipc.t b/t/ipc.t
index f09f76ef..51e347c6 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -168,4 +168,13 @@ like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
 like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
 is($warn[2], $warn[1], 'worker did not die');
 
+$SIG{__WARN__} = 'DEFAULT';
+is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
+is(scalar(keys %{$ipc->{-wq_workers}}), 1, '1 worker started');
+$ipc->wq_worker_incr;
+is(scalar(keys %{$ipc->{-wq_workers}}), 2, 'worker count bumped');
+$ipc->wq_worker_decr;
+$ipc->wq_worker_decr_wait(10);
+is(scalar(keys %{$ipc->{-wq_workers}}), 1, 'worker count lowered');
+
 done_testing;


* [PATCH 10/22] ipc: drop -ipc_parent_pid field
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (8 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 09/22] ipc: wq: support dynamic worker count change Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 11/22] ipc: DESTROY and wq_workers methods Eric Wong
                   ` (11 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

It is not used anywhere.
---
 lib/PublicInbox/IPC.pm | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 5bca3627..4d29532c 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -107,7 +107,6 @@ sub ipc_worker_spawn {
 	defined(my $pid = fork) or die "fork: $!";
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
-		$self->{-ipc_parent_pid} = $parent;
 		$w_req = $r_res = undef;
 		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));


* [PATCH 11/22] ipc: DESTROY and wq_workers methods
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (9 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 10/22] ipc: drop -ipc_parent_pid field Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 12/22] lei: rename $w to $wpager for warning message Eric Wong
                   ` (10 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

We'll enable automatic cleanup when IPC classes go out-of-scope
to avoid leaving zombies around.

->wq_workers will be a useful convenience method to change
worker counts.
---
 lib/PublicInbox/IPC.pm | 23 +++++++++++++++++++++--
 t/ipc.t                | 10 +++++++---
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 4d29532c..8a3120c9 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -276,7 +276,7 @@ sub _wq_worker_start ($$) {
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
 		close(delete $self->{-wq_s1});
-		delete $self->{qw(-wq_workers -wq_quit)};
+		delete $self->{qw(-wq_workers -wq_quit -wq_ppid)};
 		my $quit = sub { $self->{-wq_quit} = 1 };
 		$SIG{$_} = $quit for (qw(TERM INT QUIT));
 		$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
@@ -347,10 +347,24 @@ sub wq_worker_decr_wait {
 	dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 
+# set or retrieve number of workers
+sub wq_workers {
+	my ($self, $nr) = @_;
+	my $cur = $self->{-wq_workers} or return;
+	if (defined $nr) {
+		while (scalar(keys(%$cur)) > $nr) {
+			$self->wq_worker_decr;
+			$self->wq_worker_decr_wait;
+		}
+		$self->wq_worker_incr while scalar(keys(%$cur)) < $nr;
+	}
+	scalar(keys(%$cur));
+}
+
 sub wq_close {
 	my ($self) = @_;
 	delete @$self{qw(-wq_s1 -wq_s2)} or return;
-	my $ppid = delete $self->{-wq_ppid} // die 'BUG: no wq_ppid';
+	my $ppid = delete $self->{-wq_ppid} or return;
 	my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers';
 	return if $ppid != $$; # can't reap siblings or parents
 	for my $pid (keys %$workers) {
@@ -358,4 +372,9 @@ sub wq_close {
 	}
 }
 
+sub DESTROY {
+	wq_close($_[0]);
+	ipc_worker_stop($_[0]);
+}
+
 1;
diff --git a/t/ipc.t b/t/ipc.t
index 51e347c6..903294c5 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -170,11 +170,15 @@ is($warn[2], $warn[1], 'worker did not die');
 
 $SIG{__WARN__} = 'DEFAULT';
 is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
-is(scalar(keys %{$ipc->{-wq_workers}}), 1, '1 worker started');
+is($ipc->wq_workers, 1, '1 worker started');
 $ipc->wq_worker_incr;
-is(scalar(keys %{$ipc->{-wq_workers}}), 2, 'worker count bumped');
+is($ipc->wq_workers, 2, 'worker count bumped');
 $ipc->wq_worker_decr;
 $ipc->wq_worker_decr_wait(10);
-is(scalar(keys %{$ipc->{-wq_workers}}), 1, 'worker count lowered');
+is($ipc->wq_workers, 1, 'worker count lowered');
+is($ipc->wq_workers(2), 2, 'worker count set');
+is($ipc->wq_workers, 2, 'worker count stayed set');
+$ipc->wq_close;
+is($ipc->wq_workers, undef, 'workers undef after close');
 
 done_testing;


* [PATCH 12/22] lei: rename $w to $wpager for warning message
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (10 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 11/22] ipc: DESTROY and wq_workers methods Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB} Eric Wong
                   ` (9 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Perl keeps track of the variable name for error messages
when auto-closing an FD fails, so this will help identify
the source of a close error.
---
 lib/PublicInbox/LEI.pm | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1f4ed0f6..24f5930b 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -603,10 +603,10 @@ sub start_pager {
 	$env->{LV} //= '-c';
 	$env->{COLUMNS} //= 80; # TODO TIOCGWINSZ
 	$env->{MORE} //= 'FRX' if $^O eq 'freebsd';
-	pipe(my ($r, $w)) or return warn "pipe: $!";
+	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-	$self->{1} = $w;
-	$self->{2} = $w if -t $self->{2};
+	$self->{1} = $wpager;
+	$self->{2} = $wpager if -t $self->{2};
 	my $pid = spawn([$pager], $env, $rdr);
 	dwaitpid($pid, undef, $self->{sock});
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git


* [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB}
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (11 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 12/22] lei: rename $w to $wpager for warning message Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 14/22] lei: query: ensure pager exit is instantaneous Eric Wong
                   ` (8 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

...  instead of STD*{IO}.  I'm not sure why *STDOUT{IO} being an
IO::File object disqualifies it from the "-t" perlop check
returning true on TTY, but it does.  So use *STDOUT{GLOB} for
now.

http://nntp.perl.org/group/perl.perl5.porters/258760
Message-ID: <X/kgIqIuh4ZtUZNR@dcvr>
---
 lib/PublicInbox/LEI.pm | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 24f5930b..17023191 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -795,9 +795,9 @@ sub oneshot {
 	local %PATH2CFG;
 	umask(077) // die("umask(077): $!");
 	dispatch((bless {
-		0 => *STDIN{IO},
-		1 => *STDOUT{IO},
-		2 => *STDERR{IO},
+		0 => *STDIN{GLOB},
+		1 => *STDOUT{GLOB},
+		2 => *STDERR{GLOB},
 		env => \%ENV
 	}, __PACKAGE__), @ARGV);
 }


* [PATCH 14/22] lei: query: ensure pager exit is instantaneous
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (12 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB} Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong
                   ` (7 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Improve interactivity and user experience by allowing the user
to return to the terminal immediately when the pager is exited
(e.g. hitting the `q' key in less(1)).

This is a massive change which restructures query handling to
allow parallel search when --thread expansion is in use and
offloading to a separate worker when --thread is not in use.

The Xapian query offload changes allow us to reenter the event
loop right away once the search(es) are shipped off to the work
queue workers.

This means the main lei-daemon process can forget the lei(1)
client socket immediately once it's handed off to worker
processes.

We now unblock SIGPIPE in query workers and send an exit(141)
response to the lei(1) client socket to denote SIGPIPE.
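
For reference, 141 follows the usual shell convention of 128
plus the signal number, with SIGPIPE being 13 on Linux and the
BSDs.  A tiny sketch (sigpipe_exit_code is a hypothetical name,
not part of lei):

```perl
use strict;
use warnings;
use POSIX ();

# Shells report death-by-signal as 128 + signal number;
# SIGPIPE is 13 on Linux and the BSDs, giving 141.
sub sigpipe_exit_code { 128 + POSIX::SIGPIPE() }
```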

This also allows parallelization for users using "lei q" from
multiple terminals.

JSON output is currently broken and will need to be restructured
for more flexibility and fork-safety.
---
 lib/PublicInbox/IPC.pm        |  14 +++--
 lib/PublicInbox/LEI.pm        |  34 +++++++++++-
 lib/PublicInbox/LeiQuery.pm   | 102 +++++++++-------------------------
 lib/PublicInbox/LeiXSearch.pm |  80 +++++++++++++++++++++++++-
 4 files changed, 147 insertions(+), 83 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 8a3120c9..be5b2f45 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -234,6 +234,9 @@ sub wq_worker_loop ($) {
 	my $len = $self->{wq_req_len} // (4096 * 33);
 	my ($rec, $sub, @args);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
+	local $SIG{PIPE} = sub {
+		die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+	};
 	until ($self->{-wq_quit}) {
 		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
@@ -242,6 +245,7 @@ sub wq_worker_loop ($) {
 			my $mode = shift(@m);
 			if (open(my $fh, $mode, $fd)) {
 				$self->{$i++} = $fh;
+				$fh->autoflush(1);
 			} else {
 				die "$$ open($mode$fd) (FD:$i): $!";
 			}
@@ -251,8 +255,10 @@ sub wq_worker_loop ($) {
 			die "thaw error on buffer of size:".length($buf);
 		($sub, @args) = @$rec;
 		eval { $self->$sub(@args) };
-		warn "$$ wq_worker: $@" if $@;
-		delete @$self{0, 1, 2};
+		warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+		undef $sub; # quiet SIG{PIPE} handler
+		# need to close explicitly to avoid warnings after SIGPIPE
+		close($_) for (delete(@$self{0..2}));
 	}
 }
 
@@ -284,8 +290,8 @@ sub _wq_worker_start ($$) {
 		PublicInbox::DS::sig_setmask($oldset);
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { wq_worker_loop($self) };
-		die "worker $self->{-wq_ident} PID:$$ died: $@\n" if $@;
-		exit;
+		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
+		exit($@ ? 1 : 0);
 	} else {
 		$self->{-wq_workers}->{$pid} = \undef;
 	}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 17023191..f8b8cd4a 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -269,6 +269,33 @@ sub fail ($$;$) {
 	undef;
 }
 
+# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+sub atfork_child_wq {
+	my ($self, $wq) = @_;
+	$self->{sock} //= $wq->{0};
+	$self->{$_} //= $wq->{$_} for (0..2);
+	my $oldpipe = $SIG{PIPE};
+	(
+		__WARN__ => sub { err($self, @_) },
+		PIPE => sub {
+			$self->x_it(141);
+			$oldpipe->() if ref($oldpipe) eq 'CODE';
+		}
+	);
+}
+
+# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
+sub atfork_prepare_wq {
+	my ($self, $wq) = @_;
+	if ($wq->wq_workers) {
+		my $ret = bless { %$self }, ref($self);
+		my $in = delete $ret->{0};
+		($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+	} else {
+		($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+	}
+}
+
 sub _help ($;$) {
 	my ($self, $errmsg) = @_;
 	my $cmd = $self->{cmd} // 'COMMAND';
@@ -608,8 +635,8 @@ sub start_pager {
 	$self->{1} = $wpager;
 	$self->{2} = $wpager if -t $self->{2};
 	my $pid = spawn([$pager], $env, $rdr);
-	dwaitpid($pid, undef, $self->{sock});
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
+	[ $pid, @$rdr{1, 2} ];
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -675,6 +702,8 @@ sub event_step {
 
 sub noop {}
 
+our $oldset; sub oldset { $oldset }
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
 	my ($path, $errno, $nfd) = @_;
@@ -691,7 +720,7 @@ sub lazy_start {
 	my @st = stat($path) or die "stat($path): $!";
 	my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
 	pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
-	my $oldset = PublicInbox::DS::block_signals();
+	local $oldset = PublicInbox::DS::block_signals();
 	if ($nfd == 1) {
 		require PublicInbox::CmdIPC1;
 		$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
@@ -737,6 +766,7 @@ sub lazy_start {
 	};
 	my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
 	local %SIG = (%SIG, %$sig) if !$sigfd;
+	local $SIG{PIPE} = 'IGNORE';
 	if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
 		PublicInbox::DS->SetLoopTimeout(5000);
 	} else {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index f69dccad..040c284d 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -8,7 +8,7 @@ use v5.10.1;
 use PublicInbox::MID qw($MID_EXTRACT);
 use POSIX qw(strftime);
 use PublicInbox::Address qw(pairs);
-use PublicInbox::Search qw(get_pct);
+use PublicInbox::DS qw(dwaitpid);
 
 sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
 
@@ -61,37 +61,30 @@ sub lei_q {
 	my $sto = $self->_lei_store(1);
 	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
-	my $qstr = join(' ', map {;
-		# Consider spaces in argv to be for phrase search in Xapian.
-		# In other words, the users should need only care about
-		# normal shell quotes and not have to learn Xapian quoting.
-		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
-	} @argv);
-	$opt->{limit} //= 10000;
-	my $lxs;
 	require PublicInbox::LeiDedupe;
 	my $dd = PublicInbox::LeiDedupe->new($self);
 
 	# --local is enabled by default
-	my @src = $opt->{'local'} ? ($sto->search) : ();
+	# src: LeiXSearch || LeiSearch || Inbox
+	my @srcs = $opt->{'local'} ? ($sto->search) : ();
+	require PublicInbox::LeiXSearch;
+	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
 	if ($opt->{external} // 1) {
-		$self->_externals_each(\&_vivify_external, \@src);
-		# {tid} is not unique between indices, so we have to search
-		# each src individually
-		if (!$opt->{thread}) {
-			require PublicInbox::LeiXSearch;
-			my $lxs = PublicInbox::LeiXSearch->new;
-			# local is always first
-			$lxs->attach_external($_) for @src;
-			@src = ($lxs);
-		}
+		$self->_externals_each(\&_vivify_external, \@srcs);
 	}
-	my $out = $self->{output} // '-';
+	my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+	$j = 1 if !$opt->{thread};
+	if ($self->{pid}) {
+		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+			// $self->wq_workers($j);
+	}
+	my $out = $opt->{output} // '-';
 	$out = 'json:/dev/stdout' if $out eq '-';
 	my $isatty = -t $self->{1};
-	$self->start_pager if $isatty;
+	# no forking workers after this
+	my $pid_old12 = $self->start_pager if $isatty;
 	my $json = substr($out, 0, 5) eq 'json:' ?
 		ref(PublicInbox::Config->json)->new : undef;
 	if ($json) {
@@ -104,10 +97,14 @@ sub lei_q {
 		$json->canonical;
 	}
 
-	# src: LeiXSearch || LeiSearch || Inbox
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
-	delete $mset_opt{limit} if $opt->{limit} < 0;
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
+	$mset_opt{qstr} = join(' ', map {;
+		# Consider spaces in argv to be for phrase search in Xapian.
+		# In other words, the users should need only care about
+		# normal shell quotes and not have to learn Xapian quoting.
+		/\s/ ? (s/\A(\w+:)// ? qq{$1"$_"} : qq{"$_"}) : $_
+	} @argv);
 	if (defined(my $sort = $opt->{'sort'})) {
 		if ($sort eq 'relevance') {
 			$mset_opt{relevance} = 1;
@@ -123,59 +120,12 @@ sub lei_q {
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
-
-	# even w/o pretty, do the equivalent of a --pretty=oneline
-	# output so "lei q SEARCH_TERMS | wc -l" can be useful:
-	my $ORS = $json ? ($opt->{pretty} ? ', ' : ",\n") : "\n";
-	my $buf;
-
-	# we can generate too many records to hold in RAM, so we stream
-	# and fake a JSON array starting here:
-	$self->out('[') if $json;
-	my $emit_cb = sub {
-		my ($smsg) = @_;
-		delete @$smsg{qw(tid num)}; # only makes sense if single src
-		chomp($buf = $json->encode(_smsg_unbless($smsg)));
-	};
-	$dd->prepare_dedupe;
-	for my $src (@src) {
-		my $srch = $src->search;
-		my $over = $src->over;
-		my $smsg_for = $src->can('smsg_for'); # LeiXSearch
-		my $mo = { %mset_opt };
-		my $mset = $srch->mset($qstr, $mo);
-		my $ctx = {};
-		if ($smsg_for) {
-			for my $it ($mset->items) {
-				my $smsg = $smsg_for->($srch, $it) or next;
-				next if $dd->is_smsg_dup($smsg);
-				$self->out($buf .= $ORS) if defined $buf;
-				$smsg->{relevance} = get_pct($it);
-				$emit_cb->($smsg);
-			}
-		} else { # --thread
-			my $ids = $srch->mset_to_artnums($mset, $mo);
-			$ctx->{ids} = $ids;
-			my $i = 0;
-			my %n2p = map {
-				($ids->[$i++], get_pct($_));
-			} $mset->items;
-			undef $mset;
-			while ($over && $over->expand_thread($ctx)) {
-				for my $n (@{$ctx->{xids}}) {
-					my $t = $over->get_art($n) or next;
-					next if $dd->is_smsg_dup($t);
-					if (my $p = delete $n2p{$t->{num}}) {
-						$t->{relevance} = $p;
-					}
-					$self->out($buf .= $ORS);
-					$emit_cb->($t);
-				}
-				@{$ctx->{xids}} = ();
-			}
-		}
+	$self->{mset_opt} = \%mset_opt;
+	$lxs->do_query($self, \@srcs);
+	if ($pid_old12) {
+		$self->{$_} = $pid_old12->[$_] for (1, 2);
+		dwaitpid($pid_old12->[0], undef, $self->{sock});
 	}
-	$self->out($buf .= "]\n"); # done
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b670bc2f..a3010efe 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -7,7 +7,8 @@
 package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::LeiSearch);
+use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::Search qw(get_pct);
 
 sub new {
 	my ($class) = @_;
@@ -83,4 +84,81 @@ sub recent {
 
 sub over {}
 
+sub _mset_more ($$) {
+	my ($mset, $mo) = @_;
+	my $size = $mset->size;
+	$size && (($mo->{offset} += $size) < ($mo->{limit} // 10000));
+}
+
+sub query_thread_mset { # for --thread
+	my ($self, $lei, $ibxish) = @_;
+	my ($srch, $over) = ($ibxish->search, $ibxish->over);
+	unless ($srch && $over) {
+		my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
+		warn "$desc not indexed by Xapian\n";
+		return;
+	}
+	local %SIG = (%SIG, $lei->atfork_child_wq($self));
+	my $mo = { %{$lei->{mset_opt}} };
+	my $mset;
+	do {
+		$mset = $srch->mset($mo->{qstr}, $mo);
+		my $ids = $srch->mset_to_artnums($mset, $mo);
+		my $ctx = { ids => $ids };
+		my $i = 0;
+		my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+		while ($over->expand_thread($ctx)) {
+			for my $n (@{$ctx->{xids}}) {
+				my $smsg = $over->get_art($n) or next;
+				# next if $dd->is_smsg_dup($smsg); TODO
+				if (my $p = delete $n2p{$smsg->{num}}) {
+					$smsg->{relevance} = $p;
+				}
+				print { $self->{1} } Dumper($smsg);
+				# $self->out($buf .= $ORS);
+				# $emit_cb->($smsg);
+			}
+			@{$ctx->{xids}} = ();
+		}
+	} while (_mset_more($mset, $mo));
+}
+
+sub query_mset { # non-parallel for non-"--thread" users
+	my ($self, $lei, $srcs) = @_;
+	my $mo = { %{$lei->{mset_opt}} };
+	my $mset;
+	local %SIG = (%SIG, $lei->atfork_child_wq($self));
+	$self->attach_external($_) for @$srcs;
+	do {
+		$mset = $self->mset($mo->{qstr}, $mo);
+		for my $it ($mset->items) {
+			my $smsg = smsg_for($self, $it) or next;
+			# next if $dd->is_smsg_dup($smsg);
+			$smsg->{relevance} = get_pct($it);
+			use Data::Dumper;
+			print { $self->{1} } Dumper($smsg);
+			# $self->out($buf .= $ORS) if defined $buf;
+			#$emit_cb->($smsg);
+		}
+	} while (_mset_more($mset, $mo));
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+	$io[1]->autoflush(1);
+	$io[2]->autoflush(1);
+	if ($lei->{opt}->{thread}) {
+		for my $ibxish (@$srcs) {
+			$self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+		}
+	} else {
+		$self->wq_do('query_mset', @io, $lei, $srcs);
+	}
+	# TODO
+	for my $rmt (@{$self->{remotes} // []}) {
+		$self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+	}
+}
+
 1;


* [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (13 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 14/22] lei: query: ensure pager exit is instantaneous Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Eric Wong
                   ` (6 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Actually, sending 4 FDs will be useful for lei internal xsearch
work once we start accepting input from stdin.  It won't be used
with the lightweight lei(1) client, however.

For WWW (eventually), a single FD may be enough.
---
 lib/PublicInbox/CmdIPC1.pm    | 16 +++++++-----
 lib/PublicInbox/CmdIPC4.pm    | 12 +++++----
 lib/PublicInbox/IPC.pm        | 13 +++++-----
 lib/PublicInbox/LeiXSearch.pm |  6 ++---
 lib/PublicInbox/Spawn.pm      | 48 ++++++++++++++++++++---------------
 script/lei                    |  2 +-
 t/cmd_ipc.t                   |  5 ++--
 t/ipc.t                       |  6 ++---
 8 files changed, 60 insertions(+), 48 deletions(-)
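
A minimal sketch, not part of this patch, of the variable-length FD
packing that the CmdIPC4 change relies on.  The helper names pack_fds
and unpack_fds are hypothetical, and the int size is measured at
runtime here instead of assuming 4 bytes as the patch does.

```perl
use strict;
use warnings;

# Size of a native C int as Perl packs it; avoids hard-coding 4.
my $INT_SIZE = length(pack('i', 0));

# Pack any number of FD numbers into an SCM_RIGHTS-style payload.
sub pack_fds { pack('i*', @_) }

# Recover however many FD numbers the payload holds, ignoring any
# trailing partial int.
sub unpack_fds {
	my ($data) = @_;
	my $n = int(length($data) / $INT_SIZE);
	unpack("i$n", $data);
}

my @fds = unpack_fds(pack_fds(0, 1, 2, 7));
print "@fds\n"; # prints "0 1 2 7"
```

The receiver derives the FD count from the payload length, so the
sender never has to transmit the count separately.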

diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
index 0eed8bed..de6e54ef 100644
--- a/lib/PublicInbox/CmdIPC1.pm
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -10,17 +10,19 @@ BEGIN { eval {
 require IO::FDPass; # XS, available in all major distros
 no warnings 'once';
 
-*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-	for (1..3) {
-		IO::FDPass::send(fileno($_[0]), $_[$_]) or
+*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+	my ($sock, $fds, undef, $flags) = @_;
+	for my $fd (@$fds) {
+		IO::FDPass::send(fileno($sock), $fd) or
 					die "IO::FDPass::send: $!";
 	}
-	send($_[0], $_[4], $_[5]) or die "send $!";
+	send($sock, $_[2], $flags) or die "send $!";
 };
 
-*recv_cmd1 = sub ($$$) {
-	my ($s, undef, $len) = @_;
-	my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+*recv_cmd1 = sub ($$$;$) {
+	my ($s, undef, $len, $nfds) = @_;
+	$nfds //= 3;
+	my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
 	recv($s, $_[1], $len, 0) // die "recv: $!";
 	length($_[1]) == 0 ? () : @fds;
 };
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 90fca62d..c4fcb0d6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS
 no warnings 'once';
 
 # 3 FDs per-sendmsg(2) + buffer
-*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
-	my $mh = Socket::MsgHdr->new(buf => $_[4]);
-	$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
-	Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+	my ($sock, $fds, undef, $flags) = @_;
+	my $mh = Socket::MsgHdr->new(buf => $_[2]);
+	$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
+			pack('i' x scalar(@$fds), @$fds));
+	Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -26,7 +28,7 @@ no warnings 'once';
 	$_[1] = $mh->buf;
 	return () if $r == 0;
 	my (undef, undef, $data) = $mh->cmsghdr;
-	unpack('iii', $data);
+	unpack('i' x (length($data) / 4), $data);
 };
 
 } } # /eval /BEGIN
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index be5b2f45..b0a0bfb5 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -263,16 +263,15 @@ sub wq_worker_loop ($) {
 }
 
 sub wq_do { # always async
-	my ($self, $sub, $in, $out, $err, @args) = @_;
+	my ($self, $sub, $ios, @args) = @_;
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		$_ = fileno($_) for ($in, $out, $err);
-		$send_cmd->($s1, $in, $out, $err,
-				freeze([$sub, @args]), MSG_EOR);
+		my $fds = [ map { fileno($_) } @$ios ];
+		$send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
 	} else {
-		@$self{0, 1, 2} = ($in, $out, $err);
+		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
 		warn "wq_do: $@" if $@;
-		delete @$self{0, 1, 2};
+		delete @$self{0..$#$ios};
 	}
 }
 
@@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
 	my $workers = $self->{-wq_workers} or return;
 	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
-	$self->wq_do('wq_exit', $s2, $s2, $s2);
+	$self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
 	$self->{-wq_exit_pending}++;
 	# caller must call wq_worker_decr_wait in main loop
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a3010efe..c0df21a8 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -150,14 +150,14 @@ sub do_query {
 	$io[2]->autoflush(1);
 	if ($lei->{opt}->{thread}) {
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
 		}
 	} else {
-		$self->wq_do('query_mset', @io, $lei, $srcs);
+		$self->wq_do('query_mset', \@io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@{$self->{remotes} // []}) {
-		$self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
 	}
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7d0d9597..b35bf54c 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-struct my_3fds { int fds[3]; };
+#define SEND_FD_CAPA 3
+#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
 	struct cmsghdr hdr;
-	char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
+	char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
 };
 
-int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
+int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
 {
 	struct msghdr msg = { 0 };
-	struct iovec iov;
 	union my_cmsg cmsg = { 0 };
-	int *fdp;
-	size_t i;
 	STRLEN dlen = 0;
+	struct iovec iov;
+	AV *fds = (AV *)SvRV(svfds);
+	I32 i, nfds = av_len(fds) + 1;
+	int *fdp;
 
 	if (SvOK(data)) {
 		iov.iov_base = SvPV(data, dlen);
@@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
 	}
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
-	msg.msg_control = &cmsg.hdr;
-	msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
-
-	cmsg.hdr.cmsg_level = SOL_SOCKET;
-	cmsg.hdr.cmsg_type = SCM_RIGHTS;
-	cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
-	fdp = (int *)CMSG_DATA(&cmsg.hdr);
-	*fdp++ = in;
-	*fdp++ = out;
-	*fdp++ = err;
+	if (nfds) {
+		if (nfds > SEND_FD_CAPA) {
+			fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds);
+			nfds = SEND_FD_CAPA;
+		}
+		msg.msg_control = &cmsg.hdr;
+		msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));
+		cmsg.hdr.cmsg_level = SOL_SOCKET;
+		cmsg.hdr.cmsg_type = SCM_RIGHTS;
+		cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int));
+		fdp = (int *)CMSG_DATA(&cmsg.hdr);
+		for (i = 0; i < nfds; i++) {
+			SV **fd = av_fetch(fds, i, 0);
+			*fdp++ = SvIV(*fd);
+		}
+	}
 	return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
 }
 
@@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 	msg.msg_iov = &iov;
 	msg.msg_iovlen = 1;
 	msg.msg_control = &cmsg.hdr;
-	msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
+	msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
 	i = recvmsg(PerlIO_fileno(s), &msg, 0);
 	if (i < 0)
 		croak("recvmsg: %s", strerror(errno));
 	SvCUR_set(buf, i);
 	if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-			cmsg.hdr.cmsg_type == SCM_RIGHTS &&
-			cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
+			cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+		size_t len = cmsg.hdr.cmsg_len;
 		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-		for (i = 0; i < 3; i++)
+		for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
 			Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
 	}
 	Inline_Stack_Done;
diff --git a/script/lei b/script/lei
index d954b9eb..5e30f4d7 100755
--- a/script/lei
+++ b/script/lei
@@ -67,7 +67,7 @@ Falling back to (slow) one-shot mode
 	$buf .= "\0\0";
 	select $sock;
 	$| = 1; # unbuffer selected $sock
-	$send_cmd->($sock, 0, 1, 2, $buf, 0);
+	$send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
 	while ($buf = <$sock>) {
 		$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
 		die $buf;
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index b9f4d128..22f73c19 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -17,7 +17,8 @@ my $do_test = sub { SKIP: {
 	my ($s1, $s2);
 	my $src = 'some payload' x 40;
 	socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
-	$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+	my $sfds = [ fileno($r), fileno($w), fileno($s1) ];
+	$send->($s1, $sfds, $src, $flag);
 	my (@fds) = $recv->($s2, my $buf, length($src) + 1);
 	is($buf, $src, 'got buffer payload '.$desc);
 	my ($r1, $w1, $s1a);
@@ -39,7 +40,7 @@ my $do_test = sub { SKIP: {
 	if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
 		$r1 = $w1 = $s1a = undef;
 		$src = (',' x 1023) . '-' .('.' x 1024);
-		$send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+		$send->($s1, $sfds, $src, $flag);
 		(@fds) = $recv->($s2, $buf, 1024);
 		is($buf, (',' x 1023) . '-', 'silently truncated buf');
 		$opens->();
diff --git a/t/ipc.t b/t/ipc.t
index 903294c5..d2b6ad4f 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -121,7 +121,7 @@ $warn->autoflush(0);
 local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
 my @ppids;
 for my $t ('local', 'worker', 'worker again') {
-	$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+	$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
 		my $buf = readline($fh);
@@ -129,7 +129,7 @@ for my $t ('local', 'worker', 'worker again') {
 		like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
 		$i++;
 	}
-	$ipc->wq_do('test_die', $wa, $wb, $wc);
+	$ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
 	my $ppid = $ipc->wq_workers_start('wq', 1);
 	push(@ppids, $ppid);
 }
@@ -142,7 +142,7 @@ SKIP: {
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
 		use POSIX qw(_exit);
-		$ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+		$ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
 		_exit(0);
 	} else {
 		my $i = 0;


* [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (14 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq Eric Wong
                   ` (5 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

IO::FDPass is our last choice for implementing the workqueue
because its lack of atomicity makes it impossible to guarantee
all requests of a single group hit a single worker out of many.

So the only way to use IO::FDPass for workqueues is to only have
a single worker.  A single worker still buys us a small amount
of parallelism because of the parent process.
---
 lib/PublicInbox/IPC.pm | 34 +++++++++++++++++++++++++++++++---
 t/ipc.t                | 18 +++++++++++-------
 2 files changed, 42 insertions(+), 10 deletions(-)
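
A minimal sketch, not part of this patch, of the worker cap described
above.  max_workers and clamp_workers are hypothetical names; the
$atomic_fds flag stands in for "Inline::C or Socket::MsgHdr is
available", and the limits 4096 and 1 are the ones used in the patch.

```perl
use strict;
use warnings;

# $atomic_fds is true when several FDs can travel in one sendmsg(2)
# call, false when only one-FD-at-a-time passing (IO::FDPass) exists.
sub max_workers { $_[0] ? 4096 : 1 }

sub clamp_workers {
	my ($want, $atomic_fds) = @_;
	$want //= 4; # same default as wq_workers_start in the patch
	my $max = max_workers($atomic_fds);
	$want > $max ? $max : $want;
}

printf "%d %d\n", clamp_workers(8, 1), clamp_workers(8, 0); # prints "8 1"
```

Clamping silently, instead of dying, keeps callers working with
reduced parallelism when only IO::FDPass is installed.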

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index b0a0bfb5..e6a1082c 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -12,6 +12,7 @@ use POSIX qw(WNOHANG);
 use Socket qw(AF_UNIX MSG_EOR);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
+my $WQ_MAX_WORKERS = 4096;
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
 # and eliminate method call overhead
@@ -36,17 +37,39 @@ if ($enc && $dec) { # should be custom ops
 	} // warn("Storable (part of Perl) missing: $@\n");
 }
 
+my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1;
 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
 	require PublicInbox::CmdIPC4;
 	$recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
 	PublicInbox::CmdIPC4->can('send_cmd4');
 } // do {
+	# IO::FDPass only allows sending a single FD at-a-time, which
+	# means we can't guarantee all packets end up on the same worker,
+	# so we cap WQ_MAX_WORKERS
 	require PublicInbox::CmdIPC1;
-	$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
+	$recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1');
+	$WQ_MAX_WORKERS = 1 if $recv_cmd1;
+	wq_set_recv_fds(3);
 	PublicInbox::CmdIPC1->can('send_cmd1');
 };
 
+# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv
+sub wq_set_recv_fds {
+	return unless $recv_cmd1;
+	my $nfds = pop;
+	my $sub = sub {
+		my ($sock, $fds, undef, $flags) = @_;
+		$recv_cmd1->($sock, $fds, $_[2], $flags, $nfds);
+	};
+	my $self = pop;
+	if (ref $self) {
+		$self->{-wq_recv_cmd} = $sub;
+	} else {
+		$recv_cmd = $sub;
+	}
+}
+
 sub _get_rec ($) {
 	my ($r) = @_;
 	defined(my $len = <$r>) or return;
@@ -237,8 +260,9 @@ sub wq_worker_loop ($) {
 	local $SIG{PIPE} = sub {
 		die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
 	};
+	my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
 	until ($self->{-wq_quit}) {
-		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
+		my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
 		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
 		for my $fd (@fds) {
@@ -305,6 +329,7 @@ sub wq_workers_start {
 	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
 	$self->ipc_atfork_parent;
 	$nr_workers //= 4;
+	$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->{-wq_workers} = {};
 	$self->{-wq_ident} = $ident;
@@ -318,6 +343,7 @@ sub wq_workers_start {
 sub wq_worker_incr { # SIGTTIN handler
 	my ($self, $oldset) = @_;
 	$self->{-wq_s2} or return;
+	return if wq_workers($self) >= $WQ_MAX_WORKERS;
 	$self->ipc_atfork_parent;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	_wq_worker_start($self, $sigset);
@@ -331,7 +357,7 @@ sub wq_exit { # wakes up wq_worker_decr_wait
 
 sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	my ($self) = @_;
-	my $workers = $self->{-wq_workers} or return;
+	return unless wq_workers($self);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
 	$self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
 	$self->{-wq_exit_pending}++;
@@ -377,6 +403,8 @@ sub wq_close {
 	}
 }
 
+sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
+
 sub DESTROY {
 	wq_close($_[0]);
 	ipc_worker_stop($_[0]);
diff --git a/t/ipc.t b/t/ipc.t
index d2b6ad4f..fd290809 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -171,13 +171,17 @@ is($warn[2], $warn[1], 'worker did not die');
 $SIG{__WARN__} = 'DEFAULT';
 is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
 is($ipc->wq_workers, 1, '1 worker started');
-$ipc->wq_worker_incr;
-is($ipc->wq_workers, 2, 'worker count bumped');
-$ipc->wq_worker_decr;
-$ipc->wq_worker_decr_wait(10);
-is($ipc->wq_workers, 1, 'worker count lowered');
-is($ipc->wq_workers(2), 2, 'worker count set');
-is($ipc->wq_workers, 2, 'worker count stayed set');
+SKIP: {
+	$ipc->WQ_MAX_WORKERS > 1 or
+		skip 'Inline::C or Socket::MsgHdr not available', 4;
+	$ipc->wq_worker_incr;
+	is($ipc->wq_workers, 2, 'worker count bumped');
+	$ipc->wq_worker_decr;
+	$ipc->wq_worker_decr_wait(10);
+	is($ipc->wq_workers, 1, 'worker count lowered');
+	is($ipc->wq_workers(2), 2, 'worker count set');
+	is($ipc->wq_workers, 2, 'worker count stayed set');
+}
 $ipc->wq_close;
 is($ipc->wq_workers, undef, 'workers undef after close');
 


* [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (15 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 18/22] lei: get rid of client {pid} field Eric Wong
                   ` (4 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Relying on signal handlers to kill a particular worker was a
laggy/racy idea, so I gave up on targeting workers explicitly
and instead chose to make wq_worker_decr stop the next idle
worker via ->wq_exit.

We will however attempt to support sending signals to
a process group.
---
 lib/PublicInbox/IPC.pm | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
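
A minimal sketch, not part of this patch, of stopping workers with an
in-band request instead of a signal.  It assumes a SOCK_DGRAM
socketpair (the real code uses SOCK_SEQPACKET) and a plain 'exit'
string as the request; run_and_stop_workers is a hypothetical name.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_DGRAM);
use POSIX qw(_exit);

# Start $nr workers sharing one socket, feed them @jobs, then stop each
# worker with an in-band 'exit' request.  Returns the wait statuses.
sub run_and_stop_workers {
	my ($nr, @jobs) = @_;
	socketpair(my $s1, my $s2, AF_UNIX, SOCK_DGRAM, 0)
		or die "socketpair: $!";
	my @pids;
	for (1..$nr) {
		defined(my $pid = fork) or die "fork: $!";
		if ($pid == 0) { # worker
			$SIG{$_} = 'DEFAULT' for qw(TERM QUIT INT);
			while (1) {
				defined(recv($s2, my $buf, 4096, 0))
					or _exit(1);
				_exit(0) if $buf eq 'exit';
				# a real worker would run the request here
			}
		}
		push @pids, $pid;
	}
	# whichever worker is idle picks up the next datagram; a worker
	# that reads 'exit' leaves, so $nr requests stop all of them
	for my $msg (@jobs, ('exit') x $nr) {
		defined(send($s1, $msg, 0)) or die "send: $!";
	}
	map { waitpid($_, 0); $? } @pids;
}

my @status = run_and_stop_workers(2, qw(job-a job-b job-c));
print "stopped ", scalar(@status), " workers\n";
```

Because the exit request queues behind pending jobs, a worker only
sees it once it is idle, which avoids interrupting a request midway.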

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index e6a1082c..4db4b8ea 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -261,7 +261,7 @@ sub wq_worker_loop ($) {
 		die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
 	};
 	my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
-	until ($self->{-wq_quit}) {
+	while (1) {
 		my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
 		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
@@ -305,10 +305,9 @@ sub _wq_worker_start ($$) {
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
 		close(delete $self->{-wq_s1});
-		delete $self->{qw(-wq_workers -wq_quit -wq_ppid)};
-		my $quit = sub { $self->{-wq_quit} = 1 };
-		$SIG{$_} = $quit for (qw(TERM INT QUIT));
+		delete $self->{qw(-wq_workers -wq_ppid)};
 		$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
+		$SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT));
 		local $0 = $self->{-wq_ident};
 		PublicInbox::DS::sig_setmask($oldset);
 		my $on_destroy = $self->ipc_atfork_child;
@@ -360,7 +359,6 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
 	return unless wq_workers($self);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
 	$self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
-	$self->{-wq_exit_pending}++;
 	# caller must call wq_worker_decr_wait in main loop
 }
 
@@ -374,7 +372,6 @@ sub wq_worker_decr_wait {
 	recv($s1, my $pid, 64, 0) // croak "recv: $!";
 	my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers';
 	delete $workers->{$pid} // croak "BUG: PID:$pid invalid";
-	$self->{-wq_exit_pending}--;
 	dwaitpid($pid, \&ipc_worker_reap, $self);
 }
 


* [PATCH 18/22] lei: get rid of client {pid} field
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (16 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 19/22] lei: fork + FD cleanup Eric Wong
                   ` (3 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Using kill(2) is too dangerous since extremely long
queries may allow the original PID of the aborted lei(1)
client process to be recycled by a new process.  It would
be bad if the lei_xsearch worker process issued a kill
on the wrong process.

So just rely on sending the exit message via socket.
---
 lib/PublicInbox/LEI.pm      | 18 +++++++-----------
 lib/PublicInbox/LeiQuery.pm |  2 +-
 script/lei                  |  2 +-
 3 files changed, 9 insertions(+), 13 deletions(-)
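
A minimal sketch, not part of this patch, of the exit message that
replaces kill(2).  exit_line follows the arithmetic of x_it in the diff
below and parse_exit_line uses the same regexp as script/lei; both
function names are hypothetical.

```perl
use strict;
use warnings;

# Mirrors the arithmetic in x_it: a status with signal bits set is sent
# as-is, otherwise the exit code is taken from the high byte.
sub exit_line {
	my ($code) = @_;
	my $sig = $code & 127;
	$code >>= 8 unless $sig;
	"exit=$code\n";
}

# Client side: returns the numeric code, or undef for any other line.
sub parse_exit_line {
	my ($line) = @_;
	$line =~ /\Aexit=([0-9]+)\n\z/ ? $1 + 0 : undef;
}

print parse_exit_line(exit_line(1 << 8)), "\n"; # prints "1"
```

The message travels over the socket the client already holds, so it
can only reach that client, whatever has happened to PIDs since.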

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f8b8cd4a..0cbf342c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -240,15 +240,12 @@ my %CONFIG_KEYS = (
 sub x_it ($$) { # pronounced "exit"
 	my ($self, $code) = @_;
 	$self->{1}->autoflush(1); # make sure client sees stdout before exit
-	if (my $sig = ($code & 127)) {
-		kill($sig, $self->{pid} // $$);
-	} else {
-		$code >>= 8;
-		if (my $sock = $self->{sock}) {
-			say $sock "exit=$code";
-		} else { # for oneshot
-			$quit->($code);
-		}
+	my $sig = ($code & 127);
+	$code >>= 8 unless $sig;
+	if (my $sock = $self->{sock}) {
+		say $sock "exit=$code";
+	} else { # for oneshot
+		$quit->($code);
 	}
 }
 
@@ -675,13 +672,12 @@ sub accept_dispatch { # Listener {post_accept} callback
 		say $sock "request command truncated";
 		return;
 	}
-	my ($client_pid, $argc, @argv) = split(/\0/, $buf, -1);
+	my ($argc, @argv) = split(/\0/, $buf, -1);
 	undef $buf;
 	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
 	if (chdir($env{PWD})) {
 		local %ENV = %env;
 		$self->{env} = \%env;
-		$self->{pid} = $client_pid + 0;
 		eval { dispatch($self, @argv) };
 		say $sock $@ if $@;
 	} else {
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 040c284d..d5376be5 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -76,7 +76,7 @@ sub lei_q {
 	}
 	my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
 	$j = 1 if !$opt->{thread};
-	if ($self->{pid}) {
+	if ($self->{sock}) {
 		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 			// $self->wq_workers($j);
 	}
diff --git a/script/lei b/script/lei
index 5e30f4d7..bea06b2c 100755
--- a/script/lei
+++ b/script/lei
@@ -62,7 +62,7 @@ Falling back to (slow) one-shot mode
 	1;
 }) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available:
 	local $ENV{PWD} = $pwd;
-	my $buf = join("\0", $$, scalar(@ARGV), @ARGV);
+	my $buf = join("\0", scalar(@ARGV), @ARGV);
 	while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
 	$buf .= "\0\0";
 	select $sock;


* [PATCH 19/22] lei: fork + FD cleanup
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (17 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 18/22] lei: get rid of client {pid} field Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 20/22] lei: run pager in client script Eric Wong
                   ` (2 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

Do a better job of closing FDs that we don't want shared with
the work queue workers.  We'll also fix naming and use
"atfork_prepare" instead of "atfork_parent" to match
pthread_atfork(3) naming.
---
 lib/PublicInbox/IPC.pm        | 57 +++++++++++++++++++++++------------
 lib/PublicInbox/LEI.pm        | 18 +++++++++--
 lib/PublicInbox/LeiQuery.pm   |  7 +++--
 lib/PublicInbox/LeiXSearch.pm | 11 +++++--
 4 files changed, 68 insertions(+), 25 deletions(-)
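
A minimal sketch, not part of this patch, of why a forked child should
close handles it does not need.  fork_closing is a hypothetical helper
playing the role of the -ipc_atfork_child_close list: here the child
only sees EOF on the pipe because it closed its inherited write end.

```perl
use strict;
use warnings;
use POSIX qw(_exit);

# Fork, close the listed handles in the child only, run $cb there, and
# exit 0 if $cb returned without dying.
sub fork_closing {
	my ($to_close, $cb) = @_;
	defined(my $pid = fork) or die "fork: $!";
	return $pid if $pid; # parent
	close($_) for @$to_close;
	my $ok = eval { $cb->(); 1 };
	_exit($ok ? 0 : 1);
}

pipe(my ($r, $w)) or die "pipe: $!";
my $pid = fork_closing([ $w ], sub {
	local $/; # slurp until EOF
	my $got = <$r>;
	die "unexpected input\n" unless defined($got) && $got eq 'x';
});
close($r);
print $w 'x';
close($w) or die "close: $!"; # the child's read can now hit EOF
waitpid($pid, 0);
print "child status: $?\n";
```

Had the child kept its copy of the write end open, its read would
block forever, since a pipe reports EOF only after every writer closes.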

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 4db4b8ea..88f81e47 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -126,7 +126,7 @@ sub ipc_worker_spawn {
 	pipe(my ($r_res, $w_res)) or die "pipe: $!";
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	my $parent = $$;
-	$self->ipc_atfork_parent;
+	$self->ipc_atfork_prepare;
 	defined(my $pid = fork) or die "fork: $!";
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
@@ -155,8 +155,14 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 # for base class, override in sub classes
-sub ipc_atfork_parent {}
-sub ipc_atfork_child {}
+sub ipc_atfork_prepare {}
+
+sub ipc_atfork_child {
+	my ($self) = @_;
+	my $io = delete($self->{-ipc_atfork_child_close}) or return;
+	close($_) for @$io;
+	undef;
+}
 
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
@@ -251,14 +257,21 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
+sub _close_recvd ($) {
+	my ($self) = @_;
+	close($_) for (grep { defined } (delete @$self{0..2}));
+}
+
 sub wq_worker_loop ($) {
 	my ($self) = @_;
 	my $buf;
 	my $len = $self->{wq_req_len} // (4096 * 33);
-	my ($rec, $sub, @args);
+	my ($sub, $args);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
 	local $SIG{PIPE} = sub {
-		die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub;
+		my $cur_sub = $sub;
+		_close_recvd($self);
+		die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub;
 	};
 	my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
 	while (1) {
@@ -267,22 +280,25 @@ sub wq_worker_loop ($) {
 		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
 		for my $fd (@fds) {
 			my $mode = shift(@m);
-			if (open(my $fh, $mode, $fd)) {
-				$self->{$i++} = $fh;
-				$fh->autoflush(1);
+			if (open(my $cmdfh, $mode, $fd)) {
+				$self->{$i++} = $cmdfh;
+				$cmdfh->autoflush(1);
 			} else {
 				die "$$ open($mode$fd) (FD:$i): $!";
 			}
 		}
-		# Sereal dies, Storable returns undef
-		$rec = thaw($buf) //
+		# Sereal dies on truncated data, Storable returns undef
+		$args = thaw($buf) //
 			die "thaw error on buffer of size:".length($buf);
-		($sub, @args) = @$rec;
-		eval { $self->$sub(@args) };
+		eval {
+			$sub = shift @$args;
+			eval { $self->$sub(@$args) };
+			undef $sub; # quiet SIG{PIPE} handler
+			die $@ if $@;
+		};
 		warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
-		undef $sub; # quiet SIG{PIPE} handler
 		# need to close explicitly to avoid warnings after SIGPIPE
-		close($_) for (delete(@$self{0..2}));
+		_close_recvd($self);
 	}
 }
 
@@ -306,14 +322,17 @@ sub _wq_worker_start ($$) {
 		eval { PublicInbox::DS->Reset };
 		close(delete $self->{-wq_s1});
 		delete $self->{qw(-wq_workers -wq_ppid)};
-		$SIG{$_} = 'IGNORE' for (qw(TTOU TTIN));
-		$SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT));
+		$SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN));
+		$SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD));
 		local $0 = $self->{-wq_ident};
 		PublicInbox::DS::sig_setmask($oldset);
+		# ensure we properly exit even if warn() dies:
+		my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
 		my $on_destroy = $self->ipc_atfork_child;
 		eval { wq_worker_loop($self) };
 		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
-		exit($@ ? 1 : 0);
+		undef $on_destroy;
+		undef $end; # trigger exit
 	} else {
 		$self->{-wq_workers}->{$pid} = \undef;
 	}
@@ -326,7 +345,7 @@ sub wq_workers_start {
 	return if $self->{-wq_s1}; # idempotent
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!";
-	$self->ipc_atfork_parent;
+	$self->ipc_atfork_prepare;
 	$nr_workers //= 4;
 	$nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
@@ -343,7 +362,7 @@ sub wq_worker_incr { # SIGTTIN handler
 	my ($self, $oldset) = @_;
 	$self->{-wq_s2} or return;
 	return if wq_workers($self) >= $WQ_MAX_WORKERS;
-	$self->ipc_atfork_parent;
+	$self->ipc_atfork_prepare;
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	_wq_worker_start($self, $sigset);
 	PublicInbox::DS::sig_setmask($sigset) unless $oldset;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 0cbf342c..1ef0cbec 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -33,6 +33,7 @@ my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
+our @TO_CLOSE_ATFORK_CHILD;
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -266,12 +267,20 @@ sub fail ($$;$) {
 	undef;
 }
 
+sub atfork_prepare_wq {
+	my ($self, $wq) = @_;
+	push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD,
+				grep { defined } @$self{qw(0 1 2 sock)}
+}
+
 # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
 	$self->{sock} //= $wq->{0};
 	$self->{$_} //= $wq->{$_} for (0..2);
 	my $oldpipe = $SIG{PIPE};
+	%PATH2CFG = ();
+	@TO_CLOSE_ATFORK_CHILD = ();
 	(
 		__WARN__ => sub { err($self, @_) },
 		PIPE => sub {
@@ -281,11 +290,14 @@ sub atfork_child_wq {
 	);
 }
 
-# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq);
-sub atfork_prepare_wq {
+# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
+sub atfork_parent_wq {
 	my ($self, $wq) = @_;
 	if ($wq->wq_workers) {
+		my $env = delete $self->{env}; # env is inherited at fork
 		my $ret = bless { %$self }, ref($self);
+		$self->{env} = $env;
+		delete @$ret{qw(-lei_store cfg)};
 		my $in = delete $ret->{0};
 		($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
 	} else {
@@ -738,6 +750,7 @@ sub lazy_start {
 	return if $pid;
 	$0 = "lei-daemon $path";
 	local %PATH2CFG;
+	local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_r, $eof_w);
 	$_->blocking(0) for ($l, $eof_r, $eof_w);
 	$l = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
 	my $exit_code;
@@ -764,6 +777,7 @@ sub lazy_start {
 	local %SIG = (%SIG, %$sig) if !$sigfd;
 	local $SIG{PIPE} = 'IGNORE';
 	if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
+		push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
 		PublicInbox::DS->SetLoopTimeout(5000);
 	} else {
 		# wake up every second to accept signals if we don't
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index d5376be5..9a383cef 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -66,7 +66,7 @@ sub lei_q {
 
 	# --local is enabled by default
 	# src: LeiXSearch || LeiSearch || Inbox
-	my @srcs = $opt->{'local'} ? ($sto->search) : ();
+	my @srcs;
 	require PublicInbox::LeiXSearch;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
@@ -74,12 +74,15 @@ sub lei_q {
 	if ($opt->{external} // 1) {
 		$self->_externals_each(\&_vivify_external, \@srcs);
 	}
-	my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs);
+	my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
 	$j = 1 if !$opt->{thread};
+	$j++ if $opt->{'local'}; # for sto->search below
 	if ($self->{sock}) {
+		$self->atfork_prepare_wq($lxs);
 		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 			// $self->wq_workers($j);
 	}
+	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my $out = $opt->{output} // '-';
 	$out = 'json:/dev/stdout' if $out eq '-';
 	my $isatty = -t $self->{1};
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index c0df21a8..b4172734 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -9,6 +9,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
 use PublicInbox::Search qw(get_pct);
+use Sys::Syslog qw(syslog);
 
 sub new {
 	my ($class) = @_;
@@ -92,13 +93,13 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
+	local %SIG = (%SIG, $lei->atfork_child_wq($self));
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
 	unless ($srch && $over) {
 		my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
 		warn "$desc not indexed by Xapian\n";
 		return;
 	}
-	local %SIG = (%SIG, $lei->atfork_child_wq($self));
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	do {
@@ -145,7 +146,7 @@ sub query_mset { # non-parallel for non-"--thread" users
 
 sub do_query {
 	my ($self, $lei_orig, $srcs) = @_;
-	my ($lei, @io) = $lei_orig->atfork_prepare_wq($self);
+	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
 	$io[1]->autoflush(1);
 	$io[2]->autoflush(1);
 	if ($lei->{opt}->{thread}) {
@@ -161,4 +162,10 @@ sub do_query {
 	}
 }
 
+sub ipc_atfork_child {
+	my ($self) = @_;
+	$SIG{__WARN__} = sub { syslog('warning', "@_") };
+	$self->SUPER::ipc_atfork_child; # PublicInbox::IPC
+}
+
 1;


* [PATCH 20/22] lei: run pager in client script
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (18 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 19/22] lei: fork + FD cleanup Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Eric Wong
  2021-01-10 12:15 ` [PATCH 22/22] lei: query: restore JSON output overview Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

While most single keystrokes work fine when the pager is
launched from the background daemon, Ctrl-C and WINCH can cause
strangeness when connected to the wrong terminal.
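
The request sent to the client looks like "exec ARGC\0ARG...\0KEY=VAL...\n".
A minimal sketch of decoding it is below; parse_exec_msg is a
hypothetical helper written for illustration (the patch does this
inline in script/lei), and it assumes every trailing element
contains an "=".

```perl
use strict;
use warnings;

# Hypothetical helper (not in the patch): split an "exec" request into
# an argv array and an environment hash.
sub parse_exec_msg {
	my ($buf) = @_;
	$buf =~ /\Aexec (.+)\n\z/ or return;
	my ($argc, @argv) = split(/\0/, $1);
	# everything past the first $argc elements is a KEY=VALUE pair
	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
	(\@argv, \%env);
}

my ($argv, $env) = parse_exec_msg("exec 1\0less\0LESS=FRX\0LV=-c\n");
print "argv: @$argv\n";
print "$_=$env->{$_}\n" for sort keys %$env;
```

Putting the argument count first lets one NUL-delimited buffer carry
both the command line and the environment without extra framing.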
---
 lib/PublicInbox/LEI.pm      | 26 +++++++++++++++++++-------
 lib/PublicInbox/LeiQuery.pm |  5 +++--
 script/lei                  | 28 +++++++++++++++++++++++++---
 3 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1ef0cbec..d19fb311 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -26,7 +26,7 @@ use Text::Wrap qw(wrap);
 use File::Path qw(mkpath);
 use File::Spec;
 our $quit = \&CORE::exit;
-my $recv_cmd;
+my ($recv_cmd, $send_cmd);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
@@ -244,7 +244,8 @@ sub x_it ($$) { # pronounced "exit"
 	my $sig = ($code & 127);
 	$code >>= 8 unless $sig;
 	if (my $sock = $self->{sock}) {
-		say $sock "exit=$code";
+		my $fds = [ map { fileno($_) } @$self{0..2} ];
+		$send_cmd->($sock, $fds, "exit=$code\n", 0);
 	} else { # for oneshot
 		$quit->($code);
 	}
@@ -635,15 +636,23 @@ sub start_pager {
 	chomp(my $pager = <$fh> // '');
 	close($fh) or warn "`git var PAGER' error: \$?=$?";
 	return if $pager eq 'cat' || $pager eq '';
-	$env->{LESS} //= 'FRX';
-	$env->{LV} //= '-c';
-	$env->{COLUMNS} //= 80; # TODO TIOCGWINSZ
-	$env->{MORE} //= 'FRX' if $^O eq 'freebsd';
+	# TODO TIOCGWINSZ
+	my %new_env = (LESS => 'FRX', LV => '-c', COLUMNS => 80);
+	$new_env{MORE} = 'FRX' if $^O eq 'freebsd';
 	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
+	my $pid;
+	if (my $sock = $self->{sock}) { # lei(1) process runs it
+		delete @new_env{keys %$env}; # only set iff unset
+		my $buf = "exec 1\0".$pager;
+		while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
+		my $fds = [ map { fileno($_) } @$rdr{0..2} ];
+		$send_cmd->($sock, $fds, $buf .= "\n", 0);
+	} else {
+		$pid = spawn([$pager], $env, $rdr);
+	}
 	$self->{1} = $wpager;
 	$self->{2} = $wpager if -t $self->{2};
-	my $pid = spawn([$pager], $env, $rdr);
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
 	[ $pid, @$rdr{1, 2} ];
 }
@@ -731,10 +740,13 @@ sub lazy_start {
 	local $oldset = PublicInbox::DS::block_signals();
 	if ($nfd == 1) {
 		require PublicInbox::CmdIPC1;
+		$send_cmd = PublicInbox::CmdIPC1->can('send_cmd1');
 		$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
 	} elsif ($nfd == 4) {
+		$send_cmd = PublicInbox::Spawn->can('send_cmd4');
 		$recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
 			require PublicInbox::CmdIPC4;
+			$send_cmd = PublicInbox::CmdIPC4->can('send_cmd4');
 			PublicInbox::CmdIPC4->can('recv_cmd4');
 		};
 	}
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 9a383cef..6e778785 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -125,9 +125,10 @@ sub lei_q {
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
 	$lxs->do_query($self, \@srcs);
-	if ($pid_old12) {
+	if ($pid_old12) { # [ pid, stdout, stderr ]
+		my $pid = $pid_old12->[0];
 		$self->{$_} = $pid_old12->[$_] for (1, 2);
-		dwaitpid($pid_old12->[0], undef, $self->{sock});
+		dwaitpid($pid, undef, $self->{sock}) if $pid;
 	}
 }
 
diff --git a/script/lei b/script/lei
index bea06b2c..aac8fa94 100755
--- a/script/lei
+++ b/script/lei
@@ -6,16 +6,33 @@ use v5.10.1;
 use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
 use PublicInbox::CmdIPC4;
 my $narg = 4;
+my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
 	require PublicInbox::CmdIPC1; # 2nd choice
 	$narg = 1;
+	$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
 	PublicInbox::CmdIPC1->can('send_cmd1');
 } // do {
 	require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
 	$narg = 4;
+	$recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 	PublicInbox::Spawn->can('send_cmd4');
 };
 
+sub exec_cmd {
+	my ($fds, $argc, @argv) = @_;
+	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+	my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=',
+		*STDERR{IO}, '>&=');
+	for my $fd (@$fds) {
+		my ($old_io, $mode) = splice(@m, 0, 2);
+		open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+	}
+	%ENV = (%ENV, %env);
+	exec(@argv);
+	die "exec: @argv: $!";
+}
+
 my ($sock, $pwd);
 if ($send_cmd && eval {
 	my $path = do {
@@ -68,9 +85,14 @@ Falling back to (slow) one-shot mode
 	select $sock;
 	$| = 1; # unbuffer selected $sock
 	$send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
-	while ($buf = <$sock>) {
-		$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
-		die $buf;
+	while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
+		if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
+			exit($1);
+		} elsif ($buf =~ /\Aexec (.+)\n\z/) {
+			exec_cmd(\@fds, split(/\0/, $1));
+		} else {
+			die $buf;
+		}
 	}
 } else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C
 	warn $@ if $@;


* [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (19 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 20/22] lei: run pager in client script Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  2021-01-10 12:15 ` [PATCH 22/22] lei: query: restore JSON output overview Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

It's easier to make the code more generic by transferring
all four FDs (std(in|out|err) + socket) instead of omitting
stdin.

We'll be reading from stdin on some imports, and possibly
outputting to stdout, so omitting stdin now would needlessly
complicate things.

The differences between the IO::FDPass "1" code paths and the
"4" code paths used by Inline::C and Socket::MsgHdr are too
great to support and test at the moment.
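
On the receiving end, each raw FD number is wrapped in a Perl handle
using one fdopen-style mode per descriptor.  A minimal sketch of that
step follows; fdopen_all is a hypothetical helper, and POSIX::dup
merely simulates FD numbers arriving from another process, so no
socket is involved here.

```perl
use strict;
use warnings;
use IO::Handle; # for ->autoflush on lexical handles
use POSIX ();

# Hypothetical stand-in for the worker side: wrap raw FD numbers in
# Perl handles, taking one open mode per descriptor.
sub fdopen_all {
	my ($modes, @fds) = @_;
	my @m = @$modes;
	my @fhs;
	for my $fd (@fds) {
		my $mode = shift(@m) // die "no mode for FD $fd";
		open(my $fh, $mode, $fd) or die "open($mode$fd): $!";
		push @fhs, $fh;
	}
	@fhs;
}

pipe(my ($r, $w)) or die "pipe: $!";
my @fds;
for my $fh ($r, $w) { # simulate FD numbers received from a peer
	push @fds, POSIX::dup(fileno($fh)) // die "dup: $!";
}
close($_) for ($r, $w);

my ($in, $out) = fdopen_all([qw(<&= >&=)], @fds);
$out->autoflush(1);
print $out "hello\n";
close($out);
my $line = <$in>;
print "read back: $line";
```

With four descriptors, the mode list would grow to match, e.g. a
read mode for stdin, write modes for stdout and stderr, and a
read-write mode for the socket.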
---
 MANIFEST                      |  1 -
 lib/PublicInbox/CmdIPC1.pm    | 32 --------------------
 lib/PublicInbox/IPC.pm        | 45 ++++++++-------------------
 lib/PublicInbox/LEI.pm        | 57 +++++++++++++++++++++--------------
 lib/PublicInbox/LeiQuery.pm   |  9 ++----
 lib/PublicInbox/LeiXSearch.pm |  6 ++++
 lib/PublicInbox/Spawn.pm      |  2 +-
 script/lei                    |  9 ++----
 t/cmd_ipc.t                   |  9 ------
 t/ipc.t                       | 53 +++++++++++++++++---------------
 t/lei.t                       |  6 ++--
 11 files changed, 89 insertions(+), 140 deletions(-)
 delete mode 100644 lib/PublicInbox/CmdIPC1.pm

diff --git a/MANIFEST b/MANIFEST
index 62c14cd2..caddd8df 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -109,7 +109,6 @@ lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/Cgit.pm
-lib/PublicInbox/CmdIPC1.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CompressNoop.pm
 lib/PublicInbox/Config.pm
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
deleted file mode 100644
index de6e54ef..00000000
--- a/lib/PublicInbox/CmdIPC1.pm
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# callers should use PublicInbox::CmdIPC1->can('send_cmd1') (or recv_cmd1)
-# 2nd choice for lei(1) front-end and 3rd choice for lei internals
-package PublicInbox::CmdIPC1;
-use strict;
-use v5.10.1;
-BEGIN { eval {
-require IO::FDPass; # XS, available in all major distros
-no warnings 'once';
-
-*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
-	my ($sock, $fds, undef, $flags) = @_;
-	for my $fd (@$fds) {
-		IO::FDPass::send(fileno($sock), $fd) or
-					die "IO::FDPass::send: $!";
-	}
-	send($sock, $_[2], $flags) or die "send $!";
-};
-
-*recv_cmd1 = sub ($$$;$) {
-	my ($s, undef, $len, $nfds) = @_;
-	$nfds //= 3;
-	my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
-	recv($s, $_[1], $len, 0) // die "recv: $!";
-	length($_[1]) == 0 ? () : @fds;
-};
-
-} } # /eval /BEGIN
-
-1;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 88f81e47..c54fcc64 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -37,37 +37,16 @@ if ($enc && $dec) { # should be custom ops
 	} // warn("Storable (part of Perl) missing: $@\n");
 }
 
-my $recv_cmd1; # PublicInbox::CmdIPC1::recv_cmd1;
 my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
 	require PublicInbox::CmdIPC4;
 	$recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
 	PublicInbox::CmdIPC4->can('send_cmd4');
-} // do {
-	# IO::FDPass only allows sending a single FD at-a-time, which
-	# means we can't guarantee all packets end up on the same worker,
-	# so we cap WQ_MAX_WORKERS
-	require PublicInbox::CmdIPC1;
-	$recv_cmd1 = PublicInbox::CmdIPC1->can('recv_cmd1');
-	$WQ_MAX_WORKERS = 1 if $recv_cmd1;
-	wq_set_recv_fds(3);
-	PublicInbox::CmdIPC1->can('send_cmd1');
 };
 
-# needed to tell recv_cmd1 how many times to loop IO::FDPass::recv
-sub wq_set_recv_fds {
-	return unless $recv_cmd1;
-	my $nfds = pop;
-	my $sub = sub {
-		my ($sock, $fds, undef, $flags) = @_;
-		$recv_cmd1->($sock, $fds, $_[2], $flags, $nfds);
-	};
-	my $self = pop;
-	if (ref $self) {
-		$self->{-wq_recv_cmd} = $sub;
-	} else {
-		$recv_cmd = $sub;
-	}
+sub wq_set_recv_modes {
+	my ($self, @modes) = @_;
+	$self->{-wq_recv_modes} = \@modes;
 }
 
 sub _get_rec ($) {
@@ -259,7 +238,9 @@ sub ipc_sibling_atfork_child {
 
 sub _close_recvd ($) {
 	my ($self) = @_;
-	close($_) for (grep { defined } (delete @$self{0..2}));
+	my $x = $self->{-wq_recv_modes};
+	my $end = $x ? $#$x : 2;
+	close($_) for (grep { defined } (delete @$self{0..$end}));
 }
 
 sub wq_worker_loop ($) {
@@ -271,13 +252,12 @@ sub wq_worker_loop ($) {
 	local $SIG{PIPE} = sub {
 		my $cur_sub = $sub;
 		_close_recvd($self);
-		die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub;
+		die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
 	};
-	my $rcv = $self->{-wq_recv_cmd} // $recv_cmd;
 	while (1) {
-		my (@fds) = $rcv->($s2, $buf, $len) or return; # EOF
+		my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
 		my $i = 0;
-		my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]};
+		my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
 		for my $fd (@fds) {
 			my $mode = shift(@m);
 			if (open(my $cmdfh, $mode, $fd)) {
@@ -296,7 +276,8 @@ sub wq_worker_loop ($) {
 			undef $sub; # quiet SIG{PIPE} handler
 			die $@ if $@;
 		};
-		warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE';
+		warn "$$ wq_worker: $@" if $@ &&
+					ref($@) ne 'PublicInbox::SIGPIPE';
 		# need to close explicitly to avoid warnings after SIGPIPE
 		_close_recvd($self);
 	}
@@ -310,8 +291,8 @@ sub wq_do { # always async
 	} else {
 		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
-		warn "wq_do: $@" if $@;
-		delete @$self{0..$#$ios};
+		warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+		delete @$self{0..$#$ios}; # don't close
 	}
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index d19fb311..7313738e 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -277,8 +277,9 @@ sub atfork_prepare_wq {
 # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
-	$self->{sock} //= $wq->{0};
-	$self->{$_} //= $wq->{$_} for (0..2);
+	return () if $self->{0}; # did not fork
+	$self->{$_} = $wq->{$_} for (0..2);
+	$self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
 	my $oldpipe = $SIG{PIPE};
 	%PATH2CFG = ();
 	@TO_CLOSE_ATFORK_CHILD = ();
@@ -298,11 +299,10 @@ sub atfork_parent_wq {
 		my $env = delete $self->{env}; # env is inherited at fork
 		my $ret = bless { %$self }, ref($self);
 		$self->{env} = $env;
-		delete @$ret{qw(-lei_store cfg)};
-		my $in = delete $ret->{0};
-		($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2});
+		delete @$ret{qw(-lei_store cfg pgr)};
+		($ret, delete @$ret{qw(0 1 2 sock)});
 	} else {
-		($self, ($self->{sock} // $self->{0}), @$self{1, 2});
+		($self, @$self{qw(0 1 2 sock)});
 	}
 }
 
@@ -641,7 +641,7 @@ sub start_pager {
 	$new_env{MORE} = 'FRX' if $^O eq 'freebsd';
 	pipe(my ($r, $wpager)) or return warn "pipe: $!";
 	my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} };
-	my $pid;
+	my $pgr = [ undef, @$rdr{1, 2} ];
 	if (my $sock = $self->{sock}) { # lei(1) process runs it
 		delete @new_env{keys %$env}; # only set iff unset
 		my $buf = "exec 1\0".$pager;
@@ -649,12 +649,23 @@ sub start_pager {
 		my $fds = [ map { fileno($_) } @$rdr{0..2} ];
 		$send_cmd->($sock, $fds, $buf .= "\n", 0);
 	} else {
-		$pid = spawn([$pager], $env, $rdr);
+		$pgr->[0] = spawn([$pager], $env, $rdr);
 	}
 	$self->{1} = $wpager;
 	$self->{2} = $wpager if -t $self->{2};
 	$env->{GIT_PAGER_IN_USE} = 'true'; # we may spawn git
-	[ $pid, @$rdr{1, 2} ];
+	$self->{pgr} = $pgr;
+}
+
+sub stop_pager {
+	my ($self) = @_;
+	my $pgr = delete($self->{pgr}) or return;
+	my $pid = $pgr->[0];
+	close $self->{1};
+	# {2} may not be redirected
+	$self->{1} = $pgr->[1];
+	$self->{2} = $pgr->[2];
+	dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
@@ -738,11 +749,7 @@ sub lazy_start {
 	my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
 	pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
 	local $oldset = PublicInbox::DS::block_signals();
-	if ($nfd == 1) {
-		require PublicInbox::CmdIPC1;
-		$send_cmd = PublicInbox::CmdIPC1->can('send_cmd1');
-		$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
-	} elsif ($nfd == 4) {
+	if ($nfd == 4) {
 		$send_cmd = PublicInbox::Spawn->can('send_cmd4');
 		$recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
 			require PublicInbox::CmdIPC4;
@@ -751,7 +758,7 @@ sub lazy_start {
 		};
 	}
 	$recv_cmd or die <<"";
-(Socket::MsgHdr || IO::FDPass || Inline::C) missing/unconfigured (nfd=$nfd);
+(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd);
 
 	require PublicInbox::Listener;
 	require PublicInbox::EOFpipe;
@@ -839,19 +846,24 @@ sub lazy_start {
 	exit($exit_code // 0);
 }
 
-# for users w/o IO::FDPass
+# for users w/o Socket::Msghdr
 sub oneshot {
 	my ($main_pkg) = @_;
 	my $exit = $main_pkg->can('exit'); # caller may override exit()
 	local $quit = $exit if $exit;
 	local %PATH2CFG;
 	umask(077) // die("umask(077): $!");
-	dispatch((bless {
-		0 => *STDIN{GLOB},
-		1 => *STDOUT{GLOB},
-		2 => *STDERR{GLOB},
-		env => \%ENV
-	}, __PACKAGE__), @ARGV);
+	local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
+	eval {
+		my $self = bless {
+			0 => *STDIN{GLOB},
+			1 => *STDOUT{GLOB},
+			2 => *STDERR{GLOB},
+			env => \%ENV
+		}, __PACKAGE__;
+		dispatch($self, @ARGV);
+	};
+	die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
@@ -859,6 +871,7 @@ sub oneshot {
 sub DESTROY {
 	my ($self) = @_;
 	$self->{1}->autoflush(1);
+	stop_pager($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 6e778785..2f4b99e5 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -80,14 +80,14 @@ sub lei_q {
 	if ($self->{sock}) {
 		$self->atfork_prepare_wq($lxs);
 		$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-			// $self->wq_workers($j);
+			// $lxs->wq_workers($j);
 	}
 	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my $out = $opt->{output} // '-';
 	$out = 'json:/dev/stdout' if $out eq '-';
 	my $isatty = -t $self->{1};
 	# no forking workers after this
-	my $pid_old12 = $self->start_pager if $isatty;
+	$self->start_pager if $isatty;
 	my $json = substr($out, 0, 5) eq 'json:' ?
 		ref(PublicInbox::Config->json)->new : undef;
 	if ($json) {
@@ -125,11 +125,6 @@ sub lei_q {
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
 	$lxs->do_query($self, \@srcs);
-	if ($pid_old12) { # [ pid, stdout, stderr ]
-		my $pid = $pid_old12->[0];
-		$self->{$_} = $pid_old12->[$_] for (1, 2);
-		dwaitpid($pid, undef, $self->{sock}) if $pid;
-	}
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b4172734..94f7c2bc 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -168,4 +168,10 @@ sub ipc_atfork_child {
 	$self->SUPER::ipc_atfork_child; # PublicInbox::IPC
 }
 
+sub ipc_atfork_prepare {
+	my ($self) = @_;
+	$self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
+}
+
 1;
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index b35bf54c..ef822e1b 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,7 +209,7 @@ my $fdpass = <<'FDPASS';
 #include <sys/socket.h>
 
 #if defined(CMSG_SPACE) && defined(CMSG_LEN)
-#define SEND_FD_CAPA 3
+#define SEND_FD_CAPA 4
 #define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
 union my_cmsg {
 	struct cmsghdr hdr;
diff --git a/script/lei b/script/lei
index aac8fa94..5c32ab88 100755
--- a/script/lei
+++ b/script/lei
@@ -8,11 +8,6 @@ use PublicInbox::CmdIPC4;
 my $narg = 4;
 my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
-	require PublicInbox::CmdIPC1; # 2nd choice
-	$narg = 1;
-	$recv_cmd = PublicInbox::CmdIPC1->can('recv_cmd1');
-	PublicInbox::CmdIPC1->can('send_cmd1');
-} // do {
 	require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
 	$narg = 4;
 	$recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
@@ -77,7 +72,7 @@ Falling back to (slow) one-shot mode
 		$pwd = $cwd;
 	}
 	1;
-}) { # (Socket::MsgHdr|IO::FDPass|Inline::C), $sock, $pwd are all available:
+}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
 	local $ENV{PWD} = $pwd;
 	my $buf = join("\0", scalar(@ARGV), @ARGV);
 	while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
@@ -94,7 +89,7 @@ Falling back to (slow) one-shot mode
 			die $buf;
 		}
 	}
-} else { # for systems lacking Socket::MsgHdr, IO::FDPass or Inline::C
+} else { # for systems lacking Socket::MsgHdr or Inline::C
 	warn $@ if $@;
 	require PublicInbox::LEI;
 	PublicInbox::LEI::oneshot(__PACKAGE__);
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index 22f73c19..0a0a4e00 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -79,13 +79,4 @@ SKIP: {
 	}
 }
 
-SKIP: {
-	require_mods('IO::FDPass', 13);
-	require_ok 'PublicInbox::CmdIPC1';
-	$send = PublicInbox::CmdIPC1->can('send_cmd1');
-	$recv = PublicInbox::CmdIPC1->can('recv_cmd1');
-	$do_test->(SOCK_STREAM, 0, 'IO::FDPass stream');
-	$do_test->($SOCK_SEQPACKET, MSG_EOR, 'IO::FDPass seqpacket');
-}
-
 done_testing;
diff --git a/t/ipc.t b/t/ipc.t
index fd290809..22423a78 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -6,6 +6,7 @@ use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
+require_mods(qw(Storable||Sereal));
 require_ok 'PublicInbox::IPC';
 state $once = eval <<'';
 package PublicInbox::IPC;
@@ -94,8 +95,7 @@ my $test = sub {
 };
 $test->('local');
 
-SKIP: {
-	require_mods(qw(Storable||Sereal), 16);
+{
 	my $pid = $ipc->ipc_worker_spawn('test worker');
 	ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
 	defined($pid) or BAIL_OUT 'no spawn, no test';
@@ -112,7 +112,7 @@ SKIP: {
 $ipc->ipc_worker_stop; # idempotent
 
 # work queues
-$ipc->{wq_open_modes} = [qw( >&= >&= >&= )];
+$ipc->wq_set_recv_modes(qw( >&= >&= >&= ));
 pipe(my ($ra, $wa)) or BAIL_OUT $!;
 pipe(my ($rb, $wb)) or BAIL_OUT $!;
 pipe(my ($rc, $wc)) or BAIL_OUT $!;
@@ -136,7 +136,7 @@ for my $t ('local', 'worker', 'worker again') {
 
 # wq_do works across fork (siblings can feed)
 SKIP: {
-	skip 'Socket::MsgHdr, IO::FDPass, Inline::C missing', 7 if !$ppids[0];
+	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
 	is_deeply(\@ppids, [$$, undef, undef],
 		'parent pid returned in wq_workers_start');
 	my $pid = fork // BAIL_OUT $!;
@@ -161,28 +161,31 @@ SKIP: {
 }
 
 $ipc->wq_close;
-seek($warn, 0, SEEK_SET) or BAIL_OUT;
-my @warn = <$warn>;
-is(scalar(@warn), 3, 'warned 3 times');
-like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
-like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-is($warn[2], $warn[1], 'worker did not die');
-
-$SIG{__WARN__} = 'DEFAULT';
-is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
-is($ipc->wq_workers, 1, '1 worker started');
 SKIP: {
-	$ipc->WQ_MAX_WORKERS > 1 or
-		skip 'Inline::C or Socket::MsgHdr not available', 4;
-	$ipc->wq_worker_incr;
-	is($ipc->wq_workers, 2, 'worker count bumped');
-	$ipc->wq_worker_decr;
-	$ipc->wq_worker_decr_wait(10);
-	is($ipc->wq_workers, 1, 'worker count lowered');
-	is($ipc->wq_workers(2), 2, 'worker count set');
-	is($ipc->wq_workers, 2, 'worker count stayed set');
+	skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
+	seek($warn, 0, SEEK_SET) or BAIL_OUT;
+	my @warn = <$warn>;
+	is(scalar(@warn), 3, 'warned 3 times');
+	like($warn[0], qr/ wq_do: /, '1st warned from wq_do');
+	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
+	is($warn[2], $warn[1], 'worker did not die');
+
+	$SIG{__WARN__} = 'DEFAULT';
+	is($ipc->wq_workers_start('wq', 1), $$, 'workers started again');
+	is($ipc->wq_workers, 1, '1 worker started');
+	SKIP: {
+		$ipc->WQ_MAX_WORKERS > 1 or
+			skip 'Inline::C or Socket::MsgHdr not available', 4;
+		$ipc->wq_worker_incr;
+		is($ipc->wq_workers, 2, 'worker count bumped');
+		$ipc->wq_worker_decr;
+		$ipc->wq_worker_decr_wait(10);
+		is($ipc->wq_workers, 1, 'worker count lowered');
+		is($ipc->wq_workers(2), 2, 'worker count set');
+		is($ipc->wq_workers, 2, 'worker count stayed set');
+	}
+	$ipc->wq_close;
+	is($ipc->wq_workers, undef, 'workers undef after close');
 }
-$ipc->wq_close;
-is($ipc->wq_workers, undef, 'workers undef after close');
 
 done_testing;
diff --git a/t/lei.t b/t/lei.t
index 992800a5..6819f182 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -208,13 +208,11 @@ if ($ENV{TEST_LEI_ONESHOT}) {
 
 SKIP: { # real socket
 	require_mods(qw(Cwd), my $nr = 105);
-	my $nfd = eval { require Socket::MsgHdr; 4 } //
-			eval { require IO::FDPass; 1 } // do {
+	my $nfd = eval { require Socket::MsgHdr; 4 } // do {
 		require PublicInbox::Spawn;
 		PublicInbox::Spawn->can('send_cmd4') ? 4 : undef;
 	} //
-	skip 'Socket::MsgHdr, IO::FDPass or Inline::C missing or unconfigured',
-		$nr;
+	skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
 
 	local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
 	my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";


* [PATCH 22/22] lei: query: restore JSON output overview
  2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
                   ` (20 preceding siblings ...)
  2021-01-10 12:15 ` [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Eric Wong
@ 2021-01-10 12:15 ` Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-01-10 12:15 UTC (permalink / raw)
  To: meta

This internal API is better suited for fork-friendliness (but
locking + dedupe still needs to be re-added).

Normal "json" is the default, though stream-friendly "concatjson"
and "jsonl" (AKA "ndjson" AKA "ldjson") all seem to be working
(though tests aren't working, yet).

For normal "json", the biggest downside is the necessity of a
trailing "null" element at the end of the array because of
parallel processes, since (AFAIK) regular JSON doesn't allow
trailing commas, unlike JavaScript.
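
A minimal sketch of the trailing "null" idea, using core JSON::PP
with made-up records; this illustrates the output shape only and is
not the LeiOverview code.  Each writer appends "ELEMENT," without
knowing whether it is last, and the parent closes the array with
"null]".

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new->canonical(1);
my @hits = ({ m => 'a@example.com' }, { m => 'b@example.com' });

my $out = '[';
# every writer emits its element followed by a comma
$out .= $json->encode($_) . ',' for @hits;
# the parent finishes the array; "null" absorbs the final comma
$out .= 'null]';

my $res = $json->decode($out);
pop @$res; # consumers drop the trailing null
print scalar(@$res), " results\n";
```

The cost is that every consumer of normal "json" output has to
ignore the final element, which the streaming formats avoid.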
---
 MANIFEST                       |   1 +
 lib/PublicInbox/LeiOverview.pm | 188 +++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiQuery.pm    |  66 +++---------
 lib/PublicInbox/LeiXSearch.pm  |  25 +++--
 4 files changed, 217 insertions(+), 63 deletions(-)
 create mode 100644 lib/PublicInbox/LeiOverview.pm

diff --git a/MANIFEST b/MANIFEST
index caddd8df..810aec42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -166,6 +166,7 @@ lib/PublicInbox/KQNotify.pm
 lib/PublicInbox/LEI.pm
 lib/PublicInbox/LeiDedupe.pm
 lib/PublicInbox/LeiExternal.pm
+lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
new file mode 100644
index 00000000..8a1f4f82
--- /dev/null
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -0,0 +1,188 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# per-mitem/smsg iterators for search results
+# "ovv" => "Overview viewer"
+package PublicInbox::LeiOverview;
+use strict;
+use v5.10.1;
+use POSIX qw(strftime);
+use File::Spec;
+use PublicInbox::MID qw($MID_EXTRACT);
+use PublicInbox::Address qw(pairs);
+use PublicInbox::Config;
+use PublicInbox::Search qw(get_pct);
+
+# cf. https://en.wikipedia.org/wiki/JSON_streaming
+my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
+
+sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+
+sub new {
+	my ($class, $lei) = @_;
+	my $opt = $lei->{opt};
+	my $out = $opt->{output} // '-';
+	$out = '/dev/stdout' if $out eq '-';
+
+	my $fmt = $opt->{'format'};
+	$fmt = lc($fmt) if defined $fmt;
+	if ($out =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/
+		my $ofmt = lc $1;
+		$fmt //= $ofmt;
+		return $lei->fail(<<"") if $fmt ne $ofmt;
+--format=$fmt and --output=$ofmt conflict
+
+	}
+	$fmt //= 'json' if $out eq '/dev/stdout';
+	$fmt //= 'maildir'; # TODO
+
+	if (index($out, '://') < 0) { # not a URL, so assume path
+		$out = File::Spec->canonpath($out);
+	} # else URL
+
+	my $self = bless { fmt => $fmt, out => $out }, $class;
+	my $json;
+	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
+		$json = $self->{json} = ref(PublicInbox::Config->json);
+	}
+	my ($isatty, $seekable);
+	if ($out eq '/dev/stdout') {
+		$isatty = -t $lei->{1};
+		$lei->start_pager if $isatty;
+		$opt->{pretty} //= $isatty;
+	} elsif ($json) {
+		return $lei->fail('JSON formats only output to stdout');
+	}
+	$self;
+}
+
+# called once by parent
+sub ovv_begin {
+	my ($self, $lei) = @_;
+	if ($self->{fmt} eq 'json') {
+		print { $lei->{1} } '[';
+	} # TODO HTML/Atom/...
+}
+
+# called once by parent (via PublicInbox::EOFpipe)
+sub ovv_end {
+	my ($self, $lei) = @_;
+	if ($self->{fmt} eq 'json') {
+		# JSON doesn't allow trailing commas, and preventing
+		# trailing commas is a PITA when parallelizing outputs
+		print { $lei->{1} } "null]\n";
+	} elsif ($self->{fmt} eq 'concatjson') {
+		print { $lei->{1} } "\n";
+	}
+}
+
+sub ovv_atfork_child {
+	my ($self) = @_;
+	# reopen dedupe here
+}
+
+# prepares an smsg for JSON
+sub _unbless_smsg {
+	my ($smsg, $mitem) = @_;
+
+	delete @$smsg{qw(lines bytes num tid)};
+	$smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+	$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
+	$smsg->{relevance} = get_pct($mitem) if $mitem;
+
+	if (my $r = delete $smsg->{references}) {
+		$smsg->{references} = [
+				map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
+	}
+	if (my $m = delete($smsg->{mid})) {
+		$smsg->{'m'} = "<$m>";
+	}
+	for my $f (qw(from to cc)) {
+		my $v = delete $smsg->{$f} or next;
+		$smsg->{substr($f, 0, 1)} = pairs($v);
+	}
+	$smsg->{'s'} = delete $smsg->{subject};
+	# can we be bothered to parse From/To/Cc into arrays?
+	scalar { %$smsg }; # unbless
+}
+
+sub ovv_atexit_child {
+	my ($self, $lei) = @_;
+	my $bref = delete $lei->{ovv_buf} or return;
+	print { $lei->{1} } $$bref;
+}
+
+# JSON module ->pretty output wastes too much vertical white space,
+# this (IMHO) provides better use of screen real-estate while not
+# being excessively compact:
+sub _json_pretty {
+	my ($json, $k, $v) = @_;
+	if (ref $v eq 'ARRAY') {
+		if (@$v) {
+			my $sep = ",\n" . (' ' x (length($k) + 7));
+			if (ref($v->[0])) { # f/t/c
+				$v = '[' . join($sep, map {
+					my $pair = $json->encode($_);
+					$pair =~ s/(null|"),"/$1, "/g;
+					$pair;
+				} @$v) . ']';
+			} else { # references
+				$v = '[' . join($sep, map {
+					substr($json->encode([$_]), 1, -1);
+				} @$v) . ']';
+			}
+		} else {
+			$v = '[]';
+		}
+	}
+	qq{  "$k": }.$v;
+}
+
+sub ovv_each_smsg_cb {
+	my ($self, $lei) = @_;
+	$lei->{ovv_buf} = \(my $buf = '');
+	my $json = $self->{json} ? $self->{json}->new : undef;
+	if ($json) {
+		$json->utf8->canonical;
+		$json->ascii(1) if $lei->{opt}->{ascii};
+	}
+	if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
+		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
+		sub { # DIY prettiness :P
+			my ($smsg, $mitem) = @_;
+			$smsg = _unbless_smsg($smsg, $mitem);
+			$buf .= "{\n";
+			$buf .= join(",\n", map {
+				my $v = $smsg->{$_};
+				if (ref($v)) {
+					_json_pretty($json, $_, $v);
+				} else {
+					$v = $json->encode([$v]);
+					qq{  "$_": }.substr($v, 1, -1);
+				}
+			} sort keys %$smsg);
+			$buf .= $EOR;
+			if (length($buf) > 65536) {
+				print { $lei->{1} } $buf;
+				$buf = '';
+			}
+		}
+	} elsif ($json) {
+		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
+		sub {
+			my ($smsg, $mitem) = @_;
+			delete @$smsg{qw(tid num)};
+			$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
+			if (length($buf) > 65536) {
+				print { $lei->{1} } $buf;
+				$buf = '';
+			}
+		}
+	} elsif ($self->{fmt} eq 'oid') {
+		sub {
+			my ($smsg, $mitem) = @_;
+		}
+	} # else { ...
+}
+
+1;
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 2f4b99e5..7ca01454 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -5,43 +5,8 @@
 package PublicInbox::LeiQuery;
 use strict;
 use v5.10.1;
-use PublicInbox::MID qw($MID_EXTRACT);
-use POSIX qw(strftime);
-use PublicInbox::Address qw(pairs);
 use PublicInbox::DS qw(dwaitpid);
 
-sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
-
-# prepares an smsg for JSON
-sub _smsg_unbless ($) {
-	my ($smsg) = @_;
-
-	delete @$smsg{qw(lines bytes)};
-	$smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
-	$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
-
-	if (my $r = delete $smsg->{references}) {
-		$smsg->{references} = [
-				map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
-	}
-	if (my $m = delete($smsg->{mid})) {
-		$smsg->{'m'} = "<$m>";
-	}
-	# XXX breaking to/cc, into structured arrays or tables which
-	# distinguish "$phrase <$address>" causes pretty printing JSON
-	# to take up too much vertical space.  I can't get either
-	# Cpanel::JSON::XS or JSON::XS or jq(1) only indent when
-	# wrapping is necessary, rather than blindly indenting and
-	# adding vertical space everywhere.
-	for my $f (qw(from to cc)) {
-		my $v = delete $smsg->{$f} or next;
-		$smsg->{substr($f, 0, 1)} = $v;
-	}
-	$smsg->{'s'} = delete $smsg->{subject};
-	# can we be bothered to parse From/To/Cc into arrays?
-	scalar { %$smsg }; # unbless
-}
-
 sub _vivify_external { # _externals_each callback
 	my ($src, $dir) = @_;
 	if (-f "$dir/ei.lock") {
@@ -68,6 +33,7 @@ sub lei_q {
 	# src: LeiXSearch || LeiSearch || Inbox
 	my @srcs;
 	require PublicInbox::LeiXSearch;
+	require PublicInbox::LeiOverview;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -83,23 +49,9 @@ sub lei_q {
 			// $lxs->wq_workers($j);
 	}
 	unshift(@srcs, $sto->search) if $opt->{'local'};
-	my $out = $opt->{output} // '-';
-	$out = 'json:/dev/stdout' if $out eq '-';
-	my $isatty = -t $self->{1};
 	# no forking workers after this
-	$self->start_pager if $isatty;
-	my $json = substr($out, 0, 5) eq 'json:' ?
-		ref(PublicInbox::Config->json)->new : undef;
-	if ($json) {
-		if ($opt->{pretty} //= $isatty) {
-			$json->pretty(1)->space_before(0);
-			$json->indent_length($opt->{indent} // 2);
-		}
-		$json->utf8; # avoid Wide character in print warnings
-		$json->ascii(1) if $opt->{ascii}; # for "\uXXXX"
-		$json->canonical;
-	}
-
+	require PublicInbox::LeiOverview;
+	$self->{ovv} = PublicInbox::LeiOverview->new($self);
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -124,7 +76,17 @@ sub lei_q {
 	$mset_opt{relevance} //= -2 if $opt->{thread};
 	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$lxs->do_query($self, \@srcs);
+	$self->{ovv}->ovv_begin($self);
+	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
+	require PublicInbox::EOFpipe;
+	my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
+	$lxs->do_query($self, $qry_done, \@srcs);
+	$eof->event_step unless $self->{sock};
+}
+
+sub query_done { # PublicInbox::EOFpipe callback
+	my ($self) = @_;
+	$self->{ovv}->ovv_end($self);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 94f7c2bc..c030b2b2 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use PublicInbox::Search qw(get_pct);
 use Sys::Syslog qw(syslog);
 
 sub new {
@@ -102,26 +101,26 @@ sub query_thread_mset { # for --thread
 	}
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
 		my $ctx = { ids => $ids };
 		my $i = 0;
-		my %n2p = map { ($ids->[$i++], get_pct($_)) } $mset->items;
+		my %n2item = map { ($ids->[$i++], $_) } $mset->items;
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
 				# next if $dd->is_smsg_dup($smsg); TODO
-				if (my $p = delete $n2p{$smsg->{num}}) {
-					$smsg->{relevance} = $p;
-				}
-				print { $self->{1} } Dumper($smsg);
+				my $mitem = delete $n2item{$smsg->{num}};
+				$each_smsg->($smsg, $mitem);
 				# $self->out($buf .= $ORS);
 				# $emit_cb->($smsg);
 			}
 			@{$ctx->{xids}} = ();
 		}
 	} while (_mset_more($mset, $mo));
+	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub query_mset { # non-parallel for non-"--thread" users
@@ -130,23 +129,24 @@ sub query_mset { # non-parallel for non-"--thread" users
 	my $mset;
 	local %SIG = (%SIG, $lei->atfork_child_wq($self));
 	$self->attach_external($_) for @$srcs;
+	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
 			# next if $dd->is_smsg_dup($smsg);
-			$smsg->{relevance} = get_pct($it);
-			use Data::Dumper;
-			print { $self->{1} } Dumper($smsg);
+			$each_smsg->($smsg, $it);
 			# $self->out($buf .= $ORS) if defined $buf;
 			#$emit_cb->($smsg);
 		}
 	} while (_mset_more($mset, $mo));
+	$lei->{ovv}->ovv_atexit_child($lei);
 }
 
 sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
+	my ($self, $lei_orig, $qry_done, $srcs) = @_;
 	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	$io[0] = $qry_done; # don't need stdin
 	$io[1]->autoflush(1);
 	$io[2]->autoflush(1);
 	if ($lei->{opt}->{thread}) {
@@ -160,6 +160,9 @@ sub do_query {
 	for my $rmt (@{$self->{remotes} // []}) {
 		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
 	}
+
+	# sent off to children, they will drop remaining references to it
+	close $qry_done;
 }
 
 sub ipc_atfork_child {
@@ -170,7 +173,7 @@ sub ipc_atfork_child {
 
 sub ipc_atfork_prepare {
 	my ($self) = @_;
-	$self->wq_set_recv_modes(qw[<&= >&= >&= +<&=]);
+	$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
 	$self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC
 }
 
