unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent)
Date: Tue, 20 Aug 2024 10:35:17 +0000	[thread overview]
Message-ID: <20240820103522.3548609-2-e@80x24.org> (raw)
In-Reply-To: <20240820103522.3548609-1-e@80x24.org>

We may encounter new Linux architectures without syscall number
definitions (needed for signalfd), or *BSD systems without
IO::KQueue or kevent support at all.  In such cases, signals may
interrupt syscalls anywhere within the event loop, so be prepared
to handle EINTR there.
---
 lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++----
 lib/PublicInbox/IPC.pm           | 16 +++++++++--
 lib/PublicInbox/LEI.pm           | 49 +++++++++++++++++++++-----------
 lib/PublicInbox/PktOp.pm         | 17 ++++++-----
 lib/PublicInbox/SHA.pm           | 22 +++++++++++---
 5 files changed, 92 insertions(+), 36 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 6d777bf6..ff3db8ba 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -72,7 +72,8 @@ use PublicInbox::Aspawn qw(run_await);
 use Compress::Zlib qw(compress);
 use Carp qw(croak);
 use Time::Local qw(timegm);
-use autodie qw(close pipe open sysread seek sysseek send);
+use Errno qw(EINTR);
+use autodie qw(close pipe open seek sysseek);
 our $DO_QUIT = 15; # signal number
 our (
 	$LIVE_JOBS, # integer
@@ -225,6 +226,17 @@ sub check_objfmt_status ($$$) {
 	$fmt;
 }
 
+sub xsend ($$) { # send(2), retrying on EINTR; move to PerlIO if we need to
+	my ($s, $buf) = @_;
+	my $n;
+	while (1) {
+		$n = send $s, $buf, 0;
+		return $n if defined $n; # number of characters sent
+		next if $! == EINTR; # interrupted by a signal, retry
+		croak "send: $!"; # any other error is fatal
+	}
+}
+
 sub store_repo { # wq_io_do, sends docid back
 	my ($self, $repo) = @_;
 	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
@@ -248,7 +260,7 @@ EOM
 	my $did = $repo->{docid};
 	$did ? $self->{xdb}->replace_document($did, $doc)
 		: ($did = $self->{xdb}->add_document($doc));
-	send($op_p, "repo_stored $did", 0);
+	xsend $op_p, "repo_stored $did";
 }
 
 sub cidx_ckpoint ($;$) {
@@ -293,7 +305,7 @@ sub cidx_reap_log { # awaitpid cb
 	my ($pid, $cmd, $self, $op_p) = @_;
 	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
 				($? & 127) == POSIX::SIGPIPE))) {
-		send($op_p, "shard_done $self->{shard}", 0);
+		xsend $op_p, "shard_done $self->{shard}";
 	} else {
 		warn "W: @$cmd (\$?=$?)\n";
 		$self->{xdb}->cancel_transaction;
@@ -444,7 +456,7 @@ sub fp_async_done { # run_git cb from worker
 	my ($opt, $self, $git, $op_p) = @_;
 	my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET);
-	send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
+	xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest;
 }
 
 sub fp_done { # called parent via PktOp by fp_async_done
@@ -523,7 +535,7 @@ sub shard_commit { # via wq_io_do
 	my ($self) = @_;
 	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
 	$self->commit_txn_lazy;
-	send($op_p, "shard_done $self->{shard}", 0);
+	xsend $op_p, "shard_done $self->{shard}";
 }
 
 sub dump_roots_start {
@@ -818,7 +830,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
 	my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
 	my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
 	cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
-	send($prune_op_p, "prune_done $self->{shard}", 0);
+	xsend $prune_op_p, "prune_done $self->{shard}";
 }
 
 sub shards_active { # post_loop_do
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index ed6d27fd..13a897be 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -10,7 +10,8 @@
 package PublicInbox::IPC;
 use v5.12;
 use parent qw(Exporter);
-use autodie qw(close pipe read socketpair sysread);
+use autodie qw(close pipe read socketpair);
+use Errno qw(EAGAIN EINTR);
 use Carp qw(croak);
 use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
@@ -215,8 +216,17 @@ sub recv_and_run {
 	}
 	while ($full_stream && $n < $len) {
 		my $r = sysread($s2, $buf, $len - $n, $n);
-		croak "read EOF after $n/$len bytes" if $r == 0;
-		$n = length($buf);
+		if ($r) {
+			$n = length($buf); # keep looping
+		} elsif (!defined $r) {
+			if ($! == EAGAIN) {
+				poll_in($s2)
+			} elsif ($! != EINTR) {
+				croak "sysread: $!";
+			} # next on EINTR
+		} else { # ($r == 0)
+			croak "read EOF after $n/$len bytes";
+		}
 	}
 	# Sereal dies on truncated data, Storable returns undef
 	my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c5146428..637cc8b1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
 use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR);
 use Cwd qw(getcwd);
 use POSIX qw(strftime);
 use IO::Handle ();
@@ -27,6 +27,7 @@ use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
 use PublicInbox::OnDestroy;
 use PublicInbox::IPC;
+use PublicInbox::IO qw(poll_in);
 use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path ();
 use File::Spec;
@@ -488,6 +489,15 @@ sub _drop_wq {
 	}
 }
 
+sub send_gently ($$) { # send(2) w/ EINTR retry; undef (w/ $!) on other errors
+	my ($s, $buf) = @_;
+	my $n;
+	while (1) {
+		$n = send $s, $buf, 0;
+		return $n if defined($n) || $! != EINTR; # never dies
+	}
+}
+
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
 	my ($self, $code) = @_;
@@ -498,7 +508,7 @@ sub x_it ($$) {
 		$self->{pkt_op_p}->pkt_do('x_it', $code);
 		exit($code >> 8) if $$ != $daemon_pid;
 	} elsif ($self->{sock}) { # lei->daemon => lei(1) client
-		send($self->{sock}, "x_it $code", 0);
+		send_gently $self->{sock}, "x_it $code";
 	} elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
 		exit($code >> 8);
 	} # else ignore if client disconnected
@@ -569,7 +579,7 @@ sub child_error { # passes non-fatal curl exit codes to user
 	if ($self->{pkt_op_p}) { # to top lei-daemon
 		$self->{pkt_op_p}->pkt_do('child_error', $child_error);
 	} elsif ($self->{sock}) { # to lei(1) client
-		send($self->{sock}, "child_error $child_error", 0);
+		send_gently $self->{sock}, "child_error $child_error";
 	} # else noop if client disconnected
 }
 
@@ -1066,7 +1076,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
 	while (my $op = shift(@$alerts)) {
 		if ($op eq ':WINCH') {
 			# hit the process group that started the MUA
-			send($sock, '-WINCH', 0) if $sock;
+			send_gently $sock, '-WINCH' if $sock;
 		} elsif ($op eq ':bell') {
 			out($self, "\a");
 		} elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1075,7 +1085,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
 			my $cmd = $1; # run an arbitrary command
 			require Text::ParseWords;
 			$cmd = [ Text::ParseWords::shellwords($cmd) ];
-			send($sock, exec_buf($cmd, {}), 0) if $sock;
+			send_gently $sock, exec_buf($cmd, {}) if $sock;
 		} else {
 			warn("W: unsupported --alert=$op\n"); # non-fatal
 		}
@@ -1130,7 +1140,7 @@ sub pgr_err {
 	say { $self->{2} } @msg, '# -quit pager to continue-';
 	$self->{2}->autoflush(1);
 	stop_pager($self);
-	send($self->{sock}, 'wait', 0); # wait for user to quit pager
+	send_gently $self->{sock}, 'wait'; # wait for user to quit pager
 }
 
 sub stop_pager {
@@ -1147,22 +1157,22 @@ sub accept_dispatch { # Listener {post_accept} callback
 	my $self = bless { sock => $sock }, __PACKAGE__;
 	vec(my $rvec = '', fileno($sock), 1) = 1;
 	select($rvec, undef, undef, 60) or
-		return send($sock, 'timed out waiting to recv FDs', 0);
+		return send_gently $sock, 'timed out waiting to recv FDs';
 	# (4096 * 33) >MAX_ARG_STRLEN
 	my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
 		return; # EOF
 	if (!defined($fds[0])) {
 		warn(my $msg = "recv_cmd failed: $!");
-		return send($sock, $msg, 0);
+		return send_gently $sock, $msg;
 	} else {
 		my $i = 0;
 		open($self->{$i++}, '+<&=', $_) for @fds;
-		$i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
+		$i == 4 or return send_gently $sock, 'not enough FDs='.($i-1);
 	}
 	# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
 	# $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
 	substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
-		return send($sock, 'request command truncated', 0);
+		return send_gently $sock, 'request command truncated';
 	my ($argc, @argv) = split(/\0/, $buf, -1);
 	undef $buf;
 	my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1207,7 +1217,7 @@ sub event_step {
 			die "unrecognized client signal: $buf";
 		}
 		my $s = $self->{-socks} // []; # lei up --all
-		@$s = grep { send($_, $buf, 0) } @$s;
+		@$s = grep { send_gently $_, $buf } @$s;
 	};
 	if (my $err = $@) {
 		eval { $self->fail($err) };
@@ -1572,14 +1582,21 @@ sub cfg_dump ($$) {
 	undef;
 }
 
-sub request_umask {
+sub request_umask { # assumes client is trusted and fast; sets {client_umask}
 	my ($lei) = @_;
 	my $s = $lei->{sock} // return;
 	send($s, 'umask', 0) // die "send: $!";
-	vec(my $rvec = '', fileno($s), 1) = 1;
-	select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
-	recv($s, my $v, 5, 0) // die "recv: $!";
-	(my $u, $lei->{client_umask}) = unpack('AV', $v);
+	my ($v, $r, $u);
+	do { # n.b. poll_in returns -1 on EINTR, false on timeout
+		$r = poll_in($s, 2) or
+			die 'timeout waiting for umask';
+		die "poll: $!" if $r < 0 && $! != EINTR; # don't block in recv
+	} while ($r < 0); # only reached again on EINTR
+	do { # retry recv(2) if interrupted by a signal
+		$r = recv $s, $v, 5, 0;
+		die "recv: $!" if !defined($r) && $! != EINTR;
+	} while (!defined $r);
+	($u, $lei->{client_umask}) = unpack('AV', $v);
 	$u eq 'u' or warn "E: recv $v has no umask";
 }
 
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 1bcdd799..c3ee36aa 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -8,7 +8,7 @@
 package PublicInbox::PktOp;
 use v5.12;
 use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN ECONNRESET);
+use Errno qw(EAGAIN ECONNRESET EINTR);
 use PublicInbox::Syscall qw(EPOLLIN);
 use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
@@ -37,12 +37,15 @@ sub pkt_do { # for the producer to trigger event_step in consumer
 sub event_step {
 	my ($self) = @_;
 	my $c = $self->{sock};
-	my $n = recv($c, my $msg, 4096, 0);
-	unless (defined $n) {
-		return if $! == EAGAIN;
-		die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
-	}
-	my ($cmd, @pargs);
+	my ($msg, $n, $cmd, @pargs);
+	# real loop: do BLOCK is not a loop, so `next' can't retry it
+	until (defined($n = recv($c, $msg, 4096, 0))) {
+		next if $! == EINTR; # re-runs recv via the loop condition
+		return if $! == EAGAIN;
+		# we may be bidirectional; fall through on reset, don't retry
+		last if $! == ECONNRESET;
+		die "recv: $!";
+	}
 	if (index($msg, "\0") > 0) {
 		($cmd, my $pargs) = split(/\0/, $msg, 2);
 		@pargs = @{ipc_thaw($pargs)};
diff --git a/lib/PublicInbox/SHA.pm b/lib/PublicInbox/SHA.pm
index 3fa8530e..5eb882c6 100644
--- a/lib/PublicInbox/SHA.pm
+++ b/lib/PublicInbox/SHA.pm
@@ -12,8 +12,10 @@
 package PublicInbox::SHA;
 use v5.12;
 require Exporter;
+use Errno qw(EAGAIN EINTR);
+use PublicInbox::IO qw(poll_in);
+use Carp qw(croak);
 our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all);
-use autodie qw(sysread);
 our @ISA;
 
 BEGIN {
@@ -59,9 +61,21 @@ EOM
 
 sub sha_all ($$) {
 	my ($n, $fh) = @_;
-	my ($dig, $buf) = (PublicInbox::SHA->new($n));
-	while (sysread($fh, $buf, 65536)) { $dig->add($buf) }
-	$dig
+	my ($dig, $buf, $r) = (PublicInbox::SHA->new($n)); # $n: e.g. 256
+	while (1) { # digest all of $fh until EOF
+		$r = sysread($fh, $buf, 65536);
+		if ($r) {
+			$dig->add($buf);
+		} elsif (!defined $r) {
+			if ($! == EAGAIN) { # presumably non-blocking $fh: wait
+				poll_in($fh);
+			} elsif ($! != EINTR) {
+				croak "sysread: $!";
+			} # next on EINTR
+		} else { # EOF:
+			return $dig; # caller finalizes, e.g. ->hexdigest
+		}
+	}
 }
 
 1;

  reply	other threads:[~2024-08-20 10:35 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
2024-08-20 10:35 ` Eric Wong [this message]
2024-08-20 10:35 ` [PATCH 2/5] lei: simplify forced signal check Eric Wong
2024-08-20 10:35 ` [PATCH 3/5] sigfd: call normal Perl %SIG handlers Eric Wong
2024-08-20 10:35 ` [PATCH 4/5] lei: allow Ctrl-C to interrupt IMAP+NNTP reads Eric Wong
2024-08-20 10:35 ` [PATCH 5/5] lei_xsearch: allow signals during long queries Eric Wong
2024-08-20 18:40   ` [PATCH 6/5] t/sigfd: reduce getpid() calls and hash lookups Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240820103522.3548609-2-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).