unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/4] cindex: parallelism++, sockets--
@ 2023-03-31 10:20 Eric Wong
  2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
  To: meta

I've noticed `git log -p ...' can take a while to generate
diffs, so being able to do prune work is helpful in the
meantime.

I've also noticed PktOp use was excessive so we now share
sockets across processes without an increase in complexity.

Eric Wong (4):
  ipc: support awaitpid in WQ workers
  cindex: do prune work while waiting for `git log -p'
  cindex: share PktOp socket across prune workers
  cindex: share PktOp across indexing workers

 MANIFEST                         |   1 +
 lib/PublicInbox/CidxLogP.pm      |  29 +++++++
 lib/PublicInbox/CodeSearchIdx.pm | 129 ++++++++++++++++++-------------
 lib/PublicInbox/IPC.pm           |  11 +--
 4 files changed, 110 insertions(+), 60 deletions(-)
 create mode 100644 lib/PublicInbox/CidxLogP.pm

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [PATCH 1/4] ipc: support awaitpid in WQ workers
  2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
  2023-03-31 10:20 ` [PATCH 2/4] cindex: do prune work while waiting for `git log -p' Eric Wong
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
  To: meta

Using signalfd is necessary to get reliable signal wakeups w/o
polling on fixed intervals.  This change will make it possible
to use awaitpid in cidx shard workers so they can perform prune
work while waiting on the initial output of `git log -p'.
---
 lib/PublicInbox/IPC.pm | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 1f0e87ee..cca3dacb 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -268,12 +268,14 @@ sub sock_defined {
 	defined($wqw->{sock});
 }
 
-sub wq_worker_loop ($$) {
-	my ($self, $bcast2) = @_;
+sub wq_worker_loop ($$$) {
+	my ($self, $bcast2, $oldset) = @_;
 	my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
 	PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
 	local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw);
-	PublicInbox::DS::event_loop();
+	my $sig = delete($self->{wq_sig});
+	$sig->{CHLD} //= \&PublicInbox::DS::enqueue_reap;
+	PublicInbox::DS::event_loop($sig, $oldset);
 	PublicInbox::DS->Reset;
 }
 
@@ -405,8 +407,7 @@ sub _wq_worker_start {
 			local @$self{keys %$fields} = values(%$fields);
 			my $on_destroy = $self->ipc_atfork_child;
 			local @SIG{keys %SIG} = values %SIG;
-			PublicInbox::DS::sig_setmask($oldset);
-			wq_worker_loop($self, $bcast2);
+			wq_worker_loop($self, $bcast2, $oldset);
 		};
 		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
 		undef $end; # trigger exit

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 2/4] cindex: do prune work while waiting for `git log -p'
  2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
  2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
  2023-03-31 10:20 ` [PATCH 3/4] cindex: share PktOp socket across prune workers Eric Wong
  2023-03-31 10:20 ` [PATCH 4/4] cindex: share PktOp across indexing workers Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
  To: meta

`git log -p' can take several seconds to generate its initial output.
SMP systems can be processing prunes during this delay, so let
DS do a one-shot notification for us while prune is running.  On
Linux, we'll also use the biggest pipe possible so git can do
more CPU-intensive work to generate diffs while our Perl
processes are indexing and likely hitting I/O wait.
---
 MANIFEST                         |  1 +
 lib/PublicInbox/CidxLogP.pm      | 29 ++++++++++++++++++
 lib/PublicInbox/CodeSearchIdx.pm | 51 +++++++++++++++++++++-----------
 3 files changed, 63 insertions(+), 18 deletions(-)
 create mode 100644 lib/PublicInbox/CidxLogP.pm

diff --git a/MANIFEST b/MANIFEST
index 3c421645..a0e64c6a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
new file mode 100644
index 00000000..7877d5ac
--- /dev/null
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate,
+# CodeSearchIdx can process prune work while it's happening.  Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+	my ($cls, $rd, $cidx, $git, $roots) = @_;
+	my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
+	fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes
+	$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rd = $self->{sock} // return warn('BUG?: no {sock}');
+	$self->close; # PublicInbox::DS::close, deferred, so $sock is usable
+	delete($self->{cidx})->cidx_read_log_p($self, $rd);
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 035fab3e..215e337f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config qw(glob2re);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
 use Socket qw(MSG_EOR);
 use Carp ();
 our (
@@ -216,20 +217,41 @@ EOM
 	$len;
 }
 
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb
+	my ($pid, $self, $op_p) = @_;
+	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+				($? & 127) == POSIX::SIGPIPE))) {
+		send($op_p, "shard_done $self->{shard}", MSG_EOR);
+	} else {
+		warn "E: git @LOG_STDIN: \$?=$?\n";
+		$self->{xdb}->cancel_transaction;
+	}
+}
+
 sub shard_index { # via wq_io_do in IDX_SHARDS
-	my ($self, $git, $n, $roots) = @_;
-	local $self->{current_info} = "$git->{git_dir} [$n]";
-	local $self->{roots} = $roots;
+	my ($self, $git, $roots) = @_;
+
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+	my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+	close $in or die "close: $!";
+	awaitpid($pid, \&cidx_reap_log, $self, $op_p);
+	PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
+	# CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+	my ($self, $log_p, $rd) = @_;
+	my $git = delete $log_p->{git} // die 'BUG: no {git}';
+	local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+	local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
 	local $MAX_SIZE = $self->{-opt}->{max_size};
 	# local-ized in parent before fork
 	$TXN_BYTES = $BATCH_BYTES;
 	local $self->{git} = $git; # for patchid
 	return if $DO_QUIT;
-	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
-	close $in or die "close: $!";
 	my $nr = 0;
 
 	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
 	my $len;
 	my $cmt = {};
 	local $/ = $FS;
-	my $buf = <$rd> // return close($rd); # leading $FS
+	my $buf = <$rd> // return; # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
 	$self->begin_txn_lazy;
 	while (!$DO_QUIT && defined($buf = <$rd>)) {
@@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
 			@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		}
 		if (($TXN_BYTES -= $len) <= 0) {
-			cidx_ckpoint($self, "[$n] $nr");
+			cidx_ckpoint($self, "[$self->{shard}] $nr");
 			$TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
 		}
 		update_commit($self, $cmt);
 		++$nr;
-		cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+		cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
 		$/ = $FS;
 	}
-	close($rd);
-	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
-				($? & 127) == POSIX::SIGPIPE))) {
-		send($op_p, "shard_done $n", MSG_EOR);
-	} else {
-		warn "E: git @LOG_STDIN: \$?=$?\n";
-		$self->{xdb}->cancel_transaction;
-	}
+	# return and wait for cidx_reap_log
 }
 
 sub shard_done { # called via PktOp on shard_index completion
@@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb
 		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
-					$git, $n, \@roots);
+					$git, \@roots);
 		$consumers->{$n} = $c;
 	}
 	@shard_in = ();

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 3/4] cindex: share PktOp socket across prune workers
  2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
  2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
  2023-03-31 10:20 ` [PATCH 2/4] cindex: do prune work while waiting for `git log -p' Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
  2023-03-31 10:20 ` [PATCH 4/4] cindex: share PktOp across indexing workers Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
  To: meta

We can allocate fewer sockets and less memory this way.
---
 lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 215e337f..14342683 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -53,7 +53,7 @@ our (
 	$PRUNE_MAX, # per-shard document ID to stop at
 	$PRUNE_OP_P, # prune_done() notification socket
 	$PRUNE_NR, # total number pruned
-	@PRUNE_DONE, # marks off prune completions
+	$PRUNE_DONE, # marks off prune completions
 	$NCHANGE, # current number of changes
 	%ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
 );
@@ -290,13 +290,17 @@ sub shard_done { # called via PktOp on shard_index completion
 }
 
 sub prune_done { # called via PktOp->event_step completion
-	my ($shard) = @_;
-	$PRUNE_DONE[$shard->{shard}] = 1;
+	my ($self, $n) = @_;
+	return if $DO_QUIT || !$PRUNE_DONE;
+	die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
+	$PRUNE_DONE->[$n] = 1;
+	grep(defined, @$PRUNE_DONE) == @IDX_SHARDS and
+		progress($self, 'prune done')
 }
 
-sub prune_busy {
+sub prune_busy { # post_loop_do
 	return if $DO_QUIT;
-	grep(defined, @PRUNE_DONE) != @IDX_SHARDS;
+	grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
 }
 
 sub await_prune () {
@@ -711,7 +715,7 @@ sub event_step { # may be requeued via DS
 	$TMP_GIT->async_wait_all;
 	cidx_ckpoint($self);
 	return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
-	send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+	send($PRUNE_OP_P, "prune_done $self->{shard}", MSG_EOR);
 	$TMP_GIT->cleanup;
 	$TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
 	%ACTIVE_GIT_DIR = ();
@@ -790,9 +794,9 @@ sub start_prune ($) {
 	my ($self) = @_;
 	init_tmp_git_dir($self);
 	my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{prune_done} = [ $self ];
 	for my $s (@IDX_SHARDS) {
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{prune_done} = [ $s ];
 		$s->wq_io_do('prune_start', [ $p->{op_p} ],
 				$TMP_GIT->{git_dir}, @active_git_dir)
 	}
@@ -807,8 +811,8 @@ sub cidx_run { # main entry point
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
-	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
-		@PRUNE_DONE);
+	local $PRUNE_DONE = [];
+	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local @IDX_SHARDS = cidx_init($self);

^ permalink raw reply related	[flat|nested] 5+ messages in thread

* [PATCH 4/4] cindex: share PktOp across indexing workers
  2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
                   ` (2 preceding siblings ...)
  2023-03-31 10:20 ` [PATCH 3/4] cindex: share PktOp socket across prune workers Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
  3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
  To: meta

Using fewer sockets simplifies completion checks, too.
---
 lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++----------------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 14342683..05007afd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -504,36 +504,35 @@ sub partition_refs ($$$) {
 }
 
 sub shard_commit { # via wq_io_do
-	my ($self, $n) = @_;
+	my ($self) = @_;
 	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
 	$self->commit_txn_lazy;
-	send($op_p, "shard_done $n", MSG_EOR);
+	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub consumers_open { # post_loop_do
-	my (undef, $consumers) = @_;
-	return if $DO_QUIT;
-	scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+	my (undef, $c) = @_; # $c is PublicInbox::PktOp
+	$DO_QUIT ? undef : defined($c->{sock});
 }
 
-sub wait_consumers ($$$) {
-	my ($self, $git, $consumers) = @_;
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+	my ($self, $git, $active, $c) = @_;
+	local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
 	die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
 }
 
-sub commit_used_shards ($$$) {
-	my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+	my ($self, $git, $active) = @_;
 	local $self->{-shard_ok} = {};
-	for my $n (keys %$consumers) {
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{shard_done} = [ $self ];
-		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
-		$consumers->{$n} = $c;
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{shard_done} = [ $self ];
+	for my $n (keys %$active) {
+		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
 	}
-	wait_consumers($self, $git, $consumers);
+	undef $p;
+	wait_active($self, $git, $active, $c);
 }
 
 sub index_repo { # cidx_await cb
@@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb
 	$repo->{roots} = \@roots;
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-	my $consumers = {};
+	local $self->{-shard_ok} = {};
+	my $active = {};
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{shard_done} = [ $self ];
 	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
 		last if $DO_QUIT;
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
 					$git, \@roots);
-		$consumers->{$n} = $c;
+		$active->{$n} = undef;
 	}
+	undef $p;
 	@shard_in = ();
-	wait_consumers($self, $git, $consumers);
+	wait_active($self, $git, $active, $c);
 	if ($DO_QUIT) {
-		commit_used_shards($self, $git, $consumers);
+		commit_active_shards($self, $git, $active);
 		progress($self, "$git->{git_dir}: done");
 		return;
 	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
-		$consumers->{$repo->{shard_n}} = undef;
-		commit_used_shards($self, $git, $consumers);
+		$active->{$repo->{shard_n}} = undef;
+		commit_active_shards($self, $git, $active);
 		progress($self, "$git->{git_dir}: done");
 		return run_deferred();
 	}

^ permalink raw reply related	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2023-03-31 10:20 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
2023-03-31 10:20 ` [PATCH 2/4] cindex: do prune work while waiting for `git log -p' Eric Wong
2023-03-31 10:20 ` [PATCH 3/4] cindex: share PktOp socket across prune workers Eric Wong
2023-03-31 10:20 ` [PATCH 4/4] cindex: share PktOp across indexing workers Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).