unofficial mirror of meta@public-inbox.org
* [PATCH 0/3] git: use Unix stream sockets for --batch*
@ 2023-09-30 15:20 Eric Wong
  2023-09-30 15:20 ` [PATCH 1/3] git: decouple cat_async_retry from POSIX pipe semantics Eric Wong
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2023-09-30 15:20 UTC (permalink / raw)
  To: meta

This simplifies git process management greatly, and the diffstat
makes me happy.

Eric Wong (3):
  git: decouple cat_async_retry from POSIX pipe semantics
  git: use Unix stream sockets for `cat-file --batch-*'
  git+gcf2client: switch to level-triggered wakeups

 lib/PublicInbox/Gcf2Client.pm  |  59 ++------
 lib/PublicInbox/Git.pm         | 269 +++++++++++++++++----------------
 lib/PublicInbox/GitAsyncCat.pm |  98 +-----------
 lib/PublicInbox/LeiToMail.pm   |   2 +-
 lib/PublicInbox/ViewVCS.pm     |   2 +-
 5 files changed, 158 insertions(+), 272 deletions(-)

* [PATCH 1/3] git: decouple cat_async_retry from POSIX pipe semantics
From: Eric Wong @ 2023-09-30 15:20 UTC (permalink / raw)
  To: meta

While pipes guarantee writes of <= 512 bytes to be atomic,
Unix stream sockets (or TCP sockets) have no such guarantees.
Removing the pipe assumption will make it possible for us to
switch to bidirectional Unix stream sockets and save FDs with
`git cat-file' processes as we have with Gcf2Client.  The
performance benefit of larger pipe buffers over stream sockets
isn't as relevant when interacting with git as it is with
SearchIdx shards.
---
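Note (not part of the commit): a minimal sketch of the property
this patch stops relying on.  A writer to a stream socket has to
loop on short writes rather than assume one atomic write.
`write_fully' and the byte-counting child are hypothetical names
used only for illustration; the real write_all in Git.pm also takes
a read-step callback and an inflight array, which this sketch omits.

```perl
use strict;
use warnings;
use Errno qw(EAGAIN EINTR);
use IO::Handle; # ->blocking
use IO::Select;
use POSIX ();
use Socket qw(AF_UNIX SOCK_STREAM);

# Hypothetical helper: write all of $buf to a non-blocking stream
# socket, tolerating short writes.  Returns the number of short writes.
sub write_fully {
	my ($sock, $buf) = @_;
	my $short = 0;
	while (length($buf)) {
		my $w = syswrite($sock, $buf);
		if (defined $w) {
			$short++ if $w < length($buf);
			substr($buf, 0, $w, ''); # drop what was written
		} elsif ($!{EAGAIN}) {
			# socket buffer is full, wait until writable again
			IO::Select->new($sock)->can_write;
		} elsif (!$!{EINTR}) {
			die "syswrite: $!";
		}
	}
	$short;
}

socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0) or
	die "socketpair: $!";
my $pid = fork // die "fork: $!";
if ($pid == 0) { # child: count bytes until EOF, then report the total
	close $s1;
	my $total = 0;
	while (my $r = sysread($s2, my $tmp, 65536)) { $total += $r }
	syswrite($s2, "$total\n");
	POSIX::_exit(0);
}
close $s2;
$s1->blocking(0);

# 65 bytes per request (SHA-256 hex + "\n"), far more than 512 in total
my $line = ('f' x 64) . "\n";
my $payload = $line x 20000;
my $short = write_fully($s1, $payload);

shutdown($s1, 1); # SHUT_WR: the child sees EOF
$s1->blocking(1);
chomp(my $got = <$s1>);
waitpid($pid, 0);
print "sent ", length($payload), " bytes ($short short writes), ",
	"child saw $got\n";
```

The number of short writes depends on the OS socket buffer size,
so it is printed rather than assumed.
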
 lib/PublicInbox/Git.pm | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index eb88aa48..8ac40d2b 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -223,25 +223,21 @@ sub my_readline ($$) {
 }
 
 sub cat_async_retry ($$) {
-	my ($self, $inflight) = @_;
+	my ($self, $old_inflight) = @_;
 
 	# {inflight} may be non-existent, but if it isn't we delete it
 	# here to prevent cleanup() from waiting:
 	delete $self->{inflight};
 	cleanup($self);
+	batch_prepare($self, my $new_inflight = []);
 
-	batch_prepare($self, $inflight);
-	my $buf = '';
-	for (my $i = 0; $i < @$inflight; $i += 3) {
-		$buf .= "$inflight->[$i]\n";
+	while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
+		write_all($self, $self->{out}, $oid."\n",
+				\&cat_async_step, $new_inflight);
+		$oid = \$oid if !@$new_inflight; # to indicate oid retried
+		push @$new_inflight, $oid, $cb, $arg;
 	}
-	$self->{out}->blocking(1); # brand new pipe, should never block
-	print { $self->{out} } $buf or $self->fail("write error: $!");
-	$self->{out}->blocking(0);
-	my $req = shift @$inflight;
-	unshift(@$inflight, \$req); # \$ref to indicate retried
-
-	cat_async_step($self, $inflight); # take one step
+	cat_async_step($self, $new_inflight); # take one step
 }
 
 # returns true if prefetch is successful

* [PATCH 2/3] git: use Unix stream sockets for `cat-file --batch-*'
From: Eric Wong @ 2023-09-30 15:20 UTC (permalink / raw)
  To: meta

The benefit of 1MB potential pipe buffer size (on Linux) doesn't
seem noticeable when reading from git (unlike when writing to v2
shards), so Unix stream sockets seem fine, here.

This allows us to simplify our process management by using the
same socket FD for reads and writes and enables us to use our
ProcessPipe class for reaping (as we can do with Gcf2Client).

Gcf2Client no longer relies on PublicInbox::DS for write
buffering, and instead just waits for requests to complete
once the number of inflight requests hits the MAX_INFLIGHT
threshold as we do with PublicInbox::Git.

We reuse the existing MAX_INFLIGHT limit (18) that was
determined by the minimum allowed PIPE_BUF (512).  (AFAIK) Unix
stream sockets have no analogue to PIPE_BUF, but all *BSDs and
Linux I've checked have default SO_RCVBUF and SO_SNDBUF values
larger than the previously-required PIPE_BUF size of 512 bytes.
---
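Note (not part of the commit): a rough sketch of the two ideas in
the message above, under stated assumptions.  It reproduces the
arithmetic behind the old MAX_INFLIGHT constant, reports the default
SO_SNDBUF of a fresh socketpair (the value is OS-dependent), and
talks to a child over a single socket used as both its stdin and
stdout.  The upper-casing child is a hypothetical stand-in for
`git cat-file --batch'; it uses plain fork/exec instead of
PublicInbox::Spawn.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM SOL_SOCKET SO_SNDBUF);

# The former formula from Git.pm:
# 512: POSIX PIPE_BUF minimum, 65: SHA-256 hex + "\n",
# 3: @$inflight is flattened [ $OID, $cb, $arg ]
my $max_inflight = int(512 / (65 + length('contents '))) * 3;

socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0) or
	die "socketpair: $!";

# default send buffer size of the new socket, in bytes
my $sndbuf = unpack('i', getsockopt($s1, SOL_SOCKET, SO_SNDBUF) //
				die "getsockopt: $!");

my $pid = fork // die "fork: $!";
if ($pid == 0) { # child: one socket serves as both stdin and stdout
	open(STDIN, '<&', $s2) or die "dup stdin: $!";
	open(STDOUT, '>&', $s2) or die "dup stdout: $!";
	close $_ for ($s1, $s2);
	# unbuffered line-by-line responder, standing in for git
	exec($^X, '-e', '$| = 1; print uc while <STDIN>') or
		die "exec: $!";
}
close $s2;

# the parent reads and writes the same FD
syswrite($s1, "hello\n") // die "syswrite: $!";
my $reply = <$s1>;

close $s1; # the child sees EOF and exits
waitpid($pid, 0);
print "MAX_INFLIGHT=$max_inflight SO_SNDBUF=$sndbuf reply=$reply";
```
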
 lib/PublicInbox/Gcf2Client.pm  |  57 ++------
 lib/PublicInbox/Git.pm         | 254 +++++++++++++++++----------------
 lib/PublicInbox/GitAsyncCat.pm |  98 +------------
 lib/PublicInbox/LeiToMail.pm   |   2 +-
 lib/PublicInbox/ViewVCS.pm     |   2 +-
 5 files changed, 151 insertions(+), 262 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index a49e2aad..8f442787 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -3,14 +3,15 @@
 
 # connects public-inbox processes to PublicInbox::Gcf2::loop()
 package PublicInbox::Gcf2Client;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Git;
 use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use PublicInbox::DS qw(awaitpid);
+use PublicInbox::ProcessPipe;
+
 # fields:
 #	sock => socket to Gcf2::loop
 # The rest of these fields are compatible with what PublicInbox::Git
@@ -21,66 +22,36 @@ use PublicInbox::DS qw(awaitpid);
 #	inflight => array (see PublicInbox::Git)
 #	rbuf => scalarref, may be non-existent or empty
 sub new  {
-	my ($rdr) = @_;
+	my ($opt) = @_;
 	my $self = bless {}, __PACKAGE__;
 	# ensure the child process has the same @INC we do:
 	my $env = { PERL5LIB => join(':', @INC) };
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
-	$rdr //= {};
-	$rdr->{0} = $rdr->{1} = $s2;
-	my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-	$self->{'pid.owner'} = $$;
-	awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef);
 	$s1->blocking(0);
+	$opt->{0} = $opt->{1} = $s2;
+	my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
+	my $pid = spawn($cmd, $env, $opt);
+	my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 	$self->{inflight} = [];
-	$self->{in} = $s1;
-	$self->SUPER::new($s1, EPOLLIN|EPOLLET);
-}
-
-sub fail {
-	my $self = shift;
-	$self->close; # PublicInbox::DS::close
-	PublicInbox::Git::fail($self, @_);
+	$self->{epwatch} = \undef; # for Git->cleanup
+	$self->SUPER::new($sock, EPOLLIN|EPOLLET);
 }
 
 sub gcf2_async ($$$;$) {
 	my ($self, $req, $cb, $arg) = @_;
 	my $inflight = $self->{inflight} or return $self->close;
-
-	# {wbuf} is rare, I hope:
-	cat_async_step($self, $inflight) if $self->{wbuf};
-
-	$self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock};
+	PublicInbox::Git::write_all($self, $$req, \&cat_async_step, $inflight);
 	push @$inflight, $req, $cb, $arg;
 }
 
 # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry
 sub alternates_changed {}
 
-# DS::event_loop will call this
-sub event_step {
-	my ($self) = @_;
-	$self->flush_write;
-	$self->close if !$self->{in} || !$self->{sock}; # process died
-	my $inflight = $self->{inflight};
-	if ($inflight && @$inflight) {
-		cat_async_step($self, $inflight);
-		return $self->close unless $self->{in}; # process died
-
-		# ok, more to do, requeue for fairness
-		$self->requeue if @$inflight || exists($self->{rbuf});
-	}
-}
-
-sub DESTROY {
-	my ($self) = @_;
-	delete $self->{sock}; # if outside event_loop
-	PublicInbox::Git::DESTROY($self);
-}
-
 no warnings 'once';
 
-*cat_async_step = \&PublicInbox::Git::cat_async_step;
+*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
+*event_step = \&PublicInbox::Git::event_step;
+*DESTROY = \&PublicInbox::Git::DESTROY;
 
 1;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 8ac40d2b..3062f293 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -9,23 +9,23 @@
 package PublicInbox::Git;
 use strict;
 use v5.10.1;
-use parent qw(Exporter);
+use parent qw(Exporter PublicInbox::DS);
 use POSIX ();
-use IO::Handle; # ->autoflush
+use IO::Handle; # ->blocking
+use Socket qw(AF_UNIX SOCK_STREAM);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Errno qw(EINTR EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
-use PublicInbox::Spawn qw(popen_rd which);
+use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
 use PublicInbox::SHA ();
-use PublicInbox::DS qw(awaitpid);
 our %HEXLEN2SHA = (40 => 1, 64 => 256);
 our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
 our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN);
-our $PIPE_BUFSIZ = 65536; # Linux default
 our $in_cleanup;
 our $RDTIMEO = 60_000; # milliseconds
 our $async_warn; # true in read-only daemons
@@ -34,11 +34,8 @@ our $async_warn; # true in read-only daemons
 my @MODIFIED_DATE = qw[for-each-ref --sort=-committerdate
 			--format=%(committerdate:raw) --count=1];
 
-# 512: POSIX PIPE_BUF minimum (see pipe(7))
-# 65: SHA-256 hex size + "\n" in preparation for git using non-SHA1
-# 3: @$inflight is flattened [ $OID, $cb, $arg ]
 use constant {
-	MAX_INFLIGHT => int(512 / (65 + length('contents '))) * 3,
+	MAX_INFLIGHT => 18, # arbitrary, formerly based on PIPE_BUF
 	BATCH_CMD_VER => v2.36.0, # git 2.36+
 };
 
@@ -65,7 +62,7 @@ sub check_git_exe () {
 	if ($st ne $EXE_ST) {
 		my $rd = popen_rd([ $GIT_EXE, '--version' ]);
 		my $v = readline($rd);
-		close($rd) or die "$GIT_EXE --version: $?";
+		CORE::close($rd) or die "$GIT_EXE --version: $?";
 		$v =~ /\b([0-9]+(?:\.[0-9]+){2})/ or die
 			"$GIT_EXE --version output: $v # unparseable";
 		$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
@@ -144,17 +141,16 @@ sub last_check_err {
 	$buf;
 }
 
-sub _bidi_pipe {
-	my ($self, $batch, $in, $out, $pid, $err) = @_;
-	if (defined $self->{$pid}) {
-		Carp::cluck("BUG: self->{$pid} exists unexpectedly");
-		return;
-	}
-	pipe(my ($out_r, $out_w)) or $self->fail("pipe failed: $!");
-	my $rdr = { 0 => $out_r, pgid => 0 };
+sub _sock_cmd {
+	my ($self, $batch, $err_c) = @_;
+	$self->{sock} and Carp::confess('BUG: {sock} exists');
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
+	$s1->blocking(0);
+	my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
 	my $gd = $self->{git_dir};
 	if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
-		$rdr->{-C} = $gd;
+		$opt->{-C} = $gd;
 		$gd = $1;
 	}
 
@@ -163,23 +159,13 @@ sub _bidi_pipe {
 	my $abbr = $GIT_VER lt v2.31.0 ? 40 : 'no';
 	my @cmd = ($GIT_EXE, "--git-dir=$gd", '-c', "core.abbrev=$abbr",
 			'cat-file', "--$batch");
-	if ($err) {
+	if ($err_c) {
 		my $id = "git.$self->{git_dir}.$batch.err";
-		$self->{$err} = $rdr->{2} = tmpfile($id, undef, 1) or
+		$self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
 						$self->fail("tmpfile($id): $!");
 	}
-	# see lib/PublicInbox/ProcessPipe.pm for why we don't use that here
-	my ($in_r, $p) = popen_rd(\@cmd, undef, $rdr);
-	awaitpid($self->{$pid} = $p, undef);
-	$self->{"$pid.owner"} = $$;
-	$out_w->autoflush(1);
-	if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ
-		fcntl($out_w, 1031, 4096);
-		fcntl($in_r, 1031, 4096) if $batch eq 'batch-check';
-	}
-	$out_w->blocking(0);
-	$self->{$out} = $out_w;
-	$self->{$in} = $in_r;
+	my $pid = spawn(\@cmd, undef, $opt);
+	$self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -189,7 +175,7 @@ sub my_read ($$$) {
 	my $left = $len - length($$rbuf);
 	my $r;
 	while ($left > 0) {
-		$r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf));
+		$r = sysread($fh, $$rbuf, $left, length($$rbuf));
 		if ($r) {
 			$left -= $r;
 		} elsif (defined($r)) { # EOF
@@ -210,8 +196,7 @@ sub my_readline ($$) {
 		if ((my $n = index($$rbuf, "\n")) >= 0) {
 			return substr($$rbuf, 0, $n + 1, '');
 		}
-		my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf))
-								and next;
+		my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
 
 		# return whatever's left on EOF
 		return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
@@ -229,11 +214,10 @@ sub cat_async_retry ($$) {
 	# here to prevent cleanup() from waiting:
 	delete $self->{inflight};
 	cleanup($self);
-	batch_prepare($self, my $new_inflight = []);
+	my $new_inflight = batch_prepare($self);
 
 	while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
-		write_all($self, $self->{out}, $oid."\n",
-				\&cat_async_step, $new_inflight);
+		write_all($self, $oid."\n", \&cat_async_step, $new_inflight);
 		$oid = \$oid if !@$new_inflight; # to indicate oid retried
 		push @$new_inflight, $oid, $cb, $arg;
 	}
@@ -246,7 +230,7 @@ sub async_prefetch {
 	my $inflight = $self->{inflight} or return;
 	return if @$inflight;
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-	write_all($self, $self->{out}, "$oid\n", \&cat_async_step, $inflight);
+	write_all($self, "$oid\n", \&cat_async_step, $inflight);
 	push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -256,14 +240,14 @@ sub cat_async_step ($$) {
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
 	my $rbuf = delete($self->{rbuf}) // \(my $new = '');
 	my ($bref, $oid, $type, $size);
-	my $head = my_readline($self->{in}, $rbuf);
+	my $head = my_readline($self->{sock}, $rbuf);
 	my $cmd = ref($req) ? $$req : $req;
 	# ->fail may be called via Gcf2Client.pm
 	my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
 	if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
 		($oid, $type, $size) = ($1, $2, $3 + 0);
 		unless ($info) { # --batch-command
-			$bref = my_read($self->{in}, $rbuf, $size + 1) or
+			$bref = my_read($self->{sock}, $rbuf, $size + 1) or
 				$self->fail(defined($bref) ?
 						'read EOF' : "read: $!");
 			chop($$bref) eq "\n" or
@@ -302,16 +286,16 @@ sub cat_async_wait ($) {
 	}
 }
 
-sub batch_prepare ($$) {
-	my ($self, $inflight) = @_;
+sub batch_prepare ($) {
+	my ($self) = @_;
 	check_git_exe();
 	if ($GIT_VER ge BATCH_CMD_VER) {
-		_bidi_pipe($self, qw(batch-command in out pid err_c));
 		$self->{-bc} = 1;
+		_sock_cmd($self, 'batch-command', 1);
 	} else {
-		_bidi_pipe($self, qw(batch in out pid));
+		_sock_cmd($self, 'batch');
 	}
-	$self->{inflight} = $inflight;
+	$self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -328,52 +312,59 @@ sub cat_file {
 }
 
 sub check_async_step ($$) {
-	my ($self, $inflight_c) = @_;
-	die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3;
-	my ($req, $cb, $arg) = @$inflight_c[0, 1, 2];
-	my $rbuf = delete($self->{rbuf_c}) // \(my $new = '');
-	chomp(my $line = my_readline($self->{in_c}, $rbuf));
+	my ($ck, $inflight) = @_;
+	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
+	my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
+	chomp(my $line = my_readline($ck->{sock}, $rbuf));
 	my ($hex, $type, $size) = split(/ /, $line);
 
 	# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
 	# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
 	if ($hex eq 'dangling') {
-		my $ret = my_read($self->{in_c}, $rbuf, $type + 1);
-		$self->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+		my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+		$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
 	}
-	$self->{rbuf_c} = $rbuf if $$rbuf ne '';
-	splice(@$inflight_c, 0, 3); # don't retry $cb on ->fail
+	$ck->{rbuf} = $rbuf if $$rbuf ne '';
+	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->(undef, $hex, $type, $size, $arg) };
-	async_err($self, $req, $hex, $@, 'check') if $@;
+	async_err($ck, $req, $hex, $@, 'check') if $@;
 }
 
 sub check_async_wait ($) {
 	my ($self) = @_;
 	return cat_async_wait($self) if $self->{-bc};
-	my $inflight_c = $self->{inflight_c} or return;
-	check_async_step($self, $inflight_c) while (scalar(@$inflight_c));
+	my $ck = $self->{ck} or return;
+	my $inflight = $ck->{inflight} or return;
+	check_async_step($ck, $inflight) while (scalar(@$inflight));
+}
+
+# git <2.36
+sub ck {
+	$_[0]->{ck} //= bless { git_dir => $_[0]->{git_dir} },
+				'PublicInbox::GitCheck';
 }
 
 sub check_async_begin ($) {
 	my ($self) = @_;
-	die 'BUG: already in async check' if $self->{inflight_c};
 	cleanup($self) if alternates_changed($self);
 	check_git_exe();
 	if ($GIT_VER ge BATCH_CMD_VER) {
-		_bidi_pipe($self, qw(batch-command in out pid err_c));
 		$self->{-bc} = 1;
-		$self->{inflight} = [];
+		_sock_cmd($self, 'batch-command', 1);
 	} else {
-		_bidi_pipe($self, qw(batch-check in_c out_c pid_c err_c));
-		$self->{inflight_c} = [];
+		_sock_cmd($self = ck($self), 'batch-check', 1);
 	}
+	$self->{inflight} = [];
 }
 
 sub write_all {
-	my ($self, $out, $buf, $read_step, $inflight) = @_;
+	my ($self, $buf, $read_step, $inflight) = @_;
+	$self->{sock} // Carp::confess 'BUG: no {sock}';
+	Carp::confess('BUG: not an array') if ref($inflight) ne 'ARRAY';
 	$read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
 	do {
-		my $w = syswrite($out, $buf);
+		my $w = syswrite($self->{sock}, $buf);
 		if (defined $w) {
 			return if $w == length($buf);
 			substr($buf, 0, $w, ''); # sv_chop
@@ -386,16 +377,17 @@ sub write_all {
 
 sub check_async ($$$$) {
 	my ($self, $oid, $cb, $arg) = @_;
-	my $inflight = $self->{-bc} ?
-			($self->{inflight} // cat_async_begin($self)) :
-			($self->{inflight_c} // check_async_begin($self));
-	if ($self->{-bc}) {
+	my $inflight;
+	if ($self->{-bc}) { # likely as time goes on
+batch_command:
+		$inflight = $self->{inflight} // cat_async_begin($self);
 		substr($oid, 0, 0) = 'info ';
-		write_all($self, $self->{out}, "$oid\n",
-				\&cat_async_step, $inflight);
-	} else {
-		write_all($self, $self->{out_c}, "$oid\n",
-				\&check_async_step, $inflight);
+		write_all($self, "$oid\n", \&cat_async_step, $inflight);
+	} else { # accounts for git upgrades while we're running:
+		my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
+		$inflight = $ck->{inflight} // check_async_begin($self);
+		goto batch_command if $self->{-bc};
+		write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
 	}
 	push(@$inflight, $oid, $cb, $arg);
 }
@@ -418,39 +410,9 @@ sub check {
 	($hex, $type, $size);
 }
 
-sub _destroy {
-	my ($self, $pid, @rest) = @_; # rest = rbuf, in, out, err
-	my ($p) = delete @$self{($pid, @rest)};
-
-	# GitAsyncCat::event_step may delete {$pid}
-	awaitpid($p) if defined($p) && $$ == $self->{"$pid.owner"};
-}
-
-sub async_abort ($) {
-	my ($self) = @_;
-	while (scalar(@{$self->{inflight_c} // []}) ||
-			scalar(@{$self->{inflight} // []})) {
-		for my $c ('', '_c') {
-			my $q = $self->{"inflight$c"} or next;
-			while (@$q) {
-				my ($req, $cb, $arg) = splice(@$q, 0, 3);
-				$req = $$req if ref($req);
-				$self->{-bc} and
-					$req =~ s/\A(?:contents|info) //;
-				$req =~ s/ .*//; # drop git_dir for Gcf2Client
-				eval { $cb->(undef, $req, undef, undef, $arg) };
-				warn "E: (in abort) $req: $@" if $@;
-			}
-			delete $self->{"inflight$c"};
-			delete $self->{"rbuf$c"};
-		}
-	}
-	cleanup($self);
-}
-
-sub fail { # may be augmented in subclasses
+sub fail {
 	my ($self, $msg) = @_;
-	async_abort($self);
+	$self->close;
 	croak(ref($self) . ' ' . ($self->{git_dir} // '') . ": $msg");
 }
 
@@ -475,12 +437,12 @@ sub qx {
 	my $fh = popen(@_);
 	if (wantarray) {
 		my @ret = <$fh>;
-		close $fh; # caller should check $?
+		CORE::close $fh; # caller should check $?
 		@ret;
 	} else {
 		local $/;
 		my $ret = <$fh>;
-		close $fh; # caller should check $?
+		CORE::close $fh; # caller should check $?
 		$ret;
 	}
 }
@@ -492,12 +454,16 @@ sub date_parse {
 	} $self->qx('rev-parse', map { "--since=$_" } @_);
 }
 
+sub _active ($) {
+	scalar(@{$_[0]->{inflight} // []}) ||
+		($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+}
+
 # check_async and cat_async may trigger the other, so ensure they're
 # both completely done by using this:
 sub async_wait_all ($) {
 	my ($self) = @_;
-	while (scalar(@{$self->{inflight_c} // []}) ||
-			scalar(@{$self->{inflight} // []})) {
+	while (_active($self)) {
 		check_async_wait($self);
 		cat_async_wait($self);
 	}
@@ -506,14 +472,10 @@ sub async_wait_all ($) {
 # returns true if there are pending "git cat-file" processes
 sub cleanup {
 	my ($self, $lazy) = @_;
-	return 1 if $lazy && (scalar(@{$self->{inflight_c} // []}) ||
-				scalar(@{$self->{inflight} // []}));
+	return 1 if $lazy && _active($self);
 	local $in_cleanup = 1;
-	delete @$self{qw(async_cat async_chk)};
 	async_wait_all($self);
-	delete @$self{qw(inflight inflight_c -bc)};
-	_destroy($self, qw(pid rbuf in out err_c));
-	_destroy($self, qw(pid_c rbuf_c in_c out_c err_c));
+	$_->close for ($self, (delete($self->{ck}) // ()));
 	undef;
 }
 
@@ -530,7 +492,7 @@ sub packed_bytes {
 	$n
 }
 
-sub DESTROY { cleanup(@_) }
+sub DESTROY { cleanup($_[0]) }
 
 sub local_nick ($) {
 	# don't show full FS path, basename should be OK:
@@ -571,14 +533,14 @@ sub cat_async_begin {
 	my ($self) = @_;
 	cleanup($self) if $self->alternates_changed;
 	die 'BUG: already in async' if $self->{inflight};
-	batch_prepare($self, []);
+	batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
 	my ($self, $oid, $cb, $arg) = @_;
 	my $inflight = $self->{inflight} // cat_async_begin($self);
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-	write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
+	write_all($self, $oid."\n", \&cat_async_step, $inflight);
 	push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -648,7 +610,7 @@ sub manifest_entry {
 	}
 	my $dig = PublicInbox::SHA->new(1);
 	while (read($sr, $buf, 65536)) { $dig->add($buf) }
-	close $sr or return; # empty, uninitialized git repo
+	CORE::close $sr or return; # empty, uninitialized git repo
 	$ent->{fingerprint} = $dig->hexdigest;
 	$ent->{modified} = modified(undef, $mod);
 	chomp($buf = <$own> // '');
@@ -664,8 +626,10 @@ sub cleanup_if_unlinked {
 	# Linux-specific /proc/$PID/maps access
 	# TODO: support this inside git.git
 	my $ret = 0;
-	for my $fld (qw(pid pid_c)) {
-		my $pid = $self->{$fld} // next;
+	for my $obj ($self, ($self->{ck} // ())) {
+		my $sock = $obj->{sock} // next;
+		my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe
+		my $pid = $pp->{pid} // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
 			# n.b. we do not restart for unlinked multi-pack-index
@@ -679,6 +643,50 @@ sub cleanup_if_unlinked {
 	$ret;
 }
 
+sub event_step {
+	my ($self) = @_;
+	$self->close if !$self->{sock}; # process died while requeued
+	my $inflight = $self->{inflight};
+	if ($inflight && @$inflight) {
+		$self->cat_async_step($inflight);
+		return $self->close unless $self->{sock};
+		# more to do? requeue for fairness:
+		$self->requeue if @$inflight || exists($self->{rbuf});
+	}
+}
+
+# idempotently registers with DS epoll/kqueue/select/poll
+sub watch_async ($) {
+	$_[0]->{epwatch} //= do {
+		$_[0]->SUPER::new($_[0]->{sock}, EPOLLIN|EPOLLET);
+		\undef;
+	}
+}
+
+sub close {
+	my ($self) = @_;
+	if (my $q = $self->{inflight}) { # abort inflight requests
+		while (@$q) {
+			my ($req, $cb, $arg) = splice(@$q, 0, 3);
+			$req = $$req if ref($req);
+			$self->{-bc} and $req =~ s/\A(?:contents|info) //;
+			$req =~ s/ .*//; # drop git_dir for Gcf2Client
+			eval { $cb->(undef, $req, undef, undef, $arg) };
+			warn "E: (in abort) $req: $@" if $@;
+		}
+	}
+	delete @$self{qw(-bc err_c inflight rbuf)};
+	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+}
+
+package PublicInbox::GitCheck; # only for git <2.36
+use v5.12;
+our @ISA = qw(PublicInbox::Git);
+no warnings 'once';
+
+# for event_step
+*cat_async_step = \&PublicInbox::Git::check_async_step;
+
 1;
 __END__
 =pod
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index 671654b5..71ee1147 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -1,70 +1,12 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# internal class used by PublicInbox::Git + PublicInbox::DS
-# This parses the output pipe of "git cat-file --batch"
 package PublicInbox::GitAsyncCat;
 use v5.12;
-use parent qw(PublicInbox::DS Exporter);
-use PublicInbox::DS qw(awaitpid);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use parent qw(Exporter);
 our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check);
-use PublicInbox::Git ();
 
 our $GCF2C; # singleton PublicInbox::Gcf2Client
 
-# close w/o aborting another git process
-sub vanish {
-	delete $_[0]->{git};
-	$_[0]->close;
-}
-
-sub close {
-	my ($self) = @_;
-	if (my $git = delete $self->{git}) {
-		$git->async_abort;
-	}
-	$self->SUPER::close; # PublicInbox::DS::close
-}
-
-sub aclose {
-	my (undef, $self, $f) = @_; # ignore PID ($_[0])
-	if (my $g = $self->{git}) {
-		return vanish($self) if ($g->{$f} // 0) != ($self->{sock} // 1);
-	}
-	$self->close;
-}
-
-sub event_step {
-	my ($self) = @_;
-	my $git = $self->{git} or return;
-	return vanish($self) if ($git->{in} // 0) != ($self->{sock} // 1);
-	my $inflight = $git->{inflight};
-	if ($inflight && @$inflight) {
-		$git->cat_async_step($inflight);
-
-		# child death?
-		if (($git->{in} // 0) != ($self->{sock} // 1)) {
-			vanish($self);
-		} elsif (@$inflight || exists $git->{rbuf}) {
-			# ok, more to do, requeue for fairness
-			$self->requeue;
-		}
-	}
-}
-
-sub watch_cat {
-	my ($git) = @_;
-	$git->{async_cat} //= do {
-		my $self = bless { git => $git }, __PACKAGE__;
-		$git->{in}->blocking(0);
-		$self->SUPER::new($git->{in}, EPOLLIN|EPOLLET);
-		awaitpid($git->{pid}, \&aclose, $self, 'in');
-		\undef; # this is a true ref()
-	};
-}
-
 sub ibx_async_cat ($$$$) {
 	my ($ibx, $oid, $cb, $arg) = @_;
 	my $git = $ibx->{git} // $ibx->git;
@@ -80,7 +22,7 @@ sub ibx_async_cat ($$$$) {
 		\undef;
 	} else { # read-only end of git-cat-file pipe
 		$git->cat_async($oid, $cb, $arg);
-		watch_cat($git);
+		$git->watch_async;
 	}
 }
 
@@ -88,14 +30,7 @@ sub async_check ($$$$) {
 	my ($ibx, $oidish, $cb, $arg) = @_; # $ibx may be $ctx
 	my $git = $ibx->{git} // $ibx->git;
 	$git->check_async($oidish, $cb, $arg);
-	return watch_cat($git) if $git->{-bc}; # --batch-command
-	$git->{async_chk} //= do {
-		my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck';
-		$git->{in_c}->blocking(0);
-		$self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET);
-		awaitpid($git->{pid_c}, \&aclose, $self, 'in_c');
-		\undef; # this is a true ref()
-	};
+	($git->{ck} // $git)->watch_async;
 }
 
 # this is safe to call inside $cb, but not guaranteed to enqueue
@@ -109,35 +44,10 @@ sub ibx_async_prefetch {
 			$oid .= " $git->{git_dir}\n";
 			return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true
 		}
-	} elsif ($git->{async_cat}) {
+	} elsif ($git->{epwatch}) {
 		return $git->async_prefetch($oid, $cb, $arg);
 	}
 	undef;
 }
 
 1;
-package PublicInbox::GitAsyncCheck;
-use v5.12;
-our @ISA = qw(PublicInbox::GitAsyncCat);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-
-sub event_step {
-	my ($self) = @_;
-	my $git = $self->{git} or return;
-	return $self->vanish if ($git->{in_c} // 0) != ($self->{sock} // 1);
-	my $inflight = $git->{inflight_c};
-	if ($inflight && @$inflight) {
-		$git->check_async_step($inflight);
-
-		# child death?
-		if (($git->{in_c} // 0) != ($self->{sock} // 1)) {
-			$self->vanish;
-		} elsif (@$inflight || exists $git->{rbuf_c}) {
-			# ok, more to do, requeue for fairness
-			$self->requeue;
-		}
-	}
-}
-
-1;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 2dddf00b..98d0ac19 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -133,7 +133,7 @@ sub eml2mboxcl2 {
 sub git_to_mail { # git->cat_async callback
 	my ($bref, $oid, $type, $size, $smsg) = @_;
 	my $self = delete $smsg->{l2m} // die "BUG: no l2m";
-	$type // return; # called by git->async_abort
+	$type // return; # called by PublicInbox::Git::close
 	eval {
 		if ($type eq 'missing' &&
 			  ($bref = $self->{-lms_rw}->local_blob($oid, 1))) {
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 5529ed5b..f80fb4cb 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -133,7 +133,7 @@ sub do_cat_async {
 	# favor git(1) over Gcf2 (libgit2) for SHA-256 support
 	$ctx->{git}->cat_async($_, $cb, $ctx) for @req;
 	if ($ctx->{env}->{'pi-httpd.async'}) {
-		PublicInbox::GitAsyncCat::watch_cat($ctx->{git});
+		$ctx->{git}->watch_async;
 	} else { # synchronous, generic PSGI
 		$ctx->{git}->cat_async_wait;
 	}

* [PATCH 3/3] git+gcf2client: switch to level-triggered wakeups
From: Eric Wong @ 2023-09-30 15:20 UTC (permalink / raw)
  To: meta

Instead of using ->requeue to emulate level-triggered wakeups in
userspace, just use level-triggered wakeups in the kernel to
save some user time at the expense of system (kernel) time.  Of
course, the ready list implementation in the kernel via C is
faster than a Perl one on our end.

We must still use requeue if we've got buffered data, however.

Followup-to: 1181a7e6a853 (listener: switch to level-triggered epoll)
---
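Note (not part of the commit): a small sketch of why a requeue is
still needed when data sits in rbuf.  It uses IO::Select from core
Perl, which wraps select(2) and is level-triggered, as an analogy
for the EPOLLIN registration in this patch; it does not use
PublicInbox::DS.  `kernel_ready' is a hypothetical helper name.

```perl
use strict;
use warnings;
use IO::Select;
use Socket qw(AF_UNIX SOCK_STREAM);

socketpair(my $rd, my $wr, AF_UNIX, SOCK_STREAM, 0) or
	die "socketpair: $!";
my $sel = IO::Select->new($rd);

# hypothetical helper: 1 if the kernel reports the socket readable
sub kernel_ready { my @r = $_[0]->can_read(0); scalar(@r) }

# 1) level-triggered: after a partial read, the kernel keeps
# reporting readiness, so no userspace requeue is needed
syswrite($wr, "one\n") // die "syswrite: $!";
sysread($rd, my $part, 2) // die "sysread: $!";
my $ready_after_partial = kernel_ready($sel);
sysread($rd, $part, 2, length($part)) // die "sysread: $!"; # drain

# 2) two responses pulled into userspace by a single sysread
syswrite($wr, "two\nthree\n") // die "syswrite: $!";
sysread($rd, my $rbuf, 65536) // die "sysread: $!";
# handle only one response per step, as event_step does
my $line = substr($rbuf, 0, index($rbuf, "\n") + 1, '');
my $ready_after_slurp = kernel_ready($sel);

# the kernel has nothing left to report, but rbuf still holds a
# response, so the caller has to requeue itself
my $must_requeue = $rbuf ne '' ? 1 : 0;

print "partial read: ready=$ready_after_partial\n",
	"after slurp: ready=$ready_after_slurp ",
	"requeue=$must_requeue\n";
```
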
 lib/PublicInbox/Gcf2Client.pm | 4 ++--
 lib/PublicInbox/Git.pm        | 7 ++++---
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 8f442787..8ac44a5e 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -9,7 +9,7 @@ use PublicInbox::Git;
 use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::Syscall qw(EPOLLIN);
 use PublicInbox::ProcessPipe;
 
 # fields:
@@ -35,7 +35,7 @@ sub new  {
 	my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 	$self->{inflight} = [];
 	$self->{epwatch} = \undef; # for Git->cleanup
-	$self->SUPER::new($sock, EPOLLIN|EPOLLET);
+	$self->SUPER::new($sock, EPOLLIN);
 }
 
 sub gcf2_async ($$$;$) {
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 3062f293..5003be53 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -650,15 +650,16 @@ sub event_step {
 	if ($inflight && @$inflight) {
 		$self->cat_async_step($inflight);
 		return $self->close unless $self->{sock};
-		# more to do? requeue for fairness:
-		$self->requeue if @$inflight || exists($self->{rbuf});
+		# don't loop here to keep things fair, but we must requeue
+		# if there's already-read data in rbuf
+		$self->requeue if exists($self->{rbuf});
 	}
 }
 
 # idempotently registers with DS epoll/kqueue/select/poll
 sub watch_async ($) {
 	$_[0]->{epwatch} //= do {
-		$_[0]->SUPER::new($_[0]->{sock}, EPOLLIN|EPOLLET);
+		$_[0]->SUPER::new($_[0]->{sock}, EPOLLIN);
 		\undef;
 	}
 }
