unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 21/28] cindex: improve granularity of quit checks
Date: Tue, 21 Mar 2023 23:07:36 +0000	[thread overview]
Message-ID: <20230321230743.3020032-21-e@80x24.org> (raw)
In-Reply-To: <20230321230743.3020032-1-e@80x24.org>

This fixes shutdown handling when shard_index() isn't running
and ensures we can shut down the process more quickly.
---
 lib/PublicInbox/CodeSearchIdx.pm | 55 ++++++++++++++++++++------------
 1 file changed, 34 insertions(+), 21 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f0b506da..4f91e0b6 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do
 	# local-ized in parent before fork
 	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
+	return if $DO_QUIT;
 	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
 	close $in or die "close: $!";
 	my $nr = 0;
@@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do
 	my $len;
 	my $cmt = {};
 	local $/ = $FS;
-	my $buf = <$rd> // return; # leading $FS
+	my $buf = <$rd> // return close($rd); # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
 	$self->begin_txn_lazy;
-	while (defined($buf = <$rd>)) {
+	while (!$DO_QUIT && defined($buf = <$rd>)) {
 		chomp($buf);
 		$/ = "\n";
 		$len = length($buf);
@@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do
 			$TXN_BYTES = $batch_bytes - $len;
 		}
 		add_commit($self, $cmt);
-		last if $DO_QUIT;
 		++$nr;
 		if ($TXN_BYTES <= 0) {
 			cidx_ckpoint($self, "[$n] $nr");
@@ -298,6 +298,7 @@ sub run_todo ($) {
 
 sub need_reap { # post_loop_do
 	my (undef, $jobs) = @_;
+	return if !$LIVE || $DO_QUIT;
 	scalar(keys(%$LIVE)) > $jobs;
 }
 
@@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback
 sub partition_refs ($$$) {
 	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+	my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
 	my ($seen, $nchange) = (0, 0);
 	my @shard_in = map {
@@ -421,7 +422,7 @@ sub partition_refs ($$$) {
 		$fh;
 	} @RDONLY_SHARDS;
 
-	while (defined(my $cmt = <$fh>)) {
+	while (defined(my $cmt = <$rfh>)) {
 		chomp $cmt;
 		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
 		if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
@@ -431,8 +432,13 @@ sub partition_refs ($$$) {
 			++$nchange;
 			$seen = 0;
 		}
+		if ($DO_QUIT) {
+			close($rfh);
+			return ();
+		}
 	}
-	close($fh);
+	close($rfh);
+	return () if $DO_QUIT;
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		$self->{nchange} += $nchange;
 		progress($self, "$git->{git_dir}: $nchange commits");
@@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do
 
 sub consumers_open { # post_loop_do
 	my (undef, $consumers) = @_;
+	return if $DO_QUIT;
 	scalar(grep { $_->{sock} } values %$consumers);
 }
 
+sub wait_consumers ($$$) {
+	my ($self, $git, $consumers) = @_;
+	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
 sub commit_used_shards ($$$) {
 	my ($self, $git, $consumers) = @_;
 	local $self->{-shard_ok} = {};
@@ -466,15 +481,12 @@ sub commit_used_shards ($$$) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
 		$consumers->{$n} = $c;
 	}
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
-	die "E: $git->{git_dir} $n shards failed" if $n;
+	wait_consumers($self, $git, $consumers);
 }
 
 sub index_repo { # cidx_await cb
 	my ($self, $git, $roots) = @_;
-	return if $git->{-cidx_err};
+	return if $git->{-cidx_err} || $DO_QUIT;
 	my $repo = delete $git->{-repo} or return;
 	seek($roots, 0, SEEK_SET) or die "seek: $!";
 	chomp(my @roots = <$roots>);
@@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
 	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-	my %CONSUMERS;
+	my $consumers = {};
 	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
+		last if $DO_QUIT;
 		my ($c, $p) = PublicInbox::PktOp->pair;
 		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
 					$git, $n, \@roots);
-		$CONSUMERS{$n} = $c;
+		$consumers->{$n} = $c;
 	}
 	@shard_in = ();
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
-	die "E: $git->{git_dir} $n shards failed" if $n;
+	wait_consumers($self, $git, $consumers);
 	if ($DO_QUIT) {
-		commit_used_shards($self, $git, \%CONSUMERS);
+		commit_used_shards($self, $git, $consumers);
 		progress($self, "$git->{git_dir}: done");
 		return;
 	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
-		$CONSUMERS{$repo->{shard_n}} = undef;
-		commit_used_shards($self, $git, \%CONSUMERS);
+		$consumers->{$repo->{shard_n}} = undef;
+		commit_used_shards($self, $git, $consumers);
 		progress($self, "$git->{git_dir}: done");
 		return run_todo($self);
 	}
@@ -585,6 +595,7 @@ sub scan_git_dirs ($) {
 							$self, $git);
 		fp_start($self, $git, $prep_repo);
 		ct_start($self, $git, $prep_repo);
+		last if $DO_QUIT;
 	}
 	cidx_reap($self, 0);
 }
@@ -674,8 +685,10 @@ sub ipc_atfork_child {
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	my ($pid, $shard, $self) = @_;
+	my $quit_req = delete($shard->{-cidx_quit});
+	return if $DO_QUIT || !$LIVE;
 	if ($? == 0) { # success
-		delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+		$quit_req // warn 'BUG: {-cidx_quit} unset';
 		return;
 	}
 	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";

  parent reply	other threads:[~2023-03-21 23:07 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07   ` Eric Wong [this message]
2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230321230743.3020032-21-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).