From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 4/4] lei: use async barrier for --import-before
Date: Tue, 16 Apr 2024 20:56:29 +0000 [thread overview]
Message-ID: <20240416205629.3648894-5-e@80x24.org> (raw)
In-Reply-To: <20240416205629.3648894-1-e@80x24.org>
Write barriers can take a long time to finish, especially when
commands are issued in parallel.  So handle them asynchronously,
without blocking lei-daemon, by making EOFpipe a little more
flexible: it now supports passing arguments to the callback function.
This is another step towards improving parallel use of lei.
---
lib/PublicInbox/EOFpipe.pm | 7 ++++---
lib/PublicInbox/LeiToMail.pm | 29 ++++++++++++++++++++++-------
lib/PublicInbox/LeiXSearch.pm | 13 +++++++++----
3 files changed, 35 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm
index 3474874f..77b699a2 100644
--- a/lib/PublicInbox/EOFpipe.pm
+++ b/lib/PublicInbox/EOFpipe.pm
@@ -7,8 +7,8 @@ use parent qw(PublicInbox::DS);
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ);
sub new {
- my (undef, $rd, $cb) = @_;
- my $self = bless { cb => $cb }, __PACKAGE__;
+ my (undef, $rd, @cb_args) = @_;
+ my $self = bless { cb_args => \@cb_args }, __PACKAGE__;
# 4096: page size
fcntl($rd, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
@@ -17,7 +17,8 @@ sub new {
sub event_step {
my ($self) = @_;
if ($self->do_read(my $buf, 1) == 0) { # auto-closed
- $self->{cb}->();
+ my ($cb, @args) = @{delete $self->{cb_args}};
+ $cb->(@args);
}
}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 593547f6..5481b5e4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -14,7 +14,7 @@ use PublicInbox::Import;
use IO::Handle; # ->autoflush
use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
use PublicInbox::Syscall qw(rename_noreplace);
-use autodie qw(open seek close);
+use autodie qw(pipe open seek close);
use Carp qw(croak);
my %kw2char = ( # Maildir characters
@@ -605,7 +605,7 @@ sub _pre_augment_mbox {
$lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe');
}
if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
- pipe(my ($r, $w)) or die "pipe: $!";
+ pipe(my $r, my $w);
$lei->{zpipe} = [ $r, $w ];
$lei->{ovv}->{lock_path} and
die 'BUG: unexpected {ovv}->{lock_path}';
@@ -719,17 +719,32 @@ sub do_augment { # slow, runs in wq worker
$m->($self, $lei);
}
+sub post_augment_call ($$$$) {
+ my ($self, $lei, $m, $post_augment_done) = @_;
+ eval { $m->($self, $lei) };
+ $lei->{post_augment_err} = $@ if $@; # for post_augment_done
+}
+
# fast (spawn compressor or mkdir), runs in same process as pre_augment
sub post_augment {
- my ($self, $lei, @args) = @_;
+ my ($self, $lei, $post_augment_done) = @_;
$self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
- # FIXME: this synchronous wait can be slow w/ parallel callers
- my $wait = $lei->{opt}->{'import-before'} ?
- $lei->{sto}->wq_do('barrier') : 0;
# _post_augment_mbox
my $m = $self->can("_post_augment_$self->{base_type}") or return;
- $m->($self, $lei, @args);
+
+ # --import-before is only for lei-(q|lcat), not lei-convert
+ $lei->{opt}->{'import-before'} or
+ return post_augment_call $self, $lei, $m, $post_augment_done;
+
+ # we can't deal with post_augment until import-before commits:
+ require PublicInbox::EOFpipe;
+ my @io = @$lei{qw(2 sock)};
+ pipe(my $r, $io[2]);
+ PublicInbox::EOFpipe->new($r, \&post_augment_call,
+ $self, $lei, $m, $post_augment_done);
+ $lei->{sto}->wq_io_do('barrier', \@io);
+ # _post_augment_* && post_augment_done run when barrier is complete
}
# called by every single l2m worker process
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5a5a1adc..43dedd10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha);
use POSIX qw(strftime);
use autodie qw(close open read seek truncate);
use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use PublicInbox::OnDestroy;
sub new {
my ($class) = @_;
@@ -428,11 +429,9 @@ sub query_done { # EOF callback for main daemon
$lei->dclose;
}
-sub do_post_augment {
+sub post_augment_done { # via on_destroy in top-level lei-daemon
my ($lei) = @_;
- my $l2m = $lei->{l2m} or return; # client disconnected
- eval { $l2m->post_augment($lei) };
- my $err = $@;
+ my $err = delete $lei->{post_augment_err};
if ($err) {
if (my $lxs = delete $lei->{lxs}) {
$lxs->wq_kill(-POSIX::SIGTERM());
@@ -447,6 +446,12 @@ sub do_post_augment {
close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't
}
+sub do_post_augment {
+ my ($lei) = @_;
+ my $l2m = $lei->{l2m} or return; # client disconnected
+ $l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei));
+}
+
sub incr_post_augment { # called whenever an l2m shard finishes augment
my ($lei) = @_;
my $l2m = $lei->{l2m} or return; # client disconnected
prev parent reply other threads:[~2024-04-16 20:56 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-04-16 20:56 [PATCH 0/4] lei parallelism fixes Eric Wong
2024-04-16 20:56 ` [PATCH 1/4] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
2024-04-16 20:56 ` [PATCH 2/4] lei: use ->barrier to commit to lei/store Eric Wong
2024-04-16 20:56 ` [PATCH 3/4] lei/store: stop shard workers + cat-file on idle Eric Wong
2024-04-17 9:34 ` [PATCH v2 " Eric Wong
2024-04-17 9:54 ` sub prototypes aren't enough Eric Wong
2024-04-16 20:56 ` Eric Wong [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240416205629.3648894-5-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).