unofficial mirror of meta@public-inbox.org
* [PATCH 0/9] more process-related cleanups
@ 2023-10-07 21:24 Eric Wong
  2023-10-07 21:24 ` [PATCH 1/9] xt/httpd-async-stream: avoid waitpid call Eric Wong
                   ` (8 more replies)
  0 siblings, 9 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

2/9 fixes an annoying syslog error I spotted running tests;
3/9 is long overdue, and there are a few more overdue things
coming up...

Eric Wong (9):
  xt/httpd-async-stream: avoid waitpid call
  lei: do not issue sto->done if socket is inactive
  lei: always use async `done' requests to store
  ipc: require fork+SOCK_SEQPACKET for wq_* functions
  ipc: use autodie for most syscalls
  import: use autodie, rely on PerlIO for retries
  rename ProcessPipe to ProcessIO
  process_io: pass args to awaitpid as list
  cindex: start using autodie

 MANIFEST                                      |  3 +-
 lib/PublicInbox/CodeSearchIdx.pm              | 70 ++++++++--------
 lib/PublicInbox/Gcf2Client.pm                 |  4 +-
 lib/PublicInbox/Git.pm                        |  4 +-
 lib/PublicInbox/HTTPD/Async.pm                |  2 +-
 lib/PublicInbox/IPC.pm                        | 82 ++++++++-----------
 lib/PublicInbox/Import.pm                     | 45 ++++------
 lib/PublicInbox/LEI.pm                        | 11 ++-
 lib/PublicInbox/LeiInput.pm                   |  2 +-
 lib/PublicInbox/LeiRediff.pm                  |  2 +-
 lib/PublicInbox/LeiRemote.pm                  |  2 +-
 lib/PublicInbox/LeiStore.pm                   | 17 ++--
 lib/PublicInbox/LeiToMail.pm                  |  6 +-
 lib/PublicInbox/LeiXSearch.pm                 |  6 +-
 .../{ProcessPipe.pm => ProcessIO.pm}          | 12 ++-
 lib/PublicInbox/Qspawn.pm                     |  8 +-
 lib/PublicInbox/Spamcheck/Spamc.pm            |  2 +-
 lib/PublicInbox/Spawn.pm                      | 12 +--
 t/ipc.t                                       | 19 ++---
 t/lei-store-fail.t                            | 51 ++++++++++++
 t/spawn.t                                     | 12 +--
 xt/httpd-async-stream.t                       |  6 +-
 22 files changed, 196 insertions(+), 182 deletions(-)
 rename lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} (83%)
 create mode 100644 t/lei-store-fail.t


* [PATCH 1/9] xt/httpd-async-stream: avoid waitpid call
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 2/9] lei: do not issue sto->done if socket is inactive Eric Wong
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

We can just use the non-wantarray form of popen_rd to
save us some extra error checking.
---
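For illustration only: core Perl's pipe open has the same property the
scalar form of popen_rd is relied on for here, namely that close reaps
the child and leaves its wait status in $?.  The sketch below uses a
plain open(..., '-|', ...) rather than popen_rd, and the child command
is made up for the example.

```perl
use v5.12;
# Core-Perl sketch (not popen_rd itself): closing a pipe handle waits
# for the child and stores its wait status in $?, so no separate
# waitpid call is needed.
open(my $rd, '-|', $^X, '-e', 'print "hi\n"; exit 3') or die "open: $!";
my $out = do { local $/; <$rd> }; # slurp everything the child wrote
my $ok = close($rd) ? 1 : 0; # false here, the child exited non-zero
my $exit = $? >> 8; # exit code is in the high byte of $?
print "out=$out", "ok=$ok exit=$exit\n";
```

A single `close ... or die` therefore covers both close(2) errors and
a failed child, which is what the new error message reports.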
 xt/httpd-async-stream.t | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/xt/httpd-async-stream.t b/xt/httpd-async-stream.t
index 0658c691..904f2ae9 100644
--- a/xt/httpd-async-stream.t
+++ b/xt/httpd-async-stream.t
@@ -58,7 +58,7 @@ my $do_get_all = sub {
 	my ($buf, $nr);
 	my $bytes = 0;
 	my $t0 = now();
-	my ($rd, $pid) = popen_rd([$curl, @CURL_OPT, $url]);
+	my $rd = popen_rd([$curl, @CURL_OPT, $url]);
 	while (1) {
 		$nr = sysread($rd, $buf, 65536);
 		last if !$nr;
@@ -67,9 +67,7 @@ my $do_get_all = sub {
 	}
 	my $res = $dig->hexdigest;
 	my $elapsed = sprintf('%0.3f', now() - $t0);
-	close $rd or die "close curl failed: $!\n";
-	waitpid($pid, 0) == $pid or die "waitpid failed: $!\n";
-	$? == 0 or die "curl failed: $?\n";
+	close $rd or die "close curl failed: $! \$?=$?\n";
 	print STDERR "# $job $$ ($?) $res (${elapsed}s) $bytes bytes\n";
 	$res;
 };


* [PATCH 2/9] lei: do not issue sto->done if socket is inactive
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
  2023-10-07 21:24 ` [PATCH 1/9] xt/httpd-async-stream: avoid waitpid call Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

This fixes attempts to use an undefined value as an ARRAY reference
in PublicInbox::IPC::wq_io_do.
---
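A minimal sketch of the error class, assuming the failure comes from
dereferencing an omitted array ref argument the way wq_io_do does with
`@$ios'.  count_ios is a hypothetical stand-in, not a real function in
PublicInbox::IPC.

```perl
use v5.12; # implies strict, including strict refs
# Hypothetical stand-in for a sub which dereferences an optional
# array ref argument; $ios is undef when the caller omits it.
sub count_ios {
	my ($ios) = @_;
	my @fds = @$ios; # dies under strict refs if $ios is undef
	scalar(@fds);
}
my $n = count_ios([ \*STDIN, \*STDOUT ]);
my $res = eval { count_ios() }; # no array ref passed
my $err = $@;
print "n=$n err=$err";
```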
 lib/PublicInbox/LEI.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f8bcd43d..f00b2465 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1296,7 +1296,7 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
 			my $lne = delete($cfg->{-lei_note_event});
 			$lne->wq_close if $lne;
 			my $sto = delete($cfg->{-lei_store}) // next;
-			eval { $sto->wq_io_do('done') };
+			eval { $sto->wq_do('done') if $sto->{-wq_s1} };
 			warn "E: $@ (dropping store for $cfg->{-f})" if $@;
 			$sto->wq_close;
 		}


* [PATCH 3/9] lei: always use async `done' requests to store
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
  2023-10-07 21:24 ` [PATCH 1/9] xt/httpd-async-stream: avoid waitpid call Eric Wong
  2023-10-07 21:24 ` [PATCH 2/9] lei: do not issue sto->done if socket is inactive Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-08  1:58   ` Eric Wong
                     ` (2 more replies)
  2023-10-07 21:24 ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
                   ` (5 subsequent siblings)
  8 siblings, 3 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

It's safer against deadlocks and we still get proper error
reporting by passing stderr across in addition to the lei
socket.
---
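The error handling in LeiStore::done below can be sketched in
isolation roughly as follows.  run_steps and the step callbacks are
hypothetical; only the overall shape (collect every failure, report on
a handle passed in by the caller, die once at the end) mirrors the
patch.

```perl
use v5.12;
# Sketch of the error-collection pattern: run every cleanup step,
# gather failures, report them on a caller-supplied handle, then die
# once.  run_steps and the step names are hypothetical.
sub run_steps {
	my ($errfh, @steps) = @_;
	my @err;
	while (my ($name, $cb) = splice(@steps, 0, 2)) {
		eval { $cb->() };
		push(@err, "E: $name: $@") if $@;
	}
	print { $errfh // *STDERR{GLOB} } @err;
	die @err if @err;
}

open(my $errfh, '+>', undef) or die "open: $!"; # anonymous temp file
eval {
	run_steps($errfh,
		'import done' => sub { die "disk full\n" },
		'priv_eidx done' => sub { 1 });
};
my $died = $@;
seek($errfh, 0, 0) or die "seek: $!";
my @seen = readline($errfh);
print "died=$died", "seen=@seen";
```

A later step still runs after an earlier one fails, so partial work is
committed where possible and every failure reaches the caller's stderr.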
 MANIFEST                      |  1 +
 lib/PublicInbox/LEI.pm        |  9 +++----
 lib/PublicInbox/LeiInput.pm   |  2 +-
 lib/PublicInbox/LeiRemote.pm  |  2 +-
 lib/PublicInbox/LeiStore.pm   | 17 ++++++------
 lib/PublicInbox/LeiXSearch.pm |  6 ++---
 t/lei-store-fail.t            | 51 +++++++++++++++++++++++++++++++++++
 7 files changed, 69 insertions(+), 19 deletions(-)
 create mode 100644 t/lei-store-fail.t

diff --git a/MANIFEST b/MANIFEST
index 4693cbe0..689c6bf6 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -514,6 +514,7 @@ t/lei-q-thread.t
 t/lei-refresh-mail-sync.t
 t/lei-reindex.t
 t/lei-sigpipe.t
+t/lei-store-fail.t
 t/lei-tag.t
 t/lei-up.t
 t/lei-watch.t
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f00b2465..4f840e89 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1537,12 +1537,11 @@ sub lms {
 
 sub sto_done_request {
 	my ($lei, $wq) = @_;
-	return unless $lei->{sto};
+	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
-	my $sock = $wq ? $wq->{lei_sock} : undef;
-	$sock //= $lei->{sock};
-	my @io;
-	push(@io, $sock) if $sock; # async wait iff possible
+	my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
+	my $errfh = $lei->{2} // *STDERR{GLOB};
+	my @io = $s ? ($errfh, $s) : ($errfh);
 	eval { $lei->{sto}->wq_io_do('done', \@io) };
 	warn($@) if $@;
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 91383265..93f8b6b8 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -467,7 +467,7 @@ sub process_inputs {
 	}
 	# always commit first, even on error partial work is acceptable for
 	# lei <import|tag|convert>
-	my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto};
+	$self->{lei}->sto_done_request;
 	$self->{lei}->fail($err) if $err;
 }
 
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index 54750062..15013baa 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -52,7 +52,7 @@ sub mset {
 	$self->{smsg} = [];
 	$fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
 	PublicInbox::MboxReader->mboxrd($fh, \&_each_mboxrd_eml, $self);
-	my $wait = $self->{lei}->{sto}->wq_do('done');
+	$self->{lei}->sto_done_request;
 	$ar->join;
 	$lei->child_error($?) if $?;
 	$self; # we are the mset (and $ibx, and $self)
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0cb78f79..e19ec88e 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -582,19 +582,20 @@ sub xchg_stderr {
 }
 
 sub done {
-	my ($self, $sock_ref) = @_;
-	my $err = '';
+	my ($self) = @_;
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+	my @err;
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
-		if ($@) {
-			$err .= "import done: $@\n";
-			warn $err;
-		}
+		push(@err, "E: import done: $@\n") if $@;
 	}
 	delete $self->{lms};
-	$self->{priv_eidx}->done; # V2Writable::done
+	eval { $self->{priv_eidx}->done }; # V2Writable::done
+	push(@err, "E: priv_eidx done: $@\n") if $@;
+	print { $errfh // *STDERR{GLOB} } @err;
+	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
 	xchg_stderr($self);
-	die $err if $err;
+	die @err if @err;
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1caa9d06..4077191f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -358,9 +358,7 @@ sub query_remote_mboxrd {
 		$fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
 		PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
 						$lei, $each_smsg);
-		if (delete($self->{-sto_imported})) {
-			my $wait = $self->{import_sto}->wq_do('done');
-		}
+		$lei->sto_done_request if delete($self->{-sto_imported});
 		$reap_curl->join;
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		if ($? == 0) {
@@ -402,7 +400,7 @@ sub query_done { # EOF callback for main daemon
 	delete $lei->{lxs};
 	($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
 		warn "BUG: {sto} missing with --mail-sync";
-	$lei->sto_done_request if $lei->{sto};
+	$lei->sto_done_request;
 	if (my $v2w = delete $lei->{v2w}) {
 		my $wait = $v2w->wq_do('done'); # may die
 		$v2w->wq_close;
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
new file mode 100644
index 00000000..e9ad779f
--- /dev/null
+++ b/t/lei-store-fail.t
@@ -0,0 +1,51 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# ensure we detect errors in lei/store
+use v5.12;
+use PublicInbox::TestCommon;
+use autodie qw(pipe open close seek);
+use Fcntl qw(SEEK_SET);
+use File::Path qw(remove_tree);
+
+my $start_home = $ENV{HOME}; # bug guard
+test_lei(sub {
+	lei_ok qw(import -q t/plack-qp.eml); # start the store
+	my $opt;
+	pipe($opt->{0}, my $in_w);
+	open $opt->{1}, '+>', undef;
+	open $opt->{2}, '+>', undef;
+	$opt->{-CLOFORK} = [ $in_w ];
+	my $cmd = [ qw(lei import -q -F mboxrd) ];
+	my $tp = start_script($cmd, undef, $opt);
+	close $opt->{0};
+	$in_w->autoflush(1);
+	for (1..500) { # need to fill up 64k read buffer
+		print $in_w <<EOM or xbail "print $!";
+From k\@y Fri Oct  2 00:00:00 1993
+From: <k\@example.com>
+Date: Sat, 02 Oct 2010 00:00:00 +0000
+Subject: hi
+Message-ID: <$_\@t>
+
+will this save?
+EOM
+	}
+	tick 0.1; # XXX ugh, this is so hacky
+
+	# make sto_done_request fail:
+	remove_tree("$ENV{HOME}/.local/share/lei/store");
+	# subsequent lei commands are undefined behavior,
+	# but we need to make sure the current lei command fails:
+
+	close $in_w; # should trigger ->done
+	$tp->join;
+	isnt($?, 0, 'lei import error code set on failure');
+	is(-s $opt->{1}, 0, 'nothing in stdout');
+	isnt(-s $opt->{2}, 0, 'stderr not empty');
+	seek($opt->{2}, 0, SEEK_SET);
+	my @err = readline($opt->{2});
+	ok(grep(!/^#/, @err), 'noted error in stderr') or diag "@err";
+});
+
+done_testing;


* [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (2 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 5/9] ipc: use autodie for most syscalls Eric Wong
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

None of the lei internals works properly without forking and
sockets.  The fallback code increases the potential to accidentally
call subs in the wrong process during the teardown phase.

We'll still support ipc_do w/o forking for now since
forking doesn't benefit small indexing runs from -mda and
such.
---
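As background on why SOCK_SEQPACKET matters for the wq_* functions,
here is a small standalone sketch (not taken from IPC.pm) showing that
each send on such a socket arrives as one discrete record.  It assumes
an OS which supports AF_UNIX with SOCK_SEQPACKET, such as Linux.

```perl
use v5.12;
use Socket qw(AF_UNIX SOCK_SEQPACKET);
# Sketch: SOCK_SEQPACKET keeps record boundaries, so each recv returns
# exactly one message sent by the peer, never a merged byte stream.
socketpair(my $s1, my $s2, AF_UNIX, SOCK_SEQPACKET, 0) or
	die "socketpair: $!";
send($s1, 'first', 0) // die "send: $!";
send($s1, 'second request', 0) // die "send: $!";
my @got;
for (1..2) {
	recv($s2, my $buf, 65536, 0) // die "recv: $!";
	push @got, $buf;
}
print join('|', @got), "\n";
```

With SOCK_STREAM the two sends could be returned by a single recv, and
the receiver would need its own framing to split requests.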
 lib/PublicInbox/IPC.pm | 43 ++++++++++++++++--------------------------
 t/ipc.t                | 19 ++++++++-----------
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 068c5623..ba8b5739 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
 	my ($self, $sub, @args) = @_;
-	if (my $wkr = $self->{-wq_workers}) {
-		my $buf = ipc_freeze([$sub, @args]);
-		for my $bcast1 (values %$wkr) {
-			my $sock = $bcast1 // $self->{-wq_s1} // next;
-			send($sock, $buf, 0) // croak "send: $!";
-			# XXX shouldn't have to deal with EMSGSIZE here...
-		}
-	} else {
-		eval { $self->$sub(@args) };
-		warn "wq_broadcast: $@" if $@;
+	my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+	my $buf = ipc_freeze([$sub, @args]);
+	for my $bcast1 (values %$wkr) {
+		my $sock = $bcast1 // $self->{-wq_s1} // next;
+		send($sock, $buf, 0) // croak "send: $!";
+		# XXX shouldn't have to deal with EMSGSIZE here...
 	}
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
-	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		my $fds = [ map { fileno($_) } @$ios ];
-		my $buf = ipc_freeze([$sub, @args]);
-		if (length($buf) > $MY_MAX_ARG_STRLEN) {
-			stream_in_full($s1, $fds, $buf);
-		} else {
-			my $n = $send_cmd->($s1, $fds, $buf, 0);
-			return if defined($n); # likely
-			$!{ETOOMANYREFS} and
-				croak "sendmsg: $! (check RLIMIT_NOFILE)";
-			$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-				croak("sendmsg: $!");
-		}
+	my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+	my $fds = [ map { fileno($_) } @$ios ];
+	my $buf = ipc_freeze([$sub, @args]);
+	if (length($buf) > $MY_MAX_ARG_STRLEN) {
+		stream_in_full($s1, $fds, $buf);
 	} else {
-		@$self{0..$#$ios} = @$ios;
-		eval { $self->$sub(@args) };
-		warn "wq_io_do: $@" if $@;
-		delete @$self{0..$#$ios}; # don't close
+		my $n = $send_cmd->($s1, $fds, $buf, 0);
+		return if defined($n); # likely
+		$!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+		$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+			croak("sendmsg: $!");
 	}
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
 	$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
 		$exp = sha1_hex($bigger)."\n";
 		is(readline($rb), $exp, "SHA WQWorker limit ($t)");
 	}
-	my $ppid = $ipc->wq_workers_start('wq', 1);
-	push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-	is_deeply(\@ppids, [$$, undef, undef],
+	is_xdeeply(\@ppids, [$$, undef],
 		'parent pid returned in wq_workers_start');
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
 	seek($warn, 0, SEEK_SET) or BAIL_OUT;
 	my @warn = <$warn>;
-	is(scalar(@warn), 3, 'warned 3 times');
-	like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-	is($warn[2], $warn[1], 'worker did not die');
+	is(scalar(@warn), 2, 'warned 2 times');
+	like($warn[0], qr/ wq_worker: /, '1st warned from wq_worker');
+	is($warn[0], $warn[1], 'worker did not die');
 
 	$SIG{__WARN__} = 'DEFAULT';
 	is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');


* [PATCH 5/9] ipc: use autodie for most syscalls
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (3 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 6/9] import: use autodie, rely on PerlIO for retries Eric Wong
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

I'm not sure how/if we should bother recovering from these,
so just croak and let some caller deal with it.  `autodie'
uses Carp internally, so setting `PERL5OPT=-MCarp=verbose'
in the environment gives us full stacktraces.
---
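For readers unfamiliar with autodie, a short sketch of what "just
croak" looks like from the caller's side.  The path is a placeholder
assumed not to exist; nothing here comes from IPC.pm.

```perl
use v5.12;
use autodie qw(open);
# Sketch: with autodie, a failed open throws an autodie::exception
# object instead of returning false, so no "or die" is needed.
my $path = '/nonexistent/autodie-sketch'; # placeholder path
my $fh;
eval { open($fh, '<', $path) };
my $e = $@;
my $is_open_err = ref($e) && $e->isa('autodie::exception') &&
		$e->matches('open') ? 1 : 0;
print "caught=$is_open_err msg=$e\n";
```

A caller which does want to recover can inspect the exception object;
everything else simply propagates the error upward.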
 lib/PublicInbox/IPC.pm | 39 ++++++++++++++++-----------------------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index ba8b5739..5964645e 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -8,9 +8,9 @@
 # use ipc_do when you need work done on a certain process
 # use wq_io_do when your work can be done on any idle worker
 package PublicInbox::IPC;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(Exporter);
+use autodie qw(fork pipe read socketpair sysread);
 use Carp qw(croak);
 use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
@@ -54,9 +54,9 @@ our $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
 
 sub _get_rec ($) {
 	my ($r) = @_;
-	defined(my $len = <$r>) or return;
+	my $len = <$r> // return;
 	chop($len) eq "\n" or croak "no LF byte in $len";
-	defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+	my $n = read($r, my $buf, $len);
 	$n == $len or croak "short read: $n != $len";
 	ipc_thaw($buf);
 }
@@ -98,12 +98,12 @@ sub ipc_worker_spawn {
 	my ($self, $ident, $oldset, $fields, @cb_args) = @_;
 	return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
 	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
-	pipe(my ($r_req, $w_req)) or die "pipe: $!";
-	pipe(my ($r_res, $w_res)) or die "pipe: $!";
+	pipe(my $r_req, my $w_req);
+	pipe(my $r_res, my $w_res);
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->ipc_atfork_prepare;
 	my $seed = rand(0xffffffff);
-	my $pid = fork // die "fork: $!";
+	my $pid = fork;
 	if ($pid == 0) {
 		srand($seed);
 		eval { Net::SSLeay::randomize() };
@@ -211,15 +211,12 @@ sub recv_and_run {
 	my $n = length($buf) or return 0;
 	my $nfd = 0;
 	for my $fd (@fds) {
-		if (open(my $cmdfh, '+<&=', $fd)) {
-			$self->{$nfd++} = $cmdfh;
-			$cmdfh->autoflush(1);
-		} else {
-			die "$$ open(+<&=$fd) (FD:$nfd): $!";
-		}
+		open(my $cmdfh, '+<&=', $fd);
+		$self->{$nfd++} = $cmdfh;
+		$cmdfh->autoflush(1);
 	}
 	while ($full_stream && $n < $len) {
-		my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!";
+		my $r = sysread($s2, $buf, $len - $n, $n);
 		croak "read EOF after $n/$len bytes" if $r == 0;
 		$n = length($buf);
 	}
@@ -267,8 +264,7 @@ sub wq_broadcast {
 
 sub stream_in_full ($$$) {
 	my ($s1, $fds, $buf) = @_;
-	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-		croak "socketpair: $!";
+	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0);
 	my $n = $send_cmd->($s1, [ fileno($r) ],
 			ipc_freeze(['do_sock_stream', length($buf)]),
 			0) // croak "sendmsg: $!";
@@ -315,7 +311,7 @@ sub wq_sync_run {
 sub wq_do {
 	my ($self, $sub, @args) = @_;
 	if (defined(wantarray)) {
-		pipe(my ($r, $w)) or die "pipe: $!";
+		pipe(my $r, my $w);
 		wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
 		undef $w;
 		_wait_return($r, $sub);
@@ -344,10 +340,9 @@ sub wq_nonblock_do { # always async
 sub _wq_worker_start {
 	my ($self, $oldset, $fields, $one, @cb_args) = @_;
 	my ($bcast1, $bcast2);
-	$one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0) or
-							die "socketpair: $!";
+	$one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0);
 	my $seed = rand(0xffffffff);
-	my $pid = fork // die "fork: $!";
+	my $pid = fork;
 	if ($pid == 0) {
 		srand($seed);
 		eval { Net::SSLeay::randomize() };
@@ -381,9 +376,7 @@ sub wq_workers_start {
 	my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
 	($send_cmd && $recv_cmd) or return;
 	return if $self->{-wq_s1}; # idempotent
-	$self->{-wq_s1} = $self->{-wq_s2} = undef;
-	socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, SOCK_SEQPACKET, 0)
-		or die "socketpair: $!";
+	socketpair($self->{-wq_s1}, $self->{-wq_s2},AF_UNIX, SOCK_SEQPACKET, 0);
 	$self->ipc_atfork_prepare;
 	$nr_workers //= $self->{-wq_nr_workers}; # was set earlier
 	my $sigset = $oldset // PublicInbox::DS::block_signals();


* [PATCH 6/9] import: use autodie, rely on PerlIO for retries
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (4 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 5/9] ipc: use autodie for most syscalls Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 7/9] rename ProcessPipe to ProcessIO Eric Wong
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

As documented in perlipc(1), the default :perlio layer retries
the `read' perlop on EINTR.  The :perlio layer also makes `read'
perform read-in-full behavior; so there's no need to loop
ourselves.  Our responsibility is now only to detect short reads
in case fast-import is killed mid-stream.
---
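The read-in-full behavior described above can be observed with a plain
pipe.  This is a standalone sketch, not code from Import.pm: the child
writes six bytes in two delayed pieces, and the parent still gets a
full four-byte read followed by a short read at EOF.

```perl
use v5.12;
use autodie qw(pipe fork close read);
use POSIX ();
# Sketch: buffered read() keeps reading until LENGTH bytes arrive or
# EOF, so the caller only needs to check for a short read.
pipe(my $r, my $w);
my $pid = fork;
if ($pid == 0) { # child: write 6 bytes in two pieces, then exit
	close $r;
	syswrite($w, 'abc');
	select(undef, undef, undef, 0.1); # brief pause between pieces
	syswrite($w, 'def');
	POSIX::_exit(0);
}
close $w;
my $n1 = read($r, my $head, 4); # spans both writes, no manual loop
my $n2 = read($r, my $tail, 10); # writer is gone: short read at EOF
waitpid($pid, 0);
print "n1=$n1 head=$head n2=$n2 tail=$tail\n";
```

The second call is the case the new `short read' checks guard against:
fewer bytes than requested means the writer went away mid-stream.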
 lib/PublicInbox/Import.pm | 45 ++++++++++++++-------------------------
 1 file changed, 16 insertions(+), 29 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 59462e9a..7175884c 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -6,9 +6,8 @@
 # and public-inbox-watch. Not the WWW or NNTP code which only
 # requires read-only access.
 package PublicInbox::Import;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::Lock);
-use v5.10.1;
 use PublicInbox::Spawn qw(run_die popen_rd);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
@@ -18,13 +17,15 @@ use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
 use POSIX qw(strftime);
+use autodie qw(read close);
+use Carp qw(croak);
 
 sub default_branch () {
 	state $default_branch = do {
 		my $r = popen_rd([qw(git config --global init.defaultBranch)],
 				 { GIT_CONFIG => undef });
 		chomp(my $h = <$r> // '');
-		close $r;
+		CORE::close $r;
 		$h eq '' ? 'refs/heads/master' : "refs/heads/$h";
 	}
 }
@@ -113,20 +114,10 @@ sub _cat_blob ($$$) {
 	local $/ = "\n";
 	my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
 	$info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-	my $left = $1;
-	my $offset = 0;
-	my $buf = '';
-	my $n;
-	while ($left > 0) {
-		$n = read($r, $buf, $left, $offset) //
-			die "read cat-blob failed: $!";
-		$n == 0 and die 'fast-export (cat-blob) died';
-		$left -= $n;
-		$offset += $n;
-	}
-	$n = read($r, my $lf, 1) //
-		die "read final byte of cat-blob failed: $!";
-	die "bad read on final byte: <$lf>" if $lf ne "\n";
+	my $n = read($r, my $buf, my $len = $1 + 1);
+	$n == $len or croak "cat-blob: short read: $n < $len";
+	my $lf = chop $buf;
+	croak "bad read on final byte: <$lf>" if $lf ne "\n";
 
 	# fixup some bugginess in old versions:
 	$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
@@ -479,9 +470,9 @@ EOM
 	while (my ($fn, $contents) = splice(@fn_contents, 0, 2)) {
 		my $f = $dir.'/'.$fn;
 		next if -f $f;
-		open my $fh, '>', $f or die "open $f: $!";
-		print $fh $contents or die "print $f: $!";
-		close $fh or die "close $f: $!";
+		open my $fh, '>', $f;
+		print $fh $contents;
+		close $fh;
 	}
 }
 
@@ -494,7 +485,7 @@ sub done {
 	eval {
 		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
 		print $w "done\n" or wfail;
-		close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE
+		close $r;
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
@@ -509,10 +500,7 @@ sub done {
 
 sub atfork_child {
 	my ($self) = @_;
-	foreach my $f (qw(in out)) {
-		next unless defined($self->{$f});
-		close $self->{$f} or die "failed to close import[$f]: $!\n";
-	}
+	close($_) for (grep defined, delete(@$self{qw(in out)}));
 }
 
 sub digest2mid ($$;$) {
@@ -583,10 +571,9 @@ sub replace_oids {
 			push @buf, "commit $tmp\n";
 		} elsif (/^data ([0-9]+)/) {
 			# only commit message, so $len is small:
-			my $len = $1; # + 1 for trailing "\n"
 			push @buf, $_;
-			my $n = read($rd, my $buf, $len) or die "read: $!";
-			$len == $n or die "short read ($n < $len)";
+			my $n = read($rd, my $buf, my $len = $1);
+			$len == $n or croak "short read ($n < $len)";
 			push @buf, $buf;
 		} elsif (/^M 100644 ([a-f0-9]+) (\w+)/) {
 			my ($oid, $path) = ($1, $2);
@@ -625,7 +612,7 @@ sub replace_oids {
 			push @buf, $_;
 		}
 	}
-	close $rd or die "close fast-export failed: $?";
+	close $rd;
 	if (@buf) {
 		print $w @buf or wfail;
 	}


* [PATCH 7/9] rename ProcessPipe to ProcessIO
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (5 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 6/9] import: use autodie, rely on PerlIO for retries Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 8/9] process_io: pass args to awaitpid as list Eric Wong
  2023-10-07 21:24 ` [PATCH 9/9] cindex: start using autodie Eric Wong
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

Since we deal with pipes (of either direction) and bidirectional
stream sockets for this class, it's better to remove the `Pipe'
from the name and replace it with `IO' to communicate that it
works for any form of IO::Handle-like object tied to a process.
---
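A rough sketch of the idea behind a tied handle bound to a process,
for readers who haven't looked at the class.  Sketch::ReapIO and its
wrap method are hypothetical and much simpler than the real
ProcessIO; the sketch only shows how close on a tied handle can also
reap the child.

```perl
package Sketch::ReapIO; # hypothetical name, not the real ProcessIO
use v5.12;
use Symbol qw(gensym);
use POSIX ();

sub wrap { # returns a tied glob wrapping $fh and remembering $pid
	my ($class, $pid, $fh) = @_;
	my $io = gensym;
	tie *$io, $class, $pid, $fh;
	$io;
}

sub TIEHANDLE {
	my ($class, $pid, $fh) = @_;
	bless { pid => $pid, fh => $fh }, $class;
}

sub READLINE { readline($_[0]->{fh}) }

sub CLOSE { # close the real handle, then reap the child
	my ($self) = @_;
	my $fh = delete $self->{fh};
	my $ok = close($fh);
	my $pid = delete $self->{pid} // return $ok;
	waitpid($pid, 0); # sets $?
	$ok && $? == 0;
}

package main;
pipe(my $r, my $w) or die "pipe: $!";
my $pid = fork // die "fork: $!";
if ($pid == 0) { # child: emit one line, then exit non-zero
	close $r;
	print $w "hello\n";
	close $w;
	POSIX::_exit(7);
}
close $w;
my $io = Sketch::ReapIO->wrap($pid, $r);
my $line = <$io>;
my $ok = close($io) ? 1 : 0; # CLOSE reaps the child and checks $?
my $exit = $? >> 8;
print "line=$line", "ok=$ok exit=$exit\n";
```

Nothing in the wrapper depends on $fh being a pipe, so the same shape
works for a socket, which is the point of the rename.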
 MANIFEST                                         | 2 +-
 lib/PublicInbox/Gcf2Client.pm                    | 4 ++--
 lib/PublicInbox/Git.pm                           | 4 ++--
 lib/PublicInbox/HTTPD/Async.pm                   | 2 +-
 lib/PublicInbox/LeiRediff.pm                     | 2 +-
 lib/PublicInbox/LeiToMail.pm                     | 4 ++--
 lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} | 8 +++-----
 lib/PublicInbox/Qspawn.pm                        | 4 ++--
 lib/PublicInbox/Spamcheck/Spamc.pm               | 2 +-
 lib/PublicInbox/Spawn.pm                         | 6 +++---
 t/spawn.t                                        | 4 ++--
 11 files changed, 20 insertions(+), 22 deletions(-)
 rename lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} (86%)

diff --git a/MANIFEST b/MANIFEST
index 689c6bf6..c972818f 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -318,7 +318,7 @@ lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
-lib/PublicInbox/ProcessPipe.pm
+lib/PublicInbox/ProcessIO.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 4a0348b4..f63a0335 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 use autodie qw(socketpair);
 
 # fields:
@@ -33,7 +33,7 @@ sub new  {
 	my $cmd = [$^X, $^W ? ('-w') : (),
 			qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
 	my $pid = spawn($cmd, $env, $opt);
-	my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
+	my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
 	$self->{inflight} = [];
 	$self->{epwatch} = \undef; # for Git->cleanup
 	$self->SUPER::new($sock, EPOLLIN);
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 0fd621e1..94d5dcee 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -165,7 +165,7 @@ sub _sock_cmd {
 						$self->fail("tmpfile($id): $!");
 	}
 	my $pid = spawn(\@cmd, undef, $opt);
-	$self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
+	$self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -626,7 +626,7 @@ sub cleanup_if_unlinked {
 	my $ret = 0;
 	for my $obj ($self, ($self->{ck} // ())) {
 		my $sock = $obj->{sock} // next;
-		my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe
+		my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO
 		my $pid = $pp->{pid} // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 7bbab1e1..b9d2159c 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -37,7 +37,7 @@ sub new {
 		arg => $arg, # arg for $cb
 		end_obj => $end_obj, # like END{}, can ->event_step
 	}, $class;
-	my $pp = tied *$io; # ProcessPipe
+	my $pp = tied *$io; # ProcessIO
 	$pp->{fh}->blocking(0) // die "$io->blocking(0): $!";
 	$self->SUPER::new($io, EPOLLIN);
 }
diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm
index a886931c..b894342b 100644
--- a/lib/PublicInbox/LeiRediff.pm
+++ b/lib/PublicInbox/LeiRediff.pm
@@ -152,7 +152,7 @@ sub requote ($$) {
 	# $^X (perl) is overkill, but maybe there's a weird system w/o sed
 	my ($w, $pid) = popen_wr([$^X, '-pe', "s/^/$pfx/"], $lei->{env}, $opt);
 	$w->autoflush(1);
-	binmode $w, ':utf8'; # incompatible with ProcessPipe due to syswrite
+	binmode $w, ':utf8'; # incompatible with ProcessIO due to syswrite
 	$lei->{1} = $w;
 	PublicInbox::OnDestroy->new(\&wait_requote, $lei, $pid, $old_1);
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f239da82..f56ad330 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -7,7 +7,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 use PublicInbox::Spawn qw(spawn);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -162,7 +162,7 @@ sub _post_augment_mbox { # open a compressor process from top-level lei-daemon
 	my ($r, $w) = @{delete $lei->{zpipe}};
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
 	my $pid = spawn($cmd, undef, $rdr);
-	$lei->{1} = PublicInbox::ProcessPipe->maybe_new($pid, $w, {
+	$lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w, {
 			cb_arg => [\&reap_compress, $lei, $cmd, $lei->{1} ] });
 }
 
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessIO.pm
similarity index 86%
rename from lib/PublicInbox/ProcessPipe.pm
rename to lib/PublicInbox/ProcessIO.pm
index ba2c1ecb..eeb66139 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -1,11 +1,9 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# a tied handle for auto reaping of children tied to a read-only pipe, see perltie(1)
-# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git),
-# both ends of the pipe must be at the same level of the Perl object hierarchy
-# to ensure orderly destruction.
-package PublicInbox::ProcessPipe;
+# a tied handle for auto reaping of children tied to a pipe or socket,
+# see perltie(1) for details.
+package PublicInbox::ProcessIO;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
 use Symbol qw(gensym);
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 5e4fd5cb..ea7ae647 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -124,7 +124,7 @@ sub finish ($;$) {
 
 	# we can safely finalize if pipe was closed before, or if
 	# {_err} is defined by waitpid_err.  Deleting {rpipe} will
-	# trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+	# trigger PublicInbox::ProcessIO::DESTROY -> waitpid_err,
 	# but it may not fire right away if inside the event loop.
 	my $closed_before = !delete($self->{rpipe});
 	finalize($self) if $closed_before || defined($self->{_err});
@@ -251,7 +251,7 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
 		if ($async) { # calls rpipe->close && ->event_step
 			$async->close; # PublicInbox::HTTPD::Async::close
-		} else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
+		} else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE
 			delete($self->{rpipe})->close;
 			event_step($self);
 		}
diff --git a/lib/PublicInbox/Spamcheck/Spamc.pm b/lib/PublicInbox/Spamcheck/Spamc.pm
index 726866c8..cba33a66 100644
--- a/lib/PublicInbox/Spamcheck/Spamc.pm
+++ b/lib/PublicInbox/Spamcheck/Spamc.pm
@@ -27,7 +27,7 @@ sub spamcheck {
 		$out = \$buf;
 	}
 	$$out = do { local $/; <$fh> };
-	close $fh; # PublicInbox::ProcessPipe::CLOSE
+	close $fh; # PublicInbox::ProcessIO::CLOSE
 	($? || $$out eq '') ? 0 : 1;
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 4c7e0f80..cb8b21c6 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -21,7 +21,7 @@ use PublicInbox::Lock;
 use Fcntl qw(SEEK_SET);
 use IO::Handle ();
 use Carp qw(croak);
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
 our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
 
@@ -368,13 +368,13 @@ sub spawn ($;$$) {
 sub popen_rd {
 	my ($cmd, $env, $opt) = @_;
 	pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
-	PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $r, $opt)
+	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $r, $opt)
 }
 
 sub popen_wr {
 	my ($cmd, $env, $opt) = @_;
 	pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
-	PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $w, $opt)
+	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $w, $opt)
 }
 
 sub run_wait ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 04589437..be5aaf9f 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -149,8 +149,8 @@ EOF
 	$fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
 	undef $fh; # ->DESTROY
 	ok(scalar(@c), 'callback fired by ->DESTROY');
-	ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
-		'callback not invoked by ProcessPipe');
+	ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c),
+		'callback not invoked by ProcessIO');
 }
 
 { # children don't wait on siblings


* [PATCH 8/9] process_io: pass args to awaitpid as list
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (6 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 7/9] rename ProcessPipe to ProcessIO Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  2023-10-07 21:24 ` [PATCH 9/9] cindex: start using autodie Eric Wong
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

Specifying {cb_arg} in the options hash felt awkward to me.
Instead, just use the Perl stack like we do with awaitpid()
and pass the list down directly.
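
For illustration only, the two calling conventions can be compared in
a minimal sketch.  run_with_opt and run_with_list are hypothetical
stand-ins and not functions in public-inbox; they only show how the
callback and its arguments move from an options hash onto the Perl
stack:

```perl
use v5.12;

# hypothetical helpers for illustration, not public-inbox APIs

# old style: callback + args packed into an arrayref in the options hash
sub run_with_opt {
	my ($opt) = @_;
	my ($cb, @args) = @{$opt->{cb_arg} // []};
	$cb ? $cb->(@args) : ();
}

# new style: callback + args are trailing elements of @_
sub run_with_list {
	my ($opt, $cb, @args) = @_;
	$cb ? $cb->(@args) : ();
}

my @seen;
my $cb = sub { push @seen, "@_" };
run_with_opt({ cb_arg => [ $cb, 'old', 1 ] });
run_with_list(undef, $cb, 'new', 2);
say for @seen;
```

The list form lets a caller pass undef for the options hash when it
only needs the callback, which is the shape the t/spawn.t hunks in
this patch end up with.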
---
 lib/PublicInbox/LeiToMail.pm |  4 ++--
 lib/PublicInbox/ProcessIO.pm |  4 ++--
 lib/PublicInbox/Qspawn.pm    |  4 ++--
 lib/PublicInbox/Spawn.pm     | 10 ++++++----
 t/spawn.t                    |  8 ++++----
 5 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f56ad330..8771592d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -162,8 +162,8 @@ sub _post_augment_mbox { # open a compressor process from top-level lei-daemon
 	my ($r, $w) = @{delete $lei->{zpipe}};
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
 	my $pid = spawn($cmd, undef, $rdr);
-	$lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w, {
-			cb_arg => [\&reap_compress, $lei, $cmd, $lei->{1} ] });
+	$lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w,
+				\&reap_compress, $lei, $cmd, $lei->{1});
 }
 
 # --augment existing output destination, with deduplication
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index eeb66139..5a81e3a6 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -9,10 +9,10 @@ use PublicInbox::DS qw(awaitpid);
 use Symbol qw(gensym);
 
 sub maybe_new {
-	my ($cls, $pid, $fh, $opt) = @_;
+	my ($cls, $pid, $fh, @cb_arg) = @_;
 	return ($fh, $pid) if wantarray;
 	my $s = gensym;
-	tie *$s, $cls, $pid, $fh, @{$opt->{cb_arg} // []};
+	tie *$s, $cls, $pid, $fh, @cb_arg;
 	$s;
 }
 
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index ea7ae647..0e52617c 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -58,10 +58,10 @@ sub _do_spawn {
 	}
 	$self->{cmd} = $cmd;
 	$self->{-quiet} = 1 if $o{quiet};
-	$o{cb_arg} = [ \&waitpid_err, $self ];
 	eval {
 		# popen_rd may die on EMFILE, ENFILE
-		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
+		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+					\&waitpid_err, $self);
 		$limiter->{running}++;
 		$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
 	};
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index cb8b21c6..ec256698 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -366,15 +366,17 @@ sub spawn ($;$$) {
 }
 
 sub popen_rd {
-	my ($cmd, $env, $opt) = @_;
+	my ($cmd, $env, $opt, @cb_arg) = @_;
 	pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
-	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $r, $opt)
+	my $pid = spawn($cmd, $env, $opt);
+	PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
 }
 
 sub popen_wr {
-	my ($cmd, $env, $opt) = @_;
+	my ($cmd, $env, $opt, @cb_arg) = @_;
 	pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
-	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $w, $opt)
+	my $pid = spawn($cmd, $env, $opt);
+	PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
 }
 
 sub run_wait ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index be5aaf9f..1af66bda 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
 
 { # ->CLOSE vs ->DESTROY waitpid caller distinction
 	my @c;
-	my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
+	my $fh = popen_rd(['true'], undef, undef, sub { @c = caller });
 	ok(close($fh), '->CLOSE fired and successful');
 	ok(scalar(@c), 'callback fired by ->CLOSE');
 	ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
 
 	@c = ();
-	$fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
+	$fh = popen_rd(['true'], undef, undef, sub { @c = caller });
 	undef $fh; # ->DESTROY
 	ok(scalar(@c), 'callback fired by ->DESTROY');
 	ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c),
@@ -157,8 +157,8 @@ EOF
 	use POSIX qw(_exit);
 	pipe(my ($r, $w)) or BAIL_OUT $!;
 	my @arg;
-	my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
-	my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
+	my $fh = popen_rd(['cat'], undef, { 0 => $r },
+			sub { @arg = @_; warn "x=$$\n" }, 'hi');
 	my $pp = tied *$fh;
 	my $pid = fork // BAIL_OUT $!;
 	local $SIG{__WARN__} = sub { _exit(1) };


* [PATCH 9/9] cindex: start using autodie
  2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
                   ` (7 preceding siblings ...)
  2023-10-07 21:24 ` [PATCH 8/9] process_io: pass args to awaitpid as list Eric Wong
@ 2023-10-07 21:24 ` Eric Wong
  8 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-07 21:24 UTC (permalink / raw)
  To: meta

A small step towards making this code less noisy.
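
As a rough sketch of the pattern (assuming a stock Perl with the core
autodie module; the path and the one-liner child process below are
made up for illustration), autodie turns failed calls into exceptions
so the `or die' suffixes can go away, while CORE::close remains
available for handles where the caller wants to inspect $? itself:

```perl
use v5.12;
use autodie qw(open close seek);
use Fcntl qw(SEEK_SET);

# anonymous temp file; autodie raises an exception if open fails
open my $tmp, '+>', undef;
print $tmp "hello\n" or die "print: $!"; # print is not wrapped by autodie
seek($tmp, 0, SEEK_SET);
my $line = <$tmp>;
close $tmp;

# a failing call becomes an exception instead of a false return value
my $err;
eval { open my $fh, '<', '/nonexistent/illustration-only'; 1 } or $err = $@;

# CORE::close bypasses autodie, so a non-zero child exit does not throw
# and $? is left for the caller to inspect
open my $p, '-|', $^X, '-e', 'exit 3';
my @out = <$p>;
CORE::close($p);
my $status = $? >> 8;
say "line=$line", 'err=', ref($err), " status=$status";
```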
---
 lib/PublicInbox/CodeSearchIdx.pm | 70 ++++++++++++++++----------------
 1 file changed, 35 insertions(+), 35 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 2e46e30a..feb37be8 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,8 +55,8 @@ use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
-use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use Carp ();
+use autodie qw(pipe open seek sysseek send);
 our (
 	$LIVE, # pid => cmd
 	$LIVE_JOBS, # integer
@@ -265,9 +265,9 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
 
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
-	sysseek($in, 0, SEEK_SET) or die "seek: $!";
+	sysseek($in, 0, SEEK_SET);
 	my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
-	close $in or die "close: $!";
+	close $in;
 	awaitpid($pid, \&cidx_reap_log, $self, $op_p);
 	PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
 	# CidxLogP->event_step will call cidx_read_log_p once there's input
@@ -382,7 +382,7 @@ sub cidx_await ($$$$$@) {
 sub fp_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
 	return if !$LIVE || $DO_QUIT;
-	open my $refs, '+>', undef or die "open: $!";
+	open my $refs, '+>', undef;
 	my $cmd = ['git', "--git-dir=$git->{git_dir}",
 		qw(show-ref --heads --tags --hash)];
 	my $pid = spawn($cmd, undef, { 1 => $refs });
@@ -393,7 +393,7 @@ sub fp_start ($$$) {
 sub fp_fini { # cidx_await cb
 	my ($self, $git, $prep_repo) = @_;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
-	seek($refs, 0, SEEK_SET) or die "seek: $!";
+	seek($refs, 0, SEEK_SET);
 	my $buf;
 	my $dig = PublicInbox::SHA->new(256);
 	while (read($refs, $buf, 65536)) { $dig->add($buf) }
@@ -458,13 +458,13 @@ sub check_existing { # retry_reopen callback
 
 sub partition_refs ($$$) {
 	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	sysseek($refs, 0, SEEK_SET);
 	my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
-	close $refs or die "close: $!";
+	close $refs;
 	my $seen = 0;
 	my @shard_in = map {
 		$_->reopen;
-		open my $fh, '+>', undef or die "open: $!";
+		open my $fh, '+>', undef;
 		$fh;
 	} @RDONLY_XDB;
 
@@ -483,11 +483,11 @@ sub partition_refs ($$$) {
 			$seen = 0;
 		}
 		if ($DO_QUIT) {
-			close($rfh);
+			CORE::close($rfh);
 			return ();
 		}
 	}
-	close($rfh);
+	CORE::close($rfh);
 	return () if $DO_QUIT;
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		my $n = $NCHANGE - $n0;
@@ -521,16 +521,16 @@ sub dump_roots_start {
 	local $self->{xdb};
 	@ID2ROOT = $self->all_terms('G');
 	my $root2id = "$TMPDIR/root2id";
-	open my $fh, '>', $root2id or die "open($root2id): $!";
+	open my $fh, '>', $root2id;
 	my $nr = -1;
 	for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
-	close $fh or die "close: $!";
+	close $fh;
 	# dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
-	pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
-	pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+	pipe(my $sort_r, my $sort_w);
+	pipe(my $fold_r, my $fold_w);
 	my @sort = (@SORT, '-k1,1');
 	my $dst = "$TMPDIR/to_root_id";
-	open $fh, '>', $dst or die "open($dst): $!";
+	open $fh, '>', $dst;
 	my $env = { %$CMD_ENV, OFS => ' ' };
 	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
 	my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
@@ -539,7 +539,7 @@ sub dump_roots_start {
 	my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
 		'-m', assoc_max_init($self), $root2id, $QRY_STR);
 	for my $d ($self->shard_dirs) {
-		pipe(my ($err_r, $err_w)) or die "pipe: $!";
+		pipe(my $err_r, my $err_w);
 		$XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
 		my $desc = "dump_roots $d";
 		$self->{PENDING}->{$desc} = $associate;
@@ -554,7 +554,7 @@ sub dump_ibx { # sends to xap_helper.h
 	my @cmd = ('dump_ibx', $ibx->isrch->xh_args,
 			(map { ('-A', $_) } @ASSOC_PFX),
 			$ibx_id, $QRY_STR);
-	pipe(my ($r, $w)) or die "pipe: $!";
+	pipe(my $r, my $w);
 	$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
 	my $ekey = $ibx->eidx_key;
 	$self->{PENDING}->{$ekey} = $TODO{associate};
@@ -563,12 +563,12 @@ sub dump_ibx { # sends to xap_helper.h
 
 sub dump_ibx_start {
 	my ($self, $associate) = @_;
-	pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
-	pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+	pipe(my $sort_r, $DUMP_IBX_WPIPE);
+	pipe(my $fold_r, my $fold_w);
 	my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
 	# pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
 	my $dst = "$TMPDIR/to_ibx_id";
-	open my $fh, '>', $dst or die "open($dst): $!";
+	open my $fh, '>', $dst;
 	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
 	my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
 	awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
@@ -638,9 +638,9 @@ sub index_repo { # cidx_await cb
 	return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
 	my $repo = delete $git->{-repo} or return index_next($self);
 	my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
-	seek($roots_fh, 0, SEEK_SET) or die "seek: $!";
+	seek($roots_fh, 0, SEEK_SET);
 	chomp(my @roots = <$roots_fh>);
-	close($roots_fh) or die "close: $!";
+	close($roots_fh);
 	if (!@roots) {
 		warn("E: $git->{git_dir} has no root commits\n");
 		return index_next($self);
@@ -670,8 +670,8 @@ sub get_roots ($$) {
 	my ($self, $git) = @_;
 	return if !$LIVE || $DO_QUIT;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
-	open my $roots_fh, '+>', undef or die "open: $!";
+	sysseek($refs, 0, SEEK_SET);
+	open my $roots_fh, '+>', undef;
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 			qw(rev-list --stdin --max-parents=0) ];
 	my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
@@ -838,7 +838,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
 	if ($status == 1) { # unset, default is '' (SHA-1)
 		$fmt = 'sha1';
 	} elsif ($status == 0) {
-		seek($out, 0, SEEK_SET) or die "seek: $!";
+		seek($out, 0, SEEK_SET);
 		chomp($fmt = <$out> // 'sha1');
 	} else {
 		return warn("git config \$?=$? for objdir=$objdir");
@@ -851,7 +851,7 @@ EOM
 		my $git_dir = "$TMPDIR/hexlen$hexlen.git";
 		PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
 		my $f = "$git_dir/objects/info/alternates";
-		open $ALT_FH{$hexlen}, '>', $f or die "open($f): $!";
+		open $ALT_FH{$hexlen}, '>', $f;
 	}
 	say { $ALT_FH{$hexlen} } $objdir or die "say: $!";
 }
@@ -865,7 +865,7 @@ sub prep_alternate_start {
 	}
 	my $cmd = [ 'git', "--git-dir=$git_dir",
 			qw(config extensions.objectFormat) ];
-	open my $out, '+>', undef or die "open(tmp): $!";
+	open my $out, '+>', undef;
 	my $pid = spawn($cmd, undef, { 1 => $out });
 	awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
 }
@@ -894,7 +894,7 @@ sub associate {
 			++$score{"$ibx_id $_"} for @root_ids;
 		}
 	}
-	close $rd or die "@join failed: $?=$?";
+	CORE::close $rd or die "@join failed: $?=$?";
 	my $min = $self->{-opt}->{'assoc-min'} // 10;
 	progress($self, scalar(keys %score).' potential pairings...');
 	for my $k (keys %score) {
@@ -961,10 +961,10 @@ sub init_prune ($) {
 			comm => \@COMM, awk => \@AWK);
 	for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
 	my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
-	pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
-	pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+	pipe(my $sed_in, my $delve_out);
+	pipe(my $sort_in, my $sed_out);
 	my @sort_u = (@SORT, '-u');
-	open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
+	open(my $sort_out, '+>', "$TMPDIR/indexed_commits");
 	my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
 	awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
 	$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
@@ -982,7 +982,7 @@ sub dump_git_commits { # awaitpid cb
 	(defined($pid) && $?) and die "E: @PRUNE_BATCH: \$?=$?";
 	return if $DO_QUIT;
 	my ($hexlen) = keys(%ALT_FH) or return; # done
-	close(delete $ALT_FH{$hexlen}) or die "close: $!";
+	close(delete $ALT_FH{$hexlen});
 
 	$PRUNE_BATCH[1] = "--git-dir=$TMPDIR/hexlen$hexlen.git";
 	$pid = spawn(\@PRUNE_BATCH, undef, { 1 => $batch_out });
@@ -998,9 +998,9 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
 	#	git --git-dir=hexlen64.git cat-file \
 	#		--batch-all-objects --batch-check
 	# ) | awk | sort | comm | cidx_read_comm()
-	pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
-	pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
-	pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
+	pipe(my $awk_in, my $batch_out);
+	pipe(my $sort_in, my $awk_out);
+	pipe(my $comm_in, my $sort_out);
 	my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
 	my @sort_u = (@SORT, '-u');
 	my $sort_pid = spawn(\@sort_u, $CMD_ENV,


* Re: [PATCH 3/9] lei: always use async `done' requests to store
  2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
@ 2023-10-08  1:58   ` Eric Wong
  2023-10-08  5:49   ` [PATCH 2.5/9] lei: fix implicit stdin support for pipes Eric Wong
  2023-10-08 18:54   ` [PATCHv2 3/9] lei: always use async `done' requests to store Eric Wong
  2 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-08  1:58 UTC (permalink / raw)
  To: meta

Eric Wong <e@80x24.org> wrote:
> diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
> index 54750062..15013baa 100644
> --- a/lib/PublicInbox/LeiRemote.pm
> +++ b/lib/PublicInbox/LeiRemote.pm
> @@ -52,7 +52,7 @@ sub mset {
>  	$self->{smsg} = [];
>  	$fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
>  	PublicInbox::MboxReader->mboxrd($fh, \&_each_mboxrd_eml, $self);
> -	my $wait = $self->{lei}->{sto}->wq_do('done');
> +	$self->{lei}->sto_done_request;

That's usually not the normal lei/store, but the {tmp_sto} one
from LeiRediff.  So it must be synchronous in that case because
we make multiple queries, there.

So I'll revert that hunk.

And ugh, the new lei-store-fail.t test is so nasty; I don't
think a 100ms delay is enough for some systems...

diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index e9ad779f..fb0f2b75 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -31,7 +31,7 @@ Message-ID: <$_\@t>
 will this save?
 EOM
 	}
-	tick 0.1; # XXX ugh, this is so hacky
+	tick 0.2; # XXX ugh, this is so hacky
 
 	# make sto_done_request fail:
 	remove_tree("$ENV{HOME}/.local/share/lei/store");


* [PATCH 2.5/9] lei: fix implicit stdin support for pipes
  2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
  2023-10-08  1:58   ` Eric Wong
@ 2023-10-08  5:49   ` Eric Wong
  2023-10-08 18:54   ` [PATCHv2 3/9] lei: always use async `done' requests to store Eric Wong
  2 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-08  5:49 UTC (permalink / raw)
  To: meta

Eric Wong <e@80x24.org> wrote:
> +++ b/t/lei-store-fail.t

> +	my $cmd = [ qw(lei import -q -F mboxrd) ];
> +	my $tp = start_script($cmd, undef, $opt);

Of course the lack of `-' or `--stdin' only worked on Linux and
NetBSD, but not on other BSDs.

-------8<------
Subject: [PATCH] lei: fix implicit stdin support for pipes

st_mode permission bits can't be used to determine whether a file
or pipe we have on stdin is readable or not.  Writable regular files
can be opened O_RDONLY, and permission bits for pipes are
inconsistent across platforms.

On FreeBSD, OpenBSD, and Dragonfly, only the S_IFIFO bit is set
in st_mode and none of the permission bits are set.  Linux and
NetBSD have both the read and write permission bits set for both
ends of the pipe, so they're just as inaccurate but allowed
the feature to work before this change.

For now, we'll just assume our users know that stdin is intended
for input and consider any pipe or regular file to be readable.

If we were to be pedantic, we'd check O_RDONLY or O_RDWR
description flags via the F_GETFL fcntl(2) op to determine if a
pipe or socket is readable.  However, I don't think it's worth
the code to do so.
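
For reference, the pedantic check described above could look roughly
like the following.  This is only a sketch of the idea and is not
part of this patch; fd_readable is a hypothetical helper name:

```perl
use v5.12;
use Fcntl qw(F_GETFL O_ACCMODE O_RDONLY O_RDWR);

# hypothetical helper, not in public-inbox: true if the open file
# description behind $fh was opened for reading
sub fd_readable {
	my ($fh) = @_;
	# fcntl returns "0 but true" when the flags are 0 and undef on error
	my $fl = fcntl($fh, F_GETFL, 0) // die "fcntl(F_GETFL): $!";
	my $acc = $fl & O_ACCMODE;
	$acc == O_RDONLY || $acc == O_RDWR;
}

pipe(my $r, my $w) or die "pipe: $!";
say 'read end readable: ', (fd_readable($r) ? 'yes' : 'no');
say 'write end readable: ', (fd_readable($w) ? 'yes' : 'no');
```

The result for the write end is left unstated on purpose, since it
may differ between platforms.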
---
 lib/PublicInbox/LEI.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index f00b2465..1ba2c2a1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -747,7 +747,7 @@ sub optparse ($$$) {
 					# w/o args means stdin
 					if ($sw eq 'stdin' && !@$argv &&
 							(-p $self->{0} ||
-							 -f _) && -r _) {
+							 -f _)) {
 						$OPT->{stdin} //= 1;
 					}
 					$ok = defined($OPT->{$sw}) and last;


* [PATCHv2 3/9] lei: always use async `done' requests to store
  2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
  2023-10-08  1:58   ` Eric Wong
  2023-10-08  5:49   ` [PATCH 2.5/9] lei: fix implicit stdin support for pipes Eric Wong
@ 2023-10-08 18:54   ` Eric Wong
  2 siblings, 0 replies; 13+ messages in thread
From: Eric Wong @ 2023-10-08 18:54 UTC (permalink / raw)
  To: meta

It's safer against deadlocks and we still get proper error
reporting by passing stderr across in addition to the lei
socket.
---
 v2: no change to LeiRemote.pm, increase delay in lei-store-fail.t

 MANIFEST                      |  1 +
 lib/PublicInbox/LEI.pm        |  9 +++----
 lib/PublicInbox/LeiInput.pm   |  2 +-
 lib/PublicInbox/LeiStore.pm   | 17 ++++++------
 lib/PublicInbox/LeiXSearch.pm |  6 ++---
 t/lei-store-fail.t            | 51 +++++++++++++++++++++++++++++++++++
 6 files changed, 68 insertions(+), 18 deletions(-)
 create mode 100644 t/lei-store-fail.t

diff --git a/MANIFEST b/MANIFEST
index 4693cbe0..689c6bf6 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -514,6 +514,7 @@ t/lei-q-thread.t
 t/lei-refresh-mail-sync.t
 t/lei-reindex.t
 t/lei-sigpipe.t
+t/lei-store-fail.t
 t/lei-tag.t
 t/lei-up.t
 t/lei-watch.t
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1ba2c2a1..e2b3c0d9 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1537,12 +1537,11 @@ sub lms {
 
 sub sto_done_request {
 	my ($lei, $wq) = @_;
-	return unless $lei->{sto};
+	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
-	my $sock = $wq ? $wq->{lei_sock} : undef;
-	$sock //= $lei->{sock};
-	my @io;
-	push(@io, $sock) if $sock; # async wait iff possible
+	my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
+	my $errfh = $lei->{2} // *STDERR{GLOB};
+	my @io = $s ? ($errfh, $s) : ($errfh);
 	eval { $lei->{sto}->wq_io_do('done', \@io) };
 	warn($@) if $@;
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 91383265..93f8b6b8 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -467,7 +467,7 @@ sub process_inputs {
 	}
 	# always commit first, even on error partial work is acceptable for
 	# lei <import|tag|convert>
-	my $wait = $self->{lei}->{sto}->wq_do('done') if $self->{lei}->{sto};
+	$self->{lei}->sto_done_request;
 	$self->{lei}->fail($err) if $err;
 }
 
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0cb78f79..e19ec88e 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -582,19 +582,20 @@ sub xchg_stderr {
 }
 
 sub done {
-	my ($self, $sock_ref) = @_;
-	my $err = '';
+	my ($self) = @_;
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+	my @err;
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
-		if ($@) {
-			$err .= "import done: $@\n";
-			warn $err;
-		}
+		push(@err, "E: import done: $@\n") if $@;
 	}
 	delete $self->{lms};
-	$self->{priv_eidx}->done; # V2Writable::done
+	eval { $self->{priv_eidx}->done }; # V2Writable::done
+	push(@err, "E: priv_eidx done: $@\n") if $@;
+	print { $errfh // *STDERR{GLOB} } @err;
+	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
 	xchg_stderr($self);
-	die $err if $err;
+	die @err if @err;
 }
 
 sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 1caa9d06..4077191f 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -358,9 +358,7 @@ sub query_remote_mboxrd {
 		$fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1);
 		PublicInbox::MboxReader->mboxrd($fh, \&each_remote_eml, $self,
 						$lei, $each_smsg);
-		if (delete($self->{-sto_imported})) {
-			my $wait = $self->{import_sto}->wq_do('done');
-		}
+		$lei->sto_done_request if delete($self->{-sto_imported});
 		$reap_curl->join;
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		if ($? == 0) {
@@ -402,7 +400,7 @@ sub query_done { # EOF callback for main daemon
 	delete $lei->{lxs};
 	($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
 		warn "BUG: {sto} missing with --mail-sync";
-	$lei->sto_done_request if $lei->{sto};
+	$lei->sto_done_request;
 	if (my $v2w = delete $lei->{v2w}) {
 		my $wait = $v2w->wq_do('done'); # may die
 		$v2w->wq_close;
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
new file mode 100644
index 00000000..fb0f2b75
--- /dev/null
+++ b/t/lei-store-fail.t
@@ -0,0 +1,51 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# ensure we detect errors in lei/store
+use v5.12;
+use PublicInbox::TestCommon;
+use autodie qw(pipe open close seek);
+use Fcntl qw(SEEK_SET);
+use File::Path qw(remove_tree);
+
+my $start_home = $ENV{HOME}; # bug guard
+test_lei(sub {
+	lei_ok qw(import -q t/plack-qp.eml); # start the store
+	my $opt;
+	pipe($opt->{0}, my $in_w);
+	open $opt->{1}, '+>', undef;
+	open $opt->{2}, '+>', undef;
+	$opt->{-CLOFORK} = [ $in_w ];
+	my $cmd = [ qw(lei import -q -F mboxrd) ];
+	my $tp = start_script($cmd, undef, $opt);
+	close $opt->{0};
+	$in_w->autoflush(1);
+	for (1..500) { # need to fill up 64k read buffer
+		print $in_w <<EOM or xbail "print $!";
+From k\@y Fri Oct  2 00:00:00 1993
+From: <k\@example.com>
+Date: Sat, 02 Oct 2010 00:00:00 +0000
+Subject: hi
+Message-ID: <$_\@t>
+
+will this save?
+EOM
+	}
+	tick 0.2; # XXX ugh, this is so hacky
+
+	# make sto_done_request fail:
+	remove_tree("$ENV{HOME}/.local/share/lei/store");
+	# subsequent lei commands are undefined behavior,
+	# but we need to make sure the current lei command fails:
+
+	close $in_w; # should trigger ->done
+	$tp->join;
+	isnt($?, 0, 'lei import error code set on failure');
+	is(-s $opt->{1}, 0, 'nothing in stdout');
+	isnt(-s $opt->{2}, 0, 'stderr not empty');
+	seek($opt->{2}, 0, SEEK_SET);
+	my @err = readline($opt->{2});
+	ok(grep(!/^#/, @err), 'noted error in stderr') or diag "@err";
+});
+
+done_testing;


end of thread, other threads:[~2023-10-08 18:55 UTC | newest]

Thread overview: 13+ messages
2023-10-07 21:24 [PATCH 0/9] more process-related cleanups Eric Wong
2023-10-07 21:24 ` [PATCH 1/9] xt/httpd-async-stream: avoid waitpid call Eric Wong
2023-10-07 21:24 ` [PATCH 2/9] lei: do not issue sto->done if socket is inactive Eric Wong
2023-10-07 21:24 ` [PATCH 3/9] lei: always use async `done' requests to store Eric Wong
2023-10-08  1:58   ` Eric Wong
2023-10-08  5:49   ` [PATCH 2.5/9] lei: fix implicit stdin support for pipes Eric Wong
2023-10-08 18:54   ` [PATCHv2 3/9] lei: always use async `done' requests to store Eric Wong
2023-10-07 21:24 ` [PATCH 4/9] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
2023-10-07 21:24 ` [PATCH 5/9] ipc: use autodie for most syscalls Eric Wong
2023-10-07 21:24 ` [PATCH 6/9] import: use autodie, rely on PerlIO for retries Eric Wong
2023-10-07 21:24 ` [PATCH 7/9] rename ProcessPipe to ProcessIO Eric Wong
2023-10-07 21:24 ` [PATCH 8/9] process_io: pass args to awaitpid as list Eric Wong
2023-10-07 21:24 ` [PATCH 9/9] cindex: start using autodie Eric Wong
