unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: meta@public-inbox.org
Subject: [PATCH 14/36] ipc: generic IPC dispatch based on Storable
Date: Thu, 31 Dec 2020 13:51:32 +0000	[thread overview]
Message-ID: <20201231135154.6070-15-e@80x24.org> (raw)
In-Reply-To: <20201231135154.6070-1-e@80x24.org>

I intend to use this with LeiStore when importing from multiple
slow sources at once (e.g. curl, IMAP).  This is because
over.sqlite3 can only have a single writer, and we'll have
several slow readers running in parallel.

Watch and SearchIdxShard should also be able to use this code
in the future, but this will be proven with LeiStore, first.
---
 MANIFEST                    |   2 +
 lib/PublicInbox/IPC.pm      | 129 ++++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiStore.pm |   2 +-
 t/ipc.t                     |  67 +++++++++++++++++++
 t/lei_store.t               |   5 ++
 5 files changed, 204 insertions(+), 1 deletion(-)
 create mode 100644 lib/PublicInbox/IPC.pm
 create mode 100644 t/ipc.t

diff --git a/MANIFEST b/MANIFEST
index 7ce2075e..96ad52bf 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -153,6 +153,7 @@ lib/PublicInbox/IMAPD.pm
 lib/PublicInbox/IMAPTracker.pm
 lib/PublicInbox/IMAPdeflate.pm
 lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IPC.pm
 lib/PublicInbox/IdxStack.pm
 lib/PublicInbox/Import.pm
 lib/PublicInbox/In2Tie.pm
@@ -327,6 +328,7 @@ t/index-git-times.t
 t/indexlevels-mirror-v1.t
 t/indexlevels-mirror.t
 t/init.t
+t/ipc.t
 t/iso-2202-jp.eml
 t/kqnotify.t
 t/lei-oneshot.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
new file mode 100644
index 00000000..c04140ae
--- /dev/null
+++ b/lib/PublicInbox/IPC.pm
@@ -0,0 +1,129 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# base class for remote IPC calls, requires Storable
+# TODO: this ought to be usable in SearchIdxShard
+package PublicInbox::IPC;
+use strict;
+use v5.10.1;
+use Socket qw(AF_UNIX SOCK_STREAM);
+use Carp qw(confess croak);
+use PublicInbox::Sigfd;
+
+sub _get_rec ($) {
+	my ($sock) = @_;
+	local $/ = "\n";
+	defined(my $len = <$sock>) or return;
+	chop($len) eq "\n" or croak "no LF byte in $len";
+	defined(my $r = read($sock, my $buf, $len)) or croak "read error: $!";
+	$r == $len or croak "short read: $r != $len";
+	thaw($buf);
+}
+
+sub _send_rec ($$) {
+	my ($sock, $ref) = @_;
+	my $buf = freeze($ref);
+	print $sock length($buf), "\n", $buf or croak "print: $!";
+}
+
+sub ipc_return ($$$) {
+	my ($s2, $ret, $exc) = @_;
+	_send_rec($s2, $exc ? bless(\$exc, 'PublicInbox::IPC::Die') : $ret);
+}
+
+sub ipc_worker_loop ($$) {
+	my ($self, $s2) = @_;
+	$self->ipc_atfork_child if $self->can('ipc_atfork_child');
+	$s2->autoflush(1);
+	while (my $rec = _get_rec($s2)) {
+		my ($wantarray, $sub, @args) = @$rec;
+		if (!defined($wantarray)) { # no waiting if client doesn't care
+			eval { $self->$sub(@args) };
+			eval { warn "die: $@ (from nowait $sub)\n" } if $@;
+		} elsif ($wantarray) {
+			my @ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \@ret, $@);
+		} else {
+			my $ret = eval { $self->$sub(@args) };
+			ipc_return($s2, \$ret, $@);
+		}
+	}
+}
+
+sub ipc_worker_spawn ($$$) {
+	my ($self, $ident, $oldset) = @_;
+	eval { require Storable; Storable->import(qw(freeze thaw)); };
+	if ($@) {
+		state $w //= warn "Storable (part of Perl) missing: $@\n";
+		return;
+	}
+	my $pid = $self->{-ipc_worker_pid};
+	confess "BUG: already spawned PID:$pid" if $pid;
+	confess "BUG: already have worker socket" if $self->{-ipc_sock};
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair: $!";
+	my $sigset = $oldset // PublicInbox::Sigfd::block_signals();
+	defined($pid = fork) or die "fork: $!";
+	if ($pid == 0) {
+		undef $s1;
+		local $0 = $ident;
+		$SIG{$_} = 'IGNORE' for (qw(TERM INT QUIT));
+		PublicInbox::Sigfd::sig_setmask($oldset);
+		eval { ipc_worker_loop($self, $s2) };
+		die "worker $ident died: $@\n" if $@;
+		$self->ipc_at_worker_exit if $self->can('ipc_at_worker_exit');
+		exit;
+	}
+	PublicInbox::Sigfd::sig_setmask($sigset) unless $oldset;
+	$s1->autoflush(1);
+	$self->{-ipc_sock} = $s1;
+	$self->{-ipc_worker_pid} = $pid;
+}
+
+sub ipc_reap_worker { # dwaitpid callback
+	my ($self, $pid) = @_;
+	warn "PID:$pid died with \$?=$?\n" if $?;
+}
+
+sub ipc_worker_stop {
+	my ($self) = @_;
+	my $pid;
+	if (delete $self->{-ipc_sock}) {
+		$pid = delete $self->{-ipc_worker_pid} or die "no PID?";
+	} else {
+		$pid = delete $self->{-ipc_worker_pid} and
+			die "unexpected PID:$pid";
+	}
+	return unless $pid;
+	eval { PublicInbox::DS::dwaitpid($pid, \&ipc_reap_worker, $self) };
+	if ($@) {
+		my $wp = waitpid($pid, 0);
+		$pid == $wp or die "waitpid($pid) returned $wp: \$?=$?";
+		ipc_reap_worker($self, $pid);
+	}
+}
+
+# use this if we have multiple readers reading curl or "pigz -dc"
+# and writing to the same store
+sub ipc_lock_init {
+	my ($self, $f) = @_;
+	require PublicInbox::Lock;
+	$self->{-ipc_lock} //= bless { lock_path => $f }, 'PublicInbox::Lock'
+}
+
+sub ipc_do {
+	my ($self, $sub, @args) = @_;
+	if (my $s1 = $self->{-ipc_sock}) {
+		my $ipc_lock = $self->{-ipc_lock};
+		my $lock = $ipc_lock ? $ipc_lock->lock_for_scope : undef;
+		_send_rec($s1, [ wantarray, $sub, @args ]);
+		return unless defined(wantarray);
+		my $ret = _get_rec($s1) // die "no response on $sub";
+		die $$ret if ref($ret) eq 'PublicInbox::IPC::Die';
+		wantarray ? @$ret : $$ret;
+	} else {
+		$self->$sub(@args);
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index f8383d5e..2745c560 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -9,7 +9,7 @@
 package PublicInbox::LeiStore;
 use strict;
 use v5.10.1;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::IPC);
 use PublicInbox::SearchIdx qw(crlf_adjust);
 use PublicInbox::ExtSearchIdx;
 use PublicInbox::Import;
diff --git a/t/ipc.t b/t/ipc.t
new file mode 100644
index 00000000..f9c4024b
--- /dev/null
+++ b/t/ipc.t
@@ -0,0 +1,67 @@
+#!perl -w
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::IPC';
+state $once = eval <<'';
+package PublicInbox::IPC;
+use strict;
+sub test_array { qw(test array) }
+sub test_scalar { 'scalar' }
+sub test_scalarref { \'scalarref' }
+sub test_undef { undef }
+sub test_die { shift; die @_; 'unreachable' }
+sub test_pid { $$ }
+1;
+
+my $ipc = bless {}, 'PublicInbox::IPC';
+my @t = qw(array scalar scalarref undef);
+my $test = sub {
+	my $x = shift;
+	for my $type (@t) {
+		my $m = "test_$type";
+		my @ret = $ipc->ipc_do($m);
+		my @exp = $ipc->$m;
+		is_deeply(\@ret, \@exp, "wantarray $m $x");
+
+		$ipc->ipc_do($m);
+
+		my $ret = $ipc->ipc_do($m);
+		my $exp = $ipc->$m;
+		is_deeply($ret, $exp, "!wantarray $m $x");
+	}
+	my $ret = eval { $ipc->test_die('phail') };
+	my $exp = $@;
+	$ret = eval { $ipc->ipc_do('test_die', 'phail') };
+	my $err = $@;
+	my %lines;
+	for ($err, $exp) {
+		s/ line (\d+).*//s and $lines{$1}++;
+	}
+	is(scalar keys %lines, 1, 'line numbers match');
+	is((values %lines)[0], 2, '2 hits on same line number');
+	is($err, $exp, "$x die matches");
+	is($ret, undef, "$x die did not return");
+};
+$test->('local');
+
+SKIP: {
+	require_mods(qw(Storable), 16);
+	my $pid = $ipc->ipc_worker_spawn('test worker');
+	ok($pid > 0 && kill(0, $pid), 'worker spawned and running');
+	defined($pid) or BAIL_OUT 'no spawn, no test';
+	is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	$test->('worker');
+	{
+		my ($tmp, $for_destroy) = tmpdir();
+		$ipc->ipc_lock_init("$tmp/lock");
+		is($ipc->ipc_do('test_pid'), $pid, 'worker pid returned');
+	}
+	$ipc->ipc_worker_stop;
+	ok(!kill(0, $pid) && $!{ESRCH}, 'worker stopped');
+}
+$ipc->ipc_worker_stop; # idempotent
+done_testing;
diff --git a/t/lei_store.t b/t/lei_store.t
index 03ab5af6..a189f897 100644
--- a/t/lei_store.t
+++ b/t/lei_store.t
@@ -85,4 +85,9 @@ for my $parallel (0, 1) {
 	is_deeply(\@kw, [], 'set clobbers all');
 }
 
+SKIP: {
+	require_mods(qw(Storable), 1);
+	ok($lst->can('ipc_do'), 'ipc_do works if we have Storable');
+}
+
 done_testing;

  parent reply	other threads:[~2020-12-31 13:51 UTC|newest]

Thread overview: 37+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-31 13:51 [PATCH 00/36] another round of lei stuff Eric Wong
2020-12-31 13:51 ` [PATCH 01/36] import: respect init.defaultBranch Eric Wong
2020-12-31 13:51 ` [PATCH 02/36] lei_store: use per-machine refname as git HEAD Eric Wong
2020-12-31 13:51 ` [PATCH 03/36] revert "lei_store: use per-machine refname as git HEAD" Eric Wong
2020-12-31 13:51 ` [PATCH 04/36] lei_to_mail: initial implementation for writing mbox formats Eric Wong
2020-12-31 13:51 ` [PATCH 05/36] sharedkv: fork()-friendly key-value store Eric Wong
2020-12-31 13:51 ` [PATCH 06/36] sharedkv: split out index_values Eric Wong
2020-12-31 13:51 ` [PATCH 07/36] lei_to_mail: start atomic and compressed mbox writing Eric Wong
2020-12-31 13:51 ` [PATCH 08/36] mboxreader: new class for reading various mbox formats Eric Wong
2020-12-31 13:51 ` [PATCH 09/36] lei_to_mail: start --augment, dedupe, bz2 and xz Eric Wong
2020-12-31 13:51 ` [PATCH 10/36] lei: implement various deduplication strategies Eric Wong
2020-12-31 13:51 ` [PATCH 11/36] lei_to_mail: lazy-require LeiDedupe Eric Wong
2020-12-31 13:51 ` [PATCH 12/36] lei_to_mail: support for non-seekable outputs Eric Wong
2020-12-31 13:51 ` [PATCH 13/36] lei_to_mail: support Maildir, fix+test --augment Eric Wong
2020-12-31 13:51 ` Eric Wong [this message]
2020-12-31 13:51 ` [PATCH 15/36] ipc: support Sereal Eric Wong
2020-12-31 13:51 ` [PATCH 16/36] lei_store: add ->set_eml, ->add_eml can return smsg Eric Wong
2020-12-31 13:51 ` [PATCH 17/36] lei: rename "extinbox" => "external" Eric Wong
2020-12-31 13:51 ` [PATCH 18/36] mid: use defined-or with `push' for uniqueness check Eric Wong
2020-12-31 13:51 ` [PATCH 19/36] mid: hoist out mids_in sub Eric Wong
2020-12-31 13:51 ` [PATCH 20/36] lei_store: handle messages without Message-ID at all Eric Wong
2020-12-31 13:51 ` [PATCH 21/36] ipc: use shutdown(2), base atfork* callback Eric Wong
2020-12-31 13:51 ` [PATCH 22/36] lei_to_mail: unlink mboxes if not augmenting Eric Wong
2020-12-31 13:51 ` [PATCH 23/36] lei: add --mfolder as an --output alias Eric Wong
2020-12-31 13:51 ` [PATCH 24/36] spawn: move run_die here from PublicInbox::Import Eric Wong
2020-12-31 13:51 ` [PATCH 25/36] init: remove embedded UnlinkMe package Eric Wong
2020-12-31 13:51 ` [PATCH 26/36] t/run: avoid uninitialized var on incomplete test Eric Wong
2020-12-31 13:51 ` [PATCH 27/36] gcf2client: reap process on DESTROY Eric Wong
2020-12-31 13:51 ` [PATCH 28/36] lei_to_mail: open FIFOs O_WRONLY so we block Eric Wong
2020-12-31 13:51 ` [PATCH 29/36] searchidxshard: call DS->Reset at worker start Eric Wong
2020-12-31 13:51 ` [PATCH 30/36] t/ipc.t: test for references via `die' Eric Wong
2020-12-31 13:51 ` [PATCH 31/36] use PublicInbox::DS for dwaitpid Eric Wong
2020-12-31 13:51 ` [PATCH 32/36] syscall: SFD_NONBLOCK can be a constant, again Eric Wong
2020-12-31 13:51 ` [PATCH 33/36] lei: avoid Spawn package when starting daemon Eric Wong
2020-12-31 13:51 ` [PATCH 34/36] avoid calling waitpid from children in DESTROY Eric Wong
2020-12-31 13:51 ` [PATCH 35/36] ds: clobber $in_loop first at reset Eric Wong
2020-12-31 13:51 ` [PATCH 36/36] on_destroy: support PID owner guard Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://public-inbox.org/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20201231135154.6070-15-e@80x24.org \
    --to=e@80x24.org \
    --cc=meta@public-inbox.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).