From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 15/22] ipc: start supporting sending/receiving more than 3 FDs
Date: Sun, 10 Jan 2021 12:15:12 +0000 [thread overview]
Message-ID: <20210110121519.17044-16-e@80x24.org> (raw)
In-Reply-To: <20210110121519.17044-1-e@80x24.org>
Actually, sending 4 FDs will be useful for lei internal xsearch
work once we start accepting input from stdin. It won't be used
with the lightweight lei(1) client, however.
For WWW (eventually), a single FD may be enough.
---
lib/PublicInbox/CmdIPC1.pm | 16 +++++++-----
lib/PublicInbox/CmdIPC4.pm | 12 +++++----
lib/PublicInbox/IPC.pm | 13 +++++-----
lib/PublicInbox/LeiXSearch.pm | 6 ++---
lib/PublicInbox/Spawn.pm | 48 ++++++++++++++++++++---------------
script/lei | 2 +-
t/cmd_ipc.t | 5 ++--
t/ipc.t | 6 ++---
8 files changed, 60 insertions(+), 48 deletions(-)
diff --git a/lib/PublicInbox/CmdIPC1.pm b/lib/PublicInbox/CmdIPC1.pm
index 0eed8bed..de6e54ef 100644
--- a/lib/PublicInbox/CmdIPC1.pm
+++ b/lib/PublicInbox/CmdIPC1.pm
@@ -10,17 +10,19 @@ BEGIN { eval {
require IO::FDPass; # XS, available in all major distros
no warnings 'once';
-*send_cmd1 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
- for (1..3) {
- IO::FDPass::send(fileno($_[0]), $_[$_]) or
+*send_cmd1 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+ my ($sock, $fds, undef, $flags) = @_;
+ for my $fd (@$fds) {
+ IO::FDPass::send(fileno($sock), $fd) or
die "IO::FDPass::send: $!";
}
- send($_[0], $_[4], $_[5]) or die "send $!";
+ send($sock, $_[2], $flags) or die "send $!";
};
-*recv_cmd1 = sub ($$$) {
- my ($s, undef, $len) = @_;
- my @fds = map { IO::FDPass::recv(fileno($s)) } (0..2);
+*recv_cmd1 = sub ($$$;$) {
+ my ($s, undef, $len, $nfds) = @_;
+ $nfds //= 3;
+ my @fds = map { IO::FDPass::recv(fileno($s)) } (1..$nfds);
recv($s, $_[1], $len, 0) // die "recv: $!";
length($_[1]) == 0 ? () : @fds;
};
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 90fca62d..c4fcb0d6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -13,10 +13,12 @@ require Socket::MsgHdr; # XS
no warnings 'once';
# 3 FDs per-sendmsg(2) + buffer
-*send_cmd4 = sub ($$$$$$) { # (sock, in, out, err, buf, flags) = @_;
- my $mh = Socket::MsgHdr->new(buf => $_[4]);
- $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('iii', @_[1,2,3]));
- Socket::MsgHdr::sendmsg($_[0], $mh, $_[5]) or die "sendmsg: $!";
+*send_cmd4 = sub ($$$$) { # (sock, fds, buf, flags) = @_;
+ my ($sock, $fds, undef, $flags) = @_;
+ my $mh = Socket::MsgHdr->new(buf => $_[2]);
+ $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS,
+ pack('i' x scalar(@$fds), @$fds));
+ Socket::MsgHdr::sendmsg($sock, $mh, $flags) or die "sendmsg: $!";
};
*recv_cmd4 = sub ($$$) {
@@ -26,7 +28,7 @@ no warnings 'once';
$_[1] = $mh->buf;
return () if $r == 0;
my (undef, undef, $data) = $mh->cmsghdr;
- unpack('iii', $data);
+ unpack('i' x (length($data) / 4), $data);
};
} } # /eval /BEGIN
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index be5b2f45..b0a0bfb5 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -263,16 +263,15 @@ sub wq_worker_loop ($) {
}
sub wq_do { # always async
- my ($self, $sub, $in, $out, $err, @args) = @_;
+ my ($self, $sub, $ios, @args) = @_;
if (my $s1 = $self->{-wq_s1}) { # run in worker
- $_ = fileno($_) for ($in, $out, $err);
- $send_cmd->($s1, $in, $out, $err,
- freeze([$sub, @args]), MSG_EOR);
+ my $fds = [ map { fileno($_) } @$ios ];
+ $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
} else {
- @$self{0, 1, 2} = ($in, $out, $err);
+ @$self{0..$#$ios} = @$ios;
eval { $self->$sub(@args) };
warn "wq_do: $@" if $@;
- delete @$self{0, 1, 2};
+ delete @$self{0..$#$ios};
}
}
@@ -334,7 +333,7 @@ sub wq_worker_decr { # SIGTTOU handler, kills first idle worker
my ($self) = @_;
my $workers = $self->{-wq_workers} or return;
my $s2 = $self->{-wq_s2} // die 'BUG: no wq_s2';
- $self->wq_do('wq_exit', $s2, $s2, $s2);
+ $self->wq_do('wq_exit', [ $s2, $s2, $s2 ]);
$self->{-wq_exit_pending}++;
# caller must call wq_worker_decr_wait in main loop
}
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index a3010efe..c0df21a8 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -150,14 +150,14 @@ sub do_query {
$io[2]->autoflush(1);
if ($lei->{opt}->{thread}) {
for my $ibxish (@$srcs) {
- $self->wq_do('query_thread_mset', @io, $lei, $ibxish);
+ $self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
}
} else {
- $self->wq_do('query_mset', @io, $lei, $srcs);
+ $self->wq_do('query_mset', \@io, $lei, $srcs);
}
# TODO
for my $rmt (@{$self->{remotes} // []}) {
- $self->wq_do('query_thread_mbox', @io, $lei, $rmt);
+ $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
}
}
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7d0d9597..b35bf54c 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -209,20 +209,22 @@ my $fdpass = <<'FDPASS';
#include <sys/socket.h>
#if defined(CMSG_SPACE) && defined(CMSG_LEN)
-struct my_3fds { int fds[3]; };
+#define SEND_FD_CAPA 3
+#define SEND_FD_SPACE (SEND_FD_CAPA * sizeof(int))
union my_cmsg {
struct cmsghdr hdr;
- char pad[sizeof(struct cmsghdr)+ 8 + sizeof(struct my_3fds) + 8];
+ char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
};
-int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
+int send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
{
struct msghdr msg = { 0 };
- struct iovec iov;
union my_cmsg cmsg = { 0 };
- int *fdp;
- size_t i;
STRLEN dlen = 0;
+ struct iovec iov;
+ AV *fds = (AV *)SvRV(svfds);
+ I32 i, nfds = av_len(fds) + 1;
+ int *fdp;
if (SvOK(data)) {
iov.iov_base = SvPV(data, dlen);
@@ -234,16 +236,22 @@ int send_cmd4(PerlIO *s, int in, int out, int err, SV *data, int flags)
}
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
- msg.msg_control = &cmsg.hdr;
- msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
-
- cmsg.hdr.cmsg_level = SOL_SOCKET;
- cmsg.hdr.cmsg_type = SCM_RIGHTS;
- cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(struct my_3fds));
- fdp = (int *)CMSG_DATA(&cmsg.hdr);
- *fdp++ = in;
- *fdp++ = out;
- *fdp++ = err;
+ if (nfds) {
+ if (nfds > SEND_FD_CAPA) {
+ fprintf(stderr, "FIXME: bump SEND_FD_CAPA=%d\n", nfds);
+ nfds = SEND_FD_CAPA;
+ }
+ msg.msg_control = &cmsg.hdr;
+ msg.msg_controllen = CMSG_SPACE(nfds * sizeof(int));
+ cmsg.hdr.cmsg_level = SOL_SOCKET;
+ cmsg.hdr.cmsg_type = SCM_RIGHTS;
+ cmsg.hdr.cmsg_len = CMSG_LEN(nfds * sizeof(int));
+ fdp = (int *)CMSG_DATA(&cmsg.hdr);
+ for (i = 0; i < nfds; i++) {
+ SV **fd = av_fetch(fds, i, 0);
+ *fdp++ = SvIV(*fd);
+ }
+ }
return sendmsg(PerlIO_fileno(s), &msg, flags) >= 0;
}
@@ -263,17 +271,17 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = &cmsg.hdr;
- msg.msg_controllen = CMSG_SPACE(sizeof(struct my_3fds));
+ msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
i = recvmsg(PerlIO_fileno(s), &msg, 0);
if (i < 0)
croak("recvmsg: %s", strerror(errno));
SvCUR_set(buf, i);
if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
- cmsg.hdr.cmsg_type == SCM_RIGHTS &&
- cmsg.hdr.cmsg_len == CMSG_LEN(sizeof(struct my_3fds))) {
+ cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+ size_t len = cmsg.hdr.cmsg_len;
int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
- for (i = 0; i < 3; i++)
+ for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
}
Inline_Stack_Done;
diff --git a/script/lei b/script/lei
index d954b9eb..5e30f4d7 100755
--- a/script/lei
+++ b/script/lei
@@ -67,7 +67,7 @@ Falling back to (slow) one-shot mode
$buf .= "\0\0";
select $sock;
$| = 1; # unbuffer selected $sock
- $send_cmd->($sock, 0, 1, 2, $buf, 0);
+ $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
while ($buf = <$sock>) {
$buf =~ /\Aexit=([0-9]+)\n\z/ and exit($1 + 0);
die $buf;
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index b9f4d128..22f73c19 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -17,7 +17,8 @@ my $do_test = sub { SKIP: {
my ($s1, $s2);
my $src = 'some payload' x 40;
socketpair($s1, $s2, AF_UNIX, $type, 0) or BAIL_OUT $!;
- $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+ my $sfds = [ fileno($r), fileno($w), fileno($s1) ];
+ $send->($s1, $sfds, $src, $flag);
my (@fds) = $recv->($s2, my $buf, length($src) + 1);
is($buf, $src, 'got buffer payload '.$desc);
my ($r1, $w1, $s1a);
@@ -39,7 +40,7 @@ my $do_test = sub { SKIP: {
if (defined($SOCK_SEQPACKET) && $type == $SOCK_SEQPACKET) {
$r1 = $w1 = $s1a = undef;
$src = (',' x 1023) . '-' .('.' x 1024);
- $send->($s1, fileno($r), fileno($w), fileno($s1), $src, $flag);
+ $send->($s1, $sfds, $src, $flag);
(@fds) = $recv->($s2, $buf, 1024);
is($buf, (',' x 1023) . '-', 'silently truncated buf');
$opens->();
diff --git a/t/ipc.t b/t/ipc.t
index 903294c5..d2b6ad4f 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -121,7 +121,7 @@ $warn->autoflush(0);
local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ };
my @ppids;
for my $t ('local', 'worker', 'worker again') {
- $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, 'hello world');
+ $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
my $i = 0;
for my $fh ($ra, $rb, $rc) {
my $buf = readline($fh);
@@ -129,7 +129,7 @@ for my $t ('local', 'worker', 'worker again') {
like($buf, qr/\Ai=$i \d+ hello world\z/, "got expected ($t)");
$i++;
}
- $ipc->wq_do('test_die', $wa, $wb, $wc);
+ $ipc->wq_do('test_die', [ $wa, $wb, $wc ]);
my $ppid = $ipc->wq_workers_start('wq', 1);
push(@ppids, $ppid);
}
@@ -142,7 +142,7 @@ SKIP: {
my $pid = fork // BAIL_OUT $!;
if ($pid == 0) {
use POSIX qw(_exit);
- $ipc->wq_do('test_write_each_fd', $wa, $wb, $wc, $$);
+ $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], $$);
_exit(0);
} else {
my $i = 0;
next prev parent reply other threads:[~2021-01-10 12:15 UTC|newest]
Thread overview: 23+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-01-10 12:14 [PATCH 00/22] lei query overview views Eric Wong
2021-01-10 12:14 ` [PATCH 01/22] lei query + pagination sorta working Eric Wong
2021-01-10 12:14 ` [PATCH 02/22] lei q: deduplicate smsg Eric Wong
2021-01-10 12:15 ` [PATCH 03/22] ds: block signals when reaping Eric Wong
2021-01-10 12:15 ` [PATCH 04/22] ipc: add support for asynchronous callbacks Eric Wong
2021-01-10 12:15 ` [PATCH 05/22] cmd_ipc: send FDs with buffer payload Eric Wong
2021-01-10 12:15 ` [PATCH 06/22] ipc: avoid excessive evals Eric Wong
2021-01-10 12:15 ` [PATCH 07/22] ipc: work queue support via SOCK_SEQPACKET Eric Wong
2021-01-10 12:15 ` [PATCH 08/22] ipc: eliminate ipc_worker_stop method Eric Wong
2021-01-10 12:15 ` [PATCH 09/22] ipc: wq: support dynamic worker count change Eric Wong
2021-01-10 12:15 ` [PATCH 10/22] ipc: drop -ipc_parent_pid field Eric Wong
2021-01-10 12:15 ` [PATCH 11/22] ipc: DESTROY and wq_workers methods Eric Wong
2021-01-10 12:15 ` [PATCH 12/22] lei: rename $w to $wpager for warning message Eric Wong
2021-01-10 12:15 ` [PATCH 13/22] lei: fix oneshot TTY detection by passing STD*{GLOB} Eric Wong
2021-01-10 12:15 ` [PATCH 14/22] lei: query: ensure pager exit is instantaneous Eric Wong
2021-01-10 12:15 ` Eric Wong [this message]
2021-01-10 12:15 ` [PATCH 16/22] ipc: fix IO::FDPass use with a worker limit of 1 Eric Wong
2021-01-10 12:15 ` [PATCH 17/22] ipc: drop unused fields, default sighandlers for wq Eric Wong
2021-01-10 12:15 ` [PATCH 18/22] lei: get rid of client {pid} field Eric Wong
2021-01-10 12:15 ` [PATCH 19/22] lei: fork + FD cleanup Eric Wong
2021-01-10 12:15 ` [PATCH 20/22] lei: run pager in client script Eric Wong
2021-01-10 12:15 ` [PATCH 21/22] lei_xsearch: transfer 4 FDs internally, drop IO::FDPass Eric Wong
2021-01-10 12:15 ` [PATCH 22/22] lei: query: restore JSON output overview Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://public-inbox.org/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210110121519.17044-16-e@80x24.org \
--to=e@80x24.org \
--cc=meta@public-inbox.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).