* [PATCH 1/4] ipc: support awaitpid in WQ workers
2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
2023-03-31 10:20 ` [PATCH 2/4] cindex: do prune work while waiting for `git log -p' Eric Wong
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
To: meta
Using signalfd is necessary to get reliable signal wakeups w/o
polling on fixed intervals. This change will make it possible
to use awaitpid in cidx shard workers so they can perform prune
work while waiting on the initial output of `git log -p'.
---
lib/PublicInbox/IPC.pm | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 1f0e87ee..cca3dacb 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -268,12 +268,14 @@ sub sock_defined {
defined($wqw->{sock});
}
-sub wq_worker_loop ($$) {
- my ($self, $bcast2) = @_;
+sub wq_worker_loop ($$$) {
+ my ($self, $bcast2, $oldset) = @_;
my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw);
- PublicInbox::DS::event_loop();
+ my $sig = delete($self->{wq_sig});
+ $sig->{CHLD} //= \&PublicInbox::DS::enqueue_reap;
+ PublicInbox::DS::event_loop($sig, $oldset);
PublicInbox::DS->Reset;
}
@@ -405,8 +407,7 @@ sub _wq_worker_start {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local @SIG{keys %SIG} = values %SIG;
- PublicInbox::DS::sig_setmask($oldset);
- wq_worker_loop($self, $bcast2);
+ wq_worker_loop($self, $bcast2, $oldset);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
undef $end; # trigger exit
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 2/4] cindex: do prune work while waiting for `git log -p'
2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
2023-03-31 10:20 ` [PATCH 3/4] cindex: share PktOp socket across prune workers Eric Wong
2023-03-31 10:20 ` [PATCH 4/4] cindex: share PktOp across indexing workers Eric Wong
3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
To: meta
`git log -p' can take several seconds to generate its initial output.
SMP systems can be processing prunes during this delay, so let
DS do a one-shot notification for us while prune is running. On
Linux, we'll also use the biggest pipe possible so git can do
more CPU-intensive work to generate diffs while our Perl
processes are indexing and likely hitting I/O wait.
---
MANIFEST | 1 +
lib/PublicInbox/CidxLogP.pm | 29 ++++++++++++++++++
lib/PublicInbox/CodeSearchIdx.pm | 51 +++++++++++++++++++++-----------
3 files changed, 63 insertions(+), 18 deletions(-)
create mode 100644 lib/PublicInbox/CidxLogP.pm
diff --git a/MANIFEST b/MANIFEST
index 3c421645..a0e64c6a 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm
lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
+lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxLogP.pm b/lib/PublicInbox/CidxLogP.pm
new file mode 100644
index 00000000..7877d5ac
--- /dev/null
+++ b/lib/PublicInbox/CidxLogP.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate,
+# so CodeSearchIdx can process prune work while it's happening. Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p.
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+ my ($cls, $rd, $cidx, $git, $roots) = @_;
+ my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
+ fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes
+ $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $rd = $self->{sock} // return warn('BUG?: no {sock}');
+ $self->close; # PublicInbox::DS::close, deferred, so $sock is usable
+ delete($self->{cidx})->cidx_read_log_p($self, $rd);
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 035fab3e..215e337f 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::SearchIdx qw(add_val);
use PublicInbox::Config qw(glob2re);
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
use Socket qw(MSG_EOR);
use Carp ();
our (
@@ -216,20 +217,41 @@ EOM
$len;
}
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb
+ my ($pid, $self, $op_p) = @_;
+ if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+ ($? & 127) == POSIX::SIGPIPE))) {
+ send($op_p, "shard_done $self->{shard}", MSG_EOR);
+ } else {
+ warn "E: git @LOG_STDIN: \$?=$?\n";
+ $self->{xdb}->cancel_transaction;
+ }
+}
+
sub shard_index { # via wq_io_do in IDX_SHARDS
- my ($self, $git, $n, $roots) = @_;
- local $self->{current_info} = "$git->{git_dir} [$n]";
- local $self->{roots} = $roots;
+ my ($self, $git, $roots) = @_;
+
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+ my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+ close $in or die "close: $!";
+ awaitpid($pid, \&cidx_reap_log, $self, $op_p);
+ PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
+ # CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+ my ($self, $log_p, $rd) = @_;
+ my $git = delete $log_p->{git} // die 'BUG: no {git}';
+ local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+ local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
local $MAX_SIZE = $self->{-opt}->{max_size};
# local-ized in parent before fork
$TXN_BYTES = $BATCH_BYTES;
local $self->{git} = $git; # for patchid
return if $DO_QUIT;
- my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
- close $in or die "close: $!";
my $nr = 0;
# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -238,7 +260,7 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
my $len;
my $cmt = {};
local $/ = $FS;
- my $buf = <$rd> // return close($rd); # leading $FS
+ my $buf = <$rd> // return; # leading $FS
$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
$self->begin_txn_lazy;
while (!$DO_QUIT && defined($buf = <$rd>)) {
@@ -251,22 +273,15 @@ sub shard_index { # via wq_io_do in IDX_SHARDS
@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
}
if (($TXN_BYTES -= $len) <= 0) {
- cidx_ckpoint($self, "[$n] $nr");
+ cidx_ckpoint($self, "[$self->{shard}] $nr");
$TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
}
update_commit($self, $cmt);
++$nr;
- cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+ cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
$/ = $FS;
}
- close($rd);
- if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
- ($? & 127) == POSIX::SIGPIPE))) {
- send($op_p, "shard_done $n", MSG_EOR);
- } else {
- warn "E: git @LOG_STDIN: \$?=$?\n";
- $self->{xdb}->cancel_transaction;
- }
+ # return and wait for cidx_reap_log
}
sub shard_done { # called via PktOp on shard_index completion
@@ -537,7 +552,7 @@ sub index_repo { # cidx_await cb
$c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
- $git, $n, \@roots);
+ $git, \@roots);
$consumers->{$n} = $c;
}
@shard_in = ();
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 3/4] cindex: share PktOp socket across prune workers
2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
2023-03-31 10:20 ` [PATCH 1/4] ipc: support awaitpid in WQ workers Eric Wong
2023-03-31 10:20 ` [PATCH 2/4] cindex: do prune work while waiting for `git log -p' Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
2023-03-31 10:20 ` [PATCH 4/4] cindex: share PktOp across indexing workers Eric Wong
3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
To: meta
We can allocate fewer sockets and less memory this way.
---
lib/PublicInbox/CodeSearchIdx.pm | 24 ++++++++++++++----------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 215e337f..14342683 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -53,7 +53,7 @@ our (
$PRUNE_MAX, # per-shard document ID to stop at
$PRUNE_OP_P, # prune_done() notification socket
$PRUNE_NR, # total number pruned
- @PRUNE_DONE, # marks off prune completions
+ $PRUNE_DONE, # marks off prune completions
$NCHANGE, # current number of changes
%ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
);
@@ -290,13 +290,17 @@ sub shard_done { # called via PktOp on shard_index completion
}
sub prune_done { # called via PktOp->event_step completion
- my ($shard) = @_;
- $PRUNE_DONE[$shard->{shard}] = 1;
+ my ($self, $n) = @_;
+ return if $DO_QUIT || !$PRUNE_DONE;
+ die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
+ $PRUNE_DONE->[$n] = 1;
+ grep(defined, @$PRUNE_DONE) == @IDX_SHARDS and
+ progress($self, 'prune done')
}
-sub prune_busy {
+sub prune_busy { # post_loop_do
return if $DO_QUIT;
- grep(defined, @PRUNE_DONE) != @IDX_SHARDS;
+ grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
}
sub await_prune () {
@@ -711,7 +715,7 @@ sub event_step { # may be requeued via DS
$TMP_GIT->async_wait_all;
cidx_ckpoint($self);
return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
- send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+ send($PRUNE_OP_P, "prune_done $self->{shard}", MSG_EOR);
$TMP_GIT->cleanup;
$TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
%ACTIVE_GIT_DIR = ();
@@ -790,9 +794,9 @@ sub start_prune ($) {
my ($self) = @_;
init_tmp_git_dir($self);
my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{prune_done} = [ $self ];
for my $s (@IDX_SHARDS) {
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{prune_done} = [ $s ];
$s->wq_io_do('prune_start', [ $p->{op_p} ],
$TMP_GIT->{git_dir}, @active_git_dir)
}
@@ -807,8 +811,8 @@ sub cidx_run { # main entry point
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
local $LIVE = {};
- local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
- @PRUNE_DONE);
+ local $PRUNE_DONE = [];
+ local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local @IDX_SHARDS = cidx_init($self);
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [PATCH 4/4] cindex: share PktOp across indexing workers
2023-03-31 10:20 [PATCH 0/4] cindex: parallelism++, sockets-- Eric Wong
` (2 preceding siblings ...)
2023-03-31 10:20 ` [PATCH 3/4] cindex: share PktOp socket across prune workers Eric Wong
@ 2023-03-31 10:20 ` Eric Wong
3 siblings, 0 replies; 5+ messages in thread
From: Eric Wong @ 2023-03-31 10:20 UTC (permalink / raw)
To: meta
Using fewer sockets simplifies completion checks, too.
---
lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++----------------
1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 14342683..05007afd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -504,36 +504,35 @@ sub partition_refs ($$$) {
}
sub shard_commit { # via wq_io_do
- my ($self, $n) = @_;
+ my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $n", MSG_EOR);
+ send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub consumers_open { # post_loop_do
- my (undef, $consumers) = @_;
- return if $DO_QUIT;
- scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+ my (undef, $c) = @_; # $c is PublicInbox::PktOp
+ $DO_QUIT ? undef : defined($c->{sock});
}
-sub wait_consumers ($$$) {
- my ($self, $git, $consumers) = @_;
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+ my ($self, $git, $active, $c) = @_;
+ local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
}
-sub commit_used_shards ($$$) {
- my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+ my ($self, $git, $active) = @_;
local $self->{-shard_ok} = {};
- for my $n (keys %$consumers) {
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
- $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
- $consumers->{$n} = $c;
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
+ for my $n (keys %$active) {
+ $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
}
- wait_consumers($self, $git, $consumers);
+ undef $p;
+ wait_active($self, $git, $active, $c);
}
sub index_repo { # cidx_await cb
@@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb
$repo->{roots} = \@roots;
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
- local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
- my $consumers = {};
+ local $self->{-shard_ok} = {};
+ my $active = {};
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
last if $DO_QUIT;
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
$git, \@roots);
- $consumers->{$n} = $c;
+ $active->{$n} = undef;
}
+ undef $p;
@shard_in = ();
- wait_consumers($self, $git, $consumers);
+ wait_active($self, $git, $active, $c);
if ($DO_QUIT) {
- commit_used_shards($self, $git, $consumers);
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return;
}
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
- $consumers->{$repo->{shard_n}} = undef;
- commit_used_shards($self, $git, $consumers);
+ $active->{$repo->{shard_n}} = undef;
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return run_deferred();
}
^ permalink raw reply related [flat|nested] 5+ messages in thread