* [PATCH 0/7] cindex: optional C++ Xapian helper
@ 2023-08-24 1:22 Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
` (7 more replies)
0 siblings, 8 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
Associating inboxes with coderepos is an extremely expensive
operation, especially for Perl (even with XS or SWIG) as Perl's
method dispatch overhead to dump data out of Xapian becomes
noticeable.
The actual association is fast with POSIX sort(1) and join(1);
but getting the necessary data out of Xapian to join on is
expensive as neither quest(1) nor xapian-delve(1) are suitable
for this task.
The actual association data isn't stored or usable anywhere,
yet, and some of them are too loose to be useful. More
work is required on that point....
The association could probably be faster with rculfhash (from
Userspace-RCU), but I don't think it's worth the maintenance and
installation overhead for this (though I intend to use rculfhash
for the FUSE shim).
These performance problems weren't as noticeable in the past
since our other Xapian uses spent significant amounts of time
when retrieving document data from SQLite and blobs from git.
Using the C++ implementation of xap_helper.h allows a full join
(without limits or date ranges) of lore + git.kernel.org repos
within one hour on my ancient system while the Perl+(XS|SWIG)
implementation took roughly 8 hours.
Eric Wong (7):
search: hoist out shards_dir for future use
cindex: read-only association dump
cindex: add --show-roots switch
introduce optional C++ xap_helper
cindex: fix sorting and uniqueness
cindex: implement dump_roots in C++
xap_helper: reopen+retry in MSetIterator loops
MANIFEST | 7 +
lib/PublicInbox/CidxRecvIbx.pm | 46 ++
lib/PublicInbox/CidxXapHelperAux.pm | 44 ++
lib/PublicInbox/CodeSearch.pm | 54 +-
lib/PublicInbox/CodeSearchIdx.pm | 349 ++++++++--
lib/PublicInbox/Config.pm | 2 +-
lib/PublicInbox/Isearch.pm | 5 +
lib/PublicInbox/Search.pm | 92 ++-
lib/PublicInbox/XapClient.pm | 50 ++
lib/PublicInbox/XapHelper.pm | 226 +++++++
lib/PublicInbox/XapHelperCxx.pm | 93 +++
lib/PublicInbox/xap_helper.h | 947 ++++++++++++++++++++++++++++
script/public-inbox-cindex | 4 +-
t/xap_helper.t | 175 +++++
14 files changed, 2021 insertions(+), 73 deletions(-)
create mode 100644 lib/PublicInbox/CidxRecvIbx.pm
create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
create mode 100644 lib/PublicInbox/XapClient.pm
create mode 100644 lib/PublicInbox/XapHelper.pm
create mode 100644 lib/PublicInbox/XapHelperCxx.pm
create mode 100644 lib/PublicInbox/xap_helper.h
create mode 100644 t/xap_helper.t
^ permalink raw reply [flat|nested] 11+ messages in thread
* [PATCH 1/7] search: hoist out shards_dir for future use
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
` (6 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This will be useful for internal tooling and APIs.
---
lib/PublicInbox/Search.pm | 34 +++++++++++++++++++---------------
1 file changed, 19 insertions(+), 15 deletions(-)
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index b2de3450..1559d9b3 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -202,28 +202,32 @@ sub xdir ($;$) {
}
}
-# returns all shards as separate Xapian::Database objects w/o combining
-sub xdb_shards_flat ($) {
+# returns shard directories as an array of strings, does not verify existence
+sub shard_dirs ($) {
my ($self) = @_;
my $xpfx = $self->{xpfx};
- my (@xdb, $slow_phrase);
- load_xapian();
- $self->{qp_flags} //= $QP_FLAGS;
- if ($xpfx =~ m!/xapian[0-9]+\z!) { # v1
- @xdb = ($X{Database}->new($xpfx));
- $self->{qp_flags} |= FLAG_PHRASE() if !-f "$xpfx/iamchert";
- } else { # v2, eidx, cidx
+ if ($xpfx =~ m!/xapian[0-9]+\z!) { # v1 inbox
+ ($xpfx);
+ } else { # v2 inbox, eidx, cidx
opendir(my $dh, $xpfx) or return (); # not initialized yet
# We need numeric sorting so shard[0] is first for reading
# Xapian metadata, if needed
my $last = max(grep(/\A[0-9]+\z/, readdir($dh))) // return ();
- @xdb = map {
- my $shard_dir = "$xpfx/$_";
- $slow_phrase ||= -f "$shard_dir/iamchert";
- $X{Database}->new($shard_dir);
- } (0..$last);
- $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
+ map { "$xpfx/$_" } (0..$last);
}
+}
+
+# returns all shards as separate Xapian::Database objects w/o combining
+sub xdb_shards_flat ($) {
+ my ($self) = @_;
+ load_xapian();
+ $self->{qp_flags} //= $QP_FLAGS;
+ my $slow_phrase;
+ my @xdb = map {
+ $slow_phrase ||= -f "$_/iamchert";
+ $X{Database}->new($_); # raises if missing
+ } shard_dirs($self);
+ $self->{qp_flags} |= FLAG_PHRASE() if !$slow_phrase;
@xdb;
}
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 2/7] cindex: read-only association dump
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
` (5 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This will eventually allow associating coderepos with inboxes
and vice-versa; avoiding the need for manual configuration via
tedious publicinbox.*.coderepo directives.
I'm not sure how this should be stored for WWW, yet, but it's
required since it takes about 8 hours to do this fully across
lore and git.kernel.org.
---
MANIFEST | 3 +
lib/PublicInbox/CidxDumpIbx.pm | 59 +++++
lib/PublicInbox/CidxDumpShardRoots.pm | 73 ++++++
lib/PublicInbox/CidxRecvIbx.pm | 46 ++++
lib/PublicInbox/CodeSearchIdx.pm | 306 ++++++++++++++++++++++----
lib/PublicInbox/Config.pm | 2 +-
lib/PublicInbox/Search.pm | 2 +-
script/public-inbox-cindex | 4 +-
8 files changed, 452 insertions(+), 43 deletions(-)
create mode 100644 lib/PublicInbox/CidxDumpIbx.pm
create mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm
create mode 100644 lib/PublicInbox/CidxRecvIbx.pm
diff --git a/MANIFEST b/MANIFEST
index 1001ca08..162e3038 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,10 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
+lib/PublicInbox/CidxDumpIbx.pm
+lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
+lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
new file mode 100644
index 00000000..e1bc273d
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpIbx.pm
@@ -0,0 +1,59 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpIbx;
+use v5.12;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::DS;
+use Socket qw(MSG_EOR);
+
+sub start {
+ my ($rcvibx, $ibx_id) = @_;
+ my $cidx = $rcvibx->{cidx};
+ my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+ my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
+ ibx_id => $ibx_id }, __PACKAGE__;
+ $self->{srch} = $ibx->isrch // do {
+ warn("W: $self->{ekey} has no search index (ignoring)\n");
+ return undef;
+ };
+ my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+ $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
+ $self->{iter} = 0;
+ event_step($self);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
+ return if $rcvibx->{cidx}->do_quit;
+ my $last = $self->{mset}->size - 1;
+ my $cur = $self->{iter};
+ my $end = $cur + 9999;
+ if ($end >= $last) {
+ send($rcvibx->{op_p}, 'index_next', MSG_EOR);
+ $end = $last;
+ }
+ $self->{iter} = $end + 1;
+ local $0 = "dumping $self->{ekey} $cur..$end";
+
+ my $sort_w = $rcvibx->{sort_w};
+ my $ibx_id = $self->{ibx_id};
+ local $0 = "dumping $self->{ekey} $cur..$end";
+ $rcvibx->{cidx}->progress($0);
+ for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+ my $doc = $x->get_document;
+ for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
+ for (xap_terms($p, $doc)) {
+ print $sort_w "$_ $ibx_id\n" or die "print: $!";
+ }
+ }
+ }
+ $end < $last && !$rcvibx->{cidx}->do_quit and
+ PublicInbox::DS::requeue($self);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
new file mode 100644
index 00000000..34afa419
--- /dev/null
+++ b/lib/PublicInbox/CidxDumpShardRoots.pm
@@ -0,0 +1,73 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpShardRoots;
+use v5.12;
+use PublicInbox::Lock;
+use PublicInbox::Search qw(xap_terms);
+use Socket qw(MSG_EOR);
+
+sub start {
+ my ($cidx, $root2id, $qry_str) = @_;
+ my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
+ my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
+ # sort lock is necessary if we have may root ids which cause a
+ # row length to exceed POSIX PIPE_BUF (via `$G' below)
+ my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
+ 'PublicInbox::Lock';
+ $sort_w->autoflush(1);
+ $cidx->begin_txn_lazy; # only using txn to simplify writer subs
+ my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+ my $self = bless {
+ cidx => $cidx,
+ op_p => $op_p,
+ iter => 0,
+ mset => $cidx->mset($qry_str, $opt),
+ root2id => $root2id,
+ sort_w => $sort_w,
+ sort_lk => $sort_lk,
+ }, __PACKAGE__;
+ event_step($self);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $cidx = $self->{cidx};
+ return if $cidx->do_quit;
+ my $last = $self->{mset}->size - 1;
+ my $cur = $self->{iter};
+ my $end = $cur + 9999;
+ $end = $last if $end > $last;
+ $self->{iter} = $end + 1;
+ local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
+ $cidx->progress($0);
+
+ my $root2id = $self->{root2id};
+ my $buf = '';
+ for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+ my $doc = $x->get_document;
+ my $G = join(' ', map {
+ $root2id->{pack('H*', $_)};
+ } xap_terms('G', $doc));
+ for my $p (@{$cidx->{ASSOC_PFX}}) {
+ $buf .= "$_ $G\n" for (xap_terms($p, $doc));
+ }
+ }
+ $self->{sort_lk}->lock_acquire_fast;
+ print { $self->{sort_w} } $buf or die "print: $!";
+ $self->{sort_lk}->lock_release_fast;
+ $end < $last && !$cidx->do_quit and
+ PublicInbox::DS::requeue($self);
+}
+
+sub DESTROY {
+ my ($self) = @_;
+ return if $self->{cidx}->do_quit;
+ send($self->{op_p},
+ "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
new file mode 100644
index 00000000..6add8e54
--- /dev/null
+++ b/lib/PublicInbox/CidxRecvIbx.pm
@@ -0,0 +1,46 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# dumps all per-inbox info for -cindex --associate
+# integrated into the event loop for signalfd SIGINT handling
+package PublicInbox::CidxRecvIbx;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE);
+use Socket qw(MSG_EOR);
+use PublicInbox::CidxDumpIbx;
+
+sub new {
+ my ($cls, $cidx, $qry_str) = @_;
+ my ($op_p, $r_ibx, $sort_w) = delete @$cidx{0..2};
+ $op_p // die 'BUG: no $op_p';
+ $r_ibx // die 'BUG: no $r_ibx';
+ $sort_w // die 'BUG: no $sort_w';
+ my $self = bless {}, $cls;
+ $self->SUPER::new($r_ibx, EPOLLIN|EPOLLEXCLUSIVE);
+ $self->{cidx} = $cidx;
+ $self->{sort_w} = $sort_w;
+ $self->{op_p} = $op_p; # PublicInbox::CidxDumpIbx uses this
+ $self->{qry_str} = $qry_str;
+ # writes to this pipe are never longer than POSIX PIPE_BUF,
+ # so rely on POSIX atomicity guarantees
+ $sort_w->autoflush(1);
+ $self;
+}
+
+sub event_step {
+ my ($self) = @_;
+ recv($self->{sock}, my $ibx_id, 25, 0) // die "recv: $!";
+ return $self->close if $ibx_id eq '' || $self->{cidx}->do_quit;
+ PublicInbox::CidxDumpIbx::start($self, $ibx_id);
+}
+
+sub close {
+ my ($self) = @_;
+ $self->{cidx}->do_quit or
+ send($self->{op_p},
+ "recv_ibx_done $self->{cidx}->{shard}", MSG_EOR);
+ $self->SUPER::close; # PublicInbox::DS::close
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index ba14e52a..2480dbd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -11,6 +11,26 @@
#
# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
#
+# --associate joins root commits of coderepos to inboxes based on prefixes.
+#
+# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
+# a non-negative integer index ($ROOT_COMMIT_OID_ID).
+#
+# associate dumps to 2 intermediate files in $TMPDIR:
+#
+# * to_root_id - each line is of the format:
+#
+# $PFX $ROOT_COMMIT_OID_ID
+#
+# * to_ibx_id - each line is of the format:
+#
+# $PFX $IBX_ID
+#
+# In both cases, $PFX is typically the value of the patchid (XDFID) but it
+# can be configured to use any combination of patchid, dfpre, dfpost or
+# dfblob.
+#
# See PublicInbox::CodeSearch (read-only API) for more
package PublicInbox::CodeSearchIdx;
use v5.12;
@@ -25,14 +45,14 @@ use File::Spec ();
use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
-use PublicInbox::Config qw(glob2re);
+use PublicInbox::Config qw(glob2re rel2abs_collapsed);
use PublicInbox::Spawn qw(which spawn popen_rd);
use PublicInbox::OnDestroy;
use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
use Carp ();
our (
$LIVE, # pid => cmd
@@ -55,8 +75,15 @@ our (
%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
$TMPDIR, # File::Temp->newdir object for prune
@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
- $PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
+ %TODO, @IBXQ, @IBX,
+ @JOIN, # join(1) command for associate
+ $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+ @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+ $QRY_STR, # common query string for both code and inbox associations
+ $IBXDIR_FEED, # SOCK_SEQPACKET
+ @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+ @ID2ROOT,
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -64,6 +91,9 @@ our (
# git walks commits quickly if it doesn't have to read trees
our $SEEN_MAX = 100000;
+# window for commits/emails to determine a inbox <-> coderepo association
+my $ASSOC_MAX = 50000;
+
our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
# TODO: do we care about committer name + email? or tree OID?
@@ -455,6 +485,91 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
+sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
+ my ($self, $associate, $n) = @_;
+ return if $DO_QUIT;
+ progress($self, "dump_shard_roots [$n] done");
+ $DUMP_SHARD_ROOTS_OK[$n] = 1;
+ # may run associate()
+}
+
+sub assoc_max_init ($) {
+ my ($self) = @_;
+ my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+ $max = $ASSOC_MAX if !$max;
+ $max < 0 ? ((2 ** 31) - 1) : $max;
+}
+
+# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
+sub dump_shard_roots { # via wq_io_do for associate
+ my ($self, $root2id, $qry_str) = @_;
+ PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
+}
+
+sub dump_roots_once {
+ my ($self, $associate) = @_;
+ $associate // die 'BUG: no $associate';
+ $TODO{associating} = 1; # keep shards_active() happy
+ progress($self, 'dumping IDs from coderepos');
+ local $self->{xdb};
+ @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
+ my $id = 0;
+ my %root2id = map { $_ => $id++ } @ID2ROOT;
+ pipe(my ($r, $w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1');
+ my $dst = "$TMPDIR/to_root_id";
+ open my $fh, '>', $dst or die "open($dst): $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
+ close $r or die "close: $!";
+ awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
+ my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+ $_->wq_io_do(@arg) for @IDX_SHARDS;
+ progress($self, 'waiting on dump_shard_roots sort');
+}
+
+sub recv_ibx_done { # via PktOp on recv_ibx completion
+ my ($self, $pid, $n) = @_;
+ return if $DO_QUIT;
+ progress($self, "recv_ibx [$n] done");
+ $RECV_IBX_OK[$n] = 1;
+}
+
+# causes a worker to become a dumper for inbox/extindex
+sub recv_ibx { # wq_io_do
+ my ($self, $qry_str) = @_;
+ PublicInbox::CidxRecvIbx->new($self, $qry_str);
+}
+
+sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
+ my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
+ my $n = length($id_dir);
+ my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
+ $n == $w or die "send($id_dir) $w != $n";
+}
+
+# repurpose shard workers to dump inbox patchids with perfect balance
+sub dump_ibx_start {
+ my ($self, $associate) = @_;
+ pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1');
+ my $dst = "$TMPDIR/to_ibx_id";
+ open my $fh, '>', $dst or die "open($dst): $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
+ close $sort_r or die "close: $!";
+ awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+
+ my ($r, $w);
+ socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
+ $c->{ops}->{index_next} = [ $self ];
+ my $io = [ $p->{op_p}, $r, $sort_w ];
+ $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
+ $IBXDIR_FEED = $w;
+}
+
sub index_next ($) {
my ($self) = @_;
return if $DO_QUIT;
@@ -466,6 +581,12 @@ sub index_next ($) {
$self, $git);
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
+ } elsif ($TMPDIR) {
+ delete $TODO{dump_ibx_start}; # runs OnDestroy once
+ return dump_ibx($self, shift @IBXQ) if @IBXQ;
+ progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
+ undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+ dump_roots_once($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
}
@@ -502,7 +623,7 @@ sub commit_shard { # OnDestroy cb
for my $n (keys %$active) {
$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
}
- undef $p; # shard_done fires when all shards are committed
+ # shard_done fires when all shards are committed
}
sub index_repo { # cidx_await cb
@@ -628,8 +749,8 @@ EOM
sub scan_git_dirs ($) {
my ($self) = @_;
- @$GIT_TODO = @{$self->{git_dirs}};
- index_next($self) for (1..$LIVE_JOBS);
+ my $n = @$GIT_TODO = @{$self->{git_dirs}};
+ progress($self, "scanning $n code repositories...");
}
sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -661,7 +782,7 @@ sub shards_active { # post_loop_do
return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
- return 1 if keys(%$LIVE);
+ return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
$s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -674,6 +795,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
+ $IBXDIR_FEED = undef;
kill_shards(@_);
warn "# SIG$_[0] received, quitting...\n";
}
@@ -717,6 +839,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
unless ($ALT_FH{$hexlen}) {
+ require PublicInbox::Import;
my $git_dir = "$TMPDIR/hexlen$hexlen.git";
PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +862,102 @@ sub prep_alternate_start {
awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
}
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
- my ($pid, $cmd, $run_prune) = @_;
+sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+ my ($pid, $cmd, $run_on_destroy) = @_;
$? and die "@$cmd failed: \$?=$?";
+ # $run_on_destroy calls associate() or run_prune()
+}
+
+# runs once all inboxes and shards are dumped via OnDestroy
+sub associate {
+ my ($self) = @_;
+ return if $DO_QUIT;
+ @IDX_SHARDS or return warn("# aborting on no shards\n");
+ grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
+ die "E: shards not dumped properly\n";
+ grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
+ die "E: inboxes not dumped properly\n";
+ progress($self, 'associating...');
+ my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
+ my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
+ my %score;
+ while (<$rd>) { # PFX ibx_id root_id
+ my (undef, $ibx_id, @root_id) = split(/ /, $_);
+ ++$score{"$ibx_id $_"} for @root_id;
+ }
+ close $rd or die "@join failed: $?=$?";
+ my $min = $self->{-opt}->{'assoc-min'} // 10;
+ progress($self, scalar(keys %score).' potential pairings...');
+ for my $k (keys %score) {
+ my $nr = $score{$k};
+ my ($ibx_id, $root) = split(/ /, $k);
+ my $ekey = $IBX[$ibx_id]->eidx_key;
+ $root = unpack('H*', $ID2ROOT[$root]);
+ progress($self, "$ekey => $root has $nr matches");
+ }
+ delete $TODO{associating}; # break out of shards_active()
+ # TODO
+ warn "# Waiting for $TMPDIR/cont @JOIN";
+ system "ls -Rl $TMPDIR >&2";
+ system "wc -l $TMPDIR/to_*_id >&2";
+ #sleep(1) until -f "$TMPDIR/cont";
+ # warn "# Waiting for $TMPDIR/cont";
+ # sleep(1) until -f "$TMPDIR/cont";
+}
+
+sub require_progs {
+ my $op = shift;
+ while (my ($x, $argv) = splice(@_, 0, 2)) {
+ my $e = $x;
+ $e =~ tr/a-z-/A-Z_/;
+ my $c = $ENV{$e} // $x;
+ $argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
+ }
+}
+
+sub init_associate_postfork ($) {
+ my ($self) = @_;
+ return unless $self->{-opt}->{associate};
+ require_progs('associate', join => \@JOIN);
+ $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
+ substr($QRY_STR, 0, 0) = 'dt:';
+ scalar(@{$self->{git_dirs} // []}) or die <<EOM;
+E: no coderepos to associate
+EOM
+ my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+ $self->query_approxidate($approx_git, $QRY_STR); # in-place
+ $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+ $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+ \&dump_ibx_start, $self, $TODO{associate});
+ my $id = -1;
+ @IBXQ = map { ++$id } @IBX;
}
sub init_prune ($) {
my ($self) = @_;
return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
- require File::Temp;
- require PublicInbox::Import;
- $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
# Dealing with millions of commits here at once, so use faster tools.
# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
# bindings. sed/awk are faster than Perl for simple stream ops, and
# sort+comm are more memory-efficient with gigantic lists
my @delve = (undef, qw(-A Q -1));
my @sed = (undef, '-ne', 's/^Q//p');
- @SORT = (undef, '-u');
@COMM = (undef, qw(-2 -3 indexed_commits -));
@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
- my @x = ('xapian-delve' => \@delve, sed => \@sed,
- sort => \@SORT, comm => \@COMM, awk => \@AWK);
- while (my ($x, $argv) = splice(@x, 0, 2)) {
- my $e = $x;
- $e =~ tr/a-z-/A-Z_/;
- my $c = $ENV{$e} // $x;
- $argv->[0] = which($c) // die "E: `$x' required for --prune\n";
- }
+ require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+ comm => \@COMM, awk => \@AWK);
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
- for (qw(parallel compress-program buffer-size)) { # GNU sort options
- my $v = $self->{-opt}->{"sort-$_"};
- push @SORT, "--$_=$v" if defined $v;
- }
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
- $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
- my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
- $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
- awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+ my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+ awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+ $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
+ awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
- awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+ awaitpid($pid, \&cmd_done, \@delve, $run_prune);
@PRUNE_QUEUE = @{$self->{git_dirs}};
for (1..$LIVE_JOBS) {
prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -809,14 +982,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
- my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out });
- my $sort_pid = spawn(\@SORT, $PRUNE_ENV,
+ my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
+ my $sort_pid = spawn(\@SORT, $CMD_ENV,
{ 0 => $sort_in, 1 => $sort_out });
- my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
+ my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
- awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
- awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
- awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+ awaitpid($awk_pid, \&cmd_done, \@AWK);
+ awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -856,6 +1029,35 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
for (@gone) { close $_ or die "close: $!" };
}
+sub init_associate_prefork ($) {
+ my ($self) = @_;
+ return unless $self->{-opt}->{associate};
+ require PublicInbox::CidxRecvIbx;
+ require PublicInbox::CidxDumpShardRoots;
+ $self->{-pi_cfg} = PublicInbox::Config->new;
+ my @unknown;
+ my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+ for (map { split(/\s*,\s*/) } @pfx) {
+ my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
+ push(@unknown, $_);
+ push(@ASSOC_PFX, split(/ /, $v));
+ }
+ die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+ @ASSOC_PFX = uniqstr @ASSOC_PFX;
+ my %incl = map {
+ rel2abs_collapsed($_) => undef;
+ } (@{$self->{-opt}->{include} // []});
+ $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl);
+}
+
+sub _prep_ibx { # each_inbox callback
+ my ($ibx, $self, $incl) = @_;
+ ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
+ push @{$self->{IBX}}, $ibx;
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
@@ -868,10 +1070,27 @@ sub cidx_run { # main entry point
local $IDX_TODO = [];
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
- $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+ $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
+ %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
+ @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @IDX_SHARDS = cidx_init($self);
+ local @SORT = (undef, '-u');
+ local $self->{IBX} = \@IBX;
+ local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+ local $self->{-pi_cfg};
+ if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+ require File::Temp;
+ $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+ $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+ require_progs('(prune|associate)', sort => \@SORT);
+ for (qw(parallel compress-program buffer-size)) { # GNU sort
+ my $v = $self->{-opt}->{"sort-$_"};
+ push @SORT, "--$_=$v" if defined $v;
+ }
+ init_associate_prefork($self)
+ }
+ local @IDX_SHARDS = cidx_init($self); # forks workers
local $self->{current_info} = '';
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -919,7 +1138,9 @@ sub cidx_run { # main entry point
PublicInbox::IPC::detect_nproc() || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
+ init_associate_postfork($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ index_next($self) for (1..$LIVE_JOBS);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
@@ -954,4 +1175,9 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
}
+sub do_quit { $DO_QUIT }
+
+sub tmpdir { $TMPDIR }
+
+
1;
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 2f1b4122..0a6b210f 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -11,7 +11,7 @@ package PublicInbox::Config;
use strict;
use v5.10.1;
use parent qw(Exporter);
-our @EXPORT_OK = qw(glob2re);
+our @EXPORT_OK = qw(glob2re rel2abs_collapsed);
use PublicInbox::Inbox;
use PublicInbox::Spawn qw(popen_rd);
our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 1559d9b3..d5b0bceb 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -197,7 +197,7 @@ sub xdir ($;$) {
my ($self, $rdonly) = @_;
if ($rdonly || !defined($self->{shard})) {
$self->{xpfx};
- } else { # v2 + extindex only:
+ } else { # v2, extindex, cindex only:
"$self->{xpfx}/$self->{shard}";
}
}
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 2f7796e7..888c8b10 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation.
EOF
my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
- indexlevel|index-level|L=s
+ indexlevel|index-level|L=s associate associate-max=i
+ associate-date-range=s associate-prefixes=s@
batch_size|batch-size=s max_size|max-size=s
+ include|I=s@ only=s@ all
project-list=s exclude=s@
sort-parallel=s sort-compress-program=s sort-buffer-size=s
d=s update|u scan! prune dry-run|n C=s@ help|h))
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 3/7] cindex: add --show-roots switch
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
` (4 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This aids in development, but I'm not sure it's going to stay
or be moved into another interface.
---
lib/PublicInbox/CodeSearchIdx.pm | 32 ++++++++++++++++++++++++++++++++
script/public-inbox-cindex | 2 +-
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 2480dbd2..e795c2b3 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -1058,6 +1058,37 @@ sub _prep_ibx { # each_inbox callback
push @{$self->{IBX}}, $ibx;
}
+sub show_roots { # for diagnostics
+ my ($self) = @_;
+ local $self->{xdb};
+ my $cur = $self->xdb->allterms_begin('G');
+ my $end = $self->{xdb}->allterms_end('G');
+ my $qrepo = $PublicInbox::Search::X{Query}->new('T'.'r');
+ my $enq = $PublicInbox::Search::X{Enquire}->new($self->{xdb});
+ $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new);
+ $enq->set_docid_order($PublicInbox::Search::ENQ_ASCENDING);
+ for (; $cur != $end; $cur++) {
+ my $G_oidhex = $cur->get_termname;
+ my $qry = $PublicInbox::Search::X{Query}->new(
+ PublicInbox::Search::OP_FILTER(),
+ $qrepo, $G_oidhex);
+ $enq->set_query($qry);
+ my ($off, $lim) = (0, 10000);
+ say 'commit ',substr($G_oidhex, 1), ' appears in:';
+ while (1) {
+ my $mset = $enq->get_mset($off, $lim);
+ my $size = $mset->size or last;
+ for my $x ($mset->items) {
+ my $doc = $x->get_document;
+ for (xap_terms('P', $x->get_document)) {
+ say '- /', substr($_, 1);
+ }
+ }
+ $off += $size;
+ }
+ }
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
@@ -1150,6 +1181,7 @@ sub cidx_run { # main entry point
PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
+ show_roots($self) if $self->{-opt}->{'show-roots'} # for diagnostics
}
sub ipc_atfork_child { # @IDX_SHARDS
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 888c8b10..0526434c 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -29,7 +29,7 @@ GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
indexlevel|index-level|L=s associate associate-max=i
associate-date-range=s associate-prefixes=s@
batch_size|batch-size=s max_size|max-size=s
- include|I=s@ only=s@ all
+ include|I=s@ only=s@ all show-roots
project-list=s exclude=s@
sort-parallel=s sort-compress-program=s sort-buffer-size=s
d=s update|u scan! prune dry-run|n C=s@ help|h))
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 4/7] introduce optional C++ xap_helper
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (2 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 11:23 ` Štěpán Němec
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
` (3 subsequent siblings)
7 siblings, 1 reply; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
This allows us to perform the expensive "dump_ibx" operations in
native C++ code using the Xapian C++ library. This provides the
majority of the speedup with the -cindex --associate switch.
Eventually this may be expanded to cover all uses of Xapian
within the project to ensure we have access to Xapian APIs which
aren't available in XS|SWIG bindings; and also for
ease-of-installation on systems which don't provide
pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
do provide Xapian development libraries.
Most of the C++ code is still C, as I'm not remotely familiar
with C++ compared to C. I suspect many users and potential
hackers being from git, Linux kernel, and glibc world are in the
same boat.
---
MANIFEST | 7 +-
lib/PublicInbox/CidxDumpIbx.pm | 59 ---
lib/PublicInbox/CidxXapHelperAux.pm | 48 ++
lib/PublicInbox/CodeSearch.pm | 54 ++-
lib/PublicInbox/CodeSearchIdx.pm | 78 ++--
lib/PublicInbox/Isearch.pm | 5 +
lib/PublicInbox/Search.pm | 56 ++-
lib/PublicInbox/XapClient.pm | 50 +++
lib/PublicInbox/XapHelper.pm | 144 ++++++
lib/PublicInbox/XapHelperCxx.pm | 93 ++++
lib/PublicInbox/xap_helper.h | 654 ++++++++++++++++++++++++++++
t/xap_helper.t | 147 +++++++
12 files changed, 1278 insertions(+), 117 deletions(-)
delete mode 100644 lib/PublicInbox/CidxDumpIbx.pm
create mode 100644 lib/PublicInbox/CidxXapHelperAux.pm
create mode 100644 lib/PublicInbox/XapClient.pm
create mode 100644 lib/PublicInbox/XapHelper.pm
create mode 100644 lib/PublicInbox/XapHelperCxx.pm
create mode 100644 lib/PublicInbox/xap_helper.h
create mode 100644 t/xap_helper.t
diff --git a/MANIFEST b/MANIFEST
index 162e3038..4f61af42 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,10 +162,10 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpIbx.pm
lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
+lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
lib/PublicInbox/CodeSearchIdx.pm
@@ -368,8 +368,12 @@ lib/PublicInbox/WwwListing.pm
lib/PublicInbox/WwwStatic.pm
lib/PublicInbox/WwwStream.pm
lib/PublicInbox/WwwText.pm
+lib/PublicInbox/XapClient.pm
+lib/PublicInbox/XapHelper.pm
+lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/gcf2_libgit2.h
+lib/PublicInbox/xap_helper.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
@@ -610,6 +614,7 @@ t/www_altid.t
t/www_listing.t
t/www_static.t
t/x-unknown-alpine.eml
+t/xap_helper.t
t/xcpdb-reshard.t
version-gen.perl
xt/cmp-msgstr.t
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
deleted file mode 100644
index e1bc273d..00000000
--- a/lib/PublicInbox/CidxDumpIbx.pm
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpIbx;
-use v5.12;
-use PublicInbox::Search qw(xap_terms);
-use PublicInbox::DS;
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($rcvibx, $ibx_id) = @_;
- my $cidx = $rcvibx->{cidx};
- my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
- my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
- ibx_id => $ibx_id }, __PACKAGE__;
- $self->{srch} = $ibx->isrch // do {
- warn("W: $self->{ekey} has no search index (ignoring)\n");
- return undef;
- };
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
- $self->{iter} = 0;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
- return if $rcvibx->{cidx}->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- if ($end >= $last) {
- send($rcvibx->{op_p}, 'index_next', MSG_EOR);
- $end = $last;
- }
- $self->{iter} = $end + 1;
- local $0 = "dumping $self->{ekey} $cur..$end";
-
- my $sort_w = $rcvibx->{sort_w};
- my $ibx_id = $self->{ibx_id};
- local $0 = "dumping $self->{ekey} $cur..$end";
- $rcvibx->{cidx}->progress($0);
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
- for (xap_terms($p, $doc)) {
- print $sort_w "$_ $ibx_id\n" or die "print: $!";
- }
- }
- }
- $end < $last && !$rcvibx->{cidx}->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
new file mode 100644
index 00000000..c9a5ddad
--- /dev/null
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -0,0 +1,48 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate,
+# this reports auxilliary status while dumping
+package PublicInbox::CidxXapHelperAux;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+# rpipe connects to req->fp[1] in xap_helper.h
+sub new {
+ my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
+ my $self = bless {
+ cidx => $cidx,
+ pfx => $pfx,
+ associate => $associate
+ }, $cls;
+ $rpipe->blocking(0);
+ $self->SUPER::new($rpipe, EPOLLIN);
+}
+
+sub event_step {
+ my ($self) = @_; # xap_helper.h is line-buffered
+ my $buf = delete($self->{buf}) // '';
+ my $n = sysread($self->{sock}, $buf, 65536, length($buf));
+ if (!defined($n)) {
+ return if $!{EAGAIN};
+ die "sysread: $!";
+ }
+ my $pfx = $self->{pfx};
+ if ($n == 0) {
+ $self->{cidx}->progress("$pfx $buf") if $buf ne '';
+ return $self->close;
+ }
+ my @lines = split(/^/m, $buf);
+ $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
+ for my $l (@lines) {
+ if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+ delete $self->{cidx}->{PENDING}->{$pfx};
+ $self->{cidx}->index_next;
+ }
+ chomp $l;
+ $self->{cidx}->progress("$pfx $l");
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
index a5ccce03..6234e259 100644
--- a/lib/PublicInbox/CodeSearch.pm
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -16,6 +16,12 @@ use constant {
# in refs/{heads,tags}. AT(col=0) may be used to store disk usage
# in the future, but disk usage calculation is espensive w/ alternates
};
+our @CODE_NRP;
+our @CODE_VMAP = (
+ [ AT, 'd:' ], # mairix compat
+ [ AT, 'dt:' ], # mail compat
+ [ CT, 'ct:' ],
+);
# note: the non-X term prefix allocations are shared with Xapian omega,
# see xapian-applications/omega/docs/termprefixes.rst
@@ -45,15 +51,17 @@ sub new {
bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls;
}
-sub cqparse_new ($) {
+sub qparse_new ($) {
my ($self) = @_;
my $qp = $self->qp_init_common;
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat
- $cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:'));
-
+ if (!@CODE_NRP) {
+ @CODE_NRP = map {
+ $PublicInbox::Search::NVRP->new(@$_)
+ } @CODE_VMAP;
+ }
+ $cb->($qp, $_) for @CODE_NRP;
while (my ($name, $pfx) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) for split(/ /, $pfx);
}
@@ -63,6 +71,40 @@ sub cqparse_new ($) {
$qp;
}
+sub generate_cxx () { # generates snippet for xap_helper.h
+ my ($line, $file) = (__LINE__ + 2, __FILE__);
+ my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *code_nrp[${\scalar(@CODE_VMAP)}];
+static void code_nrp_init(void)
+{
+EOM
+ for (0..$#CODE_VMAP) {
+ my $x = $CODE_VMAP[$_];
+ $ret .= qq{\tcode_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+ }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_code_search(Xapian::QueryParser *qp)
+{
+ for (size_t i = 0; i < MY_ARRAY_SIZE(code_nrp); i++)
+ qp->ADD_RP(code_nrp[i]);
+EOM
+ for my $name (sort keys %bool_pfx_external) {
+ for (split(/ /, $bool_pfx_external{$name})) {
+ $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+ }
+ }
+ for my $name (sort keys %prob_prefix) {
+ for (split(/ /, $prob_prefix{$name})) {
+ $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+ }
+ }
+ $ret .= "}\n";
+}
+
# returns a Xapian::Query to filter by roots
sub roots_filter { # retry_reopen callback
my ($self, $git_dir) = @_;
@@ -89,7 +131,7 @@ sub roots_filter { # retry_reopen callback
sub mset {
my ($self, $qry_str, $opt) = @_;
- my $qp = $self->{qp} //= cqparse_new($self);
+ my $qp = $self->{qp} //= qparse_new($self);
my $qry = $qp->parse_query($qry_str, $self->{qp_flags});
# limit to commits with shared roots
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e795c2b3..b8afecd2 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -42,6 +42,7 @@ use PublicInbox::IPC qw(nproc_shards);
use POSIX qw(WNOHANG SEEK_SET);
use File::Path ();
use File::Spec ();
+use List::Util qw(max);
use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
@@ -69,6 +70,9 @@ our (
@GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
$PRUNE_DONE, # marks off prune completions
$NCHANGE, # current number of changes
+ $NPROC,
+ $XH_PID, # XapHelper PID
+ $XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
$GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
@@ -81,8 +85,8 @@ our (
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- $IBXDIR_FEED, # SOCK_SEQPACKET
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+ @DUMP_SHARD_ROOTS_OK, # for associate
+ $DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
@@ -529,45 +533,29 @@ sub dump_roots_once {
progress($self, 'waiting on dump_shard_roots sort');
}
-sub recv_ibx_done { # via PktOp on recv_ibx completion
- my ($self, $pid, $n) = @_;
- return if $DO_QUIT;
- progress($self, "recv_ibx [$n] done");
- $RECV_IBX_OK[$n] = 1;
-}
-
-# causes a worker to become a dumper for inbox/extindex
-sub recv_ibx { # wq_io_do
- my ($self, $qry_str) = @_;
- PublicInbox::CidxRecvIbx->new($self, $qry_str);
-}
-
-sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
- my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
- my $n = length($id_dir);
- my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
- $n == $w or die "send($id_dir) $w != $n";
+sub dump_ibx { # sends to xap_helper.h
+ my ($self, $ibx_id) = @_;
+ my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+ my @cmd = ('dump_ibx', $ibx->isrch->xh_args,
+ (map { ('-A', $_) } @ASSOC_PFX),
+ $ibx_id, $QRY_STR);
+ pipe(my ($r, $w)) or die "pipe: $!";
+ $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
+ my $ekey = $ibx->eidx_key;
+ $self->{PENDING}->{$ekey} = undef;
+ PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
}
-# repurpose shard workers to dump inbox patchids with perfect balance
sub dump_ibx_start {
my ($self, $associate) = @_;
- pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
close $sort_r or die "close: $!";
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
-
- my ($r, $w);
- socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
- $c->{ops}->{index_next} = [ $self ];
- my $io = [ $p->{op_p}, $r, $sort_w ];
- $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
- $IBXDIR_FEED = $w;
+ ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
sub index_next ($) {
@@ -584,8 +572,8 @@ sub index_next ($) {
} elsif ($TMPDIR) {
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
- progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
- undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+ progress($self, 'done dumping inboxes') if $DUMP_IBX_WPIPE;
+ undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
dump_roots_once($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
@@ -795,7 +783,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
- $IBXDIR_FEED = undef;
+ $XHC = undef;
kill_shards(@_);
warn "# SIG$_[0] received, quitting...\n";
}
@@ -875,8 +863,8 @@ sub associate {
@IDX_SHARDS or return warn("# aborting on no shards\n");
grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
die "E: shards not dumped properly\n";
- grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
- die "E: inboxes not dumped properly\n";
+ my @pending = keys %{$self->{PENDING}};
+ die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
@@ -1032,8 +1020,9 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxRecvIbx;
require PublicInbox::CidxDumpShardRoots;
+ require PublicInbox::CidxXapHelperAux;
+ require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
my @unknown;
my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
@@ -1055,7 +1044,7 @@ EOM
sub _prep_ibx { # each_inbox callback
my ($ibx, $self, $incl) = @_;
($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
- push @{$self->{IBX}}, $ibx;
+ push @IBX, $ibx;
}
sub show_roots { # for diagnostics
@@ -1102,13 +1091,13 @@ sub cidx_run { # main entry point
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
- %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
- @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
+ %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local @SORT = (undef, '-u');
- local $self->{IBX} = \@IBX;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+ local $self->{PENDING} = {};
local $self->{-pi_cfg};
if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
require File::Temp;
@@ -1165,13 +1154,14 @@ sub cidx_run { # main entry point
@GIT_DIR_GONE = uniqstr @GIT_DIR_GONE, @excl;
}
local $NCHANGE = 0;
- local $LIVE_JOBS = $self->{-opt}->{jobs} ||
- PublicInbox::IPC::detect_nproc() || 2;
+ local $NPROC = PublicInbox::IPC::detect_nproc();
+ local $LIVE_JOBS = $self->{-opt}->{jobs} || $NPROC || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
init_associate_postfork($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
- index_next($self) for (1..$LIVE_JOBS);
+ my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+ index_next($self) for (1..$max);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm
index 5cac08ba..62112171 100644
--- a/lib/PublicInbox/Isearch.pm
+++ b/lib/PublicInbox/Isearch.pm
@@ -123,4 +123,9 @@ sub has_threadid { 1 }
sub help { $_[0]->{es}->help }
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+ my ($self, $opt) = @_; # TODO uid_range
+ ($self->{es}->xh_args, '-O', $self->{eidx_key});
+}
+
1;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index d5b0bceb..2e784646 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -66,6 +66,15 @@ our $NVRP; # '$Xap::'.('NumberValueRangeProcessor' or 'NumberRangeProcessor')
# let's hope the ABI is stable
our $ENQ_DESCENDING = 0;
our $ENQ_ASCENDING = 1;
+our @MAIL_VMAP = (
+ [ YYYYMMDD, 'd:'],
+ [ DT, 'dt:' ],
+ # these are undocumented for WWW, but lei and IMAP use them
+ [ BYTES, 'z:' ],
+ [ TS, 'rt:' ],
+ [ UID, 'uid:' ]
+);
+our @MAIL_NRP;
sub load_xapian () {
return 1 if defined $Xap;
@@ -101,6 +110,7 @@ sub load_xapian () {
# or make indexlevel=medium as default
$QP_FLAGS = FLAG_PHRASE() | FLAG_BOOLEAN() | FLAG_LOVEHATE() |
FLAG_WILDCARD();
+ @MAIL_NRP = map { $NVRP->new(@$_) } @MAIL_VMAP;
return 1;
}
undef;
@@ -490,14 +500,8 @@ sub qparse_new {
my $qp = qp_init_common($self);
my $cb = $qp->can('add_valuerangeprocessor') //
$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
- $cb->($qp, $NVRP->new(YYYYMMDD, 'd:'));
- $cb->($qp, $NVRP->new(DT, 'dt:'));
-
- # for IMAP, undocumented for WWW and may be split off go away
- $cb->($qp, $NVRP->new(BYTES, 'z:'));
- $cb->($qp, $NVRP->new(TS, 'rt:'));
- $cb->($qp, $NVRP->new(UID, 'uid:'));
+ $cb->($qp, $_) for @MAIL_NRP;
while (my ($name, $prefix) = each %bool_pfx_external) {
$qp->add_boolean_prefix($name, $_) foreach split(/ /, $prefix);
}
@@ -527,6 +531,40 @@ EOF
$qp;
}
+sub generate_cxx () { # generates snippet for xap_helper.h
+ my $ret = <<EOM;
+# line ${\__LINE__} "${\__FILE__}"
+static NRP *mail_nrp[${\scalar(@MAIL_VMAP)}];
+static void mail_nrp_init(void)
+{
+EOM
+ for (0..$#MAIL_VMAP) {
+ my $x = $MAIL_VMAP[$_];
+ $ret .= qq{\tmail_nrp[$_] = new NRP($x->[0], "$x->[1]");\n}
+ }
+$ret .= <<EOM;
+}
+
+# line ${\__LINE__} "${\__FILE__}"
+static void qp_init_mail_search(Xapian::QueryParser *qp)
+{
+ for (size_t i = 0; i < MY_ARRAY_SIZE(mail_nrp); i++)
+ qp->ADD_RP(mail_nrp[i]);
+EOM
+ for my $name (sort keys %bool_pfx_external) {
+ for (split(/ /, $bool_pfx_external{$name})) {
+ $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+ }
+ }
+ # TODO: altid support
+ for my $name (sort keys %prob_prefix) {
+ for (split(/ /, $prob_prefix{$name})) {
+ $ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+ }
+ }
+ $ret .= "}\n";
+}
+
sub help {
my ($self) = @_;
$self->{qp} //= $self->qparse_new; # parse altids
@@ -585,4 +623,8 @@ sub all_terms {
wantarray ? (sort keys %ret) : \%ret;
}
+sub xh_args { # prep getopt args to feed to xap_helper.h socket
+ map { ('-d', $_) } shard_dirs($_[0]);
+}
+
1;
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
new file mode 100644
index 00000000..56e3c3b4
--- /dev/null
+++ b/lib/PublicInbox/XapClient.pm
@@ -0,0 +1,50 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# This talks to (XapHelperCxx.pm + xap_helper.h) or XapHelper.pm
+# and will eventually allow users with neither XS nor SWIG Perl
+# bindings to use Xapian as long as they have Xapian development
+# headers/libs and a C++ compiler
+package PublicInbox::XapClient;
+use v5.12;
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);
+use PublicInbox::IPC;
+
+sub mkreq {
+ my ($self, $ios, @arg) = @_;
+ my ($r, $w, $n);
+ if (!defined($ios->[0])) {
+ pipe($r, $w) or die "pipe: $!";
+ $ios->[0] = $w;
+ }
+ my @fds = map fileno($_), @$ios;
+ my $buf = join("\0", @arg, '');
+ $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) //
+ die "send_cmd: $!";
+ $n == length($buf) or die "send_cmd: $n != ".length($buf);
+ $r;
+}
+
+sub start_helper {
+ my @argv = @_;
+ socketpair(my $sock, my $in, AF_UNIX, SOCK_SEQPACKET, 0) or
+ die "socketpair: $!";
+ my $cls = ($ENV{PI_NO_CXX} ? undef : eval {
+ require PublicInbox::XapHelperCxx;
+ PublicInbox::XapHelperCxx::check_build();
+ 'PublicInbox::XapHelperCxx';
+ }) // do {
+ require PublicInbox::XapHelper;
+ 'PublicInbox::XapHelper';
+ };
+ # ensure the child process has the same @INC we do:
+ my $env = { PERL5LIB => join(':', @INC) };
+ my $pid = spawn([$^X, ($^W ? ('-w') : ()), "-M$cls", '-e',
+ $cls.'::start(@ARGV)', '--', @argv],
+ $env, { 0 => $in });
+ ((bless { io => $sock, impl => $cls }, __PACKAGE__), $pid);
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
new file mode 100644
index 00000000..bf2f99a2
--- /dev/null
+++ b/lib/PublicInbox/XapHelper.pm
@@ -0,0 +1,144 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Perl + SWIG||XS implementation if XapHelperCxx / xap_helper.h isn't usable.
+package PublicInbox::XapHelper;
+use v5.12;
+use Getopt::Long (); # good API even if we only use short options
+our $GLP = Getopt::Long::Parser->new;
+$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::IPC;
+my $X = \%PublicInbox::Search::X;
+our (%SRCH, %PIDS, $parent_pid);
+our $stderr = \*STDERR;
+
+# only short options for portability in C++ implementation
+our @SPEC = (
+ 'a', # ascending sort
+ 'c', # code search
+ 'd=s@', # shard dirs
+ 'k=i', # sort column (like sort(1))
+ 'm=i', # maximum number of results
+ 'o=i', # offset
+ 'r', # 1=relevance then column
+ 't', # collapse threads
+ 'A=s@', # prefixes
+ 'O=s', # eidx_key
+ 'T=i', # timeout in seconds
+);
+
+sub cmd_test_inspect {
+ my ($req) = @_;
+ print { $req->{0} } "pid=$$ has_threadid=",
+ ($req->{srch}->has_threadid ? 1 : 0)
+}
+
+sub cmd_dump_ibx {
+ my ($req, $ibx_id, $qry_str) = @_;
+ $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
+ my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX');
+ my $max = $req->{srch}->{xdb}->get_doccount;
+ my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
+ $opt->{eidx_key} = $req->{O} if defined $req->{O};
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ my $out = $req->{0};
+ $out->autoflush(1);
+ for my $it ($mset->items) {
+ my $doc = $it->get_document;
+ for my $p (@pfx) {
+ for (xap_terms($p, $doc)) {
+ print $out "$_ $ibx_id\n" or die "print: $!";
+ }
+ }
+ }
+ if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+}
+
+sub dispatch {
+ my ($req, $cmd, @argv) = @_;
+ my $fn = $req->can("cmd_$cmd") or return;
+ $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+ my $dirs = delete $req->{d} or return warn 'no -d args';
+ my $key = join("\0", @$dirs);
+ $req->{srch} = $SRCH{$key} //= do {
+ my $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+ my $first = shift @$dirs;
+ my $slow_phrase = -f "$first/iamchert";
+ $new->{xdb} = $X->{Database}->new($first);
+ for (@$dirs) {
+ $slow_phrase ||= -f "$_/iamchert";
+ $new->{xdb}->add_database($X->{Database}->new($_));
+ }
+ $slow_phrase or
+ $new->{qp_flags} |= PublicInbox::Search::FLAG_PHRASE();
+ bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
+ 'PublicInbox::Search';
+ $new->{qp} = $new->qparse_new;
+ $new;
+ };
+ eval { $fn->($req, @argv) };
+ warn "E: $@" if $@;
+}
+
+sub recv_loop {
+ local $SIG{__WARN__} = sub { print $stderr @_ };
+ my $rbuf;
+ while (!defined($parent_pid) || getppid != $parent_pid) {
+ my $req = bless {}, __PACKAGE__;
+ my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+ scalar(@fds) or exit(66); # EX_NOINPUT
+ $fds[0] // die "recvmsg: $!";
+ my $i = 0;
+ for my $fd (@fds) {
+ open($req->{$i++}, '+<&=', $fd) and next;
+ warn("open(+<&=$fd) (FD=$i): $!");
+ undef $req;
+ last;
+ }
+ $req or next;
+ local $stderr = $req->{1} // \*STDERR;
+ if (chop($rbuf) ne "\0") {
+ warn "not NUL-terminated";
+ next;
+ }
+ my @argv = split(/\0/, $rbuf);
+ eval { $req->dispatch(@argv) } if @argv;
+ }
+}
+
+sub start_worker ($) {
+ my ($nr) = @_;
+ my $pid = fork // return warn("fork: $!");
+ if ($pid == 0) {
+ undef %PIDS;
+ recv_loop();
+ exit(0);
+ } else {
+ $PIDS{$pid} = $nr;
+ }
+}
+
+sub start (@) {
+ my (@argv) = @_;
+ local (%SRCH, %PIDS, $parent_pid);
+ PublicInbox::Search::load_xapian();
+ $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
+ die 'bad args';
+ return recv_loop() if !$opt->{j};
+ die '-j must be >= 0' if $opt->{j} < 0;
+ start_worker($_) for (1..($opt->{j}));
+
+ my $quit;
+ until ($quit) {
+ my $p = waitpid(-1, 0) or return;
+ if (defined(my $nr = delete $PIDS{$p})) {
+ $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
+ start_worker($nr) if !$quit;
+ } else {
+ warn "W: unknown pid=$p reaped\n";
+ }
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
new file mode 100644
index 00000000..4571676b
--- /dev/null
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -0,0 +1,93 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Just-ahead-of-time builder for the lib/PublicInbox/xap_helper.h shim.
+# I never want users to be without source code for repairs, so this
+# aims to replicate the feel of a scripting language using C++.
+# The resulting executable is not linked to Perl in any way.
+package PublicInbox::XapHelperCxx;
+use v5.12;
+use PublicInbox::Spawn;
+use PublicInbox::Search;
+my $dir = ($ENV{PERL_INLINE_DIRECTORY} //
+ die('BUG: PERL_INLINE_DIRECTORY unset')) . '/cxx';
+my $bin = "$dir/xap_helper";
+my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
+my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
+my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' .
+ ($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib') .
+ qq{ -DTHREADID=}.PublicInbox::Search::THREADID;
+
+sub xflags_chg () {
+ open my $fh, '<', "$dir/XFLAGS" or return 1;
+ chomp(my $prev = <$fh>);
+ $prev ne $xflags;
+}
+
+sub build () {
+ if (!-d $dir) {
+ my $err;
+ mkdir($dir) or $err = $!;
+ die "mkdir($dir): $err" if !-d $dir;
+ }
+ use autodie;
+ require File::Temp;
+ require PublicInbox::CodeSearch;
+ my ($prog) = ($bin =~ m!/([^/]+)\z!);
+ my $pkg_config = $ENV{PKG_CONFIG} // 'pkg-config';
+ my $tmp = File::Temp->newdir(DIR => $dir) // die "newdir: $!";
+ my $src = "$tmp/$prog.cpp";
+ open my $fh, '>', $src;
+ for (@srcs) {
+ say $fh qq(# line 1 "$_");
+ open my $rfh, '<', $_;
+ local $/;
+ print $fh readline($rfh);
+ }
+ print $fh PublicInbox::Search::generate_cxx();
+ print $fh PublicInbox::CodeSearch::generate_cxx();
+ close $fh;
+
+ my $cmd = "$pkg_config --libs --cflags xapian-core";
+ chomp(my $fl = `$cmd`);
+ die "$cmd failed: \$?=$?" if $?;
+ my $cxx = $ENV{CXX} // 'c++';
+ $cmd = "$cxx $src $fl $xflags -o $tmp/$prog";
+ system($cmd) and die "$cmd failed: \$?=$?";
+ my $cf = "$tmp/XFLAGS";
+ open $fh, '>', $cf;
+ say $fh $xflags;
+ close $fh;
+ # not quite atomic, but close enough :P
+ rename("$tmp/$_", "$dir/$_") for ($prog, 'XFLAGS');
+}
+
+sub check_build () {
+ use Time::HiRes qw(stat);
+ my $ctime = 0;
+ my @bin = stat($bin) or return build();
+ for (@srcs, @pm_dep) {
+ my @st = stat($_) or die "stat $_: $!";
+ if ($st[10] > $ctime) {
+ $ctime = $st[10];
+ return build() if $ctime > $bin[10];
+ }
+ }
+ xflags_chg() ? build() : 0;
+}
+
+sub start (@) {
+ check_build();
+ my @cmd;
+ if (my $v = $ENV{VALGRIND}) {
+ $v = 'valgrind -v' if $v eq '1';
+ @cmd = split(/\s+/, $v);
+ }
+ push @cmd, $bin, @_;
+ my $prog = $cmd[0];
+ $cmd[0] =~ s!\A.*?/([^/]+)\z!$1!;
+ exec { $prog } @cmd;
+}
+
+1;
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
new file mode 100644
index 00000000..52db92b7
--- /dev/null
+++ b/lib/PublicInbox/xap_helper.h
@@ -0,0 +1,654 @@
+/*
+ * Copyright (C) all contributors <meta@public-inbox.org>
+ * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+ *
+ * Standalone helper process using C and minimal C++ for Xapian,
+ * this is not linked to Perl in any way.
+ * C (not C++) is used as much as possible to lower the contribution
+ * barrier for hackers who mainly know C (this includes the maintainer).
+ * Everything here is an unstable internal API of public-inbox and
+ * NOT intended for ordinary users; only public-inbox hackers
+ */
+#ifndef _ALL_SOURCE
+# define _ALL_SOURCE
+#endif
+#include <sys/resource.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+
+#include <assert.h>
+#include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <search.h>
+#include <stdio.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+#include <xapian.h> // our only reason for using C++
+
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev))
+#define XAP_VER \
+ MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
+
+#if XAP_VER >= MY_VER(1,3,6)
+# define NRP Xapian::NumberRangeProcessor
+# define ADD_RP add_rangeprocessor
+# define SET_MAX_EXPANSION set_max_expansion // technically 1.3.3
+#else
+# define NRP Xapian::NumberValueRangeProcessor
+# define ADD_RP add_valuerangeprocessor
+# define SET_MAX_EXPANSION set_max_wildcard_expansion
+#endif
+
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
+static pid_t parent_pid;
+static FILE *orig_err = stderr;
+static void *srch_tree; // tsearch + tdelete + twalk
+static pid_t *worker_pids; // nr => pid
+static unsigned long nworker;
+
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void mail_nrp_init(void);
+static void code_nrp_init(void);
+static void qp_init_mail_search(Xapian::QueryParser *);
+static void qp_init_code_search(Xapian::QueryParser *);
+
+struct srch {
+ int paths_len; // int for comparisons
+ unsigned qp_flags;
+ Xapian::Database *db;
+ Xapian::QueryParser *qp;
+ char paths[]; // $shard_path0\0$shard_path1\0...
+};
+
+#define MY_ARG_MAX 256
+typedef bool (*cmd)(struct req *);
+
+// only one request per-process since we have RLIMIT_CPU timeout
+struct req { // argv and pfxv point into global rbuf
+ char *argv[MY_ARG_MAX];
+ char *pfxv[MY_ARG_MAX]; // -A <prefix>
+ struct srch *srch;
+ char *Oeidx_key;
+ cmd fn;
+ unsigned long long max;
+ unsigned long long off;
+ unsigned long timeout_sec;
+ long sort_col; // value column, negative means BoolWeight
+ int argc;
+ int pfxc;
+ FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
+ bool has_input; // fp[0] is bidirectional
+ bool collapse_threads;
+ bool code_search;
+ bool relevance; // sort by relevance before column
+ bool asc; // ascending sort
+};
+
+struct worker {
+ pid_t pid;
+ unsigned nr;
+};
+
+static bool has_threadid(const struct srch *srch)
+{
+ return srch->db->get_metadata("has_threadid") == "1";
+}
+
+static Xapian::Enquire prep_enquire(const struct req *req)
+{
+ Xapian::Enquire enq(*req->srch->db);
+ if (req->sort_col < 0) {
+ enq.set_weighting_scheme(Xapian::BoolWeight());
+ enq.set_docid_order(req->asc ? Xapian::Enquire::ASCENDING
+ : Xapian::Enquire::DESCENDING);
+ } else if (req->relevance) {
+ enq.set_sort_by_relevance_then_value(req->sort_col, !req->asc);
+ } else {
+ enq.set_sort_by_value_then_relevance(req->sort_col, !req->asc);
+ }
+ return enq;
+}
+
+static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
+{
+ if (!req->max)
+ req->max = 50;
+ for (int i = 0; i < 9; i++) {
+ try {
+ Xapian::MSet mset = enq->get_mset(req->off, req->max);
+ return mset;
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ }
+ }
+ return enq->get_mset(req->off, req->max);
+}
+
+static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
+{
+ struct srch *srch = req->srch;
+ Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+ if (req->Oeidx_key) {
+ req->Oeidx_key[0] = 'O'; // modifies static rbuf
+ fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
+ qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+ Xapian::Query(req->Oeidx_key));
+ }
+ Xapian::Enquire enq = prep_enquire(req);
+ enq.set_query(qry);
+ // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm
+ if (req->collapse_threads && has_threadid(srch))
+ enq.set_collapse_key(THREADID);
+
+ return enquire_mset(req, &enq);
+}
+
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+ return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void dump_ibx_term(struct req *req, const char *pfx,
+ Xapian::Document *doc, const char *ibx_id)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+
+ if (starts_with(&tn, pfx, pfx_len))
+ fprintf(req->fp[0], "%s %s\n",
+ tn.c_str() + pfx_len, ibx_id);
+ }
+}
+
+static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
+{
+ return setvbuf(fp, NULL, _IOLBF, 0);
+}
+
+static bool cmd_dump_ibx(struct req *req)
+{
+ if ((optind + 1) >= req->argc) {
+ warnx("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+ return false; // need ibx_id + qry_str
+ }
+ if (!req->pfxc) {
+ warnx("dump_ibx requires -A PREFIX");
+ return false;
+ }
+
+ const char *ibx_id = req->argv[optind];
+ if (my_setlinebuf(req->fp[0])) { // for sort(1) pipe
+ perror("setlinebuf(fp[0])");
+ return false;
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ req->max = (unsigned long long)req->srch->db->get_doccount();
+ Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ try {
+ Xapian::Document doc = i.get_document();
+ for (int p = 0; p < req->pfxc; p++)
+ dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+ } catch (const Xapian::Error & e) {
+ fprintf(orig_err, "W: %s (#%ld)\n",
+ e.get_description().c_str(), (long)(*i));
+ continue;
+ }
+ }
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu\n",
+ (unsigned long long)mset.size());
+ return true;
+}
+
+// internal usage only
+static bool cmd_test_inspect(struct req *req)
+{
+ fprintf(req->fp[0], "pid=%d has_threadid=%d",
+ (int)getpid(), has_threadid(req->srch) ? 1 : 0);
+ return true;
+}
+
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry {
+ size_t fn_len;
+ const char *fn_name;
+ cmd fn;
+} cmds[] = { // should be small enough to not need bsearch || gperf
+ // most common commands first
+ CMD(dump_ibx),
+ CMD(test_inspect), // least common commands last
+};
+
+#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0]))
+#define RECV_FD_CAPA 2
+#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int))
+union my_cmsg {
+ struct cmsghdr hdr;
+ char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
+};
+
+static void xclose(int fd)
+{
+ if (close(fd) < 0 && errno != EINTR)
+ err(EXIT_FAILURE, "BUG: close");
+}
+
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
+{
+ union my_cmsg cmsg = { 0 };
+ struct msghdr msg = { .msg_iovlen = 1 };
+ struct iovec iov;
+ iov.iov_base = rbuf;
+ iov.iov_len = *len;
+ msg.msg_iov = &iov;
+ msg.msg_control = &cmsg.hdr;
+ msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+
+ ssize_t r = recvmsg(sock_fd, &msg, 0);
+ if (r < 0)
+ err(EXIT_FAILURE, "recvmsg");
+ if (r == 0)
+ exit(EX_NOINPUT); /* grandparent went away */
+ *len = r;
+ if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
+ cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+ size_t len = cmsg.hdr.cmsg_len;
+ int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+ size_t i;
+ bool fd_ok = true;
+ for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) {
+ int fd = *fdp++;
+ const char *mode = NULL;
+ int fl = fd_ok ? fcntl(fd, F_GETFL) : 0;
+ switch (fl) {
+ case 0: break; // hit previous error
+ case -1:
+ warnx("invalid fd=%d", fd);
+ fd_ok = false;
+ break;
+ case O_WRONLY: mode = "w"; break;
+ case O_RDWR:
+ mode = "r+";
+ if (i == 0) req->has_input = true;
+ break;
+ default:
+ warnx("invalid mode from F_GETFL: 0x%x", fl);
+ fd_ok = false;
+ }
+ if (!fd_ok) {
+ xclose(fd);
+ } else {
+ req->fp[i] = fdopen(fd, mode);
+ if (!req->fp[i]) {
+ warn("fdopen(fd=%d)", fd);
+ fd_ok = false;
+ }
+ }
+ }
+ for (i = 0; !fd_ok && i < MY_ARRAY_SIZE(req->fp); i++)
+ if (req->fp[i]) fclose(req->fp[i]);
+ return fd_ok;
+ }
+ warnx("no FD received in %zd-byte request", r);
+ return false;
+}
+
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static int split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+ if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+ warnx("bogus argument given");
+ return 0;
+ }
+ size_t nr = 0;
+ char *c = buf;
+ for (size_t i = 1; i < len; i++) {
+ if (!buf[i]) {
+ dst[nr++] = c;
+ c = buf + i + 1;
+ }
+ if (nr == limit) {
+ warnx("too many args: %zu", nr);
+ return 0;
+ }
+ }
+ return (int)nr;
+}
+
+static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
+{
+ const struct srch *a = (const struct srch *)pa;
+ const struct srch *b = (const struct srch *)pb;
+ int diff = a->paths_len - b->paths_len;
+
+ return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
+}
+
+static bool is_chert(const char *dir)
+{
+ char iamchert[PATH_MAX];
+ struct stat sb;
+ int rc = snprintf(iamchert, sizeof(iamchert), "%s/iamchert", dir);
+
+ if (rc <= 0 || rc >= (int)sizeof(iamchert))
+ err(EXIT_FAILURE, "BUG: snprintf(%s/iamchert)", dir);
+ if (stat(iamchert, &sb) == 0 && S_ISREG(sb.st_mode))
+ return true;
+ return false;
+}
+
+static bool srch_init(struct req *req)
+{
+ char *dirv[MY_ARG_MAX];
+ int i;
+ struct srch *srch = req->srch;
+ int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+ const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
+ srch->qp_flags = FLAG_PHRASE |
+ Xapian::QueryParser::FLAG_BOOLEAN |
+ Xapian::QueryParser::FLAG_LOVEHATE |
+ Xapian::QueryParser::FLAG_WILDCARD;
+ if (is_chert(dirv[0]))
+ srch->qp_flags &= ~FLAG_PHRASE;
+ try {
+ srch->db = new Xapian::Database(dirv[0]);
+ } catch (...) {
+ warn("E: Xapian::Database(%s)", dirv[0]);
+ return false;
+ }
+ try {
+ for (i = 1; i < dirc; i++) {
+ if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+ srch->qp_flags &= ~FLAG_PHRASE;
+ srch->db->add_database(Xapian::Database(dirv[i]));
+ }
+ } catch (...) {
+ warn("E: add_database(%s)", dirv[i]);
+ return false;
+ }
+ try {
+ srch->qp = new Xapian::QueryParser;
+ } catch (...) {
+ perror("E: Xapian::QueryParser");
+ return false;
+ }
+ srch->qp->set_default_op(Xapian::Query::OP_AND);
+ srch->qp->set_database(*srch->db);
+ try {
+ srch->qp->set_stemmer(Xapian::Stem("english"));
+ } catch (...) {
+ perror("E: Xapian::Stem");
+ return false;
+ }
+ srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+ srch->qp->SET_MAX_EXPANSION(100);
+
+ if (req->code_search)
+ qp_init_code_search(srch->qp); // CodeSearch.pm
+ else
+ qp_init_mail_search(srch->qp); // Search.pm
+ return true;
+}
+
+static void free_srch(void *p) // tdestroy
+{
+ struct srch *srch = (struct srch *)p;
+ delete srch->qp;
+ delete srch->db;
+ free(srch);
+}
+
+static void dispatch(struct req *req)
+{
+ int c;
+ size_t size = strlen(req->argv[0]);
+ union {
+ struct srch *srch;
+ char *ptr;
+ } fbuf;
+ char *end;
+ FILE *kfp;
+ struct srch **s;
+ req->fn = NULL;
+ for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
+ if (cmds[c].fn_len == size &&
+ !memcmp(cmds[c].fn_name, req->argv[0], size)) {
+ req->fn = cmds[c].fn;
+ break;
+ }
+ }
+ if (!req->fn) goto cmd_err;
+
+ kfp = open_memstream(&fbuf.ptr, &size);
+ // write padding, first
+ fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+
+ // global getopt variables:
+ optind = 1;
+ opterr = optopt = 0;
+ optarg = NULL;
+
+ // keep sync with @PublicInbox::XapHelper::SPEC
+ while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+ switch (c) {
+ case 'a': req->asc = true; break;
+ case 'c': req->code_search = true; break;
+ case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+ case 'k':
+ req->sort_col = strtol(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ switch (req->sort_col) {
+ case LONG_MAX: case LONG_MIN: goto cmd_err;
+ }
+ break;
+ case 'm':
+ req->max = strtoull(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->max == ULLONG_MAX) goto cmd_err;
+ break;
+ case 'o':
+ req->off = strtoull(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->off == ULLONG_MAX) goto cmd_err;
+ break;
+ case 'r': req->relevance = true; break;
+ case 't': req->collapse_threads = true; break;
+ case 'A':
+ req->pfxv[req->pfxc++] = optarg;
+ if (MY_ARG_MAX == req->pfxc) goto cmd_err;
+ break;
+ case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+ case 'T':
+ req->timeout_sec = strtoul(optarg, &end, 10);
+ if (*end) goto cmd_err;
+ if (req->timeout_sec == ULONG_MAX) goto cmd_err;
+ break;
+ default: goto cmd_err;
+ }
+ }
+ if (ferror(kfp) | fclose(kfp)) {
+ perror("ferror|fclose");
+ goto cmd_err;
+ }
+ fbuf.srch->db = NULL;
+ fbuf.srch->qp = NULL;
+ fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+ if (fbuf.srch->paths_len <= 0) {
+ free_srch(fbuf.srch);
+ warnx("no -d args");
+ goto cmd_err;
+ }
+ s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+ if (!s) {
+ perror("tsearch");
+ goto cmd_err;
+ }
+ req->srch = *s;
+ if (req->srch != fbuf.srch) { // reuse existing
+ free_srch(fbuf.srch);
+ } else if (!srch_init(req)) {
+ assert(fbuf.srch == *((struct srch **)tfind(
+ fbuf.srch, &srch_tree, srch_cmp)));
+ void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp);
+ assert(del);
+ free_srch(fbuf.srch);
+ goto cmd_err;
+ }
+ try {
+ if (!req->fn(req))
+ goto cmd_err;
+ } catch (const Xapian::Error & e) {
+ warnx("Xapian::Error: %s", e.get_description().c_str());
+ } catch (...) {
+ warn("unhandled exception");
+ }
+cmd_err:
+ return; // just be silent on errors, for now
+}
+
+static void cleanup_pids(void)
+{
+ free(worker_pids);
+ worker_pids = NULL;
+}
+
+static void recv_loop(void) // worker process loop
+{
+ static char rbuf[4096 * 33]; // per-process
+ while (!parent_pid || getppid() == parent_pid) {
+ size_t len = sizeof(rbuf);
+ struct req req = { 0 };
+ if (!recv_req(&req, rbuf, &len))
+ continue;
+ if (req.fp[1]) {
+ if (my_setlinebuf(req.fp[1]))
+ perror("W: setlinebuf(req.fp[1])");
+ stderr = req.fp[1];
+ }
+ req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+ if (req.argc > 0)
+ dispatch(&req);
+ if (ferror(req.fp[0]) | fclose(req.fp[0]))
+ perror("ferror|fclose fp[0]");
+ if (req.fp[1]) {
+ stderr = orig_err;
+ if (ferror(req.fp[1]) | fclose(req.fp[1]))
+ perror("ferror|fclose fp[1]");
+ }
+ }
+}
+
+static void insert_pid(pid_t pid, unsigned nr)
+{
+ assert(!worker_pids[nr]);
+ worker_pids[nr] = pid;
+}
+
+static int delete_pid(pid_t pid)
+{
+ for (unsigned nr = 0; nr < nworker; nr++) {
+ if (worker_pids[nr] == pid) {
+ worker_pids[nr] = 0;
+ return nr;
+ }
+ }
+ warnx("W: unknown pid=%d reaped", (int)pid);
+ return -1;
+}
+
+static void start_worker(unsigned nr)
+{
+ pid_t pid = fork();
+ if (pid < 0) {
+ warn("E: fork(worker=%u)", nr);
+ } else if (pid > 0) {
+ insert_pid(pid, nr);
+ } else {
+ cleanup_pids();
+ recv_loop();
+ exit(0);
+ }
+}
+
+static void cleanup_all(void)
+{
+ cleanup_pids();
+#ifdef __GLIBC__
+ tdestroy(srch_tree, free_srch);
+ srch_tree = NULL;
+#endif
+}
+
+int main(int argc, char *argv[])
+{
+ int c;
+
+ mail_nrp_init();
+ code_nrp_init();
+ atexit(cleanup_all);
+
+ nworker = 0;
+#ifdef _SC_NPROCESSORS_ONLN
+ long j = sysconf(_SC_NPROCESSORS_ONLN);
+ if (j > 0)
+ nworker = j > UCHAR_MAX ? UCHAR_MAX : j;
+#endif // _SC_NPROCESSORS_ONLN
+
+ // make warn/warnx/err multi-process friendly:
+ if (my_setlinebuf(stderr))
+ err(EXIT_FAILURE, "setlinebuf(stderr)");
+ // not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
+ while ((c = getopt(argc, argv, "j:")) != -1) {
+ char *end;
+
+ switch (c) {
+ case 'j':
+ nworker = strtoul(optarg, &end, 10);
+ if (*end != 0 || nworker > USHRT_MAX)
+ errx(EXIT_FAILURE, "-j %s invalid", optarg);
+ break;
+ case ':':
+ errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
+ case '?':
+ errx(EXIT_FAILURE, "unrecognized: `-%c'", optopt);
+ default:
+ errx(EXIT_FAILURE, "BUG: `-%c'", c);
+ }
+ }
+ if (nworker == 0) {
+ recv_loop();
+ } else {
+ parent_pid = getpid();
+ worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+ if (!worker_pids)
+ err(EXIT_FAILURE, "calloc");
+ for (unsigned i = 0; i < nworker; i++)
+ start_worker(i);
+
+ int st;
+ pid_t pid;
+ bool quit = false;
+ while ((pid = wait(&st)) > 0) {
+ int nr = delete_pid(pid);
+ if (nr < 0) continue;
+ if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+ quit = true;
+ if (!quit)
+ start_worker(nr);
+ }
+ }
+ return 0;
+}
diff --git a/t/xap_helper.t b/t/xap_helper.t
new file mode 100644
index 00000000..f00a845a
--- /dev/null
+++ b/t/xap_helper.t
@@ -0,0 +1,147 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+require_mods(qw(DBD::SQLite Search::Xapian));
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+require PublicInbox::AutoReap;
+require PublicInbox::IPC;
+require PublicInbox::XapClient;
+use autodie;
+my ($tmp, $for_destroy) = tmpdir();
+
+my $fi_data = './t/git.fast-import-data';
+open my $fi_fh, '<', $fi_data;
+open my $dh, '<', '.';
+my $crepo = create_coderepo 'for-cindex', sub {
+ my ($d) = @_;
+ xsys_e([qw(git init -q --bare)]);
+ xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
+ chdir($dh);
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d])
+ or xbail '-cindex internal';
+ run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
+ "$d/cidx-ext", $d]) or xbail '-cindex "external"';
+};
+$dh = $fi_fh = undef;
+
+my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
+ tmpdir => "$tmp/v2", sub {
+ my ($im) = @_;
+ for my $f (qw(t/data/0001.patch t/data/binary.patch
+ t/data/message_embed.eml
+ t/solve/0001-simple-mod.patch
+ t/solve/0002-rename-with-modifications.patch
+ t/solve/bare.patch)) {
+ $im->add(eml_load($f)) or BAIL_OUT;
+ }
+};
+
+my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
+my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
+is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
+is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+
+my $doreq = sub {
+ my ($s, @arg) = @_;
+ my $err = pop @arg if ref($arg[-1]);
+ pipe(my $x, my $y);
+ my $buf = join("\0", @arg, '');
+ my @fds = fileno($y);
+ push @fds, fileno($err) if $err;
+ my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR);
+ $n // xbail "send: $!";
+ my $arg = "@arg";
+ $arg =~ s/\Q$tmp\E/\$TMP/gs;
+ is(length($buf), $n, "req $arg sent");
+ $x;
+};
+
+my $env = { PERL5LIB => join(':', @INC) };
+my $test = sub {
+ my (@arg) = @_;
+ socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
+ my $pid = spawn([$^X, '-w', @arg], $env, { 0 => $y });
+ my $ar = PublicInbox::AutoReap->new($pid);
+ diag "$arg[-1] running pid=$pid";
+ close $y;
+ my $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ my %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($info{has_threadid}, '1', 'has_threadid true for inbox');
+ like($info{pid}, qr/\A\d+\z/, 'got PID from inbox inspect');
+
+ $r = $doreq->($s, qw(test_inspect -d), $int[0]);
+ my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
+ is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
+ is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
+
+ my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
+ qw(13 rt:0..));
+ $r = $doreq->($s, @dump);
+ my @res;
+ while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
+ is(grep(/\n\z/s, @res), scalar(@res), 'line buffered');
+
+ pipe(my $err_rd, my $err_wr);
+ $r = $doreq->($s, @dump, $err_wr);
+ close $err_wr;
+ my $res = do { local $/; <$r> };
+ is(join('', @res), $res, 'got identical response w/ error pipe');
+ my $stats = do { local $/; <$err_rd> };
+ is($stats, "mset.size=6\n", 'mset.size reported');
+
+ if ($arg[-1] !~ /\('-j0'\)/) {
+ kill('KILL', $cinfo{pid});
+ $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
+ %info = map {
+ split(/=/, $_, 2)
+ } split(/ /, do { local $/; <$r> });
+ isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+ }
+ $ar;
+};
+my $ar;
+
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+$ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+
+my @NO_CXX = (0);
+SKIP: {
+ eval {
+ require PublicInbox::XapHelperCxx;
+ PublicInbox::XapHelperCxx::check_build();
+ };
+ skip "XapHelperCxx build: $@", 1 if $@;
+ push @NO_CXX, 1;
+
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ PublicInbox::XapHelperCxx::start('-j1')]);
+};
+
+for my $n (@NO_CXX) {
+ local $ENV{PI_NO_CXX} = $n;
+ my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
+ $ar = PublicInbox::AutoReap->new($pid);
+ pipe(my $err_r, my $err_w);
+
+ # git patch-id --stable <t/data/0001.patch | awk '{print $1}'
+ my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
+ my $mid = '20180720072141.GA15957@example';
+ my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
+ (map { ('-d', $_) } @ibx_idx),
+ 9, "mid:$mid");
+ close $err_w;
+ my $res = do { local $/; <$r> };
+ is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
+ my $err = do { local $/; <$err_r> };
+ is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+}
+
+done_testing;
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 5/7] cindex: fix sorting and uniqueness
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (3 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
` (2 subsequent siblings)
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
We can't rely on combining the `-u' and `-k1,1' switches of POSIX
sort(1) to do what we want. So only rely on `sort -k1,1' while
introducing a small Perl helper to fold identical prefixes into
one line. In other words, input such as:
deadbeef 0
deadbeef 1
deadbeef 2
Was getting deduplicated into a single line:
deadbeef 0
... with `sort -u -k1,1'
This makes puts the output into a more optimal form for eventual
(not-fully-implemented-yet) parsing:
deadbeef 0,1,2
ORS is current the comma (`,') for inbox IDs, but it'll be a
space (` ') for coderepo root IDs. This implementation also
combines identical IDs in the 2nd column. Thus:
deadbeef 0
deadbeef 0
Becomes a single `deadbeef 0' line thanks to the use of
XS List::Util::uniq (which beats a pure Perl hash).
I attempted to implement this in awk but Perl is close enough to
gawk in performance while being shorter and easier-to-understand
due to List::Util::uniq. mawk was faster, but still not enough
to matter as the bottleneck is from iterating through Xapian
MSets.
---
lib/PublicInbox/CodeSearchIdx.pm | 63 +++++++++++++++++++++++---------
1 file changed, 45 insertions(+), 18 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index b8afecd2..4a41b1da 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -21,12 +21,14 @@
#
# * to_root_id - each line is of the format:
#
-# $PFX $ROOT_COMMIT_OID_ID
+# $PFX $ROOT_COMMIT_OID_IDS
#
# * to_ibx_id - each line is of the format:
#
-# $PFX $IBX_ID
+# $PFX $IBX_IDS
#
+# $IBX_IDS is a comma-delimited list of integers ($IBX_ID)
+# $ROOT_COMMIT_OID_IDS is space-delimited
# In both cases, $PFX is typically the value of the patchid (XDFID) but it
# can be configured to use any combination of patchid, dfpre, dfpost or
# dfblob.
@@ -134,6 +136,20 @@ sub new {
$self;
}
+# This is similar to uniq(1) on the first column, but combines the
+# contents of subsequent columns using $OFS.
+our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM');
+BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' }
+if ($F[0] eq $apfx) {
+ shift @F;
+ push @ids, @F;
+} else {
+ print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids;
+ ($apfx, @ids) = @F;
+}
+END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids }
+EOM
+
# TODO: may be used for reshard/compact
sub count_shards { scalar($_[0]->xdb_shards_flat) }
@@ -519,16 +535,21 @@ sub dump_roots_once {
@ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
my $id = 0;
my %root2id = map { $_ => $id++ } @ID2ROOT;
- pipe(my ($r, $w)) or die "pipe: $!";
+ # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
- close $r or die "close: $!";
+ my $env = { %$CMD_ENV, OFS => ' ' };
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+ my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
+ \%root2id, $QRY_STR);
$_->wq_io_do(@arg) for @IDX_SHARDS;
progress($self, 'waiting on dump_shard_roots sort');
}
@@ -549,12 +570,15 @@ sub dump_ibx { # sends to xap_helper.h
sub dump_ibx_start {
my ($self, $associate) = @_;
pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
- my @sort = (@SORT, '-k1,1');
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
+ # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
- close $sort_r or die "close: $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(ibx)'], $associate);
($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
@@ -869,9 +893,11 @@ sub associate {
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
my %score;
- while (<$rd>) { # PFX ibx_id root_id
- my (undef, $ibx_id, @root_id) = split(/ /, $_);
- ++$score{"$ibx_id $_"} for @root_id;
+ while (<$rd>) { # PFX ibx_ids root_id
+ my (undef, $ibx_ids, @root_ids) = split(/ /, $_);
+ for my $ibx_id (split(/,/, $ibx_ids)) {
+ ++$score{"$ibx_id $_"} for @root_ids;
+ }
}
close $rd or die "@join failed: $?=$?";
my $min = $self->{-opt}->{'assoc-min'} // 10;
@@ -939,9 +965,10 @@ sub init_prune ($) {
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+ my @sort_u = (@SORT, '-u');
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
- my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+ my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+ awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
@@ -971,12 +998,13 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
- my $sort_pid = spawn(\@SORT, $CMD_ENV,
+ my @sort_u = (@SORT, '-u');
+ my $sort_pid = spawn(\@sort_u, $CMD_ENV,
{ 0 => $sort_in, 1 => $sort_out });
my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
awaitpid($awk_pid, \&cmd_done, \@AWK);
- awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($sort_pid, \&cmd_done, \@sort_u);
awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
@@ -1092,10 +1120,9 @@ sub cidx_run { # main entry point
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @SORT = (undef, '-u');
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
local $self->{PENDING} = {};
local $self->{-pi_cfg};
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 6/7] cindex: implement dump_roots in C++
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (4 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
It's now just `dump_roots' instead of `dump_shard_roots', since
this doesn't need to be tied to the concept of shards. I'm
still shaky with C++, but intend to keep using stuff like
hsearch(3) to make life easier for C hackers :P
---
MANIFEST | 1 -
lib/PublicInbox/CidxDumpShardRoots.pm | 73 ------
lib/PublicInbox/CidxXapHelperAux.pm | 10 +-
lib/PublicInbox/CodeSearchIdx.pm | 56 ++---
lib/PublicInbox/XapHelper.pm | 52 +++-
lib/PublicInbox/xap_helper.h | 332 +++++++++++++++++++++++---
t/xap_helper.t | 44 +++-
7 files changed, 407 insertions(+), 161 deletions(-)
delete mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm
diff --git a/MANIFEST b/MANIFEST
index 4f61af42..4bccc849 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,6 @@ lib/PublicInbox/AltId.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CidxXapHelperAux.pm
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
deleted file mode 100644
index 34afa419..00000000
--- a/lib/PublicInbox/CidxDumpShardRoots.pm
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpShardRoots;
-use v5.12;
-use PublicInbox::Lock;
-use PublicInbox::Search qw(xap_terms);
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($cidx, $root2id, $qry_str) = @_;
- my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
- my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
- # sort lock is necessary if we have may root ids which cause a
- # row length to exceed POSIX PIPE_BUF (via `$G' below)
- my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
- 'PublicInbox::Lock';
- $sort_w->autoflush(1);
- $cidx->begin_txn_lazy; # only using txn to simplify writer subs
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- my $self = bless {
- cidx => $cidx,
- op_p => $op_p,
- iter => 0,
- mset => $cidx->mset($qry_str, $opt),
- root2id => $root2id,
- sort_w => $sort_w,
- sort_lk => $sort_lk,
- }, __PACKAGE__;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $cidx = $self->{cidx};
- return if $cidx->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- $end = $last if $end > $last;
- $self->{iter} = $end + 1;
- local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
- $cidx->progress($0);
-
- my $root2id = $self->{root2id};
- my $buf = '';
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- my $G = join(' ', map {
- $root2id->{pack('H*', $_)};
- } xap_terms('G', $doc));
- for my $p (@{$cidx->{ASSOC_PFX}}) {
- $buf .= "$_ $G\n" for (xap_terms($p, $doc));
- }
- }
- $self->{sort_lk}->lock_acquire_fast;
- print { $self->{sort_w} } $buf or die "print: $!";
- $self->{sort_lk}->lock_release_fast;
- $end < $last && !$cidx->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-sub DESTROY {
- my ($self) = @_;
- return if $self->{cidx}->do_quit;
- send($self->{op_p},
- "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
-}
-
-1;
diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm
index c9a5ddad..f402bde0 100644
--- a/lib/PublicInbox/CidxXapHelperAux.pm
+++ b/lib/PublicInbox/CidxXapHelperAux.pm
@@ -10,12 +10,8 @@ use PublicInbox::Syscall qw(EPOLLIN);
# rpipe connects to req->fp[1] in xap_helper.h
sub new {
- my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
- my $self = bless {
- cidx => $cidx,
- pfx => $pfx,
- associate => $associate
- }, $cls;
+ my ($cls, $rpipe, $cidx, $pfx) = @_;
+ my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
$rpipe->blocking(0);
$self->SUPER::new($rpipe, EPOLLIN);
}
@@ -36,7 +32,7 @@ sub event_step {
my @lines = split(/^/m, $buf);
$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
for my $l (@lines) {
- if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+ if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
delete $self->{cidx}->{PENDING}->{$pfx};
$self->{cidx}->index_next;
}
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 4a41b1da..404d6826 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -87,7 +87,6 @@ our (
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- @DUMP_SHARD_ROOTS_OK, # for associate
$DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
@@ -505,14 +504,6 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
- my ($self, $associate, $n) = @_;
- return if $DO_QUIT;
- progress($self, "dump_shard_roots [$n] done");
- $DUMP_SHARD_ROOTS_OK[$n] = 1;
- # may run associate()
-}
-
sub assoc_max_init ($) {
my ($self) = @_;
my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
@@ -520,38 +511,39 @@ sub assoc_max_init ($) {
$max < 0 ? ((2 ** 31) - 1) : $max;
}
-# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
-sub dump_shard_roots { # via wq_io_do for associate
- my ($self, $root2id, $qry_str) = @_;
- PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
-}
-
sub dump_roots_once {
my ($self, $associate) = @_;
$associate // die 'BUG: no $associate';
$TODO{associating} = 1; # keep shards_active() happy
progress($self, 'dumping IDs from coderepos');
local $self->{xdb};
- @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
- my $id = 0;
- my %root2id = map { $_ => $id++ } @ID2ROOT;
- # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ @ID2ROOT = $self->all_terms('G');
+ my $root2id = "$TMPDIR/root2id";
+ open my $fh, '>', $root2id or die "open($root2id): $!";
+ my $nr = -1;
+ for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+ close $fh or die "close: $!";
+ # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
- open my $fh, '>', $dst or die "open($dst): $!";
+ open $fh, '>', $dst or die "open($dst): $!";
my $env = { %$CMD_ENV, OFS => ' ' };
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
- \%root2id, $QRY_STR);
- $_->wq_io_do(@arg) for @IDX_SHARDS;
- progress($self, 'waiting on dump_shard_roots sort');
+ my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
+ '-m', assoc_max_init($self), $root2id, $QRY_STR);
+ for my $d ($self->shard_dirs) {
+ pipe(my ($err_r, $err_w)) or die "pipe: $!";
+ $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
+ my $desc = "dump_roots $d";
+ $self->{PENDING}->{$desc} = $associate;
+ PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
+ }
+ progress($self, 'waiting on dump_roots sort');
}
sub dump_ibx { # sends to xap_helper.h
@@ -563,8 +555,8 @@ sub dump_ibx { # sends to xap_helper.h
pipe(my ($r, $w)) or die "pipe: $!";
$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
my $ekey = $ibx->eidx_key;
- $self->{PENDING}->{$ekey} = undef;
- PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
+ $self->{PENDING}->{$ekey} = $TODO{associate};
+ PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
}
sub dump_ibx_start {
@@ -885,8 +877,7 @@ sub associate {
my ($self) = @_;
return if $DO_QUIT;
@IDX_SHARDS or return warn("# aborting on no shards\n");
- grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
- die "E: shards not dumped properly\n";
+ unlink("$TMPDIR/root2id");
my @pending = keys %{$self->{PENDING}};
die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
@@ -906,7 +897,7 @@ sub associate {
my $nr = $score{$k};
my ($ibx_id, $root) = split(/ /, $k);
my $ekey = $IBX[$ibx_id]->eidx_key;
- $root = unpack('H*', $ID2ROOT[$root]);
+ $root = $ID2ROOT[$root];
progress($self, "$ekey => $root has $nr matches");
}
delete $TODO{associating}; # break out of shards_active()
@@ -1048,7 +1039,6 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxDumpShardRoots;
require PublicInbox::CidxXapHelperAux;
require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
@@ -1120,7 +1110,7 @@ sub cidx_run { # main entry point
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
+ @ID2ROOT, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index bf2f99a2..c80be810 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -8,7 +8,9 @@ use Getopt::Long (); # good API even if we only use short options
our $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
use PublicInbox::Search qw(xap_terms);
+use PublicInbox::CodeSearch;
use PublicInbox::IPC;
+use Fcntl qw(LOCK_UN LOCK_EX);
my $X = \%PublicInbox::Search::X;
our (%SRCH, %PIDS, $parent_pid);
our $stderr = \*STDERR;
@@ -44,15 +46,63 @@ sub cmd_dump_ibx {
my $mset = $req->{srch}->mset($qry_str, $opt);
my $out = $req->{0};
$out->autoflush(1);
+ my $nr = 0;
for my $it ($mset->items) {
my $doc = $it->get_document;
for my $p (@pfx) {
for (xap_terms($p, $doc)) {
print $out "$_ $ibx_id\n" or die "print: $!";
+ ++$nr;
}
}
}
- if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
+}
+
+sub cmd_dump_roots {
+ my ($req, $root2id_file, $qry_str) = @_;
+ $qry_str // return
+ warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
+ my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+ open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
+ my %root2id; # record format: $OIDHEX "\0" uint32_t
+ my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
+ while (@x) {
+ my $oidhex = shift @x;
+ $root2id{$oidhex} = shift @x;
+ }
+ my $opt = { relevance => -1, limit => $req->{'m'},
+ offset => $req->{o} // 0 };
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ $req->{0}->autoflush(1);
+ my $buf = '';
+ my $nr = 0;
+ for my $it ($mset->items) {
+ my $doc = $it->get_document;
+ my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
+ for my $p (@pfx) {
+ for (xap_terms($p, $doc)) {
+ $buf .= "$_ $G\n";
+ ++$nr;
+ }
+ }
+ if (!($nr & 0x3fff)) {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ $buf = '';
+ }
+ }
+ if ($buf ne '') {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
}
sub dispatch {
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 52db92b7..c9b4e0cc 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -6,12 +6,16 @@
* this is not linked to Perl in any way.
* C (not C++) is used as much as possible to lower the contribution
* barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead their equivalents in the C++ stdlib :P
* Everything here is an unstable internal API of public-inbox and
* NOT intended for ordinary users; only public-inbox hackers
*/
#ifndef _ALL_SOURCE
# define _ALL_SOURCE
#endif
+#include <sys/file.h>
+#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
@@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf
unsigned long long max;
unsigned long long off;
unsigned long timeout_sec;
+ size_t nr_out;
long sort_col; // value column, negative means BoolWeight
int argc;
int pfxc;
@@ -96,6 +101,28 @@ struct worker {
unsigned nr;
};
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+ if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+ warnx("bogus argument given");
+ return 0;
+ }
+ size_t nr = 0;
+ char *c = buf;
+ for (size_t i = 1; i < len; i++) {
+ if (!buf[i]) {
+ dst[nr++] = c;
+ c = buf + i + 1;
+ }
+ if (nr == limit) {
+ warnx("too many args: %zu", nr);
+ return 0;
+ }
+ }
+ return (long)nr;
+}
+
static bool has_threadid(const struct srch *srch)
{
return srch->db->get_metadata("has_threadid") == "1";
@@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req)
static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
{
- if (!req->max)
- req->max = 50;
+ if (!req->max) {
+ switch (sizeof(Xapian::doccount)) {
+ case 4: req->max = UINT_MAX; break;
+ default: req->max = ULLONG_MAX;
+ }
+ }
for (int i = 0; i < 9; i++) {
try {
Xapian::MSet mset = enq->get_mset(req->off, req->max);
@@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
return enq->get_mset(req->off, req->max);
}
+// for v1, v2, and extindex
static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
{
struct srch *srch = req->srch;
Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
if (req->Oeidx_key) {
req->Oeidx_key[0] = 'O'; // modifies static rbuf
- fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
Xapian::Query(req->Oeidx_key));
}
@@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
return enquire_mset(req, &enq);
}
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+ struct srch *srch = req->srch;
+ Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+ // TODO: git_dir + roots_filter
+
+ // we only want commits:
+ qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+ Xapian::Query("T" "c"));
+ Xapian::Enquire enq = prep_enquire(req);
+ enq.set_query(qry);
+ return enquire_mset(req, &enq);
+}
+
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx,
for (cur.skip_to(pfx); cur != end; cur++) {
std::string tn = *cur;
- if (starts_with(&tn, pfx, pfx_len))
+ if (starts_with(&tn, pfx, pfx_len)) {
fprintf(req->fp[0], "%s %s\n",
tn.c_str() + pfx_len, ibx_id);
+ ++req->nr_out;
+ }
}
}
@@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req)
}
req->asc = true;
req->sort_col = -1;
- req->max = (unsigned long long)req->srch->db->get_doccount();
Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
try {
@@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req)
}
}
if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu\n",
- (unsigned long long)mset.size());
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
+ return true;
+}
+
+struct fbuf {
+ FILE *fp;
+ char *ptr;
+ size_t len;
+};
+
+struct dump_roots_tmp {
+ struct stat sb;
+ void *mm_ptr;
+ char **entries;
+ struct fbuf wbuf;
+ int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+ struct fbuf *fbuf = (struct fbuf *)ptr;
+ if (fbuf->fp && fclose(fbuf->fp))
+ perror("fclose(fbuf->fp)");
+ fbuf->fp = NULL;
+ free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+ assert(!fbuf->ptr);
+ fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+ if (fbuf->fp) return true;
+ perror("open_memstream(fbuf)");
+ return false;
+}
+
+static void xclose(int fd)
+{
+ if (close(fd) < 0 && errno != EINTR)
+ err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+ struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+ if (drt->root2id_fd >= 0)
+ xclose(drt->root2id_fd);
+ hdestroy(); // idempotent
+ if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+ err(EXIT_FAILURE, "BUG: munmap");
+ free(drt->entries);
+ fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+ Xapian::Document *doc)
+{
+ if (!fbuf_init(root_ids)) return false;
+
+ bool ok = true;
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ ENTRY e, *ep;
+ for (cur.skip_to("G"); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, "G", 1))
+ continue;
+ union { const char *in; char *out; } u;
+ u.in = tn.c_str() + 1;
+ e.key = u.out;
+ ep = hsearch(e, FIND);
+ if (!ep) {
+ warnx("hsearch miss `%s'", e.key);
+ return false;
+ }
+ // ep->data is a NUL-terminated string matching /[0-9]+/
+ fputc(' ', root_ids->fp);
+ fputs((const char *)ep->data, root_ids->fp);
+ }
+ fputc('\n', root_ids->fp);
+ if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+ perror("ferror|fclose(root_ids)");
+ ok = false;
+ }
+ root_ids->fp = NULL;
+ return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+ struct dump_roots_tmp *drt,
+ struct fbuf *root_ids,
+ Xapian::Document *doc)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, pfx, pfx_len))
+ continue;
+ fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+ fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+ ++req->nr_out;
+ }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+ char *p;
+ int fd = fileno(req->fp[0]);
+ bool ok = true;
+
+ if (!drt->wbuf.fp) return true;
+ if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+ if (fclose(drt->wbuf.fp)) {
+ warn("fclose(drt->wbuf.fp)"); // malloc failure?
+ return false;
+ }
+ drt->wbuf.fp = NULL;
+ if (!drt->wbuf.len) goto done_free;
+ if (flock(drt->root2id_fd, LOCK_EX)) {
+ perror("LOCK_EX");
+ return false;
+ }
+ p = drt->wbuf.ptr;
+ do {
+ ssize_t n = write(fd, p, drt->wbuf.len);
+ if (n > 0) {
+ drt->wbuf.len -= n;
+ p += n;
+ } else {
+ perror(n ? "write" : "write (zero bytes)");
+ return false;
+ }
+ } while (drt->wbuf.len);
+ if (flock(drt->root2id_fd, LOCK_UN)) {
+ perror("LOCK_UN");
+ return false;
+ }
+done_free:
+ free(drt->wbuf.ptr);
+ drt->wbuf.ptr = NULL;
+ return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+ if ((optind + 1) >= req->argc) {
+ warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+ return false; // need file + qry_str
+ }
+ if (!req->pfxc) {
+ warnx("dump_roots requires -A PREFIX");
+ return false;
+ }
+ const char *root2id_file = req->argv[optind];
+ drt.root2id_fd = open(root2id_file, O_RDONLY);
+ if (drt.root2id_fd < 0) {
+ warn("open(%s)", root2id_file);
+ return false;
+ }
+ if (fstat(drt.root2id_fd, &drt.sb)) {
+ warn("fstat(%s)", root2id_file);
+ return false;
+ }
+ // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+ // so /32 overestimates the number of expected entries by
+ // ~%25 (as recommended by Linux hcreate(3) manpage)
+ size_t est = (drt.sb.st_size / 32) + 1;
+ if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+ warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+ (long long)drt.sb.st_size, SIZE_MAX);
+ return false;
+ }
+ drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+ MAP_PRIVATE, drt.root2id_fd, 0);
+ if (drt.mm_ptr == MAP_FAILED) {
+ warn("mmap(%s)", root2id_file);
+ return false;
+ }
+ drt.entries = (char **)calloc(est * 2, sizeof(char *));
+ if (!drt.entries) {
+ warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+ return false;
+ }
+ size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+ drt.sb.st_size, est * 2);
+ if (tot <= 0) return false; // split2argv already warned on error
+ if (!hcreate(est)) {
+ warn("hcreate(%zu)", est);
+ return false;
+ }
+ for (size_t i = 0; i < tot; ) {
+ ENTRY e;
+ e.key = drt.entries[i++];
+ e.data = drt.entries[i++];
+ if (!hsearch(e, ENTER)) {
+ warn("hsearch(%s => %s, ENTER)", e.key,
+ (const char *)e.data);
+ return false;
+ }
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ CLEANUP_FBUF struct fbuf root_ids = { 0 };
+ if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+ return false;
+ try {
+ Xapian::Document doc = i.get_document();
+ if (!root2ids_str(&root_ids, &drt, &doc))
+ return false;
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], &drt,
+ &root_ids, &doc);
+ } catch (const Xapian::Error & e) {
+ fprintf(orig_err, "W: %s (#%ld)\n",
+ e.get_description().c_str(), (long)(*i));
+ continue;
+ }
+ if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+ return false;
+ }
+ if (!dump_roots_flush(req, &drt))
+ return false;
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
return true;
}
@@ -228,7 +511,8 @@ static const struct cmd_entry {
cmd fn;
} cmds[] = { // should be small enough to not need bsearch || gperf
// most common commands first
- CMD(dump_ibx),
+ CMD(dump_ibx), // many inboxes
+ CMD(dump_roots), // per-cidx shard
CMD(test_inspect), // least common commands last
};
@@ -240,12 +524,6 @@ union my_cmsg {
char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
};
-static void xclose(int fd)
-{
- if (close(fd) < 0 && errno != EINTR)
- err(EXIT_FAILURE, "BUG: close");
-}
-
static bool recv_req(struct req *req, char *rbuf, size_t *len)
{
union my_cmsg cmsg = { 0 };
@@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
return false;
}
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
- if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
- warnx("bogus argument given");
- return 0;
- }
- size_t nr = 0;
- char *c = buf;
- for (size_t i = 1; i < len; i++) {
- if (!buf[i]) {
- dst[nr++] = c;
- c = buf + i + 1;
- }
- if (nr == limit) {
- warnx("too many args: %zu", nr);
- return 0;
- }
- }
- return (int)nr;
-}
-
static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
{
const struct srch *a = (const struct srch *)pa;
@@ -355,7 +611,7 @@ static bool srch_init(struct req *req)
char *dirv[MY_ARG_MAX];
int i;
struct srch *srch = req->srch;
- int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+ int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
srch->qp_flags = FLAG_PHRASE |
Xapian::QueryParser::FLAG_BOOLEAN |
@@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop
perror("W: setlinebuf(req.fp[1])");
stderr = req.fp[1];
}
- req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+ req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
if (req.argc > 0)
dispatch(&req);
if (ferror(req.fp[0]) | fclose(req.fp[0]))
diff --git a/t/xap_helper.t b/t/xap_helper.t
index f00a845a..92da2e6d 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -91,7 +91,7 @@ my $test = sub {
my $res = do { local $/; <$r> };
is(join('', @res), $res, 'got identical response w/ error pipe');
my $stats = do { local $/; <$err_rd> };
- is($stats, "mset.size=6\n", 'mset.size reported');
+ is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
if ($arg[-1] !~ /\('-j0'\)/) {
kill('KILL', $cinfo{pid});
@@ -105,12 +105,14 @@ my $test = sub {
};
my $ar;
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j0')]);
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j1')]);
-
-my @NO_CXX = (0);
+my @NO_CXX;
+if (!$ENV{TEST_XH_CXX_ONLY}) {
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+ push @NO_CXX, 0;
+}
SKIP: {
eval {
require PublicInbox::XapHelperCxx;
@@ -125,6 +127,20 @@ SKIP: {
PublicInbox::XapHelperCxx::start('-j1')]);
};
+require PublicInbox::CodeSearch;
+my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
+my $root2id_file = "$tmp/root2id";
+my @id2root;
+{
+ open my $fh, '>', $root2id_file;
+ my $i = -1;
+ for ($cs_int->all_terms('G')) {
+ print $fh $_, "\0", ++$i, "\0";
+ $id2root[$i] = $_;
+ }
+ close $fh;
+}
+
for my $n (@NO_CXX) {
local $ENV{PI_NO_CXX} = $n;
my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
@@ -141,7 +157,19 @@ for my $n (@NO_CXX) {
my $res = do { local $/; <$r> };
is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
my $err = do { local $/; <$err_r> };
- is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+ is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
+
+ pipe($err_r, $err_w);
+ $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+ (map { ('-d', $_) } @int),
+ $root2id_file, 'dt:19700101'.'000000..');
+ close $err_w;
+ my @res = <$r>;
+ is(scalar(@res), 5, 'got expected rows');
+ is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
+ 'entries match format');
+ $err = do { local $/; <$err_r> };
+ is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
}
done_testing;
^ permalink raw reply related [flat|nested] 11+ messages in thread
* [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (5 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
@ 2023-08-24 1:22 ` Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 1:22 UTC (permalink / raw)
To: meta
It's possible to hit a DatabaseModifiedError while iterating
through an MSet. We'll retry in these cases and cleanup some
code in both the Perl and C++ implementations.
---
lib/PublicInbox/XapHelper.pm | 102 +++++++++++++++++++++++------------
lib/PublicInbox/xap_helper.h | 101 +++++++++++++++++++++++-----------
2 files changed, 136 insertions(+), 67 deletions(-)
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c80be810..ef6a47a3 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -36,28 +36,74 @@ sub cmd_test_inspect {
($req->{srch}->has_threadid ? 1 : 0)
}
+sub iter_retry_check ($) {
+ die unless ref($@) =~ /\bDatabaseModifiedError\b/;
+ $_[0]->{srch}->reopen;
+ undef; # retries
+}
+
+sub dump_ibx_iter ($$$) {
+ my ($req, $ibx_id, $it) = @_;
+ my $out = $req->{0};
+ eval {
+ my $doc = $it->get_document;
+ for my $p (@{$req->{A}}) {
+ for (xap_terms($p, $doc)) {
+ print $out "$_ $ibx_id\n" or die "print: $!";
+ ++$req->{nr_out};
+ }
+ }
+ };
+ $@ ? iter_retry_check($req) : 0;
+}
+
+sub emit_mset_stats ($$) {
+ my ($req, $mset) = @_;
+ my $err = $req->{1} or return;
+ say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+}
+
sub cmd_dump_ibx {
my ($req, $ibx_id, $qry_str) = @_;
$qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR');
- my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX');
+ $req->{A} or return warn('dump_ibx requires -A PREFIX');
my $max = $req->{srch}->{xdb}->get_doccount;
my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 };
$opt->{eidx_key} = $req->{O} if defined $req->{O};
my $mset = $req->{srch}->mset($qry_str, $opt);
- my $out = $req->{0};
- $out->autoflush(1);
- my $nr = 0;
+ $req->{0}->autoflush(1);
for my $it ($mset->items) {
+ for (my $t = 10; $t > 0; --$t) {
+ $t = dump_ibx_iter($req, $ibx_id, $it) // $t;
+ }
+ }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out}
+ }
+}
+
+sub dump_roots_iter ($$$) {
+ my ($req, $root2id, $it) = @_;
+ eval {
my $doc = $it->get_document;
- for my $p (@pfx) {
+ my $G = join(' ', map { $root2id->{$_} } xap_terms('G', $doc));
+ for my $p (@{$req->{A}}) {
for (xap_terms($p, $doc)) {
- print $out "$_ $ibx_id\n" or die "print: $!";
- ++$nr;
+ $req->{wbuf} .= "$_ $G\n";
+ ++$req->{nr_out};
}
}
- }
- if (my $err = $req->{1}) {
- say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ };
+ $@ ? iter_retry_check($req) : 0;
+}
+
+sub dump_roots_flush ($$) {
+ my ($req, $fh) = @_;
+ if ($req->{wbuf} ne '') {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $req->{wbuf} or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ $req->{wbuf} = '';
}
}
@@ -65,44 +111,29 @@ sub cmd_dump_roots {
my ($req, $root2id_file, $qry_str) = @_;
$qry_str // return
warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
- my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+ $req->{A} or return warn('dump_roots requires -A PREFIX');
open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
- my %root2id; # record format: $OIDHEX "\0" uint32_t
+ my $root2id; # record format: $OIDHEX "\0" uint32_t
my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
while (@x) {
my $oidhex = shift @x;
- $root2id{$oidhex} = shift @x;
+ $root2id->{$oidhex} = shift @x;
}
my $opt = { relevance => -1, limit => $req->{'m'},
offset => $req->{o} // 0 };
my $mset = $req->{srch}->mset($qry_str, $opt);
$req->{0}->autoflush(1);
- my $buf = '';
- my $nr = 0;
+ $req->{wbuf} = '';
for my $it ($mset->items) {
- my $doc = $it->get_document;
- my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
- for my $p (@pfx) {
- for (xap_terms($p, $doc)) {
- $buf .= "$_ $G\n";
- ++$nr;
- }
+ for (my $t = 10; $t > 0; --$t) {
+ $t = dump_roots_iter($req, $root2id, $it) // $t;
}
- if (!($nr & 0x3fff)) {
- flock($fh, LOCK_EX) or die "flock: $!";
- print { $req->{0} } $buf or die "print: $!";
- flock($fh, LOCK_UN) or die "flock: $!";
- $buf = '';
+ if (!($req->{nr_out} & 0x3fff)) {
+ dump_roots_flush($req, $fh);
}
}
- if ($buf ne '') {
- flock($fh, LOCK_EX) or die "flock: $!";
- print { $req->{0} } $buf or die "print: $!";
- flock($fh, LOCK_UN) or die "flock: $!";
- }
- if (my $err = $req->{1}) {
- say $err 'mset.size='.$mset->size.' nr_out='.$nr
- }
+ dump_roots_flush($req, $fh);
+ emit_mset_stats($req, $mset);
}
sub dispatch {
@@ -153,6 +184,7 @@ sub recv_loop {
next;
}
my @argv = split(/\0/, $rbuf);
+ $req->{nr_out} = 0;
eval { $req->dispatch(@argv) } if @argv;
}
}
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index c9b4e0cc..e3ccfd41 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -63,6 +63,12 @@ static void code_nrp_init(void);
static void qp_init_mail_search(Xapian::QueryParser *);
static void qp_init_code_search(Xapian::QueryParser *);
+enum exc_iter {
+ ITER_OK = 0,
+ ITER_RETRY,
+ ITER_ABORT
+};
+
struct srch {
int paths_len; // int for comparisons
unsigned qp_flags;
@@ -196,6 +202,13 @@ static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
return enquire_mset(req, &enq);
}
+static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
+{
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset->size(), req->nr_out);
+}
+
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -224,6 +237,20 @@ static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
return setvbuf(fp, NULL, _IOLBF, 0);
}
+static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
+ Xapian::MSetIterator *i)
+{
+ try {
+ Xapian::Document doc = i->get_document();
+ for (int p = 0; p < req->pfxc; p++)
+ dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ }
+ return ITER_OK;
+}
+
static bool cmd_dump_ibx(struct req *req)
{
if ((optind + 1) >= req->argc) {
@@ -243,20 +270,18 @@ static bool cmd_dump_ibx(struct req *req)
req->asc = true;
req->sort_col = -1;
Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- try {
- Xapian::Document doc = i.get_document();
- for (int p = 0; p < req->pfxc; p++)
- dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
- } catch (const Xapian::Error & e) {
- fprintf(orig_err, "W: %s (#%ld)\n",
- e.get_description().c_str(), (long)(*i));
- continue;
- }
+ for (int t = 10; t > 0; --t)
+ switch (dump_ibx_iter(req, ibx_id, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
}
- if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
- (unsigned long long)mset.size(), req->nr_out);
+ emit_mset_stats(req, &mset);
return true;
}
@@ -312,8 +337,7 @@ static void dump_roots_ensure(void *ptr)
fbuf_ensure(&drt->wbuf);
}
-static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
- Xapian::Document *doc)
+static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc)
{
if (!fbuf_init(root_ids)) return false;
@@ -408,9 +432,28 @@ done_free:
return ok;
}
+static enum exc_iter dump_roots_iter(struct req *req,
+ struct dump_roots_tmp *drt,
+ Xapian::MSetIterator *i)
+{
+ CLEANUP_FBUF struct fbuf root_ids = { 0 }; // " $ID0 $ID1 $IDx..\n"
+ try {
+ Xapian::Document doc = i->get_document();
+ if (!root2ids_str(&root_ids, &doc))
+ return ITER_ABORT; // bad request, abort
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], drt,
+ &root_ids, &doc);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ }
+ return ITER_OK;
+}
+
static bool cmd_dump_roots(struct req *req)
{
- CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = { .root2id_fd = -1 };
if ((optind + 1) >= req->argc) {
warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
return false; // need file + qry_str
@@ -432,7 +475,7 @@ static bool cmd_dump_roots(struct req *req)
// each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
// so /32 overestimates the number of expected entries by
// ~%25 (as recommended by Linux hcreate(3) manpage)
- size_t est = (drt.sb.st_size / 32) + 1;
+ size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
warnx("%s size too big (%lld bytes > %zu)", root2id_file,
(long long)drt.sb.st_size, SIZE_MAX);
@@ -469,30 +512,24 @@ static bool cmd_dump_roots(struct req *req)
req->asc = true;
req->sort_col = -1;
Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- CLEANUP_FBUF struct fbuf root_ids = { 0 };
if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
return false;
- try {
- Xapian::Document doc = i.get_document();
- if (!root2ids_str(&root_ids, &drt, &doc))
- return false;
- for (int p = 0; p < req->pfxc; p++)
- dump_roots_term(req, req->pfxv[p], &drt,
- &root_ids, &doc);
- } catch (const Xapian::Error & e) {
- fprintf(orig_err, "W: %s (#%ld)\n",
- e.get_description().c_str(), (long)(*i));
- continue;
- }
+ for (int t = 10; t > 0; --t)
+ switch (dump_roots_iter(req, &drt, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
return false;
}
if (!dump_roots_flush(req, &drt))
return false;
- if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
- (unsigned long long)mset.size(), req->nr_out);
+ emit_mset_stats(req, &mset);
return true;
}
^ permalink raw reply related [flat|nested] 11+ messages in thread
* Re: [PATCH 4/7] introduce optional C++ xap_helper
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
@ 2023-08-24 11:23 ` Štěpán Němec
2023-08-24 11:49 ` Eric Wong
0 siblings, 1 reply; 11+ messages in thread
From: Štěpán Němec @ 2023-08-24 11:23 UTC (permalink / raw)
To: Eric Wong; +Cc: meta
On Thu, 24 Aug 2023 01:22:33 +0000
Eric Wong wrote:
> ease-of-installation on systems which don't provide
> pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
> do provide Xapian development libraries.
Where's this coming from? My servers are currently still running
7.2, but I do see the package on 7.3 as well:
https://ftp.openbsd.org/pub/OpenBSD/7.3/packages/amd64/xapian-bindings-perl-1.4.22.tgz
--
Štěpán
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [PATCH 4/7] introduce optional C++ xap_helper
2023-08-24 11:23 ` Štěpán Němec
@ 2023-08-24 11:49 ` Eric Wong
0 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 11:49 UTC (permalink / raw)
To: Štěpán Němec; +Cc: meta
Štěpán Němec <stepnem@smrk.net> wrote:
> On Thu, 24 Aug 2023 01:22:33 +0000
> Eric Wong wrote:
>
> > ease-of-installation on systems which don't provide
> > pre-packaged Perl Xapian bindings (e.g. OpenBSD 7.3) but
> > do provide Xapian development libraries.
>
> Where's this coming from? My servers are currently still running
> 7.2, but I do see the package on 7.3 as well:
>
> https://ftp.openbsd.org/pub/OpenBSD/7.3/packages/amd64/xapian-bindings-perl-1.4.22.tgz
Thanks. I missed the `-perl' suffix and thought `xapian-bindings' alone
would stem it correctly (I'm not familiar with OpenBSD package
management at all and just getting started with `pkg_add').
I'll update the commit message before pushing, since CentOS 7.x
and probably some other ancient distros are missing the Perl bindings.
^ permalink raw reply [flat|nested] 11+ messages in thread
* [PATCH 8/7] drop unused CidxRecvIbx.pm
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
` (6 preceding siblings ...)
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
@ 2023-08-24 12:30 ` Eric Wong
7 siblings, 0 replies; 11+ messages in thread
From: Eric Wong @ 2023-08-24 12:30 UTC (permalink / raw)
To: meta
This is no longer needed since xap_helper performs its
functionality while having an optional C++ implementation
which is being significantly faster.
---
n.b. I used --irreversible-delete to save bandwidth
MANIFEST | 1 -
lib/PublicInbox/CidxRecvIbx.pm | 46 ----------------------------------
2 files changed, 47 deletions(-)
delete mode 100644 lib/PublicInbox/CidxRecvIbx.pm
diff --git a/MANIFEST b/MANIFEST
index 4bccc849..918ec2e1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -163,7 +163,6 @@ lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
lib/PublicInbox/CidxLogP.pm
-lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/CodeSearch.pm
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
deleted file mode 100644
index 6add8e54..00000000
^ permalink raw reply related [flat|nested] 11+ messages in thread
end of thread, other threads:[~2023-08-24 12:30 UTC | newest]
Thread overview: 11+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2023-08-24 1:22 [PATCH 0/7] cindex: optional C++ Xapian helper Eric Wong
2023-08-24 1:22 ` [PATCH 1/7] search: hoist out shards_dir for future use Eric Wong
2023-08-24 1:22 ` [PATCH 2/7] cindex: read-only association dump Eric Wong
2023-08-24 1:22 ` [PATCH 3/7] cindex: add --show-roots switch Eric Wong
2023-08-24 1:22 ` [PATCH 4/7] introduce optional C++ xap_helper Eric Wong
2023-08-24 11:23 ` Štěpán Němec
2023-08-24 11:49 ` Eric Wong
2023-08-24 1:22 ` [PATCH 5/7] cindex: fix sorting and uniqueness Eric Wong
2023-08-24 1:22 ` [PATCH 6/7] cindex: implement dump_roots in C++ Eric Wong
2023-08-24 1:22 ` [PATCH 7/7] xap_helper: reopen+retry in MSetIterator loops Eric Wong
2023-08-24 12:30 ` [PATCH 8/7] drop unused CidxRecvIbx.pm Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).