From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id AB9B71FC11 for ; Sun, 7 Feb 2021 08:52:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 12/19] lei: more consistent IPC exit and error handling Date: Sun, 7 Feb 2021 08:51:54 +0000 Message-Id: <20210207085201.13871-13-e@80x24.org> In-Reply-To: <20210207085201.13871-1-e@80x24.org> References: <20210207085201.13871-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We're able to propagate $? from wq_workers in a consistent manner, now. --- lib/PublicInbox/IPC.pm | 22 +++++++++++----------- lib/PublicInbox/LEI.pm | 6 +++--- lib/PublicInbox/LeiImport.pm | 14 ++++++++++---- lib/PublicInbox/LeiXSearch.pm | 12 +++++++++--- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 728f726c..c8673e26 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -140,10 +140,9 @@ sub ipc_worker_reap { # dwaitpid callback } sub wq_wait_old { - my ($self, @args) = @_; - my $cb = ref($args[0]) eq 'CODE' ? shift(@args) : \&ipc_worker_reap; + my ($self, $cb, @args) = @_; my $pids = delete $self->{"-wq_old_pids.$$"} or return; - dwaitpid($_, $cb, [$self, @args]) for @$pids; + dwaitpid($_, $cb // \&ipc_worker_reap, [$self, @args]) for @$pids; } # for base class, override in sub classes @@ -348,13 +347,12 @@ sub wq_exit { # wakes up wq_worker_decr_wait sub wq_worker_decr { # SIGTTOU handler, kills first idle worker my ($self) = @_; return unless wq_workers($self); - my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2'; - $self->wq_io_do('wq_exit', [ $s2, $s2, $s2 ]); + $self->wq_io_do('wq_exit'); # caller must call wq_worker_decr_wait in main loop } sub wq_worker_decr_wait { - my ($self, $timeout) = @_; + my ($self, $timeout, $cb, @args) = @_; return if $self->{-wq_ppid} != $$; # can't reap siblings or parents my $s1 = $self->{-wq_s1} // croak 'BUG: no wq_s1'; vec(my $rin = '', fileno($s1), 1) = 1; @@ -363,17 +361,17 @@ sub wq_worker_decr_wait { recv($s1, my $pid, 64, 0) // croak "recv: $!"; my $workers = $self->{-wq_workers} // croak 'BUG: no wq_workers'; delete $workers->{$pid} // croak "BUG: PID:$pid invalid"; - dwaitpid($pid, \&ipc_worker_reap, $self); + dwaitpid($pid, $cb // \&ipc_worker_reap, [ $self, @args ]); } # set or retrieve number of workers sub wq_workers { - my ($self, $nr) = @_; + my ($self, $nr, $cb, @args) = @_; my $cur = $self->{-wq_workers} or return; if (defined $nr) { while (scalar(keys(%$cur)) > $nr) { $self->wq_worker_decr; - $self->wq_worker_decr_wait; + $self->wq_worker_decr_wait(undef, $cb, @args); } $self->wq_worker_incr while scalar(keys(%$cur)) < $nr; } @@ -381,7 +379,7 @@ sub wq_workers { } sub wq_close { - my ($self, $nohang) = @_; + my ($self, $nohang, $cb, @args) = @_; delete @$self{qw(-wq_s1 -wq_s2)} or return; my $ppid = delete $self->{-wq_ppid} or return; my $workers = delete $self->{-wq_workers} // die 'BUG: no wq_workers'; @@ -390,7 +388,9 @@ sub wq_close { if ($nohang) { push @{$self->{"-wq_old_pids.$$"}}, @pids; } else { - dwaitpid($_, \&ipc_worker_reap, $self) for @pids; + $cb //= \&ipc_worker_reap; + unshift @args, $self; + dwaitpid($_, $cb, \@args) for @pids; } } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 515bc2a3..21862488 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -360,7 +360,7 @@ sub fail_handler ($;$$) { my ($lei, $code, $io) = @_; for my $f (@WQ_KEYS) { my $wq = delete $lei->{$f} or next; - $wq->wq_wait_old($lei) if $wq->wq_kill_old; # lei-daemon + $wq->wq_wait_old(undef, $lei) if $wq->wq_kill_old; # lei-daemon } close($io) if $io; # needed to avoid warnings on SIGPIPE $lei->x_it($code // (1 >> 8)); @@ -827,9 +827,9 @@ sub dclose { for my $f (@WQ_KEYS) { my $wq = delete $self->{$f} or next; if ($wq->wq_kill) { - $wq->wq_close + $wq->wq_close(0, undef, $self); } elsif ($wq->wq_kill_old) { - $wq->wq_wait_old($self); + $wq->wq_wait_old(undef, $self); } } close(delete $self->{1}) if $self->{1}; # may reap_compress diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 3a99570e..2b2dc2f7 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -14,12 +14,18 @@ sub _import_eml { # MboxReader callback $sto->ipc_do('set_eml', $eml, $set_kw ? $sto->mbox_keywords($eml) : ()); } +sub import_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($imp, $lei) = @$arg; + $lei->child_error($?, 'non-fatal errors during import') if $?; + my $ign = $lei->{sto}->ipc_do('done'); # PublicInbox::LeiStore::done + $lei->dclose; +} + sub import_done { # EOF callback for main daemon my ($lei) = @_; - my $imp = delete $lei->{imp}; - $imp->wq_wait_old($lei) if $imp; - my $wait = $lei->{sto}->ipc_do('done'); - $lei->dclose; + my $imp = delete $lei->{imp} or return; + $imp->wq_wait_old(\&import_done_wait, $lei); } sub call { # the main "lei import" method diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 1ba767c1..1024b020 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -279,12 +279,18 @@ sub git_tmp ($) { $git; } +sub xsearch_done_wait { # dwaitpid callback + my ($arg, $pid) = @_; + my ($wq, $lei) = @$arg; + $lei->child_error($?, 'non-fatal error from '.ref($wq)) if $?; +} + sub query_done { # EOF callback for main daemon my ($lei) = @_; my $l2m = delete $lei->{l2m}; - $l2m->wq_wait_old($lei) if $l2m; + $l2m->wq_wait_old(\&xsearch_done_wait, $lei) if $l2m; if (my $lxs = delete $lei->{lxs}) { - $lxs->wq_wait_old($lei); + $lxs->wq_wait_old(\&xsearch_done_wait, $lei); } $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress @@ -309,7 +315,7 @@ sub do_post_augment { if (my $err = $@) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill; - $lxs->wq_close; + $lxs->wq_close(0, undef, $lei); } $lei->fail("$err"); }