From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 037E41FC9A for ; Sun, 7 Feb 2021 08:52:03 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 15/19] lei q: improve remote mboxrd UX Date: Sun, 7 Feb 2021 08:51:57 +0000 Message-Id: <20210207085201.13871-16-e@80x24.org> In-Reply-To: <20210207085201.13871-1-e@80x24.org> References: <20210207085201.13871-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: For early MUA spawners using lock-free outputs, we need to write to the startq pipe to silence progress reporting. For --augment users, we can start the MUA even earlier by creating Maildirs in the pre-augment phase. To improve progress reporting for non-MUA (or late-MUA) spawners, we'll no longer blindly append "--compressed" to the curl(1) command when POST-ing for the gzipped mboxrd. Furthermore, we'll overload stringify ('""') in LeiCurl to ensure the empty -d '' string shows up properly. 
--- lib/PublicInbox/IPC.pm | 8 ++-- lib/PublicInbox/LEI.pm | 4 +- lib/PublicInbox/LeiCurl.pm | 11 +++-- lib/PublicInbox/LeiMirror.pm | 5 +- lib/PublicInbox/LeiOverview.pm | 3 +- lib/PublicInbox/LeiToMail.pm | 24 +++++----- lib/PublicInbox/LeiXSearch.pm | 87 ++++++++++++++++++++++------------ 7 files changed, 88 insertions(+), 54 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index c8673e26..9331233a 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -109,7 +109,6 @@ sub ipc_worker_spawn { $w_res->autoflush(1); $SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT)); local $0 = $ident; - PublicInbox::DS::sig_setmask($sigset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); eval { @@ -117,6 +116,7 @@ sub ipc_worker_spawn { local @$self{keys %$fields} = values(%$fields); my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; + PublicInbox::DS::sig_setmask($sigset); ipc_worker_loop($self, $r_req, $w_res); }; warn "worker $ident PID:$$ died: $@\n" if $@; @@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) { $SIG{$_} = 'IGNORE' for (qw(PIPE)); $SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD)); local $0 = $self->{-wq_ident}; - PublicInbox::DS::sig_setmask($oldset); # ensure we properly exit even if warn() dies: my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) }); eval { @@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) { local @$self{keys %$fields} = values(%$fields); my $on_destroy = $self->ipc_atfork_child; local %SIG = %SIG; + PublicInbox::DS::sig_setmask($oldset); wq_worker_loop($self); }; warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@; @@ -395,9 +395,9 @@ sub wq_close { } sub wq_kill_old { - my ($self) = @_; + my ($self, $sig) = @_; my $pids = $self->{"-wq_old_pids.$$"} or return; - kill 'TERM', @$pids; + kill($sig // 'TERM', @$pids); } sub wq_kill { diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 31e6b4a8..e52154e5 100644 --- 
a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -741,7 +741,9 @@ sub start_mua { } elsif ($self->{oneshot}) { $self->{"mua.pid.$self.$$"} = spawn(\@cmd); } - delete $self->{-progress}; + if ($self->{lxs} && $self->{au_done}) { # kick wait_startq + syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0)); + } } # caller needs to "-t $self->{1}" to check if tty diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm index 38b17c78..f346a1b4 100644 --- a/lib/PublicInbox/LeiCurl.pm +++ b/lib/PublicInbox/LeiCurl.pm @@ -8,6 +8,12 @@ use v5.10.1; use PublicInbox::Spawn qw(which); use PublicInbox::Config; +# Ensures empty strings are quoted, we don't need more +# sophisticated quoting than for empty strings: curl -d '' +use overload '""' => sub { + join(' ', map { $_ eq '' ? "''" : $_ } @{$_[0]}); +}; + my %lei2curl = ( 'curl-config=s@' => 'config|K=s@', ); @@ -63,10 +69,9 @@ EOM # completes the result of cmd() for $uri sub for_uri { - my ($self, $lei, $uri) = @_; + my ($self, $lei, $uri, @opt) = @_; my $pfx = torsocks($self, $lei, $uri) or return; # error - [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed', - $uri->as_string ] + bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self); } 1; diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm index 5ba69287..c5153148 100644 --- a/lib/PublicInbox/LeiMirror.pm +++ b/lib/PublicInbox/LeiMirror.pm @@ -31,7 +31,7 @@ sub try_scrape { my $uri = URI->new($self->{src}); my $lei = $self->{lei}; my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return; - my $cmd = $curl->for_uri($lei, $uri); + my $cmd = $curl->for_uri($lei, $uri, '--compressed'); my $opt = { 0 => $lei->{0}, 2 => $lei->{2} }; my $fh = popen_rd($cmd, $lei->{env}, $opt); my $html = do { local $/; <$fh> } // die "read(curl $uri): $!"; @@ -93,8 +93,7 @@ sub _try_config { my $path = $uri->path; chop($path) eq '/' or die "BUG: $uri not canonicalized"; $uri->path($path . 
'/_/text/config/raw'); - my $cmd = $self->{curl}->for_uri($lei, $uri); - push @$cmd, '--compressed'; # curl decompresses for us + my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed'); my $ce = "$dst/inbox.config.example"; my $f = "$ce-$$.tmp"; open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)"); diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index dcfb9cc7..f0ac4684 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -95,9 +95,10 @@ sub new { $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei); } else { # default to the cheapest sort since MUA usually resorts - $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout'; + $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout'; $lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) }; return $lei->fail($@) if $@; + $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free; } $self; } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 3f65e9e9..857aeb63 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -370,7 +370,17 @@ sub new { $self; } -sub _pre_augment_maildir {} # noop +sub _pre_augment_maildir { + my ($self, $lei) = @_; + my $dst = $lei->{ovv}->{dst}; + for my $x (qw(tmp new cur)) { + my $d = $dst.$x; + next if -d $d; + require File::Path; + File::Path::mkpath($d); + -d $d or die "$d is not a directory"; + } +} sub _do_augment_maildir { my ($self, $lei) = @_; @@ -387,17 +397,7 @@ sub _do_augment_maildir { } } -sub _post_augment_maildir { - my ($self, $lei) = @_; - my $dst = $lei->{ovv}->{dst}; - for my $x (qw(tmp new cur)) { - my $d = $dst.$x; - next if -d $d; - require File::Path; - File::Path::mkpath($d); - -d $d or die "$d is not a directory"; - } -} +sub _post_augment_maildir {} # noop sub _pre_augment_mbox { my ($self, $lei) = @_; diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 2794140a..0e99e4b4 100644 --- 
a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -101,9 +101,23 @@ sub _mset_more ($$) { # $startq will EOF when query_prepare is done augmenting and allow # query_mset and query_thread_mset to proceed. sub wait_startq ($) { - my ($startq) = @_; - $_[0] = undef; - read($startq, my $query_prepare_done, 1); + my ($lei) = @_; + my $startq = delete $lei->{startq} or return; + while (1) { + my $n = sysread($startq, my $query_prepare_done, 1); + if (defined $n) { + return if $n == 0; # no MUA + if ($query_prepare_done eq 'q') { + $lei->{opt}->{quiet} = 1; + delete $lei->{opt}->{verbose}; + delete $lei->{-progress}; + } else { + $lei->fail("$$ WTF `$query_prepare_done'"); + } + return; + } + return $lei->fail("$$ wait_startq: $!") unless $!{EINTR}; + } } sub mset_progress { @@ -140,7 +154,7 @@ sub query_thread_mset { # for --threads while ($over->expand_thread($ctx)) { for my $n (@{$ctx->{xids}}) { my $smsg = $over->get_art($n) or next; - wait_startq($startq) if $startq; + wait_startq($lei); my $mitem = delete $n2item{$smsg->{num}}; $each_smsg->($smsg, $mitem); } @@ -155,7 +169,6 @@ sub query_mset { # non-parallel for non-"--threads" users my ($self) = @_; local $0 = "$0 query_mset"; my $lei = $self->{lei}; - my $startq = delete $lei->{startq}; my $mo = { %{$lei->{mset_opt}} }; my $mset; for my $loc (locals($self)) { @@ -168,7 +181,7 @@ sub query_mset { # non-parallel for non-"--threads" users $mset->size, $mset->get_matches_estimated); for my $mitem ($mset->items) { my $smsg = smsg_for($self, $mitem) or next; - wait_startq($startq) if $startq; + wait_startq($lei); $each_smsg->($smsg, $mitem); } } while (_mset_more($mset, $mo)); @@ -183,7 +196,7 @@ sub each_eml { # callback for MboxReader->mboxrd $smsg->parse_references($eml, mids($eml)); $smsg->{$_} //= '' for qw(from to cc ds subject references mid); delete @$smsg{qw(From Subject -ds -ts)}; - if (my $startq = delete($lei->{startq})) { wait_startq($startq) } + wait_startq($lei); if 
($lei->{-progress}) { ++$lei->{-nr_remote_eml}; my $now = now(); @@ -200,6 +213,10 @@ sub each_eml { # callback for MboxReader->mboxrd sub query_remote_mboxrd { my ($self, $uris) = @_; local $0 = "$0 query_remote_mboxrd"; +open my $dbg, '>>', '/tmp/dbg'; $dbg->autoflush(1); use Data::Dumper; + local $SIG{__WARN__} = sub { + print $dbg "$$ @_"; + }; local $SIG{TERM} = sub { exit(0) }; # for DESTROY (File::Temp, $reap) my $lei = $self->{lei}; my ($opt, $env) = @$lei{qw(opt env)}; @@ -210,7 +227,6 @@ sub query_remote_mboxrd { my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1); fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!"; my $rdr = { 2 => $cerr, pgid => 0 }; - my $coff = 0; my $sigint_reap = $lei->can('sigint_reap'); if ($verbose) { # spawn a process to force line-buffering, otherwise curl @@ -228,13 +244,14 @@ sub query_remote_mboxrd { $lei->{-nr_remote_eml} = 0; $uri->query_form(@qform); my $cmd = $curl->for_uri($lei, $uri); - $lei->err("# @$cmd") if $verbose; + $lei->qerr("# $cmd"); my ($fh, $pid) = popen_rd($cmd, $env, $rdr); $reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid); $fh = IO::Uncompress::Gunzip->new($fh); PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self, $lei, $each_smsg); - my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!"; + my $err = waitpid($pid, 0) == $pid ? undef + : "BUG: waitpid($cmd): $!"; @$reap_curl = (); # cancel OnDestroy die $err if $err; if ($? == 0) { @@ -242,16 +259,18 @@ sub query_remote_mboxrd { mset_progress($lei, $lei->{-current_url}, $nr, $nr); next; } - seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n"; - my $e = do { local $/; <$cerr> } // - die "read(curl stderr): $!\n"; - $coff += length($e); - truncate($cerr, 0); - next if (($? 
>> 8) == 22 && $e =~ /\b404\b/); - $lei->child_error($?); + $err = ''; + if (-s $cerr) { + seek($cerr, 0, SEEK_SET) or + $lei->err("seek($cmd stderr): $!"); + $err = do { local $/; <$cerr> } // + "read($cmd stderr): $!"; + truncate($cerr, 0) or + $lei->err("truncate($cmd stderr): $!"); + } + next if (($? >> 8) == 22 && $err =~ /\b404\b/); $uri->query_form(q => $lei->{mset_opt}->{qstr}); - # --verbose already showed the error via tail(1) - $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e); + $lei->child_error($?, "E: <$uri> $err"); } undef $each_smsg; $lei->{ovv}->ovv_atexit_child($lei); @@ -311,15 +330,23 @@ Error closing $lei->{ovv}->{dst}: $! sub do_post_augment { my ($lei) = @_; - eval { $lei->{l2m}->post_augment($lei) }; - if (my $err = $@) { - if (my $lxs = delete $lei->{lxs}) { - $lxs->wq_kill; - $lxs->wq_close(0, undef, $lei); + my $l2m = $lei->{l2m}; + my $err; + if ($l2m) { + eval { $l2m->post_augment($lei) }; + $err = $@; + if ($err) { + if (my $lxs = delete $lei->{lxs}) { + $lxs->wq_kill; + $lxs->wq_close(0, undef, $lei); + } + $lei->fail("$err"); } - $lei->fail("$err"); } - close(delete $lei->{au_done}); # triggers wait_startq + if (!$err && delete $lei->{early_mua}) { # non-augment case + $lei->start_mua; + } + close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch } my $MAX_PER_HOST = 4; @@ -334,9 +361,6 @@ sub concurrency { sub start_query { # always runs in main (lei-daemon) process my ($self, $lei) = @_; - if (my $l2m = $lei->{l2m}) { - $lei->start_mua if $l2m->lock_free; - } if ($lei->{opt}->{threads}) { for my $ibxish (locals($self)) { $self->wq_io_do('query_thread_mset', [], $ibxish); @@ -387,6 +411,9 @@ sub do_query { my $l2m = $lei->{l2m}; if ($l2m) { $l2m->pre_augment($lei); + if ($lei->{opt}->{augment} && delete $lei->{early_mua}) { + $lei->start_mua; + } $l2m->wq_workers_start('lei2mail', $l2m->{jobs}, $lei->oldset, { lei => $lei }); pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; @@ -404,7 +431,7 @@ sub do_query { 
delete $lei->{pkt_op_p}; $l2m->wq_close(1) if $l2m; $lei->event_step_init; # wait for shutdowns - $self->wq_io_do('query_prepare', []) if $l2m; + $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe start_query($self, $lei); $self->wq_close(1); # lei_xsearch workers stop when done if ($lei->{oneshot}) {