From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 3DED81F51B for ; Thu, 24 Aug 2023 01:22:37 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1692840157; bh=n/kiBpdVnzRAlO2+wk88BUOIkQRFyOPdxJdH+XUGsK0=; h=From:To:Subject:Date:In-Reply-To:References:From; b=ZPOUY90twFf4bST3nQhzY2VOFABCWpQ/17cDAKTopMACmWgNbf+bIidfhXu+kdLxL 0by6SJvdM3lMdTwyG1Kpmdu9vCJKbXpmoyqi6MMwH40YzaO/EjKaJh/E5uzJtujuYx Qykh9PwcfjKMauym6YK3dfN05T8MMpoWW5cB4wGY= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 6/7] cindex: implement dump_roots in C++ Date: Thu, 24 Aug 2023 01:22:35 +0000 Message-Id: <20230824012236.3968030-7-e@80x24.org> In-Reply-To: <20230824012236.3968030-1-e@80x24.org> References: <20230824012236.3968030-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: It's now just `dump_roots' instead of `dump_shard_roots', since this doesn't need to be tied to the concept of shards. 
I'm still shaky with C++, but intend to keep using stuff like hsearch(3) to make life easier for C hackers :P --- MANIFEST | 1 - lib/PublicInbox/CidxDumpShardRoots.pm | 73 ------ lib/PublicInbox/CidxXapHelperAux.pm | 10 +- lib/PublicInbox/CodeSearchIdx.pm | 56 ++--- lib/PublicInbox/XapHelper.pm | 52 +++- lib/PublicInbox/xap_helper.h | 332 +++++++++++++++++++++++--- t/xap_helper.t | 44 +++- 7 files changed, 407 insertions(+), 161 deletions(-) delete mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm diff --git a/MANIFEST b/MANIFEST index 4f61af42..4bccc849 100644 --- a/MANIFEST +++ b/MANIFEST @@ -162,7 +162,6 @@ lib/PublicInbox/AltId.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm lib/PublicInbox/CidxComm.pm -lib/PublicInbox/CidxDumpShardRoots.pm lib/PublicInbox/CidxLogP.pm lib/PublicInbox/CidxRecvIbx.pm lib/PublicInbox/CidxXapHelperAux.pm diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm deleted file mode 100644 index 34afa419..00000000 --- a/lib/PublicInbox/CidxDumpShardRoots.pm +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ - -# Intended for PublicInbox::DS::event_loop for -cindex --associate -# Iterating through mset->items is slow in Perl due to method dispatch -# and that loop may implemented in C++ using Xapian directly -package PublicInbox::CidxDumpShardRoots; -use v5.12; -use PublicInbox::Lock; -use PublicInbox::Search qw(xap_terms); -use Socket qw(MSG_EOR); - -sub start { - my ($cidx, $root2id, $qry_str) = @_; - my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p'; - my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe'; - # sort lock is necessary if we have may root ids which cause a - # row length to exceed POSIX PIPE_BUF (via `$G' below) - my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' }, - 'PublicInbox::Lock'; - $sort_w->autoflush(1); - $cidx->begin_txn_lazy; # only using txn to simplify writer subs - my $opt = { limit => 
$cidx->assoc_max_init, relevance => -2 }; - my $self = bless { - cidx => $cidx, - op_p => $op_p, - iter => 0, - mset => $cidx->mset($qry_str, $opt), - root2id => $root2id, - sort_w => $sort_w, - sort_lk => $sort_lk, - }, __PACKAGE__; - event_step($self); -} - -sub event_step { - my ($self) = @_; - my $cidx = $self->{cidx}; - return if $cidx->do_quit; - my $last = $self->{mset}->size - 1; - my $cur = $self->{iter}; - my $end = $cur + 9999; - $end = $last if $end > $last; - $self->{iter} = $end + 1; - local $0 = "dumping shard [$cidx->{shard}] $cur..$end"; - $cidx->progress($0); - - my $root2id = $self->{root2id}; - my $buf = ''; - for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop - my $doc = $x->get_document; - my $G = join(' ', map { - $root2id->{pack('H*', $_)}; - } xap_terms('G', $doc)); - for my $p (@{$cidx->{ASSOC_PFX}}) { - $buf .= "$_ $G\n" for (xap_terms($p, $doc)); - } - } - $self->{sort_lk}->lock_acquire_fast; - print { $self->{sort_w} } $buf or die "print: $!"; - $self->{sort_lk}->lock_release_fast; - $end < $last && !$cidx->do_quit and - PublicInbox::DS::requeue($self); -} - -sub DESTROY { - my ($self) = @_; - return if $self->{cidx}->do_quit; - send($self->{op_p}, - "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR); -} - -1; diff --git a/lib/PublicInbox/CidxXapHelperAux.pm b/lib/PublicInbox/CidxXapHelperAux.pm index c9a5ddad..f402bde0 100644 --- a/lib/PublicInbox/CidxXapHelperAux.pm +++ b/lib/PublicInbox/CidxXapHelperAux.pm @@ -10,12 +10,8 @@ use PublicInbox::Syscall qw(EPOLLIN); # rpipe connects to req->fp[1] in xap_helper.h sub new { - my ($cls, $rpipe, $cidx, $pfx, $associate) = @_; - my $self = bless { - cidx => $cidx, - pfx => $pfx, - associate => $associate - }, $cls; + my ($cls, $rpipe, $cidx, $pfx) = @_; + my $self = bless { cidx => $cidx, pfx => $pfx }, $cls; $rpipe->blocking(0); $self->SUPER::new($rpipe, EPOLLIN); } @@ -36,7 +32,7 @@ sub event_step { my @lines = split(/^/m, $buf); $self->{buf} = pop @lines if 
substr($lines[-1], -1) ne "\n"; for my $l (@lines) { - if ($l =~ /\Amset\.size=[0-9]+\n\z/) { + if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) { delete $self->{cidx}->{PENDING}->{$pfx}; $self->{cidx}->index_next; } diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 4a41b1da..404d6826 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -87,7 +87,6 @@ our ( @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST $QRY_STR, # common query string for both code and inbox associations - @DUMP_SHARD_ROOTS_OK, # for associate $DUMP_IBX_WPIPE, # goes to sort(1) @ID2ROOT, ); @@ -505,14 +504,6 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $self->{shard}", MSG_EOR); } -sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion - my ($self, $associate, $n) = @_; - return if $DO_QUIT; - progress($self, "dump_shard_roots [$n] done"); - $DUMP_SHARD_ROOTS_OK[$n] = 1; - # may run associate() -} - sub assoc_max_init ($) { my ($self) = @_; my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX; @@ -520,38 +511,39 @@ sub assoc_max_init ($) { $max < 0 ? ((2 ** 31) - 1) : $max; } -# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2.. 
-sub dump_shard_roots { # via wq_io_do for associate - my ($self, $root2id, $qry_str) = @_; - PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str); -} - sub dump_roots_once { my ($self, $associate) = @_; $associate // die 'BUG: no $associate'; $TODO{associating} = 1; # keep shards_active() happy progress($self, 'dumping IDs from coderepos'); local $self->{xdb}; - @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G'); - my $id = 0; - my %root2id = map { $_ => $id++ } @ID2ROOT; - # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id + @ID2ROOT = $self->all_terms('G'); + my $root2id = "$TMPDIR/root2id"; + open my $fh, '>', $root2id or die "open($root2id): $!"; + my $nr = -1; + for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly + close $fh or die "close: $!"; + # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id pipe(my ($sort_r, $sort_w)) or die "pipe: $!"; pipe(my ($fold_r, $fold_w)) or die "pipe: $!"; my @sort = (@SORT, '-k1,1'); my $dst = "$TMPDIR/to_root_id"; - open my $fh, '>', $dst or die "open($dst): $!"; + open $fh, '>', $dst or die "open($dst): $!"; my $env = { %$CMD_ENV, OFS => ' ' }; my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w }); my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh }); awaitpid($sort_pid, \&cmd_done, \@sort, $associate); awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate); - my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ]; - my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ], - \%root2id, $QRY_STR); - $_->wq_io_do(@arg) for @IDX_SHARDS; - progress($self, 'waiting on dump_shard_roots sort'); + my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c', + '-m', assoc_max_init($self), $root2id, $QRY_STR); + for my $d ($self->shard_dirs) { + pipe(my ($err_r, $err_w)) or die "pipe: $!"; + $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg); + my $desc = "dump_roots $d"; + 
$self->{PENDING}->{$desc} = $associate; + PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc); + } + progress($self, 'waiting on dump_roots sort'); } sub dump_ibx { # sends to xap_helper.h @@ -563,8 +555,8 @@ sub dump_ibx { # sends to xap_helper.h pipe(my ($r, $w)) or die "pipe: $!"; $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd); my $ekey = $ibx->eidx_key; - $self->{PENDING}->{$ekey} = undef; - PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate}); + $self->{PENDING}->{$ekey} = $TODO{associate}; + PublicInbox::CidxXapHelperAux->new($r, $self, $ekey); } sub dump_ibx_start { @@ -885,8 +877,7 @@ sub associate { my ($self) = @_; return if $DO_QUIT; @IDX_SHARDS or return warn("# aborting on no shards\n"); - grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or - die "E: shards not dumped properly\n"; + unlink("$TMPDIR/root2id"); my @pending = keys %{$self->{PENDING}}; die "E: pending=@pending jobs not done\n" if @pending; progress($self, 'associating...'); @@ -906,7 +897,7 @@ sub associate { my $nr = $score{$k}; my ($ibx_id, $root) = split(/ /, $k); my $ekey = $IBX[$ibx_id]->eidx_key; - $root = unpack('H*', $ID2ROOT[$root]); + $root = $ID2ROOT[$root]; progress($self, "$ekey => $root has $nr matches"); } delete $TODO{associating}; # break out of shards_active() @@ -1048,7 +1039,6 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step sub init_associate_prefork ($) { my ($self) = @_; return unless $self->{-opt}->{associate}; - require PublicInbox::CidxDumpShardRoots; require PublicInbox::CidxXapHelperAux; require PublicInbox::XapClient; $self->{-pi_cfg} = PublicInbox::Config->new; @@ -1120,7 +1110,7 @@ sub cidx_run { # main entry point local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE, - @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT); + @ID2ROOT, $XH_PID, $XHC, @SORT); local $BATCH_BYTES = $self->{-opt}->{batch_size} 
// $PublicInbox::SearchIdx::BATCH_BYTES; local $self->{ASSOC_PFX} = \@ASSOC_PFX; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index bf2f99a2..c80be810 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -8,7 +8,9 @@ use Getopt::Long (); # good API even if we only use short options our $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev)); use PublicInbox::Search qw(xap_terms); +use PublicInbox::CodeSearch; use PublicInbox::IPC; +use Fcntl qw(LOCK_UN LOCK_EX); my $X = \%PublicInbox::Search::X; our (%SRCH, %PIDS, $parent_pid); our $stderr = \*STDERR; @@ -44,15 +46,63 @@ sub cmd_dump_ibx { my $mset = $req->{srch}->mset($qry_str, $opt); my $out = $req->{0}; $out->autoflush(1); + my $nr = 0; for my $it ($mset->items) { my $doc = $it->get_document; for my $p (@pfx) { for (xap_terms($p, $doc)) { print $out "$_ $ibx_id\n" or die "print: $!"; + ++$nr; } } } - if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size } + if (my $err = $req->{1}) { + say $err 'mset.size='.$mset->size.' 
nr_out='.$nr + } +} + +sub cmd_dump_roots { + my ($req, $root2id_file, $qry_str) = @_; + $qry_str // return + warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR'); + my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX'); + open my $fh, '<', $root2id_file or die "open($root2id_file): $!"; + my %root2id; # record format: $OIDHEX "\0" uint32_t + my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!"); + while (@x) { + my $oidhex = shift @x; + $root2id{$oidhex} = shift @x; + } + my $opt = { relevance => -1, limit => $req->{'m'}, + offset => $req->{o} // 0 }; + my $mset = $req->{srch}->mset($qry_str, $opt); + $req->{0}->autoflush(1); + my $buf = ''; + my $nr = 0; + for my $it ($mset->items) { + my $doc = $it->get_document; + my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc)); + for my $p (@pfx) { + for (xap_terms($p, $doc)) { + $buf .= "$_ $G\n"; + ++$nr; + } + } + if (!($nr & 0x3fff)) { + flock($fh, LOCK_EX) or die "flock: $!"; + print { $req->{0} } $buf or die "print: $!"; + flock($fh, LOCK_UN) or die "flock: $!"; + $buf = ''; + } + } + if ($buf ne '') { + flock($fh, LOCK_EX) or die "flock: $!"; + print { $req->{0} } $buf or die "print: $!"; + flock($fh, LOCK_UN) or die "flock: $!"; + } + if (my $err = $req->{1}) { + say $err 'mset.size='.$mset->size.' nr_out='.$nr + } } sub dispatch { diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 52db92b7..c9b4e0cc 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -6,12 +6,16 @@ * this is not linked to Perl in any way. * C (not C++) is used as much as possible to lower the contribution * barrier for hackers who mainly know C (this includes the maintainer). 
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * instead of their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers */ #ifndef _ALL_SOURCE # define _ALL_SOURCE #endif +#include +#include #include #include #include @@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf unsigned long long max; unsigned long long off; unsigned long timeout_sec; + size_t nr_out; long sort_col; // value column, negative means BoolWeight int argc; int pfxc; @@ -96,6 +101,28 @@ struct worker { unsigned nr; }; +#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) +static size_t split2argv(char **dst, char *buf, size_t len, size_t limit) +{ + if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { + warnx("bogus argument given"); + return 0; + } + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + dst[nr++] = c; + c = buf + i + 1; + } + if (nr == limit) { + warnx("too many args: %zu", nr); + return 0; + } + } + return (long)nr; +} + static bool has_threadid(const struct srch *srch) { return srch->db->get_metadata("has_threadid") == "1"; @@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req) static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) { - if (!req->max) - req->max = 50; + if (!req->max) { + switch (sizeof(Xapian::doccount)) { + case 4: req->max = UINT_MAX; break; + default: req->max = ULLONG_MAX; + } + } for (int i = 0; i < 9; i++) { try { Xapian::MSet mset = enq->get_mset(req->off, req->max); @@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) return enq->get_mset(req->off, req->max); } +// for v1, v2, and extindex static Xapian::MSet mail_mset(struct req *req, const char *qry_str) { struct srch *srch = req->srch; Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); if
(req->Oeidx_key) { req->Oeidx_key[0] = 'O'; // modifies static rbuf - fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key); qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } @@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) return enquire_mset(req, &enq); } +// for cindex +static Xapian::MSet commit_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + // TODO: git_dir + roots_filter + + // we only want commits: + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query("T" "c")); + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + return enquire_mset(req, &enq); +} + static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) { return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); @@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx, for (cur.skip_to(pfx); cur != end; cur++) { std::string tn = *cur; - if (starts_with(&tn, pfx, pfx_len)) + if (starts_with(&tn, pfx, pfx_len)) { fprintf(req->fp[0], "%s %s\n", tn.c_str() + pfx_len, ibx_id); + ++req->nr_out; + } } } @@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req) } req->asc = true; req->sort_col = -1; - req->max = (unsigned long long)req->srch->db->get_doccount(); Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]); for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { try { @@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req) } } if (req->fp[1]) - fprintf(req->fp[1], "mset.size=%llu\n", - (unsigned long long)mset.size()); + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset.size(), req->nr_out); + return true; +} + +struct fbuf { + FILE *fp; + char *ptr; + size_t len; +}; + +struct dump_roots_tmp { + struct stat sb; + void *mm_ptr; + char **entries; + struct fbuf wbuf; + int root2id_fd; +}; + 
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure))) +static void fbuf_ensure(void *ptr) +{ + struct fbuf *fbuf = (struct fbuf *)ptr; + if (fbuf->fp && fclose(fbuf->fp)) + perror("fclose(fbuf->fp)"); + fbuf->fp = NULL; + free(fbuf->ptr); +} + +static bool fbuf_init(struct fbuf *fbuf) +{ + assert(!fbuf->ptr); + fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len); + if (fbuf->fp) return true; + perror("open_memstream(fbuf)"); + return false; +} + +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + err(EXIT_FAILURE, "BUG: close"); +} + +#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure))) +static void dump_roots_ensure(void *ptr) +{ + struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr; + if (drt->root2id_fd >= 0) + xclose(drt->root2id_fd); + hdestroy(); // idempotent + if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size)) + err(EXIT_FAILURE, "BUG: munmap"); + free(drt->entries); + fbuf_ensure(&drt->wbuf); +} + +static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt, + Xapian::Document *doc) +{ + if (!fbuf_init(root_ids)) return false; + + bool ok = true; + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + ENTRY e, *ep; + for (cur.skip_to("G"); cur != end; cur++) { + std::string tn = *cur; + if (!starts_with(&tn, "G", 1)) + continue; + union { const char *in; char *out; } u; + u.in = tn.c_str() + 1; + e.key = u.out; + ep = hsearch(e, FIND); + if (!ep) { + warnx("hsearch miss `%s'", e.key); + return false; + } + // ep->data is a NUL-terminated string matching /[0-9]+/ + fputc(' ', root_ids->fp); + fputs((const char *)ep->data, root_ids->fp); + } + fputc('\n', root_ids->fp); + if (ferror(root_ids->fp) | fclose(root_ids->fp)) { + perror("ferror|fclose(root_ids)"); + ok = false; + } + root_ids->fp = NULL; + return ok; +} + +// writes term values matching @pfx for a given @doc, ending the line +// with the contents of @root_ids +static void 
dump_roots_term(struct req *req, const char *pfx, + struct dump_roots_tmp *drt, + struct fbuf *root_ids, + Xapian::Document *doc) +{ + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + size_t pfx_len = strlen(pfx); + + for (cur.skip_to(pfx); cur != end; cur++) { + std::string tn = *cur; + if (!starts_with(&tn, pfx, pfx_len)) + continue; + fputs(tn.c_str() + pfx_len, drt->wbuf.fp); + fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp); + ++req->nr_out; + } +} + +// we may have lines which exceed PIPE_BUF, so we do our own +// buffering and rely on flock(2), here +static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt) +{ + char *p; + int fd = fileno(req->fp[0]); + bool ok = true; + + if (!drt->wbuf.fp) return true; + if (fd < 0) err(EXIT_FAILURE, "BUG: fileno"); + if (fclose(drt->wbuf.fp)) { + warn("fclose(drt->wbuf.fp)"); // malloc failure? + return false; + } + drt->wbuf.fp = NULL; + if (!drt->wbuf.len) goto done_free; + if (flock(drt->root2id_fd, LOCK_EX)) { + perror("LOCK_EX"); + return false; + } + p = drt->wbuf.ptr; + do { + ssize_t n = write(fd, p, drt->wbuf.len); + if (n > 0) { + drt->wbuf.len -= n; + p += n; + } else { + perror(n ? 
"write" : "write (zero bytes)"); + return false; + } + } while (drt->wbuf.len); + if (flock(drt->root2id_fd, LOCK_UN)) { + perror("LOCK_UN"); + return false; + } +done_free: + free(drt->wbuf.ptr); + drt->wbuf.ptr = NULL; + return ok; +} + +static bool cmd_dump_roots(struct req *req) +{ + CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 }; + if ((optind + 1) >= req->argc) { + warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR"); + return false; // need file + qry_str + } + if (!req->pfxc) { + warnx("dump_roots requires -A PREFIX"); + return false; + } + const char *root2id_file = req->argv[optind]; + drt.root2id_fd = open(root2id_file, O_RDONLY); + if (drt.root2id_fd < 0) { + warn("open(%s)", root2id_file); + return false; + } + if (fstat(drt.root2id_fd, &drt.sb)) { + warn("fstat(%s)", root2id_file); + return false; + } + // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0), + // so /32 overestimates the number of expected entries by + // ~%25 (as recommended by Linux hcreate(3) manpage) + size_t est = (drt.sb.st_size / 32) + 1; + if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) { + warnx("%s size too big (%lld bytes > %zu)", root2id_file, + (long long)drt.sb.st_size, SIZE_MAX); + return false; + } + drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ, + MAP_PRIVATE, drt.root2id_fd, 0); + if (drt.mm_ptr == MAP_FAILED) { + warn("mmap(%s)", root2id_file); + return false; + } + drt.entries = (char **)calloc(est * 2, sizeof(char *)); + if (!drt.entries) { + warn("calloc(%zu * 2, %zu)", est, sizeof(char *)); + return false; + } + size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr, + drt.sb.st_size, est * 2); + if (tot <= 0) return false; // split2argv already warned on error + if (!hcreate(est)) { + warn("hcreate(%zu)", est); + return false; + } + for (size_t i = 0; i < tot; ) { + ENTRY e; + e.key = drt.entries[i++]; + e.data = drt.entries[i++]; + if (!hsearch(e, ENTER)) { + warn("hsearch(%s => %s, ENTER)", e.key, + (const char *)e.data); + 
return false; + } + } + req->asc = true; + req->sort_col = -1; + Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]); + for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { + CLEANUP_FBUF struct fbuf root_ids = { 0 }; + if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf)) + return false; + try { + Xapian::Document doc = i.get_document(); + if (!root2ids_str(&root_ids, &drt, &doc)) + return false; + for (int p = 0; p < req->pfxc; p++) + dump_roots_term(req, req->pfxv[p], &drt, + &root_ids, &doc); + } catch (const Xapian::Error & e) { + fprintf(orig_err, "W: %s (#%ld)\n", + e.get_description().c_str(), (long)(*i)); + continue; + } + if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt)) + return false; + } + if (!dump_roots_flush(req, &drt)) + return false; + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset.size(), req->nr_out); return true; } @@ -228,7 +511,8 @@ static const struct cmd_entry { cmd fn; } cmds[] = { // should be small enough to not need bsearch || gperf // most common commands first - CMD(dump_ibx), + CMD(dump_ibx), // many inboxes + CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last }; @@ -240,12 +524,6 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; }; -static void xclose(int fd) -{ - if (close(fd) < 0 && errno != EINTR) - err(EXIT_FAILURE, "BUG: close"); -} - static bool recv_req(struct req *req, char *rbuf, size_t *len) { union my_cmsg cmsg = { 0 }; @@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len) return false; } -#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) -static int split2argv(char **dst, char *buf, size_t len, size_t limit) -{ - if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { - warnx("bogus argument given"); - return 0; - } - size_t nr = 0; - char *c = buf; - for (size_t i = 1; i < len; i++) { - if (!buf[i]) { - dst[nr++] = c; - c = buf + i + 1; - } - 
if (nr == limit) { - warnx("too many args: %zu", nr); - return 0; - } - } - return (int)nr; -} - static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch { const struct srch *a = (const struct srch *)pa; @@ -355,7 +611,7 @@ static bool srch_init(struct req *req) char *dirv[MY_ARG_MAX]; int i; struct srch *srch = req->srch; - int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); + int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; srch->qp_flags = FLAG_PHRASE | Xapian::QueryParser::FLAG_BOOLEAN | @@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop perror("W: setlinebuf(req.fp[1])"); stderr = req.fp[1]; } - req.argc = SPLIT2ARGV(req.argv, rbuf, len); + req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len); if (req.argc > 0) dispatch(&req); if (ferror(req.fp[0]) | fclose(req.fp[0])) diff --git a/t/xap_helper.t b/t/xap_helper.t index f00a845a..92da2e6d 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -91,7 +91,7 @@ my $test = sub { my $res = do { local $/; <$r> }; is(join('', @res), $res, 'got identical response w/ error pipe'); my $stats = do { local $/; <$err_rd> }; - is($stats, "mset.size=6\n", 'mset.size reported'); + is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported'); if ($arg[-1] !~ /\('-j0'\)/) { kill('KILL', $cinfo{pid}); @@ -105,12 +105,14 @@ my $test = sub { }; my $ar; -$ar = $test->(qw[-MPublicInbox::XapHelper -e - PublicInbox::XapHelper::start('-j0')]); -$ar = $test->(qw[-MPublicInbox::XapHelper -e - PublicInbox::XapHelper::start('-j1')]); - -my @NO_CXX = (0); +my @NO_CXX; +if (!$ENV{TEST_XH_CXX_ONLY}) { + $ar = $test->(qw[-MPublicInbox::XapHelper -e + PublicInbox::XapHelper::start('-j0')]); + $ar = $test->(qw[-MPublicInbox::XapHelper -e + PublicInbox::XapHelper::start('-j1')]); + push @NO_CXX, 0; +} SKIP: { eval { require PublicInbox::XapHelperCxx; @@ -125,6 +127,20 @@ SKIP: { PublicInbox::XapHelperCxx::start('-j1')]); }; 
+require PublicInbox::CodeSearch; +my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex"); +my $root2id_file = "$tmp/root2id"; +my @id2root; +{ + open my $fh, '>', $root2id_file; + my $i = -1; + for ($cs_int->all_terms('G')) { + print $fh $_, "\0", ++$i, "\0"; + $id2root[$i] = $_; + } + close $fh; +} + for my $n (@NO_CXX) { local $ENV{PI_NO_CXX} = $n; my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0'); @@ -141,7 +157,19 @@ for my $n (@NO_CXX) { my $res = do { local $/; <$r> }; is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})"); my $err = do { local $/; <$err_r> }; - is($err, "mset.size=1\n", "got expected status ($xhc->{impl})"); + is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})"); + + pipe($err_r, $err_w); + $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID), + (map { ('-d', $_) } @int), + $root2id_file, 'dt:19700101'.'000000..'); + close $err_w; + my @res = <$r>; + is(scalar(@res), 5, 'got expected rows'); + is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)), + 'entries match format'); + $err = do { local $/; <$err_r> }; + is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})"); } done_testing;