From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 0FBD41F487 for ; Wed, 15 Nov 2023 09:21:49 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1700040109; bh=hfROemVMB8cuwFWukuY7WklRqSswxPEnxqf6MGJdvdM=; h=From:To:Subject:Date:In-Reply-To:References:From; b=C3UcoKvbgeN5QfBFSYN6Nnxg98pGWh/taBUuCXfNeTRWVrb1kPzEIoA/BQGRgpB3C 7I4uu6AEgFd0FMGpJeh0Nvb1wiTZtr8EY2+4rklrzL701vMVuro7ivlzbttubfT/kv 1UoVFyvYg3q/Gs9DEgUCD5hmghWdItTtNwrNU3tA= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/4] lei: avoid extra fork for v2 outputs Date: Wed, 15 Nov 2023 09:21:44 +0000 Message-Id: <20231115092145.1131822-4-e@80x24.org> In-Reply-To: <20231115092145.1131822-1-e@80x24.org> References: <20231115092145.1131822-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We've always forced LeiToMail to only have one process for v2 outputs anyways since v2 has its own sharding and IPC. Thus we can use the single LeiToMail process directly to avoid extra IPC overhead. --- lib/PublicInbox/LeiConvert.pm | 7 ++----- lib/PublicInbox/LeiToMail.pm | 19 +++++++++---------- lib/PublicInbox/LeiXSearch.pm | 6 +----- lib/PublicInbox/V2Writable.pm | 2 -- 4 files changed, 12 insertions(+), 22 deletions(-) diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 4a1f8323..9d2479b0 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -35,12 +35,9 @@ sub process_inputs { # via wq_do my $lei = $self->{lei}; delete $lei->{1}; my $l2m = delete $lei->{l2m}; - my $nr_w = delete($l2m->{-nr_write}) // 0; delete $self->{wcb}; # commit - if (my $v2w = delete $lei->{v2w}) { - $nr_w = $v2w->wq_do('done'); # may die - $v2w->wq_close; - } + if (my $v2w = delete $lei->{v2w}) { $v2w->done } # may die + my $nr_w = delete($l2m->{-nr_write}) // 0; my $d = (delete($l2m->{-nr_seen}) // 0) - $nr_w; $d = $d ? " ($d duplicates)" : ''; $lei->qerr("# converted $nr_w messages$d"); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 2d9b7061..0d62888d 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -369,12 +369,14 @@ sub _v2_write_cb ($$) { my ($self, $lei) = @_; my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; + # only call in worker + $PublicInbox::Import::DROP_UNIQUE_UNSUB = $lei->{-drop_unique_unsub}; sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); ++$self->{-nr_seen}; return if $dedupe && $dedupe->is_dup($eml, $smsg); - $lei->{v2w}->wq_do('add', $eml); # V2Writable->add + $lei->{v2w}->add($eml) and ++$self->{-nr_write}; } } @@ -647,11 +649,6 @@ sub _do_augment_mbox { $dedupe->pause_dedupe if $dedupe; } -sub v2w_done_wait { # awaitpid cb - my ($pid, $v2w, $lei) = @_; - $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?; -} - sub _pre_augment_v2 { my ($self, $lei) = @_; my $dir = $self->{dst}; @@ -677,11 +674,9 @@ sub _pre_augment_v2 { $lei->x_it(shift); die "E: can't write v2 inbox with broken config\n"; }); + $lei->{-drop_unique_unsub} = $PublicInbox::Import::DROP_UNIQUE_UNSUB; $ibx->init_inbox if @creat; - my $v2w = $ibx->importer; - $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}, - \&v2w_done_wait, $lei); - $lei->{v2w} = $v2w; + $lei->{v2w} = $ibx->importer; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; open my $fh, '+>>', my $f = "$dir/git/0.git/objects/info/alternates"; @@ -806,6 +801,10 @@ sub wq_atexit_child { my $lei = $self->{lei}; $lei->{ale}->git->async_wait_all; my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)}); + if (my $v2w = delete $lei->{v2w}) { + eval { $v2w->done }; + $lei->child_error($?, "E: $@ ($v2w->{ibx}->{inboxdir})") if $@; + } delete $self->{wcb}; (($nr_w //= 0) + ($nr_s //= 0)) or return; return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p}; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 7eda6f9e..5e36c11a 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -391,11 +391,6 @@ sub query_done { # EOF callback for main daemon ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; $lei->sto_done_request; - my $nr_w = delete($lei->{-nr_write}) // 0; - if (my $v2w = delete $lei->{v2w}) { - $nr_w = $v2w->wq_do('done'); # may die - $v2w->wq_close; - } $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress if (my $out = delete $lei->{old_1}) { @@ -413,6 +408,7 @@ Error closing $lei->{ovv}->{dst}: \$!=$! \$?=$? delete $l2m->{mbl}; # drop dotlock } } + my $nr_w = delete($lei->{-nr_write}) // 0; my $nr_dup = (delete($lei->{-nr_seen}) // 0) - $nr_w; if ($lei->{-progress}) { my $tot = $lei->{-mset_total} // 0; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 231ed516..fb259396 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -135,7 +135,6 @@ sub add { if (do_idx($self, $mime, $smsg)) { $self->checkpoint; } - ++$self->{-nr_add}; # for lei convert $cmt; } @@ -611,7 +610,6 @@ sub done { $self->lock_release(!!$nbytes) if $shards; $self->git->cleanup; die $err if $err; - delete $self->{-nr_add}; # for lei-convert } sub importer {