From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 9BFB11F543 for ; Tue, 16 Apr 2024 20:56:30 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1713300990; bh=mfpL2jmM+OWOCOhUEYVSUHqsACS6khMrnJgC5lAG8c0=; h=From:To:Subject:Date:In-Reply-To:References:From; b=RnosTVX8NO6ZQ0ejaXVCBcrP8lmvQIuQb5/3ajA+WrdAQsBnIJMvGYMbm0XyDTkXx MQLK6M4QLfnPm9gaI9OACrKh99BlH8A2dspsc7Vta5AqxlvCKKHp0BRxBqBr8H77Qq RzK00qsB/vfoFwgeekm3c85GdmzWIHX29rIe+vD4= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 4/4] lei: use async barrier for --import-before Date: Tue, 16 Apr 2024 20:56:29 +0000 Message-ID: <20240416205629.3648894-5-e@80x24.org> In-Reply-To: <20240416205629.3648894-1-e@80x24.org> References: <20240416205629.3648894-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Write barriers can take a long time to finish, especially when commands are issued in parallel. So handle them asynchronously, without blocking lei-daemon, by making EOFpipe a little more flexible so that it supports passing arguments to the callback function. This is another step towards improving parallel use of lei. 
--- lib/PublicInbox/EOFpipe.pm | 7 ++++--- lib/PublicInbox/LeiToMail.pm | 29 ++++++++++++++++++++++------- lib/PublicInbox/LeiXSearch.pm | 13 +++++++++---- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm index 3474874f..77b699a2 100644 --- a/lib/PublicInbox/EOFpipe.pm +++ b/lib/PublicInbox/EOFpipe.pm @@ -7,8 +7,8 @@ use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ); sub new { - my (undef, $rd, $cb) = @_; - my $self = bless { cb => $cb }, __PACKAGE__; + my (undef, $rd, @cb_args) = @_; + my $self = bless { cb_args => \@cb_args }, __PACKAGE__; # 4096: page size fcntl($rd, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); @@ -17,7 +17,8 @@ sub new { sub event_step { my ($self) = @_; if ($self->do_read(my $buf, 1) == 0) { # auto-closed - $self->{cb}->(); + my ($cb, @args) = @{delete $self->{cb_args}}; + $cb->(@args); } } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 593547f6..5481b5e4 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -14,7 +14,7 @@ use PublicInbox::Import; use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); -use autodie qw(open seek close); +use autodie qw(pipe open seek close); use Carp qw(croak); my %kw2char = ( # Maildir characters @@ -605,7 +605,7 @@ sub _pre_augment_mbox { $lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe'); } if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) { - pipe(my ($r, $w)) or die "pipe: $!"; + pipe(my $r, my $w); $lei->{zpipe} = [ $r, $w ]; $lei->{ovv}->{lock_path} and die 'BUG: unexpected {ovv}->{lock_path}'; @@ -719,17 +719,32 @@ sub do_augment { # slow, runs in wq worker $m->($self, $lei); } +sub post_augment_call ($$$$) { + my ($self, $lei, $m, $post_augment_done) = @_; + eval { $m->($self, $lei) }; + 
$lei->{post_augment_err} = $@ if $@; # for post_augment_done +} + # fast (spawn compressor or mkdir), runs in same process as pre_augment sub post_augment { - my ($self, $lei, @args) = @_; + my ($self, $lei, $post_augment_done) = @_; $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); - # FIXME: this synchronous wait can be slow w/ parallel callers - my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->wq_do('barrier') : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; - $m->($self, $lei, @args); + + # --import-before is only for lei-(q|lcat), not lei-convert + $lei->{opt}->{'import-before'} or + return post_augment_call $self, $lei, $m, $post_augment_done; + + # we can't deal with post_augment until import-before commits: + require PublicInbox::EOFpipe; + my @io = @$lei{qw(2 sock)}; + pipe(my $r, $io[2]); + PublicInbox::EOFpipe->new($r, \&post_augment_call, + $self, $lei, $m, $post_augment_done); + $lei->{sto}->wq_io_do('barrier', \@io); + # _post_augment_* && post_augment_done run when barrier is complete } # called by every single l2m worker process diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5a5a1adc..43dedd10 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); +use PublicInbox::OnDestroy; sub new { my ($class) = @_; @@ -428,11 +429,9 @@ sub query_done { # EOF callback for main daemon $lei->dclose; } -sub do_post_augment { +sub post_augment_done { # via on_destroy in top-level lei-daemon my ($lei) = @_; - my $l2m = $lei->{l2m} or return; # client disconnected - eval { $l2m->post_augment($lei) }; - my $err = $@; + my $err = delete $lei->{post_augment_err}; if ($err) { if (my $lxs = delete $lei->{lxs}) { $lxs->wq_kill(-POSIX::SIGTERM()); @@ -447,6 +446,12 @@ sub 
do_post_augment { close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't } +sub do_post_augment { + my ($lei) = @_; + my $l2m = $lei->{l2m} or return; # client disconnected + $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei)); +} + sub incr_post_augment { # called whenever an l2m shard finishes augment my ($lei) = @_; my $l2m = $lei->{l2m} or return; # client disconnected