From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 8C1521F461 for ; Thu, 24 Aug 2023 01:22:36 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1692840156; bh=qGw2GHW79yEVpC6fMQyViWXGpsTmE14uzt5t8CxRjwk=; h=From:To:Subject:Date:In-Reply-To:References:From; b=ZfR93nDoRLimG94UdrcgZ9MKUTAojrAXEJegJhtCkGn+GAbdsQvrlOs7RTLVhm29R XVyPb879letpPe3uD8o0OFhTrPnVXqY3BSAvovMxQNp3eVogXpEK3qWnhTOFPM+nEs L/jsidN8lgNDLCDEvqLjuy0n1wJ8GCLzJMroZ45E= From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/7] cindex: read-only association dump Date: Thu, 24 Aug 2023 01:22:31 +0000 Message-Id: <20230824012236.3968030-3-e@80x24.org> In-Reply-To: <20230824012236.3968030-1-e@80x24.org> References: <20230824012236.3968030-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This will eventually allow associating coderepos with inboxes and vice-versa; avoiding the need for manual configuration via tedious publicinbox.*.coderepo directives. I'm not sure how this should be stored for WWW, yet, but it's required since it takes about 8 hours to do this fully across lore and git.kernel.org. 
--- MANIFEST | 3 + lib/PublicInbox/CidxDumpIbx.pm | 59 +++++ lib/PublicInbox/CidxDumpShardRoots.pm | 73 ++++++ lib/PublicInbox/CidxRecvIbx.pm | 46 ++++ lib/PublicInbox/CodeSearchIdx.pm | 306 ++++++++++++++++++++++---- lib/PublicInbox/Config.pm | 2 +- lib/PublicInbox/Search.pm | 2 +- script/public-inbox-cindex | 4 +- 8 files changed, 452 insertions(+), 43 deletions(-) create mode 100644 lib/PublicInbox/CidxDumpIbx.pm create mode 100644 lib/PublicInbox/CidxDumpShardRoots.pm create mode 100644 lib/PublicInbox/CidxRecvIbx.pm diff --git a/MANIFEST b/MANIFEST index 1001ca08..162e3038 100644 --- a/MANIFEST +++ b/MANIFEST @@ -162,7 +162,10 @@ lib/PublicInbox/AltId.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm lib/PublicInbox/CidxComm.pm +lib/PublicInbox/CidxDumpIbx.pm +lib/PublicInbox/CidxDumpShardRoots.pm lib/PublicInbox/CidxLogP.pm +lib/PublicInbox/CidxRecvIbx.pm lib/PublicInbox/CmdIPC4.pm lib/PublicInbox/CodeSearch.pm lib/PublicInbox/CodeSearchIdx.pm diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm new file mode 100644 index 00000000..e1bc273d --- /dev/null +++ b/lib/PublicInbox/CidxDumpIbx.pm @@ -0,0 +1,59 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Intended for PublicInbox::DS::event_loop for -cindex --associate +# Iterating through mset->items is slow in Perl due to method dispatch +# and that loop may be implemented in C++ using Xapian directly +package PublicInbox::CidxDumpIbx; +use v5.12; +use PublicInbox::Search qw(xap_terms); +use PublicInbox::DS; +use Socket qw(MSG_EOR); + +sub start { + my ($rcvibx, $ibx_id) = @_; + my $cidx = $rcvibx->{cidx}; + my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]"; + my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key, + ibx_id => $ibx_id }, __PACKAGE__; + $self->{srch} = $ibx->isrch // do { + warn("W: $self->{ekey} has no search index (ignoring)\n"); + return undef; + }; + my $opt = { limit => $cidx->assoc_max_init, relevance => -2 }; + $self->{mset} = 
+ $self->{srch}->mset($rcvibx->{qry_str}, $opt); + $self->{iter} = 0; + event_step($self); +} + +sub event_step { + my ($self) = @_; + my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx'; + return if $rcvibx->{cidx}->do_quit; + my $last = $self->{mset}->size - 1; + my $cur = $self->{iter}; + my $end = $cur + 9999; + if ($end >= $last) { + send($rcvibx->{op_p}, 'index_next', MSG_EOR); + $end = $last; + } + $self->{iter} = $end + 1; + local $0 = "dumping $self->{ekey} $cur..$end"; + + my $sort_w = $rcvibx->{sort_w}; + my $ibx_id = $self->{ibx_id}; + local $0 = "dumping $self->{ekey} $cur..$end"; + $rcvibx->{cidx}->progress($0); + for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop + my $doc = $x->get_document; + for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) { + for (xap_terms($p, $doc)) { + print $sort_w "$_ $ibx_id\n" or die "print: $!"; + } + } + } + $end < $last && !$rcvibx->{cidx}->do_quit and + PublicInbox::DS::requeue($self); +} + +1; diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm new file mode 100644 index 00000000..34afa419 --- /dev/null +++ b/lib/PublicInbox/CidxDumpShardRoots.pm @@ -0,0 +1,73 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Intended for PublicInbox::DS::event_loop for -cindex --associate +# Iterating through mset->items is slow in Perl due to method dispatch +# and that loop may be implemented in C++ using Xapian directly +package PublicInbox::CidxDumpShardRoots; +use v5.12; +use PublicInbox::Lock; +use PublicInbox::Search qw(xap_terms); +use Socket qw(MSG_EOR); + +sub start { + my ($cidx, $root2id, $qry_str) = @_; + my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p'; + my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe'; + # sort lock is necessary if we have many root ids which cause a + # row length to exceed POSIX PIPE_BUF (via `$G' below) + my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' }, + 'PublicInbox::Lock'; + 
$sort_w->autoflush(1); + $cidx->begin_txn_lazy; # only using txn to simplify writer subs + my $opt = { limit => $cidx->assoc_max_init, relevance => -2 }; + my $self = bless { + cidx => $cidx, + op_p => $op_p, + iter => 0, + mset => $cidx->mset($qry_str, $opt), + root2id => $root2id, + sort_w => $sort_w, + sort_lk => $sort_lk, + }, __PACKAGE__; + event_step($self); +} + +sub event_step { + my ($self) = @_; + my $cidx = $self->{cidx}; + return if $cidx->do_quit; + my $last = $self->{mset}->size - 1; + my $cur = $self->{iter}; + my $end = $cur + 9999; + $end = $last if $end > $last; + $self->{iter} = $end + 1; + local $0 = "dumping shard [$cidx->{shard}] $cur..$end"; + $cidx->progress($0); + + my $root2id = $self->{root2id}; + my $buf = ''; + for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop + my $doc = $x->get_document; + my $G = join(' ', map { + $root2id->{pack('H*', $_)}; + } xap_terms('G', $doc)); + for my $p (@{$cidx->{ASSOC_PFX}}) { + $buf .= "$_ $G\n" for (xap_terms($p, $doc)); + } + } + $self->{sort_lk}->lock_acquire_fast; + print { $self->{sort_w} } $buf or die "print: $!"; + $self->{sort_lk}->lock_release_fast; + $end < $last && !$cidx->do_quit and + PublicInbox::DS::requeue($self); +} + +sub DESTROY { + my ($self) = @_; + return if $self->{cidx}->do_quit; + send($self->{op_p}, + "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR); +} + +1; diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm new file mode 100644 index 00000000..6add8e54 --- /dev/null +++ b/lib/PublicInbox/CidxRecvIbx.pm @@ -0,0 +1,46 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ +# +# dumps all per-inbox info for -cindex --associate +# integrated into the event loop for signalfd SIGINT handling +package PublicInbox::CidxRecvIbx; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE); +use Socket qw(MSG_EOR); +use PublicInbox::CidxDumpIbx; + +sub new { + my ($cls, $cidx, $qry_str) = @_; + 
my ($op_p, $r_ibx, $sort_w) = delete @$cidx{0..2}; + $op_p // die 'BUG: no $op_p'; + $r_ibx // die 'BUG: no $r_ibx'; + $sort_w // die 'BUG: no $sort_w'; + my $self = bless {}, $cls; + $self->SUPER::new($r_ibx, EPOLLIN|EPOLLEXCLUSIVE); + $self->{cidx} = $cidx; + $self->{sort_w} = $sort_w; + $self->{op_p} = $op_p; # PublicInbox::CidxDumpIbx uses this + $self->{qry_str} = $qry_str; + # writes to this pipe are never longer than POSIX PIPE_BUF, + # so rely on POSIX atomicity guarantees + $sort_w->autoflush(1); + $self; +} + +sub event_step { + my ($self) = @_; + recv($self->{sock}, my $ibx_id, 25, 0) // die "recv: $!"; + return $self->close if $ibx_id eq '' || $self->{cidx}->do_quit; + PublicInbox::CidxDumpIbx::start($self, $ibx_id); +} + +sub close { + my ($self) = @_; + $self->{cidx}->do_quit or + send($self->{op_p}, + "recv_ibx_done $self->{cidx}->{shard}", MSG_EOR); + $self->SUPER::close; # PublicInbox::DS::close +} + +1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index ba14e52a..2480dbd2 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -11,6 +11,26 @@ # # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR) # +# --associate joins root commits of coderepos to inboxes based on prefixes. +# +# Internally, each inbox is assigned a non-negative integer index ($IBX_ID), +# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned +# a non-negative integer index ($ROOT_COMMIT_OID_ID). +# +# associate dumps to 2 intermediate files in $TMPDIR: +# +# * to_root_id - each line is of the format: +# +# $PFX $ROOT_COMMIT_OID_ID +# +# * to_ibx_id - each line is of the format: +# +# $PFX $IBX_ID +# +# In both cases, $PFX is typically the value of the patchid (XDFID) but it +# can be configured to use any combination of patchid, dfpre, dfpost or +# dfblob. 
+# +# See PublicInbox::CodeSearch (read-only API) for more package PublicInbox::CodeSearchIdx; use v5.12; @@ -25,14 +45,14 @@ use File::Spec (); use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); -use PublicInbox::Config qw(glob2re); +use PublicInbox::Config qw(glob2re rel2abs_collapsed); use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::OnDestroy; use PublicInbox::CidxLogP; use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use PublicInbox::Compat qw(uniqstr); -use Socket qw(MSG_EOR); +use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET); use Carp (); our ( $LIVE, # pid => cmd @@ -55,8 +75,15 @@ our ( %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates $TMPDIR, # File::Temp->newdir object for prune @PRUNE_QUEUE, # GIT_DIRs to prepare for pruning - $PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune + %TODO, @IBXQ, @IBX, + @JOIN, # join(1) command for associate + $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands + @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST + $QRY_STR, # common query string for both code and inbox associations + $IBXDIR_FEED, # SOCK_SEQPACKET + @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate + @ID2ROOT, ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -64,6 +91,9 @@ our ( # git walks commits quickly if it doesn't have to read trees our $SEEN_MAX = 100000; +# window for commits/emails to determine an inbox <-> coderepo association +my $ASSOC_MAX = 50000; + our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check); # TODO: do we care about committer name + email? or tree OID? 
@@ -455,6 +485,91 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $self->{shard}", MSG_EOR); } +sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion + my ($self, $associate, $n) = @_; + return if $DO_QUIT; + progress($self, "dump_shard_roots [$n] done"); + $DUMP_SHARD_ROOTS_OK[$n] = 1; + # may run associate() +} + +sub assoc_max_init ($) { + my ($self) = @_; + my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX; + $max = $ASSOC_MAX if !$max; + $max < 0 ? ((2 ** 31) - 1) : $max; +} + +# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2.. +sub dump_shard_roots { # via wq_io_do for associate + my ($self, $root2id, $qry_str) = @_; + PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str); +} + +sub dump_roots_once { + my ($self, $associate) = @_; + $associate // die 'BUG: no $associate'; + $TODO{associating} = 1; # keep shards_active() happy + progress($self, 'dumping IDs from coderepos'); + local $self->{xdb}; + @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G'); + my $id = 0; + my %root2id = map { $_ => $id++ } @ID2ROOT; + pipe(my ($r, $w)) or die "pipe: $!"; + my @sort = (@SORT, '-k1,1'); + my $dst = "$TMPDIR/to_root_id"; + open my $fh, '>', $dst or die "open($dst): $!"; + my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh }); + close $r or die "close: $!"; + awaitpid($sort_pid, \&cmd_done, \@sort, $associate); + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ]; + my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR); + $_->wq_io_do(@arg) for @IDX_SHARDS; + progress($self, 'waiting on dump_shard_roots sort'); +} + +sub recv_ibx_done { # via PktOp on recv_ibx completion + my ($self, $pid, $n) = @_; + return if $DO_QUIT; + progress($self, "recv_ibx [$n] done"); + $RECV_IBX_OK[$n] = 1; +} + +# causes a worker to become a dumper for inbox/extindex +sub recv_ibx { # wq_io_do + my ($self, $qry_str) = @_; + 
PublicInbox::CidxRecvIbx->new($self, $qry_str); +} + +sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step + my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR" + my $n = length($id_dir); + my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!"; + $n == $w or die "send($id_dir) $w != $n"; +} + +# repurpose shard workers to dump inbox patchids with perfect balance +sub dump_ibx_start { + my ($self, $associate) = @_; + pipe(my ($sort_r, $sort_w)) or die "pipe: $!"; + my @sort = (@SORT, '-k1,1'); + my $dst = "$TMPDIR/to_ibx_id"; + open my $fh, '>', $dst or die "open($dst): $!"; + my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh }); + close $sort_r or die "close: $!"; + awaitpid($sort_pid, \&cmd_done, \@sort, $associate); + + my ($r, $w); + socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{recv_ibx_done} = [ $self, $associate ]; + $c->{ops}->{index_next} = [ $self ]; + my $io = [ $p->{op_p}, $r, $sort_w ]; + $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS; + $IBXDIR_FEED = $w; +} + sub index_next ($) { my ($self) = @_; return if $DO_QUIT; @@ -466,6 +581,12 @@ sub index_next ($) { $self, $git); fp_start($self, $git, $prep_repo); ct_start($self, $git, $prep_repo); + } elsif ($TMPDIR) { + delete $TODO{dump_ibx_start}; # runs OnDestroy once + return dump_ibx($self, shift @IBXQ) if @IBXQ; + progress($self, 'done dumping inboxes') if $IBXDIR_FEED; + undef $IBXDIR_FEED; # done dumping inboxes, dump roots + dump_roots_once($self, delete($TODO{associate}) // return); } # else: wait for shards_active (post_loop_do) callback } @@ -502,7 +623,7 @@ sub commit_shard { # OnDestroy cb for my $n (keys %$active) { $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]); } - undef $p; # shard_done fires when all shards are committed + # shard_done fires when all shards are committed } sub index_repo { # cidx_await cb @@ -628,8 +749,8 @@ EOM sub scan_git_dirs ($) { 
my ($self) = @_; - @$GIT_TODO = @{$self->{git_dirs}}; - index_next($self) for (1..$LIVE_JOBS); + my $n = @$GIT_TODO = @{$self->{git_dirs}}; + progress($self, "scanning $n code repositories..."); } sub prune_do { # via wq_io_do in IDX_SHARDS @@ -661,7 +782,7 @@ sub shards_active { # post_loop_do return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4; return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS; return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX; - return 1 if keys(%$LIVE); + return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO); for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) { $s->{-cidx_quit} = 1 if defined($s->{-wq_s1}); $s->wq_close; # may recurse via awaitpid outside of event_loop @@ -674,6 +795,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) } sub parent_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->(); + $IBXDIR_FEED = undef; kill_shards(@_); warn "# SIG$_[0] received, quitting...\n"; } @@ -717,6 +839,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt EOM unless ($ALT_FH{$hexlen}) { + require PublicInbox::Import; my $git_dir = "$TMPDIR/hexlen$hexlen.git"; PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt); my $f = "$git_dir/objects/info/alternates"; @@ -739,52 +862,102 @@ sub prep_alternate_start { awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune); } -sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures - my ($pid, $cmd, $run_prune) = @_; +sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures + my ($pid, $cmd, $run_on_destroy) = @_; $? 
and die "@$cmd failed: \$?=$?"; + # $run_on_destroy calls associate() or run_prune() +} + +# runs once all inboxes and shards are dumped via OnDestroy +sub associate { + my ($self) = @_; + return if $DO_QUIT; + @IDX_SHARDS or return warn("# aborting on no shards\n"); + grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or + die "E: shards not dumped properly\n"; + grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or + die "E: inboxes not dumped properly\n"; + progress($self, 'associating...'); + my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id'); + my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" }); + my %score; + while (<$rd>) { # PFX ibx_id root_id + my (undef, $ibx_id, @root_id) = split(/ /, $_); + ++$score{"$ibx_id $_"} for @root_id; + } + close $rd or die "@join failed: $?=$?"; + my $min = $self->{-opt}->{'assoc-min'} // 10; + progress($self, scalar(keys %score).' potential pairings...'); + for my $k (keys %score) { + my $nr = $score{$k}; + my ($ibx_id, $root) = split(/ /, $k); + my $ekey = $IBX[$ibx_id]->eidx_key; + $root = unpack('H*', $ID2ROOT[$root]); + progress($self, "$ekey => $root has $nr matches"); + } + delete $TODO{associating}; # break out of shards_active() + # TODO + warn "# Waiting for $TMPDIR/cont @JOIN"; + system "ls -Rl $TMPDIR >&2"; + system "wc -l $TMPDIR/to_*_id >&2"; + #sleep(1) until -f "$TMPDIR/cont"; + # warn "# Waiting for $TMPDIR/cont"; + # sleep(1) until -f "$TMPDIR/cont"; +} + +sub require_progs { + my $op = shift; + while (my ($x, $argv) = splice(@_, 0, 2)) { + my $e = $x; + $e =~ tr/a-z-/A-Z_/; + my $c = $ENV{$e} // $x; + $argv->[0] //= which($c) // die "E: `$x' required for --$op\n"; + } +} + +sub init_associate_postfork ($) { + my ($self) = @_; + return unless $self->{-opt}->{associate}; + require_progs('associate', join => \@JOIN); + $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..'; + substr($QRY_STR, 0, 0) = 'dt:'; + scalar(@{$self->{git_dirs} // []}) or die <new($self->{git_dirs}->[0]); # ugh + 
$self->query_approxidate($approx_git, $QRY_STR); # in-place + $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self); + $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$, + \&dump_ibx_start, $self, $TODO{associate}); + my $id = -1; + @IBXQ = map { ++$id } @IBX; } sub init_prune ($) { my ($self) = @_; return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune}; - require File::Temp; - require PublicInbox::Import; - $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); - # Dealing with millions of commits here at once, so use faster tools. # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl # bindings. sed/awk are faster than Perl for simple stream ops, and # sort+comm are more memory-efficient with gigantic lists my @delve = (undef, qw(-A Q -1)); my @sed = (undef, '-ne', 's/^Q//p'); - @SORT = (undef, '-u'); @COMM = (undef, qw(-2 -3 indexed_commits -)); @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output - my @x = ('xapian-delve' => \@delve, sed => \@sed, - sort => \@SORT, comm => \@COMM, awk => \@AWK); - while (my ($x, $argv) = splice(@x, 0, 2)) { - my $e = $x; - $e =~ tr/a-z-/A-Z_/; - my $c = $ENV{$e} // $x; - $argv->[0] = which($c) // die "E: `$x' required for --prune\n"; - } + require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed, + comm => \@COMM, awk => \@AWK); for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" } - for (qw(parallel compress-program buffer-size)) { # GNU sort options - my $v = $self->{-opt}->{"sort-$_"}; - push @SORT, "--$_=$v" if defined $v; - } my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self); pipe(my ($sed_in, $delve_out)) or die "pipe: $!"; pipe(my ($sort_in, $sed_out)) or die "pipe: $!"; open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!"; - $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; - my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out }); - awaitpid($pid, 
\&prune_cmd_done, \@SORT, $run_prune); - $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out }); - awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune); + my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out }); + awaitpid($pid, \&cmd_done, \@SORT, $run_prune); + $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out }); + awaitpid($pid, \&cmd_done, \@sed, $run_prune); $pid = spawn(\@delve, undef, { 1 => $delve_out }); - awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune); + awaitpid($pid, \&cmd_done, \@delve, $run_prune); @PRUNE_QUEUE = @{$self->{git_dirs}}; for (1..$LIVE_JOBS) { prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune); @@ -809,14 +982,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done pipe(my ($awk_in, $batch_out)) or die "pipe: $!"; pipe(my ($sort_in, $awk_out)) or die "pipe: $!"; pipe(my ($comm_in, $sort_out)) or die "pipe: $!"; - my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out }); - my $sort_pid = spawn(\@SORT, $PRUNE_ENV, + my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out }); + my $sort_pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out }); - my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV, + my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV, { 0 => $comm_in, -C => "$TMPDIR" }); - awaitpid($awk_pid, \&prune_cmd_done, \@AWK); - awaitpid($sort_pid, \&prune_cmd_done, \@SORT); - awaitpid($comm_pid, \&prune_cmd_done, \@COMM); + awaitpid($awk_pid, \&cmd_done, \@AWK); + awaitpid($sort_pid, \&cmd_done, \@SORT); + awaitpid($comm_pid, \&cmd_done, \@COMM); PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm my $git_ver = PublicInbox::Git::git_version(); push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6; @@ -856,6 +1029,35 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step for (@gone) { close $_ or die "close: $!" 
}; } +sub init_associate_prefork ($) { + my ($self) = @_; + return unless $self->{-opt}->{associate}; + require PublicInbox::CidxRecvIbx; + require PublicInbox::CidxDumpShardRoots; + $self->{-pi_cfg} = PublicInbox::Config->new; + my @unknown; + my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]}; + for (map { split(/\s*,\s*/) } @pfx) { + my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} // + push(@unknown, $_); + push(@ASSOC_PFX, split(/ /, $v)); + } + die < undef; + } (@{$self->{-opt}->{include} // []}); + $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl); +} + +sub _prep_ibx { # each_inbox callback + my ($ibx, $self, $incl) = @_; + ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and + push @{$self->{IBX}}, $ibx; +} + sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); @@ -868,10 +1070,27 @@ sub cidx_run { # main entry point local $IDX_TODO = []; local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, - $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV); + $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, + %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT, + @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; - local @IDX_SHARDS = cidx_init($self); + local @SORT = (undef, '-u'); + local $self->{IBX} = \@IBX; + local $self->{ASSOC_PFX} = \@ASSOC_PFX; + local $self->{-pi_cfg}; + if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) { + require File::Temp; + $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); + $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; + require_progs('(prune|associate)', sort => \@SORT); + for (qw(parallel compress-program buffer-size)) { # GNU sort + my $v = $self->{-opt}->{"sort-$_"}; + push @SORT, "--$_=$v" if defined $v; + } + init_associate_prefork($self) + } + local @IDX_SHARDS = cidx_init($self); # 
forks workers local $self->{current_info} = ''; local $MY_SIG = { CHLD => \&PublicInbox::DS::enqueue_reap, @@ -919,7 +1138,9 @@ sub cidx_run { # main entry point PublicInbox::IPC::detect_nproc() || 2; local @RDONLY_XDB = $self->xdb_shards_flat; init_prune($self); + init_associate_postfork($self); scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + index_next($self) for (1..$LIVE_JOBS); # FreeBSD ignores/discards SIGCHLD while signals are blocked and # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending @@ -954,4 +1175,9 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC } +sub do_quit { $DO_QUIT } + +sub tmpdir { $TMPDIR } + + 1; diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm index 2f1b4122..0a6b210f 100644 --- a/lib/PublicInbox/Config.pm +++ b/lib/PublicInbox/Config.pm @@ -11,7 +11,7 @@ package PublicInbox::Config; use strict; use v5.10.1; use parent qw(Exporter); -our @EXPORT_OK = qw(glob2re); +our @EXPORT_OK = qw(glob2re rel2abs_collapsed); use PublicInbox::Inbox; use PublicInbox::Spawn qw(popen_rd); our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 1559d9b3..d5b0bceb 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -197,7 +197,7 @@ sub xdir ($;$) { my ($self, $rdonly) = @_; if ($rdonly || !defined($self->{shard})) { $self->{xpfx}; - } else { # v2 + extindex only: + } else { # v2, extindex, cindex only: "$self->{xpfx}/$self->{shard}"; } } diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex index 2f7796e7..888c8b10 100755 --- a/script/public-inbox-cindex +++ b/script/public-inbox-cindex @@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation. EOF my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! 
dangerous - indexlevel|index-level|L=s + indexlevel|index-level|L=s associate associate-max=i + associate-date-range=s associate-prefixes=s@ batch_size|batch-size=s max_size|max-size=s + include|I=s@ only=s@ all project-list=s exclude=s@ sort-parallel=s sort-compress-program=s sort-buffer-size=s d=s update|u scan! prune dry-run|n C=s@ help|h))