From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C10301F934 for ; Sun, 19 Sep 2021 12:50:35 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 01/16] ipc: wq_do: support synchronous waits and responses Date: Sun, 19 Sep 2021 12:50:20 +0000 Message-Id: <20210919125035.6331-2-e@80x24.org> In-Reply-To: <20210919125035.6331-1-e@80x24.org> References: <20210919125035.6331-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This brings the wq_* SOCK_SEQPACKET API functionality on par with the ipc_do (pipe-based) API. --- lib/PublicInbox/IPC.pm | 36 ++++++++++++++++++++++++++++++++---- t/ipc.t | 6 ++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 9efe551b..d5e37719 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -182,6 +182,13 @@ sub ipc_lock_init { $self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock' } +sub _wait_return ($$) { + my ($r_res, $sub) = @_; + my $ret = _get_rec($r_res) // die "no response on $sub"; + die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; + wantarray ? @$ret : $$ret; +} + # call $self->$sub(@args), on a worker if ipc_worker_spawn was used sub ipc_do { my ($self, $sub, @args) = @_; @@ -191,9 +198,7 @@ sub ipc_do { if (defined(wantarray)) { my $r_res = $self->{-ipc_res} or die 'no ipc_res'; _send_rec($w_req, [ wantarray, $sub, @args ]); - my $ret = _get_rec($r_res) // die "no response on $sub"; - die $$ret if ref($ret) eq 'PublicInbox::IPC::Die'; - wantarray ? @$ret : $$ret; + _wait_return($r_res, $sub); } else { # likely, fire-and-forget into pipe _send_rec($w_req, [ undef , $sub, @args ]); } @@ -298,7 +303,7 @@ sub wq_io_do { # always async $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)"; $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : - croak("sendmsg: $!"); + croak("sendmsg: $!"); } } else { @$self{0..$#$ios} = @$ios; @@ -308,6 +313,29 @@ sub wq_io_do { # always async } } +sub wq_sync_run { + my ($self, $wantarray, $sub, @args) = @_; + if ($wantarray) { + my @ret = eval { $self->$sub(@args) }; + ipc_return($self->{0}, \@ret, $@); + } else { # '' => wantscalar + my $ret = eval { $self->$sub(@args) }; + ipc_return($self->{0}, \$ret, $@); + } +} + +sub wq_do { + my ($self, $sub, @args) = @_; + if (defined(wantarray)) { + pipe(my ($r, $w)) or die "pipe: $!"; + wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args); + undef $w; + _wait_return($r, $sub); + } else { + wq_io_do($self, $sub, [], @args); + } +} + sub _wq_worker_start ($$$) { my ($self, $oldset, $fields) = @_; my ($bcast1, $bcast2); diff --git a/t/ipc.t b/t/ipc.t index 7983fdc0..202b1cc6 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -161,6 +161,12 @@ SKIP: { is(waitpid($pid, 0), $pid, 'waitpid complete'); is($?, 0, 'child wq producer exited'); } + my @ary = $ipc->wq_do('test_array'); + is_deeply(\@ary, [ qw(test array) ], 'wq_do wantarray'); + is(my $s = $ipc->wq_do('test_scalar'), 'scalar', 'defined wantarray'); + my $exp = bless ['blessed'], 'PublicInbox::WTF'; + my $ret = eval { $ipc->wq_do('test_die', $exp) }; + is_deeply($@, $exp, 'die with blessed ref'); } $ipc->wq_close;