From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 30C641F406 for ; Tue, 7 Nov 2023 13:01:48 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1699362108; bh=nwbkLJiFlGz/fMTw7Os7aIVnez8H28UgwPCRSIuu1n0=; h=From:To:Subject:Date:From; b=V2btiWTA9x72Ukl+fsbvC6v3cgoPPgFIqEtaWiAIBxrBReo1vqTrwSbZEKLlhVjVr k05ZkQ1qO6lvowq7uFuHP1wPL6IiVmHgx8zRBQVFln1cDpDEt8zbaMGZ3cCgx+T6E+ 6ghp6Vc0QVize4Qwu1NmcgPXtMYxUbAxyvwjY3HI= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] lei: fix SIGPIPE on large result sets to pager Date: Tue, 7 Nov 2023 13:01:47 +0000 Message-Id: <20231107130147.287420-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: When dealing with large search results, we need to deal with EPIPE not just from the pager, but also EPIPE or ECONNRESET between lei_xsearch and lei2mail processes. Without this fix, lei_xsearch processes could linger and get stuck writing to dead lei2mail processes if a user aborts the pager early during a large result set. To ensure lei_xsearch processes don't linger around after lei2mail workers all die, we must close $l2m->{-wq_s2} before spawning lei_xsearch processes, since $l2m->{-wq_s2} is only used in lei2mail workers. For `git cat-file' processes, we also need to trigger PublicInbox::Git->close to handle unpredictable destructor ordering to avoid using uninitialized IO refs. This combines with the `git_to_mail' change to deal with process cleanup handling from premature shutdowns. To test all this, we can't just rely on a single message being large, but also need to rely on the result set being large enough to saturate the lei_xsearch -> lei2mail socket so we rely on GIANT_INBOX_DIR once again. --- lib/PublicInbox/Git.pm | 2 ++ lib/PublicInbox/LeiOverview.pm | 3 ++- lib/PublicInbox/LeiToMail.pm | 8 +++++++- lib/PublicInbox/LeiXSearch.pm | 3 ++- t/lei-sigpipe.t | 27 ++++++++++++++++++++------- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 11712db2..292c359a 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -276,6 +276,7 @@ sub cat_async_step ($$) { sub cat_async_wait ($) { my ($self) = @_; + $self->close if !$self->{sock}; my $inflight = $self->{inflight} or return; while (scalar(@$inflight)) { cat_async_step($self, $inflight); @@ -331,6 +332,7 @@ sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; my $ck = $self->{ck} or return; + $ck->close if !$ck->{sock}; my $inflight = $ck->{inflight} or return; check_async_step($ck, $inflight) while (scalar(@$inflight)); } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 066c40bd..129dabf8 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -212,7 +212,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually sub { my ($smsg, $mitem, $eml) = @_; $smsg->{pct} = get_pct($mitem) if $mitem; - $l2m->wq_io_do('write_mail', [], $smsg, $eml); + eval { $l2m->wq_io_do('write_mail', [], $smsg, $eml) }; + $lei->fail($@) if $@ && !$!{ECONNRESET} && !$!{EPIPE}; } } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e80163e2..b73af68a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,11 +8,13 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::IO; +use PublicInbox::Git; use PublicInbox::Spawn qw(spawn); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); use autodie qw(open seek close); +use Carp qw(croak); my %kw2char = ( # Maildir characters draft => 'D', @@ -132,8 +134,12 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; - my $self = delete $smsg->{l2m} // die "BUG: no l2m"; $type // return; # called by PublicInbox::Git::close + my $self = delete $smsg->{l2m}; + if (!defined($self)) { + return if $PublicInbox::Git::in_cleanup; + croak "BUG: no l2m (type=$type)"; + } eval { if ($type eq 'missing' && ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5443188d..6c8dfe10 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -20,7 +20,7 @@ use PublicInbox::LEI; use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR); use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); -use autodie qw(open read seek truncate); +use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); sub new { @@ -543,6 +543,7 @@ sub do_query { pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; + close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }, diff --git a/t/lei-sigpipe.t b/t/lei-sigpipe.t index 622598a4..1aa700e9 100644 --- a/t/lei-sigpipe.t +++ b/t/lei-sigpipe.t @@ -7,6 +7,19 @@ use PublicInbox::TestCommon; use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE); use PublicInbox::OnDestroy; use PublicInbox::Syscall qw($F_SETPIPE_SZ); +use autodie qw(close open pipe seek sysread); +use PublicInbox::IO qw(write_file); +my $inboxdir = $ENV{GIANT_INBOX_DIR}; +SKIP: { + $inboxdir // skip 'GIANT_INBOX_DIR unset to test large results'; + require PublicInbox::Inbox; + my $ibx = PublicInbox::Inbox->new({ + name => 'unconfigured-test', + address => [ "test\@example.com" ], + inboxdir => $inboxdir, + }); + $ibx->search or xbail "GIANT_INBOX_DIR=$inboxdir has no search"; +} # undo systemd (and similar) ignoring SIGPIPE, since lei expects to be run # from an interactive terminal: @@ -21,30 +34,30 @@ test_lei(sub { my $f = "$ENV{HOME}/big.eml"; my $imported; for my $out ([], [qw(-f mboxcl2)], [qw(-f text)]) { - pipe(my ($r, $w)) or BAIL_OUT $!; + pipe(my $r, my $w); my $size = $F_SETPIPE_SZ && fcntl($w, $F_SETPIPE_SZ, 4096) ? 4096 : 65536; unless (-f $f) { - open my $fh, '>', $f or xbail "open $f: $!"; - print $fh <<'EOM' or xbail; + my $fh = write_file '>', $f, <<'EOM'; From: big@example.com Message-ID: EOM print $fh 'Subject:'; print $fh (' '.('x' x 72)."\n") x (($size / 73) + 1); print $fh "\nbody\n"; - close $fh or xbail "close: $!"; + close $fh; } lei_ok(qw(import), $f) if $imported++ == 0; - open my $errfh, '+>>', "$ENV{HOME}/stderr.log" or xbail $!; + open my $errfh, '+>>', "$ENV{HOME}/stderr.log"; my $opt = { run_mode => 0, 2 => $errfh, 1 => $w }; my $cmd = [qw(lei q -q -t), @$out, 'z:1..']; + push @$cmd, '--only='.$inboxdir if defined $inboxdir; my $tp = start_script($cmd, undef, $opt); close $w; vec(my $rvec = '', fileno($r), 1) = 1; if (!select($rvec, undef, undef, 30)) { - seek($errfh, 0, 0) or xbail $!; + seek($errfh, 0, 0); my $s = do { local $/; <$errfh> }; xbail "lei q had no output after 30s, stderr=$s"; } @@ -53,7 +66,7 @@ EOM $tp->join; ok(WIFSIGNALED($?), "signaled @$out"); is(WTERMSIG($?), SIGPIPE, "got SIGPIPE @$out"); - seek($errfh, 0, 0) or xbail $!; + seek($errfh, 0, 0); my $s = do { local $/; <$errfh> }; is($s, '', "quiet after sigpipe @$out"); }