From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 2E4721F9FD for ; Sun, 28 Mar 2021 09:01:25 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 01/12] lei: simplify PktOp callers Date: Sun, 28 Mar 2021 09:01:13 +0000 Message-Id: <20210328090124.3541-2-e@80x24.org> In-Reply-To: <20210328090124.3541-1-e@80x24.org> References: <20210328090124.3541-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Provide a consistent ->op_wait_event method instead of forcing callers to loop (or not) at each callsite. This also avoids a possible leak by avoiding circular references. --- lib/PublicInbox/LEI.pm | 11 +++++------ lib/PublicInbox/LeiBlob.pm | 4 ++-- lib/PublicInbox/LeiConvert.pm | 4 ++-- lib/PublicInbox/LeiImport.pm | 4 ++-- lib/PublicInbox/LeiMark.pm | 4 ++-- lib/PublicInbox/LeiMirror.pm | 4 ++-- lib/PublicInbox/LeiP2q.pm | 4 ++-- lib/PublicInbox/LeiXSearch.pm | 8 +++----- lib/PublicInbox/PktOp.pm | 20 +++++++++++++++----- 9 files changed, 35 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 478912cd..9cacb142 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -494,11 +494,11 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die } sub pkt_op_pair { - my ($self, $ops) = @_; + my ($self) = @_; require PublicInbox::OnDestroy; require PublicInbox::PktOp; my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self); - @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops); + @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair; $end; } @@ -512,14 +512,13 @@ sub workers_start { ($ops ? 
%$ops : ()), }; $ops->{''} //= [ \&dclose, $lei ]; - my $end = $lei->pkt_op_pair($ops); + my $end = $lei->pkt_op_pair; $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); delete $lei->{pkt_op_p}; - my $op = delete $lei->{pkt_op_c}; + my $op_c = delete $lei->{pkt_op_c}; @$end = (); $lei->event_step_init; - # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op - $lei->{oneshot} ? $op : undef; + ($op_c, $ops); } sub _help { diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 2facbad3..97747220 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -103,12 +103,12 @@ sub lei_blob { my $lxs = $lei->lxs_prepare or return; require PublicInbox::SolverGit; my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__; - my $op = $lei->workers_start($self, 'lei_solve', 1, + my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1, { '' => [ \&sol_done, $lei ] }); $lei->{sol} = $self; $self->wq_io_do('do_solve_blob', []); $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } + $op_c->op_wait_event($ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 083ecc33..5d0adb14 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -53,11 +53,11 @@ sub lei_convert { # the main "lei convert" method my $devfd = $lei->path_to_fd($ovv->{dst}) // return; $lei->{opt}->{augment} = 1 if $devfd < 0; $self->prepare_inputs($lei, \@inputs) or return; - my $op = $lei->workers_start($self, 'lei_convert', 1); + my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1); $lei->{cnv} = $self; $self->wq_io_do('do_convert', []); $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } + $op_c->op_wait_event($ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 7c5b7d09..803b5cda 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm 
@@ -76,11 +76,11 @@ sub lei_import { # the main "lei import" method my $ops = { '' => [ \&import_done, $lei ] }; $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; $self->{-wq_nr_workers} = $j // 1; # locked - my $op = $lei->workers_start($self, 'lei_import', undef, $ops); + my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops); $lei->{imp} = $self; $self->wq_io_do('input_stdin', []) if $self->{0}; net_merge_complete($self) unless $lei->{auth}; - while ($op && $op->{sock}) { $op->event_step } + $op_c->op_wait_event($ops); } no warnings 'once'; diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm index 34846b84..6e611318 100644 --- a/lib/PublicInbox/LeiMark.pm +++ b/lib/PublicInbox/LeiMark.pm @@ -116,11 +116,11 @@ sub lei_mark { # the "lei mark" method my $ops = { '' => [ \&mark_done, $lei ] }; $lei->{auth}->op_merge($ops, $self) if $lei->{auth}; $self->{vmd_mod} = $vmd_mod; - my $op = $lei->workers_start($self, 'lei_mark', 1, $ops); + my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops); $lei->{mark} = $self; $self->wq_io_do('input_stdin', []) if $self->{0}; net_merge_complete($self) unless $lei->{auth}; - while ($op && $op->{sock}) { $op->event_step } + $op_c->op_wait_event($ops); } sub note_missing { diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index c83386c6..89574d28 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -282,13 +282,13 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my $op = $lei->workers_start($self, 'lei_mirror', 1, { + my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, { '' => [ \&mirror_done, $lei ] }); $lei->{mrr} = $self; $self->wq_io_do('do_mirror', []); $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } + $op->op_wait_event($ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index 25f63a10..a8a3dd2c 
100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -185,11 +185,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point } else { $self->{input} = $input; } - my $op = $lei->workers_start($self, 'lei_p2q', 1); + my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1); $lei->{p2q} = $self; $self->wq_io_do('do_p2q', []); $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } + $op->op_wait_event($ops); } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b41daffe..1a194f1c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -427,7 +427,7 @@ sub do_query { 'incr_start_query' => [ \&incr_start_query, $self, $l2m ], }; $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth}; - my $end = $lei->pkt_op_pair($ops); + my $end = $lei->pkt_op_pair; $lei->{1}->autoflush(1); $lei->start_pager if delete $lei->{need_pager}; $lei->{ovv}->ovv_begin($lei); @@ -445,7 +445,7 @@ sub do_query { } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }); - my $op = delete $lei->{pkt_op_c}; + my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); $self->{threads} = $lei->{opt}->{threads}; @@ -455,9 +455,7 @@ sub do_query { start_query($self); } $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + $op_c->op_wait_event($ops); } sub add_uri { diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 5d8e78ea..c3221735 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -16,21 +16,23 @@ use PublicInbox::IPC qw(ipc_freeze ipc_thaw); our @EXPORT_OK = qw(pkt_do); sub new { - my ($cls, $r, $ops) = @_; - my $self = bless { sock => $r, ops => $ops }, $cls; + my ($cls, $r) = @_; + my $self = bless { sock => $r }, $cls; if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop $r->blocking(0); $self->SUPER::new($r, EPOLLIN|EPOLLET); + } else { 
+ $self->{blocking} = 1; } $self; } # returns a blessed object as the consumer, and a GLOB/IO for the producer sub pair { - my ($cls, $ops) = @_; + my ($cls) = @_; my ($c, $p); socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; - (new($cls, $c, $ops), $p); + (new($cls, $c), $p); } sub pkt_do { # for the producer to trigger event_step in consumer @@ -41,7 +43,7 @@ sub pkt_do { # for the producer to trigger event_step in consumer sub close { my ($self) = @_; my $c = $self->{sock} or return; - $c->blocking ? delete($self->{sock}) : $self->SUPER::close; + $self->{blocking} ? delete($self->{sock}) : $self->SUPER::close; } sub event_step { @@ -73,4 +75,12 @@ sub event_step { } } +# call this when we're ready to wait on events, +# returns immediately if non-blocking +sub op_wait_event { + my ($self, $ops) = @_; + $self->{ops} = $ops; + while ($self->{blocking} && $self->{sock}) { event_step($self) } +} + 1;