From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 79F981FA01 for ; Sun, 21 Feb 2021 07:41:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/7] lei q: move augment into lei2mail workers Date: Sun, 21 Feb 2021 07:41:31 +0000 Message-Id: <20210221074134.15084-5-e@80x24.org> In-Reply-To: <20210221074134.15084-1-e@80x24.org> References: <20210221074134.15084-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This is a step which will allow us to parallelize augment on Maildir and IMAP. --- lib/PublicInbox/LeiToMail.pm | 10 +++++++++- lib/PublicInbox/LeiXSearch.pm | 18 ++++-------------- t/lei-externals.t | 3 ++- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 0e0b0a43..e5398912 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -14,6 +14,7 @@ use PublicInbox::LeiDedupe; use PublicInbox::OnDestroy; use PublicInbox::Git; use PublicInbox::GitAsyncCat; +use PublicInbox::PktOp qw(pkt_do); use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); @@ -499,7 +500,7 @@ sub pre_augment { # fast (1 disk seek), runs in same process as post_augment sub do_augment { # slow, runs in wq worker my ($self, $lei) = @_; - # _do_augment_maildir, _do_augment_mbox + # _do_augment_maildir, _do_augment_mbox, or _do_augment_imap my $m = "_do_augment_$self->{base_type}"; $self->$m($lei); } @@ -516,6 +517,13 @@ sub ipc_atfork_child { my ($self) = @_; my $lei = delete $self->{lei}; $lei->lei_atfork_child; + if ($self->{-wq_worker_nr} == 0) { + local $0 = 'do_augment'; + eval { do_augment($self, $lei) }; + $lei->fail($@) if $@; + pkt_do($lei->{pkt_op_p}, '.') == 1 or + die "do_post_augment trigger: $!"; + } if (my $zpipe = delete $lei->{zpipe}) { $lei->{1} = $zpipe->[1]; close $zpipe->[0]; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 10485220..a319b75f 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -99,21 +99,21 @@ sub _mset_more ($$) { $size >= $mo->{limit} && (($mo->{offset} += $size) < $mo->{limit}); } -# $startq will EOF when query_prepare is done augmenting and allow +# $startq will EOF when do_augment is done augmenting and allow # query_mset and query_thread_mset to proceed. sub wait_startq ($) { my ($lei) = @_; my $startq = delete $lei->{startq} or return; while (1) { - my $n = sysread($startq, my $query_prepare_done, 1); + my $n = sysread($startq, my $do_augment_done, 1); if (defined $n) { return if $n == 0; # no MUA - if ($query_prepare_done eq 'q') { + if ($do_augment_done eq 'q') { $lei->{opt}->{quiet} = 1; delete $lei->{opt}->{verbose}; delete $lei->{-progress}; } else { - $lei->fail("$$ WTF `$query_prepare_done'"); + $lei->fail("$$ WTF `$do_augment_done'"); } return; } @@ -386,15 +386,6 @@ sub ipc_atfork_child { $self->SUPER::ipc_atfork_child; } -sub query_prepare { # called by wq_io_do - my ($self) = @_; - local $0 = "$0 query_prepare"; - my $lei = $self->{lei}; - eval { $lei->{l2m}->do_augment($lei) }; - $lei->fail($@) if $@; - pkt_do($lei->{pkt_op_p}, '.') == 1 or die "do_post_augment trigger: $!" -} - sub do_query { my ($self, $lei) = @_; my $ops = { @@ -433,7 +424,6 @@ sub do_query { delete $lei->{pkt_op_p}; $l2m->wq_close(1) if $l2m; $lei->event_step_init; # wait for shutdowns - $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe start_query($self, $lei); $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) { diff --git a/t/lei-externals.t b/t/lei-externals.t index edfbb2bf..02b15232 100644 --- a/t/lei-externals.t +++ b/t/lei-externals.t @@ -186,7 +186,8 @@ SKIP: { my @s = grep(/^Subject:/, $cat->()); is(scalar(@s), 1, "1 result in mbox$sfx"); $lei->('q', '-a', '-o', "mboxcl2:$f", 's:see attachment'); - is(grep(!/^#/, $lei_err), 0, 'no errors from augment'); + is(grep(!/^#/, $lei_err), 0, 'no errors from augment') or + diag $lei_err; @s = grep(/^Subject:/, my @wtf = $cat->()); is(scalar(@s), 2, "2 results in mbox$sfx");