unofficial mirror of meta@public-inbox.org
From: "Eric Wong (Contractor, The Linux Foundation)" <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 10/12] v2: parallelize Xapian indexing
Date: Thu, 22 Feb 2018 21:42:20 +0000
Message-ID: <20180222214222.1086-11-e@80x24.org>
In-Reply-To: <20180222214222.1086-1-e@80x24.org>

The parallelization requires splitting Msgmap, text+term
indexing, and thread-linking out into separate processes.

git-fast-import is fast, so we don't bother parallelizing it.

Msgmap (SQLite) and thread-linking (Xapian) must be serialized
because they rely on monotonically increasing numbers (NNTP
article number and internal thread_id, respectively).

We handle msgmap in the main process which drives fast-import.
When the article number is retrieved/generated, we write the
entire message to per-partition subprocesses via pipes for
expensive text+term indexing.
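
As a rough illustration of the framing described above (a header
line of "<len> <artnum> <object_id>" followed by <len> raw bytes),
here is a minimal standalone sketch.  write_frame and read_frame
are hypothetical names used only for this example and are not part
of the patch; the message and object ID are made up:

```perl
use strict;
use warnings;

# Hypothetical helper: write one header line, then the raw message.
sub write_frame {
	my ($w, $msgref, $artnum, $oid) = @_;
	my $len = length($$msgref);
	print {$w} "$len $artnum $oid\n", $$msgref
		or die "write failed: $!\n";
}

# Hypothetical helper: read one header line, then exactly $len bytes.
sub read_frame {
	my ($r) = @_;
	defined(my $line = <$r>) or return; # EOF
	chomp $line;
	my ($len, $artnum, $oid) = split(/ /, $line);
	my $n = read($r, my $msg, $len);
	defined $n or die "read failed: $!\n";
	$n == $len or die "short read: $n != $len\n";
	($artnum, $oid, \$msg);
}

pipe(my $r, my $w) or die "pipe failed: $!\n";
binmode $_, ':raw' for ($r, $w);
my $raw = "Subject: hello\n\nbody\n"; # made-up message
write_frame($w, \$raw, 1, 'deadbeef'); # made-up artnum and object ID
close $w or die "close failed: $!\n";
my ($artnum, $oid, $msgref) = read_frame($r);
print "artnum=$artnum oid=$oid bytes=", length($$msgref), "\n";
```

The length prefix lets the reader pull the whole message with one
read() instead of scanning for a terminator, which matters because
message bodies may contain any byte sequence.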

When these per-partition subprocesses are done with the
expensive text+term indexing, they write SearchMsg (small data)
to a shared pipe (inherited from the main V2Writable process)
back to the threader, which runs in its own subprocess.
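
To make the shared-pipe idea concrete, here is a small standalone
sketch, assuming two stand-in writers and a temporary lock file.
send_record is a hypothetical name, and the record layout here is
only illustrative; the patch itself freezes
[ $mid, $ts, $xpath, $doc_data ]:

```perl
use strict;
use warnings;
use Fcntl qw(:flock);
use File::Temp qw(tempfile);
use IO::Handle;
use Storable qw(freeze thaw);

# Hypothetical helper: write one length-prefixed, frozen record to a
# pipe shared by several writers, under an exclusive flock.
sub send_record {
	my ($w, $lock_path, $rec) = @_;
	my $str = freeze($rec);
	# each writer opens the lock file itself so the flock is not
	# shared with other processes through an inherited descriptor
	open(my $lockfh, '>>', $lock_path) or die "open $lock_path: $!\n";
	flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
	my $ok = print {$w} length($str), "\n", $str;
	my $err = $!;
	flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
	close $lockfh or die "close failed: $!\n";
	$ok or die "print failed: $err\n";
}

my (undef, $lock_path) = tempfile(UNLINK => 1);
pipe(my $r, my $w) or die "pipe failed: $!\n";
binmode $_, ':raw' for ($r, $w);
$w->autoflush(1);

my @pids;
for my $part (0..1) { # two stand-in "partition" writers
	my $pid = fork;
	defined $pid or die "fork failed: $!\n";
	if ($pid == 0) {
		close $r;
		send_record($w, $lock_path, [ "mid-$part\@example", time ]);
		exit 0;
	}
	push @pids, $pid;
}
close $w or die "close failed: $!\n";

# the reading ("threader") side: a length line, then that many bytes
my @seen;
while (defined(my $len = <$r>)) {
	chomp $len;
	my $n = read($r, my $buf, $len);
	defined $n && $n == $len or die "short read\n";
	push @seen, thaw($buf)->[0];
}
waitpid($_, 0) for @pids;
print join(' ', sort @seen), "\n";
```

The lock keeps the length line and the frozen payload of one record
from interleaving with another writer's record.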

The number of text+term Xapian partitions is chosen at import
and can be made equal to the number of cores in a machine.
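
For reference, the partition choice in this patch is the expression
used in V2Writable::add in the diff below: the first 8 hex digits
of the object ID, modulo the number of partitions.  A tiny sketch,
where partition_for is a hypothetical name and the object IDs are
made up for illustration:

```perl
use strict;
use warnings;

# Hypothetical helper wrapping the expression from V2Writable::add:
# first 32 bits of the hex object ID, modulo the partition count.
sub partition_for {
	my ($oid, $nparts) = @_;
	hex(substr($oid, 0, 8)) % $nparts;
}

my $nparts = 4; # the default set in V2Writable::new in this patch
my %got;
for my $oid (qw(00000000aa 00000005bb ffffffffcc)) { # made-up IDs
	$got{$oid} = partition_for($oid, $nparts);
	printf "%s -> partition %d\n", $oid, $got{$oid};
}
```

Because object IDs are hashes, this should spread messages roughly
evenly across partitions without any shared counter.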

V2Writable --> Import -> git-fast-import
           \-> SearchIdxThread -> Msgmap (synchronous)
           \-> SearchIdxPart[n] -> SearchIdx[*]
           \-> SearchIdxThread -> SearchIdx ("threader", a subprocess)

[*] each subprocess writes to threader
---
 MANIFEST                           |   2 +
 lib/PublicInbox/Import.pm          |   4 +-
 lib/PublicInbox/Search.pm          |   5 +-
 lib/PublicInbox/SearchIdx.pm       |  80 ++++++++++++++++++--------
 lib/PublicInbox/SearchIdxPart.pm   |  70 +++++++++++++++++++++++
 lib/PublicInbox/SearchIdxThread.pm | 111 +++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/SearchMsg.pm       |  33 +++++------
 lib/PublicInbox/V2Writable.pm      |  80 +++++++++++++++++---------
 8 files changed, 314 insertions(+), 71 deletions(-)
 create mode 100644 lib/PublicInbox/SearchIdxPart.pm
 create mode 100644 lib/PublicInbox/SearchIdxThread.pm

diff --git a/MANIFEST b/MANIFEST
index 4b51b54..2a6f6f0 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -84,6 +84,8 @@ lib/PublicInbox/Reply.pm
 lib/PublicInbox/SaPlugin/ListMirror.pm
 lib/PublicInbox/Search.pm
 lib/PublicInbox/SearchIdx.pm
+lib/PublicInbox/SearchIdxPart.pm
+lib/PublicInbox/SearchIdxThread.pm
 lib/PublicInbox/SearchMsg.pm
 lib/PublicInbox/SearchThread.pm
 lib/PublicInbox/SearchView.pm
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 1a2698a..b650e4e 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -280,14 +280,12 @@ sub add {
 	$self->{bytes_added} += $n;
 	print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
 	print $w $str, "\n" or wfail;
-	$str = undef;
 
 	# v2: we need this for Xapian
 	if ($self->{want_object_id}) {
 		chomp($self->{last_object_id} = $self->get_mark(":$blob"));
-		$self->{last_object_size} = $n;
+		$self->{last_object} = [ $n, \$str ];
 	}
-
 	my $ref = $self->{ref};
 	my $commit = $self->{mark}++;
 	my $parent = $tip =~ /\A:/ ? $tip : undef;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index eac11bd..3b28059 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -124,7 +124,10 @@ sub xdir {
 	if ($self->{version} == 1) {
 		"$self->{mainrepo}/public-inbox/xapian" . SCHEMA_VERSION;
 	} else {
-		"$self->{mainrepo}/xap" . SCHEMA_VERSION;
+		my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+		my $part = $self->{partition};
+		defined $part or die "partition not given";
+		$dir .= "/$part";
 	}
 }
 
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index c6c5bd2..cc7e7ec 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -51,7 +51,7 @@ sub git_unquote ($) {
 }
 
 sub new {
-	my ($class, $ibx, $creat) = @_;
+	my ($class, $ibx, $creat, $part) = @_;
 	my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config
 	my $git_dir = $mainrepo;
 	my ($altid, $git);
@@ -83,7 +83,10 @@ sub new {
 	if ($version == 1) {
 		$self->{lock_path} = "$mainrepo/ssoma.lock";
 	} elsif ($version == 2) {
-		$self->{lock_path} = "$mainrepo/inbox.lock";
+		defined $part or die "partition is required for v2\n";
+		# partition is a number or "all"
+		$self->{partition} = $part;
+		$self->{lock_path} = undef;
 		$self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
 	} else {
 		die "unsupported inbox version=$version\n";
@@ -119,14 +122,16 @@ sub _xdb_acquire {
 sub _lock_acquire {
 	my ($self) = @_;
 	croak 'already locked' if $self->{lockfh};
-	sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or
-		die "failed to open lock $self->{lock_path}: $!\n";
+	my $lock_path = $self->{lock_path} or return;
+	sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
+		die "failed to open lock $lock_path: $!\n";
 	flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
 	$self->{lockfh} = $lockfh;
 }
 
 sub _lock_release {
 	my ($self) = @_;
+	return unless $self->{lock_path};
 	my $lockfh = delete $self->{lockfh} or croak 'not locked';
 	flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
 	close $lockfh or die "close failed: $!\n";
@@ -138,8 +143,8 @@ sub add_val ($$$) {
 	$doc->add_value($col, $num);
 }
 
-sub add_values ($$$) {
-	my ($smsg, $bytes, $num) = @_;
+sub add_values ($$$$) {
+	my ($smsg, $bytes, $num, $lines) = @_;
 
 	my $ts = $smsg->ts;
 	my $doc = $smsg->{doc};
@@ -149,8 +154,7 @@ sub add_values ($$$) {
 
 	defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes);
 
-	add_val($doc, &PublicInbox::Search::LINES,
-			$smsg->{mime}->body_raw =~ tr!\n!\n!);
+	add_val($doc, &PublicInbox::Search::LINES, $lines);
 
 	my $yyyymmdd = strftime('%Y%m%d', gmtime($ts));
 	add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
@@ -281,6 +285,7 @@ sub add_message {
 
 	my ($doc_id, $old_tid);
 	my $mid = mid_clean(mid_mime($mime));
+	my $threader = $self->{threader};
 
 	eval {
 		die 'Message-ID too long' if length($mid) > MAX_MID_SIZE;
@@ -289,19 +294,22 @@ sub add_message {
 			# convert a ghost to a regular message
 			# it will also clobber any existing regular message
 			$doc_id = $smsg->{doc_id};
-			$old_tid = $smsg->thread_id;
+			$old_tid = $smsg->thread_id unless $threader;
 		}
 		$smsg = PublicInbox::SearchMsg->new($mime);
 		my $doc = $smsg->{doc};
 		$doc->add_term('XMID' . $mid);
 
 		my $subj = $smsg->subject;
+		my $xpath;
 		if ($subj ne '') {
-			my $path = $self->subject_path($subj);
-			$doc->add_term('XPATH' . id_compress($path));
+			$xpath = $self->subject_path($subj);
+			$xpath = id_compress($xpath);
+			$doc->add_term('XPATH' . $xpath);
 		}
 
-		add_values($smsg, $bytes, $num);
+		my $lines = $mime->body_raw =~ tr!\n!\n!;
+		add_values($smsg, $bytes, $num, $lines);
 
 		my $tg = $self->term_generator;
 
@@ -350,9 +358,16 @@ sub add_message {
 			index_body($tg, \@orig, $doc) if @orig;
 		});
 
-		link_message($self, $smsg, $old_tid);
+		# populates smsg->references for smsg->to_doc_data
+		my $refs = parse_references($smsg);
+		my $data = $smsg->to_doc_data($blob);
+		if ($threader) {
+			$threader->thread_msg($mid, $smsg->ts, $xpath, $data);
+		} else {
+			link_message($self, $smsg, $refs, $old_tid);
+		}
 		$tg->index_text($mid, 1, 'XM');
-		$doc->set_data($smsg->to_doc_data($blob));
+		$doc->set_data($data);
 
 		if (my $altid = $self->{-altid}) {
 			foreach my $alt (@$altid) {
@@ -424,8 +439,8 @@ sub next_thread_id {
 	$last_thread_id;
 }
 
-sub link_message {
-	my ($self, $smsg, $old_tid) = @_;
+sub parse_references ($) {
+	my ($smsg) = @_;
 	my $doc = $smsg->{doc};
 	my $mid = $smsg->mid;
 	my $mime = $smsg->{mime};
@@ -436,7 +451,6 @@ sub link_message {
 	my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g);
 	push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g));
 
-	my $tid;
 	if (@refs) {
 		my %uniq = ($mid => 1);
 		my @orig_refs = @refs;
@@ -452,25 +466,31 @@ sub link_message {
 			push @refs, $ref;
 		}
 	}
+	$smsg->{references} = '<'.join('> <', @refs).'>' if @refs;
+	\@refs
+}
 
-	if (@refs) {
-		$smsg->{references} = '<'.join('> <', @refs).'>';
+sub link_message {
+	my ($self, $smsg, $refs, $old_tid) = @_;
+	my $tid;
+
+	if (@$refs) {
 
 		# first ref *should* be the thread root,
 		# but we can never trust clients to do the right thing
-		my $ref = shift @refs;
+		my $ref = shift @$refs;
 		$tid = $self->_resolve_mid_to_tid($ref);
 		$self->merge_threads($tid, $old_tid) if defined $old_tid;
 
 		# the rest of the refs should point to this tid:
-		foreach $ref (@refs) {
+		foreach $ref (@$refs) {
 			my $ptid = $self->_resolve_mid_to_tid($ref);
 			merge_threads($self, $tid, $ptid);
 		}
 	} else {
 		$tid = defined $old_tid ? $old_tid : $self->next_thread_id;
 	}
-	$doc->add_term('G' . $tid);
+	$smsg->{doc}->add_term('G' . $tid);
 }
 
 sub index_blob {
@@ -798,4 +818,20 @@ sub DESTROY {
 	$_[0]->{lockfh} = undef;
 }
 
+# remote_* subs are only used by SearchIdxPart and SearchIdxThread:
+sub remote_commit {
+	my ($self) = @_;
+	print { $self->{w} } "commit\n" or die "failed to write commit: $!";
+}
+
+sub remote_close {
+	my ($self) = @_;
+	my $pid = delete $self->{pid} or die "no process to wait on\n";
+	my $w = delete $self->{w} or die "no pipe to write to\n";
+	print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+	close $w or die "failed to close pipe for pid:$pid: $!\n";
+	waitpid($pid, 0) == $pid or die "remote process did not finish";
+	$? == 0 or die ref($self)." exited with: $?";
+}
+
 1;
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
new file mode 100644
index 0000000..d5a3fd1
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -0,0 +1,70 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+package PublicInbox::SearchIdxPart;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+
+sub new {
+	my ($class, $v2writable, $part, $threader) = @_;
+	my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
+	$self->{threader} = $threader;
+	my ($r, $w);
+	pipe($r, $w) or die "pipe failed: $!\n";
+	my $pid = fork;
+	defined $pid or die "fork failed: $!\n";
+	if ($pid == 0) {
+		foreach my $other (@{$v2writable->{idx_parts}}) {
+			my $other_w = $other->{w} or next;
+			close $other_w or die "close other failed: $!\n";
+		}
+		$v2writable = undef;
+		close $w;
+		eval { partition_worker_loop($self, $r) };
+		die "worker $part died: $@\n" if $@;
+		die "unexpected MM $self->{mm}" if $self->{mm};
+		exit;
+	}
+	$self->{pid} = $pid;
+	$self->{w} = $w;
+	close $r;
+	$self;
+}
+
+sub partition_worker_loop ($$) {
+	my ($self, $r) = @_;
+	my $xdb = $self->_xdb_acquire;
+	$xdb->begin_transaction;
+	my $txn = 1;
+	while (my $line = $r->getline) {
+		if ($line eq "commit\n") {
+			$xdb->commit_transaction if $txn;
+			$txn = undef;
+		} elsif ($line eq "close\n") {
+			$self->_xdb_release;
+			$xdb = $txn = undef;
+		} else {
+			my ($len, $artnum, $object_id) = split(/ /, $line);
+			$xdb ||= $self->_xdb_acquire;
+			if (!$txn) {
+				$xdb->begin_transaction;
+				$txn = 1;
+			}
+			my $n = read($r, my $msg, $len) or die "read: $!\n";
+			$n == $len or die "short read: $n != $len\n";
+			my $mime = PublicInbox::MIME->new(\$msg);
+			$self->index_blob($mime, $len, $artnum, $object_id);
+		}
+	}
+	warn "$$ still in transaction\n" if $txn;
+	warn "$$ xdb active\n" if $xdb;
+}
+
+# called by V2Writable
+sub index_raw {
+	my ($self, $len, $msgref, $artnum, $object_id) = @_;
+	print { $self->{w} } "$len $artnum $object_id\n", $$msgref or die
+		"failed to write partition $!\n";
+}
+
+1;
diff --git a/lib/PublicInbox/SearchIdxThread.pm b/lib/PublicInbox/SearchIdxThread.pm
new file mode 100644
index 0000000..6471309
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxThread.pm
@@ -0,0 +1,111 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+package PublicInbox::SearchIdxThread;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+use Storable qw(freeze thaw);
+
+sub new {
+	my ($class, $v2ibx) = @_;
+	my $self = $class->SUPER::new($v2ibx, 1, 'all');
+	# create the DB:
+	$self->_xdb_acquire;
+	$self->_xdb_release;
+
+	my ($r, $w);
+	pipe($r, $w) or die "pipe failed: $!\n";
+	binmode $r, ':raw';
+	binmode $w, ':raw';
+	my $pid = fork;
+	defined $pid or die "fork failed: $!\n";
+	if ($pid == 0) {
+		close $w;
+		eval { thread_worker_loop($self, $r) };
+		die "thread worker died: $@\n" if $@;
+		exit;
+	}
+	$self->{w} = $w;
+	$self->{pid} = $pid;
+	close $r;
+
+	$w->autoflush(1);
+
+	# lock only exists in parent, not in worker
+	my $l = $self->{lock_path} = $self->xdir . '/thread.lock';
+	open my $fh, '>>', $l or die "failed to create $l: $!\n";
+	$self;
+}
+
+sub thread_worker_loop {
+	my ($self, $r) = @_;
+	my $msg;
+	my $xdb = $self->_xdb_acquire;
+	$xdb->begin_transaction;
+	my $txn = 1;
+	while (my $line = $r->getline) {
+		if ($line eq "commit\n") {
+			$xdb->commit_transaction if $txn;
+			$txn = undef;
+		} elsif ($line eq "close\n") {
+			$self->_xdb_release;
+			$xdb = $txn = undef;
+		} else {
+			read($r, $msg, $line) or die "read failed: $!\n";
+			$msg = thaw($msg); # should raise on error
+			defined $msg or die "failed to thaw buffer\n";
+			if (!$txn) {
+				$xdb->begin_transaction;
+				$txn = 1;
+			}
+			eval { $self->thread_msg_real(@$msg) };
+			warn "failed to index message <$msg->[0]>: $@\n" if $@;
+		}
+	}
+}
+
+# called by a partition worker
+sub thread_msg {
+	my ($self, $mid, $ts, $xpath, $doc_data) = @_;
+	my $w = $self->{w};
+	my $err;
+	my $str = freeze([ $mid, $ts, $xpath, $doc_data ]);
+	my $len = length($str) . "\n";
+
+	# multiple processes write to the same pipe, so use flock
+	$self->_lock_acquire;
+	print $w $len, $str or $err = $!;
+	$self->_lock_release;
+
+	die "print failed: $err\n" if $err;
+}
+
+sub thread_msg_real {
+	my ($self, $mid, $ts, $xpath, $doc_data) = @_;
+	my $smsg = $self->lookup_message($mid);
+	my ($old_tid, $doc_id);
+	if ($smsg) {
+		# convert a ghost to a regular message
+		# it will also clobber any existing regular message
+		$doc_id = $smsg->{doc_id};
+		$old_tid = $smsg->thread_id;
+	} else {
+		$smsg = PublicInbox::SearchMsg->new(undef);
+		$smsg->{mid} = $mid;
+	}
+	my $doc = $smsg->{doc};
+	$doc->add_term('XPATH' . $xpath) if defined $xpath;
+	$doc->add_term('XMID' . $mid);
+	$doc->set_data($doc_data);
+	$smsg->{ts} = $ts;
+	my @refs = ($smsg->references =~ /<([^>]+)>/g);
+	$self->link_message($smsg, \@refs, $old_tid);
+	my $db = $self->{xdb};
+	if (defined $doc_id) {
+		$db->replace_document($doc_id, $doc);
+	} else {
+		$doc_id = $db->add_document($doc);
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm
index 25c1abb..941bfd2 100644
--- a/lib/PublicInbox/SearchMsg.pm
+++ b/lib/PublicInbox/SearchMsg.pm
@@ -29,19 +29,24 @@ sub get_val ($$) {
 	Search::Xapian::sortable_unserialise($doc->get_value($col));
 }
 
-sub load_expand {
-	my ($self) = @_;
-	my $doc = $self->{doc};
-	my $data = $doc->get_data or return;
-	$self->{ts} = get_val($doc, &PublicInbox::Search::TS);
-	utf8::decode($data);
-	my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data);
+sub load_from_data ($$) {
+	my ($self) = $_[0]; # data = $_[1]
+	my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $_[1]);
 	$self->{subject} = $subj;
 	$self->{from} = $from;
 	$self->{references} = $refs;
 	$self->{to} = $to;
 	$self->{cc} = $cc;
 	$self->{blob} = $blob;
+}
+
+sub load_expand {
+	my ($self) = @_;
+	my $doc = $self->{doc};
+	my $data = $doc->get_data or return;
+	$self->{ts} = get_val($doc, &PublicInbox::Search::TS);
+	utf8::decode($data);
+	load_from_data($self, $data);
 	$self;
 }
 
@@ -50,17 +55,9 @@ sub load_doc {
 	my $data = $doc->get_data or return;
 	my $ts = get_val($doc, &PublicInbox::Search::TS);
 	utf8::decode($data);
-	my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data);
-	bless {
-		doc => $doc,
-		subject => $subj,
-		ts => $ts,
-		from => $from,
-		references => $refs,
-		to => $to,
-		cc => $cc,
-		blob => $blob,
-	}, $class;
+	my $self = bless { doc => $doc, ts => $ts }, $class;
+	load_from_data($self, $data);
+	$self
 }
 
 # :bytes and :lines metadata in RFC 3977
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 41bfb8d..cb74ab1 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -6,7 +6,8 @@ package PublicInbox::V2Writable;
 use strict;
 use warnings;
 use Fcntl qw(:flock :DEFAULT);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxThread;
 use PublicInbox::MIME;
 use PublicInbox::Git;
 use PublicInbox::Import;
@@ -32,7 +33,8 @@ sub new {
 		im => undef, #  PublicInbox::Import
 		xap_rw => undef, # PublicInbox::V2SearchIdx
 		xap_ro => undef,
-
+		partitions => 4,
+		transact_bytes => 0,
 		# limit each repo to 1GB or so
 		rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
 	};
@@ -55,29 +57,39 @@ sub add {
 	my $cmt = $im->add($mime, $check_cb) or return;
 	$cmt = $im->get_mark($cmt);
 	my $oid = $im->{last_object_id};
-	my $size = $im->{last_object_size};
-
-	my $idx = $self->search_idx;
-	$idx->index_both($mime, $size, $oid);
-	$idx->{xdb}->set_metadata('last_commit', $cmt);
-	my $n = $self->{transact_bytes} += $size;
-	if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+	my ($len, $msgref) = @{$im->{last_object}};
+
+	my $nparts = $self->{partitions};
+	my $part = hex(substr($oid, 0, 8)) % $nparts;
+	my $idx = $self->idx_part($part);
+	my $all = $self->{all};
+	my $num = $all->index_mm($mime);
+	$idx->index_raw($len, $msgref, $num, $oid);
+	my $n = $self->{transact_bytes} += $len;
+	if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
 		$self->checkpoint;
 	}
 
 	$mime;
 }
 
-sub search_idx {
-	my ($self) = @_;
-	$self->{idx} ||= eval {
-		my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
-		my $mm = $idx->_msgmap_init;
-		$idx->_xdb_acquire->begin_transaction;
-		$self->{transact_bytes} = 0;
-		$mm->{dbh}->begin_work;
-		$idx
-	};
+sub idx_part {
+	my ($self, $part) = @_;
+	my $idx = $self->{idx_parts};
+	return $idx->[$part] if $idx; # fast path
+
+	# first time initialization:
+	my $all = $self->{all} =
+		PublicInbox::SearchIdxThread->new($self->{-inbox});
+
+	# need to create all parts before initializing msgmap FD
+	my $max = $self->{partitions} - 1;
+	$idx = $self->{idx_parts} = [];
+	for my $i (0..$max) {
+		push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+	}
+	$all->_msgmap_init->{dbh}->begin_work;
+	$idx->[$part];
 }
 
 sub remove {
@@ -99,23 +111,37 @@ sub done {
 	my ($self) = @_;
 	my $im = $self->{im};
 	$im->done if $im; # PublicInbox::Import::done
-	$self->searchidx_checkpoint;
+	$self->searchidx_checkpoint(0);
 }
 
 sub checkpoint {
 	my ($self) = @_;
 	my $im = $self->{im};
 	$im->checkpoint if $im; # PublicInbox::Import::checkpoint
-	$self->searchidx_checkpoint;
+	$self->searchidx_checkpoint(1);
 }
 
 sub searchidx_checkpoint {
-	my ($self) = @_;
-	my $idx = delete $self->{idx} or return;
+	my ($self, $more) = @_;
+
+	# order matters, we can only close {all} after all partitions
+	# are done because the partitions also write to {all}
+
+	my $parts = $self->{idx_parts};
+	foreach my $idx (@$parts) {
+		$idx->remote_commit;
+		$idx->remote_close unless $more;
+	}
 
-	$idx->{mm}->{dbh}->commit;
-	$idx->{xdb}->commit_transaction;
-	$idx->_xdb_release;
+	if (my $all = $self->{all}) {
+		$all->{mm}->{dbh}->commit;
+		if ($more) {
+			$all->{mm}->{dbh}->begin_work;
+		}
+		$all->remote_commit;
+		$all->remote_close unless $more;
+	}
+	$self->{transact_bytes} = 0;
 }
 
 sub git_init {
@@ -158,7 +184,7 @@ sub importer {
 		} else {
 			$self->{im} = undef;
 			$im->done;
-			$self->searchidx_checkpoint;
+			$self->searchidx_checkpoint(1);
 			$im = undef;
 			my $git_dir = $self->git_init(++$self->{max_git});
 			my $git = PublicInbox::Git->new($git_dir);
-- 
EW


Thread overview: 14+ messages
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 01/12] import: allow the epoch (0s) as a valid time Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 02/12] extmsg: fix broken Xapian MID lookup Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 03/12] search: stop assuming Message-ID is unique Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 04/12] www: stop assuming mainrepo == git_dir Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 05/12] v2writable: initial cut for repo-rotation Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 06/12] git: reload alternates file on missing blob Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 07/12] v2: support Xapian + SQLite indexing Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 08/12] import_vger_from_inbox: allow "-V" option Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 09/12] import_vger_from_mbox: use PublicInbox::MIME and avoid clobbering Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation) [this message]
2018-02-22 21:42 ` [PATCH 11/12] v2writable: round-robin to partitions based on article number Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 12/12] searchidxpart: increase pipe size for partitions Eric Wong (Contractor, The Linux Foundation)
2018-02-23  1:22 ` [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong
