unofficial mirror of meta@public-inbox.org
* [PATCH 0/4] lei parallelism fixes
@ 2024-04-16 20:56 Eric Wong
  2024-04-16 20:56 ` [PATCH 1/4] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
                   ` (3 more replies)
  0 siblings, 4 replies; 7+ messages in thread
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

This series allows `lei reindex' to run in parallel with other
lei commands which write to lei/store.

Eric Wong (4):
  v2 + lei/store: always wait for fast-import checkpoint
  lei: use ->barrier to commit to lei/store
  lei/store: stop shard workers + cat-file on idle
  lei: use async barrier for --import-before

 lib/PublicInbox/EOFpipe.pm            |  7 ++--
 lib/PublicInbox/ExtSearchIdx.pm       |  1 +
 lib/PublicInbox/LEI.pm                |  6 ++--
 lib/PublicInbox/LeiInput.pm           |  2 +-
 lib/PublicInbox/LeiRefreshMailSync.pm |  2 +-
 lib/PublicInbox/LeiRemote.pm          |  4 +--
 lib/PublicInbox/LeiStore.pm           | 46 ++++++++++++++++-----------
 lib/PublicInbox/LeiToMail.pm          | 28 ++++++++++++----
 lib/PublicInbox/LeiXSearch.pm         | 17 ++++++----
 lib/PublicInbox/V2Writable.pm         |  8 +----
 t/lei-store-fail.t                    |  2 +-
 11 files changed, 74 insertions(+), 49 deletions(-)


* [PATCH 1/4] v2 + lei/store: always wait for fast-import checkpoint
  2024-04-16 20:56 [PATCH 0/4] lei parallelism fixes Eric Wong
@ 2024-04-16 20:56 ` Eric Wong
  2024-04-16 20:56 ` [PATCH 2/4] lei: use ->barrier to commit to lei/store Eric Wong
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

Since data going to git is the most important, always ensure
data is written to git before attempting to write anything to
SQLite or Xapian.
---
 lib/PublicInbox/LeiStore.pm   | 4 +---
 lib/PublicInbox/V2Writable.pm | 8 +-------
 2 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 2eb09eca..0df2352c 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -573,9 +573,7 @@ sub set_xvmd {
 
 sub checkpoint {
 	my ($self, $wait) = @_;
-	if (my $im = $self->{im}) {
-		$wait ? $im->barrier : $im->checkpoint;
-	}
+	$self->{im}->barrier if $self->{im};
 	delete $self->{lms};
 	$self->{priv_eidx}->checkpoint($wait);
 }
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index fb259396..43f37f60 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -507,13 +507,7 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
 sub checkpoint ($;$) {
 	my ($self, $wait) = @_;
 
-	if (my $im = $self->{im}) {
-		if ($wait) {
-			$im->barrier;
-		} else {
-			$im->checkpoint;
-		}
-	}
+	$self->{im}->barrier if $self->{im};
 	my $shards = $self->{idx_shards};
 	if ($shards) {
 		my $dbh = $self->{mm}->{dbh} if $self->{mm};


* [PATCH 2/4] lei: use ->barrier to commit to lei/store
  2024-04-16 20:56 [PATCH 0/4] lei parallelism fixes Eric Wong
  2024-04-16 20:56 ` [PATCH 1/4] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
@ 2024-04-16 20:56 ` Eric Wong
  2024-04-16 20:56 ` [PATCH 3/4] lei/store: stop shard workers + cat-file on idle Eric Wong
  2024-04-16 20:56 ` [PATCH 4/4] lei: use async barrier for --import-before Eric Wong
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

->barrier (a synchronous checkpoint) is better than ->done when
lei commands are issued in parallel (via '&' or from different
terminals), since repeatedly stopping and restarting processes
doesn't play nicely with expensive tasks like `lei reindex'.

This introduces a slight regression: more processes (and thus
more resources) stay in use when lei is idle, but that'll be
fixed in the next commit.
---
 lib/PublicInbox/ExtSearchIdx.pm       |  1 +
 lib/PublicInbox/LEI.pm                |  6 +++---
 lib/PublicInbox/LeiInput.pm           |  2 +-
 lib/PublicInbox/LeiRefreshMailSync.pm |  2 +-
 lib/PublicInbox/LeiRemote.pm          |  4 ++--
 lib/PublicInbox/LeiStore.pm           | 26 ++++++++++++++++++--------
 lib/PublicInbox/LeiToMail.pm          |  3 ++-
 lib/PublicInbox/LeiXSearch.pm         |  4 ++--
 t/lei-store-fail.t                    |  2 +-
 9 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ebbffffc..763a124c 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -1424,5 +1424,6 @@ no warnings 'once';
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
 *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 *checkpoint = \&PublicInbox::V2Writable::checkpoint;
+*barrier = \&PublicInbox::V2Writable::barrier;
 
 1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5b46686a..e9a0de6c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1443,7 +1443,7 @@ sub wq_eof { # EOF callback for main daemon
 	my ($lei, $wq_fld) = @_;
 	local $current_lei = $lei;
 	my $wq = delete $lei->{$wq_fld // 'wq1'};
-	$lei->sto_done_request($wq);
+	$lei->sto_barrier_request($wq);
 	$wq // $lei->fail; # already failed
 }
 
@@ -1548,7 +1548,7 @@ sub lms {
 	(-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
-sub sto_done_request {
+sub sto_barrier_request {
 	my ($lei, $wq) = @_;
 	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
@@ -1558,7 +1558,7 @@ sub sto_done_request {
 		my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
 		my $errfh = $lei->{2} // *STDERR{GLOB};
 		my @io = $s ? ($errfh, $s) : ($errfh);
-		eval { $lei->{sto}->wq_io_do('done', \@io) };
+		eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) };
 	}
 	warn($@) if $@;
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index d003d983..c388f7dc 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -499,7 +499,7 @@ sub process_inputs {
 	}
 	# always commit first, even on error partial work is acceptable for
 	# lei <import|tag|convert>
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_barrier_request;
 	$self->{lei}->fail($err) if $err;
 }
 
diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm
index a60a9a5e..dde23274 100644
--- a/lib/PublicInbox/LeiRefreshMailSync.pm
+++ b/lib/PublicInbox/LeiRefreshMailSync.pm
@@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
 			$self->folder_missing($$uri);
 		}
 	} else { die "BUG: $input not supported" }
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_barrier_request;
 }
 
 sub lei_refresh_mail_sync {
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index ddcaf2c9..d6fc40a4 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Make remote externals HTTP(S) inboxes behave like
@@ -51,7 +51,7 @@ sub mset {
 	$fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1);
 	eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) };
 	my $err = $@ ? ": $@" : '';
-	my $wait = $self->{lei}->{sto}->wq_do('done');
+	my $wait = $self->{lei}->{sto}->wq_do('barrier');
 	$lei->child_error($?, "@$cmd failed$err") if $err || $?;
 	$self; # we are the mset (and $ibx, and $self)
 }
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0df2352c..162c915f 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -81,7 +81,7 @@ sub importer {
 		delete $self->{im};
 		$im->done;
 		undef $im;
-		$self->checkpoint;
+		$self->barrier;
 		$max = $self->{priv_eidx}->{mg}->git_epochs + 1;
 	}
 	my (undef, $tl) = eidx_init($self); # acquire lock
@@ -118,7 +118,7 @@ sub cat_blob {
 
 sub schedule_commit {
 	my ($self, $sec) = @_;
-	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self);
 }
 
 # follows the stderr file
@@ -391,7 +391,7 @@ sub reindex_done {
 	my ($self) = @_;
 	my ($eidx, $tl) = eidx_init($self);
 	$eidx->git->async_wait_all;
-	# ->done to be called via sto_done_request
+	# ->done to be called via sto_barrier_request
 }
 
 sub add_eml {
@@ -571,11 +571,21 @@ sub set_xvmd {
 	sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
-sub checkpoint {
-	my ($self, $wait) = @_;
-	$self->{im}->barrier if $self->{im};
+sub barrier {
+	my ($self) = @_;
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
+	my @err;
+	if ($self->{im}) {
+		eval { $self->{im}->barrier };
+		push(@err, "E: import barrier: $@\n") if $@;
+	}
 	delete $self->{lms};
-	$self->{priv_eidx}->checkpoint($wait);
+	eval { $self->{priv_eidx}->barrier };
+	push(@err, "E: priv_eidx barrier: $@\n") if $@;
+	print { $errfh // \*STDERR } @err;
+	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
+	xchg_stderr($self);
+	die @err if @err;
 }
 
 sub xchg_stderr {
@@ -594,7 +604,7 @@ sub xchg_stderr {
 
 sub done {
 	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+	my ($errfh, $lei_sock) = @$self{0, 1};
 	my @err;
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index dfae29e9..593547f6 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -724,8 +724,9 @@ sub post_augment {
 	my ($self, $lei, @args) = @_;
 	$self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
+	# FIXME: this synchronous wait can be slow w/ parallel callers
 	my $wait = $lei->{opt}->{'import-before'} ?
-			$lei->{sto}->wq_do('checkpoint', 1) : 0;
+			$lei->{sto}->wq_do('barrier') : 0;
 	# _post_augment_mbox
 	my $m = $self->can("_post_augment_$self->{base_type}") or return;
 	$m->($self, $lei, @args);
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index d4f34733..5a5a1adc 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -363,7 +363,7 @@ print STDERR $_;
 						$self, $lei, $each_smsg);
 		};
 		my ($exc, $code) = ($@, $?);
-		$lei->sto_done_request if delete($self->{-sto_imported});
+		$lei->sto_barrier_request if delete($self->{-sto_imported});
 		die "E: $exc" if $exc && !$code;
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		if (!$code) { # don't update if no results, maybe MTA is down
@@ -399,7 +399,7 @@ sub query_done { # EOF callback for main daemon
 	delete $lei->{lxs};
 	($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
 		warn "BUG: {sto} missing with --mail-sync";
-	$lei->sto_done_request;
+	$lei->sto_barrier_request;
 	$lei->{ovv}->ovv_end($lei);
 	if ($l2m) { # close() calls LeiToMail reap_compress
 		$l2m->finish_output($lei);
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index c2f03148..1e83e383 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -39,7 +39,7 @@ EOM
 	lei_ok qw(q m:testmessage@example.com);
 	is($lei_out, "[null]\n", 'delayed commit is unindexed');
 
-	# make immediate ->sto_done_request fail from mboxrd import:
+	# make immediate ->sto_barrier_request fail from mboxrd import:
 	remove_tree("$ENV{HOME}/.local/share/lei/store");
 	# subsequent lei commands are undefined behavior,
 	# but we need to make sure the current lei command fails:
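
A note for readers following along (not part of the patch): the new
LeiStore::barrier runs each commit step under eval, collects the
failures, reports them, and only then dies, so a failure in the import
step does not skip the priv_eidx step.  Below is a minimal standalone
sketch of that pattern.  commit_all and the two fake steps are
hypothetical names made up for illustration; nothing here touches a
real lei/store.

```perl
use strict;
use warnings;

# hypothetical helper, not part of PublicInbox::LeiStore
sub commit_all {
	my ($errfh, @steps) = @_; # each step is [ $name, $coderef ]
	my @err;
	for my $step (@steps) {
		my ($name, $cb) = @$step;
		eval { $cb->() };
		push(@err, "E: $name: $@") if $@;
	}
	print { $errfh } @err; # report everything before dying
	die @err if @err;
}

my @ran;
open(my $errfh, '>', \my $errbuf) or die "open: $!";
eval {
	commit_all($errfh,
		# the first step fails, the second must still run
		[ 'import barrier', sub { push @ran, 'import'; die "disk full\n" } ],
		[ 'priv_eidx barrier', sub { push @ran, 'eidx' } ]);
};
my $died = $@;
close($errfh) or die "close: $!";
print "ran: @ran\n", "reported: $errbuf";
```

Both steps run even though the first one dies, and the caller still
sees the failure through $@ after everything has been attempted.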


* [PATCH 3/4] lei/store: stop shard workers + cat-file on idle
  2024-04-16 20:56 [PATCH 0/4] lei parallelism fixes Eric Wong
  2024-04-16 20:56 ` [PATCH 1/4] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
  2024-04-16 20:56 ` [PATCH 2/4] lei: use ->barrier to commit to lei/store Eric Wong
@ 2024-04-16 20:56 ` Eric Wong
  2024-04-17  9:34   ` [PATCH v2 " Eric Wong
  2024-04-16 20:56 ` [PATCH 4/4] lei: use async barrier for --import-before Eric Wong
  3 siblings, 1 reply; 7+ messages in thread
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

Schedule a timer to stop shard workers and the git-cat-file
process after a `barrier' command.  This allows us to save some
memory again when the lei-daemon is idle but preserves the fork
overhead reduction when issuing many commands in parallel or in
quick succession.
---
 lib/PublicInbox/LeiStore.pm | 46 ++++++++++++++++++-------------------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 162c915f..a054f649 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -571,21 +571,11 @@ sub set_xvmd {
 	sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
-sub barrier {
+sub check_done {
 	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
-	my @err;
-	if ($self->{im}) {
-		eval { $self->{im}->barrier };
-		push(@err, "E: import barrier: $@\n") if $@;
-	}
-	delete $self->{lms};
-	eval { $self->{priv_eidx}->barrier };
-	push(@err, "E: priv_eidx barrier: $@\n") if $@;
-	print { $errfh // \*STDERR } @err;
-	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
-	xchg_stderr($self);
-	die @err if @err;
+	$self->git->_active ?
+		add_uniq_timer("$self-check_done", \&check_done, $self) :
+		done($self);
 }
 
 sub xchg_stderr {
@@ -602,23 +592,33 @@ sub xchg_stderr {
 	undef;
 }
 
-sub done {
-	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1};
+sub _commit ($$) {
+	my ($self, $cmd) = @_; # cmd is 'done' or 'barrier'
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
 	my @err;
-	if (my $im = delete($self->{im})) {
-		eval { $im->done };
-		push(@err, "E: import done: $@\n") if $@;
+	if ($self->{im}) {
+		eval { $self->{im}->$cmd };
+		push(@err, "E: import $cmd: $@\n") if $@;
 	}
 	delete $self->{lms};
-	eval { $self->{priv_eidx}->done }; # V2Writable::done
-	push(@err, "E: priv_eidx done: $@\n") if $@;
-	print { $errfh // *STDERR{GLOB} } @err;
+	eval { $self->{priv_eidx}->$cmd };
+	push(@err, "E: priv_eidx $cmd: $@\n") if $@;
+	print { $errfh // \*STDERR } @err;
 	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
 	xchg_stderr($self);
 	die @err if @err;
+	# $lei_sock goes out-of-scope and script/lei can terminate
+}
+
+sub barrier {
+	my ($self) = @_;
+	_commit $self, 'barrier';
+	add_uniq_timer("$self-check_done", 5, \&check_done, $self);
+	undef;
 }
 
+sub done { _commit $_[0], 'done' }
+
 sub ipc_atfork_child {
 	my ($self) = @_;
 	my $lei = $self->{lei};


* [PATCH 4/4] lei: use async barrier for --import-before
  2024-04-16 20:56 [PATCH 0/4] lei parallelism fixes Eric Wong
                   ` (2 preceding siblings ...)
  2024-04-16 20:56 ` [PATCH 3/4] lei/store: stop shard workers + cat-file on idle Eric Wong
@ 2024-04-16 20:56 ` Eric Wong
  3 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-04-16 20:56 UTC (permalink / raw)
  To: meta

Write barriers can take a long time to finish, especially when
commands are issued in parallel.  So handle them asynchronously
without blocking lei-daemon.  This requires making EOFpipe a
little more flexible by supporting arguments to the callback
function.

This is another step towards improving parallel use of lei.
---
 lib/PublicInbox/EOFpipe.pm    |  7 ++++---
 lib/PublicInbox/LeiToMail.pm  | 29 ++++++++++++++++++++++-------
 lib/PublicInbox/LeiXSearch.pm | 13 +++++++++----
 3 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm
index 3474874f..77b699a2 100644
--- a/lib/PublicInbox/EOFpipe.pm
+++ b/lib/PublicInbox/EOFpipe.pm
@@ -7,8 +7,8 @@ use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT $F_SETPIPE_SZ);
 
 sub new {
-	my (undef, $rd, $cb) = @_;
-	my $self = bless { cb => $cb }, __PACKAGE__;
+	my (undef, $rd, @cb_args) = @_;
+	my $self = bless { cb_args => \@cb_args }, __PACKAGE__;
 	# 4096: page size
 	fcntl($rd, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
 	$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
@@ -17,7 +17,8 @@ sub new {
 sub event_step {
 	my ($self) = @_;
 	if ($self->do_read(my $buf, 1) == 0) { # auto-closed
-		$self->{cb}->();
+		my ($cb, @args) = @{delete $self->{cb_args}};
+		$cb->(@args);
 	}
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 593547f6..5481b5e4 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -14,7 +14,7 @@ use PublicInbox::Import;
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
-use autodie qw(open seek close);
+use autodie qw(pipe open seek close);
 use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
@@ -605,7 +605,7 @@ sub _pre_augment_mbox {
 			$lei->{dedupe} && $lei->{dedupe}->can('reset_dedupe');
 	}
 	if ($self->{zsfx} = PublicInbox::MboxReader::zsfx($dst)) {
-		pipe(my ($r, $w)) or die "pipe: $!";
+		pipe(my $r, my $w);
 		$lei->{zpipe} = [ $r, $w ];
 		$lei->{ovv}->{lock_path} and
 			die 'BUG: unexpected {ovv}->{lock_path}';
@@ -719,17 +719,32 @@ sub do_augment { # slow, runs in wq worker
 	$m->($self, $lei);
 }
 
+sub post_augment_call ($$$$) {
+	my ($self, $lei, $m, $post_augment_done) = @_;
+	eval { $m->($self, $lei) };
+	$lei->{post_augment_err} = $@ if $@; # for post_augment_done
+}
+
 # fast (spawn compressor or mkdir), runs in same process as pre_augment
 sub post_augment {
-	my ($self, $lei, @args) = @_;
+	my ($self, $lei, $post_augment_done) = @_;
 	$self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
-	# FIXME: this synchronous wait can be slow w/ parallel callers
-	my $wait = $lei->{opt}->{'import-before'} ?
-			$lei->{sto}->wq_do('barrier') : 0;
 	# _post_augment_mbox
 	my $m = $self->can("_post_augment_$self->{base_type}") or return;
-	$m->($self, $lei, @args);
+
+	# --import-before is only for lei-(q|lcat), not lei-convert
+	$lei->{opt}->{'import-before'} or
+		return post_augment_call $self, $lei, $m, $post_augment_done;
+
+	# we can't deal with post_augment until import-before commits:
+	require PublicInbox::EOFpipe;
+	my @io = @$lei{qw(2 sock)};
+	pipe(my $r, $io[2]);
+	PublicInbox::EOFpipe->new($r, \&post_augment_call,
+				$self, $lei, $m, $post_augment_done);
+	$lei->{sto}->wq_io_do('barrier', \@io);
+	# _post_augment_* && post_augment_done run when barrier is complete
 }
 
 # called by every single l2m worker process
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5a5a1adc..43dedd10 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -22,6 +22,7 @@ use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
 use autodie qw(close open read seek truncate);
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use PublicInbox::OnDestroy;
 
 sub new {
 	my ($class) = @_;
@@ -428,11 +429,9 @@ sub query_done { # EOF callback for main daemon
 	$lei->dclose;
 }
 
-sub do_post_augment {
+sub post_augment_done { # via on_destroy in top-level lei-daemon
 	my ($lei) = @_;
-	my $l2m = $lei->{l2m} or return; # client disconnected
-	eval { $l2m->post_augment($lei) };
-	my $err = $@;
+	my $err = delete $lei->{post_augment_err};
 	if ($err) {
 		if (my $lxs = delete $lei->{lxs}) {
 			$lxs->wq_kill(-POSIX::SIGTERM());
@@ -447,6 +446,12 @@ sub do_post_augment {
 	close(delete $lei->{au_done}); # trigger wait_startq if start_mua didn't
 }
 
+sub do_post_augment {
+	my ($lei) = @_;
+	my $l2m = $lei->{l2m} or return; # client disconnected
+	$l2m->post_augment($lei, on_destroy(\&post_augment_done, $lei));
+}
+
 sub incr_post_augment { # called whenever an l2m shard finishes augment
 	my ($lei) = @_;
 	my $l2m = $lei->{l2m} or return; # client disconnected
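
A note for readers following along (not part of the patch): the
EOFpipe change stores the callback together with its arguments and
invokes it once the read end of a pipe hits EOF, i.e. once every
holder of the write end has closed it.  Below is a minimal standalone
sketch of that idea.  EOFWatcher and wait_eof are hypothetical names,
and the sketch uses a blocking sysread where the real code is driven
by the PublicInbox::DS event loop.

```perl
use strict;
use warnings;

package EOFWatcher; # hypothetical, loosely modeled on EOFpipe

sub new {
	my ($class, $rd, @cb_args) = @_; # callback first, then its args
	bless { rd => $rd, cb_args => \@cb_args }, $class;
}

sub wait_eof {
	my ($self) = @_;
	my $n = sysread($self->{rd}, my $buf, 1);
	die "sysread: $!" unless defined $n;
	return if $n; # got data, not EOF
	# delete ensures the callback can fire at most once
	my ($cb, @args) = @{delete $self->{cb_args}};
	$cb->(@args);
}

package main;

pipe(my $r, my $w) or die "pipe: $!";
my @seen;
my $watcher = EOFWatcher->new($r, sub { push @seen, @_ },
				'barrier', 'complete');
close($w) or die "close: $!"; # last writer gone => reader sees EOF
$watcher->wait_eof;
print "@seen\n";
```

Passing the arguments alongside the coderef avoids allocating a new
closure per call, which appears to be the flexibility the commit
message refers to.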


* [PATCH v2 3/4] lei/store: stop shard workers + cat-file on idle
  2024-04-16 20:56 ` [PATCH 3/4] lei/store: stop shard workers + cat-file on idle Eric Wong
@ 2024-04-17  9:34   ` Eric Wong
  2024-04-17  9:54     ` sub prototypes aren't enough Eric Wong
  0 siblings, 1 reply; 7+ messages in thread
From: Eric Wong @ 2024-04-17  9:34 UTC (permalink / raw)
  To: meta

Schedule a timer to stop shard workers and the git-cat-file
process after a `barrier' command.  This allows us to save some
memory again when the lei-daemon is idle but preserves the fork
overhead reduction when issuing many commands in parallel or in
quick succession.
---
  v2 fixes an incorrect call to add_uniq_timer.  Sometimes I wish Perl
  could have more static type||arg checking, but it's probably still
  better than other scripting languages...

Interdiff against v1:
  diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
  index a054f649..b2da2bc3 100644
  --- a/lib/PublicInbox/LeiStore.pm
  +++ b/lib/PublicInbox/LeiStore.pm
  @@ -574,7 +574,7 @@ sub set_xvmd {
   sub check_done {
   	my ($self) = @_;
   	$self->git->_active ?
  -		add_uniq_timer("$self-check_done", \&check_done, $self) :
  +		add_uniq_timer("$self-check_done", 5, \&check_done, $self) :
   		done($self);
   }
   

 lib/PublicInbox/LeiStore.pm | 46 ++++++++++++++++++-------------------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 162c915f..b2da2bc3 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -571,21 +571,11 @@ sub set_xvmd {
 	sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
-sub barrier {
+sub check_done {
 	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
-	my @err;
-	if ($self->{im}) {
-		eval { $self->{im}->barrier };
-		push(@err, "E: import barrier: $@\n") if $@;
-	}
-	delete $self->{lms};
-	eval { $self->{priv_eidx}->barrier };
-	push(@err, "E: priv_eidx barrier: $@\n") if $@;
-	print { $errfh // \*STDERR } @err;
-	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
-	xchg_stderr($self);
-	die @err if @err;
+	$self->git->_active ?
+		add_uniq_timer("$self-check_done", 5, \&check_done, $self) :
+		done($self);
 }
 
 sub xchg_stderr {
@@ -602,23 +592,33 @@ sub xchg_stderr {
 	undef;
 }
 
-sub done {
-	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1};
+sub _commit ($$) {
+	my ($self, $cmd) = @_; # cmd is 'done' or 'barrier'
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
 	my @err;
-	if (my $im = delete($self->{im})) {
-		eval { $im->done };
-		push(@err, "E: import done: $@\n") if $@;
+	if ($self->{im}) {
+		eval { $self->{im}->$cmd };
+		push(@err, "E: import $cmd: $@\n") if $@;
 	}
 	delete $self->{lms};
-	eval { $self->{priv_eidx}->done }; # V2Writable::done
-	push(@err, "E: priv_eidx done: $@\n") if $@;
-	print { $errfh // *STDERR{GLOB} } @err;
+	eval { $self->{priv_eidx}->$cmd };
+	push(@err, "E: priv_eidx $cmd: $@\n") if $@;
+	print { $errfh // \*STDERR } @err;
 	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
 	xchg_stderr($self);
 	die @err if @err;
+	# $lei_sock goes out-of-scope and script/lei can terminate
+}
+
+sub barrier {
+	my ($self) = @_;
+	_commit $self, 'barrier';
+	add_uniq_timer("$self-check_done", 5, \&check_done, $self);
+	undef;
 }
 
+sub done { _commit $_[0], 'done' }
+
 sub ipc_atfork_child {
 	my ($self) = @_;
 	my $lei = $self->{lei};
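
A note for readers following along (not part of the patch): the idle
shutdown relies on a uniquely named timer.  Repeated barriers arm at
most one pending check, and the check re-arms itself for as long as
work is still in flight.  Below is a minimal standalone simulation of
that behavior.  There is no real event loop or clock here: run_timers,
busy_ticks and stopped are hypothetical names, and the $secs argument
is accepted but ignored.

```perl
use strict;
use warnings;

my (%uniq, @pending);

# same argument order as the real call: ($name, $secs, $coderef, @args)
sub add_uniq_timer {
	my ($name, $secs, $cb, @args) = @_;
	$uniq{$name} //= do { push @pending, [ $name, $cb, @args ]; 1 };
}

# hypothetical stand-in for the event loop: fire all pending timers once
sub run_timers {
	my @fire = splice @pending;
	for my $t (@fire) {
		my ($name, $cb, @args) = @$t;
		delete $uniq{$name}; # allow the callback to re-arm itself
		$cb->(@args);
	}
}

sub check_done {
	my ($s) = @_;
	if ($s->{busy_ticks} > 0) { # still active, check again later
		$s->{busy_ticks}--;
		add_uniq_timer('check_done', 5, \&check_done, $s);
	} else {
		$s->{stopped} = 1; # idle: stop workers here
	}
}

sub barrier {
	my ($s) = @_;
	add_uniq_timer('check_done', 5, \&check_done, $s);
}

my $store = { busy_ticks => 2, stopped => 0 };
barrier($store) for 1..3; # three barriers, one timer
my $queued = scalar @pending;
my $rounds = 0;
while (@pending) { run_timers(); $rounds++ }
print "queued=$queued rounds=$rounds stopped=$store->{stopped}\n";
```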


* sub prototypes aren't enough...
  2024-04-17  9:34   ` [PATCH v2 " Eric Wong
@ 2024-04-17  9:54     ` Eric Wong
  0 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2024-04-17  9:54 UTC (permalink / raw)
  To: meta

Eric Wong <e@80x24.org> wrote:
>   v2 fixes an incorrect call to add_uniq_timer.  Sometimes I wish Perl
>   could have more static type||arg checking, but it's probably still
>   better than other scripting languages...

Fwiw, this would work for all current callers, AFAIK:

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index a6fec954..52b89247 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -117,9 +117,9 @@ sub _add_named_timer {
 	die "Shouldn't get here.";
 }
 
-sub add_timer { _add_named_timer(undef, @_) }
+sub add_timer ($&;@) { _add_named_timer(undef, @_) }
 
-sub add_uniq_timer { # ($name, $secs, $coderef, @args) = @_;
+sub add_uniq_timer ($$&;@) { # ($name, $secs, $coderef, @args) = @_;
 	$UniqTimer{$_[0]} //= _add_named_timer(@_);
 }
 
... But the above falls short if somebody were to pass a scalar which
references CODE:

my $foo = sub {};
add_timer 5, $foo; # this valid code fails with the above patch
add_timer 5, sub {}; # this works as expected

So AFAIK Perl has no way to detect argument type bugs like this
reliably at compile time.
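
A note for readers following along (not part of the original message):
the limitation can be reproduced in a standalone script.  The sketch
below uses add_timer_proto, a hypothetical stand-in that carries the
same ($&;@) prototype but calls the callback immediately instead of
scheduling it.  String eval is used only so the compile-time failure
can be captured instead of aborting the script.

```perl
use strict;
use warnings;

# hypothetical stand-in: same prototype as the patched add_timer, but it
# calls the callback immediately instead of scheduling it
sub add_timer_proto ($&;@) {
	my ($secs, $cb, @args) = @_;
	$cb->(@args);
}

my $foo = sub { 'from scalar' };

# an anonymous sub satisfies the & prototype
my $anon = eval q{ add_timer_proto 5, sub { 'from anon sub' } };
my $anon_err = $@;

# a scalar holding a coderef is rejected when the call is compiled
my $scalar = eval q{ add_timer_proto 5, $foo };
my $scalar_err = $@;

# calling with the & sigil bypasses prototype checking entirely
my $bypass = &add_timer_proto(5, $foo);

print "anon sub: ", ($anon_err ? "rejected" : "ok"), "\n";
print "scalar coderef: ", ($scalar_err ? "rejected" : "ok"), "\n";
print "bypass via &: $bypass\n";
```

The & sigil workaround shows why prototypes make a weak safety net:
they can reject valid calls and can also be skipped altogether.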

