From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 9F5D12007F for ; Tue, 27 Oct 2020 07:54:56 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 19/52] extsearchidx: initial implementation Date: Tue, 27 Oct 2020 07:54:20 +0000 Message-Id: <20201027075453.19163-20-e@80x24.org> In-Reply-To: <20201027075453.19163-1-e@80x24.org> References: <20201027075453.19163-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: It compiles... --- MANIFEST | 1 + lib/PublicInbox/ExtSearchIdx.pm | 311 ++++++++++++++++++++++++++++++++ t/extsearch.t | 1 + 3 files changed, 313 insertions(+) create mode 100644 lib/PublicInbox/ExtSearchIdx.pm diff --git a/MANIFEST b/MANIFEST index 60055d2b..418a2f17 100644 --- a/MANIFEST +++ b/MANIFEST @@ -122,6 +122,7 @@ lib/PublicInbox/Eml.pm lib/PublicInbox/EmlContentFoo.pm lib/PublicInbox/ExtMsg.pm lib/PublicInbox/ExtSearch.pm +lib/PublicInbox/ExtSearchIdx.pm lib/PublicInbox/FakeInotify.pm lib/PublicInbox/Feed.pm lib/PublicInbox/Filter/Base.pm diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm new file mode 100644 index 00000000..edf17974 --- /dev/null +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -0,0 +1,311 @@ +# Copyright (C) 2020 all contributors +# License: AGPL-3.0+ + +# Detached/external index cross inbox search indexing support +# read-write counterpart to PublicInbox::ExtSearch +# +# It's based on the same ideas as public-inbox-v2-format(5) using +# over.sqlite3 for dedupe and sharded Xapian. msgmap.sqlite3 is +# missing, so there is no Message-ID conflict resolution, meaning +# no NNTP support for now. 
+# +# v2 has a 1:1 mapping of index:inbox or msgmap for NNTP support. +# This is intended to be an M:N index:inbox mapping, but it'll likely +# be 1:N in common practice (M==1) + +package PublicInbox::ExtSearchIdx; +use strict; +use v5.10.1; +use parent qw(PublicInbox::ExtSearch PublicInbox::Lock); +use Carp qw(croak carp); +use PublicInbox::Search; +use PublicInbox::SearchIdx qw(crlf_adjust); +use PublicInbox::OverIdx; +use PublicInbox::V2Writable; +use PublicInbox::InboxWritable; +use PublicInbox::Eml; +use File::Spec; + +sub new { + my (undef, $dir, $opt, $shard) = @_; + my $l = $opt->{indexlevel} // 'full'; + $l !~ $PublicInbox::SearchIdx::INDEXLEVELS and + die "invalid indexlevel=$l\n"; + $l eq 'basic' and die "E: indexlevel=basic not yet supported\n"; + my $self = bless { + xpfx => "$dir/ei".PublicInbox::Search::SCHEMA_VERSION, + topdir => $dir, + creat => $opt->{creat}, + ibx_map => {}, # (newsgroup//inboxdir) => $ibx + ibx_list => [], + indexlevel => $l, + transact_bytes => 0, + total_bytes => 0, + current_info => '', + parallel => 1, + lock_path => "$dir/ei.lock", + }, __PACKAGE__; + $self->{shards} = $self->count_shards || nproc_shards($opt->{creat}); + my $oidx = PublicInbox::OverIdx->new("$self->{xpfx}/over.sqlite3"); + $oidx->{-no_fsync} = 1 if $opt->{-no_fsync}; + $self->{oidx} = $oidx; + $self +} + +sub attach_inbox { + my ($self, $ibx) = @_; + my $key = $ibx->eidx_key; + if (!$ibx->over || !$ibx->mm) { + warn "W: skipping $key (unindexed)\n"; + return; + } + if (!defined($ibx->uidvalidity)) { + warn "W: skipping $key (no UIDVALIDITY)\n"; + return; + } + my $ibxdir = File::Spec->canonpath($ibx->{inboxdir}); + if ($ibxdir ne $ibx->{inboxdir}) { + warn "W: `$ibx->{inboxdir}' canonicalized to `$ibxdir'\n"; + $ibx->{inboxdir} = $ibxdir; + } + $ibx = PublicInbox::InboxWritable->new($ibx); + $self->{ibx_map}->{$key} //= do { + push @{$self->{ibx_list}}, $ibx; + $ibx; + } +} + +sub _ibx_attach { # each_inbox callback + my ($ibx, $self) = @_; + 
attach_inbox($self, $ibx); +} + +sub attach_config { + my ($self, $cfg) = @_; + $cfg->each_inbox(\&_ibx_attach, $self); +} + +sub git_blob_digest ($) { + my ($bref) = @_; + my $dig = Digest::SHA->new(1); # XXX SHA256 later + $dig->add('blob '.length($$bref)."\0"); + $dig->add($$bref); + $dig; +} + +sub is_bad_blob ($$$$) { + my ($oid, $type, $size, $expect_oid) = @_; + if ($type ne 'blob') { + carp "W: $expect_oid is not a blob (type=$type)"; + return 1; + } + croak "BUG: $oid != $expect_oid" if $oid ne $expect_oid; + $size == 0 ? 1 : 0; # size == 0 means purged +} + +sub do_xpost ($$) { + my ($req, $smsg) = @_; + my $self = $req->{eidx}; + my $docid = $smsg->{num}; + my $idx = $self->idx_shard($docid); + my $oid = $req->{oid}; + my $xibx = $self->{sync}->{ibx}; + my $eml = $req->{eml}; + if (my $new_smsg = $req->{new_smsg}) { # 'm' on cross-posted message + my $xnum = $req->{xnum}; + $idx->shard_add_xref3($docid, $xnum, $oid, $xibx, $eml); + } else { # 'd' + $idx->shard_remove_xref3($docid, $oid, $xibx, $eml); + } +} + +sub index_unseen ($) { + my ($req) = @_; + my $new_smsg = $req->{new_smsg} or die 'BUG: {new_smsg} unset'; + $new_smsg->populate($req->{eml}, $req); + my $self = $req->{eidx}; + my $docid = $self->{oidx}->adj_counter('eidx_docid', '+'); + $new_smsg->{num} = $docid; + my $idx = $self->idx_shard($docid); + my $eml = delete $req->{eml}; + $self->{oidx}->add_overview($eml, $new_smsg); + $idx->index_raw(undef, $eml, $new_smsg, delete $new_smsg->{ibx}); +} + +sub do_finalize ($) { + my ($req) = @_; + if (my $indexed = $req->{indexed}) { + do_xpost($req, $_) for @$indexed; + } elsif (exists $req->{new_smsg}) { # totally unseen message + index_unseen($req); + } else { + warn "W: ignoring delete $req->{oid} (not found)\n"; + } +} + +sub do_step ($) { # main iterator for adding messages to the index + my ($req) = @_; + my $self = $req->{eidx}; + while (1) { + if (my $next_arg = $req->{next_arg}) { + if (my $smsg = $self->{oidx}->next_by_mid(@$next_arg)) { + 
$req->{cur_smsg} = $smsg; + $self->git->cat_async($smsg->{blob}, + \&ck_existing, $req); + return; # ck_existing calls do_step + } + delete $req->{cur_smsg}; + delete $req->{next_arg}; + } + my $mid = shift(@{$req->{mids}}); + last unless defined $mid; + my ($id, $prev); + $req->{next_arg} = [ $mid, \$id, \$prev ]; + # loop again + } + do_finalize($req); +} + +sub ck_existing { # git->cat_async callback + my ($bref, $oid, $type, $size, $req) = @_; + my $smsg = $req->{cur_smsg} or die 'BUG: {cur_smsg} missing'; + return if is_bad_blob($oid, $type, $size, $smsg->{blob}); + my $cur = PublicInbox::Eml->new($bref); + if (content_digest($cur) eq $req->{chash}) { + push @{$req->{indexed}}, $smsg; # for do_xpost + } # else { index_unseen later } + do_step($req); +} + +# is the message visible in the inbox currently being indexed? +# return the number if so +sub cur_ibx_xnum ($$) { + my ($req, $bref) = @_; + my $ibx = $req->{sync}->{ibx} or die 'BUG: current {ibx} missing'; + + # XXX overkill? + git_blob_digest($bref)->hexdigest eq $req->{oid} or die + "BUG: blob mismatch $req->{oid}"; + + $req->{eml} = PublicInbox::Eml->new($bref); + $req->{chash} = content_hash($req->{eml}); + $req->{mids} = mids($req->{eml}); + my @q = @{$req->{mids}}; # copy + while (defined(my $mid = shift @q)) { + my ($id, $prev); + while (my $x = $ibx->over->next_by_mid($mid, \$id, \$prev)) { + return $x->{num} if $x->{blob} eq $req->{oid}; + } + } + undef; +} + +sub m_start { # git->cat_async callback for 'm' + my ($bref, $oid, $type, $size, $req) = @_; + return if is_bad_blob($oid, $type, $size, $req->{oid}); + my $new_smsg = $req->{new_smsg} = bless { + blob => $oid, + raw_bytes => $size, + }, 'PublicInbox::Smsg'; + $new_smsg->{bytes} = $new_smsg->{raw_bytes} + crlf_adjust($$bref); + defined($req->{xnum} = cur_ibx_xnum($req, $bref)) or return; + $new_smsg->{ibx} = $req->{sync}->{ibx}; + do_step($req); +} + +sub d_start { # git->cat_async callback for 'd' + my ($bref, $oid, $type, $size, $req) = 
@_; + return if is_bad_blob($oid, $type, $size, $req->{oid}); + return if defined(cur_ibx_xnum($req, $bref)); # was re-added + do_step($req); +} + +sub eidx_last_epoch_commit ($$$) { + my ($self, $sync, $epoch) = @_; + my $key = $sync->{ibx}->eidx_key; + my $uidvalidity = $sync->{ibx}->uidvalidity; + $self->{oidx}->eidx_meta("lc-v2:$key//$uidvalidity;$epoch"); +} + +sub _sync_inbox ($$$) { + my ($self, $opt, $ibx) = @_; + my $sync = { + need_checkpoint => \(my $bool = 0), + unindex_range => {}, # EPOCH => oid_old..oid_new + reindex => $opt->{reindex}, + -opt => $opt, + eidx => $self, + ibx => $ibx, + }; + my $key = $ibx->eidx_key; + my $u = $ibx->uidvalidity; + my $oidx = $self->{oidx}; + my $v = $ibx->version; + if ($v == 2) { + my $epoch_max; + defined($ibx->git_dir_latest(\$epoch_max)) or return; + my $heads = $sync->{ranges} = []; + for my $i (0..$epoch_max) { + $heads->[$i] = $oidx->eidx_meta("lc-v2:$key//$u;$i"); + } + + + } elsif ($v == 1) { + my $lc = $oidx->eidx_meta("lc-v1:$key//$u"); + prepare_stack($sync, $lc ? "$lc..HEAD" : 'HEAD'); + } else { + warn "E: $key unsupported inbox version (v$v)\n"; + return; + } +} + +sub eidx_sync { # main entry point + my ($self, $opt) = @_; + $self->idx_init($opt); # acquire lock via V2Writable::_idx_init + $self->{oidx}->rethread_prepare($opt); + + _sync_inbox($self, $opt, $_) for (@{$self->{ibx_list}}); +} + +sub idx_init { # similar to V2Writable + my ($self, $opt) = @_; + return if $self->{idx_shards}; + + $self->git->cleanup; + + my $ALL = $self->git->{git_dir}; # ALL.git + PublicInbox::Import::init_bare($ALL) unless -d $ALL; + my $info_dir = "$ALL/objects/info"; + my $alt = "$info_dir/alternates"; + my $mode = 0644; + my (%old, @old, %new, @new); + if (-e $alt) { + open(my $fh, '<', $alt) or die "open $alt: $!"; + $mode = (stat($fh))[2] & 07777; + while (<$fh>) { + push @old, $_ if !$old{$_}++; + } + } + for my $ibx (@{$self->{ibx_list}}) { + my $line = $ibx->git->{git_dir} . 
"/objects\n"; + next if $old{$line}; + $new{$line} = 1; + push @new, $line; + } + push @old, @new; + PublicInbox::V2Writable::write_alternates($info_dir, $mode, \@old); + $self->parallel_init($self->{indexlevel}); + $self->umask_prepare; + $self->with_umask(\&PublicInbox::V2Writable::_idx_init, $self, $opt); + $self->{oidx}->begin_lazy; + $self->{oidx}->eidx_prep; +} + +no warnings 'once'; +*done = \&PublicInbox::V2Writable::done; +*umask_prepare = \&PublicInbox::InboxWritable::umask_prepare; +*with_umask = \&PublicInbox::InboxWritable::with_umask; +*parallel_init = \&PublicInbox::V2Writable::parallel_init; +*nproc_shards = \&PublicInbox::V2Writable::nproc_shards; + +1; diff --git a/t/extsearch.t b/t/extsearch.t index 7687f5f0..54927c50 100644 --- a/t/extsearch.t +++ b/t/extsearch.t @@ -7,5 +7,6 @@ use PublicInbox::TestCommon; require_git(2.6); require_mods(qw(DBD::SQLite Search::Xapian)); use_ok 'PublicInbox::ExtSearch'; +use_ok 'PublicInbox::ExtSearchIdx'; done_testing;