* [PATCH 01/12] import: allow the epoch (0s) as a valid time
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 02/12] extmsg: fix broken Xapian MID lookup Eric Wong (Contractor, The Linux Foundation)
` (11 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
Despite email not existing until 1971, "Jan 1, 1970 00:00:00"
seems to be a common default timestamp for some test emails
to use as a Date: header.
---
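A note on why the one-line change matters, as a minimal sketch and
not the real parse_date (pick_ts_old and pick_ts_new are hypothetical
names used only here): `||=` treats a parsed timestamp of 0 as false
and overwrites it with the current time, while the defined check only
falls back when parsing yielded undef.

```perl
use strict;
use warnings;

# Hypothetical helpers mirroring the old and new lines of the hunk below;
# they are not part of PublicInbox::Import.
sub pick_ts_old { my ($ts) = @_; $ts ||= time; $ts }
sub pick_ts_new { my ($ts) = @_; $ts = time unless defined $ts; $ts }

my $epoch = 0; # e.g. a Date: header of "Jan 1, 1970 00:00:00 +0000"
print "old: ", (pick_ts_old($epoch) == 0 ? "kept" : "replaced"), "\n";
print "new: ", (pick_ts_new($epoch) == 0 ? "kept" : "replaced"), "\n";

# an unparseable date (undef) still falls back to the current time
print "undef falls back\n" if pick_ts_new(undef) > 0;
```

On Perl 5.10 and later, `$ts //= time` should behave the same as the
new line.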
lib/PublicInbox/Import.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index f8d1003..1f831a7 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -233,7 +233,7 @@ sub parse_date ($) {
warn "bogus TZ offset: $zone, ignoring and assuming +0000\n";
$zone = '+0000';
}
- $ts ||= time;
+ $ts = time unless defined $ts;
$ts = 0 if $ts < 0; # git uses unsigned times
"$ts $zone";
}
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [PATCH 02/12] extmsg: fix broken Xapian MID lookup
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 01/12] import: allow the epoch (0s) as a valid time Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 03/12] search: stop assuming Message-ID is unique Eric Wong (Contractor, The Linux Foundation)
` (10 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
This likely has no real world implications, though, as we
fall back to Msgmap lookups anyways.
Broken since commit 7eeadcb62729b0efbcb53cd9b7b181897c92cf9a
("search: remove unnecessary abstractions and functionality")
---
lib/PublicInbox/ExtMsg.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/ExtMsg.pm b/lib/PublicInbox/ExtMsg.pm
index ab9591f..4e31ef0 100644
--- a/lib/PublicInbox/ExtMsg.pm
+++ b/lib/PublicInbox/ExtMsg.pm
@@ -46,7 +46,7 @@ sub ext_msg {
}
# try to find the URL with Xapian to avoid forking
- my $doc_id = eval { $s->find_unique_doc_id('mid', $mid) };
+ my $doc_id = eval { $s->find_unique_doc_id('XMID' . $mid) };
if ($@) {
# xapian not configured properly for this repo
push @nox, $other;
--
EW
* [PATCH 03/12] search: stop assuming Message-ID is unique
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 01/12] import: allow the epoch (0s) as a valid time Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 02/12] extmsg: fix broken Xapian MID lookup Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 04/12] www: stop assuming mainrepo == git_dir Eric Wong (Contractor, The Linux Foundation)
` (9 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
In general, Message-IDs are unique, but there's no way for a
general-purpose mail server to enforce that. This is a step toward
handling more of the corner cases which existing lists throw at us.
---
lib/PublicInbox/ExtMsg.pm | 2 +-
lib/PublicInbox/Search.pm | 14 ++++++++++++--
lib/PublicInbox/SearchIdx.pm | 10 ++++++----
3 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/lib/PublicInbox/ExtMsg.pm b/lib/PublicInbox/ExtMsg.pm
index 4e31ef0..90d68db 100644
--- a/lib/PublicInbox/ExtMsg.pm
+++ b/lib/PublicInbox/ExtMsg.pm
@@ -46,7 +46,7 @@ sub ext_msg {
}
# try to find the URL with Xapian to avoid forking
- my $doc_id = eval { $s->find_unique_doc_id('XMID' . $mid) };
+ my $doc_id = eval { $s->find_first_doc_id('XMID' . $mid) };
if ($@) {
# xapian not configured properly for this repo
push @nox, $other;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 3ec96ca..33a1f2d 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -56,7 +56,7 @@ my %bool_pfx_internal = (
);
my %bool_pfx_external = (
- mid => 'XMID', # uniQue id (Message-ID)
+ mid => 'XMID', # Message-ID (full/exact)
);
my %prob_prefix = (
@@ -285,7 +285,7 @@ sub lookup_message {
my ($self, $mid) = @_;
$mid = mid_clean($mid);
- my $doc_id = $self->find_unique_doc_id('XMID' . $mid);
+ my $doc_id = $self->find_first_doc_id('XMID' . $mid);
my $smsg;
if (defined $doc_id) {
# raises on error:
@@ -327,6 +327,16 @@ sub find_doc_ids {
($db->postlist_begin($termval), $db->postlist_end($termval));
}
+sub find_first_doc_id {
+ my ($self, $termval) = @_;
+
+ my ($begin, $end) = $self->find_doc_ids($termval);
+
+ return undef if $begin->equal($end); # not found
+
+ $begin->get_docid;
+}
+
# normalize subjects so they are suitable as pathnames for URLs
# XXX: consider for removal
sub subject_path {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index fa5057f..265403a 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -366,12 +366,14 @@ sub remove_message {
$mid = mid_clean($mid);
eval {
- $doc_id = $self->find_unique_doc_id('XMID' . $mid);
- if (defined $doc_id) {
- $db->delete_document($doc_id);
- } else {
+ my ($head, $tail) = $self->find_doc_ids('XMID' . $mid);
+ if ($head->equal($tail)) {
warn "cannot remove non-existent <$mid>\n";
}
+ for (; $head != $tail; $head->inc) {
+ my $docid = $head->get_docid;
+ $db->delete_document($docid);
+ }
};
if ($@) {
--
EW
* [PATCH 04/12] www: stop assuming mainrepo == git_dir
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (2 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 03/12] search: stop assuming Message-ID is unique Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 05/12] v2writable: initial cut for repo-rotation Eric Wong (Contractor, The Linux Foundation)
` (8 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
mainrepo won't be the git_dir in v2.
---
lib/PublicInbox/WWW.pm | 1 -
1 file changed, 1 deletion(-)
diff --git a/lib/PublicInbox/WWW.pm b/lib/PublicInbox/WWW.pm
index 4ddc187..e95fba0 100644
--- a/lib/PublicInbox/WWW.pm
+++ b/lib/PublicInbox/WWW.pm
@@ -150,7 +150,6 @@ sub invalid_inbox ($$) {
my $www = $ctx->{www};
my $obj = $www->{pi_config}->lookup_name($inbox);
if (defined $obj) {
- $ctx->{git_dir} = $obj->{mainrepo};
$ctx->{git} = $obj->git;
$ctx->{-inbox} = $obj;
$ctx->{inbox} = $inbox;
--
EW
* [PATCH 05/12] v2writable: initial cut for repo-rotation
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (3 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 04/12] www: stop assuming mainrepo == git_dir Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 06/12] git: reload alternates file on missing blob Eric Wong (Contractor, The Linux Foundation)
` (7 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
Wrap the old Import package to enable creating new repos based
on size thresholds. This is better than relying on time-based
rotation as LKML traffic seems to be increasing.
---
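The size threshold can be illustrated with a small worked example.
This is only a sketch of the arithmetic under the patch's own
constants (a packing factor of 0.4 and a 100 * 1024 * 1024 byte
packed limit); should_rotate is a hypothetical helper and the 50MB
starting size is an assumed input.

```perl
use strict;
use warnings;

# estimate of the post-packed size relative to the raw uncompressed size
my $PACKING_FACTOR = 0.4;

# packed-size limit used in this patch, scaled up to raw (unpacked) bytes
my $packed_limit = 100 * 1024 * 1024;
my $rotate_bytes = int($packed_limit / $PACKING_FACTOR);

# hypothetical helper: true once enough raw bytes went into the current repo
sub should_rotate {
	my ($bytes_added, $limit) = @_;
	$bytes_added >= $limit ? 1 : 0;
}

# resuming into a repo that already holds packed data (assumed 50MB here):
# convert the packed size back into an estimate of raw bytes added
my $packed_bytes = 50 * 1024 * 1024;
my $bytes_added = int($packed_bytes / $PACKING_FACTOR);

printf "rotate_bytes=%d bytes_added=%d rotate=%d\n",
	$rotate_bytes, $bytes_added, should_rotate($bytes_added, $rotate_bytes);
```

Counting raw bytes fed to the importer avoids having to stat pack
files on every message; the packed size is only consulted once, when
an existing repo is reopened.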
MANIFEST | 1 +
lib/PublicInbox/Git.pm | 12 +++
lib/PublicInbox/Import.pm | 9 ++-
lib/PublicInbox/V2Writable.pm | 180 ++++++++++++++++++++++++++++++++++++++++++
scripts/import_vger_from_mbox | 19 +++--
t/git.t | 3 +
6 files changed, 211 insertions(+), 13 deletions(-)
create mode 100644 lib/PublicInbox/V2Writable.pm
diff --git a/MANIFEST b/MANIFEST
index 1df27f2..4b51b54 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -91,6 +91,7 @@ lib/PublicInbox/Spamcheck/Spamc.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/SpawnPP.pm
lib/PublicInbox/Unsubscribe.pm
+lib/PublicInbox/V2Writable.pm
lib/PublicInbox/View.pm
lib/PublicInbox/WWW.pm
lib/PublicInbox/WWW.pod
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index ea2b814..6437643 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -139,6 +139,18 @@ sub cleanup {
_destroy($self, qw(in_c out_c pid_c));
}
+# assuming a well-maintained repo, this should be a somewhat
+# accurate estimation of its size
+# TODO: show this in the WWW UI as a hint to potential cloners
+sub packed_bytes {
+ my ($self) = @_;
+ my $n = 0;
+ foreach my $p (glob("$self->{git_dir}/objects/pack/*.pack")) {
+ $n += -s $p;
+ }
+ $n
+}
+
sub DESTROY { cleanup(@_) }
1;
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 1f831a7..364ab60 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -31,6 +31,7 @@ sub new {
inbox => $ibx,
path_type => '2/38', # or 'v2'
ssoma_lock => 1, # disable for v2
+ bytes_added => 0,
}, $class
}
@@ -275,7 +276,9 @@ sub add {
my $blob = $self->{mark}++;
my $str = $mime->as_string;
- print $w "blob\nmark :$blob\ndata ", length($str), "\n" or wfail;
+ my $n = length($str);
+ $self->{bytes_added} += $n;
+ print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
print $w $str, "\n" or wfail;
$str = undef;
@@ -325,7 +328,7 @@ sub add {
$self->{tip} = ":$commit";
}
-sub run_die ($$) {
+sub run_die ($;$) {
my ($cmd, $env) = @_;
my $pid = spawn($cmd, $env, undef);
defined $pid or die "spawning ".join(' ', @$cmd)." failed: $!";
@@ -354,7 +357,7 @@ sub done {
}
if ($nchg) {
run_die([@cmd, 'update-server-info'], undef);
- eval {
+ ($self->{path_type} eq '2/38') and eval {
require PublicInbox::SearchIdx;
my $inbox = $self->{inbox} || $git_dir;
my $s = PublicInbox::SearchIdx->new($inbox);
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
new file mode 100644
index 0000000..9b68e9b
--- /dev/null
+++ b/lib/PublicInbox/V2Writable.pm
@@ -0,0 +1,180 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# This interface wraps and mimics PublicInbox::Import
+package PublicInbox::V2Writable;
+use strict;
+use warnings;
+use Fcntl qw(:flock :DEFAULT);
+use PublicInbox::SearchIdx;
+use PublicInbox::MIME;
+use PublicInbox::Git;
+use PublicInbox::Import;
+use Email::MIME::ContentType;
+$Email::MIME::ContentType::STRICT_PARAMS = 0;
+
+# an estimate of the post-packed size to the raw uncompressed size
+my $PACKING_FACTOR = 0.4;
+
+sub new {
+ my ($class, $v2ibx, $creat) = @_;
+ my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
+ unless (-d $dir) {
+ if ($creat) {
+ require File::Path;
+ File::Path::mkpath($dir);
+ } else {
+ die "$dir does not exist\n";
+ }
+ }
+ my $self = {
+ -inbox => $v2ibx,
+ im => undef, # PublicInbox::Import
+ xap_rw => undef, # PublicInbox::V2SearchIdx
+ xap_ro => undef,
+
+ # limit each repo to 1GB or so
+ rotate_bytes => int((100 * 1024 * 1024) / $PACKING_FACTOR),
+ };
+ bless $self, $class
+}
+
+# returns undef on duplicate or spam
+# mimics Import::add and wraps it for v2
+sub add {
+ my ($self, $mime, $check_cb) = @_;
+ my $existing = $self->lookup_content($mime);
+
+ if ($existing) {
+ return undef if $existing->type eq 'mail'; # duplicate
+ }
+
+ my $im = $self->importer;
+
+ # im->add returns undef if check_cb fails
+ my $cmt = $im->add($mime, $check_cb) or return;
+ $cmt = $im->get_mark($cmt);
+ my $oid = $im->{last_object_id};
+ $self->index_msg($mime, $existing, $cmt, $oid);
+ $mime;
+}
+
+sub index_msg { # TODO
+}
+
+sub remove {
+ my ($self, $mime, $msg) = @_;
+ my $existing = $self->lookup_content($mime) or return;
+
+ # don't touch ghosts or already junked messages
+ return unless $existing->type eq 'mail';
+
+ # always write removals to the current (latest) git repo since
+ # we process chronologically
+ my $im = $self->importer;
+ my ($cmt, undef) = $im->remove($mime, $msg);
+ $cmt = $im->get_mark($cmt);
+ $self->unindex_msg($existing, $cmt);
+}
+
+sub done {
+ my ($self) = @_;
+ $self->{im}->done; # PublicInbox::Import::done
+}
+
+sub checkpoint {
+ my ($self) = @_;
+ $self->{im}->checkpoint; # PublicInbox::Import::checkpoint
+}
+
+sub git_init {
+ my ($self, $new) = @_;
+ my $pfx = "$self->{-inbox}->{mainrepo}/git";
+ my $git_dir = "$pfx/$new.git";
+ die "$git_dir exists\n" if -e $git_dir;
+ my @cmd = (qw(git init --bare -q), $git_dir);
+ PublicInbox::Import::run_die(\@cmd);
+ @cmd = (qw/git config/, "--file=$git_dir/config",
+ 'repack.writeBitmaps', 'true');
+ PublicInbox::Import::run_die(\@cmd);
+
+ my $all = "$self->{-inbox}->{mainrepo}/all.git";
+ unless (-d $all) {
+ @cmd = (qw(git init --bare -q), $all);
+ PublicInbox::Import::run_die(\@cmd);
+ }
+
+ my $alt = "$all/objects/info/alternates";
+ my $new_obj_dir = "../../git/$new.git/objects";
+ my %alts;
+ if (-e $alt) {
+ open(my $fh, '<', $alt) or die "open < $alt: $!\n";
+ %alts = map { chomp; $_ => 1 } (<$fh>);
+ }
+ return $git_dir if $alts{$new_obj_dir};
+ open my $fh, '>>', $alt or die "open >> $alt: $!\n";
+ print $fh "$new_obj_dir\n" or die "print >> $alt: $!\n";
+ close $fh or die "close $alt: $!\n";
+ $git_dir
+}
+
+sub importer {
+ my ($self) = @_;
+ my $im = $self->{im};
+ if ($im) {
+ if ($im->{bytes_added} < $self->{rotate_bytes}) {
+ return $im;
+ } else {
+ $self->{im} = undef;
+ $im->done;
+ $im = undef;
+ my $git_dir = $self->git_init(++$self->{max_git});
+ my $git = PublicInbox::Git->new($git_dir);
+ return $self->import_init($git, 0);
+ }
+ }
+ my $latest;
+ my $max = -1;
+ my $new = 0;
+ my $pfx = "$self->{-inbox}->{mainrepo}/git";
+ if (-d $pfx) {
+ foreach my $git_dir (glob("$pfx/*.git")) {
+ $git_dir =~ m!/(\d+)\.git\z! or next;
+ my $n = $1;
+ if ($n > $max) {
+ $max = $n;
+ $latest = $git_dir;
+ }
+ }
+ }
+ if (defined $latest) {
+ my $git = PublicInbox::Git->new($latest);
+ my $packed_bytes = $git->packed_bytes;
+ if ($packed_bytes >= $self->{rotate_bytes}) {
+ $new = $max + 1;
+ } else {
+ $self->{max_git} = $max;
+ return $self->import_init($git, $packed_bytes);
+ }
+ } else {
+ warn "latest not found in $pfx\n";
+ }
+ $self->{max_git} = $new;
+ $latest = $self->git_init($new);
+ $self->import_init(PublicInbox::Git->new($latest), 0);
+}
+
+sub import_init {
+ my ($self, $git, $packed_bytes) = @_;
+ my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox});
+ $im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR);
+ $im->{ssoma_lock} = 0;
+ $im->{path_type} = 'v2';
+ $self->{im} = $im;
+}
+
+sub lookup_content {
+ undef # TODO
+}
+
+1;
diff --git a/scripts/import_vger_from_mbox b/scripts/import_vger_from_mbox
index 6ea2ca5..c45dc4e 100644
--- a/scripts/import_vger_from_mbox
+++ b/scripts/import_vger_from_mbox
@@ -7,25 +7,24 @@ use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use Date::Parse qw/str2time/;
use Email::MIME;
$Email::MIME::ContentType::STRICT_PARAMS = 0; # user input is imperfect
-use PublicInbox::Git;
-use PublicInbox::Import;
-my $usage = "usage: $0 NAME EMAIL <MBOX\n";
+use PublicInbox::V2Writable;
+my $usage = "usage: $0 NAME EMAIL DIR <MBOX\n";
my $dry_run;
my %opts = ( 'n|dry-run' => \$dry_run );
GetOptions(%opts) or die $usage;
-chomp(my $git_dir = `git rev-parse --git-dir`);
-my $git = PublicInbox::Git->new($git_dir);
my $name = shift or die $usage; # git
my $email = shift or die $usage; # git@vger.kernel.org
-my $im = $dry_run ? undef : PublicInbox::Import->new($git, $name, $email);
+my $mainrepo = shift or die $usage; # /path/to/v2/repo
+my $v2ibx = {
+ mainrepo => $mainrepo,
+ name => $name,
+ -primary_address => $email,
+};
+my $im = $dry_run ? undef : PublicInbox::V2Writable->new($v2ibx, 1);
binmode STDIN;
my $msg = '';
use PublicInbox::Filter::Vger;
my $vger = PublicInbox::Filter::Vger->new;
-if ($im) {
- $im->{ssoma_lock} = 0;
- $im->{path_type} = 'v2';
-}
sub do_add ($$) {
my ($im, $msg) = @_;
diff --git a/t/git.t b/t/git.t
index 5efc18a..ab588a1 100644
--- a/t/git.t
+++ b/t/git.t
@@ -137,6 +137,9 @@ if (1) {
is($all, join('', @ref), 'qx returned array when wanted');
my $nl = scalar @ref;
ok($nl > 1, "qx returned array length of $nl");
+
+ $gcf->qx(qw(repack -adbq));
+ ok($gcf->packed_bytes > 0, 'packed size is positive');
}
done_testing();
--
EW
* [PATCH 06/12] git: reload alternates file on missing blob
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (4 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 05/12] v2writable: initial cut for repo-rotation Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 07/12] v2: support Xapian + SQLite indexing Eric Wong (Contractor, The Linux Foundation)
` (6 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
Since we'll be adding new repositories to the `alternates' file
in git, we must restart the `git cat-file --batch' process as
git currently does not detect changes to the alternates file
in long-running cat-file processes.
Don't bother with the `--batch-check' process since we won't be
using it with v2.
---
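The change detection itself can be tried outside of PublicInbox::Git.
The following is a stand-alone sketch, assuming the same ctime + size
comparison as the patch; file_changed and the temporary file are
hypothetical stand-ins for alternates_changed and the real
objects/info/alternates.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Hypothetical stand-alone version of the check: returns 1 (and records
# the new values) only when ctime or size differ from the recorded state.
sub file_changed {
	my ($path, $state) = @_;
	my @st = stat($path) or return 0; # missing file: nothing to reload
	# 10 - ctime, 7 - size
	return 0 if $st[10] == $state->[10] && $st[7] == $state->[7];
	@{$state}[7, 10] = @st[7, 10];
	1;
}

my ($fh, $path) = tempfile(UNLINK => 1);
print $fh "../../git/0.git/objects\n" or die "print failed: $!\n";
close $fh or die "close failed: $!\n";

my @state;
@state[7, 10] = (0, 0); # same initial values as Git->new in the patch

print "after first write: ", file_changed($path, \@state), "\n";
print "no modification:   ", file_changed($path, \@state), "\n";

open $fh, '>>', $path or die "open failed: $!\n";
print $fh "../../git/1.git/objects\n" or die "print failed: $!\n";
close $fh or die "close failed: $!\n";
print "after append:      ", file_changed($path, \@state), "\n";
```

Comparing the size in addition to ctime helps when the file is
appended to within the same second as the previous check.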
lib/PublicInbox/Git.pm | 29 +++++++++++++++++++++++++----
t/git.t | 21 +++++++++++++++++++++
2 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 6437643..95df52e 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -15,7 +15,19 @@ use PublicInbox::Spawn qw(spawn popen_rd);
sub new {
my ($class, $git_dir) = @_;
- bless { git_dir => $git_dir }, $class
+ my @st;
+ $st[7] = $st[10] = 0;
+ bless { git_dir => $git_dir, st => \@st }, $class
+}
+
+sub alternates_changed {
+ my ($self) = @_;
+ my $alt = "$self->{git_dir}/objects/info/alternates";
+ my @st = stat($alt) or return 0;
+ my $old_st = $self->{st};
+ # 10 - ctime, 7 - size
+ return 0 if ($st[10] == $old_st->[10] && $st[7] == $old_st->[7]);
+ $self->{st} = \@st;
}
sub _bidi_pipe {
@@ -38,14 +50,23 @@ sub _bidi_pipe {
sub cat_file {
my ($self, $obj, $ref) = @_;
+ my ($retried, $in, $head);
+again:
batch_prepare($self);
$self->{out}->print($obj, "\n") or fail($self, "write error: $!");
- my $in = $self->{in};
+ $in = $self->{in};
local $/ = "\n";
- my $head = $in->getline;
- $head =~ / missing$/ and return undef;
+ $head = $in->getline;
+ if ($head =~ / missing$/) {
+ if (!$retried && alternates_changed($self)) {
+ $retried = 1;
+ cleanup($self);
+ goto again;
+ }
+ return;
+ }
$head =~ /^[0-9a-f]{40} \S+ (\d+)$/ or
fail($self, "Unexpected result from git cat-file: $head");
diff --git a/t/git.t b/t/git.t
index ab588a1..7f96293 100644
--- a/t/git.t
+++ b/t/git.t
@@ -6,6 +6,7 @@ use Test::More;
use File::Temp qw/tempdir/;
my $dir = tempdir('pi-git-XXXXXX', TMPDIR => 1, CLEANUP => 1);
use Cwd qw/getcwd/;
+use PublicInbox::Spawn qw(popen_rd);
use_ok 'PublicInbox::Git';
{
@@ -142,4 +143,24 @@ if (1) {
ok($gcf->packed_bytes > 0, 'packed size is positive');
}
+if ('alternates reloaded') {
+ my $alt = tempdir('pi-git-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+ my @cmd = ('git', "--git-dir=$alt", qw(hash-object -w --stdin));
+ is(system(qw(git init -q --bare), $alt), 0, 'create alt directory');
+ open my $fh, '<', "$alt/config" or die "open failed: $!\n";
+ my $rd = popen_rd(\@cmd, {}, { 0 => fileno($fh) } );
+ close $fh or die "close failed: $!";
+ chomp(my $remote = <$rd>);
+ my $gcf = PublicInbox::Git->new($dir);
+ is($gcf->cat_file($remote), undef, "remote file not found");
+ open $fh, '>>', "$dir/objects/info/alternates" or
+ die "open failed: $!\n";
+ print $fh "$alt/objects" or die "print failed: $!\n";
+ close $fh or die "close failed: $!";
+ my $found = $gcf->cat_file($remote);
+ open $fh, '<', "$alt/config" or die "open failed: $!\n";
+ my $config = eval { local $/; <$fh> };
+ is($$found, $config, 'alternates reloaded');
+}
+
done_testing();
--
EW
* [PATCH 07/12] v2: support Xapian + SQLite indexing
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (5 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 06/12] git: reload alternates file on missing blob Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 08/12] import_vger_from_inbox: allow "-V" option Eric Wong (Contractor, The Linux Foundation)
` (5 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
This is too slow, currently. Working with only 2017 LKML
archives:
git-only: ~1 minute
git + SQLite: ~12 minutes
git + Xapian + SQLite: ~45 minutes
So yes, it looks like we'll need to parallelize Xapian indexing,
at least.
---
lib/PublicInbox/Import.pm | 1 +
lib/PublicInbox/Inbox.pm | 4 +++-
lib/PublicInbox/Search.pm | 26 +++++++++++++++-----
lib/PublicInbox/SearchIdx.pm | 56 +++++++++++++++++++++++++++++++------------
lib/PublicInbox/V2Writable.pm | 45 ++++++++++++++++++++++++++++------
scripts/import_vger_from_mbox | 3 +++
6 files changed, 106 insertions(+), 29 deletions(-)
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 364ab60..1a2698a 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -285,6 +285,7 @@ sub add {
# v2: we need this for Xapian
if ($self->{want_object_id}) {
chomp($self->{last_object_id} = $self->get_mark(":$blob"));
+ $self->{last_object_size} = $n;
}
my $ref = $self->{ref};
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 2ec2be6..e7856e3 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -79,7 +79,9 @@ sub new {
sub git {
my ($self) = @_;
$self->{git} ||= eval {
- my $g = PublicInbox::Git->new($self->{mainrepo});
+ my $git_dir = $self->{mainrepo};
+ $git_dir .= '/all.git' if (($self->{version} || 1) == 2);
+ my $g = PublicInbox::Git->new($git_dir);
$g->{-httpbackend_limiter} = $self->{-httpbackend_limiter};
_cleanup_later($self);
$g;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 33a1f2d..eac11bd 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -120,15 +120,29 @@ chomp @HELP;
my $mail_query = Search::Xapian::Query->new('T' . 'mail');
sub xdir {
- my (undef, $git_dir) = @_;
- "$git_dir/public-inbox/xapian" . SCHEMA_VERSION;
+ my ($self) = @_;
+ if ($self->{version} == 1) {
+ "$self->{mainrepo}/public-inbox/xapian" . SCHEMA_VERSION;
+ } else {
+ "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+ }
}
sub new {
- my ($class, $git_dir, $altid) = @_;
- my $dir = $class->xdir($git_dir);
- my $db = Search::Xapian::Database->new($dir);
- bless { xdb => $db, git_dir => $git_dir, altid => $altid }, $class;
+ my ($class, $mainrepo, $altid) = @_;
+ my $version = 1;
+ my $ibx = $mainrepo;
+ if (ref $ibx) {
+ $version = $ibx->{version} || 1;
+ $mainrepo = $ibx->{mainrepo};
+ }
+ my $self = bless {
+ mainrepo => $mainrepo,
+ altid => $altid,
+ version => $version,
+ }, $class;
+ $self->{xdb} = Search::Xapian::Database->new($self->xdir);
+ $self;
}
sub reopen { $_[0]->{xdb}->reopen }
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 265403a..c6c5bd2 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -51,26 +51,43 @@ sub git_unquote ($) {
}
sub new {
- my ($class, $inbox, $creat) = @_;
- my $git_dir = $inbox;
- my $altid;
- if (ref $inbox) {
- $git_dir = $inbox->{mainrepo};
- $altid = $inbox->{altid};
+ my ($class, $ibx, $creat) = @_;
+ my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config
+ my $git_dir = $mainrepo;
+ my ($altid, $git);
+ my $version = 1;
+ if (ref $ibx) {
+ $mainrepo = $ibx->{mainrepo};
+ $altid = $ibx->{altid};
+ $version = $ibx->{version} || 1;
if ($altid) {
require PublicInbox::AltId;
$altid = [ map {
- PublicInbox::AltId->new($inbox, $_);
+ PublicInbox::AltId->new($ibx, $_);
} @$altid ];
}
+ $git = $ibx->git;
+ } else {
+ $git = PublicInbox::Git->new($git_dir); # v1 only
}
require Search::Xapian::WritableDatabase;
- my $self = bless { git_dir => $git_dir, -altid => $altid }, $class;
+ my $self = bless {
+ mainrepo => $mainrepo,
+ git => $git,
+ -altid => $altid,
+ version => $version,
+ }, $class;
my $perm = $self->_git_config_perm;
my $umask = _umask_for($perm);
$self->{umask} = $umask;
- $self->{lock_path} = "$git_dir/ssoma.lock";
- $self->{git} = PublicInbox::Git->new($git_dir);
+ if ($version == 1) {
+ $self->{lock_path} = "$mainrepo/ssoma.lock";
+ } elsif ($version == 2) {
+ $self->{lock_path} = "$mainrepo/inbox.lock";
+ $self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
+ } else {
+ die "unsupported inbox version=$version\n";
+ }
$self->{creat} = ($creat || 0) == 1;
$self;
}
@@ -86,7 +103,7 @@ sub _xdb_release {
sub _xdb_acquire {
my ($self) = @_;
croak 'already acquired' if $self->{xdb};
- my $dir = PublicInbox::Search->xdir($self->{git_dir});
+ my $dir = $self->xdir;
my $flag = Search::Xapian::DB_OPEN;
if ($self->{creat}) {
require File::Path;
@@ -541,6 +558,7 @@ sub batch_adjust ($$$$) {
}
}
+# only for v1
sub rlog {
my ($self, $log, $add_cb, $del_cb, $batch_cb) = @_;
my $hex = '[a-f0-9]';
@@ -573,9 +591,14 @@ sub rlog {
sub _msgmap_init {
my ($self) = @_;
- $self->{mm} = eval {
+ $self->{mm} ||= eval {
require PublicInbox::Msgmap;
- PublicInbox::Msgmap->new($self->{git_dir}, 1);
+ my $msgmap_path = $self->{msgmap_path};
+ if (defined $msgmap_path) { # v2
+ PublicInbox::Msgmap->new_file($msgmap_path, 1);
+ } else {
+ PublicInbox::Msgmap->new($self->{mainrepo}, 1);
+ }
};
}
@@ -712,8 +735,11 @@ sub merge_threads {
sub _read_git_config_perm {
my ($self) = @_;
- my @cmd = qw(config core.sharedRepository);
- my $fh = PublicInbox::Git->new($self->{git_dir})->popen(@cmd);
+ my @cmd = qw(config);
+ if ($self->{version} == 2) {
+ push @cmd, "--file=$self->{mainrepo}/inbox-config";
+ }
+ my $fh = $self->{git}->popen(@cmd, 'core.sharedRepository');
local $/ = "\n";
my $perm = <$fh>;
chomp $perm if defined $perm;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 9b68e9b..41bfb8d 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -34,7 +34,7 @@ sub new {
xap_ro => undef,
# limit each repo to 1GB or so
- rotate_bytes => int((100 * 1024 * 1024) / $PACKING_FACTOR),
+ rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
};
bless $self, $class
}
@@ -55,11 +55,29 @@ sub add {
my $cmt = $im->add($mime, $check_cb) or return;
$cmt = $im->get_mark($cmt);
my $oid = $im->{last_object_id};
- $self->index_msg($mime, $existing, $cmt, $oid);
+ my $size = $im->{last_object_size};
+
+ my $idx = $self->search_idx;
+ $idx->index_both($mime, $size, $oid);
+ $idx->{xdb}->set_metadata('last_commit', $cmt);
+ my $n = $self->{transact_bytes} += $size;
+ if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+ $self->checkpoint;
+ }
+
$mime;
}
-sub index_msg { # TODO
+sub search_idx {
+ my ($self) = @_;
+ $self->{idx} ||= eval {
+ my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
+ my $mm = $idx->_msgmap_init;
+ $idx->_xdb_acquire->begin_transaction;
+ $self->{transact_bytes} = 0;
+ $mm->{dbh}->begin_work;
+ $idx
+ };
}
sub remove {
@@ -79,12 +97,25 @@ sub remove {
sub done {
my ($self) = @_;
- $self->{im}->done; # PublicInbox::Import::done
+ my $im = $self->{im};
+ $im->done if $im; # PublicInbox::Import::done
+ $self->searchidx_checkpoint;
}
sub checkpoint {
my ($self) = @_;
- $self->{im}->checkpoint; # PublicInbox::Import::checkpoint
+ my $im = $self->{im};
+ $im->checkpoint if $im; # PublicInbox::Import::checkpoint
+ $self->searchidx_checkpoint;
+}
+
+sub searchidx_checkpoint {
+ my ($self) = @_;
+ my $idx = delete $self->{idx} or return;
+
+ $idx->{mm}->{dbh}->commit;
+ $idx->{xdb}->commit_transaction;
+ $idx->_xdb_release;
}
sub git_init {
@@ -127,6 +158,7 @@ sub importer {
} else {
$self->{im} = undef;
$im->done;
+ $self->searchidx_checkpoint;
$im = undef;
my $git_dir = $self->git_init(++$self->{max_git});
my $git = PublicInbox::Git->new($git_dir);
@@ -156,8 +188,6 @@ sub importer {
$self->{max_git} = $max;
return $self->import_init($git, $packed_bytes);
}
- } else {
- warn "latest not found in $pfx\n";
}
$self->{max_git} = $new;
$latest = $self->git_init($new);
@@ -168,6 +198,7 @@ sub import_init {
my ($self, $git, $packed_bytes) = @_;
my $im = PublicInbox::Import->new($git, undef, undef, $self->{-inbox});
$im->{bytes_added} = int($packed_bytes / $PACKING_FACTOR);
+ $im->{want_object_id} = 1;
$im->{ssoma_lock} = 0;
$im->{path_type} = 'v2';
$self->{im} = $im;
diff --git a/scripts/import_vger_from_mbox b/scripts/import_vger_from_mbox
index c45dc4e..d30e8a3 100644
--- a/scripts/import_vger_from_mbox
+++ b/scripts/import_vger_from_mbox
@@ -7,6 +7,7 @@ use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/;
use Date::Parse qw/str2time/;
use Email::MIME;
$Email::MIME::ContentType::STRICT_PARAMS = 0; # user input is imperfect
+use PublicInbox::Inbox;
use PublicInbox::V2Writable;
my $usage = "usage: $0 NAME EMAIL DIR <MBOX\n";
my $dry_run;
@@ -18,8 +19,10 @@ my $mainrepo = shift or die $usage; # /path/to/v2/repo
my $v2ibx = {
mainrepo => $mainrepo,
name => $name,
+ version => 2,
-primary_address => $email,
};
+$v2ibx = PublicInbox::Inbox->new($v2ibx);
my $im = $dry_run ? undef : PublicInbox::V2Writable->new($v2ibx, 1);
binmode STDIN;
my $msg = '';
--
EW
* [PATCH 08/12] import_vger_from_inbox: allow "-V" option
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (6 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 07/12] v2: support Xapian + SQLite indexing Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 09/12] import_vger_from_mbox: use PublicInbox::MIME and avoid clobbering Eric Wong (Contractor, The Linux Foundation)
` (4 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
This will let us quickly test between v2 and v1 inboxes.
---
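For reference, the option handling can be exercised without touching
@ARGV. This is a rough sketch using the same option specs as the
patch; parse_args is a hypothetical wrapper and the example arguments
are made up.

```perl
use strict;
use warnings;
use Getopt::Long qw(GetOptionsFromArray :config gnu_getopt no_ignore_case);

# Hypothetical wrapper around the option specs from the patch; returns
# the chosen version, the dry-run flag and the remaining arguments.
sub parse_args {
	my (@argv) = @_;
	my ($dry_run, $version) = (0, 2); # v2 stays the default
	GetOptionsFromArray(\@argv,
		'n|dry-run' => \$dry_run,
		'V|version=i' => \$version,
	) or die "bad options\n";
	($version, $dry_run, @argv);
}

my ($version, $dry_run, @rest) =
	parse_args(qw(-V 1 git git@vger.kernel.org /path/to/v1/repo));
print "version=$version dry_run=$dry_run rest=@rest\n";
```

no_ignore_case is what keeps "-V" distinct from a lowercase "-v".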
scripts/import_vger_from_mbox | 24 +++++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/scripts/import_vger_from_mbox b/scripts/import_vger_from_mbox
index d30e8a3..abc2d37 100644
--- a/scripts/import_vger_from_mbox
+++ b/scripts/import_vger_from_mbox
@@ -9,21 +9,35 @@ use Email::MIME;
$Email::MIME::ContentType::STRICT_PARAMS = 0; # user input is imperfect
use PublicInbox::Inbox;
use PublicInbox::V2Writable;
+use PublicInbox::Import;
my $usage = "usage: $0 NAME EMAIL DIR <MBOX\n";
my $dry_run;
-my %opts = ( 'n|dry-run' => \$dry_run );
+my $version = 2;
+my %opts = (
+ 'n|dry-run' => \$dry_run,
+ 'V|version=i' => \$version,
+);
GetOptions(%opts) or die $usage;
my $name = shift or die $usage; # git
my $email = shift or die $usage; # git@vger.kernel.org
my $mainrepo = shift or die $usage; # /path/to/v2/repo
-my $v2ibx = {
+my $ibx = {
mainrepo => $mainrepo,
name => $name,
- version => 2,
+ version => $version,
-primary_address => $email,
};
-$v2ibx = PublicInbox::Inbox->new($v2ibx);
-my $im = $dry_run ? undef : PublicInbox::V2Writable->new($v2ibx, 1);
+$ibx = PublicInbox::Inbox->new($ibx);
+my $im;
+unless ($dry_run) {
+ if ($version >= 2) {
+ $im = PublicInbox::V2Writable->new($ibx, 1);
+ } else {
+ system(qw(git init --bare -q), $mainrepo);
+ my $git = PublicInbox::Git->new($mainrepo);
+ $im = PublicInbox::Import->new($git, $name, $email, $ibx);
+ }
+}
binmode STDIN;
my $msg = '';
use PublicInbox::Filter::Vger;
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [PATCH 09/12] import_vger_from_mbox: use PublicInbox::MIME and avoid clobbering
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (7 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 08/12] import_vger_from_inbox: allow "-V" option Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 10/12] v2: parallelize Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (3 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
The code is less confusing without the clobbering assignment, and
PublicInbox::MIME exists to work around bugs in the older
Email::MIME that ships in Debian 9 (stretch).
---
scripts/import_vger_from_mbox | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/scripts/import_vger_from_mbox b/scripts/import_vger_from_mbox
index abc2d37..1308483 100644
--- a/scripts/import_vger_from_mbox
+++ b/scripts/import_vger_from_mbox
@@ -46,12 +46,12 @@ my $vger = PublicInbox::Filter::Vger->new;
sub do_add ($$) {
my ($im, $msg) = @_;
$$msg =~ s/(\r?\n)+\z/$1/s;
- $msg = Email::MIME->new($$msg);
- $msg = $vger->scrub($msg);
+ my $mime = PublicInbox::MIME->new($msg);
+ $mime = $vger->scrub($mime);
return unless $im;
- $im->add($msg) or
+ $im->add($mime) or
warn "duplicate: ",
- $msg->header_obj->header_raw('Message-ID'), "\n";
+ $mime->header_obj->header_raw('Message-ID'), "\n";
}
# asctime: From example@example.com Fri Jun 23 02:56:55 2000
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [PATCH 10/12] v2: parallelize Xapian indexing
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (8 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 09/12] import_vger_from_mbox: use PublicInbox::MIME and avoid clobbering Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 11/12] v2writable: round-robin to partitions based on article number Eric Wong (Contractor, The Linux Foundation)
` (2 subsequent siblings)
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
The parallelization requires splitting Msgmap, text+term
indexing, and thread-linking out into separate processes.
git-fast-import is fast, so we don't bother parallelizing it.
Msgmap (SQLite) and thread-linking (Xapian) must be serialized
because they rely on monotonically increasing numbers (NNTP
article number and internal thread_id, respectively).
We handle msgmap in the main process which drives fast-import.
When the article number is retrieved/generated, we write the
entire message to per-partition subprocesses via pipes for
expensive text+term indexing.
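The hand-off to a partition can be pictured as a header line followed by the raw message bytes. What follows is a simplified, single-process sketch and not the code from this patch: write_frame and read_frame are hypothetical names, and the pipe is read back in the same process instead of by a forked worker.

```perl
use strict;
use warnings;

# Hypothetical helper: frame one message as "LEN ARTNUM OID\n" + raw bytes.
sub write_frame {
	my ($w, $msgref, $artnum, $oid) = @_;
	my $len = length($$msgref);
	print {$w} "$len $artnum $oid\n", $$msgref or
		die "write failed: $!\n";
}

# Hypothetical helper: read one frame back, or return nothing at EOF.
sub read_frame {
	my ($r) = @_;
	defined(my $line = readline($r)) or return;
	chomp $line;
	my ($len, $artnum, $oid) = split(/ /, $line);
	my $n = read($r, my $msg, $len);
	defined $n or die "read failed: $!\n";
	$n == $len or die "short read: $n != $len\n";
	return ($artnum, $oid, $msg);
}

pipe(my $r, my $w) or die "pipe failed: $!\n";
binmode $_ for ($r, $w);
my $raw = "Subject: hi\n\nbody\n";
write_frame($w, \$raw, 1, 'deadbeef');
close $w or die "close failed: $!\n"; # flushes the buffered frame
my ($artnum, $oid, $msg) = read_frame($r);
print "article $artnum ($oid): ", length($msg), " bytes\n";
```

Sending the length up front lets the reader pull the body with a single read() instead of scanning for a terminator, which matters because message bodies may contain any byte sequence.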
When these per-partition subprocesses are done with the
expensive text+term indexing, they write SearchMsg (small data)
to a shared pipe (inherited from the main V2Writable process)
back to the threader, which runs its own subprocess.
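One way to picture the shared-pipe protocol: each record is a Storable blob preceded by its length on a line of its own, and a writer holds an exclusive flock around the write so that frames from different processes cannot interleave. The sketch below is single-process and uses hypothetical names (send_record, recv_record, a temporary lock file); it only approximates what SearchIdxThread does with forked workers.

```perl
use strict;
use warnings;
use Fcntl qw(:flock);
use File::Temp qw(tempfile);
use IO::Handle;
use Storable qw(freeze thaw);

# Hypothetical writer: serialize the fields and write them under a lock.
# With several writer processes, each one should open the lock file
# itself; a handle inherited across fork shares a single flock.
sub send_record {
	my ($w, $lockfh, @fields) = @_;
	my $str = freeze(\@fields);
	my $len = length($str);
	my $err;
	flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
	print {$w} "$len\n", $str or $err = $!;
	flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
	die "print failed: $err\n" if $err;
}

# Hypothetical reader: returns the fields of one record, or nothing at EOF.
sub recv_record {
	my ($r) = @_;
	defined(my $len = readline($r)) or return;
	chomp $len;
	my $n = read($r, my $buf, $len);
	(defined($n) && $n == $len) or die "short or failed read\n";
	return @{ thaw($buf) };
}

pipe(my $r, my $w) or die "pipe failed: $!\n";
binmode $_, ':raw' for ($r, $w);
$w->autoflush(1); # the write must reach the pipe before the unlock
my ($lockfh) = tempfile(UNLINK => 1);
send_record($w, $lockfh, 'mid@example.com', 1519335720, undef, "subj\nfrom\n");
close $w or die "close failed: $!\n";
my ($mid, $ts, $xpath, $data) = recv_record($r);
print "threading <$mid> at $ts\n";
```

Autoflush is what makes the lock meaningful here: without it the bytes could sit in the writer's stdio buffer and only hit the pipe after the lock was released.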
The number of text+term Xapian partitions is chosen at import
and can be made equal to the number of cores in a machine.
  V2Writable --> Import -> git-fast-import
             \-> SearchIdxThread -> Msgmap (synchronous)
             \-> SearchIdxPart[n] -> SearchIdx[*]
             \-> SearchIdxThread -> SearchIdx ("threader", a subprocess)

  [*] each subprocess writes to threader
---
MANIFEST | 2 +
lib/PublicInbox/Import.pm | 4 +-
lib/PublicInbox/Search.pm | 5 +-
lib/PublicInbox/SearchIdx.pm | 80 ++++++++++++++++++--------
lib/PublicInbox/SearchIdxPart.pm | 70 +++++++++++++++++++++++
lib/PublicInbox/SearchIdxThread.pm | 111 +++++++++++++++++++++++++++++++++++++
lib/PublicInbox/SearchMsg.pm | 33 +++++------
lib/PublicInbox/V2Writable.pm | 80 +++++++++++++++++---------
8 files changed, 314 insertions(+), 71 deletions(-)
create mode 100644 lib/PublicInbox/SearchIdxPart.pm
create mode 100644 lib/PublicInbox/SearchIdxThread.pm
diff --git a/MANIFEST b/MANIFEST
index 4b51b54..2a6f6f0 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -84,6 +84,8 @@ lib/PublicInbox/Reply.pm
lib/PublicInbox/SaPlugin/ListMirror.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/SearchIdx.pm
+lib/PublicInbox/SearchIdxPart.pm
+lib/PublicInbox/SearchIdxThread.pm
lib/PublicInbox/SearchMsg.pm
lib/PublicInbox/SearchThread.pm
lib/PublicInbox/SearchView.pm
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 1a2698a..b650e4e 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -280,14 +280,12 @@ sub add {
$self->{bytes_added} += $n;
print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
print $w $str, "\n" or wfail;
- $str = undef;
# v2: we need this for Xapian
if ($self->{want_object_id}) {
chomp($self->{last_object_id} = $self->get_mark(":$blob"));
- $self->{last_object_size} = $n;
+ $self->{last_object} = [ $n, \$str ];
}
-
my $ref = $self->{ref};
my $commit = $self->{mark}++;
my $parent = $tip =~ /\A:/ ? $tip : undef;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index eac11bd..3b28059 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -124,7 +124,10 @@ sub xdir {
if ($self->{version} == 1) {
"$self->{mainrepo}/public-inbox/xapian" . SCHEMA_VERSION;
} else {
- "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+ my $dir = "$self->{mainrepo}/xap" . SCHEMA_VERSION;
+ my $part = $self->{partition};
+ defined $part or die "partition not given";
+ $dir .= "/$part";
}
}
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index c6c5bd2..cc7e7ec 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -51,7 +51,7 @@ sub git_unquote ($) {
}
sub new {
- my ($class, $ibx, $creat) = @_;
+ my ($class, $ibx, $creat, $part) = @_;
my $mainrepo = $ibx; # for "public-inbox-index" w/o entry in config
my $git_dir = $mainrepo;
my ($altid, $git);
@@ -83,7 +83,10 @@ sub new {
if ($version == 1) {
$self->{lock_path} = "$mainrepo/ssoma.lock";
} elsif ($version == 2) {
- $self->{lock_path} = "$mainrepo/inbox.lock";
+ defined $part or die "partition is required for v2\n";
+ # partition is a number or "all"
+ $self->{partition} = $part;
+ $self->{lock_path} = undef;
$self->{msgmap_path} = "$mainrepo/msgmap.sqlite3";
} else {
die "unsupported inbox version=$version\n";
@@ -119,14 +122,16 @@ sub _xdb_acquire {
sub _lock_acquire {
my ($self) = @_;
croak 'already locked' if $self->{lockfh};
- sysopen(my $lockfh, $self->{lock_path}, O_WRONLY|O_CREAT) or
- die "failed to open lock $self->{lock_path}: $!\n";
+ my $lock_path = $self->{lock_path} or return;
+ sysopen(my $lockfh, $lock_path, O_WRONLY|O_CREAT) or
+ die "failed to open lock $lock_path: $!\n";
flock($lockfh, LOCK_EX) or die "lock failed: $!\n";
$self->{lockfh} = $lockfh;
}
sub _lock_release {
my ($self) = @_;
+ return unless $self->{lock_path};
my $lockfh = delete $self->{lockfh} or croak 'not locked';
flock($lockfh, LOCK_UN) or die "unlock failed: $!\n";
close $lockfh or die "close failed: $!\n";
@@ -138,8 +143,8 @@ sub add_val ($$$) {
$doc->add_value($col, $num);
}
-sub add_values ($$$) {
- my ($smsg, $bytes, $num) = @_;
+sub add_values ($$$$) {
+ my ($smsg, $bytes, $num, $lines) = @_;
my $ts = $smsg->ts;
my $doc = $smsg->{doc};
@@ -149,8 +154,7 @@ sub add_values ($$$) {
defined($bytes) and add_val($doc, &PublicInbox::Search::BYTES, $bytes);
- add_val($doc, &PublicInbox::Search::LINES,
- $smsg->{mime}->body_raw =~ tr!\n!\n!);
+ add_val($doc, &PublicInbox::Search::LINES, $lines);
my $yyyymmdd = strftime('%Y%m%d', gmtime($ts));
add_val($doc, PublicInbox::Search::YYYYMMDD, $yyyymmdd);
@@ -281,6 +285,7 @@ sub add_message {
my ($doc_id, $old_tid);
my $mid = mid_clean(mid_mime($mime));
+ my $threader = $self->{threader};
eval {
die 'Message-ID too long' if length($mid) > MAX_MID_SIZE;
@@ -289,19 +294,22 @@ sub add_message {
# convert a ghost to a regular message
# it will also clobber any existing regular message
$doc_id = $smsg->{doc_id};
- $old_tid = $smsg->thread_id;
+ $old_tid = $smsg->thread_id unless $threader;
}
$smsg = PublicInbox::SearchMsg->new($mime);
my $doc = $smsg->{doc};
$doc->add_term('XMID' . $mid);
my $subj = $smsg->subject;
+ my $xpath;
if ($subj ne '') {
- my $path = $self->subject_path($subj);
- $doc->add_term('XPATH' . id_compress($path));
+ $xpath = $self->subject_path($subj);
+ $xpath = id_compress($xpath);
+ $doc->add_term('XPATH' . $xpath);
}
- add_values($smsg, $bytes, $num);
+ my $lines = $mime->body_raw =~ tr!\n!\n!;
+ add_values($smsg, $bytes, $num, $lines);
my $tg = $self->term_generator;
@@ -350,9 +358,16 @@ sub add_message {
index_body($tg, \@orig, $doc) if @orig;
});
- link_message($self, $smsg, $old_tid);
+ # populates smsg->references for smsg->to_doc_data
+ my $refs = parse_references($smsg);
+ my $data = $smsg->to_doc_data($blob);
+ if ($threader) {
+ $threader->thread_msg($mid, $smsg->ts, $xpath, $data);
+ } else {
+ link_message($self, $smsg, $refs, $old_tid);
+ }
$tg->index_text($mid, 1, 'XM');
- $doc->set_data($smsg->to_doc_data($blob));
+ $doc->set_data($data);
if (my $altid = $self->{-altid}) {
foreach my $alt (@$altid) {
@@ -424,8 +439,8 @@ sub next_thread_id {
$last_thread_id;
}
-sub link_message {
- my ($self, $smsg, $old_tid) = @_;
+sub parse_references ($) {
+ my ($smsg) = @_;
my $doc = $smsg->{doc};
my $mid = $smsg->mid;
my $mime = $smsg->{mime};
@@ -436,7 +451,6 @@ sub link_message {
my @refs = (($hdr->header_raw('References') || '') =~ /<([^>]+)>/g);
push(@refs, (($hdr->header_raw('In-Reply-To') || '') =~ /<([^>]+)>/g));
- my $tid;
if (@refs) {
my %uniq = ($mid => 1);
my @orig_refs = @refs;
@@ -452,25 +466,31 @@ sub link_message {
push @refs, $ref;
}
}
+ $smsg->{references} = '<'.join('> <', @refs).'>' if @refs;
+ \@refs
+}
- if (@refs) {
- $smsg->{references} = '<'.join('> <', @refs).'>';
+sub link_message {
+ my ($self, $smsg, $refs, $old_tid) = @_;
+ my $tid;
+
+ if (@$refs) {
# first ref *should* be the thread root,
# but we can never trust clients to do the right thing
- my $ref = shift @refs;
+ my $ref = shift @$refs;
$tid = $self->_resolve_mid_to_tid($ref);
$self->merge_threads($tid, $old_tid) if defined $old_tid;
# the rest of the refs should point to this tid:
- foreach $ref (@refs) {
+ foreach $ref (@$refs) {
my $ptid = $self->_resolve_mid_to_tid($ref);
merge_threads($self, $tid, $ptid);
}
} else {
$tid = defined $old_tid ? $old_tid : $self->next_thread_id;
}
- $doc->add_term('G' . $tid);
+ $smsg->{doc}->add_term('G' . $tid);
}
sub index_blob {
@@ -798,4 +818,20 @@ sub DESTROY {
$_[0]->{lockfh} = undef;
}
+# remote_* subs are only used by SearchIdxPart and SearchIdxThread:
+sub remote_commit {
+ my ($self) = @_;
+ print { $self->{w} } "commit\n" or die "failed to write commit: $!";
+}
+
+sub remote_close {
+ my ($self) = @_;
+ my $pid = delete $self->{pid} or die "no process to wait on\n";
+ my $w = delete $self->{w} or die "no pipe to write to\n";
+ print $w "close\n" or die "failed to write to pid:$pid: $!\n";
+ close $w or die "failed to close pipe for pid:$pid: $!\n";
+ waitpid($pid, 0) == $pid or die "remote process did not finish";
+ $? == 0 or die ref($self)." exited with: $?";
+}
+
1;
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
new file mode 100644
index 0000000..d5a3fd1
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -0,0 +1,70 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+package PublicInbox::SearchIdxPart;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+
+sub new {
+ my ($class, $v2writable, $part, $threader) = @_;
+ my $self = $class->SUPER::new($v2writable->{-inbox}, 1, $part);
+ $self->{threader} = $threader;
+ my ($r, $w);
+ pipe($r, $w) or die "pipe failed: $!\n";
+ my $pid = fork;
+ defined $pid or die "fork failed: $!\n";
+ if ($pid == 0) {
+ foreach my $other (@{$v2writable->{idx_parts}}) {
+ my $other_w = $other->{w} or next;
+ close $other_w or die "close other failed: $!\n";
+ }
+ $v2writable = undef;
+ close $w;
+ eval { partition_worker_loop($self, $r) };
+ die "worker $part died: $@\n" if $@;
+ die "unexpected MM $self->{mm}" if $self->{mm};
+ exit;
+ }
+ $self->{pid} = $pid;
+ $self->{w} = $w;
+ close $r;
+ $self;
+}
+
+sub partition_worker_loop ($$) {
+ my ($self, $r) = @_;
+ my $xdb = $self->_xdb_acquire;
+ $xdb->begin_transaction;
+ my $txn = 1;
+ while (my $line = $r->getline) {
+ if ($line eq "commit\n") {
+ $xdb->commit_transaction if $txn;
+ $txn = undef;
+ } elsif ($line eq "close\n") {
+ $self->_xdb_release;
+ $xdb = $txn = undef;
+ } else {
+ my ($len, $artnum, $object_id) = split(/ /, $line);
+ $xdb ||= $self->_xdb_acquire;
+ if (!$txn) {
+ $xdb->begin_transaction;
+ $txn = 1;
+ }
+ my $n = read($r, my $msg, $len) or die "read: $!\n";
+ $n == $len or die "short read: $n != $len\n";
+ my $mime = PublicInbox::MIME->new(\$msg);
+ $self->index_blob($mime, $len, $artnum, $object_id);
+ }
+ }
+ warn "$$ still in transaction\n" if $txn;
+ warn "$$ xdb active\n" if $xdb;
+}
+
+# called by V2Writable
+sub index_raw {
+ my ($self, $len, $msgref, $artnum, $object_id) = @_;
+ print { $self->{w} } "$len $artnum $object_id\n", $$msgref or die
+ "failed to write partition $!\n";
+}
+
+1;
diff --git a/lib/PublicInbox/SearchIdxThread.pm b/lib/PublicInbox/SearchIdxThread.pm
new file mode 100644
index 0000000..6471309
--- /dev/null
+++ b/lib/PublicInbox/SearchIdxThread.pm
@@ -0,0 +1,111 @@
+# Copyright (C) 2018 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+package PublicInbox::SearchIdxThread;
+use strict;
+use warnings;
+use base qw(PublicInbox::SearchIdx);
+use Storable qw(freeze thaw);
+
+sub new {
+ my ($class, $v2ibx) = @_;
+ my $self = $class->SUPER::new($v2ibx, 1, 'all');
+ # create the DB:
+ $self->_xdb_acquire;
+ $self->_xdb_release;
+
+ my ($r, $w);
+ pipe($r, $w) or die "pipe failed: $!\n";
+ binmode $r, ':raw';
+ binmode $w, ':raw';
+ my $pid = fork;
+ defined $pid or die "fork failed: $!\n";
+ if ($pid == 0) {
+ close $w;
+ eval { thread_worker_loop($self, $r) };
+ die "thread worker died: $@\n" if $@;
+ exit;
+ }
+ $self->{w} = $w;
+ $self->{pid} = $pid;
+ close $r;
+
+ $w->autoflush(1);
+
+ # lock on only exists in parent, not in worker
+ my $l = $self->{lock_path} = $self->xdir . '/thread.lock';
+ open my $fh, '>>', $l or die "failed to create $l: $!\n";
+ $self;
+}
+
+sub thread_worker_loop {
+ my ($self, $r) = @_;
+ my $msg;
+ my $xdb = $self->_xdb_acquire;
+ $xdb->begin_transaction;
+ my $txn = 1;
+ while (my $line = $r->getline) {
+ if ($line eq "commit\n") {
+ $xdb->commit_transaction if $txn;
+ $txn = undef;
+ } elsif ($line eq "close\n") {
+ $self->_xdb_release;
+ $xdb = $txn = undef;
+ } else {
+ read($r, $msg, $line) or die "read failed: $!\n";
+ $msg = thaw($msg); # should raise on error
+ defined $msg or die "failed to thaw buffer\n";
+ if (!$txn) {
+ $xdb->begin_transaction;
+ $txn = 1;
+ }
+ eval { $self->thread_msg_real(@$msg) };
+ warn "failed to index message <$msg->[0]>: $@\n" if $@;
+ }
+ }
+}
+
+# called by a partition worker
+sub thread_msg {
+ my ($self, $mid, $ts, $xpath, $doc_data) = @_;
+ my $w = $self->{w};
+ my $err;
+ my $str = freeze([ $mid, $ts, $xpath, $doc_data ]);
+ my $len = length($str) . "\n";
+
+ # multiple processes write to the same pipe, so use flock
+ $self->_lock_acquire;
+ print $w $len, $str or $err = $!;
+ $self->_lock_release;
+
+ die "print failed: $err\n" if $err;
+}
+
+sub thread_msg_real {
+ my ($self, $mid, $ts, $xpath, $doc_data) = @_;
+ my $smsg = $self->lookup_message($mid);
+ my ($old_tid, $doc_id);
+ if ($smsg) {
+ # convert a ghost to a regular message
+ # it will also clobber any existing regular message
+ $doc_id = $smsg->{doc_id};
+ $old_tid = $smsg->thread_id;
+ } else {
+ $smsg = PublicInbox::SearchMsg->new(undef);
+ $smsg->{mid} = $mid;
+ }
+ my $doc = $smsg->{doc};
+ $doc->add_term('XPATH' . $xpath) if defined $xpath;
+ $doc->add_term('XMID' . $mid);
+ $doc->set_data($doc_data);
+ $smsg->{ts} = $ts;
+ my @refs = ($smsg->references =~ /<([^>]+)>/g);
+ $self->link_message($smsg, \@refs, $old_tid);
+ my $db = $self->{xdb};
+ if (defined $doc_id) {
+ $db->replace_document($doc_id, $doc);
+ } else {
+ $doc_id = $db->add_document($doc);
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/SearchMsg.pm b/lib/PublicInbox/SearchMsg.pm
index 25c1abb..941bfd2 100644
--- a/lib/PublicInbox/SearchMsg.pm
+++ b/lib/PublicInbox/SearchMsg.pm
@@ -29,19 +29,24 @@ sub get_val ($$) {
Search::Xapian::sortable_unserialise($doc->get_value($col));
}
-sub load_expand {
- my ($self) = @_;
- my $doc = $self->{doc};
- my $data = $doc->get_data or return;
- $self->{ts} = get_val($doc, &PublicInbox::Search::TS);
- utf8::decode($data);
- my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data);
+sub load_from_data ($$) {
+ my ($self) = $_[0]; # data = $_[1]
+ my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $_[1]);
$self->{subject} = $subj;
$self->{from} = $from;
$self->{references} = $refs;
$self->{to} = $to;
$self->{cc} = $cc;
$self->{blob} = $blob;
+}
+
+sub load_expand {
+ my ($self) = @_;
+ my $doc = $self->{doc};
+ my $data = $doc->get_data or return;
+ $self->{ts} = get_val($doc, &PublicInbox::Search::TS);
+ utf8::decode($data);
+ load_from_data($self, $data);
$self;
}
@@ -50,17 +55,9 @@ sub load_doc {
my $data = $doc->get_data or return;
my $ts = get_val($doc, &PublicInbox::Search::TS);
utf8::decode($data);
- my ($subj, $from, $refs, $to, $cc, $blob) = split(/\n/, $data);
- bless {
- doc => $doc,
- subject => $subj,
- ts => $ts,
- from => $from,
- references => $refs,
- to => $to,
- cc => $cc,
- blob => $blob,
- }, $class;
+ my $self = bless { doc => $doc, ts => $ts }, $class;
+ load_from_data($self, $data);
+ $self
}
# :bytes and :lines metadata in RFC 3977
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 41bfb8d..cb74ab1 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -6,7 +6,8 @@ package PublicInbox::V2Writable;
use strict;
use warnings;
use Fcntl qw(:flock :DEFAULT);
-use PublicInbox::SearchIdx;
+use PublicInbox::SearchIdxPart;
+use PublicInbox::SearchIdxThread;
use PublicInbox::MIME;
use PublicInbox::Git;
use PublicInbox::Import;
@@ -32,7 +33,8 @@ sub new {
im => undef, # PublicInbox::Import
xap_rw => undef, # PublicInbox::V2SearchIdx
xap_ro => undef,
-
+ partitions => 4,
+ transact_bytes => 0,
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
};
@@ -55,29 +57,39 @@ sub add {
my $cmt = $im->add($mime, $check_cb) or return;
$cmt = $im->get_mark($cmt);
my $oid = $im->{last_object_id};
- my $size = $im->{last_object_size};
-
- my $idx = $self->search_idx;
- $idx->index_both($mime, $size, $oid);
- $idx->{xdb}->set_metadata('last_commit', $cmt);
- my $n = $self->{transact_bytes} += $size;
- if ($n > PublicInbox::SearchIdx::BATCH_BYTES) {
+ my ($len, $msgref) = @{$im->{last_object}};
+
+ my $nparts = $self->{partitions};
+ my $part = hex(substr($oid, 0, 8)) % $nparts;
+ my $idx = $self->idx_part($part);
+ my $all = $self->{all};
+ my $num = $all->index_mm($mime);
+ $idx->index_raw($len, $msgref, $num, $oid);
+ my $n = $self->{transact_bytes} += $len;
+ if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
$self->checkpoint;
}
$mime;
}
-sub search_idx {
- my ($self) = @_;
- $self->{idx} ||= eval {
- my $idx = PublicInbox::SearchIdx->new($self->{-inbox}, 1);
- my $mm = $idx->_msgmap_init;
- $idx->_xdb_acquire->begin_transaction;
- $self->{transact_bytes} = 0;
- $mm->{dbh}->begin_work;
- $idx
- };
+sub idx_part {
+ my ($self, $part) = @_;
+ my $idx = $self->{idx_parts};
+ return $idx->[$part] if $idx; # fast path
+
+ # first time initialization:
+ my $all = $self->{all} =
+ PublicInbox::SearchIdxThread->new($self->{-inbox});
+
+ # need to create all parts before initializing msgmap FD
+ my $max = $self->{partitions} - 1;
+ $idx = $self->{idx_parts} = [];
+ for my $i (0..$max) {
+ push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
+ }
+ $all->_msgmap_init->{dbh}->begin_work;
+ $idx->[$part];
}
sub remove {
@@ -99,23 +111,37 @@ sub done {
my ($self) = @_;
my $im = $self->{im};
$im->done if $im; # PublicInbox::Import::done
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(0);
}
sub checkpoint {
my ($self) = @_;
my $im = $self->{im};
$im->checkpoint if $im; # PublicInbox::Import::checkpoint
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(1);
}
sub searchidx_checkpoint {
- my ($self) = @_;
- my $idx = delete $self->{idx} or return;
+ my ($self, $more) = @_;
+
+ # order matters, we can only close {all} after all partitions
+ # are done because the partitions also write to {all}
+
+ my $parts = $self->{idx_parts};
+ foreach my $idx (@$parts) {
+ $idx->remote_commit;
+ $idx->remote_close unless $more;
+ }
- $idx->{mm}->{dbh}->commit;
- $idx->{xdb}->commit_transaction;
- $idx->_xdb_release;
+ if (my $all = $self->{all}) {
+ $all->{mm}->{dbh}->commit;
+ if ($more) {
+ $all->{mm}->{dbh}->begin_work;
+ }
+ $all->remote_commit;
+ $all->remote_close unless $more;
+ }
+ $self->{transact_bytes} = 0;
}
sub git_init {
@@ -158,7 +184,7 @@ sub importer {
} else {
$self->{im} = undef;
$im->done;
- $self->searchidx_checkpoint;
+ $self->searchidx_checkpoint(1);
$im = undef;
my $git_dir = $self->git_init(++$self->{max_git});
my $git = PublicInbox::Git->new($git_dir);
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [PATCH 11/12] v2writable: round-robin to partitions based on article number
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (9 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 10/12] v2: parallelize Xapian indexing Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-22 21:42 ` [PATCH 12/12] searchidxpart: increase pipe size for partitions Eric Wong (Contractor, The Linux Foundation)
2018-02-23 1:22 ` [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
Instead of relying on the git object_id hash to partition,
round-robin to these partitions based on the NNTP article
number. This reduces the partition pipes as a source of
contention when two (or more) sequential messages end up
going to the same partition.
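To make the difference concrete, the sketch below compares the two schemes over a run of sequential article numbers. It is only an illustration: the object IDs are SHA-1 digests of made-up strings rather than real git blob IDs, and part_by_oid, part_by_num and repeats are hypothetical helper names wrapping the two expressions from the diff.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);

my $nparts = 4;

# old scheme: partition derived from the leading 32 bits of the object ID
sub part_by_oid { my ($oid, $n) = @_; hex(substr($oid, 0, 8)) % $n }

# new scheme: round-robin on the NNTP article number
sub part_by_num { my ($num, $n) = @_; $num % $n }

# count how often two consecutive messages land on the same partition
sub repeats {
	my (@parts) = @_;
	scalar grep { $parts[$_] == $parts[$_ - 1] } 1..$#parts;
}

my @nums = (1..1000);
my @by_oid = map { part_by_oid(sha1_hex("message $_"), $nparts) } @nums;
my @by_num = map { part_by_num($_, $nparts) } @nums;
print "hash-based repeats: ", repeats(@by_oid), "\n";
print "round-robin repeats: ", repeats(@by_num), "\n";
```

With more than one partition, round-robin never sends two consecutive article numbers to the same pipe, whereas a uniform hash is expected to do so for roughly one message in $nparts.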
---
lib/PublicInbox/V2Writable.pm | 34 +++++++++++++++++++++-------------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index cb74ab1..cf19c76 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -17,6 +17,9 @@ $Email::MIME::ContentType::STRICT_PARAMS = 0;
# an estimate of the post-packed size to the raw uncompressed size
my $PACKING_FACTOR = 0.4;
+# assume 2 cores if GNU nproc(1) is not available
+my $NPROC = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+
sub new {
my ($class, $v2ibx, $creat) = @_;
my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
@@ -33,7 +36,7 @@ sub new {
im => undef, # PublicInbox::Import
xap_rw => undef, # PublicInbox::V2SearchIdx
xap_ro => undef,
- partitions => 4,
+ partitions => $NPROC,
transact_bytes => 0,
# limit each repo to 1GB or so
rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -59,11 +62,11 @@ sub add {
my $oid = $im->{last_object_id};
my ($len, $msgref) = @{$im->{last_object}};
+ $self->idx_init;
+ my $num = $self->{all}->index_mm($mime);
my $nparts = $self->{partitions};
- my $part = hex(substr($oid, 0, 8)) % $nparts;
+ my $part = $num % $nparts;
my $idx = $self->idx_part($part);
- my $all = $self->{all};
- my $num = $all->index_mm($mime);
$idx->index_raw($len, $msgref, $num, $oid);
my $n = $self->{transact_bytes} += $len;
if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
@@ -75,21 +78,23 @@ sub add {
sub idx_part {
my ($self, $part) = @_;
- my $idx = $self->{idx_parts};
- return $idx->[$part] if $idx; # fast path
+ $self->{idx_parts}->[$part];
+}
+sub idx_init {
+ my ($self) = @_;
+ return if $self->{idx_parts};
# first time initialization:
- my $all = $self->{all} =
+ my $all = $self->{all} =
PublicInbox::SearchIdxThread->new($self->{-inbox});
# need to create all parts before initializing msgmap FD
my $max = $self->{partitions} - 1;
- $idx = $self->{idx_parts} = [];
+ my $idx = $self->{idx_parts} = [];
for my $i (0..$max) {
push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
}
$all->_msgmap_init->{dbh}->begin_work;
- $idx->[$part];
}
sub remove {
@@ -127,10 +132,12 @@ sub searchidx_checkpoint {
# order matters, we can only close {all} after all partitions
# are done because the partitions also write to {all}
- my $parts = $self->{idx_parts};
- foreach my $idx (@$parts) {
- $idx->remote_commit;
- $idx->remote_close unless $more;
+ if (my $parts = $self->{idx_parts}) {
+ foreach my $idx (@$parts) {
+ $idx->remote_commit;
+ $idx->remote_close unless $more;
+ }
+ delete $self->{idx_parts} unless $more;
}
if (my $all = $self->{all}) {
@@ -140,6 +147,7 @@ sub searchidx_checkpoint {
}
$all->remote_commit;
$all->remote_close unless $more;
+ delete $self->{all} unless $more;
}
$self->{transact_bytes} = 0;
}
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [PATCH 12/12] searchidxpart: increase pipe size for partitions
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (10 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 11/12] v2writable: round-robin to partitions based on article number Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-22 21:42 ` Eric Wong (Contractor, The Linux Foundation)
2018-02-23 1:22 ` [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong (Contractor, The Linux Foundation) @ 2018-02-22 21:42 UTC (permalink / raw)
To: meta
We want to reduce the time the main V2Writable process
spends writing to the pipe, as the main process itself is
the primary source of contention.
While we're at it, always flush after writing to ensure
the child sees it at once. (Grr... Perl doesn't use writev)
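A small experiment along these lines, assuming Linux: it grows a pipe with F_SETPIPE_SZ and reads the capacity back with F_GETPIPE_SZ. The value 1031 comes from this patch; 1032 for F_GETPIPE_SZ is taken from the Linux fcntl headers and is an assumption on any other platform, and try_pipe_size is a hypothetical helper name. The kernel may refuse the request (for instance when it exceeds /proc/sys/fs/pipe-max-size for an unprivileged process), so the sketch treats failure as non-fatal.

```perl
use strict;
use warnings;
use IO::Handle;

use constant {
	F_SETPIPE_SZ => 1031, # Linux-specific, value as used in the patch
	F_GETPIPE_SZ => 1032, # Linux-specific
};

# Hypothetical helper: returns the resulting capacity in bytes, or
# undef if the platform or the kernel refuses the request.
sub try_pipe_size {
	my ($fh, $want) = @_;
	return unless $^O eq 'linux';
	fcntl($fh, F_SETPIPE_SZ, $want) or return;
	my $got = fcntl($fh, F_GETPIPE_SZ, 0) or return;
	return $got + 0;
}

pipe(my $r, my $w) or die "pipe failed: $!\n";
my $size = try_pipe_size($r, 1048576);
print(defined($size) ? "pipe capacity: $size bytes\n"
		: "pipe size unchanged\n");

# flush explicitly so the reader sees the data right away
print {$w} "hello\n" or die "print failed: $!\n";
$w->flush or die "flush failed: $!\n";
print "reader sees: ", scalar readline($r);
```

A larger pipe lets the writer queue more messages before blocking, which is the effect the patch is after in the main process.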
---
lib/PublicInbox/SearchIdxPart.pm | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/lib/PublicInbox/SearchIdxPart.pm b/lib/PublicInbox/SearchIdxPart.pm
index d5a3fd1..5582d67 100644
--- a/lib/PublicInbox/SearchIdxPart.pm
+++ b/lib/PublicInbox/SearchIdxPart.pm
@@ -20,6 +20,11 @@ sub new {
}
$v2writable = undef;
close $w;
+
+ # F_SETPIPE_SZ = 1031 on Linux; increasing the pipe size here
+ # speeds V2Writable batch imports across 8 cores by nearly 20%
+ fcntl($r, 1031, 1048576) if $^O eq 'linux';
+
eval { partition_worker_loop($self, $r) };
die "worker $part died: $@\n" if $@;
die "unexpected MM $self->{mm}" if $self->{mm};
@@ -63,8 +68,10 @@ sub partition_worker_loop ($$) {
# called by V2Writable
sub index_raw {
my ($self, $len, $msgref, $artnum, $object_id) = @_;
- print { $self->{w} } "$len $artnum $object_id\n", $$msgref or die
+ my $w = $self->{w};
+ print $w "$len $artnum $object_id\n", $$msgref or die
"failed to write partition $!\n";
+ $w->flush or die "failed to flush: $!\n";
}
1;
--
EW
^ permalink raw reply related [flat|nested] 14+ messages in thread
* Re: [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing
2018-02-22 21:42 [WIP PATCH 0/12] v2: git repo rotation + parallel Xapian indexing Eric Wong (Contractor, The Linux Foundation)
` (11 preceding siblings ...)
2018-02-22 21:42 ` [PATCH 12/12] searchidxpart: increase pipe size for partitions Eric Wong (Contractor, The Linux Foundation)
@ 2018-02-23 1:22 ` Eric Wong
12 siblings, 0 replies; 14+ messages in thread
From: Eric Wong @ 2018-02-23 1:22 UTC (permalink / raw)
To: meta
> git+Xapian+SQLite 4 parts: ~15 minutes (2 + 2 hyperthread)
That was for 2017, which was 11.6% of the 2000-2017 archives.
For the 2000-2017 set, it took 5h20m, which is disappointing to
me but probably tolerable for now.
The 2017-only import didn't trigger repo rotation at ~1G (being
only 750MB or so).
There was other activity on the machine (including
"git gc --auto" packing) going on but I was still hoping to
be done in roughly 3 hours.
I noticed a lot of I/O wait and Xapian DBs end up taking 53G
(uncompacted) while the git repos only used 6.2G.
xapian-compact brings the combined DB sizes down to 31G.
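For a sense of scale, the figures quoted in this message can be worked through as follows. The inputs are the numbers given above; the linear extrapolation is only a back-of-the-envelope assumption, since it ignores repo rotation, growing Xapian databases, and the other load on the machine.

```perl
use strict;
use warnings;

my $mins_2017 = 15;         # ~15 minutes for the 2017-only import
my $frac_2017 = 0.116;      # 2017 is 11.6% of the 2000-2017 archives
my $actual = 5 * 60 + 20;   # 5h20m for the full set, in minutes

# naive linear estimate for the full archive
my $linear = $mins_2017 / $frac_2017;
printf "linear estimate: %.0f min (%.1f h)\n", $linear, $linear / 60;
printf "actual / linear: %.1fx\n", $actual / $linear;

my ($xap, $xap_compact, $git) = (53, 31, 6.2); # sizes in GB
printf "Xapian / git: %.1fx uncompacted, %.1fx compacted\n",
	$xap / $git, $xap_compact / $git;
printf "compaction saves about %.0f%%\n", 100 * (1 - $xap_compact / $xap);
```

By this rough estimate the full import ran about two and a half times slower than scaling the 2017 run linearly would suggest, and the Xapian data stays several times larger than the git repos even after compaction.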
Looking at the UI bits, there seem to be some weird/wrong
dates in the thread index from older messages. Will have
to investigate.
^ permalink raw reply [flat|nested] 14+ messages in thread