From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 414D41F51A for ; Tue, 16 Apr 2024 20:56:30 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1713300990; bh=5khtDdlUR0XzPcgDSZSviQ3MfNjOMeICnrboytDl44g=; h=From:To:Subject:Date:In-Reply-To:References:From; b=N/5cSs2jJOawYfGscGfmrFITtKTnA4eg8lDe+6NEhznuWeW3+AB9Lxic7N0zDVbDW c8MHnDf7zVBgesSKuA2MJxpAhPUmOlsZSb/uHG1OpzE7FSQ+1pAh881F+GC6ETDM/7 FAp63phdmGGAoh8ZPZs9NVTyTdolr5J9EX3H9cwQ= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/4] lei: use ->barrier to commit to lei/store Date: Tue, 16 Apr 2024 20:56:27 +0000 Message-ID: <20240416205629.3648894-3-e@80x24.org> In-Reply-To: <20240416205629.3648894-1-e@80x24.org> References: <20240416205629.3648894-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: ->barrier (synchronous checkpoint) is better than ->done when parallel lei commands are being issued (via '&' or from different terminals), since repeatedly stopping and restarting processes doesn't play nicely with expensive tasks like `lei reindex'. This introduces a slight regression: more processes are kept alive (and thus more resources are used) when lei is idle, but that'll be fixed in the next commit. 
--- lib/PublicInbox/ExtSearchIdx.pm | 1 + lib/PublicInbox/LEI.pm | 6 +++--- lib/PublicInbox/LeiInput.pm | 2 +- lib/PublicInbox/LeiRefreshMailSync.pm | 2 +- lib/PublicInbox/LeiRemote.pm | 4 ++-- lib/PublicInbox/LeiStore.pm | 26 ++++++++++++++++++-------- lib/PublicInbox/LeiToMail.pm | 3 ++- lib/PublicInbox/LeiXSearch.pm | 4 ++-- t/lei-store-fail.t | 2 +- 9 files changed, 31 insertions(+), 19 deletions(-) diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index ebbffffc..763a124c 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -1424,5 +1424,6 @@ no warnings 'once'; *idx_shard = \&PublicInbox::V2Writable::idx_shard; *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint; *checkpoint = \&PublicInbox::V2Writable::checkpoint; +*barrier = \&PublicInbox::V2Writable::barrier; 1; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 5b46686a..e9a0de6c 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1443,7 +1443,7 @@ sub wq_eof { # EOF callback for main daemon my ($lei, $wq_fld) = @_; local $current_lei = $lei; my $wq = delete $lei->{$wq_fld // 'wq1'}; - $lei->sto_done_request($wq); + $lei->sto_barrier_request($wq); $wq // $lei->fail; # already failed } @@ -1548,7 +1548,7 @@ sub lms { (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef; } -sub sto_done_request { +sub sto_barrier_request { my ($lei, $wq) = @_; return unless $lei->{sto} && $lei->{sto}->{-wq_s1}; local $current_lei = $lei; @@ -1558,7 +1558,7 @@ sub sto_done_request { my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock}; my $errfh = $lei->{2} // *STDERR{GLOB}; my @io = $s ? 
($errfh, $s) : ($errfh); - eval { $lei->{sto}->wq_io_do('done', \@io) }; + eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) }; } warn($@) if $@; } diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index d003d983..c388f7dc 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -499,7 +499,7 @@ sub process_inputs { } # always commit first, even on error partial work is acceptable for # lei - $self->{lei}->sto_done_request; + $self->{lei}->sto_barrier_request; $self->{lei}->fail($err) if $err; } diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm index a60a9a5e..dde23274 100644 --- a/lib/PublicInbox/LeiRefreshMailSync.pm +++ b/lib/PublicInbox/LeiRefreshMailSync.pm @@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url $self->folder_missing($$uri); } } else { die "BUG: $input not supported" } - $self->{lei}->sto_done_request; + $self->{lei}->sto_barrier_request; } sub lei_refresh_mail_sync { diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm index ddcaf2c9..d6fc40a4 100644 --- a/lib/PublicInbox/LeiRemote.pm +++ b/lib/PublicInbox/LeiRemote.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2021 all contributors +# Copyright (C) all contributors # License: AGPL-3.0+ # Make remote externals HTTP(S) inboxes behave like @@ -51,7 +51,7 @@ sub mset { $fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1); eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) }; my $err = $@ ? 
": $@" : ''; - my $wait = $self->{lei}->{sto}->wq_do('done'); + my $wait = $self->{lei}->{sto}->wq_do('barrier'); $lei->child_error($?, "@$cmd failed$err") if $err || $?; $self; # we are the mset (and $ibx, and $self) } diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 0df2352c..162c915f 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -81,7 +81,7 @@ sub importer { delete $self->{im}; $im->done; undef $im; - $self->checkpoint; + $self->barrier; $max = $self->{priv_eidx}->{mg}->git_epochs + 1; } my (undef, $tl) = eidx_init($self); # acquire lock @@ -118,7 +118,7 @@ sub cat_blob { sub schedule_commit { my ($self, $sec) = @_; - add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self); + add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self); } # follows the stderr file @@ -391,7 +391,7 @@ sub reindex_done { my ($self) = @_; my ($eidx, $tl) = eidx_init($self); $eidx->git->async_wait_all; - # ->done to be called via sto_done_request + # ->done to be called via sto_barrier_request } sub add_eml { @@ -571,11 +571,21 @@ sub set_xvmd { sto_export_kw($self, $smsg->{num}, $vmd); } -sub checkpoint { - my ($self, $wait) = @_; - $self->{im}->barrier if $self->{im}; +sub barrier { + my ($self) = @_; + my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request + my @err; + if ($self->{im}) { + eval { $self->{im}->barrier }; + push(@err, "E: import barrier: $@\n") if $@; + } delete $self->{lms}; - $self->{priv_eidx}->checkpoint($wait); + eval { $self->{priv_eidx}->barrier }; + push(@err, "E: priv_eidx barrier: $@\n") if $@; + print { $errfh // \*STDERR } @err; + send($lei_sock, 'child_error 256', 0) if @err && $lei_sock; + xchg_stderr($self); + die @err if @err; } sub xchg_stderr { @@ -594,7 +604,7 @@ sub xchg_stderr { sub done { my ($self) = @_; - my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request + my ($errfh, $lei_sock) = @$self{0, 1}; my @err; if (my $im = delete($self->{im})) { eval 
{ $im->done }; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index dfae29e9..593547f6 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -724,8 +724,9 @@ sub post_augment { my ($self, $lei, @args) = @_; $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ..."); + # FIXME: this synchronous wait can be slow w/ parallel callers my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->wq_do('checkpoint', 1) : 0; + $lei->{sto}->wq_do('barrier') : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; $m->($self, $lei, @args); diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index d4f34733..5a5a1adc 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -363,7 +363,7 @@ print STDERR $_; $self, $lei, $each_smsg); }; my ($exc, $code) = ($@, $?); - $lei->sto_done_request if delete($self->{-sto_imported}); + $lei->sto_barrier_request if delete($self->{-sto_imported}); die "E: $exc" if $exc && !$code; my $nr = delete $lei->{-nr_remote_eml} // 0; if (!$code) { # don't update if no results, maybe MTA is down @@ -399,7 +399,7 @@ sub query_done { # EOF callback for main daemon delete $lei->{lxs}; ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and warn "BUG: {sto} missing with --mail-sync"; - $lei->sto_done_request; + $lei->sto_barrier_request; $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress $l2m->finish_output($lei); diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t index c2f03148..1e83e383 100644 --- a/t/lei-store-fail.t +++ b/t/lei-store-fail.t @@ -39,7 +39,7 @@ EOM lei_ok qw(q m:testmessage@example.com); is($lei_out, "[null]\n", 'delayed commit is unindexed'); - # make immediate ->sto_done_request fail from mboxrd import: + # make immediate ->sto_barrier_request fail from mboxrd import: remove_tree("$ENV{HOME}/.local/share/lei/store"); # subsequent lei commands are undefined behavior, # but 
we need to make sure the current lei command fails: