From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 7E2B01F507 for ; Tue, 21 Mar 2023 23:07:45 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1679440065; bh=d1nXooePlEfY/Qbt6M6SWKDruPxnOQP52ZxS7dqKlV8=; h=From:To:Subject:Date:In-Reply-To:References:From; b=TFFUKm4vKg7OJDuxJvAVaRf3H/8avnCvUnbOpx7ef83LjNVgmTIHKdUk3L0IZnogi q0DXmN0TzHbjT0jXZwIZsuP1Z/LJa27oXwbZ32mqfHK9cbtQPvv2z2DF+ESf/KvfSW +uZBc8zrhI/1sxM1oZB6TnAn2pB0T1VRgKXhCZtc= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 10/28] cindex: use DS and workqueues for parallelism Date: Tue, 21 Mar 2023 23:07:25 +0000 Message-Id: <20230321230743.3020032-10-e@80x24.org> In-Reply-To: <20230321230743.3020032-1-e@80x24.org> References: <20230321230701.3019936-1-e@80x24.org> <20230321230743.3020032-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This avoids forking new shard processes for each repo we scan, but we can't avoid many excessive commits since we need to ensure the `seen()' sub can avoid excessive work. --- lib/PublicInbox/CodeSearchIdx.pm | 374 ++++++++++++++++++++----------- 1 file changed, 240 insertions(+), 134 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 02c9ed84..13fe1c28 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -14,9 +14,11 @@ # See PublicInbox::CodeSearch (read-only API) for more package PublicInbox::CodeSearchIdx; use v5.12; -use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx); +# parent order matters, we want ->DESTROY from IPC, not SearchIdx +use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx); use PublicInbox::Eml; -use PublicInbox::DS (); +use PublicInbox::DS qw(awaitpid); +use PublicInbox::PktOp; use PublicInbox::IPC qw(nproc_shards); use PublicInbox::Admin; use POSIX qw(WNOHANG SEEK_SET); @@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); use PublicInbox::Config; -use PublicInbox::Spawn qw(spawn); +use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; -our $LIVE; # pid => callback -our $LIVE_JOBS; -our @XDB_SHARDS_FLAT; +use Socket qw(MSG_EOR); +use Carp (); +our ( + $LIVE, # pid => cmd + $DEFER, # [ [ cb, @args ], ... ] + $LIVE_JOBS, # integer + $MY_SIG, # like %SIG + $SIGSET, + @RDONLY_SHARDS, # Xapian::Database + @IDX_SHARDS # clones of self +); # stop walking history if we see >$SEEN_MAX existing commits, this assumes # branches don't diverge by more than this number of commits... @@ -110,14 +120,14 @@ sub progress { $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); } -sub store_repo ($$$) { - my ($self, $git, $repo) = @_; - my $xdb = delete($repo->{shard})->idx_acquire; - $xdb->begin_transaction; +sub store_repo { # wq_do - returns docid + my ($self, $repo) = @_; + $self->begin_txn_lazy; + my $xdb = $self->{xdb}; for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? - if (defined $repo->{id}) { - my $doc = $xdb->get_document($repo->{id}) // - die "$git->{git_dir} doc #$repo->{id} gone"; + if (defined $repo->{docid}) { + my $doc = $xdb->get_document($repo->{docid}) // + die "$repo->{git_dir} doc #$repo->{docid} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); my %new = map { $_ => undef } @{$repo->{roots}}; my $old = xap_terms('G', $doc); @@ -126,34 +136,38 @@ sub store_repo ($$$) { delete @$old{@{$repo->{roots}}}; $doc->remove_term('G'.$_) for keys %$old; $doc->set_data($repo->{fp}); - $xdb->replace_document($repo->{id}, $doc); + $xdb->replace_document($repo->{docid}, $doc); + $repo->{docid} } else { my $new = $PublicInbox::Search::X{Document}->new; add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct}); - $new->add_boolean_term("P$git->{git_dir}"); + $new->add_boolean_term("P$repo->{git_dir}"); $new->add_boolean_term('T'.'r'); $new->add_boolean_term('G'.$_) for @{$repo->{roots}}; $new->set_data($repo->{fp}); # \n delimited $xdb->add_document($new); } - $xdb->commit_transaction; } # sharded reader for `git log --pretty=format: --stdin' -sub shard_worker ($$$) { - my ($self, $r, $sigset) = @_; +sub shard_index { # via wq_io_do + my ($self, $git, $n, $roots) = @_; + local $self->{current_info} = "$git->{git_dir} [$n]"; my ($quit, $cmt); + local $self->{roots} = $roots; + my $in = delete($self->{0}) // die 'BUG: no {0} input'; + my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; my $batch_bytes = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; my $max = $batch_bytes; - $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import' - $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift }; - PublicInbox::DS::sig_setmask($sigset); - - # the parent process of this shard process writes directly to - # the stdin of `git log', we consume git log's stdout: - my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r }); - close $r or die "close: $!"; + my $set_quit = sub { $quit = shift }; + local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import' + local $SIG{QUIT} = $set_quit; + local $SIG{TERM} = $set_quit; + local $SIG{INT} = $set_quit; + local $self->{git} = $git; # for patchid + my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); + close $in or die "close: $!"; my $nr = 0; # a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4 @@ -162,8 +176,7 @@ sub shard_worker ($$$) { local $/ = $FS; my $buf = <$rd> // return; # leading $FS $buf eq $FS or die "BUG: not LF-NUL: $buf\n"; - my $xdb = $self->idx_acquire; - $xdb->begin_transaction; + $self->begin_txn_lazy; while (defined($buf = <$rd>)) { chomp($buf); $max -= length($buf); @@ -174,24 +187,40 @@ sub shard_worker ($$$) { ++$nr; if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) { progress($self, $nr); - $xdb->commit_transaction; + $self->{xdb}->commit_transaction; $max = $batch_bytes; - $xdb->begin_transaction; + $self->{xdb}->begin_transaction; } $/ = $FS; } close($rd); if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) { - $xdb->commit_transaction; + send($op_p, "shard_done $n", MSG_EOR); } else { warn "E: git @LOG_STDIN: \$?=$?\n"; - $xdb->cancel_transaction; + $self->{xdb}->cancel_transaction; } } +sub shard_done { # called via PktOp on shard_index completion + my ($self, $n) = @_; + $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok}); +} + sub seen ($$) { my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" - $xdb->postlist_begin($q) != $xdb->postlist_end($q) + for (1..100) { + my $ret = eval { + $xdb->postlist_begin($q) != $xdb->postlist_end($q); + }; + return $ret unless $@; + if (ref($@) =~ /\bDatabaseModifiedError\b/) { + $xdb->reopen; + } else { + Carp::croak($@); + } + } + Carp::croak('too many Xapian DB modifications in progress'); } # used to select the shard for a GIT_DIR @@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search @ids; } +sub run_todo ($) { + my ($self) = @_; + my $n; + while (defined(my $x = shift(@{$self->{todo} // []}))) { + my $cb = shift @$x; + $cb->(@$x); + ++$n; + } + $n; +} + sub cidx_reap ($$) { my ($self, $jobs) = @_; - while (keys(%$LIVE) >= $jobs) { - my $pid = waitpid(-1, 0) // die "waitpid(-1): $!"; - last if $pid < 0; - if (my $x = delete $LIVE->{$pid}) { - my $cb = shift @$x; - $cb->(@$x) if $cb; - } else { - warn "reaped unknown PID=$pid ($?)\n"; - } + while (run_todo($self)) {} + my $cb = sub { keys(%$LIVE) > $jobs }; + PublicInbox::DS->SetPostLoopCallback($cb); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->(); + while (!$jobs && run_todo($self)) {} +} + +sub cidx_await_cb { # awaitpid cb + my ($pid, $cb, $self, $git, @args) = @_; + return if !$LIVE; # premature shutdown + my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd'; + PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC + if ($?) { + $git->{-cidx_err} = 1; + return warn("@$cmd error: \$?=$?\n"); } + push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER; +} + +sub cidx_await ($$$$$@) { + my ($pid, $cmd, $cb, $self, $git, @args) = @_; + $LIVE->{$pid} = $cmd; + awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args); } # this is different from the grokmirror-compatible fingerprint since we @@ -227,13 +280,14 @@ sub fp_start ($$$) { return if !$LIVE; # premature exit cidx_reap($self, $LIVE_JOBS); open my $refs, '+>', undef or die "open: $!"; - my $pid = spawn(['git', "--git-dir=$git->{git_dir}", - qw(show-ref --heads --tags --hash)], undef, { 1 => $refs }); + my $cmd = ['git', "--git-dir=$git->{git_dir}", + qw(show-ref --heads --tags --hash)]; + my $pid = spawn($cmd, undef, { 1 => $refs }); $git->{-repo}->{refs} = $refs; - $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ]; + cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo); } -sub fp_fini { +sub fp_fini { # cidx_await cb my ($self, $git, $prep_repo) = @_; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; seek($refs, 0, SEEK_SET) or die "seek: $!"; @@ -247,13 +301,15 @@ sub ct_start ($$$) { my ($self, $git, $prep_repo) = @_; return if !$LIVE; # premature exit cidx_reap($self, $LIVE_JOBS); - my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw[for-each-ref --sort=-committerdate --format=%(committerdate:raw) --count=1 - refs/heads/ refs/tags/]]); - $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ]; + refs/heads/ refs/tags/] ]; + my ($rd, $pid) = popen_rd($cmd); + cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo); } -sub ct_fini { +sub ct_fini { # cidx_await cb my ($self, $git, $rd, $prep_repo) = @_; defined(my $ct = <$rd>) or return; $ct =~ s/\s+.*\z//s; # drop TZ + LF @@ -263,34 +319,38 @@ sub ct_fini { # TODO: also index gitweb.owner and the full fingerprint for grokmirror? sub prep_repo ($$) { my ($self, $git) = @_; - return if !$LIVE; # premature exit + return if !$LIVE || $git->{-cidx_err}; # premature exit my $repo = $git->{-repo} // die 'BUG: no {-repo}'; - my $git_dir = $git->{git_dir}; if (!defined($repo->{ct})) { - warn "W: $git_dir has no commits, skipping\n"; + warn "W: $git->{git_dir} has no commits, skipping\n"; delete $git->{-repo}; return; } - my $n = git_dir_hash($git_dir) % $self->{nshard}; - my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self); + my $n = git_dir_hash($git->{git_dir}) % $self->{nshard}; + my $shard = bless { %$self, shard => $n }, ref($self); + $repo->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; - my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef"; - $xdb->reopen; - my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir); + local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + $shard->retry_reopen(\&check_existing, $self, $git); +} + +sub check_existing { # retry_reopen callback + my ($shard, $self, $git) = @_; + my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); my $docid = shift(@docids) // return get_roots($self, $git); - if (@docids) { - warn "BUG: $git_dir indexed multiple times, culling\n"; - $repo->{to_delete} = \@docids; # XXX needed? - } - my $doc = $xdb->get_document($docid) // - die "BUG: no #$docid ($git_dir)"; + my $doc = $shard->{xdb}->get_document($docid) // + die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $doc->get_data; - if ($old_fp eq $repo->{fp}) { # no change - progress($self, "$git_dir unchanged"); + if ($old_fp eq $git->{-repo}->{fp}) { # no change + progress($self, "$git->{git_dir} unchanged"); delete $git->{-repo}; return; } - $repo->{id} = $docid; + $git->{-repo}->{docid} = $docid; + if (@docids) { + warn "BUG: $git->{git_dir} indexed multiple times, culling\n"; + $git->{-repo}->{to_delete} = \@docids; # XXX needed? + } get_roots($self, $git); } @@ -304,12 +364,12 @@ sub partition_refs ($$$) { $_->reopen; open my $fh, '+>', undef or die "open: $!"; $fh; - } @XDB_SHARDS_FLAT; + } @RDONLY_SHARDS; while (defined(my $cmt = <$fh>)) { chomp $cmt; - my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT); - if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) { + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt or die "say: $!"; @@ -330,9 +390,33 @@ sub partition_refs ($$$) { die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n"; } -sub index_repo { +sub shard_commit { # via wq_io_do + my ($self, $n) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + $self->commit_txn_lazy; + send($op_p, "shard_done $n", MSG_EOR); +} + +sub commit_used_shards ($$$) { + my ($self, $git, $consumers) = @_; + local $self->{-shard_ok} = {}; + for my $n (keys %$consumers) { + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n); + $consumers->{$n} = $c; + } + PublicInbox::DS->SetPostLoopCallback(sub { + scalar(grep { $_->{sock} } values %$consumers); + }); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers; + die "E: $git->{git_dir} $n shards failed" if $n; +} + +sub index_repo { # cidx_await cb my ($self, $git, $roots) = @_; - return if !$LIVE; # premature exit + return if $git->{-cidx_err}; my $repo = delete $git->{-repo} or return; seek($roots, 0, SEEK_SET) or die "seek: $!"; chomp(my @roots = <$roots>); @@ -341,73 +425,45 @@ sub index_repo { $repo->{roots} = \@roots; local $self->{current_info} = $git->{git_dir}; my @shard_in = partition_refs($self, $git, delete($repo->{refs})); - my %pids; - my $fwd_kill = sub { - my ($sig) = @_; - kill($sig, $_) for keys %pids; - }; - local $SIG{USR1} = $fwd_kill; - local $SIG{QUIT} = $fwd_kill; - local $SIG{INT} = $fwd_kill; - local $SIG{TERM} = $fwd_kill; - my $sigset = PublicInbox::DS::block_signals(); - for (my $n = 0; $n <= $#shard_in; $n++) { + local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1 + my %CONSUMERS; + for my $n (0..$#shard_in) { -s $shard_in[$n] or next; - my $pid = fork // die "fork: $!"; - if ($pid == 0) { # no RNG use, here - $0 = "code index [$n]"; - $self->{git} = $git; - $self->{shard} = $n; - $self->{current_info} = "$self->{current_info} [$n]"; - delete @$self{qw(lockfh lock_path)}; - my $in = $shard_in[$n]; - @shard_in = (); - $self->{roots} = \@roots; - undef $repo; - eval { shard_worker($self, $in, $sigset) }; - warn "E: $@" if $@; - POSIX::_exit($@ ? 1 : 0); - } else { - $pids{$pid} = "code index [$n]"; - } + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self ]; + $IDX_SHARDS[$n]->wq_io_do('shard_index', + [ $shard_in[$n], $p->{op_p} ], + $git, $n, \@roots); + $CONSUMERS{$n} = $c; } - PublicInbox::DS::sig_setmask($sigset); @shard_in = (); - my ($err, @todo); - while (keys %pids) { - my $pid = waitpid(-1, 0) // die "waitpid: $!"; - if (my $j = delete $pids{$pid}) { - next if $? == 0; - warn "PID:$pid $j exited with \$?=$?\n"; - $err = 1; - } elsif (my $todo = delete $LIVE->{$pid}) { - warn "PID:$pid exited with \$?=$?\n" if $?; - push @todo, $todo; - } else { - warn "reaped unknown PID=$pid ($?)\n"; - } - } - die "subprocess(es) failed\n" if $err; - store_repo($self, $git, $repo); - progress($self, "$git->{git_dir}: done"); - # TODO: check fp afterwards? - while (my $x = shift @todo) { - my $cb = shift @$x; - $cb->(@$x) if $cb; + PublicInbox::DS->SetPostLoopCallback(sub { + scalar(grep { $_->{sock} } values %CONSUMERS); + }); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET); + my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS; + die "E: $git->{git_dir} $n shards failed" if $n; + $repo->{git_dir} = $git->{git_dir}; + my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); + if ($id > 0) { + $CONSUMERS{$repo->{shard_n}} = undef; + commit_used_shards($self, $git, \%CONSUMERS); + progress($self, "$git->{git_dir}: done"); + return run_todo($self); } + die "E: store_repo $git->{git_dir}: id=$id"; } sub get_roots ($$) { my ($self, $git) = @_; return if !$LIVE; # premature exit - cidx_reap($self, $LIVE_JOBS); my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET) or die "seek: $!"; open my $roots, '+>', undef or die "open: $!"; - my $pid = spawn(['git', "--git-dir=$git->{git_dir}", - qw(rev-list --stdin --max-parents=0)], - undef, { 0 => $refs, 1 => $roots }); - $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ]; + my $cmd = [ 'git', "--git-dir=$git->{git_dir}", + qw(rev-list --stdin --max-parents=0) ]; + my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots }); + cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots); } # for PublicInbox::SearchIdx::patch_id and with_umask @@ -434,9 +490,17 @@ sub cidx_init ($) { warn "# creating $dir\n" if !$self->{-opt}->{quiet}; File::Path::mkpath($dir); } + $self->lock_acquire; + my @shards; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); + delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; + $shard->idx_release; + $shard->wq_workers_start("shard[$n]", 1, undef, { + siblings => \@shards, # for ipc_atfork_child + }, \&shard_done_wait, $self); + push @shards, $shard; } # this warning needs to happen after idx_acquire state $once; @@ -444,14 +508,11 @@ sub cidx_init ($) { W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks, W: memory usage may be high for large indexing runs EOM + @shards; } sub scan_git_dirs ($) { my ($self) = @_; - local $LIVE_JOBS = $self->{-opt}->{jobs} // - PublicInbox::IPC::detect_nproc() // 2; - local $LIVE = {}; - local @XDB_SHARDS_FLAT = $self->xdb_shards_flat; for (@{$self->{git_dirs}}) { my $git = PublicInbox::Git->new($_); my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, @@ -462,18 +523,31 @@ sub scan_git_dirs ($) { cidx_reap($self, 0); } -sub cidx_run { +sub shards_active { # PostLoopCallback + scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); +} + +sub cidx_run { # main entry point my ($self) = @_; - cidx_init($self); + local $self->{todo} = []; + local $DEFER = $self->{todo}; + local $SIGSET = PublicInbox::DS::block_signals(); + my $restore = PublicInbox::OnDestroy->new($$, + \&PublicInbox::DS::sig_setmask, $SIGSET); + local $LIVE = {}; + local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; my $cb = $SIG{__WARN__} || \&CORE::warn; + local $MY_SIG = { + CHLD => \&PublicInbox::DS::enqueue_reap, + INT => sub { exit }, + }; local $SIG{__WARN__} = sub { my $m = shift @_; $self->{current_info} eq '' or $m =~ s/\A(#?\s*)/$1$self->{current_info}: /; $cb->($m, @_); }; - $self->lock_acquire; load_existing($self); my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}}; if (@nc) { @@ -486,9 +560,41 @@ sub cidx_run { warn "E: canonicalized and attempting to continue\n"; } local $self->{nchange} = 0; + local $LIVE_JOBS = $self->{-opt}->{jobs} || + PublicInbox::IPC::detect_nproc() || 2; + local @RDONLY_SHARDS = $self->xdb_shards_flat; + # do_prune($self) if $self->{-opt}->{prune}; TODO scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + + for my $s (@IDX_SHARDS) { + $s->{-cidx_quit} = 1; + $s->wq_close; + } + + PublicInbox::DS->SetPostLoopCallback(\&shards_active); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); $self->lock_release(!!$self->{nchange}); } +sub ipc_atfork_child { + my ($self) = @_; + $self->SUPER::ipc_atfork_child; + my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; + $_->wq_close for @$x; +} + +sub shard_done_wait { # awaitpid cb via ipc_worker_reap + my ($pid, $shard, $self) = @_; + delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset'; + return unless $?; + warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; + ++$self->{shard_err} if defined($self->{shard_err}); +} + +sub with_umask { # TODO + my ($self, $cb, @arg) = @_; + $cb->(@arg); +} + 1;