From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 467D21F437 for ; Tue, 28 Mar 2023 02:59:06 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1679972346; bh=B0hynPFO6ct8lu9j3FSKtz7BJ6xyxWiSkL4dUpON9fs=; h=From:To:Subject:Date:In-Reply-To:References:From; b=NsiV9dxqv7hzweURRo2knt5CiRqCsAf+sJRWjqQVShlZIfL6ZpZc9T8fTuzi22ltQ EU4R+pfkYQvHDX0z1h0gu8aP21c6Y2d2I75z7amShADSWMbWTcyea9pL0lQEWLECYF GjfSUlpwRh3LiXr3Vo21aPlIo07+sDptksrJaBdQ= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/4] cindex: interleave prune with indexing Date: Tue, 28 Mar 2023 02:59:03 +0000 Message-Id: <20230328025904.3822761-4-e@80x24.org> In-Reply-To: <20230328025904.3822761-1-e@80x24.org> References: <20230328025904.3822761-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We need to ensure we don't block indexing for too long while pruning, since pruning coderepos seems more frequent and necessary than inbox repos due to the prevalence of force pushes with branches like `seen' (formerly `pu') in git.git. This requires us to work around a current bug[1] in the XS Search::Xapian (and maybe Xapian.pm SWIG) module where an unwrapped C++ exception for DatabaseModifiedError can get thrown and be uncatchable via Perl `eval'. 
[1] https://lists.xapian.org/pipermail/xapian-discuss/2023-March/009967.html <20230327114604.M803690@dcvr> --- lib/PublicInbox/CodeSearchIdx.pm | 274 +++++++++++++++++++++---------- lib/PublicInbox/SearchIdx.pm | 12 +- 2 files changed, 195 insertions(+), 91 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 6907570d..f60af015 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -32,6 +32,7 @@ use PublicInbox::Spawn qw(spawn popen_rd); use PublicInbox::OnDestroy; use Socket qw(MSG_EOR); use Carp (); +use List::Util qw(max); our ( $LIVE, # pid => cmd $DEFER, # [ [ cb, @args ], ... ] @@ -39,12 +40,22 @@ our ( $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction + $BATCH_BYTES, $DO_QUIT, # signal number - @RDONLY_SHARDS, # Xapian::Database + @RDONLY_XDB, # Xapian::Database + @WORKER_SHARDS, # read-only clones of self @IDX_SHARDS, # clones of self $MAX_SIZE, $TMP_GIT, # PublicInbox::Git object for --prune $REINDEX, # PublicInbox::SharedKV + @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] + %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen) + $PRUNE_CUR, # per-shard document ID + $PRUNE_FH, # tracks PRUNE_CUR in the event of C++ exception + $PRUNE_SENT, # total number pruned by one of WORKER_SHARDS + $PRUNE_RECV, # pruned by one of IDX_SHARDS + $NCHANGE, # current number of changes + %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -137,7 +148,7 @@ sub store_repo { # wq_do - returns docid my $xdb = $self->{xdb}; for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed? 
if (defined $repo->{docid}) { - my $doc = $xdb->get_document($repo->{docid}) // + my $doc = $self->get_doc($repo->{docid}) // die "$repo->{git_dir} doc #$repo->{docid} gone"; add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct}); my %new = map { $_ => undef } @{$repo->{roots}}; @@ -198,17 +209,15 @@ EOM } # sharded reader for `git log --pretty=format: --stdin' -sub shard_index { # via wq_io_do +sub shard_index { # via wq_io_do in IDX_SHARDS my ($self, $git, $n, $roots) = @_; local $self->{current_info} = "$git->{git_dir} [$n]"; local $self->{roots} = $roots; my $in = delete($self->{0}) // die 'BUG: no {0} input'; my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p'; - my $batch_bytes = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; local $MAX_SIZE = $self->{-opt}->{max_size}; # local-ized in parent before fork - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; local $self->{git} = $git; # for patchid return if $DO_QUIT; my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in }); @@ -236,13 +245,13 @@ sub shard_index { # via wq_io_do $TXN_BYTES -= $len; if ($TXN_BYTES <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes - $len; + $TXN_BYTES = $BATCH_BYTES - $len; } update_commit($self, $cmt); ++$nr; if ($TXN_BYTES <= 0) { cidx_ckpoint($self, "[$n] $nr"); - $TXN_BYTES = $batch_bytes; + $TXN_BYTES = $BATCH_BYTES; } $/ = $FS; } @@ -390,7 +399,7 @@ sub prep_repo ($$) { my $shard = bless { %$self, shard => $n }, ref($self); $repo->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; - local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef"; + local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); } @@ -398,7 +407,7 @@ sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir}); my $docid = shift(@docids) // return get_roots($self, $git); - my $doc = 
$shard->{xdb}->get_document($docid) // + my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change @@ -418,24 +427,24 @@ sub partition_refs ($$$) { sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs }); close $refs or die "close: $!"; - my ($seen, $nchange) = (0, 0); + my $seen = 0; my @shard_in = map { $_->reopen; open my $fh, '+>', undef or die "open: $!"; $fh; - } @RDONLY_SHARDS; + } @RDONLY_XDB; while (defined(my $cmt = <$rfh>)) { chomp $cmt; - my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS); + my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB); if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; - } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) { + ++$NCHANGE; + } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) { last if ++$seen > $SEEN_MAX; } else { say { $shard_in[$n] } $cmt or die "say: $!"; - ++$nchange; + ++$NCHANGE; $seen = 0; } if ($DO_QUIT) { @@ -446,8 +455,7 @@ sub partition_refs ($$$) { close($rfh); return () if $DO_QUIT; if (!$? || (($? 
& 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) { - $self->{nchange} += $nchange; - progress($self, "$git->{git_dir}: $nchange commits"); + progress($self, "$git->{git_dir}: $NCHANGE commits"); for my $fh (@shard_in) { $fh->flush or die "flush: $!"; sysseek($fh, 0, SEEK_SET) or die "seek: $!"; @@ -548,25 +556,25 @@ sub git { $_[0]->{git} } sub load_existing ($) { # for -u/--update my ($self) = @_; - my $dirs = $self->{git_dirs} // []; + my $dirs = $self->{git_dirs} //= []; if ($self->{-opt}->{update} || $self->{-opt}->{prune}) { local $self->{xdb}; $self->xdb or die "E: $self->{cidx_dir} non-existent for --update\n"; - my @missing; my @cur = grep { if (-e $_) { 1; } else { - push @missing, $_; + push @GIT_DIR_GONE, $_; undef; } } $self->all_terms('P'); - @missing = () if $self->{-opt}->{prune}; - @missing and warn "W: the following repos no longer exist:\n", - (map { "W:\t$_\n" } @missing), + if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) { + warn "W: the following repos no longer exist:\n", + (map { "W:\t$_\n" } @GIT_DIR_GONE), "W: use --prune to remove them from ", $self->{cidx_dir}, "\n"; + } push @$dirs, @cur; } my %uniq; # List::Util::uniq requires Perl 5.26+ @@ -586,13 +594,12 @@ sub cidx_init ($) { } $self->lock_acquire; my @shards; - local $TXN_BYTES; for my $n (0..($self->{nshard} - 1)) { my $shard = bless { %$self, shard => $n }, ref($self); delete @$shard{qw(lockfh lock_path)}; $shard->idx_acquire; $shard->idx_release; - $shard->wq_workers_start("shard[$n]", 1, $SIGSET, { + $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, { siblings => \@shards, # for ipc_atfork_child }, \&shard_done_wait, $self); push @shards, $shard; @@ -619,82 +626,166 @@ sub scan_git_dirs ($) { cidx_reap($self, 0); } +sub prune_recv { # via wq_do in IDX_SHARDS, called from WORKER_SHARDS + my ($self, $to_prune) = @_; + return if $DO_QUIT; + $self->begin_txn_lazy; + $PRUNE_RECV += scalar @$to_prune; + my $xdb = $self->{xdb}; + $xdb->delete_document($_) for @$to_prune; + 
$self->commit_txn_lazy; + progress($self, "< prune [$self->{shard}] $PRUNE_RECV"); +} + +sub prune_ckpoint ($) { + my ($self) = @_; + $TXN_BYTES = $BATCH_BYTES; + return if $DO_QUIT; + my @to_prune = values(%TO_PRUNE) or return; + %TO_PRUNE = (); + $PRUNE_SENT += scalar(@to_prune); + progress($self, "> prune [$self->{shard}] $PRUNE_SENT"); + $IDX_SHARDS[$self->{shard}]->wq_do('prune_recv', \@to_prune); + seek($PRUNE_FH, 0, SEEK_SET) or die "seek: $!"; + print $PRUNE_FH pack('J',max(@to_prune)) or die "print: $!"; +} + +sub get_doclen { # retry_reopen callback + my ($self, $id) = @_; + $self->{xdb}->get_doclength($id); +} + sub prune_cb { # git->check_async callback my ($hex, $type, undef, $self_id) = @_; - return if $type eq 'commit'; my ($self, $id) = @$self_id; - my $len = $self->{xdb}->get_doclength($id); - progress($self, "$hex $type (doclength=$len)"); - ++$self->{pruned}; - $self->{xdb}->delete_document($id); + return ($PRUNE_CUR = $id) if $type eq 'commit'; + progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1; + my $len = $self->retry_reopen(\&get_doclen, $id); + $TO_PRUNE{$id} = $PRUNE_CUR = $id; - # all math around batch_bytes calculation is pretty fuzzy, + # all math around TXN_BYTES calculation is pretty fuzzy, # but need a way to regularly flush output to avoid OOM, # so assume the average term + position overhead is the # answer to everything: 42 - return if ($self->{batch_bytes} -= ($len * 42)) > 0; - cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}"); - $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; + return if ($TXN_BYTES -= ($len * 42)) > 0; + prune_ckpoint($self); +} + +sub prune_git_dir ($$$) { + my ($self, $id, $doc) = @_; + my @P = xap_terms('P', $doc); + scalar(@P) == 1 or warn +"BUG? 
shard[$self->{shard}] #$id has zero or multiple paths: @P"; + for my $P (@P) { + if (exists($ACTIVE_GIT_DIR{$P}) && -d $P) { + $PRUNE_CUR = $id; + } else { + $TO_PRUNE{$id} = $id; + progress($self, "$P gone #$id"); + my $len = $self->{xdb}->get_doclength($id); + $PRUNE_CUR = $id; + return if ($TXN_BYTES -= ($len * 42)) > 0; + prune_ckpoint($self); + } + } } -sub shard_prune { # via wq_io_do - my ($self, $n, $git_dir) = @_; - my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; - my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy - $self->begin_txn_lazy; - my $xdb = $self->{xdb}; - my $cur = $xdb->postlist_begin('Tc'); - my $end = $xdb->postlist_end('Tc'); - my ($id, @cmt, $oid); - local $self->{batch_bytes} = $self->{-opt}->{batch_size} // - $PublicInbox::SearchIdx::BATCH_BYTES; - local $self->{pruned} = 0; - for (; $cur != $end && !$DO_QUIT; $cur++) { - @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid); - scalar(@cmt) == 1 or - warn "BUG? shard[$n] #$id has multiple commits: @cmt"; - for $oid (@cmt) { - $git->check_async($oid, \&prune_cb, [ $self, $id ]); +sub prune_all { # retry_reopen cb + my ($self, $git) = @_; + my $max = $self->{xdb}->get_lastdocid; + my $cur = $self->{xdb}->postlist_begin('Tc'); + my $end = $self->{xdb}->postlist_end('Tc'); + my $id = $PRUNE_CUR; + local %ACTIVE_GIT_DIR = map { + $_ => undef + } (@{$self->{git_dirs}}, @GIT_DIR_GONE); + + for (; $id <= $max && !$DO_QUIT; $id++) { + my $doc = $self->get_doc($id) // next; + my @cmt = xap_terms('Q', $doc); + if (scalar(@cmt) == 0) { + prune_git_dir($self, $id, $doc); + } else { + scalar(@cmt) == 1 or warn +"BUG? 
shard[$self->{shard}] #$id has multiple commits: @cmt"; + for my $o (@cmt) { + $git->check_async($o, \&prune_cb, [$self, $id]) + } } } - $git->async_wait_all; - for my $d ($self->all_terms('P')) { # GIT_DIR paths - last if $DO_QUIT; - next if -d $d; - for $id (docids_by_postlist($self, 'P'.$d)) { - progress($self, "$d gone #$id"); - $xdb->delete_document($id); - } +} + +sub prune_send { # via wq_io_do in WORKER_SHARDS + my ($self, $git_dir) = @_; + my $n = $self->{shard} // die 'BUG: no {shard}'; + my ($pid, $wpid); + my $cur = 1; + open my $fh, '+>', undef or die "open: $!"; + $fh->autoflush(1); + # fork off into a child to deal with unwrapped + # Xapian::DatabaseModifiedError C++ exceptions from Search::Xapian + # retry_reopen catches most of the properly wrapped-into-Perl + # exceptions, but we can't catch (non-Perl) C++ exceptions +again: + $pid = fork // die "fork: $!"; + if ($pid == 0) { + my $end = PublicInbox::OnDestroy->new($$, \&CORE::exit, 1); + my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy + local $TXN_BYTES = $BATCH_BYTES; + local $PRUNE_SENT = 0; + local $PRUNE_FH = $fh; + local $PRUNE_CUR = $cur; + local %TO_PRUNE; + $self->{xdb}->reopen; + $self->retry_reopen(\&prune_all, $git); + $git->async_wait_all; + $git->cleanup; + prune_ckpoint($self); + $PRUNE_SENT and + progress($self, "[$n] pruned $PRUNE_SENT items"); + $end->cancel; + exit 0; + } + $wpid = waitpid($pid, 0) // die "waitpid($pid) $!"; + die("W: waitpid($pid) => $wpid ($!)") if $wpid != $pid; + if ($? == 134) { + seek($fh, 0, SEEK_SET) or die "seek: $!"; + read($fh, $cur, -s $fh) // die "read: $!"; + $cur = unpack('J', $cur); + warn "W: retrying $$ on uncaught exception from #$cur..\n"; + goto again; + } elsif ($?) 
{ + warn "W: prune_send worker exited with \$?=$?\n"; } - $self->commit_txn_lazy; - $self->{pruned} and - progress($self, "[$n] pruned $self->{pruned} commits"); - send($op_p, "shard_done $n", MSG_EOR); } -sub do_prune ($) { +sub ro_workers_start ($) { my ($self) = @_; - my $consumers = {}; - my $git_dir = $TMP_GIT->{git_dir}; - my $n = 0; - local $self->{-shard_ok} = {}; - for my $s (@IDX_SHARDS) { - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; - $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir); - $consumers->{$n++} = $c; + return () unless $self->{-opt}->{prune}; # TODO fsck? + init_tmp_git_dir(); + my @shards; + for my $n (0..($self->{nshard} - 1)) { + my $shard = bless { %$self, shard => $n }, ref($self); + $shard->wq_workers_start("cidx-ro shard[$n]", 1, $SIGSET, { + siblings => \@shards, # for ipc_atfork_child + -cidx_ro => 1, + }, \&shard_done_wait, $self); + push @shards, $shard; } - wait_consumers($self, $TMP_GIT, $consumers); + @shards; +} + +sub start_prune () { + $_->wq_do('prune_send', $TMP_GIT->{git_dir}) for @WORKER_SHARDS; } sub shards_active { # post_loop_do return if $DO_QUIT; - scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); + scalar(grep { $_->{-cidx_quit} } (@WORKER_SHARDS, @IDX_SHARDS)); } # signal handlers -sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } +sub kill_shards { $_->wq_kill(@_) for (@WORKER_SHARDS, @IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); @@ -704,7 +795,6 @@ sub parent_quit { sub init_tmp_git_dir ($) { my ($self) = @_; - return unless $self->{-opt}->{prune}; require File::Temp; require PublicInbox::Import; my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); @@ -755,7 +845,9 @@ sub cidx_run { # main entry point my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; - local ($DO_QUIT, $TMP_GIT, $REINDEX); + local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, $PRUNE_RECV); + local 
$BATCH_BYTES = $self->{-opt}->{batch_size} // + $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); local $self->{current_info} = ''; local $MY_SIG = { @@ -795,31 +887,37 @@ sub cidx_run { # main entry point $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1; } @{$self->{git_dirs}}; } - local $self->{nchange} = 0; + local $NCHANGE = 0; local $LIVE_JOBS = $self->{-opt}->{jobs} || PublicInbox::IPC::detect_nproc() || 2; - local @RDONLY_SHARDS = $self->xdb_shards_flat; - init_tmp_git_dir($self); - do_prune($self) if $self->{-opt}->{prune}; + local @RDONLY_XDB = $self->xdb_shards_flat; + local @WORKER_SHARDS = ro_workers_start($self); + start_prune() if $self->{-opt}->{prune}; scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - for my $s (@IDX_SHARDS) { + for my $s (@WORKER_SHARDS, @IDX_SHARDS) { $s->{-cidx_quit} = 1; $s->wq_close; } local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); - $self->lock_release(!!$self->{nchange}); + $self->lock_release(!!$NCHANGE); } -sub ipc_atfork_child { +sub ipc_atfork_child { # both @WORKER_SHARDS and @IDX_SHARDS my ($self) = @_; $self->SUPER::ipc_atfork_child; $SIG{USR1} = \&shard_usr1; $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); my $x = delete $self->{siblings} // die 'BUG: no {siblings}'; $_->wq_close for @$x; + my $n = $self->{shard} // die 'BUG: no {shard}'; + if ($self->{-cidx_ro}) { + $self->{xdb} = $RDONLY_XDB[$n] // die "BUG: no RDONLY_XDB[$n]"; + $self->{xdb}->reopen; + @RDONLY_XDB = (); + } undef; } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 3baeaa9c..b907772e 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -552,11 +552,17 @@ sub add_message { $smsg->{num}; } +sub get_doc ($$) { + my ($self, $docid) = @_; + eval { $self->{xdb}->get_document($docid) } // do { + die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/; + undef; + } +} + sub _get_doc ($$) { my ($self, 
$docid) = @_; - my $doc = eval { $self->{xdb}->get_document($docid) }; - $doc // do { - warn "E: $@\n" if $@; + get_doc($self, $docid) // do { warn "E: #$docid missing in Xapian\n"; undef; }