Still chasing down a weird problem which causes t/lei.t and t/lei-oneshot.t to fail on FreeBSD 11.4 with IO::FDPass under high load. No syscall errors are reported, but it's as if the FDs aren't passed at all... Maybe it's fixed in 12.x. 1/3 is to cut down on noise. 2/3 is a no-brainer :x 3/3 was for me to play around with, but it also avoids malloc and a potential leak in IO::FDPass (upstream's been notified). However, I'm considering just making our C code pass all 3 FDs with one syscall since it's possible. In any case, the C parts of PublicInbox::Spawn should probably be renamed PublicInbox::C... Eric Wong (3): t/lei: use $lei->() callback wrapper testcommon: prepare_redirects: fix error message spawn: support send_fd+recv_fd w/o IO::FDPass lib/PublicInbox/LEI.pm | 6 ++- lib/PublicInbox/Spawn.pm | 78 ++++++++++++++++++++++++++++++-- lib/PublicInbox/TestCommon.pm | 4 +- script/lei | 7 ++- t/lei.t | 84 ++++++++++++++++------------------- t/spawn.t | 18 ++++++++ 6 files changed, 141 insertions(+), 56 deletions(-)
This shortens the test and should make it easier to debug and add new tests. --- t/lei.t | 78 ++++++++++++++++++++++++--------------------------------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/t/lei.t b/t/lei.t index 6f6a5888..541d83ce 100644 --- a/t/lei.t +++ b/t/lei.t @@ -35,37 +35,35 @@ my $config_file = "$home/.config/lei/config"; my $store_dir = "$home/.local/share/lei"; my $test_help = sub { - ok(!$lei->([], undef, $opt), 'no args fails'); + ok(!$lei->(), 'no args fails'); is($? >> 8, 1, '$? is 1'); is($out, '', 'nothing in stdout'); like($err, qr/^usage:/sm, 'usage in stderr'); for my $arg (['-h'], ['--help'], ['help'], [qw(daemon-pid --help)]) { - $out = $err = ''; - ok($lei->($arg, undef, $opt), "lei @$arg"); + ok($lei->($arg), "lei @$arg"); like($out, qr/^usage:/sm, "usage in stdout (@$arg)"); is($err, '', "nothing in stderr (@$arg)"); } for my $arg ([''], ['--halp'], ['halp'], [qw(daemon-pid --halp)]) { - $out = $err = ''; - ok(!$lei->($arg, undef, $opt), "lei @$arg"); + ok(!$lei->($arg), "lei @$arg"); is($? >> 8, 1, '$? set correctly'); isnt($err, '', 'something in stderr'); is($out, '', 'nothing in stdout'); } - ok($lei->(qw(init -h), undef, $opt), 'init -h'); + ok($lei->(qw(init -h)), 'init -h'); like($out, qr! \Q$home\E/\.local/share/lei/store\b!, 'actual path shown in init -h'); - ok($lei->(qw(init -h), { XDG_DATA_HOME => '/XDH' }, $opt), + ok($lei->(qw(init -h), { XDG_DATA_HOME => '/XDH' }), 'init with XDG_DATA_HOME'); like($out, qr! /XDH/lei/store\b!, 'XDG_DATA_HOME in init -h'); is($err, '', 'no errors from init -h'); - ok($lei->(qw(config -h), undef, $opt), 'config-h'); + ok($lei->(qw(config -h)), 'config-h'); like($out, qr! \Q$home\E/\.config/lei/config\b!, 'actual path shown in config -h'); - ok($lei->(qw(config -h), { XDG_CONFIG_HOME => '/XDC' }, $opt), + ok($lei->(qw(config -h), { XDG_CONFIG_HOME => '/XDC' }), 'config with XDG_CONFIG_HOME'); like($out, qr! 
/XDC/lei/config\b!, 'XDG_CONFIG_HOME in config -h'); is($err, '', 'no errors from config -h'); @@ -75,31 +73,28 @@ my $ok_err_info = sub { my ($msg) = @_; is(grep(!/^I:/, split(/^/, $err)), 0, $msg) or diag "$msg: err=$err"; - $err = ''; }; my $test_init = sub { $cleanup->(); - ok($lei->(['init'], undef, $opt), 'init w/o args'); + ok($lei->('init'), 'init w/o args'); $ok_err_info->('after init w/o args'); - ok($lei->(['init'], undef, $opt), 'idempotent init w/o args'); + ok($lei->('init'), 'idempotent init w/o args'); $ok_err_info->('after idempotent init w/o args'); - ok(!$lei->(['init', "$home/x"], undef, $opt), - 'init conflict'); + ok(!$lei->('init', "$home/x"), 'init conflict'); is(grep(/^E:/, split(/^/, $err)), 1, 'got error on conflict'); ok(!-e "$home/x", 'nothing created on conflict'); $cleanup->(); - ok($lei->(['init', "$home/x"], undef, $opt), 'init conflict resolved'); + ok($lei->('init', "$home/x"), 'init conflict resolved'); $ok_err_info->('init w/ arg'); - ok($lei->(['init', "$home/x"], undef, $opt), 'init idempotent w/ path'); + ok($lei->('init', "$home/x"), 'init idempotent w/ path'); $ok_err_info->('init idempotent w/ arg'); ok(-d "$home/x", 'created dir'); $cleanup->("$home/x"); - ok(!$lei->(['init', "$home/x", "$home/2" ], undef, $opt), - 'too many args fails'); + ok(!$lei->('init', "$home/x", "$home/2"), 'too many args fails'); like($err, qr/too many/, 'noted excessive'); ok(!-e "$home/x", 'x not created on excessive'); for my $d (@$home_trash) { @@ -111,12 +106,12 @@ my $test_init = sub { my $test_config = sub { $cleanup->(); - ok($lei->([qw(config a.b c)], undef, $opt), 'config set var'); + ok($lei->(qw(config a.b c)), 'config set var'); is($out.$err, '', 'no output on var set'); - ok($lei->([qw(config -l)], undef, $opt), 'config -l'); + ok($lei->(qw(config -l)), 'config -l'); is($err, '', 'no errors on listing'); is($out, "a.b=c\n", 'got expected output'); - ok(!$lei->([qw(config -f), "$home/.config/f", qw(x.y z)], undef, $opt), + 
ok(!$lei->(qw(config -f), "$home/.config/f", qw(x.y z)), 'config set var with -f fails'); like($err, qr/not supported/, 'not supported noted'); ok(!-f "$home/config/f", 'no file created'); @@ -201,7 +196,7 @@ SKIP: { # real socket local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock"; - ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + ok($lei->('daemon-pid'), 'daemon-pid'); is($err, '', 'no error from daemon-pid'); like($out, qr/\A[0-9]+\n\z/s, 'pid returned') or BAIL_OUT; chomp(my $pid = $out); @@ -210,42 +205,39 @@ SKIP: { # real socket $test_lei_common->(); - $out = ''; - ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + ok($lei->('daemon-pid'), 'daemon-pid'); chomp(my $pid_again = $out); is($pid, $pid_again, 'daemon-pid idempotent'); - $out = ''; - ok(run_script([qw(lei daemon-env -0)], undef, $opt), 'show env'); + ok($lei->(qw(daemon-env -0)), 'show env'); is($err, '', 'no errors in env dump'); my @env = split(/\0/, $out); is(scalar grep(/\AHOME=\Q$home\E\z/, @env), 1, 'env has HOME'); is(scalar grep(/\AFOO=BAR\z/, @env), 1, 'env has FOO=BAR'); is(scalar grep(/\AXDG_RUNTIME_DIR=/, @env), 1, 'has XDG_RUNTIME_DIR'); - $out = ''; - ok(run_script([qw(lei daemon-env -u FOO)], undef, $opt), 'unset'); + ok($lei->(qw(daemon-env -u FOO)), 'unset'); is($out.$err, '', 'no output for unset'); - ok(run_script([qw(lei daemon-env -0)], undef, $opt), 'show again'); + ok($lei->(qw(daemon-env -0)), 'show again'); is($err, '', 'no errors in env dump'); @env = split(/\0/, $out); is(scalar grep(/\AFOO=BAR\z/, @env), 0, 'env unset FOO'); - $out = ''; - ok(run_script([qw(lei daemon-env -u FOO -u HOME -u XDG_RUNTIME_DIR)], - undef, $opt), 'unset multiple'); + ok($lei->(qw(daemon-env -u FOO -u HOME -u XDG_RUNTIME_DIR)), + 'unset multiple'); is($out.$err, '', 'no errors output for unset'); - ok(run_script([qw(lei daemon-env -0)], undef, $opt), 'show again'); + + ok($lei->(qw(daemon-env -0)), 'show again'); is($err, 
'', 'no errors in env dump'); @env = split(/\0/, $out); is(scalar grep(/\A(?:HOME|XDG_RUNTIME_DIR)=\z/, @env), 0, 'env unset@'); - $out = ''; - ok(run_script([qw(lei daemon-env -)], undef, $opt), 'clear env'); + + ok($lei->(qw(daemon-env -)), 'clear env'); is($out.$err, '', 'no output'); - ok(run_script([qw(lei daemon-env)], undef, $opt), 'env is empty'); + ok($lei->(qw(daemon-env)), 'env is empty'); is($out, '', 'env cleared'); - ok(run_script([qw(lei daemon-kill)], undef, $opt), 'daemon-kill'); + ok($lei->(qw(daemon-kill)), 'daemon-kill'); is($out, '', 'no output from daemon-kill'); is($err, '', 'no error from daemon-kill'); for (0..100) { @@ -255,26 +247,22 @@ SKIP: { # real socket ok(!-S $sock, 'sock gone'); ok(!kill(0, $pid), 'pid gone after stop'); - ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + ok($lei->(qw(daemon-pid)), 'daemon-pid'); chomp(my $new_pid = $out); ok(kill(0, $new_pid), 'new pid is running'); ok(-S $sock, 'sock exists again'); - $out = $err = ''; for my $sig (qw(-0 -CHLD)) { - ok(run_script([qw(lei daemon-kill), $sig ], undef, $opt), - "handles $sig"); + ok($lei->('daemon-kill', $sig), "handles $sig"); } is($out.$err, '', 'no output on innocuous signals'); - ok(run_script([qw(lei daemon-pid)], undef, $opt), 'daemon-pid'); + ok($lei->('daemon-pid'), 'daemon-pid'); chomp $out; is($out, $new_pid, 'PID unchanged after -0/-CHLD'); if ('socket inaccessible') { chmod 0000, $sock or BAIL_OUT "chmod 0000: $!"; - $out = $err = ''; - ok(run_script([qw(lei help)], undef, $opt), - 'connect fail, one-shot fallback works'); + ok($lei->('help'), 'connect fail, one-shot fallback works'); like($err, qr/\bconnect\(/, 'connect error noted'); like($out, qr/^usage: /, 'help output works'); chmod 0700, $sock or BAIL_OUT "chmod 0700: $!";
I never hit these die() calls, but noticed their broken error messages while debugging another problem on FreeBSD. --- lib/PublicInbox/TestCommon.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index 56f04bd4..16ae2650 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -141,9 +141,9 @@ sub _prepare_redirects ($) { for (my $fd = 0; $fd <= $#io_mode; $fd++) { my $fh = $fhref->[$fd] or next; my ($oldfh, $mode) = @{$io_mode[$fd]}; - open my $orig, $mode, $oldfh or die "$$oldfh $mode stash: $!"; + open my $orig, $mode, $oldfh or die "$oldfh $mode stash: $!"; $orig_io->[$fd] = $orig; - open $oldfh, $mode, $fh or die "$$oldfh $mode redirect: $!"; + open $oldfh, $mode, $fh or die "$oldfh $mode redirect: $!"; } $orig_io; }
IO::FDPass may be an extra installation burden I don't want to impose on users. We only support Linux and *BSDs, however. --- lib/PublicInbox/LEI.pm | 6 ++-- lib/PublicInbox/Spawn.pm | 78 +++++++++++++++++++++++++++++++++++++--- script/lei | 7 ++-- t/lei.t | 6 +++- t/spawn.t | 18 ++++++++++ 5 files changed, 106 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 2bc4a916..6f98c934 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -25,6 +25,7 @@ use Text::Wrap qw(wrap); use File::Path qw(mkpath); use File::Spec; our $quit = \&CORE::exit; +my $recv_fd; my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); my $GLP_PASS = Getopt::Long::Parser->new; @@ -615,7 +616,7 @@ sub accept_dispatch { # Listener {post_accept} callback # `say $sock' triggers "die" in lei(1) for my $i (0..2) { if (select(my $rout = $rin, undef, undef, 1)) { - my $fd = IO::FDPass::recv(fileno($sock)); + my $fd = $recv_fd->(fileno($sock)); if ($fd >= 0) { my $rdr = ($fd == 0 ? 
'<&=' : '>&='); if (open(my $fh, $rdr, $fd)) { @@ -671,7 +672,8 @@ sub lazy_start { my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; my $oldset = PublicInbox::Sigfd::block_signals(); - require IO::FDPass; + $recv_fd = PublicInbox::Spawn->can('recv_fd') or die + "Inline::C not installed/configured or IO::FDPass missing\n"; require PublicInbox::Listener; require PublicInbox::EOFpipe; (-p STDOUT) or die "E: stdout must be a pipe\n"; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 762a0549..4ca94b9f 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -201,12 +201,72 @@ void nodatacow_dir(const char *dir) } SET_NODATACOW +my $fdpass = <<'FDPASS'; +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/socket.h> + +#if defined(CMSG_SPACE) && defined(CMSG_LEN) +union my_cmsg { + struct cmsghdr hdr; + char pad[sizeof(struct cmsghdr)+8+sizeof(int)+8]; +}; + +int send_fd(int sockfd, int fd) +{ + struct msghdr msg = { 0 }; + struct iovec iov; + union my_cmsg cmsg = { 0 }; + + iov.iov_base = &msg.msg_namelen; + iov.iov_len = 1; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(sizeof(int)); + + cmsg.hdr.cmsg_level = SOL_SOCKET; + cmsg.hdr.cmsg_type = SCM_RIGHTS; + cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int)); + *(int *)CMSG_DATA(&cmsg.hdr) = fd; + + return sendmsg(sockfd, &msg, 0) >= 0; +} + +int recv_fd(int sockfd) +{ + union my_cmsg cmsg = { 0 }; + struct msghdr msg = { 0 }; + struct iovec iov; + int fd = -1; + + iov.iov_base = &msg.msg_namelen; + iov.iov_len = 1; + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg.hdr; + msg.msg_controllen = CMSG_SPACE(sizeof(int)); + + if (recvmsg(sockfd, &msg, 0) <= 0) + return -1; + + errno = EDOM; + if (cmsg.hdr.cmsg_level == SOL_SOCKET && + cmsg.hdr.cmsg_type == SCM_RIGHTS && + cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(int))) + fd = *(int *)CMSG_DATA(&cmsg.hdr); + + 
return fd; +} +#endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */ +FDPASS + my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} //= ( $ENV{XDG_CACHE_HOME} // ( ($ENV{HOME} // '/nonexistent').'/.cache' ) ).'/public-inbox/inline-c'; -$set_nodatacow = $vfork_spawn = undef unless -d $inline_dir && -w _; +$set_nodatacow = $vfork_spawn = $fdpass = undef unless -d $inline_dir && -w _; if (defined $vfork_spawn) { # Inline 0.64 or later has locking in multi-process env, # but we support 0.5 on Debian wheezy @@ -215,13 +275,13 @@ if (defined $vfork_spawn) { my $f = "$inline_dir/.public-inbox.lock"; open my $fh, '>', $f or die "failed to open $f: $!\n"; flock($fh, LOCK_EX) or die "LOCK_EX failed on $f: $!\n"; - eval 'use Inline C => $vfork_spawn . $set_nodatacow'; + eval 'use Inline C => $vfork_spawn . $fdpass . $set_nodatacow'; my $err = $@; my $ndc_err; if ($err && $set_nodatacow) { # missing Linux kernel headers $ndc_err = $err; undef $set_nodatacow; - eval 'use Inline C => $vfork_spawn'; + eval 'use Inline C => $vfork_spawn . 
$fdpass'; } flock($fh, LOCK_UN) or die "LOCK_UN failed on $f: $!\n"; die $err if $err; @@ -229,7 +289,7 @@ if (defined $vfork_spawn) { }; if ($@) { warn "Inline::C failed for vfork: $@\n"; - $set_nodatacow = $vfork_spawn = undef; + $set_nodatacow = $vfork_spawn = $fdpass = undef; } } @@ -243,8 +303,18 @@ unless ($set_nodatacow) { *nodatacow_fd = \&PublicInbox::NDC_PP::nodatacow_fd; *nodatacow_dir = \&PublicInbox::NDC_PP::nodatacow_dir; } +unless (__PACKAGE__->can('recv_fd')) { + eval { # try the XS IO::FDPass package + require IO::FDPass; + no warnings 'once'; + *recv_fd = \&IO::FDPass::recv; + *send_fd = \&IO::FDPass::send; + }; +} + undef $set_nodatacow; undef $vfork_spawn; +undef $fdpass; sub which ($) { my ($file) = @_; diff --git a/script/lei b/script/lei index ff4dcd45..67e8b8b0 100755 --- a/script/lei +++ b/script/lei @@ -4,8 +4,11 @@ use strict; use v5.10.1; use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un); +my $send_fd; if (my ($sock, $pwd) = eval { - require IO::FDPass; # will try to use a daemon to reduce load time + require PublicInbox::Spawn; + $send_fd = PublicInbox::Spawn->can('send_fd') or die + "Inline::C not installed/configured or IO::FDPass missing\n"; my $path = do { my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . 
'/lei'; if ($runtime_dir eq '/lei') { @@ -57,7 +60,7 @@ Falling back to (slow) one-shot mode $buf .= "\0\0"; select $sock; $| = 1; # unbuffer selected $sock - IO::FDPass::send(fileno($sock), $_) for (0..2); + $send_fd->(fileno($sock), $_) for (0..2); print $sock $buf or die "print(sock, buf): $!"; while ($buf = <$sock>) { $buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0); diff --git a/t/lei.t b/t/lei.t index 541d83ce..662fc545 100644 --- a/t/lei.t +++ b/t/lei.t @@ -192,7 +192,11 @@ if ($ENV{TEST_LEI_ONESHOT}) { } SKIP: { # real socket - require_mods(qw(IO::FDPass Cwd), 46); + require_mods(qw(Cwd), my $nr = 46); + require PublicInbox::Spawn; + skip "Inline::C not installed/configured or IO::FDPass missing", $nr + unless PublicInbox::Spawn->can('send_fd'); + local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock"; diff --git a/t/spawn.t b/t/spawn.t index d97e13a6..e5cb09d9 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -5,6 +5,24 @@ use warnings; use Test::More; use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::Sigfd; +use Socket qw(AF_UNIX SOCK_STREAM); + +SKIP: { + my $recv_fd = PublicInbox::Spawn->can('recv_fd'); + my $send_fd = PublicInbox::Spawn->can('send_fd'); + skip 'Inline::C not enabled', 3 unless $send_fd && $recv_fd; + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or BAIL_OUT $!; + pipe(my ($r, $w)) or BAIL_OUT $!; + ok($send_fd->(fileno($s1), fileno($r)), 'pipe sent'); + my $rfd = $recv_fd->(fileno($s2)); + like($rfd, qr/\A\d+\z/, 'got FD'); + open(my $rfh, '<&=', $rfd) or BAIL_OUT $!; + my @old = stat($r); + my @new = stat($rfh); + is("$old[0]\0$old[1]", "$new[0]\0$new[1]", + 'device/inode matches on received FD'); +} { my $true = which('true');
I implemented 1/2 hoping it would help, but it's a nice syscall reduction anyway. 2/2 is the actual fix I've been struggling to find, and I could only reproduce it on this FreeBSD VM I have access to. It seems so obvious in retrospect :x Eric Wong (2): send and receive all 3 FDs at once lei: fix output race in client/daemon mode lib/PublicInbox/LEI.pm | 33 +++++++++++++++---------- lib/PublicInbox/Spawn.pm | 53 ++++++++++++++++++++++++++-------------- script/lei | 6 ++--- t/lei.t | 2 +- t/spawn.t | 33 ++++++++++++++++--------- 5 files changed, 80 insertions(+), 47 deletions(-)
We'll always be transferring stdin, stdout, and stderr together for lei. Perhaps I lack imagination or foresight, but I can't think of a reason to send more or less FDs. --- lib/PublicInbox/LEI.pm | 27 ++++++++++---------- lib/PublicInbox/Spawn.pm | 53 ++++++++++++++++++++++++++-------------- script/lei | 6 ++--- t/lei.t | 2 +- t/spawn.t | 33 ++++++++++++++++--------- 5 files changed, 74 insertions(+), 47 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 6f98c934..3ad5e01a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -25,7 +25,7 @@ use Text::Wrap qw(wrap); use File::Path qw(mkpath); use File::Spec; our $quit = \&CORE::exit; -my $recv_fd; +my $recv_3fds; my $GLP = Getopt::Long::Parser->new; $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev)); my $GLP_PASS = Getopt::Long::Parser->new; @@ -614,25 +614,26 @@ sub accept_dispatch { # Listener {post_accept} callback my $self = bless { sock => $sock }, __PACKAGE__; vec(my $rin = '', fileno($sock), 1) = 1; # `say $sock' triggers "die" in lei(1) - for my $i (0..2) { - if (select(my $rout = $rin, undef, undef, 1)) { - my $fd = $recv_fd->(fileno($sock)); - if ($fd >= 0) { - my $rdr = ($fd == 0 ? 
'<&=' : '>&='); + if (select(my $rout = $rin, undef, undef, 1)) { + my @fds = $recv_3fds->(fileno($sock)); + if (scalar(@fds) == 3) { + my $i = 0; + for my $rdr (qw(<&= >&= >&=)) { + my $fd = shift(@fds); if (open(my $fh, $rdr, $fd)) { - $self->{$i} = $fh; - } else { + $self->{$i++} = $fh; + } else { say $sock "open($rdr$fd) (FD=$i): $!"; return; } - } else { - say $sock "recv FD=$i: $!"; - return; } } else { - say $sock "timed out waiting to recv FD=$i"; + say $sock "recv_3fds failed: $!"; return; } + } else { + say $sock "timed out waiting to recv FDs"; + return; } # $ARGV_STR = join("]\0[", @ARGV); # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV); @@ -672,7 +673,7 @@ sub lazy_start { my $dev_ino_expect = pack('dd', $st[0], $st[1]); # dev+ino pipe(my ($eof_r, $eof_w)) or die "pipe: $!"; my $oldset = PublicInbox::Sigfd::block_signals(); - $recv_fd = PublicInbox::Spawn->can('recv_fd') or die + $recv_3fds = PublicInbox::Spawn->can('recv_3fds') or die "Inline::C not installed/configured or IO::FDPass missing\n"; require PublicInbox::Listener; require PublicInbox::EOFpipe; diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 4ca94b9f..61e95433 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -207,56 +207,67 @@ my $fdpass = <<'FDPASS'; #include <sys/socket.h> #if defined(CMSG_SPACE) && defined(CMSG_LEN) +struct my_3fds { int fds[3]; }; union my_cmsg { struct cmsghdr hdr; - char pad[sizeof(struct cmsghdr)+8+sizeof(int)+8]; + char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8]; }; -int send_fd(int sockfd, int fd) +int send_3fds(int sockfd, int infd, int outfd, int errfd) { struct msghdr msg = { 0 }; struct iovec iov; union my_cmsg cmsg = { 0 }; + int *fdp; + size_t i; - iov.iov_base = &msg.msg_namelen; + iov.iov_base = &msg.msg_namelen; /* whatever */ iov.iov_len = 1; msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(int)); + msg.msg_controllen = 
CMSG_SPACE(sizeof(struct my_3fds)); cmsg.hdr.cmsg_level = SOL_SOCKET; cmsg.hdr.cmsg_type = SCM_RIGHTS; - cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int)); - *(int *)CMSG_DATA(&cmsg.hdr) = fd; - + cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds)); + fdp = (int *)CMSG_DATA(&cmsg.hdr); + *fdp++ = infd; + *fdp++ = outfd; + *fdp++ = errfd; return sendmsg(sockfd, &msg, 0) >= 0; } -int recv_fd(int sockfd) +void recv_3fds(int sockfd) { union my_cmsg cmsg = { 0 }; struct msghdr msg = { 0 }; struct iovec iov; - int fd = -1; + size_t i; + Inline_Stack_Vars; - iov.iov_base = &msg.msg_namelen; + iov.iov_base = &msg.msg_namelen; /* whatever */ iov.iov_len = 1; msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = &cmsg.hdr; - msg.msg_controllen = CMSG_SPACE(sizeof(int)); + msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds)); if (recvmsg(sockfd, &msg, 0) <= 0) - return -1; + return; errno = EDOM; + Inline_Stack_Reset; if (cmsg.hdr.cmsg_level == SOL_SOCKET && cmsg.hdr.cmsg_type == SCM_RIGHTS && - cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(int))) - fd = *(int *)CMSG_DATA(&cmsg.hdr); + cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) { + int *fdp = (int *)CMSG_DATA(&cmsg.hdr); + size_t i; - return fd; + for (i = 0; i < 3; i++) + Inline_Stack_Push(sv_2mortal(newSViv(*fdp++))); + } + Inline_Stack_Done; } #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */ FDPASS @@ -275,7 +286,8 @@ if (defined $vfork_spawn) { my $f = "$inline_dir/.public-inbox.lock"; open my $fh, '>', $f or die "failed to open $f: $!\n"; flock($fh, LOCK_EX) or die "LOCK_EX failed on $f: $!\n"; - eval 'use Inline C => $vfork_spawn . $fdpass . $set_nodatacow'; + eval 'use Inline C => $vfork_spawn.$fdpass.$set_nodatacow'; + # . 
', BUILD_NOISY => 1'; my $err = $@; my $ndc_err; if ($err && $set_nodatacow) { # missing Linux kernel headers @@ -303,12 +315,15 @@ unless ($set_nodatacow) { *nodatacow_fd = \&PublicInbox::NDC_PP::nodatacow_fd; *nodatacow_dir = \&PublicInbox::NDC_PP::nodatacow_dir; } -unless (__PACKAGE__->can('recv_fd')) { +unless (__PACKAGE__->can('recv_3fds')) { eval { # try the XS IO::FDPass package require IO::FDPass; no warnings 'once'; - *recv_fd = \&IO::FDPass::recv; - *send_fd = \&IO::FDPass::send; + *recv_3fds = sub { map { IO::FDPass::recv($_[0]) } (0..2) }; + *send_3fds = sub ($$$$) { + my $sockfd = shift; + IO::FDPass::send($sockfd, shift) for (0..2); + }; }; } diff --git a/script/lei b/script/lei index 67e8b8b0..029881f8 100755 --- a/script/lei +++ b/script/lei @@ -4,10 +4,10 @@ use strict; use v5.10.1; use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un); -my $send_fd; +my $send_3fds; if (my ($sock, $pwd) = eval { require PublicInbox::Spawn; - $send_fd = PublicInbox::Spawn->can('send_fd') or die + $send_3fds = PublicInbox::Spawn->can('send_3fds') or die "Inline::C not installed/configured or IO::FDPass missing\n"; my $path = do { my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . 
'/lei'; @@ -60,7 +60,7 @@ Falling back to (slow) one-shot mode $buf .= "\0\0"; select $sock; $| = 1; # unbuffer selected $sock - $send_fd->(fileno($sock), $_) for (0..2); + $send_3fds->(fileno($sock), 0, 1, 2); print $sock $buf or die "print(sock, buf): $!"; while ($buf = <$sock>) { $buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0); diff --git a/t/lei.t b/t/lei.t index 662fc545..42c0eb8f 100644 --- a/t/lei.t +++ b/t/lei.t @@ -195,7 +195,7 @@ SKIP: { # real socket require_mods(qw(Cwd), my $nr = 46); require PublicInbox::Spawn; skip "Inline::C not installed/configured or IO::FDPass missing", $nr - unless PublicInbox::Spawn->can('send_fd'); + unless PublicInbox::Spawn->can('send_3fds'); local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run"; my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/sock"; diff --git a/t/spawn.t b/t/spawn.t index e5cb09d9..891a3702 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -8,20 +8,31 @@ use PublicInbox::Sigfd; use Socket qw(AF_UNIX SOCK_STREAM); SKIP: { - my $recv_fd = PublicInbox::Spawn->can('recv_fd'); - my $send_fd = PublicInbox::Spawn->can('send_fd'); - skip 'Inline::C not enabled', 3 unless $send_fd && $recv_fd; + my $recv_3fds = PublicInbox::Spawn->can('recv_3fds'); + my $send_3fds = PublicInbox::Spawn->can('send_3fds'); + skip 'Inline::C not enabled', 3 unless $send_3fds && $recv_3fds; my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or BAIL_OUT $!; pipe(my ($r, $w)) or BAIL_OUT $!; - ok($send_fd->(fileno($s1), fileno($r)), 'pipe sent'); - my $rfd = $recv_fd->(fileno($s2)); - like($rfd, qr/\A\d+\z/, 'got FD'); - open(my $rfh, '<&=', $rfd) or BAIL_OUT $!; - my @old = stat($r); - my @new = stat($rfh); - is("$old[0]\0$old[1]", "$new[0]\0$new[1]", - 'device/inode matches on received FD'); + my @orig = ($r, $w, $s2); + my @fd = map { fileno($_) } @orig; + ok($send_3fds->(fileno($s1), $fd[0], $fd[1], $fd[2]), + 'FDs sent'); + my (@fds) = $recv_3fds->(fileno($s2)); + is(scalar(@fds), 3, 'got 3 fds'); + use Data::Dumper; diag Dumper(\@fds); + 
is(scalar(grep(/\A\d+\z/, @fds)), 3, 'all valid FDs'); + my $i = 0; + my @cmp = map { + open my $new, $_, shift(@fds) or BAIL_OUT "open $! $i => $_"; + ($new, shift(@orig), $i++); + } (qw(<&= >&= +<&=)); + while (my ($new, $old, $fd) = splice(@cmp, 0, 3)) { + my @new = stat($new); + my @old = stat($old); + is("$old[0]\0$old[1]", "$new[0]\0$new[1]", + "device/inode matches on received FD:$fd"); + } } {
The daemon needs to flush stdout before disconnecting or killing clients; otherwise they may reread empty data on redirected outputs. We also don't want to unbuffer stdout too early in case we have lots of small chunks of data to output. The received stderr ($self->{2}) will always have autoflush, matching normal STDERR behavior. --- lib/PublicInbox/LEI.pm | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 3ad5e01a..6f21da35 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -236,6 +236,7 @@ my %CONFIG_KEYS = ( sub x_it ($$) { # pronounced "exit" my ($self, $code) = @_; + $self->{1}->autoflush(1); # make sure client sees stdout before exit if (my $sig = ($code & 127)) { kill($sig, $self->{pid} // $$); } else { @@ -635,6 +636,7 @@ sub accept_dispatch { # Listener {post_accept} callback say $sock "timed out waiting to recv FDs"; return; } + $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY # $ARGV_STR = join("]\0[", @ARGV); # $ENV_STR = join('', map { "$_=$ENV{$_}\0" } keys %ENV); # $line = "$$\0\0>$ARGV_STR\0\0>$ENV_STR\0\0"; @@ -773,4 +775,8 @@ sub oneshot { }, __PACKAGE__), @ARGV); } +# ensures stdout hits the FS before sock disconnects so a client +# can immediately reread it +sub DESTROY { $_[0]->{1}->autoflush(1) } + 1;