From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 2D7931F9F3 for ; Tue, 8 Jun 2021 09:50:22 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/3] lei: generalize auxiliary WQ handling Date: Tue, 8 Jun 2021 09:50:20 +0000 Message-Id: <20210608095021.20034-3-e@80x24.org> In-Reply-To: <20210608095021.20034-1-e@80x24.org> References: <20210608095021.20034-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: op_wait_event is now more lei-specific since we no longer have to care about oneshot and use a synchronous loop. {ikw} (import-keywords) started a trend, but LeiPmdir (parallel Maildir) is an upcoming WQ class that will follow this idea. Eventually, {l2m} usage may be updated to follow this, too. 
--- lib/PublicInbox/LEI.pm | 9 +++++++++ lib/PublicInbox/LeiBlob.pm | 2 +- lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiExportKw.pm | 2 +- lib/PublicInbox/LeiImport.pm | 3 +-- lib/PublicInbox/LeiLsSearch.pm | 2 +- lib/PublicInbox/LeiMirror.pm | 4 ++-- lib/PublicInbox/LeiP2q.pm | 4 ++-- lib/PublicInbox/LeiRediff.pm | 2 +- lib/PublicInbox/LeiRm.pm | 2 +- lib/PublicInbox/LeiTag.pm | 2 +- lib/PublicInbox/LeiXSearch.pm | 2 +- lib/PublicInbox/PktOp.pm | 6 ------ 13 files changed, 22 insertions(+), 20 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 0cf4d10b..ed01e8de 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -587,6 +587,15 @@ sub workers_start { ($op_c, $ops); } +# call this when we're ready to wait on events and yield to other clients +sub wait_wq_events { + my ($lei, $op_c, $ops) = @_; + for my $wq (grep(defined, @$lei{qw(ikw)})) { # auxiliary WQs + $wq->wq_close(1); + } + $op_c->{ops} = $ops; +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 8de86565..09217964 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -169,7 +169,7 @@ sub lei_blob { $lei->{wq1} = $self; $self->wq_io_do('do_solve_blob', []); $self->wq_close(1); - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 395a80f8..6550c242 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -59,7 +59,7 @@ sub lei_convert { # the main "lei convert" method $lei->{wq1} = $self; $self->wq_io_do('process_inputs', []); $self->wq_close(1); - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiExportKw.pm b/lib/PublicInbox/LeiExportKw.pm index b31b065f..f8579221 100644 --- 
a/lib/PublicInbox/LeiExportKw.pm +++ b/lib/PublicInbox/LeiExportKw.pm @@ -160,7 +160,7 @@ EOM $lei->{wq1} = $self; $lei->{-err_type} = 'non-fatal'; net_merge_all_done($self) unless $lei->{auth}; - $op_c->op_wait_event($ops); # calls net_merge_all_done if $lei->{auth} + $lei->wait_wq_events($op_c, $ops); # net_merge_all_done if {auth} } sub _complete_export_kw { diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 2efd4935..222f75c8 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -91,9 +91,8 @@ sub do_import_index ($$@) { (my $op_c, $ops) = $lei->workers_start($self, $j, $ops); $lei->{wq1} = $self; $lei->{-err_type} = 'non-fatal'; - $ikw->wq_close(1) if $ikw; net_merge_all_done($self) unless $lei->{auth}; - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub lei_import { # the main "lei import" method diff --git a/lib/PublicInbox/LeiLsSearch.pm b/lib/PublicInbox/LeiLsSearch.pm index 6cea6ae8..70136135 100644 --- a/lib/PublicInbox/LeiLsSearch.pm +++ b/lib/PublicInbox/LeiLsSearch.pm @@ -76,7 +76,7 @@ sub bg_worker ($$$) { $lei->{wq1} = $self; $self->wq_io_do('do_ls_search_long', [], $pfx); $self->wq_close(1); - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub lei_ls_search { diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index a37e1d5c..39671f90 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -282,11 +282,11 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my ($op, $ops) = $lei->workers_start($self, 1); + my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_mirror', []); $self->wq_close(1); - $op->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index f381a31c..c0c4563d 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ 
b/lib/PublicInbox/LeiP2q.pm @@ -188,11 +188,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point } else { $self->{input} = $input; } - my ($op, $ops) = $lei->workers_start($self, 1); + my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; $self->wq_io_do('do_p2q', []); $self->wq_close(1); - $op->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm index c8bd0dfb..7607b44f 100644 --- a/lib/PublicInbox/LeiRediff.pm +++ b/lib/PublicInbox/LeiRediff.pm @@ -227,7 +227,7 @@ sub lei_rediff { my ($op_c, $ops) = $lei->workers_start($self, 1); $lei->{wq1} = $self; net_merge_all_done($self) unless $lei->{auth}; - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiRm.pm b/lib/PublicInbox/LeiRm.pm index c6d28045..578e9811 100644 --- a/lib/PublicInbox/LeiRm.pm +++ b/lib/PublicInbox/LeiRm.pm @@ -38,7 +38,7 @@ sub lei_rm { $lei->{wq1} = $self; $lei->{-err_type} = 'non-fatal'; net_merge_all_done($self) unless $lei->{auth}; - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } no warnings 'once'; diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm index b6abd533..4b3ce7d8 100644 --- a/lib/PublicInbox/LeiTag.pm +++ b/lib/PublicInbox/LeiTag.pm @@ -53,7 +53,7 @@ sub lei_tag { # the "lei tag" method $lei->{wq1} = $self; $lei->{-err_type} = 'non-fatal'; net_merge_all_done($self) unless $lei->{auth}; - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub note_missing { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 75e55d47..beb955bb 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -532,7 +532,7 @@ sub do_query { start_query($self); } $lei->event_step_init; # wait for shutdowns - $op_c->op_wait_event($ops); + $lei->wait_wq_events($op_c, $ops); } sub add_uri { diff --git 
a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index ca098d3c..92e150a4 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -63,10 +63,4 @@ sub event_step { } } -# call this when we're ready to wait on events -sub op_wait_event { - my ($self, $ops) = @_; - $self->{ops} = $ops; -} - 1;