* [PATCH 1/7] search: hoist out shards_dir for future use
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
` (6 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This will be useful for internal tooling and APIs.
---
lib/PublicInbox/Search.pm | 34 +++++++++++++++++++---------------
1 file changed, 19 insertions(+), 15 deletions(-)
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index b2de3450..1559d9b3 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -202,28 +202,32 @@ sub xdir ($;$) {
}
}
-# returns all shards as separate Xapian::Database objects w/o combining
-sub xdb_shards_flat ($) {
+# returns shard directories as an array of strings, does not verify existence
+sub shard_dirs ($) {
my ($self) = @_;
my $xpfx = $self->{xpfx};
- my (@xdb, $slow_phrase);
- load_xapian();
- $self->{qp_flags} //= $QP_FLAGS;
- if ($xpfx =~ m!/xapian[0-9]+\z!) { # v1
- @xdb = ($X{Database}->new($xpfx));
- $self->{qp_flags} |= FLAG_PHRASE() if !-f "$xpfx/iamchert";
- } else { # v2, eidx, cidx
+ if ($xpfx =~ m!/xapian[0-9]+\z!) { # v1 inbox
+ ($xpfx);
+ } else { # v2 inbox, eidx, cidx
opendir(my $dh, $xpfx) or return (); # not initialized yet
# We need numeric sorting so shard[0] is first for reading
# Xapian metadata, if needed
my $last = max(grep(/\A[0-9]+\z/, readdir($dh))) // return ();
- @xdb = map {
- my $shard_dir = "$xpfx/$_";
- $slow_phrase ||= -f "$shard_dir/iamchert";
- $X{Database}->new($shard_dir);
- } (0..$last);
- $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
+ map { "$xpfx/$_" } (0..$last);
}
+}
+
+# returns all shards as separate Xapian::Database objects w/o combining
+sub xdb_shards_flat ($) {
+ my ($self) = @_;
+ load_xapian();
+ $self->{qp_flags} //= $QP_FLAGS;
+ my $slow_phrase;
+ my @xdb = map {
+ $slow_phrase ||= -f "$_/iamchert";
+ $X{Database}->new($_); # raises if missing
+ } shard_dirs($self);
+ $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
@xdb;
}
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 2/7] cindex: read-only association dump
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
` (5 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This will eventually allow associating coderepos with inboxes
and vice-versa, avoiding the need for manual configuration via
tedious publicinbox.*.coderepo directives.
I'm not sure yet how the results should be stored for WWW, but
storing them is required since it takes about 8 hours to do this
fully across lore and git.kernel.org.
---
MANIFEST | 3 +
lib/PublicInbox/CidxDumpIbx.pm | 59 +++++
lib/PublicInbox/CidxDumpShardRoots.pm | 73 ++++++
lib/PublicInbox/CidxRecvIbx.pm | 46 ++++
lib/PublicInbox/CodeSearchIdx.pm | 306 ++++++++++++++++++++++----
lib/PublicInbox/Config.pm | 2 +-
lib/PublicInbox/Search.pm | 2 +-
script/public-inbox-cindex | 4 +-
8 files changed, 452 insertions(+), 43 deletions(-)
create mode 100644 lib/PublicInbox/CidxDumpIbx.pm
create mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm
create mode 100644 lib/PublicInbox/CidxRecvIbx.pm
diff --git a/MANIFEST b/MANIFEST
index 1001ca08..162e3038 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,10 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
+lib/PublicInbox/CidxDumpIbx.pm
+lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
+lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
new file mode 100644
index 00000000..e1bc273d
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpIbx.pm
@@ -0,0 +1,59 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may be implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpIbx;
+use v5.12;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::DS;
+use Socket qw(MSG_EOR);
+
+sub start {
+ my ($rcvibx, $ibx_id) = @_;
+ my $cidx = $rcvibx->{cidx};
+ my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+ my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
+ ibx_id => $ibx_id, iter => 0 }, __PACKAGE__;
+ $self->{srch} = $ibx->isrch // do {
+ warn("W: $self->{ekey} has no search index (ignoring)\n");
+ send($rcvibx->{op_p}, 'index_next', MSG_EOR); # parent sends next ID
+ return undef;
+ };
+ my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+ $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
+ event_step($self);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
+ return if $rcvibx->{cidx}->do_quit;
+ my $last = $self->{mset}->size - 1;
+ my $cur = $self->{iter};
+ my $end = $cur + 9999;
+ if ($end >= $last) {
+ send($rcvibx->{op_p}, 'index_next', MSG_EOR);
+ $end = $last;
+ }
+ $self->{iter} = $end + 1;
+
+ my $sort_w = $rcvibx->{sort_w};
+ my $ibx_id = $self->{ibx_id};
+ # process title + progress report the chunk being dumped
+ local $0 = "dumping $self->{ekey} $cur..$end";
+ $rcvibx->{cidx}->progress($0);
+ for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+ my $doc = $x->get_document;
+ for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
+ for (xap_terms($p, $doc)) {
+ print $sort_w "$_ $ibx_id\n" or die "print: $!";
+ }
+ }
+ }
+ $end < $last && !$rcvibx->{cidx}->do_quit and
+ PublicInbox::DS::requeue($self);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
new file mode 100644
index 00000000..34afa419
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpShardRoots.pm
@@ -0,0 +1,73 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may be implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpShardRoots;
+use v5.12;
+use PublicInbox::Lock;
+use PublicInbox::Search qw(xap_terms);
+use Socket qw(MSG_EOR);
+
+sub start {
+ my ($cidx, $root2id, $qry_str) = @_;
+ my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
+ my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
+ # sort lock is necessary if we have many root ids which cause a
+ # row length to exceed POSIX PIPE_BUF (via `$G' in event_step)
+ my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
+ 'PublicInbox::Lock';
+ $sort_w->autoflush(1);
+ $cidx->begin_txn_lazy; # only using txn to simplify writer subs
+ my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+ my $self = bless {
+ cidx => $cidx,
+ op_p => $op_p,
+ iter => 0,
+ mset => $cidx->mset($qry_str, $opt),
+ root2id => $root2id,
+ sort_w => $sort_w,
+ sort_lk => $sort_lk,
+ }, __PACKAGE__;
+ event_step($self);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $cidx = $self->{cidx};
+ return if $cidx->do_quit;
+ my $last = $self->{mset}->size - 1;
+ my $cur = $self->{iter};
+ my $end = $cur + 9999;
+ $end = $last if $end > $last;
+ $self->{iter} = $end + 1;
+ local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
+ $cidx->progress($0);
+
+ my $root2id = $self->{root2id};
+ my $buf = '';
+ for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+ my $doc = $x->get_document;
+ my $G = join(' ', map {
+ $root2id->{pack('H*', $_)};
+ } xap_terms('G', $doc));
+ for my $p (@{$cidx->{ASSOC_PFX}}) {
+ $buf .= "$_ $G\n" for (xap_terms($p, $doc));
+ }
+ }
+ $self->{sort_lk}->lock_acquire_fast;
+ print { $self->{sort_w} } $buf or die "print: $!";
+ $self->{sort_lk}->lock_release_fast;
+ $end < $last && !$cidx->do_quit and
+ PublicInbox::DS::requeue($self);
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ return if $self->{cidx}->do_quit;
+ send($self->{op_p},
+ "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
new file mode 100644
index 00000000..6add8e54
--- /dev/null
+++ b/lib/PublicInbox/CidxRecvIbx.pm
@@ -0,0 +1,46 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# dumps all per-inbox info for -cindex --associate
+# integrated into the event loop for signalfd SIGINT handling
+package PublicInbox::CidxRecvIbx;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE);
+use Socket qw(MSG_EOR);
+use PublicInbox::CidxDumpIbx;
+
+sub new {
+ my ($cls, $cidx, $qry_str) = @_;
+ my ($op_p, $r_ibx, $sort_w) = delete @$cidx{0..2};
+ $op_p // die 'BUG: no $op_p';
+ $r_ibx // die 'BUG: no $r_ibx';
+ $sort_w // die 'BUG: no $sort_w';
+ my $self = bless {}, $cls;
+ $self->SUPER::new($r_ibx, EPOLLIN|EPOLLEXCLUSIVE);
+ $self->{cidx} = $cidx;
+ $self->{sort_w} = $sort_w;
+ $self->{op_p} = $op_p; # PublicInbox::CidxDumpIbx uses this
+ $self->{qry_str} = $qry_str;
+ # writes to this pipe are never longer than POSIX PIPE_BUF,
+ # so rely on POSIX atomicity guarantees
+ $sort_w->autoflush(1);
+ $self;
+}
+
+sub event_step {
+ my ($self) = @_;
+ recv($self->{sock}, my $ibx_id, 25, 0) // die "recv: $!";
+ return $self->close if $ibx_id eq '' || $self->{cidx}->do_quit;
+ PublicInbox::CidxDumpIbx::start($self, $ibx_id);
+}
+
+sub close {
+ my ($self) = @_;
+ $self->{cidx}->do_quit or
+ send($self->{op_p},
+ "recv_ibx_done $self->{cidx}->{shard}", MSG_EOR);
+ $self->SUPER::close; # PublicInbox::DS::close
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index ba14e52a..2480dbd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -11,6 +11,26 @@
#
# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
#
+# --associate joins root commits of coderepos to inboxes based on prefixes.
+#
+# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
+# a non-negative integer index ($ROOT_COMMIT_OID_ID).
+#
+# associate dumps to 2 intermediate files in $TMPDIR:
+#
+# * to_root_id - each line is of the format:
+#
+# $PFX $ROOT_COMMIT_OID_ID
+#
+# * to_ibx_id - each line is of the format:
+#
+# $PFX $IBX_ID
+#
+# In both cases, $PFX is typically the value of the patchid (XDFID) but it
+# can be configured to use any combination of patchid, dfpre, dfpost or
+# dfblob.
+#
# See PublicInbox::CodeSearch (read-only API) for more
package PublicInbox::CodeSearchIdx;
use v5.12;
@@ -25,14 +45,14 @@ use File::Spec ();
use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
-use PublicInbox::Config qw(glob2re);
+use PublicInbox::Config qw(glob2re rel2abs_collapsed);
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::OnDestroy;
use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
use Carp ();
our (
$LIVE, # pid => cmd
@@ -55,8 +75,15 @@ our (
%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
$TMPDIR, # File::Temp->newdir object for prune
@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
- $PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
+ %TODO, @IBXQ, @IBX,
+ @JOIN, # join(1) command for associate
+ $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+ @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+ $QRY_STR, # common query string for both code and inbox associations
+ $IBXDIR_FEED, # SOCK_SEQPACKET
+ @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+ @ID2ROOT,
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -64,6 +91,9 @@ our (
# git walks commits quickly if it doesn't have to read trees
our $SEEN_MAX = 100000;
+# window for commits/emails to determine an inbox <-> coderepo association
+my $ASSOC_MAX = 50000;
+
our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
# TODO: do we care about committer name + email? or tree OID?
@@ -455,6 +485,91 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
+sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
+ my ($self, $associate, $n) = @_;
+ return if $DO_QUIT;
+ progress($self, "dump_shard_roots [$n] done");
+ $DUMP_SHARD_ROOTS_OK[$n] = 1;
+ # may run associate()
+}
+
+sub assoc_max_init ($) {
+ my ($self) = @_;
+ my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+ $max = $ASSOC_MAX if !$max;
+ $max < 0 ? ((2 ** 31) - 1) : $max;
+}
+
+# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
+sub dump_shard_roots { # via wq_io_do for associate
+ my ($self, $root2id, $qry_str) = @_;
+ PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
+}
+
+sub dump_roots_once {
+ my ($self, $associate) = @_;
+ $associate // die 'BUG: no $associate';
+ $TODO{associating} = 1; # keep shards_active() happy
+ progress($self, 'dumping IDs from coderepos');
+ local $self->{xdb};
+ @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
+ my $id = 0;
+ my %root2id = map { $_ => $id++ } @ID2ROOT;
+ pipe(my ($r, $w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1');
+ my $dst = "$TMPDIR/to_root_id";
+ open my $fh, '>', $dst or die "open($dst): $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
+ close $r or die "close: $!";
+ awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
+ my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+ $_->wq_io_do(@arg) for @IDX_SHARDS;
+ progress($self, 'waiting on dump_shard_roots sort');
+}
+
+sub recv_ibx_done { # via PktOp on recv_ibx completion
+ my ($self, $associate, $n) = @_;
+ return if $DO_QUIT;
+ progress($self, "recv_ibx [$n] done");
+ $RECV_IBX_OK[$n] = 1;
+}
+
+# causes a worker to become a dumper for inbox/extindex
+sub recv_ibx { # wq_io_do
+ my ($self, $qry_str) = @_;
+ PublicInbox::CidxRecvIbx->new($self, $qry_str);
+}
+
+sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
+ my ($self, $id_dir) = @_; # id_dir: $IBX_ID, an index into @IBX
+ my $n = length($id_dir);
+ my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
+ $n == $w or die "send($id_dir) $w != $n";
+}
+
+# repurpose shard workers to dump inbox patchids with perfect balance
+sub dump_ibx_start {
+ my ($self, $associate) = @_;
+ pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1');
+ my $dst = "$TMPDIR/to_ibx_id";
+ open my $fh, '>', $dst or die "open($dst): $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
+ close $sort_r or die "close: $!";
+ awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+
+ my ($r, $w);
+ socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
+ $c->{ops}->{index_next} = [ $self ];
+ my $io = [ $p->{op_p}, $r, $sort_w ];
+ $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
+ $IBXDIR_FEED = $w;
+}
+
sub index_next ($) {
my ($self) = @_;
return if $DO_QUIT;
@@ -466,6 +581,12 @@ sub index_next ($) {
$self, $git);
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
+ } elsif ($TMPDIR) {
+ delete $TODO{dump_ibx_start}; # runs OnDestroy once
+ return dump_ibx($self, shift @IBXQ) if @IBXQ;
+ progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
+ undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+ dump_roots_once($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
}
@@ -502,7 +623,7 @@ sub commit_shard { # OnDestroy cb
for my $n (keys %$active) {
$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
}
- undef $p; # shard_done fires when all shards are committed
+ # shard_done fires when all shards are committed
}
sub index_repo { # cidx_await cb
@@ -628,8 +749,8 @@ EOM
sub scan_git_dirs ($) {
my ($self) = @_;
- @$GIT_TODO = @{$self->{git_dirs}};
- index_next($self) for (1..$LIVE_JOBS);
+ my $n = @$GIT_TODO = @{$self->{git_dirs}};
+ progress($self, "scanning $n code repositories...");
}
sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -661,7 +782,7 @@ sub shards_active { # post_loop_do
return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
- return 1 if keys(%$LIVE);
+ return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
$s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -674,6 +795,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
+ $IBXDIR_FEED = undef;
kill_shards(@_);
warn "# SIG$_[0] received, quitting...\n";
}
@@ -717,6 +839,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
unless ($ALT_FH{$hexlen}) {
+ require PublicInbox::Import;
my $git_dir = "$TMPDIR/hexlen$hexlen.git";
PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +862,102 @@ sub prep_alternate_start {
awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
}
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
- my ($pid, $cmd, $run_prune) = @_;
+sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+ my ($pid, $cmd, $run_on_destroy) = @_;
$? and die "@$cmd failed: \$?=$?";
+ # $run_on_destroy calls associate() or run_prune()
+}
+
+# runs once all inboxes and shards are dumped via OnDestroy
+sub associate {
+ my ($self) = @_;
+ return if $DO_QUIT;
+ @IDX_SHARDS or return warn("# aborting on no shards\n");
+ grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
+ die "E: shards not dumped properly\n";
+ grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
+ die "E: inboxes not dumped properly\n";
+ progress($self, 'associating...');
+ my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
+ my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
+ my %score;
+ while (<$rd>) { # PFX IBX_ID ROOT_ID...
+ my (undef, $ibx_id, @root_id) = split(' ', $_); # drops "\n"
+ ++$score{"$ibx_id $_"} for @root_id;
+ }
+ close $rd or die "@join failed: \$?=$?";
+ my $min = $self->{-opt}->{'assoc-min'} // 10;
+ progress($self, scalar(keys %score).' potential pairings...');
+ for my $k (keys %score) {
+ my $nr = $score{$k};
+ my ($ibx_id, $root) = split(/ /, $k);
+ my $ekey = $IBX[$ibx_id]->eidx_key;
+ $root = unpack('H*', $ID2ROOT[$root]);
+ progress($self, "$ekey => $root has $nr matches");
+ }
+ delete $TODO{associating}; # break out of shards_active()
+ # TODO
+ warn "# Waiting for $TMPDIR/cont @JOIN";
+ system "ls -Rl $TMPDIR >&2";
+ system "wc -l $TMPDIR/to_*_id >&2";
+ #sleep(1) until -f "$TMPDIR/cont";
+ # warn "# Waiting for $TMPDIR/cont";
+ # sleep(1) until -f "$TMPDIR/cont";
+}
+
+sub require_progs {
+ my $op = shift;
+ while (my ($x, $argv) = splice(@_, 0, 2)) {
+ my $e = $x;
+ $e =~ tr/a-z-/A-Z_/;
+ my $c = $ENV{$e} // $x;
+ $argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
+ }
+}
+
+sub init_associate_postfork ($) {
+ my ($self) = @_;
+ return unless $self->{-opt}->{associate};
+ require_progs('associate', join => \@JOIN);
+ $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
+ substr($QRY_STR, 0, 0) = 'dt:';
+ scalar(@{$self->{git_dirs} // []}) or die <<EOM;
+E: no coderepos to associate
+EOM
+ my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+ $self->query_approxidate($approx_git, $QRY_STR); # in-place
+ $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+ $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+ \&dump_ibx_start, $self, $TODO{associate});
+ my $id = -1;
+ @IBXQ = map { ++$id } @IBX;
}
sub init_prune ($) {
my ($self) = @_;
return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
- require File::Temp;
- require PublicInbox::Import;
- $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
# Dealing with millions of commits here at once, so use faster tools.
# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
# bindings. sed/awk are faster than Perl for simple stream ops, and
# sort+comm are more memory-efficient with gigantic lists
my @delve = (undef, qw(-A Q -1));
my @sed = (undef, '-ne', 's/^Q//p');
- @SORT = (undef, '-u');
@COMM = (undef, qw(-2 -3 indexed_commits -));
@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
- my @x = ('xapian-delve' => \@delve, sed => \@sed,
- sort => \@SORT, comm => \@COMM, awk => \@AWK);
- while (my ($x, $argv) = splice(@x, 0, 2)) {
- my $e = $x;
- $e =~ tr/a-z-/A-Z_/;
- my $c = $ENV{$e} // $x;
- $argv->[0] = which($c) // die "E: `$x' required for --prune\n";
- }
+ require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+ comm => \@COMM, awk => \@AWK);
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
- for (qw(parallel compress-program buffer-size)) { # GNU sort options
- my $v = $self->{-opt}->{"sort-$_"};
- push @SORT, "--$_=$v" if defined $v;
- }
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
- $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
- my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
- $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
- awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+ my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+ awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+ $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
+ awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
- awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+ awaitpid($pid, \&cmd_done, \@delve, $run_prune);
@PRUNE_QUEUE = @{$self->{git_dirs}};
for (1..$LIVE_JOBS) {
prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -809,14 +982,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
- my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out });
- my $sort_pid = spawn(\@SORT, $PRUNE_ENV,
+ my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
+ my $sort_pid = spawn(\@SORT, $CMD_ENV,
{ 0 => $sort_in, 1 => $sort_out });
- my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
+ my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
- awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
- awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
- awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+ awaitpid($awk_pid, \&cmd_done, \@AWK);
+ awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -856,6 +1029,35 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
for (@gone) { close $_ or die "close: $!" };
}
+sub init_associate_prefork ($) {
+ my ($self) = @_;
+ return unless $self->{-opt}->{associate};
+ require PublicInbox::CidxRecvIbx;
+ require PublicInbox::CidxDumpShardRoots;
+ $self->{-pi_cfg} = PublicInbox::Config->new;
+ my @unknown;
+ my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+ for (map { split(/\s*,\s*/) } @pfx) {
+ my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
+ push(@unknown, $_);
+ push(@ASSOC_PFX, split(/ /, $v));
+ }
+ die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+ @ASSOC_PFX = uniqstr @ASSOC_PFX;
+ my %incl = map {
+ rel2abs_collapsed($_) => undef;
+ } (@{$self->{-opt}->{include} // []});
+ $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl);
+}
+
+sub _prep_ibx { # each_inbox callback
+ my ($ibx, $self, $incl) = @_;
+ ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
+ push @{$self->{IBX}}, $ibx;
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
@@ -868,10 +1070,27 @@ sub cidx_run { # main entry point
local $IDX_TODO = [];
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
- $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+ $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
+ %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
+ @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @IDX_SHARDS = cidx_init($self);
+ local @SORT = (undef, '-u');
+ local $self->{IBX} = \@IBX;
+ local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+ local $self->{-pi_cfg};
+ if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+ require File::Temp;
+ $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+ $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+ require_progs('(prune|associate)', sort => \@SORT);
+ for (qw(parallel compress-program buffer-size)) { # GNU sort
+ my $v = $self->{-opt}->{"sort-$_"};
+ push @SORT, "--$_=$v" if defined $v;
+ }
+ init_associate_prefork($self)
+ }
+ local @IDX_SHARDS = cidx_init($self); # forks workers
local $self->{current_info} = '';
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -919,7 +1138,9 @@ sub cidx_run { # main entry point
PublicInbox::IPC::detect_nproc() || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
+ init_associate_postfork($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ index_next($self) for (1..$LIVE_JOBS);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
@@ -954,4 +1175,9 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
}
+sub do_quit { $DO_QUIT }
+
+sub tmpdir { $TMPDIR }
+
+
1;
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 2f1b4122..0a6b210f 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -11,7 +11,7 @@ package PublicInbox::Config;
use strict;
use v5.10.1;
use parent qw(Exporter);
-our @EXPORT_OK = qw(glob2re);
+our @EXPORT_OK = qw(glob2re rel2abs_collapsed);
use PublicInbox::Inbox;
use PublicInbox::Spawn qw(popen_rd);
our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 1559d9b3..d5b0bceb 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -197,7 +197,7 @@ sub xdir ($;$) {
my ($self, $rdonly) = @_;
if ($rdonly || !defined($self->{shard})) {
$self->{xpfx};
- } else { # v2 + extindex only:
+ } else { # v2, extindex, cindex only:
"$self->{xpfx}/$self->{shard}";
}
}
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 2f7796e7..888c8b10 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation.
EOF
my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
- indexlevel|index-level|L=s
+ indexlevel|index-level|L=s associate associate-max=i
+ associate-date-range=s associate-prefixes=s@
batch_size|batch-size=s max_size|max-size=s
+ include|I=s@ only=s@ all
project-list=s exclude=s@
sort-parallel=s sort-compress-program=s sort-buffer-size=s
d=s update|u scan! prune dry-run|n C=s@ help|h))
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 3/7] cindex: add --show-roots switch
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
` (4 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This aids in development, but I'm not sure it's going to stay
or be moved into another interface.
---
lib/PublicInbox/CodeSearchIdx.pm | 32 ++++++++++++++++++++++++++++++++
script/public-inbox-cindex | 2 +-
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 2480dbd2..e795c2b3 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -1058,6 +1058,37 @@ sub _prep_ibx { # each_inbox callback
push @{$self->{IBX}}, $ibx;
}
+sub show_roots { # for diagnostics
+ my ($self) = @_;
+ local $self->{xdb};
+ my $cur = $self->xdb->allterms_begin('G');
+ my $end = $self->{xdb}->allterms_end('G');
+ my $qrepo = $PublicInbox::Search::X{Query}->new('T'.'r');
+ my $enq = $PublicInbox::Search::X{Enquire}->new($self->{xdb});
+ $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new);
+ $enq->set_docid_order($PublicInbox::Search::ENQ_ASCENDING);
+ for (; $cur != $end; $cur++) {
+ my $G_oidhex = $cur->get_termname;
+ my $qry = $PublicInbox::Search::X{Query}->new(
+ PublicInbox::Search::OP_FILTER(),
+ $qrepo, $G_oidhex);
+ $enq->set_query($qry);
+ my ($off, $lim) = (0, 10000);
+ say 'commit ',substr($G_oidhex, 1), ' appears in:';
+ while (1) {
+ my $mset = $enq->get_mset($off, $lim);
+ my $size = $mset->size or last;
+ for my $x ($mset->items) {
+ my $doc = $x->get_document;
+ for (xap_terms('P', $doc)) {
+ say '- /', substr($_, 1);
+ }
+ }
+ $off += $size;
+ }
+ }
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
@@ -1150,6 +1181,7 @@ sub cidx_run { # main entry point
PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
+ show_roots($self) if $self->{-opt}->{'show-roots'} # for diagnostics
}
sub ipc_atfork_child { # @IDX_SHARDS
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 888c8b10..0526434c 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -29,7 +29,7 @@ GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
indexlevel|index-level|L=s associate associate-max=i
associate-date-range=s associate-prefixes=s@
batch_size|batch-size=s max_size|max-size=s
- include|I=s@ only=s@ all
+ include|I=s@ only=s@ all show-roots
project-list=s exclude=s@
sort-parallel=s sort-compress-program=s sort-buffer-size=s
d=s update|u scan! prune dry-run|n C=s@ help|h))
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 4/7] introduce optional C++ xap_helper
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (2 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 11:23 ` Štěpán Němec
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
` (3 subsequent siblings)
7 siblings, 1 reply; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This allows us to perform the expensive "dump_ibx" operations in
native C++ code using the Xapian C++ library. This provides the
majority of the speedup with the -cindex --associate switch.
Eventually, this may be expanded to cover all uses of Xapian
within the project, giving us access to Xapian APIs which
aren't available in the XS or SWIG bindings.  It would also
ease installation on systems which don't provide
pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
do provide Xapian development libraries.
Most of the C++ code is still written in a C style, as I'm far
less familiar with C++ than with C.  I suspect many users and
potential hackers coming from the git, Linux kernel, and glibc
worlds are in the same boat.
---
MANIFEST | 7 +-
lib/PublicInbox/CidxDumpIbx.pm | 59 ---
lib/PublicInbox/CidxXapHelperAux.pm | 48 ++
lib/PublicInbox/CodeSearch.pm | 54 ++-
lib/PublicInbox/CodeSearchIdx.pm | 78 ++--
lib/PublicInbox/Isearch.pm | 5 +
lib/PublicInbox/Search.pm | 56 ++-
lib/PublicInbox/XapClient.pm | 50 +++
lib/PublicInbox/XapHelper.pm | 144 ++++++
lib/PublicInbox/XapHelperCxx.pm | 93 ++++
lib/PublicInbox/xap_helper.h | 654 ++++++++++++++++++++++++++++
t/xap_helper.t | 147 +++++++
12 files changed, 1278 insertions(+), 117 deletions(-)
delete mode 100644 lib/PublicInbox/CidxDumpIbx.pm
create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
create mode 100644 lib/PublicInbox/XapClient.pm
create mode 100644 lib/PublicInbox/XapHelper.pm
create mode 100644 lib/PublicInbox/XapHelperCxx.pm
create mode 100644 lib/PublicInbox/xap_helper.h
create mode 100644 t/xap_helper.t
diff --git a/MANIFEST b/MANIFEST
index 162e3038..4f61af42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,10 +162,10 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpIbx.pm
lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
+lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
@@ -368,8 +368,12 @@ lib/PublicInbox/WwwListing.pm
lib/PublicInbox/WwwStatic.pm
lib/PublicInbox/WwwStream.pm
lib/PublicInbox/WwwText.pm
+lib/PublicInbox/XapClient.pm
+lib/PublicInbox/XapHelper.pm
+lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/gcf2_libgit2.h
+lib/PublicInbox/xap_helper.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
@@ -610,6 +614,7 @@ t/www_altid.t
t/www_listing.t
t/www_static.t
t/x-unknown-alpine.eml
+t/xap_helper.t
t/xcpdb-reshard.t
version-gen.perl
xt/cmp-msgstr.t
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
deleted file mode 100644
index e1bc273d..00000000
--- a/lib/PublicInbox/CidxDumpIbx.pm
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpIbx;
-use v5.12;
-use PublicInbox::Search qw(xap_terms);
-use PublicInbox::DS;
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($rcvibx, $ibx_id) = @_;
- my $cidx = $rcvibx->{cidx};
- my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
- my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
- ibx_id => $ibx_id }, __PACKAGE__;
- $self->{srch} = $ibx->isrch // do {
- warn("W: $self->{ekey} has no search index (ignoring)\n");
- return undef;
- };
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
- $self->{iter} = 0;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
- return if $rcvibx->{cidx}->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- if ($end >= $last) {
- send($rcvibx->{op_p}, 'index_next', MSG_EOR);
- $end = $last;
- }
- $self->{iter} = $end + 1;
- local $0 = "dumping $self->{ekey} $cur..$end";
-
- my $sort_w = $rcvibx->{sort_w};
- my $ibx_id = $self->{ibx_id};
- local $0 = "dumping $self->{ekey} $cur..$end";
- $rcvibx->{cidx}->progress($0);
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
- for (xap_terms($p, $doc)) {
- print $sort_w "$_ $ibx_id\n" or die "print: $!";
- }
- }
- }
- $end < $last && !$rcvibx->{cidx}->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
new file mode 100644
index 00000000..c9a5ddad
--- /dev/null
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -0,0 +1,48 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate,
+# this reports auxiliary status while dumping
+package PublicInbox::CidxXapHelperAux;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+# rpipe connects to req->fp[1] in xap_helper.h
+sub new { # $pfx is both the progress label and the {PENDING} key
+	my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
+	my $self = bless {
+		cidx => $cidx,
+		pfx => $pfx,
+		associate => $associate
+	}, $cls;
+	$rpipe->blocking(0); # non-blocking: event_step handles EAGAIN
+	$self->SUPER::new($rpipe, EPOLLIN);
+}
+
+sub event_step { # relays status lines; "mset.size=N" marks a finished dump
+	my ($self) = @_; # xap_helper.h is line-buffered
+	my $buf = delete($self->{buf}) // ''; # partial line from last read
+	my $n = sysread($self->{sock}, $buf, 65536, length($buf));
+	if (!defined($n)) { # keep any partial line for the next wakeup:
+		return $self->{buf} = $buf if $!{EAGAIN};
+		die "sysread: $!";
+	}
+	my $pfx = $self->{pfx};
+	if ($n == 0) { # EOF: flush unterminated trailing text, if any
+		$self->{cidx}->progress("$pfx $buf") if $buf ne '';
+		return $self->close;
+	}
+	my @lines = split(/^/m, $buf);
+	$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
+	for my $l (@lines) {
+		if ($l =~ /\Amset\.size=[0-9]+\n\z/) { # dump completed
+			delete $self->{cidx}->{PENDING}->{$pfx};
+			$self->{cidx}->index_next;
+		}
+		chomp $l;
+		$self->{cidx}->progress("$pfx $l");
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
index a5ccce03..6234e259 100644
--- a/lib/PublicInbox/CodeSearch.pm
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -16,6 +16,12 @@ use constant {
# in refs/{heads,tags}. AT(col=0) may be used to store disk usage
# in the future, but disk usage calculation is espensive w/ alternates
};
+our @CODE_NRP; # range processors, built lazily from @CODE_VMAP by qparse_new
+our @CODE_VMAP = ( # [ value column, query prefix ]; also read by generate_cxx
+	[ AT, 'd:' ], # mairix compat
+	[ AT, 'dt:' ], # mail compat
+	[ CT, 'ct:' ],
+);
# note: the non-X term prefix allocations are shared with Xapian omega,
# see xapian-applications/omega/docs/termprefixes.rst
@@ -45,15 +51,17 @@ sub new {
bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls;
}
-sub cqparse_new ($) {
+sub qparse_new ($) {
my ($self) = @_;
my $qp = $self->qp_init_common;
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:'));
-
+ if (!@CODE_NRP) {
+ @CODE_NRP = map {
+ $PublicInbox::Search::NVRP->new(@$_)
+ } @CODE_VMAP;
+ }
+ $cb->($qp, $_) for @CODE_NRP;
while (my ($name, $pfx) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) for split(/ /, $pfx);
}
@@ -63,6 +71,40 @@ sub cqparse_new ($) {
$qp;
}
+sub generate_cxx () { # generates snippet for xap_helper.h
+	# "# line" directives below point C++ compiler diagnostics back here
+	my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *code_nrp[${\scalar(@CODE_VMAP)}];
+static void code_nrp_init(void)
+{
+EOM
+	for (0..$#CODE_VMAP) { # one NRP per [ column, prefix ] pair
+		my $x = $CODE_VMAP[$_];
+		$ret .= qq{\tcode_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+	}
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_code_search(Xapian::QueryParser *qp)
+{
+	for (size_t i = 0; i < MY_ARRAY_SIZE(code_nrp); i++)
+		qp->ADD_RP(code_nrp[i]);
+EOM
+	for my $name (sort keys %bool_pfx_external) { # sorted: stable output
+		for (split(/ /, $bool_pfx_external{$name})) {
+			$ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+		}
+	}
+	for my $name (sort keys %prob_prefix) {
+		for (split(/ /, $prob_prefix{$name})) {
+			$ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+		}
+	}
+	$ret .= "}\n"; # the .= result ($ret) is the return value
+}
+
# returns a Xapian::Query to filter by roots
sub roots_filter { # retry_reopen callback
my ($self, $git_dir) = @_;
@@ -89,7 +131,7 @@ sub roots_filter { # retry_reopen callback
sub mset {
my ($self, $qry_str, $opt) = @_;
- my $qp = $self->{qp} //= cqparse_new($self);
+ my $qp = $self->{qp} //= qparse_new($self);
my $qry = $qp->parse_query($qry_str, $self->{qp_flags});
# limit to commits with shared roots
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e795c2b3..b8afecd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards);
use POSIX qw(WNOHANG SEEK_SET);
use File::Path ();
use File::Spec ();
+use List::Util qw(max);
use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
@@ -69,6 +70,9 @@ our (
@GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
$PRUNE_DONE, # marks off prune completions
$NCHANGE, # current number of changes
+ $NPROC,
+ $XH_PID, # XapHelper PID
+ $XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
$GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
@@ -81,8 +85,8 @@ our (
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- $IBXDIR_FEED, # SOCK_SEQPACKET
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+ @DUMP_SHARD_ROOTS_OK, # for associate
+ $DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
@@ -529,45 +533,29 @@ sub dump_roots_once {
progress($self, 'waiting on dump_shard_roots sort');
}
-sub recv_ibx_done { # via PktOp on recv_ibx completion
- my ($self, $pid, $n) = @_;
- return if $DO_QUIT;
- progress($self, "recv_ibx [$n] done");
- $RECV_IBX_OK[$n] = 1;
-}
-
-# causes a worker to become a dumper for inbox/extindex
-sub recv_ibx { # wq_io_do
- my ($self, $qry_str) = @_;
- PublicInbox::CidxRecvIbx->new($self, $qry_str);
-}
-
-sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
- my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
- my $n = length($id_dir);
- my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
- $n == $w or die "send($id_dir) $w != $n";
+sub dump_ibx { # sends to xap_helper.h; skips inboxes w/o a search index
+	my ($self, $ibx_id) = @_;
+	my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+	my ($ekey, $srch) = ($ibx->eidx_key, $ibx->isrch);
+	$srch or return warn("W: $ekey has no search index (ignoring)\n"),
+		$self->index_next; # keep draining @IBXQ
+	my @cmd = ('dump_ibx', $srch->xh_args, map { ('-A', $_) } @ASSOC_PFX);
+	pipe(my ($r, $w)) or die "pipe: $!";
+	$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd, $ibx_id, $QRY_STR);
+	$self->{PENDING}->{$ekey} = undef;
+	PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
 }
-# repurpose shard workers to dump inbox patchids with perfect balance
sub dump_ibx_start {
my ($self, $associate) = @_;
- pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
close $sort_r or die "close: $!";
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
-
- my ($r, $w);
- socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
- $c->{ops}->{index_next} = [ $self ];
- my $io = [ $p->{op_p}, $r, $sort_w ];
- $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
- $IBXDIR_FEED = $w;
+ ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
sub index_next ($) {
@@ -584,8 +572,8 @@ sub index_next ($) {
} elsif ($TMPDIR) {
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
- progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
- undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+ progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE;
+ undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
dump_roots_once($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
@@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 sub parent_quit {
 	$DO_QUIT = POSIX->can("SIG$_[0]")->();
-	$IBXDIR_FEED = undef;
+	$XHC = undef; # drop our xap_helper socket; presumably helpers then see EOF
 	kill_shards(@_);
 	warn "# SIG$_[0] received, quitting...\n";
 }
@@ -875,8 +863,8 @@ sub associate {
@IDX_SHARDS or return warn("# aborting on no shards\n");
grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
die "E: shards not dumped properly\n";
- grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
- die "E: inboxes not dumped properly\n";
+ my @pending = keys %{$self->{PENDING}};
+ die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
@@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxRecvIbx;
require PublicInbox::CidxDumpShardRoots;
+ require PublicInbox::CidxXapHelperAux;
+ require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
my @unknown;
my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
@@ -1055,7 +1044,7 @@ EOM
 sub _prep_ibx { # each_inbox callback
 	my ($ibx, $self, $incl) = @_;
 	($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
-		push @{$self->{IBX}}, $ibx;
+		push @IBX, $ibx; # package-level @IBX, local()-ized in cidx_run
 }
sub show_roots { # for diagnostics
@@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
- %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
+ %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local @SORT = (undef, '-u');
- local $self->{IBX} = \@IBX;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+ local $self->{PENDING} = {};
local $self->{-pi_cfg};
if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
require File::Temp;
@@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point
@GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
}
local $NCHANGE = 0;
- local $LIVE_JOBS = $self->{-opt}->{jobs} ||
- PublicInbox::IPC::detect_nproc() || 2;
+ local $NPROC = PublicInbox::IPC::detect_nproc();
+ local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
init_associate_postfork($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
- index_next($self) for (1..$LIVE_JOBS);
+ my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+ index_next($self) for (1..$max);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm
index 5cac08ba..62112171 100644
--- a/lib/PublicInbox/Isearch.pm
+++ b/lib/PublicInbox/Isearch.pm
@@ -123,4 +123,9 @@ sub has_threadid { 1 }
sub help { $_[0]->{es}->help }
+sub xh_args { # getopt-style args for xap_helper.h: shard dirs, then eidx_key
+	my ($self) = @_; # TODO: accept an $opt hashref for uid_range
+	return ($self->{es}->xh_args, '-O', $self->{eidx_key});
+}
+
1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index d5b0bceb..2e784646 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -66,6 +66,15 @@ our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor')
# let's hope the ABI is stable
our $ENQ_DESCENDING = 0;
our $ENQ_ASCENDING = 1;
+our @MAIL_VMAP = ( # [ value column, query prefix ]; also read by generate_cxx
+	[ YYYYMMDD, 'd:'],
+	[ DT, 'dt:' ],
+	# these are undocumented for WWW, but lei and IMAP use them
+	[ BYTES, 'z:' ],
+	[ TS, 'rt:' ],
+	[ UID, 'uid:' ]
+);
+our @MAIL_NRP; # $NVRP objects for @MAIL_VMAP, filled in by load_xapian
sub load_xapian () {
return 1 if defined $Xap;
@@ -101,6 +110,7 @@ sub load_xapian () {
# or make indexlevel=medium as default
$QP_FLAGS = FLAG_PHRASE() | FLAG_BOOLEAN() | FLAG_LOVEHATE() |
FLAG_WILDCARD();
+ @MAIL_NRP = map { $NVRP->new(@$_) } @MAIL_VMAP;
return 1;
}
undef;
@@ -490,14 +500,8 @@ sub qparse_new {
my $qp = qp_init_common($self);
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $NVRP->new(YYYYMMDD, 'd:'));
- $cb->($qp, $NVRP->new(DT, 'dt:'));
-
- # for IMAP, undocumented for WWW and may be split off go away
- $cb->($qp, $NVRP->new(BYTES, 'z:'));
- $cb->($qp, $NVRP->new(TS, 'rt:'));
- $cb->($qp, $NVRP->new(UID, 'uid:'));
+ $cb->($qp, $_) for @MAIL_NRP;
while (my ($name, $prefix) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) foreach split(/ /, $prefix);
}
@@ -527,6 +531,40 @@ EOF
$qp;
}
+sub generate_cxx () { # emits mail_nrp_init + qp_init_mail_search for xap_helper.h
+	my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *mail_nrp[${\scalar(@MAIL_VMAP)}];
+static void mail_nrp_init(void)
+{
+EOM
+	for (0..$#MAIL_VMAP) { # one NRP per [ column, prefix ] in @MAIL_VMAP
+		my $x = $MAIL_VMAP[$_];
+		$ret .= qq{\tmail_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+	}
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_mail_search(Xapian::QueryParser *qp)
+{
+	for (size_t i = 0; i < MY_ARRAY_SIZE(mail_nrp); i++)
+		qp->ADD_RP(mail_nrp[i]);
+EOM
+	for my $name (sort keys %bool_pfx_external) { # sorted: stable output
+		for (split(/ /, $bool_pfx_external{$name})) {
+			$ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+		}
+	}
+	# TODO: altid support
+	for my $name (sort keys %prob_prefix) {
+		for (split(/ /, $prob_prefix{$name})) {
+			$ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+		}
+	}
+	$ret .= "}\n"; # the .= result ($ret) is the return value
+}
+
sub help {
my ($self) = @_;
$self->{qp} //= $self->qparse_new; # parse altids
@@ -585,4 +623,8 @@ sub all_terms {
wantarray ? (sort keys %ret) : \%ret;
}
+sub xh_args { # one "-d SHARD_DIR" pair per shard, for the xap_helper.h socket
+	return map { ('-d', $_) } shard_dirs(shift);
+}
+
1;
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
new file mode 100644
index 00000000..56e3c3b4
--- /dev/null
+++ b/lib/PublicInbox/XapClient.pm
@@ -0,0 +1,50 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# This talks to (XapHelperCxx.pm + xap_helper.h) or XapHelper.pm
+# and will eventually allow users with neither XS nor SWIG Perl
+# bindings to use Xapian as long as they have Xapian development
+# headers/libs and a C++ compiler
+package PublicInbox::XapClient;
+use v5.12;
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);
+use PublicInbox::IPC;
+
+sub mkreq { # send @arg + FDs of @$ios; returns a pipe reader if $ios->[0] unset
+	my ($self, $ios, @arg) = @_;
+	my ($r, $w, $n);
+	if (!defined($ios->[0])) { # caller wants a fresh pipe for request FD 0
+		pipe($r, $w) or die "pipe: $!";
+		$ios->[0] = $w;
+	}
+	my @fds = map fileno($_), @$ios;
+	my $buf = join("\0", @arg, ''); # NUL-terminated, see XapHelper::recv_loop
+	$n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) //
+		die "send_cmd: $!";
+	$n == length($buf) or die "send_cmd: $n != ".length($buf);
+	$r;
+}
+
+sub start_helper { # returns ($client, $pid); @_ is passed to ${impl}::start
+	my @argv = @_;
+	socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0) or
+		die "socketpair: $!";
+	my $cls = ($ENV{PI_NO_CXX} ? undef : eval {
+		require PublicInbox::XapHelperCxx;
+		PublicInbox::XapHelperCxx::check_build();
+		'PublicInbox::XapHelperCxx';
+	}) // do { # fall back to the Perl (SWIG||XS) implementation
+		require PublicInbox::XapHelper;
+		'PublicInbox::XapHelper';
+	};
+	# ensure the child has the same @INC we do (@INC hooks can't go in env):
+	my $env = { PERL5LIB => join(':', grep { !ref } @INC) };
+	my $pid = spawn([$^X, ($^W ? ('-w') : ()), "-M$cls", '-e',
+		$cls.'::start(@ARGV)', '--', @argv],
+		$env, { 0 => $in });
+	((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid);
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
new file mode 100644
index 00000000..bf2f99a2
--- /dev/null
+++ b/lib/PublicInbox/XapHelper.pm
@@ -0,0 +1,144 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable.
+package PublicInbox::XapHelper;
+use v5.12;
+use Getopt::Long (); # good API even if we only use short options
+our $GLP = Getopt::Long::Parser->new;
+$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::IPC;
+my $X = \%PublicInbox::Search::X;
+our (%SRCH, %PIDS, $parent_pid); # %SRCH: search objects keyed by shard dirs
+our $stderr = \*STDERR; # recv_loop points this at each request's FD 1
+
+# only short options for portability in C++ implementation
+our @SPEC = (
+	'a', # ascending sort
+	'c', # code search
+	'd=s@', # shard dirs
+	'k=i', # sort column (like sort(1))
+	'm=i', # maximum number of results
+	'o=i', # offset
+	'r', # 1=relevance then column
+	't', # collapse threads
+	'A=s@', # prefixes
+	'O=s', # eidx_key
+	'T=i', # timeout in seconds
+);
+
+sub cmd_test_inspect { # reports our PID and has_threadid on request FD 0
+	my ($req) = @_;
+	print { $req->{0} } "pid=$$ has_threadid=",
+		($req->{srch}->has_threadid ? 1 : 0)
+}
+
+sub cmd_dump_ibx { # "$term $ibx_id" lines for -A prefixed terms of each match
+	my ($req, $ibx_id, $qry_str) = @_;
+	$qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
+	my @pfx = @{$req->{A} // []} or return warn('dump_ibx requires -A PREFIX');
+	my $max = $req->{srch}->{xdb}->get_doccount;
+	my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
+	$opt->{eidx_key} = $req->{O} if defined $req->{O};
+	my $mset = $req->{srch}->mset($qry_str, $opt);
+	my $out = $req->{0};
+	$out->autoflush(1);
+	for my $it ($mset->items) {
+		my $doc = $it->get_document;
+		for my $p (@pfx) {
+			for (xap_terms($p, $doc)) {
+				print $out "$_ $ibx_id\n" or die "print: $!";
+			}
+		}
+	}
+	if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size } # done
+}
+
+sub dispatch { # parse options, open (or reuse) the shards, run cmd_$cmd
+	my ($req, $cmd, @argv) = @_;
+	my $fn = $req->can("cmd_$cmd") or return;
+	$GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+	my $dirs = delete $req->{d} or return warn 'no -d args';
+	my $key = join("\0", @$dirs);
+	$req->{srch} = $SRCH{$key} //= do {
+		my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+		my $first = shift @$dirs;
+		my $slow_phrase = -f "$first/iamchert";
+		$new->{xdb} = $X->{Database}->new($first);
+		for (@$dirs) {
+			$slow_phrase ||= -f "$_/iamchert";
+			$new->{xdb}->add_database($X->{Database}->new($_));
+		}
+		$slow_phrase or # phrase search only if no shard is chert
+			$new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
+		bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
+			'PublicInbox::Search';
+		$new->{qp} = $new->qparse_new;
+		$new;
+	};
+	eval { $fn->($req, @argv) };
+	warn "E: $@" if $@;
+}
+
+sub recv_loop { # serve requests from STDIN, exit(66) on EOF
+	local $SIG{__WARN__} = sub { print $stderr @_ };
+	my $rbuf;
+	while (!defined($parent_pid) || getppid != $parent_pid) {
+		my $req = bless {}, __PACKAGE__;
+		my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+		scalar(@fds) or exit(66); # EX_NOINPUT
+		$fds[0] // die "recvmsg: $!";
+		my $i = -1; # index of $fd in @fds, also its key in %$req
+		for my $fd (@fds) {
+			open($req->{++$i}, '+<&=', $fd) and next;
+			warn("open(+<&=$fd) (FD=$i): $!");
+			undef $req;
+			last;
+		}
+		$req or next;
+		local $stderr = $req->{1} // \*STDERR;
+		if (chop($rbuf) ne "\0") {
+			warn "not NUL-terminated";
+			next;
+		}
+		my @argv = split(/\0/, $rbuf);
+		eval { $req->dispatch(@argv) } if @argv;
+	}
+}
+
+sub start_worker ($) { # fork a recv_loop process for worker slot $nr
+	my ($nr) = @_;
+	my $pid = fork // return warn("fork: $!");
+	if ($pid == 0) {
+		undef %PIDS;
+		recv_loop();
+		exit(0);
+	} else {
+		$PIDS{$pid} = $nr;
+	}
+}
+
+sub start (@) { # -j0 serves in-process; -jN forks N respawning workers
+	my (@argv) = @_;
+	local (%SRCH, %PIDS, $parent_pid);
+	PublicInbox::Search::load_xapian();
+	$GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
+		die 'bad args';
+	return recv_loop() if !$opt->{j};
+	die '-j must be >= 0' if $opt->{j} < 0;
+	start_worker($_) for (1..($opt->{j}));
+
+	my $quit;
+	until ($quit) {
+		(my $p = waitpid(-1, 0)) > 0 or return; # -1: no children left
+		if (defined(my $nr = delete $PIDS{$p})) {
+			$quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
+			start_worker($nr) if !$quit;
+		} else {
+			warn "W: unknown pid=$p reaped\n";
+		}
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
new file mode 100644
index 00000000..4571676b
--- /dev/null
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -0,0 +1,93 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Just-ahead-of-time builder for the lib/PublicInbox/xap_helper.h shim.
+# I never want users to be without source code for repairs, so this
+# aims to replicate the feel of a scripting language using C++.
+# The resulting executable is not linked to Perl in any way.
+package PublicInbox::XapHelperCxx;
+use v5.12;
+use PublicInbox::Spawn;
+use PublicInbox::Search;
+my $dir = ($ENV{PERL_INLINE_DIRECTORY} //
+	die('BUG: PERL_INLINE_DIRECTORY unset')) . '/cxx';
+my $bin = "$dir/xap_helper";
+my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
+my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm); # generate_cxx
+my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' .
+	($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib') .
+	qq{ -DTHREADID=}.PublicInbox::Search::THREADID;
+
+sub xflags_chg () { # true if $xflags differs from the last build's
+	open my $fh, '<', "$dir/XFLAGS" or return 1;
+	chomp(my $prev = <$fh> // return 1); # empty/truncated file: rebuild
+	$prev ne $xflags;
+}
+
+sub build () { # compile xap_helper.h + generated C++ into $bin
+	if (!-d $dir) {
+		my $err;
+		mkdir($dir) or $err = $!;
+		die "mkdir($dir): $err" if !-d $dir; # tolerate mkdir races
+	}
+	use autodie;
+	require File::Temp;
+	require PublicInbox::CodeSearch;
+	my ($prog) = ($bin =~ m!/([^/]+)\z!);
+	my $pkg_config = $ENV{PKG_CONFIG} // 'pkg-config';
+	my $tmp = File::Temp->newdir(DIR => $dir) // die "newdir: $!";
+	my $src = "$tmp/$prog.cpp";
+	open my $fh, '>', $src;
+	for (@srcs) {
+		say $fh qq(# line 1 "$_");
+		open my $rfh, '<', $_;
+		local $/;
+		print $fh readline($rfh);
+	}
+	print $fh PublicInbox::Search::generate_cxx();
+	print $fh PublicInbox::CodeSearch::generate_cxx();
+	close $fh;
+
+	my $cmd = "$pkg_config --libs --cflags xapian-core";
+	chomp(my $fl = `$cmd`);
+	die "$cmd failed: \$?=$?" if $?;
+	my $cxx = $ENV{CXX} // 'c++';
+	$cmd = "$cxx $src $fl $xflags -o $tmp/$prog";
+	system($cmd) and die "$cmd failed: \$?=$?";
+	my $cf = "$tmp/XFLAGS";
+	open $fh, '>', $cf;
+	say $fh $xflags;
+	close $fh;
+	# not quite atomic, but close enough :P
+	rename("$tmp/$_", "$dir/$_") for ($prog, 'XFLAGS');
+}
+
+sub check_build () { # (re)build if $bin is missing, stale, or flags changed
+	use Time::HiRes qw(stat);
+	my $ctime = 0;
+	my @bin = stat($bin) or return build();
+	for (@srcs, @pm_dep) {
+		my @st = stat($_) or die "stat $_: $!";
+		if ($st[10] > $ctime) { # stat field [10] is ctime
+			$ctime = $st[10];
+			return build() if $ctime > $bin[10];
+		}
+	}
+	xflags_chg() ? build() : 0;
+}
+
+sub start (@) { # exec $bin (optionally under $VALGRIND) with @_ as args
+	check_build();
+	my @cmd;
+	if (my $v = $ENV{VALGRIND}) {
+		$v = 'valgrind -v' if $v eq '1';
+		@cmd = split(/\s+/, $v);
+	}
+	push @cmd, $bin, @_;
+	my $prog = $cmd[0];
+	$cmd[0] =~ s!\A.*?/([^/]+)\z!$1!; # argv[0] becomes the basename
+	exec { $prog } @cmd or die "exec($prog): $!";
+}
+
+1;
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
new file mode 100644
index 00000000..52db92b7
--- /dev/null
+++ b/lib/PublicInbox/xap_helper.h
@@ -0,0 +1,654 @@
+/*
+ * Copyright (C) all contributors <meta@public-inbox.org>
+ * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+ *
+ * Standalone helper process using C and minimal C++ for Xapian,
+ * this is not linked to Perl in any way.
+ * C (not C++) is used as much as possible to lower the contribution
+ * barrier for hackers who mainly know C (this includes the maintainer).
+ * Everything here is an unstable internal API of public-inbox and
+ * NOT intended for ordinary users, only for public-inbox hackers.
+ */
+#ifndef _ALL_SOURCE
+# define _ALL_SOURCE
+#endif
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+
+#include <assert.h>
+#include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <search.h>
+#include <stdio.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <xapian.h> // our only reason for using C++
+
+// Pack a (major, minor, revision) triple into one integer so Xapian
+// versions can be compared in #if directives.
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
+#define XAP_VER \
+ MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
+
+// Select range-processor and wildcard-expansion API names according to the
+// Xapian version we're compiled against (newer names for >= 1.3.6).
+#if XAP_VER >= MY_VER(1,3,6)
+# define NRP Xapian::NumberRangeProcessor
+# define ADD_RP add_rangeprocessor
+# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
+#else
+# define NRP Xapian::NumberValueRangeProcessor
+# define ADD_RP add_valuerangeprocessor
+# define SET_MAX_EXPANSION set_max_wildcard_expansion
+#endif
+
+// parent_pid is only set by main() when forking workers; orig_err is
+// restored after recv_loop() temporarily points stderr at a client pipe.
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
+static pid_t parent_pid;
+static FILE *orig_err = stderr;
+static void *srch_tree; // srch cache: tsearch + tfind + tdelete + tdestroy
+static pid_t *worker_pids; // nr => pid
+static unsigned long nworker; // 0: main() handles requests itself
+
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void mail_nrp_init(void);
+static void code_nrp_init(void);
+static void qp_init_mail_search(Xapian::QueryParser *);
+static void qp_init_code_search(Xapian::QueryParser *);
+
+// One cached search context; srch_tree is keyed by its shard paths.
+struct srch {
+ int paths_len; // int for comparisons
+ unsigned qp_flags; // QueryParser flags; FLAG_PHRASE dropped for chert
+ Xapian::Database *db; // first shard, others added via add_database()
+ Xapian::QueryParser *qp;
+ char paths[]; // $shard_path0\0$shard_path1\0...
+};
+
+#define MY_ARG_MAX 256 // capacity of req->argv, req->pfxv and -d lists
+typedef bool (*cmd)(struct req *); // returns false on usage/setup errors
+
+// only one request per-process since we have RLIMIT_CPU timeout
+struct req { // argv and pfxv point into global rbuf
+ char *argv[MY_ARG_MAX];
+ char *pfxv[MY_ARG_MAX]; // -A <prefix>
+ struct srch *srch;
+ char *Oeidx_key; // -O: one byte before optarg, set to 'O' by mail_mset
+ cmd fn;
+ unsigned long long max; // -m; 0 means enquire_mset()'s default (50)
+ unsigned long long off; // -o
+ unsigned long timeout_sec; // -T; NOTE(review): not read in this file yet
+ long sort_col; // value column, negative means BoolWeight
+ int argc;
+ int pfxc;
+ FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
+ bool has_input; // fp[0] is bidirectional
+ bool collapse_threads;
+ bool code_search;
+ bool relevance; // sort by relevance before column
+ bool asc; // ascending sort
+};
+
+// NOTE(review): unused in this file; worker_pids tracks workers instead
+struct worker {
+ pid_t pid;
+ unsigned nr;
+};
+
+// True when the database's "has_threadid" metadata entry is exactly "1".
+static bool has_threadid(const struct srch *srch)
+{
+	const std::string val = srch->db->get_metadata("has_threadid");
+	return val == "1";
+}
+
+// Creates an Enquire on the request's DB with the ordering requested:
+// a non-negative sort_col sorts by that value column (relevance first if
+// -r), a negative one uses unweighted (BoolWeight) docid order.
+static Xapian::Enquire prep_enquire(const struct req *req)
+{
+	Xapian::Enquire enq(*req->srch->db);
+	bool reverse = !req->asc;
+
+	if (req->sort_col >= 0) {
+		if (req->relevance)
+			enq.set_sort_by_relevance_then_value(req->sort_col,
+								reverse);
+		else
+			enq.set_sort_by_value_then_relevance(req->sort_col,
+								reverse);
+		return enq;
+	}
+	enq.set_weighting_scheme(Xapian::BoolWeight());
+	enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING
+					: Xapian::Enquire::DESCENDING);
+	return enq;
+}
+
+// Runs the query for [off, off + max) (max defaults to 50).  If the DB is
+// modified underneath us, reopens and retries up to 9 times; the final
+// attempt lets DatabaseModifiedError propagate to the caller.
+static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
+{
+	if (req->max == 0)
+		req->max = 50;
+	for (int tries = 9; tries > 0; --tries) {
+		try {
+			return enq->get_mset(req->off, req->max);
+		} catch (const Xapian::DatabaseModifiedError &) {
+			req->srch->db->reopen();
+		}
+	}
+	return enq->get_mset(req->off, req->max);
+}
+
+// Parses qry_str with the mail QueryParser, optionally filters on the
+// "O"-prefixed eidx key term (-O) and collapses by THREADID (-t) when the
+// DB has thread IDs, then returns the matching MSet.
+// No debug output here: stderr may be the client's status pipe (fp[1]).
+static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
+{
+	struct srch *srch = req->srch;
+	Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+	if (req->Oeidx_key) {
+		req->Oeidx_key[0] = 'O'; // modifies static rbuf
+		qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+					Xapian::Query(req->Oeidx_key));
+	}
+	Xapian::Enquire enq = prep_enquire(req);
+	enq.set_query(qry);
+	// THREADID is a CPP macro defined on the CLI (see XapHelperCxx.pm)
+	if (req->collapse_threads && has_threadid(srch))
+		enq.set_collapse_key(THREADID);
+
+	return enquire_mset(req, &enq);
+}
+
+// True if *s begins with the pfx_len bytes at pfx.
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+	if (s->size() < pfx_len)
+		return false;
+	return memcmp(pfx, s->c_str(), pfx_len) == 0;
+}
+
+// Prints "$TERM_SUFFIX $ibx_id\n" to req->fp[0] for every term of doc that
+// begins with pfx (the prefix itself is stripped from the output).
+static void dump_ibx_term(struct req *req, const char *pfx,
+			Xapian::Document *doc, const char *ibx_id)
+{
+	Xapian::TermIterator cur = doc->termlist_begin();
+	Xapian::TermIterator end = doc->termlist_end();
+	size_t pfx_len = strlen(pfx);
+
+	// termlists are sorted, so once a term no longer starts with pfx,
+	// no later term can either: stop instead of scanning the rest
+	for (cur.skip_to(pfx); cur != end; cur++) {
+		std::string tn = *cur;
+
+		if (!starts_with(&tn, pfx, pfx_len))
+			break;
+		fprintf(req->fp[0], "%s %s\n", tn.c_str() + pfx_len, ibx_id);
+	}
+}
+
+// Line-buffers fp.  Unlike glibc's setlinebuf(3), failure is reported:
+// returns 0 on success, non-zero otherwise (as setvbuf(3)).
+static int my_setlinebuf(FILE *fp)
+{
+	int rc = setvbuf(fp, NULL, _IOLBF, 0);
+	return rc;
+}
+
+// dump_ibx [OPTIONS] IBX_ID QRY_STR (requires at least one -A PREFIX)
+// For every document matching QRY_STR, writes one "$TERM $IBX_ID" line per
+// term matching any -A prefix to fp[0] (line-buffered for a sort(1) pipe),
+// then "mset.size=N" to fp[1] if a status pipe was given.
+static bool cmd_dump_ibx(struct req *req)
+{
+	bool have_args = (optind + 1) < req->argc;
+	if (!have_args) {
+		warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+		return false; // need ibx_id + qry_str
+	}
+	if (req->pfxc == 0) {
+		warnx("dump_ibx requires -A PREFIX");
+		return false;
+	}
+
+	const char *ibx_id = req->argv[optind];
+	const char *qry_str = req->argv[optind + 1];
+	if (my_setlinebuf(req->fp[0]) != 0) {
+		perror("setlinebuf(fp[0])");
+		return false;
+	}
+	req->asc = true;
+	req->sort_col = -1; // BoolWeight, ascending docid order
+	req->max = (unsigned long long)req->srch->db->get_doccount();
+	Xapian::MSet mset = mail_mset(req, qry_str);
+	for (Xapian::MSetIterator it = mset.begin(); it != mset.end(); ++it) {
+		try {
+			Xapian::Document doc = it.get_document();
+			for (int p = 0; p < req->pfxc; ++p)
+				dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+		} catch (const Xapian::Error &e) {
+			fprintf(orig_err, "W: %s (#%ld)\n",
+				e.get_description().c_str(), (long)(*it));
+		}
+	}
+	if (req->fp[1] != NULL)
+		fprintf(req->fp[1], "mset.size=%llu\n",
+			(unsigned long long)mset.size());
+	return true;
+}
+
+// internal usage only: writes "pid=$PID has_threadid=0|1" (no newline)
+static bool cmd_test_inspect(struct req *req)
+{
+	int pid = (int)getpid();
+	int tid = has_threadid(req->srch) ? 1 : 0;
+
+	fprintf(req->fp[0], "pid=%d has_threadid=%d", pid, tid);
+	return true;
+}
+
+// Command table searched linearly by dispatch(); fn_len allows a cheap
+// length check before comparing names with memcmp(3).
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry {
+ size_t fn_len;
+ const char *fn_name;
+ cmd fn;
+} cmds[] = { // should be small enough to not need bsearch || gperf
+ // most common commands first
+ CMD(dump_ibx),
+ CMD(test_inspect), // least common commands last
+};
+
+#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0]))
+// Ancillary-data buffer for recv_req(): room for RECV_FD_CAPA descriptors
+// passed via SCM_RIGHTS.  The extra 16 bytes presumably cover CMSG
+// alignment padding -- TODO confirm.
+#define RECV_FD_CAPA 2
+#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int))
+union my_cmsg {
+ struct cmsghdr hdr;
+ char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
+};
+
+// close(2) wrapper: EINTR is tolerated, any other failure is a fatal bug.
+static void xclose(int fd)
+{
+	if (close(fd) == 0 || errno == EINTR)
+		return;
+	err(EXIT_FAILURE, "BUG: close");
+}
+
+// Receives one request from sock_fd: argv bytes into rbuf (*len is updated
+// to the byte count) plus up to RECV_FD_CAPA descriptors via SCM_RIGHTS.
+// fd[0] (required) becomes req->fp[0] for responses, fd[1] (optional)
+// req->fp[1] for status/errors.  Each must be writable (O_WRONLY or O_RDWR).
+// Returns false with every received fd closed on error; exits the process
+// with EX_NOINPUT when the socket reaches EOF.
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
+{
+	union my_cmsg cmsg = { 0 };
+	struct msghdr msg = { .msg_iovlen = 1 };
+	struct iovec iov;
+	iov.iov_base = rbuf;
+	iov.iov_len = *len;
+	msg.msg_iov = &iov;
+	msg.msg_control = &cmsg.hdr;
+	msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+
+	ssize_t r = recvmsg(sock_fd, &msg, 0);
+	if (r < 0)
+		err(EXIT_FAILURE, "recvmsg");
+	if (r == 0)
+		exit(EX_NOINPUT); /* grandparent went away */
+	*len = r;
+	if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+			cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+		size_t clen = cmsg.hdr.cmsg_len;
+		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+		size_t i;
+		bool fd_ok = true;
+		// msg_controllen limits us to RECV_FD_CAPA (== fp[] size) fds
+		for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= clen; i++) {
+			int fd = *fdp++;
+			if (!fd_ok) { // hit previous error, close the rest
+				xclose(fd);
+				continue;
+			}
+			int fl = fcntl(fd, F_GETFL);
+			if (fl < 0) { // not a usable fd, so nothing to close
+				warnx("invalid fd=%d", fd);
+				fd_ok = false;
+				continue;
+			}
+			const char *mode = NULL;
+			// F_GETFL also returns status flags (O_APPEND,
+			// O_NONBLOCK, ...), so only compare the access mode
+			switch (fl & O_ACCMODE) {
+			case O_WRONLY: mode = "w"; break;
+			case O_RDWR:
+				mode = "r+";
+				if (i == 0) req->has_input = true;
+				break;
+			default: // O_RDONLY: we can't write responses to it
+				warnx("invalid mode from F_GETFL: 0x%x", fl);
+				fd_ok = false;
+				xclose(fd);
+				continue;
+			}
+			req->fp[i] = fdopen(fd, mode);
+			if (!req->fp[i]) {
+				warn("fdopen(fd=%d)", fd);
+				fd_ok = false;
+				xclose(fd); // fdopen failure leaves fd open
+			}
+		}
+		for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++)
+			if (req->fp[i]) fclose(req->fp[i]);
+		return fd_ok;
+	}
+	warnx("no FD received in %zd-byte request", r);
+	return false;
+}
+
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+// Splits len bytes of NUL-terminated strings ("a\0b\0") in buf into dst[],
+// pointing into buf.  Returns the number of strings, or 0 with a warning if
+// buf is empty, starts with NUL, lacks a final NUL, or reaches `limit'
+// strings.  Rejecting at `limit' (not limit + 1) always leaves dst with a
+// spare slot, so a zeroed dst stays NULL-terminated.
+static int split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+	bool bogus = buf[0] == 0 || len == 0 || buf[len - 1] != 0;
+	if (bogus) {
+		warnx("bogus argument given");
+		return 0;
+	}
+	size_t n = 0;
+	char *start = buf;
+	size_t pos = 1;
+	while (pos < len) {
+		if (buf[pos] == 0) {
+			dst[n++] = start;
+			start = &buf[pos + 1];
+		}
+		if (n == limit) {
+			warnx("too many args: %zu", n);
+			return 0;
+		}
+		++pos;
+	}
+	return (int)n;
+}
+
+// tsearch(3)/tfind(3)/tdelete(3) ordering for struct srch: shorter path
+// lists sort first, equal-length ones compare bytewise.
+static int srch_cmp(const void *pa, const void *pb)
+{
+	const struct srch *x = (const struct srch *)pa;
+	const struct srch *y = (const struct srch *)pb;
+
+	if (x->paths_len != y->paths_len)
+		return x->paths_len - y->paths_len;
+	return memcmp(x->paths, y->paths, (size_t)x->paths_len);
+}
+
+// True if "$dir/iamchert" exists and is a regular file (srch_init() then
+// disables FLAG_PHRASE).  Dies if the path does not fit in PATH_MAX.
+static bool is_chert(const char *dir)
+{
+	char path[PATH_MAX];
+	struct stat sb;
+	int n = snprintf(path, sizeof(path), "%s/iamchert", dir);
+
+	if (n <= 0 || n >= (int)sizeof(path))
+		err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);
+	return stat(path, &sb) == 0 && S_ISREG(sb.st_mode);
+}
+
+// Opens every shard in srch->paths as one combined Xapian::Database and
+// sets up srch->qp for mail search, or code search with -c.  FLAG_PHRASE
+// is dropped if any shard has an "iamchert" file.  Returns false on
+// failure; dispatch() then removes and frees the partially-built srch.
+static bool srch_init(struct req *req)
+{
+	char *dirv[MY_ARG_MAX];
+	int i;
+	struct srch *srch = req->srch;
+	int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+	const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
+
+	if (dirc <= 0) // e.g. `-d ""': dirv[0] is unset, split2argv warned
+		return false;
+	srch->qp_flags = FLAG_PHRASE |
+			Xapian::QueryParser::FLAG_BOOLEAN |
+			Xapian::QueryParser::FLAG_LOVEHATE |
+			Xapian::QueryParser::FLAG_WILDCARD;
+	if (is_chert(dirv[0]))
+		srch->qp_flags &= ~FLAG_PHRASE;
+	try {
+		srch->db = new Xapian::Database(dirv[0]);
+	} catch (const Xapian::Error &e) { // errno is meaningless here
+		warnx("E: Xapian::Database(%s): %s", dirv[0],
+			e.get_description().c_str());
+		return false;
+	} catch (...) {
+		warn("E: Xapian::Database(%s)", dirv[0]);
+		return false;
+	}
+	try {
+		for (i = 1; i < dirc; i++) {
+			if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+				srch->qp_flags &= ~FLAG_PHRASE;
+			srch->db->add_database(Xapian::Database(dirv[i]));
+		}
+	} catch (const Xapian::Error &e) {
+		warnx("E: add_database(%s): %s", dirv[i],
+			e.get_description().c_str());
+		return false;
+	} catch (...) {
+		warn("E: add_database(%s)", dirv[i]);
+		return false;
+	}
+	try {
+		srch->qp = new Xapian::QueryParser;
+	} catch (...) {
+		perror("E: Xapian::QueryParser");
+		return false;
+	}
+	srch->qp->set_default_op(Xapian::Query::OP_AND);
+	srch->qp->set_database(*srch->db);
+	try {
+		srch->qp->set_stemmer(Xapian::Stem("english"));
+	} catch (...) {
+		perror("E: Xapian::Stem");
+		return false;
+	}
+	srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+	srch->qp->SET_MAX_EXPANSION(100);
+
+	if (req->code_search)
+		qp_init_code_search(srch->qp); // CodeSearch.pm
+	else
+		qp_init_mail_search(srch->qp); // Search.pm
+	return true;
+}
+
+// tdestroy(3)-compatible destructor: deletes the Xapian objects owned by
+// a srch (either may be NULL) and frees the srch itself.
+static void free_srch(void *p)
+{
+	struct srch *s = (struct srch *)p;
+	delete s->qp;
+	delete s->db;
+	free(s);
+}
+
+// Handles one request already split into req->argv.  It finds the command
+// in cmds[] and parses its options.  It builds a srch lookup key (header
+// padding + -d paths) in a memstream, then reuses a matching srch cached
+// in srch_tree or initializes and caches a new one.  Finally it runs the
+// command.  Errors are only reported via warn*()/perror(); no leaks.
+static void dispatch(struct req *req)
+{
+	int c;
+	size_t size = strlen(req->argv[0]);
+	union {
+		struct srch *srch;
+		char *ptr;
+	} fbuf;
+	char *end;
+	FILE *kfp;
+	struct srch **s;
+	req->fn = NULL;
+	for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
+		if (cmds[c].fn_len == size &&
+				!memcmp(cmds[c].fn_name, req->argv[0], size)) {
+			req->fn = cmds[c].fn;
+			break;
+		}
+	}
+	if (!req->fn) goto cmd_err;
+
+	kfp = open_memstream(&fbuf.ptr, &size);
+	if (!kfp) {
+		perror("open_memstream");
+		goto cmd_err;
+	}
+	// write padding, first
+	fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+
+	// global getopt variables:
+	optind = 1;
+	opterr = optopt = 0;
+	optarg = NULL;
+
+	// keep sync with @PublicInbox::XapHelper::SPEC
+	while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+		switch (c) {
+		case 'a': req->asc = true; break;
+		case 'c': req->code_search = true; break;
+		case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+		case 'k':
+			req->sort_col = strtol(optarg, &end, 10);
+			if (*end) goto opt_err;
+			switch (req->sort_col) {
+			case LONG_MAX: case LONG_MIN: goto opt_err;
+			}
+			break;
+		case 'm':
+			req->max = strtoull(optarg, &end, 10);
+			if (*end) goto opt_err;
+			if (req->max == ULLONG_MAX) goto opt_err;
+			break;
+		case 'o':
+			req->off = strtoull(optarg, &end, 10);
+			if (*end) goto opt_err;
+			if (req->off == ULLONG_MAX) goto opt_err;
+			break;
+		case 'r': req->relevance = true; break;
+		case 't': req->collapse_threads = true; break;
+		case 'A':
+			req->pfxv[req->pfxc++] = optarg;
+			if (MY_ARG_MAX == req->pfxc) goto opt_err;
+			break;
+		case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+		case 'T':
+			req->timeout_sec = strtoul(optarg, &end, 10);
+			if (*end) goto opt_err;
+			if (req->timeout_sec == ULONG_MAX) goto opt_err;
+			break;
+		default: goto opt_err;
+		}
+	}
+	if (ferror(kfp) | fclose(kfp)) {
+		perror("ferror|fclose");
+		goto cmd_err;
+	}
+	fbuf.srch->db = NULL;
+	fbuf.srch->qp = NULL;
+	fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+	if (fbuf.srch->paths_len <= 0) {
+		free_srch(fbuf.srch);
+		warnx("no -d args");
+		goto cmd_err;
+	}
+	s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+	if (!s) {
+		perror("tsearch");
+		free_srch(fbuf.srch); // not in srch_tree, don't leak it
+		goto cmd_err;
+	}
+	req->srch = *s;
+	if (req->srch != fbuf.srch) { // reuse existing
+		free_srch(fbuf.srch);
+	} else if (!srch_init(req)) {
+		assert(fbuf.srch == *((struct srch **)tfind(
+				fbuf.srch, &srch_tree, srch_cmp)));
+		void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp);
+		assert(del);
+		free_srch(fbuf.srch);
+		goto cmd_err;
+	}
+	try {
+		if (!req->fn(req))
+			goto cmd_err;
+	} catch (const Xapian::Error & e) {
+		warnx("Xapian::Error: %s", e.get_description().c_str());
+	} catch (...) {
+		warn("unhandled exception");
+	}
+	return;
+opt_err: // bad option: the memstream is still open, release it
+	if (!fclose(kfp))
+		free(fbuf.ptr);
+cmd_err:
+	return; // just be silent on errors, for now
+}
+
+// Releases the worker pid table; called by forked workers and at exit.
+static void cleanup_pids(void)
+{
+	pid_t *pids = worker_pids;
+	worker_pids = NULL;
+	free(pids);
+}
+
+// Worker loop: receives and dispatches one request at a time.  While a
+// request with a status pipe runs, stderr points at that pipe so warn*()
+// reaches the client.  Stops once getppid() no longer matches parent_pid
+// (when parent_pid is set); otherwise runs until recv_req() exits.
+static void recv_loop(void)
+{
+	static char rbuf[4096 * 33]; // per-process
+
+	for (;;) {
+		if (parent_pid && getppid() != parent_pid)
+			break;
+		size_t len = sizeof(rbuf);
+		struct req req = { 0 };
+		if (!recv_req(&req, rbuf, &len))
+			continue;
+		FILE *status = req.fp[1];
+		if (status) {
+			if (my_setlinebuf(status))
+				perror("W: setlinebuf(req.fp[1])");
+			stderr = status;
+		}
+		req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+		if (req.argc > 0)
+			dispatch(&req);
+		if (ferror(req.fp[0]) | fclose(req.fp[0]))
+			perror("ferror|fclose fp[0]");
+		if (status) {
+			stderr = orig_err;
+			if (ferror(status) | fclose(status))
+				perror("ferror|fclose fp[1]");
+		}
+	}
+}
+
+// Records pid in worker slot nr, which must currently be empty.
+static void insert_pid(pid_t pid, unsigned nr)
+{
+	pid_t *slot = &worker_pids[nr];
+	assert(*slot == 0);
+	*slot = pid;
+}
+
+// Clears the worker slot holding pid and returns its index, or warns and
+// returns -1 if pid is not one of our workers.
+static int delete_pid(pid_t pid)
+{
+	unsigned nr = 0;
+	while (nr < nworker && worker_pids[nr] != pid)
+		nr++;
+	if (nr == nworker) {
+		warnx("W: unknown pid=%d reaped", (int)pid);
+		return -1;
+	}
+	worker_pids[nr] = 0;
+	return (int)nr;
+}
+
+// Forks worker slot nr.  The child drops the parent's pid table, serves
+// requests via recv_loop() and exits 0; the parent records the child's
+// pid.  A fork failure is only warned about.
+static void start_worker(unsigned nr)
+{
+	pid_t pid = fork();
+	if (pid == 0) {
+		cleanup_pids();
+		recv_loop();
+		exit(0);
+	}
+	if (pid > 0)
+		insert_pid(pid, nr);
+	else
+		warn("E: fork(worker=%u)", nr);
+}
+
+// atexit(3) handler: frees the pid table and, where tdestroy(3) exists
+// (glibc), every cached srch.
+static void cleanup_all(void)
+{
+	cleanup_pids();
+#ifdef __GLIBC__
+	void *tree = srch_tree;
+	srch_tree = NULL;
+	tdestroy(tree, free_srch);
+#endif
+}
+
+// Usage: xap_helper [-j WORKERS]
+// Requests arrive on stdin (sock_fd).  With -j0 they are handled in this
+// process; otherwise WORKERS (default: online CPUs, capped at UCHAR_MAX)
+// forked workers handle them and are respawned when they die, until one
+// exits with EX_NOINPUT (the client went away).
+int main(int argc, char *argv[])
+{
+	int c;
+
+	mail_nrp_init();
+	code_nrp_init();
+	atexit(cleanup_all);
+
+	nworker = 0;
+#ifdef _SC_NPROCESSORS_ONLN
+	long j = sysconf(_SC_NPROCESSORS_ONLN);
+	if (j > 0)
+		nworker = j > UCHAR_MAX ? UCHAR_MAX : j;
+#endif // _SC_NPROCESSORS_ONLN
+
+	// make warn/warnx/err multi-process friendly:
+	if (my_setlinebuf(stderr))
+		err(EXIT_FAILURE, "setlinebuf(stderr)");
+	// not using -W<workers> like Daemon.pm, since -W is reserved (glibc).
+	// The leading ':' silences getopt(3)'s own messages and makes it
+	// return ':' (rather than '?') for a missing option argument.
+	while ((c = getopt(argc, argv, ":j:")) != -1) {
+		char *end;
+
+		switch (c) {
+		case 'j':
+			nworker = strtoul(optarg, &end, 10);
+			if (end == optarg || *end != 0 || nworker > USHRT_MAX)
+				errx(EXIT_FAILURE, "-j %s invalid", optarg);
+			break;
+		case ':':
+			errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
+		case '?':
+			errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
+		default:
+			errx(EXIT_FAILURE, "BUG: `-%c'", c);
+		}
+	}
+	if (nworker == 0) {
+		recv_loop();
+	} else {
+		parent_pid = getpid();
+		worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+		if (!worker_pids)
+			err(EXIT_FAILURE, "calloc");
+		for (unsigned i = 0; i < nworker; i++)
+			start_worker(i);
+
+		int st;
+		pid_t pid;
+		bool quit = false;
+		while ((pid = wait(&st)) > 0) {
+			int nr = delete_pid(pid);
+			if (nr < 0) continue;
+			if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+				quit = true;
+			if (!quit)
+				start_worker(nr);
+		}
+	}
+	return 0;
+}
diff --git a/t/xap_helper.t b/t/xap_helper.t
new file mode 100644
index 00000000..f00a845a
--- /dev/null
+++ b/t/xap_helper.t
@@ -0,0 +1,147 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+require_mods(qw(DBD::SQLite Search::Xapian));
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+require PublicInbox::AutoReap;
+require PublicInbox::IPC;
+require PublicInbox::XapClient;
+use autodie;
+my ($tmp, $for_destroy) = tmpdir();
+
+# coderepo fixture: a bare git repo loaded from fast-import data, indexed
+# by -cindex both internally (-j1) and into $d/cidx-ext (-j3)
+my $fi_data = './t/git.fast-import-data';
+open my $fi_fh, '<', $fi_data;
+# handle on the original cwd so the callback can chdir() back to it before
+# running -cindex (presumably create_coderepo runs it elsewhere -- confirm)
+open my $dh, '<', '.';
+my $crepo = create_coderepo 'for-cindex', sub {
+ my ($d) = @_;
+ xsys_e([qw(git init -q --bare)]);
+ xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
+ chdir($dh);
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d])
+ or xbail '-cindex internal';
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
+ "$d/cidx-ext", $d]) or xbail '-cindex "external"';
+};
+$dh = $fi_fh = undef; # no longer needed once the fixture exists
+
+# v2 inbox fixture with a handful of patch-bearing messages
+my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
+ tmpdir => "$tmp/v2", sub {
+ my ($im) = @_;
+ for my $f (qw(t/data/0001.patch t/data/binary.patch
+ t/data/message_embed.eml
+ t/solve/0001-simple-mod.patch
+ t/solve/0002-rename-with-modifications.patch
+ t/solve/bare.patch)) {
+ $im->add(eml_load($f)) or BAIL_OUT;
+ }
+};
+
+# Xapian shard directories: inbox, internal cindex, external cindex
+my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
+my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
+is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
+is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+
+# Sends one request to the helper over socket $s: @arg joined with NULs
+# (NUL-terminated), plus the write end of a new response pipe and, if the
+# last arg is a handle, that handle as the status/error pipe.
+# Returns the read end of the response pipe.
+my $doreq = sub {
+	my ($s, @arg) = @_;
+	# `my $x = ... if COND' has undefined behavior (perlsyn), be explicit
+	my $err = ref($arg[-1]) ? pop(@arg) : undef;
+	pipe(my $x, my $y);
+	my $buf = join("\0", @arg, '');
+	my @fds = fileno($y);
+	push @fds, fileno($err) if $err;
+	my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR);
+	$n // xbail "send: $!";
+	my $arg = "@arg";
+	$arg =~ s/\Q$tmp\E/\$TMP/gs;
+	is(length($buf), $n, "req $arg sent");
+	$x;
+};
+
+my $env = { PERL5LIB => join(':', @INC) };
+# Spawns `$^X -w @arg' (a helper implementation) with one end of a
+# SOCK_SEQPACKET socketpair as its stdin.  It then checks test_inspect and
+# dump_ibx responses (with and without a status pipe) and, unless started
+# with -j0, that a killed worker is replaced.  Returns the AutoReap object
+# for the helper.
+my $test = sub {
+ my (@arg) = @_;
+ socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
+ my $pid = spawn([$^X, '-w', @arg], $env, { 0 => $y });
+ my $ar = PublicInbox::AutoReap->new($pid);
+ diag "$arg[-1] running pid=$pid";
+ close $y;
+ my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($info{has_threadid}, '1', 'has_threadid true for inbox');
+ like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
+
+ $r = $doreq->($s, qw(test_inspect -d), $int[0]);
+ my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
+ is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
+
+ # dump_ibx with IBX_ID=13 and QRY_STR='rt:0..' across all inbox shards
+ my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
+ qw(13 rt:0..));
+ $r = $doreq->($s, @dump);
+ my @res;
+ # every chunk should end in "\n" if fp[0] is line-buffered
+ # (assumes each line-buffered write arrives as its own read)
+ while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
+ is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');
+
+ pipe(my $err_rd, my $err_wr);
+ $r = $doreq->($s, @dump, $err_wr);
+ close $err_wr;
+ my $res = do { local $/; <$r> };
+ is(join('', @res), $res, 'got identical response w/ error pipe');
+ my $stats = do { local $/; <$err_rd> };
+ is($stats, "mset.size=6\n", 'mset.size reported');
+
+ # with worker processes, killing one must cause a replacement
+ if ($arg[-1] !~ /\('-j0'\)/) {
+ kill('KILL', $cinfo{pid});
+ $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ %info = map {
+ split(/=/, $_, 2)
+ } split(/ /, do { local $/; <$r> });
+ isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+ }
+ $ar;
+};
+# holds the current helper's AutoReap object; reassigning it presumably
+# reaps the previous helper (see PublicInbox::AutoReap)
+my $ar;
+
+# Perl implementation: in-process (-j0) and with one worker (-j1)
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+
+# PI_NO_CXX values for the XapClient tests below; 1 is only added when the
+# C++ helper builds, so both implementations get exercised
+my @NO_CXX = (0);
+SKIP: {
+ eval {
+ require PublicInbox::XapHelperCxx;
+ PublicInbox::XapHelperCxx::check_build();
+ };
+ skip "XapHelperCxx build: $@", 1 if $@;
+ push @NO_CXX, 1;
+
+ # C++ implementation: in-process (-j0) and with one worker (-j1)
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j1')]);
+};
+
+# XapClient API: dump XDFID and Q (Message-ID) terms for a single message
+for my $n (@NO_CXX) {
+ local $ENV{PI_NO_CXX} = $n;
+ my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
+ $ar = PublicInbox::AutoReap->new($pid);
+ pipe(my $err_r, my $err_w);
+
+ # git patch-id --stable <t/data/0001.patch | awk '{print $1}'
+ my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
+ my $mid = '20180720072141.GA15957@example';
+ my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
+ (map { ('-d', $_) } @ibx_idx),
+ 9, "mid:$mid");
+ close $err_w;
+ my $res = do { local $/; <$r> };
+ is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
+ my $err = do { local $/; <$err_r> };
+ is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+}
+
+done_testing;
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 5/7] cindex: fix sorting and uniqueness
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (3 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
` (2 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
We can't rely on combining the `-u' and `-k1,1' switches of POSIX
sort(1) to do what we want. So only rely on `sort -k1,1' while
introducing a small Perl helper to fold identical prefixes into
one line. In other words, input such as:
deadbeef 0
deadbeef 1
deadbeef 2
Was getting deduplicated into a single line:
deadbeef 0
... with `sort -u -k1,1'
This puts the output into a more optimal form for eventual
(not-fully-implemented-yet) parsing:
deadbeef 0,1,2
OFS is currently the comma (`,') for inbox IDs, but it is a
space (` ') for coderepo root IDs. This implementation also
combines identical IDs in the 2nd column. Thus:
deadbeef 0
deadbeef 0
Becomes a single `deadbeef 0' line thanks to the use of
XS List::Util::uniq (which beats a pure Perl hash).
I attempted to implement this in awk but Perl is close enough to
gawk in performance while being shorter and easier-to-understand
due to List::Util::uniq. mawk was faster, but still not enough
to matter as the bottleneck is from iterating through Xapian
MSets.
---
lib/PublicInbox/CodeSearchIdx.pm | 63 +++++++++++++++++++++++---------
1 file changed, 45 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index b8afecd2..4a41b1da 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -21,12 +21,14 @@
#
# * to_root_id - each line is of the format:
#
-# $PFX $ROOT_COMMIT_OID_ID
+# $PFX $ROOT_COMMIT_OID_IDS
#
# * to_ibx_id - each line is of the format:
#
-# $PFX $IBX_ID
+# $PFX $IBX_IDS
#
+# $IBX_IDS is a comma-delimited list of integers ($IBX_ID)
+# $ROOT_COMMIT_OID_IDS is space-delimited
# In both cases, $PFX is typically the value of the patchid (XDFID) but it
# can be configured to use any combination of patchid, dfpre, dfpost or
# dfblob.
@@ -134,6 +136,20 @@ sub new {
$self;
}
+# This is similar to uniq(1) on the first column, but combines the
+# contents of subsequent columns using $OFS.
+# Only adjacent lines with equal first columns are folded, so input must
+# already be sorted on the first column (e.g. `sort -k1,1').  Repeated
+# values are dropped via List::Util::uniq (first-seen order is kept).
+# $OFS is read from the OFS environment variable (default: `,').
+our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM');
+BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' }
+if ($F[0] eq $apfx) {
+ shift @F;
+ push @ids, @F;
+} else {
+ print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids;
+ ($apfx, @ids) = @F;
+}
+END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids }
+EOM
+
# TODO: may be used for reshard/compact
sub count_shards { scalar($_[0]->xdb_shards_flat) }
@@ -519,16 +535,21 @@ sub dump_roots_once {
@ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
my $id = 0;
my %root2id = map { $_ => $id++ } @ID2ROOT;
- pipe(my ($r, $w)) or die "pipe: $!";
+ # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
- close $r or die "close: $!";
+ my $env = { %$CMD_ENV, OFS => ' ' };
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+ my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
+ \%root2id, $QRY_STR);
$_->wq_io_do(@arg) for @IDX_SHARDS;
progress($self, 'waiting on dump_shard_roots sort');
}
@@ -549,12 +570,15 @@ sub dump_ibx { # sends to xap_helper.h
sub dump_ibx_start {
my ($self, $associate) = @_;
pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
- my @sort = (@SORT, '-k1,1');
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
+ # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
- close $sort_r or die "close: $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(ibx)'], $associate);
($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
@@ -869,9 +893,11 @@ sub associate {
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
my %score;
- while (<$rd>) { # PFX ibx_id root_id
- my (undef, $ibx_id, @root_id) = split(/ /, $_);
- ++$score{"$ibx_id $_"} for @root_id;
+ while (<$rd>) { # PFX ibx_ids root_id
+ my (undef, $ibx_ids, @root_ids) = split(/ /, $_);
+ for my $ibx_id (split(/,/, $ibx_ids)) {
+ ++$score{"$ibx_id $_"} for @root_ids;
+ }
}
close $rd or die "@join failed: $?=$?";
my $min = $self->{-opt}->{'assoc-min'} // 10;
@@ -939,9 +965,10 @@ sub init_prune ($) {
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+ my @sort_u = (@SORT, '-u');
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
- my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+ my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+ awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
@@ -971,12 +998,13 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
- my $sort_pid = spawn(\@SORT, $CMD_ENV,
+ my @sort_u = (@SORT, '-u');
+ my $sort_pid = spawn(\@sort_u, $CMD_ENV,
{ 0 => $sort_in, 1 => $sort_out });
my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
awaitpid($awk_pid, \&cmd_done, \@AWK);
- awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($sort_pid, \&cmd_done, \@sort_u);
awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
@@ -1092,10 +1120,9 @@ sub cidx_run { # main entry point
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @SORT = (undef, '-u');
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
local $self->{PENDING} = {};
local $self->{-pi_cfg};
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 6/7] cindex: implement dump_roots in C++
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (4 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
It's now just `dump_roots' instead of `dump_shard_roots', since
this doesn't need to be tied to the concept of shards. I'm
still shaky with C++, but intend to keep using stuff like
hsearch(3) to make life easier for C hackers :P
---
MANIFEST | 1 -
lib/PublicInbox/CidxDumpShardRoots.pm | 73 ------
lib/PublicInbox/CidxXapHelperAux.pm | 10 +-
lib/PublicInbox/CodeSearchIdx.pm | 56 ++---
lib/PublicInbox/XapHelper.pm | 52 +++-
lib/PublicInbox/xap_helper.h | 332 +++++++++++++++++++++++---
t/xap_helper.t | 44 +++-
7 files changed, 407 insertions(+), 161 deletions(-)
delete mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm
diff --git a/MANIFEST b/MANIFEST
index 4f61af42..4bccc849 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,6 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CidxXapHelperAux.pm
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
deleted file mode 100644
index 34afa419..00000000
--- a/lib/PublicInbox/CidxDumpShardRoots.pm
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpShardRoots;
-use v5.12;
-use PublicInbox::Lock;
-use PublicInbox::Search qw(xap_terms);
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($cidx, $root2id, $qry_str) = @_;
- my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
- my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
- # sort lock is necessary if we have may root ids which cause a
- # row length to exceed POSIX PIPE_BUF (via `$G' below)
- my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
- 'PublicInbox::Lock';
- $sort_w->autoflush(1);
- $cidx->begin_txn_lazy; # only using txn to simplify writer subs
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- my $self = bless {
- cidx => $cidx,
- op_p => $op_p,
- iter => 0,
- mset => $cidx->mset($qry_str, $opt),
- root2id => $root2id,
- sort_w => $sort_w,
- sort_lk => $sort_lk,
- }, __PACKAGE__;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $cidx = $self->{cidx};
- return if $cidx->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- $end = $last if $end > $last;
- $self->{iter} = $end + 1;
- local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
- $cidx->progress($0);
-
- my $root2id = $self->{root2id};
- my $buf = '';
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- my $G = join(' ', map {
- $root2id->{pack('H*', $_)};
- } xap_terms('G', $doc));
- for my $p (@{$cidx->{ASSOC_PFX}}) {
- $buf .= "$_ $G\n" for (xap_terms($p, $doc));
- }
- }
- $self->{sort_lk}->lock_acquire_fast;
- print { $self->{sort_w} } $buf or die "print: $!";
- $self->{sort_lk}->lock_release_fast;
- $end < $last && !$cidx->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-sub DESTROY {
- my ($self) = @_;
- return if $self->{cidx}->do_quit;
- send($self->{op_p},
- "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
index c9a5ddad..f402bde0 100644
--- a/lib/PublicInbox/CidxXapHelperAux.pm
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -10,12 +10,8 @@ use PublicInbox::Syscall qw(EPOLLIN);
# rpipe connects to req->fp[1] in xap_helper.h
sub new {
- my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
- my $self = bless {
- cidx => $cidx,
- pfx => $pfx,
- associate => $associate
- }, $cls;
+ my ($cls, $rpipe, $cidx, $pfx) = @_;
+ my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
$rpipe->blocking(0);
$self->SUPER::new($rpipe, EPOLLIN);
}
@@ -36,7 +32,7 @@ sub event_step {
my @lines = split(/^/m, $buf);
$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
for my $l (@lines) {
- if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+ if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
delete $self->{cidx}->{PENDING}->{$pfx};
$self->{cidx}->index_next;
}
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 4a41b1da..404d6826 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -87,7 +87,6 @@ our (
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- @DUMP_SHARD_ROOTS_OK, # for associate
$DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
@@ -505,14 +504,6 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
- my ($self, $associate, $n) = @_;
- return if $DO_QUIT;
- progress($self, "dump_shard_roots [$n] done");
- $DUMP_SHARD_ROOTS_OK[$n] = 1;
- # may run associate()
-}
-
sub assoc_max_init ($) {
my ($self) = @_;
my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
@@ -520,38 +511,39 @@ sub assoc_max_init ($) {
$max < 0 ? ((2 ** 31) - 1) : $max;
}
-# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
-sub dump_shard_roots { # via wq_io_do for associate
- my ($self, $root2id, $qry_str) = @_;
- PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
-}
-
sub dump_roots_once {
my ($self, $associate) = @_;
$associate // die 'BUG: no $associate';
$TODO{associating} = 1; # keep shards_active() happy
progress($self, 'dumping IDs from coderepos');
local $self->{xdb};
- @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
- my $id = 0;
- my %root2id = map { $_ => $id++ } @ID2ROOT;
- # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ @ID2ROOT = $self->all_terms('G');
+ my $root2id = "$TMPDIR/root2id";
+ open my $fh, '>', $root2id or die "open($root2id): $!";
+ my $nr = -1;
+ for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+ close $fh or die "close: $!";
+ # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
- open my $fh, '>', $dst or die "open($dst): $!";
+ open $fh, '>', $dst or die "open($dst): $!";
my $env = { %$CMD_ENV, OFS => ' ' };
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
- \%root2id, $QRY_STR);
- $_->wq_io_do(@arg) for @IDX_SHARDS;
- progress($self, 'waiting on dump_shard_roots sort');
+ my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
+ '-m', assoc_max_init($self), $root2id, $QRY_STR);
+ for my $d ($self->shard_dirs) {
+ pipe(my ($err_r, $err_w)) or die "pipe: $!";
+ $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
+ my $desc = "dump_roots $d";
+ $self->{PENDING}->{$desc} = $associate;
+ PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
+ }
+ progress($self, 'waiting on dump_roots sort');
}
sub dump_ibx { # sends to xap_helper.h
@@ -563,8 +555,8 @@ sub dump_ibx { # sends to xap_helper.h
pipe(my ($r, $w)) or die "pipe: $!";
$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
my $ekey = $ibx->eidx_key;
- $self->{PENDING}->{$ekey} = undef;
- PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
+ $self->{PENDING}->{$ekey} = $TODO{associate};
+ PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
}
sub dump_ibx_start {
@@ -885,8 +877,7 @@ sub associate {
my ($self) = @_;
return if $DO_QUIT;
@IDX_SHARDS or return warn("# aborting on no shards\n");
- grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
- die "E: shards not dumped properly\n";
+ unlink("$TMPDIR/root2id");
my @pending = keys %{$self->{PENDING}};
die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
@@ -906,7 +897,7 @@ sub associate {
my $nr = $score{$k};
my ($ibx_id, $root) = split(/ /, $k);
my $ekey = $IBX[$ibx_id]->eidx_key;
- $root = unpack('H*', $ID2ROOT[$root]);
+ $root = $ID2ROOT[$root];
progress($self, "$ekey => $root has $nr matches");
}
delete $TODO{associating}; # break out of shards_active()
@@ -1048,7 +1039,6 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxDumpShardRoots;
require PublicInbox::CidxXapHelperAux;
require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
@@ -1120,7 +1110,7 @@ sub cidx_run { # main entry point
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
+ @ID2ROOT, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index bf2f99a2..c80be810 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -8,7 +8,9 @@ use Getopt::Long (); # good API even if we only use short options
our $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
use PublicInbox::Search qw(xap_terms);
+use PublicInbox::CodeSearch;
use PublicInbox::IPC;
+use Fcntl qw(LOCK_UN LOCK_EX);
my $X = \%PublicInbox::Search::X;
our (%SRCH, %PIDS, $parent_pid);
our $stderr = \*STDERR;
@@ -44,15 +46,63 @@ sub cmd_dump_ibx {
my $mset = $req->{srch}->mset($qry_str, $opt);
my $out = $req->{0};
$out->autoflush(1);
+ my $nr = 0;
for my $it ($mset->items) {
my $doc = $it->get_document;
for my $p (@pfx) {
for (xap_terms($p, $doc)) {
print $out "$_ $ibx_id\n" or die "print: $!";
+ ++$nr;
}
}
}
- if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
+}
+
+sub cmd_dump_roots {
+ my ($req, $root2id_file, $qry_str) = @_;
+ $qry_str // return
+ warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
+ my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+ open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
+ my %root2id; # record format: $OIDHEX "\0" uint32_t
+ my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
+ while (@x) {
+ my $oidhex = shift @x;
+ $root2id{$oidhex} = shift @x;
+ }
+ my $opt = { relevance => -1, limit => $req->{'m'},
+ offset => $req->{o} // 0 };
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ $req->{0}->autoflush(1);
+ my $buf = '';
+ my $nr = 0;
+ for my $it ($mset->items) {
+ my $doc = $it->get_document;
+ my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
+ for my $p (@pfx) {
+ for (xap_terms($p, $doc)) {
+ $buf .= "$_ $G\n";
+ ++$nr;
+ }
+ }
+ if (!($nr & 0x3fff)) {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ $buf = '';
+ }
+ }
+ if ($buf ne '') {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
}
sub dispatch {
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 52db92b7..c9b4e0cc 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -6,12 +6,16 @@
* this is not linked to Perl in any way.
* C (not C++) is used as much as possible to lower the contribution
* barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead of their equivalents in the C++ stdlib :P
* Everything here is an unstable internal API of public-inbox and
* NOT intended for ordinary users; only public-inbox hackers
*/
#ifndef _ALL_SOURCE
# define _ALL_SOURCE
#endif
+#include <sys/file.h>
+#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
@@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf
unsigned long long max;
unsigned long long off;
unsigned long timeout_sec;
+ size_t nr_out;
long sort_col; // value column, negative means BoolWeight
int argc;
int pfxc;
@@ -96,6 +101,28 @@ struct worker {
unsigned nr;
};
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+ if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+ warnx("bogus argument given");
+ return 0;
+ }
+ size_t nr = 0;
+ char *c = buf;
+ for (size_t i = 1; i < len; i++) {
+ if (!buf[i]) {
+ dst[nr++] = c;
+ c = buf + i + 1;
+ }
+ if (nr == limit) {
+ warnx("too many args: %zu", nr);
+ return 0;
+ }
+ }
+ return (long)nr;
+}
+
static bool has_threadid(const struct srch *srch)
{
return srch->db->get_metadata("has_threadid") == "1";
@@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req)
static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
{
- if (!req->max)
- req->max = 50;
+ if (!req->max) {
+ switch (sizeof(Xapian::doccount)) {
+ case 4: req->max = UINT_MAX; break;
+ default: req->max = ULLONG_MAX;
+ }
+ }
for (int i = 0; i < 9; i++) {
try {
Xapian::MSet mset = enq->get_mset(req->off, req->max);
@@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
return enq->get_mset(req->off, req->max);
}
+// for v1, v2, and extindex
static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
{
struct srch *srch = req->srch;
Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
if (req->Oeidx_key) {
req->Oeidx_key[0] = 'O'; // modifies static rbuf
- fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
Xapian::Query(req->Oeidx_key));
}
@@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
return enquire_mset(req, &enq);
}
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+ struct srch *srch = req->srch;
+ Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+ // TODO: git_dir + roots_filter
+
+ // we only want commits:
+ qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+ Xapian::Query("T" "c"));
+ Xapian::Enquire enq = prep_enquire(req);
+ enq.set_query(qry);
+ return enquire_mset(req, &enq);
+}
+
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx,
for (cur.skip_to(pfx); cur != end; cur++) {
std::string tn = *cur;
- if (starts_with(&tn, pfx, pfx_len))
+ if (starts_with(&tn, pfx, pfx_len)) {
fprintf(req->fp[0], "%s %s\n",
tn.c_str() + pfx_len, ibx_id);
+ ++req->nr_out;
+ }
}
}
@@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req)
}
req->asc = true;
req->sort_col = -1;
- req->max = (unsigned long long)req->srch->db->get_doccount();
Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
try {
@@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req)
}
}
if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu\n",
- (unsigned long long)mset.size());
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
+ return true;
+}
+
+struct fbuf {
+ FILE *fp;
+ char *ptr;
+ size_t len;
+};
+
+struct dump_roots_tmp {
+ struct stat sb;
+ void *mm_ptr;
+ char **entries;
+ struct fbuf wbuf;
+ int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+ struct fbuf *fbuf = (struct fbuf *)ptr;
+ if (fbuf->fp && fclose(fbuf->fp))
+ perror("fclose(fbuf->fp)");
+ fbuf->fp = NULL;
+ free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+ assert(!fbuf->ptr);
+ fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+ if (fbuf->fp) return true;
+ perror("open_memstream(fbuf)");
+ return false;
+}
+
+static void xclose(int fd)
+{
+ if (close(fd) < 0 && errno != EINTR)
+ err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+ struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+ if (drt->root2id_fd >= 0)
+ xclose(drt->root2id_fd);
+ hdestroy(); // idempotent
+ if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+ err(EXIT_FAILURE, "BUG: munmap");
+ free(drt->entries);
+ fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+ Xapian::Document *doc)
+{
+ if (!fbuf_init(root_ids)) return false;
+
+ bool ok = true;
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ ENTRY e, *ep;
+ for (cur.skip_to("G"); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, "G", 1))
+ continue;
+ union { const char *in; char *out; } u;
+ u.in = tn.c_str() + 1;
+ e.key = u.out;
+ ep = hsearch(e, FIND);
+ if (!ep) {
+ warnx("hsearch miss `%s'", e.key);
+ return false;
+ }
+ // ep->data is a NUL-terminated string matching /[0-9]+/
+ fputc(' ', root_ids->fp);
+ fputs((const char *)ep->data, root_ids->fp);
+ }
+ fputc('\n', root_ids->fp);
+ if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+ perror("ferror|fclose(root_ids)");
+ ok = false;
+ }
+ root_ids->fp = NULL;
+ return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+ struct dump_roots_tmp *drt,
+ struct fbuf *root_ids,
+ Xapian::Document *doc)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, pfx, pfx_len))
+ continue;
+ fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+ fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+ ++req->nr_out;
+ }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+ char *p;
+ int fd = fileno(req->fp[0]);
+ bool ok = true;
+
+ if (!drt->wbuf.fp) return true;
+ if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+ if (fclose(drt->wbuf.fp)) {
+ warn("fclose(drt->wbuf.fp)"); // malloc failure?
+ return false;
+ }
+ drt->wbuf.fp = NULL;
+ if (!drt->wbuf.len) goto done_free;
+ if (flock(drt->root2id_fd, LOCK_EX)) {
+ perror("LOCK_EX");
+ return false;
+ }
+ p = drt->wbuf.ptr;
+ do {
+ ssize_t n = write(fd, p, drt->wbuf.len);
+ if (n > 0) {
+ drt->wbuf.len -= n;
+ p += n;
+ } else {
+ perror(n ? "write" : "write (zero bytes)");
+ return false;
+ }
+ } while (drt->wbuf.len);
+ if (flock(drt->root2id_fd, LOCK_UN)) {
+ perror("LOCK_UN");
+ return false;
+ }
+done_free:
+ free(drt->wbuf.ptr);
+ drt->wbuf.ptr = NULL;
+ return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+ if ((optind + 1) >= req->argc) {
+ warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+ return false; // need file + qry_str
+ }
+ if (!req->pfxc) {
+ warnx("dump_roots requires -A PREFIX");
+ return false;
+ }
+ const char *root2id_file = req->argv[optind];
+ drt.root2id_fd = open(root2id_file, O_RDONLY);
+ if (drt.root2id_fd < 0) {
+ warn("open(%s)", root2id_file);
+ return false;
+ }
+ if (fstat(drt.root2id_fd, &drt.sb)) {
+ warn("fstat(%s)", root2id_file);
+ return false;
+ }
+ // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+ // so /32 overestimates the number of expected entries by
+ // at least 25% (as recommended by Linux hcreate(3) manpage)
+ size_t est = (drt.sb.st_size / 32) + 1;
+ if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+ warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+ (long long)drt.sb.st_size, SIZE_MAX);
+ return false;
+ }
+ drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+ MAP_PRIVATE, drt.root2id_fd, 0);
+ if (drt.mm_ptr == MAP_FAILED) {
+ warn("mmap(%s)", root2id_file);
+ return false;
+ }
+ drt.entries = (char **)calloc(est * 2, sizeof(char *));
+ if (!drt.entries) {
+ warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+ return false;
+ }
+ size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+ drt.sb.st_size, est * 2);
+ if (tot <= 0) return false; // split2argv already warned on error
+ if (!hcreate(est)) {
+ warn("hcreate(%zu)", est);
+ return false;
+ }
+ for (size_t i = 0; i < tot; ) {
+ ENTRY e;
+ e.key = drt.entries[i++];
+ e.data = drt.entries[i++];
+ if (!hsearch(e, ENTER)) {
+ warn("hsearch(%s => %s, ENTER)", e.key,
+ (const char *)e.data);
+ return false;
+ }
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ CLEANUP_FBUF struct fbuf root_ids = { 0 };
+ if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+ return false;
+ try {
+ Xapian::Document doc = i.get_document();
+ if (!root2ids_str(&root_ids, &drt, &doc))
+ return false;
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], &drt,
+ &root_ids, &doc);
+ } catch (const Xapian::Error & e) {
+ fprintf(orig_err, "W: %s (#%ld)\n",
+ e.get_description().c_str(), (long)(*i));
+ continue;
+ }
+ if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+ return false;
+ }
+ if (!dump_roots_flush(req, &drt))
+ return false;
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
return true;
}
@@ -228,7 +511,8 @@ static const struct cmd_entry {
cmd fn;
} cmds[] = { // should be small enough to not need bsearch || gperf
// most common commands first
- CMD(dump_ibx),
+ CMD(dump_ibx), // many inboxes
+ CMD(dump_roots), // per-cidx shard
CMD(test_inspect), // least common commands last
};
@@ -240,12 +524,6 @@ union my_cmsg {
char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
};
-static void xclose(int fd)
-{
- if (close(fd) < 0 && errno != EINTR)
- err(EXIT_FAILURE, "BUG: close");
-}
-
static bool recv_req(struct req *req, char *rbuf, size_t *len)
{
union my_cmsg cmsg = { 0 };
@@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
return false;
}
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
- if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
- warnx("bogus argument given");
- return 0;
- }
- size_t nr = 0;
- char *c = buf;
- for (size_t i = 1; i < len; i++) {
- if (!buf[i]) {
- dst[nr++] = c;
- c = buf + i + 1;
- }
- if (nr == limit) {
- warnx("too many args: %zu", nr);
- return 0;
- }
- }
- return (int)nr;
-}
-
static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
{
const struct srch *a = (const struct srch *)pa;
@@ -355,7 +611,7 @@ static bool srch_init(struct req *req)
char *dirv[MY_ARG_MAX];
int i;
struct srch *srch = req->srch;
- int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+ int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
srch->qp_flags = FLAG_PHRASE |
Xapian::QueryParser::FLAG_BOOLEAN |
@@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop
perror("W: setlinebuf(req.fp[1])");
stderr = req.fp[1];
}
- req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+ req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
if (req.argc > 0)
dispatch(&req);
if (ferror(req.fp[0]) | fclose(req.fp[0]))
diff --git a/t/xap_helper.t b/t/xap_helper.t
index f00a845a..92da2e6d 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -91,7 +91,7 @@ my $test = sub {
my $res = do { local $/; <$r> };
is(join('', @res), $res, 'got identical response w/ error pipe');
my $stats = do { local $/; <$err_rd> };
- is($stats, "mset.size=6\n", 'mset.size reported');
+ is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
if ($arg[-1] !~ /\('-j0'\)/) {
kill('KILL', $cinfo{pid});
@@ -105,12 +105,14 @@ my $test = sub {
};
my $ar;
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j0')]);
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j1')]);
-
-my @NO_CXX = (0);
+my @NO_CXX;
+if (!$ENV{TEST_XH_CXX_ONLY}) {
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+ push @NO_CXX, 0;
+}
SKIP: {
eval {
require PublicInbox::XapHelperCxx;
@@ -125,6 +127,20 @@ SKIP: {
PublicInbox::XapHelperCxx::start('-j1')]);
};
+require PublicInbox::CodeSearch;
+my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
+my $root2id_file = "$tmp/root2id";
+my @id2root;
+{
+ open my $fh, '>', $root2id_file;
+ my $i = -1;
+ for ($cs_int->all_terms('G')) {
+ print $fh $_, "\0", ++$i, "\0";
+ $id2root[$i] = $_;
+ }
+ close $fh;
+}
+
for my $n (@NO_CXX) {
local $ENV{PI_NO_CXX} = $n;
my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
@@ -141,7 +157,19 @@ for my $n (@NO_CXX) {
my $res = do { local $/; <$r> };
is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
my $err = do { local $/; <$err_r> };
- is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+ is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
+
+ pipe($err_r, $err_w);
+ $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+ (map { ('-d', $_) } @int),
+ $root2id_file, 'dt:19700101'.'000000..');
+ close $err_w;
+ my @res = <$r>;
+ is(scalar(@res), 5, 'got expected rows');
+ is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
+ 'entries match format');
+ $err = do { local $/; <$err_r> };
+ is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
}
done_testing;
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (5 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
It's possible to hit a DatabaseModifiedError while iterating
through an MSet. We'll retry in these cases and clean up some
code in both the Perl and C++ implementations.
---
lib/PublicInbox/XapHelper.pm | 102 +++++++++++++++++++++++------------
lib/PublicInbox/xap_helper.h | 101 +++++++++++++++++++++++-----------
2 files changed, 136 insertions(+), 67 deletions(-)
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c80be810..ef6a47a3 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -36,28 +36,74 @@ sub cmd_test_inspect {
($req->{srch}->has_threadid ? 1 : 0)
}
+sub iter_retry_check ($) {
+ die unless ref($@) =~ /\bDatabaseModifiedError\b/;
+ $_[0]->{srch}->reopen;
+ undef; # retries
+}
+
+sub dump_ibx_iter ($$$) {
+ my ($req, $ibx_id, $it) = @_;
+ my $out = $req->{0};
+ eval {
+ my $doc = $it->get_document;
+ for my $p (@{$req->{A}}) {
+ for (xap_terms($p, $doc)) {
+ print $out "$_ $ibx_id\n" or die "print: $!";
+ ++$req->{nr_out};
+ }
+ }
+ };
+ $@ ? iter_retry_check($req) : 0;
+}
+
+sub emit_mset_stats ($$) {
+ my ($req, $mset) = @_;
+ my $err = $req->{1} or return;
+ say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+}
+
sub cmd_dump_ibx {
my ($req, $ibx_id, $qry_str) = @_;
$qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
- my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX');
+ $req->{A} or return warn('dump_ibx requires -A PREFIX');
my $max = $req->{srch}->{xdb}->get_doccount;
my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
$opt->{eidx_key} = $req->{O} if defined $req->{O};
my $mset = $req->{srch}->mset($qry_str, $opt);
- my $out = $req->{0};
- $out->autoflush(1);
- my $nr = 0;
+ $req->{0}->autoflush(1);
for my $it ($mset->items) {
+ for (my $t = 10; $t > 0; --$t) {
+ $t = dump_ibx_iter($req, $ibx_id, $it) // $t;
+ }
+ }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+ }
+}
+
+sub dump_roots_iter ($$$) {
+ my ($req, $root2id, $it) = @_;
+ eval {
my $doc = $it->get_document;
- for my $p (@pfx) {
+ my $G = join(' ', map { $root2id->{$_} } xap_terms('G', $doc));
+ for my $p (@{$req->{A}}) {
for (xap_terms($p, $doc)) {
- print $out "$_ $ibx_id\n" or die "print: $!";
- ++$nr;
+ $req->{wbuf} .= "$_ $G\n";
+ ++$req->{nr_out};
}
}
- }
- if (my $err = $req->{1}) {
- say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ };
+ $@ ? iter_retry_check($req) : 0;
+}
+
+sub dump_roots_flush ($$) {
+ my ($req, $fh) = @_;
+ if ($req->{wbuf} ne '') {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $req->{wbuf} or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ $req->{wbuf} = '';
}
}
@@ -65,44 +111,29 @@ sub cmd_dump_roots {
my ($req, $root2id_file, $qry_str) = @_;
$qry_str // return
warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
- my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+ $req->{A} or return warn('dump_roots requires -A PREFIX');
open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
- my %root2id; # record format: $OIDHEX "\0" uint32_t
+ my $root2id; # record format: $OIDHEX "\0" uint32_t
my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
while (@x) {
my $oidhex = shift @x;
- $root2id{$oidhex} = shift @x;
+ $root2id->{$oidhex} = shift @x;
}
my $opt = { relevance => -1, limit => $req->{'m'},
offset => $req->{o} // 0 };
my $mset = $req->{srch}->mset($qry_str, $opt);
$req->{0}->autoflush(1);
- my $buf = '';
- my $nr = 0;
+ $req->{wbuf} = '';
for my $it ($mset->items) {
- my $doc = $it->get_document;
- my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
- for my $p (@pfx) {
- for (xap_terms($p, $doc)) {
- $buf .= "$_ $G\n";
- ++$nr;
- }
+ for (my $t = 10; $t > 0; --$t) {
+ $t = dump_roots_iter($req, $root2id, $it) // $t;
}
- if (!($nr & 0x3fff)) {
- flock($fh, LOCK_EX) or die "flock: $!";
- print { $req->{0} } $buf or die "print: $!";
- flock($fh, LOCK_UN) or die "flock: $!";
- $buf = '';
+ if (!($req->{nr_out} & 0x3fff)) {
+ dump_roots_flush($req, $fh);
}
}
- if ($buf ne '') {
- flock($fh, LOCK_EX) or die "flock: $!";
- print { $req->{0} } $buf or die "print: $!";
- flock($fh, LOCK_UN) or die "flock: $!";
- }
- if (my $err = $req->{1}) {
- say $err 'mset.size='.$mset->size.' nr_out='.$nr
- }
+ dump_roots_flush($req, $fh);
+ emit_mset_stats($req, $mset);
}
sub dispatch {
@@ -153,6 +184,7 @@ sub recv_loop {
next;
}
my @argv = split(/\0/, $rbuf);
+ $req->{nr_out} = 0;
eval { $req->dispatch(@argv) } if @argv;
}
}
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index c9b4e0cc..e3ccfd41 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -63,6 +63,12 @@ static void code_nrp_init(void);
static void qp_init_mail_search(Xapian::QueryParser *);
static void qp_init_code_search(Xapian::QueryParser *);
+enum exc_iter {
+ ITER_OK = 0,
+ ITER_RETRY,
+ ITER_ABORT
+};
+
struct srch {
int paths_len; // int for comparisons
unsigned qp_flags;
@@ -196,6 +202,13 @@ static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
return enquire_mset(req, &enq);
}
+static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
+{
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset->size(), req->nr_out);
+}
+
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -224,6 +237,20 @@ static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
return setvbuf(fp, NULL, _IOLBF, 0);
}
+static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
+ Xapian::MSetIterator *i)
+{
+ try {
+ Xapian::Document doc = i->get_document();
+ for (int p = 0; p < req->pfxc; p++)
+ dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ }
+ return ITER_OK;
+}
+
static bool cmd_dump_ibx(struct req *req)
{
if ((optind + 1) >= req->argc) {
@@ -243,20 +270,18 @@ static bool cmd_dump_ibx(struct req *req)
req->asc = true;
req->sort_col = -1;
Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- try {
- Xapian::Document doc = i.get_document();
- for (int p = 0; p < req->pfxc; p++)
- dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
- } catch (const Xapian::Error & e) {
- fprintf(orig_err, "W: %s (#%ld)\n",
- e.get_description().c_str(), (long)(*i));
- continue;
- }
+ for (int t = 10; t > 0; --t)
+ switch (dump_ibx_iter(req, ibx_id, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
}
- if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
- (unsigned long long)mset.size(), req->nr_out);
+ emit_mset_stats(req, &mset);
return true;
}
@@ -312,8 +337,7 @@ static void dump_roots_ensure(void *ptr)
fbuf_ensure(&drt->wbuf);
}
-static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
- Xapian::Document *doc)
+static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc)
{
if (!fbuf_init(root_ids)) return false;
@@ -408,9 +432,28 @@ done_free:
return ok;
}
+static enum exc_iter dump_roots_iter(struct req *req,
+ struct dump_roots_tmp *drt,
+ Xapian::MSetIterator *i)
+{
+ CLEANUP_FBUF struct fbuf root_ids = { 0 }; // " $ID0 $ID1 $IDx..\n"
+ try {
+ Xapian::Document doc = i->get_document();
+ if (!root2ids_str(&root_ids, &doc))
+ return ITER_ABORT; // bad request, abort
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], drt,
+ &root_ids, &doc);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ }
+ return ITER_OK;
+}
+
static bool cmd_dump_roots(struct req *req)
{
- CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = { .root2id_fd = -1 };
if ((optind + 1) >= req->argc) {
warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
return false; // need file + qry_str
@@ -432,7 +475,7 @@ static bool cmd_dump_roots(struct req *req)
// each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
// so /32 overestimates the number of expected entries by
// ~%25 (as recommended by Linux hcreate(3) manpage)
- size_t est = (drt.sb.st_size / 32) + 1;
+ size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
warnx("%s size too big (%lld bytes > %zu)", root2id_file,
(long long)drt.sb.st_size, SIZE_MAX);
@@ -469,30 +512,24 @@ static bool cmd_dump_roots(struct req *req)
req->asc = true;
req->sort_col = -1;
Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- CLEANUP_FBUF struct fbuf root_ids = { 0 };
if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
return false;
- try {
- Xapian::Document doc = i.get_document();
- if (!root2ids_str(&root_ids, &drt, &doc))
- return false;
- for (int p = 0; p < req->pfxc; p++)
- dump_roots_term(req, req->pfxv[p], &drt,
- &root_ids, &doc);
- } catch (const Xapian::Error & e) {
- fprintf(orig_err, "W: %s (#%ld)\n",
- e.get_description().c_str(), (long)(*i));
- continue;
- }
+ for (int t = 10; t > 0; --t)
+ switch (dump_roots_iter(req, &drt, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
return false;
}
if (!dump_roots_flush(req, &drt))
return false;
- if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
- (unsigned long long)mset.size(), req->nr_out);
+ emit_mset_stats(req, &mset);
return true;
}
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 8/7] drop unused CidxRecvIbx.pm
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (6 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
@ 2023-08-24 12:30 ` Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 12:30 UTC (permalink / raw)
To: meta
This is no longer needed since xap_helper performs its
functionality while having an optional C++ implementation
which is significantly faster.
---
n.b. I used --irreversible-delete to save bandwidth
MANIFEST | 1 -
lib/PublicInbox/CidxRecvIbx.pm | 46 ----------------------------------
2 files changed, 47 deletions(-)
delete mode 100644 lib/PublicInbox/CidxRecvIbx.pm
diff --git a/MANIFEST b/MANIFEST
index 4bccc849..918ec2e1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -163,7 +163,6 @@ lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
lib/PublicInbox/CidxLogP.pm
-lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
deleted file mode 100644
index 6add8e54..00000000
^ permalink raw reply related [flat|nested] 11+ messages in thread