From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net X-Spam-Level: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.2 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C49F41F916 for ; Sat, 27 Jun 2020 10:04:02 +0000 (UTC) From: Eric Wong To: meta@public-inbox.org Subject: [PATCH 13/34] watch: wire up IMAP IDLE reapers to DS Date: Sat, 27 Jun 2020 10:03:39 +0000 Message-Id: <20200627100400.9871-14-e@yhbt.net> In-Reply-To: <20200627100400.9871-1-e@yhbt.net> References: <20200627100400.9871-1-e@yhbt.net> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: We can avoid synchronous `waitpid(-1, 0)' and save a process when simultaneously watching Maildirs. One DS bug is fixed: ->Reset needs to clear the DS $in_loop flag in forked children so dwaitpid() fails and allows git processes to be reaped synchronously. TestCommon also calls DS->Reset when spawning new processes, since t/imapd.t uses DS->EventLoop while waiting on -watch to write. 
--- lib/PublicInbox/DS.pm | 2 +- lib/PublicInbox/TestCommon.pm | 1 + lib/PublicInbox/WatchMaildir.pm | 170 +++++++++++++------------------- script/public-inbox-watch | 6 +- 4 files changed, 77 insertions(+), 102 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index da68802dda9..c46b20cba27 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -68,7 +68,7 @@ Reset all state =cut sub Reset { %DescriptorMap = (); - $wait_pids = $later_queue = undef; + $in_loop = $wait_pids = $later_queue = undef; $EXPMAP = {}; $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef; $LoopTimeout = -1; # no timeout by default diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index b252810fca5..14ebba10563 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -350,6 +350,7 @@ sub start_script { } defined(my $pid = fork) or die "fork: $!\n"; if ($pid == 0) { + eval { PublicInbox::DS->Reset }; # pretend to be systemd (cf. 
sd_listen_fds(3)) # 3 == SD_LISTEN_FDS_START my $fd; diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm index 4d3cd032e5a..431350be277 100644 --- a/lib/PublicInbox/WatchMaildir.pm +++ b/lib/PublicInbox/WatchMaildir.pm @@ -12,7 +12,7 @@ use PublicInbox::Filter::Base qw(REJECT); use PublicInbox::Spamcheck; use PublicInbox::Sigfd; use PublicInbox::DS qw(now); -use POSIX qw(_exit WNOHANG); +use POSIX qw(_exit); *mime_from_path = \&PublicInbox::InboxWritable::mime_from_path; sub compile_watchheaders ($) { @@ -213,9 +213,8 @@ sub quit { } } -sub watch_fs { +sub watch_fs_init ($) { my ($self) = @_; - require PublicInbox::DirIdle; my $done = sub { delete $self->{done_timer}; _done_for_now($self); @@ -224,10 +223,8 @@ sub watch_fs { _try_path($self, $_[0]->fullname); $self->{done_timer} //= PublicInbox::DS::requeue($done); }; - my $di = PublicInbox::DirIdle->new($self->{mdir}, $cb); - PublicInbox::DS->SetPostLoopCallback(sub { !$self->{quit} }); - PublicInbox::DS->EventLoop; - _done_for_now($self); + require PublicInbox::DirIdle; + PublicInbox::DirIdle->new($self->{mdir}, $cb); # EPOLL_CTL_ADD } # returns the git config section name, e.g [imap "imaps://user@example.com"] @@ -334,25 +331,6 @@ sub mic_for ($$$) { # mic = Mail::IMAPClient $mic; } -sub imap_start ($) { - my ($self) = @_; - eval { require PublicInbox::IMAPClient } or - die "Mail::IMAPClient is required for IMAP:\n$@\n"; - eval { require Git } or - die "Git (Perl module) is required for IMAP:\n$@\n"; - eval { require PublicInbox::IMAPTracker } or - die "DBD::SQLite is required for IMAP\n:$@\n"; - - my $mic_args = imap_common_init($self); - # make sure we can connect and cache the credentials in memory - $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args - my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj - for my $url (sort keys %{$self->{imap}}) { - my $uri = PublicInbox::URIimap->new($url); - $mics->{imap_section($uri)} //= mic_for($self, $uri, 
$mic_args); - } -} - sub imap_fetch_all ($$$) { my ($self, $mic, $uri) = @_; my $sec = imap_section($uri); @@ -481,74 +459,76 @@ sub watch_imap_idle_1 ($$$) { sub watch_atfork_child ($) { my ($self) = @_; + delete $self->{idle_pids}; + PublicInbox::DS->Reset; PublicInbox::Sigfd::sig_setmask($self->{oldset}); %SIG = (%SIG, %{$self->{sig}}); } -sub watch_imap_idle_all ($$) { - my ($self, $idle) = @_; # $idle = [[ uri1, intvl1 ], [ uri2, intvl2 ]] - $self->{mics} = {}; # going to be forking, so disconnect - my $idle_pids = $self->{idle_pids} = {}; - until ($self->{quit}) { - while (my $uri_intvl = shift @$idle) { - my ($uri, $intvl) = @$uri_intvl; - defined(my $pid = fork) or die "fork: $!"; - if ($pid == 0) { - watch_atfork_child($self); - delete $self->{idle_pids}; - watch_imap_idle_1($self, $uri, $intvl); - _exit(0); - } - $idle_pids->{$pid} = $uri_intvl; - } - my $pid = waitpid(-1, 0) or next; - if ($pid < 0) { - warn "W: no idling children: $!"; - if (@$idle) { - sleep 60; - } else { - warn "W: nothing to respawn, quitting IDLE\n"; - last; - } - } - if (my $uri_intvl = delete $idle_pids->{$pid}) { - my ($uri, $intvl) = @$uri_intvl; - my $url = $uri->as_string; - if ($? 
|| !$self->{quit}) { - warn "W: PID=$pid on $url died: \$?=$?\n"; - } - push @$idle, $uri_intvl; - } else { - warn "W: PID=$pid (unknown) reaped: \$?=$?\n"; - } +sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback + my ($self, $pid) = @_; + my $uri_intvl = delete $self->{idle_pids}->{$pid} or + die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; + + my ($uri, $intvl) = @$uri_intvl; + my $url = $uri->as_string; + return if $self->{quit}; + warn "W: PID=$pid on $url died: \$?=$?\n" if $?; + push @{$self->{idle_todo}}, $uri_intvl; + PublicInbox::DS::requeue($self); # call ->event_step to respawn +} + +sub imap_idle_fork ($$) { + my ($self, $uri_intvl) = @_; + my ($uri, $intvl) = @$uri_intvl; + defined(my $pid = fork) or die "fork: $!"; + if ($pid == 0) { + watch_atfork_child($self); + watch_imap_idle_1($self, $uri, $intvl); + _exit(0); } + $self->{idle_pids}->{$pid} = $uri_intvl; + PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self); +} - # tear it all down - kill('QUIT', $_) for (keys %$idle_pids); - while (scalar keys %$idle_pids) { - if (my $pid = waitpid(-1, WNOHANG)) { - if ($pid < 0) { - warn "E: no children? $! 
(PIDs: ", - join(', ', keys %$idle_pids),")\n"; - last; - } else { - delete $idle_pids->{$pid}; - } - } else { # signals aren't that reliable w/o signalfd/kevent - sleep 1; - kill('QUIT', $_) for (keys %$idle_pids); +sub event_step { + my ($self) = @_; + return if $self->{quit}; + my $idle_todo = $self->{idle_todo}; + if ($idle_todo && @$idle_todo) { + $self->{mics} = {}; # going to be forking, so disconnect + while (my $uri_intvl = shift(@$idle_todo)) { + imap_idle_fork($self, $uri_intvl); } } + goto(&fs_scan_step) if $self->{mdre}; } -sub watch_imap ($) { +sub watch_imap_init ($) { my ($self) = @_; - my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]; + eval { require PublicInbox::IMAPClient } or + die "Mail::IMAPClient is required for IMAP:\n$@\n"; + eval { require Git } or + die "Git (Perl module) is required for IMAP:\n$@\n"; + eval { require PublicInbox::IMAPTracker } or + die "DBD::SQLite is required for IMAP:\n$@\n"; + + my $mic_args = imap_common_init($self); # read args from config + + # make sure we can connect and cache the credentials in memory + $self->{mic_arg} = {}; # schema://authority => IMAPClient->new args + my $mics = $self->{mics} = {}; # schema://authority => IMAPClient obj + for my $url (sort keys %{$self->{imap}}) { + my $uri = PublicInbox::URIimap->new($url); + $mics->{imap_section($uri)} //= mic_for($self, $uri, $mic_args); + } + + my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ] my $poll = {}; # intvl_seconds => [ uri1, uri2 ] for my $url (keys %{$self->{imap}}) { my $uri = PublicInbox::URIimap->new($url); my $sec = imap_section($uri); - my $mic = $self->{mics}->{$sec}; + my $mic = $mics->{$sec}; my $intvl = $self->{imap_opt}->{$sec}->{poll_intvl}; if ($mic->has_capability('IDLE') && !$intvl) { $intvl = $self->{imap_opt}->{$sec}->{idle_intvl}; @@ -557,9 +537,10 @@ sub watch_imap ($) { push @{$poll->{$intvl || 120}}, $uri; } } - my $nr_poll = scalar keys %$poll; - if (scalar @$idle && !$nr_poll) { # multiple idlers, need fork - 
watch_imap_idle_all($self, $idle); + if (scalar @$idle) { + $self->{idle_pids} = {}; + $self->{idle_todo} = $idle; + PublicInbox::DS::requeue($self); # ->event_step to fork } # TODO: polling } @@ -568,21 +549,11 @@ sub watch { my ($self, $sig, $oldset) = @_; $self->{oldset} = $oldset; $self->{sig} = $sig; - if ($self->{mdre} && $self->{imap}) { - defined(my $pid = fork) or die "fork: $!"; - if ($pid == 0) { - watch_atfork_child($self); - imap_start($self); - goto &watch_imap; - } - $self->{-imap_pid} = $pid; - } elsif ($self->{imap}) { - # not a child process, but no signalfd, yet: - watch_atfork_child($self); - imap_start($self); - goto &watch_imap; - } - goto &watch_fs; + watch_imap_init($self) if $self->{imap}; + watch_fs_init($self) if $self->{mdre}; + PublicInbox::DS->SetPostLoopCallback(sub {}); + PublicInbox::DS->EventLoop until $self->{quit}; + _done_for_now($self); } sub trigger_scan { @@ -591,8 +562,7 @@ sub trigger_scan { PublicInbox::DS::requeue($self); } -# called directly, and by PublicInbox::DS -sub event_step ($) { +sub fs_scan_step { my ($self) = @_; return if $self->{quit}; my $op = shift @{$self->{ops}}; @@ -634,7 +604,7 @@ sub event_step ($) { sub scan { my ($self, $op) = @_; push @{$self->{ops}}, $op; - goto &event_step; + goto &fs_scan_step; } sub _importer_for { diff --git a/script/public-inbox-watch b/script/public-inbox-watch index b6d545adad7..ae7b70be355 100755 --- a/script/public-inbox-watch +++ b/script/public-inbox-watch @@ -22,7 +22,11 @@ if ($watch_md) { $watch_md->quit if $watch_md; $watch_md = undef; }; - my $sig = { HUP => $reload, USR1 => $scan }; + my $sig = { + HUP => $reload, + USR1 => $scan, + CHLD => \&PublicInbox::DS::enqueue_reap, + }; $sig->{QUIT} = $sig->{TERM} = $sig->{INT} = $quit; # --no-scan is only intended for testing atm, undocumented.