From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 09BFA1FB05 for ; Wed, 28 Apr 2021 07:52:06 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 08/11] lei: simple WQ workers use {wq1} field Date: Wed, 28 Apr 2021 07:52:02 +0000 Message-Id: <20210428075205.19440-9-e@80x24.org> In-Reply-To: <20210428075205.19440-1-e@80x24.org> References: <20210428075205.19440-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This lets us share more code and reduces cognitive overhead when it comes to picking names (because {lsss} was ridiculous). We'll need to ensure the first error set in lei is the actual error we exit with, otherwise things can get confusing and errors may get lost. --- lib/PublicInbox/LEI.pm | 16 ++++++++++++---- lib/PublicInbox/LeiBlob.pm | 8 +------- lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiImport.pm | 9 ++------- lib/PublicInbox/LeiLsSearch.pm | 2 +- lib/PublicInbox/LeiMirror.pm | 4 ++-- lib/PublicInbox/LeiP2q.pm | 8 +------- lib/PublicInbox/LeiTag.pm | 9 ++------- script/lei | 4 ++-- 9 files changed, 24 insertions(+), 38 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index cfbf12f0..403f9ed8 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -382,7 +382,7 @@ my %CONFIG_KEYS = ( 'leistore.dir' => 'top-level storage location', ); -my @WQ_KEYS = qw(lxs l2m imp mrr cnv p2q tag sol lsss); # internal workers +my @WQ_KEYS = qw(lxs l2m wq1); # internal workers sub _drop_wq { my ($self) = @_; @@ -542,7 +542,7 @@ sub workers_start { 'child_error' => [ \&child_error, $lei ], ($ops ? %$ops : ()), }; - $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ]; + $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ]; my $end = $lei->pkt_op_pair; $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); delete $lei->{pkt_op_p}; @@ -1237,9 +1237,17 @@ sub DESTROY { sub wq_done_wait { # dwaitpid callback my ($arg, $pid) = @_; - my ($wq, $lei, $e) = @$arg; - $? and $lei->child_error($?, $e ? "$e errors during $lei->{cmd}" : ()); + my ($wq, $lei) = @$arg; + my $err_type = $lei->{-err_type}; + $? and $lei->child_error($?, + $err_type ? "$err_type errors during $lei->{cmd}" : ()); $lei->dclose; } +sub wq_eof { # EOF callback for main daemon + my ($lei) = @_; + my $wq1 = delete $lei->{wq1} // return $lei->fail; # already failed + $wq1->wq_wait_old(\&wq_done_wait, $lei); +} + 1; diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index ff079e65..0a957358 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -10,12 +10,6 @@ use parent qw(PublicInbox::IPC); use PublicInbox::Spawn qw(spawn popen_rd which); use PublicInbox::DS; -sub _lei_wq_eof { # EOF callback for main daemon - my ($lei) = @_; - my $sol = delete $lei->{sol} // return $lei->dclose; # already failed - $sol->wq_wait_old($lei->can('wq_done_wait'), $lei); -} - sub get_git_dir ($$) { my ($lei, $d) = @_; return $d if -d "$d/objects" && -d "$d/refs" && -e "$d/HEAD"; @@ -158,7 +152,7 @@ EOM require PublicInbox::SolverGit; my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__; my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1); - $lei->{sol} = $self; + $lei->{wq1} = $self; $self->wq_io_do('do_solve_blob', []); $self->wq_close(1); $op_c->op_wait_event($ops); diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 14bed901..cefcaf65 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -53,7 +53,7 @@ sub lei_convert { # the main "lei convert" method $lei->{opt}->{augment} = 1 if $devfd < 0; $self->prepare_inputs($lei, \@inputs) or return; my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1); - $lei->{cnv} = $self; + $lei->{wq1} = $self; $self->wq_io_do('process_inputs', []); $self->wq_close(1); $op_c->op_wait_event($ops); diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index f2a0c95a..26127ece 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -53,12 +53,6 @@ sub input_nntp_cb { # nntp_each input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef); } -sub _lei_wq_eof { # EOF callback for main daemon - my ($lei) = @_; - my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone'); - $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal'); -} - sub net_merge_complete { # callback used by LeiAuth my ($self) = @_; $self->wq_io_do('process_inputs'); @@ -95,7 +89,8 @@ sub lei_import { # the main "lei import" method $self->{-wq_nr_workers} = $j // 1; # locked $lei->{-eml_noisy} = 1; (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops); - $lei->{imp} = $self; + $lei->{wq1} = $self; + $lei->{-err_type} = 'non-fatal'; net_merge_complete($self) unless $lei->{auth}; $op_c->op_wait_event($ops); } diff --git a/lib/PublicInbox/LeiLsSearch.pm b/lib/PublicInbox/LeiLsSearch.pm index 9ac4870f..a00e78fc 100644 --- a/lib/PublicInbox/LeiLsSearch.pm +++ b/lib/PublicInbox/LeiLsSearch.pm @@ -73,7 +73,7 @@ sub bg_worker ($$$) { my ($lei, $pfx, $json) = @_; my $self = bless { -wq_nr_workers => 1, json => $json }, __PACKAGE__; my ($op_c, $ops) = $lei->workers_start($self, 'ls-search', 1); - $lei->{lsss} = $self; + $lei->{wq1} = $self; $self->wq_io_do('do_ls_search_long', [], $pfx); $self->wq_close(1); $op_c->op_wait_event($ops); diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 50ab4c85..db97b98c 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -26,7 +26,7 @@ sub do_finish_mirror { # dwaitpid callback sub _lei_wq_eof { # EOF callback for main daemon my ($lei) = @_; - my $mrr = delete $lei->{mrr} or return; + my $mrr = delete $lei->{wq1} or return $lei->fail; $mrr->wq_wait_old(\&do_finish_mirror, $lei); } @@ -283,7 +283,7 @@ sub start { require PublicInbox::Admin; require PublicInbox::InboxWritable; my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1); - $lei->{mrr} = $self; + $lei->{wq1} = $self; $self->wq_io_do('do_mirror', []); $self->wq_close(1); $op->op_wait_event($ops); diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index deb31974..b4893489 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -189,7 +189,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point $self->{input} = $input; } my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1); - $lei->{p2q} = $self; + $lei->{wq1} = $self; $self->wq_io_do('do_p2q', []); $self->wq_close(1); $op->op_wait_event($ops); @@ -201,10 +201,4 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; } -sub _lei_wq_eof { # EOF callback for main daemon - my ($lei) = @_; - my $p2q = delete $lei->{p2q} // return $lei->dclose; - $p2q->wq_wait_old($lei->can('wq_done_wait'), $lei); -} - 1; diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index 3cda2eca..989a6954 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -19,12 +19,6 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh sub input_mbox_cb { input_eml_cb($_[1], $_[0]) } -sub _lei_wq_eof { # EOF callback for main daemon - my ($lei) = @_; - my $tag = delete $lei->{tag} // return $lei->dclose; - $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal'); -} - sub net_merge_complete { # callback used by LeiAuth my ($self) = @_; $self->wq_io_do('process_inputs'); @@ -57,7 +51,8 @@ sub lei_tag { # the "lei tag" method $self->{vmd_mod} = $vmd_mod; my $j = $self->{-wq_nr_workers} = 1; # locked for now (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops); - $lei->{tag} = $self; + $lei->{wq1} = $self; + $lei->{-err_type} = 'non-fatal'; net_merge_complete($self) unless $lei->{auth}; $op_c->op_wait_event($ops); } diff --git a/script/lei b/script/lei index db302422..90a93839 100755 --- a/script/lei +++ b/script/lei @@ -116,10 +116,10 @@ Falling back to (slow) one-shot mode } elsif ($buf eq '-WINCH') { kill($buf, @parent); # for MUA } elsif ($buf =~ /\Ax_it ([0-9]+)\z/) { - $x_it_code = $1 + 0; + $x_it_code ||= $1 + 0; last; } elsif ($buf =~ /\Achild_error ([0-9]+)\z/) { - $x_it_code = $1 + 0; + $x_it_code ||= $1 + 0; } else { $sigchld->(); die $buf;