* [RFC] lei_xkw: eXternal KeyWord index
2021-02-24 20:49 lei: per-message keywords and externals Eric Wong
2021-02-26 9:26 ` Eric Wong
@ 2021-03-03 14:01 ` Eric Wong
2021-03-13 11:50 ` Eric Wong
1 sibling, 1 reply; 5+ messages in thread
From: Eric Wong @ 2021-03-03 14:01 UTC (permalink / raw)
To: meta
This will be yet another new, persistent on-disk format in the
upcoming release. It'll be used for storing per-message
keywords (AKA flags). Its entries are not fleshed out with the
typical overview info; they hold just OIDs, Message-IDs, and the
keywords themselves.
It's not wired into lei/store yet, but that's the plan.
Under some extensive stress testing, the index is 2.5GB after ~11.8M
messages with one keyword set per message.
---
MANIFEST | 4 ++
lib/PublicInbox/LeiXkw.pm | 87 ++++++++++++++++++++++++++++++++++++
lib/PublicInbox/LeiXkwIdx.pm | 85 +++++++++++++++++++++++++++++++++++
lib/PublicInbox/SearchIdx.pm | 6 ++-
t/lei_xkw.t | 40 +++++++++++++++++
xt/lei_xkw_stress.t | 57 +++++++++++++++++++++++
6 files changed, 278 insertions(+), 1 deletion(-)
create mode 100644 lib/PublicInbox/LeiXkw.pm
create mode 100644 lib/PublicInbox/LeiXkwIdx.pm
create mode 100644 t/lei_xkw.t
create mode 100644 xt/lei_xkw_stress.t
diff --git a/MANIFEST b/MANIFEST
index 8c9c86a0..751af8ff 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -193,6 +193,8 @@ lib/PublicInbox/LeiSearch.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/LeiXkw.pm
+lib/PublicInbox/LeiXkwIdx.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/Lock.pm
@@ -384,6 +386,7 @@ t/lei_external.t
t/lei_overview.t
t/lei_store.t
t/lei_to_mail.t
+t/lei_xkw.t
t/lei_xsearch.t
t/linkify.t
t/main-bin/spamc
@@ -478,6 +481,7 @@ xt/imapd-mbsync-oimap.t
xt/imapd-validate.t
xt/lei-auth-fail.t
xt/lei-sigpipe.t
+xt/lei_xkw_stress.t
xt/mem-imapd-tls.t
xt/mem-msgview.t
xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/LeiXkw.pm b/lib/PublicInbox/LeiXkw.pm
new file mode 100644
index 00000000..038e1cc2
--- /dev/null
+++ b/lib/PublicInbox/LeiXkw.pm
@@ -0,0 +1,87 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# read-only counterpart to LeiXkwIdx
+package PublicInbox::LeiXkw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::Search);
+use PublicInbox::ContentHash qw(content_hash);
+use PublicInbox::Eml;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::MID qw(mids_for_index);
+
+# Build a reader object for the keyword index stored under $xap_dir.
+# $opt->{git} is kept for later blob lookups; no database is opened here.
+sub new {
+	my ($class, $xap_dir, $opt) = @_;
+	my $self = { xpfx => $xap_dir, git => $opt->{git} };
+	bless $self, $class;
+}
+
+# git->cat_async callback: sorts $oid into $cmp->{gone} when no blob
+# was delivered, or into $cmp->{hits} when the blob's content hash
+# equals $cmp->{expect}.  Blobs with a different hash are ignored.
+sub _content_cmp { # git->cat_async callback
+	my ($bref, $oid, undef, undef, $cmp) = @_;
+	return push(@{$cmp->{gone}}, $oid) unless $bref;
+	my $got = content_hash(PublicInbox::Eml->new($bref));
+	return if $got ne $cmp->{expect};
+	push @{$cmp->{hits}}, $oid;
+}
+
+# Fallback lookup by Message-ID, used by docids_for when no document
+# carries the blob OID of $smsg.
+#
+# Candidate documents are found via their 'Q' (Message-ID) terms; every
+# blob OID ('U' term) recorded on a candidate is then fetched from git
+# and its content hash compared against that of $eml (see _content_cmp).
+#
+# Side effects:
+# - the Message-IDs are cached in $smsg->{mids4idx}
+# - documents mapped to OIDs which _content_cmp reported as "gone" are
+#   deleted from $self->{xdb}
+#
+# Returns the docids mapped to OIDs whose content matched $eml.
+#
+# NOTE(review): delete_document is a write operation, but this sub is
+# also reachable via PublicInbox::LeiXkw::get_xkw where {xdb} comes from
+# xdb_shards_flat -- presumably a read-only handle; confirm that case
+# cannot hit the "gone" path or guard it.
+# NOTE(review): if two matching OIDs map to the same docid, that docid
+# appears to be returned twice -- confirm callers tolerate duplicates.
+sub _docids_by_mids ($$$) {
+ my ($self, $eml, $smsg) = @_;
+ # shared state handed to every _content_cmp invocation
+ my $cmp = { expect => content_hash($eml), hits => [] };
+ my $mids = $smsg->{mids4idx} //= mids_for_index($eml);
+ my $xdb = $self->{xdb};
+ my $git = $self->{git};
+ # first docid seen for each blob OID; also prevents queueing the
+ # same OID for retrieval more than once
+ my %oid2docid;
+ for my $mid (@$mids) { # typically 1
+ my $head = $xdb->postlist_begin('Q'.$mid);
+ my $tail = $xdb->postlist_end('Q'.$mid);
+ for (; $head != $tail; $head++) {
+ my $docid = $head->get_docid;
+ my $oids = xap_terms('U', $xdb, $docid);
+ for my $oid (keys %$oids) {
+ next if exists $oid2docid{$oid};
+ $oid2docid{$oid} = $docid;
+ $git->cat_async($oid, \&_content_cmp, $cmp);
+ }
+ }
+ }
+ # {hits} and {gone} are only read after this wait returns
+ $git->cat_async_wait;
+ for my $oid (@{$cmp->{gone} // []}) {
+ my $docid = $oid2docid{$oid} // die "BUG $oid not mapped";
+ $xdb->delete_document($docid);
+ }
+ map { $oid2docid{$_} // die "BUG $_ miss (@$mids)" } @{$cmp->{hits}};
+}
+
+# Find the docid(s) holding keywords for a message.
+# The blob OID ($smsg->{blob}, mandatory) is tried first and yields at
+# most the first docid posted under it; only when nothing is posted
+# under that OID do we fall back to the Message-ID + content comparison
+# done by _docids_by_mids.
+sub docids_for ($$$) {
+	my ($self, $eml, $smsg) = @_;
+	my $xdb = $self->{xdb};
+	my $term = 'U'.($smsg->{blob} // die 'BUG: no blob');
+	my $it = $xdb->postlist_begin($term);
+	my $end = $xdb->postlist_end($term);
+	$it != $end ? ($it->get_docid) : _docids_by_mids($self, $eml, $smsg);
+}
+
+# Opens the database at {xpfx} and returns it as a one-element list.
+sub xdb_shards_flat { # for ->xdb
+	my ($self) = @_;
+	PublicInbox::Search::load_xapian();
+	my $db_class = $PublicInbox::Search::X{Database};
+	($db_class->new($self->{xpfx}));
+}
+
+# Returns the sorted, de-duplicated list of keywords ('K' terms) stored
+# for the message described by $eml and $smsg; empty if none are known.
+sub get_xkw {
+	my ($self, $eml, $smsg) = @_;
+	$self->xdb;
+	my %seen;
+	# unusual to get more than one docid, but dedupe can change
+	for my $docid (docids_for($self, $eml, $smsg)) {
+		my $kw = xap_terms('K', $self->{xdb}, $docid);
+		@seen{keys %$kw} = values %$kw;
+	}
+	sort keys %seen
+}
+
+1;
diff --git a/lib/PublicInbox/LeiXkwIdx.pm b/lib/PublicInbox/LeiXkwIdx.pm
new file mode 100644
index 00000000..8f53c3ef
--- /dev/null
+++ b/lib/PublicInbox/LeiXkwIdx.pm
@@ -0,0 +1,85 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# eXternal keyword index - for dealing with keyword storage on
+# read-only local external inboxes and extindex.
+#
+# Only boolean term prefixes:
+# Q - Message-ID (like SearchIdx)
+# U - git blob OID (URL)
+# K - keywords
+package PublicInbox::LeiXkwIdx;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::SearchIdx);
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::InboxWritable;
+use PublicInbox::LeiXkw;
+
+*docids_for = \&PublicInbox::LeiXkw::docids_for;
+
+# Build a writable keyword index object for the directory $xap_dir.
+# Recognized $opt keys: {git} (stored as-is) and {-no_fsync}, which adds
+# DB_NO_SYNC to the flags stored in {xdb_flags}.
+sub new {
+	my ($class, $xap_dir, $opt) = @_;
+	PublicInbox::SearchIdx::load_xapian_writable();
+	my $flags = $PublicInbox::SearchIdx::DB_CREATE_OR_OPEN;
+	if ($opt->{-no_fsync}) {
+		$flags |= $PublicInbox::SearchIdx::DB_NO_SYNC;
+	}
+	my %self = (
+		xpfx => $xap_dir,
+		xdb_flags => $flags,
+		umask => 077,
+		indexlevel => 'full',
+		creat => 1,
+		git => $opt->{git},
+	);
+	bless \%self, $cls = $class;
+}
+
+# Record, update or drop the keywords of one message.
+#
+# $eml    - parsed message; only consulted by docids_for when the lookup
+#           by blob OID finds nothing
+# $smsg   - hashref; {kw} (arrayref of keywords) and {blob} (git blob
+#           OID) are mandatory, their absence is a fatal BUG
+# $method - optional name of the method applied to existing documents:
+#           set_keywords (default) | add_keywords | remove_keywords
+#
+# With an empty {kw}, existing documents for the message are deleted.
+# Returns the docids written or deleted; an empty list when the message
+# is unknown and {kw} is empty.
+sub set_xkw {
+	my ($self, $eml, $smsg, $method) = @_;
+	my $kw = $smsg->{kw} // die 'BUG: no {kw}';
+	$self->begin_txn_lazy;
+	my @docids = docids_for($self, $eml, $smsg);
+	if (!@docids) { # brand new
+		return () unless scalar(@$kw);
+		my $doc = $PublicInbox::Search::X{Document}->new;
+		$doc->add_boolean_term('U' . $smsg->{blob});
+		$doc->add_boolean_term('K' . $_) for @$kw;
+		for my $mid (@{$smsg->{mids4idx}}) {
+			$doc->add_boolean_term('Q' . $mid);
+		}
+		return ($self->{xdb}->add_document($doc));
+	# modify existing
+	} elsif (scalar @$kw) {
+		$method //= 'set_keywords'; # | add_keywords | remove_keywords
+		for my $docid (@docids) {
+			$self->$method($docid, @$kw);
+		}
+		# we fell back to mids matching, speed up future matches.
+		# The Message-ID cache is stored on $smsg (not $self) by
+		# the fallback lookup, so that is what signals the fallback.
+		# NOTE(review): a caller reusing $smsg across calls also
+		# takes this path; re-adding an existing term is presumably
+		# a no-op in Xapian -- confirm.
+		if ($smsg->{mids4idx}) {
+			for my $docid (@docids) {
+				my $doc = $self->{xdb}->get_document($docid);
+				$doc->add_boolean_term('U' . $smsg->{blob});
+				$self->{xdb}->replace_document($docid, $doc);
+			}
+		}
+	} else {
+		for my $docid (@docids) {
+			$self->{xdb}->delete_document($docid);
+		}
+	}
+	@docids;
+}
+
+# Overrides the inherited ->xdb so get_xkw works on the writer:
+# simply delegates to begin_txn_lazy.
+sub xdb { # for get_xkw
+	my ($self) = @_;
+	$self->begin_txn_lazy;
+}
+
+# Forget all keywords of a message by deleting every document found
+# for it.  Returns the deleted docids (possibly none).
+sub clear_xkw {
+	my ($self, $eml, $smsg) = @_;
+	$self->begin_txn_lazy;
+	my @gone = docids_for($self, $eml, $smsg);
+	$self->{xdb}->delete_document($_) for @gone;
+	@gone
+}
+
+# each glob below is named only once in this file, which would
+# otherwise trigger "used only once" warnings
+no warnings 'once';
+# presumably commits any pending transaction when the object goes away
+*DESTROY = \&PublicInbox::SearchIdx::commit_txn_lazy;
+*with_umask = \&PublicInbox::InboxWritable::with_umask;
+# share the reader implementation; it calls $self->xdb, which this
+# package defines in terms of begin_txn_lazy
+*get_xkw = \&PublicInbox::LeiXkw::get_xkw;
+
+1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 826302de..bba89ae7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -136,7 +136,11 @@ sub idx_acquire {
}
}
return unless defined $flag;
- $flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync};
+ if (defined(my $fl = $self->{xdb_flags})) {
+ $flag |= $fl; # LeiXkwIdx (and future classes)
+ } elsif (($self->{ibx} // $self->{eidx})->{-no_fsync}) {
+ $flag |= $DB_NO_SYNC;
+ }
my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
croak "Failed opening $dir: $@" if $@;
$self->{xdb} = $xdb;
diff --git a/t/lei_xkw.t b/t/lei_xkw.t
new file mode 100644
index 00000000..ea21a0fe
--- /dev/null
+++ b/t/lei_xkw.t
@@ -0,0 +1,40 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Round-trip test for PublicInbox::LeiXkwIdx: set keywords on a message,
+# update them, read them back through a fresh object and clear them.
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+require_mods(qw(json DBD::SQLite Search::Xapian));
+use_ok 'PublicInbox::LeiXkwIdx';
+use PublicInbox::Git;
+my ($tmpdir, $for_destroy) = tmpdir;
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $git = PublicInbox::Git->new("$ro_home/t1");
+my $xkw = PublicInbox::LeiXkwIdx->new("$tmpdir/xkw", { git => $git });
+# descriptor used for writes; the blob is presumably the OID of
+# t/utf8.eml -- TODO confirm
+my $smsg = {
+ blob => '9bf1002c49eb075df47247b74d69bcd555e23422',
+ kw => ['seen'],
+};
+# descriptor used for lookups and clearing: same blob, no {kw}
+my $req = { blob => $smsg->{blob} };
+my $eml = eml_load('t/utf8.eml');
+# reading from an empty index must not create anything
+is_deeply([$xkw->get_xkw($eml, $smsg)], [], 'no keywords, yet');
+is($xkw->{xdb}->get_doccount, 0, 'no documents created');
+
+# first write creates exactly one document
+my @d = $xkw->set_xkw($eml, $smsg);
+is(scalar(@d), 1, 'set one docid');
+like($d[0], qr/\A\d+\z/, 'set a numeric docid');
+
+is_deeply([$xkw->get_xkw($eml, $req)], ['seen'], 'got keywords back');
+
+# second write must reuse the docid; get_xkw returns keywords sorted
+$smsg->{kw} = [qw(seen answered)];
+is_deeply([$xkw->set_xkw($eml, $smsg)], \@d, 'updated existing doc');
+is_deeply([$xkw->get_xkw($eml, $req)], [qw(answered seen)],
+ 'got keywords back');
+# dropping the last reference triggers DESTROY
+undef $xkw;
+
+# a new object on the same directory must see the earlier writes
+$xkw = PublicInbox::LeiXkwIdx->new("$tmpdir/xkw", { git => $git });
+is_deeply([$xkw->get_xkw($eml, $req)], [qw(answered seen)],
+ 'got keywords back after auto-commit');
+
+is_deeply([$xkw->clear_xkw($eml, $req)], \@d, 'keywords cleared');
+is_deeply([$xkw->get_xkw($eml, $req)], [], 'keywords gone');
+
+done_testing;
diff --git a/xt/lei_xkw_stress.t b/xt/lei_xkw_stress.t
new file mode 100644
index 00000000..69f66d07
--- /dev/null
+++ b/xt/lei_xkw_stress.t
@@ -0,0 +1,57 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Stress test for PublicInbox::LeiXkwIdx: stores one keyword for every
+# object listed by `git cat-file --batch-check --batch-all-objects`.
+# Environment:
+#   TEST_LEI_XKW_DIR - index directory to write to (test is skipped if unset)
+#   GIANT_GIT_DIR    - git directory to scan; defaults to the output of
+#                      `git rev-parse --git-dir`
+#   TEST_FULL_EML    - if true, retrieve each blob and index it as a
+#                      message instead of using a synthetic Message-ID
+# Sending SIGUSR1 prints the number of messages handled so far (the
+# counter is only advanced in TEST_FULL_EML mode).
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use Time::HiRes qw(time);
+use PublicInbox::Eml;
+use PublicInbox::Git; # loaded explicitly, as t/lei_xkw.t does
+my $dir = $ENV{TEST_LEI_XKW_DIR} or
+	plan skip_all => 'TEST_LEI_XKW_DIR unset';
+my $git_dir = $ENV{GIANT_GIT_DIR} // `git rev-parse --git-dir` or
+	plan skip_all => 'GIANT_GIT_DIR unset';
+require_mods(qw(json DBD::SQLite Search::Xapian));
+use_ok 'PublicInbox::LeiXkwIdx';
+my @cat = qw(cat-file --buffer --batch-check --batch-all-objects);
+if (require_git(2.19, 1)) {
+	push @cat, '--unordered';
+} else {
+	warn "git <2.19, cat-file lacks --unordered, locality suffers\n";
+}
+chomp $git_dir;
+my $xkw = PublicInbox::LeiXkwIdx->new($dir, { -no_fsync => 1 });
+my $git = $xkw->{git} = PublicInbox::Git->new($git_dir);
+# synthetic message reused for every object when TEST_FULL_EML is unset;
+# only its header is rewritten per object
+my $eml = PublicInbox::Eml->new('');
+my $hdr = $eml->{hdr};
+my $cat = $git->popen(@cat);
+my $kw = [];
+my $smsg = { kw => $kw };
+my @kw_set = qw(seen answered flagged draft);
+my $nr = 0;
+# cat_async callback for TEST_FULL_EML mode.  A ternary is used because
+# "my $x = ... if COND" has undefined behavior according to perlsyn.
+my $full_eml = $ENV{TEST_FULL_EML} ? sub {
+	my ($bref, $oid, $type, $size) = @_;
+	my $xsmsg = {
+		blob => $oid,
+		# the object size picks the keyword
+		kw => [ $kw_set[$size % scalar(@kw_set)] ],
+	};
+	$xkw->set_xkw(PublicInbox::Eml->new($bref), $xsmsg);
+	diag("msg $nr @ ".time) if ((++$nr % 10000) == 0);
+} : undef;
+local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+local $SIG{USR1} = sub { diag "msg $nr" };
+
+my ($oid, $type, $size, $mid);
+while (<$cat>) {
+	# --batch-check lines are "<oid> <type> <size>"
+	($oid, $type, $size) = split(/ /);
+	if ($full_eml) {
+		$git->cat_async($oid, $full_eml) if $type eq 'blob';
+	} else {
+		$mid = "$oid\@$type";
+		$$hdr = "Message-ID: <$mid>";
+		$smsg->{blob} = $oid;
+		# force the Message-IDs to be recomputed for the new header
+		delete $smsg->{mids4idx};
+		$kw->[0] = $kw_set[$size % scalar(@kw_set)];
+		$xkw->set_xkw($eml, $smsg);
+	}
+}
+$git->cat_async_wait;
+
+done_testing;
^ permalink raw reply related [flat|nested] 5+ messages in thread