From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EF6E81F934 for ; Sat, 23 Jan 2021 10:27:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 01/10] lei: move external vivification to xsearch Date: Sat, 23 Jan 2021 10:27:46 +0000 Message-Id: <20210123102755.425-2-e@80x24.org> In-Reply-To: <20210123102755.425-1-e@80x24.org> References: <20210123102755.425-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This seems like a better place to put it given upcoming URI support, which starts in this commit. --- lib/PublicInbox/LeiQuery.pm | 27 +++++------------ lib/PublicInbox/LeiXSearch.pm | 57 ++++++++++++++++++++++++----------- t/lei_xsearch.t | 5 ++- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 7d634b5e..eebf217b 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -7,19 +7,6 @@ use strict; use v5.10.1; use PublicInbox::DS qw(dwaitpid); -sub _vivify_external { # _externals_each callback - my ($src, $dir) = @_; - if (-f "$dir/ei.lock") { - require PublicInbox::ExtSearch; - push @$src, PublicInbox::ExtSearch->new($dir); - } elsif (-f "$dir/inbox.lock" || -d "$dir/public-inbox") { # v2, v1 - require PublicInbox::Inbox; - push @$src, bless { inboxdir => $dir }, 'PublicInbox::Inbox'; - } else { - warn "W: ignoring $dir, unable to determine type\n"; - } -} - # the main "lei q SEARCH_TERMS" method sub lei_q { my ($self, @argv) = @_; @@ -27,19 +14,19 @@ sub lei_q { require PublicInbox::LeiOverview; PublicInbox::Config->json; # preload before forking my $opt = $self->{opt}; - my @srcs; # any number of LeiXSearch || LeiSearch || Inbox + my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new; + # any number of LeiXSearch || LeiSearch || Inbox if ($opt->{'local'} //= 1) { # --local is enabled by default my $sto = $self->_lei_store(1); - push @srcs, $sto->search; + $lxs->prepare_external($sto->search); } - my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external if ($opt->{external} //= 1) { - $self->_externals_each(\&_vivify_external, \@srcs); + my $cb = $lxs->can('prepare_external'); + $self->_externals_each($cb, $lxs); } - my $xj = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs)); - $xj = 1 if !$opt->{thread}; + my $xj = $opt->{thread} ? $lxs->locals : ($lxs->remotes + 1); my $ovv = PublicInbox::LeiOverview->new($self) or return; $self->atfork_prepare_wq($lxs); $lxs->wq_workers_start('lei_xsearch', $xj, $self->oldset); @@ -76,7 +63,7 @@ sub lei_q { $mset_opt{relevance} //= -2 if $opt->{thread}; $self->{mset_opt} = \%mset_opt; $ovv->ovv_begin($self); - $lxs->do_query($self, \@srcs); + $lxs->do_query($self); } 1; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 987a9896..10c25246 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -26,10 +26,6 @@ sub new { sub attach_external { my ($self, $ibxish) = @_; # ibxish = ExtSearch or Inbox - - if (!$ibxish->can('over') || !$ibxish->over) { - return push(@{$self->{remotes}}, $ibxish) - } my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; my $srch = $ibxish->search or return warn("$desc not indexed for Xapian\n"); @@ -59,10 +55,9 @@ sub attach_external { } # returns a list of local inboxes (or count in scalar context) -sub locals { - my %uniq = map {; "$_" => $_ } @{$_[0]->{shard2ibx} // []}; - values %uniq; -} +sub locals { @{$_[0]->{locals} // []} } + +sub remotes { @{$_[0]->{remotes} // []} } # called by PublicInbox::Search::xdb sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} } @@ -148,14 +143,16 @@ sub query_thread_mset { # for --thread } sub query_mset { # non-parallel for non-"--thread" users - my ($self, $lei, $srcs) = @_; + my ($self, $lei) = @_; local $0 = "$0 query_mset"; my $startq = delete $self->{5}; my %sig = $lei->atfork_child_wq($self); local @SIG{keys %sig} = values %sig; my $mo = { %{$lei->{mset_opt}} }; my $mset; - $self->attach_external($_) for @$srcs; + for my $loc (locals($self)) { + attach_external($self, $loc); + } my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self); my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; $dedupe->prepare_dedupe; @@ -172,6 +169,10 @@ sub query_mset { # non-parallel for non-"--thread" users $lei->{ovv}->ovv_atexit_child($lei); } +sub query_remote_mboxrd { + my ($self, $lei, $uri) = @_; +} + sub git { my ($self) = @_; my (%seen, @dirs); @@ -221,18 +222,17 @@ sub do_post_augment { } sub start_query { # always runs in main (lei-daemon) process - my ($self, $io, $lei, $srcs) = @_; - my $remotes = $self->{remotes} // []; + my ($self, $io, $lei) = @_; if ($lei->{opt}->{thread}) { - for my $ibxish (@$srcs) { + for my $ibxish (locals($self)) { $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } } else { - $self->wq_do('query_mset', $io, $lei, $srcs); + $self->wq_do('query_mset', $io, $lei); } # TODO - for my $rmt (@$remotes) { - $self->wq_do('query_thread_mbox', $io, $lei, $rmt); + for my $uri (remotes($self)) { + $self->wq_do('query_remote_mboxrd', $io, $lei, $uri); } @$io = (); } @@ -259,7 +259,7 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers } sub do_query { - my ($self, $lei_orig, $srcs) = @_; + my ($self, $lei_orig) = @_; my ($lei, @io) = $lei_orig->atfork_parent_wq($self); $io[0] = undef; pipe(my $done, $io[0]) or die "pipe $!"; @@ -286,7 +286,7 @@ sub do_query { $io[5] = $startq; $io[1] = $zpipe->[1] if $zpipe; } - start_query($self, \@io, $lei, $srcs); + start_query($self, \@io, $lei); $self->wq_close(1); unless ($in_loop) { # for the $lei->atfork_child_wq PIPE handler: @@ -302,4 +302,25 @@ sub ipc_atfork_prepare { $self->SUPER::ipc_atfork_prepare; # PublicInbox::IPC } +sub prepare_external { + my ($self, $loc, $boost) = @_; # n.b. already ordered by boost + if (ref $loc) { # already a URI, or PublicInbox::Inbox-like object + return push(@{$self->{remotes}}, $loc) if $loc->can('scheme'); + } elsif ($loc =~ m!\Ahttps?://!) { + require URI; + return push(@{$self->{remotes}}, URI->new($loc)); + } elsif (-f "$loc/ei.lock") { + require PublicInbox::ExtSearch; + $loc = PublicInbox::ExtSearch->new($loc); + } elsif (-f "$loc/inbox.lock" || -d "$loc/public-inbox") { + require PublicInbox::Inbox; # v2, v1 + $loc = bless { inboxdir => $loc }, 'PublicInbox::Inbox'; + } else { + warn "W: ignoring $loc, unable to determine type\n"; + return; + } + push @{$self->{locals}}, $loc; +} + + 1; diff --git a/t/lei_xsearch.t b/t/lei_xsearch.t index 8b03c1f2..f745ea3e 100644 --- a/t/lei_xsearch.t +++ b/t/lei_xsearch.t @@ -49,7 +49,10 @@ $eidx->eidx_sync({fsync => 0}); my $es = PublicInbox::ExtSearch->new("$home/eidx"); my $lxs = PublicInbox::LeiXSearch->new; for my $ibxish (shuffle($es, @ibx)) { - $lxs->attach_external($ibxish); + $lxs->prepare_external($ibxish); +} +for my $loc ($lxs->locals) { + $lxs->attach_external($loc); } my $nr = $lxs->xdb->get_doccount; my $mset = $lxs->mset('d:19931002..19931003', { limit => $nr });