From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCHv2 01/13] lei q: improve remote mboxrd UX + MUA
Date: Sun, 7 Feb 2021 23:05:09 -1000 [thread overview]
Message-ID: <20210208090521.28909-2-e@80x24.org> (raw)
In-Reply-To: <20210208090521.28909-1-e@80x24.org>
For early MUA spawners using lock-free outputs, we need to wait
on the startq pipe to silence progress reporting. For
--augment users, we can start the MUA even earlier by
creating Maildirs in the pre-augment phase.
To improve progress reporting for non-MUA (or late-MUA)
spawners, we'll no longer blindly append "--compressed" to the
curl(1) command when POST-ing for the gzipped mboxrd.
Furthermore, we'll overload stringify ('""') in LeiCurl to
ensure the empty -d '' string shows up properly.
v2: fix startq waiting with --threads
mset_progress is never shown with early MUA spawning.
The plan is to still show progress when augmenting and
deduping. This fixes all local search cases.
A leftover debug bit is dropped, too.
---
lib/PublicInbox/IPC.pm | 8 ++--
lib/PublicInbox/LEI.pm | 4 +-
lib/PublicInbox/LeiCurl.pm | 11 +++--
lib/PublicInbox/LeiMirror.pm | 5 +-
lib/PublicInbox/LeiOverview.pm | 3 +-
lib/PublicInbox/LeiToMail.pm | 24 +++++-----
lib/PublicInbox/LeiXSearch.pm | 88 +++++++++++++++++++++-------------
7 files changed, 86 insertions(+), 57 deletions(-)
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c8673e26..9331233a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -109,7 +109,6 @@ sub ipc_worker_spawn {
$w_res->autoflush(1);
$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
local $0 = $ident;
- PublicInbox::DS::sig_setmask($sigset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -117,6 +116,7 @@ sub ipc_worker_spawn {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($sigset);
ipc_worker_loop($self, $r_req, $w_res);
};
warn "worker $ident PID:$$ died: $@\n" if $@;
@@ -293,7 +293,6 @@ sub _wq_worker_start ($$$) {
$SIG{$_} = 'IGNORE' for (qw(PIPE));
$SIG{$_} = 'DEFAULT' for (qw(TTOU TTIN TERM QUIT INT CHLD));
local $0 = $self->{-wq_ident};
- PublicInbox::DS::sig_setmask($oldset);
# ensure we properly exit even if warn() dies:
my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
eval {
@@ -301,6 +300,7 @@ sub _wq_worker_start ($$$) {
local @$self{keys %$fields} = values(%$fields);
my $on_destroy = $self->ipc_atfork_child;
local %SIG = %SIG;
+ PublicInbox::DS::sig_setmask($oldset);
wq_worker_loop($self);
};
warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
@@ -395,9 +395,9 @@ sub wq_close {
}
sub wq_kill_old {
- my ($self) = @_;
+ my ($self, $sig) = @_;
my $pids = $self->{"-wq_old_pids.$$"} or return;
- kill 'TERM', @$pids;
+ kill($sig // 'TERM', @$pids);
}
sub wq_kill {
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index dce80762..c3645698 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -741,7 +741,9 @@ sub start_mua {
} elsif ($self->{oneshot}) {
$self->{"mua.pid.$self.$$"} = spawn(\@cmd);
}
- delete $self->{-progress};
+ if ($self->{lxs} && $self->{au_done}) { # kick wait_startq
+ syswrite($self->{au_done}, 'q' x ($self->{lxs}->{jobs} // 0));
+ }
}
# caller needs to "-t $self->{1}" to check if tty
diff --git a/lib/PublicInbox/LeiCurl.pm b/lib/PublicInbox/LeiCurl.pm
index 38b17c78..f346a1b4 100644
--- a/lib/PublicInbox/LeiCurl.pm
+++ b/lib/PublicInbox/LeiCurl.pm
@@ -8,6 +8,12 @@ use v5.10.1;
use PublicInbox::Spawn qw(which);
use PublicInbox::Config;
+# Ensures empty strings are quoted, we don't need more
+# sophisticated quoting than for empty strings: curl -d ''
+use overload '""' => sub {
+ join(' ', map { $_ eq '' ? "''" : $_ } @{$_[0]});
+};
+
my %lei2curl = (
'curl-config=s@' => 'config|K=s@',
);
@@ -63,10 +69,9 @@ EOM
# completes the result of cmd() for $uri
sub for_uri {
- my ($self, $lei, $uri) = @_;
+ my ($self, $lei, $uri, @opt) = @_;
my $pfx = torsocks($self, $lei, $uri) or return; # error
- [ @$pfx, @$self, substr($uri->path, -3) eq '.gz' ? () : '--compressed',
- $uri->as_string ]
+ bless [ @$pfx, @$self, @opt, $uri->as_string ], ref($self);
}
1;
diff --git a/lib/PublicInbox/LeiMirror.pm b/lib/PublicInbox/LeiMirror.pm
index 5ba69287..c5153148 100644
--- a/lib/PublicInbox/LeiMirror.pm
+++ b/lib/PublicInbox/LeiMirror.pm
@@ -31,7 +31,7 @@ sub try_scrape {
my $uri = URI->new($self->{src});
my $lei = $self->{lei};
my $curl = $self->{curl} //= PublicInbox::LeiCurl->new($lei) or return;
- my $cmd = $curl->for_uri($lei, $uri);
+ my $cmd = $curl->for_uri($lei, $uri, '--compressed');
my $opt = { 0 => $lei->{0}, 2 => $lei->{2} };
my $fh = popen_rd($cmd, $lei->{env}, $opt);
my $html = do { local $/; <$fh> } // die "read(curl $uri): $!";
@@ -93,8 +93,7 @@ sub _try_config {
my $path = $uri->path;
chop($path) eq '/' or die "BUG: $uri not canonicalized";
$uri->path($path . '/_/text/config/raw');
- my $cmd = $self->{curl}->for_uri($lei, $uri);
- push @$cmd, '--compressed'; # curl decompresses for us
+ my $cmd = $self->{curl}->for_uri($lei, $uri, '--compressed');
my $ce = "$dst/inbox.config.example";
my $f = "$ce-$$.tmp";
open(my $fh, '+>', $f) or return $lei->err("open $f: $! (non-fatal)");
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index dcfb9cc7..f0ac4684 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -95,9 +95,10 @@ sub new {
$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
} else {
# default to the cheapest sort since MUA usually resorts
- $lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
+ $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
return $lei->fail($@) if $@;
+ $lei->{early_mua} = 1 if $opt->{mua} && $lei->{l2m}->lock_free;
}
$self;
}
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 4c5a5685..a5a196db 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -371,7 +371,17 @@ sub new {
$self;
}
-sub _pre_augment_maildir {} # noop
+sub _pre_augment_maildir {
+ my ($self, $lei) = @_;
+ my $dst = $lei->{ovv}->{dst};
+ for my $x (qw(tmp new cur)) {
+ my $d = $dst.$x;
+ next if -d $d;
+ require File::Path;
+ File::Path::mkpath($d);
+ -d $d or die "$d is not a directory";
+ }
+}
sub _do_augment_maildir {
my ($self, $lei) = @_;
@@ -388,17 +398,7 @@ sub _do_augment_maildir {
}
}
-sub _post_augment_maildir {
- my ($self, $lei) = @_;
- my $dst = $lei->{ovv}->{dst};
- for my $x (qw(tmp new cur)) {
- my $d = $dst.$x;
- next if -d $d;
- require File::Path;
- File::Path::mkpath($d);
- -d $d or die "$d is not a directory";
- }
-}
+sub _post_augment_maildir {} # noop
sub _pre_augment_mbox {
my ($self, $lei) = @_;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 2794140a..db089a67 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -101,20 +101,34 @@ sub _mset_more ($$) {
# $startq will EOF when query_prepare is done augmenting and allow
# query_mset and query_thread_mset to proceed.
sub wait_startq ($) {
- my ($startq) = @_;
- $_[0] = undef;
- read($startq, my $query_prepare_done, 1);
+ my ($lei) = @_;
+ my $startq = delete $lei->{startq} or return;
+ while (1) {
+ my $n = sysread($startq, my $query_prepare_done, 1);
+ if (defined $n) {
+ return if $n == 0; # no MUA
+ if ($query_prepare_done eq 'q') {
+ $lei->{opt}->{quiet} = 1;
+ delete $lei->{opt}->{verbose};
+ delete $lei->{-progress};
+ } else {
+ $lei->fail("$$ WTF `$query_prepare_done'");
+ }
+ return;
+ }
+ return $lei->fail("$$ wait_startq: $!") unless $!{EINTR};
+ }
}
sub mset_progress {
my $lei = shift;
- return unless $lei->{-progress};
+ return if $lei->{early_mua} || !$lei->{-progress};
if ($lei->{pkt_op_p}) {
pkt_do($lei->{pkt_op_p}, 'mset_progress', @_);
} else { # single lei-daemon consumer
my ($desc, $mset_size, $mset_total_est) = @_;
$lei->{-mset_total} += $mset_size;
- $lei->err("# $desc $mset_size/$mset_total_est");
+ $lei->qerr("# $desc $mset_size/$mset_total_est");
}
}
@@ -122,7 +136,6 @@ sub query_thread_mset { # for --threads
my ($self, $ibxish) = @_;
local $0 = "$0 query_thread_mset";
my $lei = $self->{lei};
- my $startq = delete $lei->{startq};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
return warn("$desc not indexed by Xapian\n") unless ($srch && $over);
@@ -140,7 +153,7 @@ sub query_thread_mset { # for --threads
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
}
@@ -155,7 +168,6 @@ sub query_mset { # non-parallel for non-"--threads" users
my ($self) = @_;
local $0 = "$0 query_mset";
my $lei = $self->{lei};
- my $startq = delete $lei->{startq};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
for my $loc (locals($self)) {
@@ -168,7 +180,7 @@ sub query_mset { # non-parallel for non-"--threads" users
$mset->size, $mset->get_matches_estimated);
for my $mitem ($mset->items) {
my $smsg = smsg_for($self, $mitem) or next;
- wait_startq($startq) if $startq;
+ wait_startq($lei);
$each_smsg->($smsg, $mitem);
}
} while (_mset_more($mset, $mo));
@@ -183,7 +195,7 @@ sub each_eml { # callback for MboxReader->mboxrd
$smsg->parse_references($eml, mids($eml));
$smsg->{$_} //= '' for qw(from to cc ds subject references mid);
delete @$smsg{qw(From Subject -ds -ts)};
- if (my $startq = delete($lei->{startq})) { wait_startq($startq) }
+ wait_startq($lei);
if ($lei->{-progress}) {
++$lei->{-nr_remote_eml};
my $now = now();
@@ -210,7 +222,6 @@ sub query_remote_mboxrd {
my $cerr = File::Temp->new(TEMPLATE => 'curl.err-XXXX', TMPDIR => 1);
fcntl($cerr, F_SETFL, O_APPEND|O_RDWR) or warn "set O_APPEND: $!";
my $rdr = { 2 => $cerr, pgid => 0 };
- my $coff = 0;
my $sigint_reap = $lei->can('sigint_reap');
if ($verbose) {
# spawn a process to force line-buffering, otherwise curl
@@ -228,13 +239,14 @@ sub query_remote_mboxrd {
$lei->{-nr_remote_eml} = 0;
$uri->query_form(@qform);
my $cmd = $curl->for_uri($lei, $uri);
- $lei->err("# @$cmd") if $verbose;
+ $lei->qerr("# $cmd");
my ($fh, $pid) = popen_rd($cmd, $env, $rdr);
$reap_curl = PublicInbox::OnDestroy->new($sigint_reap, $pid);
$fh = IO::Uncompress::Gunzip->new($fh);
PublicInbox::MboxReader->mboxrd($fh, \&each_eml, $self,
$lei, $each_smsg);
- my $err = waitpid($pid, 0) == $pid ? undef : "BUG: waitpid: $!";
+ my $err = waitpid($pid, 0) == $pid ? undef
+ : "BUG: waitpid($cmd): $!";
@$reap_curl = (); # cancel OnDestroy
die $err if $err;
if ($? == 0) {
@@ -242,16 +254,18 @@ sub query_remote_mboxrd {
mset_progress($lei, $lei->{-current_url}, $nr, $nr);
next;
}
- seek($cerr, $coff, SEEK_SET) or warn "seek(curl stderr): $!\n";
- my $e = do { local $/; <$cerr> } //
- die "read(curl stderr): $!\n";
- $coff += length($e);
- truncate($cerr, 0);
- next if (($? >> 8) == 22 && $e =~ /\b404\b/);
- $lei->child_error($?);
+ $err = '';
+ if (-s $cerr) {
+ seek($cerr, 0, SEEK_SET) or
+ $lei->err("seek($cmd stderr): $!");
+ $err = do { local $/; <$cerr> } //
+ "read($cmd stderr): $!";
+ truncate($cerr, 0) or
+ $lei->err("truncate($cmd stderr): $!");
+ }
+ next if (($? >> 8) == 22 && $err =~ /\b404\b/);
$uri->query_form(q => $lei->{mset_opt}->{qstr});
- # --verbose already showed the error via tail(1)
- $lei->err("E: $uri \$?=$?\n", $verbose ? () : $e);
+ $lei->child_error($?, "E: <$uri> $err");
}
undef $each_smsg;
$lei->{ovv}->ovv_atexit_child($lei);
@@ -311,15 +325,23 @@ Error closing $lei->{ovv}->{dst}: $!
sub do_post_augment {
my ($lei) = @_;
- eval { $lei->{l2m}->post_augment($lei) };
- if (my $err = $@) {
- if (my $lxs = delete $lei->{lxs}) {
- $lxs->wq_kill;
- $lxs->wq_close(0, undef, $lei);
+ my $l2m = $lei->{l2m};
+ my $err;
+ if ($l2m) {
+ eval { $l2m->post_augment($lei) };
+ $err = $@;
+ if ($err) {
+ if (my $lxs = delete $lei->{lxs}) {
+ $lxs->wq_kill;
+ $lxs->wq_close(0, undef, $lei);
+ }
+ $lei->fail("$err");
}
- $lei->fail("$err");
}
- close(delete $lei->{au_done}); # triggers wait_startq
+ if (!$err && delete $lei->{early_mua}) { # non-augment case
+ $lei->start_mua;
+ }
+ close(delete $lei->{au_done}); # triggers wait_startq in lei_xsearch
}
my $MAX_PER_HOST = 4;
@@ -334,9 +356,6 @@ sub concurrency {
sub start_query { # always runs in main (lei-daemon) process
my ($self, $lei) = @_;
- if (my $l2m = $lei->{l2m}) {
- $lei->start_mua if $l2m->lock_free;
- }
if ($lei->{opt}->{threads}) {
for my $ibxish (locals($self)) {
$self->wq_io_do('query_thread_mset', [], $ibxish);
@@ -387,6 +406,9 @@ sub do_query {
my $l2m = $lei->{l2m};
if ($l2m) {
$l2m->pre_augment($lei);
+ if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
+ $lei->start_mua;
+ }
$l2m->wq_workers_start('lei2mail', $l2m->{jobs},
$lei->oldset, { lei => $lei });
pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
@@ -404,7 +426,7 @@ sub do_query {
delete $lei->{pkt_op_p};
$l2m->wq_close(1) if $l2m;
$lei->event_step_init; # wait for shutdowns
- $self->wq_io_do('query_prepare', []) if $l2m;
+ $self->wq_io_do('query_prepare', []) if $l2m; # for augment/dedupe
start_query($self, $lei);
$self->wq_close(1); # lei_xsearch workers stop when done
if ($lei->{oneshot}) {
next prev parent reply other threads:[~2021-02-08 9:05 UTC|newest]
Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-02-08 9:05 [PATCH 00/13] lei approxidate, startup fix, --alert Eric Wong
2021-02-08 9:05 ` Eric Wong [this message]
2021-02-08 9:05 ` [PATCH 02/13] lei_xsearch: quiet Eml warnings from remote mboxrds Eric Wong
2021-02-08 9:05 ` [PATCH 03/13] lei q: SIGWINCH process group with the terminal Eric Wong
2021-02-08 9:05 ` [PATCH 04/13] lei q: support --alert=CMD for early MUA users Eric Wong
2021-02-08 9:05 ` [PATCH 05/13] tests: favor IPv6 Eric Wong
2021-02-08 9:05 ` [PATCH 06/13] ds: improve add_timer usability Eric Wong
2021-02-08 9:05 ` [PATCH 07/13] lei: start_pager: drop COLUMNS default Eric Wong
2021-02-08 9:05 ` [PATCH 08/13] lei: avoid racing on unlink + bind + listen Eric Wong
2021-02-08 9:05 ` [PATCH 09/13] lei: drop BSD::Resource usage Eric Wong
2021-02-08 9:05 ` [PATCH 10/13] git: implement date_parse method Eric Wong
2021-02-08 9:05 ` [PATCH 11/13] lei q: use git approxidate with d:, dt: and rt: ranges Eric Wong
2021-02-10 9:59 ` [PATCH] search: fix argv handling of quoted phrases Eric Wong
2021-02-08 9:05 ` [PATCH 12/13] search: use one git-rev-parse process for all dates Eric Wong
2021-02-08 9:05 ` [PATCH 13/13] spawnpp: raise exception on E2BIG errors Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210208090521.28909-2-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).