unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 10/28] cindex: use DS and workqueues for parallelism
Date: Tue, 21 Mar 2023 23:07:25 +0000	[thread overview]
Message-ID: <20230321230743.3020032-10-e@80x24.org> (raw)
In-Reply-To: <20230321230743.3020032-1-e@80x24.org>

This avoids forking new shard processes for each repo we scan,
but we can't avoid many excessive commits since we need to
ensure the `seen()' sub can avoid excessive work.
---
 lib/PublicInbox/CodeSearchIdx.pm | 374 ++++++++++++++++++++-----------
 1 file changed, 240 insertions(+), 134 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 02c9ed84..13fe1c28 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -14,9 +14,11 @@
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
-use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
 use PublicInbox::Eml;
-use PublicInbox::DS ();
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
 use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Admin;
 use POSIX qw(WNOHANG SEEK_SET);
@@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
-our $LIVE; # pid => callback
-our $LIVE_JOBS;
-our @XDB_SHARDS_FLAT;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+	$LIVE, # pid => cmd
+	$DEFER, # [ [ cb, @args ], ... ]
+	$LIVE_JOBS, # integer
+	$MY_SIG, # like %SIG
+	$SIGSET,
+	@RDONLY_SHARDS, # Xapian::Database
+	@IDX_SHARDS # clones of self
+);
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -110,14 +120,14 @@ sub progress {
 	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$$) {
-	my ($self, $git, $repo) = @_;
-	my $xdb = delete($repo->{shard})->idx_acquire;
-	$xdb->begin_transaction;
+sub store_repo { # wq_do - returns docid
+	my ($self, $repo) = @_;
+	$self->begin_txn_lazy;
+	my $xdb = $self->{xdb};
 	for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
-	if (defined $repo->{id}) {
-		my $doc = $xdb->get_document($repo->{id}) //
-			die "$git->{git_dir} doc #$repo->{id} gone";
+	if (defined $repo->{docid}) {
+		my $doc = $xdb->get_document($repo->{docid}) //
+			die "$repo->{git_dir} doc #$repo->{docid} gone";
 		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
 		my %new = map { $_ => undef } @{$repo->{roots}};
 		my $old = xap_terms('G', $doc);
@@ -126,34 +136,38 @@ sub store_repo ($$$) {
 		delete @$old{@{$repo->{roots}}};
 		$doc->remove_term('G'.$_) for keys %$old;
 		$doc->set_data($repo->{fp});
-		$xdb->replace_document($repo->{id}, $doc);
+		$xdb->replace_document($repo->{docid}, $doc);
+		$repo->{docid}
 	} else {
 		my $new = $PublicInbox::Search::X{Document}->new;
 		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-		$new->add_boolean_term("P$git->{git_dir}");
+		$new->add_boolean_term("P$repo->{git_dir}");
 		$new->add_boolean_term('T'.'r');
 		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
 		$new->set_data($repo->{fp}); # \n delimited
 		$xdb->add_document($new);
 	}
-	$xdb->commit_transaction;
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_worker ($$$) {
-	my ($self, $r, $sigset) = @_;
+sub shard_index { # via wq_io_do
+	my ($self, $git, $n, $roots) = @_;
+	local $self->{current_info} = "$git->{git_dir} [$n]";
 	my ($quit, $cmt);
+	local $self->{roots} = $roots;
+	my $in = delete($self->{0}) // die 'BUG: no {0} input';
+	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	my $max = $batch_bytes;
-	$SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-	$SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
-	PublicInbox::DS::sig_setmask($sigset);
-
-	# the parent process of this shard process writes directly to
-	# the stdin of `git log', we consume git log's stdout:
-	my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
-	close $r or die "close: $!";
+	my $set_quit = sub { $quit = shift };
+	local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+	local $SIG{QUIT} = $set_quit;
+	local $SIG{TERM} = $set_quit;
+	local $SIG{INT} = $set_quit;
+	local $self->{git} = $git; # for patchid
+	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+	close $in or die "close: $!";
 	my $nr = 0;
 
 	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -162,8 +176,7 @@ sub shard_worker ($$$) {
 	local $/ = $FS;
 	my $buf = <$rd> // return; # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
-	my $xdb = $self->idx_acquire;
-	$xdb->begin_transaction;
+	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
 		$max -= length($buf);
@@ -174,24 +187,40 @@ sub shard_worker ($$$) {
 		++$nr;
 		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
 			progress($self, $nr);
-			$xdb->commit_transaction;
+			$self->{xdb}->commit_transaction;
 			$max = $batch_bytes;
-			$xdb->begin_transaction;
+			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}
 	close($rd);
 	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
-		$xdb->commit_transaction;
+		send($op_p, "shard_done $n", MSG_EOR);
 	} else {
 		warn "E: git @LOG_STDIN: \$?=$?\n";
-		$xdb->cancel_transaction;
+		$self->{xdb}->cancel_transaction;
 	}
 }
 
+sub shard_done { # called via PktOp on shard_index completion
+	my ($self, $n) = @_;
+	$self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
 sub seen ($$) {
 	my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
-	$xdb->postlist_begin($q) != $xdb->postlist_end($q)
+	for (1..100) {
+		my $ret = eval {
+			$xdb->postlist_begin($q) != $xdb->postlist_end($q);
+		};
+		return $ret unless $@;
+		if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+			$xdb->reopen;
+		} else {
+			Carp::croak($@);
+		}
+	}
+	Carp::croak('too many Xapian DB modifications in progress');
 }
 
 # used to select the shard for a GIT_DIR
@@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
+sub run_todo ($) {
+	my ($self) = @_;
+	my $n;
+	while (defined(my $x = shift(@{$self->{todo} // []}))) {
+		my $cb = shift @$x;
+		$cb->(@$x);
+		++$n;
+	}
+	$n;
+}
+
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
-	while (keys(%$LIVE) >= $jobs) {
-		my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
-		last if $pid < 0;
-		if (my $x = delete $LIVE->{$pid}) {
-			my $cb = shift @$x;
-			$cb->(@$x) if $cb;
-		} else {
-			warn "reaped unknown PID=$pid ($?)\n";
-		}
+	while (run_todo($self)) {}
+	my $cb = sub { keys(%$LIVE) > $jobs };
+	PublicInbox::DS->SetPostLoopCallback($cb);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+	while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+	my ($pid, $cb, $self, $git, @args) = @_;
+	return if !$LIVE; # premature shutdown
+	my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+	PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+	if ($?) {
+		$git->{-cidx_err} = 1;
+		return warn("@$cmd error: \$?=$?\n");
 	}
+	push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+	my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+	$LIVE->{$pid} = $cmd;
+	awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
@@ -227,13 +280,14 @@ sub fp_start ($$$) {
 	return if !$LIVE; # premature exit
 	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
-	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	my $cmd = ['git', "--git-dir=$git->{git_dir}",
+		qw(show-ref --heads --tags --hash)];
+	my $pid = spawn($cmd, undef, { 1 => $refs });
 	$git->{-repo}->{refs} = $refs;
-	$LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+	cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
 }
 
-sub fp_fini {
+sub fp_fini { # cidx_await cb
 	my ($self, $git, $prep_repo) = @_;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	seek($refs, 0, SEEK_SET) or die "seek: $!";
@@ -247,13 +301,15 @@ sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
 	return if !$LIVE; # premature exit
 	cidx_reap($self, $LIVE_JOBS);
-	my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+		qw[for-each-ref --sort=-committerdate
 		--format=%(committerdate:raw) --count=1
-		refs/heads/ refs/tags/]]);
-	$LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+		refs/heads/ refs/tags/] ];
+	my ($rd, $pid) = popen_rd($cmd);
+	cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
 }
 
-sub ct_fini {
+sub ct_fini { # cidx_await cb
 	my ($self, $git, $rd, $prep_repo) = @_;
 	defined(my $ct = <$rd>) or return;
 	$ct =~ s/\s+.*\z//s; # drop TZ + LF
@@ -263,34 +319,38 @@ sub ct_fini {
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $git->{-cidx_err}; # premature exit
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
-	my $git_dir = $git->{git_dir};
 	if (!defined($repo->{ct})) {
-		warn "W: $git_dir has no commits, skipping\n";
+		warn "W: $git->{git_dir} has no commits, skipping\n";
 		delete $git->{-repo};
 		return;
 	}
-	my $n = git_dir_hash($git_dir) % $self->{nshard};
-	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+	my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+	my $shard = bless { %$self, shard => $n }, ref($self);
+	$repo->{shard_n} = $n;
 	delete @$shard{qw(lockfh lock_path)};
-	my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
-	$xdb->reopen;
-	my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
+	local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+	$shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+	my ($shard, $self, $git) = @_;
+	my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
 	my $docid = shift(@docids) // return get_roots($self, $git);
-	if (@docids) {
-		warn "BUG: $git_dir indexed multiple times, culling\n";
-		$repo->{to_delete} = \@docids; # XXX needed?
-	}
-	my $doc = $xdb->get_document($docid) //
-		die "BUG: no #$docid ($git_dir)";
+	my $doc = $shard->{xdb}->get_document($docid) //
+			die "BUG: no #$docid ($git->{git_dir})";
 	my $old_fp = $doc->get_data;
-	if ($old_fp eq $repo->{fp}) { # no change
-		progress($self, "$git_dir unchanged");
+	if ($old_fp eq $git->{-repo}->{fp}) { # no change
+		progress($self, "$git->{git_dir} unchanged");
 		delete $git->{-repo};
 		return;
 	}
-	$repo->{id} = $docid;
+	$git->{-repo}->{docid} = $docid;
+	if (@docids) {
+		warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+		$git->{-repo}->{to_delete} = \@docids; # XXX needed?
+	}
 	get_roots($self, $git);
 }
 
@@ -304,12 +364,12 @@ sub partition_refs ($$$) {
 		$_->reopen;
 		open my $fh, '+>', undef or die "open: $!";
 		$fh;
-	} @XDB_SHARDS_FLAT;
+	} @RDONLY_SHARDS;
 
 	while (defined(my $cmt = <$fh>)) {
 		chomp $cmt;
-		my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
-		if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
+		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+		if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
 			last if ++$seen > $SEEN_MAX;
 		} else {
 			say { $shard_in[$n] } $cmt or die "say: $!";
@@ -330,9 +390,33 @@ sub partition_refs ($$$) {
 	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_repo {
+sub shard_commit { # via wq_io_do
+	my ($self, $n) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+	$self->commit_txn_lazy;
+	send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub commit_used_shards ($$$) {
+	my ($self, $git, $consumers) = @_;
+	local $self->{-shard_ok} = {};
+	for my $n (keys %$consumers) {
+		my ($c, $p) = PublicInbox::PktOp->pair;
+		$c->{ops}->{shard_done} = [ $self ];
+		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+		$consumers->{$n} = $c;
+	}
+	PublicInbox::DS->SetPostLoopCallback(sub {
+		scalar(grep { $_->{sock} } values %$consumers);
+	});
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	die "E: $git->{git_dir} $n shards failed" if $n;
+}
+
+sub index_repo { # cidx_await cb
 	my ($self, $git, $roots) = @_;
-	return if !$LIVE; # premature exit
+	return if $git->{-cidx_err};
 	my $repo = delete $git->{-repo} or return;
 	seek($roots, 0, SEEK_SET) or die "seek: $!";
 	chomp(my @roots = <$roots>);
@@ -341,73 +425,45 @@ sub index_repo {
 	$repo->{roots} = \@roots;
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-	my %pids;
-	my $fwd_kill = sub {
-		my ($sig) = @_;
-		kill($sig, $_) for keys %pids;
-	};
-	local $SIG{USR1} = $fwd_kill;
-	local $SIG{QUIT} = $fwd_kill;
-	local $SIG{INT} = $fwd_kill;
-	local $SIG{TERM} = $fwd_kill;
-	my $sigset = PublicInbox::DS::block_signals();
-	for (my $n = 0; $n <= $#shard_in; $n++) {
+	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+	my %CONSUMERS;
+	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
-		my $pid = fork // die "fork: $!";
-		if ($pid == 0) { # no RNG use, here
-			$0 = "code index [$n]";
-			$self->{git} = $git;
-			$self->{shard} = $n;
-			$self->{current_info} = "$self->{current_info} [$n]";
-			delete @$self{qw(lockfh lock_path)};
-			my $in = $shard_in[$n];
-			@shard_in = ();
-			$self->{roots} = \@roots;
-			undef $repo;
-			eval { shard_worker($self, $in, $sigset) };
-			warn "E: $@" if $@;
-			POSIX::_exit($@ ? 1 : 0);
-		} else {
-			$pids{$pid} = "code index [$n]";
-		}
+		my ($c, $p) = PublicInbox::PktOp->pair;
+		$c->{ops}->{shard_done} = [ $self ];
+		$IDX_SHARDS[$n]->wq_io_do('shard_index',
+					[ $shard_in[$n], $p->{op_p} ],
+					$git, $n, \@roots);
+		$CONSUMERS{$n} = $c;
 	}
-	PublicInbox::DS::sig_setmask($sigset);
 	@shard_in = ();
-	my ($err, @todo);
-	while (keys %pids) {
-		my $pid = waitpid(-1, 0) // die "waitpid: $!";
-		if (my $j = delete $pids{$pid}) {
-			next if $? == 0;
-			warn "PID:$pid $j exited with \$?=$?\n";
-			$err = 1;
-		} elsif (my $todo = delete $LIVE->{$pid}) {
-			warn "PID:$pid exited with \$?=$?\n" if $?;
-			push @todo, $todo;
-		} else {
-			warn "reaped unknown PID=$pid ($?)\n";
-		}
-	}
-	die "subprocess(es) failed\n" if $err;
-	store_repo($self, $git, $repo);
-	progress($self, "$git->{git_dir}: done");
-	# TODO: check fp afterwards?
-	while (my $x = shift @todo) {
-		my $cb = shift @$x;
-		$cb->(@$x) if $cb;
+	PublicInbox::DS->SetPostLoopCallback(sub {
+		scalar(grep { $_->{sock} } values %CONSUMERS);
+	});
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
+	die "E: $git->{git_dir} $n shards failed" if $n;
+	$repo->{git_dir} = $git->{git_dir};
+	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+	if ($id > 0) {
+		$CONSUMERS{$repo->{shard_n}} = undef;
+		commit_used_shards($self, $git, \%CONSUMERS);
+		progress($self, "$git->{git_dir}: done");
+		return run_todo($self);
 	}
+	die "E: store_repo $git->{git_dir}: id=$id";
 }
 
 sub get_roots ($$) {
 	my ($self, $git) = @_;
 	return if !$LIVE; # premature exit
-	cidx_reap($self, $LIVE_JOBS);
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
 	open my $roots, '+>', undef or die "open: $!";
-	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-			qw(rev-list --stdin --max-parents=0)],
-			undef, { 0 => $refs, 1 => $roots });
-	$LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
+	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+			qw(rev-list --stdin --max-parents=0) ];
+	my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+	cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -434,9 +490,17 @@ sub cidx_init ($) {
 		warn "# creating $dir\n" if !$self->{-opt}->{quiet};
 		File::Path::mkpath($dir);
 	}
+	$self->lock_acquire;
+	my @shards;
 	for my $n (0..($self->{nshard} - 1)) {
 		my $shard = bless { %$self, shard => $n }, ref($self);
+		delete @$shard{qw(lockfh lock_path)};
 		$shard->idx_acquire;
+		$shard->idx_release;
+		$shard->wq_workers_start("shard[$n]", 1, undef, {
+			siblings => \@shards, # for ipc_atfork_child
+		}, \&shard_done_wait, $self);
+		push @shards, $shard;
 	}
 	# this warning needs to happen after idx_acquire
 	state $once;
@@ -444,14 +508,11 @@ sub cidx_init ($) {
 W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
 W: memory usage may be high for large indexing runs
 EOM
+	@shards;
 }
 
 sub scan_git_dirs ($) {
 	my ($self) = @_;
-	local $LIVE_JOBS = $self->{-opt}->{jobs} //
-			PublicInbox::IPC::detect_nproc() // 2;
-	local $LIVE = {};
-	local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
 	for (@{$self->{git_dirs}}) {
 		my $git = PublicInbox::Git->new($_);
 		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
@@ -462,18 +523,31 @@ sub scan_git_dirs ($) {
 	cidx_reap($self, 0);
 }
 
-sub cidx_run {
+sub shards_active { # PostLoopCallback
+	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+sub cidx_run { # main entry point
 	my ($self) = @_;
-	cidx_init($self);
+	local $self->{todo} = [];
+	local $DEFER = $self->{todo};
+	local $SIGSET = PublicInbox::DS::block_signals();
+	my $restore = PublicInbox::OnDestroy->new($$,
+		\&PublicInbox::DS::sig_setmask, $SIGSET);
+	local $LIVE = {};
+	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
 	my $cb = $SIG{__WARN__} || \&CORE::warn;
+	local $MY_SIG = {
+		CHLD => \&PublicInbox::DS::enqueue_reap,
+		INT => sub { exit },
+	};
 	local $SIG{__WARN__} = sub {
 		my $m = shift @_;
 		$self->{current_info} eq '' or
 			$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
 		$cb->($m, @_);
 	};
-	$self->lock_acquire;
 	load_existing($self);
 	my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
 	if (@nc) {
@@ -486,9 +560,41 @@ sub cidx_run {
 		warn "E: canonicalized and attempting to continue\n";
 	}
 	local $self->{nchange} = 0;
+	local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+			PublicInbox::IPC::detect_nproc() || 2;
+	local @RDONLY_SHARDS = $self->xdb_shards_flat;
+
 	# do_prune($self) if $self->{-opt}->{prune}; TODO
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+	for my $s (@IDX_SHARDS) {
+		$s->{-cidx_quit} = 1;
+		$s->wq_close;
+	}
+
+	PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
 	$self->lock_release(!!$self->{nchange});
 }
 
+sub ipc_atfork_child {
+	my ($self) = @_;
+	$self->SUPER::ipc_atfork_child;
+	my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+	$_->wq_close for @$x;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+	my ($pid, $shard, $self) = @_;
+	delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+	return unless $?;
+	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+	++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO
+	my ($self, $cb, @arg) = @_;
+	$cb->(@arg);
+}
+
 1;

  parent reply	other threads:[~2023-03-21 23:07 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07   ` Eric Wong [this message]
2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230321230743.3020032-10-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).