unofficial mirror of meta@public-inbox.org
* [PATCH 0/7] cindex: fix FreeBSD freezes
@ 2023-04-05 11:26 Eric Wong
  2023-04-05 11:26 ` [PATCH 1/7] ipc: support awaitpid in WQ workers Eric Wong
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

PATCH 6/7 is the fix.  It took me a while to realize how to fix it.

1-5 improve parallelism and reduce resource usage while limiting
the workaround to a single callsite in #6.  And #7 is just a
cleanliness fix.

Eric Wong (7):
  ipc: support awaitpid in WQ workers
  cindex: do prune work while waiting for `git log -p'
  cindex: share PktOp socket across prune workers
  cindex: share PktOp across indexing workers
  cindex: enter event loop once per run
  cindex: workaround for FreeBSD missing SIGCHLD
  cindex: reset DS internals on cidx_run completion

 MANIFEST                         |   1 +
 lib/PublicInbox/CidxLogP.pm      |  29 ++++
 lib/PublicInbox/CodeSearchIdx.pm | 290 ++++++++++++++++---------------
 lib/PublicInbox/IPC.pm           |  11 +-
 4 files changed, 184 insertions(+), 147 deletions(-)
 create mode 100644 lib/PublicInbox/CidxLogP.pm


* [PATCH 1/7] ipc: support awaitpid in WQ workers
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 2/7] cindex: do prune work while waiting for `git log -p' Eric Wong
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

Using signalfd is necessary to get reliable signal wakeups w/o
polling on fixed intervals.  This change will make it possible
to use awaitpid in cidx shard workers so they can perform prune
work while waiting on the initial output of `git log -p'.
---
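For readers unfamiliar with the awaitpid pattern, here is a minimal
standalone sketch of the idea.  It does not use PublicInbox::DS at all:
%await, await_pid() and reap() are hypothetical names, and a plain
$SIG{CHLD} handler stands in for the signalfd/EVFILT_SIGNAL wakeup.
SIGCHLD is blocked while forking and registering so that no exit can be
reaped before its callback is known, which is roughly why the old
signal set needs to be handed to the event loop.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG SIG_BLOCK SIG_SETMASK);

my %await; # pid => [ callback, @args ] (hypothetical registry)
my @seen;  # [ expected exit code, actual exit code ] pairs

sub await_pid { # run $cb once $pid has been reaped
	my ($pid, $cb, @args) = @_;
	$await{$pid} = [ $cb, @args ];
}

sub reap { # SIGCHLD handler: reap everything that has exited so far
	while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
		my $ent = delete $await{$pid} or next;
		my ($cb, @args) = @$ent;
		$cb->($pid, $?, @args);
	}
}

$SIG{CHLD} = \&reap;
my $chld = POSIX::SigSet->new(POSIX::SIGCHLD);
my $oldset = POSIX::SigSet->new;
# block SIGCHLD so a fast child cannot be reaped before it is registered
POSIX::sigprocmask(SIG_BLOCK, $chld, $oldset) or die "sigprocmask: $!";
for my $code (0, 3) {
	my $pid = fork;
	defined($pid) or die "fork: $!";
	exit($code) if $pid == 0; # child
	await_pid($pid, sub {
		my (undef, $status, $want) = @_;
		push @seen, [ $want, $status >> 8 ];
	}, $code);
}
# restore the old mask; pending SIGCHLDs are delivered from here on
POSIX::sigprocmask(SIG_SETMASK, $oldset) or die "sigprocmask: $!";
select(undef, undef, undef, 0.05) while %await; # stand-in for event_loop
print "reaped ", scalar(@seen), " children\n";
```

The final select() loop is only a stand-in for a real event loop; it
wakes early when the handler interrupts it.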
 lib/PublicInbox/IPC.pm | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 1f0e87ee..cca3dacb 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -268,12 +268,14 @@ sub sock_defined {
 	defined($wqw->{sock});
 }
 
-sub wq_worker_loop ($$) {
-	my ($self, $bcast2) = @_;
+sub wq_worker_loop ($$$) {
+	my ($self, $bcast2, $oldset) = @_;
 	my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
 	PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
 	local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw);
-	PublicInbox::DS::event_loop();
+	my $sig = delete($self->{wq_sig});
+	$sig->{CHLD} //= \&PublicInbox::DS::enqueue_reap;
+	PublicInbox::DS::event_loop($sig, $oldset);
 	PublicInbox::DS->Reset;
 }
 
@@ -405,8 +407,7 @@ sub _wq_worker_start {
 			local @$self{keys %$fields} = values(%$fields);
 			my $on_destroy = $self->ipc_atfork_child;
 			local @SIG{keys %SIG} = values %SIG;
-			PublicInbox::DS::sig_setmask($oldset);
-			wq_worker_loop($self, $bcast2);
+			wq_worker_loop($self, $bcast2, $oldset);
 		};
 		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
 		undef $end; # trigger exit


* [PATCH 2/7] cindex: do prune work while waiting for `git log -p'
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
  2023-04-05 11:26 ` [PATCH 1/7] ipc: support awaitpid in WQ workers Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 3/7] cindex: share PktOp socket across prune workers Eric Wong
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

`git log -p' can take several seconds to generate its initial output.
SMP systems can be processing prunes during this delay, so let
DS do a one-shot notification for us while prune is running.  On
Linux, we'll also use the biggest pipe possible so git can do
more CPU-intensive work to generate diffs while our Perl
processes are indexing and likely hitting I/O wait.
---
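A rough standalone illustration of the approach, assuming nothing from
PublicInbox: a slow-starting child stands in for `git log -p',
IO::Select stands in for the one-shot EPOLLIN notification, and a
counter stands in for prune work.  All names here are hypothetical.
(The 1031 passed to fcntl in the diff below is F_SETPIPE_SZ on Linux;
the sketch leaves the pipe size alone.)

```perl
use strict;
use warnings;
use IO::Select;

# hypothetical stand-in for `git log -p': slow to emit its first byte
my $pid = open(my $rd, '-|', $^X, '-e',
		'select(undef, undef, undef, 0.3); print "a\nb\n"')
	or die "popen: $!";
my $sel = IO::Select->new($rd);
my $side_work = 0;

# do small units of other work until the pipe becomes readable
until (my @ready = $sel->can_read(0.05)) {
	++$side_work; # stand-in for one prune step
}

# output has started (or EOF was hit): blocking reads are fine from here
my @lines = <$rd>;
close($rd) or die "child failed: \$?=$?";
print "side work units: $side_work, lines read: ", scalar(@lines), "\n";
```

Readiness is only checked before the first read, so mixing select
with buffered <$rd> reads afterwards is not a concern here.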
 MANIFEST                         |  1 +
 lib/PublicInbox/CidxLogP.pm      | 29 ++++++++++++++++++
 lib/PublicInbox/CodeSearchIdx.pm | 51 +++++++++++++++++++++-----------
 3 files changed, 63 insertions(+), 18 deletions(-)
 create mode 100644 lib/PublicInbox/CidxLogP.pm

diff --git a/MANIFEST b/MANIFEST
index 3c421645..a0e64c6a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
new file mode 100644
index 00000000..7877d5ac
--- /dev/null
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate,
+# CodeSearchIdx can process prune work while it's happening.  Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+	my ($cls, $rd, $cidx, $git, $roots) = @_;
+	my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
+	fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes
+	$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $rd = $self->{sock} // return warn('BUG?: no {sock}');
+	$self->close; # PublicInbox::DS::close, deferred, so $sock is usable
+	delete($self->{cidx})->cidx_read_log_p($self, $rd);
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 035fab3e..215e337f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config qw(glob2re);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
 use Socket qw(MSG_EOR);
 use Carp ();
 our (
@@ -216,20 +217,41 @@ EOM
 	$len;
 }
 
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb
+	my ($pid, $self, $op_p) = @_;
+	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+				($? & 127) == POSIX::SIGPIPE))) {
+		send($op_p, "shard_done $self->{shard}", MSG_EOR);
+	} else {
+		warn "E: git @LOG_STDIN: \$?=$?\n";
+		$self->{xdb}->cancel_transaction;
+	}
+}
+
 sub shard_index { # via wq_io_do in IDX_SHARDS
-	my ($self, $git, $n, $roots) = @_;
-	local $self->{current_info} = "$git->{git_dir} [$n]";
-	local $self->{roots} = $roots;
+	my ($self, $git, $roots) = @_;
+
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+	my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+	close $in or die "close: $!";
+	awaitpid($pid, \&cidx_reap_log, $self, $op_p);
+	PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
+	# CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+	my ($self, $log_p, $rd) = @_;
+	my $git = delete $log_p->{git} // die 'BUG: no {git}';
+	local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+	local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
 	local $MAX_SIZE = $self->{-opt}->{max_size};
 	# local-ized in parent before fork
 	$TXN_BYTES = $BATCH_BYTES;
 	local $self->{git} = $git; # for patchid
 	return if $DO_QUIT;
-	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
-	close $in or die "close: $!";
 	my $nr = 0;
 
 	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
 	my $len;
 	my $cmt = {};
 	local $/ = $FS;
-	my $buf = <$rd> // return close($rd); # leading $FS
+	my $buf = <$rd> // return; # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
 	$self->begin_txn_lazy;
 	while (!$DO_QUIT && defined($buf = <$rd>)) {
@@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
 			@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		}
 		if (($TXN_BYTES -= $len) <= 0) {
-			cidx_ckpoint($self, "[$n] $nr");
+			cidx_ckpoint($self, "[$self->{shard}] $nr");
 			$TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
 		}
 		update_commit($self, $cmt);
 		++$nr;
-		cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+		cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
 		$/ = $FS;
 	}
-	close($rd);
-	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
-				($? & 127) == POSIX::SIGPIPE))) {
-		send($op_p, "shard_done $n", MSG_EOR);
-	} else {
-		warn "E: git @LOG_STDIN: \$?=$?\n";
-		$self->{xdb}->cancel_transaction;
-	}
+	# return and wait for cidx_reap_log
 }
 
 sub shard_done { # called via PktOp on shard_index completion
@@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb
 		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
-					$git, $n, \@roots);
+					$git, \@roots);
 		$consumers->{$n} = $c;
 	}
 	@shard_in = ();


* [PATCH 3/7] cindex: share PktOp socket across prune workers
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
  2023-04-05 11:26 ` [PATCH 1/7] ipc: support awaitpid in WQ workers Eric Wong
  2023-04-05 11:26 ` [PATCH 2/7] cindex: do prune work while waiting for `git log -p' Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 4/7] cindex: share PktOp across indexing workers Eric Wong
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

We can allocate fewer sockets and use less memory this way.
---
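The sharing idea can be sketched outside of PublicInbox like this.
It is only an approximation: a plain pipe with short line-delimited
writes stands in for the SOCK_SEQPACKET PktOp pair, and the worker
count and message format are assumptions for illustration.  Every
worker inherits the same write end and tags its message with its own
shard number, and the parent sees EOF once all copies are closed.

```perl
use strict;
use warnings;

my $nworkers = 3; # hypothetical shard count
pipe(my $rd, my $wr) or die "pipe: $!";
my @pids;
for my $n (0..($nworkers - 1)) {
	my $pid = fork;
	defined($pid) or die "fork: $!";
	if ($pid == 0) { # worker: inherits a copy of the shared write end
		close $rd;
		# one short write (under PIPE_BUF) is atomic on a pipe
		defined(syswrite($wr, "prune_done $n\n")) or
			die "syswrite: $!";
		exit 0;
	}
	push @pids, $pid;
}
close $wr; # parent drops its copy so EOF arrives once every worker exits

my @done; # marks off completions by shard number
while (my $msg = <$rd>) {
	$done[$1] = 1 if $msg =~ /\Aprune_done (\d+)$/;
}
waitpid($_, 0) for @pids;
my $ndone = grep(defined, @done);
print "all $ndone workers reported\n" if $ndone == $nworkers;
```

Closing the parent's copy of the write end plays the same role as
`undef $p' later in this series: without it, EOF would never arrive.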
 lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++++----------
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 215e337f..14342683 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -53,7 +53,7 @@ our (
 	$PRUNE_MAX, # per-shard document ID to stop at
 	$PRUNE_OP_P, # prune_done() notification socket
 	$PRUNE_NR, # total number pruned
-	@PRUNE_DONE, # marks off prune completions
+	$PRUNE_DONE, # marks off prune completions
 	$NCHANGE, # current number of changes
 	%ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
 );
@@ -290,13 +290,17 @@ sub shard_done { # called via PktOp on shard_index completion
 }
 
 sub prune_done { # called via PktOp->event_step completion
-	my ($shard) = @_;
-	$PRUNE_DONE[$shard->{shard}] = 1;
+	my ($self, $n) = @_;
+	return if $DO_QUIT || !$PRUNE_DONE;
+	die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
+	$PRUNE_DONE->[$n] = 1;
+	grep(defined, @$PRUNE_DONE) == @IDX_SHARDS and
+		progress($self, 'prune done')
 }
 
-sub prune_busy {
+sub prune_busy { # post_loop_do
 	return if $DO_QUIT;
-	grep(defined, @PRUNE_DONE) != @IDX_SHARDS;
+	grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
 }
 
 sub await_prune () {
@@ -711,7 +715,7 @@ sub event_step { # may be requeued via DS
 	$TMP_GIT->async_wait_all;
 	cidx_ckpoint($self);
 	return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
-	send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+	send($PRUNE_OP_P, "prune_done $self->{shard}", MSG_EOR);
 	$TMP_GIT->cleanup;
 	$TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
 	%ACTIVE_GIT_DIR = ();
@@ -790,9 +794,9 @@ sub start_prune ($) {
 	my ($self) = @_;
 	init_tmp_git_dir($self);
 	my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{prune_done} = [ $self ];
 	for my $s (@IDX_SHARDS) {
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{prune_done} = [ $s ];
 		$s->wq_io_do('prune_start', [ $p->{op_p} ],
 				$TMP_GIT->{git_dir}, @active_git_dir)
 	}
@@ -807,8 +811,8 @@ sub cidx_run { # main entry point
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
-	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
-		@PRUNE_DONE);
+	local $PRUNE_DONE = [];
+	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local @IDX_SHARDS = cidx_init($self);


* [PATCH 4/7] cindex: share PktOp across indexing workers
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
                   ` (2 preceding siblings ...)
  2023-04-05 11:26 ` [PATCH 3/7] cindex: share PktOp socket across prune workers Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 5/7] cindex: enter event loop once per run Eric Wong
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

Using fewer sockets simplifies completion checks, too.
---
 lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++----------------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 14342683..05007afd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -504,36 +504,35 @@ sub partition_refs ($$$) {
 }
 
 sub shard_commit { # via wq_io_do
-	my ($self, $n) = @_;
+	my ($self) = @_;
 	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
 	$self->commit_txn_lazy;
-	send($op_p, "shard_done $n", MSG_EOR);
+	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub consumers_open { # post_loop_do
-	my (undef, $consumers) = @_;
-	return if $DO_QUIT;
-	scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+	my (undef, $c) = @_; # $c is PublicInbox::PktOp
+	$DO_QUIT ? undef : defined($c->{sock});
 }
 
-sub wait_consumers ($$$) {
-	my ($self, $git, $consumers) = @_;
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+	my ($self, $git, $active, $c) = @_;
+	local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
 	die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
 }
 
-sub commit_used_shards ($$$) {
-	my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+	my ($self, $git, $active) = @_;
 	local $self->{-shard_ok} = {};
-	for my $n (keys %$consumers) {
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{shard_done} = [ $self ];
-		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
-		$consumers->{$n} = $c;
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{shard_done} = [ $self ];
+	for my $n (keys %$active) {
+		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
 	}
-	wait_consumers($self, $git, $consumers);
+	undef $p;
+	wait_active($self, $git, $active, $c);
 }
 
 sub index_repo { # cidx_await cb
@@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb
 	$repo->{roots} = \@roots;
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-	my $consumers = {};
+	local $self->{-shard_ok} = {};
+	my $active = {};
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{shard_done} = [ $self ];
 	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
 		last if $DO_QUIT;
-		my ($c, $p) = PublicInbox::PktOp->pair;
-		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
 					$git, \@roots);
-		$consumers->{$n} = $c;
+		$active->{$n} = undef;
 	}
+	undef $p;
 	@shard_in = ();
-	wait_consumers($self, $git, $consumers);
+	wait_active($self, $git, $active, $c);
 	if ($DO_QUIT) {
-		commit_used_shards($self, $git, $consumers);
+		commit_active_shards($self, $git, $active);
 		progress($self, "$git->{git_dir}: done");
 		return;
 	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
-		$consumers->{$repo->{shard_n}} = undef;
-		commit_used_shards($self, $git, $consumers);
+		$active->{$repo->{shard_n}} = undef;
+		commit_active_shards($self, $git, $active);
 		progress($self, "$git->{git_dir}: done");
 		return run_deferred();
 	}


* [PATCH 5/7] cindex: enter event loop once per run
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
                   ` (3 preceding siblings ...)
  2023-04-05 11:26 ` [PATCH 4/7] cindex: share PktOp across indexing workers Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 6/7] cindex: workaround for FreeBSD missing SIGCHLD Eric Wong
  2023-04-05 11:26 ` [PATCH 7/7] cindex: reset DS internals on cidx_run completion Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

This avoids needing to alter the sigmask for systems without
signalfd or EVFILT_SIGNAL.  This will also make it easier to
work around FreeBSD (and likely other *BSD) signal behavior in the
next commit.
---
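With a single event loop, the "wait here until these jobs finish"
steps become callbacks that fire when the last reference to a guard
object goes away.  The sketch below shows that mechanism in isolation.
OnLastRef is a hypothetical, much-reduced stand-in for
PublicInbox::OnDestroy (no PID check), and the job queue is invented
for illustration.

```perl
use v5.14;
use warnings;

package OnLastRef { # hypothetical minimal stand-in for OnDestroy
	sub new {
		my ($cls, $cb, @args) = @_;
		bless [ $cb, @args ], $cls;
	}
	sub DESTROY { # runs when the last reference is dropped
		my ($cb, @args) = @{$_[0]};
		$cb->(@args);
	}
}

my @log;
my $guard = OnLastRef->new(sub { push @log, 'all done' });
my @pending = map { [ $_, $guard ] } 1..3; # each job holds a reference
undef $guard; # the queue now holds the only references

while (my $job = shift @pending) {
	push @log, "job $job->[0]";
} # releasing the last job releases the guard and runs the callback
print join(', ', @log), "\n";
```

Nothing in the loop has to count outstanding jobs; dropping the
references is what signals completion.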
 lib/PublicInbox/CodeSearchIdx.pm | 207 ++++++++++++++-----------------
 1 file changed, 94 insertions(+), 113 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 05007afd..1000dc6f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -35,7 +35,6 @@ use Socket qw(MSG_EOR);
 use Carp ();
 our (
 	$LIVE, # pid => cmd
-	$DEFER, # [ [ cb, @args ], ... ]
 	$LIVE_JOBS, # integer
 	$MY_SIG, # like %SIG
 	$SIGSET,
@@ -55,7 +54,10 @@ our (
 	$PRUNE_NR, # total number pruned
 	$PRUNE_DONE, # marks off prune completions
 	$NCHANGE, # current number of changes
+	$REPO_CTX, # current repo being indexed in shards
 	%ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
+	$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
+	$GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -285,8 +287,8 @@ sub cidx_read_log_p {
 }
 
 sub shard_done { # called via PktOp on shard_index completion
-	my ($self, $n) = @_;
-	$self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+	my ($self, $repo_ctx, $on_destroy, $n) = @_;
+	$repo_ctx->{shard_ok}->{$n} = 1;
 }
 
 sub prune_done { # called via PktOp->event_step completion
@@ -298,16 +300,6 @@ sub prune_done { # called via PktOp->event_step completion
 		progress($self, 'prune done')
 }
 
-sub prune_busy { # post_loop_do
-	return if $DO_QUIT;
-	grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
-}
-
-sub await_prune () {
-	local @PublicInbox::DS::post_loop_do = (\&prune_busy);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy();
-}
-
 sub seen ($$) {
 	my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
 	for (1..100) {
@@ -336,32 +328,6 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
-sub run_deferred () {
-	my $n;
-	while (defined(my $x = shift(@{$DEFER // []}))) {
-		my $cb = shift @$x;
-		$cb->(@$x);
-		++$n;
-	}
-	$n;
-}
-
-sub need_reap { # post_loop_do
-	my (undef, $jobs) = @_;
-	return if !$LIVE || $DO_QUIT;
-	scalar(keys(%$LIVE)) > $jobs;
-}
-
-sub cidx_reap ($$) {
-	my ($self, $jobs) = @_;
-	while (run_deferred()) {}
-	local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
-	while (need_reap(undef, $jobs)) {
-		PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	}
-	while (!$jobs && run_deferred()) {}
-}
-
 sub cidx_await_cb { # awaitpid cb
 	my ($pid, $cb, $self, $git, @args) = @_;
 	return if !$LIVE || $DO_QUIT;
@@ -371,7 +337,7 @@ sub cidx_await_cb { # awaitpid cb
 		$git->{-cidx_err} = 1;
 		return warn("@$cmd error: \$?=$?\n");
 	}
-	push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+	$cb->($self, $git, @args);
 }
 
 sub cidx_await ($$$$$@) {
@@ -385,7 +351,6 @@ sub cidx_await ($$$$$@) {
 sub fp_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
 	return if !$LIVE || $DO_QUIT;
-	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
 	my $cmd = ['git', "--git-dir=$git->{git_dir}",
 		qw(show-ref --heads --tags --hash)];
@@ -407,7 +372,6 @@ sub fp_fini { # cidx_await cb
 sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
 	return if !$LIVE || $DO_QUIT;
-	cidx_reap($self, $LIVE_JOBS);
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 		qw[for-each-ref --sort=-committerdate
 		--format=%(committerdate:raw) --count=1
@@ -426,12 +390,13 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
+	return if !$LIVE || $DO_QUIT;
+	return index_next($self) if $git->{-cidx_err};
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
 	if (!defined($repo->{ct})) {
 		warn "W: $git->{git_dir} has no commits, skipping\n";
 		delete $git->{-repo};
-		return;
+		return index_next($self);
 	}
 	my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
 	my $shard = bless { %$self, shard => $n }, ref($self);
@@ -450,7 +415,7 @@ sub check_existing { # retry_reopen callback
 	my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
 	if ($old_fp eq $git->{-repo}->{fp}) { # no change
 		delete $git->{-repo};
-		return;
+		return index_next($self);
 	}
 	$git->{-repo}->{docid} = $docid;
 	if (@docids) {
@@ -510,71 +475,88 @@ sub shard_commit { # via wq_io_do
 	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub consumer_open { # post_loop_do
-	my (undef, $c) = @_; # $c is PublicInbox::PktOp
-	$DO_QUIT ? undef : defined($c->{sock});
+sub index_next ($) {
+	my ($self) = @_;
+	return if $DO_QUIT;
+	if ($IDX_TODO && @$IDX_TODO) {
+		index_repo($self, shift @$IDX_TODO);
+	} elsif ($GIT_TODO && @$GIT_TODO) {
+		my $git = PublicInbox::Git->new(shift @$GIT_TODO);
+		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+							$self, $git);
+		fp_start($self, $git, $prep_repo);
+		ct_start($self, $git, $prep_repo);
+	}
+	# else: wait for shards_active (post_loop_do) callback
 }
 
-sub wait_active ($$$$) {
-	my ($self, $git, $active, $c) = @_;
-	local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
-	die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+sub next_repos { # OnDestroy cb
+	my ($repo_ctx) = @_;
+	progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done");
+	return if $DO_QUIT;
+	if ($REPO_CTX) {
+		$REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
+		$REPO_CTX = undef;
+		index_next($repo_ctx->{self});
+	}
 }
 
-sub commit_active_shards ($$$) {
-	my ($self, $git, $active) = @_;
-	local $self->{-shard_ok} = {};
+sub commit_shard { # OnDestroy cb
+	my ($repo_ctx) = @_;
+	my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+
+	my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
+	die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+
+	$repo_ctx->{shard_ok} = {};
+	if (!$DO_QUIT) {
+		my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo',
+								$repo);
+		(!defined($id) || $id <= 0) and
+			die "E: store_repo $repo->{git_dir}: id=$id";
+		$active->{$repo->{shard_n}} = undef;
+	}
+	my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
 	my ($c, $p) = PublicInbox::PktOp->pair;
-	$c->{ops}->{shard_done} = [ $self ];
+	$c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ];
 	for my $n (keys %$active) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
 	}
-	undef $p;
-	wait_active($self, $git, $active, $c);
+	undef $p; # shard_done fires when all shards are committed
 }
 
 sub index_repo { # cidx_await cb
-	my ($self, $git, $roots) = @_;
-	return if $git->{-cidx_err} || $DO_QUIT;
-	my $repo = delete $git->{-repo} or return;
-	seek($roots, 0, SEEK_SET) or die "seek: $!";
-	chomp(my @roots = <$roots>);
-	close($roots) or die "close: $!";
-	@roots or return warn("E: $git->{git_dir} has no root commits\n");
+	my ($self, $git) = @_;
+	return if $DO_QUIT;
+	return index_next($self) if $git->{-cidx_err};
+	return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
+	my $repo = delete $git->{-repo} or return index_next($self);
+	my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
+	seek($roots_fh, 0, SEEK_SET) or die "seek: $!";
+	chomp(my @roots = <$roots_fh>);
+	close($roots_fh) or die "close: $!";
+	if (!@roots) {
+		warn("E: $git->{git_dir} has no root commits\n");
+		return index_next($self);
+	}
 	$repo->{roots} = \@roots;
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-	local $self->{-shard_ok} = {};
-	my $active = {};
+	$repo->{git_dir} = $git->{git_dir};
+	my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
+	my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
+							$repo_ctx);
 	my ($c, $p) = PublicInbox::PktOp->pair;
-	$c->{ops}->{shard_done} = [ $self ];
+	$c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ];
 	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
 		last if $DO_QUIT;
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
 					$git, \@roots);
-		$active->{$n} = undef;
+		$repo_ctx->{active}->{$n} = undef;
 	}
-	undef $p;
-	@shard_in = ();
-	wait_active($self, $git, $active, $c);
-	if ($DO_QUIT) {
-		commit_active_shards($self, $git, $active);
-		progress($self, "$git->{git_dir}: done");
-		return;
-	}
-	$repo->{git_dir} = $git->{git_dir};
-	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
-	if ($id > 0) {
-		$active->{$repo->{shard_n}} = undef;
-		commit_active_shards($self, $git, $active);
-		progress($self, "$git->{git_dir}: done");
-		return run_deferred();
-	}
-	die "E: store_repo $git->{git_dir}: id=$id";
+	# shard_done fires when shard_index is done
 }
 
 sub get_roots ($$) {
@@ -582,11 +564,12 @@ sub get_roots ($$) {
 	return if !$LIVE || $DO_QUIT;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
-	open my $roots, '+>', undef or die "open: $!";
+	open my $roots_fh, '+>', undef or die "open: $!";
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 			qw(rev-list --stdin --max-parents=0) ];
-	my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
-	cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
+	my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
+	$git->{-repo}->{roots_fh} = $roots_fh;
+	cidx_await($pid, $cmd, \&index_repo, $self, $git);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -653,15 +636,8 @@ EOM
 
 sub scan_git_dirs ($) {
 	my ($self) = @_;
-	for (@{$self->{git_dirs}}) {
-		my $git = PublicInbox::Git->new($_);
-		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
-							$self, $git);
-		fp_start($self, $git, $prep_repo);
-		ct_start($self, $git, $prep_repo);
-		last if $DO_QUIT;
-	}
-	cidx_reap($self, 0);
+	@$GIT_TODO = @{$self->{git_dirs}};
+	index_next($self) for (1..$LIVE_JOBS);
 }
 
 sub prune_cb { # git->check_async callback
@@ -734,7 +710,15 @@ sub prune_start { # via wq_io_do in IDX_SHARDS
 
 sub shards_active { # post_loop_do
 	return if $DO_QUIT;
-	scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS));
+	return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
+	return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
+	return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
+	return 1 if keys(%$LIVE);
+	for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
+		$s->{-cidx_quit} = 1;
+		$s->wq_close;
+	}
+	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
 # signal handlers
@@ -792,6 +776,7 @@ sub prep_umask ($) {
 
 sub start_prune ($) {
 	my ($self) = @_;
+	return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
 	init_tmp_git_dir($self);
 	my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
 	my ($c, $p) = PublicInbox::PktOp->pair;
@@ -805,14 +790,15 @@ sub start_prune ($) {
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	my $restore_umask = prep_umask($self);
-	local $DEFER = [];
 	local $SIGSET = PublicInbox::DS::block_signals(
 					POSIX::SIGTSTP, POSIX::SIGCONT);
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
 	local $PRUNE_DONE = [];
-	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE);
+	local $IDX_TODO = [];
+	local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
+		$GIT_TODO, $REPO_CTX);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local @IDX_SHARDS = cidx_init($self);
@@ -858,14 +844,8 @@ sub cidx_run { # main entry point
 	local $LIVE_JOBS = $self->{-opt}->{jobs} ||
 			PublicInbox::IPC::detect_nproc() || 2;
 	local @RDONLY_XDB = $self->xdb_shards_flat;
-	start_prune($self) if $self->{-opt}->{prune};
+	start_prune($self);
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
-	await_prune if $self->{-opt}->{prune};
-
-	for my $s (@IDX_SHARDS) {
-		$s->{-cidx_quit} = 1;
-		$s->wq_close;
-	}
 
 	local @PublicInbox::DS::post_loop_do = (\&shards_active);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
@@ -888,10 +868,11 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	return if $DO_QUIT || !$LIVE;
 	if ($? == 0) { # success
 		$quit_req // warn 'BUG: {-cidx_quit} unset';
-		return;
+	} else {
+		warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+		++$self->{shard_err} if defined($self->{shard_err});
 	}
-	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
-	++$self->{shard_err} if defined($self->{shard_err});
+	PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
 }
 
 sub with_umask { # TODO get rid of this treewide and rely on OnDestroy


* [PATCH 6/7] cindex: workaround for FreeBSD missing SIGCHLD
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
                   ` (4 preceding siblings ...)
  2023-04-05 11:26 ` [PATCH 5/7] cindex: enter event loop once per run Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  2023-04-05 11:26 ` [PATCH 7/7] cindex: reset DS internals on cidx_run completion Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

This likely affects all other *BSDs with kevent as well,
since SIGCHLD is special w.r.t. EVFILT_SIGNAL.
---
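The workaround amounts to reaping without waiting to be told.  Here is
a minimal sketch of that, independent of PublicInbox::DS: no SIGCHLD
handler is installed at all, which simulates the notification never
arriving, and reap_now() is a hypothetical analogue of scheduling a
reap by hand.  The polling loop is only a stand-in for an event loop.

```perl
use strict;
use warnings;
use POSIX qw(WNOHANG);

$SIG{CHLD} = 'DEFAULT'; # no handler: nothing will ever notify us
my %live; # pid => label (hypothetical analogue of $LIVE)
my @reaped;

sub reap_now { # one non-blocking pass, no signal required
	while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
		push @reaped, delete($live{$pid}) // "unknown $pid";
	}
}

for my $label (qw(fp ct)) {
	my $pid = fork;
	defined($pid) or die "fork: $!";
	exit(0) if $pid == 0; # child: finish immediately
	$live{$pid} = $label;
}

# reap explicitly, as if a SIGCHLD were pending, and retry until done
while (%live) {
	reap_now();
	select(undef, undef, undef, 0.01) if %live; # brief pause
}
print "reaped: @{[ sort @reaped ]}\n";
```

A spurious reap pass is cheap: waitpid with WNOHANG returns at once
when no child has exited.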
 lib/PublicInbox/CodeSearchIdx.pm | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1000dc6f..82a96cf3 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -636,6 +636,11 @@ EOM
 
 sub scan_git_dirs ($) {
 	my ($self) = @_;
+
+	# FreeBSD ignores/discards SIGCHLD while signals are blocked and
+	# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
+	PublicInbox::DS::enqueue_reap();
+
 	@$GIT_TODO = @{$self->{git_dirs}};
 	index_next($self) for (1..$LIVE_JOBS);
 }


* [PATCH 7/7] cindex: reset DS internals on cidx_run completion
  2023-04-05 11:26 [PATCH 0/7] cindex: fix FreeBSD freezes Eric Wong
                   ` (5 preceding siblings ...)
  2023-04-05 11:26 ` [PATCH 6/7] cindex: workaround for FreeBSD missing SIGCHLD Eric Wong
@ 2023-04-05 11:26 ` Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-04-05 11:26 UTC
  To: meta

We may be invoking cidx_run multiple times per process,
so ensure it can set up signalfd/EVFILT_SIGNAL across them.
---
 lib/PublicInbox/CodeSearchIdx.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 82a96cf3..f3d07f25 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -854,6 +854,7 @@ sub cidx_run { # main entry point
 
 	local @PublicInbox::DS::post_loop_do = (\&shards_active);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
+	PublicInbox::DS->Reset;
 	$self->lock_release(!!$NCHANGE);
 }
 

