From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 747071F47C for ; Tue, 20 Aug 2024 10:35:22 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1724150122; bh=ZpBXGgz+oTfLrmu1ExburLQ5zXjksjP4Xivmh8DDIwU=; h=From:To:Subject:Date:In-Reply-To:References:From; b=achfuO/oySyjrj34H4B+gRhkKylNMhap9qCP3ZLfEHisu6mpckC8yFfvojGgIVsMM 2VhX0xO7Gx0p8t7Ddrq3tdt1gENp4K/C1B2g6ZRKf82wsLEUbCy6VwlCtpVOAzjYDz fZaXi94hT1waq4kamrPexyY4yYWZ7aaqa8KNMkxc= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 1/5] treewide: handle EINTR for non-(signalfd|kevent) Date: Tue, 20 Aug 2024 10:35:17 +0000 Message-ID: <20240820103522.3548609-2-e@80x24.org> In-Reply-To: <20240820103522.3548609-1-e@80x24.org> References: <20240820103522.3548609-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We may encounter new architectures in Linux without syscall number definitions or *BSD systems without IO::KQueue or kevent support at all, so be prepared to handle signals anywhere within the event loop in such cases. 
--- lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++---- lib/PublicInbox/IPC.pm | 16 +++++++++-- lib/PublicInbox/LEI.pm | 49 +++++++++++++++++++++----------- lib/PublicInbox/PktOp.pm | 17 ++++++----- lib/PublicInbox/SHA.pm | 22 +++++++++++--- 5 files changed, 92 insertions(+), 36 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 6d777bf6..ff3db8ba 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -72,7 +72,8 @@ use PublicInbox::Aspawn qw(run_await); use Compress::Zlib qw(compress); use Carp qw(croak); use Time::Local qw(timegm); -use autodie qw(close pipe open sysread seek sysseek send); +use Errno qw(EINTR); +use autodie qw(close pipe open seek sysseek); our $DO_QUIT = 15; # signal number our ( $LIVE_JOBS, # integer @@ -225,6 +226,17 @@ sub check_objfmt_status ($$$) { $fmt; } +sub xsend ($$) { # move to PerlIO if we need to + my ($s, $buf) = @_; + my $n; + while (1) { + $n = send $s, $buf, 0; + return $n if defined $n; + next if $! == EINTR; # interrupted by a signal: retry the send + croak "send: $!"; + } +} + sub store_repo { # wq_io_do, sends docid back my ($self, $repo) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; @@ -248,7 +260,7 @@ EOM my $did = $repo->{docid}; $did ? $self->{xdb}->replace_document($did, $doc) : ($did = $self->{xdb}->add_document($doc)); - send($op_p, "repo_stored $did", 0); + xsend $op_p, "repo_stored $did"; } sub cidx_ckpoint ($;$) { @@ -293,7 +305,7 @@ sub cidx_reap_log { # awaitpid cb my ($pid, $cmd, $self, $op_p) = @_; if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || ($?
& 127) == POSIX::SIGPIPE))) { - send($op_p, "shard_done $self->{shard}", 0); + xsend $op_p, "shard_done $self->{shard}"; } else { warn "W: @$cmd (\$?=$?)\n"; $self->{xdb}->cancel_transaction; @@ -444,7 +456,7 @@ sub fp_async_done { # run_git cb from worker my ($opt, $self, $git, $op_p) = @_; my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); - send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0); + xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest; } sub fp_done { # called parent via PktOp by fp_async_done @@ -523,7 +535,7 @@ sub shard_commit { # via wq_io_do my ($self) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->commit_txn_lazy; - send($op_p, "shard_done $self->{shard}", 0); + xsend $op_p, "shard_done $self->{shard}"; } sub dump_roots_start { @@ -818,7 +830,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p'; my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef'; cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr; - send($prune_op_p, "prune_done $self->{shard}", 0); + xsend $prune_op_p, "prune_done $self->{shard}"; } sub shards_active { # post_loop_do diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index ed6d27fd..13a897be 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -10,7 +10,8 @@ package PublicInbox::IPC; use v5.12; use parent qw(Exporter); -use autodie qw(close pipe read socketpair sysread); +use autodie qw(close pipe read socketpair); +use Errno qw(EAGAIN EINTR); use Carp qw(croak); use PublicInbox::DS qw(awaitpid); use PublicInbox::Spawn; @@ -215,8 +216,17 @@ sub recv_and_run { } while ($full_stream && $n < $len) { my $r = sysread($s2, $buf, $len - $n, $n); - croak "read EOF after $n/$len bytes" if $r == 0; - $n = length($buf); + if ($r) { + $n = length($buf); # keep looping + } elsif (!defined $r) { + if ($! == EAGAIN) { + poll_in($s2) # NOTE(review): this patch adds no poll_in import to PublicInbox::IPC; confirm it is already in scope + } elsif ($!
!= EINTR) { + croak "sysread: $!"; + } # next on EINTR + } else { # ($r == 0) + croak "read EOF after $n/$len bytes"; + } } # Sereal dies on truncated data, Storable returns undef my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n"; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index c5146428..637cc8b1 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal use autodie qw(bind chdir open pipe socket socketpair syswrite unlink); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); -use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); +use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR); use Cwd qw(getcwd); use POSIX qw(strftime); use IO::Handle (); @@ -27,6 +27,7 @@ use PublicInbox::Import; use PublicInbox::ContentHash qw(git_sha); use PublicInbox::OnDestroy; use PublicInbox::IPC; +use PublicInbox::IO qw(poll_in); use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path (); use File::Spec; @@ -488,6 +489,15 @@ sub _drop_wq { } } +sub send_gently ($$) { # like send(), but retries on EINTR; returns undef on any other error + my ($s, $buf) = @_; + my $n; + while (1) { + $n = send $s, $buf, 0; + return $n if defined($n) || $!
!= EINTR; + } +} + # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE sub x_it ($$) { my ($self, $code) = @_; @@ -498,7 +508,7 @@ sub x_it ($$) { $self->{pkt_op_p}->pkt_do('x_it', $code); exit($code >> 8) if $$ != $daemon_pid; } elsif ($self->{sock}) { # lei->daemon => lei(1) client - send($self->{sock}, "x_it $code", 0); + send_gently $self->{sock}, "x_it $code"; } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command exit($code >> 8); } # else ignore if client disconnected @@ -569,7 +579,7 @@ sub child_error { # passes non-fatal curl exit codes to user if ($self->{pkt_op_p}) { # to top lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client - send($self->{sock}, "child_error $child_error", 0); + send_gently $self->{sock}, "child_error $child_error"; } # else noop if client disconnected } @@ -1066,7 +1076,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail while (my $op = shift(@$alerts)) { if ($op eq ':WINCH') { # hit the process group that started the MUA - send($sock, '-WINCH', 0) if $sock; + send_gently $sock, '-WINCH' if $sock; } elsif ($op eq ':bell') { out($self, "\a"); } elsif ($op =~ /(?{2} } @msg, '# -quit pager to continue-'; $self->{2}->autoflush(1); stop_pager($self); - send($self->{sock}, 'wait', 0); # wait for user to quit pager + send_gently $self->{sock}, 'wait'; # wait for user to quit pager } sub stop_pager { @@ -1147,22 +1157,22 @@ sub accept_dispatch { # Listener {post_accept} callback my $self = bless { sock => $sock }, __PACKAGE__; vec(my $rvec = '', fileno($sock), 1) = 1; select($rvec, undef, undef, 60) or - return send($sock, 'timed out waiting to recv FDs', 0); + return send_gently $sock, 'timed out waiting to recv FDs'; # NOTE(review): select() above returns -1 (true) when interrupted, so EINTR falls through to recv_cmd instead of being retried # (4096 * 33) >MAX_ARG_STRLEN my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or return; # EOF if (!defined($fds[0])) { warn(my $msg = "recv_cmd failed: $!"); - return send($sock, $msg, 0);
+ return send_gently $sock, $msg; } else { my $i = 0; open($self->{$i++}, '+<&=', $_) for @fds; - $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0) + $i == 4 or return send_gently $sock, 'not enough FDs='.($i-1); } # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// - return send($sock, 'request command truncated', 0); + return send_gently $sock, 'request command truncated'; my ($argc, @argv) = split(/\0/, $buf, -1); undef $buf; my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); @@ -1207,7 +1217,7 @@ sub event_step { die "unrecognized client signal: $buf"; } my $s = $self->{-socks} // []; # lei up --all - @$s = grep { send($_, $buf, 0) } @$s; + @$s = grep { send_gently $_, $buf } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1572,14 +1582,21 @@ sub cfg_dump ($$) { undef; } -sub request_umask { +sub request_umask { # assumes client is trusted and fast; NOTE(review): the initial send() below is not EINTR-retried my ($lei) = @_; my $s = $lei->{sock} // return; send($s, 'umask', 0) // die "send: $!"; - vec(my $rvec = '', fileno($s), 1) = 1; - select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; - recv($s, my $v, 5, 0) // die "recv: $!"; - (my $u, $lei->{client_umask}) = unpack('AV', $v); + my ($v, $r, $u); + do { # n.b. poll_in returns -1 on EINTR + $r = poll_in($s, 2) or + die 'timeout waiting for umask'; + die "poll: $!" if $r < 0 && $! != EINTR; # don't fall into a blocking recv + } while ($r < 0); # only reached again after EINTR + do { + $r = recv $s, $v, 5, 0; + die "recv: $!" if !defined($r) && $!
!= EINTR; + } while (!defined $r); + ($u, $lei->{client_umask}) = unpack('AV', $v); $u eq 'u' or warn "E: recv $v has no umask"; } diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 1bcdd799..c3ee36aa 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -8,7 +8,7 @@ package PublicInbox::PktOp; use v5.12; use parent qw(PublicInbox::DS); -use Errno qw(EAGAIN ECONNRESET); +use Errno qw(EAGAIN ECONNRESET EINTR); use PublicInbox::Syscall qw(EPOLLIN); use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC qw(ipc_freeze ipc_thaw); @@ -37,12 +37,15 @@ sub pkt_do { # for the producer to trigger event_step in consumer sub event_step { my ($self) = @_; my $c = $self->{sock}; - my $n = recv($c, my $msg, 4096, 0); - unless (defined $n) { - return if $! == EAGAIN; - die "recv: $!" if $! != ECONNRESET; # we may be bidirectional - } - my ($cmd, @pargs); + my ($msg, $n, $cmd, @pargs); + while (1) { # not do {}: next/last cannot be used inside a do BLOCK + $n = recv($c, $msg, 4096, 0); + last if defined $n; + next if $! == EINTR; + return if $! == EAGAIN; + last if $! == ECONNRESET; # we may be bidirectional + die "recv: $!"; + } if (index($msg, "\0") > 0) { ($cmd, my $pargs) = split(/\0/, $msg, 2); @pargs = @{ipc_thaw($pargs)}; diff --git a/lib/PublicInbox/SHA.pm b/lib/PublicInbox/SHA.pm index 3fa8530e..5eb882c6 100644 --- a/lib/PublicInbox/SHA.pm +++ b/lib/PublicInbox/SHA.pm @@ -12,8 +12,10 @@ package PublicInbox::SHA; use v5.12; require Exporter; +use Errno qw(EAGAIN EINTR); +use PublicInbox::IO qw(poll_in); +use Carp qw(croak); our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all); -use autodie qw(sysread); our @ISA; BEGIN { @@ -59,9 +61,21 @@ EOM sub sha_all ($$) { my ($n, $fh) = @_; - my ($dig, $buf) = (PublicInbox::SHA->new($n)); - while (sysread($fh, $buf, 65536)) { $dig->add($buf) } - $dig + my ($dig, $buf, $r) = (PublicInbox::SHA->new($n)); + while (1) { + $r = sysread($fh, $buf, 65536); + if ($r) { + $dig->add($buf); + } elsif (!defined $r) { + if ($!
== EAGAIN) { + poll_in($fh); # EAGAIN means $fh is non-blocking with no data yet: wait until readable + } elsif ($! != EINTR) { + croak "sysread: $!"; + } # next on EINTR + } else { # EOF: + return $dig; + } + } } 1;