Been putting 3/4 off for a bit; still thinking about labels a bit... Eric Wong (4): lei q: skip lei/store->write_prepare for JSON outputs lei: do not blindly commit to lei/store on close lei: support /dev/fd/[0-2] inputs and outputs in daemon lei mark: disallow '!' in labels lib/PublicInbox/LEI.pm | 26 ++++++++++---------------- lib/PublicInbox/LeiConvert.pm | 3 ++- lib/PublicInbox/LeiInput.pm | 5 ++++- lib/PublicInbox/LeiMark.pm | 2 +- lib/PublicInbox/LeiOverview.pm | 17 +++++++++-------- lib/PublicInbox/LeiP2q.pm | 7 +++++-- lib/PublicInbox/LeiQuery.pm | 16 ++++++++-------- lib/PublicInbox/LeiToMail.pm | 28 ++++++++++++++++------------ lib/PublicInbox/LeiXSearch.pm | 2 ++ t/lei-convert.t | 2 +- t/lei-import.t | 2 +- 11 files changed, 59 insertions(+), 51 deletions(-)
JSON outputs won't write to lei/store at all, so there's no point in forking the store worker if it's not already running. LeiSearch object ($lse) is also fork-safe until it opens a persistent FD for Xapian/SQLite so we can unconditionally carry it across fork. --- lib/PublicInbox/LeiOverview.pm | 4 ++-- lib/PublicInbox/LeiQuery.pm | 16 ++++++++-------- lib/PublicInbox/LeiToMail.pm | 16 ++++++++-------- lib/PublicInbox/LeiXSearch.pm | 1 + 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index b4d81328..96bfff24 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -223,7 +223,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; - my $lse = $lei->{sto}->search; + my $lse = $lei->{lse}; sub { # DIY prettiness :P my ($smsg, $mitem) = @_; return if $dedupe->is_smsg_dup($smsg); @@ -247,7 +247,7 @@ sub ovv_each_smsg_cb { # runs in wq worker usually } } elsif ($json) { my $ORS = $self->{fmt} eq 'json' ? 
",\n" : "\n"; # JSONL - my $lse = $lei->{sto}->search; + my $lse = $lei->{lse}; sub { my ($smsg, $mitem) = @_; return if $dedupe->is_smsg_dup($smsg); diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 84996e7e..65aa9e87 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -25,8 +25,7 @@ sub qstr_add { # PublicInbox::InputPipe::consume callback for --stdin my ($self) = @_; # $_[1] = $rbuf if (defined($_[1])) { $_[1] eq '' and return eval { - my $lse = delete $self->{lse}; - $lse->query_approxidate($lse->git, + $self->{lse}->query_approxidate($self->{lse}->git, $self->{mset_opt}->{qstr}); _start_query($self); }; @@ -50,11 +49,7 @@ sub lei_q { # --local is enabled by default unless --only is used # we'll allow "--only $LOCATION --local" my $sto = $self->_lei_store(1); - if (($opt->{'import-remote'} //= 1) | - (($opt->{'import-before'} //= \1) ? 1 : 0)) { - $sto->write_prepare($self); - } - my $lse = $sto->search; + my $lse = $self->{lse} = $sto->search; if ($opt->{'local'} //= scalar(@only) ? 0 : 1) { $lxs->prepare_external($lse); } @@ -103,6 +98,12 @@ sub lei_q { return $self->fail("`$mj' writer jobs must be >= 1"); } PublicInbox::LeiOverview->new($self) or return; + if ($self->{l2m} && ($opt->{'import-remote'} //= 1) | + # we use \1 (a ref) to distinguish between + # user-supplied and default value + (($opt->{'import-before'} //= \1) ? 1 : 0)) { + $sto->write_prepare($self); + } $self->{l2m} and $self->{l2m}->{-wq_nr_workers} = $mj // do { $mj = POSIX::lround($nproc * 3 / 4); # keep some CPU for git $mj <= 0 ? 
1 : $mj; @@ -131,7 +132,6 @@ sub lei_q { no query allowed on command-line with --stdin require PublicInbox::InputPipe; - $self->{lse} = $lse; # for query_approxidate PublicInbox::InputPipe::consume($self->{0}, \&qstr_add, $self); return; } diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 1be15707..f71f74cc 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -263,7 +263,7 @@ sub _mbox_write_cb ($$) { my $atomic_append = !defined($ovv->{lock_path}); my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe; - my $lse = $lei->{sto} ? $lei->{sto}->search : undef; + my $lse = $lei->{lse}; # may be undef sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($buf); @@ -352,7 +352,7 @@ sub _maildir_write_cb ($$) { my $dedupe = $lei->{dedupe}; $dedupe->prepare_dedupe if $dedupe; my $dst = $lei->{ovv}->{dst}; - my $lse = $lei->{sto} ? $lei->{sto}->search : undef; + my $lse = $lei->{lse}; # may be undef sub { # for git_to_mail my ($buf, $smsg, $eml) = @_; $dst // return $lei->fail; # dst may be undef-ed in last run @@ -373,7 +373,7 @@ sub _imap_write_cb ($$) { my $imap_append = $lei->{net}->can('imap_append'); my $mic = $lei->{net}->mic_get($self->{uri}); my $folder = $self->{uri}->mailbox; - my $lse = $lei->{sto} ? $lei->{sto}->search : undef; + my $lse = $lei->{lse}; # may be undef sub { # for git_to_mail my ($bref, $smsg, $eml) = @_; $mic // return $lei->fail; # dst may be undef-ed in last run @@ -449,7 +449,7 @@ sub _pre_augment_maildir { sub _do_augment_maildir { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; - my $lse = $lei->{sto}->search if $lei->{opt}->{'import-before'}; + my $lse = $lei->{opt}->{'import-before'} ? 
$lei->{lse} : undef; my ($mod, $shard) = @{$self->{shard_info} // []}; if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; @@ -481,7 +481,7 @@ sub _imap_augment_or_delete { # PublicInbox::NetReader::imap_each cb sub _do_augment_imap { my ($self, $lei) = @_; my $net = $lei->{net}; - my $lse = $lei->{sto}->search if $lei->{opt}->{'import-before'}; + my $lse = $lei->{opt}->{'import-before'} ? $lei->{lse} : undef; if ($lei->{opt}->{augment}) { my $dedupe = $lei->{dedupe}; if ($dedupe && $dedupe->prepare_dedupe) { @@ -523,9 +523,9 @@ sub _pre_augment_mbox { die "seek($dst): $!\n"; } if (!$self->{seekable}) { - my $ia = $lei->{opt}->{'import-before'}; + my $imp_before = $lei->{opt}->{'import-before'}; die "--import-before specified but $dst is not seekable\n" - if $ia && !ref($ia); + if $imp_before && !ref($imp_before); die "--augment specified but $dst is not seekable\n" if $lei->{opt}->{augment}; } @@ -562,7 +562,7 @@ sub _do_augment_mbox { $dedupe->prepare_dedupe if $dedupe; } if ($opt->{'import-before'}) { # the default - my $lse = $lei->{sto}->search; + my $lse = $lei->{lse}; PublicInbox::MboxReader->$fmt($rd, \&_mbox_augment_kw_maybe, $lei, $lse, $opt->{augment}); if (!$opt->{augment} and !truncate($out, 0)) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index f64b2c62..6410e0ea 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -430,6 +430,7 @@ sub do_query { $lei->{1}->autoflush(1); $lei->start_pager if delete $lei->{need_pager}; $lei->{ovv}->ovv_begin($lei); + die 'BUG: xdb|over open' if $lei->{lse}->{xdb} || $lei->{lse}->{over}; if ($l2m) { $l2m->pre_augment($lei); if ($lei->{opt}->{augment} && delete $lei->{early_mua}) {
Blindly committing to lei/store on close may hide errors/bugs; instead, do it explicitly for each worker that writes to it. For lei_xsearch, it will be better to close before spawning the MUA for future use since we may need it again once the user starts changing keywords. --- lib/PublicInbox/LEI.pm | 3 --- lib/PublicInbox/LeiXSearch.pm | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 6a5c32b3..59715633 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1030,9 +1030,6 @@ sub dclose { } } close(delete $self->{1}) if $self->{1}; # may reap_compress - if (my $sto = delete $self->{sto}) { - $sto->ipc_do('done'); - } $self->close if $self->{-event_init_done}; # PublicInbox::DS::close } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 6410e0ea..b41daffe 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -319,6 +319,7 @@ sub query_done { # EOF callback for main daemon if (my $lxs = delete $lei->{lxs}) { $lxs->wq_wait_old(\&xsearch_done_wait, $lei); } + my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef; $lei->{ovv}->ovv_end($lei); if ($l2m) { # close() calls LeiToMail reap_compress if (my $out = delete $lei->{old_1}) {
Since lei-daemon won't have the same FDs as the client, we need to special-case these mappings and won't be able to open arbitrary, non-standard FDs. We also won't attempt to support /proc/self/fd/[0-2] since that's a Linux-ism. /dev/fd/[0-2] and /dev/std{in,out,err} are portable to FreeBSD, at least. mawk(1) also supports /dev/std{out,err}, as does gawk(1) (which supports everything we can support, and arbitrary /dev/fd/$FD). --- lib/PublicInbox/LEI.pm | 23 ++++++++++------------- lib/PublicInbox/LeiConvert.pm | 3 ++- lib/PublicInbox/LeiInput.pm | 5 ++++- lib/PublicInbox/LeiOverview.pm | 13 +++++++------ lib/PublicInbox/LeiP2q.pm | 7 +++++-- lib/PublicInbox/LeiToMail.pm | 12 ++++++++---- t/lei-convert.t | 2 +- t/lei-import.t | 2 +- 8 files changed, 38 insertions(+), 29 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 59715633..eb3ad9e2 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -464,7 +464,6 @@ sub _lei_atfork_child { } } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly open STDERR, '+>&='.fileno($self->{2}) or warn "open $!"; - delete $self->{0}; } for (delete @$self{qw(3 old_1 au_done)}) { close($_) if defined($_); @@ -929,19 +928,17 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail } my %path_to_fd = ('/dev/stdin' => 0, '/dev/stdout' => 1, '/dev/stderr' => 2); -$path_to_fd{"/dev/fd/$_"} = $path_to_fd{"/proc/self/fd/$_"} for (0..2); -sub fopen { - my ($self, $mode, $path) = @_; - rel2abs($self, $path); +$path_to_fd{"/dev/fd/$_"} = $_ for (0..2); + +# this also normalizes the path +sub path_to_fd { + my ($self, $path) = @_; + $path = rel2abs($self, $path); $path =~ tr!/!/!s; - if (defined(my $fd = $path_to_fd{$path})) { - return $self->{$fd}; - } - if ($path =~ m!\A/(?:dev|proc/self)/fd/[0-9]+\z!) 
{ - return fail($self, "cannot open $path from daemon"); - } - open my $fh, $mode, $path or return; - $fh; + $path_to_fd{$path} // ( + ($path =~ m!\A/(?:dev|proc/self)/fd/[0-9]+\z!) ? + fail($self, "cannot open $path from daemon") : -1 + ); } # caller needs to "-t $self->{1}" to check if tty diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm index 0cc65108..083ecc33 100644 --- a/lib/PublicInbox/LeiConvert.pm +++ b/lib/PublicInbox/LeiConvert.pm @@ -50,7 +50,8 @@ sub lei_convert { # the main "lei convert" method my $ovv = PublicInbox::LeiOverview->new($lei, 'out-format'); $lei->{l2m} or return $lei->fail("output not specified or is not a mail destination"); - $lei->{opt}->{augment} = 1 unless $ovv->{dst} eq '/dev/stdout'; + my $devfd = $lei->path_to_fd($ovv->{dst}) // return; + $lei->{opt}->{augment} = 1 if $devfd < 0; $self->prepare_inputs($lei, \@inputs) or return; my $op = $lei->workers_start($self, 'lei_convert', 1); $lei->{cnv} = $self; diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index b059ecda..eed0eed7 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -67,7 +67,10 @@ sub input_path_url { return; } $input =~ s!\A([a-z0-9]+):!!i and $ifmt = lc($1); - if (-f $input) { + my $devfd = $lei->path_to_fd($input) // return; + if ($devfd >= 0) { + $self->input_fh($ifmt, $lei->{$devfd}, $input, @args); + } elsif (-f $input) { my $m = $lei->{opt}->{'lock'} // ($ifmt eq 'eml' ? ['none'] : PublicInbox::MboxLock->defaults); my $mbl = PublicInbox::MboxLock->acq($input, 0, $m); diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 96bfff24..8e26cba4 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -71,8 +71,9 @@ sub new { --$ofmt_key=$fmt and --output=$ofmt conflict } - $fmt //= 'json' if $dst eq '/dev/stdout'; - $fmt //= detect_fmt($lei, $dst) or return; + + my $devfd = $lei->path_to_fd($dst) // return; + $fmt //= $devfd >= 0 ? 
'json' : (detect_fmt($lei, $dst) or return); if (index($dst, '://') < 0) { # not a URL, so assume path $dst = File::Spec->canonpath($dst); @@ -84,11 +85,11 @@ sub new { if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) { $json = $self->{json} = ref(PublicInbox::Config->json); } - if ($dst eq '/dev/stdout') { - my $isatty = $lei->{need_pager} = -t $lei->{1}; + if ($devfd >= 0) { + my $isatty = $lei->{need_pager} = -t $lei->{$devfd}; $opt->{pretty} //= $isatty; if (!$isatty && -f _) { - my $fl = fcntl($lei->{1}, F_GETFL, 0) // + my $fl = fcntl($lei->{$devfd}, F_GETFL, 0) // return $lei->fail("fcntl(stdout): $!"); ovv_out_lk_init($self) unless ($fl & O_APPEND); } else { @@ -101,7 +102,7 @@ sub new { $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei); } else { # default to the cheapest sort since MUA usually resorts - $opt->{'sort'} //= 'docid' if $dst ne '/dev/stdout'; + $opt->{'sort'} //= 'docid' if $devfd < 0; $lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) }; return $lei->fail($@) if $@; if ($opt->{mua} && $lei->{l2m}->lock_free) { diff --git a/lib/PublicInbox/LeiP2q.pm b/lib/PublicInbox/LeiP2q.pm index fda055fe..25f63a10 100644 --- a/lib/PublicInbox/LeiP2q.pm +++ b/lib/PublicInbox/LeiP2q.pm @@ -107,8 +107,11 @@ sub do_p2q { # via wq_do my $in = $self->{0}; unless ($in) { my $input = $self->{input}; - if (-e $input) { - $in = $lei->fopen('<', $input) or + my $devfd = $lei->path_to_fd($input) // return; + if ($devfd >= 0) { + $in = $lei->{$devfd}; + } elsif (-e $input) { + open($in, '<', $input) or return $lei->fail("open < $input: $!"); } else { my @cmd = (qw(git format-patch --stdout -1), $input); diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index f71f74cc..88468c34 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -503,10 +503,12 @@ sub _do_augment_imap { sub _pre_augment_mbox { my ($self, $lei) = @_; my $dst = $lei->{ovv}->{dst}; - my $out = $lei->{1}; - if ($dst ne '/dev/stdout') { + my $out; + my 
$devfd = $lei->path_to_fd($dst) // die "bad $dst"; + if ($devfd >= 0) { + $out = $lei->{$devfd}; + } else { # normal-looking path if (-p $dst) { - $out = undef; open $out, '>', $dst or die "open($dst): $!"; } elsif (-f _ || !-e _) { require PublicInbox::MboxLock; @@ -514,12 +516,14 @@ sub _pre_augment_mbox { PublicInbox::MboxLock->defaults; $self->{mbl} = PublicInbox::MboxLock->acq($dst, 1, $m); $out = $self->{mbl}->{fh}; + } else { + die "$dst is not a file or FIFO\n"; } $lei->{old_1} = $lei->{1}; # keep for spawning MUA } # Perl does SEEK_END even with O_APPEND :< $self->{seekable} = seek($out, 0, SEEK_SET); - if (!$self->{seekable} && $! != ESPIPE && $dst ne '/dev/stdout') { + if (!$self->{seekable} && $! != ESPIPE && !defined($devfd)) { die "seek($dst): $!\n"; } if (!$self->{seekable}) { diff --git a/t/lei-convert.t b/t/lei-convert.t index e147715d..9b430d8e 100644 --- a/t/lei-convert.t +++ b/t/lei-convert.t @@ -87,7 +87,7 @@ test_lei({ tmpdir => $tmpdir }, sub { my $exp = do { local $/; <$fh> }; is($out, $exp, 'stdin => stdout'); - lei_ok qw(convert -F eml -o mboxcl2:/dev/stdout t/plack-qp.eml); + lei_ok qw(convert -F eml -o mboxcl2:/dev/fd/1 t/plack-qp.eml); open $fh, '<', \$lei_out or BAIL_OUT; @bar = (); PublicInbox::MboxReader->mboxcl2($fh, sub { diff --git a/t/lei-import.t b/t/lei-import.t index a697d756..fa40ad01 100644 --- a/t/lei-import.t +++ b/t/lei-import.t @@ -69,7 +69,7 @@ is($res->[0]->{kw}, undef, 'no keywords set'); $eml->header_set('Message-ID', '<k@y>'); $in = 'From k@y Fri Oct 2 00:00:00 1993'."\n".$eml->as_string; -lei_ok([qw(import -F mboxrd -)], undef, { %$lei_opt, 0 => \$in }, +lei_ok([qw(import -F mboxrd /dev/fd/0)], undef, { %$lei_opt, 0 => \$in }, \'import single file with --kw (default) from stdin'); lei(qw(q m:k@y)); $res = json_utf8->decode($lei_out);
'!' could wreak havoc if exposed to a shell like bash. It seems like a rare character for use in file/directory/mailbox names. --- lib/PublicInbox/LeiMark.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/LeiMark.pm b/lib/PublicInbox/LeiMark.pm index 7a2ccf77..6d236411 100644 --- a/lib/PublicInbox/LeiMark.pm +++ b/lib/PublicInbox/LeiMark.pm @@ -23,7 +23,7 @@ my %ERR = ( my ($label) = @_; length($label) >= $L_MAX and return "`$label' too long (must be <= $L_MAX)"; - $label =~ m{\A[a-z0-9_][a-z0-9_\-\./\@\!,]*[a-z0-9]\z} ? + $label =~ m{\A[a-z0-9_][a-z0-9_\-\./\@,]*[a-z0-9]\z} ? undef : "`$label' is invalid"; }, kw => sub {
This is needed for t/lei-import.t on FreeBSD; it should also be necessary on Linux, but I didn't notice problems there... diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index eed0eed7..d916249a 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -113,21 +113,29 @@ sub prepare_inputs { # returns undef on error --in-format=$in_fmt and `$ifmt:' conflict } - if (-f $input_path) { + my $devfd = $lei->path_to_fd($input_path) // return; + if ($devfd >= 0 || (-f $input_path || -p _)) { require PublicInbox::MboxLock; require PublicInbox::MboxReader; PublicInbox::MboxReader->reads($ifmt) or return $lei->fail("$ifmt not supported"); - } elsif (-d _) { + } elsif (-d $input_path) { require PublicInbox::MdirReader; $ifmt eq 'maildir' or return $lei->fail("$ifmt not supported"); } else { return $lei->fail("Unable to handle $input"); } - } elsif (-f $input) { push @f, $input } - elsif (-d _) { push @d, $input } - else { return $lei->fail("Unable to handle $input") } + } else { + my $devfd = $lei->path_to_fd($input) // return; + if ($devfd >= 0 || -f $input || -p _) { + push @f, $input + } elsif (-d $input) { + push @d, $input + } else { + return $lei->fail("Unable to handle $input") + } + } } if (@f) { check_input_format($lei, \@f) or return } if (@d) { # TODO: check for MH vs Maildir, here