"--save" may become the default when writing to a pathname or URL. "lei up" will be used to update the results of "--save". This only supports local externals at the moment, remote externals won't be able to avoid excess traffic easily. Usability improvements are coming... Eric Wong (5): lei_xsearch: use per-external queries when not sorting lei_dedupe: adjust to prepare for saved searches lei_query: rearrange internals to capture query early lei q: start wiring up saved search lei: add "lei up" to complement "lei q --save" MANIFEST | 4 + lib/PublicInbox/LEI.pm | 6 +- lib/PublicInbox/LeiDedupe.pm | 16 ++-- lib/PublicInbox/LeiQuery.pm | 59 +++++++------ lib/PublicInbox/LeiSavedSearch.pm | 142 ++++++++++++++++++++++++++++++ lib/PublicInbox/LeiToMail.pm | 18 ++-- lib/PublicInbox/LeiUp.pm | 46 ++++++++++ lib/PublicInbox/LeiXSearch.pm | 94 +++++++++++++------- t/lei-q-save.t | 25 ++++++ t/lei.t | 2 +- t/lei_dedupe.t | 11 ++- t/lei_saved_search.t | 10 +++ 12 files changed, 356 insertions(+), 77 deletions(-) create mode 100644 lib/PublicInbox/LeiSavedSearch.pm create mode 100644 lib/PublicInbox/LeiUp.pm create mode 100644 t/lei-q-save.t create mode 100644 t/lei_saved_search.t
We only need the combined mset query when we care about sort order. When writing to --output destinations intended for MUA consumption, sort order is irrelevant as MUAs are expected to offer their own sorting, so run queries to each external in parallel. This prepares us for docid-sort-based saved search support. It will also become faster than the combined mset query for users with many externals due to current Xapian exhibiting poor performance with many shards (the same reason -extindex exists) --- lib/PublicInbox/LeiXSearch.pm | 63 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 692d5e54..9d367977 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -111,7 +111,7 @@ sub _mset_more ($$) { } # $startq will EOF when do_augment is done augmenting and allow -# query_mset and query_thread_mset to proceed. +# query_combined_mset and query_thread_mset to proceed. sub wait_startq ($) { my ($lei) = @_; my $startq = delete $lei->{startq} or return; @@ -144,9 +144,9 @@ sub mset_progress { } } -sub query_thread_mset { # for --threads +sub query_one_mset { # for --threads and l2m w/o sort my ($self, $ibxish) = @_; - local $0 = "$0 query_thread_mset"; + local $0 = "$0 query_one_mset"; my $lei = $self->{lei}; my ($srch, $over) = ($ibxish->search, $ibxish->over); my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; @@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); my $can_kw = !!$ibxish->can('msg_keywords'); - my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef; + my $threads = $lei->{opt}->{threads} // 0; + my $fl = $threads > 1 ? 
1 : undef; do { $mset = $srch->mset($mo->{qstr}, $mo); mset_progress($lei, $desc, $mset->size, $mset->get_matches_estimated); wait_startq($lei); # wait for keyword updates my $ids = $srch->mset_to_artnums($mset, $mo); - my $ctx = { ids => $ids }; my $i = 0; - my %n2item = map { ($ids->[$i++], $_) } $mset->items; - while ($over->expand_thread($ctx)) { - for my $n (@{$ctx->{xids}}) { - my $smsg = $over->get_art($n) or next; - my $mitem = delete $n2item{$smsg->{num}}; - next if $smsg->{bytes} == 0; - if ($mitem) { - if ($can_kw) { + if ($threads) { + my $ctx = { ids => $ids }; + my %n2item = map { ($ids->[$i++], $_) } $mset->items; + while ($over->expand_thread($ctx)) { + for my $n (@{$ctx->{xids}}) { + my $smsg = $over->get_art($n) or next; + my $mitem = delete $n2item{$n}; + next if $smsg->{bytes} == 0; + if ($mitem && $can_kw) { mitem_kw($smsg, $mitem, $fl); - } elsif ($fl) { + } elsif ($mitem && $fl) { # call ->xsmsg_vmd, later $smsg->{lei_q_tt_flagged} = 1; } + $each_smsg->($smsg, $mitem); } + @{$ctx->{xids}} = (); + } + } else { + my @items = $mset->items; + for my $n (@$ids) { + my $mitem = $items[$i++]; + my $smsg = $over->get_art($n) or next; + next if $smsg->{bytes} == 0; + mitem_kw($smsg, $mitem, $fl) if $can_kw; $each_smsg->($smsg, $mitem); } - @{$ctx->{xids}} = (); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } -sub query_mset { # non-parallel for non-"--threads" users +sub query_combined_mset { # non-parallel for non-"--threads" users my ($self) = @_; - local $0 = "$0 query_mset"; + local $0 = "$0 query_combined_mset"; my $lei = $self->{lei}; my $mo = { %{$lei->{mset_opt}} }; my $mset; @@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); - undef $each_smsg; # drops @io for l2m->{each_smsg_done} + undef $each_smsg; # may commit 
$lei->{ovv}->ovv_atexit_child($lei); } @@ -379,14 +389,14 @@ sub concurrency { $nl + $nr; } -sub start_query { # always runs in main (lei-daemon) process - my ($self) = @_; - if ($self->{threads}) { +sub start_query ($;$) { # always runs in main (lei-daemon) process + my ($self, $l2m) = @_; + if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) { for my $ibxish (locals($self)) { - $self->wq_io_do('query_thread_mset', [], $ibxish); + $self->wq_io_do('query_one_mset', [], $ibxish); } } elsif (locals($self)) { - $self->wq_io_do('query_mset', []); + $self->wq_io_do('query_combined_mset', []); } my $i = 0; my $q = []; @@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process sub incr_start_query { # called whenever an l2m shard starts do_post_auth my ($self, $l2m) = @_; return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers}; - start_query($self); + start_query($self, $l2m); } sub ipc_atfork_child { @@ -448,7 +458,8 @@ sub do_query { my $op_c = delete $lei->{pkt_op_c}; delete $lei->{pkt_op_p}; @$end = (); - $self->{threads} = $lei->{opt}->{threads}; + $self->{opt_threads} = $lei->{opt}->{threads}; + $self->{opt_sort} = $lei->{opt}->{'sort'}; if ($l2m) { $l2m->net_merge_complete unless $lei->{auth}; } else {
LeiSavedSearch will use a LeiDedupe-like internal API, so we won't have to make as many changes to callsites between saved and unsaved searches. --- lib/PublicInbox/LeiDedupe.pm | 16 ++++++++-------- lib/PublicInbox/LeiToMail.pm | 6 +++--- t/lei_dedupe.t | 11 +++++++---- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index a62b3a7c..378f748e 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -41,8 +41,8 @@ sub smsg_hash ($) { sub dedupe_oid ($) { my ($skv) = @_; (sub { # may be called in a child process - my ($eml, $oid) = @_; - $skv->set_maybe(_oidbin($oid) // _regen_oid($eml), ''); + my ($eml, $oidhex) = @_; + $skv->set_maybe(_oidbin($oidhex) // _regen_oid($eml), ''); }, sub { my ($smsg) = @_; $skv->set_maybe(_oidbin($smsg->{blob}), ''); @@ -53,9 +53,9 @@ sub dedupe_oid ($) { sub dedupe_mid ($) { my ($skv) = @_; (sub { # may be called in a child process - my ($eml, $oid) = @_; - # TODO: lei will support non-public messages w/o Message-ID - my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) // + my ($eml, $oidhex) = @_; + # lei supports non-public drafts w/o Message-ID + my $mid = $eml->header_raw('Message-ID') // _oidbin($oidhex) // content_hash($eml); $skv->set_maybe($mid, ''); }, sub { @@ -71,7 +71,7 @@ sub dedupe_mid ($) { sub dedupe_content ($) { my ($skv) = @_; (sub { # may be called in a child process - my ($eml) = @_; # oid = $_[1], ignored + my ($eml) = @_; # $oidhex = $_[1], ignored $skv->set_maybe(content_hash($eml), ''); }, sub { my ($smsg) = @_; @@ -104,8 +104,8 @@ sub new { # returns true on seen messages according to the deduplication strategy, # returns false if unseen sub is_dup { - my ($self, $eml, $oid) = @_; - !$self->[1]->($eml, $oid); + my ($self, $eml, $smsg) = @_; + !$self->[1]->($eml, $smsg ? 
$smsg->{blob} : undef); } sub is_smsg_dup { diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 70164e40..7adbffe7 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -194,7 +194,7 @@ sub _mbox_write_cb ($$) { sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); - return if $dedupe->is_dup($eml, $smsg->{blob}); + return if $dedupe->is_dup($eml, $smsg); $lse->xsmsg_vmd($smsg) if $lse; $buf = $eml2mbox->($eml, $smsg); return atomic_append($lei, $buf) if $atomic_append; @@ -280,7 +280,7 @@ sub _maildir_write_cb ($$) { $lse->xsmsg_vmd($smsg) if $lse; return _buf2maildir($dst, $buf, $smsg) if !$dedupe; $eml //= PublicInbox::Eml->new($$buf); # copy buf - return if $dedupe->is_dup($eml, $smsg->{blob}); + return if $dedupe->is_dup($eml, $smsg); undef $eml; _buf2maildir($dst, $buf, $smsg); } @@ -299,7 +299,7 @@ sub _imap_write_cb ($$) { $mic // return $lei->fail; # mic may be undef-ed in last run if ($dedupe) { $eml //= PublicInbox::Eml->new($$bref); # copy bref - return if $dedupe->is_dup($eml, $smsg->{blob}); + return if $dedupe->is_dup($eml, $smsg); } $lse->xsmsg_vmd($smsg) if $lse; eval { $imap_append->($mic, $folder, $bref, $smsg, $eml) }; diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t index bcb06a0a..e1944d02 100644 --- a/t/lei_dedupe.t +++ b/t/lei_dedupe.t @@ -74,10 +74,13 @@ ok(!$dd->is_dup($different), 'different is_dup with mid dedupe (augment)'); $different->header_set('Status', 'RO'); ok($dd->is_dup($different), 'different seen with oid dedupe Status removed'); -ok(!$dd->is_dup($eml, '01d'), '1st is_dup with oid dedupe'); -ok($dd->is_dup($different, '01d'), 'different content ignored if oid matches'); -ok($dd->is_dup($eml, '01D'), 'case insensitive oid comparison :P'); -ok(!$dd->is_dup($eml, '01dbad'), 'case insensitive oid comparison :P'); +$smsg = { blob => '01d' }; +ok(!$dd->is_dup($eml, $smsg), '1st is_dup with oid dedupe'); +ok($dd->is_dup($different, $smsg), 
'different content ignored if oid matches'); +$smsg->{blob} = uc($smsg->{blob}); +ok($dd->is_dup($eml, $smsg), 'case insensitive oid comparison :P'); +$smsg->{blob} = '01dbad'; +ok(!$dd->is_dup($eml, $smsg), 'case insensitive oid comparison :P'); $smsg->{blob} = 'dead'; ok(!$dd->is_smsg_dup($smsg), 'smsg dedupe pass');
To support saved search, we need the query string available to us before we setup LeiDedupe via (LeiOverview || LeiToMail). --- lib/PublicInbox/LeiQuery.pm | 55 +++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 9174bea8..224eba69 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -14,11 +14,39 @@ sub prep_ext { # externals_each callback sub _start_query { my ($self) = @_; + PublicInbox::LeiOverview->new($self) or return; + my $opt = $self->{opt}; + my ($xj, $mj) = split(/,/, $opt->{jobs} // ''); + if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) { + return $self->fail("`$xj' search jobs must be >= 1"); + } + my $lxs = $self->{lxs}; + $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY" + my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists + $xj = $nproc if $xj > $nproc; + $lxs->{-wq_nr_workers} = $xj; + if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { + return $self->fail("`$mj' writer jobs must be >= 1"); + } + my $l2m = $self->{l2m}; + if ($l2m && ($opt->{'import-remote'} //= 1) | + # we use \1 (a ref) to distinguish between + # user-supplied and default value + (($opt->{'import-before'} //= \1) ? 1 : 0)) { + $self->_lei_store(1)->write_prepare($self); + } + $l2m and $l2m->{-wq_nr_workers} = $mj // do { + $mj = POSIX::lround($nproc * 3 / 4); # keep some CPU for git + $mj <= 0 ? 
1 : $mj; + }; + + # descending docid order is cheapest, MUA controls sorting order + $self->{mset_opt}->{relevance} //= -2 if $l2m || $opt->{threads}; if ($self->{net}) { require PublicInbox::LeiAuth; $self->{auth} = PublicInbox::LeiAuth->new } - $self->{lxs}->do_query($self); + $lxs->do_query($self); } sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin @@ -87,32 +115,9 @@ sub lei_q { my ($self, @argv) = @_; require PublicInbox::LeiOverview; PublicInbox::Config->json; # preload before forking - PublicInbox::LeiOverview->new($self) or return; my $lxs = lxs_prepare($self) or return; $self->ale->refresh_externals($lxs); my $opt = $self->{opt}; - my ($xj, $mj) = split(/,/, $opt->{jobs} // ''); - if (defined($xj) && $xj ne '' && $xj !~ /\A[1-9][0-9]*\z/) { - return $self->fail("`$xj' search jobs must be >= 1"); - } - $xj ||= $lxs->concurrency($opt); # allow: "--jobs ,$WRITER_ONLY" - my $nproc = $lxs->detect_nproc // 1; # don't memoize, schedtool(1) exists - $xj = $nproc if $xj > $nproc; - $lxs->{-wq_nr_workers} = $xj; - if (defined($mj) && $mj !~ /\A[1-9][0-9]*\z/) { - return $self->fail("`$mj' writer jobs must be >= 1"); - } - if ($self->{l2m} && ($opt->{'import-remote'} //= 1) | - # we use \1 (a ref) to distinguish between - # user-supplied and default value - (($opt->{'import-before'} //= \1) ? 1 : 0)) { - $self->_lei_store(1)->write_prepare($self); - } - $self->{l2m} and $self->{l2m}->{-wq_nr_workers} = $mj // do { - $mj = POSIX::lround($nproc * 3 / 4); # keep some CPU for git - $mj <= 0 ? 1 : $mj; - }; - my %mset_opt = map { $_ => $opt->{$_} } qw(threads limit offset); $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; $mset_opt{limit} //= 10000; @@ -127,8 +132,6 @@ sub lei_q { die "unrecognized --sort=$sort\n"; } } - # descending docid order is cheapest, MUA controls sorting order - $mset_opt{relevance} //= -2 if $self->{l2m} || $opt->{threads}; $self->{mset_opt} = \%mset_opt; if ($opt->{stdin}) {
This will have an over.sqlite3 for content-based deduplication. It may exhibit ibxish methods, so serving a read-only (or even R/W) IMAP instance or displaying HTML isn't outside the realm of possibility. --- MANIFEST | 3 + lib/PublicInbox/LEI.pm | 4 +- lib/PublicInbox/LeiQuery.pm | 2 + lib/PublicInbox/LeiSavedSearch.pm | 134 ++++++++++++++++++++++++++++++ lib/PublicInbox/LeiToMail.pm | 10 ++- lib/PublicInbox/LeiXSearch.pm | 31 ++++++- t/lei-q-save.t | 12 +++ t/lei.t | 2 +- t/lei_saved_search.t | 10 +++ 9 files changed, 199 insertions(+), 9 deletions(-) create mode 100644 lib/PublicInbox/LeiSavedSearch.pm create mode 100644 t/lei-q-save.t create mode 100644 t/lei_saved_search.t diff --git a/MANIFEST b/MANIFEST index 12247ad2..20615abc 100644 --- a/MANIFEST +++ b/MANIFEST @@ -201,6 +201,7 @@ lib/PublicInbox/LeiOverview.pm lib/PublicInbox/LeiP2q.pm lib/PublicInbox/LeiQuery.pm lib/PublicInbox/LeiRemote.pm +lib/PublicInbox/LeiSavedSearch.pm lib/PublicInbox/LeiSearch.pm lib/PublicInbox/LeiStore.pm lib/PublicInbox/LeiStoreErr.pm @@ -393,12 +394,14 @@ t/lei-mirror.t t/lei-p2q.t t/lei-q-kw.t t/lei-q-remote-import.t +t/lei-q-save.t t/lei-q-thread.t t/lei-tag.t t/lei.t t/lei_dedupe.t t/lei_external.t t/lei_overview.t +t/lei_saved_search.t t/lei_store.t t/lei_to_mail.t t/lei_xsearch.t diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 475af8f0..7292d0f2 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -131,7 +131,7 @@ our %CMD = ( # sorted in order of importance/use: 'q' => [ '--stdin|SEARCH_TERMS...', 'search for messages matching terms', 'stdin|', # /|\z/ must be first for lone dash @lxs_opt, - qw(save-as=s output|mfolder|o=s format|f=s dedupe|d=s threads|t+ + qw(save output|mfolder|o=s format|f=s dedupe|d=s threads|t+ sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a import-before! 
lock=s@ rsyncable alert=s@ mua=s verbose|v+), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], @@ -249,7 +249,7 @@ my %OPTDESC = ( 'torsocks=s' => ['VAL|auto|no|yes', 'whether or not to wrap git and curl commands with torsocks'], 'no-torsocks' => 'alias for --torsocks=no', -'save-as=s' => ['NAME', 'save a search terms by given name'], +'save' => "save a search for `lei up'", 'import-remote!' => 'do not memoize remote messages into local store', 'type=s' => [ 'any|mid|git', 'disambiguate type' ], diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 224eba69..8bca1020 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -53,6 +53,7 @@ sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin my ($self) = @_; # $_[1] = $rbuf if (defined($_[1])) { $_[1] eq '' and return eval { + $self->{mset_opt}->{q_raw} = $self->{mset_opt}->{qstr}; $self->{lse}->query_approxidate($self->{lse}->git, $self->{mset_opt}->{qstr}); _start_query($self); @@ -142,6 +143,7 @@ no query allowed on command-line with --stdin PublicInbox::InputPipe::consume($self->{0}, \&qstr_add, $self); return; } + $mset_opt{q_raw} = \@argv; $mset_opt{qstr} = $self->{lse}->query_argv_to_string($self->{lse}->git, \@argv); _start_query($self); diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm new file mode 100644 index 00000000..ab9f393b --- /dev/null +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -0,0 +1,134 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# pretends to be like LeiDedupe and also PublicInbox::Inbox +package PublicInbox::LeiSavedSearch; +use strict; +use v5.10.1; +use parent qw(PublicInbox::Lock); +use PublicInbox::OverIdx; +use PublicInbox::LeiSearch; +use PublicInbox::Config; +use PublicInbox::Spawn qw(run_die); +use PublicInbox::ContentHash qw(content_hash git_sha); +use PublicInbox::Eml; +use PublicInbox::Hval qw(to_filename); 
+ +sub new { + my ($cls, $lei, $dir) = @_; + my $self = bless { ale => $lei->ale, -cfg => {} }, $cls; + if (defined $dir) { # updating existing saved search + my $f = $self->{'-f'} = "$dir/lei.saved-search"; + -f $f && -r _ or + return $lei->fail("$f non-existent or unreadable"); + $self->{-cfg} = PublicInbox::Config::git_config_dump($f); + my $q = $lei->{mset_opt}->{q_raw} = $self->{-cfg}->{'lei.q'} // + return $lei->fail("lei.q unset in $f"); + my $lse = $lei->{lse} // die 'BUG: {lse} missing'; + $lei->{mset_opt}->{qstr} = ref($q) ? + $lse->query_argv_to_string($lse->git, $q) : + $lse->query_approxidate($lse->git, $q); + } else { # new saved search + my $saved_dir = $lei->store_path . '/../saved-searches/'; + my (@name) = ($lei->{ovv}->{dst} =~ m{([\w\-\.]+)/*\z}); + push @name, to_filename($lei->{mset_opt}->{qstr}); + $dir = $saved_dir . join('-', @name); + require File::Path; + File::Path::make_path($dir); # raises on error + $self->{'-f'} = "$dir/lei.saved-search"; + my $q = $lei->{mset_opt}->{q_raw}; + if (ref $q) { + cfg_set($self, '--add', 'lei.q', $_) for @$q; + } else { + cfg_set($self, 'lei.q', $q); + } + } + bless $self->{-cfg}, 'PublicInbox::Config'; + $self->{lock_path} = "$self->{-f}.flock"; + $self->{-ovf} = "$dir/over.sqlite3"; + $self; +} + +sub description { $_[0]->{qstr} } # for WWW + +sub cfg_set { + my ($self, @args) = @_; + my $lk = $self->lock_for_scope; # git-config doesn't wait + run_die([qw(git config -f), $self->{'-f'}, @args]); +} + +# drop-in for LeiDedupe API +sub is_dup { + my ($self, $eml, $smsg) = @_; + my $oidx = $self->{oidx} // die 'BUG: no {oidx}'; + my $blob = $smsg ? $smsg->{blob} : undef; + return 1 if $blob && $oidx->blob_exists($blob); + my $lk = $self->lock_for_scope_fast; + if (my $xoids = PublicInbox::LeiSearch::xoids_for($self, $eml, 1)) { + for my $docid (values %$xoids) { + $oidx->add_xref3($docid, -1, $blob, '.'); + } + $oidx->commit_lazy; + 1; + } else { + # n.b. 
above xoids_for fills out eml->{-lei_fake_mid} if needed + unless ($smsg) { + $smsg = bless {}, 'PublicInbox::Smsg'; + $smsg->{bytes} = 0; + $smsg->populate($eml); + } + $oidx->begin_lazy; + $smsg->{num} = $oidx->adj_counter('eidx_docid', '+'); + $smsg->{blob} //= git_sha(1, $eml)->hexdigest; + $oidx->add_overview($eml, $smsg); + $oidx->add_xref3($smsg->{num}, -1, $smsg->{blob}, '.'); + $oidx->commit_lazy; + undef; + } +} + +sub prepare_dedupe { + my ($self) = @_; + $self->{oidx} //= do { + my $creat = !-f $self->{-ovf}; + my $lk = $self->lock_for_scope; # git-config doesn't wait + my $oidx = PublicInbox::OverIdx->new($self->{-ovf}); + $oidx->{-no_fsync} = 1; + $oidx->dbh; + if ($creat) { + $oidx->{dbh}->do('PRAGMA journal_mode = WAL'); + $oidx->eidx_prep; # for xref3 + } + $oidx + }; +} + +sub over { $_[0]->{oidx} } # for xoids_for + +sub git { $_[0]->{ale}->git } + +sub pause_dedupe { + my ($self) = @_; + $self->{ale}->git->cleanup; + my $oidx = delete($self->{oidx}) // return; + $oidx->commit_lazy; +} + +sub mm { undef } + +sub altid_map { {} } + +sub cloneurl { [] } +no warnings 'once'; +*nntp_url = \&cloneurl; +*base_url = \&PublicInbox::Inbox::base_url; +*smsg_eml = \&PublicInbox::Inbox::smsg_eml; +*smsg_by_mid = \&PublicInbox::Inbox::smsg_by_mid; +*msg_by_mid = \&PublicInbox::Inbox::msg_by_mid; +*modified = \&PublicInbox::Inbox::modified; +*recent = \&PublicInbox::Inbox::recent; +*max_git_epoch = *nntp_usable = *msg_by_path = \&mm; # undef +*isrch = *search = \&mm; # TODO +*DESTROY = \&pause_dedupe; + +1; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 7adbffe7..bd2b714a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -9,7 +9,6 @@ use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::ProcessPipe; use PublicInbox::Spawn qw(spawn); -use PublicInbox::LeiDedupe; use PublicInbox::PktOp qw(pkt_do); use Symbol qw(gensym); use IO::Handle; # ->autoflush @@ -350,7 +349,11 @@ sub new { die 
"bad mail --format=$fmt\n"; } $self->{dst} = $dst; - $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); + my $dd_cls = 'PublicInbox::'. + ($lei->{opt}->{save} ? 'LeiSavedSearch' : 'LeiDedupe'); + eval "require $dd_cls"; + die "$dd_cls: $@" if $@; + $lei->{dedupe} = $dd_cls->new($lei); $self; } @@ -368,6 +371,7 @@ sub _pre_augment_maildir { sub _do_augment_maildir { my ($self, $lei) = @_; + return if defined($lei->{opt}->{save}); my $dst = $lei->{ovv}->{dst}; my $lse = $lei->{opt}->{'import-before'} ? $lei->{lse} : undef; my $mdr = PublicInbox::MdirReader->new; @@ -398,6 +402,7 @@ sub _imap_augment_or_delete { # PublicInbox::NetReader::imap_each cb sub _do_augment_imap { my ($self, $lei) = @_; + return if defined($lei->{opt}->{save}); my $net = $lei->{net}; my $lse = $lei->{opt}->{'import-before'} ? $lei->{lse} : undef; if ($lei->{opt}->{augment}) { @@ -468,6 +473,7 @@ sub _do_augment_mbox { my ($self, $lei) = @_; return unless $self->{seekable}; my $opt = $lei->{opt}; + return if defined($opt->{save}); my $out = $lei->{1}; my ($fmt, $dst) = @{$lei->{ovv}}{qw(fmt dst)}; return unless -s $out; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 9d367977..7c540c1c 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -149,22 +149,38 @@ sub query_one_mset { # for --threads and l2m w/o sort local $0 = "$0 query_one_mset"; my $lei = $self->{lei}; my ($srch, $over) = ($ibxish->search, $ibxish->over); - my $desc = $ibxish->{inboxdir} // $ibxish->{topdir}; - return warn("$desc not indexed by Xapian\n") unless ($srch && $over); - my $mo = { %{$lei->{mset_opt}} }; + my $dir = $ibxish->{inboxdir} // $ibxish->{topdir}; + return warn("$dir not indexed by Xapian\n") unless ($srch && $over); + my $mo = { %{$lei->{mset_opt}} }; # copy my $mset; my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); my $can_kw = !!$ibxish->can('msg_keywords'); my $threads = $lei->{opt}->{threads} // 0; my $fl = $threads > 1 ? 
1 : undef; + my $lss = $lei->{dedupe}; + $lss = undef unless $lss && $lss->can('cfg_set'); # saved search + my $maxk = "external.$dir.maxnum"; + my $stop_at = $lss ? $lss->{-cfg}->{$maxk} : undef; + if (defined $stop_at) { + die "$maxk=$stop_at has multiple values" if ref $stop_at; + my @e; + local $SIG{__WARN__} = sub { push @e, @_ }; + $stop_at += 0; + return warn("$maxk=$stop_at: @e") if @e; + } + my $first_ids; do { $mset = $srch->mset($mo->{qstr}, $mo); - mset_progress($lei, $desc, $mset->size, + mset_progress($lei, $dir, $mset->size, $mset->get_matches_estimated); wait_startq($lei); # wait for keyword updates my $ids = $srch->mset_to_artnums($mset, $mo); + @$ids = grep { $_ > $stop_at } @$ids if defined($stop_at); my $i = 0; if ($threads) { + # copy $ids if $lss since over->expand_thread + # shifts @{$ctx->{ids}} + $first_ids = [ @$ids ] if $lss; my $ctx = { ids => $ids }; my %n2item = map { ($ids->[$i++], $_) } $mset->items; while ($over->expand_thread($ctx)) { @@ -183,6 +199,7 @@ sub query_one_mset { # for --threads and l2m w/o sort @{$ctx->{xids}} = (); } } else { + $first_ids = $ids; my @items = $mset->items; for my $n (@$ids) { my $mitem = $items[$i++]; @@ -193,6 +210,12 @@ sub query_one_mset { # for --threads and l2m w/o sort } } } while (_mset_more($mset, $mo)); + if ($lss && scalar(@$first_ids)) { + undef $stop_at; + my $max = $first_ids->[0]; + $lss->cfg_set($maxk, $max); + undef $lss; + } undef $each_smsg; # may commit $lei->{ovv}->ovv_atexit_child($lei); } diff --git a/t/lei-q-save.t b/t/lei-q-save.t new file mode 100644 index 00000000..56f7cb37 --- /dev/null +++ b/t/lei-q-save.t @@ -0,0 +1,12 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +test_lei(sub { + my $home = $ENV{HOME}; + lei_ok qw(import t/plack-qp.eml); + lei_ok qw(q --save z:0..), '-o', "$home/md/"; + my @s = 
glob("$home/.local/share/lei/saved-searches/md-*"); + is(scalar(@s), 1, 'got one saved search'); +}); +done_testing; diff --git a/t/lei.t b/t/lei.t index 2be9b4e8..6ade2f18 100644 --- a/t/lei.t +++ b/t/lei.t @@ -114,7 +114,7 @@ my $test_completion = sub { %out = map { $_ => 1 } split(/\s+/s, $lei_out); for my $sw (qw(-f --format -o --output --mfolder --augment -a --mua --no-local --local --verbose -v - --save-as --no-remote --remote --torsocks + --save --no-remote --remote --torsocks --reverse -r )) { ok($out{$sw}, "$sw offered as `lei q' completion"); } diff --git a/t/lei_saved_search.t b/t/lei_saved_search.t new file mode 100644 index 00000000..6d26cd2b --- /dev/null +++ b/t/lei_saved_search.t @@ -0,0 +1,10 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; +use v5.10.1; +use PublicInbox::TestCommon; +require_mods(qw(DBD::SQLite)); +use_ok 'PublicInbox::LeiSavedSearch'; + +done_testing;
The command isn't finalized, yet, but it's intended to update an existing saved search. --- MANIFEST | 1 + lib/PublicInbox/LEI.pm | 2 ++ lib/PublicInbox/LeiQuery.pm | 2 +- lib/PublicInbox/LeiSavedSearch.pm | 24 ++++++++++------ lib/PublicInbox/LeiToMail.pm | 12 ++++---- lib/PublicInbox/LeiUp.pm | 46 +++++++++++++++++++++++++++++++ t/lei-q-save.t | 17 ++++++++++-- 7 files changed, 88 insertions(+), 16 deletions(-) create mode 100644 lib/PublicInbox/LeiUp.pm diff --git a/MANIFEST b/MANIFEST index 20615abc..1b7d16ee 100644 --- a/MANIFEST +++ b/MANIFEST @@ -208,6 +208,7 @@ lib/PublicInbox/LeiStoreErr.pm lib/PublicInbox/LeiSucks.pm lib/PublicInbox/LeiTag.pm lib/PublicInbox/LeiToMail.pm +lib/PublicInbox/LeiUp.pm lib/PublicInbox/LeiXSearch.pm lib/PublicInbox/Linkify.pm lib/PublicInbox/Listener.pm diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 7292d0f2..4b87c104 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -135,6 +135,8 @@ our %CMD = ( # sorted in order of importance/use: sort|s=s reverse|r offset=i pretty jobs|j=s globoff|g augment|a import-before! lock=s@ rsyncable alert=s@ mua=s verbose|v+), @c_opt, opt_dash('limit|n=i', '[0-9]+') ], +'up' => [ 'SEARCH_TERMS...', 'update saved search', + qw(jobs|j=s lock=s@ alert=s@ mua=s verbose|v+), @c_opt ], 'blob' => [ 'OID', 'show a git blob, reconstructing from mail if necessary', qw(git-dir=s@ cwd! verbose|v+ mail! 
oid-a|A=s path-a|a=s path-b|b=s), diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 8bca1020..7456f7f9 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -12,7 +12,7 @@ sub prep_ext { # externals_each callback $lxs->prepare_external($loc) unless $exclude->{$loc}; } -sub _start_query { +sub _start_query { # used by "lei q" and "lei up" my ($self) = @_; PublicInbox::LeiOverview->new($self) or return; my $opt = $self->{opt}; diff --git a/lib/PublicInbox/LeiSavedSearch.pm b/lib/PublicInbox/LeiSavedSearch.pm index ab9f393b..815008fd 100644 --- a/lib/PublicInbox/LeiSavedSearch.pm +++ b/lib/PublicInbox/LeiSavedSearch.pm @@ -17,18 +17,12 @@ use PublicInbox::Hval qw(to_filename); sub new { my ($cls, $lei, $dir) = @_; my $self = bless { ale => $lei->ale, -cfg => {} }, $cls; - if (defined $dir) { # updating existing saved search + if (defined $dir) { # updating existing saved search via "lei up" my $f = $self->{'-f'} = "$dir/lei.saved-search"; -f $f && -r _ or return $lei->fail("$f non-existent or unreadable"); $self->{-cfg} = PublicInbox::Config::git_config_dump($f); - my $q = $lei->{mset_opt}->{q_raw} = $self->{-cfg}->{'lei.q'} // - return $lei->fail("lei.q unset in $f"); - my $lse = $lei->{lse} // die 'BUG: {lse} missing'; - $lei->{mset_opt}->{qstr} = ref($q) ? - $lse->query_argv_to_string($lse->git, $q) : - $lse->query_approxidate($lse->git, $q); - } else { # new saved search + } else { # new saved search "lei q --save" my $saved_dir = $lei->store_path . 
'/../saved-searches/'; my (@name) = ($lei->{ovv}->{dst} =~ m{([\w\-\.]+)/*\z}); push @name, to_filename($lei->{mset_opt}->{qstr}); @@ -42,6 +36,20 @@ sub new { } else { cfg_set($self, 'lei.q', $q); } + my $fmt = $lei->{opt}->{'format'}; + cfg_set($self, 'lei.q.format', $fmt) if defined $fmt; + cfg_set($self, 'lei.q.output', $lei->{opt}->{output}); + for my $k (qw(only include exclude)) { + my $ary = $lei->{opt}->{$k} // next; + for my $x (@$ary) { + cfg_set($self, '--add', "lei.q.$k", $x); + } + } + for my $k (qw(external local remote import-remote + import-before threads)) { + my $val = $lei->{opt}->{$k} // next; + cfg_set($self, "lei.q.$k", $val); + } } bless $self->{-cfg}, 'PublicInbox::Config'; $self->{lock_path} = "$self->{-f}.flock"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index bd2b714a..4ebaf8f3 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -349,11 +349,13 @@ sub new { die "bad mail --format=$fmt\n"; } $self->{dst} = $dst; - my $dd_cls = 'PublicInbox::'. - ($lei->{opt}->{save} ? 'LeiSavedSearch' : 'LeiDedupe'); - eval "require $dd_cls"; - die "$dd_cls: $@" if $@; - $lei->{dedupe} = $dd_cls->new($lei); + $lei->{dedupe} = $lei->{lss} // do { + my $dd_cls = 'PublicInbox::'. + ($lei->{opt}->{save} ? 
'LeiSavedSearch' : 'LeiDedupe'); + eval "require $dd_cls"; + die "$dd_cls: $@" if $@; + $dd_cls->new($lei); + }; $self; } diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm new file mode 100644 index 00000000..386a7566 --- /dev/null +++ b/lib/PublicInbox/LeiUp.pm @@ -0,0 +1,46 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# "lei up" - updates the result of "lei q --save" +package PublicInbox::LeiUp; +use strict; +use v5.10.1; +use PublicInbox::LeiSavedSearch; +use PublicInbox::LeiOverview; + +sub lei_up { + my ($lei, $dir) = @_; + $lei->{lse} = $lei->_lei_store(1)->search; + my $lss = PublicInbox::LeiSavedSearch->new($lei, $dir) or return; + my $mset_opt = $lei->{mset_opt} = { relevance => -2 }; + $mset_opt->{limit} = $lei->{opt}->{limit} // 10000; + my $q = $mset_opt->{q_raw} = $lss->{-cfg}->{'lei.q'} // + return $lei->fail("lei.q unset in $lss->{-f}"); + my $lse = $lei->{lse} // die 'BUG: {lse} missing'; + if (ref($q)) { + $mset_opt->{qstr} = $lse->query_argv_to_string($lse->git, $q); + } else { + $lse->query_approxidate($lse->git, $mset_opt->{qstr} = $q); + } + $lei->{opt}->{output} = $lss->{-cfg}->{'lei.q.output'} // + return $lei->fail("lei.q.output unset in $lss->{-f}"); + $lei->{opt}->{'format'} //= $lss->{-cfg}->{'lei.q.format'}; # optional + + my $to_avref = $lss->{-cfg}->can('_array'); + for my $k (qw(only include exclude)) { + my $v = $lss->{-cfg}->{"lei.q.$k"} // next; + $lei->{opt}->{$k} = $to_avref->($v); + } + for my $k (qw(external local remote + import-remote import-before threads)) { + my $v = $lss->{-cfg}->{"lei.q.$k"} // next; + $lei->{opt}->{$k} = $v; + } + $lei->{lss} = $lss; # for LeiOverview->new + my $lxs = $lei->lxs_prepare or return; + $lei->ale->refresh_externals($lxs); + $lei->{opt}->{save} = 1; + $lei->_start_query; +} + +1; diff --git a/t/lei-q-save.t b/t/lei-q-save.t index 56f7cb37..a6d579cf 100644 --- a/t/lei-q-save.t +++ 
b/t/lei-q-save.t @@ -2,11 +2,24 @@ # Copyright (C) 2021 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> use strict; use v5.10.1; use PublicInbox::TestCommon; +my $doc1 = eml_load('t/plack-qp.eml'); +my $doc2 = eml_load('t/utf8.eml'); test_lei(sub { my $home = $ENV{HOME}; - lei_ok qw(import t/plack-qp.eml); - lei_ok qw(q --save z:0..), '-o', "$home/md/"; + lei_ok qw(import -q t/plack-qp.eml); + lei_ok qw(q -q --save z:0..), '-o', "$home/md/"; + my %before = map { $_ => 1 } glob("$home/md/cur/*"); + is_deeply(eml_load((keys %before)[0]), $doc1, 'doc1 matches'); + my @s = glob("$home/.local/share/lei/saved-searches/md-*"); is(scalar(@s), 1, 'got one saved search'); + + # ensure "lei up" works, since it complements "lei q --save" + lei_ok qw(import t/utf8.eml); + lei_ok qw(up), $s[0]; + my %after = map { $_ => 1 } glob("$home/md/cur/*"); + is(delete $after{(keys(%before))[0]}, 1, 'original message kept'); + is(scalar(keys %after), 1, 'one new message added'); + is_deeply(eml_load((keys %after)[0]), $doc2, 'doc2 matches'); }); done_testing;
Eric Wong <e@80x24.org> wrote:
> --- a/lib/PublicInbox/LeiXSearch.pm
> +++ b/lib/PublicInbox/LeiXSearch.pm
> @@ -149,22 +149,38 @@ sub query_one_mset { # for --threads and l2m w/o sort
> local $0 = "$0 query_one_mset";
> my $lei = $self->{lei};
> my ($srch, $over) = ($ibxish->search, $ibxish->over);
> - my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
> - return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
> - my $mo = { %{$lei->{mset_opt}} };
> + my $dir = $ibxish->{inboxdir} // $ibxish->{topdir};
> + return warn("$dir not indexed by Xapian\n") unless ($srch && $over);
> + my $mo = { %{$lei->{mset_opt}} }; # copy
> my $mset;
> my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
> my $can_kw = !!$ibxish->can('msg_keywords');
> my $threads = $lei->{opt}->{threads} // 0;
> my $fl = $threads > 1 ? 1 : undef;
> + my $lss = $lei->{dedupe};
> + $lss = undef unless $lss && $lss->can('cfg_set'); # saved search
> + my $maxk = "external.$dir.maxnum";
Eh, that should probably be "maxuid" since "num" is more
strongly associated with "NNTP article number". extindex may
only be easily exposed via IMAP and HTTP (since Message-IDs
may conflict with different list trailers).
Eric Wong <e@80x24.org> wrote: > Eh, that should probably be "maxuid" since "num" is more > strongly associated with "NNTP article number". extindex may > only be easily exposed via IMAP and HTTP (since we Message-IDs > may conflict with different list trailers). Pushed with the following squashed in: diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 7c540c1c..e1538391 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -159,7 +159,7 @@ sub query_one_mset { # for --threads and l2m w/o sort my $fl = $threads > 1 ? 1 : undef; my $lss = $lei->{dedupe}; $lss = undef unless $lss && $lss->can('cfg_set'); # saved search - my $maxk = "external.$dir.maxnum"; + my $maxk = "external.$dir.maxuid"; my $stop_at = $lss ? $lss->{-cfg}->{$maxk} : undef; if (defined $stop_at) { die "$maxk=$stop_at has multiple values" if ref $stop_at;