From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 74EE41F8C7 for ; Tue, 24 Aug 2021 13:06:39 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH] lei: non-blocking lei/store->done in lei-daemon Date: Tue, 24 Aug 2021 13:06:39 +0000 Message-Id: <20210824130639.1980-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This allows client sockets to wait for "done" commits to lei/store while the daemon reacts asynchronously. The goal of this change is to keep the script/lei client alive until lei/store commits changes to the filesystem, but without blocking the lei-daemon event loop. It relies on Perl refcounting to close the client socket once lei-daemon drops its last reference to it. This change also highlighted our over-use of "done" requests to lei/store processes, which is now corrected so we only issue one on collective socket EOF rather than upon reaping every single worker. This also fixes "lei forget-mail-sync" when it is the initial command, since it now passes $lei to ->write_prepare. This took several iterations and much debugging to arrive at the current implementation: 1. The initial iteration of this change used socket passing from lei-daemon to lei/store, which necessitated switching from faster pipes to slower Unix sockets. 2. The second iteration switched to registering notification sockets independently of "done" requests, but that could lead to early wakeups when "done" was requested by other workers. This appeared to work most of the time, but suffered races under high load which were difficult to track down. Finally, this iteration passes the stringified socket GLOB ref to lei/store, which echoes it back to lei-daemon upon completion of that particular "done" request. 
--- lib/PublicInbox/LEI.pm | 19 ++++++++++++++- lib/PublicInbox/LeiForgetMailSync.pm | 4 ++-- lib/PublicInbox/LeiImportKw.pm | 3 ++- lib/PublicInbox/LeiNoteEvent.pm | 4 ++-- lib/PublicInbox/LeiPmdir.pm | 5 ++-- lib/PublicInbox/LeiStore.pm | 35 +++++++++++++++++++--------- lib/PublicInbox/LeiXSearch.pm | 4 ++-- lib/PublicInbox/PktOp.pm | 9 ++++--- 8 files changed, 59 insertions(+), 24 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index ea3ec0fe..5694e92c 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -37,6 +37,7 @@ $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through)); our %PATH2CFG; # persistent for socket daemon our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => [ ino watches ] } +our %LIVE_SOCK; # "GLOB(0x....)" => $lei->{sock}, held until sto_done_complete # TBD: this is a documentation mechanism to show a subcommand # (may) pass options through to another command: @@ -565,6 +566,7 @@ sub _lei_atfork_child { $dir_idle->force_close if $dir_idle; %PATH2CFG = (); $MDIR2CFGPATH = {}; + %LIVE_SOCK = (); eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush'; undef $errors_log; $quit = \&CORE::exit; @@ -1429,7 +1431,7 @@ sub refresh_watches { add_maildir_watch($cd, $cfg_f); } } - my $wait = $renames ? $sto->ipc_do('done') : undef; + $lei->sto_done_request if $renames; if ($old) { # cull old non-existent entries for my $url (keys %$old) { next if exists $seen{$url}; @@ -1463,4 +1465,19 @@ sub lms { # read-only LeiMailSync $lse ?
$lse->lms : undef; } +sub sto_done_request { # only call this from lei-daemon process (not workers) + my ($lei, $sock) = @_; + if ($sock //= $lei->{sock}) { + $LIVE_SOCK{"$sock"} = $sock; + $lei->{sto}->ipc_do('done', "$sock"); # issue, async wait + } else { # no client socket to hold open: forcibly wait + my $wait = $lei->{sto}->ipc_do('done'); + } +} + +sub sto_done_complete { # called in lei-daemon when LeiStore->done is complete + my ($sock_str) = @_; # "$sock" from sto_done_request, echoed back by lei/store + delete $LIVE_SOCK{$sock_str}; # frees {sock} for waiting lei clients +} + 1; diff --git a/lib/PublicInbox/LeiForgetMailSync.pm b/lib/PublicInbox/LeiForgetMailSync.pm index 940ca1b6..2b4e58a9 100644 --- a/lib/PublicInbox/LeiForgetMailSync.pm +++ b/lib/PublicInbox/LeiForgetMailSync.pm @@ -16,12 +16,12 @@ sub lei_forget_mail_sync { my ($lei, @folders) = @_; my $lms = $lei->lms or return; my $sto = $lei->_lei_store or return; # may disappear due to race - $sto->write_prepare; + $sto->write_prepare($lei); my $err = $lms->arg2folder($lei, \@folders); $lei->qerr(@{$err->{qerr}}) if $err->{qerr}; return $lei->fail($err->{fail}) if $err->{fail}; $sto->ipc_do('lms_forget_folders', @folders); - my $wait = $sto->ipc_do('done'); + $lei->sto_done_request; } *_complete_forget_mail_sync = \&PublicInbox::LeiExportKw::_complete_export_kw; diff --git a/lib/PublicInbox/LeiImportKw.pm b/lib/PublicInbox/LeiImportKw.pm index 2878cbdf..402125cf 100644 --- a/lib/PublicInbox/LeiImportKw.pm +++ b/lib/PublicInbox/LeiImportKw.pm @@ -13,6 +13,7 @@ sub new { my $self = bless { -wq_ident => 'lei import_kw worker' }, $cls; my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc); $op_c->{ops} = $ops; # for PktOp->event_step + $self->{lei_sock} = $lei->{sock}; # passed to sto_done_request in _lei_wq_eof $lei->{ikw} = $self; } @@ -42,13 +43,13 @@ sub ck_update_kw { # via wq_io_do sub ikw_done_wait { my ($arg, $pid) = @_; my ($self, $lei) = @$arg; - my $wait = $lei->{sto}->ipc_do('done'); $lei->can('wq_done_wait')->($arg, $pid); } sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $ikw =
delete $lei->{ikw} or return $lei->fail; + $lei->sto_done_request($ikw->{lei_sock}); $ikw->wq_wait_old(\&ikw_done_wait, $lei); } diff --git a/lib/PublicInbox/LeiNoteEvent.pm b/lib/PublicInbox/LeiNoteEvent.pm index 1cd15296..6a40ba39 100644 --- a/lib/PublicInbox/LeiNoteEvent.pm +++ b/lib/PublicInbox/LeiNoteEvent.pm @@ -15,7 +15,7 @@ sub flush_lei ($) { if (my $lne = delete $lei->{cfg}->{-lei_note_event}) { $lne->wq_close(1, undef, $lei); # runs _lei_wq_eof; } elsif ($lei->{sto}) { # lms_clear_src calls only: - my $wait = $lei->{sto}->ipc_do('done'); + $lei->sto_done_request; } } @@ -117,7 +117,7 @@ sub lne_done_wait { sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $lne = delete $lei->{lne} or return $lei->fail; - my $wait = $lei->{sto}->ipc_do('done'); + $lei->sto_done_request; $lne->wq_wait_old(\&lne_done_wait, $lei); } diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm index 760f276c..59cf886e 100644 --- a/lib/PublicInbox/LeiPmdir.pm +++ b/lib/PublicInbox/LeiPmdir.pm @@ -25,6 +25,7 @@ sub new { my ($op_c, $ops) = $lei->workers_start($self, $nproc, undef, { ipt => $ipt }); # LeiInput subclass $op_c->{ops} = $ops; # for PktOp->event_step + $self->{lei_sock} = $lei->{sock}; # keep client for pmd_done_wait; also passed to sto_done_request in _lei_wq_eof $lei->{pmd} = $self; } @@ -32,7 +33,7 @@ sub ipc_atfork_child { my ($self) = @_; my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}'; $ipt->{lei} = $self->{lei}; - $ipt->ipc_atfork_child; + $ipt->ipc_atfork_child; # calls _lei_atfork_child; } sub each_mdir_fn { # maildir_each_file callback @@ -48,13 +49,13 @@ sub mdir_iter { # via wq_io_do sub pmd_done_wait { my ($arg, $pid) = @_; my ($self, $lei) = @$arg; - my $wait = $lei->{sto}->ipc_do('done'); $lei->can('wq_done_wait')->($arg, $pid); } sub _lei_wq_eof { # EOF callback for main lei daemon my ($lei) = @_; my $pmd = delete $lei->{pmd} or return $lei->fail; + $lei->sto_done_request($pmd->{lei_sock}); $pmd->wq_wait_old(\&pmd_done_wait, $lei); } diff --git
a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index bbd853e5..28e36e89 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -471,7 +471,7 @@ sub xchg_stderr { } sub done { - my ($self) = @_; + my ($self, $sock_ref) = @_; # $sock_ref: "$sock" from sto_done_request, if any my $err = ''; if (my $im = delete($self->{im})) { eval { $im->done }; @@ -486,6 +486,10 @@ sub done { $self->{priv_eidx}->done; # V2Writable::done xchg_stderr($self); die $err if $err; + + # notify clients ->done has been issued + defined($sock_ref) and # NOTE(review): not reached when "die $err" fires above, so lei-daemon seemingly never frees this %LIVE_SOCK entry -- confirm + $self->{s2d_op_p}->pkt_do('sto_done_complete', $sock_ref); } sub ipc_atfork_child { @@ -493,28 +497,37 @@ sub ipc_atfork_child { my $lei = $self->{lei}; $lei->_lei_atfork_child(1) if $lei; xchg_stderr($self); - if (my $err = delete($self->{err_pipe})) { - close $err->[0]; - $self->{-err_wr} = $err->[1]; + if (my $to_close = delete($self->{to_close})) { + close($_) for @$to_close; } $self->SUPER::ipc_atfork_child; } sub write_prepare { my ($self, $lei) = @_; + $lei // die 'BUG: $lei not passed'; unless ($self->{-ipc_req}) { - my $d = $lei->store_path; - $self->ipc_lock_init("$d/ipc.lock"); - substr($d, -length('/lei/store'), 10, ''); + # s2d => store-to-daemon messages + require PublicInbox::PktOp; + my ($s2d_op_c, $s2d_op_p) = PublicInbox::PktOp->pair; + my $dir = $lei->store_path; + $self->ipc_lock_init("$dir/ipc.lock"); + substr($dir, -length('/lei/store'), 10, ''); pipe(my ($r, $w)) or die "pipe: $!"; - my $err_pipe = [ $r, $w ]; # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); - $self->ipc_worker_spawn("lei/store $d", $lei->oldset, - { lei => $lei, err_pipe => $err_pipe }); + $self->ipc_worker_spawn("lei/store $dir", $lei->oldset, { + lei => $lei, + -err_wr => $w, + to_close => [ $r, $s2d_op_c->{sock} ], # parent-side ends; the lei/store child closes them in ipc_atfork_child + s2d_op_p => $s2d_op_p, + }); require PublicInbox::LeiStoreErr; - PublicInbox::LeiStoreErr->new($err_pipe->[0], $lei); +
PublicInbox::LeiStoreErr->new($r, $lei); + $s2d_op_c->{ops} = { + sto_done_complete => [ $lei->can('sto_done_complete') ] + }; } $lei->{sto} = $self; } diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5e34d864..1f83e582 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -374,8 +374,8 @@ sub query_done { # EOF callback for main daemon if ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) { warn "BUG: {sto} missing with --mail-sync"; } - my $wait = $lei->{sto} ? $lei->{sto}->ipc_do('done') : undef; - $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef; + $lei->sto_done_request if $lei->{sto}; + my $wait = $lei->{v2w} ? $lei->{v2w}->ipc_do('done') : undef; $lei->{ovv}->ovv_end($lei); my $start_mua; if ($l2m) { # close() calls LeiToMail reap_compress diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 92e150a4..10942dd1 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -56,9 +56,12 @@ sub event_step { ($cmd, @pargs) = split(/ /, $msg); } my $op = $self->{ops}->{$cmd //= $msg}; - die "BUG: unknown message: `$cmd'" unless $op; - my ($sub, @args) = @$op; - $sub->(@args, @pargs); + if ($op) { + my ($sub, @args) = @$op; + $sub->(@args, @pargs); + } elsif ($msg ne '') { # '' is EOF, handled by ->close below + die "BUG: unknown message: `$cmd'"; + } return $self->close if $msg eq ''; # close on EOF } }