From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 111881FA13 for ; Sat, 16 Jan 2021 11:36:25 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 3/4] lei: q: results output to Maildir and mbox* working Date: Fri, 15 Jan 2021 23:36:23 -1200 Message-Id: <20210116113624.19930-4-e@80x24.org> In-Reply-To: <20210116113624.19930-1-e@80x24.org> References: <20210116113624.19930-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: All the augment and deduplication stuff seems to be working based on unit tests. OpPipe is a nice general addition that will probably make future state machines easier. --- MANIFEST | 1 + lib/PublicInbox/LEI.pm | 27 +++++--- lib/PublicInbox/LeiDedupe.pm | 3 +- lib/PublicInbox/LeiOverview.pm | 44 ++++++++---- lib/PublicInbox/LeiQuery.pm | 14 ++-- lib/PublicInbox/LeiToMail.pm | 89 ++++++++++++++++--------- lib/PublicInbox/LeiXSearch.pm | 118 ++++++++++++++++++++++++--------- lib/PublicInbox/OpPipe.pm | 41 ++++++++++++ t/lei.t | 20 ++++++ t/lei_to_mail.t | 4 +- 10 files changed, 266 insertions(+), 95 deletions(-) create mode 100644 lib/PublicInbox/OpPipe.pm diff --git a/MANIFEST b/MANIFEST index 0ebdaccc..0de1de4a 100644 --- a/MANIFEST +++ b/MANIFEST @@ -193,6 +193,7 @@ lib/PublicInbox/NNTPD.pm lib/PublicInbox/NNTPdeflate.pm lib/PublicInbox/NewsWWW.pm lib/PublicInbox/OnDestroy.pm +lib/PublicInbox/OpPipe.pm lib/PublicInbox/Over.pm lib/PublicInbox/OverIdx.pm lib/PublicInbox/ProcessPipe.pm diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 5568904d..f849c9df 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -256,7 +256,9 @@ sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ } sub out ($;@) { print { shift->{1} } @_ } sub err ($;@) { - print { shift->{2} } @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n"); + my $self = shift; + my $err = $self->{2} // *STDERR{IO}; + print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n"); } sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) } @@ -270,8 +272,11 @@ sub fail ($$;$) { sub atfork_prepare_wq { my ($self, $wq) = @_; - push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD, - grep { defined } @$self{qw(0 1 2 sock)} + my $tcafc = $wq->{-ipc_atfork_child_close}; + push @$tcafc, @TO_CLOSE_ATFORK_CHILD; + if (my $sock = $self->{sock}) { + push @$tcafc, @$self{qw(0 1 2)}, $sock; + } } # usage: my %sig = $lei->atfork_child_wq($wq); @@ -286,7 +291,9 @@ sub atfork_child_wq { PIPE => sub { $self->x_it(13); # SIGPIPE = 13 # we need to close explicitly to avoid Perl warning on SIGPIPE - close($_) for (delete @$self{1..2}); + close(delete $self->{1}); + # regular files and /dev/null (-c) won't trigger SIGPIPE + close(delete $self->{2}) unless (-f $self->{2} || -c _); syswrite($self->{0}, '!') unless $self->{sock}; # for eof_wait die bless(\"$_[0]", 'PublicInbox::SIGPIPE'), }); @@ -641,7 +648,7 @@ sub start_pager { $new_env{MORE} = 'FRX' if $^O eq 'freebsd'; pipe(my ($r, $wpager)) or return warn "pipe: $!"; my $rdr = { 0 => $r, 1 => $self->{1}, 2 => $self->{2} }; - my $pgr = [ undef, @$rdr{1, 2} ]; + my $pgr = [ undef, @$rdr{1, 2}, $$ ]; if (my $sock = $self->{sock}) { # lei(1) process runs it delete @new_env{keys %$env}; # only set iff unset my $buf = "exec 1\0".$pager; @@ -664,7 +671,7 @@ sub stop_pager { # do not restore original stdout, just close it so we error out close(delete($self->{1})) if $self->{1}; my $pid = $pgr->[0]; - dwaitpid($pid, undef, $self->{sock}) if $pid; + dwaitpid($pid, undef, $self->{sock}) if $pid && $pgr->[3] == $$; } sub accept_dispatch { # Listener {post_accept} callback @@ -706,7 +713,7 @@ sub accept_dispatch { # Listener {post_accept} callback sub dclose { my ($self) = @_; delete $self->{lxs}; # stops LeiXSearch queries - $self->close; # PublicInbox::DS::close + $self->close if $self->{sock}; # PublicInbox::DS::close } # for long-running results @@ -737,8 +744,10 @@ sub event_step { sub event_step_init { my ($self) = @_; - $self->{sock}->blocking(0); - $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET); + if (my $sock = $self->{sock}) { # using DS->EventLoop + $sock->blocking(0); + $self->SUPER::new($sock, EPOLLIN|EPOLLET); + } } sub noop {} diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm index 81754361..3f478aa4 100644 --- a/lib/PublicInbox/LeiDedupe.pm +++ b/lib/PublicInbox/LeiDedupe.pm @@ -89,8 +89,9 @@ sub true { 1 } sub dedupe_none ($) { (\&true, \&true) } sub new { - my ($cls, $lei, $dst) = @_; + my ($cls, $lei) = @_; my $dd = $lei->{opt}->{dedupe} // 'content'; + my $dst = $lei->{ovv}->{dst}; # allow "none" to bypass Eml->new if writing to directory: return if ($dd eq 'none' && substr($dst // '', -1) eq '/'); diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 9846bc8a..c0b423f6 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -15,6 +15,8 @@ use PublicInbox::MID qw($MID_EXTRACT); use PublicInbox::Address qw(pairs); use PublicInbox::Config; use PublicInbox::Search qw(get_pct); +use PublicInbox::LeiDedupe; +use PublicInbox::LeiToMail; # cf. https://en.wikipedia.org/wiki/JSON_streaming my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing @@ -44,7 +46,7 @@ sub new { my $fmt = $opt->{'format'}; $fmt = lc($fmt) if defined $fmt; - if ($dst =~ s/\A([a-z]+)://is) { # e.g. Maildir:/home/user/Mail/ + if ($dst =~ s/\A([a-z0-9]+)://is) { # e.g. Maildir:/home/user/Mail/ my $ofmt = lc $1; $fmt //= $ofmt; return $lei->fail(<<"") if $fmt ne $ofmt; @@ -52,13 +54,14 @@ sub new { } $fmt //= 'json' if $dst eq '/dev/stdout'; - $fmt //= 'maildir'; # TODO + $fmt //= 'maildir'; if (index($dst, '://') < 0) { # not a URL, so assume path $dst = File::Spec->canonpath($dst); } # else URL my $self = bless { fmt => $fmt, dst => $dst }, $class; + $lei->{ovv} = $self; my $json; if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) { $json = $self->{json} = ref(PublicInbox::Config->json); @@ -75,11 +78,13 @@ sub new { } else { ovv_out_lk_init($self); } - } elsif ($json) { - return $lei->fail('JSON formats only output to stdout'); - } else { - return $lei->fail("TODO: $dst -f $fmt"); } + if (!$json) { + # default to the cheapest sort since MUA usually resorts + $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout'; + $lei->{l2m} = PublicInbox::LeiToMail->new($lei); + } + $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei); $self; } @@ -135,9 +140,13 @@ sub _unbless_smsg { sub ovv_atexit_child { my ($self, $lei) = @_; + if (my $git = delete $self->{git}) { + $git->async_wait_all; + } if (my $bref = delete $lei->{ovv_buf}) { + my $out = $lei->{1} or return; my $lk = $self->lock_for_scope; - print { $lei->{1} } $$bref; + print $out $$bref; } } @@ -167,17 +176,28 @@ sub _json_pretty { qq{ "$k": }.$v; } -sub ovv_each_smsg_cb { - my ($self, $lei) = @_; +sub ovv_each_smsg_cb { # runs in wq worker usually + my ($self, $lei, $ibxish) = @_; $lei->{ovv_buf} = \(my $buf = ''); delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel}; - my $json = $self->{json}->new; + my $json; $lei->{1}->autoflush(1); - if ($json) { + if (my $pkg = $self->{json}) { + $json = $pkg->new; $json->utf8->canonical; $json->ascii(1) if $lei->{opt}->{ascii}; } - if ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { + if (my $l2m = $lei->{l2m}) { + my $wcb = $l2m->write_cb($lei); + my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git + $self->{git} = $git; # for ovv_atexit_child + my $g2m = $l2m->can('git_to_mail'); + sub { + my ($smsg, $mitem) = @_; + my $kw = []; # TODO get from mitem + $git->cat_async($smsg->{blob}, $g2m, [ $wcb, $kw ]); + }; + } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; sub { # DIY prettiness :P my ($smsg, $mitem) = @_; diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 69d2f9a6..a80d5887 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -23,8 +23,6 @@ sub _vivify_external { # _externals_each callback # the main "lei q SEARCH_TERMS" method sub lei_q { my ($self, @argv) = @_; - my $sto = $self->_lei_store(1); - my $cfg = $self->_lei_cfg(1); my $opt = $self->{opt}; # --local is enabled by default @@ -32,7 +30,7 @@ sub lei_q { my @srcs; require PublicInbox::LeiXSearch; require PublicInbox::LeiOverview; - require PublicInbox::LeiDedupe; + PublicInbox::Config->json; my $lxs = PublicInbox::LeiXSearch->new; # --external is enabled by default, but allow --no-external @@ -46,10 +44,10 @@ sub lei_q { $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset) // $lxs->wq_workers($j); - unshift(@srcs, $sto->search) if $opt->{'local'}; # no forking workers after this - $self->{ovv} = PublicInbox::LeiOverview->new($self); - $self->{dd} = PublicInbox::LeiDedupe->new($self); + my $ovv = PublicInbox::LeiOverview->new($self) or return; + my $sto = $self->_lei_store(1); + unshift(@srcs, $sto->search) if $opt->{'local'}; my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset); $mset_opt{asc} = $opt->{'reverse'} ? 1 : 0; $mset_opt{qstr} = join(' ', map {; @@ -69,12 +67,10 @@ sub lei_q { die "unrecognized --sort=$sort\n"; } } - # $self->out($json->encode(\%mset_opt)); # descending docid order $mset_opt{relevance} //= -2 if $opt->{thread}; - # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self); $self->{mset_opt} = \%mset_opt; - $self->{ovv}->ovv_begin($self); + $ovv->ovv_begin($self); $lxs->do_query($self, \@srcs); } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 5d4b7978..744f331d 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -187,8 +187,9 @@ sub zsfx2cmd ($$$) { \@cmd; } -sub compress_dst { - my ($self, $zsfx, $lei) = @_; +sub _post_augment_mbox { # open a compressor process + my ($self, $lei) = @_; + my $zsfx = $self->{zsfx} or return; my $cmd = zsfx2cmd($zsfx, undef, $lei); pipe(my ($r, $w)) or die "pipe: $!"; my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2} }; @@ -209,7 +210,9 @@ sub decompress_src ($$$) { sub dup_src ($) { my ($in) = @_; - open my $dup, '+>>&', $in or die "dup: $!"; + # fileno needed because wq_set_recv_modes only used ">&=" for {1} + # and Perl blindly trusts that to reject the '+' (readability flag) + open my $dup, '+>>&=', fileno($in) or die "dup: $!"; $dup; } @@ -321,11 +324,13 @@ sub new { } else { die "bad mail --format=$fmt\n"; } - my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst); + $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); $self; } -sub _prepare_maildir { +sub _pre_augment_maildir {} # noop + +sub _do_augment_maildir { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; if ($lei->{opt}->{augment}) { @@ -338,6 +343,11 @@ sub _prepare_maildir { } else { # clobber existing Maildir _maildir_each_file($dst, \&_unlink); } +} + +sub _post_augment_maildir { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; for my $x (qw(tmp new cur)) { my $d = $dst.$x; next if -d $d; @@ -347,45 +357,64 @@ sub _prepare_maildir { } } -sub _prepare_mbox { +sub _pre_augment_mbox { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; - my ($out, $seekable); - if ($dst eq '/dev/stdout') { - $out = $lei->{1}; - } else { + if ($dst ne '/dev/stdout') { my $mode = -p $dst ? '>' : '+>>'; if (-f _ && !$lei->{opt}->{augment} and !unlink($dst)) { $! == ENOENT or die "unlink($dst): $!"; } - open $out, $mode, $dst or die "open($dst): $!"; - # Perl does SEEK_END even with O_APPEND :< - $seekable = seek($out, 0, SEEK_SET); - die "seek($dst): $!\n" if !$seekable && $! != ESPIPE; + open my $out, $mode, $dst or die "open($dst): $!"; $lei->{1} = $out; } + # Perl does SEEK_END even with O_APPEND :< + $self->{seekable} = seek($lei->{1}, 0, SEEK_SET); + if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') { + die "seek($dst): $!\n"; + } state $zsfx_allow = join('|', keys %zsfx2cmd); - my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); + ($self->{zsfx}) = ($dst =~ /\.($zsfx_allow)\z/); +} + +sub _do_augment_mbox { + my ($self, $lei) = @_; + return if !$lei->{opt}->{augment}; my $dedupe = $lei->{dedupe}; - if ($lei->{opt}->{augment}) { - die "cannot augment $dst, not seekable\n" if !$seekable; - if (-s $out && $dedupe && $dedupe->prepare_dedupe) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - my $fmt = $lei->{ovv}->{fmt}; - require PublicInbox::MboxReader; - PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei); - } - # maybe some systems don't honor O_APPEND, Perl does this: - seek($out, 0, SEEK_END) or die "seek $dst: $!"; - $dedupe->pause_dedupe if $dedupe; + my $dst = $lei->{ovv}->{dst}; + die "cannot augment $dst, not seekable\n" if !$self->{seekable}; + my $out = $lei->{1}; + if (-s $out && $dedupe && $dedupe->prepare_dedupe) { + my $zsfx = $self->{zsfx}; + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + my $fmt = $lei->{ovv}->{fmt}; + require PublicInbox::MboxReader; + PublicInbox::MboxReader->$fmt($rd, \&_augment, $lei); } - compress_dst($self, $zsfx, $lei) if $zsfx; + # maybe some systems don't honor O_APPEND, Perl does this: + seek($out, 0, SEEK_END) or die "seek $dst: $!"; + $dedupe->pause_dedupe if $dedupe; +} + +sub pre_augment { # fast (1 disk seek), runs in main daemon + my ($self, $lei) = @_; + # _pre_augment_maildir, _pre_augment_mbox + my $m = "_pre_augment_$self->{base_type}"; + $self->$m($lei); +} + +sub do_augment { # slow, runs in wq worker + my ($self, $lei) = @_; + # _do_augment_maildir, _do_augment_mbox + my $m = "_do_augment_$self->{base_type}"; + $self->$m($lei); } -sub do_prepare { +sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon my ($self, $lei) = @_; - my $m = "_prepare_$self->{base_type}"; + # _post_augment_maildir, _post_augment_mbox + my $m = "_post_augment_$self->{base_type}"; $self->$m($lei); } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 8b70167c..9563ad63 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -9,6 +9,10 @@ use strict; use v5.10.1; use parent qw(PublicInbox::LeiSearch PublicInbox::IPC); use PublicInbox::DS qw(dwaitpid); +use PublicInbox::OpPipe; +use PublicInbox::Import; +use File::Temp 0.19 (); # 0.19 for ->newdir +use File::Spec (); sub new { my ($class) = @_; @@ -103,9 +107,9 @@ sub query_thread_mset { # for --thread } my $mo = { %{$lei->{mset_opt}} }; my $mset; - my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); - my $dd = $lei->{dd}; - $dd->prepare_dedupe; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $ibxish); + my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; + $dedupe->prepare_dedupe; do { $mset = $srch->mset($mo->{qstr}, $mo); my $ids = $srch->mset_to_artnums($mset, $mo); @@ -115,7 +119,7 @@ sub query_thread_mset { # for --thread while ($over->expand_thread($ctx)) { for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; - next if $dd->is_smsg_dup($smsg); + next if $dedupe->is_smsg_dup($smsg); my $mitem = delete $n2item{$smsg->{num}}; $each_smsg->($smsg, $mitem); } @@ -132,65 +136,113 @@ sub query_mset { # non-parallel for non-"--thread" users my $mo = { %{$lei->{mset_opt}} }; my $mset; $self->attach_external($_) for @$srcs; - my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); - my $dd = $lei->{dd}; - $dd->prepare_dedupe; + my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei, $self); + my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing'; + $dedupe->prepare_dedupe; do { $mset = $self->mset($mo->{qstr}, $mo); for my $it ($mset->items) { my $smsg = smsg_for($self, $it) or next; - next if $dd->is_smsg_dup($smsg); + next if $dedupe->is_smsg_dup($smsg); $each_smsg->($smsg, $it); } } while (_mset_more($mset, $mo)); $lei->{ovv}->ovv_atexit_child($lei); } -sub query_done { # PublicInbox::EOFpipe callback +sub git { + my ($self) = @_; + my (%seen, @dirs); + my $tmp = File::Temp->newdir('lei_xsrch_git-XXXXXXXX', TMPDIR => 1); + for my $ibx (@{$self->{shard2ibx} // []}) { + my $d = File::Spec->canonpath($ibx->git->{git_dir}); + $seen{$d} //= push @dirs, "$d/objects\n" + } + my $git_dir = $tmp->dirname; + PublicInbox::Import::init_bare($git_dir); + my $f = "$git_dir/objects/info/alternates"; + open my $alt, '>', $f or die "open($f): $!"; + print $alt @dirs or die "print $f: $!"; + close $alt or die "close $f: $!"; + my $git = PublicInbox::Git->new($git_dir); + $git->{-tmp} = $tmp; + $git; +} + +sub query_done { # EOF callback my ($lei) = @_; $lei->{ovv}->ovv_end($lei); $lei->dclose; } -sub do_query { - my ($self, $lei_orig, $srcs) = @_; - my ($lei, @io) = $lei_orig->atfork_parent_wq($self); +sub start_query { # always runs in main (lei-daemon) process + my ($self, $io, $lei, $srcs) = @_; + if (my $l2m = $lei->{l2m}) { + $lei->{1} = $io->[1]; + $l2m->post_augment($lei); + $io->[1] = delete $lei->{1}; + } my $remotes = $self->{remotes} // []; - pipe(my ($eof_wait, $qry_done)) or die "pipe $!"; - $io[0] = $qry_done; # don't need stdin - if ($lei->{opt}->{thread}) { $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1; for my $ibxish (@$srcs) { - $self->wq_do('query_thread_mset', \@io, $lei, $ibxish); + $self->wq_do('query_thread_mset', $io, $lei, $ibxish); } } else { $lei->{-parallel} = scalar(@$remotes); - $self->wq_do('query_mset', \@io, $lei, $srcs); + $self->wq_do('query_mset', $io, $lei, $srcs); } # TODO for my $rmt (@$remotes) { - $self->wq_do('query_thread_mbox', \@io, $lei, $rmt); + $self->wq_do('query_thread_mbox', $io, $lei, $rmt); } - @io = (); - close $qry_done; # fully closed when children are done - - # query_done will run when query_*mset close $qry_done - if ($lei_orig->{sock}) { # watch for client premature exit - require PublicInbox::EOFpipe; - PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig); - $lei_orig->{lxs} = $self; - $lei_orig->event_step_init; + close $io->[0]; # qry_status_wr + @$io = (); +} + +sub query_prepare { # wq_do + my ($self, $lei) = @_; + my %sig = $lei->atfork_child_wq($self); + local @SIG{keys %sig} = values %sig; + if (my $l2m = $lei->{l2m}) { + eval { $l2m->do_augment($lei) }; + return $lei->fail($@) if $@; + } + # trigger PublicInbox::OpPipe->event_step + my $qry_status_wr = $lei->{0} or + return $lei->fail('BUG: qry_status_wr missing'); + $qry_status_wr->autoflush(1); + print $qry_status_wr '.' or # this should never fail... + return $lei->fail("BUG? print qry_status_wr: $!"); +} + +sub do_query { + my ($self, $lei_orig, $srcs) = @_; + my ($lei, @io) = $lei_orig->atfork_parent_wq($self); + $io[0] = undef; + pipe(my $qry_status_rd, $io[0]) or die "pipe $!"; + + $lei_orig->{lxs} = $self; + $lei_orig->event_step_init; # wait for shutdowns + my $op_map = { '' => [ \&query_done, $lei_orig ] }; + my $in_loop = exists $lei_orig->{sock}; + my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop); + if (my $l2m = $lei->{l2m}) { + $l2m->pre_augment($lei_orig); # may redirect $lei->{1} for mbox + $io[1] = $lei_orig->{1}; + $op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ]; + $self->wq_do('query_prepare', \@io, $lei); + $opp->event_step if !$in_loop; } else { + start_query($self, \@io, $lei, $srcs); + } + unless ($in_loop) { my @pids = $self->wq_close; - # wait for close($lei->{0}) - if (read($eof_wait, my $buf, 1)) { - # if we get a SIGPIPE from one, kill the rest - kill('TERM', @pids) if $buf eq '!'; - } + # for the $lei->atfork_child_wq PIPE handler: + $op_map->{'!'} = [ \&CORE::kill, 'TERM', @pids ]; + $opp->event_step; my $ipc_worker_reap = $self->can('ipc_worker_reap'); dwaitpid($_, $ipc_worker_reap, $self) for @pids; - query_done($lei_orig); # may SIGPIPE } } diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm new file mode 100644 index 00000000..295a8aa5 --- /dev/null +++ b/lib/PublicInbox/OpPipe.pm @@ -0,0 +1,41 @@ +# Copyright (C) 2021 all contributors +# License: AGPL-3.0+ + +# bytecode dispatch pipe, reads a byte, runs a sub +# byte => [ sub, @operands ] +package PublicInbox::OpPipe; +use strict; +use v5.10.1; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN); + +sub new { + my ($cls, $rd, $op_map, $in_loop) = @_; + my $self = bless { sock => $rd, op_map => $op_map }, $cls; + # 1031: F_SETPIPE_SZ, 4096: page size + fcntl($rd, 1031, 4096) if $^O eq 'linux'; + if ($in_loop) { # iff using DS->EventLoop + $rd->blocking(0); + $self->SUPER::new($rd, EPOLLIN); + } + $self; +} + +sub event_step { + my ($self) = @_; + my $rd = $self->{sock}; + my $byte; + until (defined(sysread($rd, $byte, 1))) { + return if $!{EAGAIN}; + next if $!{EINTR}; + die "read \$rd: $!"; + } + my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'"; + if ($byte eq '') { # close on EOF + $rd->blocking ? delete($self->{sock}) : $self->close; + } + my ($sub, @args) = @$op; + $sub->(@args); +} + +1; diff --git a/t/lei.t b/t/lei.t index 2349dca4..c4692217 100644 --- a/t/lei.t +++ b/t/lei.t @@ -7,6 +7,7 @@ use Test::More; use PublicInbox::TestCommon; use PublicInbox::Config; use File::Path qw(rmtree); +use Fcntl qw(SEEK_SET); require_git 2.6; require_mods(qw(json DBD::SQLite Search::Xapian)); my $opt = { 1 => \(my $out = ''), 2 => \(my $err = '') }; @@ -188,6 +189,25 @@ my $test_external = sub { # No double-quoting should be imposed on users on the CLI $lei->('q', 's:use boolean prefix'); like($out, qr/search: use boolean prefix/, 'phrase search got result'); + + $lei->('q', '-o', "mboxcl2:$home/mbox", 's:use boolean prefix'); + open my $mb, '<', "$home/mbox" or fail "no mbox: $!"; + my @s = grep(/^Subject:/, <$mb>); + is(scalar(@s), 1, '1 result in mbox'); + $lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:see attachment'); + is($err, '', 'no errors from augment'); + seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!"; + @s = grep(/^Subject:/, <$mb>); + is(scalar(@s), 2, '2 results in mbox'); + + $lei->('q', '-a', '-o', "mboxcl2:$home/mbox", 's:nonexistent'); + is($err, '', 'no errors on no results'); + seek($mb, 0, SEEK_SET) or BAIL_OUT "seek: $!"; + my @s2 = grep(/^Subject:/, <$mb>); + is_deeply(\@s2, \@s, 'same 2 old results w/ --augment and bad search'); + + $lei->('q', '-o', "mboxcl2:$home/mbox", 's:nonexistent'); + is(-s "$home/mbox", 0, 'clobber w/o --augment'); }; my $test_lei_common = sub { diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t index d5beb3d2..083e0df4 100644 --- a/t/lei_to_mail.t +++ b/t/lei_to_mail.t @@ -94,7 +94,9 @@ my $wcb_get = sub { my $dup = Storable::thaw(Storable::freeze($l2m)); is_deeply($dup, $l2m, "$fmt round-trips through storable"); } - $l2m->do_prepare($lei); + $l2m->pre_augment($lei); + $l2m->do_augment($lei); + $l2m->post_augment($lei); my $cb = $l2m->write_cb($lei); delete $lei->{1}; $cb;