From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00
	shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1])
	by dcvr.yhbt.net (Postfix) with ESMTP id C2CCD1F9FD
	for ; Wed, 3 Mar 2021 14:01:39 +0000 (UTC)
From: Eric Wong
To: meta@public-inbox.org
Subject: [RFC] lei_xkw: eXternal KeyWord index
Date: Wed, 3 Mar 2021 14:01:39 +0000
Message-Id: <20210303140139.7637-1-e@80x24.org>
In-Reply-To: <20210224204950.GA2076@dcvr>
References: <20210224204950.GA2076@dcvr>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id:

This will be yet another new, persistent on-disk format in the
upcoming release.  It'll be used for storing per-message
keywords (AKA flags).

Its documents are not fleshed out at all with typical overview
info; they hold just OIDs, Message-IDs, and the keywords themselves.

It's not wired into lei/store yet, but that's the plan.

With some extensive stress testing, the index is 2.5GB after ~11.8M
messages with one keyword set per message.
---
 MANIFEST                     |  4 ++
 lib/PublicInbox/LeiXkw.pm    | 87 ++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiXkwIdx.pm | 85 +++++++++++++++++++++++++++++++++++
 lib/PublicInbox/SearchIdx.pm |  6 ++-
 t/lei_xkw.t                  | 40 +++++++++++++++++
 xt/lei_xkw_stress.t          | 57 +++++++++++++++++++++++
 6 files changed, 278 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/LeiXkw.pm
 create mode 100644 lib/PublicInbox/LeiXkwIdx.pm
 create mode 100644 t/lei_xkw.t
 create mode 100644 xt/lei_xkw_stress.t

diff --git a/MANIFEST b/MANIFEST
index 8c9c86a0..751af8ff 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -193,6 +193,8 @@ lib/PublicInbox/LeiSearch.pm
 lib/PublicInbox/LeiStore.pm
 lib/PublicInbox/LeiToMail.pm
 lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/LeiXkw.pm
+lib/PublicInbox/LeiXkwIdx.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
 lib/PublicInbox/Lock.pm
@@ -384,6 +386,7 @@ t/lei_external.t
 t/lei_overview.t
 t/lei_store.t
 t/lei_to_mail.t
+t/lei_xkw.t
 t/lei_xsearch.t
 t/linkify.t
 t/main-bin/spamc
@@ -478,6 +481,7 @@ xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
 xt/lei-auth-fail.t
 xt/lei-sigpipe.t
+xt/lei_xkw_stress.t
 xt/mem-imapd-tls.t
 xt/mem-msgview.t
 xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/LeiXkw.pm b/lib/PublicInbox/LeiXkw.pm
new file mode 100644
index 00000000..038e1cc2
--- /dev/null
+++ b/lib/PublicInbox/LeiXkw.pm
@@ -0,0 +1,87 @@
+# Copyright (C) 2021 all contributors
+# License: AGPL-3.0+
+
+# read-only counterpart to LeiXkwIdx
+package PublicInbox::LeiXkw;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::Search);
+use PublicInbox::ContentHash qw(content_hash);
+use PublicInbox::Eml;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::MID qw(mids_for_index);
+
+sub new { # $dir is stored as {xpfx}, $opt->{git} as {git}
+	my ($cls, $dir, $opt) = @_;
+	bless { xpfx => $dir, git => $opt->{git} }, $cls;
+}
+
+sub _content_cmp { # git->cat_async callback
+	my ($bref, $oid, $type, $size, $cmp) = @_;
+	if ($bref) {
+		my $existing = content_hash(PublicInbox::Eml->new($bref));
+		return if $cmp->{expect} ne $existing;
+		push @{$cmp->{hits}}, $oid; # same content as $eml
+	} else {
+		push @{$cmp->{gone}}, $oid; # no blob was passed for $oid
+	}
+}
+
+sub _docids_by_mids ($$$) { # fallback: match by Message-ID + content_hash
+	my ($self, $eml, $smsg) = @_;
+	my $cmp = { expect => content_hash($eml), hits => [] };
+	my $mids = $smsg->{mids4idx} //= mids_for_index($eml);
+	my $xdb = $self->{xdb};
+	my $git = $self->{git};
+	my %oid2docid;
+	for my $mid (@$mids) { # typically 1
+		my $head = $xdb->postlist_begin('Q'.$mid);
+		my $tail = $xdb->postlist_end('Q'.$mid);
+		for (; $head != $tail; $head++) {
+			my $docid = $head->get_docid;
+			my $oids = xap_terms('U', $xdb, $docid);
+			for my $oid (keys %$oids) {
+				next if exists $oid2docid{$oid};
+				$oid2docid{$oid} = $docid;
+				$git->cat_async($oid, \&_content_cmp, $cmp);
+			}
+		}
+	}
+	$git->cat_async_wait;
+	for my $oid (@{$cmp->{gone} // []}) {
+		my $docid = $oid2docid{$oid} // die "BUG $oid not mapped";
+		$xdb->delete_document($docid); # XXX needs writable xdb (TODO confirm)
+	}
+	map { $oid2docid{$_} // die "BUG $_ miss (@$mids)" } @{$cmp->{hits}};
+}
+
+sub docids_for ($$$) { # by blob OID ('U' term), else by Message-ID
+	my ($self, $eml, $smsg) = @_;
+	my $xdb = $self->{xdb};
+	my $oid = $smsg->{blob} // die 'BUG: no blob';
+	my $head = $xdb->postlist_begin('U'.$oid);
+	my $tail = $xdb->postlist_end('U'.$oid);
+	return ($head->get_docid) if $head != $tail;
+	_docids_by_mids($self, $eml, $smsg);
+}
+
+sub xdb_shards_flat { # for ->xdb
+	my ($self) = @_;
+	PublicInbox::Search::load_xapian();
+	($PublicInbox::Search::X{Database}->new($self->{xpfx}));
+}
+
+sub get_xkw { # returns the sorted keys of all 'K' terms for the message
+	my ($self, $eml, $smsg) = @_;
+	$self->xdb;
+	my @docids = docids_for($self, $eml, $smsg);
+	my %all;
+	# unusual for @docids > 1, but dedupe can change
+	for my $docid (@docids) {
+		my $terms = xap_terms('K', $self->{xdb}, $docid);
+		%all = (%all, %$terms);
+	}
+	sort keys %all
+}
+
+1;
diff --git a/lib/PublicInbox/LeiXkwIdx.pm b/lib/PublicInbox/LeiXkwIdx.pm
new file mode 100644
index 00000000..8f53c3ef
--- /dev/null
+++ b/lib/PublicInbox/LeiXkwIdx.pm
@@ -0,0 +1,85 @@
+# Copyright (C) 2021 all contributors
+# License: AGPL-3.0+
+
+# eXternal keyword index - for dealing with keyword storage on
+# read-only local external inboxes and extindex.
+#
+# Only boolean term prefixes:
+# Q - Message-ID (like SearchIdx)
+# U - git blob OID (URL)
+# K - keywords
+package PublicInbox::LeiXkwIdx;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::SearchIdx);
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::InboxWritable;
+use PublicInbox::LeiXkw;
+
+*docids_for = \&PublicInbox::LeiXkw::docids_for;
+
+sub new { # $opt->{-no_fsync} adds DB_NO_SYNC to the open flags
+	my ($cls, $dir, $opt) = @_;
+	PublicInbox::SearchIdx::load_xapian_writable();
+	my $fl = $PublicInbox::SearchIdx::DB_CREATE_OR_OPEN;
+	$fl |= $PublicInbox::SearchIdx::DB_NO_SYNC if $opt->{-no_fsync};
+	bless {
+		xpfx => $dir, xdb_flags => $fl, umask => 077,
+		indexlevel => 'full', creat => 1, git => $opt->{git},
+	}, $cls;
+}
+
+sub set_xkw { # returns the docids which were added, updated or deleted
+	my ($self, $eml, $smsg, $method) = @_;
+	my $kw = $smsg->{kw} // die 'BUG: no {kw}';
+	$self->begin_txn_lazy;
+	my @docids = docids_for($self, $eml, $smsg);
+	if (!@docids) { # brand new
+		return () unless scalar(@$kw);
+		my $doc = $PublicInbox::Search::X{Document}->new;
+		$doc->add_boolean_term('U' . $smsg->{blob});
+		$doc->add_boolean_term('K' . $_) for @$kw;
+		for my $mid (@{$smsg->{mids4idx}}) {
+			$doc->add_boolean_term('Q' . $mid);
+		}
+		return ($self->{xdb}->add_document($doc));
+	# modify existing
+	} elsif (scalar @$kw) {
+		$method //= 'set_keywords'; # | add_keywords | remove_keywords
+		for my $docid (@docids) {
+			$self->$method($docid, @$kw);
+		}
+		# we fell back to mids matching, speed up future matches
+		if ($smsg->{mids4idx}) { # set on $smsg (not $self) by the fallback
+			for my $docid (@docids) {
+				my $doc = $self->{xdb}->get_document($docid);
+				$doc->add_boolean_term('U' . $smsg->{blob});
+				$self->{xdb}->replace_document($docid, $doc);
+			}
+		}
+	} else { # empty {kw}: drop the existing document(s)
+		for my $docid (@docids) {
+			$self->{xdb}->delete_document($docid);
+		}
+	}
+	@docids;
+}
+
+sub xdb { $_[0]->begin_txn_lazy } # for get_xkw
+
+sub clear_xkw { # deletes every document for the message, returns their docids
+	my ($self, $eml, $smsg) = @_;
+	$self->begin_txn_lazy;
+	my @docids = docids_for($self, $eml, $smsg);
+	for my $docid (@docids) {
+		$self->{xdb}->delete_document($docid);
+	}
+	@docids
+}
+
+no warnings 'once';
+*DESTROY = \&PublicInbox::SearchIdx::commit_txn_lazy;
+*with_umask = \&PublicInbox::InboxWritable::with_umask;
+*get_xkw = \&PublicInbox::LeiXkw::get_xkw;
+
+1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 826302de..bba89ae7 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -136,7 +136,11 @@ sub idx_acquire {
 		}
 	}
 	return unless defined $flag;
-	$flag |= $DB_NO_SYNC if ($self->{ibx} // $self->{eidx})->{-no_fsync};
+	if (defined(my $fl = $self->{xdb_flags})) {
+		$flag |= $fl; # LeiXkwIdx (and future classes)
+	} elsif (($self->{ibx} // $self->{eidx})->{-no_fsync}) {
+		$flag |= $DB_NO_SYNC;
+	}
 	my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
 	croak "Failed opening $dir: $@" if $@;
 	$self->{xdb} = $xdb;
diff --git a/t/lei_xkw.t b/t/lei_xkw.t
new file mode 100644
index 00000000..ea21a0fe
--- /dev/null
+++ b/t/lei_xkw.t
@@ -0,0 +1,40 @@
+#!perl -w
+# Copyright (C) 2021 all contributors
+# License: AGPL-3.0+
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+require_mods(qw(json DBD::SQLite Search::Xapian));
+use_ok 'PublicInbox::LeiXkwIdx';
+use PublicInbox::Git;
+my ($tmpdir, $for_destroy) = tmpdir;
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $git = PublicInbox::Git->new("$ro_home/t1");
+my $xkw = PublicInbox::LeiXkwIdx->new("$tmpdir/xkw", { git => $git });
+my $smsg = {
+	blob => '9bf1002c49eb075df47247b74d69bcd555e23422',
+	kw => ['seen'],
+};
+my $req = { blob => $smsg->{blob} };
+my $eml = eml_load('t/utf8.eml');
+is_deeply([$xkw->get_xkw($eml, $smsg)], [], 'no keywords, yet');
+is($xkw->{xdb}->get_doccount, 0, 'no documents created');
+
+my @d = $xkw->set_xkw($eml, $smsg);
+is(scalar(@d), 1, 'set one docid');
+like($d[0], qr/\A\d+\z/, 'set a numeric docid');
+
+is_deeply([$xkw->get_xkw($eml, $req)], ['seen'], 'got keywords back');
+
+$smsg->{kw} = [qw(seen answered)];
+is_deeply([$xkw->set_xkw($eml, $smsg)], \@d, 'updated existing doc');
+is_deeply([$xkw->get_xkw($eml, $req)], [qw(answered seen)],
+	'got keywords back');
+undef $xkw;
+
+$xkw = PublicInbox::LeiXkwIdx->new("$tmpdir/xkw", { git => $git });
+is_deeply([$xkw->get_xkw($eml, $req)], [qw(answered seen)],
+	'got keywords back after auto-commit');
+
+is_deeply([$xkw->clear_xkw($eml, $req)], \@d, 'keywords cleared');
+is_deeply([$xkw->get_xkw($eml, $req)], [], 'keywords gone');
+
+done_testing;
diff --git a/xt/lei_xkw_stress.t b/xt/lei_xkw_stress.t
new file mode 100644
index 00000000..69f66d07
--- /dev/null
+++ b/xt/lei_xkw_stress.t
@@ -0,0 +1,57 @@
+#!perl -w
+# Copyright (C) 2021 all contributors
+# License: AGPL-3.0+
+use strict; use v5.10.1; use PublicInbox::TestCommon;
+use Time::HiRes qw(time);
+use PublicInbox::Eml;
+my $dir = $ENV{TEST_LEI_XKW_DIR} or
+	plan skip_all => 'TEST_LEI_XKW_DIR unset';
+my $git_dir = $ENV{GIANT_GIT_DIR} // `git rev-parse --git-dir` or
+	plan skip_all => 'GIANT_GIT_DIR unset';
+require_mods(qw(json DBD::SQLite Search::Xapian));
+use_ok 'PublicInbox::LeiXkwIdx';
+my @cat = qw(cat-file --buffer --batch-check --batch-all-objects);
+if (require_git(2.19, 1)) {
+	push @cat, '--unordered';
+} else {
+	warn "git <2.19, cat-file lacks --unordered, locality suffers\n";
+}
+chomp $git_dir;
+my $xkw = PublicInbox::LeiXkwIdx->new($dir, { -no_fsync => 1 });
+my $git = $xkw->{git} = PublicInbox::Git->new($git_dir);
+my $eml = PublicInbox::Eml->new('');
+my $hdr = $eml->{hdr};
+my $cat = $git->popen(@cat);
+my $kw = [];
+my $smsg = { kw => $kw };
+my @kw_set = qw(seen answered flagged draft);
+my $nr = 0;
+my $full_eml = $ENV{TEST_FULL_EML} ? sub { # git->cat_async callback
+	my ($bref, $oid, $type, $size) = @_;
+	my $xsmsg = {
+		blob => $oid,
+		kw => [ $kw_set[$size % scalar(@kw_set)] ],
+	};
+	$xkw->set_xkw(PublicInbox::Eml->new($bref), $xsmsg);
+	diag("msg $nr @ ".time) if ((++$nr % 10000) == 0);
+} : undef; # not "my ... if COND", which perlsyn documents as undefined
+local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
+local $SIG{USR1} = sub { diag "msg $nr" };
+
+my ($oid, $type, $size, $mid);
+while (<$cat>) {
+	($oid, $type, $size) = split(/ /);
+	if ($full_eml) { # index the real blob contents
+		$git->cat_async($oid, $full_eml) if $type eq 'blob';
+	} else { # reuse one Eml, faking a Message-ID from each object
+		$mid = "$oid\@$type";
+		$$hdr = "Message-ID: <$mid>";
+		$smsg->{blob} = $oid;
+		delete $smsg->{mids4idx};
+		$kw->[0] = $kw_set[$size % scalar(@kw_set)];
+		$xkw->set_xkw($eml, $smsg);
+	}
+}
+$git->cat_async_wait;
+
+done_testing;