From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 22BB71FB0F for ; Mon, 22 Feb 2021 11:23:00 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 07/10] lei q: reduce wasted IMAP connection for auth Date: Mon, 22 Feb 2021 08:22:56 -0300 Message-Id: <20210222112259.32402-7-e@80x24.org> In-Reply-To: <20210222112259.32402-1-e@80x24.org> References: <20210222112155.32231-1-e@80x24.org> <20210222112259.32402-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We can rework the first lei2mail worker to authenticate, and then share auth info with the rest of the lei2mail workers. As with "lei import", this uses PktOp and lei-daemon to share updated credentials between the first and subsequent l2m workers. 
--- lib/PublicInbox/LeiAuth.pm | 37 ------------------------ lib/PublicInbox/LeiConvert.pm | 2 +- lib/PublicInbox/LeiQuery.pm | 9 ++---- lib/PublicInbox/LeiToMail.pm | 53 ++++++++++++++++++++++++----------- lib/PublicInbox/LeiXSearch.pm | 26 ++++++++++++----- 5 files changed, 59 insertions(+), 68 deletions(-) diff --git a/lib/PublicInbox/LeiAuth.pm b/lib/PublicInbox/LeiAuth.pm index d329eadb..b4777114 100644 --- a/lib/PublicInbox/LeiAuth.pm +++ b/lib/PublicInbox/LeiAuth.pm @@ -20,13 +20,6 @@ sub net_merge { } } -sub do_auth { # called via wq_io_do - my ($self) = @_; - my ($lei, $net) = @$self{qw(lei net)}; - $net->imap_common_init($lei); - net_merge($lei, $net); # tell lei-daemon updated auth info -} - sub do_auth_atfork { # used by IPC WQ workers my ($self, $wq) = @_; return if $wq->{-wq_worker_nr} != 0; @@ -63,36 +56,6 @@ sub op_merge { # prepares PktOp->pair ops $ops->{net_merge_done1} = [ \&net_merge_done1, $wq ]; } -sub do_finish_auth { # dwaitpid callback - my ($arg, $pid) = @_; - my ($self, $lei, $post_auth_cb, @args) = @$arg; - $? ? 
$lei->dclose : $post_auth_cb->(@args); -} - -sub auth_eof { - my ($lei, $post_auth_cb, @args) = @_; - my $self = delete $lei->{auth} or return; - $self->wq_wait_old(\&do_finish_auth, $lei, $post_auth_cb, @args); -} - -sub auth_start { - my ($self, $lei, $post_auth_cb, @args) = @_; - my $op = $lei->workers_start($self, 'auth', 1, { - 'net_merge' => [ \&net_merge, $lei ], - '' => [ \&auth_eof, $lei, $post_auth_cb, @args ], - }); - $self->wq_io_do('do_auth', []); - $self->wq_close(1); - while ($op && $op->{sock}) { $op->event_step } -} - -sub ipc_atfork_child { - my ($self) = @_; - delete $self->{lei}->{auth}; # drop circular ref - $self->{lei}->lei_atfork_child; - $self->SUPER::ipc_atfork_child; -} - sub new { my ($cls, $net) = @_; # net may be NetReader or descendant (NetWriter) bless { net => $net }, $cls; diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 3a714502..b45de4e0 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -62,7 +62,7 @@ sub do_convert { # via wq_do delete $self->{wcb}; # commit } -sub convert_start { # LeiAuth->auth_start callback +sub convert_start { my ($lei) = @_; my $self = $lei->{cnv}; my $op = $lei->workers_start($self, 'lei_convert', 1, { diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 398f834f..64c9394c 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -13,14 +13,11 @@ sub prep_ext { # externals_each callback sub _start_query { my ($self) = @_; - if (my $nwr = $self->{nwr}) { + if (my $net = $self->{net}) { require PublicInbox::LeiAuth; - my $auth = $self->{auth} = PublicInbox::LeiAuth->new($nwr); - my $lxs = $self->{lxs}; - $auth->auth_start($self, $lxs->can('do_query'), $lxs, $self); - } else { - $self->{lxs}->do_query($self); + $self->{auth} = PublicInbox::LeiAuth->new($net); } + $self->{lxs}->do_query($self); } sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin diff --git 
a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 6efd398a..df813064 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -345,8 +345,8 @@ sub _imap_write_cb ($$) { my ($self, $lei) = @_; my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; - my $imap_append = $lei->{nwr}->can('imap_append'); - my $mic = $lei->{nwr}->mic_get($self->{uri}); + my $imap_append = $lei->{net}->can('imap_append'); + my $mic = $lei->{net}->mic_get($self->{uri}); my $folder = $self->{uri}->mailbox; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; @@ -394,15 +394,15 @@ sub new { $self->{base_type} = 'mbox'; } elsif ($fmt =~ /\Aimaps?\z/) { # TODO .onion support require PublicInbox::NetWriter; - my $nwr = PublicInbox::NetWriter->new; - $nwr->add_url($dst); - $nwr->{quiet} = $lei->{opt}->{quiet}; - my $err = $nwr->errors($dst); + my $net = PublicInbox::NetWriter->new; + $net->add_url($dst); + $net->{quiet} = $lei->{opt}->{quiet}; + my $err = $net->errors($dst); return $lei->fail($err) if $err; require PublicInbox::URIimap; # TODO: URI cast early $self->{uri} = PublicInbox::URIimap->new($dst); $self->{uri}->mailbox or die "No mailbox: $dst"; - $lei->{nwr} = $nwr; + $lei->{net} = $net; $self->{base_type} = 'imap'; } else { die "bad mail --format=$fmt\n"; @@ -447,15 +447,16 @@ sub _augment_imap { # PublicInbox::NetReader::imap_each cb sub _do_augment_imap { my ($self, $lei) = @_; - my $nwr = $lei->{nwr}; + my $net = $lei->{net}; if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { - $nwr->imap_each($self->{uri}, \&_augment_imap, $lei); + $net->imap_each($self->{uri}, \&_augment_imap, $lei); $dedupe->pause_dedupe; } - } else { # clobber existing IMAP folder - $nwr->imap_delete_all($self->{uri}); + } elsif (!$self->{-wq_worker_nr}) { # undef or 0 + # clobber existing IMAP folder + $net->imap_delete_all($self->{uri}); } } @@ -523,16 +524,18 @@ sub post_augment { $m->($self, $lei, @args); } 
-sub ipc_atfork_child { +sub do_post_auth { my ($self) = @_; - my $lei = delete $self->{lei}; - $lei->lei_atfork_child; + my $lei = $self->{lei}; + # lei_xsearch can start as soon as all l2m workers get here + pkt_do($lei->{pkt_op_p}, 'incr_start_query') or + die "incr_start_query: $!"; my $aug; if (lock_free($self)) { my $mod = $self->{-wq_nr_workers}; my $shard = $self->{-wq_worker_nr}; - if (my $nwr = $lei->{nwr}) { - $nwr->{shard_info} = [ $mod, $shard ]; + if (my $net = $lei->{net}) { + $net->{shard_info} = [ $mod, $shard ]; } else { # Maildir (MH?) $self->{shard_info} = [ $mod, $shard ]; } @@ -545,13 +548,20 @@ sub ipc_atfork_child { eval { do_augment($self, $lei) }; $lei->fail($@) if $@; pkt_do($lei->{pkt_op_p}, $aug) == 1 or - die "do_post_augment trigger: $!"; + die "do_post_augment trigger: $!"; } if (my $zpipe = delete $lei->{zpipe}) { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; } $self->{wcb} = $self->write_cb($lei); +} + +sub ipc_atfork_child { + my ($self) = @_; + my $lei = $self->{lei}; + $lei->lei_atfork_child; + $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); $self->SUPER::ipc_atfork_child; } @@ -584,4 +594,13 @@ sub wq_atexit_child { $SIG{__WARN__} = 'DEFAULT'; } +# called in top-level lei-daemon when LeiAuth is done +sub net_merge_complete { + my ($self) = @_; + $self->wq_broadcast('do_post_auth'); + $self->wq_close(1); +} + +no warnings 'once'; # the following works even when LeiAuth is lazy-loaded +*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all; 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index e982165f..6dcadf0a 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -348,7 +348,7 @@ sub do_post_augment { close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch } -sub incr_post_augment { # called whenever an l2m shard finishes +sub incr_post_augment { # called whenever an l2m shard finishes augment my ($lei) 
= @_; my $l2m = $lei->{l2m} or die 'BUG: unexpected incr_post_augment'; return if ++$lei->{nr_post_augment} != $l2m->{-wq_nr_workers}; @@ -366,8 +366,8 @@ sub concurrency { } sub start_query { # always runs in main (lei-daemon) process - my ($self, $lei) = @_; - if ($lei->{opt}->{threads}) { + my ($self) = @_; + if ($self->{threads}) { for my $ibxish (locals($self)) { $self->wq_io_do('query_thread_mset', [], $ibxish); } @@ -382,6 +382,13 @@ sub start_query { # always runs in main (lei-daemon) process for my $uris (@$q) { $self->wq_io_do('query_remote_mboxrd', [], $uris); } + $self->wq_close(1); # lei_xsearch workers stop when done +} + +sub incr_start_query { # called whenever an l2m shard starts do_post_auth + my ($self, $l2m) = @_; + return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers}; + start_query($self); } sub ipc_atfork_child { @@ -393,6 +400,7 @@ sub ipc_atfork_child { sub do_query { my ($self, $lei) = @_; + my $l2m = $lei->{l2m}; my $ops = { '|' => [ $lei->can('sigpipe_handler'), $lei ], '!' 
=> [ $lei->can('fail_handler'), $lei ], @@ -402,12 +410,13 @@ sub do_query { 'mset_progress' => [ \&mset_progress, $lei ], 'x_it' => [ $lei->can('x_it'), $lei ], 'child_error' => [ $lei->can('child_error'), $lei ], + 'incr_start_query' => [ \&incr_start_query, $self, $l2m ], }; + $lei->{auth}->op_merge($ops, $l2m) if $l2m && $lei->{auth}; ($lei->{pkt_op_c}, $lei->{pkt_op_p}) = PublicInbox::PktOp->pair($ops); $lei->{1}->autoflush(1); $lei->start_pager if delete $lei->{need_pager}; $lei->{ovv}->ovv_begin($lei); - my $l2m = $lei->{l2m}; if ($l2m) { $l2m->pre_augment($lei); if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { @@ -428,10 +437,13 @@ sub do_query { $lei->oldset, { lei => $lei }); my $op = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; - $l2m->wq_close(1) if $l2m; + $self->{threads} = $lei->{opt}->{threads}; + if ($l2m) { + $l2m->net_merge_complete unless $lei->{auth}; + } else { + start_query($self); + } $lei->event_step_init; # wait for shutdowns - start_query($self, $lei); - $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { while ($op->{sock}) { $op->event_step } }