unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/3] solver: async for external processes
@ 2024-08-29 23:26 Eric Wong
  2024-08-29 23:26 ` [PATCH 1/3] solver: use async check (`info') for coderepo Eric Wong
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2024-08-29 23:26 UTC (permalink / raw)
  To: meta

More steps towards mitigating the pathological effects of slow seeks
and ultra-expensive Xapian searches...

1/3 introduces some nastiness in stderr handling, but I care
more about avoiding pathological slowdowns than correct UI
hints.

The rest are pretty straightforward...

Our internal APIs could be nicer for dealing with sync vs.
async usages, but that's for another series.

Eric Wong (3):
  solver: use async check (`info') for coderepo
  solver: use xap_helper for async search if available
  solver: use async_check for the temporary git repo

 lib/PublicInbox/Git.pm       |   2 +-
 lib/PublicInbox/LeiRemote.pm |   7 ++
 lib/PublicInbox/SolverGit.pm | 232 ++++++++++++++++++++---------------
 3 files changed, 142 insertions(+), 99 deletions(-)

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [PATCH 1/3] solver: use async check (`info') for coderepo
  2024-08-29 23:26 [PATCH 0/3] solver: async for external processes Eric Wong
@ 2024-08-29 23:26 ` Eric Wong
  2024-08-29 23:26 ` [PATCH 2/3] solver: use xap_helper for async search if available Eric Wong
  2024-08-29 23:26 ` [PATCH 3/3] solver: use async_check for the temporary git repo Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-08-29 23:26 UTC (permalink / raw)
  To: meta

Async --batch-check or `info' batch commands allow our Perl
process to handle other requests while git is busy waiting
on slow storage or CPUs to retrieve blob information.

This improves parallelism for SMP machines in addition to
allowing the Perl process to service other HTTP/NNTP/IMAP/POP3
requests while waiting for disk seeks, zlib inflation, and
delta resolution.

Checking stderr for error hints is now potentially racy (an async
check may slurp stderr output belonging to other requests), but
it's only a hint, so better overall performance under worst-case
scenarios is preferable to the correctness of that hint.
---
 lib/PublicInbox/Git.pm       |   2 +-
 lib/PublicInbox/SolverGit.pm | 134 +++++++++++++++++++----------------
 2 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index dd0a14e9..c43f9863 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -664,7 +664,7 @@ sub watch_async ($) {
 
 sub close {
 	my ($self) = @_;
-	delete @$self{qw(-bc err_c inflight)};
+	delete @$self{qw(-bc err_c inflight -prev_oids)};
 	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
 	# gcf_drain may run from PublicInbox::IO::DESTROY
 }
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 898ca72d..4d087eab 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -75,40 +75,84 @@ sub ERR ($$) {
 	die $err;
 }
 
-# look for existing objects already in git repos, returns arrayref
-# if found, number of remaining git coderepos to try if not.
-sub solve_existing ($$) {
-	my ($self, $want) = @_;
-	my $try = $want->{try_gits} //= [ @{$self->{gits}} ]; # array copy
-	my $git = shift @$try or die 'BUG {try_gits} empty';
-	my $oid_b = $want->{oid_b};
-
-	# can't use async_check due to last_check_err :<
-	my ($oid_full, $type, $size) = $git->check($oid_b);
-	$git->schedule_cleanup if $self->{psgi_env}->{'pi-httpd.async'};
+sub ck_existing_cb { # async_check cb
+	my (undef, $oid_full, $type, $size, $arg) = @_;
+	my ($self, $want) = @$arg;
+
+	# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
+	# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
+	if ($type =~ /\A(?:missing|ambiguous)\z/ ||
+				($oid_full//'') eq 'dangling') {
+		undef $oid_full;
+		undef $type;
+	}
+	my $try = $want->{try_gits};
+	my $git = shift @$try // die 'BUG: no {try_gits}';
 
-	if ($oid_b eq ($oid_full // '') || (defined($type) &&
+	if ($want->{oid_b} eq ($oid_full // '') || (defined($type) &&
 				(!$self->{have_hints} || $type eq 'blob'))) {
 		delete $want->{try_gits};
-		return [ $git, $oid_full, $type, int($size) ]; # done, success
-	}
+		dbg($self, "found $want->{oid_b} in " .
+			join(" ||\n\t", $git->pub_urls($self->{psgi_env})));
 
-	# TODO: deal with 40-char "abbreviations" with future SHA-256 git
-	return scalar(@$try) if length($oid_b) >= 40;
+		my $existing = [ $git, $oid_full, $type, $size + 0 ];
+		if ($want->{oid_b} eq $self->{oid_want} || $type ne 'blob') {
+			eval { done($self, $existing) };
+			die "E: $@" if $@;
+			return;
+		}
+		mark_found($self, $want->{oid_b}, $existing);
+		return next_step($self); # onto patch application
+	}
 
-	# parse stderr of "git cat-file --batch-check"
+	# parse stderr of "git cat-file --batch-check", async means
+	# we may end up slurping output from other requests:
 	my $err = $git->last_check_err;
-	my (@oids) = ($err =~ /\b([a-f0-9]{40,})\s+blob\b/g);
-	return scalar(@$try) unless scalar(@oids);
-
-	# TODO: do something with the ambiguous array?
-	# push @ambiguous, [ $git, @oids ];
-
-	dbg($self, "`$oid_b' ambiguous in " .
+	my $prev_oids = $git->{-prev_oids} //= [];
+	my @any_oids = ($err =~ /\b([a-f0-9]{40,})\s+blob\b/g);
+	push @$prev_oids, @any_oids;
+	my $oid_b_re = qr/\A$want->{oid_b}/;
+	my @ambig_oids = grep /$oid_b_re/, @$prev_oids;
+	@$prev_oids = grep !/$oid_b_re/, @$prev_oids;
+	delete $git->{-prev_oids} unless @$prev_oids;
+	@ambig_oids and dbg($self, "`$want->{oid_b}' ambiguous in " .
 			join("\n\t", $git->pub_urls($self->{psgi_env}))
 			. "\n" .
-			join('', map { "$_ blob\n" } @oids));
-	scalar(@$try);
+			join('', map { "$_ blob\n" } @ambig_oids));
+
+	return retry_current($self, $want) if @$try;
+
+	# we may retry if inbox scan (below) fails
+	delete $want->{try_gits};
+
+	# scan through inboxes to look for emails which results in
+	# the oid we want:
+	my $ibx = shift(@{$want->{try_ibxs}}) or return done($self, undef);
+	if (my $msgs = find_smsgs($self, $ibx, $want)) {
+		$want->{try_smsgs} = $msgs;
+		$want->{cur_ibx} = $ibx;
+		$self->{tmp_diffs} = [];
+		return retry_current($self, $want);
+	}
+	try_harder($self, $want);
+}
+
+# look for existing objects already in git repos; the check result
+# is handed to ck_existing_cb (via async_check under pi-httpd.async).
+sub solve_existing ($$) {
+	my ($self, $want) = @_;
+	my $try = $want->{try_gits} //= [ @{$self->{gits}} ]; # array copy
+	my $git = $try->[0] // die 'BUG {try_gits} empty';
+
+	if ($self->{psgi_env}->{'pi-httpd.async'}) {
+		async_check({ git => $git }, $want->{oid_b},
+				\&ck_existing_cb, [ $self, $want ]);
+		$git->schedule_cleanup;
+	} else {
+		my ($oid_full, $type, $size) = $git->check($want->{oid_b});
+		$type //= 'missing';
+		ck_existing_cb(undef, $oid_full, $type, $size, [ $self, $want ])
+	}
 }
 
 sub _tmp {
@@ -574,7 +618,7 @@ sub extract_diffs_done {
 	try_harder($self, $want);
 }
 
-sub extract_diff_async {
+sub extract_diff_async { # cat_async cb
 	my ($bref, $oid, $type, $size, $x) = @_;
 	my ($self, $want, $smsg) = @$x;
 	if (defined($oid)) {
@@ -590,7 +634,6 @@ sub extract_diff_async {
 sub resolve_patch ($$) {
 	my ($self, $want) = @_;
 
-	my $cur_want = $want->{oid_b};
 	if (scalar(@{$self->{patches}}) > $MAX_PATCH) {
 		die "Aborting, too many steps to $self->{oid_want}";
 	}
@@ -613,40 +656,11 @@ sub resolve_patch ($$) {
 	}
 
 	# see if we can find the blob in an existing git repo:
-	if (!$want->{try_ibxs} && $self->{seen_oid}->{$cur_want}++) {
-		die "Loop detected solving $cur_want\n";
+	if (!$want->{try_ibxs} && $self->{seen_oid}->{$want->{oid_b}}++) {
+		die "Loop detected solving $want->{oid_b}\n";
 	}
 	$want->{try_ibxs} //= [ @{$self->{inboxes}} ]; # array copy
-	my $existing = solve_existing($self, $want);
-	if (ref $existing) {
-		my ($found_git, undef, $type, undef) = @$existing;
-		dbg($self, "found $cur_want in " .
-			join(" ||\n\t",
-				$found_git->pub_urls($self->{psgi_env})));
-
-		if ($cur_want eq $self->{oid_want} || $type ne 'blob') {
-			eval { done($self, $existing) };
-			die "E: $@" if $@;
-			return;
-		}
-		mark_found($self, $cur_want, $existing);
-		return next_step($self); # onto patch application
-	} elsif ($existing > 0) {
-		return retry_current($self, $want);
-	} else { # $existing == 0: we may retry if inbox scan (below) fails
-		delete $want->{try_gits};
-	}
-
-	# scan through inboxes to look for emails which results in
-	# the oid we want:
-	my $ibx = shift(@{$want->{try_ibxs}}) or return done($self, undef);
-	if (my $msgs = find_smsgs($self, $ibx, $want)) {
-		$want->{try_smsgs} = $msgs;
-		$want->{cur_ibx} = $ibx;
-		$self->{tmp_diffs} = [];
-		return retry_current($self, $want);
-	}
-	try_harder($self, $want);
+	solve_existing($self, $want);
 }
 
 # this API is designed to avoid creating self-referential structures;

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 2/3] solver: use xap_helper for async search if available
  2024-08-29 23:26 [PATCH 0/3] solver: async for external processes Eric Wong
  2024-08-29 23:26 ` [PATCH 1/3] solver: use async check (`info') for coderepo Eric Wong
@ 2024-08-29 23:26 ` Eric Wong
  2024-08-29 23:26 ` [PATCH 3/3] solver: use async_check for the temporary git repo Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-08-29 23:26 UTC (permalink / raw)
  To: meta

The async search API using xap_helper allows -httpd/netd users
to exploit storage and CPU-level parallelism via sockets.  It is
another step towards reducing head-of-line blocking in our Perl
event loop.  This reduces the effect of slow storage and extremely
large search results on unrelated HTTP requests.
---
 lib/PublicInbox/LeiRemote.pm |  7 +++++
 lib/PublicInbox/SolverGit.pm | 59 +++++++++++++++++++-----------------
 2 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index d6fc40a4..71aef809 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -56,6 +56,13 @@ sub mset {
 	$self; # we are the mset (and $ibx, and $self)
 }
 
+# fake support for async API
+sub async_mset {
+	my ($self, $qstr, undef, $cb, @arg) = @_; # $opt ($_[2]) ignored
+	$cb->(@arg, mset($self, $qstr));
+	undef;
+}
+
 sub size { scalar @{$_[0]->{smsg}} } # size of previous results
 
 sub mset_to_smsg {
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 4d087eab..6dd222a9 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -128,11 +128,38 @@ sub ck_existing_cb { # async_check cb
 	# scan through inboxes to look for emails which results in
 	# the oid we want:
 	my $ibx = shift(@{$want->{try_ibxs}}) or return done($self, undef);
-	if (my $msgs = find_smsgs($self, $ibx, $want)) {
-		$want->{try_smsgs} = $msgs;
-		$want->{cur_ibx} = $ibx;
-		$self->{tmp_diffs} = [];
-		return retry_current($self, $want);
+
+	# maybe another inbox has it
+	my $srch = $ibx->isrch or return try_harder($self, $want);
+
+	my $post = $want->{oid_b} or die 'BUG: no {oid_b}';
+	$post =~ /\A[a-f0-9]+\z/ or die "BUG: oid_b not hex: $post";
+	my ($q, $pre)= ("dfpost:$post", $want->{oid_a});
+	$q .= " dfpre:$pre" if defined $pre && $pre =~ /\A[a-f0-9]+\z/;
+	my $path_b = $want->{path_b};
+	if (path_searchable($path_b)) {
+		$q .= filename_query($path_b);
+
+		my $path_a = $want->{path_a};
+		(path_searchable($path_a) && $path_a ne $path_b) and
+			$q .= filename_query($path_a);
+	}
+	$srch->async_mset($q, { relevance => 1 },
+			\&find_results, $self, $want, $ibx);
+}
+
+sub find_results { # async_mset cb
+	my ($self, $want, $ibx, $mset, $exc) = @_;
+	if ($mset && $mset->size) {
+		if (my $srch = $ibx->isrch) {
+			my $msgs = $srch->mset_to_smsg($ibx, $mset);
+			$want->{try_smsgs} = $msgs;
+			$want->{cur_ibx} = $ibx;
+			$self->{tmp_diffs} = [];
+			return retry_current($self, $want);
+		}
+		my $dir = $ibx->{inboxdir} // $ibx->{topdir};
+		warn 'W: ', $dir, " search disappeared, skipping\n";
 	}
 	try_harder($self, $want);
 }
@@ -275,28 +302,6 @@ sub filename_query ($) {
 
 sub find_smsgs ($$$) {
 	my ($self, $ibx, $want) = @_;
-	my $srch = $ibx->isrch or return;
-
-	my $post = $want->{oid_b} or die 'BUG: no {oid_b}';
-	$post =~ /\A[a-f0-9]+\z/ or die "BUG: oid_b not hex: $post";
-
-	my $q = "dfpost:$post";
-	my $pre = $want->{oid_a};
-	if (defined $pre && $pre =~ /\A[a-f0-9]+\z/) {
-		$q .= " dfpre:$pre";
-	}
-
-	my $path_b = $want->{path_b};
-	if (path_searchable($path_b)) {
-		$q .= filename_query($path_b);
-
-		my $path_a = $want->{path_a};
-		if (path_searchable($path_a) && $path_a ne $path_b) {
-			$q .= filename_query($path_a);
-		}
-	}
-	my $mset = $srch->mset($q, { relevance => 1 });
-	$mset->size ? $srch->mset_to_smsg($ibx, $mset) : undef;
 }
 
 sub update_index_result ($$) {

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 3/3] solver: use async_check for the temporary git repo
  2024-08-29 23:26 [PATCH 0/3] solver: async for external processes Eric Wong
  2024-08-29 23:26 ` [PATCH 1/3] solver: use async check (`info') for coderepo Eric Wong
  2024-08-29 23:26 ` [PATCH 2/3] solver: use xap_helper for async search if available Eric Wong
@ 2024-08-29 23:26 ` Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2024-08-29 23:26 UTC (permalink / raw)
  To: meta

While the temporary git repo is likely in cache and not
subject to high seek latency as normal code repos are,
inflating objects still takes a non-trivial amount of time.
So use this as an opportunity to serve other clients and
exploit parallelism in SMP systems.
---
 lib/PublicInbox/SolverGit.pm | 49 ++++++++++++++++++++++++------------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 6dd222a9..d50a2fd1 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -385,21 +385,28 @@ EOF
 
 sub do_finish ($) {
 	my ($self) = @_;
-	my ($found, $oid_want) = @$self{qw(found oid_want)};
-	if (my $exists = $found->{$oid_want}) {
+	my $oid_want = $self->{oid_want};
+	if (my $exists = $self->{found}->{$oid_want}) {
 		return done($self, $exists);
 	}
 
 	# let git disambiguate if oid_want was too short,
 	# but long enough to be unambiguous:
-	my $tmp_git = $self->{tmp_git};
-	if (my @res = $tmp_git->check($oid_want)) {
-		return done($self, $found->{$res[0]});
-	}
-	if (my $err = $tmp_git->last_check_err) {
-		dbg($self, $err);
+	if ($self->{psgi_env}->{'pi-httpd.async'}) {
+		async_check { git => $self->{tmp_git} }, $oid_want,
+				\&finish_cb, $self
+	} else {
+		my ($hex, $type) = $self->{tmp_git}->check($oid_want);
+		finish_cb(undef, $hex, $type // 'missing', undef, $self)
 	}
-	done($self, undef);
+}
+
+sub finish_cb { # async_check cb
+	my (undef, $oid_full, $type, undef, $self) = @_;
+	return done $self, $self->{found}->{$oid_full} if $type eq 'blob';
+	my $err = $self->{tmp_git}->last_check_err;
+	dbg $self, $err if $err;
+	done $self, undef;
 }
 
 sub event_step ($) {
@@ -456,9 +463,10 @@ sub mark_found ($$$) {
 
 sub parse_ls_files ($$) {
 	my ($self, $bref) = @_;
-	my ($qsp_err, $di) = delete @$self{qw(-qsp_err -cur_di)};
+	my $qsp_err = delete $self->{-qsp_err};
 	die "git ls-files -s -z error:$qsp_err" if $qsp_err;
 
+	my $di = $self->{-cur_di};
 	my @ls = split(/\0/, $$bref);
 	my ($line, @extra) = grep(/\t\Q$di->{path_b}\E\z/, @ls);
 	scalar(@extra) and die "BUG: extra files in index: <",
@@ -469,14 +477,23 @@ sub parse_ls_files ($$) {
 	$file eq $di->{path_b} or die
 "BUG: index mismatch: file=$file != path_b=$di->{path_b}";
 
+	dbg($self, "index at:\n$mode_b $oid_b_full\t$file");
 	my $tmp_git = $self->{tmp_git} or die 'no git working tree';
-	my (undef, undef, $size) = $tmp_git->check($oid_b_full);
-	defined($size) or die "check $oid_b_full failed";
+	if ($self->{psgi_env}->{'pi-httpd.async'}) {
+		async_check { git => $tmp_git }, $oid_b_full,
+			\&ck_size_cb, $self
+	} else {
+		ck_size_cb(undef, $tmp_git->check($oid_b_full), $self);
+	}
+}
 
-	dbg($self, "index at:\n$mode_b $oid_b_full\t$file");
-	my $created = [ $tmp_git, $oid_b_full, 'blob', $size, $di ];
-	mark_found($self, $di->{oid_b}, $created);
-	next_step($self); # onto the next patch
+sub ck_size_cb { # async_check cb
+	my (undef, $oid_b_full, undef, $size, $self) = @_;
+	$size // die "check $oid_b_full failed";
+	my $di = delete $self->{-cur_di} // die 'BUG: no -cur_di';
+	my $created = [ $self->{tmp_git}, $oid_b_full, 'blob', $size, $di ];
+	mark_found $self, $di->{oid_b}, $created;
+	next_step $self; # onto the next patch
 }
 
 sub ls_files_result {

^ permalink raw reply related	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2024-08-29 23:26 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2024-08-29 23:26 [PATCH 0/3] solver: async for external processes Eric Wong
2024-08-29 23:26 ` [PATCH 1/3] solver: use async check (`info') for coderepo Eric Wong
2024-08-29 23:26 ` [PATCH 2/3] solver: use xap_helper for async search if available Eric Wong
2024-08-29 23:26 ` [PATCH 3/3] solver: use async_check for the temporary git repo Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).