unofficial mirror of meta@public-inbox.org
* [PATCH 00/28] cindex coderepo commit indexer
@ 2023-03-21 23:07 Eric Wong
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
  0 siblings, 1 reply; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Not wired up to WWW or lei yet, but indexing and pruning of
commits work.

I'm not sure if indexing (root) tree OIDs or committer
names+emails is worth it, since I don't think those are very
important terms to search for.

I first wanted to shoehorn this into extindex, but I think it
works better as a separate Xapian schema.

It allows both internal indexes ($GIT_DIR/public-inbox-cindex)
for unforked repos, as well as extindex-style external index
to encompass several projects.

The indexer is structured a bit more nicely than existing
indexers since I'm relying more on OnDestroy and `local'.
I would like to trickle some of these improvements back to
the mail indexers at some point.

--prune and --reindex currently block incremental updates, which
isn't great since both take a while for giant Xapian DBs.

Pruning is pretty important since dropped commits are much more
common in coderepos (e.g. the `seen' branch of git.git).

`lei cq' will probably be a new command which behaves
similarly to `lei q -f text', but takes `git log' options
for output...

Eric Wong (28):
  ipc: move nproc_shards from v2writable
  search: relocate all_terms from lei_search
  admin: hoist out resolve_git_dir
  admin: ensure resolved GIT_DIR is absolute
  test_common: create_inbox: use `$!' properly on mkdir failure
  codesearch: initial cut w/ -cindex tool
  cindex: parallelize prep phases
  cindex: use read-only shards during prep phases
  searchidxshard: improve comment wording
  cindex: use DS and workqueues for parallelism
  ds: @post_loop_do replaces SetPostLoopCallback
  cindex: implement --exclude= like -clone
  cindex: show shard number in progress message
  cindex: drop `unchanged' progress message
  cindex: handle graceful shutdown by default
  sigfd: pass signal name rather than number to callback
  cindex: implement --max-size=SIZE
  cindex: check for checkpoint before giant messages
  cindex: truncate or drop body for over-sized commits
  cindex: attempt to give oldest commits lowest docids
  cindex: improve granularity of quit checks
  spawn: show failing directory for chdir failures
  cindex: filter out non-existent git directories
  cindex: add support for --prune
  cindex: implement reindex
  cindex: squelch incompatible options
  cindex: respect existing permissions
  cindex: ignore SIGPIPE

 MANIFEST                          |   4 +
 lib/PublicInbox/Admin.pm          |  18 +-
 lib/PublicInbox/CodeSearch.pm     | 121 +++++
 lib/PublicInbox/CodeSearchIdx.pm  | 835 ++++++++++++++++++++++++++++++
 lib/PublicInbox/Config.pm         |   2 +-
 lib/PublicInbox/DS.pm             |  30 +-
 lib/PublicInbox/Daemon.pm         |   4 +-
 lib/PublicInbox/ExtSearchIdx.pm   |   2 +-
 lib/PublicInbox/IPC.pm            |  33 +-
 lib/PublicInbox/LEI.pm            |   4 +-
 lib/PublicInbox/LeiSearch.pm      |  14 -
 lib/PublicInbox/MiscIdx.pm        |   2 +-
 lib/PublicInbox/Search.pm         |  77 ++-
 lib/PublicInbox/SearchIdx.pm      |  88 ++--
 lib/PublicInbox/SearchIdxShard.pm |   7 +-
 lib/PublicInbox/Sigfd.pm          |  10 +-
 lib/PublicInbox/Spawn.pm          |   6 +-
 lib/PublicInbox/SpawnPP.pm        |   2 +-
 lib/PublicInbox/TestCommon.pm     |  47 +-
 lib/PublicInbox/V2Writable.pm     |  26 +-
 lib/PublicInbox/Watch.pm          |   2 +-
 script/public-inbox-cindex        |  86 +++
 script/public-inbox-convert       |   2 +-
 t/cindex.t                        | 134 +++++
 t/dir_idle.t                      |   6 +-
 t/ds-leak.t                       |   8 +-
 t/imapd.t                         |   6 +-
 t/nntpd.t                         |   2 +-
 t/sigfd.t                         |   7 +-
 t/watch_maildir.t                 |   8 +-
 xt/mem-imapd-tls.t                |   7 +-
 xt/mem-nntpd-tls.t                |   8 +-
 xt/net_writer-imap.t              |   4 +-
 33 files changed, 1424 insertions(+), 188 deletions(-)
 create mode 100644 lib/PublicInbox/CodeSearch.pm
 create mode 100644 lib/PublicInbox/CodeSearchIdx.pm
 create mode 100755 script/public-inbox-cindex
 create mode 100644 t/cindex.t

^ permalink raw reply	[flat|nested] 30+ messages in thread

* [PATCH 01/28] ipc: move nproc_shards from v2writable
  2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
@ 2023-03-21 23:07 ` Eric Wong
  2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
                     ` (26 more replies)
  0 siblings, 27 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

We'll be using nproc_shards for indexing non-Inbox stuff.
---
 lib/PublicInbox/IPC.pm        | 26 +++++++++++++++++++++++++-
 lib/PublicInbox/V2Writable.pm | 26 +-------------------------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 548a72eb..730f2cf6 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -19,7 +19,7 @@ use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
-our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
+our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards);
 my ($enc, $dec);
 # ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
 # and eliminate method call overhead
@@ -454,4 +454,28 @@ sub detect_nproc () {
 	undef
 }
 
+# SATA storage lags behind what CPUs are capable of, so relying on
+# nproc(1) can be misleading and having extra Xapian shards is a
+# waste of FDs and space.  It can also lead to excessive IO latency
+# and slow things down.  Users on NVME or other fast storage can
+# use the NPROC env or switches in our script/public-inbox-* programs
+# to increase Xapian shards
+our $NPROC_MAX_DEFAULT = 4;
+
+sub nproc_shards ($) {
+	my ($creat_opt) = @_;
+	my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
+	$n //= $ENV{NPROC};
+	if (!$n) {
+		# assume 2 cores if not detectable or zero
+		state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
+		$n = $NPROC_DETECTED;
+		$n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
+	}
+
+	# subtract for the main process and git-fast-import
+	$n -= 1;
+	$n < 1 ? 1 : $n;
+}
+
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index ed5182ae..d3d13941 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -8,7 +8,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdxShard;
-use PublicInbox::IPC;
+use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Eml;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -29,30 +29,6 @@ my $OID = qr/[a-f0-9]{40,}/;
 # an estimate of the post-packed size to the raw uncompressed size
 our $PACKING_FACTOR = 0.4;
 
-# SATA storage lags behind what CPUs are capable of, so relying on
-# nproc(1) can be misleading and having extra Xapian shards is a
-# waste of FDs and space.  It can also lead to excessive IO latency
-# and slow things down.  Users on NVME or other fast storage can
-# use the NPROC env or switches in our script/public-inbox-* programs
-# to increase Xapian shards
-our $NPROC_MAX_DEFAULT = 4;
-
-sub nproc_shards ($) {
-	my ($creat_opt) = @_;
-	my $n = $creat_opt->{nproc} if ref($creat_opt) eq 'HASH';
-	$n //= $ENV{NPROC};
-	if (!$n) {
-		# assume 2 cores if not detectable or zero
-		state $NPROC_DETECTED = PublicInbox::IPC::detect_nproc() || 2;
-		$n = $NPROC_DETECTED;
-		$n = $NPROC_MAX_DEFAULT if $n > $NPROC_MAX_DEFAULT;
-	}
-
-	# subtract for the main process and git-fast-import
-	$n -= 1;
-	$n < 1 ? 1 : $n;
-}
-
 sub count_shards ($) {
 	my ($self) = @_;
 	# always load existing shards in case core count changes:

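For reference, the capping behavior of nproc_shards can be sketched in
isolation.  This is only an illustration under assumptions: shards_for
is a hypothetical name, its $detected argument stands in for the result
of detect_nproc(), and the $ENV{NPROC} fallback of the real function is
left out.

```perl
use v5.12;

# Hypothetical standalone copy of the shard-count logic (illustration only).
# $requested: explicit job count (may be undef)
# $detected:  stand-in for detect_nproc(), which may return undef
# $max:       stand-in for $NPROC_MAX_DEFAULT
sub shards_for {
	my ($requested, $detected, $max) = @_;
	my $n = $requested;
	if (!$n) {
		$n = $detected || 2; # assume 2 cores if not detectable
		$n = $max if $n > $max; # cap only applies to detected counts
	}
	$n -= 1; # subtract one, as the original does
	$n < 1 ? 1 : $n; # never fewer than one shard
}

say shards_for(undef, 16, 4);    # detected count is capped at 4, then minus one
say shards_for(8, 16, 4);        # an explicit request bypasses the cap
say shards_for(undef, undef, 4); # nothing detected: 2 cores assumed
```

With a cap of 4, a 16-core machine gets 3 shards unless the user asks
for more, which is the trade-off the comment about SATA storage
describes.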

* [PATCH 02/28] search: relocate all_terms from lei_search
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
                     ` (25 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This will be used for code_search, too.
---
 lib/PublicInbox/LeiSearch.pm | 14 --------------
 lib/PublicInbox/Search.pm    | 14 ++++++++++++++
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/LeiSearch.pm b/lib/PublicInbox/LeiSearch.pm
index 936c2751..ba4c4309 100644
--- a/lib/PublicInbox/LeiSearch.pm
+++ b/lib/PublicInbox/LeiSearch.pm
@@ -158,20 +158,6 @@ sub kw_changed {
 	join("\0", @$new_kw_sorted) eq $cur_kw ? 0 : 1;
 }
 
-sub all_terms {
-	my ($self, $pfx) = @_;
-	my $xdb = $self->xdb;
-	my $cur = $xdb->allterms_begin($pfx);
-	my $end = $xdb->allterms_end($pfx);
-	my %ret;
-	for (; $cur != $end; $cur++) {
-		my $tn = $cur->get_termname;
-		index($tn, $pfx) == 0 and
-			$ret{substr($tn, length($pfx))} = undef;
-	}
-	wantarray ? (sort keys %ret) : \%ret;
-}
-
 sub qparse_new {
 	my ($self) = @_;
 	my $qp = $self->SUPER::qparse_new; # PublicInbox::Search
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index e858729a..7aba2445 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -553,4 +553,18 @@ sub num2docid ($$) {
 	($num - 1) * $nshard + $num % $nshard + 1;
 }
 
+sub all_terms {
+	my ($self, $pfx) = @_;
+	my $cur = xdb($self)->allterms_begin($pfx);
+	my $end = $self->{xdb}->allterms_end($pfx);
+	my %ret;
+	for (; $cur != $end; $cur++) {
+		my $tn = $cur->get_termname;
+		index($tn, $pfx) == 0 and
+			$ret{substr($tn, length($pfx))} = undef;
+	}
+	wantarray ? (sort keys %ret) : \%ret;
+}
+
+
 1;

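The prefix handling in all_terms can be tried without a Xapian DB.  The
sketch below is not the real method: terms_without_prefix is a
hypothetical name and a plain list of made-up terms replaces the
allterms_begin/allterms_end iterators.  Only the prefix check, the
stripping and the wantarray return convention are the same.

```perl
use v5.12;

# Hypothetical stand-in for all_terms, fed from a plain list of terms
sub terms_without_prefix {
	my ($pfx, @terms) = @_;
	my %ret;
	for my $tn (@terms) {
		# keep only terms starting with $pfx, stored without the prefix
		index($tn, $pfx) == 0 and
			$ret{substr($tn, length($pfx))} = undef;
	}
	# sorted list in list context, hash ref (for lookups) otherwise
	wantarray ? (sort keys %ret) : \%ret;
}

my @terms = qw(Kseen Kflagged XFNfoo.c Kseen); # made-up terms
my @kw = terms_without_prefix('K', @terms);
say "@kw";
my $set = terms_without_prefix('K', @terms);
say exists $set->{seen} ? 'has seen' : 'no seen';
```

Returning a hash ref in scalar context lets callers test membership
without building a sorted list first.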

* [PATCH 03/28] admin: hoist out resolve_git_dir
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
  2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
                     ` (24 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

We'll be using this for indexing git coderepos, and
switch to Perl 5.12 while we're at it since unicode_strings
doesn't affect this package.
---
 lib/PublicInbox/Admin.pm | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm
index 11ea8f83..b34f2256 100644
--- a/lib/PublicInbox/Admin.pm
+++ b/lib/PublicInbox/Admin.pm
@@ -1,10 +1,10 @@
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # common stuff for administrative command-line tools
 # Unstable internal API
 package PublicInbox::Admin;
-use strict;
+use v5.12;
 use parent qw(Exporter);
 our @EXPORT_OK = qw(setup_signals);
 use PublicInbox::Config;
@@ -69,13 +69,19 @@ sub resolve_inboxdir {
 			die "`$try' is not a directory\n";
 		}
 	}
+	my $dir = resolve_git_dir($cd);
+	$$ver = 1 if $ver;
+	$dir;
+}
+
+sub resolve_git_dir {
+	my ($cd) = @_;
 	# try v1 bare git dirs
 	my $cmd = [ qw(git rev-parse --git-dir) ];
 	my $fh = popen_rd($cmd, undef, {-C => $cd});
 	my $dir = do { local $/; <$fh> };
-	close $fh or die "error in @$cmd (cwd:${\($cd // '.')}): $!\n";
+	close $fh or die "error in @$cmd (cwd:${\($cd // '.')}): $?\n";
 	chomp $dir;
-	$$ver = 1 if $ver;
 	rel2abs_collapsed($dir eq '.' ? ($cd // $dir) : $dir);
 }
 


* [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
  2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
  2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
                     ` (23 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

We'll also support the $base arg of File::Spec->rel2abs
since it should make codesearch indexing easier.
---
 lib/PublicInbox/Admin.pm    | 4 +++-
 lib/PublicInbox/Config.pm   | 2 +-
 script/public-inbox-convert | 2 +-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm
index b34f2256..abfcbb9c 100644
--- a/lib/PublicInbox/Admin.pm
+++ b/lib/PublicInbox/Admin.pm
@@ -82,7 +82,9 @@ sub resolve_git_dir {
 	my $dir = do { local $/; <$fh> };
 	close $fh or die "error in @$cmd (cwd:${\($cd // '.')}): $?\n";
 	chomp $dir;
-	rel2abs_collapsed($dir eq '.' ? ($cd // $dir) : $dir);
+	# --absolute-git-dir requires git v2.13.0+
+	$dir = rel2abs_collapsed($dir, $cd) if $dir !~ m!\A/!;
+	$dir;
 }
 
 # for unconfigured inboxes
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 4065b256..e095ecd1 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -371,7 +371,7 @@ sub git_bool {
 # is sufficient and doesn't leave "/.." or "/../"
 sub rel2abs_collapsed {
 	require File::Spec;
-	my $p = File::Spec->rel2abs($_[-1]);
+	my $p = File::Spec->rel2abs(@_);
 	return $p if substr($p, -3, 3) ne '/..' && index($p, '/../') < 0;
 	require Cwd;
 	Cwd::abs_path($p);
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 42955a48..5f4f2020 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -75,7 +75,7 @@ if ($opt->{'index'}) {
 }
 local %ENV = (%$env, %ENV) if $env;
 my $new = { %$old };
-$new->{inboxdir} = $cfg->rel2abs_collapsed($new_dir);
+$new->{inboxdir} = PublicInbox::Config::rel2abs_collapsed($new_dir);
 $new->{version} = 2;
 $new = PublicInbox::InboxWritable->new($new, { nproc => $opt->{jobs} });
 $new->{-no_fsync} = 1 if !$opt->{fsync};

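To see what the $base argument of File::Spec->rel2abs does, a small
sketch follows.  abs_git_dir is a hypothetical helper that only mirrors
the shape of the resolve_git_dir change, the paths are made up, and
Unix-style paths are assumed.

```perl
use v5.12;
use File::Spec;

# Hypothetical helper: absolute paths pass through untouched,
# relative ones are resolved against $base instead of the cwd.
sub abs_git_dir {
	my ($dir, $base) = @_;
	$dir =~ m!\A/! ? $dir : File::Spec->rel2abs($dir, $base);
}

# made-up paths, for illustration only
say abs_git_dir('.git', '/srv/code/project');
say abs_git_dir('/srv/git/bare.git', '/srv/code/project');

# rel2abs alone leaves ".." in place on Unix-like systems
say abs_git_dir('../other.git', '/srv/git/bare.git');
```

The last call shows why rel2abs_collapsed still needs its
Cwd::abs_path fallback for paths containing "/../".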

* [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (2 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
                     ` (22 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

stat(2) may fail and set `$!', too, so we must stash it, first.
---
 lib/PublicInbox/TestCommon.pm | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index 5807105a..ed28ac48 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -709,9 +709,9 @@ sub create_inbox ($$;@) {
 	my ($db) = (PublicInbox::Import::default_branch() =~ m!([^/]+)\z!);
 	my $dir = "t/data-gen/$base.$ident-$db";
 	my $new = !-d $dir;
-	if ($new) {
-		mkdir $dir; # may race
-		-d $dir or BAIL_OUT "$dir could not be created: $!";
+	if ($new && !mkdir($dir)) {
+		my $err = $!;
+		-d $dir or xbail "mkdir($dir): $err";
 	}
 	my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock';
 	$opt{inboxdir} = File::Spec->rel2abs($dir);

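The same stash-first pattern, reduced to a standalone sketch.
mkdir_errno is a hypothetical helper, not something in TestCommon.pm,
and the temporary directory exists only to make the example
self-contained.

```perl
use v5.12;
use File::Temp qw(tempdir);

# Hypothetical helper: returns undef on success (or if the directory
# already exists), otherwise the errno captured from mkdir itself.
sub mkdir_errno {
	my ($dir) = @_;
	return if mkdir($dir);
	my $err = $! + 0; # stash before -d (stat) gets a chance to change it
	return if -d $dir; # lost a creation race: the directory is there
	$err;
}

my $tmp = tempdir(CLEANUP => 1);
my $err = mkdir_errno("$tmp/missing/child"); # parent does not exist
say "mkdir failed, errno=$err" if defined $err;
say 'created' unless defined mkdir_errno("$tmp/ok");
```

Copying `$!' numerically right after the failed call keeps the reported
error tied to mkdir no matter what the later -d check does.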

* [PATCH 06/28] codesearch: initial cut w/ -cindex tool
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (3 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
                     ` (21 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

It seems relying on root commits is a reasonable way to
deduplicate and handle repositories with common history.

I initially wanted to shoehorn this into extindex, but decided a
separate Xapian index layout capable of being EITHER external to
handle many forks or internal (in $GIT_DIR/public-inbox-cindex)
for small projects is the right way to go.

Unlike most existing parts of public-inbox, this relies on
absolute paths of $GIT_DIR stored in the Xapian DB and does not
rely on the config file.  We'll be relying on the config file to
map absolute paths to public URL paths for WWW.
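
The indexer added below picks a Xapian shard from the first 32 bits of
a hex digest: the commit OID for commits, and sha256 of the absolute
GIT_DIR for repos.  A rough standalone sketch, assuming Digest::SHA in
place of PublicInbox::SHA, with hypothetical function names and a
made-up GIT_DIR:

```perl
use v5.12;
use Digest::SHA qw(sha256_hex);

# first 8 hex digits == first 32 bits of the digest
sub shard_for_oid ($$) {
	my ($oid, $nshard) = @_;
	hex(substr($oid, 0, 8)) % $nshard;
}

# repos have no OID of their own, so hash the absolute GIT_DIR
sub shard_for_git_dir ($$) {
	my ($git_dir, $nshard) = @_;
	hex(substr(sha256_hex($git_dir), 0, 8)) % $nshard;
}

my $nshard = 3;
say shard_for_oid('c4201214cbf10636e2c1ab9131573f735b42c8d4', $nshard);
say shard_for_git_dir('/srv/git/example.git', $nshard); # made-up path
```

Since the shard depends only on the OID or path, no serial number has
to be shared between shards, but changing the shard count changes where
documents belong.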
---
 MANIFEST                         |   4 +
 lib/PublicInbox/CodeSearch.pm    | 121 +++++++++
 lib/PublicInbox/CodeSearchIdx.pm | 425 +++++++++++++++++++++++++++++++
 lib/PublicInbox/MiscIdx.pm       |   2 +-
 lib/PublicInbox/Search.pm        |  63 +++--
 lib/PublicInbox/SearchIdx.pm     |  88 ++++---
 lib/PublicInbox/TestCommon.pm    |  41 ++-
 script/public-inbox-cindex       |  75 ++++++
 t/cindex.t                       |  98 +++++++
 9 files changed, 849 insertions(+), 68 deletions(-)
 create mode 100644 lib/PublicInbox/CodeSearch.pm
 create mode 100644 lib/PublicInbox/CodeSearchIdx.pm
 create mode 100755 script/public-inbox-cindex
 create mode 100644 t/cindex.t

diff --git a/MANIFEST b/MANIFEST
index bc652e21..40535233 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,8 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CmdIPC4.pm
+lib/PublicInbox/CodeSearch.pm
+lib/PublicInbox/CodeSearchIdx.pm
 lib/PublicInbox/CompressNoop.pm
 lib/PublicInbox/Config.pm
 lib/PublicInbox/ConfigIter.pm
@@ -363,6 +365,7 @@ sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
 sa_config/user/.spamassassin/user_prefs
 script/lei
+script/public-inbox-cindex
 script/public-inbox-clone
 script/public-inbox-compact
 script/public-inbox-convert
@@ -402,6 +405,7 @@ t/altid.t
 t/altid_v2.t
 t/cgi.t
 t/check-www-inbox.perl
+t/cindex.t
 t/clone-coderepo-puh1.sh
 t/clone-coderepo-puh2.sh
 t/clone-coderepo.psgi
diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
new file mode 100644
index 00000000..1dfc124f
--- /dev/null
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -0,0 +1,121 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# read-only external index for coderepos
+# currently, it only indexes commits and repository metadata
+# (pathname, root commits); not blob contents
+package PublicInbox::CodeSearch;
+use v5.12;
+use parent qw(PublicInbox::Search);
+use PublicInbox::Search qw(retry_reopen int_val xap_terms);
+use constant {
+	AT => 0, # author time (YYYYMMDDHHMMSS, dt: for mail)
+	CT => 1, # commit time (Unix time stamp, like TS/rt: in mail)
+	CIDX_SCHEMA_VER => 1, # brand new schema for code search
+	# for repos (`Tr'), CT(col=1) is used for the latest tip commit time
+	# in refs/{heads,tags}.  AT(col=0) may be used to store disk usage
+	# in the future, but disk usage calculation is expensive w/ alternates
+};
+
+# note: the non-X term prefix allocations are shared with Xapian omega,
+# see xapian-applications/omega/docs/termprefixes.rst
+# bool_pfx_internal:
+#	type => 'T', # 'c' - commit, 'r' - repo GIT_DIR
+#	tags are not indexed, only normal branches (refs/heads/*), not hidden
+#	'P' # (pathname) GIT_DIR # uniq
+#	'G' # (group) root commit (may have multiple roots)
+my %bool_pfx_external = (
+	oid => 'Q', # type:commit - git OID hex (40|64)-byte SHA-(1|256)
+		# type:repo - rel2abs_collapsed(GIT_DIR)
+	parent => 'XP',
+	%PublicInbox::Search::PATCH_BOOL_COMMON,
+);
+
+my %prob_prefix = ( # copied from PublicInbox::Search
+	# do we care about committer? or partial commit OID via Xapian?
+	# o => 'XQ', # 'oid:' (bool) is exact, 'o:' (prob) can do partial
+	%PublicInbox::Search::PATCH_PROB_COMMON,
+
+	# default:
+	'' => 'S A XQUOT XFN ' . $PublicInbox::Search::NON_QUOTED_BODY
+);
+
+sub new {
+	my ($cls, $dir) = @_;
+	bless { xpfx => "$dir/cidx".CIDX_SCHEMA_VER }, $cls;
+}
+
+sub cqparse_new ($) {
+	my ($self) = @_;
+	my $qp = $self->qp_init_common;
+	my $cb = $qp->can('add_valuerangeprocessor') //
+		$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
+	$cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'd:')); # mairix compat
+	$cb->($qp, $PublicInbox::Search::NVRP->new(AT, 'dt:')); # mail compat
+	$cb->($qp, $PublicInbox::Search::NVRP->new(CT, 'ct:'));
+
+	while (my ($name, $pfx) = each %bool_pfx_external) {
+		$qp->add_boolean_prefix($name, $_) for split(/ /, $pfx);
+	}
+	while (my ($name, $pfx) = each %prob_prefix) {
+		$qp->add_prefix($name, $_) for split(/ /, $pfx);
+	}
+	$qp;
+}
+
+# returns a Xapian::Query to filter by roots
+sub roots_filter { # retry_reopen callback
+	my ($self, $git_dir) = @_;
+	my $xdb = $self->xdb;
+	my $P = 'P'.$git_dir;
+	my ($cur, $end) = ($xdb->postlist_begin($P), $xdb->postlist_end($P));
+	if ($cur == $end) {
+		warn "W: $git_dir not indexed?\n";
+		return;
+	}
+	my @roots = xap_terms('G', $xdb, $cur->get_docid);
+	if (!@roots) {
+		warn "W: $git_dir has no root commits?\n";
+		return;
+	}
+	my $q = $PublicInbox::Search::X{Query}->new('G'.shift(@roots));
+	for my $r (@roots) {
+		$q = $PublicInbox::Search::X{Query}->new(
+					PublicInbox::Search::OP_OR(),
+					$q, 'G'.$r);
+	}
+	$q;
+}
+
+sub mset {
+	my ($self, $qry_str, $opt) = @_;
+	my $qp = $self->{qp} //= cqparse_new($self);
+	my $qry = $qp->parse_query($qry_str, $self->{qp_flags});
+
+	# limit to commits with shared roots
+	if (defined(my $git_dir = $opt->{git_dir})) {
+		my $rf = retry_reopen($self, \&roots_filter, $git_dir)
+			or return;
+
+		$qry = $PublicInbox::Search::X{Query}->new(
+				PublicInbox::Search::OP_FILTER(),
+				$qry, $rf);
+	}
+
+	# we only want commits:
+	$qry = $PublicInbox::Search::X{Query}->new(
+				PublicInbox::Search::OP_FILTER(),
+				$qry, 'T'.'c');
+
+	my $enq = $PublicInbox::Search::X{Enquire}->new($self->xdb);
+	$enq->set_query($qry);
+	if ($opt->{relevance}) {
+		$enq->set_sort_by_relevance_then_value(CT, !$opt->{asc});
+	} else {
+		$enq->set_sort_by_value_then_relevance(CT, !$opt->{asc});
+	}
+	$self->retry_reopen($self->can('enquire_once'), $enq,
+			$opt->{offset} || 0, $opt->{limit} || 50);
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
new file mode 100644
index 00000000..218338da
--- /dev/null
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -0,0 +1,425 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# indexer for git coderepos, just commits and repo paths for now
+# this stores normalized absolute paths of indexed GIT_DIR inside
+# the DB itself and is designed to handle forks by designating roots
+#
+# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID,
+# there's no serial number dependency at all.  The first 32-bits of
+# the commit SHA-(1|256) is used to select a shard.
+#
+# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
+#
+# See PublicInbox::CodeSearch (read-only API) for more
+package PublicInbox::CodeSearchIdx;
+use v5.12;
+use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+use PublicInbox::Eml;
+use PublicInbox::DS ();
+use PublicInbox::IPC qw(nproc_shards);
+use PublicInbox::Admin;
+use POSIX qw(WNOHANG SEEK_SET);
+use File::Path ();
+use File::Spec ();
+use PublicInbox::SHA qw(sha256_hex);
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::SearchIdx qw(add_val);
+use PublicInbox::Config;
+use PublicInbox::Spawn qw(run_die);
+
+# stop walking history if we see >$SEEN_MAX existing commits, this assumes
+# branches don't diverge by more than this number of commits...
+# git walks commits quickly if it doesn't have to read trees
+our $SEEN_MAX = 100000;
+
+# TODO: do we care about committer name + email? or tree OID?
+my @FMT = qw(H P ct an ae at s b); # (b)ody must be last
+my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M
+	--stdin --no-walk=unsorted), '--pretty=format:%n%x00'.
+	join('%n', map { "%$_" } @FMT));
+
+sub new {
+	my (undef, $dir, $opt) = @_;
+	my $l = $opt->{indexlevel} // 'full';
+	$l !~ $PublicInbox::SearchIdx::INDEXLEVELS and
+		die "invalid indexlevel=$l\n";
+	$l eq 'basic' and die "E: indexlevel=basic not supported\n";
+	my $self = bless {
+		xpfx => "$dir/cidx".  PublicInbox::CodeSearch::CIDX_SCHEMA_VER,
+		cidx_dir => $dir,
+		creat => 1, # TODO: get rid of this, should be implicit
+		indexlevel => $l,
+		transact_bytes => 0, # for checkpoint
+		total_bytes => 0, # for lock_release
+		current_info => '',
+		parallel => 1,
+		-opt => $opt,
+		lock_path => "$dir/cidx.lock",
+	}, __PACKAGE__;
+	$self->{nshard} = count_shards($self) ||
+		nproc_shards({nproc => $opt->{jobs}});
+	$self->{-no_fsync} = 1 if !$opt->{fsync};
+	$self->{-dangerous} = 1 if $opt->{dangerous};
+	$self;
+}
+
+# TODO: may be used for reshard/compact
+sub count_shards { scalar($_[0]->xdb_shards_flat) }
+
+sub add_commit ($$) {
+	my ($self, $cmt) = @_; # fields from @FMT
+	my $x = 'Q'.$cmt->{H};
+	for (docids_by_postlist($self, $x)) {
+		$self->{xdb}->delete_document($_)
+	}
+	my $doc = $PublicInbox::Search::X{Document}->new;
+	$doc->add_boolean_term($x);
+	$doc->add_boolean_term('G'.$_) for @{$self->{roots}};
+	$doc->add_boolean_term('XP'.$_) for split(/ /, $cmt->{P});
+	$doc->add_boolean_term('T'.'c');
+
+	# Author-Time is compatible with dt: for mail search schema_version=15
+	add_val($doc, PublicInbox::CodeSearch::AT,
+		POSIX::strftime('%Y%m%d%H%M%S', gmtime($cmt->{at})));
+
+	# Commit-Time is the fallback used by rt: (TS) for mail search:
+	add_val($doc, PublicInbox::CodeSearch::CT, $cmt->{ct});
+
+	$self->term_generator->set_document($doc);
+
+	# email address is always indexed with positional data for usability
+	$self->index_phrase("$cmt->{an} <$cmt->{ae}>", 1, 'A');
+
+	$x = $cmt->{'s'};
+	$self->index_text($x, 1, 'S') if $x =~ /\S/s;
+	$doc->set_data($x); # subject is the first (and currently only) line
+
+	$x = delete $cmt->{b};
+	$self->index_body_text($doc, \$x) if $x =~ /\S/s;
+	$self->{xdb}->add_document($doc);
+}
+
+sub progress {
+	my ($self, @msg) = @_;
+	my $pr = $self->{-opt}->{-progress} or return;
+	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
+}
+
+sub store_repo ($$) {
+	my ($self, $repo) = @_;
+	my $xdb = delete($repo->{shard})->idx_acquire;
+	$xdb->begin_transaction;
+	if (defined $repo->{id}) {
+		my $doc = $xdb->get_document($repo->{id}) //
+			die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
+		my %new = map { $_ => undef } @{$self->{roots}};
+		my $old = xap_terms('G', $doc);
+		delete @new{keys %$old};
+		$doc->add_boolean_term('G'.$_) for keys %new;
+		delete @$old{@{$self->{roots}}};
+		$doc->remove_term('G'.$_) for keys %$old;
+		$doc->set_data($repo->{fp});
+		$xdb->replace_document($repo->{id}, $doc);
+	} else {
+		my $new = $PublicInbox::Search::X{Document}->new;
+		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
+		$new->add_boolean_term("P$self->{git}->{git_dir}");
+		$new->add_boolean_term('T'.'r');
+		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
+		$new->set_data($repo->{fp}); # \n delimited
+		$xdb->add_document($new);
+	}
+	$xdb->commit_transaction;
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub shard_worker ($$$) {
+	my ($self, $r, $sigset) = @_;
+	my ($quit, $cmt);
+	my $batch_bytes = $self->{-opt}->{batch_size} //
+				$PublicInbox::SearchIdx::BATCH_BYTES;
+	my $max = $batch_bytes;
+	$SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+	$SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
+	PublicInbox::DS::sig_setmask($sigset);
+
+	# the parent process of this shard process writes directly to
+	# the stdin of `git log', we consume git log's stdout:
+	my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
+	close $r or die "close: $!";
+	my $nr = 0;
+
+	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
+	# in linux.git, so we use $/ = "\n\0" to check end-of-patch
+	my $FS = "\n\0";
+	local $/ = $FS;
+	my $buf = <$rd> // return; # leading $FS
+	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
+	my $xdb = $self->idx_acquire;
+	$xdb->begin_transaction;
+	while (defined($buf = <$rd>)) {
+		chomp($buf);
+		$max -= length($buf);
+		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
+		$/ = "\n";
+		add_commit($self, $cmt);
+		last if $quit; # likely SIGPIPE
+		++$nr;
+		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+			progress($self, $nr);
+			$xdb->commit_transaction;
+			$max = $batch_bytes;
+			$xdb->begin_transaction;
+		}
+		$/ = $FS;
+	}
+	close($rd);
+	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+		$xdb->commit_transaction;
+	} else {
+		warn "E: git @LOG_STDIN: \$?=$?\n";
+		$xdb->cancel_transaction;
+	}
+}
+
+sub seen ($$) {
+	my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
+	$xdb->postlist_begin($q) != $xdb->postlist_end($q)
+}
+
+# used to select the shard for a GIT_DIR
+sub git_dir_hash ($) { hex(substr(sha256_hex($_[0]), 0, 8)) }
+
+sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
+	my ($self, $q) = @_;
+	my $cur = $self->{xdb}->postlist_begin($q);
+	my $end = $self->{xdb}->postlist_end($q);
+	my @ids;
+	for (; $cur != $end; $cur++) { push(@ids, $cur->get_docid) };
+	@ids;
+}
+
+sub get_roots ($$) {
+	my ($self, $refs) = @_;
+	my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
+		undef, { 0 => $refs });
+	die "git rev-list \$?=$?" if $?;
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	chomp(@roots);
+	scalar(@roots) ? \@roots : undef;
+}
+
+# this is different from the grokmirror-compatible fingerprint since we
+# only care about --heads (branches) and --tags, and not even their names
+sub cidx_fp ($) {
+	my ($self) = @_;
+	open my $refs, '+>', undef or die "open: $!";
+	run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	seek($refs, 0, SEEK_SET) or die "seek: $!";
+	my $buf;
+	my $dig = PublicInbox::SHA->new(256);
+	while (read($refs, $buf, 65536)) { $dig->add($buf) }
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	($dig->hexdigest, $refs);
+}
+
+# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_git_dir ($) {
+	my ($self) = @_;
+	my $git_dir = $self->{git}->{git_dir};
+	my $ct = $self->{git}->qx([qw[for-each-ref
+		--sort=-committerdate --format=%(committerdate:raw) --count=1
+		refs/heads/ refs/tags/]]);
+	my $repo = {};
+	@$repo{qw(fp refs)} = cidx_fp($self);
+	$repo->{roots} = get_roots($self, $repo->{refs});
+	if (!$repo->{roots} || !defined($ct)) {
+		warn "W: $git_dir has no root commits, skipping\n";
+		return;
+	}
+	$ct =~ s/ .*\z//s; # drop TZ
+	$repo->{ct} = $ct + 0;
+	my $n = git_dir_hash($git_dir) % $self->{nshard};
+	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+	delete @$shard{qw(lockfh lock_path)};
+	local $shard->{xdb};
+	my $xdb = $shard->idx_acquire;
+	my @docids = docids_by_postlist($shard, 'P'.$git_dir);
+	my $docid = shift(@docids) // return $repo;
+	if (@docids) {
+		warn "BUG: $git_dir indexed multiple times, culling\n";
+		$xdb->begin_transaction;
+		for (@docids) { $xdb->delete_document($_) }
+		$xdb->commit_transaction;
+	}
+	my $doc = $xdb->get_document($docid) //
+		die "BUG: no #$docid ($git_dir)";
+	my $old_fp = $doc->get_data;
+	if ($old_fp eq $repo->{fp}) { # no change
+		progress($self, 'unchanged');
+		return;
+	}
+	$repo->{id} = $docid;
+	$repo;
+}
+
+sub partition_refs ($$) {
+	my ($self, $refs) = @_; # show-ref --heads --tags --hash output
+	my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
+					{ 0 => $refs });
+	close $refs or die "close: $!";
+	local $self->{xdb};
+	my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
+	my ($seen, $nchange, $nshard) = (0, 0, $self->{nshard});
+	my @shard_in;
+	for (0..($nshard - 1)) {
+		open $shard_in[$_], '+>', undef or die "open: $!";
+	}
+	while (defined(my $cmt = <$fh>)) {
+		chomp $cmt;
+		if ($xdb && seen($xdb, 'Q'.$cmt)) {
+			last if ++$seen > $SEEN_MAX;
+		} else {
+			my $n = hex(substr($cmt, 0, 8)) % $nshard;
+			say { $shard_in[$n] } $cmt or die "say: $!";
+			++$nchange;
+			$seen = 0;
+		}
+	}
+	close($fh);
+	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
+		$self->{nchange} += $nchange;
+		progress($self, "$nchange commits");
+		for my $fh (@shard_in) {
+			$fh->flush or die "flush: $!";
+			sysseek($fh, 0, SEEK_SET) or die "seek: $!";
+		}
+		return @shard_in;
+	}
+	die "git-rev-list: \$?=$?\n";
+}
+
+sub index_git_dir ($$) {
+	my ($self, $git_dir) = @_;
+	local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
+	my $repo = prep_git_dir($self) or return;
+	local $self->{current_info} = $git_dir;
+	my @shard_in = partition_refs($self, delete($repo->{refs}));
+	my %pids;
+	my $fwd_kill = sub {
+		my ($sig) = @_;
+		kill($sig, $_) for keys %pids;
+	};
+	local $SIG{USR1} = $fwd_kill;
+	local $SIG{QUIT} = $fwd_kill;
+	local $SIG{INT} = $fwd_kill;
+	local $SIG{TERM} = $fwd_kill;
+	my $sigset = PublicInbox::DS::block_signals();
+	for (my $n = 0; $n <= $#shard_in; $n++) {
+		-s $shard_in[$n] or next;
+		my $pid = fork // die "fork: $!";
+		if ($pid == 0) { # no RNG use, here
+			$0 = "code index [$n]";
+			$self->{shard} = $n;
+			$self->{current_info} = "$self->{current_info} [$n]";
+			delete @$self{qw(lockfh lock_path)};
+			my $in = $shard_in[$n];
+			@shard_in = ();
+			$self->{roots} = delete $repo->{roots};
+			undef $repo;
+			eval { shard_worker($self, $in, $sigset) };
+			warn "E: $@" if $@;
+			POSIX::_exit($@ ? 1 : 0);
+		} else {
+			$pids{$pid} = "code index [$n]";
+		}
+	}
+	PublicInbox::DS::sig_setmask($sigset);
+	@shard_in = ();
+	my $err;
+	while (keys %pids) {
+		my $pid = waitpid(-1, 0) or last;
+		my $j = delete $pids{$pid} // "unknown PID:$pid";
+		next if $? == 0;
+		warn "PID:$pid $j exited with \$?=$?\n";
+		$err = 1;
+	}
+	die "subprocess(es) failed\n" if $err;
+	store_repo($self, $repo);
+	progress($self, 'done');
+	# TODO: check fp afterwards?
+}
+
+# for PublicInbox::SearchIdx::patch_id and with_umask
+sub git { $_[0]->{git} }
+
+sub load_existing ($) { # for -u/--update
+	my ($self) = @_;
+	my $dirs = $self->{git_dirs} // [];
+	if ($self->{-opt}->{update}) {
+		local $self->{xdb};
+		$self->xdb or
+			die "E: $self->{cidx_dir} non-existent for --update\n";
+		my @cur = $self->all_terms('P');
+		push @$dirs, @cur;
+	}
+	my %uniq; # List::Util::uniq requires Perl 5.26+
+	@$dirs = grep { !$uniq{$_}++ } @$dirs;
+}
+
+sub cidx_init ($) {
+	my ($self) = @_;
+	my $dir = $self->{cidx_dir};
+	unless (-d $dir) {
+		warn "# creating $dir\n" if !$self->{-opt}->{quiet};
+		File::Path::mkpath($dir);
+	}
+	for my $n (0..($self->{nshard} - 1)) {
+		my $shard = bless { %$self, shard => $n }, ref($self);
+		$shard->idx_acquire;
+	}
+	# this warning needs to happen after idx_acquire
+	state $once;
+	warn <<EOM if $PublicInbox::Search::X{CLOEXEC_UNSET} && !$once++;
+W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
+W: memory usage may be high for large indexing runs
+EOM
+}
+
+sub cidx_run {
+	my ($self) = @_;
+	cidx_init($self);
+	local $self->{current_info} = '';
+	my $cb = $SIG{__WARN__} || \&CORE::warn;
+	local $SIG{__WARN__} = sub {
+		my $m = shift @_;
+		$self->{current_info} eq '' or
+			$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
+		$cb->($m, @_);
+	};
+	$self->lock_acquire;
+	load_existing($self);
+	my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
+	if (@nc) {
+		warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n";
+		for my $d (@{$self->{git_dirs}}) {
+			my $c = File::Spec->canonpath($d);
+			warn "E: $d => $c\n";
+			$d = $c;
+		}
+		warn "E: canonicalized and attempting to continue\n";
+	}
+	local $self->{nchange} = 0;
+	# do_prune($self) if $self->{-opt}->{prune}; TODO
+	if ($self->{-opt}->{scan} // 1) {
+		for my $gd (@{$self->{git_dirs}}) {
+			index_git_dir($self, $gd);
+		}
+	}
+	$self->lock_release(!!$self->{nchange});
+}
+
+1;
diff --git a/lib/PublicInbox/MiscIdx.pm b/lib/PublicInbox/MiscIdx.pm
index 19200b92..6708527d 100644
--- a/lib/PublicInbox/MiscIdx.pm
+++ b/lib/PublicInbox/MiscIdx.pm
@@ -5,7 +5,7 @@
 # Things indexed include:
 # * inboxes themselves
 # * epoch information
-# * (maybe) git code repository information
+# * (maybe) git code repository information (not commits)
 # Expect ~100K-1M documents with no parallelism opportunities,
 # so no sharding, here.
 #
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 7aba2445..5133a3b7 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -110,43 +110,50 @@ sub load_xapian () {
 # a prefix common in patch emails
 our $LANG = 'english';
 
+our %PATCH_BOOL_COMMON = (
+	dfpre => 'XDFPRE',
+	dfpost => 'XDFPOST',
+	dfblob => 'XDFPRE XDFPOST',
+	patchid => 'XDFID',
+);
+
 # note: the non-X term prefix allocations are shared with
 # Xapian omega, see xapian-applications/omega/docs/termprefixes.rst
 my %bool_pfx_external = (
 	mid => 'Q', # Message-ID (full/exact), this is mostly uniQue
 	lid => 'G', # newsGroup (or similar entity), just inside <>
-	dfpre => 'XDFPRE',
-	dfpost => 'XDFPOST',
-	dfblob => 'XDFPRE XDFPOST',
-	patchid => 'XDFID',
+	%PATCH_BOOL_COMMON
 );
 
-my $non_quoted_body = 'XNQ XDFN XDFA XDFB XDFHH XDFCTX XDFPRE XDFPOST XDFID';
-my %prob_prefix = (
-	# for mairix compatibility
+# for mairix compatibility
+our $NON_QUOTED_BODY = 'XNQ XDFN XDFA XDFB XDFHH XDFCTX XDFPRE XDFPOST XDFID';
+our %PATCH_PROB_COMMON = (
 	s => 'S',
-	m => 'XM', # 'mid:' (bool) is exact, 'm:' (prob) can do partial
-	l => 'XL', # 'lid:' (bool) is exact, 'l:' (prob) can do partial
 	f => 'A',
-	t => 'XTO',
-	tc => 'XTO XCC',
-	c => 'XCC',
-	tcf => 'XTO XCC A',
-	a => 'XTO XCC A',
-	b => $non_quoted_body . ' XQUOT',
-	bs => $non_quoted_body . ' XQUOT S',
+	b => $NON_QUOTED_BODY . ' XQUOT',
+	bs => $NON_QUOTED_BODY . ' XQUOT S',
 	n => 'XFN',
 
 	q => 'XQUOT',
-	nq => $non_quoted_body,
+	nq => $NON_QUOTED_BODY,
 	dfn => 'XDFN',
 	dfa => 'XDFA',
 	dfb => 'XDFB',
 	dfhh => 'XDFHH',
 	dfctx => 'XDFCTX',
+);
 
+my %prob_prefix = (
+	m => 'XM', # 'mid:' (bool) is exact, 'm:' (prob) can do partial
+	l => 'XL', # 'lid:' (bool) is exact, 'l:' (prob) can do partial
+	t => 'XTO',
+	tc => 'XTO XCC',
+	c => 'XCC',
+	tcf => 'XTO XCC A',
+	a => 'XTO XCC A',
+	%PATCH_PROB_COMMON,
 	# default:
-	'' => 'XM S A XQUOT XFN ' . $non_quoted_body,
+	'' => 'XM S A XQUOT XFN ' . $NON_QUOTED_BODY,
 );
 
 # not documenting m: and mid: for now, the using the URLs works w/o Xapian
@@ -305,7 +312,7 @@ sub date_parse_prepare {
 				$x = "\0%Y%m%d%H%M%S$#$to_parse\0";
 			}
 		}
-	} else { # "rt", let git interpret "YYYY", deal with Y10K later :P
+	} else { # (rt|ct), let git interpret "YYYY", deal with Y10K later :P
 		for my $x (@r) {
 			next if $x eq '' || $x =~ /\A[0-9]{5,}\z/;
 			push @$to_parse, $x;
@@ -454,20 +461,24 @@ sub mset_to_smsg {
 # read-write
 sub stemmer { $X{Stem}->new($LANG) }
 
-# read-only
-sub qparse_new {
+sub qp_init_common {
 	my ($self) = @_;
-
-	my $xdb = xdb($self);
 	my $qp = $X{QueryParser}->new;
 	$qp->set_default_op(OP_AND());
-	$qp->set_database($xdb);
+	$qp->set_database(xdb($self));
 	$qp->set_stemmer(stemmer($self));
 	$qp->set_stemming_strategy(STEM_SOME());
 	my $cb = $qp->can('set_max_wildcard_expansion') //
 		$qp->can('set_max_expansion'); # Xapian 1.5.0+
 	$cb->($qp, 100);
-	$cb = $qp->can('add_valuerangeprocessor') //
+	$qp;
+}
+
+# read-only
+sub qparse_new {
+	my ($self) = @_;
+	my $qp = qp_init_common($self);
+	my $cb = $qp->can('add_valuerangeprocessor') //
 		$qp->can('add_rangeprocessor'); # Xapian 1.5.0+
 	$cb->($qp, $NVRP->new(YYYYMMDD, 'd:'));
 	$cb->($qp, $NVRP->new(DT, 'dt:'));
@@ -546,7 +557,7 @@ sub xap_terms ($$;@) {
 }
 
 # get combined docid from over.num:
-# (not generic Xapian, only works with our sharding scheme)
+# (not generic Xapian, only works with our sharding scheme for mail)
 sub num2docid ($$) {
 	my ($self, $num) = @_;
 	my $nshard = $self->{nshard};
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index fc464383..3baeaa9c 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -134,6 +134,7 @@ sub idx_acquire {
 		load_xapian_writable();
 		$flag = $self->{creat} ? $DB_CREATE_OR_OPEN : $DB_OPEN;
 	}
+	my $owner = $self->{ibx} // $self->{eidx} // $self;
 	if ($self->{creat}) {
 		require File::Path;
 		$self->lock_acquire;
@@ -145,14 +146,13 @@ sub idx_acquire {
 			File::Path::mkpath($dir);
 			require PublicInbox::Syscall;
 			PublicInbox::Syscall::nodatacow_dir($dir);
-			$self->{-set_has_threadid_once} = 1;
-			if (($self->{ibx} // $self->{eidx})->{-dangerous}) {
-				$flag |= $DB_DANGEROUS;
-			}
+			# owner == self for CodeSearchIdx
+			$self->{-set_has_threadid_once} = 1 if $owner != $self;
+			$flag |= $DB_DANGEROUS if $owner->{-dangerous};
 		}
 	}
 	return unless defined $flag;
-	$flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync};
+	$flag |= $DB_NO_SYNC if $owner->{-no_fsync};
 	my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
 	croak "Failed opening $dir: $@" if $@;
 	$self->{xdb} = $xdb;
@@ -350,43 +350,30 @@ sub index_diff ($$$) {
 	index_text($self, join("\n", @$xnq), 1, 'XNQ');
 }
 
-sub index_xapian { # msg_iter callback
-	my $part = $_[0]->[0]; # ignore $depth and $idx
-	my ($self, $doc) = @{$_[1]};
-	my $ct = $part->content_type || 'text/plain';
-	my $fn = $part->filename;
-	if (defined $fn && $fn ne '') {
-		index_phrase($self, $fn, 1, 'XFN');
-	}
-	if ($part->{is_submsg}) {
-		my $mids = mids_for_index($part);
-		index_ids($self, $doc, $part, $mids);
-		my $smsg = bless {}, 'PublicInbox::Smsg';
-		$smsg->populate($part);
-		index_headers($self, $smsg);
-	}
-
-	my ($s, undef) = msg_part_text($part, $ct);
-	defined $s or return;
-	$_[0]->[0] = $part = undef; # free memory
+sub patch_id {
+	my ($self) = @_; # $_[1] is the diff (may be huge)
+	open(my $fh, '+>:utf8', undef) or die "open: $!";
+	open(my $eh, '+>', undef) or die "open: $!";
+	$fh->autoflush(1);
+	print $fh $_[1] or die "print: $!";
+	sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
+	my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx(
+			[qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh });
+	seek($eh, 0, SEEK_SET) or die "seek: $!";
+	while (<$eh>) { warn $_ }
+	$id =~ /\A([a-f0-9]{40,})/ ? $1 : undef;
+}
 
-	if ($s =~ /^(?:diff|---|\+\+\+) /ms) {
-		open(my $fh, '+>:utf8', undef) or die "open: $!";
-		open(my $eh, '+>', undef) or die "open: $!";
-		$fh->autoflush(1);
-		print $fh $s or die "print: $!";
-		sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
-		my $id = ($self->{ibx} // $self->{eidx})->git->qx(
-						[qw(patch-id --stable)],
-						{}, { 0 => $fh, 2 => $eh });
-		$id =~ /\A([a-f0-9]{40,})/ and $doc->add_term('XDFID'.$1);
-		seek($eh, 0, SEEK_SET) or die "seek: $!";
-		while (<$eh>) { warn $_ }
+sub index_body_text {
+	my ($self, $doc, $sref) = @_;
+	if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) {
+		my $id = patch_id($self, $$sref);
+		$doc->add_term('XDFID'.$id) if defined($id);
 	}
 
 	# split off quoted and unquoted blocks:
-	my @sections = PublicInbox::MsgIter::split_quotes($s);
-	undef $s; # free memory
+	my @sections = PublicInbox::MsgIter::split_quotes($$sref);
+	undef $$sref; # free memory
 	for my $txt (@sections) {
 		if ($txt =~ /\A>/) {
 			if ($txt =~ /^[>\t ]+GIT binary patch\r?/sm) {
@@ -396,8 +383,7 @@ sub index_xapian { # msg_iter callback
 					(?:[>\h]+$BASE85\h*\r?\n)+/$1/gsmx;
 			}
 			index_text($self, $txt, 0, 'XQUOT');
-		} else {
-			# does it look like a diff?
+		} else { # does it look like a diff?
 			if ($txt =~ /^(?:diff|---|\+\+\+) /ms) {
 				index_diff($self, \$txt, $doc);
 			} else {
@@ -408,6 +394,28 @@ sub index_xapian { # msg_iter callback
 	}
 }
 
+sub index_xapian { # msg_iter callback
+	my $part = $_[0]->[0]; # ignore $depth and $idx
+	my ($self, $doc) = @{$_[1]};
+	my $ct = $part->content_type || 'text/plain';
+	my $fn = $part->filename;
+	if (defined $fn && $fn ne '') {
+		index_phrase($self, $fn, 1, 'XFN');
+	}
+	if ($part->{is_submsg}) {
+		my $mids = mids_for_index($part);
+		index_ids($self, $doc, $part, $mids);
+		my $smsg = bless {}, 'PublicInbox::Smsg';
+		$smsg->populate($part);
+		index_headers($self, $smsg);
+	}
+
+	my ($s, undef) = msg_part_text($part, $ct);
+	defined $s or return;
+	$_[0]->[0] = $part = undef; # free memory
+	index_body_text($self, $doc, \$s);
+}
+
 sub index_list_id ($$$) {
 	my ($self, $doc, $hdr) = @_;
 	for my $l ($hdr->header_raw('List-Id')) {
diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm
index ed28ac48..494323c0 100644
--- a/lib/PublicInbox/TestCommon.pm
+++ b/lib/PublicInbox/TestCommon.pm
@@ -21,6 +21,7 @@ BEGIN {
 	@EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods
 		run_script start_script key2sub xsys xsys_e xqx eml_load tick
 		have_xapian_compact json_utf8 setup_public_inboxes create_inbox
+		create_coderepo
 		tcp_host_port test_lei lei lei_ok $lei_out $lei_err $lei_opt
 		test_httpd xbail require_cmd is_xdeeply tail_f
 		ignore_inline_c_missing);
@@ -325,7 +326,7 @@ sub run_script ($;$$) {
 		}
 	}
 	my $tail = @tail_paths ? tail_f(@tail_paths) : undef;
-	if ($key =~ /-(index|convert|extindex|convert|xcpdb)\z/) {
+	if ($key =~ /-(index|cindex|extindex|convert|xcpdb)\z/) {
 		unshift @argv, '--no-fsync';
 	}
 	if ($run_mode == 0) {
@@ -698,6 +699,44 @@ sub setup_public_inboxes () {
 	@ret;
 }
 
+our %COMMIT_ENV = (
+	GIT_AUTHOR_NAME => 'A U Thor',
+	GIT_COMMITTER_NAME => 'C O Mitter',
+	GIT_AUTHOR_EMAIL => 'a@example.com',
+	GIT_COMMITTER_EMAIL => 'c@example.com',
+);
+
+sub create_coderepo ($$;@) {
+	my $ident = shift;
+	my $cb = pop;
+	my %opt = @_;
+	require PublicInbox::Lock;
+	require PublicInbox::Import;
+	my ($base) = ($0 =~ m!\b([^/]+)\.[^\.]+\z!);
+	my ($db) = (PublicInbox::Import::default_branch() =~ m!([^/]+)\z!);
+	my $dir = "t/data-gen/$base.$ident-$db";
+	my $new = !-d $dir;
+	if ($new && !mkdir($dir)) {
+		my $err = $!;
+		-d $dir or xbail "mkdir($dir): $err";
+	}
+	my $lk = bless { lock_path => "$dir/creat.lock" }, 'PublicInbox::Lock';
+	my $scope = $lk->lock_for_scope;
+	my $tmpdir = delete $opt{tmpdir};
+	if (!-f "$dir/creat.stamp") {
+		opendir(my $dfh, '.') or xbail "opendir .: $!";
+		chdir($dir) or xbail "chdir($dir): $!";
+		local %ENV = (%ENV, %COMMIT_ENV);
+		$cb->($dir);
+		chdir($dfh) or xbail "cd -: $!";
+		open my $s, '>', "$dir/creat.stamp" or
+			BAIL_OUT "error creating $dir/creat.stamp: $!";
+	}
+	return $dir if !defined($tmpdir);
+	xsys_e([qw(/bin/cp -Rp), $dir, $tmpdir]);
+	$tmpdir;
+}
+
 sub create_inbox ($$;@) {
 	my $ident = shift;
 	my $cb = pop;
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
new file mode 100755
index 00000000..166c8261
--- /dev/null
+++ b/script/public-inbox-cindex
@@ -0,0 +1,75 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev);
+my $help = <<EOF; # the following should fit w/o scrolling in 80x24 term:
+usage: public-inbox-cindex [options] GIT_DIR...
+usage: public-inbox-cindex [options] --project-list=FILE PROJECT_ROOT
+
+  Create and update search indices for code repos
+
+  -d EXTDIR           use EXTDIR instead of GIT_DIR/public-inbox-cindex
+  --no-fsync          speed up indexing, risk corruption on power outage
+  -L LEVEL            `medium' or `full' (default: medium)
+  --project-list=FILE use a cgit/gitweb-compatible list of projects
+  --update | -u       update previously-indexed code repos with `-d'
+  --jobs=NUM          set or disable parallelization (NUM=0)
+  --batch-size=BYTES  flush changes to OS after a given number of bytes
+  --prune             prune old repos and commits
+  --reindex           reindex previously indexed repos
+  --verbose | -v      increase verbosity (may be repeated)
+
+BYTES may use `k', `m', and `g' suffixes (e.g. `10m' for 10 megabytes)
+See public-inbox-cindex(1) man page for full documentation.
+EOF
+my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
+GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
+		indexlevel|index-level|L=s batch_size|batch-size=s
+		project-list=s
+		d=s update|u scan! prune dry-run|n C=s@ help|h))
+	or die $help;
+if ($opt->{help}) { print $help; exit 0 };
+die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
+require IO::Handle;
+STDOUT->autoflush(1);
+STDERR->autoflush(1);
+local $SIG{USR1} = 'IGNORE'; # to be overridden in cidx_sync
+# require lazily to speed up --help
+require PublicInbox::Admin;
+PublicInbox::Admin::do_chdir(delete $opt->{C});
+my $cfg = PublicInbox::Config->new;
+my $cidx_dir = $opt->{d};
+PublicInbox::Admin::require_or_die('Search::Xapian');
+PublicInbox::Admin::progress_prepare($opt);
+my $env = PublicInbox::Admin::index_prepare($opt, $cfg);
+%ENV = (%ENV, %$env) if $env;
+
+require PublicInbox::CodeSearchIdx; # unstable internal API
+my @git_dirs;
+if (defined(my $pl = $opt->{'project-list'})) {
+	my $pfx = shift @ARGV // die <<EOM;
+PROJECT_ROOT required for --project-list
+EOM
+	open my $fh, '<', $pl or die "open($pl): $!\n";
+	chomp(@git_dirs = <$fh>);
+	$_ = PublicInbox::Admin::resolve_git_dir("$pfx/$_") for @git_dirs;
+} else {
+	@git_dirs = map { PublicInbox::Admin::resolve_git_dir($_) } @ARGV;
+}
+if (defined $cidx_dir) { # external index
+	die "`%' is not allowed in $cidx_dir\n" if $cidx_dir =~ /\%/;
+	my $cidx = PublicInbox::CodeSearchIdx->new($cidx_dir, $opt);
+	@{$cidx->{git_dirs}} = @git_dirs; # may be empty
+	$cidx->cidx_run;
+} elsif (!@git_dirs) {
+	die $help
+} else {
+	for my $gd (@git_dirs) {
+		my $cd = "$gd/public-inbox-cindex";
+		my $cidx = PublicInbox::CodeSearchIdx->new($cd, { %$opt });
+		$cidx->{-internal} = 1;
+		@{$cidx->{git_dirs}} = ($gd);
+		$cidx->cidx_run;
+	}
+}
diff --git a/t/cindex.t b/t/cindex.t
new file mode 100644
index 00000000..c93e4e4e
--- /dev/null
+++ b/t/cindex.t
@@ -0,0 +1,98 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+use Cwd qw(getcwd abs_path);
+require_mods(qw(json Search::Xapian));
+use_ok 'PublicInbox::CodeSearchIdx';
+require PublicInbox::Import;
+my ($tmp, $for_destroy) = tmpdir();
+my $pwd = getcwd();
+
+# I reworked CodeSearchIdx->shard_worker to handle empty trees
+# in the initial commit generated by cvs2svn for xapian.git
+create_coderepo 'empty-tree-root', tmpdir => "$tmp/wt0", sub {
+	xsys_e([qw(/bin/sh -c), <<'EOM']);
+git init -q &&
+tree=$(git mktree </dev/null) &&
+head=$(git symbolic-ref HEAD) &&
+cmt=$(echo 'empty root' | git commit-tree $tree) &&
+git update-ref $head $cmt &&
+echo hi >f &&
+git add f &&
+git commit -q -m hi &&
+git gc -q
+EOM
+}; # /create_coderepo
+
+ok(run_script([qw(-cindex --dangerous -q), "$tmp/wt0"]), 'cindex internal');
+ok(-e "$tmp/wt0/.git/public-inbox-cindex/cidx.lock", 'internal dir created');
+
+
+# it's possible for git to emit NUL characters in diffs
+# (see c4201214cbf10636e2c1ab9131573f735b42c8d4 in linux.git)
+my $zp = create_coderepo 'NUL in patch', sub {
+	require PublicInbox::Git;
+	my $src = PublicInbox::Git::try_cat("$pwd/COPYING");
+	xsys_e([qw(git init -q)]);
+
+	# needs to be further than FIRST_FEW_BYTES (8000) in git.git
+	$src =~ s/\b(Limitation of Liability\.)\n\n/$1\n\0\n/s or
+		xbail "BUG: no `\\n\\n' in $pwd/COPYING";
+
+	open my $fh, '>', 'f' or xbail "open: $!";
+	print $fh $src or xbail "print: $!";
+	close $fh or xbail "close: $!";
+	xsys_e([qw(/bin/sh -c), <<'EOM']);
+git add f &&
+git commit -q -m 'initial with NUL character'
+EOM
+	$src =~ s/\n\0\n/\n\n/ or xbail "BUG: no `\\n\\0\\n'";
+	open $fh, '>', 'f' or xbail "open: $!";
+	print $fh $src or xbail "print: $!";
+	close $fh or xbail "close: $!";
+	xsys_e([qw(/bin/sh -c), <<'EOM']);
+git add f &&
+git commit -q -m 'remove NUL character' &&
+git gc -q
+EOM
+}; # /create_coderepo
+
+ok(run_script([qw(-cindex --dangerous -q -d), "$tmp/ext", $zp, "$tmp/wt0"]),
+	'cindex external');
+ok(-e "$tmp/ext/cidx.lock", 'external dir created');
+ok(!-d "$zp/.git/public-inbox-cindex", 'no cindex in original coderepo');
+
+use_ok 'PublicInbox::CodeSearch';
+if ('multi-repo search') {
+	my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
+	my $mset = $csrch->mset('NUL');
+	is(scalar($mset->items), 2, 'got results');
+	my $exp = [ 'initial with NUL character', 'remove NUL character' ];
+	my @have = sort(map { $_->get_document->get_data } $mset->items);
+	is_xdeeply(\@have, $exp, 'got expected subjects');
+
+	$mset = $csrch->mset('NUL', { git_dir => "$tmp/wt0/.git" });
+	is(scalar($mset->items), 0, 'no results with other GIT_DIR');
+
+	$mset = $csrch->mset('NUL', { git_dir => abs_path("$zp/.git") });
+	@have = sort(map { $_->get_document->get_data } $mset->items);
+	is_xdeeply(\@have, $exp, 'got expected subjects w/ GIT_DIR filter');
+}
+
+if ('--update') {
+	my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
+	my $mset = $csrch->mset('dfn:for-update');
+	is(scalar($mset->items), 0, 'no result before update');
+
+	my $e = \%PublicInbox::TestCommon::COMMIT_ENV;
+	xsys_e([qw(/bin/sh -c), <<'EOM'], $e, { -C => "$tmp/wt0" });
+>for-update && git add for-update && git commit -q -m updated
+EOM
+	ok(run_script([qw(-cindex -qu -d), "$tmp/ext"]), '-cindex -u');
+	$mset = $csrch->reopen->mset('dfn:for-update');
+	is(scalar($mset->items), 1, 'got updated result');
+}
+
+done_testing;

* [PATCH 07/28] cindex: parallelize prep phases
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (4 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
                     ` (20 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Listing refs, fingerprinting and root scanning can all be
parallelized to reduce runtime on SMP systems.

We'll use DESTROY-based dependency management with
parallelization as in LeiMirror to handle ref listing and
fingerprinting before serializing Xapian DB access to check
against the existing fingerprint.
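
A rough, standalone illustration of the DESTROY-based dependency idea
described above; it is a sketch, not the code in this patch.
Demo::OnDestroy is a hypothetical stand-in for PublicInbox::OnDestroy
(the real constructor also takes a PID as its first argument), %live
plays the role of $LIVE with job names in place of PIDs, and nothing is
actually spawned:

```perl
#!perl -w
use v5.12;

{ # hypothetical stand-in for PublicInbox::OnDestroy
	package Demo::OnDestroy;
	sub new {
		my ($cls, $cb, @args) = @_;
		bless [ $cb, @args ], $cls;
	}
	sub DESTROY { # fires once the last reference is dropped
		my ($cb, @args) = @{$_[0]};
		$cb->(@args);
	}
}

my @log;
my %live; # job name => [ callback, args... ] (the patch keys by PID)

sub start_job { # pretend to spawn; keep $dep alive until the job is reaped
	my ($name, $dep) = @_;
	$live{$name} = [ sub { push @log, "$_[0] done" }, $name, $dep ];
}

sub reap_all { # loosely mimics cidx_reap: run callbacks, drop references
	for my $name (sort keys %live) {
		my $x = delete $live{$name};
		my $cb = shift @$x;
		$cb->(@$x);
	} # $x, and the $dep reference it holds, is freed on each iteration
}

{
	my $prep = Demo::OnDestroy->new(sub { push @log, 'prep_repo' });
	start_job('ct', $prep);
	start_job('fp', $prep);
} # $prep leaves scope here, but both pending jobs still reference it
reap_all();
say for @log;
```

Since each pending job holds a reference to the same object, the
`prep_repo' step can only run after both jobs have been reaped; no
explicit counter or join is needed.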

We'll also delay root listing until we get a fingerprint
mismatch to speed up no-op indexing.
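
A minimal sketch of why comparing fingerprints first makes a no-op run
cheap. It uses the core Digest::SHA module directly rather than
PublicInbox::SHA, and the object IDs and messages are made up for
illustration:

```perl
#!perl -w
use v5.12;
use Digest::SHA;

# Digest only the object IDs (no ref names), one per line, which is the
# shape of `git show-ref --heads --tags --hash' output.
sub refs_fp {
	my $dig = Digest::SHA->new(256);
	$dig->add("$_\n") for @_;
	$dig->hexdigest;
}

my @indexed = ('a' x 40, 'b' x 40); # made-up OIDs seen by the last run
my $stored_fp = refs_fp(@indexed); # what the repo document would hold

my @verdict;
for my $cur ([ @indexed ], [ 'a' x 40, 'c' x 40 ]) {
	my $same = refs_fp(@$cur) eq $stored_fp;
	push @verdict, $same ? 'unchanged: skip root listing'
			: 'changed: list roots, then index';
}
say for @verdict;
```

Only the second case would go on to spawn `git rev-list --max-parents=0';
an unchanged repo stops after one digest comparison.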
---
 lib/PublicInbox/CodeSearchIdx.pm | 197 +++++++++++++++++++++----------
 1 file changed, 132 insertions(+), 65 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 218338da..a926886e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -26,7 +26,10 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(run_die);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::OnDestroy;
+our $LIVE; # pid => callback
+our $LIVE_JOBS;
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -106,26 +109,27 @@ sub progress {
 	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$) {
-	my ($self, $repo) = @_;
+sub store_repo ($$$) {
+	my ($self, $git, $repo) = @_;
 	my $xdb = delete($repo->{shard})->idx_acquire;
 	$xdb->begin_transaction;
+	for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
 	if (defined $repo->{id}) {
 		my $doc = $xdb->get_document($repo->{id}) //
-			die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+			die "$git->{git_dir} doc #$repo->{id} gone";
 		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
-		my %new = map { $_ => undef } @{$self->{roots}};
+		my %new = map { $_ => undef } @{$repo->{roots}};
 		my $old = xap_terms('G', $doc);
 		delete @new{keys %$old};
 		$doc->add_boolean_term('G'.$_) for keys %new;
-		delete @$old{@{$self->{roots}}};
+		delete @$old{@{$repo->{roots}}};
 		$doc->remove_term('G'.$_) for keys %$old;
 		$doc->set_data($repo->{fp});
 		$xdb->replace_document($repo->{id}, $doc);
 	} else {
 		my $new = $PublicInbox::Search::X{Document}->new;
 		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-		$new->add_boolean_term("P$self->{git}->{git_dir}");
+		$new->add_boolean_term("P$git->{git_dir}");
 		$new->add_boolean_term('T'.'r');
 		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
 		$new->set_data($repo->{fp}); # \n delimited
@@ -201,75 +205,98 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
-sub get_roots ($$) {
-	my ($self, $refs) = @_;
-	my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
-		undef, { 0 => $refs });
-	die "git rev-list \$?=$?" if $?;
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	chomp(@roots);
-	scalar(@roots) ? \@roots : undef;
+sub cidx_reap ($$) {
+	my ($self, $jobs) = @_;
+	while (keys(%$LIVE) >= $jobs) {
+		my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+		last if $pid < 0;
+		if (my $x = delete $LIVE->{$pid}) {
+			my $cb = shift @$x;
+			$cb->(@$x) if $cb;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
+	}
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
-sub cidx_fp ($) {
-	my ($self) = @_;
+sub fp_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
-	run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
 		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	$git->{-repo}->{refs} = $refs;
+	$LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+}
+
+sub fp_fini {
+	my ($self, $git, $prep_repo) = @_;
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	seek($refs, 0, SEEK_SET) or die "seek: $!";
 	my $buf;
 	my $dig = PublicInbox::SHA->new(256);
 	while (read($refs, $buf, 65536)) { $dig->add($buf) }
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	($dig->hexdigest, $refs);
+	$git->{-repo}->{fp} = $dig->hexdigest;
 }
 
-# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_git_dir ($) {
-	my ($self) = @_;
-	my $git_dir = $self->{git}->{git_dir};
-	my $ct = $self->{git}->qx([qw[for-each-ref
-		--sort=-committerdate --format=%(committerdate:raw) --count=1
+sub ct_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+		--format=%(committerdate:raw) --count=1
 		refs/heads/ refs/tags/]]);
-	my $repo = {};
-	@$repo{qw(fp refs)} = cidx_fp($self);
-	$repo->{roots} = get_roots($self, $repo->{refs});
-	if (!$repo->{roots} || !defined($ct)) {
-		warn "W: $git_dir has no root commits, skipping\n";
+	$LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+}
+
+sub ct_fini {
+	my ($self, $git, $rd, $prep_repo) = @_;
+	defined(my $ct = <$rd>) or return;
+	$ct =~ s/\s+.*\z//s; # drop TZ + LF
+	$git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
+	my $git_dir = $git->{git_dir};
+	if (!defined($repo->{ct})) {
+		warn "W: $git_dir has no commits, skipping\n";
+		delete $git->{-repo};
 		return;
 	}
-	$ct =~ s/ .*\z//s; # drop TZ
-	$repo->{ct} = $ct + 0;
 	my $n = git_dir_hash($git_dir) % $self->{nshard};
 	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
 	delete @$shard{qw(lockfh lock_path)};
 	local $shard->{xdb};
 	my $xdb = $shard->idx_acquire;
 	my @docids = docids_by_postlist($shard, 'P'.$git_dir);
-	my $docid = shift(@docids) // return $repo;
+	my $docid = shift(@docids) // return get_roots($self, $git);
 	if (@docids) {
 		warn "BUG: $git_dir indexed multiple times, culling\n";
-		$xdb->begin_transaction;
-		for (@docids) { $xdb->delete_document($_) }
-		$xdb->commit_transaction;
+		$repo->{to_delete} = \@docids; # XXX needed?
 	}
 	my $doc = $xdb->get_document($docid) //
 		die "BUG: no #$docid ($git_dir)";
 	my $old_fp = $doc->get_data;
 	if ($old_fp eq $repo->{fp}) { # no change
-		progress($self, 'unchanged');
+		progress($self, "$git_dir unchanged");
+		delete $git->{-repo};
 		return;
 	}
 	$repo->{id} = $docid;
-	$repo;
+	get_roots($self, $git);
 }
 
-sub partition_refs ($$) {
-	my ($self, $refs) = @_; # show-ref --heads --tags --hash output
-	my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
-					{ 0 => $refs });
+sub partition_refs ($$$) {
+	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
 	local $self->{xdb};
 	my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
@@ -292,22 +319,27 @@ sub partition_refs ($$) {
 	close($fh);
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		$self->{nchange} += $nchange;
-		progress($self, "$nchange commits");
+		progress($self, "$git->{git_dir}: $nchange commits");
 		for my $fh (@shard_in) {
 			$fh->flush or die "flush: $!";
 			sysseek($fh, 0, SEEK_SET) or die "seek: $!";
 		}
 		return @shard_in;
 	}
-	die "git-rev-list: \$?=$?\n";
+	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_git_dir ($$) {
-	my ($self, $git_dir) = @_;
-	local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
-	my $repo = prep_git_dir($self) or return;
-	local $self->{current_info} = $git_dir;
-	my @shard_in = partition_refs($self, delete($repo->{refs}));
+sub index_repo {
+	my ($self, $git, $roots) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = delete $git->{-repo} or return;
+	seek($roots, 0, SEEK_SET) or die "seek: $!";
+	chomp(my @roots = <$roots>);
+	close($roots) or die "close: $!";
+	@roots or return warn("E: $git->{git_dir} has no root commits\n");
+	$repo->{roots} = \@roots;
+	local $self->{current_info} = $git->{git_dir};
+	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
 	my %pids;
 	my $fwd_kill = sub {
 		my ($sig) = @_;
@@ -323,12 +355,13 @@ sub index_git_dir ($$) {
 		my $pid = fork // die "fork: $!";
 		if ($pid == 0) { # no RNG use, here
 			$0 = "code index [$n]";
+			$self->{git} = $git;
 			$self->{shard} = $n;
 			$self->{current_info} = "$self->{current_info} [$n]";
 			delete @$self{qw(lockfh lock_path)};
 			my $in = $shard_in[$n];
 			@shard_in = ();
-			$self->{roots} = delete $repo->{roots};
+			$self->{roots} = \@roots;
 			undef $repo;
 			eval { shard_worker($self, $in, $sigset) };
 			warn "E: $@" if $@;
@@ -339,18 +372,41 @@ sub index_git_dir ($$) {
 	}
 	PublicInbox::DS::sig_setmask($sigset);
 	@shard_in = ();
-	my $err;
+	my ($err, @todo);
 	while (keys %pids) {
-		my $pid = waitpid(-1, 0) or last;
-		my $j = delete $pids{$pid} // "unknown PID:$pid";
-		next if $? == 0;
-		warn "PID:$pid $j exited with \$?=$?\n";
-		$err = 1;
+		my $pid = waitpid(-1, 0) // die "waitpid: $!";
+		if (my $j = delete $pids{$pid}) {
+			next if $? == 0;
+			warn "PID:$pid $j exited with \$?=$?\n";
+			$err = 1;
+		} elsif (my $todo = delete $LIVE->{$pid}) {
+			warn "PID:$pid exited with \$?=$?\n" if $?;
+			push @todo, $todo;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
 	}
 	die "subprocess(es) failed\n" if $err;
-	store_repo($self, $repo);
-	progress($self, 'done');
+	store_repo($self, $git, $repo);
+	progress($self, "$git->{git_dir}: done");
 	# TODO: check fp afterwards?
+	while (my $x = shift @todo) {
+		my $cb = shift @$x;
+		$cb->(@$x) if $cb;
+	}
+}
+
+sub get_roots ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
+	open my $roots, '+>', undef or die "open: $!";
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
+			qw(rev-list --stdin --max-parents=0)],
+			undef, { 0 => $refs, 1 => $roots });
+	$LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -389,6 +445,21 @@ W: memory usage may be high for large indexing runs
 EOM
 }
 
+sub scan_git_dirs ($) {
+	my ($self) = @_;
+	local $LIVE_JOBS = $self->{-opt}->{jobs} //
+			PublicInbox::IPC::detect_nproc() // 2;
+	local $LIVE = {};
+	for (@{$self->{git_dirs}}) {
+		my $git = PublicInbox::Git->new($_);
+		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+							$self, $git);
+		fp_start($self, $git, $prep_repo);
+		ct_start($self, $git, $prep_repo);
+	}
+	cidx_reap($self, 0);
+}
+
 sub cidx_run {
 	my ($self) = @_;
 	cidx_init($self);
@@ -414,11 +485,7 @@ sub cidx_run {
 	}
 	local $self->{nchange} = 0;
 	# do_prune($self) if $self->{-opt}->{prune}; TODO
-	if ($self->{-opt}->{scan} // 1) {
-		for my $gd (@{$self->{git_dirs}}) {
-			index_git_dir($self, $gd);
-		}
-	}
+	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
 	$self->lock_release(!!$self->{nchange});
 }
 


* [PATCH 08/28] cindex: use read-only shards during prep phases
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (5 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
                     ` (19 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

No need to open shards for read/write access when read-only
will do.  Since we also control how a document gets sharded,
we'll also access the shard directly instead of letting Xapian
do the mappings.

--reindex didn't work properly before this change since it was
over-indexing.  It is now broken in the opposite way in that it
doesn't do reindexing at all.  --reindex will be implemented
properly in the future.
---
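For readers following along, the direct shard mapping described above
can be sketched in isolation.  This is a minimal illustration which
assumes the same rule partition_refs uses inline (the first 8 hex
digits of the commit OID, modulo the shard count); the helper name
`shard_for_commit', the shard count and the OIDs below are made up
for the example and are not part of this patch.

```perl
use v5.12;

# Hypothetical helper (not in the patch): map a hex commit OID to a
# shard index using its first 32 bits, as partition_refs does inline.
sub shard_for_commit {
	my ($oid, $nshard) = @_;
	hex(substr($oid, 0, 8)) % $nshard;
}

my $nshard = 4; # assumed shard count for illustration
my @oids = ( # made-up OIDs
	'00000005' . ('a' x 32),
	'ffffffff' . ('0' x 32),
);
for my $oid (@oids) {
	say "$oid => shard ", shard_for_commit($oid, $nshard);
}
```

Since the mapping only depends on the OID and the shard count, a
reader can check exactly one read-only shard for a given commit
instead of asking Xapian to search all of them.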
 lib/PublicInbox/CodeSearchIdx.pm | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index a926886e..02c9ed84 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(spawn);
 use PublicInbox::OnDestroy;
 our $LIVE; # pid => callback
 our $LIVE_JOBS;
+our @XDB_SHARDS_FLAT;
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -273,9 +274,9 @@ sub prep_repo ($$) {
 	my $n = git_dir_hash($git_dir) % $self->{nshard};
 	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
 	delete @$shard{qw(lockfh lock_path)};
-	local $shard->{xdb};
-	my $xdb = $shard->idx_acquire;
-	my @docids = docids_by_postlist($shard, 'P'.$git_dir);
+	my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
+	$xdb->reopen;
+	my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
 	my $docid = shift(@docids) // return get_roots($self, $git);
 	if (@docids) {
 		warn "BUG: $git_dir indexed multiple times, culling\n";
@@ -298,19 +299,19 @@ sub partition_refs ($$$) {
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
 	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
-	local $self->{xdb};
-	my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
-	my ($seen, $nchange, $nshard) = (0, 0, $self->{nshard});
-	my @shard_in;
-	for (0..($nshard - 1)) {
-		open $shard_in[$_], '+>', undef or die "open: $!";
-	}
+	my ($seen, $nchange) = (0, 0);
+	my @shard_in = map {
+		$_->reopen;
+		open my $fh, '+>', undef or die "open: $!";
+		$fh;
+	} @XDB_SHARDS_FLAT;
+
 	while (defined(my $cmt = <$fh>)) {
 		chomp $cmt;
-		if ($xdb && seen($xdb, 'Q'.$cmt)) {
+		my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
+		if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
 			last if ++$seen > $SEEN_MAX;
 		} else {
-			my $n = hex(substr($cmt, 0, 8)) % $nshard;
 			say { $shard_in[$n] } $cmt or die "say: $!";
 			++$nchange;
 			$seen = 0;
@@ -450,6 +451,7 @@ sub scan_git_dirs ($) {
 	local $LIVE_JOBS = $self->{-opt}->{jobs} //
 			PublicInbox::IPC::detect_nproc() // 2;
 	local $LIVE = {};
+	local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
 	for (@{$self->{git_dirs}}) {
 		my $git = PublicInbox::Git->new($_);
 		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,


* [PATCH 09/28] searchidxshard: improve comment wording
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (6 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
                     ` (18 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Just something I noticed while considering using this package
for CodeSearchIdx.
---
 lib/PublicInbox/SearchIdxShard.pm | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 000abd94..831be51b 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -1,11 +1,10 @@
-# Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Internal interface for a single Xapian shard in V2 inboxes.
 # See L<public-inbox-v2-format(5)> for more info on how we shard Xapian
 package PublicInbox::SearchIdxShard;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(PublicInbox::SearchIdx PublicInbox::IPC);
 use PublicInbox::OnDestroy;
 
@@ -47,7 +46,7 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop
 	$v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings
 	$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
 	$self->begin_txn_lazy;
-	# caller must capture this:
+	# caller (ipc_worker_spawn) must capture this:
 	PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
 }
 


* [PATCH 10/28] cindex: use DS and workqueues for parallelism
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (7 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
                     ` (17 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This avoids forking new shard processes for each repo we scan.
We still can't avoid a large number of Xapian commits, though,
since the `seen()' sub relies on committed data to skip
excessive work.
---
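The general idea of reusing one long-lived worker for many jobs
(instead of one fork per repo) can be sketched without any
public-inbox code.  The following is only an illustration under
stated assumptions: it uses plain pipes and line-based messages
rather than wq_io_do and PktOp, and the repo names are hypothetical.
Only the "shard_done" message name is borrowed from the patch.

```perl
use v5.12;
use POSIX ();
use IO::Handle;

pipe(my $job_r, my $job_w) or die "pipe: $!";
pipe(my $done_r, my $done_w) or die "pipe: $!";
my $pid = fork // die "fork: $!";
if ($pid == 0) { # long-lived worker, reused for every job
	close $job_w;
	close $done_r;
	$done_w->autoflush(1);
	while (defined(my $job = <$job_r>)) {
		chomp $job;
		# real indexing work for $job would happen here
		print $done_w "shard_done $job\n" or POSIX::_exit(1);
	}
	POSIX::_exit(0); # EOF on the job pipe: no more work
}
close $job_r;
close $done_w;
$job_w->autoflush(1);
my @jobs = qw(a.git b.git c.git); # hypothetical repo names
print $job_w "$_\n" for @jobs;
close $job_w or die "close: $!"; # EOF tells the worker to exit
my %ok;
while (defined(my $msg = <$done_r>)) {
	$ok{$1} = 1 if $msg =~ /\Ashard_done (\S+)/;
}
waitpid($pid, 0);
my @failed = grep { !$ok{$_} } @jobs;
die scalar(@failed), " jobs failed\n" if @failed || $?;
say "all jobs done by PID $pid";
```

Tracking completion messages per job, rather than relying on the
exit status of a per-job process, is what lets the parent notice a
failed job while the worker stays alive for the next repo.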
 lib/PublicInbox/CodeSearchIdx.pm | 374 ++++++++++++++++++++-----------
 1 file changed, 240 insertions(+), 134 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 02c9ed84..13fe1c28 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -14,9 +14,11 @@
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
-use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
 use PublicInbox::Eml;
-use PublicInbox::DS ();
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
 use PublicInbox::IPC qw(nproc_shards);
 use PublicInbox::Admin;
 use POSIX qw(WNOHANG SEEK_SET);
@@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
-our $LIVE; # pid => callback
-our $LIVE_JOBS;
-our @XDB_SHARDS_FLAT;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+	$LIVE, # pid => cmd
+	$DEFER, # [ [ cb, @args ], ... ]
+	$LIVE_JOBS, # integer
+	$MY_SIG, # like %SIG
+	$SIGSET,
+	@RDONLY_SHARDS, # Xapian::Database
+	@IDX_SHARDS # clones of self
+);
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -110,14 +120,14 @@ sub progress {
 	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$$) {
-	my ($self, $git, $repo) = @_;
-	my $xdb = delete($repo->{shard})->idx_acquire;
-	$xdb->begin_transaction;
+sub store_repo { # wq_do - returns docid
+	my ($self, $repo) = @_;
+	$self->begin_txn_lazy;
+	my $xdb = $self->{xdb};
 	for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
-	if (defined $repo->{id}) {
-		my $doc = $xdb->get_document($repo->{id}) //
-			die "$git->{git_dir} doc #$repo->{id} gone";
+	if (defined $repo->{docid}) {
+		my $doc = $xdb->get_document($repo->{docid}) //
+			die "$repo->{git_dir} doc #$repo->{docid} gone";
 		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
 		my %new = map { $_ => undef } @{$repo->{roots}};
 		my $old = xap_terms('G', $doc);
@@ -126,34 +136,38 @@ sub store_repo ($$$) {
 		delete @$old{@{$repo->{roots}}};
 		$doc->remove_term('G'.$_) for keys %$old;
 		$doc->set_data($repo->{fp});
-		$xdb->replace_document($repo->{id}, $doc);
+		$xdb->replace_document($repo->{docid}, $doc);
+		$repo->{docid}
 	} else {
 		my $new = $PublicInbox::Search::X{Document}->new;
 		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-		$new->add_boolean_term("P$git->{git_dir}");
+		$new->add_boolean_term("P$repo->{git_dir}");
 		$new->add_boolean_term('T'.'r');
 		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
 		$new->set_data($repo->{fp}); # \n delimited
 		$xdb->add_document($new);
 	}
-	$xdb->commit_transaction;
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_worker ($$$) {
-	my ($self, $r, $sigset) = @_;
+sub shard_index { # via wq_io_do
+	my ($self, $git, $n, $roots) = @_;
+	local $self->{current_info} = "$git->{git_dir} [$n]";
 	my ($quit, $cmt);
+	local $self->{roots} = $roots;
+	my $in = delete($self->{0}) // die 'BUG: no {0} input';
+	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	my $max = $batch_bytes;
-	$SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-	$SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
-	PublicInbox::DS::sig_setmask($sigset);
-
-	# the parent process of this shard process writes directly to
-	# the stdin of `git log', we consume git log's stdout:
-	my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
-	close $r or die "close: $!";
+	my $set_quit = sub { $quit = shift };
+	local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+	local $SIG{QUIT} = $set_quit;
+	local $SIG{TERM} = $set_quit;
+	local $SIG{INT} = $set_quit;
+	local $self->{git} = $git; # for patchid
+	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+	close $in or die "close: $!";
 	my $nr = 0;
 
 	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -162,8 +176,7 @@ sub shard_worker ($$$) {
 	local $/ = $FS;
 	my $buf = <$rd> // return; # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
-	my $xdb = $self->idx_acquire;
-	$xdb->begin_transaction;
+	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
 		$max -= length($buf);
@@ -174,24 +187,40 @@ sub shard_worker ($$$) {
 		++$nr;
 		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
 			progress($self, $nr);
-			$xdb->commit_transaction;
+			$self->{xdb}->commit_transaction;
 			$max = $batch_bytes;
-			$xdb->begin_transaction;
+			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}
 	close($rd);
 	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
-		$xdb->commit_transaction;
+		send($op_p, "shard_done $n", MSG_EOR);
 	} else {
 		warn "E: git @LOG_STDIN: \$?=$?\n";
-		$xdb->cancel_transaction;
+		$self->{xdb}->cancel_transaction;
 	}
 }
 
+sub shard_done { # called via PktOp on shard_index completion
+	my ($self, $n) = @_;
+	$self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
 sub seen ($$) {
 	my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
-	$xdb->postlist_begin($q) != $xdb->postlist_end($q)
+	for (1..100) {
+		my $ret = eval {
+			$xdb->postlist_begin($q) != $xdb->postlist_end($q);
+		};
+		return $ret unless $@;
+		if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+			$xdb->reopen;
+		} else {
+			Carp::croak($@);
+		}
+	}
+	Carp::croak('too many Xapian DB modifications in progress');
 }
 
 # used to select the shard for a GIT_DIR
@@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
+sub run_todo ($) {
+	my ($self) = @_;
+	my $n;
+	while (defined(my $x = shift(@{$self->{todo} // []}))) {
+		my $cb = shift @$x;
+		$cb->(@$x);
+		++$n;
+	}
+	$n;
+}
+
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
-	while (keys(%$LIVE) >= $jobs) {
-		my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
-		last if $pid < 0;
-		if (my $x = delete $LIVE->{$pid}) {
-			my $cb = shift @$x;
-			$cb->(@$x) if $cb;
-		} else {
-			warn "reaped unknown PID=$pid ($?)\n";
-		}
+	while (run_todo($self)) {}
+	my $cb = sub { keys(%$LIVE) > $jobs };
+	PublicInbox::DS->SetPostLoopCallback($cb);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+	while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+	my ($pid, $cb, $self, $git, @args) = @_;
+	return if !$LIVE; # premature shutdown
+	my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+	PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+	if ($?) {
+		$git->{-cidx_err} = 1;
+		return warn("@$cmd error: \$?=$?\n");
 	}
+	push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+	my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+	$LIVE->{$pid} = $cmd;
+	awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
@@ -227,13 +280,14 @@ sub fp_start ($$$) {
 	return if !$LIVE; # premature exit
 	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
-	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	my $cmd = ['git', "--git-dir=$git->{git_dir}",
+		qw(show-ref --heads --tags --hash)];
+	my $pid = spawn($cmd, undef, { 1 => $refs });
 	$git->{-repo}->{refs} = $refs;
-	$LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+	cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
 }
 
-sub fp_fini {
+sub fp_fini { # cidx_await cb
 	my ($self, $git, $prep_repo) = @_;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	seek($refs, 0, SEEK_SET) or die "seek: $!";
@@ -247,13 +301,15 @@ sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
 	return if !$LIVE; # premature exit
 	cidx_reap($self, $LIVE_JOBS);
-	my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+		qw[for-each-ref --sort=-committerdate
 		--format=%(committerdate:raw) --count=1
-		refs/heads/ refs/tags/]]);
-	$LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+		refs/heads/ refs/tags/] ];
+	my ($rd, $pid) = popen_rd($cmd);
+	cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
 }
 
-sub ct_fini {
+sub ct_fini { # cidx_await cb
 	my ($self, $git, $rd, $prep_repo) = @_;
 	defined(my $ct = <$rd>) or return;
 	$ct =~ s/\s+.*\z//s; # drop TZ + LF
@@ -263,34 +319,38 @@ sub ct_fini {
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $git->{-cidx_err}; # premature exit
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
-	my $git_dir = $git->{git_dir};
 	if (!defined($repo->{ct})) {
-		warn "W: $git_dir has no commits, skipping\n";
+		warn "W: $git->{git_dir} has no commits, skipping\n";
 		delete $git->{-repo};
 		return;
 	}
-	my $n = git_dir_hash($git_dir) % $self->{nshard};
-	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+	my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+	my $shard = bless { %$self, shard => $n }, ref($self);
+	$repo->{shard_n} = $n;
 	delete @$shard{qw(lockfh lock_path)};
-	my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
-	$xdb->reopen;
-	my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
+	local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+	$shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+	my ($shard, $self, $git) = @_;
+	my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
 	my $docid = shift(@docids) // return get_roots($self, $git);
-	if (@docids) {
-		warn "BUG: $git_dir indexed multiple times, culling\n";
-		$repo->{to_delete} = \@docids; # XXX needed?
-	}
-	my $doc = $xdb->get_document($docid) //
-		die "BUG: no #$docid ($git_dir)";
+	my $doc = $shard->{xdb}->get_document($docid) //
+			die "BUG: no #$docid ($git->{git_dir})";
 	my $old_fp = $doc->get_data;
-	if ($old_fp eq $repo->{fp}) { # no change
-		progress($self, "$git_dir unchanged");
+	if ($old_fp eq $git->{-repo}->{fp}) { # no change
+		progress($self, "$git->{git_dir} unchanged");
 		delete $git->{-repo};
 		return;
 	}
-	$repo->{id} = $docid;
+	$git->{-repo}->{docid} = $docid;
+	if (@docids) {
+		warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+		$git->{-repo}->{to_delete} = \@docids; # XXX needed?
+	}
 	get_roots($self, $git);
 }
 
@@ -304,12 +364,12 @@ sub partition_refs ($$$) {
 		$_->reopen;
 		open my $fh, '+>', undef or die "open: $!";
 		$fh;
-	} @XDB_SHARDS_FLAT;
+	} @RDONLY_SHARDS;
 
 	while (defined(my $cmt = <$fh>)) {
 		chomp $cmt;
-		my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
-		if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
+		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+		if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
 			last if ++$seen > $SEEN_MAX;
 		} else {
 			say { $shard_in[$n] } $cmt or die "say: $!";
@@ -330,9 +390,33 @@ sub partition_refs ($$$) {
 	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_repo {
+sub shard_commit { # via wq_io_do
+	my ($self, $n) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+	$self->commit_txn_lazy;
+	send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub commit_used_shards ($$$) {
+	my ($self, $git, $consumers) = @_;
+	local $self->{-shard_ok} = {};
+	for my $n (keys %$consumers) {
+		my ($c, $p) = PublicInbox::PktOp->pair;
+		$c->{ops}->{shard_done} = [ $self ];
+		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+		$consumers->{$n} = $c;
+	}
+	PublicInbox::DS->SetPostLoopCallback(sub {
+		scalar(grep { $_->{sock} } values %$consumers);
+	});
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	die "E: $git->{git_dir} $n shards failed" if $n;
+}
+
+sub index_repo { # cidx_await cb
 	my ($self, $git, $roots) = @_;
-	return if !$LIVE; # premature exit
+	return if $git->{-cidx_err};
 	my $repo = delete $git->{-repo} or return;
 	seek($roots, 0, SEEK_SET) or die "seek: $!";
 	chomp(my @roots = <$roots>);
@@ -341,73 +425,45 @@ sub index_repo {
 	$repo->{roots} = \@roots;
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-	my %pids;
-	my $fwd_kill = sub {
-		my ($sig) = @_;
-		kill($sig, $_) for keys %pids;
-	};
-	local $SIG{USR1} = $fwd_kill;
-	local $SIG{QUIT} = $fwd_kill;
-	local $SIG{INT} = $fwd_kill;
-	local $SIG{TERM} = $fwd_kill;
-	my $sigset = PublicInbox::DS::block_signals();
-	for (my $n = 0; $n <= $#shard_in; $n++) {
+	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+	my %CONSUMERS;
+	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
-		my $pid = fork // die "fork: $!";
-		if ($pid == 0) { # no RNG use, here
-			$0 = "code index [$n]";
-			$self->{git} = $git;
-			$self->{shard} = $n;
-			$self->{current_info} = "$self->{current_info} [$n]";
-			delete @$self{qw(lockfh lock_path)};
-			my $in = $shard_in[$n];
-			@shard_in = ();
-			$self->{roots} = \@roots;
-			undef $repo;
-			eval { shard_worker($self, $in, $sigset) };
-			warn "E: $@" if $@;
-			POSIX::_exit($@ ? 1 : 0);
-		} else {
-			$pids{$pid} = "code index [$n]";
-		}
+		my ($c, $p) = PublicInbox::PktOp->pair;
+		$c->{ops}->{shard_done} = [ $self ];
+		$IDX_SHARDS[$n]->wq_io_do('shard_index',
+					[ $shard_in[$n], $p->{op_p} ],
+					$git, $n, \@roots);
+		$CONSUMERS{$n} = $c;
 	}
-	PublicInbox::DS::sig_setmask($sigset);
 	@shard_in = ();
-	my ($err, @todo);
-	while (keys %pids) {
-		my $pid = waitpid(-1, 0) // die "waitpid: $!";
-		if (my $j = delete $pids{$pid}) {
-			next if $? == 0;
-			warn "PID:$pid $j exited with \$?=$?\n";
-			$err = 1;
-		} elsif (my $todo = delete $LIVE->{$pid}) {
-			warn "PID:$pid exited with \$?=$?\n" if $?;
-			push @todo, $todo;
-		} else {
-			warn "reaped unknown PID=$pid ($?)\n";
-		}
-	}
-	die "subprocess(es) failed\n" if $err;
-	store_repo($self, $git, $repo);
-	progress($self, "$git->{git_dir}: done");
-	# TODO: check fp afterwards?
-	while (my $x = shift @todo) {
-		my $cb = shift @$x;
-		$cb->(@$x) if $cb;
+	PublicInbox::DS->SetPostLoopCallback(sub {
+		scalar(grep { $_->{sock} } values %CONSUMERS);
+	});
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
+	die "E: $git->{git_dir} $n shards failed" if $n;
+	$repo->{git_dir} = $git->{git_dir};
+	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+	if ($id > 0) {
+		$CONSUMERS{$repo->{shard_n}} = undef;
+		commit_used_shards($self, $git, \%CONSUMERS);
+		progress($self, "$git->{git_dir}: done");
+		return run_todo($self);
 	}
+	die "E: store_repo $git->{git_dir}: id=$id";
 }
 
 sub get_roots ($$) {
 	my ($self, $git) = @_;
 	return if !$LIVE; # premature exit
-	cidx_reap($self, $LIVE_JOBS);
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
 	open my $roots, '+>', undef or die "open: $!";
-	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
-			qw(rev-list --stdin --max-parents=0)],
-			undef, { 0 => $refs, 1 => $roots });
-	$LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
+	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+			qw(rev-list --stdin --max-parents=0) ];
+	my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+	cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -434,9 +490,17 @@ sub cidx_init ($) {
 		warn "# creating $dir\n" if !$self->{-opt}->{quiet};
 		File::Path::mkpath($dir);
 	}
+	$self->lock_acquire;
+	my @shards;
 	for my $n (0..($self->{nshard} - 1)) {
 		my $shard = bless { %$self, shard => $n }, ref($self);
+		delete @$shard{qw(lockfh lock_path)};
 		$shard->idx_acquire;
+		$shard->idx_release;
+		$shard->wq_workers_start("shard[$n]", 1, undef, {
+			siblings => \@shards, # for ipc_atfork_child
+		}, \&shard_done_wait, $self);
+		push @shards, $shard;
 	}
 	# this warning needs to happen after idx_acquire
 	state $once;
@@ -444,14 +508,11 @@ sub cidx_init ($) {
 W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
 W: memory usage may be high for large indexing runs
 EOM
+	@shards;
 }
 
 sub scan_git_dirs ($) {
 	my ($self) = @_;
-	local $LIVE_JOBS = $self->{-opt}->{jobs} //
-			PublicInbox::IPC::detect_nproc() // 2;
-	local $LIVE = {};
-	local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
 	for (@{$self->{git_dirs}}) {
 		my $git = PublicInbox::Git->new($_);
 		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
@@ -462,18 +523,31 @@ sub scan_git_dirs ($) {
 	cidx_reap($self, 0);
 }
 
-sub cidx_run {
+sub shards_active { # PostLoopCallback
+	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+sub cidx_run { # main entry point
 	my ($self) = @_;
-	cidx_init($self);
+	local $self->{todo} = [];
+	local $DEFER = $self->{todo};
+	local $SIGSET = PublicInbox::DS::block_signals();
+	my $restore = PublicInbox::OnDestroy->new($$,
+		\&PublicInbox::DS::sig_setmask, $SIGSET);
+	local $LIVE = {};
+	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
 	my $cb = $SIG{__WARN__} || \&CORE::warn;
+	local $MY_SIG = {
+		CHLD => \&PublicInbox::DS::enqueue_reap,
+		INT => sub { exit },
+	};
 	local $SIG{__WARN__} = sub {
 		my $m = shift @_;
 		$self->{current_info} eq '' or
 			$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
 		$cb->($m, @_);
 	};
-	$self->lock_acquire;
 	load_existing($self);
 	my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
 	if (@nc) {
@@ -486,9 +560,41 @@ sub cidx_run {
 		warn "E: canonicalized and attempting to continue\n";
 	}
 	local $self->{nchange} = 0;
+	local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+			PublicInbox::IPC::detect_nproc() || 2;
+	local @RDONLY_SHARDS = $self->xdb_shards_flat;
+
 	# do_prune($self) if $self->{-opt}->{prune}; TODO
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+	for my $s (@IDX_SHARDS) {
+		$s->{-cidx_quit} = 1;
+		$s->wq_close;
+	}
+
+	PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
 	$self->lock_release(!!$self->{nchange});
 }
 
+sub ipc_atfork_child {
+	my ($self) = @_;
+	$self->SUPER::ipc_atfork_child;
+	my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+	$_->wq_close for @$x;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+	my ($pid, $shard, $self) = @_;
+	delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+	return unless $?;
+	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+	++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO
+	my ($self, $cb, @arg) = @_;
+	$cb->(@arg);
+}
+
 1;


* [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (8 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
                     ` (16 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This allows us to avoid repeatedly using memory-intensive
anonymous subs in CodeSearchIdx where the callback is assigned
frequently.  Anonymous subs are known to leak memory in old
Perls (e.g. 5.16.3 in enterprise distros) and are still expensive
in newer Perls.  So favor the (\&subroutine, @args) form which
allows us to eliminate anonymous subs going forward.

Only CodeSearchIdx takes advantage of the new API at the moment,
since it's the biggest repeat user of post-loop callback
changes.

Getting rid of the subroutine and relying on a global `our'
variable also has two advantages:

1) Perl warnings can detect typos at compile-time, whereas the
   (now gone) method could only detect errors at run-time.

2) `our' variable assignment can be `local'-ized to a scope
---
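The (\&subroutine, @args) form and the `local' scoping described
above can be sketched with a toy loop.  This is a minimal sketch,
not the DS implementation: the `MyLoop' package, its `event_loop'
signature and the `reap_one' helper are hypothetical stand-ins, and
passing undef as the first callback argument is an assumption made
to mirror the (undef, $jobs) signature of need_reap in this patch.

```perl
use v5.12;

{
	package MyLoop; # hypothetical stand-in for an event loop module
	our @post_loop_do; # (\&callback, @args); empty means run once

	sub event_loop {
		my ($work) = @_; # one unit of work per iteration
		while (1) {
			$work->();
			my ($cb, @args) = @post_loop_do or last;
			# keep looping only while the callback returns true
			$cb->(undef, @args) or last;
		}
	}
}

my %live = map { $_ => 1 } 1..5; # pretend PIDs of running jobs

sub need_reap { # named sub + args, no closure needed
	my (undef, $jobs) = @_;
	scalar(keys %live) > $jobs;
}

sub reap_one { delete $live{(sort keys %live)[0]} }

{
	# the assignment only lasts until the end of this block
	local @MyLoop::post_loop_do = (\&need_reap, 2);
	MyLoop::event_loop(\&reap_one);
}
say scalar(keys %live), ' live jobs remain';
say 'post_loop_do entries after scope exit: ',
	scalar(@MyLoop::post_loop_do);
```

Because the variable is a package global declared with `our', a
misspelled name fails under `use strict' at compile time, and
`local' restores the previous callback automatically when the
enclosing scope exits.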
 lib/PublicInbox/CodeSearchIdx.pm | 29 ++++++++++++++++++-----------
 lib/PublicInbox/DS.pm            | 30 +++++++-----------------------
 lib/PublicInbox/Daemon.pm        |  2 +-
 lib/PublicInbox/ExtSearchIdx.pm  |  2 +-
 lib/PublicInbox/IPC.pm           |  7 ++++++-
 lib/PublicInbox/LEI.pm           |  4 ++--
 lib/PublicInbox/Watch.pm         |  2 +-
 t/dir_idle.t                     |  6 +++---
 t/ds-leak.t                      |  8 ++++----
 t/imapd.t                        |  6 +++---
 t/nntpd.t                        |  2 +-
 t/sigfd.t                        |  7 ++++---
 t/watch_maildir.t                |  8 ++++----
 xt/mem-imapd-tls.t               |  7 +++----
 xt/mem-nntpd-tls.t               |  8 ++++----
 xt/net_writer-imap.t             |  4 ++--
 16 files changed, 64 insertions(+), 68 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 13fe1c28..587f0b81 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -246,12 +246,18 @@ sub run_todo ($) {
 	$n;
 }
 
+sub need_reap { # post_loop_do
+	my (undef, $jobs) = @_;
+	scalar(keys(%$LIVE)) > $jobs;
+}
+
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
 	while (run_todo($self)) {}
-	my $cb = sub { keys(%$LIVE) > $jobs };
-	PublicInbox::DS->SetPostLoopCallback($cb);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+	local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
+	while (need_reap(undef, $jobs)) {
+		PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	}
 	while (!$jobs && run_todo($self)) {}
 }
 
@@ -397,6 +403,11 @@ sub shard_commit { # via wq_io_do
 	send($op_p, "shard_done $n", MSG_EOR);
 }
 
+sub consumers_open { # post_loop_do
+	my (undef, $consumers) = @_;
+	scalar(grep { $_->{sock} } values %$consumers);
+}
+
 sub commit_used_shards ($$$) {
 	my ($self, $git, $consumers) = @_;
 	local $self->{-shard_ok} = {};
@@ -406,9 +417,7 @@ sub commit_used_shards ($$$) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
 		$consumers->{$n} = $c;
 	}
-	PublicInbox::DS->SetPostLoopCallback(sub {
-		scalar(grep { $_->{sock} } values %$consumers);
-	});
+	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
 	die "E: $git->{git_dir} $n shards failed" if $n;
@@ -437,9 +446,7 @@ sub index_repo { # cidx_await cb
 		$CONSUMERS{$n} = $c;
 	}
 	@shard_in = ();
-	PublicInbox::DS->SetPostLoopCallback(sub {
-		scalar(grep { $_->{sock} } values %CONSUMERS);
-	});
+	local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
 	die "E: $git->{git_dir} $n shards failed" if $n;
@@ -523,7 +530,7 @@ sub scan_git_dirs ($) {
 	cidx_reap($self, 0);
 }
 
-sub shards_active { # PostLoopCallback
+sub shards_active { # post_loop_do
 	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
@@ -572,7 +579,7 @@ sub cidx_run { # main entry point
 		$s->wq_close;
 	}
 
-	PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+	local @PublicInbox::DS::post_loop_do = (\&shards_active);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
 	$self->lock_release(!!$self->{nchange});
 }
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index b6eaf2d7..340086fc 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -44,7 +44,7 @@ our (
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
      $ep_io,                     # IO::Handle for Epoll
 
-     $PostLoopCallback,          # subref to call at the end of each loop, if defined (global)
+     @post_loop_do,              # subref + args to call at the end of each loop
 
      $LoopTimeout,               # timeout of event loop in milliseconds
      @Timers,                    # timers
@@ -69,7 +69,7 @@ sub Reset {
 		%DescriptorMap = ();
 		@Timers = ();
 		%UniqTimer = ();
-		$PostLoopCallback = undef;
+		@post_loop_do = ();
 
 		# we may be iterating inside one of these on our stack
 		my @q = delete @Stack{keys %Stack};
@@ -79,7 +79,7 @@ sub Reset {
 		$Epoll = undef; # may call DSKQXS::DESTROY
 	} while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS ||
 		$ToClose || keys(%DescriptorMap) ||
-		$PostLoopCallback || keys(%UniqTimer));
+		@post_loop_do || keys(%UniqTimer));
 
 	$reap_armed = undef;
 	$LoopTimeout = -1;  # no timeout by default
@@ -247,11 +247,13 @@ sub PostEventLoop () {
 	}
 
 	# by default we keep running, unless a postloop callback cancels it
-	$PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
+	@post_loop_do ?  $post_loop_do[0]->(\%DescriptorMap,
+					@post_loop_do[1..$#post_loop_do])
+			: 1
 }
 
 # Start processing IO events. In most daemon programs this never exits. See
-# C<PostLoopCallback> for how to exit the loop.
+# C<post_loop_do> for how to exit the loop.
 sub event_loop (;$$) {
 	my ($sig, $oldset) = @_;
 	$Epoll //= _InitPoller();
@@ -287,24 +289,6 @@ sub event_loop (;$$) {
 	} while (PostEventLoop());
 }
 
-=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
-
-Sets post loop callback function.  Pass a subref and it will be
-called every time the event loop finishes.
-
-Return 1 (or any true value) from the sub to make the loop continue, 0 or false
-and it will exit.
-
-The callback function will be passed two parameters: \%DescriptorMap
-
-=cut
-sub SetPostLoopCallback {
-    my ($class, $ref) = @_;
-
-    # global callback
-    $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
-}
-
 #####################################################################
 ### PublicInbox::DS-the-object code
 #####################################################################
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 17e799ca..6152a5d3 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -350,7 +350,7 @@ sub worker_quit { # $_[0] = signal name or number (unused)
 	my $proc_name;
 	my $warn = 0;
 	# drop idle connections and try to quit gracefully
-	PublicInbox::DS->SetPostLoopCallback(sub {
+	@PublicInbox::DS::post_loop_do = (sub {
 		my ($dmap, undef) = @_;
 		my $n = 0;
 		my $now = now();
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 401b18d0..5445b156 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -1388,7 +1388,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
 	my $quit = PublicInbox::SearchIdx::quit_cb($sync);
 	$sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
 	local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
-	PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+	local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} });
 	$pr->("initial scan complete, entering event loop\n") if $pr;
 	# calls InboxIdle->event_step:
 	PublicInbox::DS::event_loop($sig, $oldset);
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 730f2cf6..da534aa7 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -245,11 +245,16 @@ sub recv_and_run {
 	$n;
 }
 
+sub sock_defined {
+	my (undef, $wqw) = @_;
+	defined($wqw->{sock});
+}
+
 sub wq_worker_loop ($$) {
 	my ($self, $bcast2) = @_;
 	my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
 	PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
-	PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+	local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw);
 	PublicInbox::DS::event_loop();
 	PublicInbox::DS->Reset;
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b83de91d..eb9799f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1325,11 +1325,11 @@ sub lazy_start {
 	};
 	require PublicInbox::DirIdle;
 	local $dir_idle = PublicInbox::DirIdle->new(sub {
-		# just rely on wakeup to hit PostLoopCallback set below
+		# just rely on wakeup to hit post_loop_do
 		dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
 	});
 	$dir_idle->add_watches([$sock_dir]);
-	PublicInbox::DS->SetPostLoopCallback(sub {
+	local @PublicInbox::DS::post_loop_do = (sub {
 		my ($dmap, undef) = @_;
 		if (@st = defined($path) ? stat($path) : ()) {
 			if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index d9aadf82..8482100c 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -536,7 +536,7 @@ sub watch { # main entry point
 		add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris);
 	}
 	watch_fs_init($self) if $self->{mdre};
-	PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
+	local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done });
 	PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step
 	_done_for_now($self);
 }
diff --git a/t/dir_idle.t b/t/dir_idle.t
index 19e54967..50e1dd27 100644
--- a/t/dir_idle.t
+++ b/t/dir_idle.t
@@ -1,7 +1,7 @@
 #!perl -w
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use v5.10.1; use strict; use PublicInbox::TestCommon;
+use v5.12; use strict; use PublicInbox::TestCommon;
 use PublicInbox::DS qw(now);
 use File::Path qw(make_path);
 use_ok 'PublicInbox::DirIdle';
@@ -13,7 +13,7 @@ my $di = PublicInbox::DirIdle->new($cb);
 $di->add_watches(["$tmpdir/a", "$tmpdir/c"], 1);
 PublicInbox::DS->SetLoopTimeout(1000);
 my $end = 3 + now;
-PublicInbox::DS->SetPostLoopCallback(sub { scalar(@x) == 0 && now < $end });
+local @PublicInbox::DS::post_loop_do = (sub { scalar(@x) == 0 && now < $end });
 tick(0.011);
 rmdir("$tmpdir/a/b") or xbail "rmdir $!";
 PublicInbox::DS::event_loop();
diff --git a/t/ds-leak.t b/t/ds-leak.t
index 4e8d76cd..eaca05b8 100644
--- a/t/ds-leak.t
+++ b/t/ds-leak.t
@@ -1,9 +1,9 @@
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # Licensed the same as Danga::Socket (and Perl5)
 # License: GPL-1.0+ or Artistic-1.0-Perl
 #  <https://www.gnu.org/licenses/gpl-1.0.txt>
 #  <https://dev.perl.org/licenses/artistic.html>
-use strict; use v5.10.1; use PublicInbox::TestCommon;
+use v5.12; use PublicInbox::TestCommon;
 use_ok 'PublicInbox::DS';
 
 if ('close-on-exec for epoll and kqueue') {
@@ -12,7 +12,7 @@ if ('close-on-exec for epoll and kqueue') {
 	my $evfd_re = qr/(?:kqueue|eventpoll)/i;
 
 	PublicInbox::DS->SetLoopTimeout(0);
-	PublicInbox::DS->SetPostLoopCallback(sub { 0 });
+	local @PublicInbox::DS::post_loop_do = (sub { 0 });
 
 	# make sure execve closes if we're using fork()
 	my ($r, $w);
@@ -55,7 +55,7 @@ SKIP: {
 	my $cb = sub {};
 	for my $i (0..$n) {
 		PublicInbox::DS->SetLoopTimeout(0);
-		PublicInbox::DS->SetPostLoopCallback($cb);
+		local @PublicInbox::DS::post_loop_do = ($cb);
 		PublicInbox::DS::event_loop();
 		PublicInbox::DS->Reset;
 	}
diff --git a/t/imapd.t b/t/imapd.t
index c7dc01a5..0443c7cb 100644
--- a/t/imapd.t
+++ b/t/imapd.t
@@ -457,7 +457,7 @@ SKIP: {
 	my $cfg = PublicInbox::Config->new;
 	PublicInbox::DS->Reset;
 	my $ii = PublicInbox::InboxIdle->new($cfg);
-	my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+	my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
 	my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
 	$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
 	my $watcherr = "$tmpdir/watcherr";
@@ -476,7 +476,7 @@ SKIP: {
 		'delivered a message for IDLE to kick -watch') or
 		diag "mda error \$?=$?";
 	diag 'waiting for IMAP IDLE wakeup';
-	PublicInbox::DS->SetPostLoopCallback(undef);
+	@PublicInbox::DS::post_loop_do = ();
 	PublicInbox::DS::event_loop();
 	diag 'inbox unlocked on IDLE wakeup';
 
@@ -493,7 +493,7 @@ SKIP: {
 		'delivered a message for -watch PollInterval');
 
 	diag 'waiting for PollInterval wakeup';
-	PublicInbox::DS->SetPostLoopCallback(undef);
+	@PublicInbox::DS::post_loop_do = ();
 	PublicInbox::DS::event_loop();
 	diag 'inbox unlocked (poll)';
 	$w->kill;
diff --git a/t/nntpd.t b/t/nntpd.t
index dbbc37b8..ab6876bb 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -436,7 +436,7 @@ sub test_watch {
 	my $cfg = PublicInbox::Config->new;
 	PublicInbox::DS->Reset;
 	my $ii = PublicInbox::InboxIdle->new($cfg);
-	my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+	my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
 	my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
 	$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
 	my $watcherr = "$tmpdir/watcherr";
diff --git a/t/sigfd.t b/t/sigfd.t
index 7eb6b222..8fc8206c 100644
--- a/t/sigfd.t
+++ b/t/sigfd.t
@@ -1,5 +1,6 @@
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
-use strict;
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+use v5.12;
 use Test::More;
 use IO::Handle;
 use POSIX qw(:signal_h);
@@ -46,7 +47,7 @@ SKIP: {
 		is($nbsig->wait_once, undef, 'nonblocking ->wait_once');
 		ok($! == Errno::EAGAIN, 'got EAGAIN');
 		kill('HUP', $$) or die "kill $!";
-		PublicInbox::DS->SetPostLoopCallback(sub {}); # loop once
+		local @PublicInbox::DS::post_loop_do = (sub {}); # loop once
 		PublicInbox::DS::event_loop();
 		is($hit->{HUP}->{sigfd}, 2, 'HUP sigfd fired in event loop') or
 			diag explain($hit); # sometimes fails on FreeBSD 11.x
diff --git a/t/watch_maildir.t b/t/watch_maildir.t
index e0719f54..04a1c959 100644
--- a/t/watch_maildir.t
+++ b/t/watch_maildir.t
@@ -1,7 +1,7 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use Test::More;
+use v5.12;
 use PublicInbox::Eml;
 use Cwd;
 use PublicInbox::Config;
@@ -170,7 +170,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	my $ii = PublicInbox::InboxIdle->new($cfg);
 	my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
 	$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
-	PublicInbox::DS->SetPostLoopCallback(sub { $delivered == 0 });
+	local @PublicInbox::DS::post_loop_do = (sub { $delivered == 0 });
 
 	# wait for -watch to setup inotify watches
 	my $sleep = 1;
diff --git a/xt/mem-imapd-tls.t b/xt/mem-imapd-tls.t
index 75f2911f..00199a9b 100644
--- a/xt/mem-imapd-tls.t
+++ b/xt/mem-imapd-tls.t
@@ -82,7 +82,7 @@ sub once { 0 }; # stops event loop
 # setup the event loop so that it exits at every step
 # while we're still doing connect(2)
 PublicInbox::DS->SetLoopTimeout(0);
-PublicInbox::DS->SetPostLoopCallback(\&once);
+local @PublicInbox::DS::post_loop_do = (\&once);
 my $pid = $td->{pid};
 if ($^O eq 'linux' && open(my $f, '<', "/proc/$pid/status")) {
 	diag(grep(/RssAnon/, <$f>));
@@ -101,14 +101,13 @@ foreach my $n (1..$nfd) {
 	if (!($n % 128) && $DONE != $n) {
 		diag("nr: ($n) $DONE/$nfd");
 		PublicInbox::DS->SetLoopTimeout(-1);
-		PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n });
+		local @PublicInbox::DS::post_loop_do = (sub { $DONE != $n });
 
 		# clear the backlog:
 		PublicInbox::DS::event_loop();
 
 		# resume looping
 		PublicInbox::DS->SetLoopTimeout(0);
-		PublicInbox::DS->SetPostLoopCallback(\&once);
 	}
 }
 
@@ -116,7 +115,7 @@ foreach my $n (1..$nfd) {
 diag "done?: @".time." $DONE/$nfd";
 if ($DONE != $nfd) {
 	PublicInbox::DS->SetLoopTimeout(-1);
-	PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $nfd });
+	local @PublicInbox::DS::post_loop_do = (sub { $DONE != $nfd });
 	PublicInbox::DS::event_loop();
 }
 is($nfd, $DONE, "$nfd/$DONE done");
diff --git a/xt/mem-nntpd-tls.t b/xt/mem-nntpd-tls.t
index 6e34d233..a861e318 100644
--- a/xt/mem-nntpd-tls.t
+++ b/xt/mem-nntpd-tls.t
@@ -105,7 +105,7 @@ sub once { 0 }; # stops event loop
 # setup the event loop so that it exits at every step
 # while we're still doing connect(2)
 PublicInbox::DS->SetLoopTimeout(0);
-PublicInbox::DS->SetPostLoopCallback(\&once);
+local @PublicInbox::DS::post_loop_do = (\&once);
 
 foreach my $n (1..$nfd) {
 	my $io = tcp_connect($nntps, Blocking => 0);
@@ -120,14 +120,14 @@ foreach my $n (1..$nfd) {
 	if (!($n % 128) && $n != $DONE) {
 		diag("nr: ($n) $DONE/$nfd");
 		PublicInbox::DS->SetLoopTimeout(-1);
-		PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n });
+		@PublicInbox::DS::post_loop_do = (sub { $DONE != $n });
 
 		# clear the backlog:
 		PublicInbox::DS::event_loop();
 
 		# resume looping
 		PublicInbox::DS->SetLoopTimeout(0);
-		PublicInbox::DS->SetPostLoopCallback(\&once);
+		@PublicInbox::DS::post_loop_do = (\&once);
 	}
 }
 my $pid = $td->{pid};
@@ -141,7 +141,7 @@ $dump_rss->();
 # run the event loop normally, now:
 if ($DONE != $nfd) {
 	PublicInbox::DS->SetLoopTimeout(-1);
-	PublicInbox::DS->SetPostLoopCallback(sub {
+	@PublicInbox::DS::post_loop_do = (sub {
 		diag "done: ".time." $DONE";
 		$DONE != $nfd;
 	});
diff --git a/xt/net_writer-imap.t b/xt/net_writer-imap.t
index 333e0e3b..f7796e8e 100644
--- a/xt/net_writer-imap.t
+++ b/xt/net_writer-imap.t
@@ -1,5 +1,5 @@
 #!perl -w
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict; use v5.10.1; use PublicInbox::TestCommon;
 use Sys::Hostname qw(hostname);
@@ -233,7 +233,7 @@ EOM
 	my $pub_cfg = PublicInbox::Config->new;
 	PublicInbox::DS->Reset;
 	my $ii = PublicInbox::InboxIdle->new($pub_cfg);
-	my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+	my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
 	my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
 	$pub_cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
 	my $w = start_script(['-watch'], undef, { 2 => $err_wr });

* [PATCH 12/28] cindex: implement --exclude= like -clone
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (9 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
                     ` (15 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This is to ensure we can exclude certain repos which are
expensive-to-index (e.g. `**/deps.git', `**/transparency-logs/**').
---
 lib/PublicInbox/CodeSearchIdx.pm | 10 +++++++++-
 script/public-inbox-cindex       |  2 +-
 2 files changed, 10 insertions(+), 2 deletions(-)
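
A rough standalone sketch of the filtering idea follows.  It assumes
a simplified glob translator, `simple_glob2re', which is hypothetical
and is not PublicInbox::Config::glob2re; the real helper may treat
wildcards differently.  `filter_excluded' is likewise an illustrative
name.

```perl
use strict;
use warnings;

# Hypothetical, simplified translator (NOT PublicInbox::Config::glob2re):
# `**' matches across `/', while `*' and `?' stay within one component.
sub simple_glob2re {
	my ($glob) = @_;
	my $re = '';
	while ($glob =~ /\G(\*\*|\*|\?|[^*?]+)/gc) {
		my $tok = $1;
		$re .= $tok eq '**' ? '.*'
			: $tok eq '*' ? '[^/]*'
			: $tok eq '?' ? '[^/]'
			: quotemeta($tok);
	}
	$re;
}

# returns the directories which match none of the exclude globs
sub filter_excluded {
	my ($dirs, $excl) = @_;
	return @$dirs unless @$excl; # an empty pattern would match everything
	my $alt = join('|', map { simple_glob2re($_) } @$excl);
	my $re = qr/(?:$alt)\z/; # anchor every alternative at the end
	grep { $_ !~ $re } @$dirs;
}

my @dirs = qw(/srv/git/foo.git /srv/git/vendor/deps.git
		/srv/git/transparency-logs/a/b.git);
my @keep = filter_excluded(\@dirs,
			['**/deps.git', '**/transparency-logs/**']);
print "$_\n" for @keep;
```

Joining all patterns into one alternation means each directory is
tested against a single regexp rather than once per pattern.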

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 587f0b81..97c563bd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -27,7 +27,7 @@ use File::Spec ();
 use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
-use PublicInbox::Config;
+use PublicInbox::Config qw(glob2re);
 use PublicInbox::Spawn qw(spawn popen_rd);
 use PublicInbox::OnDestroy;
 use Socket qw(MSG_EOR);
@@ -566,6 +566,14 @@ sub cidx_run { # main entry point
 		}
 		warn "E: canonicalized and attempting to continue\n";
 	}
+	if (defined(my $excl = $self->{-opt}->{exclude})) {
+		my $re = '(?:'.join('\\z|', map {
+				glob2re($_) // qr/\A\Q$_\E/
+			} @$excl).'\\z)';
+		@{$self->{git_dirs}} = grep {
+			$_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1;
+		} @{$self->{git_dirs}};
+	}
 	local $self->{nchange} = 0;
 	local $LIVE_JOBS = $self->{-opt}->{jobs} ||
 			PublicInbox::IPC::detect_nproc() || 2;
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 166c8261..420ef4de 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,7 +26,7 @@ EOF
 my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
 GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
 		indexlevel|index-level|L=s batch_size|batch-size=s
-		project-list=s
+		project-list=s exclude=s@
 		d=s update|u scan! prune dry-run|n C=s@ help|h))
 	or die $help;
 if ($opt->{help}) { print $help; exit 0 };

* [PATCH 13/28] cindex: show shard number in progress message
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (10 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
                     ` (14 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Otherwise it may be confusing to see the `$nr' value walk
backwards if some shards are indexing at a slower pace.
---
 lib/PublicInbox/CodeSearchIdx.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 97c563bd..ee2d9a47 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -186,7 +186,7 @@ sub shard_index { # via wq_io_do
 		last if $quit; # likely SIGPIPE
 		++$nr;
 		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
-			progress($self, $nr);
+			progress($self, "[$n] $nr");
 			$self->{xdb}->commit_transaction;
 			$max = $batch_bytes;
 			$self->{xdb}->begin_transaction;

* [PATCH 14/28] cindex: drop `unchanged' progress message
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (11 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
                     ` (13 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

It's too noisy, and a similar message isn't emitted by -clone.
---
 lib/PublicInbox/CodeSearchIdx.pm | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index ee2d9a47..1a472b64 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -348,7 +348,6 @@ sub check_existing { # retry_reopen callback
 			die "BUG: no #$docid ($git->{git_dir})";
 	my $old_fp = $doc->get_data;
 	if ($old_fp eq $git->{-repo}->{fp}) { # no change
-		progress($self, "$git->{git_dir} unchanged");
 		delete $git->{-repo};
 		return;
 	}

* [PATCH 15/28] cindex: handle graceful shutdown by default
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (12 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
                     ` (12 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.
---
 lib/PublicInbox/CodeSearchIdx.pm | 71 +++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 23 deletions(-)
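
The shutdown pattern can be sketched outside of -cindex roughly as
below: the handler only records the signal, and the loop stops
between complete items so whatever was finished can still be
committed.  `index_all', `$work' and `$commit' are hypothetical
stand-ins (for the indexing loop, add_commit and the Xapian commit);
the signal is simulated by calling the handler directly so the
example stays deterministic.

```perl
use strict;
use warnings;
use POSIX ();

our $DO_QUIT; # signal number once a shutdown was requested

# %SIG handlers get the signal name ("TERM"); convert it to a number
sub request_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }

sub index_all {
	my ($items, $work, $commit) = @_;
	local $DO_QUIT;
	local @SIG{qw(INT TERM QUIT)} = (\&request_quit) x 3;
	my $nr = 0;
	for my $item (@$items) {
		$work->($item);
		++$nr;
		last if $DO_QUIT; # only stop between complete items
	}
	$commit->($nr); # commit whatever was finished
	$nr;
}

my $committed;
my $nr = index_all([1..5],
	sub { request_quit('TERM') if $_[0] == 3 }, # simulated SIGTERM
	sub { $committed = $_[0] });
print "indexed=$nr committed=$committed\n";
```

Storing the signal number rather than a boolean is what lets the
patch compare it against `$? & 127' when checking how git exited.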

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
 	$LIVE_JOBS, # integer
 	$MY_SIG, # like %SIG
 	$SIGSET,
+	$TXN_BYTES, # number of bytes in current shard transaction
+	$DO_QUIT, # signal number
 	@RDONLY_SHARDS, # Xapian::Database
 	@IDX_SHARDS # clones of self
 );
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
 sub shard_index { # via wq_io_do
 	my ($self, $git, $n, $roots) = @_;
 	local $self->{current_info} = "$git->{git_dir} [$n]";
-	my ($quit, $cmt);
+	my $cmt;
 	local $self->{roots} = $roots;
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	my $max = $batch_bytes;
-	my $set_quit = sub { $quit = shift };
-	local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-	local $SIG{QUIT} = $set_quit;
-	local $SIG{TERM} = $set_quit;
-	local $SIG{INT} = $set_quit;
+	# local-ized in parent before fork
+	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
 	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
 	close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
 	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
-		$max -= length($buf);
+		$TXN_BYTES -= length($buf);
 		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		$/ = "\n";
 		add_commit($self, $cmt);
-		last if $quit; # likely SIGPIPE
+		last if $DO_QUIT;
 		++$nr;
-		if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+		if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
 			progress($self, "[$n] $nr");
 			$self->{xdb}->commit_transaction;
-			$max = $batch_bytes;
+			$TXN_BYTES = $batch_bytes;
 			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}
 	close($rd);
-	if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+	if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+				($? & 127) == POSIX::SIGPIPE))) {
 		send($op_p, "shard_done $n", MSG_EOR);
 	} else {
 		warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
 sub cidx_reap ($$) {
 	my ($self, $jobs) = @_;
 	while (run_todo($self)) {}
-	local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+	local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
 	while (need_reap(undef, $jobs)) {
 		PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	}
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
 
 sub cidx_await_cb { # awaitpid cb
 	my ($pid, $cb, $self, $git, @args) = @_;
-	return if !$LIVE; # premature shutdown
+	return if !$LIVE || $DO_QUIT;
 	my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
 	PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
 	if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
 	my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
 	my ($self, $git, $prep_repo) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	cidx_reap($self, $LIVE_JOBS);
 	my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
 		qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE || $git->{-cidx_err}; # premature exit
+	return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
 	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
 	if (!defined($repo->{ct})) {
 		warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
 	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
 	die "E: $git->{git_dir} $n shards failed" if $n;
+	if ($DO_QUIT) {
+		commit_used_shards($self, $git, \%CONSUMERS);
+		progress($self, "$git->{git_dir}: done");
+		return;
+	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
 	my ($self, $git) = @_;
-	return if !$LIVE; # premature exit
+	return if !$LIVE || $DO_QUIT;
 	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
 	open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
 	@$dirs = grep { !$uniq{$_}++ } @$dirs;
 }
 
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
 sub cidx_init ($) {
 	my ($self) = @_;
 	my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
 	}
 	$self->lock_acquire;
 	my @shards;
+	local $TXN_BYTES;
 	for my $n (0..($self->{nshard} - 1)) {
 		my $shard = bless { %$self, shard => $n }, ref($self);
 		delete @$shard{qw(lockfh lock_path)};
 		$shard->idx_acquire;
 		$shard->idx_release;
-		$shard->wq_workers_start("shard[$n]", 1, undef, {
+		$shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
 			siblings => \@shards, # for ipc_atfork_child
 		}, \&shard_done_wait, $self);
 		push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
 	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+	$DO_QUIT = $_[0];
+	kill_shards(@_);
+	warn "# SIG$_[0] received, quitting...\n";
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
 	my $restore = PublicInbox::OnDestroy->new($$,
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
+	local $DO_QUIT;
 	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
-	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $MY_SIG = {
 		CHLD => \&PublicInbox::DS::enqueue_reap,
-		INT => sub { exit },
+		USR1 => \&kill_shards,
 	};
+	$MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+	my $cb = $SIG{__WARN__} || \&CORE::warn;
 	local $SIG{__WARN__} = sub {
 		my $m = shift @_;
 		$self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
 sub ipc_atfork_child {
 	my ($self) = @_;
 	$self->SUPER::ipc_atfork_child;
+	$SIG{USR1} = \&shard_usr1;
+	$SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
 	my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
 	$_->wq_close for @$x;
+	undef;
 }
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	my ($pid, $shard, $self) = @_;
-	delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
-	return unless $?;
+	if ($? == 0) { # success
+		delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+		return;
+	}
 	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
 	++$self->{shard_err} if defined($self->{shard_err});
 }

* [PATCH 16/28] sigfd: pass signal name rather than number to callback
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (13 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
                     ` (11 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This is consistent with normal Perl %SIG handlers, and allows
-cindex signal handlers to be implemented consistently across
platforms.
---
 lib/PublicInbox/CodeSearchIdx.pm |  2 +-
 lib/PublicInbox/Daemon.pm        |  2 +-
 lib/PublicInbox/Sigfd.pm         | 10 +++++-----
 3 files changed, 7 insertions(+), 7 deletions(-)
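
As a standalone illustration of the lookup table introduced in
Sigfd.pm, the sketch below maps signal numbers to `[ $cb, $signame ]'
pairs and invokes the callback with the name.  `signo_map' and
`dispatch' are hypothetical names and no signalfd is involved; only
the table shape follows the patch.

```perl
use strict;
use warnings;
use POSIX ();

# build a signo => [ $cb, $signame ] table from a %SIG-like hashref
sub signo_map {
	my ($sig) = @_;
	map {;
		POSIX->can("SIG$_")->() => [ $sig->{$_}, $_ ]
	} keys %$sig;
}

# call the handler with the signal name, as normal %SIG handlers expect
sub dispatch {
	my ($map, $signo) = @_;
	my ($cb, $signame) = @{$map->{$signo} // return};
	$cb->($signame) if ref($cb) eq 'CODE'; # skips 'IGNORE'
}

my @seen;
my %map = signo_map({ HUP => sub { push @seen, $_[0] }, USR1 => 'IGNORE' });
dispatch(\%map, POSIX::SIGHUP());
dispatch(\%map, POSIX::SIGUSR1());
print "@seen\n";
```

With names passed through, one handler can serve both a plain %SIG
entry and the signalfd path without translating numbers back.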

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 82f90368..fcd28671 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -546,7 +546,7 @@ sub shards_active { # post_loop_do
 sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
 
 sub parent_quit {
-	$DO_QUIT = $_[0];
+	$DO_QUIT = POSIX->can("SIG$_[0]")->();
 	kill_shards(@_);
 	warn "# SIG$_[0] received, quitting...\n";
 }
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 6152a5d3..57435421 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -510,7 +510,7 @@ sub upgrade_aborted ($) {
 	warn $@, "\n" if $@;
 }
 
-sub reap_children { # $_[0] = 'CHLD' or POSIX::SIGCHLD()
+sub reap_children { # $_[0] = 'CHLD'
 	while (1) {
 		my $p = waitpid(-1, WNOHANG) or return;
 		if (defined $reexec_pid && $p == $reexec_pid) {
diff --git a/lib/PublicInbox/Sigfd.pm b/lib/PublicInbox/Sigfd.pm
index 3d964be3..3c1d3811 100644
--- a/lib/PublicInbox/Sigfd.pm
+++ b/lib/PublicInbox/Sigfd.pm
@@ -4,7 +4,7 @@
 # Wraps a signalfd (or similar) for PublicInbox::DS
 # fields: (sig: hashref similar to %SIG, but signal numbers as keys)
 package PublicInbox::Sigfd;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(signalfd EPOLLIN EPOLLET %SIGNUM);
 use POSIX ();
@@ -14,8 +14,8 @@ use POSIX ();
 sub new {
 	my ($class, $sig, $nonblock) = @_;
 	my %signo = map {;
-		# $num => $cb;
-		($SIGNUM{$_} // POSIX->can("SIG$_")->()) => $sig->{$_}
+		# $num => [ $cb, $signame ];
+		($SIGNUM{$_} // POSIX->can("SIG$_")->()) => [ $sig->{$_}, $_ ]
 	} keys %$sig;
 	my $self = bless { sig => \%signo }, $class;
 	my $io;
@@ -45,8 +45,8 @@ sub wait_once ($) {
 		for my $off (0..$nr) {
 			# the first uint32_t of signalfd_siginfo: ssi_signo
 			my $signo = unpack('L', substr($buf, 128 * $off, 4));
-			my $cb = $self->{sig}->{$signo};
-			$cb->($signo) if $cb ne 'IGNORE';
+			my ($cb, $signame) = @{$self->{sig}->{$signo}};
+			$cb->($signame) if $cb ne 'IGNORE';
 		}
 	}
 	$r;

* [PATCH 17/28] cindex: implement --max-size=SIZE
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (14 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
                     ` (10 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This matches existing behavior of -index and -extindex, and
will hopefully allow me to avoid OOM problems by skipping
problematic commits.
---
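A rough standalone sketch of the skip logic as of this patch, for
anybody reading along.  filter_by_size() is a hypothetical helper and
not part of public-inbox; the only thing it assumes from the diff is
that the first line of each record is the commit OID.

```perl
use strict;
use warnings;

# Hypothetical helper (not in public-inbox): returns the records which
# would be indexed, skipping any at or above $max_size bytes.
sub filter_by_size {
	my ($max_size, @records) = @_;
	my @keep;
	for my $buf (@records) {
		if ($max_size && length($buf) >= $max_size) {
			# first line of each record is assumed to be the OID
			my ($H) = split(/\n/, $buf, 2);
			warn "W: skipping $H (", length($buf),
				" >= $max_size)\n";
			next;
		}
		push @keep, $buf;
	}
	@keep;
}

my @kept = filter_by_size(16, "aaaa\nsmall", "bbbb\n" . ('x' x 100));
print scalar(@kept), " record(s) kept\n";
```

An unset or zero limit disables the check entirely, so nothing changes
unless --max-size is given.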
 lib/PublicInbox/CodeSearchIdx.pm | 6 ++++++
 script/public-inbox-cindex       | 4 +++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index fcd28671..b185731d 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -161,6 +161,7 @@ sub shard_index { # via wq_io_do
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
+	my $max_size = $self->{-opt}->{max_size};
 	# local-ized in parent before fork
 	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
@@ -177,6 +178,11 @@ sub shard_index { # via wq_io_do
 	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
+		if ($max_size && length($buf) >= $max_size) {
+			my ($H, undef) = split(/\n/, $buf, 2);
+			warn "W: skipping $H (", length($buf)," >= $max_size)\n";
+			next;
+		}
 		$TXN_BYTES -= length($buf);
 		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		$/ = "\n";
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 420ef4de..e2500b93 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -16,6 +16,7 @@ usage: public-inbox-cindex [options] --project-list=FILE PROJECT_ROOT
   --update | -u       update previously-indexed code repos with `-d'
   --jobs=NUM          set or disable parallelization (NUM=0)
   --batch-size=BYTES  flush changes to OS after a given number of bytes
+  --max-size=BYTES    do not index commit diffs larger than the given size
   --prune             prune old repos and commits
   --reindex           reindex previously indexed repos
   --verbose | -v      increase verbosity (may be repeated)
@@ -25,7 +26,8 @@ See public-inbox-cindex(1) man page for full documentation.
 EOF
 my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
 GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
-		indexlevel|index-level|L=s batch_size|batch-size=s
+		indexlevel|index-level|L=s
+		batch_size|batch-size=s max_size|max-size=s
 		project-list=s exclude=s@
 		d=s update|u scan! prune dry-run|n C=s@ help|h))
 	or die $help;

* [PATCH 18/28] cindex: check for checkpoint before giant messages
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (15 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
                     ` (9 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Giant messages may put us far over the batch limit if we're
close to it.
---
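To illustrate the byte accounting, here is a small simulation which
only tracks where checkpoints would land.  checkpoints() and its
"before"/"after" labels are made up for this note; no Xapian
transaction is involved.

```perl
use strict;
use warnings;

# Hypothetical simulation: returns a label for every point at which a
# checkpoint (commit + begin transaction) would happen.
sub checkpoints {
	my ($batch_bytes, @lens) = @_;
	my $txn_bytes = $batch_bytes;
	my ($nr, @ckpt) = (0);
	for my $len (@lens) {
		$txn_bytes -= $len;
		if ($txn_bytes <= 0) { # flush before adding the record
			push @ckpt, "before $nr";
			$txn_bytes = $batch_bytes - $len;
		}
		++$nr; # the record would be added here
		if ($txn_bytes <= 0) { # record alone used up the budget
			push @ckpt, "after $nr";
			$txn_bytes = $batch_bytes;
		}
	}
	@ckpt;
}

# 100 byte budget, the third record is a giant one
print join(', ', checkpoints(100, 60, 30, 500, 10)), "\n";
# prints "before 2, after 3"
```

The giant record gets a transaction to itself instead of being piled
on top of the 90 bytes already pending.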
 lib/PublicInbox/CodeSearchIdx.pm | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index b185731d..829fe28e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -151,6 +151,14 @@ sub store_repo { # wq_do - returns docid
 	}
 }
 
+sub cidx_ckpoint ($$) {
+	my ($self, $msg) = @_;
+	progress($self, $msg);
+	return if $PublicInbox::Search::X{CLOEXEC_UNSET};
+	$self->{xdb}->commit_transaction;
+	$self->{xdb}->begin_transaction;
+}
+
 # sharded reader for `git log --pretty=format: --stdin'
 sub shard_index { # via wq_io_do
 	my ($self, $git, $n, $roots) = @_;
@@ -184,16 +192,18 @@ sub shard_index { # via wq_io_do
 			next;
 		}
 		$TXN_BYTES -= length($buf);
+		if ($TXN_BYTES <= 0) {
+			cidx_ckpoint($self, "[$n] $nr");
+			$TXN_BYTES = $batch_bytes - length($buf);
+		}
 		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		$/ = "\n";
 		add_commit($self, $cmt);
 		last if $DO_QUIT;
 		++$nr;
-		if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
-			progress($self, "[$n] $nr");
-			$self->{xdb}->commit_transaction;
+		if ($TXN_BYTES <= 0) {
+			cidx_ckpoint($self, "[$n] $nr");
 			$TXN_BYTES = $batch_bytes;
-			$self->{xdb}->begin_transaction;
 		}
 		$/ = $FS;
 	}

* [PATCH 19/28] cindex: truncate or drop body for over-sized commits
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (16 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
                     ` (8 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

We need to get at least the commit OID indexed to
avoid redundant work.
---
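The truncation boils down to splitting the body at the first line
consisting solely of "---".  A minimal sketch follows; shrink_body()
and its $budget parameter are hypothetical names, and the split is as
unreliable as the one in the patch for messages which contain such a
line themselves.

```perl
use strict;
use warnings;

# Hypothetical helper: keep only the commit message part of $body if
# it fits in $budget bytes, otherwise drop the body entirely.
sub shrink_body {
	my ($body, $budget) = @_;
	# everything before the first "---" line is assumed to be the
	# commit message; the rest is diffstat + patch
	my ($msg) = split(/^---\n/m, $body, 2);
	$msg //= '';
	length($msg) <= $budget ? $msg : '';
}

my $body = "fix a bug\n\nlonger explanation\n---\n f | 2 +-\n" .
		("+x\n" x 1000);
print length(shrink_body($body, 64)), " bytes kept\n"; # message only
print length(shrink_body($body, 8)), " bytes kept\n"; # nothing fits
```

Either way the commit OID and the other header fields still get
indexed, which is what avoids the redundant work.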
 lib/PublicInbox/CodeSearchIdx.pm | 53 +++++++++++++++++++++++++-------
 1 file changed, 42 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 829fe28e..176422d0 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -41,7 +41,8 @@ our (
 	$TXN_BYTES, # number of bytes in current shard transaction
 	$DO_QUIT, # signal number
 	@RDONLY_SHARDS, # Xapian::Database
-	@IDX_SHARDS # clones of self
+	@IDX_SHARDS, # clones of self
+	$MAX_SIZE,
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -159,17 +160,45 @@ sub cidx_ckpoint ($$) {
 	$self->{xdb}->begin_transaction;
 }
 
+sub truncate_cmt ($$) {
+	my ($cmt) = @_; # _[1] is $buf (giant)
+	my ($orig_len, $len);
+	$len = $orig_len = length($_[1]);
+	@$cmt{@FMT} = split(/\n/, $_[1], scalar(@FMT));
+	undef $_[1];
+	$len -= length($cmt->{b});
+
+	# try to keep the commit message body.
+	# n.b. this diffstat split may be unreliable but it's not worth
+	# perfection for giant commits:
+	my ($bdy) = split(/^---\n/sm, delete($cmt->{b}), 2);
+	if (($len + length($bdy)) <= $MAX_SIZE) {
+		$len += length($bdy);
+		$cmt->{b} = $bdy;
+		warn <<EOM;
+W: $cmt->{H}: truncated body ($orig_len => $len bytes)
+W: to be under --max-size=$MAX_SIZE
+EOM
+	} else {
+		$cmt->{b} = '';
+		warn <<EOM;
+W: $cmt->{H}: deleted body ($orig_len => $len bytes)
+W: to be under --max-size=$MAX_SIZE
+EOM
+	}
+	$len;
+}
+
 # sharded reader for `git log --pretty=format: --stdin'
 sub shard_index { # via wq_io_do
 	my ($self, $git, $n, $roots) = @_;
 	local $self->{current_info} = "$git->{git_dir} [$n]";
-	my $cmt;
 	local $self->{roots} = $roots;
 	my $in = delete($self->{0}) // die 'BUG: no {0} input';
 	my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
 	my $batch_bytes = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	my $max_size = $self->{-opt}->{max_size};
+	local $MAX_SIZE = $self->{-opt}->{max_size};
 	# local-ized in parent before fork
 	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
@@ -180,24 +209,26 @@ sub shard_index { # via wq_io_do
 	# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
 	# in linux.git, so we use $/ = "\n\0" to check end-of-patch
 	my $FS = "\n\0";
+	my $len;
+	my $cmt = {};
 	local $/ = $FS;
 	my $buf = <$rd> // return; # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
 	$self->begin_txn_lazy;
 	while (defined($buf = <$rd>)) {
 		chomp($buf);
-		if ($max_size && length($buf) >= $max_size) {
-			my ($H, undef) = split(/\n/, $buf, 2);
-			warn "W: skipping $H (", length($buf)," >= $max_size)\n";
-			next;
+		$/ = "\n";
+		$len = length($buf);
+		if (defined($MAX_SIZE) && $len > $MAX_SIZE) {
+			$len = truncate_cmt($cmt, $buf);
+		} else {
+			@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
 		}
-		$TXN_BYTES -= length($buf);
+		$TXN_BYTES -= $len;
 		if ($TXN_BYTES <= 0) {
 			cidx_ckpoint($self, "[$n] $nr");
-			$TXN_BYTES = $batch_bytes - length($buf);
+			$TXN_BYTES = $batch_bytes - $len;
 		}
-		@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
-		$/ = "\n";
 		add_commit($self, $cmt);
 		last if $DO_QUIT;
 		++$nr;

* [PATCH 20/28] cindex: attempt to give oldest commits lowest docids
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (17 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
                     ` (7 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Monotonically increasing docids may help us avoid sorting output
for the web and CLI, since recent commits are generally the most
desired search results.

`git log --reverse' incurs no extra overhead in this case, since
`--stdin' will mean git buffers the commit list in memory before
attempting to emit anything.
---
 lib/PublicInbox/CodeSearchIdx.pm | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 176422d0..f0b506da 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -52,8 +52,12 @@ our $SEEN_MAX = 100000;
 
 # TODO: do we care about committer name + email? or tree OID?
 my @FMT = qw(H P ct an ae at s b); # (b)ody must be last
+
+# git log --stdin buffers all commits before emitting, thus --reverse
+# doesn't incur extra overhead.  We use --reverse to keep Xapian docids
+# increasing so we may be able to avoid sorting results in some cases
 my @LOG_STDIN = (qw(log --no-decorate --no-color --no-notes -p --stat -M
-	--stdin --no-walk=unsorted), '--pretty=format:%n%x00'.
+	--reverse --stdin --no-walk=unsorted), '--pretty=format:%n%x00'.
 	join('%n', map { "%$_" } @FMT));
 
 sub new {

* [PATCH 21/28] cindex: improve granularity of quit checks
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (18 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
                     ` (6 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This fixes shutdown handling when shard_index() isn't running
and ensures we can shut down the process more quickly.
---
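The pattern used throughout is to test the quit flag before picking up
each new piece of work rather than after finishing it.  A standalone
sketch, where on_quit() stands in for the real signal handler and the
'stop' item simulates a signal arriving mid-loop (both are assumptions
for illustration):

```perl
use strict;
use warnings;

my $DO_QUIT; # set asynchronously in the real code

# would be installed as a signal handler, $_[0] is the signal name
sub on_quit { $DO_QUIT = $_[0] }

sub process_all {
	my (@work) = @_;
	my $done = 0;
	# check the flag before each (possibly blocking) read
	while (!$DO_QUIT && defined(my $item = shift @work)) {
		++$done;
		on_quit('TERM') if $item eq 'stop'; # simulated signal
	}
	$done;
}

print process_all(qw(a b stop c d)), " item(s) processed\n";
```

Items queued after the signal are never read, so a slow or blocked
reader no longer delays shutdown.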
 lib/PublicInbox/CodeSearchIdx.pm | 55 ++++++++++++++++++++------------
 1 file changed, 34 insertions(+), 21 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index f0b506da..4f91e0b6 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -206,6 +206,7 @@ sub shard_index { # via wq_io_do
 	# local-ized in parent before fork
 	$TXN_BYTES = $batch_bytes;
 	local $self->{git} = $git; # for patchid
+	return if $DO_QUIT;
 	my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
 	close $in or die "close: $!";
 	my $nr = 0;
@@ -216,10 +217,10 @@ sub shard_index { # via wq_io_do
 	my $len;
 	my $cmt = {};
 	local $/ = $FS;
-	my $buf = <$rd> // return; # leading $FS
+	my $buf = <$rd> // return close($rd); # leading $FS
 	$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
 	$self->begin_txn_lazy;
-	while (defined($buf = <$rd>)) {
+	while (!$DO_QUIT && defined($buf = <$rd>)) {
 		chomp($buf);
 		$/ = "\n";
 		$len = length($buf);
@@ -234,7 +235,6 @@ sub shard_index { # via wq_io_do
 			$TXN_BYTES = $batch_bytes - $len;
 		}
 		add_commit($self, $cmt);
-		last if $DO_QUIT;
 		++$nr;
 		if ($TXN_BYTES <= 0) {
 			cidx_ckpoint($self, "[$n] $nr");
@@ -298,6 +298,7 @@ sub run_todo ($) {
 
 sub need_reap { # post_loop_do
 	my (undef, $jobs) = @_;
+	return if !$LIVE || $DO_QUIT;
 	scalar(keys(%$LIVE)) > $jobs;
 }
 
@@ -412,7 +413,7 @@ sub check_existing { # retry_reopen callback
 sub partition_refs ($$$) {
 	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
 	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+	my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
 	my ($seen, $nchange) = (0, 0);
 	my @shard_in = map {
@@ -421,7 +422,7 @@ sub partition_refs ($$$) {
 		$fh;
 	} @RDONLY_SHARDS;
 
-	while (defined(my $cmt = <$fh>)) {
+	while (defined(my $cmt = <$rfh>)) {
 		chomp $cmt;
 		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
 		if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
@@ -431,8 +432,13 @@ sub partition_refs ($$$) {
 			++$nchange;
 			$seen = 0;
 		}
+		if ($DO_QUIT) {
+			close($rfh);
+			return ();
+		}
 	}
-	close($fh);
+	close($rfh);
+	return () if $DO_QUIT;
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		$self->{nchange} += $nchange;
 		progress($self, "$git->{git_dir}: $nchange commits");
@@ -454,9 +460,18 @@ sub shard_commit { # via wq_io_do
 
 sub consumers_open { # post_loop_do
 	my (undef, $consumers) = @_;
+	return if $DO_QUIT;
 	scalar(grep { $_->{sock} } values %$consumers);
 }
 
+sub wait_consumers ($$$) {
+	my ($self, $git, $consumers) = @_;
+	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+	die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
 sub commit_used_shards ($$$) {
 	my ($self, $git, $consumers) = @_;
 	local $self->{-shard_ok} = {};
@@ -466,15 +481,12 @@ sub commit_used_shards ($$$) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
 		$consumers->{$n} = $c;
 	}
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
-	die "E: $git->{git_dir} $n shards failed" if $n;
+	wait_consumers($self, $git, $consumers);
 }
 
 sub index_repo { # cidx_await cb
 	my ($self, $git, $roots) = @_;
-	return if $git->{-cidx_err};
+	return if $git->{-cidx_err} || $DO_QUIT;
 	my $repo = delete $git->{-repo} or return;
 	seek($roots, 0, SEEK_SET) or die "seek: $!";
 	chomp(my @roots = <$roots>);
@@ -484,31 +496,29 @@ sub index_repo { # cidx_await cb
 	local $self->{current_info} = $git->{git_dir};
 	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
 	local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
-	my %CONSUMERS;
+	my $consumers = {};
 	for my $n (0..$#shard_in) {
 		-s $shard_in[$n] or next;
+		last if $DO_QUIT;
 		my ($c, $p) = PublicInbox::PktOp->pair;
 		$c->{ops}->{shard_done} = [ $self ];
 		$IDX_SHARDS[$n]->wq_io_do('shard_index',
 					[ $shard_in[$n], $p->{op_p} ],
 					$git, $n, \@roots);
-		$CONSUMERS{$n} = $c;
+		$consumers->{$n} = $c;
 	}
 	@shard_in = ();
-	local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
-	PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-	my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
-	die "E: $git->{git_dir} $n shards failed" if $n;
+	wait_consumers($self, $git, $consumers);
 	if ($DO_QUIT) {
-		commit_used_shards($self, $git, \%CONSUMERS);
+		commit_used_shards($self, $git, $consumers);
 		progress($self, "$git->{git_dir}: done");
 		return;
 	}
 	$repo->{git_dir} = $git->{git_dir};
 	my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
 	if ($id > 0) {
-		$CONSUMERS{$repo->{shard_n}} = undef;
-		commit_used_shards($self, $git, \%CONSUMERS);
+		$consumers->{$repo->{shard_n}} = undef;
+		commit_used_shards($self, $git, $consumers);
 		progress($self, "$git->{git_dir}: done");
 		return run_todo($self);
 	}
@@ -585,6 +595,7 @@ sub scan_git_dirs ($) {
 							$self, $git);
 		fp_start($self, $git, $prep_repo);
 		ct_start($self, $git, $prep_repo);
+		last if $DO_QUIT;
 	}
 	cidx_reap($self, 0);
 }
@@ -674,8 +685,10 @@ sub ipc_atfork_child {
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	my ($pid, $shard, $self) = @_;
+	my $quit_req = delete($shard->{-cidx_quit});
+	return if $DO_QUIT || !$LIVE;
 	if ($? == 0) { # success
-		delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+		$quit_req // warn 'BUG: {-cidx_quit} unset';
 		return;
 	}
 	warn "PID:$pid $shard->{shard} exited with \$?=$?\n";

* [PATCH 22/28] spawn: show failing directory for chdir failures
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (19 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
                     ` (5 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

Our use of `git rev-parse --git-dir' depends on our (v)fork+exec
wrapper doing chdir, so the error message is required to avoid
user confusion.  I'm still avoiding `git -C $DIR' for now since
ancient versions of git did not support it.
---
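For reference, this is roughly what the pure-Perl message looks like
when the directory is missing.  The path is a throwaway one created
for this example.

```perl
use strict;
use warnings;
use File::Temp ();

my $tmp = File::Temp->newdir;
my $cd = "$tmp/missing"; # never created, so chdir must fail
eval { chdir $cd or die "cd $cd: $!\n" };
print $@; # "cd <path>: <reason from $!>"
```

Seeing the directory next to the errno string makes it obvious which
repository path is stale.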
 lib/PublicInbox/Spawn.pm   | 6 ++++--
 lib/PublicInbox/SpawnPP.pm | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index dc11543a..878843a6 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -122,8 +122,10 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
 			exit_err("setpgid", &cerrnum);
 		for (sig = 1; sig < NSIG; sig++)
 			signal(sig, SIG_DFL); /* ignore errors on signals */
-		if (*cd && chdir(cd) < 0)
-			exit_err("chdir", &cerrnum);
+		if (*cd && chdir(cd) < 0) {
+			write(2, "cd ", 3);
+			exit_err(cd, &cerrnum);
+		}
 
 		max_rlim = av_len(rlim);
 		for (i = 0; i < max_rlim; i += 3) {
diff --git a/lib/PublicInbox/SpawnPP.pm b/lib/PublicInbox/SpawnPP.pm
index 5609f74a..d6c863f8 100644
--- a/lib/PublicInbox/SpawnPP.pm
+++ b/lib/PublicInbox/SpawnPP.pm
@@ -37,7 +37,7 @@ sub pi_fork_exec ($$$$$$$) {
 		}
 		$SIG{$_} = 'DEFAULT' for grep(!/\A__/, keys %SIG);
 		if ($cd ne '') {
-			chdir $cd or die "chdir $cd: $!";
+			chdir $cd or die "cd $cd: $!";
 		}
 		while (@$rlim) {
 			my ($r, $soft, $hard) = splice(@$rlim, 0, 3);

* [PATCH 23/28] cindex: filter out non-existent git directories
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (20 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
                     ` (4 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

For now, we just warn users about repos which no longer exist and
point them at --prune, which the next commit implements.
---
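The filtering is an ordinary grep which stashes the rejects on the
side.  A condensed standalone version, with throwaway paths standing
in for the indexed GIT_DIR terms:

```perl
use strict;
use warnings;
use File::Temp ();

my $tmp = File::Temp->newdir;
my @known = ("$tmp", "$tmp/no-such-repo.git"); # stand-ins for terms
my @missing;
my @cur = grep { -e $_ ? 1 : do { push @missing, $_; 0 } } @known;
@missing and warn "W: the following repos no longer exist:\n",
		map { "W:\t$_\n" } @missing;
print scalar(@cur), " repo(s) remain\n";
```

Only paths which still exist go on to the update.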
 lib/PublicInbox/CodeSearchIdx.pm | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 4f91e0b6..e875b93e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -547,7 +547,19 @@ sub load_existing ($) { # for -u/--update
 		local $self->{xdb};
 		$self->xdb or
 			die "E: $self->{cidx_dir} non-existent for --update\n";
-		my @cur = $self->all_terms('P');
+		my @missing;
+		my @cur = grep {
+			if (-e $_) {
+				1;
+			} else {
+				push @missing, $_;
+				undef;
+			}
+		} $self->all_terms('P');
+		@missing and warn "W: the following repos no longer exist:\n",
+				(map { "W:\t$_\n" } @missing),
+				"W: use --prune to remove them from ",
+				$self->{cidx_dir}, "\n";
 		push @$dirs, @cur;
 	}
 	my %uniq; # List::Util::uniq requires Perl 5.26+

* [PATCH 24/28] cindex: add support for --prune
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (21 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
                     ` (3 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

This gets rid of both inaccessible commits AND repositories.
It only unindexes commits which have already been pruned in git,
so repos with auto GC disabled will need GC run on them first.
---
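The temporary repo sees every object through objects/info/alternates.
Below is a sketch of just the file-writing part; write_alternates() is
a hypothetical helper, and unlike the patch it does not initialize a
real bare repository around the file.

```perl
use strict;
use warnings;
use File::Temp ();
use File::Path qw(make_path);

# Hypothetical helper: list the objects/ directory of every existing
# repo in $tmp/objects/info/alternates, returns the paths written.
sub write_alternates {
	my ($tmp, @git_dirs) = @_;
	make_path("$tmp/objects/info");
	my $f = "$tmp/objects/info/alternates";
	open my $fh, '>', $f or die "open($f): $!";
	my @used;
	for my $d (@git_dirs) {
		my $o = "$d/objects";
		next unless -d $o;
		print $fh $o, "\n" or die "print($f): $!";
		push @used, $o;
	}
	close $fh or die "close($f): $!";
	@used;
}

my $root = File::Temp->newdir;
make_path("$root/a.git/objects"); # looks like a repo
make_path("$root/b.git"); # no objects/ directory, skipped
my $tmp = File::Temp->newdir;
my @used = write_alternates("$tmp", "$root/a.git", "$root/b.git");
print "alternates: @used\n";
```

With that in place, a single object-type lookup against the temporary
repo can answer for all repos at once, which is what shard_prune
relies on.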
 lib/PublicInbox/CodeSearchIdx.pm | 86 ++++++++++++++++++++++++++++++--
 t/cindex.t                       | 16 ++++++
 2 files changed, 99 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index e875b93e..095c153e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -43,6 +43,7 @@ our (
 	@RDONLY_SHARDS, # Xapian::Database
 	@IDX_SHARDS, # clones of self
 	$MAX_SIZE,
+	$TMP_GIT, # PublicInbox::Git object for --reindex and --prune
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -543,7 +544,7 @@ sub git { $_[0]->{git} }
 sub load_existing ($) { # for -u/--update
 	my ($self) = @_;
 	my $dirs = $self->{git_dirs} // [];
-	if ($self->{-opt}->{update}) {
+	if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
 		local $self->{xdb};
 		$self->xdb or
 			die "E: $self->{cidx_dir} non-existent for --update\n";
@@ -556,6 +557,7 @@ sub load_existing ($) { # for -u/--update
 				undef;
 			}
 		} $self->all_terms('P');
+		@missing = () if $self->{-opt}->{prune};
 		@missing and warn "W: the following repos no longer exist:\n",
 				(map { "W:\t$_\n" } @missing),
 				"W: use --prune to remove them from ",
@@ -612,6 +614,64 @@ sub scan_git_dirs ($) {
 	cidx_reap($self, 0);
 }
 
+sub prune_cb { # git->check_async callback
+	my ($hex, $type, undef, $self_id) = @_;
+	if ($type ne 'commit') {
+		my ($self, $id) = @$self_id;
+		progress($self, "$hex $type");
+		++$self->{pruned};
+		$self->{xdb}->delete_document($id);
+	}
+}
+
+sub shard_prune { # via wq_io_do
+	my ($self, $n, $git_dir) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+	my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy
+	$self->begin_txn_lazy;
+	my $xdb = $self->{xdb};
+	my $cur = $xdb->postlist_begin('Tc');
+	my $end = $xdb->postlist_end('Tc');
+	my ($id, @cmt, $oid);
+	local $self->{pruned} = 0;
+	for (; $cur != $end && !$DO_QUIT; $cur++) {
+		@cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);
+		scalar(@cmt) == 1 or
+			warn "BUG? shard[$n] #$id has multiple commits: @cmt";
+		for $oid (@cmt) {
+			$git->check_async($oid, \&prune_cb, [ $self, $id ]);
+		}
+	}
+	$git->async_wait_all;
+	for my $d ($self->all_terms('P')) { # GIT_DIR paths
+		last if $DO_QUIT;
+		next if -d $d;
+		for $id (docids_by_postlist($self, 'P'.$d)) {
+			progress($self, "$d gone #$id");
+			$xdb->delete_document($id);
+		}
+	}
+	$self->commit_txn_lazy;
+	$self->{pruned} and
+		progress($self, "[$n] pruned $self->{pruned} commits");
+	send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub do_prune ($) {
+	my ($self) = @_;
+	my $consumers = {};
+	my $git_dir = $TMP_GIT->{git_dir};
+	my $n = 0;
+	local $self->{-shard_ok} = {};
+	for my $s (@IDX_SHARDS) {
+		my ($c, $p) = PublicInbox::PktOp->pair;
+		$c->{ops}->{shard_done} = [ $self ];
+		$s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir);
+		$consumers->{$n++} = $c;
+	}
+	wait_consumers($self, $TMP_GIT, $consumers);
+}
+
 sub shards_active { # post_loop_do
 	scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
@@ -625,6 +685,25 @@ sub parent_quit {
 	warn "# SIG$_[0] received, quitting...\n";
 }
 
+sub init_tmp_git_dir ($) {
+	my ($self) = @_;
+	return unless ($self->{-opt}->{prune} || $self->{-opt}->{reindex});
+	require File::Temp;
+	require PublicInbox::Import;
+	my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+	PublicInbox::Import::init_bare("$tmp", 'cidx-all');
+	my $f = "$tmp/objects/info/alternates";
+	open my $fh, '>', $f or die "open($f): $!";
+	my $o;
+	for (@{$self->{git_dirs}}) { # TODO: sha256 check?
+		$o = $_.'/objects';
+		say $fh $o if -d $o;
+	}
+	close $fh or die "close($f): $!";
+	$TMP_GIT = PublicInbox::Git->new("$tmp");
+	$TMP_GIT->{-tmp} = $tmp;
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	local $self->{todo} = [];
@@ -634,6 +713,7 @@ sub cidx_run { # main entry point
 		\&PublicInbox::DS::sig_setmask, $SIGSET);
 	local $LIVE = {};
 	local $DO_QUIT;
+	local $TMP_GIT;
 	local @IDX_SHARDS = cidx_init($self);
 	local $self->{current_info} = '';
 	local $MY_SIG = {
@@ -671,8 +751,8 @@ sub cidx_run { # main entry point
 	local $LIVE_JOBS = $self->{-opt}->{jobs} ||
 			PublicInbox::IPC::detect_nproc() || 2;
 	local @RDONLY_SHARDS = $self->xdb_shards_flat;
-
-	# do_prune($self) if $self->{-opt}->{prune}; TODO
+	init_tmp_git_dir($self);
+	do_prune($self) if $self->{-opt}->{prune};
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
 
 	for my $s (@IDX_SHARDS) {
diff --git a/t/cindex.t b/t/cindex.t
index c93e4e4e..5d269217 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -95,4 +95,20 @@ EOM
 	is(scalar($mset->items), 1, 'got updated result');
 }
 
+if ('--prune') {
+	my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
+	is(scalar($csrch->mset('s:hi')->items), 1, 'got hit');
+
+	rename("$tmp/wt0/.git", "$tmp/wt0/.giit") or xbail "rename $!";
+	ok(run_script([qw(-cindex -q --prune -d), "$tmp/ext"]), 'prune');
+	$csrch->reopen;
+	is(scalar($csrch->mset('s:hi')->items), 0, 'hit pruned');
+
+	rename("$tmp/wt0/.giit", "$tmp/wt0/.git") or xbail "rename $!";
+	ok(run_script([qw(-cindex -qu -d), "$tmp/ext"]), 'update');
+	$csrch->reopen;
+	is(scalar($csrch->mset('s:hi')->items), 0,
+		'hit stays pruned since GIT_DIR was previously pruned');
+}
+
 done_testing;

* [PATCH 25/28] cindex: implement reindex
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (22 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
                     ` (2 subsequent siblings)
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

For now, this allows changing --indexlevel.  In the future, it will
let us fix yet-to-be-discovered bugs and roll out
backwards-compatible improvements.
---
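During --reindex, each commit should be queued once even if it is
reachable from several repos.  The sketch below uses a plain hash as a
stand-in for the SharedKV store; first_visit() is a hypothetical name,
and the "true only on first insert" behavior is my reading of how
set_maybe is used in the diff.

```perl
use strict;
use warnings;

my %reindexed; # stand-in for the on-disk key-value store

# Hypothetical stand-in for $REINDEX->set_maybe($key, ''): true only
# the first time a given commit OID is seen.
sub first_visit {
	my ($hex_oid) = @_;
	my $key = pack('H*', $hex_oid); # 40 hex chars => 20 bytes
	!$reindexed{$key}++;
}

my @oids = ('a' x 40, 'b' x 40, 'a' x 40); # one duplicate
my @todo = grep { first_visit($_) } @oids;
print scalar(@todo), " commit(s) queued\n";
```

Packing the hex OID halves the key size, which matters once there are
millions of commits.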
 lib/PublicInbox/CodeSearchIdx.pm | 33 ++++++++++++++++++++++----------
 t/cindex.t                       |  4 ++++
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 095c153e..5e6c0d22 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -43,7 +43,8 @@ our (
 	@RDONLY_SHARDS, # Xapian::Database
 	@IDX_SHARDS, # clones of self
 	$MAX_SIZE,
-	$TMP_GIT, # PublicInbox::Git object for --reindex and --prune
+	$TMP_GIT, # PublicInbox::Git object for --prune
+	$REINDEX, # PublicInbox::SharedKV
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -89,12 +90,13 @@ sub new {
 # TODO: may be used for reshard/compact
 sub count_shards { scalar($_[0]->xdb_shards_flat) }
 
-sub add_commit ($$) {
+sub update_commit ($$) {
 	my ($self, $cmt) = @_; # fields from @FMT
 	my $x = 'Q'.$cmt->{H};
-	for (docids_by_postlist($self, $x)) {
-		$self->{xdb}->delete_document($_)
-	}
+	my ($docid, @extra) = sort { $a <=> $b } docids_by_postlist($self, $x);
+	@extra and warn "W: $cmt->{H} indexed multiple times, pruning ",
+			join(', ', map { "#$_" } @extra), "\n";
+	$self->{xdb}->delete_document($_) for @extra;
 	my $doc = $PublicInbox::Search::X{Document}->new;
 	$doc->add_boolean_term($x);
 	$doc->add_boolean_term('G'.$_) for @{$self->{roots}};
@@ -119,7 +121,8 @@ sub add_commit ($$) {
 
 	$x = delete $cmt->{b};
 	$self->index_body_text($doc, \$x) if $x =~ /\S/s;
-	$self->{xdb}->add_document($doc);
+	defined($docid) ? $self->{xdb}->replace_document($docid, $doc) :
+			$self->{xdb}->add_document($doc);
 }
 
 sub progress {
@@ -235,7 +238,7 @@ sub shard_index { # via wq_io_do
 			cidx_ckpoint($self, "[$n] $nr");
 			$TXN_BYTES = $batch_bytes - $len;
 		}
-		add_commit($self, $cmt);
+		update_commit($self, $cmt);
 		++$nr;
 		if ($TXN_BYTES <= 0) {
 			cidx_ckpoint($self, "[$n] $nr");
@@ -398,7 +401,7 @@ sub check_existing { # retry_reopen callback
 	my $docid = shift(@docids) // return get_roots($self, $git);
 	my $doc = $shard->{xdb}->get_document($docid) //
 			die "BUG: no #$docid ($git->{git_dir})";
-	my $old_fp = $doc->get_data;
+	my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
 	if ($old_fp eq $git->{-repo}->{fp}) { # no change
 		delete $git->{-repo};
 		return;
@@ -426,7 +429,10 @@ sub partition_refs ($$$) {
 	while (defined(my $cmt = <$rfh>)) {
 		chomp $cmt;
 		my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
-		if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
+		if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
+			say { $shard_in[$n] } $cmt or die "say: $!";
+			++$nchange;
+		} elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
 			last if ++$seen > $SEEN_MAX;
 		} else {
 			say { $shard_in[$n] } $cmt or die "say: $!";
@@ -687,7 +693,7 @@ sub parent_quit {
 
 sub init_tmp_git_dir ($) {
 	my ($self) = @_;
-	return unless ($self->{-opt}->{prune} || $self->{-opt}->{reindex});
+	return unless $self->{-opt}->{prune};
 	require File::Temp;
 	require PublicInbox::Import;
 	my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
@@ -729,6 +735,13 @@ sub cidx_run { # main entry point
 		$cb->($m, @_);
 	};
 	load_existing($self);
+	local $REINDEX;
+	if ($self->{-opt}->{reindex}) {
+		require PublicInbox::SharedKV;
+		$REINDEX = PublicInbox::SharedKV->new;
+		delete $REINDEX->{lock_path};
+		$REINDEX->dbh;
+	}
 	my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
 	if (@nc) {
 		warn "E: BUG? paths in $self->{cidx_dir} not canonicalized:\n";
diff --git a/t/cindex.t b/t/cindex.t
index 5d269217..eb66b2e6 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -93,6 +93,10 @@ EOM
 	ok(run_script([qw(-cindex -qu -d), "$tmp/ext"]), '-cindex -u');
 	$mset = $csrch->reopen->mset('dfn:for-update');
 	is(scalar($mset->items), 1, 'got updated result');
+
+	ok(run_script([qw(-cindex -qu --reindex -d), "$tmp/ext"]), 'reindex');
+	$mset = $csrch->reopen->mset('dfn:for-update');
+	is(scalar($mset->items), 1, 'same result after reindex');
 }
 
 if ('--prune') {

* [PATCH 26/28] cindex: squelch incompatible options
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (23 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
  2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

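Some options don't make sense when used together:
--project-list takes no directories beyond PROJECTS_ROOT, and
--update requires `-d EXTDIR'.  Die early with a message in
both cases.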
---
 lib/PublicInbox/CodeSearchIdx.pm | 2 +-
 script/public-inbox-cindex       | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 5e6c0d22..21c43973 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -734,7 +734,7 @@ sub cidx_run { # main entry point
 			$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
 		$cb->($m, @_);
 	};
-	load_existing($self);
+	load_existing($self) unless $self->{-internal};
 	local $REINDEX;
 	if ($self->{-opt}->{reindex}) {
 		require PublicInbox::SharedKV;
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index e2500b93..f8a3ebbb 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -52,6 +52,11 @@ my @git_dirs;
 if (defined(my $pl = $opt->{'project-list'})) {
 	my $pfx = shift @ARGV // die <<EOM;
 PROJECTS_ROOT required for --project-list
+EOM
+	@ARGV and die <<EOM;
+--project-list does not accept additional directories
+(@ARGV)
+beyond `$pfx'
 EOM
 	open my $fh, '<', $pl or die "open($pl): $!\n";
 	chomp(@git_dirs = <$fh>);
@@ -67,6 +72,9 @@ if (defined $cidx_dir) { # external index
 } elsif (!@git_dirs) {
 	die $help
 } else {
+	die <<EOM if $opt->{update};
+--update requires `-d EXTDIR'
+EOM
 	for my $gd (@git_dirs) {
 		my $cd = "$gd/public-inbox-cindex";
 		my $cidx = PublicInbox::CodeSearchIdx->new($cd, { %$opt });

* [PATCH 27/28] cindex: respect existing permissions
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (24 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
  26 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

For internal ($GIT_DIR/public-inbox-cindex) Xapian DBs, we can
rely on core.sharedRepository.  For external ones, we'll just
rely on existing permissions if the directory already exists.
---
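The external-index case boils down to one line of bit arithmetic.
Below is a minimal standalone sketch of it, not code from this
patch: umask_from_mode and file_mode_under are hypothetical helper
names, and the 0705 starting mode assumes the test's
mkdir(..., 0707) ran under a 0022 process umask.

```perl
use strict;
use warnings;

# hypothetical helper (not in public-inbox): derive a umask which
# reproduces the permission bits of an existing directory
sub umask_from_mode {
	my ($mode) = @_;
	return ~$mode & 0777; # bits absent from the dir get masked out
}

# mode a new regular file ends up with when created via the usual
# 0666 request under the given umask
sub file_mode_under {
	my ($um) = @_;
	return 0666 & ~$um & 0777;
}

# assumption: mkdir(..., 0707) under a 0022 umask yields a 0705 dir
my $dir_mode = 0707 & ~0022 & 0777;
my $um = umask_from_mode($dir_mode);
printf "dir=0%03o umask=0%03o file=0%03o\n",
	$dir_mode, $um, file_mode_under($um);
```

Under that assumption this prints "dir=0705 umask=0072 file=0604",
which lines up with the 0604 the new test expects for cidx.lock.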
 lib/PublicInbox/CodeSearchIdx.pm | 29 ++++++++++++++++++++++++++++-
 t/cindex.t                       | 22 +++++++++++++++++++---
 2 files changed, 47 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 21c43973..704baa9c 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -710,8 +710,35 @@ sub init_tmp_git_dir ($) {
 	$TMP_GIT->{-tmp} = $tmp;
 }
 
+sub prep_umask ($) {
+	my ($self) = @_;
+	my $um;
+	my $cur = umask;
+	if ($self->{-internal}) { # respect core.sharedRepository
+		@{$self->{git_dirs}} == 1 or die 'BUG: only for GIT_DIR';
+		# yuck, FIXME move umask handling out of inbox-specific stuff
+		require PublicInbox::InboxWritable;
+		my $git = PublicInbox::Git->new($self->{git_dirs}->[0]);
+		chomp($um = $git->qx('config', 'core.sharedRepository') // '');
+		$um = PublicInbox::InboxWritable::_git_config_perm(undef, $um);
+		$um = PublicInbox::InboxWritable::_umask_for($um);
+		umask == $um or progress($self, 'umask from git: ',
+						sprintf('0%03o', $um));
+	} elsif (-d $self->{cidx_dir}) { # respect existing perms
+		my @st = stat(_);
+		$um = (~$st[2] & 0777);
+		umask == $um or progress($self, 'using umask from ',
+						$self->{cidx_dir}, ': ',
+						sprintf('0%03o', $um));
+	}
+	defined($um) ?
+		PublicInbox::OnDestroy->new(\&CORE::umask, umask($um)) :
+		undef;
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
+	my $restore_umask = prep_umask($self);
 	local $self->{todo} = [];
 	local $DEFER = $self->{todo};
 	local $SIGSET = PublicInbox::DS::block_signals();
@@ -800,7 +827,7 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
 	++$self->{shard_err} if defined($self->{shard_err});
 }
 
-sub with_umask { # TODO
+sub with_umask { # TODO get rid of this treewide and rely on OnDestroy
 	my ($self, $cb, @arg) = @_;
 	$cb->(@arg);
 }
diff --git a/t/cindex.t b/t/cindex.t
index eb66b2e6..9da0ba69 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -12,9 +12,10 @@ my $pwd = getcwd();
 
 # I reworked CodeSearchIdx->shard_worker to handle empty trees
 # in the initial commit generated by cvs2svn for xapian.git
-create_coderepo 'empty-tree-root', tmpdir => "$tmp/wt0", sub {
+create_coderepo 'empty-tree-root-0600', tmpdir => "$tmp/wt0", sub {
 	xsys_e([qw(/bin/sh -c), <<'EOM']);
 git init -q &&
+git config core.sharedRepository 0600
 tree=$(git mktree </dev/null) &&
 head=$(git symbolic-ref HEAD) &&
 cmt=$(echo 'empty root' | git commit-tree $tree) &&
@@ -27,8 +28,14 @@ EOM
 }; # /create_coderepo
 
 ok(run_script([qw(-cindex --dangerous -q), "$tmp/wt0"]), 'cindex internal');
-ok(-e "$tmp/wt0/.git/public-inbox-cindex/cidx.lock", 'internal dir created');
-
+{
+	my $exists = -e "$tmp/wt0/.git/public-inbox-cindex/cidx.lock";
+	my @st = stat(_);
+	ok($exists, 'internal dir created');
+	is($st[2] & 0600, 0600, 'mode respects core.sharedRepository');
+	@st = stat("$tmp/wt0/.git/public-inbox-cindex");
+	is($st[2] & 0700, 0700, 'dir mode respects core.sharedRepository');
+}
 
 # it's possible for git to emit NUL characters in diffs
 # (see c4201214cbf10636e2c1ab9131573f735b42c8d4 in linux.git)
@@ -115,4 +122,13 @@ if ('--prune') {
 		'hit stays pruned since GIT_DIR was previously pruned');
 }
 
+File::Path::remove_tree("$tmp/ext");
+ok(mkdir("$tmp/ext", 0707), 'create $tmp/ext with odd permissions');
+ok(run_script([qw(-cindex --dangerous -q -d), "$tmp/ext", $zp]),
+	'external on existing dir');
+{
+	my @st = stat("$tmp/ext/cidx.lock");
+	is($st[2] & 0777, 0604, 'created lock respects odd permissions');
+}
+
 done_testing;

* [PATCH 28/28] cindex: ignore SIGPIPE
  2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
                     ` (25 preceding siblings ...)
  2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
@ 2023-03-21 23:07   ` Eric Wong
  2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong
  26 siblings, 1 reply; 30+ messages in thread
From: Eric Wong @ 2023-03-21 23:07 UTC (permalink / raw)
  To: meta

We check for all socket write errors anyway, and I don't expect
stderr output to be significant enough to matter.
---
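For illustration only, here is a small sketch of why ignoring
SIGPIPE is safe when write errors are already checked: the failed
write comes back as EPIPE instead of killing the process.  It uses
a plain pipe rather than the sockets cindex uses, and
write_to_closed_pipe is a hypothetical name, not something in this
series.

```perl
use strict;
use warnings;

# hypothetical demo: report what a write to a reader-less pipe
# looks like once SIGPIPE is ignored
sub write_to_closed_pipe {
	# with the default disposition, the write below would
	# terminate the process
	local $SIG{PIPE} = 'IGNORE';
	pipe(my $r, my $w) or die "pipe: $!";
	close $r or die "close: $!";
	my $n = syswrite($w, "hello\n");
	return 'ok' if defined $n;
	return $!{EPIPE} ? 'EPIPE' : "other: $!";
}

print write_to_closed_pipe(), "\n";
```

On a POSIX system I'd expect this to print "EPIPE", so the caller
gets an ordinary error it can handle or report.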
 script/public-inbox-cindex | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index f8a3ebbb..fb906bad 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -36,7 +36,8 @@ die "--jobs must be >= 0\n" if defined $opt->{jobs} && $opt->{jobs} < 0;
 require IO::Handle;
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
-local $SIG{USR1} = 'IGNORE'; # to be overridden in cidx_sync
+$SIG{USR1} = 'IGNORE'; # to be overridden in cidx_sync
+$SIG{PIPE} = 'IGNORE';
 # require lazily to speed up --help
 require PublicInbox::Admin;
 PublicInbox::Admin::do_chdir(delete $opt->{C});

* [PATCH 29/28] cindex: --prune checkpoints to avoid OOM
  2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
@ 2023-03-24 10:40     ` Eric Wong
  0 siblings, 0 replies; 30+ messages in thread
From: Eric Wong @ 2023-03-24 10:40 UTC (permalink / raw)
  To: meta

Having many ->delete_document calls in a transaction still
causes Xapian to eat up a large amount of memory and OOM on my
system.

I may reimplement --prune to avoid blocking ongoing updates, but
this is a simple fix for swapping and OOMs for now.
---
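The budget logic is easier to follow in isolation.  The sketch
below is a simplified stand-in, not the patch itself: it replaces
the Xapian delete and checkpoint calls with counters, and the
$BATCH_BYTES value of 1000 is made up for the example (the real
default comes from $PublicInbox::SearchIdx::BATCH_BYTES).  Only
the `* 42' fudge factor is taken from the patch.

```perl
use strict;
use warnings;

my $BATCH_BYTES = 1000; # assumption: made-up budget for the demo
my $FUDGE = 42; # per-unit overhead guess, same factor as the patch

sub new_state {
	return { batch_bytes => $BATCH_BYTES, pruned => 0, ckpoints => 0 };
}

# account for one pruned document of the given doclength and
# "checkpoint" (here: bump a counter) once the budget is spent
sub prune_one {
	my ($st, $doclen) = @_;
	++$st->{pruned}; # the real code deletes the document here
	return if ($st->{batch_bytes} -= $doclen * $FUDGE) > 0;
	++$st->{ckpoints}; # the real code commits the transaction here
	$st->{batch_bytes} = $BATCH_BYTES; # start a fresh budget
}

my $st = new_state();
prune_one($st, 10) for 1..5;
print "pruned=$st->{pruned} checkpoints=$st->{ckpoints}\n";
```

Each document costs 10 * 42 = 420 here, so the third deletion
exhausts the 1000 budget and the run prints
"pruned=5 checkpoints=1".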
 lib/PublicInbox/CodeSearchIdx.pm | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 704baa9c..e353f452 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -622,12 +622,21 @@ sub scan_git_dirs ($) {
 
 sub prune_cb { # git->check_async callback
 	my ($hex, $type, undef, $self_id) = @_;
-	if ($type ne 'commit') {
-		my ($self, $id) = @$self_id;
-		progress($self, "$hex $type");
-		++$self->{pruned};
-		$self->{xdb}->delete_document($id);
-	}
+	return if $type eq 'commit';
+	my ($self, $id) = @$self_id;
+	my $len = $self->{xdb}->get_doclength($id);
+	progress($self, "$hex $type (doclength=$len)");
+	++$self->{pruned};
+	$self->{xdb}->delete_document($id);
+
+	# all math around batch_bytes calculation is pretty fuzzy,
+	# but need a way to regularly flush output to avoid OOM,
+	# so assume the average term + position overhead is the
+	# answer to everything: 42
+	return if ($self->{batch_bytes} -= ($len * 42)) > 0;
+	cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}");
+	$self->{batch_bytes} = $self->{-opt}->{batch_size} //
+			$PublicInbox::SearchIdx::BATCH_BYTES;
 }
 
 sub shard_prune { # via wq_io_do
@@ -639,6 +648,8 @@ sub shard_prune { # via wq_io_do
 	my $cur = $xdb->postlist_begin('Tc');
 	my $end = $xdb->postlist_end('Tc');
 	my ($id, @cmt, $oid);
+	local $self->{batch_bytes} = $self->{-opt}->{batch_size} //
+				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local $self->{pruned} = 0;
 	for (; $cur != $end && !$DO_QUIT; $cur++) {
 		@cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);

Thread overview: 30+ messages
2023-03-21 23:07 [PATCH 00/28] cindex coderepo commit indexer Eric Wong
2023-03-21 23:07 ` [PATCH 01/28] ipc: move nproc_shards from v2writable Eric Wong
2023-03-21 23:07   ` [PATCH 02/28] search: relocate all_terms from lei_search Eric Wong
2023-03-21 23:07   ` [PATCH 03/28] admin: hoist out resolve_git_dir Eric Wong
2023-03-21 23:07   ` [PATCH 04/28] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-21 23:07   ` [PATCH 05/28] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-21 23:07   ` [PATCH 06/28] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-21 23:07   ` [PATCH 07/28] cindex: parallelize prep phases Eric Wong
2023-03-21 23:07   ` [PATCH 08/28] cindex: use read-only shards during " Eric Wong
2023-03-21 23:07   ` [PATCH 09/28] searchidxshard: improve comment wording Eric Wong
2023-03-21 23:07   ` [PATCH 10/28] cindex: use DS and workqueues for parallelism Eric Wong
2023-03-21 23:07   ` [PATCH 11/28] ds: @post_loop_do replaces SetPostLoopCallback Eric Wong
2023-03-21 23:07   ` [PATCH 12/28] cindex: implement --exclude= like -clone Eric Wong
2023-03-21 23:07   ` [PATCH 13/28] cindex: show shard number in progress message Eric Wong
2023-03-21 23:07   ` [PATCH 14/28] cindex: drop `unchanged' " Eric Wong
2023-03-21 23:07   ` [PATCH 15/28] cindex: handle graceful shutdown by default Eric Wong
2023-03-21 23:07   ` [PATCH 16/28] sigfd: pass signal name rather than number to callback Eric Wong
2023-03-21 23:07   ` [PATCH 17/28] cindex: implement --max-size=SIZE Eric Wong
2023-03-21 23:07   ` [PATCH 18/28] cindex: check for checkpoint before giant messages Eric Wong
2023-03-21 23:07   ` [PATCH 19/28] cindex: truncate or drop body for over-sized commits Eric Wong
2023-03-21 23:07   ` [PATCH 20/28] cindex: attempt to give oldest commits lowest docids Eric Wong
2023-03-21 23:07   ` [PATCH 21/28] cindex: improve granularity of quit checks Eric Wong
2023-03-21 23:07   ` [PATCH 22/28] spawn: show failing directory for chdir failures Eric Wong
2023-03-21 23:07   ` [PATCH 23/28] cindex: filter out non-existent git directories Eric Wong
2023-03-21 23:07   ` [PATCH 24/28] cindex: add support for --prune Eric Wong
2023-03-21 23:07   ` [PATCH 25/28] cindex: implement reindex Eric Wong
2023-03-21 23:07   ` [PATCH 26/28] cindex: squelch incompatible options Eric Wong
2023-03-21 23:07   ` [PATCH 27/28] cindex: respect existing permissions Eric Wong
2023-03-21 23:07   ` [PATCH 28/28] cindex: ignore SIGPIPE Eric Wong
2023-03-24 10:40     ` [PATCH 29/28] cindex: --prune checkpoints to avoid OOM Eric Wong
