unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 2/2] ipc: wq: handle >MAX_ARG_STRLEN && <EMSGSIZE case
Date: Tue, 25 May 2021 22:20:01 +0000	[thread overview]
Message-ID: <20210525222001.27517-3-e@80x24.org> (raw)
In-Reply-To: <20210525222001.27517-1-e@80x24.org>

WQWorkers are limited roughly to MAX_ARG_STRLEN (the kernel
limit of argv + environ) to avoid excessive memory growth.
Occasionally, we need to send larger messages via workqueues
that are too small to hit EMSGSIZE on the sender.

This fixes "lei q" when using HTTP(S) externals, since that
code path sends large Eml objects from lei_xsearch workers
directly to lei2mail WQ workers.
---
 lib/PublicInbox/IPC.pm      | 45 +++++++++++++++++++++++--------------
 lib/PublicInbox/WQWorker.pm |  2 +-
 t/ipc.t                     | 11 ++++++---
 3 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 24237773..497a6035 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -17,6 +17,7 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw);
 my $WQ_MAX_WORKERS = 4096;
@@ -213,7 +214,7 @@ sub ipc_sibling_atfork_child {
 
 sub recv_and_run {
 	my ($self, $s2, $len, $full_stream) = @_;
-	my @fds = $recv_cmd->($s2, my $buf, $len);
+	my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
 	return if scalar(@fds) && !defined($fds[0]);
 	my $n = length($buf) or return 0;
 	my $nfd = 0;
@@ -268,27 +269,37 @@ sub wq_broadcast {
 	}
 }
 
+sub stream_in_full ($$$) {
+	my ($s1, $fds, $buf) = @_;
+	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
+		croak "socketpair: $!";
+	my $n = $send_cmd->($s1, [ fileno($r) ],
+			ipc_freeze(['do_sock_stream', length($buf)]),
+			MSG_EOR) // croak "sendmsg: $!";
+	undef $r;
+	$n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
+	while ($n < length($buf)) {
+		my $x = syswrite($w, $buf, length($buf) - $n, $n) //
+				croak "syswrite: $!";
+		croak "syswrite wrote 0 bytes" if $x == 0;
+		$n += $x;
+	}
+}
+
 sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		my $fds = [ map { fileno($_) } @$ios ];
 		my $buf = ipc_freeze([$sub, @args]);
-		my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
-		return if defined($n); # likely
-		croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
-		croak "sendmsg: $!" if !$!{EMSGSIZE};
-		socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-			croak "socketpair: $!";
-		$n = $send_cmd->($s1, [ fileno($r) ],
-				ipc_freeze(['do_sock_stream', length($buf)]),
-				MSG_EOR) // croak "sendmsg: $!";
-		undef $r;
-		$n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
-		while ($n < length($buf)) {
-			my $x = syswrite($w, $buf, length($buf) - $n, $n) //
-					croak "syswrite: $!";
-			croak "syswrite wrote 0 bytes" if $x == 0;
-			$n += $x;
+		if (length($buf) > $MY_MAX_ARG_STRLEN) {
+			stream_in_full($s1, $fds, $buf);
+		} else {
+			my $n = $send_cmd->($s1, $fds, $buf, MSG_EOR);
+			return if defined($n); # likely
+			$!{ETOOMANYREFS} and
+				croak "sendmsg: $! (check RLIMIT_NOFILE)";
+			$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+			croak("sendmsg: $!");
 		}
 	} else {
 		@$self{0..$#$ios} = @$ios;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
index 3636321e..f7aa61c5 100644
--- a/lib/PublicInbox/WQWorker.pm
+++ b/lib/PublicInbox/WQWorker.pm
@@ -23,7 +23,7 @@ sub event_step {
 	my ($self) = @_;
 	my $n;
 	do {
-		$n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+		$n = $self->{wq}->recv_and_run($self->{sock});
 	} while ($n);
 	return if !defined($n) && $! == EAGAIN; # likely
 	warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
diff --git a/t/ipc.t b/t/ipc.t
index ca88eb59..7983fdc0 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -122,11 +122,16 @@ for my $t ('local', 'worker', 'worker again') {
 	$ipc->wq_io_do('test_sha', [ $wa, $wb ], 'hello world');
 	is(readline($rb), sha1_hex('hello world')."\n", "SHA small ($t)");
 	{
-		my $bigger = $big x 10;
+		my $bigger = $big x 10; # to hit EMSGSIZE
 		$ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
 		my $exp = sha1_hex($bigger)."\n";
-		undef $bigger;
-		is(readline($rb), $exp, "SHA big ($t)");
+		is(readline($rb), $exp, "SHA big for EMSGSIZE ($t)");
+
+		# to hit the WQWorker recv_and_run length
+		substr($bigger, my $MY_MAX_ARG_STRLEN = 4096 * 33, -1) = '';
+		$ipc->wq_io_do('test_sha', [ $wa, $wb ], $bigger);
+		$exp = sha1_hex($bigger)."\n";
+		is(readline($rb), $exp, "SHA WQWorker limit ($t)");
 	}
 	my $ppid = $ipc->wq_workers_start('wq', 1);
 	push(@ppids, $ppid);

      parent reply	other threads:[~2021-05-25 22:20 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-05-25 22:19 [PATCH 0/2] ipc: fix "lei q" w/ HTTP(S) externals Eric Wong
2021-05-25 22:20 ` [PATCH 1/2] ipc: avoid potential stack-not-refcounted bug Eric Wong
2021-05-25 22:20 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210525222001.27517-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).