From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C6E001FA12 for ; Sun, 24 Jan 2021 11:46:55 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 2/9] ipc: wq supports arbitrarily large payloads Date: Sun, 24 Jan 2021 04:46:48 -0700 Message-Id: <20210124114655.12815-3-e@80x24.org> In-Reply-To: <20210124114655.12815-1-e@80x24.org> References: <20210124114655.12815-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This should not be needed, but somebody using lei could theoretically create thousands of external URLs and only have a handful of workers, which means the per-worker URI list could be large. --- lib/PublicInbox/IPC.pm | 82 +++++++++++++++++++++++++++++------------- t/cmd_ipc.t | 16 +++++++++ t/ipc.t | 22 +++++++++++- 3 files changed, 94 insertions(+), 26 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 6efaff38..592efd21 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -8,8 +8,10 @@ use v5.10.1; use Carp qw(confess croak); use PublicInbox::DS qw(dwaitpid); use PublicInbox::Spawn; -use POSIX qw(WNOHANG); -use Socket qw(AF_UNIX MSG_EOR); +use POSIX qw(mkfifo WNOHANG); +use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); +use Errno qw(EMSGSIZE); +use File::Temp 0.19 (); # 0.19 for ->newdir my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF(); my $WQ_MAX_WORKERS = 4096; @@ -245,39 +247,69 @@ sub ipc_sibling_atfork_child { $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself"; } +sub _recv_and_run { + my ($self, $s2, $len, $full_stream) = @_; + my @fds = $recv_cmd->($s2, my $buf, $len); + my $n = length($buf // '') or return; + my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; + my $nfd = 0; + for my $fd (@fds) { + my $mode = shift(@m); + if (open(my $cmdfh, $mode, $fd)) { + $self->{$nfd++} = $cmdfh; + $cmdfh->autoflush(1); + } else { + die "$$ open($mode$fd) (FD:$nfd): $!"; + } + } + while ($full_stream && $n < $len) { + my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!"; + croak "read EOF after $n/$len bytes" if $r == 0; + $n = length($buf); + } + # Sereal dies on truncated data, Storable returns undef + my $args = thaw($buf) // die "thaw error on buffer of size: $n"; + undef $buf; + my $sub = shift @$args; + eval { $self->$sub(@$args) }; + warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE'; + delete @$self{0..($nfd-1)}; + $n; +} + sub wq_worker_loop ($) { my ($self) = @_; my $len = $self->{wq_req_len} // (4096 * 33); my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2'; - while (1) { - my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF - my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]}; - my $nfd = 0; - for my $fd (@fds) { - my $mode = shift(@m); - if (open(my $cmdfh, $mode, $fd)) { - $self->{$nfd++} = $cmdfh; - $cmdfh->autoflush(1); - } else { - die "$$ open($mode$fd) (FD:$nfd): $!"; - } - } - # Sereal dies on truncated data, Storable returns undef - my $args = thaw($buf) // - die "thaw error on buffer of size:".length($buf); - my $sub = shift @$args; - eval { $self->$sub(@$args) }; - warn "$$ wq_worker: $@" if $@ && - ref($@) ne 'PublicInbox::SIGPIPE'; - delete @$self{0..($nfd-1)}; - } + 1 while (_recv_and_run($self, $s2, $len)); +} + +sub do_sock_stream { # via wq_do, for big requests + my ($self, $len) = @_; + _recv_and_run($self, delete $self->{0}, $len, 1); } sub wq_do { # always async my ($self, $sub, $ios, @args) = @_; if (my $s1 = $self->{-wq_s1}) { # run in worker my $fds = [ map { fileno($_) } @$ios ]; - $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR); + my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR); + return if defined($n); + croak "sendmsg error: $!" if $! != EMSGSIZE; + socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or + croak "socketpair: $!"; + my $buf = freeze([$sub, @args]); + $n = $send_cmd->($s1, [ fileno($r) ], + freeze(['do_sock_stream', length($buf)]), + MSG_EOR) // croak "sendmsg: $!"; + undef $r; + $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!"; + while ($n < length($buf)) { + my $x = syswrite($w, $buf, length($buf) - $n, $n) // + croak "syswrite: $!"; + croak "syswrite wrote 0 bytes" if $x == 0; + $n += $x; + } } else { @$self{0..$#$ios} = @$ios; eval { $self->$sub(@args) }; diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t index 96510175..84f8fb4d 100644 --- a/t/cmd_ipc.t +++ b/t/cmd_ipc.t @@ -82,6 +82,22 @@ my $do_test = sub { SKIP: { @fds = $recv->($s2, $buf, length($src)); is(scalar(@fds), 0, 'no FDs received'); is($buf, $src, 'recv w/o FDs'); + + my $nr = 2 * 1024 * 1024; + while (1) { + vec(my $vec = '', $nr * 8 - 1, 1) = 1; + my $n = $send->($s1, [], $vec, $flag); + if (defined($n)) { + $n == length($vec) or + fail "short send: $n != ".length($vec); + diag "sent $nr, retrying with more"; + $nr += 2 * 1024 * 1024; + } else { + ok($!{EMSGSIZE}, 'got EMSGSIZE'); + # diag "$nr bytes hits EMSGSIZE"; + last; + } + } } } }; diff --git a/t/ipc.t b/t/ipc.t index 22423a78..f25f2491 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -6,11 +6,13 @@ use v5.10.1; use Test::More; use PublicInbox::TestCommon; use Fcntl qw(SEEK_SET); +use Digest::SHA qw(sha1_hex); require_mods(qw(Storable||Sereal)); require_ok 'PublicInbox::IPC'; state $once = eval <<''; package PublicInbox::IPC; use strict; +use Digest::SHA qw(sha1_hex); sub test_array { qw(test array) } sub test_scalar { 'scalar' } sub test_scalarref { \'scalarref' } @@ -24,6 +26,11 @@ sub test_write_each_fd { $self->{$fd}->flush; } } +sub test_sha { + my ($self, $buf) = @_; + print { $self->{1} } sha1_hex($buf), "\n"; + $self->{1}->flush; +} 1; my $ipc = bless {}, 'PublicInbox::IPC'; @@ -112,7 +119,7 @@ $test->('local'); $ipc->ipc_worker_stop; # idempotent # work queues -$ipc->wq_set_recv_modes(qw( >&= >&= >&= )); +$ipc->wq_set_recv_modes(qw( +>&= >&= >&= )); pipe(my ($ra, $wa)) or BAIL_OUT $!; pipe(my ($rb, $wb)) or BAIL_OUT $!; pipe(my ($rc, $wc)) or BAIL_OUT $!; @@ -120,6 +127,10 @@ open my $warn, '+>', undef or BAIL_OUT; $warn->autoflush(0); local $SIG{__WARN__} = sub { print $warn "PID:$$ ", @_ }; my @ppids; +open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!"; +my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!"; +close $agpl or BAIL_OUT "close: $!"; + for my $t ('local', 'worker', 'worker again') { $ipc->wq_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); my $i = 0; @@ -130,6 +141,15 @@ for my $t ('local', 'worker', 'worker again') { $i++; } $ipc->wq_do('test_die', [ $wa, $wb, $wc ]); + $ipc->wq_do('test_sha', [ $wa, $wb ], 'hello world'); + is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)"); + { + my $bigger = $big x 10; + $ipc->wq_do('test_sha', [ $wa, $wb ], $bigger); + my $exp = sha1_hex($bigger)."\n"; + undef $bigger; + is(readline($rb), $exp, "SHA big ($t)"); + } my $ppid = $ipc->wq_workers_start('wq', 1); push(@ppids, $ppid); }