* [PATCH 0/4] cindex updates @ 2023-03-28 2:59 Eric Wong 2023-03-28 2:59 ` [PATCH 1/4] cindex: simplify some internal data structures Eric Wong ` (3 more replies) 0 siblings, 4 replies; 6+ messages in thread From: Eric Wong @ 2023-03-28 2:59 UTC (permalink / raw) To: meta cindex --prune is now less intrusive w.r.t. handling new commits. I'm embracing `local' more because this is a Perl codebase, and I think it makes things more robust against typos. I probably should've been doing this years ago :x Eric Wong (4): cindex: simplify some internal data structures cindex: always break out of event loop on $DO_QUIT cindex: interleave prune with indexing cindex: leave SIGTSTP and SIGCONT unblocked lib/PublicInbox/CodeSearchIdx.pm | 298 ++++++++++++++++++++----------- lib/PublicInbox/DS.pm | 5 +- lib/PublicInbox/SearchIdx.pm | 12 +- script/public-inbox-cindex | 2 +- 4 files changed, 210 insertions(+), 107 deletions(-) ^ permalink raw reply [flat|nested] 6+ messages in thread
* [PATCH 1/4] cindex: simplify some internal data structures 2023-03-28 2:59 [PATCH 0/4] cindex updates Eric Wong @ 2023-03-28 2:59 ` Eric Wong 2023-03-28 2:59 ` [PATCH 2/4] cindex: always break out of event loop on $DO_QUIT Eric Wong ` (2 subsequent siblings) 3 siblings, 0 replies; 6+ messages in thread From: Eric Wong @ 2023-03-28 2:59 UTC (permalink / raw) To: meta We'll rely more on local-ized `our' globals rather than hashref fields. The former is more resistant to typos and can be checked at compile-time earlier via `perl -c'. The {-internal} field is also renamed to {-cidx_internal} to reduce confusion within a large code base. --- lib/PublicInbox/CodeSearchIdx.pm | 22 +++++++++------------- script/public-inbox-cindex | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index e353f452..85e44cdc 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -289,10 +289,9 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search @ids; } -sub run_todo ($) { - my ($self) = @_; +sub run_deferred () { my $n; - while (defined(my $x = shift(@{$self->{todo} // []}))) { + while (defined(my $x = shift(@{$DEFER // []}))) { my $cb = shift @$x; $cb->(@$x); ++$n; @@ -308,12 +307,12 @@ sub need_reap { # post_loop_do sub cidx_reap ($$) { my ($self, $jobs) = @_; - while (run_todo($self)) {} + while (run_deferred()) {} local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs); while (need_reap(undef, $jobs)) { PublicInbox::DS::event_loop($MY_SIG, $SIGSET); } - while (!$jobs && run_todo($self)) {} + while (!$jobs && run_deferred()) {} } sub cidx_await_cb { # awaitpid cb @@ -527,7 +526,7 @@ sub index_repo { # cidx_await cb $consumers->{$repo->{shard_n}} = undef; commit_used_shards($self, $git, $consumers); progress($self, "$git->{git_dir}: done"); - return run_todo($self); + return run_deferred(); } die "E: store_repo $git->{git_dir}: id=$id"; 
} @@ -725,7 +724,7 @@ sub prep_umask ($) { my ($self) = @_; my $um; my $cur = umask; - if ($self->{-internal}) { # respect core.sharedRepository + if ($self->{-cidx_internal}) { # respect core.sharedRepository @{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR'; # yuck, FIXME move umask handling out of inbox-specific stuff require PublicInbox::InboxWritable; @@ -750,14 +749,12 @@ sub prep_umask ($) { sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); - local $self->{todo} = []; - local $DEFER = $self->{todo}; + local $DEFER = []; local $SIGSET = PublicInbox::DS::block_signals(); my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; - local $DO_QUIT; - local $TMP_GIT; + local ($DO_QUIT, $TMP_GIT, $REINDEX); local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; local $MY_SIG = { @@ -772,8 +769,7 @@ sub cidx_run { # main entry point $m =~ s/\A(#?\s*)/$1$self->{current_info}: /; $cb->($m, @_); }; - load_existing($self) unless $self->{-internal}; - local $REINDEX; + load_existing($self) unless $self->{-cidx_internal}; if ($self->{-opt}->{reindex}) { require PublicInbox::SharedKV; $REINDEX = PublicInbox::SharedKV->new; diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex index fb906bad..4c9136cf 100755 --- a/script/public-inbox-cindex +++ b/script/public-inbox-cindex @@ -79,7 +79,7 @@ EOM for my $gd (@git_dirs) { my $cd = "$gd/public-inbox-cindex"; my $cidx = PublicInbox::CodeSearchIdx->new($cd, { %$opt }); - $cidx->{-internal} = 1; + $cidx->{-cidx_internal} = 1; @{$cidx->{git_dirs}} = ($gd); $cidx->cidx_run; } ^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 2/4] cindex: always break out of event loop on $DO_QUIT 2023-03-28 2:59 [PATCH 0/4] cindex updates Eric Wong 2023-03-28 2:59 ` [PATCH 1/4] cindex: simplify some internal data structures Eric Wong @ 2023-03-28 2:59 ` Eric Wong 2023-03-28 2:59 ` [PATCH 3/4] cindex: interleave prune with indexing Eric Wong 2023-03-28 2:59 ` [PATCH 4/4] cindex: leave SIGTSTP and SIGCONT unblocked Eric Wong 3 siblings, 0 replies; 6+ messages in thread From: Eric Wong @ 2023-03-28 2:59 UTC (permalink / raw) To: meta Shard workers may not die soon enough (or may get stuck), so just let the parent die earlier since it doesn't need to commit anything. --- lib/PublicInbox/CodeSearchIdx.pm | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 85e44cdc..6907570d 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -689,6 +689,7 @@ sub do_prune ($) { } sub shards_active { # post_loop_do + return if $DO_QUIT; scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } ^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 3/4] cindex: interleave prune with indexing 2023-03-28 2:59 [PATCH 0/4] cindex updates Eric Wong 2023-03-28 2:59 ` [PATCH 1/4] cindex: simplify some internal data structures Eric Wong 2023-03-28 2:59 ` [PATCH 2/4] cindex: always break out of event loop on $DO_QUIT Eric Wong @ 2023-03-28 2:59 ` Eric Wong 2023-03-29 20:32 ` [PATCH v2] " Eric Wong 2023-03-28 2:59 ` [PATCH 4/4] cindex: leave SIGTSTP and SIGCONT unblocked Eric Wong 3 siblings, 1 reply; 6+ messages in thread From: Eric Wong @ 2023-03-28 2:59 UTC (permalink / raw) To: meta We need to ensure we don't block indexing for too long while pruning, since pruning coderepos seems more frequent and necessary than inbox repos due to the prevalence of force pushes with branches like `seen' (formerly `pu') in git.git. This requires us to work around a current bug[1] in the XS Search::Xapian (and maybe Xapian.pm SWIG) module where an unwrapped C++ exception for DatabaseModifiedError can get thrown and be uncatchable via Perl `eval'. [1] https://lists.xapian.org/pipermail/xapian-discuss/2023-March/009967.html <20230327114604.M803690@dcvr> --- lib/PublicInbox/CodeSearchIdx.pm | 274 +++++++++++++++++++++---------- lib/PublicInbox/SearchIdx.pm | 12 +- 2 files changed, 195 insertions(+), 91 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 6907570d..f60af015 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -32,6 +32,7 @@ use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; use Socket qw(MSG_EOR); use Carp (); +use List::Util qw(max); our ( $LIVE, # pid => cmd $DEFER, # [ [ cb, @args ], ... 
] @@ -39,12 +40,22 @@ our ( $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction + $BATCH_BYTES, $DO_QUIT, # signal number - @RDONLY_SHARDS, # Xapian::Database + @RDONLY_XDB, # Xapian::Database + @WORKER_SHARDS, # read-only clones of self @IDX_SHARDS, # clones of self $MAX_SIZE, $TMP_GIT, # PublicInbox::Git object for --prune $REINDEX, # PublicInbox::SharedKV + @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] + %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen) + $PRUNE_CUR, # per-shard document ID + $PRUNE_FH, # tracks PRUNE_CUR in the event of C++ exception + $PRUNE_SENT, # total number pruned by one of WORKER_SHARDS + $PRUNE_RECV, # pruned by one of IDX_SHARDS + $NCHANGE, # current number of changes + %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -137,7 +148,7 @@ sub store_repo { # wq_do - returns docid my $xdb = $self->{xdb}; for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? 
if (defined $repo->{docid}) { - my $doc = $xdb->get_document($repo->{docid}) // + my $doc = $self->get_doc($repo->{docid}) // die "$repo->{git_dir} doc #$repo->{docid} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); my %new = map { $_ => undef } @{$repo->{roots}}; @@ -198,17 +209,15 @@ EOM } # sharded reader for `git log --pretty=format: --stdin' -sub shard_index { # via wq_io_do +sub shard_index { # via wq_io_do in IDX_SHARDS my ($self, $git, $n, $roots) = @_; local $self->{current_info} = "$git->{git_dir} [$n]"; local $self->{roots} = $roots; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; - my $batch_bytes = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; # local-ized in parent before fork - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); @@ -236,13 +245,13 @@ sub shard_index { # via wq_io_do $TXN_BYTES -= $len; if ($TXN_BYTES <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes - $len; + $TXN_BYTES = $BATCH_BYTES - $len; } update_commit($self, $cmt); ++$nr; if ($TXN_BYTES <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; } $/ = $FS; } @@ -390,7 +399,7 @@ sub prep_repo ($$) { my $shard = bless { %$self, shard => $n }, ref($self); $repo->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; - local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); } @@ -398,7 +407,7 @@ sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); my $docid = shift(@docids) // return get_roots($self, $git); - my $doc = 
$shard->{xdb}->get_document($docid) // + my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change @@ -418,24 +427,24 @@ sub partition_refs ($$$) { sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; - my ($seen, $nchange) = (0, 0); + my $seen = 0; my @shard_in = map { $_->reopen; open my $fh, '+>', undef or die "open: $!"; $fh; - } @RDONLY_SHARDS; + } @RDONLY_XDB; while (defined(my $cmt = <$rfh>)) { chomp $cmt; - my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; - } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { + ++$NCHANGE; + } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; + ++$NCHANGE; $seen = 0; } if ($DO_QUIT) { @@ -446,8 +455,7 @@ sub partition_refs ($$$) { close($rfh); return () if $DO_QUIT; if (!$? || (($? 
& 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { - $self->{nchange} += $nchange; - progress($self, "$git->{git_dir}: $nchange commits"); + progress($self, "$git->{git_dir}: $NCHANGE commits"); for my $fh (@shard_in) { $fh->flush or die "flush: $!"; sysseek($fh, 0, SEEK_SET) or die "seek: $!"; @@ -548,25 +556,25 @@ sub git { $_[0]->{git} } sub load_existing ($) { # for -u/--update my ($self) = @_; - my $dirs = $self->{git_dirs} // []; + my $dirs = $self->{git_dirs} //= []; if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { local $self->{xdb}; $self->xdb or die "E: $self->{cidx_dir} non-existent for --update\n"; - my @missing; my @cur = grep { if (-e $_) { 1; } else { - push @missing, $_; + push @GIT_DIR_GONE, $_; undef; } } $self->all_terms('P'); - @missing = () if $self->{-opt}->{prune}; - @missing and warn "W: the following repos no longer exist:\n", - (map { "W:\t$_\n" } @missing), + if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { + warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @GIT_DIR_GONE), "W: use --prune to remove them from ", $self->{cidx_dir}, "\n"; + } push @$dirs, @cur; } my %uniq; # List::Util::uniq requires Perl 5.26+ @@ -586,13 +594,12 @@ sub cidx_init ($) { } $self->lock_acquire; my @shards; - local $TXN_BYTES; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; $shard->idx_release; - $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { + $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; @@ -619,82 +626,166 @@ sub scan_git_dirs ($) { cidx_reap($self, 0); } +sub prune_recv { # via wq_do in IDX_SHARDS, called from WORKER_SHARDS + my ($self, $to_prune) = @_; + return if $DO_QUIT; + $self->begin_txn_lazy; + $PRUNE_RECV += scalar @$to_prune; + my $xdb = $self->{xdb}; + $xdb->delete_document($_) for @$to_prune; + 
$self->commit_txn_lazy; + progress($self, "< prune [$self->{shard}] $PRUNE_RECV"); +} + +sub prune_ckpoint ($) { + my ($self) = @_; + $TXN_BYTES = $BATCH_BYTES; + return if $DO_QUIT; + my @to_prune = values(%TO_PRUNE) or return; + %TO_PRUNE = (); + $PRUNE_SENT += scalar(@to_prune); + progress($self, "> prune [$self->{shard}] $PRUNE_SENT"); + $IDX_SHARDS[$self->{shard}]->wq_do('prune_recv', \@to_prune); + seek($PRUNE_FH, 0, SEEK_SET) or die "seek: $!"; + print $PRUNE_FH pack('J',max(@to_prune)) or die "print: $!"; +} + +sub get_doclen { # retry_reopen callback + my ($self, $id) = @_; + $self->{xdb}->get_doclength($id); +} + sub prune_cb { # git->check_async callback my ($hex, $type, undef, $self_id) = @_; - return if $type eq 'commit'; my ($self, $id) = @$self_id; - my $len = $self->{xdb}->get_doclength($id); - progress($self, "$hex $type (doclength=$len)"); - ++$self->{pruned}; - $self->{xdb}->delete_document($id); + return ($PRUNE_CUR = $id) if $type eq 'commit'; + progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1; + my $len = $self->retry_reopen(\&get_doclen, $id); + $TO_PRUNE{$id} = $PRUNE_CUR = $id; - # all math around batch_bytes calculation is pretty fuzzy, + # all math around TXN_BYTES calculation is pretty fuzzy, # but need a way to regularly flush output to avoid OOM, # so assume the average term + position overhead is the # answer to everything: 42 - return if ($self->{batch_bytes} -= ($len * 42)) > 0; - cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}"); - $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; + return if ($TXN_BYTES -= ($len * 42)) > 0; + prune_ckpoint($self); +} + +sub prune_git_dir ($$$) { + my ($self, $id, $doc) = @_; + my @P = xap_terms('P', $doc); + scalar(@P) == 1 or warn +"BUG? 
shard[$self->{shard}] #$id has zero or multiple paths: @P"; + for my $P (@P) { + if (exists($ACTIVE_GIT_DIR{$P}) && -d $P) { + $PRUNE_CUR = $id; + } else { + $TO_PRUNE{$id} = $id; + progress($self, "$P gone #$id"); + my $len = $self->{xdb}->get_doclength($id); + $PRUNE_CUR = $id; + return if ($TXN_BYTES -= ($len * 42)) > 0; + prune_ckpoint($self); + } + } } -sub shard_prune { # via wq_io_do - my ($self, $n, $git_dir) = @_; - my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; - my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy - $self->begin_txn_lazy; - my $xdb = $self->{xdb}; - my $cur = $xdb->postlist_begin('Tc'); - my $end = $xdb->postlist_end('Tc'); - my ($id, @cmt, $oid); - local $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; - local $self->{pruned} = 0; - for (; $cur != $end && !$DO_QUIT; $cur++) { - @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid); - scalar(@cmt) == 1 or - warn "BUG? shard[$n] #$id has multiple commits: @cmt"; - for $oid (@cmt) { - $git->check_async($oid, \&prune_cb, [ $self, $id ]); +sub prune_all { # retry_reopen cb + my ($self, $git) = @_; + my $max = $self->{xdb}->get_lastdocid; + my $cur = $self->{xdb}->postlist_begin('Tc'); + my $end = $self->{xdb}->postlist_end('Tc'); + my $id = $PRUNE_CUR; + local %ACTIVE_GIT_DIR = map { + $_ => undef + } (@{$self->{git_dirs}}, @GIT_DIR_GONE); + + for (; $id <= $max && !$DO_QUIT; $id++) { + my $doc = $self->get_doc($id) // next; + my @cmt = xap_terms('Q', $doc); + if (scalar(@cmt) == 0) { + prune_git_dir($self, $id, $doc); + } else { + scalar(@cmt) == 1 or warn +"BUG? 
shard[$self->{shard}] #$id has multiple commits: @cmt"; + for my $o (@cmt) { + $git->check_async($o, \&prune_cb, [$self, $id]) + } } } - $git->async_wait_all; - for my $d ($self->all_terms('P')) { # GIT_DIR paths - last if $DO_QUIT; - next if -d $d; - for $id (docids_by_postlist($self, 'P'.$d)) { - progress($self, "$d gone #$id"); - $xdb->delete_document($id); - } +} + +sub prune_send { # via wq_io_do in WORKER_SHARDS + my ($self, $git_dir) = @_; + my $n = $self->{shard} // die 'BUG: no {shard}'; + my ($pid, $wpid); + my $cur = 1; + open my $fh, '+>', undef or die "open: $!"; + $fh->autoflush(1); + # fork off into a child to deal with unwrapped + # Xapian::DatabaseModifiedError C++ exceptions from Search::Xapian + # retry_reopen catches most of the properly wrapped-into-Perl + # exceptions, but we can't catch (non-Perl) C++ exceptions +again: + $pid = fork // die "fork: $!"; + if ($pid == 0) { + my $end = PublicInbox::OnDestroy->new($$, \&CORE::exit, 1); + my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy + local $TXN_BYTES = $BATCH_BYTES; + local $PRUNE_SENT = 0; + local $PRUNE_FH = $fh; + local $PRUNE_CUR = $cur; + local %TO_PRUNE; + $self->{xdb}->reopen; + $self->retry_reopen(\&prune_all, $git); + $git->async_wait_all; + $git->cleanup; + prune_ckpoint($self); + $PRUNE_SENT and + progress($self, "[$n] pruned $PRUNE_SENT items"); + $end->cancel; + exit 0; + } + $wpid = waitpid($pid, 0) // die "waitpid($pid) $!"; + die("W: waitpid($pid) => $wpid ($!)") if $wpid != $pid; + if ($? == 134) { + seek($fh, 0, SEEK_SET) or die "seek: $!"; + read($fh, $cur, -s $fh) // die "read: $!"; + $cur = unpack('J', $cur); + warn "W: retrying $$ on uncaught exception from #$cur..\n"; + goto again; + } elsif ($?) 
{ + warn "W: prune_send worker exited with \$?=$?\n"; } - $self->commit_txn_lazy; - $self->{pruned} and - progress($self, "[$n] pruned $self->{pruned} commits"); - send($op_p, "shard_done $n", MSG_EOR); } -sub do_prune ($) { +sub ro_workers_start ($) { my ($self) = @_; - my $consumers = {}; - my $git_dir = $TMP_GIT->{git_dir}; - my $n = 0; - local $self->{-shard_ok} = {}; - for my $s (@IDX_SHARDS) { - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; - $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir); - $consumers->{$n++} = $c; + return () unless $self->{-opt}->{prune}; # TODO fsck? + init_tmp_git_dir(); + my @shards; + for my $n (0..($self->{nshard} - 1)) { + my $shard = bless { %$self, shard => $n }, ref($self); + $shard->wq_workers_start("cidx-ro shard[$n]", 1, $SIGSET, { + siblings => \@shards, # for ipc_atfork_child + -cidx_ro => 1, + }, \&shard_done_wait, $self); + push @shards, $shard; } - wait_consumers($self, $TMP_GIT, $consumers); + @shards; +} + +sub start_prune () { + $_->wq_do('prune_send', $TMP_GIT->{git_dir}) for @WORKER_SHARDS; } sub shards_active { # post_loop_do return if $DO_QUIT; - scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); + scalar(grep { $_->{-cidx_quit} } (@WORKER_SHARDS, @IDX_SHARDS)); } # signal handlers -sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } +sub kill_shards { $_->wq_kill(@_) for (@WORKER_SHARDS, @IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); @@ -704,7 +795,6 @@ sub parent_quit { sub init_tmp_git_dir ($) { my ($self) = @_; - return unless $self->{-opt}->{prune}; require File::Temp; require PublicInbox::Import; my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); @@ -755,7 +845,9 @@ sub cidx_run { # main entry point my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; - local ($DO_QUIT, $TMP_GIT, $REINDEX); + local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, $PRUNE_RECV); + local 
$BATCH_BYTES = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; local $MY_SIG = { @@ -795,31 +887,37 @@ sub cidx_run { # main entry point $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1; } @{$self->{git_dirs}}; } - local $self->{nchange} = 0; + local $NCHANGE = 0; local $LIVE_JOBS = $self->{-opt}->{jobs} || PublicInbox::IPC::detect_nproc() || 2; - local @RDONLY_SHARDS = $self->xdb_shards_flat; - init_tmp_git_dir($self); - do_prune($self) if $self->{-opt}->{prune}; + local @RDONLY_XDB = $self->xdb_shards_flat; + local @WORKER_SHARDS = ro_workers_start($self); + start_prune() if $self->{-opt}->{prune}; scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - for my $s (@IDX_SHARDS) { + for my $s (@WORKER_SHARDS, @IDX_SHARDS) { $s->{-cidx_quit} = 1; $s->wq_close; } local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); - $self->lock_release(!!$self->{nchange}); + $self->lock_release(!!$NCHANGE); } -sub ipc_atfork_child { +sub ipc_atfork_child { # both @WORKER_SHARDS and @IDX_SHARDS my ($self) = @_; $self->SUPER::ipc_atfork_child; $SIG{USR1} = \&shard_usr1; $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; $_->wq_close for @$x; + my $n = $self->{shard} // die 'BUG: no {shard}'; + if ($self->{-cidx_ro}) { + $self->{xdb} = $RDONLY_XDB[$n] // die "BUG: no RDONLY_XDB[$n]"; + $self->{xdb}->reopen; + @RDONLY_XDB = (); + } undef; } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3baeaa9c..b907772e 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -552,11 +552,17 @@ sub add_message { $smsg->{num}; } +sub get_doc ($$) { + my ($self, $docid) = @_; + eval { $self->{xdb}->get_document($docid) } // do { + die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/; + undef; + } +} + sub _get_doc ($$) { my ($self, 
$docid) = @_; - my $doc = eval { $self->{xdb}->get_document($docid) }; - $doc // do { - warn "E: $@\n" if $@; + get_doc($self, $docid) // do { warn "E: #$docid missing in Xapian\n"; undef; } ^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH v2] cindex: interleave prune with indexing 2023-03-28 2:59 ` [PATCH 3/4] cindex: interleave prune with indexing Eric Wong @ 2023-03-29 20:32 ` Eric Wong 0 siblings, 0 replies; 6+ messages in thread From: Eric Wong @ 2023-03-29 20:32 UTC (permalink / raw) To: meta I wasn't happy with v1. This one is less code and doesn't require a workaround for uncatchable C++ exceptions with Search::Xapian. This version also avoids doing redundant pruning work on newly-indexed commits. -------8<------ Subject: [PATCH] cindex: interleave prune with indexing We need to ensure we don't block indexing for too long while pruning, since pruning coderepos seems more frequent and necessary than inbox repos due to the prevalence of force pushes with branches like `seen' (formerly `pu') in git.git. Implement this via ->event_step and requeue mechanisms of DS so we periodically flush our work and let indexing resume. I originally wanted to implement this as a dedicated group of workers, but the XS Search::Xapian bug[1] workaround to handle uncaught C++ exceptions was expensive and complex compared to the evented mechanism. 
[1] https://lists.xapian.org/pipermail/xapian-discuss/2023-March/009967.html <20230327114604.M803690@dcvr> --- lib/PublicInbox/CodeSearchIdx.pm | 237 ++++++++++++++++++------------- lib/PublicInbox/SearchIdx.pm | 12 +- 2 files changed, 147 insertions(+), 102 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 9e70087e..035fab3e 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -39,12 +39,22 @@ our ( $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction + $BATCH_BYTES, $DO_QUIT, # signal number - @RDONLY_SHARDS, # Xapian::Database + @RDONLY_XDB, # Xapian::Database @IDX_SHARDS, # clones of self $MAX_SIZE, $TMP_GIT, # PublicInbox::Git object for --prune $REINDEX, # PublicInbox::SharedKV + @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] + %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen) + $PRUNE_CUR, # per-shard document ID + $PRUNE_MAX, # per-shard document ID to stop at + $PRUNE_OP_P, # prune_done() notification socket + $PRUNE_NR, # total number pruned + @PRUNE_DONE, # marks off prune completions + $NCHANGE, # current number of changes + %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -137,7 +147,7 @@ sub store_repo { # wq_do - returns docid my $xdb = $self->{xdb}; for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? 
if (defined $repo->{docid}) { - my $doc = $xdb->get_document($repo->{docid}) // + my $doc = $self->get_doc($repo->{docid}) // die "$repo->{git_dir} doc #$repo->{docid} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); my %new = map { $_ => undef } @{$repo->{roots}}; @@ -160,12 +170,21 @@ sub store_repo { # wq_do - returns docid } } -sub cidx_ckpoint ($$) { +sub cidx_ckpoint ($;$) { my ($self, $msg) = @_; - progress($self, $msg); + progress($self, $msg) if defined($msg); + $TXN_BYTES = $BATCH_BYTES; # reset + if (my @to_prune = values(%TO_PRUNE)) { + %TO_PRUNE = (); + $PRUNE_NR += scalar(@to_prune); + progress($self, + "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)"); + $self->begin_txn_lazy; + $self->{xdb}->delete_document($_) for @to_prune; + } return if $PublicInbox::Search::X{CLOEXEC_UNSET}; - $self->{xdb}->commit_transaction; - $self->{xdb}->begin_transaction; + $self->commit_txn_lazy; + $self->begin_txn_lazy; } sub truncate_cmt ($$) { @@ -198,17 +217,15 @@ EOM } # sharded reader for `git log --pretty=format: --stdin' -sub shard_index { # via wq_io_do +sub shard_index { # via wq_io_do in IDX_SHARDS my ($self, $git, $n, $roots) = @_; local $self->{current_info} = "$git->{git_dir} [$n]"; local $self->{roots} = $roots; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; - my $batch_bytes = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; # local-ized in parent before fork - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); @@ -233,17 +250,13 @@ sub shard_index { # via wq_io_do } else { @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT)); } - $TXN_BYTES -= $len; - if ($TXN_BYTES <= 0) { + if (($TXN_BYTES -= $len) <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes - $len; + $TXN_BYTES 
-= $len; # len may be huge, >TXN_BYTES; } update_commit($self, $cmt); ++$nr; - if ($TXN_BYTES <= 0) { - cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes; - } + cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0; $/ = $FS; } close($rd); @@ -261,6 +274,21 @@ sub shard_done { # called via PktOp on shard_index completion $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok}); } +sub prune_done { # called via PktOp->event_step completion + my ($shard) = @_; + $PRUNE_DONE[$shard->{shard}] = 1; +} + +sub prune_busy { + return if $DO_QUIT; + grep(defined, @PRUNE_DONE) != @IDX_SHARDS; +} + +sub await_prune () { + local @PublicInbox::DS::post_loop_do = (\&prune_busy); + PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy(); +} + sub seen ($$) { my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" for (1..100) { @@ -390,7 +418,7 @@ sub prep_repo ($$) { my $shard = bless { %$self, shard => $n }, ref($self); $repo->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; - local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); } @@ -398,7 +426,7 @@ sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); my $docid = shift(@docids) // return get_roots($self, $git); - my $doc = $shard->{xdb}->get_document($docid) // + my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? 
"\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change @@ -418,24 +446,24 @@ sub partition_refs ($$$) { sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; - my ($seen, $nchange) = (0, 0); + my $seen = 0; my @shard_in = map { $_->reopen; open my $fh, '+>', undef or die "open: $!"; $fh; - } @RDONLY_SHARDS; + } @RDONLY_XDB; while (defined(my $cmt = <$rfh>)) { chomp $cmt; - my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; - } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { + ++$NCHANGE; + } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; + ++$NCHANGE; $seen = 0; } if ($DO_QUIT) { @@ -446,8 +474,7 @@ sub partition_refs ($$$) { close($rfh); return () if $DO_QUIT; if (!$? || (($? 
& 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { - $self->{nchange} += $nchange; - progress($self, "$git->{git_dir}: $nchange commits"); + progress($self, "$git->{git_dir}: $NCHANGE commits"); for my $fh (@shard_in) { $fh->flush or die "flush: $!"; sysseek($fh, 0, SEEK_SET) or die "seek: $!"; @@ -548,25 +575,25 @@ sub git { $_[0]->{git} } sub load_existing ($) { # for -u/--update my ($self) = @_; - my $dirs = $self->{git_dirs} // []; + my $dirs = $self->{git_dirs} //= []; if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { local $self->{xdb}; $self->xdb or die "E: $self->{cidx_dir} non-existent for --update\n"; - my @missing; my @cur = grep { if (-e $_) { 1; } else { - push @missing, $_; + push @GIT_DIR_GONE, $_; undef; } } $self->all_terms('P'); - @missing = () if $self->{-opt}->{prune}; - @missing and warn "W: the following repos no longer exist:\n", - (map { "W:\t$_\n" } @missing), + if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { + warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @GIT_DIR_GONE), "W: use --prune to remove them from ", $self->{cidx_dir}, "\n"; + } push @$dirs, @cur; } my %uniq; # List::Util::uniq requires Perl 5.26+ @@ -586,13 +613,12 @@ sub cidx_init ($) { } $self->lock_acquire; my @shards; - local $TXN_BYTES; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; $shard->idx_release; - $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { + $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; @@ -621,80 +647,79 @@ sub scan_git_dirs ($) { sub prune_cb { # git->check_async callback my ($hex, $type, undef, $self_id) = @_; - return if $type eq 'commit'; my ($self, $id) = @$self_id; + return if $type eq 'commit'; + progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1; my $len = 
$self->{xdb}->get_doclength($id); - progress($self, "$hex $type (doclength=$len)"); - ++$self->{pruned}; - $self->{xdb}->delete_document($id); + $TO_PRUNE{$id} = $id; - # all math around batch_bytes calculation is pretty fuzzy, + # all math around TXN_BYTES calculation is pretty fuzzy, # but need a way to regularly flush output to avoid OOM, # so assume the average term + position overhead is the # answer to everything: 42 - return if ($self->{batch_bytes} -= ($len * 42)) > 0; - cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}"); - $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; -} - -sub shard_prune { # via wq_io_do - my ($self, $n, $git_dir) = @_; - my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; - my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy - $self->begin_txn_lazy; - my $xdb = $self->{xdb}; - my $cur = $xdb->postlist_begin('Tc'); - my $end = $xdb->postlist_end('Tc'); - my ($id, @cmt, $oid); - local $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; - local $self->{pruned} = 0; - for (; $cur != $end && !$DO_QUIT; $cur++) { - @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid); - scalar(@cmt) == 1 or - warn "BUG? shard[$n] #$id has multiple commits: @cmt"; - for $oid (@cmt) { - $git->check_async($oid, \&prune_cb, [ $self, $id ]); - } + cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; +} + +sub prune_git_dir ($$$) { + my ($self, $id, $doc) = @_; + my @P = xap_terms('P', $doc); + scalar(@P) == 1 or warn +"BUG? 
shard[$self->{shard}] #$id has zero or multiple paths: @P"; + for my $P (@P) { + next if exists($ACTIVE_GIT_DIR{$P}) && -d $P; + $TO_PRUNE{$id} = $id; + progress($self, "$P gone #$id"); + my $len = $self->{xdb}->get_doclength($id); + cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; } - $git->async_wait_all; - for my $d ($self->all_terms('P')) { # GIT_DIR paths - last if $DO_QUIT; - next if -d $d; - for $id (docids_by_postlist($self, 'P'.$d)) { - progress($self, "$d gone #$id"); - $xdb->delete_document($id); - } - } - $self->commit_txn_lazy; - $self->{pruned} and - progress($self, "[$n] pruned $self->{pruned} commits"); - send($op_p, "shard_done $n", MSG_EOR); } -sub do_prune ($) { +sub event_step { # may be requeued via DS my ($self) = @_; - my $consumers = {}; - my $git_dir = $TMP_GIT->{git_dir}; - my $n = 0; - local $self->{-shard_ok} = {}; - for my $s (@IDX_SHARDS) { - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; - $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir); - $consumers->{$n++} = $c; + my $PRUNE_BATCH = 1000; + $TXN_BYTES = $BATCH_BYTES; + for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX; + $PRUNE_CUR++) { + my $doc = $self->get_doc($PRUNE_CUR) // next; + my @cmt = xap_terms('Q', $doc); + if (scalar(@cmt) == 0) { + prune_git_dir($self, $PRUNE_CUR, $doc); + } else { + scalar(@cmt) == 1 or warn +"BUG? 
shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt"; + for my $o (@cmt) { + $TMP_GIT->check_async($o, \&prune_cb, + [$self, $PRUNE_CUR]) + } + } } - wait_consumers($self, $TMP_GIT, $consumers); + $TMP_GIT->async_wait_all; + cidx_ckpoint($self); + return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX; + send($PRUNE_OP_P, 'prune_done', MSG_EOR); + $TMP_GIT->cleanup; + $TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef; + %ACTIVE_GIT_DIR = (); +} + +sub prune_start { # via wq_io_do in IDX_SHARDS + my ($self, $git_dir, @active_git_dir) = @_; + $PRUNE_CUR = 1; + $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p'; + %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir; + $TMP_GIT = PublicInbox::Git->new($git_dir); # TMP_GIT copy + $self->begin_txn_lazy; + $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1; + event_step($self); } sub shards_active { # post_loop_do return if $DO_QUIT; - scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); + scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS)); } # signal handlers -sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } +sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); @@ -704,7 +729,6 @@ sub parent_quit { sub init_tmp_git_dir ($) { my ($self) = @_; - return unless $self->{-opt}->{prune}; require File::Temp; require PublicInbox::Import; my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); @@ -747,6 +771,18 @@ sub prep_umask ($) { undef; } +sub start_prune ($) { + my ($self) = @_; + init_tmp_git_dir($self); + my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE); + for my $s (@IDX_SHARDS) { + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{prune_done} = [ $s ]; + $s->wq_io_do('prune_start', [ $p->{op_p} ], + $TMP_GIT->{git_dir}, @active_git_dir) + } +} + sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); @@ -756,7 +792,10 @@ sub cidx_run { # main entry point my $restore = 
PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; - local ($DO_QUIT, $TMP_GIT, $REINDEX); + local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, + @PRUNE_DONE); + local $BATCH_BYTES = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; local $MY_SIG = { @@ -796,13 +835,13 @@ sub cidx_run { # main entry point $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1; } @{$self->{git_dirs}}; } - local $self->{nchange} = 0; + local $NCHANGE = 0; local $LIVE_JOBS = $self->{-opt}->{jobs} || PublicInbox::IPC::detect_nproc() || 2; - local @RDONLY_SHARDS = $self->xdb_shards_flat; - init_tmp_git_dir($self); - do_prune($self) if $self->{-opt}->{prune}; + local @RDONLY_XDB = $self->xdb_shards_flat; + start_prune($self) if $self->{-opt}->{prune}; scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + await_prune if $self->{-opt}->{prune}; for my $s (@IDX_SHARDS) { $s->{-cidx_quit} = 1; @@ -811,10 +850,10 @@ sub cidx_run { # main entry point local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); - $self->lock_release(!!$self->{nchange}); + $self->lock_release(!!$NCHANGE); } -sub ipc_atfork_child { +sub ipc_atfork_child { # @IDX_SHARDS my ($self) = @_; $self->SUPER::ipc_atfork_child; $SIG{USR1} = \&shard_usr1; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3baeaa9c..b907772e 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -552,11 +552,17 @@ sub add_message { $smsg->{num}; } +sub get_doc ($$) { + my ($self, $docid) = @_; + eval { $self->{xdb}->get_document($docid) } // do { + die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/; + undef; + } +} + sub _get_doc ($$) { my ($self, $docid) = @_; - my $doc = eval { $self->{xdb}->get_document($docid) }; - $doc // do { - warn "E: $@\n" if $@; + get_doc($self, $docid) // do { warn 
"E: #$docid missing in Xapian\n"; undef; } ^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 4/4] cindex: leave SIGTSTP and SIGCONT unblocked 2023-03-28 2:59 [PATCH 0/4] cindex updates Eric Wong ` (2 preceding siblings ...) 2023-03-28 2:59 ` [PATCH 3/4] cindex: interleave prune with indexing Eric Wong @ 2023-03-28 2:59 ` Eric Wong 3 siblings, 0 replies; 6+ messages in thread From: Eric Wong @ 2023-03-28 2:59 UTC (permalink / raw) To: meta This makes it easier to pause and restart long-running indexing jobs which use our event loop. --- lib/PublicInbox/CodeSearchIdx.pm | 3 ++- lib/PublicInbox/DS.pm | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index f60af015..8d57ec10 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -841,7 +841,8 @@ sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); local $DEFER = []; - local $SIGSET = PublicInbox::DS::block_signals(); + local $SIGSET = PublicInbox::DS::block_signals( + POSIX::SIGTSTP, POSIX::SIGCONT); my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 340086fc..98084b5c 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -193,10 +193,11 @@ sub RunTimers { sub sig_setmask { sigprocmask(SIG_SETMASK, @_) or die "sigprocmask: $!" } -sub block_signals () { - my $oldset = POSIX::SigSet->new; +sub block_signals { # anything in @_ stays unblocked my $newset = POSIX::SigSet->new; $newset->fillset or die "fillset: $!"; + $newset->delset($_) for @_; + my $oldset = POSIX::SigSet->new; sig_setmask($newset, $oldset); $oldset; } ^ permalink raw reply related [flat|nested] 6+ messages in thread
end of thread, other threads:[~2023-03-29 20:32 UTC | newest] Thread overview: 6+ messages (download: mbox.gz follow: Atom feed -- links below jump to the message on this page -- 2023-03-28 2:59 [PATCH 0/4] cindex updates Eric Wong 2023-03-28 2:59 ` [PATCH 1/4] cindex: simplify some internal data structures Eric Wong 2023-03-28 2:59 ` [PATCH 2/4] cindex: always break out of event loop on $DO_QUIT Eric Wong 2023-03-28 2:59 ` [PATCH 3/4] cindex: interleave prune with indexing Eric Wong 2023-03-29 20:32 ` [PATCH v2] " Eric Wong 2023-03-28 2:59 ` [PATCH 4/4] cindex: leave SIGTSTP and SIGCONT unblocked Eric Wong
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox, as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).