* [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent)
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
@ 2024-08-20 10:35 ` Eric Wong
2024-08-20 10:35 ` [PATCH 2/5] lei: simplify forced signal check Eric Wong
` (3 subsequent siblings)
4 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-08-20 10:35 UTC (permalink / raw)
To: meta
We may encounter new architectures in Linux without syscall
number definitions or *BSD systems without IO::KQueue or kevent
support at all, so be prepared to handle signals (and the EINTR
errors they cause) anywhere within the event loop in such cases.
---
lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++----
lib/PublicInbox/IPC.pm | 16 +++++++++--
lib/PublicInbox/LEI.pm | 49 +++++++++++++++++++++-----------
lib/PublicInbox/PktOp.pm | 17 ++++++-----
lib/PublicInbox/SHA.pm | 22 +++++++++++---
5 files changed, 92 insertions(+), 36 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 6d777bf6..ff3db8ba 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -72,7 +72,8 @@ use PublicInbox::Aspawn qw(run_await);
use Compress::Zlib qw(compress);
use Carp qw(croak);
use Time::Local qw(timegm);
-use autodie qw(close pipe open sysread seek sysseek send);
+use Errno qw(EINTR);
+use autodie qw(close pipe open seek sysseek);
our $DO_QUIT = 15; # signal number
our (
$LIVE_JOBS, # integer
@@ -225,6 +226,17 @@ sub check_objfmt_status ($$$) {
$fmt;
}
+sub xsend ($$) { # move to PerlIO if we need to
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined $n;
+ next if $! == EINTR;
+ croak "send: $!";
+ }
+}
+
sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
@@ -248,7 +260,7 @@ EOM
my $did = $repo->{docid};
$did ? $self->{xdb}->replace_document($did, $doc)
: ($did = $self->{xdb}->add_document($doc));
- send($op_p, "repo_stored $did", 0);
+ xsend $op_p, "repo_stored $did";
}
sub cidx_ckpoint ($;$) {
@@ -293,7 +305,7 @@ sub cidx_reap_log { # awaitpid cb
my ($pid, $cmd, $self, $op_p) = @_;
if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
($? & 127) == POSIX::SIGPIPE))) {
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
} else {
warn "W: @$cmd (\$?=$?)\n";
$self->{xdb}->cancel_transaction;
@@ -444,7 +456,7 @@ sub fp_async_done { # run_git cb from worker
my ($opt, $self, $git, $op_p) = @_;
my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
- send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
+ xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest;
}
sub fp_done { # called parent via PktOp by fp_async_done
@@ -523,7 +535,7 @@ sub shard_commit { # via wq_io_do
my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $self->{shard}", 0);
+ xsend $op_p, "shard_done $self->{shard}";
}
sub dump_roots_start {
@@ -818,7 +830,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
- send($prune_op_p, "prune_done $self->{shard}", 0);
+ xsend $prune_op_p, "prune_done $self->{shard}";
}
sub shards_active { # post_loop_do
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index ed6d27fd..13a897be 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -10,7 +10,8 @@
package PublicInbox::IPC;
use v5.12;
use parent qw(Exporter);
-use autodie qw(close pipe read socketpair sysread);
+use autodie qw(close pipe read socketpair);
+use Errno qw(EAGAIN EINTR);
use Carp qw(croak);
use PublicInbox::DS qw(awaitpid);
use PublicInbox::Spawn;
@@ -215,8 +216,17 @@ sub recv_and_run {
}
while ($full_stream && $n < $len) {
my $r = sysread($s2, $buf, $len - $n, $n);
- croak "read EOF after $n/$len bytes" if $r == 0;
- $n = length($buf);
+ if ($r) {
+ $n = length($buf); # keep looping
+ } elsif (!defined $r) {
+ if ($! == EAGAIN) {
+ poll_in($s2)
+ } elsif ($! != EINTR) {
+ croak "sysread: $!";
+ } # next on EINTR
+ } else { # ($r == 0)
+ croak "read EOF after $n/$len bytes";
+ }
}
# Sereal dies on truncated data, Storable returns undef
my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c5146428..637cc8b1 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
use Getopt::Long ();
use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR);
use Cwd qw(getcwd);
use POSIX qw(strftime);
use IO::Handle ();
@@ -27,6 +27,7 @@ use PublicInbox::Import;
use PublicInbox::ContentHash qw(git_sha);
use PublicInbox::OnDestroy;
use PublicInbox::IPC;
+use PublicInbox::IO qw(poll_in);
use Time::HiRes qw(stat); # ctime comparisons for config cache
use File::Path ();
use File::Spec;
@@ -488,6 +489,15 @@ sub _drop_wq {
}
}
+sub send_gently ($$) {
+ my ($s, $buf) = @_;
+ my $n;
+ while (1) {
+ $n = send $s, $buf, 0;
+ return $n if defined($n) || $! != EINTR;
+ }
+}
+
# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
sub x_it ($$) {
my ($self, $code) = @_;
@@ -498,7 +508,7 @@ sub x_it ($$) {
$self->{pkt_op_p}->pkt_do('x_it', $code);
exit($code >> 8) if $$ != $daemon_pid;
} elsif ($self->{sock}) { # lei->daemon => lei(1) client
- send($self->{sock}, "x_it $code", 0);
+ send_gently $self->{sock}, "x_it $code";
} elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
exit($code >> 8);
} # else ignore if client disconnected
@@ -569,7 +579,7 @@ sub child_error { # passes non-fatal curl exit codes to user
if ($self->{pkt_op_p}) { # to top lei-daemon
$self->{pkt_op_p}->pkt_do('child_error', $child_error);
} elsif ($self->{sock}) { # to lei(1) client
- send($self->{sock}, "child_error $child_error", 0);
+ send_gently $self->{sock}, "child_error $child_error";
} # else noop if client disconnected
}
@@ -1066,7 +1076,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
while (my $op = shift(@$alerts)) {
if ($op eq ':WINCH') {
# hit the process group that started the MUA
- send($sock, '-WINCH', 0) if $sock;
+ send_gently $sock, '-WINCH' if $sock;
} elsif ($op eq ':bell') {
out($self, "\a");
} elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1075,7 +1085,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
my $cmd = $1; # run an arbitrary command
require Text::ParseWords;
$cmd = [ Text::ParseWords::shellwords($cmd) ];
- send($sock, exec_buf($cmd, {}), 0) if $sock;
+ send_gently $sock, exec_buf($cmd, {}) if $sock;
} else {
warn("W: unsupported --alert=$op\n"); # non-fatal
}
@@ -1130,7 +1140,7 @@ sub pgr_err {
say { $self->{2} } @msg, '# -quit pager to continue-';
$self->{2}->autoflush(1);
stop_pager($self);
- send($self->{sock}, 'wait', 0); # wait for user to quit pager
+ send_gently $self->{sock}, 'wait'; # wait for user to quit pager
}
sub stop_pager {
@@ -1147,22 +1157,22 @@ sub accept_dispatch { # Listener {post_accept} callback
my $self = bless { sock => $sock }, __PACKAGE__;
vec(my $rvec = '', fileno($sock), 1) = 1;
select($rvec, undef, undef, 60) or
- return send($sock, 'timed out waiting to recv FDs', 0);
+ return send_gently $sock, 'timed out waiting to recv FDs';
# (4096 * 33) >MAX_ARG_STRLEN
my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
return; # EOF
if (!defined($fds[0])) {
warn(my $msg = "recv_cmd failed: $!");
- return send($sock, $msg, 0);
+ return send_gently $sock, $msg;
} else {
my $i = 0;
open($self->{$i++}, '+<&=', $_) for @fds;
- $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
+ $i == 4 or return send_gently $sock, 'not enough FDs='.($i-1);
}
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z//
- return send($sock, 'request command truncated', 0);
+ return send_gently $sock, 'request command truncated';
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1207,7 +1217,7 @@ sub event_step {
die "unrecognized client signal: $buf";
}
my $s = $self->{-socks} // []; # lei up --all
- @$s = grep { send($_, $buf, 0) } @$s;
+ @$s = grep { send_gently $_, $buf } @$s;
};
if (my $err = $@) {
eval { $self->fail($err) };
@@ -1572,14 +1582,21 @@ sub cfg_dump ($$) {
undef;
}
-sub request_umask {
+sub request_umask { # assumes client is trusted and fast
my ($lei) = @_;
my $s = $lei->{sock} // return;
send($s, 'umask', 0) // die "send: $!";
- vec(my $rvec = '', fileno($s), 1) = 1;
- select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
- recv($s, my $v, 5, 0) // die "recv: $!";
- (my $u, $lei->{client_umask}) = unpack('AV', $v);
+ my ($v, $r, $u);
+ do { # n.b. poll_in returns -1 on EINTR
+ vec($v = '', fileno($s), 1) = 1;
+ $r = poll_in($s, 2) or
+ die 'timeout waiting for umask';
+ } while ($r < 0 && $! == EINTR);
+ do {
+ $r = recv $s, $v, 5, 0;
+ die "recv: $!" if !defined($r) && $! != EINTR;
+ } while (!defined $r);
+ ($u, $lei->{client_umask}) = unpack('AV', $v);
$u eq 'u' or warn "E: recv $v has no umask";
}
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 1bcdd799..c3ee36aa 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -8,7 +8,7 @@
package PublicInbox::PktOp;
use v5.12;
use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN ECONNRESET);
+use Errno qw(EAGAIN ECONNRESET EINTR);
use PublicInbox::Syscall qw(EPOLLIN);
use Socket qw(AF_UNIX SOCK_SEQPACKET);
use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
@@ -37,12 +37,15 @@ sub pkt_do { # for the producer to trigger event_step in consumer
sub event_step {
my ($self) = @_;
my $c = $self->{sock};
- my $n = recv($c, my $msg, 4096, 0);
- unless (defined $n) {
- return if $! == EAGAIN;
- die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
- }
- my ($cmd, @pargs);
+ my ($msg, $n, $cmd, @pargs);
+ do {
+ $n = recv($c, $msg, 4096, 0);
+ unless (defined $n) {
+ next if $! == EINTR;
+ return if $! == EAGAIN;
+ die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
+ }
+ } until (defined $n);
if (index($msg, "\0") > 0) {
($cmd, my $pargs) = split(/\0/, $msg, 2);
@pargs = @{ipc_thaw($pargs)};
diff --git a/lib/PublicInbox/SHA.pm b/lib/PublicInbox/SHA.pm
index 3fa8530e..5eb882c6 100644
--- a/lib/PublicInbox/SHA.pm
+++ b/lib/PublicInbox/SHA.pm
@@ -12,8 +12,10 @@
package PublicInbox::SHA;
use v5.12;
require Exporter;
+use Errno qw(EAGAIN EINTR);
+use PublicInbox::IO qw(poll_in);
+use Carp qw(croak);
our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all);
-use autodie qw(sysread);
our @ISA;
BEGIN {
@@ -59,9 +61,21 @@ EOM
sub sha_all ($$) {
my ($n, $fh) = @_;
- my ($dig, $buf) = (PublicInbox::SHA->new($n));
- while (sysread($fh, $buf, 65536)) { $dig->add($buf) }
- $dig
+ my ($dig, $buf, $r) = (PublicInbox::SHA->new($n));
+ while (1) {
+ $r = sysread($fh, $buf, 65536);
+ if ($r) {
+ $dig->add($buf);
+ } elsif (!defined $r) {
+ if ($! == EAGAIN) {
+ poll_in($fh);
+ } elsif ($! != EINTR) {
+ croak "sysread: $!";
+ } # next on EINTR
+ } else { # EOF:
+ return $dig;
+ }
+ }
}
1;
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 2/5] lei: simplify forced signal check
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
2024-08-20 10:35 ` [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent) Eric Wong
@ 2024-08-20 10:35 ` Eric Wong
2024-08-20 10:35 ` [PATCH 3/5] sigfd: call normal Perl %SIG handlers Eric Wong
` (2 subsequent siblings)
4 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-08-20 10:35 UTC (permalink / raw)
To: meta
There's no need to loop since a select call is enough to force
signal handlers to be fired.
---
script/lei | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/script/lei b/script/lei
index a5aef956..d6cc4ef1 100755
--- a/script/lei
+++ b/script/lei
@@ -139,6 +139,6 @@ while (1) {
$sigchld->();
if (my $sig = ($x_it_code & 127)) {
kill $sig, $$;
- sleep(1) while 1; # no self-pipe/signalfd, here, so we loop
+ select undef, undef, undef, 0; # for kernel to act on signal
}
exit($x_it_code >> 8);
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 3/5] sigfd: call normal Perl %SIG handlers
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
2024-08-20 10:35 ` [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent) Eric Wong
2024-08-20 10:35 ` [PATCH 2/5] lei: simplify forced signal check Eric Wong
@ 2024-08-20 10:35 ` Eric Wong
2024-08-20 10:35 ` [PATCH 4/5] lei: allow Ctrl-C to interrupt IMAP+NNTP reads Eric Wong
2024-08-20 10:35 ` [PATCH 5/5] lei_xsearch: allow signals during long queries Eric Wong
4 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-08-20 10:35 UTC (permalink / raw)
To: meta
Instead of storing our own mapping of signal handler callbacks,
rely on the standard %SIG hash table which can be arbitrarily
updated from anywhere.
This makes it easier to allow existing synchronous code (e.g.
NetReader using Mail::IMAPClient or Net::NNTP) to add explicit
points where pending signals can be checked.
Additionally, it allows the `DEFAULT' (SIG_DFL) signal handler
to fire when there's no Perl subroutine to register.
Finally, this also allows us to rely on the OS + Perl itself to
dispatch signal handlers on kevent-based systems (and avoid
redundant dispatch due to our (previous) Linux-centric API). It
makes Linux signalfd the only system where we'd need to dispatch
%SIG callbacks ourselves.
---
lib/PublicInbox/DS.pm | 21 ++++++++++++-------
lib/PublicInbox/Sigfd.pm | 43 ++++++++++++++++++++++++++------------
t/sigfd.t | 45 +++++++++++++++++++++++++---------------
3 files changed, 71 insertions(+), 38 deletions(-)
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index f807c626..bb45ec99 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -248,7 +248,7 @@ sub sigset_prep ($$$) {
my ($sig, $init, $each) = @_; # $sig: { signame => whatever }
my $ret = POSIX::SigSet->new;
$ret->$init or die "$init: $!";
- for my $s (keys %$sig) {
+ for my $s (ref($sig) eq 'HASH' ? keys(%$sig) : @$sig) {
my $num = $SIGNUM{$s} // POSIX->can("SIG$s")->();
$ret->$each($num) or die "$each ($s => $num): $!";
}
@@ -259,6 +259,13 @@ sub sigset_prep ($$$) {
sub allowset ($) { sigset_prep $_[0], 'fillset', 'delset' }
sub unblockset ($) { sigset_prep $_[0], 'emptyset', 'addset' }
+sub allow_sigs (@) {
+ my @signames = @_;
+ my $tmp = allowset(\@signames);
+ sig_setmask($tmp, my $old = POSIX::SigSet->new);
+ on_destroy \&sig_setmask, $old;
+}
+
# Start processing IO events. In most daemon programs this never exits. See
# C<post_loop_do> for how to exit the loop.
sub event_loop (;$$) {
@@ -266,17 +273,15 @@ sub event_loop (;$$) {
$Poller //= _InitPoller();
require PublicInbox::Sigfd if $sig;
my $sigfd = $sig ? PublicInbox::Sigfd->new($sig) : undef;
- if ($sigfd && $sigfd->{is_kq}) {
- my $tmp = allowset($sig);
- local @SIG{keys %$sig} = values(%$sig);
- sig_setmask($tmp, my $old = POSIX::SigSet->new);
+ local $SIG{PIPE} = 'IGNORE';
+ local @SIG{keys %$sig} = values(%$sig) if $sig;
+ if ($sigfd && $sigfd->{kq_sigs}) {
# Unlike Linux signalfd, EVFILT_SIGNAL can't handle
# signals received before the filter is created,
# so we peek at signals here.
- sig_setmask($old);
+ my $restore = allow_sigs keys %$sig;
+ select undef, undef, undef, 0; # check sigs
}
- local @SIG{keys %$sig} = values(%$sig) if $sig && !$sigfd;
- local $SIG{PIPE} = 'IGNORE';
if (!$sigfd && $sig) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm
index b8a1ddfb..128d933e 100644
--- a/lib/PublicInbox/Sigfd.pm
+++ b/lib/PublicInbox/Sigfd.pm
@@ -8,27 +8,31 @@ use v5.12;
use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET %SIGNUM);
use POSIX ();
+use autodie qw(kill open);
+my @num2name;
# returns a coderef to unblock signals if neither signalfd or kqueue
# are available.
sub new {
my ($class, $sig) = @_;
- my %signo = map {;
- # $num => [ $cb, $signame ];
- ($SIGNUM{$_} // POSIX->can("SIG$_")->()) => [ $sig->{$_}, $_ ]
- } keys %$sig;
- my $self = bless { sig => \%signo }, $class;
+ my @signo;
+ for my $name (keys %$sig) {
+ my $num = $SIGNUM{$name} // POSIX->can("SIG$name")->();
+ push @signo, $num;
+ $num2name[$num] //= $name;
+ }
+ my $self = bless {}, $class;
my $io;
- my $fd = signalfd([keys %signo]);
+ my $fd = signalfd(\@signo);
if (defined $fd && $fd >= 0) {
- open($io, '+<&=', $fd) or die "open: $!";
+ open $io, '+<&=', $fd;
} elsif (eval { require PublicInbox::DSKQXS }) {
- $io = PublicInbox::DSKQXS->signalfd([keys %signo]);
+ $io = PublicInbox::DSKQXS->signalfd(\@signo);
+ $self->{kq_sigs} = [ keys %$sig ];
} else {
return; # wake up every second to check for signals
}
$self->SUPER::new($io, EPOLLIN | EPOLLET);
- $self->{is_kq} = 1 if tied(*$io);
$self;
}
@@ -37,13 +41,26 @@ sub wait_once ($) {
my ($self) = @_;
# 128 == sizeof(struct signalfd_siginfo)
my $r = sysread($self->{sock}, my $buf, 128 * 64);
- if (defined($r)) {
+ if ($self->{kq_sigs}) {
+ # kqueue doesn't consume signals the same way signalfd does,
+ # so the OS + Perl can make calls for us:
+ my $restore = PublicInbox::DS::allow_sigs @{$self->{kq_sigs}};
+ select undef, undef, undef, 0; # checks signals
+ } elsif (defined($r)) { # Linux signalfd
my $nr = $r / 128 - 1; # $nr may be -1
for my $off (0..$nr) {
# the first uint32_t of signalfd_siginfo: ssi_signo
- my $signo = unpack('L', substr($buf, 128 * $off, 4));
- my ($cb, $signame) = @{$self->{sig}->{$signo}};
- $cb->($signame) if $cb ne 'IGNORE';
+ my $num = unpack('L', substr($buf, 128 * $off, 4));
+ my $name = $num2name[$num];
+ my $cb = $SIG{$name} || 'IGNORE';
+ if ($cb eq 'DEFAULT') {
+ my $restore = PublicInbox::DS::allow_sigs $name;
+ kill $name, $$;
+ select undef, undef, undef, 0; # checks signals
+ # $restore fires
+ } elsif (ref $cb) {
+ $cb->($name);
+ } # undef
}
}
$r;
diff --git a/t/sigfd.t b/t/sigfd.t
index 9a7b947d..eab85ed7 100644
--- a/t/sigfd.t
+++ b/t/sigfd.t
@@ -8,6 +8,7 @@ use Errno qw(ENOSYS);
require_ok 'PublicInbox::Sigfd';
use PublicInbox::DS;
my ($linux_sigfd, $has_sigfd);
+use autodie qw(kill);
SKIP: {
if ($^O ne 'linux' && !eval { require IO::KQueue }) {
@@ -23,7 +24,7 @@ SKIP: {
local $SIG{INT} = sub { $hit->{INT}->{normal}++ };
local $SIG{WINCH} = sub { $hit->{WINCH}->{normal}++ };
for my $s (qw(USR2 HUP TERM INT WINCH)) {
- $sig->{$s} = sub { $hit->{$s}->{sigfd}++ };
+ $sig->{$s} = sub { die "SHOULD NOT BE CALLED ($s)" }
}
kill 'USR2', $$ or die "kill $!";
ok(!defined($hit->{USR2}), 'no USR2 yet') or diag explain($hit);
@@ -44,16 +45,13 @@ SKIP: {
is(select($rvec, undef, undef, undef), 1, 'select() works');
ok($sigfd->wait_once, 'wait_once reported success');
for my $s (qw(HUP INT)) {
- is($hit->{$s}->{sigfd}, 1, "sigfd fired $s");
- is($hit->{$s}->{normal}, undef,
- "normal \$SIG{$s} not fired");
+ is($hit->{$s}->{normal}, 1, "sigfd fired $s");
}
SKIP: {
skip 'Linux sigfd-only behavior', 1 if !$linux_sigfd;
- is($hit->{USR2}->{sigfd}, 1,
+ is($hit->{USR2}->{normal}, 1,
'USR2 sent before signalfd created received');
}
- ok(!$hit->{USR2}->{normal}, 'USR2 not fired normally');
PublicInbox::DS->Reset;
$sigfd = undef;
@@ -64,26 +62,39 @@ SKIP: {
kill('HUP', $$) or die "kill $!";
local @PublicInbox::DS::post_loop_do = (sub {}); # loop once
PublicInbox::DS::event_loop();
- is($hit->{HUP}->{sigfd}, 2, 'HUP sigfd fired in event loop') or
+ is($hit->{HUP}->{normal}, 2, 'HUP sigfd fired in event loop') or
diag explain($hit); # sometimes fails on FreeBSD 11.x
kill('TERM', $$) or die "kill $!";
kill('HUP', $$) or die "kill $!";
PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
- is($hit->{TERM}->{sigfd}, 1, 'TERM sigfd fired in event loop');
- is($hit->{HUP}->{sigfd}, 3, 'HUP sigfd fired in event loop');
- ok($hit->{WINCH}->{sigfd}, 'WINCH sigfd fired in event loop');
+ is($hit->{TERM}->{normal}, 1, 'TERM sigfd fired in event loop');
+ is($hit->{HUP}->{normal}, 3, 'HUP sigfd fired in event loop');
+ ok($hit->{WINCH}->{normal}, 'WINCH sigfd fired in event loop');
+
+ my $restore = PublicInbox::DS::allow_sigs 'HUP';
+ kill 'HUP', $$;
+ select undef, undef, undef, 0;
+ is $hit->{HUP}->{normal}, 4, 'HUP sigfd fired after allow_sigs';
+
+ undef $restore;
+ kill 'HUP', $$;
+ vec($rvec = '', fileno($nbsig->{sock}), 1) = 1;
+ ok select($rvec, undef, undef, 1),
+ 'select reports sigfd readiness';
+ is $hit->{HUP}->{normal}, 4, 'HUP not fired when sigs blocked';
+ $nbsig->event_step;
+ is $hit->{HUP}->{normal}, 5, 'HUP fires only on ->event_step';
+
+ kill 'HUP', $$;
+ is $hit->{HUP}->{normal}, 5, 'HUP not fired, yet';
+ $restore = PublicInbox::DS::allow_sigs 'HUP';
+ select(undef, undef, undef, 0);
+ is $hit->{HUP}->{normal}, 6, 'HUP fires from allow_sigs';
} else {
skip('signalfd disabled?', 10);
}
- ok(!$hit->{USR2}->{normal}, 'USR2 still not fired normally');
PublicInbox::DS::sig_setmask($old);
- SKIP: {
- ($has_sigfd && !$linux_sigfd) or
- skip 'EVFILT_SIGNAL-only behavior check', 1;
- is($hit->{USR2}->{normal}, 1,
- "USR2 fired normally after unblocking on $^O");
- }
}
done_testing;
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 4/5] lei: allow Ctrl-C to interrupt IMAP+NNTP reads
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
` (2 preceding siblings ...)
2024-08-20 10:35 ` [PATCH 3/5] sigfd: call normal Perl %SIG handlers Eric Wong
@ 2024-08-20 10:35 ` Eric Wong
2024-08-20 10:35 ` [PATCH 5/5] lei_xsearch: allow signals during long queries Eric Wong
4 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-08-20 10:35 UTC (permalink / raw)
To: meta
Mail::IMAPClient and Net::NNTP remain synchronous APIs with
indefinite wait times on slow/unreliable connections or servers.
Since these APIs don't play nicely with signalfd or
EVFILT_SIGNAL, we will temporarily drop the reliable (but
sometimes delayed) signal handling mechanisms in favor of the
less reliable built-in signal handling of Perl to provide a
best-effort attempt to handle signals during slow operations.
---
lib/PublicInbox/NetReader.pm | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/lib/PublicInbox/NetReader.pm b/lib/PublicInbox/NetReader.pm
index 7e861f5f..b95022b3 100644
--- a/lib/PublicInbox/NetReader.pm
+++ b/lib/PublicInbox/NetReader.pm
@@ -7,6 +7,7 @@ use v5.12;
use parent qw(Exporter PublicInbox::IPC);
use PublicInbox::Eml;
use PublicInbox::Config;
+use PublicInbox::DS;
our %IMAPflags2kw = map {; "\\\u$_" => $_ } qw(seen answered flagged draft);
$IMAPflags2kw{'$Forwarded'} = 'forwarded'; # RFC 5550
@@ -61,6 +62,7 @@ sub mic_new ($$$$) {
my %mic_arg = (%$mic_arg, Keepalive => 1);
my $sa = $self->{cfg_opt}->{$sec}->{-proxy_cfg} || $self->{-proxy_cli};
my ($mic, $s, $t);
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
if ($sa) {
# this `require' needed for worker[1..Inf], since socks_args
# only got called in worker[0]
@@ -226,6 +228,7 @@ sub nn_new ($$$$) {
($Net_NNTP, $new) = qw(PublicInbox::NetNNTPSocks new_socks);
$nn_arg->{SocksDebug} = 1 if $nn_arg->{Debug};
}
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
do {
$! = 0;
$nn = $Net_NNTP->$new(%$nn_arg);
@@ -567,12 +570,14 @@ sub each_old_flags ($$$$) {
for (my $n = 1; $n <= $l_uid; $n += $bs) {
my $end = $n + $bs;
$end = $l_uid if $end > $l_uid;
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $r = $mic->fetch_hash("$n:$end", 'FLAGS');
if (!$r) {
return if $!{EINTR} && $self->{quit};
return "E: $uri UID FETCH $n:$end error: " .
$mic->LastError." \$!=$!"
}
+ undef $restore;
while (my ($uid, $per_uid) = each %$r) {
my $kw = flags2kw($self, $uri, $uid, $per_uid->{FLAGS})
// next;
@@ -611,12 +616,14 @@ sub _imap_fetch_bodies ($$$$) {
my @batch = splice(@$uids, 0, $bs);
my $batch = join(',', @batch);
local $0 = "UID:$batch $mbx $sec";
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $r = $mic->fetch_hash($batch, $req, 'FLAGS');
unless ($r) { # network error?
last if $!{EINTR} && $self->{quit};
$err = "E: $uri UID FETCH $batch error: $!";
last;
}
+ undef $restore;
for my $uid (@batch) {
# messages get deleted, so holes appear
my $per_uid = delete $r->{$uid} // next;
@@ -640,6 +647,7 @@ sub _imap_fetch_all ($$$) {
# we need to check for mailbox writability to see if we care about
# FLAGS from already-imported messages.
my $cmd = $self->folder_select;
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
$mic->$cmd($mbx) or return "E: \U$cmd\E $mbx ($sec) failed: $!";
my ($r_uidval, $r_uidnext, $perm_fl);
@@ -657,6 +665,7 @@ sub _imap_fetch_all ($$$) {
E: $orig_uri UIDVALIDITY mismatch (got $r_uidval)
EOF
+ undef $restore;
my $uri = $orig_uri->clone;
my $single_uid = $uri->uid;
my ($itrk, $l_uid, $l_uidval) = itrk_last($self, $uri, $r_uidval, $mic);
@@ -782,6 +791,7 @@ sub nn_get {
my $nntp_cfg = $self->{cfg_opt}->{$sec};
$nn = nn_new($self, $nn_arg, $nntp_cfg, $uri) or return;
if (my $postconn = $nntp_cfg->{-postconn}) {
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
for my $m_arg (@$postconn) {
my ($method, @args) = @$m_arg;
$nn->$method(@args) and next;
@@ -796,11 +806,13 @@ sub _nntp_fetch_all ($$$) {
my ($self, $nn, $uri) = @_;
my ($group, $num_a, $num_b) = $uri->group;
my $sec = uri_section($uri);
+ my $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my ($nr, $beg, $end) = $nn->group($group);
unless (defined($nr)) {
my $msg = ndump($nn->message);
return "E: GROUP $group <$sec> $msg";
}
+ undef $restore;
(defined($num_a) && defined($num_b) && $num_a > $num_b) and
return "E: $uri: backwards range: $num_a > $num_b";
if (defined($num_a)) { # no article numbers in mail_sync.sqlite3
@@ -833,6 +845,7 @@ sub _nntp_fetch_all ($$$) {
$itrk->update_last(0, $last_art) if $itrk;
$n = $self->{max_batch};
}
+ $restore = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $raw = $nn->article($art);
unless (defined($raw)) {
my $msg = ndump($nn->message);
@@ -844,6 +857,7 @@ sub _nntp_fetch_all ($$$) {
next;
}
}
+ undef $restore;
$raw = join('', @$raw);
$raw =~ s/\r\n/\n/sg;
my ($eml_cb, @args) = @{$self->{eml_each}};
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 5/5] lei_xsearch: allow signals during long queries
2024-08-20 10:35 [PATCH 0/5] signal handling fixes Eric Wong
` (3 preceding siblings ...)
2024-08-20 10:35 ` [PATCH 4/5] lei: allow Ctrl-C to interrupt IMAP+NNTP reads Eric Wong
@ 2024-08-20 10:35 ` Eric Wong
2024-08-20 18:40 ` [PATCH 6/5] t/sigfd: reduce getpid() calls and hash lookups Eric Wong
4 siblings, 1 reply; 7+ messages in thread
From: Eric Wong @ 2024-08-20 10:35 UTC (permalink / raw)
To: meta
Xapian ->mset, remote Xapian calls via remote inboxes, and
lcat dumps can take a long time via wq_io_do and hold
lei_xsearch processes open for too long after a client
disconnects prematurely.
This fixes wait_for_eof shutdown timeouts on the lei-daemon quit
pipe when running t/lei-sigpipe.t with GIANT_INBOX_DIR pointed
to a meta@public-inbox.org mirror on my old laptop.
---
lib/PublicInbox/LeiXSearch.pm | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 43dedd10..e20b13c6 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -150,6 +150,7 @@ sub mset_progress {
sub query_one_mset { # for --threads and l2m w/o sort
my ($self, $ibxish) = @_;
my $lei = $self->{lei};
+ my $allow_sigs = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my ($srch, $over) = ($ibxish->search, $ibxish->over);
my $dir = $ibxish->{inboxdir} // $ibxish->{topdir};
return warn("$dir not indexed by Xapian\n") unless ($srch && $over);
@@ -223,6 +224,7 @@ sub query_one_mset { # for --threads and l2m w/o sort
sub query_combined_mset { # non-parallel for non-"--threads" users
my ($self) = @_;
my $lei = $self->{lei};
+ my $allow_sigs = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $mo = { %{$lei->{mset_opt}} };
local $0 = "$0 C $mo->{qstr}";
my $mset;
@@ -317,6 +319,7 @@ sub fudge_qstr_time ($$$) {
sub query_remote_mboxrd {
my ($self, $uris) = @_;
local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap)
+ my $allow_sigs = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $lei = $self->{lei};
my $opt = $lei->{opt};
my $qstr = $lei->{mset_opt}->{qstr};
@@ -633,6 +636,7 @@ sub _lcat2smsg { # git->cat_async callback
sub lcat_dump { # via wq_io_do
my ($self) = @_;
my $lei = $self->{lei};
+ my $allow_sigs = PublicInbox::DS::allow_sigs qw(INT QUIT TERM);
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
my $git = $lei->{ale}->git;
if (!$lei->{l2m}) {
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 6/5] t/sigfd: reduce getpid() calls and hash lookups
2024-08-20 10:35 ` [PATCH 5/5] lei_xsearch: allow signals during long queries Eric Wong
@ 2024-08-20 18:40 ` Eric Wong
0 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-08-20 18:40 UTC (permalink / raw)
To: meta
getpid() is no longer cached by glibc and syscalls are more
expensive nowadays, so only call it once per test.  The
additional hash table depth is no longer necessary since there's
no longer a difference between signal dispatch methods now that
Sigfd uses the global %SIG.
---
t/sigfd.t | 55 ++++++++++++++++++++++++++++---------------------------
1 file changed, 28 insertions(+), 27 deletions(-)
diff --git a/t/sigfd.t b/t/sigfd.t
index eab85ed7..91d11034 100644
--- a/t/sigfd.t
+++ b/t/sigfd.t
@@ -18,15 +18,16 @@ SKIP: {
my $old = PublicInbox::DS::block_signals();
my $hit = {};
my $sig = {};
- local $SIG{USR2} = sub { $hit->{USR2}->{normal}++ };
- local $SIG{HUP} = sub { $hit->{HUP}->{normal}++ };
- local $SIG{TERM} = sub { $hit->{TERM}->{normal}++ };
- local $SIG{INT} = sub { $hit->{INT}->{normal}++ };
- local $SIG{WINCH} = sub { $hit->{WINCH}->{normal}++ };
+ local $SIG{USR2} = sub { $hit->{USR2}++ };
+ local $SIG{HUP} = sub { $hit->{HUP}++ };
+ local $SIG{TERM} = sub { $hit->{TERM}++ };
+ local $SIG{INT} = sub { $hit->{INT}++ };
+ local $SIG{WINCH} = sub { $hit->{WINCH}++ };
for my $s (qw(USR2 HUP TERM INT WINCH)) {
$sig->{$s} = sub { die "SHOULD NOT BE CALLED ($s)" }
}
- kill 'USR2', $$ or die "kill $!";
+ my $PID = $$;
+ kill 'USR2', $PID;
ok(!defined($hit->{USR2}), 'no USR2 yet') or diag explain($hit);
PublicInbox::DS->Reset;
ok($PublicInbox::Syscall::SIGNUM{WINCH}, 'SIGWINCH number defined');
@@ -35,9 +36,9 @@ SKIP: {
$linux_sigfd = 1 if $^O eq 'linux';
$has_sigfd = 1;
ok($sigfd, 'Sigfd->new works');
- kill('HUP', $$) or die "kill $!";
- kill('INT', $$) or die "kill $!";
- kill('WINCH', $$) or die "kill $!";
+ kill 'HUP', $PID;
+ kill 'INT', $PID;
+ kill 'WINCH', $PID;
my $fd = fileno($sigfd->{sock});
ok($fd >= 0, 'fileno(Sigfd->{sock}) works');
my $rvec = '';
@@ -45,12 +46,12 @@ SKIP: {
is(select($rvec, undef, undef, undef), 1, 'select() works');
ok($sigfd->wait_once, 'wait_once reported success');
for my $s (qw(HUP INT)) {
- is($hit->{$s}->{normal}, 1, "sigfd fired $s");
+ is $hit->{$s}, 1, "sigfd fired $s";
}
SKIP: {
skip 'Linux sigfd-only behavior', 1 if !$linux_sigfd;
- is($hit->{USR2}->{normal}, 1,
- 'USR2 sent before signalfd created received');
+ is $hit->{USR2}, 1,
+ 'USR2 sent before signalfd created received';
}
PublicInbox::DS->Reset;
$sigfd = undef;
@@ -59,38 +60,38 @@ SKIP: {
ok($nbsig, 'Sigfd->new SFD_NONBLOCK works');
is($nbsig->wait_once, undef, 'nonblocking ->wait_once');
ok($! == Errno::EAGAIN, 'got EAGAIN');
- kill('HUP', $$) or die "kill $!";
+ kill 'HUP', $PID;
local @PublicInbox::DS::post_loop_do = (sub {}); # loop once
PublicInbox::DS::event_loop();
- is($hit->{HUP}->{normal}, 2, 'HUP sigfd fired in event loop') or
+ is $hit->{HUP}, 2, 'HUP sigfd fired in event loop' or
diag explain($hit); # sometimes fails on FreeBSD 11.x
- kill('TERM', $$) or die "kill $!";
- kill('HUP', $$) or die "kill $!";
+ kill 'TERM', $PID;
+ kill 'HUP', $PID;
PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
- is($hit->{TERM}->{normal}, 1, 'TERM sigfd fired in event loop');
- is($hit->{HUP}->{normal}, 3, 'HUP sigfd fired in event loop');
- ok($hit->{WINCH}->{normal}, 'WINCH sigfd fired in event loop');
+ is $hit->{TERM}, 1, 'TERM sigfd fired in event loop';
+ is $hit->{HUP}, 3, 'HUP sigfd fired in event loop';
+ ok $hit->{WINCH}, 'WINCH sigfd fired in event loop';
my $restore = PublicInbox::DS::allow_sigs 'HUP';
- kill 'HUP', $$;
+ kill 'HUP', $PID;
select undef, undef, undef, 0;
- is $hit->{HUP}->{normal}, 4, 'HUP sigfd fired after allow_sigs';
+ is $hit->{HUP}, 4, 'HUP sigfd fired after allow_sigs';
undef $restore;
- kill 'HUP', $$;
+ kill 'HUP', $PID;
vec($rvec = '', fileno($nbsig->{sock}), 1) = 1;
ok select($rvec, undef, undef, 1),
'select reports sigfd readiness';
- is $hit->{HUP}->{normal}, 4, 'HUP not fired when sigs blocked';
+ is $hit->{HUP}, 4, 'HUP not fired when sigs blocked';
$nbsig->event_step;
- is $hit->{HUP}->{normal}, 5, 'HUP fires only on ->event_step';
+ is $hit->{HUP}, 5, 'HUP fires only on ->event_step';
- kill 'HUP', $$;
- is $hit->{HUP}->{normal}, 5, 'HUP not fired, yet';
+ kill 'HUP', $PID;
+ is $hit->{HUP}, 5, 'HUP not fired, yet';
$restore = PublicInbox::DS::allow_sigs 'HUP';
select(undef, undef, undef, 0);
- is $hit->{HUP}->{normal}, 6, 'HUP fires from allow_sigs';
+ is $hit->{HUP}, 6, 'HUP fires from allow_sigs';
} else {
skip('signalfd disabled?', 10);
}
^ permalink raw reply related [flat|nested] 7+ messages in thread