From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 8E2BD1FA17 for ; Tue, 27 Apr 2021 11:07:53 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/5] lei: standardize on _lei_wq_eof callback for workers Date: Tue, 27 Apr 2021 11:07:51 +0000 Message-Id: <20210427110753.24609-4-e@80x24.org> In-Reply-To: <20210427110753.24609-1-e@80x24.org> References: <20210427110753.24609-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Simplify our internals a little bit. --- lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/LeiBlob.pm | 5 ++--- lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiImport.pm | 6 +++--- lib/PublicInbox/LeiMirror.pm | 6 ++---- lib/PublicInbox/LeiP2q.pm | 2 +- lib/PublicInbox/LeiTag.pm | 8 ++++---- 7 files changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index c170572b..effc905a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -531,7 +531,7 @@ sub workers_start { 'child_error' => [ \&child_error, $lei ], ($ops ? %$ops : ()), }; - $ops->{''} //= [ \&dclose, $lei ]; + $ops->{''} //= [ $wq->can('_lei_wq_eof') || \&dclose, $lei ]; my $end = $lei->pkt_op_pair; $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); delete $lei->{pkt_op_p}; diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 4e52c8a5..0b96bd04 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -10,7 +10,7 @@ use parent qw(PublicInbox::IPC); use PublicInbox::Spawn qw(spawn popen_rd which); use PublicInbox::DS; -sub sol_done { # EOF callback for main daemon +sub _lei_wq_eof { # EOF callback for main daemon my ($lei) = @_; my $sol = delete $lei->{sol} // return $lei->dclose; # already failed $sol->wq_wait_old($lei->can('wq_done_wait'), $lei); @@ -157,8 +157,7 @@ EOM } require PublicInbox::SolverGit; my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__; - my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1, - { '' => [ \&sol_done, $lei ] }); + my ($op_c, $ops) = $lei->workers_start($self, 'lei-blob', 1); $lei->{sol} = $self; $self->wq_io_do('do_solve_blob', []); $self->wq_close(1); diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 0ce49ea9..0c324169 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -52,7 +52,7 @@ sub lei_convert { # the main "lei convert" method my $devfd = $lei->path_to_fd($ovv->{dst}) // return; $lei->{opt}->{augment} = 1 if $devfd < 0; $self->prepare_inputs($lei, \@inputs) or return; - my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1); + my ($op_c, $ops) = $lei->workers_start($self, 'lei-convert', 1); $lei->{cnv} = $self; $self->wq_io_do('process_inputs', []); $self->wq_close(1); diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index daaa6753..e0d899cc 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -53,7 +53,7 @@ sub input_nntp_cb { # nntp_each input_eml_cb($self, $eml, $self->{-import_kw} ? { kw => $kw } : undef); } -sub import_done { # EOF callback for main daemon +sub _lei_wq_eof { # EOF callback for main daemon my ($lei) = @_; my $imp = delete $lei->{imp} // return $lei->fail('BUG: {imp} gone'); $imp->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal'); @@ -90,10 +90,10 @@ sub lei_import { # the main "lei import" method my $nproc = $self->detect_nproc; $j = $nproc if $j > $nproc; } - my $ops = { '' => [ \&import_done, $lei ] }; + my $ops = {}; $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; $self->{-wq_nr_workers} = $j // 1; # locked - (my $op_c, $ops) = $lei->workers_start($self, 'lei_import', $j, $ops); + (my $op_c, $ops) = $lei->workers_start($self, 'lei-import', $j, $ops); $lei->{imp} = $self; net_merge_complete($self) unless $lei->{auth}; $op_c->op_wait_event($ops); diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 15adb71b..50ab4c85 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -24,7 +24,7 @@ sub do_finish_mirror { # dwaitpid callback $lei->dclose; } -sub mirror_done { # EOF callback for main daemon +sub _lei_wq_eof { # EOF callback for main daemon my ($lei) = @_; my $mrr = delete $lei->{mrr} or return; $mrr->wq_wait_old(\&do_finish_mirror, $lei); @@ -282,9 +282,7 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, { - '' => [ \&mirror_done, $lei ] - }); + my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1); $lei->{mrr} = $self; $self->wq_io_do('do_mirror', []); $self->wq_close(1); diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index cb2309c7..3248afd7 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -188,7 +188,7 @@ sub lei_p2q { # the "lei patch-to-query" entry point } else { $self->{input} = $input; } - my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1); + my ($op, $ops) = $lei->workers_start($self, 'lei-p2q', 1); $lei->{p2q} = $self; $self->wq_io_do('do_p2q', []); $self->wq_close(1); diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index f5791947..3cda2eca 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -19,9 +19,9 @@ sub input_eml_cb { # used by PublicInbox::LeiInput::input_fh sub input_mbox_cb { input_eml_cb($_[1], $_[0]) } -sub tag_done { # EOF callback for main daemon +sub _lei_wq_eof { # EOF callback for main daemon my ($lei) = @_; - my $tag = delete $lei->{tag} or return; + my $tag = delete $lei->{tag} // return $lei->dclose; $tag->wq_wait_old($lei->can('wq_done_wait'), $lei, 'non-fatal'); } @@ -52,11 +52,11 @@ sub lei_tag { # the "lei tag" method $self->prepare_inputs($lei, \@argv) or return; grep(defined, @$vmd_mod{qw(+kw +L -L -kw)}) or return $lei->fail('no keywords or labels specified'); - my $ops = { '' => [ \&tag_done, $lei ] }; + my $ops = {}; $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; $self->{vmd_mod} = $vmd_mod; my $j = $self->{-wq_nr_workers} = 1; # locked for now - (my $op_c, $ops) = $lei->workers_start($self, 'lei_tag', $j, $ops); + (my $op_c, $ops) = $lei->workers_start($self, 'lei-tag', $j, $ops); $lei->{tag} = $self; net_merge_complete($self) unless $lei->{auth}; $op_c->op_wait_event($ops);