From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.8 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 401761FA13 for ; Thu, 4 Feb 2021 09:59:31 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 03/10] lei q: reorder internals to reduce FD passing Date: Thu, 4 Feb 2021 00:59:23 -0900 Message-Id: <20210204095930.20278-4-e@80x24.org> In-Reply-To: <20210204095930.20278-1-e@80x24.org> References: <20210204095930.20278-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: While FD passing is critical for script/lei <=> lei-daemon, lei-daemon doesn't need to use it internally if FDs are created in the proper order before forking. --- lib/PublicInbox/IPC.pm | 3 -- lib/PublicInbox/LEI.pm | 99 +++++++--------------------------- lib/PublicInbox/LeiOverview.pm | 28 +++------- lib/PublicInbox/LeiToMail.pm | 28 ++++++---- lib/PublicInbox/LeiXSearch.pm | 97 ++++++++++++++++----------------- 5 files changed, 92 insertions(+), 163 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 078aaa2c..7f5a3f6f 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -464,9 +464,6 @@ sub DESTROY { ipc_worker_stop($self); } -# Sereal doesn't have dclone -sub deep_clone { ipc_thaw(ipc_freeze($_[-1])) } - sub detect_nproc () { # _SC_NPROCESSORS_ONLN = 84 on both Linux glibc and musl return POSIX::sysconf(84) if $^O eq 'linux'; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 49deed13..0d4b1c11 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -286,7 +286,7 @@ sub x_it ($$) { # make sure client sees stdout before exit $self->{1}->autoflush(1) if $self->{1}; dump_and_clear_log(); - if (my $s = $self->{pkt_op} // $self->{sock}) { + if (my $s = $self->{pkt_op_p} // $self->{sock}) { send($s, "x_it $code", MSG_EOR); } elsif ($self->{oneshot}) { # don't want to end up using $? from child processes @@ -322,7 +322,8 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } sub fail ($$;$) { my ($self, $buf, $exit_code) = @_; err($self, $buf) if defined $buf; - send($self->{pkt_op}, '!', MSG_EOR) if $self->{pkt_op}; # fail_handler + # calls fail_handler: + send($self->{pkt_op_p}, '!', MSG_EOR) if $self->{pkt_op_p}; x_it($self, ($exit_code // 1) << 8); undef; } @@ -340,7 +341,7 @@ sub puts ($;@) { out(shift, map { "$_\n" } @_) } sub child_error { # passes non-fatal curl exit codes to user my ($self, $child_error) = @_; # child_error is $? - if (my $s = $self->{pkt_op} // $self->{sock}) { + if (my $s = $self->{pkt_op_p} // $self->{sock}) { # send to the parent lei-daemon or to lei(1) client send($s, "child_error $child_error", MSG_EOR); } elsif (!$PublicInbox::DS::in_loop) { @@ -348,94 +349,34 @@ sub child_error { # passes non-fatal curl exit codes to user } # else noop if client disconnected } -sub atfork_prepare_wq { - my ($self, $wq) = @_; - my $tcafc = $wq->{-ipc_atfork_child_close} //= [ $listener // () ]; - if (my $sock = $self->{sock}) { - push @$tcafc, @$self{qw(0 1 2 3)}, $sock; - } - if (my $pgr = $self->{pgr}) { - push @$tcafc, @$pgr[1,2]; - } - if (my $old_1 = $self->{old_1}) { - push @$tcafc, $old_1; - } - for my $f (qw(lxs l2m)) { - my $ipc = $self->{$f} or next; - push @$tcafc, grep { defined } - @$ipc{qw(-wq_s1 -wq_s2 -ipc_req -ipc_res)}; - } -} - -sub io_restore ($$) { - my ($dst, $src) = @_; - for my $i (0..2) { # standard FDs - my $io = delete $src->{$i} or next; - $dst->{$i} = $io; - } - for my $i (3..9) { # named (non-standard) FDs - my $io = $src->{$i} or next; - my @st = stat($io) or die "stat $src.$i ($io): $!"; - my $f = delete $dst->{"dev=$st[0],ino=$st[1]"} // next; - $dst->{$f} = $io; - delete $src->{$i}; - } -} - sub note_sigpipe { # triggers sigpipe_handler my ($self, $fd) = @_; close(delete($self->{$fd})); # explicit close silences Perl warning - send($self->{pkt_op}, '|', MSG_EOR) if $self->{pkt_op}; + send($self->{pkt_op_p}, '|', MSG_EOR) if $self->{pkt_op_p}; x_it($self, 13); } -sub atfork_child_wq { - my ($self, $wq) = @_; - io_restore($self, $wq); - -S $self->{pkt_op} or die 'BUG: {pkt_op} expected'; - io_restore($self->{l2m}, $wq); +sub lei_atfork_child { + my ($self) = @_; + # we need to explicitly close things which are on stack + delete $self->{0}; + for (delete @$self{qw(3 sock old_1 au_done)}) { + close($_) if defined($_); + } + if (my $op_c = delete $self->{pkt_op_c}) { + close(delete $op_c->{sock}); + } + if (my $pgr = delete $self->{pgr}) { + close($_) for (@$pgr[1,2]); + } + close $listener if $listener; + undef $listener; %PATH2CFG = (); undef $errors_log; $quit = \&CORE::exit; $current_lei = $self; # for SIG{__WARN__} } -sub io_extract ($;@) { - my ($obj, @fields) = @_; - my @io; - for my $f (@fields) { - my $io = delete $obj->{$f} or next; - my @st = stat($io) or die "W: stat $obj.$f ($io): $!"; - $obj->{"dev=$st[0],ino=$st[1]"} = $f; - push @io, $io; - } - @io -} - -# usage: ($lei, @io) = $lei->atfork_parent_wq($wq); -sub atfork_parent_wq { - my ($self, $wq) = @_; - my $env = delete $self->{env}; # env is inherited at fork - my $lei = bless { %$self }, ref($self); - for my $f (qw(dedupe ovv)) { - my $tmp = delete($lei->{$f}) or next; - $lei->{$f} = $wq->deep_clone($tmp); - } - $self->{env} = $env; - delete @$lei{qw(sock 3 -lei_store cfg old_1 pgr lxs)}; # keep l2m - my @io = (delete(@$lei{qw(0 1 2)}), - io_extract($lei, qw(pkt_op startq))); - my $l2m = $lei->{l2m}; - if ($l2m && $l2m != $wq) { # $wq == lxs - if (my $wq_s1 = $l2m->{-wq_s1}) { - push @io, io_extract($l2m, '-wq_s1'); - $l2m->{-wq_s1} = $wq_s1; - } - $l2m->wq_close(1); - } - ($lei, @io); -} - sub _help ($;$) { my ($self, $errmsg) = @_; my $cmd = $self->{cmd} // 'COMMAND'; diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index e33d63a2..e6bf4f2a 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -207,7 +207,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } $lei->{ovv_buf} = \(my $buf = '') if !$l2m; if ($l2m && !$ibxish) { # remote https?:// mboxrd - delete $l2m->{-wq_s1}; my $g2m = $l2m->can('git_to_mail'); my $wcb = $l2m->write_cb($lei); sub { @@ -215,33 +214,20 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $wcb->(undef, $smsg, $eml); }; } elsif ($l2m && $l2m->{-wq_s1}) { - my ($lei_ipc, @io) = $lei->atfork_parent_wq($l2m); - # $io[0] becomes a notification pipe that triggers EOF + # $io->[0] becomes a notification pipe that triggers EOF # in this wq worker when all outstanding ->write_mail # calls are complete - $io[0] = undef; - pipe($l2m->{each_smsg_done}, $io[0]) or die "pipe: $!"; - fcntl($io[0], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ - delete @$lei_ipc{qw(l2m opt mset_opt cmd)}; + my $io = []; + pipe($l2m->{each_smsg_done}, $io->[0]) or die "pipe: $!"; + fcntl($io->[0], 1031, 4096) if $^O eq 'linux'; # F_SETPIPE_SZ my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git $self->{git} = $git; my $git_dir = $git->{git_dir}; sub { my ($smsg, $mitem) = @_; $smsg->{pct} = get_pct($mitem) if $mitem; - $l2m->wq_do('write_mail', \@io, $git_dir, $smsg, - $lei_ipc); + $l2m->wq_do('write_mail', $io, $git_dir, $smsg); } - } elsif ($l2m) { - my $wcb = $l2m->write_cb($lei); - my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git - $self->{git} = $git; # for ovv_atexit_child - my $g2m = $l2m->can('git_to_mail'); - sub { - my ($smsg, $mitem) = @_; - $smsg->{pct} = get_pct($mitem) if $mitem; - $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $smsg ]); - }; } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; sub { # DIY prettiness :P @@ -275,7 +261,9 @@ sub ovv_each_smsg_cb { # runs in wq worker usually $lei->out($buf); $buf = ''; } - } # else { ... + } else { + die "TODO: unhandled case $self->{fmt}" + } } no warnings 'once'; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index c704dc2a..f9250860 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -211,10 +211,10 @@ sub zsfx2cmd ($$$) { } sub _post_augment_mbox { # open a compressor process - my ($self, $lei, $zpipe) = @_; + my ($self, $lei) = @_; my $zsfx = $self->{zsfx} or return; my $cmd = zsfx2cmd($zsfx, undef, $lei); - my ($r, $w) = splice(@$zpipe, 0, 2); + my ($r, $w) = @{delete $lei->{zpipe}}; my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; my $pid = spawn($cmd, $lei->{env}, $rdr); my $pp = gensym; @@ -407,7 +407,7 @@ sub _pre_augment_mbox { $! == ENOENT or die "unlink($dst): $!"; } open my $out, $mode, $dst or die "open($dst): $!"; - $lei->{old_1} = $lei->{1}; + $lei->{old_1} = $lei->{1}; # keep for spawning MUA $lei->{1} = $out; } # Perl does SEEK_END even with O_APPEND :< @@ -418,7 +418,7 @@ sub _pre_augment_mbox { state $zsfx_allow = join('|', keys %zsfx2cmd); ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/) or return; pipe(my ($r, $w)) or die "pipe: $!"; - [ $r, $w ]; + $lei->{zpipe} = [ $r, $w ]; } sub _do_augment_mbox { @@ -462,16 +462,24 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon $self->$m($lei, @args); } +sub ipc_atfork_child { + my ($self) = @_; + my $lei = delete $self->{lei}; + $lei->lei_atfork_child; + if (my $zpipe = delete $lei->{zpipe}) { + $lei->{1} = $zpipe->[1]; + close $zpipe->[0]; + } + $self->{wcb} = $self->write_cb($lei); + $self->SUPER::ipc_atfork_child; +} + sub write_mail { # via ->wq_do - my ($self, $git_dir, $smsg, $lei) = @_; + my ($self, $git_dir, $smsg) = @_; my $not_done = delete $self->{0} // die 'BUG: $not_done missing'; - my $wcb = $self->{wcb} //= do { # first message - $lei->atfork_child_wq($self); - $self->write_cb($lei); - }; my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir); git_async_cat($git, $smsg->{blob}, \&git_to_mail, - [$wcb, $smsg, $not_done]); + [$self->{wcb}, $smsg, $not_done]); } sub wq_atexit_child { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index ab66717c..e41d899e 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -110,8 +110,8 @@ sub wait_startq ($) { sub mset_progress { my $lei = shift; return unless $lei->{-progress}; - if ($lei->{pkt_op}) { # called via pkt_op/pkt_do from workers - pkt_do($lei->{pkt_op}, 'mset_progress', @_); + if ($lei->{pkt_op_p}) { + pkt_do($lei->{pkt_op_p}, 'mset_progress', @_); } else { # single lei-daemon consumer my ($desc, $mset_size, $mset_total_est) = @_; $lei->{-mset_total} += $mset_size; @@ -120,11 +120,10 @@ sub mset_progress { } sub query_thread_mset { # for --thread - my ($self, $lei, $ibxish) = @_; + my ($self, $ibxish) = @_; local $0 = "$0 query_thread_mset"; - $lei->atfork_child_wq($self); + my $lei = $self->{lei}; my $startq = delete $lei->{startq}; - my ($srch, $over) = ($ibxish->search, $ibxish->over); my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; return warn("$desc not indexed by Xapian\n") unless ($srch && $over); @@ -154,9 +153,9 @@ sub query_thread_mset { # for --thread } sub query_mset { # non-parallel for non-"--thread" users - my ($self, $lei) = @_; + my ($self) = @_; local $0 = "$0 query_mset"; - $lei->atfork_child_wq($self); + my $lei = $self->{lei}; my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -207,10 +206,10 @@ sub kill_reap { } sub query_remote_mboxrd { - my ($self, $lei, $uris) = @_; + my ($self, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; - $lei->atfork_child_wq($self); local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap) + my $lei = $self->{lei}; my ($opt, $env) = @$lei{qw(opt env)}; my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm'); push(@qform, t => 1) if $opt->{thread}; @@ -307,7 +306,7 @@ sub git { $git; } -sub query_done { # EOF callback +sub query_done { # EOF callback for main daemon my ($lei) = @_; my $has_l2m = exists $lei->{l2m}; for my $f (qw(lxs l2m)) { @@ -332,9 +331,8 @@ Error closing $lei->{ovv}->{dst}: $! } sub do_post_augment { - my ($lei, $zpipe, $au_done) = @_; - my $l2m = $lei->{l2m} or die 'BUG: no {l2m}'; - eval { $l2m->post_augment($lei, $zpipe) }; + my ($lei) = @_; + eval { $lei->{l2m}->post_augment($lei) }; if (my $err = $@) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill; @@ -342,7 +340,7 @@ sub do_post_augment { } $lei->fail("$err"); } - close $au_done; # triggers wait_startq + close(delete $lei->{au_done}); # triggers wait_startq } my $MAX_PER_HOST = 4; @@ -356,13 +354,13 @@ sub concurrency { } sub start_query { # always runs in main (lei-daemon) process - my ($self, $io, $lei) = @_; + my ($self, $lei) = @_; if ($lei->{opt}->{thread}) { for my $ibxish (locals($self)) { - $self->wq_do('query_thread_mset', $io, $lei, $ibxish); + $self->wq_do('query_thread_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_do('query_mset', $io, $lei); + $self->wq_do('query_mset', []); } my $i = 0; my $q = []; @@ -370,19 +368,23 @@ sub start_query { # always runs in main (lei-daemon) process push @{$q->[$i++ % $MAX_PER_HOST]}, $uri; } for my $uris (@$q) { - $self->wq_do('query_remote_mboxrd', $io, $lei, $uris); + $self->wq_do('query_remote_mboxrd', [], $uris); } - @$io = (); +} + +sub ipc_atfork_child { + my ($self) = @_; + $self->{lei}->lei_atfork_child; + $self->SUPER::ipc_atfork_child; } sub query_prepare { # called by wq_do - my ($self, $lei) = @_; + my ($self) = @_; local $0 = "$0 query_prepare"; - $lei->atfork_child_wq($self); - delete $lei->{l2m}->{-wq_s1}; + my $lei = $self->{lei}; eval { $lei->{l2m}->do_augment($lei) }; $lei->fail($@) if $@; - pkt_do($lei->{pkt_op}, '.') == 1 or die "do_post_augment trigger: $!" + pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!" } sub fail_handler ($;$$) { @@ -401,45 +403,38 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers sub do_query { my ($self, $lei) = @_; - $lei->{1}->autoflush(1); - $lei->start_pager if -t $lei->{1}; - $lei->{ovv}->ovv_begin($lei); - my ($au_done, $zpipe); - my $l2m = $lei->{l2m}; - $lei->atfork_prepare_wq($self); - $self->wq_workers_start('lei_xsearch', $self->{jobs}, $lei->oldset); - delete $self->{-ipc_atfork_child_close}; - if ($l2m) { - $lei->atfork_prepare_wq($l2m); - $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset); - delete $l2m->{-ipc_atfork_child_close}; - pipe($lei->{startq}, $au_done) or die "pipe: $!"; - # 1031: F_SETPIPE_SZ - fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; - $zpipe = $l2m->pre_augment($lei); - } my $ops = { '|' => [ \&sigpipe_handler, $lei ], '!' => [ \&fail_handler, $lei ], - '.' => [ \&do_post_augment, $lei, $zpipe, $au_done ], + '.' => [ \&do_post_augment, $lei ], '' => [ \&query_done, $lei ], 'mset_progress' => [ \&mset_progress, $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], 'child_error' => [ $lei->can('child_error'), $lei ], }; - (my $op, $lei->{pkt_op}) = PublicInbox::PktOp->pair($ops); - my ($lei_ipc, @io) = $lei->atfork_parent_wq($self); - delete($lei->{pkt_op}); - - $lei->event_step_init; # wait for shutdowns + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $lei->{1}->autoflush(1); + $lei->start_pager if -t $lei->{1}; + $lei->{ovv}->ovv_begin($lei); + my $l2m = $lei->{l2m}; if ($l2m) { - $self->wq_do('query_prepare', \@io, $lei_ipc); - $io[1] = $zpipe->[1] if $zpipe; + $l2m->pre_augment($lei); + $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, + $lei->oldset, { lei => $lei }); + pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; + # 1031: F_SETPIPE_SZ + fcntl($lei->{startq}, 1031, 4096) if $^O eq 'linux'; } - start_query($self, \@io, $lei_ipc); - $self->wq_close(1); + $self->wq_workers_start('lei_xsearch', $self->{jobs}, + $lei->oldset, { lei => $lei }); + my $op = delete $lei->{pkt_op_c}; + delete $lei->{pkt_op_p}; + $l2m->wq_close(1) if $l2m; + $lei->event_step_init; # wait for shutdowns + $self->wq_do('query_prepare', []) if $l2m; + start_query($self, $lei); + $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { - # for the $lei_ipc->atfork_child_wq PIPE handler: while ($op->{sock}) { $op->event_step } } }