From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 499201FFAB for ; Mon, 1 Feb 2021 08:28:34 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 04/21] lei: remove SIGPIPE handler Date: Sun, 31 Jan 2021 22:28:16 -1000 Message-Id: <20210201082833.3293-5-e@80x24.org> In-Reply-To: <20210201082833.3293-1-e@80x24.org> References: <20210201082833.3293-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: It doesn't save us any code, and the action-at-a-distance element was making it confusing to track down actual problems. Another potential problem was keeping references alive too long. So do like we would a C100K server and check every write while still ensuring lei(1) exit with a proper SIGPIPE iff needed. --- lib/PublicInbox/IPC.pm | 10 +++--- lib/PublicInbox/LEI.pm | 56 +++++++++++++++++++++------------- lib/PublicInbox/LeiExternal.pm | 3 +- lib/PublicInbox/LeiOverview.pm | 33 ++++++++------------ lib/PublicInbox/LeiToMail.pm | 45 ++++++++++++--------------- lib/PublicInbox/LeiXSearch.pm | 17 ++++------- t/lei_to_mail.t | 31 ++++++++++--------- 7 files changed, 96 insertions(+), 99 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 479c4377..172552b9 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -139,8 +139,10 @@ sub ipc_worker_spawn { sub ipc_worker_reap { # dwaitpid callback my ($self, $pid) = @_; - # SIGTERM (15) is our default exit signal - warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15; + return if !$?; + # TERM(15) is our default exit signal, PIPE(13) is likely w/ pager + my $s = $? & 127; + warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13; } sub wq_wait_old { @@ -278,7 +280,7 @@ sub recv_and_run { undef $buf; my $sub = shift @$args; eval { $self->$sub(@$args) }; - warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + warn "$$ wq_worker: $@" if $@; delete @$self{0..($nfd-1)}; $n; } @@ -320,7 +322,7 @@ sub wq_do { # always async } else { @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; - warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + warn "wq_do: $@" if $@; delete @$self{0..$#$ios}; # don't close } } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ceba16e4..b915bb0c 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); -use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); +use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX (); use IO::Handle (); @@ -277,7 +277,11 @@ sub x_it ($$) { dump_and_clear_log(); if (my $sock = $self->{sock}) { send($sock, "x_it $code", MSG_EOR); - } elsif (!($code & 127)) { # oneshot, ignore signals + } elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13) + $SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work + kill $signum, $$; + sleep; # wait for signal + } else { # oneshot # don't want to end up using $? from child processes for my $f (qw(lxs l2m)) { my $wq = delete $self->{$f} or next; @@ -287,14 +291,15 @@ sub x_it ($$) { } } -sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ } - -sub out ($;@) { print { shift->{1} } @_ } - sub err ($;@) { my $self = shift; - my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{IO}; - print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n"); + my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n"); + print $err @_, $eor and return; + my $old_err = delete $self->{2}; + close($old_err) if $! == EPIPE && $old_err;; + $err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB}; + print $err @_, $eor or print STDERR @_, $eor; } sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } @@ -306,6 +311,17 @@ sub fail ($$;$) { undef; } +sub out ($;@) { + my $self = shift; + return if print { $self->{1} // return } @_; # likely + return note_sigpipe($self, 1) if $! == EPIPE; + my $err = "error writing to stdout: $!"; + delete $self->{1}; + fail($self, $err); +} + +sub puts ($;@) { out(shift, map { "$_\n" } @_) } + sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error) = @_; # child_error is $? if (my $sock = $self->{sock}) { # send to lei(1) client @@ -350,27 +366,23 @@ sub io_restore ($$) { } } -# usage: my %sig = $lei->atfork_child_wq($wq); -# local @SIG{keys %sig} = values %sig; +# triggers sigpipe_handler +sub note_sigpipe { + my ($self, $fd) = @_; + close(delete($self->{$fd})); # explicit close silences Perl warning + syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; + x_it($self, 13); +} + sub atfork_child_wq { my ($self, $wq) = @_; io_restore($self, $wq); + -p $self->{op_pipe} or die 'BUG: {op_pipe} expected'; io_restore($self->{l2m}, $wq); %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; - (PIPE => sub { - $self->x_it(13); # SIGPIPE = 13 - # we need to close explicitly to avoid Perl warning on SIGPIPE - for my $i (1, 2) { - next unless $self->{$i} && (-p $self->{$i} || -S _); - close(delete $self->{$i}); - } - # trigger the LeiXSearch $done OpPipe: - syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; - $SIG{PIPE} = 'DEFAULT'; - die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), - }); + $current_lei = $self; # for SIG{__WARN__} } sub io_extract ($;@) { diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm index bf07c41c..b1176824 100644 --- a/lib/PublicInbox/LeiExternal.pm +++ b/lib/PublicInbox/LeiExternal.pm @@ -31,11 +31,10 @@ sub _externals_each { sub lei_ls_external { my ($self, @argv) = @_; - my $out = $self->{1}; my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n"); $self->_externals_each(sub { my ($loc, $boost_val) = @_; - print $out $loc, $OFS, 'boost=', $boost_val, $ORS; + $self->out($loc, $OFS, 'boost=', $boost_val, $ORS); }); } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index fa041457..1d62ffe2 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -107,28 +107,22 @@ sub new { sub ovv_begin { my ($self, $lei) = @_; if ($self->{fmt} eq 'json') { - print { $lei->{1} } '['; + $lei->out('['); } # TODO HTML/Atom/... } # called once by parent (via PublicInbox::EOFpipe) sub ovv_end { my ($self, $lei) = @_; - my $out = $lei->{1} or return; if ($self->{fmt} eq 'json') { # JSON doesn't allow trailing commas, and preventing # trailing commas is a PITA when parallelizing outputs - print $out "null]\n"; + $lei->out("null]\n"); } elsif ($self->{fmt} eq 'concatjson') { - print $out "\n"; + $lei->out("\n"); } } -sub ovv_atfork_child { - my ($self) = @_; - # reopen dedupe here -} - # prepares an smsg for JSON sub _unbless_smsg { my ($smsg, $mitem) = @_; @@ -168,9 +162,8 @@ sub ovv_atexit_child { $git->async_wait_all; } if (my $bref = delete $lei->{ovv_buf}) { - my $out = $lei->{1} or return; my $lk = $self->lock_for_scope; - print $out $$bref; + $lei->out($$bref); } } @@ -268,11 +261,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } } sort keys %$smsg); $buf .= $EOR; - if (length($buf) > 65536) { - my $lk = $self->lock_for_scope; - print { $lei->{1} } $buf; - $buf = ''; - } + return if length($buf) < 65536; + my $lk = $self->lock_for_scope; + $lei->out($buf); + $buf = ''; } } elsif ($json) { my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL @@ -280,11 +272,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually my ($smsg, $mitem) = @_; return if $dedupe->is_smsg_dup($smsg); $buf .= $json->encode(_unbless_smsg(@_)) . $ORS; - if (length($buf) > 65536) { - my $lk = $self->lock_for_scope; - print { $lei->{1} } $buf; - $buf = ''; - } + return if length($buf) < 65536; + my $lk = $self->lock_for_scope; + $lei->out($buf); + $buf = ''; } } # else { ... } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 1f6c2a3b..01e7cec5 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -17,7 +17,7 @@ use PublicInbox::GitAsyncCat; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); -use Errno qw(EEXIST ESPIPE ENOENT); +use Errno qw(EEXIST ESPIPE ENOENT EPIPE); # struggles with short-lived repos, Gcf2Client makes little sense with lei; # but we may use in-process libgit2 in the future. @@ -68,14 +68,16 @@ sub _mbox_hdr_buf ($$$) { } sub atomic_append { # for on-disk destinations (O_APPEND, or O_EXCL) - my ($fh, $buf) = @_; - defined(my $w = syswrite($fh, $$buf)) or die "write: $!"; - $w == length($$buf) or die "short write: $w != ".length($$buf); -} - -sub _print_full { - my ($fh, $buf) = @_; - print $fh $$buf or die "print: $!"; + my ($lei, $buf) = @_; + if (defined(my $w = syswrite($lei->{1} // return, $$buf))) { + return if $w == length($$buf); + $buf = "short atomic write: $w != ".length($$buf); + } elsif ($! == EPIPE) { + return $lei->note_sigpipe(1); + } else { + $buf = "atomic write: $!"; + } + $lei->fail($buf); } sub eml2mboxrd ($;$) { @@ -248,24 +250,19 @@ sub _mbox_write_cb ($$) { my $ovv = $lei->{ovv}; my $m = 'eml2'.$ovv->{fmt}; my $eml2mbox = $self->can($m) or die "$self->$m missing"; - my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier - $out->autoflush(1); - my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append; + $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier + $lei->{1}->autoflush(1); + my $atomic_append = !defined($ovv->{lock_path}); my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; - return unless $out; $eml //= PublicInbox::Eml->new($buf); - if (!$dedupe->is_dup($eml, $smsg->{blob})) { - $buf = $eml2mbox->($eml, $smsg); - my $lk = $ovv->lock_for_scope; - eval { $write->($out, $buf) }; - if ($@) { - die $@ if ref($@) ne 'PublicInbox::SIGPIPE'; - undef $out - } - } + return if $dedupe->is_dup($eml, $smsg->{blob}); + $buf = $eml2mbox->($eml, $smsg); + return atomic_append($lei, $buf) if $atomic_append; + my $lk = $ovv->lock_for_scope; + $lei->out($$buf); } } @@ -467,8 +464,7 @@ sub write_mail { # via ->wq_do my ($self, $git_dir, $smsg, $lei) = @_; my $not_done = delete $self->{$lei->{each_smsg_not_done}}; my $wcb = $self->{wcb} //= do { # first message - my %sig = $lei->atfork_child_wq($self); - @SIG{keys %sig} = values %sig; # not local + $lei->atfork_child_wq($self); $self->write_cb($lei); }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); @@ -483,7 +479,6 @@ sub wq_atexit_child { $git->async_wait_all; } $SIG{__WARN__} = 'DEFAULT'; - $SIG{PIPE} = 'DEFAULT'; } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e69b637c..de82a7da 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -109,8 +109,7 @@ sub wait_startq ($) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - my %sig = $lei->atfork_child_wq($self); - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); my $startq = delete $lei->{startq}; my ($srch, $over) = ($ibxish->search, $ibxish->over); @@ -145,8 +144,7 @@ sub query_thread_mset { # for --thread sub query_mset { # non-parallel for non-"--thread" users my ($self, $lei) = @_; local $0 = "$0 query_mset"; - my %sig = $lei->atfork_child_wq($self); - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -187,8 +185,7 @@ sub kill_reap { sub query_remote_mboxrd { my ($self, $lei, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; - my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); my ($opt, $env) = @$lei{qw(opt env)}; my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); push(@qform, t => 1) if $opt->{thread}; @@ -351,9 +348,7 @@ sub start_query { # always runs in main (lei-daemon) process sub query_prepare { # called by wq_do my ($self, $lei) = @_; local $0 = "$0 query_prepare"; - my %sig = $lei->atfork_child_wq($self); - -p $lei->{op_pipe} or die "BUG: \$done pipe expected"; - local @SIG{keys %sig} = values %sig; + $lei->atfork_child_wq($self); delete $lei->{l2m}->{-wq_s1}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; @@ -363,11 +358,11 @@ sub query_prepare { # called by wq_do sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers my ($lei) = @_; my $lxs = delete $lei->{lxs}; - if ($lxs && $lxs->wq_kill_old) { - kill 'PIPE', $$; + if ($lxs && $lxs->wq_kill_old) { # is this the daemon? $lxs->wq_wait_old; } close(delete $lei->{1}) if $lei->{1}; + $lei->x_it(13); } sub do_query { diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index 47c0e3d4..f7535687 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -12,6 +12,7 @@ use List::Util qw(shuffle); require_mods(qw(DBD::SQLite)); require PublicInbox::MboxReader; require PublicInbox::LeiOverview; +require PublicInbox::LEI; use_ok 'PublicInbox::LeiToMail'; my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n"; my $noeol = "Subject: x\n\nFrom hell"; @@ -73,7 +74,11 @@ for my $mbox (@MBOX) { my ($tmpdir, $for_destroy) = tmpdir(); local $ENV{TMPDIR} = $tmpdir; open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!; -my $lei = { 2 => $err }; +my $lei = bless { 2 => $err }, 'PublicInbox::LEI'; +my $commit = sub { + $_[0] = undef; # wcb + delete $lei->{1}; +}; my $buf = <<'EOM'; From: x@example.com Subject: x @@ -98,9 +103,7 @@ my $wcb_get = sub { my $zpipe = $l2m->pre_augment($lei); $l2m->do_augment($lei); $l2m->post_augment($lei, $zpipe); - my $cb = $l2m->write_cb($lei); - delete $lei->{1}; - $cb; + $l2m->write_cb($lei); }; my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] }; @@ -109,7 +112,7 @@ my $orig = do { is(ref $wcb, 'CODE', 'write_cb returned callback'); ok(-f $fn && !-s _, 'empty file created'); $wcb->(\(my $dup = $buf), $deadbeef); - undef $wcb; + $commit->($wcb); open my $fh, '<', $fn or BAIL_OUT $!; my $raw = do { local $/; <$fh> }; like($raw, qr/^blah\n/sm, 'wrote content'); @@ -119,7 +122,7 @@ my $orig = do { $wcb = $wcb_get->($mbox, $fn); ok(-f $fn && !-s _, 'truncated mbox destination'); $wcb->(\($dup = $buf), $deadbeef); - undef $wcb; + $commit->($wcb); open $fh, '<', $fn or BAIL_OUT $!; is(do { local $/; <$fh> }, $raw, 'jobs > 1'); $raw; @@ -134,7 +137,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? my $f = "$fn.$zsfx"; my $wcb = $wcb_get->($mbox, $f); $wcb->(\(my $dup = $buf), $deadbeef); - undef $wcb; + $commit->($wcb); my $uncompressed = xqx([@$dc_cmd, $f]); is($uncompressed, $orig, "$zsfx works unlocked"); @@ -142,13 +145,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? unlink $f or BAIL_OUT "unlink $!"; $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf), $deadbeef); - undef $wcb; + $commit->($wcb); is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock"); local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\nx\n"), $deadbeef); - undef $wcb; # commit + $commit->($wcb); my $cat = popen_rd([@$dc_cmd, $f]); my @raw; @@ -160,7 +163,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma? local $lei->{opt} = { augment => 1, jobs => 2 }; $wcb = $wcb_get->($mbox, $f); $wcb->(\($dup = $buf . "\ny\n"), $deadbeef); - undef $wcb; # commit + $commit->($wcb); my @raw3; $cat = popen_rd([@$dc_cmd, $f]); @@ -183,7 +186,7 @@ if ('default deduplication uses content_hash') { my $wcb = $wcb_get->('mboxo', $fn); $deadbeef->{kw} = []; $wcb->(\(my $x = $buf), $deadbeef) for (1..2); - undef $wcb; # undef to commit changes + $commit->($wcb); my $cmp = ''; open my $fh, '<', $fn or BAIL_OUT $!; PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) }); @@ -192,7 +195,7 @@ if ('default deduplication uses content_hash') { local $lei->{opt} = { augment => 1 }; $wcb = $wcb_get->('mboxo', $fn); $wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2); - undef $wcb; # undef to commit changes + $commit->($wcb); open $fh, '<', $fn or BAIL_OUT $!; my @x; PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) }); @@ -206,7 +209,7 @@ if ('default deduplication uses content_hash') { local $lei->{1} = $tmp; my $wcb = $wcb_get->('mboxrd', '/dev/stdout'); $wcb->(\(my $x = $buf), $deadbeef); - undef $wcb; # commit + $commit->($wcb); seek($tmp, 0, SEEK_SET) or BAIL_OUT $!; my $cmp = ''; PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) }); @@ -220,7 +223,7 @@ SKIP: { # FIFO support my $cat = popen_rd([which('cat'), $fn]); my $wcb = $wcb_get->('mboxo', $fn); $wcb->(\(my $x = $buf), $deadbeef); - undef $wcb; # commit + $commit->($wcb); my $cmp = ''; PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) }); is($cmp, $buf, 'message written to FIFO');