From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 01/12] lei: simplify PktOp callers
Date: Sun, 28 Mar 2021 09:01:13 +0000 [thread overview]
Message-ID: <20210328090124.3541-2-e@80x24.org> (raw)
In-Reply-To: <20210328090124.3541-1-e@80x24.org>
Provide a consistent ->op_wait_event method instead of
forcing callers to loop (or not) at each callsite.
This also avoid a leak possibility by avoiding circular
references.
---
lib/PublicInbox/LEI.pm | 11 +++++------
lib/PublicInbox/LeiBlob.pm | 4 ++--
lib/PublicInbox/LeiConvert.pm | 4 ++--
lib/PublicInbox/LeiImport.pm | 4 ++--
lib/PublicInbox/LeiMark.pm | 4 ++--
lib/PublicInbox/LeiMirror.pm | 4 ++--
lib/PublicInbox/LeiP2q.pm | 4 ++--
lib/PublicInbox/LeiXSearch.pm | 8 +++-----
lib/PublicInbox/PktOp.pm | 20 +++++++++++++++-----
9 files changed, 35 insertions(+), 28 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 478912cd..9cacb142 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -494,11 +494,11 @@ sub _delete_pkt_op { # OnDestroy callback to prevent leaks on die
}
sub pkt_op_pair {
- my ($self, $ops) = @_;
+ my ($self) = @_;
require PublicInbox::OnDestroy;
require PublicInbox::PktOp;
my $end = PublicInbox::OnDestroy->new($$, \&_delete_pkt_op, $self);
- @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair($ops);
+ @$self{qw(pkt_op_c pkt_op_p)} = PublicInbox::PktOp->pair;
$end;
}
@@ -512,14 +512,13 @@ sub workers_start {
($ops ? %$ops : ()),
};
$ops->{''} //= [ \&dclose, $lei ];
- my $end = $lei->pkt_op_pair($ops);
+ my $end = $lei->pkt_op_pair;
$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
delete $lei->{pkt_op_p};
- my $op = delete $lei->{pkt_op_c};
+ my $op_c = delete $lei->{pkt_op_c};
@$end = ();
$lei->event_step_init;
- # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op
- $lei->{oneshot} ? $op : undef;
+ ($op_c, $ops);
}
sub _help {
diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm
index 2facbad3..97747220 100644
--- a/lib/PublicInbox/LeiBlob.pm
+++ b/lib/PublicInbox/LeiBlob.pm
@@ -103,12 +103,12 @@ sub lei_blob {
my $lxs = $lei->lxs_prepare or return;
require PublicInbox::SolverGit;
my $self = bless { lxs => $lxs, oid_b => $blob }, __PACKAGE__;
- my $op = $lei->workers_start($self, 'lei_solve', 1,
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei_solve', 1,
{ '' => [ \&sol_done, $lei ] });
$lei->{sol} = $self;
$self->wq_io_do('do_solve_blob', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 083ecc33..5d0adb14 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -53,11 +53,11 @@ sub lei_convert { # the main "lei convert" method
my $devfd = $lei->path_to_fd($ovv->{dst}) // return;
$lei->{opt}->{augment} = 1 if $devfd < 0;
$self->prepare_inputs($lei, \@inputs) or return;
- my $op = $lei->workers_start($self, 'lei_convert', 1);
+ my ($op_c, $ops) = $lei->workers_start($self, 'lei_convert', 1);
$lei->{cnv} = $self;
$self->wq_io_do('do_convert', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 7c5b7d09..803b5cda 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -76,11 +76,11 @@ sub lei_import { # the main "lei import" method
my $ops = { '' => [ \&import_done, $lei ] };
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{-wq_nr_workers} = $j // 1; # locked
- my $op = $lei->workers_start($self, 'lei_import', undef, $ops);
+ my ($op_c, undef) = $lei->workers_start($self, 'lei_import', $j, $ops);
$lei->{imp} = $self;
$self->wq_io_do('input_stdin', []) if $self->{0};
net_merge_complete($self) unless $lei->{auth};
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
no warnings 'once';
diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm
index 34846b84..6e611318 100644
--- a/lib/PublicInbox/LeiMark.pm
+++ b/lib/PublicInbox/LeiMark.pm
@@ -116,11 +116,11 @@ sub lei_mark { # the "lei mark" method
my $ops = { '' => [ \&mark_done, $lei ] };
$lei->{auth}->op_merge($ops, $self) if $lei->{auth};
$self->{vmd_mod} = $vmd_mod;
- my $op = $lei->workers_start($self, 'lei_mark', 1, $ops);
+ my ($op_c, undef) = $lei->workers_start($self, 'lei_mark', 1, $ops);
$lei->{mark} = $self;
$self->wq_io_do('input_stdin', []) if $self->{0};
net_merge_complete($self) unless $lei->{auth};
- while ($op && $op->{sock}) { $op->event_step }
+ $op_c->op_wait_event($ops);
}
sub note_missing {
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index c83386c6..89574d28 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -282,13 +282,13 @@ sub start {
require PublicInbox::Inbox;
require PublicInbox::Admin;
require PublicInbox::InboxWritable;
- my $op = $lei->workers_start($self, 'lei_mirror', 1, {
+ my ($op, $ops) = $lei->workers_start($self, 'lei_mirror', 1, {
'' => [ \&mirror_done, $lei ]
});
$lei->{mrr} = $self;
$self->wq_io_do('do_mirror', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op->op_wait_event($ops);
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm
index 25f63a10..a8a3dd2c 100644
--- a/lib/PublicInbox/LeiP2q.pm
+++ b/lib/PublicInbox/LeiP2q.pm
@@ -185,11 +185,11 @@ sub lei_p2q { # the "lei patch-to-query" entry point
} else {
$self->{input} = $input;
}
- my $op = $lei->workers_start($self, 'lei_p2q', 1);
+ my ($op, $ops) = $lei->workers_start($self, 'lei_p2q', 1);
$lei->{p2q} = $self;
$self->wq_io_do('do_p2q', []);
$self->wq_close(1);
- while ($op && $op->{sock}) { $op->event_step }
+ $op->op_wait_event($ops);
}
sub ipc_atfork_child {
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b41daffe..1a194f1c 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -427,7 +427,7 @@ sub do_query {
'incr_start_query' => [ \&incr_start_query, $self, $l2m ],
};
$lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth};
- my $end = $lei->pkt_op_pair($ops);
+ my $end = $lei->pkt_op_pair;
$lei->{1}->autoflush(1);
$lei->start_pager if delete $lei->{need_pager};
$lei->{ovv}->ovv_begin($lei);
@@ -445,7 +445,7 @@ sub do_query {
}
$self->wq_workers_start('lei_xsearch', undef,
$lei->oldset, { lei => $lei });
- my $op = delete $lei->{pkt_op_c};
+ my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
$self->{threads} = $lei->{opt}->{threads};
@@ -455,9 +455,7 @@ sub do_query {
start_query($self);
}
$lei->event_step_init; # wait for shutdowns
- if ($lei->{oneshot}) {
- while ($op->{sock}) { $op->event_step }
- }
+ $op_c->op_wait_event($ops);
}
sub add_uri {
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 5d8e78ea..c3221735 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -16,21 +16,23 @@ use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
our @EXPORT_OK = qw(pkt_do);
sub new {
- my ($cls, $r, $ops) = @_;
- my $self = bless { sock => $r, ops => $ops }, $cls;
+ my ($cls, $r) = @_;
+ my $self = bless { sock => $r }, $cls;
if ($PublicInbox::DS::in_loop) { # iff using DS->EventLoop
$r->blocking(0);
$self->SUPER::new($r, EPOLLIN|EPOLLET);
+ } else {
+ $self->{blocking} = 1;
}
$self;
}
# returns a blessed object as the consumer, and a GLOB/IO for the producer
sub pair {
- my ($cls, $ops) = @_;
+ my ($cls) = @_;
my ($c, $p);
socketpair($c, $p, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
- (new($cls, $c, $ops), $p);
+ (new($cls, $c), $p);
}
sub pkt_do { # for the producer to trigger event_step in consumer
@@ -41,7 +43,7 @@ sub pkt_do { # for the producer to trigger event_step in consumer
sub close {
my ($self) = @_;
my $c = $self->{sock} or return;
- $c->blocking ? delete($self->{sock}) : $self->SUPER::close;
+ $self->{blocking} ? delete($self->{sock}) : $self->SUPER::close;
}
sub event_step {
@@ -73,4 +75,12 @@ sub event_step {
}
}
+# call this when we're ready to wait on events,
+# returns immediately if non-blocking
+sub op_wait_event {
+ my ($self, $ops) = @_;
+ $self->{ops} = $ops;
+ while ($self->{blocking} && $self->{sock}) { event_step($self) }
+}
+
1;
next prev parent reply other threads:[~2021-03-28 9:01 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-03-28 9:01 [PATCH 00/12] lei blob and some yak-shaving Eric Wong
2021-03-28 9:01 ` Eric Wong [this message]
2021-03-28 9:01 ` [PATCH 02/12] lei init: split out into separate file Eric Wong
2021-03-28 9:01 ` [PATCH 03/12] lei blob: dclose if already failed Eric Wong
2021-03-28 9:01 ` [PATCH 04/12] lei blob: support --no-mail switch Eric Wong
2021-03-28 9:01 ` [PATCH 05/12] lei blob: fail early if no git dirs Eric Wong
2021-03-28 9:01 ` [PATCH 06/12] lei blob: some extra tests Eric Wong
2021-03-28 9:01 ` [PATCH 07/12] lei help: show "NAME=VALUE" properly for -c Eric Wong
2021-03-28 9:01 ` [PATCH 08/12] lei blob: flesh out help text Eric Wong
2021-03-28 9:01 ` [PATCH 09/12] t/lei_store: ensure LeiSearch responds to ->isrch Eric Wong
2021-03-28 9:01 ` [PATCH 10/12] lei blob: add remote external support Eric Wong
2021-03-28 9:01 ` [PATCH 11/12] lei: drop coderepo placeholders, submodule TODO Eric Wong
2021-03-28 9:31 ` Eric Wong
2021-03-28 9:01 ` [PATCH 12/12] treewide: shorten temporary filename Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210328090124.3541-2-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).