From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 213F01FA00 for ; Thu, 18 Feb 2021 11:06:48 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH (resend) 3/4] lei: consolidate the bulk of the IPC code Date: Thu, 18 Feb 2021 02:06:46 -0900 Message-Id: <20210218110647.9822-4-e@80x24.org> In-Reply-To: <20210218110647.9822-1-e@80x24.org> References: <20210218110647.9822-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The backends for "lei add-external --mirror", "lei convert", and "lei import" all share a similar pattern for spawning background workers. Hoist out the common parts to slim down our code base a bit. The LeiXSearch and LeiToMail workers for "lei q" remain the odd duck due to the deep pipelining and parallelization. --- lib/PublicInbox/LEI.pm | 19 +++++++++++++++++++ lib/PublicInbox/LeiAuth.pm | 17 +++-------------- lib/PublicInbox/LeiConvert.pm | 22 +++++----------------- lib/PublicInbox/LeiImport.pm | 19 ++++--------------- lib/PublicInbox/LeiMirror.pm | 19 ++++--------------- 5 files changed, 35 insertions(+), 61 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1e4c36d0..0b4bc20e 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -468,6 +468,25 @@ sub lei_atfork_child { $current_lei = $persist ? undef : $self; # for SIG{__WARN__} } +sub workers_start { + my ($lei, $wq, $ident, $jobs, $ops) = @_; + $ops = { + '!' 
=> [ $lei->can('fail_handler'), $lei ], + '|' => [ $lei->can('sigpipe_handler'), $lei ], + 'x_it' => [ $lei->can('x_it'), $lei ], + 'child_error' => [ $lei->can('child_error'), $lei ], + %$ops + }; + require PublicInbox::PktOp; + ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); + $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei }); + delete $lei->{pkt_op_p}; + my $op = delete $lei->{pkt_op_c}; + $lei->event_step_init; + # oneshot needs $op, daemon-mode uses DS->EventLoop to handle $op + $lei->{oneshot} ? $op : undef; +} + sub _help { require PublicInbox::LeiHelp; PublicInbox::LeiHelp::call($_[0], $_[1], \%CMD, \%OPTDESC); diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index 88310874..7210af99 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -42,24 +42,13 @@ sub auth_eof { sub auth_start { my ($self, $lei, $post_auth_cb, @args) = @_; - my $ops = { - '!' => [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], + my $op = $lei->workers_start($self, 'auth', 1, { 'nrd_merge' => [ \&nrd_merge, $lei ], '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_auth', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + }); $self->wq_io_do('do_auth', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child { diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 78fd5e17..ba375772 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; 
use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; use PublicInbox::LeiStore; use PublicInbox::LeiOverview; @@ -59,26 +58,15 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { +sub convert_start { # LeiAuth->auth_start callback my ($lei) = @_; - my $ops = { - '!' => [ $lei->can('fail_handler'), $lei ], - '|' => [ $lei->can('sigpipe_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ $lei->can('dclose'), $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{cnv}; - $self->wq_workers_start('lei_convert', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_convert', 1, { + '' => [ $lei->can('dclose'), $lei ] + }); $self->wq_io_do('do_convert', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei convert" method diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 62a2a412..68cab12c 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::InboxWritable qw(eml_from_path); -use PublicInbox::PktOp; sub _import_eml { # MboxReader callback my ($eml, $sto, $set_kw) = @_; @@ -31,13 +30,6 @@ sub import_done { # EOF callback for main daemon sub import_start { my ($lei) = @_; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&import_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); my $self = $lei->{imp}; my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1; if (my $nrd = $lei->{nrd}) { @@ -46,18 +38,15 @@ sub import_start { my $nproc = $self->detect_nproc; $j = $nproc if $j > $nproc; } - $self->wq_workers_start('lei_import', $j, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_import', $j, { + '' => [ \&import_done, $lei ], + }); $self->wq_io_do('import_stdin', []) if $self->{0}; for my $input (@{$self->{inputs}}) { $self->wq_io_do('import_path_url', [], $input); } $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub call { # the main "lei import" method diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index c5153148..f8ca1ee5 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -8,7 +8,6 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use IO::Uncompress::Gunzip qw(gunzip $GunzipError); use PublicInbox::Spawn qw(popen_rd spawn); -use PublicInbox::PktOp; sub do_finish_mirror { # dwaitpid callback my ($arg, $pid) = @_; @@ -279,22 +278,12 @@ sub start { require PublicInbox::Inbox; require PublicInbox::Admin; require PublicInbox::InboxWritable; - my $ops = { - '!' 
=> [ $lei->can('fail_handler'), $lei ], - 'x_it' => [ $lei->can('x_it'), $lei ], - 'child_error' => [ $lei->can('child_error'), $lei ], - '' => [ \&mirror_done, $lei ], - }; - ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); - $self->wq_workers_start('lei_mirror', 1, $lei->oldset, {lei => $lei}); - my $op = delete $lei->{pkt_op_c}; - delete $lei->{pkt_op_p}; + my $op = $lei->workers_start($self, 'lei_mirror', 1, { + '' => [ \&mirror_done, $lei ] + }); $self->wq_io_do('do_mirror', []); $self->wq_close(1); - $lei->event_step_init; # wait for shutdowns - if ($lei->{oneshot}) { - while ($op->{sock}) { $op->event_step } - } + while ($op && $op->{sock}) { $op->event_step } } sub ipc_atfork_child {