* [PATCH 1/3] cindex: store coderepo data asynchronously
2023-10-25 15:33 [PATCH 0/3] cindex: prune fixes Eric Wong
@ 2023-10-25 15:33 ` Eric Wong
2023-10-25 15:33 ` [PATCH 2/3] cindex: quiet --prune when checking objectFormat Eric Wong
2023-10-25 15:33 ` [PATCH 3/3] cindex: fix large prunes Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-25 15:33 UTC (permalink / raw)
To: meta
While it's typically fast to store coderepo data, latency on
HDDs can be pathological.  Storing asynchronously lets us use
that delay to get other work done.
---
lib/PublicInbox/CodeSearchIdx.pm | 69 +++++++++++++++++---------------
1 file changed, 36 insertions(+), 33 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index aeee37c0..f2fd28e3 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -193,8 +193,9 @@ sub progress {
$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
}
-sub store_repo { # wq_do - returns docid
+sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
+ my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->begin_txn_lazy;
$self->{xdb}->delete_document($_) for @{$repo->{to_delete}};
my $doc = $PublicInbox::Search::X{Document}->new;
@@ -203,12 +204,10 @@ sub store_repo { # wq_do - returns docid
$doc->add_boolean_term('T'.'r');
$doc->add_boolean_term('G'.$_) for @{$repo->{roots}};
$doc->set_data($repo->{fp}); # \n delimited
- if ($repo->{docid}) {
- $self->{xdb}->replace_document($repo->{docid}, $doc);
- $repo->{docid};
- } else {
- $self->{xdb}->add_document($doc);
- }
+ my $did = $repo->{docid};
+ $did ? $self->{xdb}->replace_document($did, $doc)
+ : ($did = $self->{xdb}->add_document($doc));
+ send($op_p, "repo_stored $did", 0);
}
sub cidx_ckpoint ($;$) {
@@ -322,6 +321,17 @@ sub shard_done { # called via PktOp on shard_index completion
$repo_ctx->{shard_ok}->{$n} = 1;
}
+sub repo_stored {
+ my ($self, $repo_ctx, $did) = @_;
+ $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did";
+ my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ];
+ # shard_done fires when all shards are committed
+ my @active = keys %{$repo_ctx->{active}};
+ $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active;
+}
+
sub prune_done { # called via prune_do completion
my ($self, $n) = @_;
return if $DO_QUIT || !$PRUNE_DONE;
@@ -584,37 +594,30 @@ sub index_next ($) {
sub next_repos { # OnDestroy cb
my ($repo_ctx) = @_;
- progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done");
- return if $DO_QUIT;
- if ($REPO_CTX) {
- $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
- $REPO_CTX = undef;
- index_next($repo_ctx->{self});
- }
+ my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+ progress($self, "$repo->{git_dir}: done");
+ return if $DO_QUIT || !$REPO_CTX;
+ my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
+ die "E: $repo->{git_dir} $n shards failed" if $n;
+ $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
+ $REPO_CTX = undef;
+ index_next($self);
}
-sub commit_shard { # OnDestroy cb
+sub index_done { # OnDestroy cb called when done indexing each code repo
my ($repo_ctx) = @_;
my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+ return if $DO_QUIT;
my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
- die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT;
-
- $repo_ctx->{shard_ok} = {};
- if (!$DO_QUIT) {
- my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo',
- $repo);
- (!defined($id) || $id <= 0) and
- die "E: store_repo $repo->{git_dir}: id=$id";
- $active->{$repo->{shard_n}} = undef;
- }
- my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+ die "E: $repo->{git_dir} $n shards failed" if $n;
+ $repo_ctx->{shard_ok} = {}; # reset for future shard_done
+ $n = $repo->{shard_n};
+ $active->{$n} = undef;
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ];
- for my $n (keys %$active) {
- $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
- }
- # shard_done fires when all shards are committed
+ $c->{ops}->{repo_stored} = [ $self, $repo_ctx ];
+ $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
+ # repo_stored will fire once store_repo is done
}
sub index_repo { # run_git cb
@@ -637,10 +640,10 @@ sub index_repo { # run_git cb
$repo->{git_dir} = $git->{git_dir};
my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
delete $git->{-cidx_gits_fini}; # may fire gits_fini
- my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
+ my $index_done = PublicInbox::OnDestroy->new($$, \&index_done,
$repo_ctx);
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ];
+ $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ];
for my $n (0..$#shard_in) {
$shard_in[$n]->flush or die "flush shard[$n]: $!";
-s $shard_in[$n] or next;
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 2/3] cindex: quiet --prune when checking objectFormat
2023-10-25 15:33 [PATCH 0/3] cindex: prune fixes Eric Wong
2023-10-25 15:33 ` [PATCH 1/3] cindex: store coderepo data asynchronously Eric Wong
@ 2023-10-25 15:33 ` Eric Wong
2023-10-25 15:33 ` [PATCH 3/3] cindex: fix large prunes Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-25 15:33 UTC (permalink / raw)
To: meta
Most coderepos don't have extensions.objectFormat set,
so it's senseless to emit warnings on failures.
Fixes: 709fcf00c4d5 (cindex: use run_await to read extensions.objectFormat)
---
lib/PublicInbox/CodeSearchIdx.pm | 3 ++-
t/cindex.t | 7 +++++--
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f2fd28e3..80636270 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -858,7 +858,8 @@ sub prep_alternate_start {
}
my $cmd = [ 'git', "--git-dir=$git_dir",
qw(config extensions.objectFormat) ];
- run_await($cmd, undef, undef, \&prep_alternate_end, $o, $run_prune);
+ my $opt = { quiet => 1 };
+ run_await($cmd, undef, $opt, \&prep_alternate_end, $o, $run_prune);
}
sub cmd_done { # run_await cb for sort, xapian-delve, sed failures
diff --git a/t/cindex.t b/t/cindex.t
index c7de1505..09183518 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -13,6 +13,7 @@ my ($tmp, $for_destroy) = tmpdir();
my $pwd = getcwd();
my @unused_keys = qw(last_commit has_threadid skip_docdata);
local $ENV{PI_CONFIG} = '/dev/null';
+my $opt = { 1 => \(my $cidx_out), 2 => \(my $cidx_err) };
# I reworked CodeSearchIdx->shard_worker to handle empty trees
# in the initial commit generated by cvs2svn for xapian.git
@@ -166,7 +167,9 @@ SKIP: { # --prune
is(scalar($csrch->mset('s:hi')->items), 1, 'got hit');
rename("$tmp/wt0/.git", "$tmp/wt0/.giit");
- ok(run_script([qw(-cindex -q --prune -d), "$tmp/ext"]), 'prune');
+ ok(run_script([qw(-cindex -q --prune -d), "$tmp/ext"], undef, $opt),
+ 'prune');
+ is(${$opt->{2}}, '', 'nothing in stderr') or diag explain($opt);
$csrch->reopen;
is(scalar($csrch->mset('s:hi')->items), 0, 'hit pruned');
@@ -213,7 +216,7 @@ EOM
close $fh;
my $cmd = [ qw(-cindex -u --all --associate -d), "$tmp/ext",
'-I', $basic->{inboxdir} ];
- my $opt = { 1 => \(my $cidx_out), 2 => \(my $cidx_err) };
+ $cidx_out = $cidx_err = '';
ok(run_script($cmd, $env, $opt), 'associate w/o search');
like($cidx_err, qr/W: \Q$basic->{inboxdir}\E not indexed for search/,
'non-Xapian-enabled inbox noted');
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 3/3] cindex: fix large prunes
2023-10-25 15:33 [PATCH 0/3] cindex: prune fixes Eric Wong
2023-10-25 15:33 ` [PATCH 1/3] cindex: store coderepo data asynchronously Eric Wong
2023-10-25 15:33 ` [PATCH 2/3] cindex: quiet --prune when checking objectFormat Eric Wong
@ 2023-10-25 15:33 ` Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-25 15:33 UTC (permalink / raw)
To: meta
When comm(1) has a lot of data to output, we must explicitly
close FDs of processes in previous stages of the pipeline to
ensure comm(1) terminates properly.
This is difficult to test automatically with small test repos...
Fixes: 17b06aa32aac (cindex: start using run_await to simplify code)
---
lib/PublicInbox/CodeSearchIdx.pm | 1 +
1 file changed, 1 insertion(+)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 80636270..33080664 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -993,6 +993,7 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done);
run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done);
my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM);
+ %$_ = () for ($awk_opt, $sort_opt, $comm_opt); # comm_rd is blocking :<
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
^ permalink raw reply related [flat|nested] 4+ messages in thread