From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent)
Date: Tue, 20 Aug 2024 10:35:17 +0000 [thread overview]
Message-ID: <20240820103522.3548609-2-e@80x24.org> (raw)
In-Reply-To: <20240820103522.3548609-1-e@80x24.org>
We may encounter new architectures in Linux without syscall
number definitions or *BSD systems without IO::KQueue or kevent
support at all.  In such cases, signals may arrive anywhere within
the event loop and interrupt system calls with EINTR, so be
prepared to retry those calls.
---
lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++----
lib/PublicInbox/IPC.pm | 16 +++++++++--
lib/PublicInbox/LEI.pm | 49 +++++++++++++++++++++-----------
lib/PublicInbox/PktOp.pm | 17 ++++++-----
lib/PublicInbox/SHA.pm | 22 +++++++++++---
5 files changed, 92 insertions(+), 36 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 6d777bf6..ff3db8ba 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -72,7 +72,8 @@ use PublicInbox::Aspawn qw(run_await);
use Compress::Zlib qw(compress);
use Carp qw(croak);
use Time::Local qw(timegm);
-use autodie qw(close pipe open sysread seek sysseek send);
+use Errno qw(EINTR);
+use autodie qw(close pipe open seek sysseek); # send: use xsend (retries EINTR)
our $DO_QUIT = 15; # signal number
our (
$LIVE_JOBS, # integer
@@ -225,6 +226,17 @@ sub check_objfmt_status ($$$) {
$fmt;
}
+sub xsend ($$) { # send(2) w/ EINTR retry, croak on other errors; move to PerlIO if we need to
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined $n;
+ next if $! == EINTR; # interrupted by a signal: retry
+ croak "send: $!";
+ }
+}
+
sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
@@ -248,7 +260,7 @@ EOM
my $did = $repo->{docid};
$did ? $self->{xdb}->replace_document($did, $doc)
: ($did = $self->{xdb}->add_document($doc));
- send($op_p, "repo_stored $did", 0);
+ xsend $op_p, "repo_stored $did";
}
sub cidx_ckpoint ($;$) {
@@ -293,7 +305,7 @@ sub cidx_reap_log { # awaitpid cb
my ($pid, $cmd, $self, $op_p) = @_;
if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
($? & 127) == POSIX::SIGPIPE))) {
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
} else {
warn "W: @$cmd (\$?=$?)\n";
$self->{xdb}->cancel_transaction;
@@ -444,7 +456,7 @@ sub fp_async_done { # run_git cb from worker
my ($opt, $self, $git, $op_p) = @_;
my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
- send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
+ xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest;
}
sub fp_done { # called parent via PktOp by fp_async_done
@@ -523,7 +535,7 @@ sub shard_commit { # via wq_io_do
my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
}
sub dump_roots_start {
@@ -818,7 +830,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
- send($prune_op_p, "prune_done $self->{shard}", 0);
+ xsend $prune_op_p, "prune_done $self->{shard}";
}
sub shards_active { # post_loop_do
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index ed6d27fd..13a897be 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -10,7 +10,8 @@
package PublicInbox::IPC;
use v5.12;
use parent qw(Exporter);
-use autodie qw(close pipe read socketpair sysread);
+use autodie qw(close pipe read socketpair);
+use Errno qw(EAGAIN EINTR);
use Carp qw(croak);
use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
@@ -215,8 +216,17 @@ sub recv_and_run {
}
while ($full_stream && $n < $len) {
my $r = sysread($s2, $buf, $len - $n, $n);
- croak "read EOF after $n/$len bytes" if $r == 0;
- $n = length($buf);
+ if ($r) {
+ $n = length($buf); # keep looping
+ } elsif (!defined $r) {
+ if ($! == EAGAIN) {
+ poll_in($s2) # NOTE(review): this patch adds no poll_in import to IPC.pm (LEI.pm, SHA.pm do); confirm it is imported
+ } elsif ($! != EINTR) {
+ croak "sysread: $!";
+ } # EINTR: loop again to retry the interrupted sysread
+ } else { # ($r == 0)
+ croak "read EOF after $n/$len bytes";
+ }
}
# Sereal dies on truncated data, Storable returns undef
my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c5146428..637cc8b1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
use Getopt::Long ();
use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR);
use Cwd qw(getcwd);
use POSIX qw(strftime);
use IO::Handle ();
@@ -27,6 +27,7 @@ use PublicInbox::Import;
use PublicInbox::ContentHash qw(git_sha);
use PublicInbox::OnDestroy;
use PublicInbox::IPC;
+use PublicInbox::IO qw(poll_in);
use Time::HiRes qw(stat); # ctime comparisons for config cache
use File::Path ();
use File::Spec;
@@ -488,6 +489,15 @@ sub _drop_wq {
}
}
+sub send_gently ($$) { # send(2) w/ EINTR retry; returns send's result (undef + $! on other errors)
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined($n) || $! != EINTR;
+ }
+}
+
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
@@ -498,7 +508,7 @@ sub x_it ($$) {
$self->{pkt_op_p}->pkt_do('x_it', $code);
exit($code >> 8) if $$ != $daemon_pid;
} elsif ($self->{sock}) { # lei->daemon => lei(1) client
- send($self->{sock}, "x_it $code", 0);
+ send_gently $self->{sock}, "x_it $code";
} elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
exit($code >> 8);
} # else ignore if client disconnected
@@ -569,7 +579,7 @@ sub child_error { # passes non-fatal curl exit codes to user
if ($self->{pkt_op_p}) { # to top lei-daemon
$self->{pkt_op_p}->pkt_do('child_error', $child_error);
} elsif ($self->{sock}) { # to lei(1) client
- send($self->{sock}, "child_error $child_error", 0);
+ send_gently $self->{sock}, "child_error $child_error";
} # else noop if client disconnected
}
@@ -1066,7 +1076,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
while (my $op = shift(@$alerts)) {
if ($op eq ':WINCH') {
# hit the process group that started the MUA
- send($sock, '-WINCH', 0) if $sock;
+ send_gently $sock, '-WINCH' if $sock;
} elsif ($op eq ':bell') {
out($self, "\a");
} elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1075,7 +1085,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
my $cmd = $1; # run an arbitrary command
require Text::ParseWords;
$cmd = [ Text::ParseWords::shellwords($cmd) ];
- send($sock, exec_buf($cmd, {}), 0) if $sock;
+ send_gently $sock, exec_buf($cmd, {}) if $sock;
} else {
warn("W: unsupported --alert=$op\n"); # non-fatal
}
@@ -1130,7 +1140,7 @@ sub pgr_err {
say { $self->{2} } @msg, '# -quit pager to continue-';
$self->{2}->autoflush(1);
stop_pager($self);
- send($self->{sock}, 'wait', 0); # wait for user to quit pager
+ send_gently $self->{sock}, 'wait'; # wait for user to quit pager
}
sub stop_pager {
@@ -1147,22 +1157,22 @@ sub accept_dispatch { # Listener {post_accept} callback
my $self = bless { sock => $sock }, __PACKAGE__;
vec(my $rvec = '', fileno($sock), 1) = 1;
select($rvec, undef, undef, 60) or
- return send($sock, 'timed out waiting to recv FDs', 0);
+ return send_gently $sock, 'timed out waiting to recv FDs'; # NOTE(review): 4-arg select returns -1 (true) on EINTR, so an interrupted wait skips this path
# (4096 * 33) >MAX_ARG_STRLEN
my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
return; # EOF
if (!defined($fds[0])) {
warn(my $msg = "recv_cmd failed: $!");
- return send($sock, $msg, 0);
+ return send_gently $sock, $msg;
} else {
my $i = 0;
open($self->{$i++}, '+<&=', $_) for @fds;
- $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
+ $i == 4 or return send_gently $sock, 'not enough FDs='.($i-1);
}
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z//
- return send($sock, 'request command truncated', 0);
+ return send_gently $sock, 'request command truncated';
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1207,7 +1217,7 @@ sub event_step {
die "unrecognized client signal: $buf";
}
my $s = $self->{-socks} // []; # lei up --all
- @$s = grep { send($_, $buf, 0) } @$s;
+ @$s = grep { send_gently $_, $buf } @$s; # drops sockets whose send fails (undef)
};
if (my $err = $@) {
eval { $self->fail($err) };
@@ -1572,14 +1582,20 @@ sub cfg_dump ($$) {
undef;
}
-sub request_umask {
+sub request_umask { # assumes client is trusted and fast
my ($lei) = @_;
my $s = $lei->{sock} // return;
- send($s, 'umask', 0) // die "send: $!";
- vec(my $rvec = '', fileno($s), 1) = 1;
- select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
- recv($s, my $v, 5, 0) // die "recv: $!";
- (my $u, $lei->{client_umask}) = unpack('AV', $v);
+ send_gently($s, 'umask') // die "send: $!"; # retries on EINTR
+ my ($v, $r, $u);
+ do { # n.b. poll_in returns -1 on EINTR
+ $r = poll_in($s, 2) or
+ die 'timeout waiting for umask';
+ } while ($r < 0 && $! == EINTR);
+ do { # recv(2) the reply into $v, retrying on EINTR
+ $r = recv $s, $v, 5, 0;
+ die "recv: $!" if !defined($r) && $! != EINTR;
+ } while (!defined $r);
+ ($u, $lei->{client_umask}) = unpack('AV', $v); # 'u' + 32-bit little-endian umask
$u eq 'u' or warn "E: recv $v has no umask";
}
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 1bcdd799..c3ee36aa 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -8,7 +8,7 @@
package PublicInbox::PktOp;
use v5.12;
use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN ECONNRESET);
+use Errno qw(EAGAIN ECONNRESET EINTR);
use PublicInbox::Syscall qw(EPOLLIN);
use Socket qw(AF_UNIX SOCK_SEQPACKET);
use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
@@ -37,12 +37,15 @@ sub pkt_do { # for the producer to trigger event_step in consumer
sub event_step {
my ($self) = @_;
my $c = $self->{sock};
- my $n = recv($c, my $msg, 4096, 0);
- unless (defined $n) {
- return if $! == EAGAIN;
- die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
- }
- my ($cmd, @pargs);
+ my ($msg, $n, $cmd, @pargs);
+ while (1) { # a real loop: next/last don't work inside do {} until
+ $n = recv($c, $msg, 4096, 0);
+ last if defined $n; # got a packet (or EOF)
+ next if $! == EINTR; # interrupted by a signal: retry
+ return if $! == EAGAIN;
+ die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
+ last; # ECONNRESET: fall through with $n undefined
+ }
if (index($msg, "\0") > 0) {
($cmd, my $pargs) = split(/\0/, $msg, 2);
@pargs = @{ipc_thaw($pargs)};
diff --git a/lib/PublicInbox/SHA.pm b/lib/PublicInbox/SHA.pm
index 3fa8530e..5eb882c6 100644
--- a/lib/PublicInbox/SHA.pm
+++ b/lib/PublicInbox/SHA.pm
@@ -12,8 +12,10 @@
package PublicInbox::SHA;
use v5.12;
require Exporter;
+use Errno qw(EAGAIN EINTR);
+use PublicInbox::IO qw(poll_in);
+use Carp qw(croak);
our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all);
-use autodie qw(sysread);
our @ISA;
BEGIN {
@@ -59,9 +61,21 @@ EOM
sub sha_all ($$) {
my ($n, $fh) = @_;
- my ($dig, $buf) = (PublicInbox::SHA->new($n));
- while (sysread($fh, $buf, 65536)) { $dig->add($buf) }
- $dig
+ my ($dig, $buf, $r) = (PublicInbox::SHA->new($n)); # $n picks the SHA variant (e.g. 256)
+ while (1) {
+ $r = sysread($fh, $buf, 65536);
+ if ($r) {
+ $dig->add($buf);
+ } elsif (!defined $r) {
+ if ($! == EAGAIN) {
+ poll_in($fh);
+ } elsif ($! != EINTR) {
+ croak "sysread: $!";
+ } # EINTR: loop again to retry the interrupted sysread
+ } else { # EOF: return the digest of everything read
+ return $dig;
+ }
+ }
}
1;
next prev parent reply other threads:[~2024-08-20 10:35 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
2024-08-20 10:35 ` Eric Wong [this message]
2024-08-20 10:35 ` [PATCH 2/5] lei: simplify forced signal check Eric Wong
2024-08-20 10:35 ` [PATCH 3/5] sigfd: call normal Perl %SIG handlers Eric Wong
2024-08-20 10:35 ` [PATCH 4/5] lei: allow Ctrl-C to interrupt IMAP+NNTP reads Eric Wong
2024-08-20 10:35 ` [PATCH 5/5] lei_xsearch: allow signals during long queries Eric Wong
2024-08-20 18:40 ` [PATCH 6/5] t/sigfd: reduce getpid() calls and hash lookups Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240820103522.3548609-2-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).