unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH 0/7] v2: swap in new IPC package
@ 2021-01-03  2:06 Eric Wong
  2021-01-03  2:06 ` [PATCH 1/7] ipc: some documentation comments Eric Wong
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

SearchIdxShard was too big and adding the new extindex
stuff made things worse.  Since I intend to use IPC
in more places, I figured it'd be good to prove it
works well by dropping it into the old v2 mix.

The diffstat below is nice:

Eric Wong (7):
  ipc: some documentation comments
  searchidxshard: use PublicInbox::IPC to kill lots of code
  searchidxshard: IPC conversion, part 2
  searchidxshard: replace index_raw with index_eml
  use Eml (or MIME) objects for all indexing paths
  ipc: switch to one-way pipes
  searchidxshard: use add_xapian directly for v2

 lib/PublicInbox/ExtSearchIdx.pm   |  38 +++--
 lib/PublicInbox/IPC.pm            | 127 +++++++++------
 lib/PublicInbox/Import.pm         |   4 +-
 lib/PublicInbox/LeiStore.pm       |  18 +--
 lib/PublicInbox/SearchIdx.pm      |  35 +---
 lib/PublicInbox/SearchIdxShard.pm | 257 +++++-------------------------
 lib/PublicInbox/Smsg.pm           |  13 ++
 lib/PublicInbox/V2Writable.pm     | 102 +++++-------
 t/import.t                        |  12 +-
 t/search.t                        |   2 +-
 10 files changed, 206 insertions(+), 402 deletions(-)

^ permalink raw reply	[flat|nested] 8+ messages in thread

* [PATCH 1/7] ipc: some documentation comments
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 2/7] searchidxshard: use PublicInbox::IPC to kill lots of code Eric Wong
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

Fix some comments and add some short summary descriptions to
hopefully make things easier to follow.
---
 lib/PublicInbox/IPC.pm | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 288a8c94..79cd34fe 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -10,7 +10,7 @@ use Socket qw(AF_UNIX SOCK_STREAM);
 use Carp qw(confess croak);
 use PublicInbox::Sigfd;
 my ($enc, $dec);
-# ->imports at BEGIN turns serial_*_with_object into custom ops on 5.14+
+# ->imports at BEGIN turns sereal_*_with_object into custom ops on 5.14+
 # and eliminate method call overhead
 BEGIN {
 	eval {
@@ -71,9 +71,10 @@ sub ipc_worker_loop ($$) {
 	}
 }
 
+# starts a worker if Sereal or Storable is installed
 sub ipc_worker_spawn {
 	my ($self, $ident, $oldset) = @_;
-	return unless $enc;
+	return unless $enc; # no Sereal or Storable
 	my $pid = $self->{-ipc_worker_pid};
 	confess "BUG: already spawned PID:$pid" if $pid;
 	confess "BUG: already have worker socket" if $self->{-ipc_sock};
@@ -108,15 +109,17 @@ sub ipc_worker_reap { # dwaitpid callback
 	warn "PID:$pid died with \$?=$?\n" if $?;
 }
 
-# for base class, override in superclasses
+# for base class, override in sub classes
 sub ipc_atfork_parent {}
 sub ipc_atfork_child {}
 
+# should only be called inside the worker process
 sub ipc_worker_exit {
 	my (undef, $code) = @_;
 	exit($code);
 }
 
+# idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
 	my ($self) = @_;
 	my $pid;
@@ -147,6 +150,7 @@ sub ipc_lock_init {
 	$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
 }
 
+# call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
 	my ($self, $sub, @args) = @_;
 	if (my $s1 = $self->{-ipc_sock}) {

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 2/7] searchidxshard: use PublicInbox::IPC to kill lots of code
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
  2021-01-03  2:06 ` [PATCH 1/7] ipc: some documentation comments Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 3/7] searchidxshard: IPC conversion, part 2 Eric Wong
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

It's nice to prove the new code works by swapping it into
the current V2Writable / SearchIdxShard packages.  This is
only the first step for the core bits, and we'll be able
to delete more code in a subsequent patch.
---
 lib/PublicInbox/IPC.pm            |   2 +-
 lib/PublicInbox/SearchIdx.pm      |   9 +-
 lib/PublicInbox/SearchIdxShard.pm | 237 +++++++-----------------------
 lib/PublicInbox/V2Writable.pm     |  63 +++-----
 4 files changed, 74 insertions(+), 237 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 79cd34fe..6b7b3c7a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -92,7 +92,7 @@ sub ipc_worker_spawn {
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
 		PublicInbox::Sigfd::sig_setmask($oldset);
-		$self->ipc_atfork_child;
+		my $on_destroy = $self->ipc_atfork_child;
 		eval { ipc_worker_loop($self, $s2) };
 		die "worker $ident PID:$$ died: $@\n" if $@;
 		exit;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 88349faa..d83fd4ca 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -414,6 +414,7 @@ sub _msgmap_init ($) {
 sub add_message {
 	# mime = PublicInbox::Eml or Email::MIME object
 	my ($self, $mime, $smsg, $sync) = @_;
+	begin_txn_lazy($self);
 	my $mids = mids_for_index($mime);
 	$smsg //= bless { blob => '' }, 'PublicInbox::Smsg'; # test-only compat
 	$smsg->{mid} //= $mids->[0]; # v1 compatibility
@@ -1002,14 +1003,6 @@ sub commit_txn_lazy {
 		$self->with_umask(\&_commit_txn, $self);
 }
 
-sub worker_done {
-	my ($self) = @_;
-	if (need_xapian($self)) {
-		die "$$ $0 xdb not released\n" if $self->{xdb};
-	}
-	die "$$ $0 still in transaction\n" if $self->{txn};
-}
-
 sub eidx_shard_new {
 	my ($class, $eidx, $shard) = @_;
 	my $self = bless {
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 45240e07..68644bc0 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -6,11 +6,8 @@
 package PublicInbox::SearchIdxShard;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::SearchIdx);
-use bytes qw(length);
-use IO::Handle (); # autoflush
-use PublicInbox::Eml;
-use PublicInbox::Sigfd;
+use parent qw(PublicInbox::SearchIdx PublicInbox::IPC);
+use PublicInbox::OnDestroy;
 
 sub new {
 	my ($class, $v2w, $shard) = @_; # v2w may be ExtSearchIdx
@@ -21,238 +18,108 @@ sub new {
 	$self->idx_acquire;
 	$self->set_metadata_once;
 	$self->idx_release;
-	$self->spawn_worker($v2w, $shard) if $v2w->{parallel};
+	if ($v2w->{parallel}) {
+		local $self->{-v2w_afc} = $v2w;
+		$self->ipc_worker_spawn("shard[$shard]");
+	}
 	$self;
 }
 
-sub spawn_worker {
-	my ($self, $v2w, $shard) = @_;
-	my ($r, $w);
-	pipe($r, $w) or die "pipe failed: $!\n";
-	$w->autoflush(1);
-	my $oldset = PublicInbox::Sigfd::block_signals();
-	my $pid = fork;
-	defined $pid or die "fork failed: $!\n";
-	if ($pid == 0) {
-		eval { PublicInbox::DS->Reset };
-		# these signals are localized in parent
-		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
-		PublicInbox::Sigfd::sig_setmask($oldset);
-		my $bnote = $v2w->atfork_child;
-		close $w or die "failed to close: $!";
-
-		# F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
-		# speeds V2Writable batch imports across 8 cores by nearly 20%
-		fcntl($r, 1031, 1048576) if $^O eq 'linux';
-
-		eval { shard_worker_loop($self, $v2w, $r, $shard, $bnote) };
-		die "worker $shard died: $@\n" if $@;
-		die "unexpected MM $self->{mm}" if $self->{mm};
-		exit;
+sub _worker_done {
+	my ($self) = @_;
+	if ($self->need_xapian) {
+		die "$$ $0 xdb not released\n" if $self->{xdb};
 	}
-	PublicInbox::Sigfd::sig_setmask($oldset);
-	$self->{pid} = $pid;
-	$self->{w} = $w;
-	close $r or die "failed to close: $!";
-}
-
-sub eml ($$) {
-	my ($r, $len) = @_;
-	return if $len == 0;
-	my $n = read($r, my $bref, $len) or die "read: $!\n";
-	$n == $len or die "short read: $n != $len\n";
-	PublicInbox::Eml->new(\$bref);
+	die "$$ $0 still in transaction\n" if $self->{txn};
 }
 
-# this reads all the writes to $self->{w} from the parent process
-sub shard_worker_loop ($$$$$) {
-	my ($self, $v2w, $r, $shard, $bnote) = @_;
-	$0 = "shard[$shard]";
+sub ipc_atfork_child { # called automatically before ipc_worker_loop
+	my ($self) = @_;
+	my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
+	$v2w->atfork_child; # calls shard_atfork_child on our siblings
+	$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
 	$self->begin_txn_lazy;
-	while (my $line = readline($r)) {
-		chomp $line;
-		$v2w->{current_info} = "[$shard] $line";
-		if ($line eq 'commit') {
-			$self->commit_txn_lazy;
-		} elsif ($line eq 'close') {
-			$self->idx_release;
-		} elsif ($line eq 'barrier') {
-			$self->commit_txn_lazy;
-			# no need to lock < 512 bytes is atomic under POSIX
-			print $bnote "barrier $shard\n" or
-					die "write failed for barrier $!\n";
-		} elsif ($line =~ /\AD ([0-9]+)\z/s) {
-			$self->remove_by_docid($1 + 0);
-		} elsif ($line =~ s/\A\+X //) {
-			my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
-			$self->add_eidx_info($docid, $eidx_key, eml($r, $len));
-		} elsif ($line =~ s/\A-X //) {
-			my ($len, $docid, $eidx_key) = split(/ /, $line, 3);
-			$self->remove_eidx_info($docid, $eidx_key,
-							eml($r, $len));
-		} elsif ($line =~ s/\A=K (\d+) //) {
-			$self->set_keywords($1 + 0, split(/ /, $line));
-		} elsif ($line =~ s/\A-K (\d+) //) {
-			$self->remove_keywords($1 + 0, split(/ /, $line));
-		} elsif ($line =~ s/\A\+K (\d+) //) {
-			$self->add_keywords($1 + 0, split(/ /, $line));
-		} elsif ($line =~ s/\AO ([^\n]+)//) {
-			my $over_fn = $1;
-			$over_fn =~ tr/\0/\n/;
-			$self->over_check(PublicInbox::Over->new($over_fn));
-		} else {
-			my $eidx_key;
-			if ($line =~ s/\AX=(.+)\0//) {
-				$eidx_key = $1;
-				$v2w->{current_info} =~ s/\0/\\0 /;
-			}
-			# n.b. $mid may contain spaces(!)
-			my ($len, $bytes, $num, $oid, $ds, $ts, $tid, $mid)
-				= split(/ /, $line, 8);
-			$self->begin_txn_lazy;
-			my $smsg = bless {
-				bytes => $bytes,
-				num => $num + 0,
-				blob => $oid,
-				mid => $mid,
-				tid => $tid,
-				ds => $ds,
-				ts => $ts,
-			}, 'PublicInbox::Smsg';
-			$smsg->{eidx_key} = $eidx_key if defined($eidx_key);
-			$self->add_message(eml($r, $len), $smsg);
-		}
-	}
-	$self->worker_done;
+	# caller must capture this:
+	PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
 }
 
 sub index_raw {
 	my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
-	if (my $w = $self->{w}) {
-		my @ekey = defined($eidx_key) ? ("X=$eidx_key\0") : ();
-		$msgref //= \($eml->as_string);
-		$smsg->{raw_bytes} //= length($$msgref);
-		# mid must be last, it can contain spaces (but not LF)
-		print $w @ekey, join(' ', @$smsg{qw(raw_bytes bytes
-						num blob ds ts tid mid)}),
-			"\n", $$msgref or die "failed to write shard $!\n";
-	} else {
-		if ($eml) {
-			undef($$msgref) if $msgref;
-		} else { # --xapian-only + --sequential-shard:
-			$eml = PublicInbox::Eml->new($msgref);
-		}
-		$self->begin_txn_lazy;
-		$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
-		$self->add_message($eml, $smsg);
+	if ($eml) {
+		undef($$msgref) if $msgref;
+	} else { # --xapian-only + --sequential-shard:
+		$eml = PublicInbox::Eml->new($msgref);
 	}
+	$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
+	$self->ipc_do('add_message', $eml, $smsg);
 }
 
 sub shard_add_eidx_info {
 	my ($self, $docid, $eidx_key, $eml) = @_;
-	if (my $w = $self->{w}) {
-		my $hdr = $eml->header_obj->as_string;
-		my $len = length($hdr);
-		print $w "+X $len $docid $eidx_key\n", $hdr or
-			die "failed to write shard: $!";
-	} else {
-		$self->add_eidx_info($docid, $eidx_key, $eml);
-	}
+	$self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
 }
 
 sub shard_remove_eidx_info {
 	my ($self, $docid, $eidx_key, $eml) = @_;
-	if (my $w = $self->{w}) {
-		my $hdr = $eml ? $eml->header_obj->as_string : '';
-		my $len = length($hdr);
-		print $w "-X $len $docid $eidx_key\n", $hdr or
-			die "failed to write shard: $!";
-	} else {
-		$self->remove_eidx_info($docid, $eidx_key, $eml);
-	}
+	$self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml);
 }
 
-sub atfork_child {
-	close $_[0]->{w} or die "failed to close write pipe: $!\n";
+# needed when there's multiple IPC workers and the parent forking
+# causes newer siblings to inherit older siblings sockets
+sub shard_atfork_child {
+	my ($self) = @_;
+	my $pid = delete($self->{-ipc_worker_pid}) or
+			die "BUG: $$ no -ipc_worker_pid";
+	my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
+	$pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
+	close($s1) or die "close -ipc_sock: $!";
 }
 
-sub shard_barrier {
-	my ($self) = @_;
-	if (my $w = $self->{w}) {
-		print $w "barrier\n" or die "failed to print: $!";
-	} else {
-		$self->commit_txn_lazy;
-	}
+# wait for return to determine when ipc_do('commit_txn_lazy') is done
+sub echo {
+	shift;
+	"@_";
 }
 
-sub shard_commit {
+sub idx_close {
 	my ($self) = @_;
-	if (my $w = $self->{w}) {
-		print $w "commit\n" or die "failed to write commit: $!";
-	} else {
-		$self->commit_txn_lazy;
-	}
+	die "transaction in progress $self\n" if $self->{txn};
+	$self->idx_release if $self->{xdb};
 }
 
 sub shard_close {
 	my ($self) = @_;
-	if (my $w = delete $self->{w}) {
-		my $pid = delete $self->{pid} or die "no process to wait on\n";
-		print $w "close\n" or die "failed to write to pid:$pid: $!\n";
-		close $w or die "failed to close pipe for pid:$pid: $!\n";
-		waitpid($pid, 0) == $pid or die "remote process did not finish";
-		$? == 0 or die ref($self)." pid:$pid exited with: $?";
-	} else {
-		die "transaction in progress $self\n" if $self->{txn};
-		$self->idx_release if $self->{xdb};
-	}
+	$self->ipc_do('idx_close');
+	$self->ipc_worker_stop;
 }
 
 sub shard_remove {
 	my ($self, $num) = @_;
-	if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
-		print $w "D $num\n" or die "failed to write remove $!";
-	} else { # same process
-		$self->remove_by_docid($num);
-	}
+	$self->ipc_do('remove_by_docid', $num);
 }
 
 sub shard_set_keywords {
 	my ($self, $docid, @kw) = @_;
-	if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
-		print $w "=K $docid @kw\n" or die "failed to write: $!";
-	} else { # same process
-		$self->set_keywords($docid, @kw);
-	}
+	$self->ipc_do('set_keywords', $docid, @kw);
 }
 
 sub shard_remove_keywords {
 	my ($self, $docid, @kw) = @_;
-	if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
-		print $w "-K $docid @kw\n" or die "failed to write: $!";
-	} else { # same process
-		$self->remove_keywords($docid, @kw);
-	}
+	$self->ipc_do('remove_keywords', $docid, @kw);
 }
 
 sub shard_add_keywords {
 	my ($self, $docid, @kw) = @_;
-	if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
-		print $w "+K $docid @kw\n" or die "failed to write: $!";
-	} else { # same process
-		$self->add_keywords($docid, @kw);
-	}
+	$self->ipc_do('add_keywords', $docid, @kw);
 }
 
 sub shard_over_check {
 	my ($self, $over) = @_;
-	if (my $w = $self->{w}) { # triggers remove_by_docid in a shard child
-		my ($over_fn) = $over->{dbh}->sqlite_db_filename;
-		$over_fn =~ tr/\n/\0/;
-		print $w "O $over_fn\n" or die "failed to write over $!";
-	} else {
-		$self->over_check($over);
+	if ($self->{-ipc_sock} && $over->{dbh}) {
+		# can't send DB handles over IPC
+		$over = ref($over)->new($over->{dbh}->sqlite_db_filename);
 	}
+	$self->ipc_do('over_check', $over);
 }
 
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 724fa804..cad559c5 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -287,16 +287,7 @@ sub _idx_init { # with_umask callback
 
 sub parallel_init ($$) {
 	my ($self, $indexlevel) = @_;
-	if (($indexlevel // 'full') eq 'basic') {
-		$self->{parallel} = 0;
-	} else {
-		pipe(my ($r, $w)) or die "pipe failed: $!";
-		# pipe for barrier notifications doesn't need to be big,
-		# 1031: F_SETPIPE_SZ
-		fcntl($w, 1031, 4096) if $^O eq 'linux';
-		$self->{bnote} = [ $r, $w ];
-		$w->autoflush(1);
-	}
+	$self->{parallel} = 0 if ($indexlevel // 'full') eq 'basic';
 }
 
 # idempotent
@@ -574,24 +565,6 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
 	}
 }
 
-sub barrier_init {
-	my ($self, $n) = @_;
-	$self->{bnote} or return;
-	--$n;
-	my $barrier = { map { $_ => 1 } (0..$n) };
-}
-
-sub barrier_wait {
-	my ($self, $barrier) = @_;
-	my $bnote = $self->{bnote} or return;
-	my $r = $bnote->[0];
-	while (scalar keys %$barrier) {
-		defined(my $l = readline($r)) or die "EOF on barrier_wait: $!";
-		$l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
-		delete $barrier->{$1} or die "bad shard[$1] on barrier wait";
-	}
-}
-
 # public
 sub checkpoint ($;$) {
 	my ($self, $wait) = @_;
@@ -615,16 +588,23 @@ sub checkpoint ($;$) {
 		$self->{oidx}->commit_lazy;
 
 		# Now deal with Xapian
-		if ($wait) {
-			my $barrier = barrier_init($self, scalar @$shards);
-
-			# each shard needs to issue a barrier command
-			$_->shard_barrier for @$shards;
 
-			# wait for each Xapian shard
-			barrier_wait($self, $barrier);
-		} else {
-			$_->shard_commit for @$shards;
+		# start commit_txn_lazy asynchronously on all parallel shards
+		# (non-parallel waits here)
+		$_->ipc_do('commit_txn_lazy') for @$shards;
+
+		# transactions started on parallel shards,
+		# wait for them by issuing an echo command (echo can only
+		# run after commit_txn_lazy is done)
+		if ($wait && $self->{parallel}) {
+			my $i = 0;
+			for my $shard (@$shards) {
+				my $echo = $shard->ipc_do('echo', $i);
+				$echo == $i or die <<"";
+shard[$i] bad echo:$echo != $i waiting for txn commit
+
+				++$i;
+			}
 		}
 
 		my $midx = $self->{midx}; # misc index
@@ -679,7 +659,6 @@ sub done {
 	eval { $self->{oidx}->dbh_close };
 	$err .= "over close: $@\n" if $@;
 	delete $self->{midx};
-	delete $self->{bnote};
 	my $nbytes = $self->{total_bytes};
 	$self->{total_bytes} = 0;
 	$self->lock_release(!!$nbytes) if $shards;
@@ -846,15 +825,13 @@ sub content_exists ($$$) {
 
 sub atfork_child {
 	my ($self) = @_;
-	if (my $shards = $self->{idx_shards}) {
-		$_->atfork_child foreach @$shards;
+	if (my $older_siblings = $self->{idx_shards}) {
+		$_->shard_atfork_child for @$older_siblings;
 	}
 	if (my $im = $self->{im}) {
 		$im->atfork_child;
 	}
-	die "unexpected mm" if $self->{mm};
-	close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
-	$self->{bnote}->[1];
+	die "BUG: unexpected mm" if $self->{mm};
 }
 
 sub reindex_checkpoint ($$) {

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 3/7] searchidxshard: IPC conversion, part 2
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
  2021-01-03  2:06 ` [PATCH 1/7] ipc: some documentation comments Eric Wong
  2021-01-03  2:06 ` [PATCH 2/7] searchidxshard: use PublicInbox::IPC to kill lots of code Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 4/7] searchidxshard: replace index_raw with index_eml Eric Wong
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

We can remove some now-pointless wrapper functions by using
->ipc_do in even more places.
---
 lib/PublicInbox/ExtSearchIdx.pm   | 23 ++++++++++++-----------
 lib/PublicInbox/LeiStore.pm       | 13 +++++++------
 lib/PublicInbox/SearchIdx.pm      |  8 +-------
 lib/PublicInbox/SearchIdxShard.pm | 30 ------------------------------
 lib/PublicInbox/V2Writable.pm     |  3 +--
 5 files changed, 21 insertions(+), 56 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index c3581628..064d9939 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -102,7 +102,7 @@ sub do_xpost ($$) {
 	if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message
 		my $xnum = $req->{xnum};
 		$self->{oidx}->add_xref3($docid, $xnum, $oid, $eidx_key);
-		$idx->shard_add_eidx_info($docid, $eidx_key, $eml);
+		$idx->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
 		check_batch_limit($req);
 	} else { # 'd'
 		my $rm_eidx_info;
@@ -110,9 +110,10 @@ sub do_xpost ($$) {
 							\$rm_eidx_info);
 		if ($nr == 0) {
 			$self->{oidx}->eidxq_del($docid);
-			$idx->shard_remove($docid);
+			$idx->ipc_do('xdb_remove', $docid);
 		} elsif ($rm_eidx_info) {
-			$idx->shard_remove_eidx_info($docid, $eidx_key, $eml);
+			$idx->ipc_do('remove_eidx_info',
+					$docid, $eidx_key, $eml);
 			$self->{oidx}->eidxq_add($docid); # yes, add
 		}
 	}
@@ -327,7 +328,7 @@ DELETE FROM xref3 WHERE docid = ? AND ibx_id = ?
 		}
 	} else {
 		warn "I: remove #$docid $eidx_key @oid\n";
-		$self->idx_shard($docid)->shard_remove($docid);
+		$self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
 	}
 }
 
@@ -440,7 +441,7 @@ sub _reindex_finalize ($$$) {
 	for my $x (reverse @$stable) {
 		$ibx = _ibx_for($self, $sync, $x);
 		my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
-		$idx->shard_add_eidx_info($docid, $ibx->eidx_key, $hdr);
+		$idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr);
 	}
 	return if $nr == 1; # likely, all good
 
@@ -483,12 +484,12 @@ sub _reindex_oid { # git->cat_async callback
 		my $remain = $self->{oidx}->remove_xref3($docid, $expect_oid);
 		if ($remain == 0) {
 			warn "W: #$docid gone or corrupted\n";
-			$self->idx_shard($docid)->shard_remove($docid);
+			$self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
 		} elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
 			$self->git->cat_async($next_oid, \&_reindex_oid, $req);
 		} else {
 			warn "BUG: #$docid gone (UNEXPECTED)\n";
-			$self->idx_shard($docid)->shard_remove($docid);
+			$self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
 		}
 		return;
 	}
@@ -522,7 +523,7 @@ sub _reindex_smsg ($$$) {
 BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex
 
 		$self->{oidx}->delete_by_num($docid);
-		$self->idx_shard($docid)->shard_remove($docid);
+		$self->idx_shard($docid)->ipc_do('xdb_remove', $docid);
 		return;
 	}
 
@@ -799,10 +800,10 @@ DELETE FROM xref3 WHERE ibx_id = ? AND xnum = ? AND oidbin = ?
 			if (scalar(@$xr3) == 0) { # all gone
 				$self->{oidx}->delete_by_num($docid);
 				$self->{oidx}->eidxq_del($docid);
-				$idx->shard_remove($docid);
+				$idx->ipc_do('xdb_remove', $docid);
 			} else { # enqueue for reindex of remaining messages
-				$idx->shard_remove_eidx_info($docid,
-							$ibx->eidx_key);
+				$idx->ipc_do('remove_eidx_info',
+						$docid, $ibx->eidx_key);
 				$self->{oidx}->eidxq_add($docid); # yes, add
 			}
 		}
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 07a3198a..d686e95a 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -146,7 +146,7 @@ sub set_eml_keywords {
 	my $eidx = eidx_init($self);
 	my @docids = _docids_for($self, $eml);
 	for my $docid (@docids) {
-		$eidx->idx_shard($docid)->shard_set_keywords($docid, @kw);
+		$eidx->idx_shard($docid)->ipc_do('set_keywords', $docid, @kw);
 	}
 	\@docids;
 }
@@ -156,7 +156,7 @@ sub add_eml_keywords {
 	my $eidx = eidx_init($self);
 	my @docids = _docids_for($self, $eml);
 	for my $docid (@docids) {
-		$eidx->idx_shard($docid)->shard_add_keywords($docid, @kw);
+		$eidx->idx_shard($docid)->ipc_do('add_keywords', $docid, @kw);
 	}
 	\@docids;
 }
@@ -166,7 +166,7 @@ sub remove_eml_keywords {
 	my $eidx = eidx_init($self);
 	my @docids = _docids_for($self, $eml);
 	for my $docid (@docids) {
-		$eidx->idx_shard($docid)->shard_remove_keywords($docid, @kw);
+		$eidx->idx_shard($docid)->ipc_do('remove_keywords', $docid, @kw)
 	}
 	\@docids;
 }
@@ -205,8 +205,9 @@ sub add_eml {
 		for my $docid (@docids) {
 			my $idx = $eidx->idx_shard($docid);
 			$oidx->add_xref3($docid, -1, $smsg->{blob}, '.');
-			$idx->shard_add_eidx_info($docid, '.', $eml); # List-Id
-			$idx->shard_add_keywords($docid, @kw) if @kw;
+			# add_eidx_info for List-Id
+			$idx->ipc_do('add_eidx_info', $docid, '.', $eml);
+			$idx->ipc_do('add_keywords', $docid, @kw) if @kw;
 		}
 		\@docids;
 	} else {
@@ -215,7 +216,7 @@ sub add_eml {
 		$oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.');
 		my $idx = $eidx->idx_shard($smsg->{num});
 		$idx->index_raw($msgref, $eml, $smsg);
-		$idx->shard_add_keywords($smsg->{num}, @kw) if @kw;
+		$idx->ipc_do('add_keywords', $smsg->{num}, @kw) if @kw;
 		$smsg;
 	}
 }
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index d83fd4ca..da3ac2e3 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -552,6 +552,7 @@ sub smsg_from_doc ($) {
 
 sub xdb_remove {
 	my ($self, @docids) = @_;
+	$self->begin_txn_lazy;
 	my $xdb = $self->{xdb} or return;
 	for my $docid (@docids) {
 		eval { $xdb->delete_document($docid) };
@@ -559,13 +560,6 @@ sub xdb_remove {
 	}
 }
 
-sub remove_by_docid {
-	my ($self, $num) = @_;
-	die "BUG: remove_by_docid is v2-only\n" if $self->{oidx};
-	$self->begin_txn_lazy;
-	xdb_remove($self, $num) if need_xapian($self);
-}
-
 sub index_git_blob_id {
 	my ($doc, $pfx, $objid) = @_;
 
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 68644bc0..43dad959 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -54,16 +54,6 @@ sub index_raw {
 	$self->ipc_do('add_message', $eml, $smsg);
 }
 
-sub shard_add_eidx_info {
-	my ($self, $docid, $eidx_key, $eml) = @_;
-	$self->ipc_do('add_eidx_info', $docid, $eidx_key, $eml);
-}
-
-sub shard_remove_eidx_info {
-	my ($self, $docid, $eidx_key, $eml) = @_;
-	$self->ipc_do('remove_eidx_info', $docid, $eidx_key, $eml);
-}
-
 # needed when there's multiple IPC workers and the parent forking
 # causes newer siblings to inherit older siblings sockets
 sub shard_atfork_child {
@@ -93,26 +83,6 @@ sub shard_close {
 	$self->ipc_worker_stop;
 }
 
-sub shard_remove {
-	my ($self, $num) = @_;
-	$self->ipc_do('remove_by_docid', $num);
-}
-
-sub shard_set_keywords {
-	my ($self, $docid, @kw) = @_;
-	$self->ipc_do('set_keywords', $docid, @kw);
-}
-
-sub shard_remove_keywords {
-	my ($self, $docid, @kw) = @_;
-	$self->ipc_do('remove_keywords', $docid, @kw);
-}
-
-sub shard_add_keywords {
-	my ($self, $docid, @kw) = @_;
-	$self->ipc_do('add_keywords', $docid, @kw);
-}
-
 sub shard_over_check {
 	my ($self, $over) = @_;
 	if ($self->{-ipc_sock} && $over->{dbh}) {
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index cad559c5..885edbe9 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -1133,8 +1133,7 @@ sub unindex_oid_aux ($$$) {
 	my ($self, $oid, $mid) = @_;
 	my @removed = $self->{oidx}->remove_oid($oid, $mid);
 	for my $num (@removed) {
-		my $idx = idx_shard($self, $num);
-		$idx->shard_remove($num);
+		idx_shard($self, $num)->ipc_do('xdb_remove', $num);
 	}
 }
 

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 4/7] searchidxshard: replace index_raw with index_eml
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
                   ` (2 preceding siblings ...)
  2021-01-03  2:06 ` [PATCH 3/7] searchidxshard: IPC conversion, part 2 Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 5/7] use Eml (or MIME) objects for all indexing paths Eric Wong
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

Since Storable and Sereal are designed for lossless
serialization, we'll just pass $eml objects to whatever process
is running SearchIdx.
---
 lib/PublicInbox/ExtSearchIdx.pm   |  4 ++--
 lib/PublicInbox/LeiStore.pm       |  3 ++-
 lib/PublicInbox/SearchIdxShard.pm |  9 ++-------
 lib/PublicInbox/V2Writable.pm     | 11 +++++------
 4 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index 064d9939..d55d3db9 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -135,7 +135,7 @@ sub index_unseen ($) {
 	my $oid = $new_smsg->{blob};
 	my $ibx = delete $req->{ibx} or die 'BUG: {ibx} unset';
 	$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
-	$idx->index_raw(undef, $eml, $new_smsg, $ibx->eidx_key);
+	$idx->index_eml($eml, $new_smsg, $ibx->eidx_key);
 	check_batch_limit($req);
 }
 
@@ -437,7 +437,7 @@ sub _reindex_finalize ($$$) {
 	my $top_smsg = pop @$stable;
 	$top_smsg == $smsg or die 'BUG: top_smsg != smsg';
 	my $ibx = _ibx_for($self, $sync, $smsg);
-	$idx->index_raw(undef, $eml, $smsg, $ibx->eidx_key);
+	$idx->index_eml($eml, $smsg, $ibx->eidx_key);
 	for my $x (reverse @$stable) {
 		$ibx = _ibx_for($self, $sync, $x);
 		my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}';
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index d686e95a..4f77e8fa 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -199,6 +199,7 @@ sub add_eml {
 	$im->add($eml, undef, $smsg) or return; # duplicate returns undef
 	my $msgref = delete $smsg->{-raw_email};
 	$smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
+	undef $msgref;
 
 	local $self->{current_info} = $smsg->{blob};
 	if (my @docids = _docids_for($self, $eml)) {
@@ -215,7 +216,7 @@ sub add_eml {
 		$oidx->add_overview($eml, $smsg);
 		$oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.');
 		my $idx = $eidx->idx_shard($smsg->{num});
-		$idx->index_raw($msgref, $eml, $smsg);
+		$idx->index_eml($eml, $smsg);
 		$idx->ipc_do('add_keywords', $smsg->{num}, @kw) if @kw;
 		$smsg;
 	}
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 43dad959..83cbbb25 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -43,13 +43,8 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop
 	PublicInbox::OnDestroy->new($$, \&_worker_done, $self);
 }
 
-sub index_raw {
-	my ($self, $msgref, $eml, $smsg, $eidx_key) = @_;
-	if ($eml) {
-		undef($$msgref) if $msgref;
-	} else { # --xapian-only + --sequential-shard:
-		$eml = PublicInbox::Eml->new($msgref);
-	}
+sub index_eml {
+	my ($self, $eml, $smsg, $eidx_key) = @_;
 	$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
 	$self->ipc_do('add_message', $eml, $smsg);
 }
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 885edbe9..7b6b93a0 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -140,11 +140,11 @@ sub idx_shard ($$) {
 
 # indexes a message, returns true if checkpointing is needed
 sub do_idx ($$$$) {
-	my ($self, $msgref, $mime, $smsg) = @_;
+	my ($self, $msgref, $eml, $smsg) = @_;
 	$smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
-	$self->{oidx}->add_overview($mime, $smsg);
+	$self->{oidx}->add_overview($eml, $smsg);
 	my $idx = idx_shard($self, $smsg->{num});
-	$idx->index_raw($msgref, $mime, $smsg);
+	$idx->index_eml($eml, $smsg);
 	my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
 	$n >= $self->{batch_bytes};
 }
@@ -173,8 +173,7 @@ sub _add {
 	$cmt = $im->get_mark($cmt);
 	$self->{last_commit}->[$self->{epoch_max}] = $cmt;
 
-	my $msgref = delete $smsg->{-raw_email};
-	if (do_idx($self, $msgref, $mime, $smsg)) {
+	if (do_idx($self, delete $smsg->{-raw_email}, $mime, $smsg)) {
 		$self->checkpoint;
 	}
 
@@ -1219,7 +1218,7 @@ sub index_xap_only { # git->cat_async callback
 	my $self = $smsg->{self};
 	my $idx = idx_shard($self, $smsg->{num});
 	$smsg->{raw_bytes} = $size;
-	$idx->index_raw($bref, undef, $smsg);
+	$idx->index_eml(PublicInbox::Eml->new($bref), $smsg);
 	$self->{transact_bytes} += $size;
 }
 

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 5/7] use Eml (or MIME) objects for all indexing paths
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
                   ` (3 preceding siblings ...)
  2021-01-03  2:06 ` [PATCH 4/7] searchidxshard: replace index_raw with index_eml Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 6/7] ipc: switch to one-way pipes Eric Wong
  2021-01-03  2:06 ` [PATCH 7/7] searchidxshard: use add_xapian directly for v2 Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

We don't need to keep the raw message around once it's been
written to git.  Shard work now relies on Storable (or Sereal), and
all of the indexing code relies on the Email::MIME-like API of Eml
to access interesting parts of the message.

Similarly, smsg->{raw_bytes} is no longer carried around and we
do the CRLF adjustment when setting smsg->{bytes}.

While we're in the area, there's also a small simplification to
t/import.t: it now uses xqx instead of spawn, waitpid, and a
temporary output file.
---
 lib/PublicInbox/ExtSearchIdx.pm | 11 ++++-------
 lib/PublicInbox/Import.pm       |  4 +---
 lib/PublicInbox/LeiStore.pm     |  4 ----
 lib/PublicInbox/SearchIdx.pm    | 17 +++--------------
 lib/PublicInbox/Smsg.pm         | 13 +++++++++++++
 lib/PublicInbox/V2Writable.pm   | 23 ++++++++++-------------
 t/import.t                      | 12 ++----------
 t/search.t                      |  2 +-
 8 files changed, 34 insertions(+), 52 deletions(-)

diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index d55d3db9..e6c21866 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -21,8 +21,7 @@ use Carp qw(croak carp);
 use Sys::Hostname qw(hostname);
 use POSIX qw(strftime);
 use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(crlf_adjust prepare_stack is_ancestor
-	is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
 use PublicInbox::OverIdx;
 use PublicInbox::MiscIdx;
 use PublicInbox::MID qw(mids);
@@ -82,8 +81,6 @@ sub check_batch_limit ($) {
 	my ($req) = @_;
 	my $self = $req->{self};
 	my $new_smsg = $req->{new_smsg};
-
-	# {raw_bytes} may be unset, so just use {bytes}
 	my $n = $self->{transact_bytes} += $new_smsg->{bytes};
 
 	# set flag for PublicInbox::V2Writable::index_todo:
@@ -239,7 +236,7 @@ sub index_oid { # git->cat_async callback for 'm'
 	my $new_smsg = $req->{new_smsg} = bless {
 		blob => $oid,
 	}, 'PublicInbox::Smsg';
-	$new_smsg->{bytes} = $size + crlf_adjust($$bref);
+	$new_smsg->set_bytes($$bref, $size);
 	defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return;
 	++${$req->{nr}};
 	do_step($req);
@@ -496,7 +493,7 @@ sub _reindex_oid { # git->cat_async callback
 	my $ci = $self->{current_info};
 	local $self->{current_info} = "$ci #$docid $oid";
 	my $re_smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
-	$re_smsg->{bytes} = $size + crlf_adjust($$bref);
+	$re_smsg->set_bytes($$bref, $size);
 	my $eml = PublicInbox::Eml->new($bref);
 	$re_smsg->populate($eml, { autime => $orig_smsg->{ds},
 				cotime => $orig_smsg->{ts} });
@@ -676,7 +673,7 @@ sub _reindex_unseen { # git->cat_async callback
 	my $self = $req->{self} // die 'BUG: {self} unset';
 	local $self->{current_info} = "$self->{current_info} $oid";
 	my $new_smsg = bless { blob => $oid, }, 'PublicInbox::Smsg';
-	$new_smsg->{bytes} = $size + crlf_adjust($$bref);
+	$new_smsg->set_bytes($$bref, $size);
 	my $eml = $req->{eml} = PublicInbox::Eml->new($bref);
 	$req->{new_smsg} = $new_smsg;
 	$req->{chash} = content_hash($eml);
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 052b145b..8a06a661 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -412,12 +412,10 @@ sub add {
 	# v2: we need this for Xapian
 	if ($smsg) {
 		$smsg->{blob} = $self->get_mark(":$blob");
-		$smsg->{raw_bytes} = $n;
+		$smsg->set_bytes($raw_email, $n);
 		if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore
 			return if $oidx->blob_exists($smsg->{blob});
 		}
-		# XXX do we need this? it's in git at this point
-		$smsg->{-raw_email} = \$raw_email;
 	}
 	my $ref = $self->{ref};
 	my $commit = $self->{mark}++;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 4f77e8fa..7cda7e44 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -10,7 +10,6 @@ package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Lock PublicInbox::IPC);
-use PublicInbox::SearchIdx qw(crlf_adjust);
 use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
 use PublicInbox::InboxWritable;
@@ -197,9 +196,6 @@ sub add_eml {
 	my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg';
 	my $im = $self->importer;
 	$im->add($eml, undef, $smsg) or return; # duplicate returns undef
-	my $msgref = delete $smsg->{-raw_email};
-	$smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
-	undef $msgref;
 
 	local $self->{current_info} = $smsg->{blob};
 	if (my @docids = _docids_for($self, $eml)) {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index da3ac2e3..a7005051 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -22,7 +22,7 @@ use PublicInbox::OverIdx;
 use PublicInbox::Spawn qw(spawn nodatacow_dir);
 use PublicInbox::Git qw(git_unquote);
 use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
-our @EXPORT_OK = qw(crlf_adjust log2stack is_ancestor check_size prepare_stack
+our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
 	index_text term_generator add_val is_bad_blob);
 my $X = \%PublicInbox::Search::X;
 our ($DB_CREATE_OR_OPEN, $DB_OPEN);
@@ -613,17 +613,6 @@ sub index_mm {
 	}
 }
 
-# returns the number of bytes to add if given a non-CRLF arg
-sub crlf_adjust ($) {
-	if (index($_[0], "\r\n") < 0) {
-		# common case is LF-only, every \n needs an \r;
-		# so favor a cheap tr// over an expensive m//g
-		$_[0] =~ tr/\n/\n/;
-	} else { # count number of '\n' w/o '\r', expensive:
-		scalar(my @n = ($_[0] =~ m/(?<!\r)\n/g));
-	}
-}
-
 sub is_bad_blob ($$$$) {
 	my ($oid, $type, $size, $expect_oid) = @_;
 	if ($type ne 'blob') {
@@ -640,8 +629,8 @@ sub index_both { # git->cat_async callback
 	my ($nr, $max) = @$sync{qw(nr max)};
 	++$$nr;
 	$$max -= $size;
-	$size += crlf_adjust($$bref);
-	my $smsg = bless { bytes => $size, blob => $oid }, 'PublicInbox::Smsg';
+	my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
+	$smsg->set_bytes($$bref, $size);
 	my $self = $sync->{sidx};
 	local $self->{current_info} = "$self->{current_info}: $oid";
 	my $eml = PublicInbox::Eml->new($bref);
diff --git a/lib/PublicInbox/Smsg.pm b/lib/PublicInbox/Smsg.pm
index 571cbb6f..c6ff7f52 100644
--- a/lib/PublicInbox/Smsg.pm
+++ b/lib/PublicInbox/Smsg.pm
@@ -135,4 +135,17 @@ sub subject_normalized ($) {
 	$subj;
 }
 
+# returns the number of bytes to add if given a non-CRLF arg
+sub crlf_adjust ($) {
+	if (index($_[0], "\r\n") < 0) {
+		# common case is LF-only, every \n needs an \r;
+		# so favor a cheap tr// over an expensive m//g
+		$_[0] =~ tr/\n/\n/;
+	} else { # count number of '\n' w/o '\r', expensive:
+		scalar(my @n = ($_[0] =~ m/(?<!\r)\n/g));
+	}
+}
+
+sub set_bytes { $_[0]->{bytes} = $_[2] + crlf_adjust($_[1]) }
+
 1;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 7b6b93a0..c4efbdd2 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -17,8 +17,7 @@ use PublicInbox::InboxWritable;
 use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
 use PublicInbox::Spawn qw(spawn popen_rd run_die);
-use PublicInbox::SearchIdx qw(log2stack crlf_adjust is_ancestor check_size
-	is_bad_blob);
+use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob);
 use IO::Handle; # ->autoflush
 use File::Temp ();
 
@@ -139,13 +138,12 @@ sub idx_shard ($$) {
 }
 
 # indexes a message, returns true if checkpointing is needed
-sub do_idx ($$$$) {
-	my ($self, $msgref, $eml, $smsg) = @_;
-	$smsg->{bytes} = $smsg->{raw_bytes} + crlf_adjust($$msgref);
+sub do_idx ($$$) {
+	my ($self, $eml, $smsg) = @_;
 	$self->{oidx}->add_overview($eml, $smsg);
 	my $idx = idx_shard($self, $smsg->{num});
 	$idx->index_eml($eml, $smsg);
-	my $n = $self->{transact_bytes} += $smsg->{raw_bytes};
+	my $n = $self->{transact_bytes} += $smsg->{bytes};
 	$n >= $self->{batch_bytes};
 }
 
@@ -173,7 +171,7 @@ sub _add {
 	$cmt = $im->get_mark($cmt);
 	$self->{last_commit}->[$self->{epoch_max}] = $cmt;
 
-	if (do_idx($self, delete $smsg->{-raw_email}, $mime, $smsg)) {
+	if (do_idx($self, $mime, $smsg)) {
 		$self->checkpoint;
 	}
 
@@ -536,13 +534,13 @@ W: $list
 	for my $smsg (@$need_reindex) {
 		my $new_smsg = bless {
 			blob => $blob,
-			raw_bytes => $bytes,
 			num => $smsg->{num},
 			mid => $smsg->{mid},
 		}, 'PublicInbox::Smsg';
 		my $sync = { autime => $smsg->{ds}, cotime => $smsg->{ts} };
 		$new_smsg->populate($new_mime, $sync);
-		do_idx($self, \$raw, $new_mime, $new_smsg);
+		$new_smsg->set_bytes($raw, $bytes);
+		do_idx($self, $new_mime, $new_smsg);
 	}
 	$rewritten->{rewrites};
 }
@@ -937,13 +935,13 @@ sub index_oid { # cat_async callback
 	}
 	++${$arg->{nr}};
 	my $smsg = bless {
-		raw_bytes => $size,
 		num => $num,
 		blob => $oid,
 		mid => $mid0,
 	}, 'PublicInbox::Smsg';
 	$smsg->populate($eml, $arg);
-	if (do_idx($self, $bref, $eml, $smsg)) {
+	$smsg->set_bytes($$bref, $size);
+	if (do_idx($self, $eml, $smsg)) {
 		${$arg->{need_checkpoint}} = 1;
 	}
 	index_finalize($arg, 1);
@@ -1217,9 +1215,8 @@ sub index_xap_only { # git->cat_async callback
 	my ($bref, $oid, $type, $size, $smsg) = @_;
 	my $self = $smsg->{self};
 	my $idx = idx_shard($self, $smsg->{num});
-	$smsg->{raw_bytes} = $size;
 	$idx->index_eml(PublicInbox::Eml->new($bref), $smsg);
-	$self->{transact_bytes} += $size;
+	$self->{transact_bytes} += $smsg->{bytes};
 }
 
 sub index_xap_step ($$$;$) {
diff --git a/t/import.t b/t/import.t
index 855b5d7c..ae76858b 100644
--- a/t/import.t
+++ b/t/import.t
@@ -7,7 +7,6 @@ use PublicInbox::Eml;
 use PublicInbox::Smsg;
 use PublicInbox::Git;
 use PublicInbox::Import;
-use PublicInbox::Spawn qw(spawn);
 use Fcntl qw(:DEFAULT SEEK_SET);
 use PublicInbox::TestCommon;
 use MIME::Base64 3.05; # Perl 5.10.0 / 5.9.2
@@ -32,20 +31,13 @@ like($im->add($mime, undef, $smsg), qr/\A:[0-9]+\z/, 'added one message');
 
 if ($v2) {
 	like($smsg->{blob}, qr/\A[a-f0-9]{40}\z/, 'got last object_id');
-	my $raw_email = $smsg->{-raw_email};
-	is($mime->as_string, $$raw_email, 'string matches');
-	is($smsg->{raw_bytes}, length($$raw_email), 'length matches');
 	my @cmd = ('git', "--git-dir=$git->{git_dir}", qw(hash-object --stdin));
 	open my $in, '+<', undef or BAIL_OUT "open(+<): $!";
 	print $in $mime->as_string or die "write failed: $!";
 	$in->flush or die "flush failed: $!";
-	seek($in, 0, SEEK_SET);
-	open my $out, '+<', undef or BAIL_OUT "open(+<): $!";
-	my $pid = spawn(\@cmd, {}, { 0 => $in, 1 => $out });
-	is(waitpid($pid, 0), $pid, 'waitpid succeeds on hash-object');
+	seek($in, 0, SEEK_SET) or die "seek: $!";
+	chomp(my $hashed_obj = xqx(\@cmd, undef, { 0 => $in }));
 	is($?, 0, 'hash-object');
-	seek($out, 0, SEEK_SET);
-	chomp(my $hashed_obj = <$out>);
 	is($hashed_obj, $smsg->{blob}, "blob object_id matches exp");
 }
 
diff --git a/t/search.t b/t/search.t
index 7495233e..b2958c00 100644
--- a/t/search.t
+++ b/t/search.t
@@ -60,7 +60,7 @@ sub oct_is ($$$) {
 }
 
 {
-	my $crlf_adjust = \&PublicInbox::SearchIdx::crlf_adjust;
+	my $crlf_adjust = \&PublicInbox::Smsg::crlf_adjust;
 	is($crlf_adjust->("hi\r\nworld\r\n"), 0, 'no adjustment needed');
 	is($crlf_adjust->("hi\nworld\n"), 2, 'LF-only counts two CR');
 	is($crlf_adjust->("hi\r\nworld\n"), 1, 'CRLF/LF-mix 1 counts 1 CR');

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 6/7] ipc: switch to one-way pipes
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
                   ` (4 preceding siblings ...)
  2021-01-03  2:06 ` [PATCH 5/7] use Eml (or MIME) objects for all indexing paths Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  2021-01-03  2:06 ` [PATCH 7/7] searchidxshard: use add_xapian directly for v2 Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

This fixes a performance regression in multi-process v2 indexing
caused by the switch to PublicInbox::IPC.  While a Unix socketpair
means fewer FDs to manage, pipes allow unprivileged processes to use
larger buffers (up to 1M) on out-of-the-box Linux instances.

The larger buffers that F_SETPIPE_SZ makes available to pipes
proved valuable during v2 development in 2018, and they remain
valuable whenever there is significant one-way traffic from the
producer parent to worker children.

Compression may be an option for systems without F_SETPIPE_SZ,
but it increases CPU usage and brings no memory bandwidth savings
on hosts where larger buffers are available.
---
 lib/PublicInbox/IPC.pm            | 115 +++++++++++++++++-------------
 lib/PublicInbox/SearchIdxShard.pm |  23 +++---
 lib/PublicInbox/V2Writable.pm     |   2 +-
 3 files changed, 78 insertions(+), 62 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 6b7b3c7a..c1f6f920 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -6,7 +6,6 @@
 package PublicInbox::IPC;
 use strict;
 use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM);
 use Carp qw(confess croak);
 use PublicInbox::Sigfd;
 my ($enc, $dec);
@@ -34,39 +33,42 @@ if ($enc && $dec) { # should be custom ops
 }
 
 sub _get_rec ($) {
-	my ($sock) = @_;
-	local $/ = "\n";
-	defined(my $len = <$sock>) or return;
+	my ($r) = @_;
+	defined(my $len = <$r>) or return;
 	chop($len) eq "\n" or croak "no LF byte in $len";
-	defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
-	$r == $len or croak "short read: $r != $len";
+	defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+	$n == $len or croak "short read: $n != $len";
 	thaw($buf);
 }
 
 sub _send_rec ($$) {
-	my ($sock, $ref) = @_;
+	my ($w, $ref) = @_;
 	my $buf = freeze($ref);
-	print $sock length($buf), "\n", $buf or croak "print: $!";
+	print $w length($buf), "\n", $buf or croak "print: $!";
 }
 
 sub ipc_return ($$$) {
-	my ($s2, $ret, $exc) = @_;
-	_send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+	my ($w, $ret, $exc) = @_;
+	_send_rec($w, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
 }
 
-sub ipc_worker_loop ($$) {
-	my ($self, $s2) = @_;
-	while (my $rec = _get_rec($s2)) {
-		my ($wantarray, $sub, @args) = @$rec;
-		if (!defined($wantarray)) { # no waiting if client doesn't care
+sub ipc_worker_loop ($$$) {
+	my ($self, $r_req, $w_res) = @_;
+	my ($rec, $wantarray, $sub, @args);
+	local $/ = "\n";
+	while ($rec = _get_rec($r_req)) {
+		($wantarray, $sub, @args) = @$rec;
+		# no waiting if client doesn't care,
+		# this is the overwhelmingly likely case
+		if (!defined($wantarray)) {
 			eval { $self->$sub(@args) };
-			eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+			eval { warn "$$ die: $@ (from nowait $sub)\n" } if $@;
 		} elsif ($wantarray) {
 			my @ret = eval { $self->$sub(@args) };
-			ipc_return($s2, \@ret, $@);
-		} else {
+			ipc_return($w_res, \@ret, $@);
+		} else { # '' => wantscalar
 			my $ret = eval { $self->$sub(@args) };
-			ipc_return($s2, \$ret, $@);
+			ipc_return($w_res, \$ret, $@);
 		}
 	}
 }
@@ -75,33 +77,34 @@ sub ipc_worker_loop ($$) {
 sub ipc_worker_spawn {
 	my ($self, $ident, $oldset) = @_;
 	return unless $enc; # no Sereal or Storable
-	my $pid = $self->{-ipc_worker_pid};
-	confess "BUG: already spawned PID:$pid" if $pid;
-	confess "BUG: already have worker socket" if $self->{-ipc_sock};
-	my ($s1, $s2);
-	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+	return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
+	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
+	pipe(my ($r_req, $w_req)) or die "pipe: $!";
+	pipe(my ($r_res, $w_res)) or die "pipe: $!";
 	my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
 	my $parent = $$;
 	$self->ipc_atfork_parent;
-	defined($pid = fork) or die "fork: $!";
+	defined(my $pid = fork) or die "fork: $!";
 	if ($pid == 0) {
 		eval { PublicInbox::DS->Reset };
 		$self->{-ipc_parent_pid} = $parent;
-		close $s1 or die "close(\$s1): $!";
-		$s2->autoflush(1);
+		$w_req = $r_res = undef;
+		$w_res->autoflush(1);
 		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
 		local $0 = $ident;
 		PublicInbox::Sigfd::sig_setmask($oldset);
 		my $on_destroy = $self->ipc_atfork_child;
-		eval { ipc_worker_loop($self, $s2) };
+		eval { ipc_worker_loop($self, $r_req, $w_res) };
 		die "worker $ident PID:$$ died: $@\n" if $@;
 		exit;
 	}
 	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
-	close $s2 or die "close(\$s2): $!";
-	$s1->autoflush(1);
-	$self->{-ipc_sock} = $s1;
-	$self->{-ipc_worker_pid} = $pid;
+	$r_req = $w_res = undef;
+	$w_req->autoflush(1);
+	$self->{-ipc_req} = $w_req;
+	$self->{-ipc_res} = $r_res;
+	$self->{-ipc_ppid} = $$;
+	$self->{-ipc_pid} = $pid;
 }
 
 sub ipc_worker_reap { # dwaitpid callback
@@ -122,15 +125,18 @@ sub ipc_worker_exit {
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
 	my ($self) = @_;
-	my $pid;
-	my $s1 = delete $self->{-ipc_sock} or do {
-		$pid = delete $self->{-ipc_worker_pid} and
-			die "unexpected PID:$pid without ipc_sock";
-		return;
-	};
-	$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
-	_send_rec($s1, [ undef, 'ipc_worker_exit', 0 ]);
-	shutdown($s1, 2) or die "shutdown(\$s1) for PID:$pid";
+	my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
+	my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
+	if (!$w_req && !$r_res) {
+		die "unexpected PID:$pid without IPC pipes" if $pid;
+		return; # idempotent
+	}
+	die 'no PID with IPC pipes' unless $pid;
+	_send_rec($w_req, [ undef, 'ipc_worker_exit', 0 ]);
+	$w_req = $r_res = undef;
+
+	# allow any sibling to send ipc_worker_exit, but siblings can't wait
+	return if $$ != $ppid;
 	eval {
 		my $reap = $self->can('ipc_worker_reap');
 		PublicInbox::DS::dwaitpid($pid, $reap, $self);
@@ -153,17 +159,30 @@ sub ipc_lock_init {
 # call $self->$sub(@args), on a worker if ipc_worker_spawn was used
 sub ipc_do {
 	my ($self, $sub, @args) = @_;
-	if (my $s1 = $self->{-ipc_sock}) {
+	if (my $w_req = $self->{-ipc_req}) { # run in worker
 		my $ipc_lock = $self->{-ipc_lock};
 		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
-		_send_rec($s1, [ wantarray, $sub, @args ]);
-		return unless defined(wantarray);
-		my $ret = _get_rec($s1) // die "no response on $sub";
-		die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
-		wantarray ? @$ret : $$ret;
-	} else {
+		if (defined(wantarray)) {
+			my $r_res = $self->{-ipc_res} or die 'no ipc_res';
+			_send_rec($w_req, [ wantarray, $sub, @args ]);
+			my $ret = _get_rec($r_res) // die "no response on $sub";
+			die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+			wantarray ? @$ret : $$ret;
+		} else { # likely, fire-and-forget into pipe
+			_send_rec($w_req, [ undef , $sub, @args ]);
+		}
+	} else { # run locally
 		$self->$sub(@args);
 	}
 }
 
+# needed when there's multiple IPC workers and the parent forking
+# causes newer siblings to inherit older siblings sockets
+sub ipc_sibling_atfork_child {
+	my ($self) = @_;
+	my ($pid, undef) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
+	delete(@$self{qw(-ipc_req -ipc_res)});
+	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 83cbbb25..0051df93 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -21,6 +21,14 @@ sub new {
 	if ($v2w->{parallel}) {
 		local $self->{-v2w_afc} = $v2w;
 		$self->ipc_worker_spawn("shard[$shard]");
+		# F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size for
+		# inputs speeds V2Writable batch imports across 8 cores by
+		# nearly 20%.  Since any of our responses are small, make
+		# the response pipe as small as possible
+		if ($^O eq 'linux') {
+			fcntl($self->{-ipc_req}, 1031, 1048576);
+			fcntl($self->{-ipc_res}, 1031, 4096);
+		}
 	}
 	$self;
 }
@@ -36,7 +44,7 @@ sub _worker_done {
 sub ipc_atfork_child { # called automatically before ipc_worker_loop
 	my ($self) = @_;
 	my $v2w = delete $self->{-v2w_afc} or die 'BUG: {-v2w_afc} missing';
-	$v2w->atfork_child; # calls shard_atfork_child on our siblings
+	$v2w->atfork_child; # calls ipc_sibling_atfork_child on our siblings
 	$v2w->{current_info} = "[$self->{shard}]"; # for $SIG{__WARN__}
 	$self->begin_txn_lazy;
 	# caller must capture this:
@@ -49,17 +57,6 @@ sub index_eml {
 	$self->ipc_do('add_message', $eml, $smsg);
 }
 
-# needed when there's multiple IPC workers and the parent forking
-# causes newer siblings to inherit older siblings sockets
-sub shard_atfork_child {
-	my ($self) = @_;
-	my $pid = delete($self->{-ipc_worker_pid}) or
-			die "BUG: $$ no -ipc_worker_pid";
-	my $s1 = delete($self->{-ipc_sock}) or die "BUG: $$ no -ipc_sock";
-	$pid == $$ and die "BUG: $$ shard_atfork_child called on itself";
-	close($s1) or die "close -ipc_sock: $!";
-}
-
 # wait for return to determine when ipc_do('commit_txn_lazy') is done
 sub echo {
 	shift;
@@ -80,7 +77,7 @@ sub shard_close {
 
 sub shard_over_check {
 	my ($self, $over) = @_;
-	if ($self->{-ipc_sock} && $over->{dbh}) {
+	if ($self->{-ipc_req} && $over->{dbh}) {
 		# can't send DB handles over IPC
 		$over = ref($over)->new($over->{dbh}->sqlite_db_filename);
 	}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index c4efbdd2..6be95979 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -823,7 +823,7 @@ sub content_exists ($$$) {
 sub atfork_child {
 	my ($self) = @_;
 	if (my $older_siblings = $self->{idx_shards}) {
-		$_->shard_atfork_child for @$older_siblings;
+		$_->ipc_sibling_atfork_child for @$older_siblings;
 	}
 	if (my $im = $self->{im}) {
 		$im->atfork_child;

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 7/7] searchidxshard: use add_xapian directly for v2
  2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
                   ` (5 preceding siblings ...)
  2021-01-03  2:06 ` [PATCH 6/7] ipc: switch to one-way pipes Eric Wong
@ 2021-01-03  2:06 ` Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2021-01-03  2:06 UTC (permalink / raw)
  To: meta

This lets us more clearly distinguish between v1 and v2-only code
paths, and may save a few cycles.
---
 lib/PublicInbox/SearchIdx.pm      | 1 +
 lib/PublicInbox/SearchIdxShard.pm | 2 +-
 lib/PublicInbox/V2Writable.pm     | 8 ++++++--
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index a7005051..adced076 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -397,6 +397,7 @@ sub eml2doc ($$$;$) {
 
 sub add_xapian ($$$$) {
 	my ($self, $eml, $smsg, $mids) = @_;
+	begin_txn_lazy($self);
 	my $doc = eml2doc($self, $eml, $smsg, $mids);
 	$self->{xdb}->replace_document($smsg->{num}, $doc);
 }
diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm
index 0051df93..1598faeb 100644
--- a/lib/PublicInbox/SearchIdxShard.pm
+++ b/lib/PublicInbox/SearchIdxShard.pm
@@ -54,7 +54,7 @@ sub ipc_atfork_child { # called automatically before ipc_worker_loop
 sub index_eml {
 	my ($self, $eml, $smsg, $eidx_key) = @_;
 	$smsg->{eidx_key} = $eidx_key if defined $eidx_key;
-	$self->ipc_do('add_message', $eml, $smsg);
+	$self->ipc_do('add_xapian', $eml, $smsg);
 }
 
 # wait for return to determine when ipc_do('commit_txn_lazy') is done
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 6be95979..459c7e86 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -141,8 +141,10 @@ sub idx_shard ($$) {
 sub do_idx ($$$) {
 	my ($self, $eml, $smsg) = @_;
 	$self->{oidx}->add_overview($eml, $smsg);
-	my $idx = idx_shard($self, $smsg->{num});
-	$idx->index_eml($eml, $smsg);
+	if ($self->{-need_xapian}) {
+		my $idx = idx_shard($self, $smsg->{num});
+		$idx->index_eml($eml, $smsg);
+	}
 	my $n = $self->{transact_bytes} += $smsg->{bytes};
 	$n >= $self->{batch_bytes};
 }
@@ -267,6 +269,7 @@ sub _idx_init { # with_umask callback
 	my $max = $self->{shards} - 1;
 	my $idx = $self->{idx_shards} = [];
 	push @$idx, PublicInbox::SearchIdxShard->new($self, $_) for (0..$max);
+	$self->{-need_xapian} = $idx->[0]->need_xapian;
 
 	# SearchIdxShard may do their own flushing, so don't scale
 	# until after forking
@@ -1129,6 +1132,7 @@ sub sync_prepare ($$) {
 sub unindex_oid_aux ($$$) {
 	my ($self, $oid, $mid) = @_;
 	my @removed = $self->{oidx}->remove_oid($oid, $mid);
+	return unless $self->{-need_xapian};
 	for my $num (@removed) {
 		idx_shard($self, $num)->ipc_do('xdb_remove', $num);
 	}

^ permalink raw reply related	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2021-01-03  2:06 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-03  2:06 [PATCH 0/7] v2: swap in new IPC package Eric Wong
2021-01-03  2:06 ` [PATCH 1/7] ipc: some documentation comments Eric Wong
2021-01-03  2:06 ` [PATCH 2/7] searchidxshard: use PublicInbox::IPC to kill lots of code Eric Wong
2021-01-03  2:06 ` [PATCH 3/7] searchidxshard: IPC conversion, part 2 Eric Wong
2021-01-03  2:06 ` [PATCH 4/7] searchidxshard: replace index_raw with index_eml Eric Wong
2021-01-03  2:06 ` [PATCH 5/7] use Eml (or MIME) objects for all indexing paths Eric Wong
2021-01-03  2:06 ` [PATCH 6/7] ipc: switch to one-way pipes Eric Wong
2021-01-03  2:06 ` [PATCH 7/7] searchidxshard: use add_xapian directly for v2 Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).