unofficial mirror of meta@public-inbox.org
 help / color / mirror / Atom feed
* [PATCH] lei: non-blocking lei/store->done in lei-daemon
@ 2021-08-24 13:06 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2021-08-24 13:06 UTC (permalink / raw)
  To: meta

This allows client sockets to wait for "done" commits to
lei/store while the daemon reacts asynchronously.  The goal
of this change is to keep the script/lei client alive until
lei/store commits changes to the filesystem, but without
blocking the lei-daemon event loop.  It depends on Perl
refcounting to close the client socket once lei-daemon drops
its last reference to it.

This change also highlighted our over-use of "done" requests to
lei/store processes, which is now corrected so we only issue one
on collective socket EOF rather than upon reaping every single
worker.

This also fixes "lei forget-mail-sync" when it is the initial
command.

This took several iterations and much debugging to arrive at the
current implementation:

1. The initial iteration of this change utilized socket passing
   from lei-daemon to lei/store, which necessitated switching
   from faster pipes to slower Unix sockets.

2. The second iteration switched to registering notification sockets
   independently of "done" requests, but that could lead to early
   wakeups when "done" was requested by other workers.  This
   appeared to work most of the time, but suffered races under
   high load which were difficult to track down.

Finally, this iteration passes the stringified socket GLOB ref
to lei/store, which echoes it back to lei-daemon upon completion
of that particular "done" request.
---
 lib/PublicInbox/LEI.pm               | 19 ++++++++++++++-
 lib/PublicInbox/LeiForgetMailSync.pm |  4 ++--
 lib/PublicInbox/LeiImportKw.pm       |  3 ++-
 lib/PublicInbox/LeiNoteEvent.pm      |  4 ++--
 lib/PublicInbox/LeiPmdir.pm          |  5 ++--
 lib/PublicInbox/LeiStore.pm          | 35 +++++++++++++++++++---------
 lib/PublicInbox/LeiXSearch.pm        |  4 ++--
 lib/PublicInbox/PktOp.pm             |  9 ++++---
 8 files changed, 59 insertions(+), 24 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ea3ec0fe..5694e92c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -37,6 +37,7 @@ $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
 our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] }
+our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -565,6 +566,7 @@ sub _lei_atfork_child {
 	$dir_idle->force_close if $dir_idle;
 	%PATH2CFG = ();
 	$MDIR2CFGPATH = {};
+	%LIVE_SOCK = ();
 	eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
 	undef $errors_log;
 	$quit = \&CORE::exit;
@@ -1429,7 +1431,7 @@ sub refresh_watches {
 			add_maildir_watch($cd, $cfg_f);
 		}
 	}
-	my $wait = $renames ? $sto->ipc_do('done') : undef;
+	$lei->sto_done_request if $renames;
 	if ($old) { # cull old non-existent entries
 		for my $url (keys %$old) {
 			next if exists $seen{$url};
@@ -1463,4 +1465,19 @@ sub lms { # read-only LeiMailSync
 	$lse ? $lse->lms : undef;
 }
 
+sub sto_done_request { # only call this from lei-daemon process (not workers)
+	my ($lei, $sock) = @_;
+	if ($sock //= $lei->{sock}) {
+		$LIVE_SOCK{"$sock"} = $sock;
+		$lei->{sto}->ipc_do('done', "$sock"); # issue, async wait
+	} else { # forcibly wait
+		my $wait = $lei->{sto}->ipc_do('done');
+	}
+}
+
+sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete
+	my ($sock_str) = @_;
+	delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients
+}
+
 1;
diff --git a/lib/PublicInbox/LeiForgetMailSync.pm b/lib/PublicInbox/LeiForgetMailSync.pm
index 940ca1b6..2b4e58a9 100644
--- a/lib/PublicInbox/LeiForgetMailSync.pm
+++ b/lib/PublicInbox/LeiForgetMailSync.pm
@@ -16,12 +16,12 @@ sub lei_forget_mail_sync {
 	my ($lei, @folders) = @_;
 	my $lms = $lei->lms or return;
 	my $sto = $lei->_lei_store or return; # may disappear due to race
-	$sto->write_prepare;
+	$sto->write_prepare($lei);
 	my $err = $lms->arg2folder($lei, \@folders);
 	$lei->qerr(@{$err->{qerr}}) if $err->{qerr};
 	return $lei->fail($err->{fail}) if $err->{fail};
 	$sto->ipc_do('lms_forget_folders', @folders);
-	my $wait = $sto->ipc_do('done');
+	$lei->sto_done_request;
 }
 
 *_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw;
diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm
index 2878cbdf..402125cf 100644
--- a/lib/PublicInbox/LeiImportKw.pm
+++ b/lib/PublicInbox/LeiImportKw.pm
@@ -13,6 +13,7 @@ sub new {
 	my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls;
 	my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc);
 	$op_c->{ops} = $ops; # for PktOp->event_step
+	$self->{lei_sock} = $lei->{sock};
 	$lei->{ikw} = $self;
 }
 
@@ -42,13 +43,13 @@ sub ck_update_kw { # via wq_io_do
 sub ikw_done_wait {
 	my ($arg, $pid) = @_;
 	my ($self, $lei) = @$arg;
-	my $wait = $lei->{sto}->ipc_do('done');
 	$lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
 	my ($lei) = @_;
 	my $ikw = delete $lei->{ikw} or return $lei->fail;
+	$lei->sto_done_request($ikw->{lei_sock});
 	$ikw->wq_wait_old(\&ikw_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm
index 1cd15296..6a40ba39 100644
--- a/lib/PublicInbox/LeiNoteEvent.pm
+++ b/lib/PublicInbox/LeiNoteEvent.pm
@@ -15,7 +15,7 @@ sub flush_lei ($) {
 	if (my $lne = delete $lei->{cfg}->{-lei_note_event}) {
 		$lne->wq_close(1, undef, $lei); # runs _lei_wq_eof;
 	} elsif ($lei->{sto}) { # lms_clear_src calls only:
-		my $wait = $lei->{sto}->ipc_do('done');
+		$lei->sto_done_request;
 	}
 }
 
@@ -117,7 +117,7 @@ sub lne_done_wait {
 sub _lei_wq_eof { # EOF callback for main lei daemon
 	my ($lei) = @_;
 	my $lne = delete $lei->{lne} or return $lei->fail;
-	my $wait = $lei->{sto}->ipc_do('done');
+	$lei->sto_done_request;
 	$lne->wq_wait_old(\&lne_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
index 760f276c..59cf886e 100644
--- a/lib/PublicInbox/LeiPmdir.pm
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -25,6 +25,7 @@ sub new {
 	my ($op_c, $ops) = $lei->workers_start($self, $nproc,
 		undef, { ipt => $ipt }); # LeiInput subclass
 	$op_c->{ops} = $ops; # for PktOp->event_step
+	$self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait
 	$lei->{pmd} = $self;
 }
 
@@ -32,7 +33,7 @@ sub ipc_atfork_child {
 	my ($self) = @_;
 	my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
 	$ipt->{lei} = $self->{lei};
-	$ipt->ipc_atfork_child;
+	$ipt->ipc_atfork_child; # calls _lei_atfork_child;
 }
 
 sub each_mdir_fn { # maildir_each_file callback
@@ -48,13 +49,13 @@ sub mdir_iter { # via wq_io_do
 sub pmd_done_wait {
 	my ($arg, $pid) = @_;
 	my ($self, $lei) = @$arg;
-	my $wait = $lei->{sto}->ipc_do('done');
 	$lei->can('wq_done_wait')->($arg, $pid);
 }
 
 sub _lei_wq_eof { # EOF callback for main lei daemon
 	my ($lei) = @_;
 	my $pmd = delete $lei->{pmd} or return $lei->fail;
+	$lei->sto_done_request($pmd->{lei_sock});
 	$pmd->wq_wait_old(\&pmd_done_wait, $lei);
 }
 
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index bbd853e5..28e36e89 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -471,7 +471,7 @@ sub xchg_stderr {
 }
 
 sub done {
-	my ($self) = @_;
+	my ($self, $sock_ref) = @_;
 	my $err = '';
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
@@ -486,6 +486,10 @@ sub done {
 	$self->{priv_eidx}->done; # V2Writable::done
 	xchg_stderr($self);
 	die $err if $err;
+
+	# notify clients ->done has been issued
+	defined($sock_ref) and
+		$self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref);
 }
 
 sub ipc_atfork_child {
@@ -493,28 +497,37 @@ sub ipc_atfork_child {
 	my $lei = $self->{lei};
 	$lei->_lei_atfork_child(1) if $lei;
 	xchg_stderr($self);
-	if (my $err = delete($self->{err_pipe})) {
-		close $err->[0];
-		$self->{-err_wr} = $err->[1];
+	if (my $to_close = delete($self->{to_close})) {
+		close($_) for @$to_close;
 	}
 	$self->SUPER::ipc_atfork_child;
 }
 
 sub write_prepare {
 	my ($self, $lei) = @_;
+	$lei // die 'BUG: $lei not passed';
 	unless ($self->{-ipc_req}) {
-		my $d = $lei->store_path;
-		$self->ipc_lock_init("$d/ipc.lock");
-		substr($d, -length('/lei/store'), 10, '');
+		# s2d => store-to-daemon messages
+		require PublicInbox::PktOp;
+		my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair;
+		my $dir = $lei->store_path;
+		$self->ipc_lock_init("$dir/ipc.lock");
+		substr($dir, -length('/lei/store'), 10, '');
 		pipe(my ($r, $w)) or die "pipe: $!";
-		my $err_pipe = [ $r, $w ];
 		# Mail we import into lei are private, so headers filtered out
 		# by -mda for public mail are not appropriate
 		local @PublicInbox::MDA::BAD_HEADERS = ();
-		$self->ipc_worker_spawn("lei/store $d", $lei->oldset,
-					{ lei => $lei, err_pipe => $err_pipe });
+		$self->ipc_worker_spawn("lei/store $dir", $lei->oldset, {
+					lei => $lei,
+					-err_wr => $w,
+					to_close => [ $r, $s2d_op_c->{sock} ],
+					s2d_op_p => $s2d_op_p,
+				});
 		require PublicInbox::LeiStoreErr;
-		PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei);
+		PublicInbox::LeiStoreErr->new($r, $lei);
+		$s2d_op_c->{ops} = {
+			sto_done_complete => [ $lei->can('sto_done_complete') ]
+		};
 	}
 	$lei->{sto} = $self;
 }
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 5e34d864..1f83e582 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -374,8 +374,8 @@ sub query_done { # EOF callback for main daemon
 	if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) {
 		warn "BUG: {sto} missing with --mail-sync";
 	}
-	my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef;
-	$wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
+	$lei->sto_done_request if $lei->{sto};
+	my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef;
 	$lei->{ovv}->ovv_end($lei);
 	my $start_mua;
 	if ($l2m) { # close() calls LeiToMail reap_compress
diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm
index 92e150a4..10942dd1 100644
--- a/lib/PublicInbox/PktOp.pm
+++ b/lib/PublicInbox/PktOp.pm
@@ -56,9 +56,12 @@ sub event_step {
 			($cmd, @pargs) = split(/ /, $msg);
 		}
 		my $op = $self->{ops}->{$cmd //= $msg};
-		die "BUG: unknown message: `$cmd'" unless $op;
-		my ($sub, @args) = @$op;
-		$sub->(@args, @pargs);
+		if ($op) {
+			my ($sub, @args) = @$op;
+			$sub->(@args, @pargs);
+		} elsif ($msg ne '') {
+			die "BUG: unknown message: `$cmd'";
+		}
 		return $self->close if $msg eq ''; # close on EOF
 	}
 }

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2021-08-24 13:06 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-08-24 13:06 [PATCH] lei: non-blocking lei/store->done in lei-daemon Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).