From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id ED64A1FFA7 for ; Sun, 10 Jan 2021 12:15:20 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 19/22] lei: fork + FD cleanup Date: Sun, 10 Jan 2021 12:15:16 +0000 Message-Id: <20210110121519.17044-20-e@80x24.org> In-Reply-To: <20210110121519.17044-1-e@80x24.org> References: <20210110121519.17044-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Do a better job of closing FDs that we don't want shared with the work queue workers. We'll also fix naming and use "atfork_prepare" instead of "atfork_parent" to match pthread_atfork(3) naming. --- lib/PublicInbox/IPC.pm | 57 +++++++++++++++++++++++------------ lib/PublicInbox/LEI.pm | 18 +++++++++-- lib/PublicInbox/LeiQuery.pm | 7 +++-- lib/PublicInbox/LeiXSearch.pm | 11 +++++-- 4 files changed, 68 insertions(+), 25 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 4db4b8ea..88f81e47 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -126,7 +126,7 @@ sub ipc_worker_spawn { pipe(my ($r_res, $w_res)) or die "pipe: $!"; my $sigset = $oldset // PublicInbox::DS::block_signals(); my $parent = $$; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; defined(my $pid = fork) or die "fork: $!"; if ($pid == 0) { eval { PublicInbox::DS->Reset }; @@ -155,8 +155,14 @@ sub ipc_worker_reap { # dwaitpid callback } # for base class, override in sub classes -sub ipc_atfork_parent {} -sub ipc_atfork_child {} +sub ipc_atfork_prepare {} + +sub ipc_atfork_child { + my ($self) = @_; + my $io = delete($self->{-ipc_atfork_child_close}) or return; + close($_) for @$io; + undef; +} # idempotent, can be called regardless of whether worker is active or not sub ipc_worker_stop { @@ -251,14 +257,21 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } +sub _close_recvd ($) { + my ($self) = @_; + close($_) for (grep { defined } (delete @$self{0..2})); +} + sub wq_worker_loop ($) { my ($self) = @_; my $buf; my $len = $self->{wq_req_len} // (4096 * 33); - my ($rec, $sub, @args); + my ($sub, $args); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; local $SIG{PIPE} = sub { - die(bless(\"$_[0]", __PACKAGE__.'::PIPE')) if $sub; + my $cur_sub = $sub; + _close_recvd($self); + die(bless(\$cur_sub, __PACKAGE__.'::PIPE')) if $cur_sub; }; my $rcv = $self->{-wq_recv_cmd} // $recv_cmd; while (1) { @@ -267,22 +280,25 @@ sub wq_worker_loop ($) { my @m = @{$self->{wq_open_modes} // [qw( +<&= >&= >&= )]}; for my $fd (@fds) { my $mode = shift(@m); - if (open(my $fh, $mode, $fd)) { - $self->{$i++} = $fh; - $fh->autoflush(1); + if (open(my $cmdfh, $mode, $fd)) { + $self->{$i++} = $cmdfh; + $cmdfh->autoflush(1); } else { die "$$ open($mode$fd) (FD:$i): $!"; } } - # Sereal dies, Storable returns undef - $rec = thaw($buf) // + # Sereal dies on truncated data, Storable returns undef + $args = thaw($buf) // die "thaw error on buffer of size:".length($buf); - ($sub, @args) = @$rec; - eval { $self->$sub(@args) }; + eval { + $sub = shift @$args; + eval { $self->$sub(@$args) }; + undef $sub; # quiet SIG{PIPE} handler + die $@ if $@; + }; warn "$$ wq_worker: $@" if $@ && ref $@ ne __PACKAGE__.'::PIPE'; - undef $sub; # quiet SIG{PIPE} handler # need to close explicitly to avoid warnings after SIGPIPE - close($_) for (delete(@$self{0..2})); + _close_recvd($self); } } @@ -306,14 +322,17 @@ sub _wq_worker_start ($$) { eval { PublicInbox::DS->Reset }; close(delete $self->{-wq_s1}); delete $self->{qw(-wq_workers -wq_ppid)}; - $SIG{$_} = 'IGNORE' for (qw(TTOU TTIN)); - $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT)); + $SIG{$_} = 'IGNORE' for (qw(PIPE TTOU TTIN)); + $SIG{$_} = 'DEFAULT' for (qw(TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; PublicInbox::DS::sig_setmask($oldset); + # ensure we properly exit even if warn() dies: + my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); my $on_destroy = $self->ipc_atfork_child; eval { wq_worker_loop($self) }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; - exit($@ ? 1 : 0); + undef $on_destroy; + undef $end; # trigger exit } else { $self->{-wq_workers}->{$pid} = \undef; } @@ -326,7 +345,7 @@ sub wq_workers_start { return if $self->{-wq_s1}; # idempotent my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, $SEQPACKET, 0) or die "socketpair: $!"; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; $nr_workers //= 4; $nr_workers = $WQ_MAX_WORKERS if $nr_workers > $WQ_MAX_WORKERS; my $sigset = $oldset // PublicInbox::DS::block_signals(); @@ -343,7 +362,7 @@ sub wq_worker_incr { # SIGTTIN handler my ($self, $oldset) = @_; $self->{-wq_s2} or return; return if wq_workers($self) >= $WQ_MAX_WORKERS; - $self->ipc_atfork_parent; + $self->ipc_atfork_prepare; my $sigset = $oldset // PublicInbox::DS::block_signals(); _wq_worker_start($self, $sigset); PublicInbox::DS::sig_setmask($sigset) unless $oldset; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 0cbf342c..1ef0cbec 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -33,6 +33,7 @@ my $GLP_PASS = Getopt::Long::Parser->new; $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); our %PATH2CFG; # persistent for socket daemon +our @TO_CLOSE_ATFORK_CHILD; # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: @@ -266,12 +267,20 @@ sub fail ($$;$) { undef; } +sub atfork_prepare_wq { + my ($self, $wq) = @_; + push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD, + grep { defined } @$self{qw(0 1 2 sock)} +} + # usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq)); sub atfork_child_wq { my ($self, $wq) = @_; $self->{sock} //= $wq->{0}; $self->{$_} //= $wq->{$_} for (0..2); my $oldpipe = $SIG{PIPE}; + %PATH2CFG = (); + @TO_CLOSE_ATFORK_CHILD = (); ( __WARN__ => sub { err($self, @_) }, PIPE => sub { @@ -281,11 +290,14 @@ sub atfork_child_wq { ); } -# usage: ($lei, @io) = $lei->atfork_prepare_wq($wq); -sub atfork_prepare_wq { +# usage: ($lei, @io) = $lei->atfork_parent_wq($wq); +sub atfork_parent_wq { my ($self, $wq) = @_; if ($wq->wq_workers) { + my $env = delete $self->{env}; # env is inherited at fork my $ret = bless { %$self }, ref($self); + $self->{env} = $env; + delete @$ret{qw(-lei_store cfg)}; my $in = delete $ret->{0}; ($ret, delete($ret->{sock}) // $in, delete @$ret{1, 2}); } else { @@ -738,6 +750,7 @@ sub lazy_start { return if $pid; $0 = "lei-daemon $path"; local %PATH2CFG; + local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_r, $eof_w); $_->blocking(0) for ($l, $eof_r, $eof_w); $l = PublicInbox::Listener->new($l, \&accept_dispatch, $l); my $exit_code; @@ -764,6 +777,7 @@ sub lazy_start { local %SIG = (%SIG, %$sig) if !$sigfd; local $SIG{PIPE} = 'IGNORE'; if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets + push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock}; PublicInbox::DS->SetLoopTimeout(5000); } else { # wake up every second to accept signals if we don't diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index d5376be5..9a383cef 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -66,7 +66,7 @@ sub lei_q { # --local is enabled by default # src: LeiXSearch || LeiSearch || Inbox - my @srcs = $opt->{'local'} ? ($sto->search) : (); + my @srcs; require PublicInbox::LeiXSearch; my $lxs = PublicInbox::LeiXSearch->new; @@ -74,12 +74,15 @@ sub lei_q { if ($opt->{external} // 1) { $self->_externals_each(\&_vivify_external, \@srcs); } - my $j = $opt->{jobs} // scalar(@srcs) > 4 ? 4 : scalar(@srcs); + my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs); $j = 1 if !$opt->{thread}; + $j++ if $opt->{'local'}; # for sto->search below if ($self->{sock}) { + $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) // $self->wq_workers($j); } + unshift(@srcs, $sto->search) if $opt->{'local'}; my $out = $opt->{output} // '-'; $out = 'json:/dev/stdout' if $out eq '-'; my $isatty = -t $self->{1}; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index c0df21a8..b4172734 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -9,6 +9,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); use PublicInbox::Search qw(get_pct); +use Sys::Syslog qw(syslog); sub new { my ($class) = @_; @@ -92,13 +93,13 @@ sub _mset_more ($$) { sub query_thread_mset { # for --thread my ($self, $lei, $ibxish) = @_; + local %SIG = (%SIG, $lei->atfork_child_wq($self)); my ($srch, $over) = ($ibxish->search, $ibxish->over); unless ($srch && $over) { my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; warn "$desc not indexed by Xapian\n"; return; } - local %SIG = (%SIG, $lei->atfork_child_wq($self)); my $mo = { %{$lei->{mset_opt}} }; my $mset; do { @@ -145,7 +146,7 @@ sub query_mset { # non-parallel for non-"--thread" users sub do_query { my ($self, $lei_orig, $srcs) = @_; - my ($lei, @io) = $lei_orig->atfork_prepare_wq($self); + my ($lei, @io) = $lei_orig->atfork_parent_wq($self); $io[1]->autoflush(1); $io[2]->autoflush(1); if ($lei->{opt}->{thread}) { @@ -161,4 +162,10 @@ sub do_query { } } +sub ipc_atfork_child { + my ($self) = @_; + $SIG{__WARN__} = sub { syslog('warning', "@_") }; + $self->SUPER::ipc_atfork_child; # PublicInbox::IPC +} + 1;