* [PATCH 01/14] cmd_ipc: support + test EINTR + EAGAIN, no FDs
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort Eric Wong
` (12 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
We'll ensure our {send,recv}_cmd4 implementations are
consistent w.r.t. non-blocking and interrupted sockets.
We'll also support receiving messages without FDs associated
so we don't have to send dummy FDs to keep receivers from
reporting EOF.
---
lib/PublicInbox/CmdIPC4.pm | 6 +++---
lib/PublicInbox/Spawn.pm | 13 ++++++++-----
t/cmd_ipc.t | 32 ++++++++++++++++++++++++++++++++
3 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index c4fcb0d6..c244f6a1 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -18,17 +18,17 @@ no warnings 'once';
my $mh = Socket::MsgHdr->new(buf => $_[2]);
$mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
pack('i' x scalar(@$fds), @$fds));
- Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
+ Socket::MsgHdr::sendmsg($sock, $mh, $flags);
};
*recv_cmd4 = sub ($$$) {
my ($s, undef, $len) = @_; # $_[1] = destination buffer
my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
- my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // die "recvmsg: $!";
+ my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // return ($_[1] = undef);
$_[1] = $mh->buf;
return () if $r == 0;
my (undef, undef, $data) = $mh->cmsghdr;
- unpack('i' x (length($data) / 4), $data);
+ defined($data) ? unpack('i' x (length($data) / 4), $data) : ();
};
} } # /eval /BEGIN
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index ef822e1b..e5c0b1e9 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -216,12 +216,13 @@ union my_cmsg {
char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
};
-int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
+SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
{
struct msghdr msg = { 0 };
union my_cmsg cmsg = { 0 };
STRLEN dlen = 0;
struct iovec iov;
+ ssize_t sent;
AV *fds = (AV *)SvRV(svfds);
I32 i, nfds = av_len(fds) + 1;
int *fdp;
@@ -252,7 +253,8 @@ int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
*fdp++ = SvIV(*fd);
}
}
- return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
+ sent = sendmsg(PerlIO_fileno(s), &msg, flags);
+ return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
}
void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
@@ -260,7 +262,7 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
union my_cmsg cmsg = { 0 };
struct msghdr msg = { 0 };
struct iovec iov;
- size_t i;
+ ssize_t i;
Inline_Stack_Vars;
Inline_Stack_Reset;
@@ -275,8 +277,9 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
i = recvmsg(PerlIO_fileno(s), &msg, 0);
if (i < 0)
- croak("recvmsg: %s", strerror(errno));
- SvCUR_set(buf, i);
+ Inline_Stack_Push(&PL_sv_undef);
+ else
+ SvCUR_set(buf, i);
if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
cmsg.hdr.cmsg_type == SCM_RIGHTS) {
size_t len = cmsg.hdr.cmsg_len;
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index 0a0a4e00..96510175 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -10,6 +10,7 @@ pipe(my ($r, $w)) or BAIL_OUT;
my ($send, $recv);
require_ok 'PublicInbox::Spawn';
my $SOCK_SEQPACKET = eval { Socket::SOCK_SEQPACKET() } // undef;
+use Time::HiRes qw(alarm);
my $do_test = sub { SKIP: {
my ($type, $flag, $desc) = @_;
@@ -45,11 +46,42 @@ my $do_test = sub { SKIP: {
is($buf, (',' x 1023) . '-', 'silently truncated buf');
$opens->();
$r1 = $w1 = $s1a = undef;
+
+ $s2->blocking(0);
+ @fds = $recv->($s2, $buf, length($src) + 1);
+ ok($!{EAGAIN}, "EAGAIN set by ($desc)");
+ is_deeply(\@fds, [ undef ], "EAGAIN $desc");
+ $s2->blocking(1);
+
+ my $alrm = 0;
+ local $SIG{ALRM} = sub { $alrm++ };
+ alarm(0.001);
+ @fds = $recv->($s2, $buf, length($src) + 1);
+ ok($!{EINTR}, "EINTR set by ($desc)");
+ is_deeply(\@fds, [ undef ], "EINTR $desc");
+ is($alrm, 1, 'SIGALRM hit');
+
close $s1;
@fds = $recv->($s2, $buf, length($src) + 1);
is_deeply(\@fds, [], "no FDs on EOF $desc");
is($buf, '', "buffer cleared on EOF ($desc)");
+ socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+ $s1->blocking(0);
+ my $nsent = 0;
+ while (defined(my $n = $send->($s1, $sfds, $src, $flag))) {
+ $nsent += $n;
+ fail "sent 0 bytes" if $n == 0;
+ }
+ ok($!{EAGAIN}, "hit EAGAIN on send $desc");
+ ok($nsent > 0, 'sent some bytes');
+
+ socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
+ is($send->($s1, [], $src, $flag), length($src), 'sent w/o FDs');
+ $buf = 'nope';
+ @fds = $recv->($s2, $buf, length($src));
+ is(scalar(@fds), 0, 'no FDs received');
+ is($buf, $src, 'recv w/o FDs');
}
} };
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
2021-01-14 7:06 ` [PATCH 01/14] cmd_ipc: support + test EINTR + EAGAIN, no FDs Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 03/14] daemon+watch: fix localization of %SIG for non-signalfd users Eric Wong
` (11 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
The new test ensures consistency between oneshot and
client/daemon users. Cancelling an in-progress result now also
stops xsearch workers to avoid wasted CPU and I/O.
Note the lei->atfork_child_wq usage changes: they work around
a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>
This switches the internal protocol to SOCK_SEQPACKET AF_UNIX
sockets so that messages sent from the daemon to the client (to
run the pager and to kill/exit the client script) are not merged.
---
MANIFEST | 1 +
lib/PublicInbox/IPC.pm | 45 ++++------
lib/PublicInbox/LEI.pm | 158 +++++++++++++++++----------------
lib/PublicInbox/LeiOverview.pm | 5 +-
lib/PublicInbox/LeiQuery.pm | 22 ++---
lib/PublicInbox/LeiXSearch.pm | 34 +++++--
script/lei | 74 ++++++++++-----
t/lei.t | 2 +-
xt/lei-sigpipe.t | 32 +++++++
9 files changed, 225 insertions(+), 148 deletions(-)
create mode 100644 xt/lei-sigpipe.t
diff --git a/MANIFEST b/MANIFEST
index 810aec42..2ca240fc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -429,6 +429,7 @@ xt/git_async_cmp.t
xt/httpd-async-stream.t
xt/imapd-mbsync-oimap.t
xt/imapd-validate.t
+xt/lei-sigpipe.t
xt/mem-imapd-tls.t
xt/mem-msgview.t
xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c54fcc64..fbc91f6f 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
sub ipc_worker_reap { # dwaitpid callback
my ($self, $pid) = @_;
- warn "PID:$pid died with \$?=$?\n" if $?;
+ # SIGTERM (15) is our default exit signal
+ warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
}
# for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub _close_recvd ($) {
- my ($self) = @_;
- my $x = $self->{-wq_recv_modes};
- my $end = $x ? $#$x : 2;
- close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
sub wq_worker_loop ($) {
my ($self) = @_;
- my $buf;
my $len = $self->{wq_req_len} // (4096 * 33);
- my ($sub, $args);
my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
- local $SIG{PIPE} = sub {
- my $cur_sub = $sub;
- _close_recvd($self);
- die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
- };
while (1) {
- my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
- my $i = 0;
+ my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+ my $nfd = 0;
for my $fd (@fds) {
my $mode = shift(@m);
if (open(my $cmdfh, $mode, $fd)) {
- $self->{$i++} = $cmdfh;
+ $self->{$nfd++} = $cmdfh;
$cmdfh->autoflush(1);
} else {
- die "$$ open($mode$fd) (FD:$i): $!";
+ die "$$ open($mode$fd) (FD:$nfd): $!";
}
}
# Sereal dies on truncated data, Storable returns undef
- $args = thaw($buf) //
+ my $args = thaw($buf) //
die "thaw error on buffer of size:".length($buf);
- eval {
- $sub = shift @$args;
- eval { $self->$sub(@$args) };
- undef $sub; # quiet SIG{PIPE} handler
- die $@ if $@;
- };
+ my $sub = shift @$args;
+ eval { $self->$sub(@$args) };
warn "$$ wq_worker: $@" if $@ &&
ref($@) ne 'PublicInbox::SIGPIPE';
- # need to close explicitly to avoid warnings after SIGPIPE
- _close_recvd($self);
+ delete @$self{0..($nfd-1)};
}
}
@@ -400,9 +382,16 @@ sub wq_close {
}
}
+sub wq_kill {
+ my ($self, $sig) = @_;
+ my $workers = $self->{-wq_workers} or return;
+ kill($sig // 'TERM', keys %$workers);
+}
+
sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
sub DESTROY {
+ wq_kill($_[0]);
wq_close($_[0]);
ipc_worker_stop($_[0]);
}
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7313738e..2889fa76 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -11,13 +11,13 @@ use v5.10.1;
use parent qw(PublicInbox::DS PublicInbox::LeiExternal
PublicInbox::LeiQuery);
use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
use POSIX ();
use IO::Handle ();
use Sys::Syslog qw(syslog openlog);
use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
use PublicInbox::Sigfd;
use PublicInbox::DS qw(now dwaitpid);
use PublicInbox::Spawn qw(spawn run_die popen_rd);
@@ -238,16 +238,15 @@ my %CONFIG_KEYS = (
'leistore.dir' => 'top-level storage location',
);
-sub x_it ($$) { # pronounced "exit"
+# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
+sub x_it ($$) {
my ($self, $code) = @_;
- $self->{1}->autoflush(1); # make sure client sees stdout before exit
- my $sig = ($code & 127);
- $code >>= 8 unless $sig;
+ # make sure client sees stdout before exit
+ $self->{1}->autoflush(1) if $self->{1};
if (my $sock = $self->{sock}) {
- my $fds = [ map { fileno($_) } @$self{0..2} ];
- $send_cmd->($sock, $fds, "exit=$code\n", 0);
- } else { # for oneshot
- $quit->($code);
+ send($sock, "x_it $code", MSG_EOR);
+ } elsif (!($code & 127)) { # oneshot, ignore signals
+ $quit->($code >> 8);
}
}
@@ -274,22 +273,20 @@ sub atfork_prepare_wq {
grep { defined } @$self{qw(0 1 2 sock)}
}
-# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+# usage: my %sig = $lei->atfork_child_wq($wq);
+# local @SIG{keys %sig} = values %sig;
sub atfork_child_wq {
my ($self, $wq) = @_;
- return () if $self->{0}; # did not fork
- $self->{$_} = $wq->{$_} for (0..2);
- $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
- my $oldpipe = $SIG{PIPE};
+ @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
%PATH2CFG = ();
@TO_CLOSE_ATFORK_CHILD = ();
- (
- __WARN__ => sub { err($self, @_) },
- PIPE => sub {
- $self->x_it(141);
- $oldpipe->() if ref($oldpipe) eq 'CODE';
- }
- );
+ (__WARN__ => sub { err($self, @_) },
+ PIPE => sub {
+ $self->x_it(13); # SIGPIPE = 13
+ # we need to close explicitly to avoid Perl warning on SIGPIPE
+ close($_) for (delete @$self{1..2});
+ die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
+ });
}
# usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
@@ -300,9 +297,9 @@ sub atfork_parent_wq {
my $ret = bless { %$self }, ref($self);
$self->{env} = $env;
delete @$ret{qw(-lei_store cfg pgr)};
- ($ret, delete @$ret{qw(0 1 2 sock)});
+ ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
} else {
- ($self, @$self{qw(0 1 2 sock)});
+ ($self, @$self{0..2}, $self->{sock} // ());
}
}
@@ -647,7 +644,7 @@ sub start_pager {
my $buf = "exec 1\0".$pager;
while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
my $fds = [ map { fileno($_) } @$rdr{0..2} ];
- $send_cmd->($sock, $fds, $buf .= "\n", 0);
+ $send_cmd->($sock, $fds, $buf, MSG_EOR);
} else {
$pgr->[0] = spawn([$pager], $env, $rdr);
}
@@ -660,50 +657,39 @@ sub start_pager {
sub stop_pager {
my ($self) = @_;
my $pgr = delete($self->{pgr}) or return;
- my $pid = $pgr->[0];
- close $self->{1};
- # {2} may not be redirected
- $self->{1} = $pgr->[1];
$self->{2} = $pgr->[2];
+ # do not restore original stdout, just close it so we error out
+ close(delete($self->{1})) if $self->{1};
+ my $pid = $pgr->[0];
dwaitpid($pid, undef, $self->{sock}) if $pid;
}
sub accept_dispatch { # Listener {post_accept} callback
my ($sock) = @_; # ignore other
- $sock->blocking(1);
$sock->autoflush(1);
my $self = bless { sock => $sock }, __PACKAGE__;
- vec(my $rin = '', fileno($sock), 1) = 1;
- # `say $sock' triggers "die" in lei(1)
- my $buf;
- if (select(my $rout = $rin, undef, undef, 1)) {
- my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
- if (scalar(@fds) == 3) {
- my $i = 0;
- for my $rdr (qw(<&= >&= >&=)) {
- my $fd = shift(@fds);
- if (open(my $fh, $rdr, $fd)) {
- $self->{$i++} = $fh;
- } else {
- say $sock "open($rdr$fd) (FD=$i): $!";
- return;
- }
+ vec(my $rvec, fileno($sock), 1) = 1;
+ select($rvec, undef, undef, 1) or
+ return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+ my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+ if (scalar(@fds) == 3) {
+ my $i = 0;
+ for my $rdr (qw(<&= >&= >&=)) {
+ my $fd = shift(@fds);
+ if (open(my $fh, $rdr, $fd)) {
+ $self->{$i++} = $fh;
+ next;
}
- } else {
- say $sock "recv_cmd failed: $!";
- return;
+ return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
}
} else {
- say $sock "timed out waiting to recv FDs";
- return;
+ return send($sock, "recv_cmd failed: $!", MSG_EOR);
}
$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
# $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
- if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
- say $sock "request command truncated";
- return;
- }
+ substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z//
+ return send($sock, 'request command truncated', MSG_EOR);
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback
local %ENV = %env;
$self->{env} = \%env;
eval { dispatch($self, @argv) };
- say $sock $@ if $@;
+ send($sock, $@, MSG_EOR) if $@;
} else {
- say $sock "chdir($env{PWD}): $!"; # implicit close
+ send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
}
}
+sub dclose {
+ my ($self) = @_;
+ delete $self->{lxs}; # stops LeiXSearch queries
+ $self->close; # PublicInbox::DS::close
+}
+
# for long-running results
sub event_step {
my ($self) = @_;
local %ENV = %{$self->{env}};
- eval {}; # TODO
- if ($@) {
- say { $self->{sock} } $@;
- $self->close; # PublicInbox::DS::close
+ my $sock = $self->{sock};
+ eval {
+ while (my @fds = $recv_cmd->($sock, my $buf, 4096)) {
+ if (scalar(@fds) == 1 && !defined($fds[0])) {
+ return if $! == EAGAIN;
+ next if $! == EINTR;
+ last if $! == ECONNRESET;
+ die "recvmsg: $!";
+ }
+ for my $fd (@fds) {
+ open my $rfh, '+<&=', $fd;
+ }
+ die "unrecognized client signal: $buf";
+ }
+ dclose($self);
+ };
+ if (my $err = $@) {
+ eval { $self->fail($err) };
+ dclose($self);
}
}
+sub event_step_init {
+ my ($self) = @_;
+ $self->{sock}->blocking(0);
+ $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+}
+
sub noop {}
our $oldset; sub oldset { $oldset }
@@ -742,7 +755,7 @@ sub lazy_start {
die "connect($path): $!";
}
umask(077) // die("umask(077): $!");
- socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+ socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
listen($l, 1024) or die "listen: $!";
my @st = stat($path) or die "stat($path): $!";
@@ -793,7 +806,7 @@ sub lazy_start {
USR2 => \&noop,
};
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
- local %SIG = (%SIG, %$sig) if !$sigfd;
+ local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
local $SIG{PIPE} = 'IGNORE';
if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
@@ -853,24 +866,19 @@ sub oneshot {
local $quit = $exit if $exit;
local %PATH2CFG;
umask(077) // die("umask(077): $!");
- local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
- eval {
- my $self = bless {
- 0 => *STDIN{GLOB},
- 1 => *STDOUT{GLOB},
- 2 => *STDERR{GLOB},
- env => \%ENV
- }, __PACKAGE__;
- dispatch($self, @ARGV);
- };
- die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+ dispatch((bless {
+ 0 => *STDIN{GLOB},
+ 1 => *STDOUT{GLOB},
+ 2 => *STDERR{GLOB},
+ env => \%ENV
+ }, __PACKAGE__), @ARGV);
}
# ensures stdout hits the FS before sock disconnects so a client
# can immediately reread it
sub DESTROY {
my ($self) = @_;
- $self->{1}->autoflush(1);
+ $self->{1}->autoflush(1) if $self->{1};
stop_pager($self);
}
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 8a1f4f82..194c5e28 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -108,8 +108,9 @@ sub _unbless_smsg {
sub ovv_atexit_child {
my ($self, $lei) = @_;
- my $bref = delete $lei->{ovv_buf} or return;
- print { $lei->{1} } $$bref;
+ if (my $bref = delete $lei->{ovv_buf}) {
+ print { $lei->{1} } $$bref;
+ }
}
# JSON module ->pretty output wastes too much vertical white space,
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 7ca01454..1a3e1193 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -40,14 +40,13 @@ sub lei_q {
if ($opt->{external} // 1) {
$self->_externals_each(\&_vivify_external, \@srcs);
}
- my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
+ my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
$j = 1 if !$opt->{thread};
$j++ if $opt->{'local'}; # for sto->search below
- if ($self->{sock}) {
- $self->atfork_prepare_wq($lxs);
- $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
- // $lxs->wq_workers($j);
- }
+ $self->atfork_prepare_wq($lxs);
+ $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+ // $lxs->wq_workers($j);
+
unshift(@srcs, $sto->search) if $opt->{'local'};
# no forking workers after this
require PublicInbox::LeiOverview;
@@ -77,16 +76,7 @@ sub lei_q {
# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
$self->{mset_opt} = \%mset_opt;
$self->{ovv}->ovv_begin($self);
- pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
- require PublicInbox::EOFpipe;
- my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
- $lxs->do_query($self, $qry_done, \@srcs);
- $eof->event_step unless $self->{sock};
-}
-
-sub query_done { # PublicInbox::EOFpipe callback
- my ($self) = @_;
- $self->{ovv}->ovv_end($self);
+ $lxs->do_query($self, \@srcs);
}
1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index c030b2b2..d06b6f1d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
sub query_thread_mset { # for --thread
my ($self, $lei, $ibxish) = @_;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
+
my ($srch, $over) = ($ibxish->search, $ibxish->over);
unless ($srch && $over) {
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
sub query_mset { # non-parallel for non-"--thread" users
my ($self, $lei, $srcs) = @_;
+ my %sig = $lei->atfork_child_wq($self);
+ local @SIG{keys %sig} = values %sig;
my $mo = { %{$lei->{mset_opt}} };
my $mset;
- local %SIG = (%SIG, $lei->atfork_child_wq($self));
$self->attach_external($_) for @$srcs;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
$lei->{ovv}->ovv_atexit_child($lei);
}
+sub query_done { # PublicInbox::EOFpipe callback
+ my ($lei) = @_;
+ $lei->{ovv}->ovv_end($lei);
+ $lei->dclose;
+}
+
sub do_query {
- my ($self, $lei_orig, $qry_done, $srcs) = @_;
+ my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+ pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
$io[0] = $qry_done; # don't need stdin
$io[1]->autoflush(1);
$io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
for my $rmt (@{$self->{remotes} // []}) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
-
- # sent off to children, they will drop remaining references to it
- close $qry_done;
+ @io = ();
+ close $qry_done; # fully closed when children are done
+
+ # query_done will run when query_*mset close $qry_done
+ if ($lei_orig->{sock}) { # watch for client premature exit
+ require PublicInbox::EOFpipe;
+ PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+ $lei_orig->{lxs} = $self;
+ $lei_orig->event_step_init;
+ } else {
+ $self->wq_close;
+ read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+ query_done($lei_orig); # may SIGPIPE
+ }
}
sub ipc_atfork_child {
diff --git a/script/lei b/script/lei
index 5c32ab88..9610a876 100755
--- a/script/lei
+++ b/script/lei
@@ -3,32 +3,47 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use strict;
use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EINTR ECONNRESET);
use PublicInbox::CmdIPC4;
my $narg = 4;
+my ($sock, $pwd);
my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
- $narg = 4;
$recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
PublicInbox::Spawn->can('send_cmd4');
};
+sub sigchld {
+ my ($sig) = @_;
+ my $flags = $sig ? POSIX::WNOHANG() : 0;
+ while (waitpid(-1, $flags) > 0) {}
+}
+
sub exec_cmd {
my ($fds, $argc, @argv) = @_;
- my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
- my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=',
- *STDERR{IO}, '>&=');
+ my @m = (*STDIN{IO}, '<&=', *STDOUT{IO}, '>&=', *STDERR{IO}, '>&=');
+ my @rdr;
for my $fd (@$fds) {
my ($old_io, $mode) = splice(@m, 0, 2);
- open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+ open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!";
+ push @rdr, $old_io, $mode, $tmpfh;
+ }
+ require POSIX; # WNOHANG
+ $SIG{CHLD} = \&sigchld;
+ my $pid = fork // die "fork: $!";
+ if ($pid == 0) {
+ my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+ while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) {
+ open $old_io, $mode, $tmpfh or die "open $mode: $!";
+ }
+ %ENV = (%ENV, %env);
+ exec(@argv);
+ die "exec: @argv: $!";
}
- %ENV = (%ENV, %env);
- exec(@argv);
- die "exec: @argv: $!";
}
-my ($sock, $pwd);
if ($send_cmd && eval {
my $path = do {
my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
@@ -40,10 +55,10 @@ if ($send_cmd && eval {
require File::Path;
File::Path::mkpath($runtime_dir, 0, 0700);
}
- "$runtime_dir/$narg.sock";
+ "$runtime_dir/$narg.seq.sock";
};
my $addr = pack_sockaddr_un($path);
- socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+ socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
unless (connect($sock, $addr)) { # start the daemon if not started
local $ENV{PERL5LIB} = join(':', @INC);
open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
@@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode
}
1;
}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
- local $ENV{PWD} = $pwd;
+ $ENV{PWD} = $pwd;
my $buf = join("\0", scalar(@ARGV), @ARGV);
while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
$buf .= "\0\0";
- select $sock;
- $| = 1; # unbuffer selected $sock
- $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
- while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
- if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
- exit($1);
- } elsif ($buf =~ /\Aexec (.+)\n\z/) {
+ $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+ $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
+ my ($sig) = @_; # 'TERM', not an integer :<
+ $SIG{$sig} = 'DEFAULT';
+ kill($sig, $$); # exit($signo + 128)
+ };
+ my $x_it_code = 0;
+ while (1) {
+ my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
+ if (scalar(@fds) == 1 && !defined($fds[0])) {
+ last if $! == ECONNRESET;
+ next if $! == EINTR;
+ die "recvmsg: $!";
+ }
+ last if $buf eq '';
+ if ($buf =~ /\Ax_it ([0-9]+)\z/) {
+ $x_it_code = $1 + 0;
+ last;
+ } elsif ($buf =~ /\Aexec (.+)\z/) {
exec_cmd(\@fds, split(/\0/, $1));
} else {
+ sigchld();
die $buf;
}
}
+ sigchld();
+ if (my $sig = ($x_it_code & 127)) {
+ kill $sig, $$;
+ sleep;
+ }
+ exit($x_it_code >> 8);
} else { # for systems lacking Socket::MsgHdr or Inline::C
warn $@ if $@;
require PublicInbox::LEI;
diff --git a/t/lei.t b/t/lei.t
index 6819f182..3ebaade6 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -215,7 +215,7 @@ SKIP: { # real socket
skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
- my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
+ my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock";
ok($lei->('daemon-pid'), 'daemon-pid');
is($err, '', 'no error from daemon-pid');
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
new file mode 100644
index 00000000..4d35bbb3
--- /dev/null
+++ b/xt/lei-sigpipe.t
@@ -0,0 +1,32 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
+require_mods(qw(json DBD::SQLite Search::Xapian));
+# XXX this needs an already configured lei instance with many messages
+
+my $do_test = sub {
+ my $env = shift // {};
+ pipe(my ($r, $w)) or BAIL_OUT $!;
+ open my $err, '+>', undef or BAIL_OUT $!;
+ my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+ my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
+ close $w;
+ sysread($r, my $buf, 1);
+ close $r; # trigger SIGPIPE
+ $tp->join;
+ ok(WIFSIGNALED($?), 'signaled');
+ is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
+ seek($err, 0, 0);
+ my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+ is_deeply(\@err, [], 'no errors');
+};
+
+$do_test->();
+$do_test->({XDG_RUNTIME_DIR => '/dev/null'});
+
+done_testing;
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 03/14] daemon+watch: fix localization of %SIG for non-signalfd users
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
2021-01-14 7:06 ` [PATCH 01/14] cmd_ipc: support + test EINTR + EAGAIN, no FDs Eric Wong
2021-01-14 7:06 ` [PATCH 02/14] lei: test SIGPIPE, stop xsearch workers on client abort Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 04/14] lei: do not unlink socket path at exit Eric Wong
` (10 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
It turns out "local" did not take effect in the way we used it:
http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>
Fortunately, none of the old use cases seem affected, unlike the
previous lei change to ensure consistent SIGPIPE handling.
---
lib/PublicInbox/Daemon.pm | 4 ++--
script/public-inbox-watch | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 4b738b7c..f5543c85 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -514,7 +514,7 @@ EOF
CHLD => \&reap_children,
};
my $sigfd = PublicInbox::Sigfd->new($sig, 0);
- local %SIG = (%SIG, %$sig) if !$sigfd;
+ local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
while (1) { # main loop
my $n = scalar keys %pids;
@@ -628,7 +628,7 @@ sub daemon_loop ($$$$) {
PublicInbox::Listener->new($_, $tls_cb || $post_accept)
} @listeners;
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
- local %SIG = (%SIG, %$sig) if !$sigfd;
+ local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
if (!$sigfd) {
# wake up every second to accept signals if we don't
# have signalfd or IO::KQueue:
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index 10c7cd6f..86349d71 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -58,7 +58,7 @@ if ($watch) {
PublicInbox::DS::requeue($scan) if $do_scan;
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
- local %SIG = (%SIG, %$sig) if !$sigfd;
+ local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
if (!$sigfd) {
PublicInbox::DS::sig_setmask($oldset);
PublicInbox::DS->SetLoopTimeout(1000);
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 04/14] lei: do not unlink socket path at exit
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (2 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 03/14] daemon+watch: fix localization of %SIG for non-signalfd users Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 05/14] lei: reduce live FD references in wq child Eric Wong
` (9 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
This matches existing -httpd/-nntpd/-imapd daemon behavior.
From what I can recall, it is less racy for the process doing
bind(2) to unlink it if stale.
---
lib/PublicInbox/LEI.pm | 1 -
t/lei.t | 4 ++--
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 2889fa76..7a1df0bb 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -789,7 +789,6 @@ sub lazy_start {
local $quit = sub {
$exit_code //= shift;
my $listener = $l or exit($exit_code);
- unlink($path) if defined($path);
# closing eof_w triggers \&noop wakeup
$eof_w = $l = $path = undef;
$listener->close; # DS::close
diff --git a/t/lei.t b/t/lei.t
index 3ebaade6..240735bf 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -237,13 +237,13 @@ SKIP: { # real socket
kill(0, $pid) or last;
tick();
}
- ok(!-S $sock, 'sock gone');
+ ok(-S $sock, 'sock still exists');
ok(!kill(0, $pid), 'pid gone after stop');
ok($lei->(qw(daemon-pid)), 'daemon-pid');
chomp(my $new_pid = $out);
ok(kill(0, $new_pid), 'new pid is running');
- ok(-S $sock, 'sock exists again');
+ ok(-S $sock, 'sock still exists');
for my $sig (qw(-0 -CHLD)) {
ok($lei->('daemon-kill', $sig), "handles $sig");
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 05/14] lei: reduce live FD references in wq child
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (3 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 04/14] lei: do not unlink socket path at exit Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 06/14] lei: rely on localized $current_lei for warnings Eric Wong
` (8 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
We can shrink the @TO_CLOSE_ATFORK_CHILD array by at least two
elements. It may be possible to eliminate this array entirely,
but clobbering $quit doesn't seem to remove references to
$eof_w or the $listener socket.
---
lib/PublicInbox/LEI.pm | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7a1df0bb..fd2b722c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -279,6 +279,7 @@ sub atfork_child_wq {
my ($self, $wq) = @_;
@$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
%PATH2CFG = ();
+ $quit = \&CORE::exit;
@TO_CLOSE_ATFORK_CHILD = ();
(__WARN__ => sub { err($self, @_) },
PIPE => sub {
@@ -782,8 +783,8 @@ sub lazy_start {
return if $pid;
$0 = "lei-daemon $path";
local %PATH2CFG;
- local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_r, $eof_w);
- $_->blocking(0) for ($l, $eof_r, $eof_w);
+ local @TO_CLOSE_ATFORK_CHILD = ($l, $eof_w);
+ $l->blocking(0);
$l = PublicInbox::Listener->new($l, \&accept_dispatch, $l);
my $exit_code;
local $quit = sub {
@@ -795,6 +796,7 @@ sub lazy_start {
PublicInbox::DS->SetLoopTimeout(1000);
};
PublicInbox::EOFpipe->new($eof_r, \&noop, undef);
+ undef $eof_r;
my $sig = {
CHLD => \&PublicInbox::DS::enqueue_reap,
QUIT => $quit,
@@ -806,9 +808,10 @@ sub lazy_start {
};
my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
+ undef $sig;
local $SIG{PIPE} = 'IGNORE';
if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
- push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
+ undef $sigfd;
PublicInbox::DS->SetLoopTimeout(5000);
} else {
# wake up every second to accept signals if we don't
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 06/14] lei: rely on localized $current_lei for warnings
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (4 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 05/14] lei: reduce live FD references in wq child Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 07/14] lei_dedupe+shared_kv: ensure round-tripping serialization Eric Wong
` (7 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
This lets us get rid of the Sys::Syslog import and __WARN__
override in LeiXSearch, though we still need it with
->atfork_child_wq.
---
lib/PublicInbox/LEI.pm | 7 +++++--
lib/PublicInbox/LeiXSearch.pm | 7 -------
2 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index fd2b722c..a8fea16d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -26,6 +26,7 @@ use Text::Wrap qw(wrap);
use File::Path qw(mkpath);
use File::Spec;
our $quit = \&CORE::exit;
+our $current_lei;
my ($recv_cmd, $send_cmd);
my $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
@@ -447,7 +448,7 @@ sub optparse ($$$) {
sub dispatch {
my ($self, $cmd, @argv) = @_;
- local $SIG{__WARN__} = sub { err($self, @_) };
+ local $current_lei = $self; # for __WARN__
return _help($self, 'no command given') unless defined($cmd);
my $func = "lei_$cmd";
$func =~ tr/-/_/;
@@ -849,7 +850,9 @@ sub lazy_start {
# STDOUT will cause the calling `lei' client process to finish
# reading the <$daemon> pipe.
openlog($path, 'pid', 'user');
- local $SIG{__WARN__} = sub { syslog('warning', "@_") };
+ local $SIG{__WARN__} = sub {
+ $current_lei ? err($current_lei, @_) : syslog('warning', "@_");
+ };
my $on_destroy = PublicInbox::OnDestroy->new($$, sub {
syslog('crit', "$@") if $@;
});
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index d06b6f1d..68889e81 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,7 +8,6 @@ package PublicInbox::LeiXSearch;
use strict;
use v5.10.1;
use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
-use Sys::Syslog qw(syslog);
sub new {
my ($class) = @_;
@@ -187,12 +186,6 @@ sub do_query {
}
}
-sub ipc_atfork_child {
- my ($self) = @_;
- $SIG{__WARN__} = sub { syslog('warning', "@_") };
- $self->SUPER::ipc_atfork_child; # PublicInbox::IPC
-}
-
sub ipc_atfork_prepare {
my ($self) = @_;
$self->wq_set_recv_modes(qw[+<&= >&= >&= +<&=]);
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 07/14] lei_dedupe+shared_kv: ensure round-tripping serialization
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (5 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 06/14] lei: rely on localized $current_lei for warnings Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 08/14] lei q: reinstate smsg dedupe Eric Wong
` (6 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
We'll be passing these objects via PublicInbox::IPC which uses
Storable (or Sereal), so ensure they're safe to use after
serialization.
---
lib/PublicInbox/LeiDedupe.pm | 29 ++++++++++++++++-------------
lib/PublicInbox/SharedKV.pm | 12 +++++++++---
t/lei_dedupe.t | 13 +++++++++++++
3 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 58eee533..81754361 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -43,9 +43,9 @@ sub smsg_hash ($) {
}
# the paranoid option
-sub dedupe_oid () {
- my $skv = PublicInbox::SharedKV->new;
- ($skv, sub { # may be called in a child process
+sub dedupe_oid ($) {
+ my ($skv) = @_;
+ (sub { # may be called in a child process
my ($eml, $oid) = @_;
$skv->set_maybe(_oidbin($oid) // _regen_oid($eml), '');
}, sub {
@@ -55,9 +55,9 @@ sub dedupe_oid () {
}
# dangerous if there's duplicate messages with different Message-IDs
-sub dedupe_mid () {
- my $skv = PublicInbox::SharedKV->new;
- ($skv, sub { # may be called in a child process
+sub dedupe_mid ($) {
+ my ($skv) = @_;
+ (sub { # may be called in a child process
my ($eml, $oid) = @_;
# TODO: lei will support non-public messages w/o Message-ID
my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) //
@@ -73,9 +73,9 @@ sub dedupe_mid () {
}
# our default deduplication strategy (used by v2, also)
-sub dedupe_content () {
- my $skv = PublicInbox::SharedKV->new;
- ($skv, sub { # may be called in a child process
+sub dedupe_content ($) {
+ my ($skv) = @_;
+ (sub { # may be called in a child process
my ($eml) = @_; # oid = $_[1], ignored
$skv->set_maybe(content_hash($eml), '');
}, sub {
@@ -86,7 +86,7 @@ sub dedupe_content () {
# no deduplication at all
sub true { 1 }
-sub dedupe_none () { (undef, \&true, \&true) }
+sub dedupe_none ($) { (\&true, \&true) }
sub new {
my ($cls, $lei, $dst) = @_;
@@ -94,10 +94,12 @@ sub new {
# allow "none" to bypass Eml->new if writing to directory:
return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
+ my $m = "dedupe_$dd";
+ $cls->can($m) or die "unsupported dedupe strategy: $dd\n";
+ my $skv = $dd eq 'none' ? undef : PublicInbox::SharedKV->new;
- my $dd_new = $cls->can("dedupe_$dd") //
- die "unsupported dedupe strategy: $dd\n";
- bless [ $dd_new->() ], $cls; # [ $skv, $cb ]
+ # [ $skv, $eml_cb, $smsg_cb, "dedupe_$dd" ]
+ bless [ $skv, undef, undef, $m ], $cls;
}
# returns true on unseen messages according to the deduplication strategy,
@@ -115,6 +117,7 @@ sub is_smsg_dup {
sub prepare_dedupe {
my ($self) = @_;
my $skv = $self->[0];
+ $self->[1] or @$self[1,2] = $self->can($self->[3])->($skv);
$skv ? $skv->dbh : undef;
}
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index d75d8998..072c94ca 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -8,9 +8,10 @@ package PublicInbox::SharedKV;
use strict;
use v5.10.1;
use parent qw(PublicInbox::Lock);
-use File::Temp 0.19 (); # 0.19 for ->newdir
+use File::Temp qw(tempdir);
use DBI ();
use PublicInbox::Spawn;
+use File::Path qw(rmtree);
sub dbh {
my ($self, $lock) = @_;
@@ -44,8 +45,8 @@ sub new {
my ($cls, $dir, $base, $opt) = @_;
my $self = bless { opt => $opt }, $cls;
unless (defined $dir) {
- $self->{tmp} = File::Temp->newdir('kv-XXXXXX', TMPDIR => 1);
- $dir = $self->{tmp}->dirname;
+ $self->{tmpdir} = $dir = tempdir('skv-XXXXXX', TMPDIR => 1);
+ $self->{tmpid} = "$$.$self";
}
-d $dir or mkdir($dir) or die "mkdir($dir): $!";
$base //= '';
@@ -145,4 +146,9 @@ SELECT COUNT(k) FROM kv
$sth->fetchrow_array;
}
+sub DESTROY {
+ my ($self) = @_;
+ rmtree($self->{tmpdir}) if ($self->{tmpid} // '') eq "$$.$self";
+}
+
1;
diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t
index 6e971b9b..bcb06a0a 100644
--- a/t/lei_dedupe.t
+++ b/t/lei_dedupe.t
@@ -17,8 +17,18 @@ my $smsg = bless { ds => time }, 'PublicInbox::Smsg';
$smsg->populate($eml);
$smsg->{$_} //= '' for (qw(to cc references)) ;
+my $check_storable = sub {
+ my ($x) = @_;
+ SKIP: {
+ require_mods('Storable', 1);
+ my $dup = Storable::thaw(Storable::freeze($x));
+ is_deeply($dup, $x, "$x->[3] round-trips through storable");
+ }
+};
+
my $lei = { opt => { dedupe => 'none' } };
my $dd = PublicInbox::LeiDedupe->new($lei);
+$check_storable->($dd);
$dd->prepare_dedupe;
ok(!$dd->is_dup($eml), '1st is_dup w/o dedupe');
ok(!$dd->is_dup($eml), '2nd is_dup w/o dedupe');
@@ -29,6 +39,7 @@ ok(!$dd->is_smsg_dup($smsg), 'smsg dedupe none 2');
for my $strat (undef, 'content') {
$lei->{opt}->{dedupe} = $strat;
$dd = PublicInbox::LeiDedupe->new($lei);
+ $check_storable->($dd);
$dd->prepare_dedupe;
my $desc = $strat // 'default';
ok(!$dd->is_dup($eml), "1st is_dup with $desc dedupe");
@@ -43,6 +54,7 @@ like($@, qr/unsupported.*bogus/, 'died on bogus strategy');
$lei->{opt}->{dedupe} = 'mid';
$dd = PublicInbox::LeiDedupe->new($lei);
+$check_storable->($dd);
$dd->prepare_dedupe;
ok(!$dd->is_dup($eml), '1st is_dup with mid dedupe');
ok($dd->is_dup($eml), '2nd seen with mid dedupe');
@@ -52,6 +64,7 @@ ok($dd->is_smsg_dup($smsg), 'smsg mid dedupe reject');
$lei->{opt}->{dedupe} = 'oid';
$dd = PublicInbox::LeiDedupe->new($lei);
+$check_storable->($dd);
$dd->prepare_dedupe;
# --augment won't have OIDs:
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 08/14] lei q: reinstate smsg dedupe
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (6 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 07/14] lei_dedupe+shared_kv: ensure round-tripping serialization Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 09/14] search: rename "ts:" prefix to "rt:" Eric Wong
` (5 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
Now that dedupe is serialization-safe and fork-safe, we can
wire it back up in our query result paths.
---
lib/PublicInbox/LeiQuery.pm | 5 ++---
lib/PublicInbox/LeiXSearch.pm | 8 ++++++--
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 1a3e1193..69d2f9a6 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -26,14 +26,13 @@ sub lei_q {
my $sto = $self->_lei_store(1);
my $cfg = $self->_lei_cfg(1);
my $opt = $self->{opt};
- require PublicInbox::LeiDedupe;
- my $dd = PublicInbox::LeiDedupe->new($self);
# --local is enabled by default
# src: LeiXSearch || LeiSearch || Inbox
my @srcs;
require PublicInbox::LeiXSearch;
require PublicInbox::LeiOverview;
+ require PublicInbox::LeiDedupe;
my $lxs = PublicInbox::LeiXSearch->new;
# --external is enabled by default, but allow --no-external
@@ -49,8 +48,8 @@ sub lei_q {
unshift(@srcs, $sto->search) if $opt->{'local'};
# no forking workers after this
- require PublicInbox::LeiOverview;
$self->{ovv} = PublicInbox::LeiOverview->new($self);
+ $self->{dd} = PublicInbox::LeiDedupe->new($self);
my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
$mset_opt{qstr} = join(' ', map {;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 68889e81..80e7a7f7 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -103,6 +103,8 @@ sub query_thread_mset { # for --thread
my $mo = { %{$lei->{mset_opt}} };
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $srch->mset($mo->{qstr}, $mo);
my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -112,7 +114,7 @@ sub query_thread_mset { # for --thread
while ($over->expand_thread($ctx)) {
for my $n (@{$ctx->{xids}}) {
my $smsg = $over->get_art($n) or next;
- # next if $dd->is_smsg_dup($smsg); TODO
+ next if $dd->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
# $self->out($buf .= $ORS);
@@ -132,11 +134,13 @@ sub query_mset { # non-parallel for non-"--thread" users
my $mset;
$self->attach_external($_) for @$srcs;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ my $dd = $lei->{dd};
+ $dd->prepare_dedupe;
do {
$mset = $self->mset($mo->{qstr}, $mo);
for my $it ($mset->items) {
my $smsg = smsg_for($self, $it) or next;
- # next if $dd->is_smsg_dup($smsg);
+ next if $dd->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
# $self->out($buf .= $ORS) if defined $buf;
#$emit_cb->($smsg);
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 09/14] search: rename "ts:" prefix to "rt:"
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (7 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 08/14] lei q: reinstate smsg dedupe Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 10/14] lei_overview: rename "references" to "refs" Eric Wong
` (4 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
Meaning "Received time", as it is the best description of the
value we use from the "Received:" header, if present. JMAP
calls it "receivedAt", but "rt:" seems like a better
abbreviation, being in line with "dt:" for the "Date" header.
"Timestamp" ("ts") was potentially ambiguous given the presence
of the "Date" header.
---
lib/PublicInbox/IMAPsearchqp.pm | 6 +++---
lib/PublicInbox/LeiOverview.pm | 2 +-
lib/PublicInbox/Search.pm | 2 +-
t/imap_searchqp.t | 6 +++---
4 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/IMAPsearchqp.pm b/lib/PublicInbox/IMAPsearchqp.pm
index 78d9a206..2fb92bb8 100644
--- a/lib/PublicInbox/IMAPsearchqp.pm
+++ b/lib/PublicInbox/IMAPsearchqp.pm
@@ -124,7 +124,7 @@ sub ON {
my ($self, $item) = @_;
my $ts = yyyymmdd($item);
my $end = $ts + 86399; # no leap day
- push @{$self->{xap}}, "ts:$ts..$end";
+ push @{$self->{xap}}, "rt:$ts..$end";
my $sql = $self->{sql} or return 1;
$$sql .= " AND ts >= $ts AND ts <= $end";
}
@@ -132,7 +132,7 @@ sub ON {
sub BEFORE {
my ($self, $item) = @_;
my $ts = yyyymmdd($item);
- push @{$self->{xap}}, "ts:..$ts";
+ push @{$self->{xap}}, "rt:..$ts";
my $sql = $self->{sql} or return 1;
$$sql .= " AND ts <= $ts";
}
@@ -140,7 +140,7 @@ sub BEFORE {
sub SINCE {
my ($self, $item) = @_;
my $ts = yyyymmdd($item);
- push @{$self->{xap}}, "ts:$ts..";
+ push @{$self->{xap}}, "rt:$ts..";
my $sql = $self->{sql} or return 1;
$$sql .= " AND ts >= $ts";
}
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 194c5e28..080fe837 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -86,7 +86,7 @@ sub _unbless_smsg {
my ($smsg, $mitem) = @_;
delete @$smsg{qw(lines bytes num tid)};
- $smsg->{rcvd} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
+ $smsg->{rt} = _iso8601(delete $smsg->{ts}); # JMAP receivedAt
$smsg->{dt} = _iso8601(delete $smsg->{ds}); # JMAP UTCDate
$smsg->{relevance} = get_pct($mitem) if $mitem;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 7f68ee01..a4b40f94 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -376,7 +376,7 @@ sub qparse_new ($) {
# for IMAP, undocumented for WWW and may be split off go away
$cb->($qp, $NVRP->new(BYTES, 'bytes:'));
- $cb->($qp, $NVRP->new(TS, 'ts:'));
+ $cb->($qp, $NVRP->new(TS, 'rt:'));
$cb->($qp, $NVRP->new(UID, 'uid:'));
while (my ($name, $prefix) = each %bool_pfx_external) {
diff --git a/t/imap_searchqp.t b/t/imap_searchqp.t
index 049fd680..6b4121ea 100644
--- a/t/imap_searchqp.t
+++ b/t/imap_searchqp.t
@@ -76,17 +76,17 @@ is($q->{xap}, 'c:"b" d:..19931002', 'compound query w/ parens');
$q = $parse->($s = qq{BEFORE 2-Oct-1993});
is_deeply($q->{sql}, \" AND ts <= $t0", 'BEFORE SQL');
$q = $parse->("FROM z $s");
- is($q->{xap}, qq{f:"z" ts:..$t0}, 'BEFORE Xapian');
+ is($q->{xap}, qq{f:"z" rt:..$t0}, 'BEFORE Xapian');
$q = $parse->($s = qq{SINCE 2-Oct-1993});
is_deeply($q->{sql}, \" AND ts >= $t0", 'SINCE SQL');
$q = $parse->("FROM z $s");
- is($q->{xap}, qq{f:"z" ts:$t0..}, 'SINCE Xapian');
+ is($q->{xap}, qq{f:"z" rt:$t0..}, 'SINCE Xapian');
$q = $parse->($s = qq{ON 2-Oct-1993});
is_deeply($q->{sql}, \" AND ts >= $t0 AND ts <= $t1", 'ON SQL');
$q = $parse->("FROM z $s");
- is($q->{xap}, qq{f:"z" ts:$t0..$t1}, 'ON Xapian');
+ is($q->{xap}, qq{f:"z" rt:$t0..$t1}, 'ON Xapian');
}
{
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 10/14] lei_overview: rename "references" to "refs"
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (8 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 09/14] search: rename "ts:" prefix to "rt:" Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 11/14] lei: q: lock stdout on overview output Eric Wong
` (3 subsequent siblings)
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
"references" was too long of a name compared to the other field
names we output in the JSON. While we currently don't have a
"refs:" search prefix for the "References:" header, we may in
the future.
---
lib/PublicInbox/LeiOverview.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 080fe837..ec0921ba 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -91,7 +91,7 @@ sub _unbless_smsg {
$smsg->{relevance} = get_pct($mitem) if $mitem;
if (my $r = delete $smsg->{references}) {
- $smsg->{references} = [
+ $smsg->{refs} = [
map { "<$_>" } ($r =~ m/$MID_EXTRACT/go) ];
}
if (my $m = delete($smsg->{mid})) {
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 11/14] lei: q: lock stdout on overview output
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (9 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 10/14] lei_overview: rename "references" to "refs" Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-15 0:18 ` Eric Wong
2021-01-14 7:06 ` [PATCH 12/14] leixsearch: remove some commented out code Eric Wong
` (2 subsequent siblings)
13 siblings, 1 reply; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
Most writes to stdout aren't atomic and we need locking to
prevent workers from interleaving and corrupting JSON output.
The one case where stdout won't require locking is when it's
pointed to a regular file opened with O_APPEND, since POSIX
O_APPEND semantics guarantee atomicity.
---
MANIFEST | 1 +
lib/PublicInbox/LeiOverview.pm | 34 ++++++++++++++++++++++++++++++++++
lib/PublicInbox/LeiXSearch.pm | 9 +++++----
lib/PublicInbox/Lock.pm | 2 +-
t/lei_overview.t | 33 +++++++++++++++++++++++++++++++++
5 files changed, 74 insertions(+), 5 deletions(-)
create mode 100644 t/lei_overview.t
diff --git a/MANIFEST b/MANIFEST
index 2ca240fc..0ebdaccc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -338,6 +338,7 @@ t/kqnotify.t
t/lei-oneshot.t
t/lei.t
t/lei_dedupe.t
+t/lei_overview.t
t/lei_store.t
t/lei_to_mail.t
t/lei_xsearch.t
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index ec0921ba..44c21837 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -6,8 +6,11 @@
package PublicInbox::LeiOverview;
use strict;
use v5.10.1;
+use parent qw(PublicInbox::Lock);
use POSIX qw(strftime);
+use Fcntl qw(F_GETFL O_APPEND);
use File::Spec;
+use File::Temp ();
use PublicInbox::MID qw($MID_EXTRACT);
use PublicInbox::Address qw(pairs);
use PublicInbox::Config;
@@ -18,6 +21,23 @@ my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
sub _iso8601 ($) { strftime('%Y-%m-%dT%H:%M:%SZ', gmtime($_[0])) }
+# we open this in the parent process before ->wq_do handoff
+sub ovv_out_lk_init ($) {
+ my ($self) = @_;
+ $self->{tmp_lk_id} = "$self.$$";
+ my $tmp = File::Temp->new("lei-ovv.out.$$.lock-XXXXXX",
+ TMPDIR => 1, UNLINK => 0);
+ $self->{lock_path} = $tmp->filename;
+}
+
+sub ovv_out_lk_cancel ($) {
+ my ($self) = @_;
+ ($self->{tmp_lk_id}//'') eq "$self.$$" and
+ unlink(delete($self->{lock_path}));
+}
+
+*DESTROY = \&ovv_out_lk_cancel;
+
sub new {
my ($class, $lei) = @_;
my $opt = $lei->{opt};
@@ -50,8 +70,17 @@ sub new {
$isatty = -t $lei->{1};
$lei->start_pager if $isatty;
$opt->{pretty} //= $isatty;
+ if (!$isatty && -f _) {
+ my $fl = fcntl($lei->{1}, F_GETFL, 0) //
+ return $lei->fail("fcntl(stdout): $!");
+ ovv_out_lk_init($self) unless ($fl & O_APPEND);
+ } else {
+ ovv_out_lk_init($self);
+ }
} elsif ($json) {
return $lei->fail('JSON formats only output to stdout');
+ } else {
+ return $lei->fail("TODO: $out -f $fmt");
}
$self;
}
@@ -109,6 +138,7 @@ sub _unbless_smsg {
sub ovv_atexit_child {
my ($self, $lei) = @_;
if (my $bref = delete $lei->{ovv_buf}) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $$bref;
}
}
@@ -142,7 +172,9 @@ sub _json_pretty {
sub ovv_each_smsg_cb {
my ($self, $lei) = @_;
$lei->{ovv_buf} = \(my $buf = '');
+ delete(@$self{qw(lock_path tmp_lk_id)}) unless $lei->{-parallel};
my $json = $self->{json}->new;
+ $lei->{1}->autoflush(1);
if ($json) {
$json->utf8->canonical;
$json->ascii(1) if $lei->{opt}->{ascii};
@@ -164,6 +196,7 @@ sub ovv_each_smsg_cb {
} sort keys %$smsg);
$buf .= $EOR;
if (length($buf) > 65536) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $buf;
$buf = '';
}
@@ -175,6 +208,7 @@ sub ovv_each_smsg_cb {
delete @$smsg{qw(tid num)};
$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
if (length($buf) > 65536) {
+ my $lk = $self->lock_for_scope;
print { $lei->{1} } $buf;
$buf = '';
}
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 80e7a7f7..ee93e074 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -158,20 +158,21 @@ sub query_done { # PublicInbox::EOFpipe callback
sub do_query {
my ($self, $lei_orig, $srcs) = @_;
my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
-
+ my $remotes = $self->{remotes} // [];
pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
$io[0] = $qry_done; # don't need stdin
- $io[1]->autoflush(1);
- $io[2]->autoflush(1);
+
if ($lei->{opt}->{thread}) {
+ $lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
for my $ibxish (@$srcs) {
$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
}
} else {
+ $lei->{-parallel} = scalar(@$remotes);
$self->wq_do('query_mset', \@io, $lei, $srcs);
}
# TODO
- for my $rmt (@{$self->{remotes} // []}) {
+ for my $rmt (@$remotes) {
$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
@io = ();
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index 2c5ebf27..bb213de4 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -37,7 +37,7 @@ sub lock_release {
# caller must use return value
sub lock_for_scope {
my ($self, @single_pid) = @_;
- $self->lock_acquire;
+ lock_acquire($self) or return; # lock_path not set
PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
}
diff --git a/t/lei_overview.t b/t/lei_overview.t
new file mode 100644
index 00000000..896cc01a
--- /dev/null
+++ b/t/lei_overview.t
@@ -0,0 +1,33 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(_exit);
+require_ok 'PublicInbox::LeiOverview';
+
+my $ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+my $lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY');
+
+$ovv = bless {}, 'PublicInbox::LeiOverview';
+$ovv->ovv_out_lk_init;
+$lock_path = $ovv->{lock_path};
+ok(-f $lock_path, 'lock init #2');
+my $pid = fork // BAIL_OUT "fork $!";
+if ($pid == 0) {
+ undef $ovv;
+ _exit(0);
+}
+is(waitpid($pid, 0), $pid, 'child exited');
+is($?, 0, 'no error in child process');
+ok(-f $lock_path, 'lock was not destroyed by child');
+undef $ovv;
+ok(!-f $lock_path, 'lock DESTROY #2');
+
+done_testing;
^ permalink raw reply related [flat|nested] 16+ messages in thread
* Re: [PATCH 11/14] lei: q: lock stdout on overview output
2021-01-14 7:06 ` [PATCH 11/14] lei: q: lock stdout on overview output Eric Wong
@ 2021-01-15 0:18 ` Eric Wong
0 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-15 0:18 UTC (permalink / raw)
To: meta
Will squash this to fix a warning:
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 44c21837..ef5f27c1 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -36,8 +36,6 @@ sub ovv_out_lk_cancel ($) {
unlink(delete($self->{lock_path}));
}
-*DESTROY = \&ovv_out_lk_cancel;
-
sub new {
my ($class, $lei) = @_;
my $opt = $lei->{opt};
@@ -220,4 +218,7 @@ sub ovv_each_smsg_cb {
} # else { ...
}
+no warnings 'once';
+*DESTROY = \&ovv_out_lk_cancel;
+
1;
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 12/14] leixsearch: remove some commented out code
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (10 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 11/14] lei: q: lock stdout on overview output Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 13/14] lei: remove temporary var on open Eric Wong
2021-01-14 7:06 ` [PATCH 14/14] lei: pass FD to CWD via cmsg, use fchdir on server Eric Wong
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
Dedupe is active now, and we use $each_smsg->(...) instead.
---
lib/PublicInbox/LeiXSearch.pm | 4 ----
1 file changed, 4 deletions(-)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index ee93e074..25ded544 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -117,8 +117,6 @@ sub query_thread_mset { # for --thread
next if $dd->is_smsg_dup($smsg);
my $mitem = delete $n2item{$smsg->{num}};
$each_smsg->($smsg, $mitem);
- # $self->out($buf .= $ORS);
- # $emit_cb->($smsg);
}
@{$ctx->{xids}} = ();
}
@@ -142,8 +140,6 @@ sub query_mset { # non-parallel for non-"--thread" users
my $smsg = smsg_for($self, $it) or next;
next if $dd->is_smsg_dup($smsg);
$each_smsg->($smsg, $it);
- # $self->out($buf .= $ORS) if defined $buf;
- #$emit_cb->($smsg);
}
} while (_mset_more($mset, $mo));
$lei->{ovv}->ovv_atexit_child($lei);
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 13/14] lei: remove temporary var on open
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (11 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 12/14] leixsearch: remove some commented out code Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
2021-01-14 7:06 ` [PATCH 14/14] lei: pass FD to CWD via cmsg, use fchdir on server Eric Wong
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
We can place the IO/GLOB ref directly into $self, here.
---
lib/PublicInbox/LEI.pm | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a8fea16d..9786e7ac 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -678,11 +678,8 @@ sub accept_dispatch { # Listener {post_accept} callback
my $i = 0;
for my $rdr (qw(<&= >&= >&=)) {
my $fd = shift(@fds);
- if (open(my $fh, $rdr, $fd)) {
- $self->{$i++} = $fh;
- next;
- }
- return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
+ open($self->{$i++}, $rdr, $fd) and next;
+ send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
}
} else {
return send($sock, "recv_cmd failed: $!", MSG_EOR);
^ permalink raw reply related [flat|nested] 16+ messages in thread
* [PATCH 14/14] lei: pass FD to CWD via cmsg, use fchdir on server
2021-01-14 7:06 [PATCH 00/14] lei: another pile of changes Eric Wong
` (12 preceding siblings ...)
2021-01-14 7:06 ` [PATCH 13/14] lei: remove temporary var on open Eric Wong
@ 2021-01-14 7:06 ` Eric Wong
13 siblings, 0 replies; 16+ messages in thread
From: Eric Wong @ 2021-01-14 7:06 UTC (permalink / raw)
To: meta
Since 5.8.8/5.10.0, Perl's chdir() automatically uses fchdir(2)
when given a file or directory handle, so we can safely rely
on it given our 5.10.1+ requirement.
This means we no longer have to waste several milliseconds
loading Cwd.so and making stat() calls to ensure
$ENV{PWD} is correct and usable in the server.  It also lets
us work in directories that are no longer accessible via
pathname.
---
lib/PublicInbox/LEI.pm | 14 +++++++-------
script/lei | 18 +++---------------
t/lei.t | 27 ++-------------------------
3 files changed, 12 insertions(+), 47 deletions(-)
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 9786e7ac..1f4a3082 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -674,9 +674,9 @@ sub accept_dispatch { # Listener {post_accept} callback
select($rvec, undef, undef, 1) or
return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
- if (scalar(@fds) == 3) {
+ if (scalar(@fds) == 4) {
my $i = 0;
- for my $rdr (qw(<&= >&= >&=)) {
+ for my $rdr (qw(<&= >&= >&= <&=)) {
my $fd = shift(@fds);
open($self->{$i++}, $rdr, $fd) and next;
send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
@@ -692,13 +692,13 @@ sub accept_dispatch { # Listener {post_accept} callback
my ($argc, @argv) = split(/\0/, $buf, -1);
undef $buf;
my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
- if (chdir($env{PWD})) {
+ if (chdir(delete($self->{3}))) {
local %ENV = %env;
$self->{env} = \%env;
eval { dispatch($self, @argv) };
send($sock, $@, MSG_EOR) if $@;
} else {
- send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
+ send($sock, "fchdir: $!", MSG_EOR); # implicit close
}
}
@@ -746,7 +746,7 @@ our $oldset; sub oldset { $oldset }
# lei(1) calls this when it can't connect
sub lazy_start {
- my ($path, $errno, $nfd) = @_;
+ my ($path, $errno, $narg) = @_;
if ($errno == ECONNREFUSED) {
unlink($path) or die "unlink($path): $!";
} elsif ($errno != ENOENT) {
@@ -761,7 +761,7 @@ sub lazy_start {
my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino
pipe(my ($eof_r, $eof_w)) or die "pipe: $!";
local $oldset = PublicInbox::DS::block_signals();
- if ($nfd == 4) {
+ if ($narg == 5) {
$send_cmd = PublicInbox::Spawn->can('send_cmd4');
$recv_cmd = PublicInbox::Spawn->can('recv_cmd4') // do {
require PublicInbox::CmdIPC4;
@@ -770,7 +770,7 @@ sub lazy_start {
};
}
$recv_cmd or die <<"";
-(Socket::MsgHdr || Inline::C) missing/unconfigured (nfd=$nfd);
+(Socket::MsgHdr || Inline::C) missing/unconfigured (narg=$narg);
require PublicInbox::Listener;
require PublicInbox::EOFpipe;
diff --git a/script/lei b/script/lei
index 9610a876..a4a0217b 100755
--- a/script/lei
+++ b/script/lei
@@ -6,7 +6,7 @@ use v5.10.1;
use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
use Errno qw(EINTR ECONNRESET);
use PublicInbox::CmdIPC4;
-my $narg = 4;
+my $narg = 5;
my ($sock, $pwd);
my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
@@ -74,25 +74,13 @@ connect($path): $! (after attempted daemon start)
Falling back to (slow) one-shot mode
}
- require Cwd;
- $pwd = $ENV{PWD} // '';
- my $cwd = Cwd::fastcwd() // die "fastcwd(PWD=$pwd): $!";
- if ($pwd ne $cwd) { # prefer ENV{PWD} if it's a symlink to real cwd
- my @st_cwd = stat($cwd) or die "stat(cwd=$cwd): $!";
- my @st_pwd = stat($pwd); # PWD invalid, use cwd
- # make sure st_dev/st_ino match for {PWD} to be valid
- $pwd = $cwd if (!@st_pwd || $st_pwd[1] != $st_cwd[1] ||
- $st_pwd[0] != $st_cwd[0]);
- } else {
- $pwd = $cwd;
- }
1;
}) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
- $ENV{PWD} = $pwd;
+ open my $dh, '<', '.' or die "open(.) $!";
my $buf = join("\0", scalar(@ARGV), @ARGV);
while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
$buf .= "\0\0";
- $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+ $send_cmd->($sock, [ 0, 1, 2, fileno($dh) ], $buf, MSG_EOR);
$SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
my ($sig) = @_; # 'TERM', not an integer :<
$SIG{$sig} = 'DEFAULT';
diff --git a/t/lei.t b/t/lei.t
index 240735bf..2349dca4 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -208,9 +208,9 @@ if ($ENV{TEST_LEI_ONESHOT}) {
SKIP: { # real socket
require_mods(qw(Cwd), my $nr = 105);
- my $nfd = eval { require Socket::MsgHdr; 4 } // do {
+ my $nfd = eval { require Socket::MsgHdr; 5 } // do {
require PublicInbox::Spawn;
- PublicInbox::Spawn->can('send_cmd4') ? 4 : undef;
+ PublicInbox::Spawn->can('send_cmd4') ? 5 : undef;
} //
skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
@@ -260,29 +260,6 @@ SKIP: { # real socket
like($out, qr/^usage: /, 'help output works');
chmod 0700, $sock or BAIL_OUT "chmod 0700: $!";
}
- if ('oneshot on cwd gone') {
- my $cwd = Cwd::fastcwd() or BAIL_OUT "fastcwd: $!";
- my $d = "$home/to-be-removed";
- my $lei_path = 'lei';
- # we chdir, so we need an abs_path fur run_script
- if (($ENV{TEST_RUN_MODE}//2) != 2) {
- $lei_path = PublicInbox::TestCommon::key2script('lei');
- $lei_path = Cwd::abs_path($lei_path);
- }
- mkdir $d or BAIL_OUT "mkdir($d) $!";
- chdir $d or BAIL_OUT "chdir($d) $!";
- if (rmdir($d)) {
- $out = $err = '';
- ok(run_script([$lei_path, 'help'], undef, $opt),
- 'cwd fail, one-shot fallback works');
- } else {
- $err = "rmdir=$!";
- }
- chdir $cwd or BAIL_OUT "chdir($cwd) $!";
- like($err, qr/cwd\(/, 'cwd error noted');
- like($out, qr/^usage: /, 'help output still works');
- }
-
unlink $sock or BAIL_OUT "unlink($sock) $!";
for (0..100) {
kill('CHLD', $new_pid) or last;
^ permalink raw reply related [flat|nested] 16+ messages in thread