* [PATCH 0/8] less code, less memory, more speed
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC
To: meta
Eric Wong (Contractor, The Linux Foundation) (8):
psgi: ensure /$INBOX/$MESSAGE_ID/T/ endpoint is chronological
over: avoid excessive SELECT
over: remove forked subprocess
v2writable: reduce barriers
index: allow specifying --jobs=0 to disable multiprocess
convert: support converting with altid defined
store less data in the Xapian document
msgmap: speed up minmax with separate queries
MANIFEST | 1 -
lib/PublicInbox/Import.pm | 34 +++++---
lib/PublicInbox/Mbox.pm | 6 +-
lib/PublicInbox/Msgmap.pm | 10 ++-
lib/PublicInbox/NNTP.pm | 2 +-
lib/PublicInbox/Over.pm | 24 +++---
lib/PublicInbox/OverIdx.pm | 57 ++++++++++++-
lib/PublicInbox/OverIdxFork.pm | 180 ---------------------------------------
lib/PublicInbox/SearchIdx.pm | 77 +++--------------
lib/PublicInbox/SearchIdxPart.pm | 14 +--
lib/PublicInbox/SearchMsg.pm | 6 +-
lib/PublicInbox/V2Writable.pm | 176 ++++++++++++++++++++++----------------
script/public-inbox-compact | 6 +-
script/public-inbox-convert | 11 ++-
script/public-inbox-index | 6 +-
t/perf-nntpd.t | 13 ++-
t/search.t | 24 +++---
t/v2writable.t | 9 +-
18 files changed, 269 insertions(+), 387 deletions(-)
delete mode 100644 lib/PublicInbox/OverIdxFork.pm
--
EW
* [PATCH 1/8] psgi: ensure /$INBOX/$MESSAGE_ID/T/ endpoint is chronological
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC
To: meta
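We only need to call get_thread beyond 1000 messages when fetching
an entire mbox; that many messages is probably too much for the
HTML display anyway. So sort the HTML thread view chronologically
(by date) and page by article number only for mbox downloads.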
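A minimal Perl sketch of the ordering rule in the patched get_thread, assuming a hypothetical in-memory array in place of the SQLite over table; thread_page and the sample rows are illustrative, not public-inbox API. It also shows why Mbox.pm now passes {} as $prev: an empty hashref is true, so mbox downloads page by article number from the first query, while a call without $prev gets date (ds) order.

```perl
use strict;
use warnings;

# Hypothetical stand-in for rows of the `over` table in one thread;
# only the columns that matter for ordering are modelled.
my @over = (
    { num => 1, ds => 300 },
    { num => 2, ds => 100 },
    { num => 3, ds => 200 },
);

# Mirrors the patched get_thread: without $prev, sort by date (ds);
# with any true $prev (even {}), page forward by article number.
sub thread_page {
    my ($rows, $prev, $limit) = @_;
    my $sort_col = 'ds';
    my $num = 0;
    if ($prev) {
        $num = $prev->{num} || 0;
        $sort_col = 'num';
    }
    my @hits = sort { $a->{$sort_col} <=> $b->{$sort_col} }
        grep { $_->{num} > $num } @$rows;
    splice(@hits, $limit) if @hits > $limit;    # emulate SQL LIMIT
    return \@hits;
}

my $html  = thread_page(\@over, undef, 1000);       # nums 2, 3, 1
my $page1 = thread_page(\@over, {}, 2);             # nums 1, 2
my $page2 = thread_page(\@over, $page1->[-1], 2);   # num 3
print join(' ', map { $_->{num} } @$_), "\n" for $html, $page1, $page2;
```

Paging by num keeps the "num > ?" cursor consistent across refills; sorting by ds while paging would not, since dates need not increase with article numbers.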
---
lib/PublicInbox/Mbox.pm | 6 +++---
lib/PublicInbox/Over.pm | 18 +++++++++++-------
2 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/lib/PublicInbox/Mbox.pm b/lib/PublicInbox/Mbox.pm
index 4427ae5..11b2302 100644
--- a/lib/PublicInbox/Mbox.pm
+++ b/lib/PublicInbox/Mbox.pm
@@ -130,9 +130,9 @@ sub thread_mbox {
eval { require IO::Compress::Gzip };
return sub { need_gzip(@_) } if $@;
my $mid = $ctx->{mid};
- my $msgs = $srch->get_thread($mid, 0);
+ my $msgs = $srch->get_thread($mid, {});
return [404, [qw(Content-Type text/plain)], []] if !@$msgs;
- my $prev = $msgs->[-1]->{num};
+ my $prev = $msgs->[-1];
my $i = 0;
my $cb = sub {
while (1) {
@@ -142,7 +142,7 @@ sub thread_mbox {
# refill result set
$msgs = $srch->get_thread($mid, $prev);
return unless @$msgs;
- $prev = $msgs->[-1]->{num};
+ $prev = $msgs->[-1];
$i = 0;
}
};
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index 309e044..da0f11e 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -93,16 +93,20 @@ ORDER BY num ASC LIMIT 1
SELECT tid,sid FROM over WHERE num = ? LIMIT 1
defined $tid or return nothing; # $sid may be undef
-
- $prev ||= 0;
- my $cond = 'FROM over WHERE (tid = ? OR sid = ?) AND num > ?';
- my $msgs = do_get($self, <<"", {}, $tid, $sid, $prev);
-SELECT * $cond ORDER BY num ASC
+ my $sort_col = 'ds';
+ $num = 0;
+ if ($prev) {
+ $num = $prev->{num} || 0;
+ $sort_col = 'num';
+ }
+ my $cond = '(tid = ? OR sid = ?) AND num > ?';
+ my $msgs = do_get($self, <<"", {}, $tid, $sid, $num);
+SELECT num,ts,ds,ddd FROM over WHERE $cond ORDER BY $sort_col ASC
return $msgs unless wantarray;
- my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid, $prev);
-SELECT COUNT(num) $cond
+ my $nr = $dbh->selectrow_array(<<"", undef, $tid, $sid, $num);
+SELECT COUNT(num) FROM over WHERE $cond
($nr, $msgs);
}
--
EW
* [PATCH 2/8] over: avoid excessive SELECT
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC
To: meta
Select only the columns we actually use instead of SELECT *, so
we don't read unneeded data into the Perl process.
Fix some broken capitalization while we're at it.
---
lib/PublicInbox/Over.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/Over.pm b/lib/PublicInbox/Over.pm
index da0f11e..07e54b6 100644
--- a/lib/PublicInbox/Over.pm
+++ b/lib/PublicInbox/Over.pm
@@ -59,7 +59,7 @@ sub do_get {
sub query_xover {
my ($self, $beg, $end) = @_;
do_get($self, <<'', {}, $beg, $end);
-SELECT * FROM over WHERE num >= ? AND num <= ?
+SELECT num,ts,ds,ddd FROM over WHERE num >= ? AND num <= ?
ORDER BY num ASC
}
@@ -131,7 +131,7 @@ sub recent {
}
}
my $msgs = do_get($self, <<"", $opts, @v);
-SELECT * FROM over WHERE $s
+SELECT ts,ds,ddd FROM over WHERE $s
return $msgs unless wantarray;
@@ -145,7 +145,7 @@ sub get_art {
my ($self, $num) = @_;
my $dbh = $self->connect;
my $smsg = $dbh->selectrow_hashref(<<'', undef, $num);
-SELECT * from OVER where num = ? LIMIT 1
+SELECT num,ds,ts,ddd FROM over WHERE num = ? LIMIT 1
return load_from_row($smsg) if $smsg;
undef;
--
EW
* [PATCH 3/8] over: remove forked subprocess
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC
To: meta
Since the overview is a synchronization point anyway, move it
into the main V2Writable process, which lets us drop a bunch of
code. This is another step towards making Xapian optional for v2.
In other words, the fan-out point is moved and the Xapian
partitions no longer need to synchronize against each other:
Before:
                    /-------->\
                   /---------->\
    v2writable -->+----parts----> over
                   \---------->/
                    \-------->/
After:
                            /---------->
                           /----------->
    v2writable --> over-->+----parts--->
                           \----------->
                            \---------->
Since the overview/threading logic now runs on the same core that
feeds git-fast-import, imports into small repos are slower, but the
difference is not noticeable in large imports, where I/O wait in
the partitions dominates.
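A minimal sketch of the new barrier handshake, assuming a Unix-like system with fork. It mirrors V2Writable's barrier_init and barrier_wait from this patch as standalone subs taking plain arguments instead of $self; the forked children are hypothetical stand-ins for partition workers writing to the bnote pipe and do no Xapian work.

```perl
use strict;
use warnings;
use IO::Handle;

# Set of partition numbers we still expect a note from
# (standalone version of V2Writable::barrier_init).
sub barrier_init {
    my ($nparts) = @_;
    return { map { $_ => 1 } (0 .. $nparts - 1) };
}

# Read "barrier N\n" notes until every partition has reported
# (standalone version of V2Writable::barrier_wait).
sub barrier_wait {
    my ($r, $barrier) = @_;
    while (scalar keys %$barrier) {
        defined(my $l = $r->getline) or die "EOF on barrier_wait: $!";
        $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
        delete $barrier->{$1} or die "bad part[$1] on barrier wait";
    }
}

my $nparts = 4;
my ($r, $w);
pipe($r, $w) or die "pipe failed: $!";
$w->autoflush(1);
my $barrier = barrier_init($nparts);

my @pids;
for my $part (0 .. $nparts - 1) {
    my $pid = fork;
    defined $pid or die "fork failed: $!";
    if ($pid == 0) {    # child: stands in for a partition worker
        close $r;
        # a note this short goes out in one write(2); POSIX makes pipe
        # writes of at most PIPE_BUF (>= 512) bytes atomic, so no lock
        print $w "barrier $part\n" or die "write failed: $!";
        exit 0;
    }
    push @pids, $pid;
}
barrier_wait($r, $barrier);
waitpid($_, 0) for @pids;
print "all $nparts partitions reached the barrier\n";
```

In this sketch the parent keeps its write end open, as V2Writable keeps both ends in {bnote}, so a worker that exits without writing its note would leave barrier_wait blocked instead of hitting EOF.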
---
MANIFEST | 1 -
lib/PublicInbox/OverIdx.pm | 57 ++++++++++++-
lib/PublicInbox/OverIdxFork.pm | 180 ---------------------------------------
lib/PublicInbox/SearchIdx.pm | 62 +++++---------
lib/PublicInbox/SearchIdxPart.pm | 14 +--
lib/PublicInbox/V2Writable.pm | 89 +++++++++++++------
6 files changed, 144 insertions(+), 259 deletions(-)
delete mode 100644 lib/PublicInbox/OverIdxFork.pm
diff --git a/MANIFEST b/MANIFEST
index 82cc67d..58b3634 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -84,7 +84,6 @@ lib/PublicInbox/NNTPD.pm
lib/PublicInbox/NewsWWW.pm
lib/PublicInbox/Over.pm
lib/PublicInbox/OverIdx.pm
-lib/PublicInbox/OverIdxFork.pm
lib/PublicInbox/ParentPipe.pm
lib/PublicInbox/ProcessPipe.pm
lib/PublicInbox/Qspawn.pm
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 28e4aa9..08f8744 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -2,14 +2,21 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# for XOVER, OVER in NNTP, and feeds/homepage/threads in PSGI
-# Unlike Msgmap, this is an _UNSTABLE_ database which can be
+# Unlike Msgmap, this is an _UNSTABLE_ cache which can be
# tweaked/updated over time and rebuilt.
+#
+# Ghost messages (messages which are only referenced in References/In-Reply-To)
+# are denoted by a negative NNTP article number.
package PublicInbox::OverIdx;
use strict;
use warnings;
use base qw(PublicInbox::Over);
use IO::Handle;
use DBI qw(:sql_types); # SQL_BLOB
+use PublicInbox::MID qw/id_compress mids references/;
+use PublicInbox::SearchMsg;
+use Compress::Zlib qw(compress);
+use PublicInbox::Search;
sub dbh_new {
my ($self) = @_;
@@ -200,6 +207,54 @@ sub link_refs {
$tid;
}
+sub parse_references ($$$$) {
+ my ($self, $smsg, $mid0, $mids) = @_;
+ my $mime = $smsg->{mime};
+ my $hdr = $mime->header_obj;
+ my $refs = references($hdr);
+ push(@$refs, @$mids) if scalar(@$mids) > 1;
+ return $refs if scalar(@$refs) == 0;
+
+ # prevent circular references here:
+ my %seen = ( $mid0 => 1 );
+ my @keep;
+ foreach my $ref (@$refs) {
+ if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) {
+ warn "References: <$ref> too long, ignoring\n";
+ next;
+ }
+ next if $seen{$ref}++;
+ push @keep, $ref;
+ }
+ $smsg->{references} = '<'.join('> <', @keep).'>' if @keep;
+ \@keep;
+}
+
+sub add_overview {
+ my ($self, $mime, $bytes, $num, $oid, $mid0) = @_;
+ my $lines = $mime->body_raw =~ tr!\n!\n!;
+ my $smsg = bless {
+ mime => $mime,
+ mid => $mid0,
+ bytes => $bytes,
+ lines => $lines,
+ blob => $oid,
+ }, 'PublicInbox::SearchMsg';
+ my $mids = mids($mime->header_obj);
+ my $refs = $self->parse_references($smsg, $mid0, $mids);
+ my $subj = $smsg->subject;
+ my $xpath;
+ if ($subj ne '') {
+ $xpath = PublicInbox::Search::subject_path($subj);
+ $xpath = id_compress($xpath);
+ }
+ my $dd = $smsg->to_doc_data($oid, $mid0);
+ utf8::encode($dd);
+ $dd = compress($dd);
+ my $values = [ $smsg->ts, $smsg->ds, $num, $mids, $refs, $xpath, $dd ];
+ add_over($self, $values);
+}
+
sub add_over {
my ($self, $values) = @_;
my ($ts, $ds, $num, $mids, $refs, $xpath, $ddd) = @$values;
diff --git a/lib/PublicInbox/OverIdxFork.pm b/lib/PublicInbox/OverIdxFork.pm
deleted file mode 100644
index ec96528..0000000
--- a/lib/PublicInbox/OverIdxFork.pm
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright (C) 2018 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-package PublicInbox::OverIdxFork;
-use strict;
-use warnings;
-use base qw(PublicInbox::OverIdx PublicInbox::Lock);
-use Storable qw(freeze thaw);
-use IO::Handle;
-
-sub create {
- my ($self, $v2writable) = @_;
- $self->SUPER::create();
- $self->spawn_worker($v2writable) if $v2writable->{parallel};
-}
-
-sub spawn_worker {
- my ($self, $v2writable) = @_;
- my ($r, $w);
- pipe($r, $w) or die "pipe failed: $!\n";
- my ($barrier_wait, $barrier_note);
- pipe($barrier_wait, $barrier_note) or die "pipe failed: $!\n";
- binmode $_, ':raw' foreach ($r, $w, $barrier_wait, $barrier_note);
- my $pid = fork;
- defined $pid or die "fork failed: $!\n";
- if ($pid == 0) {
- $v2writable->atfork_child;
- $v2writable = undef;
- close $w;
- close $barrier_wait;
-
- # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
- # speeds V2Writable batch imports across 8 cores by nearly 20%
- fcntl($r, 1031, 1048576) if $^O eq 'linux';
-
- eval { over_worker_loop($self, $r, $barrier_note) };
- die "over worker died: $@\n" if $@;
- exit;
- }
- $self->{w} = $w;
- $self->{pid} = $pid;
- $self->{lock_path} = "$self->{filename}.pipe.lock";
- close $r;
- close $barrier_note;
- $self->{barrier_wait} = $barrier_wait;
- $w->autoflush(1);
-}
-
-sub over_worker_loop {
- my ($self, $r, $barrier_note) = @_;
- $barrier_note->autoflush(1);
- $0 = 'pi-v2-overview';
- $self->begin_lazy;
- my $barrier = undef;
- while (my $line = $r->getline) {
- if ($line eq "commit\n") {
- $self->commit_lazy;
- } elsif ($line eq "close\n") {
- $self->disconnect;
- } elsif ($line =~ /\Abarrier_init (\d+)\n\z/) {
- my $n = $1 - 1;
- die "barrier in-progress\n" if defined $barrier;
- $barrier = { map { $_ => 1 } (0..$n) };
- } elsif ($line =~ /\Abarrier (\d+)\n\z/) {
- my $part = $1;
- die "no barrier in-progress\n" unless defined $barrier;
- delete $barrier->{$1} or die "unknown barrier: $part\n";
- if ((scalar keys %$barrier) == 0) {
- $barrier = undef;
- $self->commit_lazy;
- print $barrier_note "barrier_done\n" or die
- "print failed to barrier note: $!";
- }
- } elsif ($line =~ /\AD ([a-f0-9]{40,}) (.*)\n\z/s) {
- my ($oid, $mid) = ($1, $2);
- $self->remove_oid($oid, $mid);
- } else {
- my $len = int($line);
- my $n = read($r, my $msg, $len) or die "read: $!\n";
- $n == $len or die "short read: $n != $len\n";
- $msg = thaw($msg); # should raise on error
- defined $msg or die "failed to thaw buffer\n";
- eval { add_over($self, $msg) };
- warn "failed to index message <$msg->[-1]>: $@\n" if $@;
- }
- }
- die "$$ $0 dbh not released\n" if $self->{dbh};
- die "$$ $0 still in transaction\n" if $self->{txn};
-}
-
-# called by a partition worker
-# values: [ DS, NUM, BYTES, LINES, TS, MIDS, XPATH, doc_data ]
-sub add_over {
- my ($self, $values) = @_;
- if (my $w = $self->{w}) {
- my $err;
- my $str = freeze($values);
- $str = length($str) . "\n" . $str;
-
- # multiple processes write to the same pipe, so use flock
- # We can't avoid this lock for <=PIPE_BUF writes, either,
- # because those atomic writes can break up >PIPE_BUF ones
- $self->lock_acquire;
- print $w $str or $err = $!;
- $self->lock_release;
-
- die "print failed: $err\n" if $err;
- } else {
- $self->SUPER::add_over($values);
- }
-}
-
-sub remove_oid {
- my ($self, $oid, $mid) = @_;
- if (my $w = $self->{w}) {
- my $err;
- $self->lock_acquire;
- print $w "D $oid $mid\n" or $err = $!;
- $self->lock_release;
- die $err if $err;
- } else {
- $self->SUPER::remove_oid($oid, $mid); # OverIdx
- }
-}
-
-# write to the subprocess
-sub barrier_init {
- my ($self, $nparts) = @_;
- my $w = $self->{w} or return;
- my $err;
- $self->lock_acquire;
- print $w "barrier_init $nparts\n" or $err = $!;
- $self->lock_release;
- die $err if $err;
-}
-
-sub barrier_wait {
- my ($self) = @_;
- if (my $bw = $self->{barrier_wait}) {
- my $l = $bw->getline;
- $l eq "barrier_done\n" or die "bad response from barrier_wait: $l\n";
- } else {
- $self->commit_lazy;
- }
-}
-
-sub remote_commit {
- my ($self) = @_;
- if (my $w = $self->{w}) {
- my $err;
- $self->lock_acquire;
- print $w "commit\n" or $err = $!;
- $self->lock_release;
- die $err if $err;
- } else {
- $self->commit_lazy;
- }
-}
-
-# prevent connections when using forked subprocesses
-sub connect {
- my ($self) = @_;
- return if $self->{w};
- $self->SUPER::connect;
-}
-
-sub remote_close {
- my ($self) = @_;
- if (my $w = delete $self->{w}) {
- my $pid = delete $self->{pid} or die "no process to wait on\n";
- print $w "close\n" or die "failed to write to pid:$pid: $!\n";
- close $w or die "failed to close pipe for pid:$pid: $!\n";
- waitpid($pid, 0) == $pid or die "remote process did not finish";
- $? == 0 or die ref($self)." pid:$pid exited with: $?";
- } else {
- die "transaction in progress $self\n" if $self->{txn};
- $self->disconnect;
- }
-}
-
-1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 3596972..7cfa745 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -12,7 +12,7 @@ use warnings;
use base qw(PublicInbox::Search PublicInbox::Lock);
use PublicInbox::MIME;
use PublicInbox::InboxWritable;
-use PublicInbox::MID qw/mid_clean id_compress mid_mime mids references/;
+use PublicInbox::MID qw/mid_clean id_compress mid_mime mids/;
use PublicInbox::MsgIter;
use Carp qw(croak);
use POSIX qw(strftime);
@@ -76,8 +76,7 @@ sub new {
if ($version == 1) {
$self->{lock_path} = "$mainrepo/ssoma.lock";
my $dir = $self->xdir;
- $self->{over_ro} = $self->{over} =
- PublicInbox::OverIdx->new("$dir/over.sqlite3");
+ $self->{over} = PublicInbox::OverIdx->new("$dir/over.sqlite3");
} elsif ($version == 2) {
defined $part or die "partition is required for v2\n";
# partition is a number
@@ -274,11 +273,6 @@ sub add_message {
my $smsg = PublicInbox::SearchMsg->new($mime);
my $doc = $smsg->{doc};
my $subj = $smsg->subject;
- my $xpath;
- if ($subj ne '') {
- $xpath = $self->subject_path($subj);
- $xpath = id_compress($xpath);
- }
$smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
defined $bytes or $bytes = length($mime->as_string);
@@ -340,7 +334,6 @@ sub add_message {
});
# populates smsg->references for smsg->to_doc_data
- my $refs = parse_references($smsg, $mid0, $mids);
my $data = $smsg->to_doc_data($oid, $mid0);
foreach my $mid (@$mids) {
$tg->index_text($mid, 1, 'XM');
@@ -359,10 +352,19 @@ sub add_message {
$self->delete_article($num) if defined $num; # for reindexing
- utf8::encode($data);
- $data = compress($data);
- push @vals, $num, $mids, $refs, $xpath, $data;
- $self->{over}->add_over(\@vals);
+ if (my $over = $self->{over}) {
+ utf8::encode($data);
+ $data = compress($data);
+ my $refs = $over->parse_references($smsg, $mid0, $mids);
+ my $xpath;
+ if ($subj ne '') {
+ $xpath = $self->subject_path($subj);
+ $xpath = id_compress($xpath);
+ }
+
+ push @vals, $num, $mids, $refs, $xpath, $data;
+ $over->add_over(\@vals);
+ }
$doc->add_boolean_term('Q' . $_) foreach @$mids;
$doc->add_boolean_term('XNUM' . $num) if defined $num;
$doc_id = $self->{xdb}->add_document($doc);
@@ -432,6 +434,8 @@ sub remove_by_oid {
my ($self, $oid, $mid) = @_;
my $db = $self->{xdb};
+ $self->{over}->remove_oid($oid, $mid) if $self->{over};
+
# XXX careful, we cannot use batch_do here since we conditionally
# delete documents based on other factors, so we cannot call
# find_doc_ids twice.
@@ -441,7 +445,6 @@ sub remove_by_oid {
# there is only ONE element in @delete unless we
# have bugs in our v2writable deduplication check
my @delete;
- my @over_del;
for (; $head != $tail; $head->inc) {
my $docid = $head->get_docid;
my $doc = $db->get_document($docid);
@@ -449,11 +452,9 @@ sub remove_by_oid {
$smsg->load_expand;
if ($smsg->{blob} eq $oid) {
push(@delete, $docid);
- push(@over_del, $smsg->num);
}
}
$db->delete_document($_) foreach @delete;
- $self->{over}->remove_oid($oid, $mid);
scalar(@delete);
}
@@ -469,29 +470,6 @@ sub term_generator { # write-only
$self->{term_generator} = $tg;
}
-sub parse_references ($$$) {
- my ($smsg, $mid0, $mids) = @_;
- my $mime = $smsg->{mime};
- my $hdr = $mime->header_obj;
- my $refs = references($hdr);
- push(@$refs, @$mids) if scalar(@$mids) > 1;
- return $refs if scalar(@$refs) == 0;
-
- # prevent circular references here:
- my %seen = ( $mid0 => 1 );
- my @keep;
- foreach my $ref (@$refs) {
- if (length($ref) > PublicInbox::MID::MAX_MID_SIZE) {
- warn "References: <$ref> too long, ignoring\n";
- next;
- }
- next if $seen{$ref}++;
- push @keep, $ref;
- }
- $smsg->{references} = '<'.join('> <', @keep).'>' if @keep;
- \@keep;
-}
-
sub index_git_blob_id {
my ($doc, $pfx, $objid) = @_;
@@ -619,7 +597,7 @@ sub _git_log {
--raw -r --no-abbrev/, $range);
}
-# indexes all unindexed messages
+# indexes all unindexed messages (v1 only)
sub _index_sync {
my ($self, $opts) = @_;
my $tip = $opts->{ref} || 'HEAD';
@@ -750,7 +728,7 @@ sub begin_txn_lazy {
my ($self) = @_;
return if $self->{txn};
my $xdb = $self->{xdb} || $self->_xdb_acquire;
- $self->{over}->begin_lazy;
+ $self->{over}->begin_lazy if $self->{over};
$xdb->begin_transaction;
$self->{txn} = 1;
$xdb;
@@ -760,7 +738,7 @@ sub commit_txn_lazy {
my ($self) = @_;
delete $self->{txn} or return;
$self->{xdb}->commit_transaction;
- $self->{over}->commit_lazy;
+ $self->{over}->commit_lazy if $self->{over};
}
sub worker_done {
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
index e5766a8..078d2df 100644
--- a/lib/PublicInbox/SearchIdxPart.pm
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -11,7 +11,6 @@ sub new {
# create the DB before forking:
$self->_xdb_acquire;
$self->_xdb_release;
- $self->{over} = $v2writable->{over};
$self->spawn_worker($v2writable, $part) if $v2writable->{parallel};
$self;
}
@@ -25,7 +24,7 @@ sub spawn_worker {
my $pid = fork;
defined $pid or die "fork failed: $!\n";
if ($pid == 0) {
- $v2writable->atfork_child;
+ my $bnote = $v2writable->atfork_child;
$v2writable = undef;
close $w or die "failed to close: $!";
@@ -33,7 +32,7 @@ sub spawn_worker {
# speeds V2Writable batch imports across 8 cores by nearly 20%
fcntl($r, 1031, 1048576) if $^O eq 'linux';
- eval { partition_worker_loop($self, $r, $part) };
+ eval { partition_worker_loop($self, $r, $part, $bnote) };
die "worker $part died: $@\n" if $@;
die "unexpected MM $self->{mm}" if $self->{mm};
exit;
@@ -43,8 +42,8 @@ sub spawn_worker {
close $r or die "failed to close: $!";
}
-sub partition_worker_loop ($$$) {
- my ($self, $r, $part) = @_;
+sub partition_worker_loop ($$$$) {
+ my ($self, $r, $part, $bnote) = @_;
$0 = "pi-v2-partition[$part]";
$self->begin_txn_lazy;
while (my $line = $r->getline) {
@@ -54,8 +53,9 @@ sub partition_worker_loop ($$$) {
$self->_xdb_release;
} elsif ($line eq "barrier\n") {
$self->commit_txn_lazy;
- print { $self->{over}->{w} } "barrier $part\n" or
- die "write failed to overview $!\n";
+ # no need to lock < 512 bytes is atomic under POSIX
+ print $bnote "barrier $part\n" or
+ die "write failed for barrier $!\n";
} elsif ($line =~ /\AD ([a-f0-9]{40,}) (.+)\n\z/s) {
my ($oid, $mid) = ($1, $2);
$self->begin_txn_lazy;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 877a459..8361d09 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -13,7 +13,7 @@ use PublicInbox::Import;
use PublicInbox::MID qw(mids);
use PublicInbox::ContentId qw(content_id content_digest);
use PublicInbox::Inbox;
-use PublicInbox::OverIdxFork;
+use PublicInbox::OverIdx;
use PublicInbox::Msgmap;
use PublicInbox::Spawn;
use IO::Handle;
@@ -67,7 +67,7 @@ sub new {
parallel => 1,
transact_bytes => 0,
xpfx => $xpfx,
- over => PublicInbox::OverIdxFork->new("$xpfx/over.sqlite3"),
+ over => PublicInbox::OverIdx->new("$xpfx/over.sqlite3", 1),
lock_path => "$dir/inbox.lock",
# limit each git repo (epoch) to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -111,11 +111,12 @@ sub add {
my $im = $self->importer;
my $cmt = $im->add($mime);
$cmt = $im->get_mark($cmt);
- my ($oid, $len, $msgref) = @{$im->{last_object}};
+ $self->{last_commit}->[$self->{epoch_max}] = $cmt;
+ my ($oid, $len, $msgref) = @{$im->{last_object}};
+ $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
my $nparts = $self->{partitions};
my $part = $num % $nparts;
- $self->{last_commit}->[$self->{epoch_max}] = $cmt;
my $idx = $self->idx_part($part);
$idx->index_raw($len, $msgref, $num, $oid, $mid0, $mime);
my $n = $self->{transact_bytes} += $len;
@@ -208,11 +209,17 @@ sub idx_init {
# frequently activated.
delete $ibx->{$_} foreach (qw(git mm search));
+ if ($self->{parallel}) {
+ pipe(my ($r, $w)) or die "pipe failed: $!";
+ $self->{bnote} = [ $r, $w ];
+ $w->autoflush(1);
+ }
+
my $over = $self->{over};
$ibx->umask_prepare;
$ibx->with_umask(sub {
$self->lock_acquire;
- $over->create($self);
+ $over->create;
# -compact can change partition count while -watch is idle
my $nparts = count_partitions($self);
@@ -256,7 +263,7 @@ sub remove_internal {
$self->idx_init;
my $im = $self->importer unless $purge;
my $ibx = $self->{-inbox};
- my $srch = $ibx->search;
+ my $over = $self->{over};
my $cid = content_id($mime);
my $parts = $self->{idx_parts};
my $mm = $self->{mm};
@@ -272,7 +279,7 @@ sub remove_internal {
foreach my $mid (@$mids) {
my %gone;
my ($id, $prev);
- while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
my $msg = $ibx->msg_by_smsg($smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
@@ -304,9 +311,7 @@ sub remove_internal {
($mark, undef) = $im->remove($orig, $cmt_msg);
}
$orig = undef;
- foreach my $idx (@$parts) {
- $idx->remote_remove($oid, $mid);
- }
+ $self->unindex_oid_remote($oid, $mid);
}
$self->barrier;
}
@@ -371,8 +376,8 @@ sub done {
}
my $over = $self->{over};
- $over->remote_commit;
- $over->remote_close;
+ $over->commit_lazy;
+ $over->disconnect;
if ($mm) {
$mm->{dbh}->begin_work;
@@ -381,6 +386,7 @@ sub done {
delete $self->{mm};
}
+ delete $self->{bnote};
$self->{transact_bytes} = 0;
$self->lock_release if $parts;
}
@@ -389,7 +395,25 @@ sub checkpoint {
my ($self) = @_;
my $im = $self->{im};
$im->checkpoint if $im; # PublicInbox::Import::checkpoint
- $self->barrier(1);
+ $self->barrier;
+}
+
+sub barrier_init {
+ my ($self, $n) = @_;
+ $self->{bnote} or return;
+ --$n;
+ my $barrier = { map { $_ => 1 } (0..$n) };
+}
+
+sub barrier_wait {
+ my ($self, $barrier) = @_;
+ my $bnote = $self->{bnote} or return;
+ my $r = $bnote->[0];
+ while (scalar keys %$barrier) {
+ defined(my $l = $r->getline) or die "EOF on barrier_wait: $!";
+ $l =~ /\Abarrier (\d+)/ or die "bad line on barrier_wait: $l";
+ delete $barrier->{$1} or die "bad part[$1] on barrier wait";
+ }
}
# issue a write barrier to ensure all data is visible to other processes
@@ -403,17 +427,19 @@ sub barrier {
my $parts = $self->{idx_parts};
if ($parts) {
my $dbh = $self->{mm}->{dbh};
- $dbh->commit; # SQLite msgmap data is second in importance
- my $over = $self->{over};
+ # SQLite msgmap data is second in importance
+ $dbh->commit;
- # Now deal with Xapian and overview DB
- $over->barrier_init(scalar(@$parts));
+ # SQLite overview is third
+ $self->{over}->commit_lazy;
- # each partition needs to issue a barrier command to over
- $_->remote_barrier foreach @$parts;
+ # Now deal with Xapian
+ my $barrier = $self->barrier_init(scalar @$parts);
- $over->barrier_wait; # wait for each Xapian partition
+ # each partition needs to issue a barrier command
+ $_->remote_barrier for @$parts;
+ $self->barrier_wait($barrier); # wait for each Xapian partition
# last_commit is special, don't commit these until
# remote partitions are done:
@@ -486,7 +512,7 @@ sub importer {
} else {
$self->{im} = undef;
$im->done;
- $self->barrier(1);
+ $self->barrier;
$im = undef;
my $git_dir = $self->git_init(++$self->{epoch_max});
my $git = PublicInbox::Git->new($git_dir);
@@ -546,12 +572,11 @@ sub diff ($$$) {
sub lookup_content {
my ($self, $mime, $mid) = @_;
my $ibx = $self->{-inbox};
-
- my $srch = $ibx->search->reopen;
+ my $over = $self->{over};
my $cid = content_id($mime);
my $found;
my ($id, $prev);
- while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
my $msg = $ibx->msg_by_smsg($smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
@@ -581,6 +606,8 @@ sub atfork_child {
$im->atfork_child;
}
die "unexpected mm" if $self->{mm};
+ close $self->{bnote}->[0] or die "close bnote[0]: $!\n";
+ $self->{bnote}->[1];
}
sub mark_deleted {
@@ -654,6 +681,7 @@ sub reindex_oid {
$mm_tmp->mid_delete($mid0) or
die "failed to delete <$mid0> for article #$num\n";
+ $self->{over}->add_overview($mime, $len, $num, $oid, $mid0);
my $nparts = $self->{partitions};
my $part = $num % $nparts;
my $idx = $self->idx_part($part);
@@ -759,17 +787,23 @@ $range
\$regen_max;
}
+sub unindex_oid_remote {
+ my ($self, $oid, $mid) = @_;
+ $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}};
+ $self->{over}->remove_oid($oid, $mid);
+}
+
sub unindex_oid {
my ($self, $git, $oid) = @_;
my $msgref = $git->cat_file($oid);
my $mime = PublicInbox::MIME->new($msgref);
my $mids = mids($mime->header_obj);
$mime = $msgref = undef;
- my $srch = $self->{-inbox}->search;
+ my $over = $self->{over};
foreach my $mid (@$mids) {
my %gone;
my ($id, $prev);
- while (my $smsg = $srch->next_by_mid($mid, \$id, \$prev)) {
+ while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
$gone{$smsg->num} = 1 if $oid eq $smsg->{blob};
1; # continue
}
@@ -780,8 +814,7 @@ sub unindex_oid {
join(',',sort keys %gone), "\n";
}
$self->{unindexed}->{$_}++ foreach keys %gone;
- $_->remote_remove($oid, $mid) foreach @{$self->{idx_parts}};
- $self->{over}->remove_oid($oid, $mid);
+ $self->unindex_oid_remote($oid, $mid);
$self->barrier;
}
}
--
EW
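The parse_references helper that this patch moves from SearchIdx.pm into OverIdx.pm can be exercised on its own. This sketch takes already-parsed Message-IDs rather than a SearchMsg object with a MIME header, and the MAX_MID_SIZE value of 244 is an assumption for illustration, not a value quoted from PublicInbox::MID.

```perl
use strict;
use warnings;

# Assumed value for illustration; the real code uses
# PublicInbox::MID::MAX_MID_SIZE.
use constant MAX_MID_SIZE => 244;

# $refs: Message-IDs already parsed from References/In-Reply-To
# $mid0: the Message-ID this message is indexed under
# $mids: every Message-ID from the message's own Message-ID header(s)
sub parse_references {
    my ($refs, $mid0, $mids) = @_;
    my @all = @$refs;
    # a message with several Message-IDs also references its other IDs
    push(@all, @$mids) if scalar(@$mids) > 1;

    # seed %seen with $mid0 so a message never references itself,
    # which prevents circular references
    my %seen = ($mid0 => 1);
    my @keep;
    foreach my $ref (@all) {
        if (length($ref) > MAX_MID_SIZE) {
            warn "References: <$ref> too long, ignoring\n";
            next;
        }
        next if $seen{$ref}++;
        push @keep, $ref;
    }
    # the string form the patch stores in $smsg->{references}
    my $str = @keep ? '<' . join('> <', @keep) . '>' : undef;
    return (\@keep, $str);
}

my ($keep, $str) = parse_references(
    [ 'a@example', 'b@example', 'a@example', 'self@example' ],
    'self@example', [ 'self@example' ]);
print "$str\n";      # <a@example> <b@example>

my ($multi) = parse_references([], 'm1@example',
    [ 'm1@example', 'm2@example' ]);
print "@$multi\n";   # m2@example
```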
* [PATCH 4/8] v2writable: reduce barriers
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC
To: meta
Since we handle the overview info synchronously, we now only need
barriers in tests. We will use asynchronous checkpoints to sync
the less-important Xapian data.
For data deduplication, this requires hoisting the cat-blob
support out of check_remove_v1 in ::Import, so we can read data
that is not yet committed to git.
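For reference, git-fast-import answers cat-blob with "<sha1> SP blob SP <size> LF", the raw contents, and a trailing LF, or with "<sha1> SP missing LF" when the object is unknown. The sketch below is a standalone reader fed from an in-memory filehandle instead of a live fast-import pipe; read_cat_blob is a hypothetical name, and the body-reading loop is our own because the Import.pm hunk elides the middle of _cat_blob. Like the patched _cat_blob, it returns undef for a header it does not recognize instead of dying.

```perl
use strict;
use warnings;

# Parse one reply to fast-import's cat-blob command:
#   <sha1> SP blob SP <size> LF <contents> LF
# Returns a reference to <contents>, or undef for "<sha1> missing".
sub read_cat_blob {
    my ($r) = @_;
    local $/ = "\n";
    my $info = <$r>;
    defined $info or die "EOF from fast-import / cat-blob: $!";
    $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or return;
    my $left = $1;
    my $buf = '';
    while ($left > 0) {
        # append to $buf, reading at most what is still outstanding
        my $n = read($r, $buf, $left, length($buf));
        defined($n) or die "read cat-blob failed: $!";
        $n == 0 and die "unexpected EOF in cat-blob body";
        $left -= $n;
    }
    # the contents are always followed by a single LF
    my $n = read($r, my $lf, 1);
    defined($n) or die "read final byte of cat-blob failed: $!";
    die "bad read on final byte: <$lf>" if $lf ne "\n";
    return \$buf;
}

my $oid = 'a' x 40;
my $stream = "$oid blob 5\nhello\n$oid missing\n";
open(my $fh, '<', \$stream) or die "open in-memory handle: $!";
my $found   = read_cat_blob($fh);
my $missing = read_cat_blob($fh);
print defined $found ? "found: $$found\n" : "not found\n";
print defined $missing ? "found\n" : "missing\n";
```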
---
lib/PublicInbox/Import.pm | 34 ++++++++-----
lib/PublicInbox/V2Writable.pm | 111 ++++++++++++++++++++----------------------
t/v2writable.t | 2 +-
3 files changed, 75 insertions(+), 72 deletions(-)
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 2529798..9e8900f 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -95,19 +95,13 @@ sub _check_path ($$$$) {
$info =~ /\Amissing / ? undef : $info;
}
-sub check_remove_v1 {
- my ($r, $w, $tip, $path, $mime) = @_;
-
- my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
- $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info";
- my $blob = $1;
-
- print $w "cat-blob $blob\n" or wfail;
+sub _cat_blob ($$$) {
+ my ($r, $w, $oid) = @_;
+ print $w "cat-blob $oid\n" or wfail;
local $/ = "\n";
- $info = <$r>;
+ my $info = <$r>;
defined $info or die "EOF from fast-import / cat-blob: $!";
- $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or
- die "unexpected cat-blob response: $info";
+ $info =~ /\A[a-f0-9]{40} blob (\d+)\n\z/ or return;
my $left = $1;
my $offset = 0;
my $buf = '';
@@ -122,7 +116,23 @@ sub check_remove_v1 {
$n = read($r, my $lf, 1);
defined($n) or die "read final byte of cat-blob failed: $!";
die "bad read on final byte: <$lf>" if $lf ne "\n";
- my $cur = PublicInbox::MIME->new(\$buf);
+ \$buf;
+}
+
+sub cat_blob {
+ my ($self, $oid) = @_;
+ my ($r, $w) = $self->gfi_start;
+ _cat_blob($r, $w, $oid);
+}
+
+sub check_remove_v1 {
+ my ($r, $w, $tip, $path, $mime) = @_;
+
+ my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+ $info =~ m!\A100644 blob ([a-f0-9]{40})\t!s or die "not blob: $info";
+ my $oid = $1;
+ my $msg = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+ my $cur = PublicInbox::MIME->new($msg);
my $cur_s = $cur->header('Subject');
$cur_s = '' unless defined $cur_s;
my $cur_m = $mime->header('Subject');
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 8361d09..53fdb73 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -139,7 +139,6 @@ sub num_for {
};
# crap, Message-ID is already known, hope somebody just resent:
- $self->barrier;
foreach my $m (@$mids) {
# read-only lookup now safe to do after above barrier
my $existing = $self->lookup_content($mime, $m);
@@ -259,10 +258,8 @@ sub purge_oids {
sub remove_internal {
my ($self, $mime, $cmt_msg, $purge) = @_;
- $self->barrier;
$self->idx_init;
my $im = $self->importer unless $purge;
- my $ibx = $self->{-inbox};
my $over = $self->{over};
my $cid = content_id($mime);
my $parts = $self->{idx_parts};
@@ -280,7 +277,7 @@ sub remove_internal {
my %gone;
my ($id, $prev);
while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
- my $msg = $ibx->msg_by_smsg($smsg);
+ my $msg = get_blob($self, $smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
next; # continue
@@ -313,7 +310,6 @@ sub remove_internal {
$orig = undef;
$self->unindex_oid_remote($oid, $mid);
}
- $self->barrier;
}
if (defined $mark) {
@@ -359,45 +355,6 @@ sub set_last_commits ($) {
}
}
-sub done {
- my ($self) = @_;
- my $im = delete $self->{im};
- $im->done if $im; # PublicInbox::Import::done
-
- my $mm = $self->{mm};
- $mm->{dbh}->commit if $mm;
-
- # order matters, we can only close {over} after all partitions
- # are done because the partitions also write to {over}
- my $parts = delete $self->{idx_parts};
- if ($parts) {
- $_->remote_commit for @$parts;
- $_->remote_close for @$parts;
- }
-
- my $over = $self->{over};
- $over->commit_lazy;
- $over->disconnect;
-
- if ($mm) {
- $mm->{dbh}->begin_work;
- set_last_commits($self);
- $mm->{dbh}->commit;
- delete $self->{mm};
- }
-
- delete $self->{bnote};
- $self->{transact_bytes} = 0;
- $self->lock_release if $parts;
-}
-
-sub checkpoint {
- my ($self) = @_;
- my $im = $self->{im};
- $im->checkpoint if $im; # PublicInbox::Import::checkpoint
- $self->barrier;
-}
-
sub barrier_init {
my ($self, $n) = @_;
$self->{bnote} or return;
@@ -416,13 +373,15 @@ sub barrier_wait {
}
}
-# issue a write barrier to ensure all data is visible to other processes
-# and read-only ops. Order of data importance is: git > SQLite > Xapian
-sub barrier {
- my ($self) = @_;
+sub checkpoint ($;$) {
+ my ($self, $wait) = @_;
if (my $im = $self->{im}) {
- $im->barrier;
+ if ($wait) {
+ $im->barrier;
+ } else {
+ $im->checkpoint;
+ }
}
my $parts = $self->{idx_parts};
if ($parts) {
@@ -435,11 +394,17 @@ sub barrier {
$self->{over}->commit_lazy;
# Now deal with Xapian
- my $barrier = $self->barrier_init(scalar @$parts);
+ if ($wait) {
+ my $barrier = $self->barrier_init(scalar @$parts);
+
+ # each partition needs to issue a barrier command
+ $_->remote_barrier for @$parts;
- # each partition needs to issue a barrier command
- $_->remote_barrier for @$parts;
- $self->barrier_wait($barrier); # wait for each Xapian partition
+ # wait for each Xapian partition
+ $self->barrier_wait($barrier);
+ } else {
+ $_->remote_commit for @$parts;
+ }
# last_commit is special, don't commit these until
# remote partitions are done:
@@ -452,6 +417,27 @@ sub barrier {
$self->{transact_bytes} = 0;
}
+# issue a write barrier to ensure all data is visible to other processes
+# and read-only ops. Order of data importance is: git > SQLite > Xapian
+sub barrier { checkpoint($_[0], 1) };
+
+sub done {
+ my ($self) = @_;
+ my $im = delete $self->{im};
+ $im->done if $im; # PublicInbox::Import::done
+ checkpoint($self);
+ my $mm = delete $self->{mm};
+ $mm->{dbh}->commit if $mm;
+ my $parts = delete $self->{idx_parts};
+ if ($parts) {
+ $_->remote_close for @$parts;
+ }
+ $self->{over}->disconnect;
+ delete $self->{bnote};
+ $self->{transact_bytes} = 0;
+ $self->lock_release if $parts;
+}
+
sub git_init {
my ($self, $epoch) = @_;
my $pfx = "$self->{-inbox}->{mainrepo}/git";
@@ -512,8 +498,8 @@ sub importer {
} else {
$self->{im} = undef;
$im->done;
- $self->barrier;
$im = undef;
+ $self->checkpoint;
my $git_dir = $self->git_init(++$self->{epoch_max});
my $git = PublicInbox::Git->new($git_dir);
return $self->import_init($git, 0);
@@ -569,15 +555,25 @@ sub diff ($$$) {
unlink($an, $bn);
}
+sub get_blob ($$) {
+ my ($self, $smsg) = @_;
+ if (my $im = $self->{im}) {
+ my $msg = $im->cat_blob($smsg->{blob});
+ return $msg if $msg;
+ }
+ # older message, should be in alternates
+ my $ibx = $self->{-inbox};
+ $ibx->msg_by_smsg($smsg);
+}
+
sub lookup_content {
my ($self, $mime, $mid) = @_;
- my $ibx = $self->{-inbox};
my $over = $self->{over};
my $cid = content_id($mime);
my $found;
my ($id, $prev);
while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
- my $msg = $ibx->msg_by_smsg($smsg);
+ my $msg = get_blob($self, $smsg);
if (!defined($msg)) {
warn "broken smsg for $mid\n";
next;
@@ -815,7 +811,6 @@ sub unindex_oid {
}
$self->{unindexed}->{$_}++ foreach keys %gone;
$self->unindex_oid_remote($oid, $mid);
- $self->barrier;
}
}
@@ -823,7 +818,6 @@ my $x40 = qr/[a-f0-9]{40}/;
sub unindex {
my ($self, $opts, $git, $unindex_range) = @_;
my $un = $self->{unindexed} ||= {}; # num => removal count
- $self->barrier;
my $before = scalar keys %$un;
my @cmd = qw(log --raw -r
--no-notes --no-color --no-abbrev --no-renames);
@@ -847,7 +841,6 @@ sub unindex {
sub index_sync {
my ($self, $opts) = @_;
$opts ||= {};
- my $ibx = $self->{-inbox};
my $epoch_max;
my $latest = git_dir_latest($self, \$epoch_max);
return unless defined $latest;
diff --git a/t/v2writable.t b/t/v2writable.t
index 4a42c01..b543c53 100644
--- a/t/v2writable.t
+++ b/t/v2writable.t
@@ -213,8 +213,8 @@ EOF
$im = PublicInbox::V2Writable->new($ibx, 1);
is($im->{partitions}, 1, 'detected single partition from previous');
my $smsg = $im->remove($mime, 'test removal');
- my @after = $git0->qx(qw(log --pretty=oneline));
$im->done;
+ my @after = $git0->qx(qw(log --pretty=oneline));
my $tip = shift @after;
like($tip, qr/\A[a-f0-9]+ test removal\n\z/s,
'commit message propagated to git');
--
EW
* [PATCH 5/8] index: allow specifying --jobs=0 to disable multiprocess
2018-04-07 3:41 [PATCH 0/8] less code, less memory, more speed Eric Wong (Contractor, The Linux Foundation)
` (3 preceding siblings ...)
2018-04-07 3:41 ` [PATCH 4/8] v2writable: reduce barriers Eric Wong (Contractor, The Linux Foundation)
@ 2018-04-07 3:41 ` Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 6/8] convert: support converting with altid defined Eric Wong (Contractor, The Linux Foundation)
` (2 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC (permalink / raw)
To: meta
Not everybody needs multiprocess support.
---
script/public-inbox-index | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/script/public-inbox-index b/script/public-inbox-index
index 73f88ac..db7ebba 100755
--- a/script/public-inbox-index
+++ b/script/public-inbox-index
@@ -31,7 +31,7 @@ my %opts = (
'--prune' => \$prune,
);
GetOptions(%opts) or die "bad command-line args\n$usage";
-die "--jobs must be positive\n" if defined $jobs && $jobs <= 0;
+die "--jobs must be positive\n" if defined $jobs && $jobs < 0;
my @dirs;
@@ -101,11 +101,11 @@ sub index_dir {
eval { require PublicInbox::V2Writable };
die "v2 requirements not met: $@\n" if $@;
my $v2w = eval {
- local $ENV{NPROC} = $jobs;
+ local $ENV{NPROC} = $jobs if $jobs;
PublicInbox::V2Writable->new($repo);
};
if (defined $jobs) {
- if ($jobs == 1) {
+ if ($jobs == 0) {
$v2w->{parallel} = 0;
} else {
my $n = $v2w->{partitions};
--
EW
* [PATCH 6/8] convert: support converting with altid defined
2018-04-07 3:41 [PATCH 0/8] less code, less memory, more speed Eric Wong (Contractor, The Linux Foundation)
` (4 preceding siblings ...)
2018-04-07 3:41 ` [PATCH 5/8] index: allow specifying --jobs=0 to disable multiprocess Eric Wong (Contractor, The Linux Foundation)
@ 2018-04-07 3:41 ` Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 7/8] store less data in the Xapian document Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 8/8] msgmap: speed up minmax with separate queries Eric Wong (Contractor, The Linux Foundation)
7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC (permalink / raw)
To: meta
public-inbox-convert ought to be 100% lossless now.
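The altid copy below relies on DBD::SQLite's sqlite_backup_to_file
method, which copies the currently open database into a new file
using SQLite's online backup API. A minimal standalone sketch,
assuming DBI and DBD::SQLite are installed; the file names and the
msgmap-style table are hypothetical stand-ins for a real altid DB:

```perl
#!/usr/bin/perl
# Sketch: copy an SQLite DB to a new file with DBD::SQLite's
# online-backup method (the same call the patch uses for altid).
use strict;
use warnings;
use DBI;
use File::Temp qw(tempdir);

my $dir = tempdir(CLEANUP => 1);

# hypothetical stand-in for an existing altid database
my $src = DBI->connect("dbi:SQLite:dbname=$dir/src.sqlite3", '', '',
			{ RaiseError => 1, AutoCommit => 1 });
$src->do('CREATE TABLE msgmap (num INTEGER PRIMARY KEY, mid VARCHAR(255))');
$src->do('INSERT INTO msgmap (num, mid) VALUES (?, ?)',
	undef, $_, "m$_\@example") for 1..3;

# copy the whole database into a new file
my $dst_file = "$dir/dst.sqlite3";
$src->sqlite_backup_to_file($dst_file);

# open the copy and confirm the rows came along
my $dst = DBI->connect("dbi:SQLite:dbname=$dst_file", '', '',
			{ RaiseError => 1 });
my ($n) = $dst->selectrow_array('SELECT COUNT(*) FROM msgmap');
print "$n\n"; # 3
```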
---
script/public-inbox-convert | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/script/public-inbox-convert b/script/public-inbox-convert
index 9aa2781..2742be7 100755
--- a/script/public-inbox-convert
+++ b/script/public-inbox-convert
@@ -45,7 +45,6 @@ if (($old->{version} || 1) >= 2) {
die "Only conversion from v1 inboxes is supported\n";
}
my $new = { %$old };
-delete $new->{altid}; # TODO: support altid for v2
$new->{mainrepo} = abs_path($new_dir);
$new->{version} = 2;
$new = PublicInbox::InboxWritable->new($new);
@@ -61,6 +60,16 @@ $old->with_umask(sub {
"--file=$new->{mainrepo}/all.git/config",
'core.sharedRepository', $sr]);
}
+ if (my $alt = $new->{altid}) {
+ require PublicInbox::AltId;
+ foreach my $i (0..$#$alt) {
+ my $src = PublicInbox::AltId->new($old, $alt->[$i], 0);
+ $src->mm_alt or next;
+ my $dst = PublicInbox::AltId->new($new, $alt->[$i], 1);
+ $dst = $dst->{filename};
+ $src->mm_alt->{dbh}->sqlite_backup_to_file($dst);
+ }
+ }
});
my $state = '';
my ($prev, $from);
--
EW
* [PATCH 7/8] store less data in the Xapian document
2018-04-07 3:41 [PATCH 0/8] less code, less memory, more speed Eric Wong (Contractor, The Linux Foundation)
` (5 preceding siblings ...)
2018-04-07 3:41 ` [PATCH 6/8] convert: support converting with altid defined Eric Wong (Contractor, The Linux Foundation)
@ 2018-04-07 3:41 ` Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 8/8] msgmap: speed up minmax with separate queries Eric Wong (Contractor, The Linux Foundation)
7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC (permalink / raw)
To: meta
Since we only query the SQLite over DB for OVER/XOVER, we do not
need to waste space in Xapian storing the To/Cc/:bytes/:lines
fields or the XNUM term. We only use
From/Subject/References/Message-ID/:blob in various places of
the PSGI code.
For reindexing, we will take advantage of docid stability
in "xapian-compact --no-renumber" to ensure duplicates do not
show up in search results. Since the PSGI interface is the
only consumer of Xapian at the moment, it has no need to
search based on NNTP article number.
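The reindexing change can be illustrated with a toy model in plain
Perl, where a hash stands in for the Xapian database (this is not
the Search::Xapian API). With add_document, every (re)index
allocates a fresh docid, so the old copy has to be found (formerly
via the XNUM term) and deleted; with replace_document keyed on the
NNTP article number, a reindex simply overwrites the existing
document, and "xapian-compact --no-renumber" keeps those docids
stable:

```perl
use strict;
use warnings;

# Toy model only: hashes of docid => document data stand in for Xapian.
my (%xdb_add, %xdb_replace);
my $next_docid = 1;

# old approach: each call allocates a new docid; without a separate
# delete step (formerly via XNUM), a reindex leaves a duplicate
sub add_document { my ($doc) = @_; $xdb_add{$next_docid++} = $doc }

# new approach: the article number *is* the docid, so a reindex
# overwrites the previous copy in place
sub replace_document { my ($num, $doc) = @_; $xdb_replace{$num} = $doc }

# index article #42 twice, as a reindex would
for (1..2) {
	add_document('Subject: hello');
	replace_document(42, 'Subject: hello');
}
printf "add: %d docs, replace: %d doc\n",
	scalar(keys %xdb_add), scalar(keys %xdb_replace);
# prints "add: 2 docs, replace: 1 doc"
```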
---
lib/PublicInbox/NNTP.pm | 2 +-
lib/PublicInbox/OverIdx.pm | 6 +++---
lib/PublicInbox/SearchIdx.pm | 37 ++++---------------------------------
lib/PublicInbox/SearchMsg.pm | 6 ++----
lib/PublicInbox/V2Writable.pm | 2 +-
script/public-inbox-compact | 6 +++---
t/search.t | 24 +++++++++++++-----------
t/v2writable.t | 7 ++++---
8 files changed, 31 insertions(+), 59 deletions(-)
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index fa890cb..ace56e7 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -725,7 +725,7 @@ sub hdr_searchmsg ($$$$) {
my $nr = scalar @$msgs or return;
my $tmp = '';
foreach my $s (@$msgs) {
- $tmp .= $s->num . ' ' . $s->$field . "\r\n";
+ $tmp .= $s->{num} . ' ' . $s->$field . "\r\n";
}
utf8::encode($tmp);
do_more($self, $tmp);
diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm
index 08f8744..62fec0d 100644
--- a/lib/PublicInbox/OverIdx.pm
+++ b/lib/PublicInbox/OverIdx.pm
@@ -207,8 +207,8 @@ sub link_refs {
$tid;
}
-sub parse_references ($$$$) {
- my ($self, $smsg, $mid0, $mids) = @_;
+sub parse_references ($$$) {
+ my ($smsg, $mid0, $mids) = @_;
my $mime = $smsg->{mime};
my $hdr = $mime->header_obj;
my $refs = references($hdr);
@@ -241,7 +241,7 @@ sub add_overview {
blob => $oid,
}, 'PublicInbox::SearchMsg';
my $mids = mids($mime->header_obj);
- my $refs = $self->parse_references($smsg, $mid0, $mids);
+ my $refs = parse_references($smsg, $mid0, $mids);
my $subj = $smsg->subject;
my $xpath;
if ($subj ne '') {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 7cfa745..f9b40b0 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -273,18 +273,12 @@ sub add_message {
my $smsg = PublicInbox::SearchMsg->new($mime);
my $doc = $smsg->{doc};
my $subj = $smsg->subject;
-
- $smsg->{lines} = $mime->body_raw =~ tr!\n!\n!;
- defined $bytes or $bytes = length($mime->as_string);
- $smsg->{bytes} = $bytes;
-
add_val($doc, PublicInbox::Search::TS(), $smsg->ts);
my @ds = gmtime($smsg->ds);
my $yyyymmdd = strftime('%Y%m%d', @ds);
add_val($doc, PublicInbox::Search::YYYYMMDD(), $yyyymmdd);
my $dt = strftime('%Y%m%d%H%M%S', @ds);
add_val($doc, PublicInbox::Search::DT(), $dt);
- my @vals = ($smsg->{ts}, $smsg->{ds});
my $tg = $self->term_generator;
@@ -333,11 +327,11 @@ sub add_message {
index_body($tg, \@orig, $doc) if @orig;
});
- # populates smsg->references for smsg->to_doc_data
- my $data = $smsg->to_doc_data($oid, $mid0);
foreach my $mid (@$mids) {
$tg->index_text($mid, 1, 'XM');
}
+ $smsg->{to} = $smsg->{cc} = '';
+ my $data = $smsg->to_doc_data($oid, $mid0);
$doc->set_data($data);
if (my $altid = $self->{-altid}) {
foreach my $alt (@$altid) {
@@ -350,24 +344,11 @@ sub add_message {
}
}
- $self->delete_article($num) if defined $num; # for reindexing
-
if (my $over = $self->{over}) {
- utf8::encode($data);
- $data = compress($data);
- my $refs = $over->parse_references($smsg, $mid0, $mids);
- my $xpath;
- if ($subj ne '') {
- $xpath = $self->subject_path($subj);
- $xpath = id_compress($xpath);
- }
-
- push @vals, $num, $mids, $refs, $xpath, $data;
- $over->add_over(\@vals);
+ $over->add_overview($mime, $bytes, $num, $oid, $mid0);
}
$doc->add_boolean_term('Q' . $_) foreach @$mids;
- $doc->add_boolean_term('XNUM' . $num) if defined $num;
- $doc_id = $self->{xdb}->add_document($doc);
+ $self->{xdb}->replace_document($doc_id = $num, $doc);
};
if ($@) {
@@ -419,16 +400,6 @@ sub remove_message {
}
}
-sub delete_article {
- my ($self, $num) = @_;
- my $ndel = 0;
- batch_do($self, 'XNUM' . $num, sub {
- my ($ids) = @_;
- $ndel += scalar @$ids;
- $self->{xdb}->delete_document($_) for @$ids;
- });
-}
-
# MID is a hint in V2
sub remove_by_oid {
my ($self, $oid, $mid) = @_;
diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm
index 3278802..ab971e0 100644
--- a/lib/PublicInbox/SearchMsg.pm
+++ b/lib/PublicInbox/SearchMsg.pm
@@ -45,12 +45,11 @@ sub to_doc_data {
$self->cc,
$oid,
$mid0,
- $self->{bytes},
- $self->{lines}
+ $self->{bytes} || '',
+ $self->{lines} || ''
);
}
-
sub load_from_data ($$) {
my ($self) = $_[0]; # data = $_[1]
(
@@ -92,7 +91,6 @@ sub load_doc {
# :bytes and :lines metadata in RFC 3977
sub bytes ($) { $_[0]->{bytes} }
sub lines ($) { $_[0]->{lines} }
-sub num ($) { $_[0]->{num} ||= _get_term_val($_[0], 'XNUM', qr/\AXNUM/) }
sub __hdr ($$) {
my ($self, $field) = @_;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 53fdb73..1cc4b00 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -800,7 +800,7 @@ sub unindex_oid {
my %gone;
my ($id, $prev);
while (my $smsg = $over->next_by_mid($mid, \$id, \$prev)) {
- $gone{$smsg->num} = 1 if $oid eq $smsg->{blob};
+ $gone{$smsg->{num}} = 1 if $oid eq $smsg->{blob};
1; # continue
}
my $n = scalar keys %gone;
diff --git a/script/public-inbox-compact b/script/public-inbox-compact
index d855b9e..9f33265 100755
--- a/script/public-inbox-compact
+++ b/script/public-inbox-compact
@@ -48,7 +48,7 @@ sub commit_changes ($$$) {
$im->lock_release;
remove_tree("$old/old") or die "failed to remove $old/old: $!\n";
}
-
+my @compact = qw(xapian-compact --no-renumber);
if ($v == 2) {
require PublicInbox::V2Writable;
my $v2w = PublicInbox::V2Writable->new($ibx);
@@ -70,7 +70,7 @@ if ($v == 2) {
}
close $dh;
die "No Xapian parts found in $old\n" unless @parts;
- my $cmd = ['xapian-compact', @parts, "$new/0" ];
+ my $cmd = [@compact, @parts, "$new/0" ];
PublicInbox::Import::run_die($cmd);
commit_changes($v2w, $old, $new);
});
@@ -84,7 +84,7 @@ if ($v == 2) {
my $new = tempdir('compact-XXXXXXXX', CLEANUP => 1, DIR => $v1_root);
$ibx->with_umask(sub {
$im->lock_acquire;
- PublicInbox::Import::run_die(['xapian-compact', $old, $new]);
+ PublicInbox::Import::run_die([@compact, $old, $new]);
commit_changes($im, $old, $new);
});
} else {
diff --git a/t/search.t b/t/search.t
index fda32d3..516f567 100644
--- a/t/search.t
+++ b/t/search.t
@@ -306,31 +306,33 @@ sub filter_mids {
# names and addresses
{
- my $res = $ro->query('t:list@example.com');
- is(scalar @$res, 6, 'searched To: successfully');
- foreach my $smsg (@$res) {
+ my $mset = $ro->query('t:list@example.com', {mset => 1});
+ is($mset->size, 6, 'searched To: successfully');
+ foreach my $m ($mset->items) {
+ my $smsg = $ro->lookup_article($m->get_docid);
like($smsg->to, qr/\blist\@example\.com\b/, 'to appears');
}
- $res = $ro->query('tc:list@example.com');
- is(scalar @$res, 6, 'searched To+Cc: successfully');
- foreach my $smsg (@$res) {
+ $mset = $ro->query('tc:list@example.com', {mset => 1});
+ is($mset->size, 6, 'searched To+Cc: successfully');
+ foreach my $m ($mset->items) {
+ my $smsg = $ro->lookup_article($m->get_docid);
my $tocc = join("\n", $smsg->to, $smsg->cc);
like($tocc, qr/\blist\@example\.com\b/, 'tocc appears');
}
foreach my $pfx ('tcf:', 'c:') {
- $res = $ro->query($pfx . 'foo@example.com');
- is(scalar @$res, 1,
- "searched $pfx successfully for Cc:");
- foreach my $smsg (@$res) {
+ my $mset = $ro->query($pfx . 'foo@example.com', { mset => 1 });
+ is($mset->items, 1, "searched $pfx successfully for Cc:");
+ foreach my $m ($mset->items) {
+ my $smsg = $ro->lookup_article($m->get_docid);
like($smsg->cc, qr/\bfoo\@example\.com\b/,
'cc appears');
}
}
foreach my $pfx ('', 'tcf:', 'f:') {
- $res = $ro->query($pfx . 'Laggy');
+ my $res = $ro->query($pfx . 'Laggy');
is(scalar(@$res), 1,
"searched $pfx successfully for From:");
foreach my $smsg (@$res) {
diff --git a/t/v2writable.t b/t/v2writable.t
index b543c53..85fb6a6 100644
--- a/t/v2writable.t
+++ b/t/v2writable.t
@@ -220,13 +220,14 @@ EOF
'commit message propagated to git');
is_deeply(\@after, \@before, 'only one commit written to git');
is($ibx->mm->num_for($smsg->mid), undef, 'no longer in Msgmap by mid');
- like($smsg->num, qr/\A\d+\z/, 'numeric number in return message');
- is($ibx->mm->mid_for($smsg->num), undef, 'no longer in Msgmap by num');
+ my $num = $smsg->{num};
+ like($num, qr/\A\d+\z/, 'numeric number in return message');
+ is($ibx->mm->mid_for($num), undef, 'no longer in Msgmap by num');
my $srch = $ibx->search->reopen;
my $mset = $srch->query('m:'.$smsg->mid, { mset => 1});
is($mset->size, 0, 'no longer found in Xapian');
my @log1 = qw(log -1 --pretty=raw --raw -r --no-abbrev --no-renames);
- is($srch->{over_ro}->get_art($smsg->num), undef,
+ is($srch->{over_ro}->get_art($num), undef,
'removal propagated to Over DB');
my $after = $git0->qx(@log1);
--
EW
* [PATCH 8/8] msgmap: speed up minmax with separate queries
2018-04-07 3:41 [PATCH 0/8] less code, less memory, more speed Eric Wong (Contractor, The Linux Foundation)
` (6 preceding siblings ...)
2018-04-07 3:41 ` [PATCH 7/8] store less data in the Xapian document Eric Wong (Contractor, The Linux Foundation)
@ 2018-04-07 3:41 ` Eric Wong (Contractor, The Linux Foundation)
7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-04-07 3:41 UTC (permalink / raw)
To: meta
This significantly improves the performance of the NNTP GROUP
command on an inbox with 2.7 million messages: from over 250ms
to around 700us.
SQLite is weird about this, but at least there's a way to
optimize it.
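A standalone sketch of the same split, assuming DBI and DBD::SQLite
are installed and using a hypothetical in-memory msgmap table.
SQLite's query planner documents a MIN/MAX optimization that only
applies to a query containing a single MIN() or MAX() aggregate over
an indexed (or rowid) column, which is presumably why the combined
query was slow. For brevity this uses selectrow_array where the
patch uses prepare_cached:

```perl
use strict;
use warnings;
use DBI;

my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '',
			{ RaiseError => 1, AutoCommit => 1 });
# num is an INTEGER PRIMARY KEY (rowid alias), like msgmap.num
$dbh->do('CREATE TABLE msgmap (num INTEGER PRIMARY KEY, mid VARCHAR(255))');
my $ins = $dbh->prepare('INSERT INTO msgmap (num, mid) VALUES (?, ?)');
$dbh->begin_work;
$ins->execute($_, "<$_\@example>") for 5..1000;
$dbh->commit;

sub minmax {
	my ($db) = @_;
	# one aggregate per statement, so each can be answered by
	# reading one end of the b-tree instead of scanning the table
	my ($lo) = $db->selectrow_array('SELECT MIN(num) FROM msgmap');
	my ($hi) = $db->selectrow_array('SELECT MAX(num) FROM msgmap');
	($lo, $hi);
}

my ($min, $max) = minmax($dbh);
print "$min $max\n"; # 5 1000
```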
---
lib/PublicInbox/Msgmap.pm | 10 +++++++---
t/perf-nntpd.t | 13 +++++++++----
2 files changed, 16 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/Msgmap.pm b/lib/PublicInbox/Msgmap.pm
index f5f8843..feef8ba 100644
--- a/lib/PublicInbox/Msgmap.pm
+++ b/lib/PublicInbox/Msgmap.pm
@@ -138,10 +138,14 @@ sub num_for {
sub minmax {
my ($self) = @_;
my $dbh = $self->{dbh};
- my $sth = $self->{num_minmax} ||=
- $dbh->prepare('SELECT MIN(num),MAX(num) FROM msgmap');
+ # breaking MIN and MAX into separate queries speeds up from 250ms
+ # to around 700us with 2.7million messages.
+ my $sth = $dbh->prepare_cached('SELECT MIN(num) FROM msgmap', undef, 1);
$sth->execute;
- $sth->fetchrow_array;
+ my $min = $sth->fetchrow_array;
+ $sth = $dbh->prepare_cached('SELECT MAX(num) FROM msgmap', undef, 1);
+ $sth->execute;
+ ($min, $sth->fetchrow_array);
}
sub mid_prefixes {
diff --git a/t/perf-nntpd.t b/t/perf-nntpd.t
index 4987f98..e502153 100644
--- a/t/perf-nntpd.t
+++ b/t/perf-nntpd.t
@@ -3,7 +3,7 @@
use strict;
use warnings;
use Test::More;
-use Benchmark qw(:all);
+use Benchmark qw(:all :hireswallclock);
use PublicInbox::Inbox;
use File::Temp qw/tempdir/;
use POSIX qw(dup2);
@@ -79,8 +79,13 @@ $s = IO::Socket::INET->new(%opts);
$s->autoflush(1);
my $buf = $s->getline;
is($buf, "201 server ready - post via email\r\n", 'got greeting');
-ok($s->print("GROUP $group\r\n"), 'changed group');
-$buf = $s->getline;
+
+my $t = timeit(10, sub {
+ ok($s->print("GROUP $group\r\n"), 'changed group');
+ $buf = $s->getline;
+});
+diag 'GROUP took: ' . timestr($t);
+
my ($tot, $min, $max) = ($buf =~ /\A211 (\d+) (\d+) (\d+) /);
ok($tot && $min && $max, 'got GROUP response');
my $nr = $max - $min;
@@ -100,7 +105,7 @@ sub read_until_dot ($) {
$n;
}
-my $t = timeit(1, sub {
+$t = timeit(1, sub {
$s->print("XOVER $spec\r\n");
$n = read_until_dot($s);
});
--
EW
Thread overview: 9+ messages
2018-04-07 3:41 [PATCH 0/8] less code, less memory, more speed Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 1/8] psgi: ensure /$INBOX/$MESSAGE_ID/T/ endpoint is chronological Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 2/8] over: avoid excessive SELECT Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 3/8] over: remove forked subprocess Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 4/8] v2writable: reduce barriers Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 5/8] index: allow specifying --jobs=0 to disable multiprocess Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 6/8] convert: support converting with altid defined Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 7/8] store less data in the Xapian document Eric Wong (Contractor, The Linux Foundation)
2018-04-07 3:41 ` [PATCH 8/8] msgmap: speed up minmax with separate queries Eric Wong (Contractor, The Linux Foundation)
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).