unofficial mirror of meta@public-inbox.org
* [PATCH 00/11] watch: fix contention w/ Maildir & NNTP
@ 2020-08-31  4:41 Eric Wong
  2020-08-31  4:41 ` [PATCH 01/11] watch: limit batch size of NNTP and IMAP workers, too Eric Wong
                   ` (10 more replies)
  0 siblings, 11 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

Here's a bunch of fixes to improve watch performance when
both Maildirs and NNTP are being watched (possibly on the same
inbox, or if `watchspam' is configured for spam removals).

Wakeups are reduced, and inbox.lock contention is minimized by
using read-only ->over to check for `watchspam' removals.

These affect IMAP, too; but I've been mainly using NNTP.

Eric Wong (11):
  watch: limit batch size of NNTP and IMAP workers, too
  watchmaildir: use v5.10.1, drop warnings
  rename WatchMaildir => Watch
  watch: log signal activities to STDERR
  watch: avoid unnecessary spawning on spam removals
  watch: block signals before fork on non-signalfd/kevent systems
  watch: comments and tiny cleanups
  ds: avoid excessive queueing when reaping PIDs
  watch: use EOFpipe to reduce dwaitpid wakeups
  ds: avoid unnecessary timer for waitpid
  replace ParentPipe with EOFpipe

 MANIFEST                                      |   4 +-
 lib/PublicInbox/DS.pm                         |  38 +++---
 lib/PublicInbox/Daemon.pm                     |   6 +-
 lib/PublicInbox/EOFpipe.pm                    |  24 ++++
 lib/PublicInbox/Import.pm                     |   3 +
 lib/PublicInbox/ParentPipe.pm                 |  23 ----
 lib/PublicInbox/V2Writable.pm                 |   3 +
 lib/PublicInbox/{WatchMaildir.pm => Watch.pm} | 111 +++++++++++++-----
 script/public-inbox-watch                     |  34 ++++--
 t/imapd.t                                     |   2 +-
 t/nntpd.t                                     |   2 +-
 t/watch_filter_rubylang.t                     |   4 +-
 t/watch_imap.t                                |   4 +-
 t/watch_maildir.t                             |  18 +--
 t/watch_maildir_v2.t                          |  22 ++--
 t/watch_multiple_headers.t                    |   4 +-
 t/watch_nntp.t                                |   4 +-
 17 files changed, 190 insertions(+), 116 deletions(-)
 create mode 100644 lib/PublicInbox/EOFpipe.pm
 delete mode 100644 lib/PublicInbox/ParentPipe.pm
 rename lib/PublicInbox/{WatchMaildir.pm => Watch.pm} (92%)


* [PATCH 01/11] watch: limit batch size of NNTP and IMAP workers, too
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 02/11] watchmaildir: use v5.10.1, drop warnings Eric Wong
                   ` (9 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

We don't want to monopolize locks because processes can easily
block each other if using `watchspam' on a Maildir while a big
NNTP or IMAP import is happening.

This can also happen if somebody configured a single inbox to
watch from several sources to merge several mailboxes into one
(e.g. both an IMAP and Maildir are watched).
---
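As a rough illustration of the batching idea, here is a minimal
standalone sketch.  It is not the code in this patch: the names
process_in_batches, $work and $release are hypothetical, and
$release only stands in for whatever commits pending work and
drops the lock (_done_for_now in the diff below).

```perl
use strict;
use v5.10.1;

# process_in_batches(\@items, $max_batch, $work, $release)
# Calls $work->($item) for every item and calls $release->() after
# roughly $max_batch items, so a (hypothetical) lock is never held
# across the whole run.
sub process_in_batches {
	my ($items, $max_batch, $work, $release) = @_;
	my $n = $max_batch;
	for my $item (@$items) {
		if (--$n < 0) { # same countdown idiom as the patch
			$release->(); # e.g. commit and drop the lock
			$n = $max_batch;
		}
		$work->($item);
	}
	$release->(); # final flush for the last partial batch
}

my (@done, $releases);
process_in_batches([1..25], 10,
	sub { push @done, $_[0] },
	sub { $releases++ });
say scalar(@done), " items, $releases releases";
```

Other processes get a chance to take the lock at every release
point, at the cost of more frequent commits.
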
 lib/PublicInbox/WatchMaildir.pm | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index a227a6fd..5176ef69 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -108,6 +108,7 @@ sub new {
 	return unless $mdre || scalar(keys %imap) || scalar(keys %nntp);
 
 	bless {
+		max_batch => 10, # avoid hogging locks for too long
 		spamcheck => $spamcheck,
 		mdmap => \%mdmap,
 		mdre => $mdre,
@@ -472,8 +473,14 @@ sub imap_fetch_all ($$$) {
 
 		$l_uid = $uids->[-1] + 1; # for next search
 		my $last_uid;
+		my $n = $self->{max_batch};
 
 		while (scalar @$uids) {
+			if (--$n < 0) {
+				_done_for_now($self);
+				$itrk->update_last($r_uidval, $last_uid);
+				$n = $self->{max_batch};
+			}
 			my @batch = splice(@$uids, 0, $bs);
 			$batch = join(',', @batch);
 			local $0 = "UID:$batch $mbx $sec";
@@ -888,9 +895,15 @@ sub nntp_fetch_all ($$$) {
 	};
 	my $inboxes = $self->{nntp}->{$url};
 	my $last_art;
+	my $n = $self->{max_batch};
 	for ($beg..$end) {
 		last if $self->{quit};
 		$art = $_;
+		if (--$n < 0) {
+			_done_for_now($self);
+			$itrk->update_last(0, $last_art);
+			$n = $self->{max_batch};
+		}
 		my $raw = $nn->article($art);
 		unless (defined($raw)) {
 			my $msg = $nn->message;
@@ -976,12 +989,11 @@ sub fs_scan_step {
 	local $PublicInbox::DS::in_loop = 0; # waitpid() synchronously
 
 	# continue existing scan
-	my $max = 10;
 	my $opendirs = $self->{opendirs};
 	my @dirnames = keys %$opendirs;
 	foreach my $dir (@dirnames) {
 		my $dh = delete $opendirs->{$dir};
-		my $n = $max;
+		my $n = $self->{max_batch};
 		while (my $fn = readdir($dh)) {
 			_try_path($self, "$dir/$fn");
 			last if --$n < 0;
@@ -996,7 +1008,7 @@ sub fs_scan_step {
 				warn "failed to open $dir: $!\n";
 				next;
 			}
-			my $n = $max;
+			my $n = $self->{max_batch};
 			while (my $fn = readdir($dh)) {
 				_try_path($self, "$dir/$fn");
 				last if --$n < 0;


* [PATCH 02/11] watchmaildir: use v5.10.1, drop warnings
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
  2020-08-31  4:41 ` [PATCH 01/11] watch: limit batch size of NNTP and IMAP workers, too Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 03/11] rename WatchMaildir => Watch Eric Wong
                   ` (8 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

Declare 5.10.1 to avoid potential compatibility problems with
Perl 7/8 down the line.  We'll rely on the command-line to set
or drop warnings during development, at least.
---
 lib/PublicInbox/WatchMaildir.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 5176ef69..1c7ac6c0 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -5,7 +5,7 @@
 #	http://wiki2.dovecot.org/MailboxFormat/Maildir
 package PublicInbox::WatchMaildir;
 use strict;
-use warnings;
+use v5.10.1;
 use PublicInbox::Eml;
 use PublicInbox::InboxWritable qw(eml_from_path warn_ignore_cb);
 use PublicInbox::Filter::Base qw(REJECT);


* [PATCH 03/11] rename WatchMaildir => Watch
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
  2020-08-31  4:41 ` [PATCH 01/11] watch: limit batch size of NNTP and IMAP workers, too Eric Wong
  2020-08-31  4:41 ` [PATCH 02/11] watchmaildir: use v5.10.1, drop warnings Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 04/11] watch: log signal activities to STDERR Eric Wong
                   ` (7 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

This is no longer limited to Maildirs now that IMAP and NNTP
support exists; so give it a shorter name.
---
 MANIFEST                                      |  2 +-
 lib/PublicInbox/{WatchMaildir.pm => Watch.pm} |  2 +-
 script/public-inbox-watch                     | 18 +++++++--------
 t/imapd.t                                     |  2 +-
 t/nntpd.t                                     |  2 +-
 t/watch_filter_rubylang.t                     |  4 ++--
 t/watch_imap.t                                |  4 ++--
 t/watch_maildir.t                             | 18 +++++++--------
 t/watch_maildir_v2.t                          | 22 +++++++++----------
 t/watch_multiple_headers.t                    |  4 ++--
 t/watch_nntp.t                                |  4 ++--
 11 files changed, 41 insertions(+), 41 deletions(-)
 rename lib/PublicInbox/{WatchMaildir.pm => Watch.pm} (99%)

diff --git a/MANIFEST b/MANIFEST
index 35adc8d3..f090175e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -200,7 +200,7 @@ lib/PublicInbox/ViewDiff.pm
 lib/PublicInbox/ViewVCS.pm
 lib/PublicInbox/WWW.pm
 lib/PublicInbox/WWW.pod
-lib/PublicInbox/WatchMaildir.pm
+lib/PublicInbox/Watch.pm
 lib/PublicInbox/WwwAltId.pm
 lib/PublicInbox/WwwAtomStream.pm
 lib/PublicInbox/WwwAttach.pm
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/Watch.pm
similarity index 99%
rename from lib/PublicInbox/WatchMaildir.pm
rename to lib/PublicInbox/Watch.pm
index 1c7ac6c0..5f786139 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -3,7 +3,7 @@
 #
 # ref: https://cr.yp.to/proto/maildir.html
 #	http://wiki2.dovecot.org/MailboxFormat/Maildir
-package PublicInbox::WatchMaildir;
+package PublicInbox::Watch;
 use strict;
 use v5.10.1;
 use PublicInbox::Eml;
diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index 20534bf2..02491860 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -3,7 +3,7 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict;
 use IO::Handle;
-use PublicInbox::WatchMaildir;
+use PublicInbox::Watch;
 use PublicInbox::Config;
 use PublicInbox::DS;
 use PublicInbox::Sigfd;
@@ -11,18 +11,18 @@ use PublicInbox::Syscall qw($SFD_NONBLOCK);
 my $oldset = PublicInbox::Sigfd::block_signals();
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
-my ($config, $watch_md);
+my ($config, $watch);
 my $reload = sub {
 	$config = PublicInbox::Config->new;
-	$watch_md->quit if $watch_md;
-	$watch_md = PublicInbox::WatchMaildir->new($config);
+	$watch->quit if $watch;
+	$watch = PublicInbox::Watch->new($config);
 };
 $reload->();
-if ($watch_md) {
-	my $scan = sub { $watch_md->trigger_scan('full') if $watch_md };
+if ($watch) {
+	my $scan = sub { $watch->trigger_scan('full') if $watch };
 	my $quit = sub {
-		$watch_md->quit if $watch_md;
-		$watch_md = undef;
+		$watch->quit if $watch;
+		$watch = undef;
 	};
 	my $sig = {
 		HUP => $reload,
@@ -41,5 +41,5 @@ if ($watch_md) {
 		PublicInbox::Sigfd::set_sigmask($oldset);
 		PublicInbox::DS->SetLoopTimeout(1000);
 	}
-	$watch_md->watch($sig, $oldset) while ($watch_md);
+	$watch->watch($sig, $oldset) while ($watch);
 }
diff --git a/t/imapd.t b/t/imapd.t
index 8db72bd7..f743bf06 100644
--- a/t/imapd.t
+++ b/t/imapd.t
@@ -449,7 +449,7 @@ ok($mic->logout, 'logged out');
 }
 
 SKIP: {
-	use_ok 'PublicInbox::WatchMaildir';
+	use_ok 'PublicInbox::Watch';
 	use_ok 'PublicInbox::InboxIdle';
 	require_git('1.8.5', 1) or
 		skip('git 1.8.5+ needed for --urlmatch', 4);
diff --git a/t/nntpd.t b/t/nntpd.t
index 74e21a41..d8a44334 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -396,7 +396,7 @@ sub read_til_dot {
 
 sub test_watch {
 	my ($tmpdir, $sock, $group) = @_;
-	use_ok 'PublicInbox::WatchMaildir';
+	use_ok 'PublicInbox::Watch';
 	use_ok 'PublicInbox::InboxIdle';
 	use_ok 'PublicInbox::Config';
 	require_git('1.8.5', 1) or skip('git 1.8.5+ needed for --urlmatch', 4);
diff --git a/t/watch_filter_rubylang.t b/t/watch_filter_rubylang.t
index db48cb2f..4b72dbae 100644
--- a/t/watch_filter_rubylang.t
+++ b/t/watch_filter_rubylang.t
@@ -7,7 +7,7 @@ use Test::More;
 use PublicInbox::Eml;
 use PublicInbox::Config;
 require_mods(qw(DBD::SQLite Search::Xapian));
-use_ok 'PublicInbox::WatchMaildir';
+use_ok 'PublicInbox::Watch';
 use_ok 'PublicInbox::Emergency';
 my ($tmpdir, $for_destroy) = tmpdir();
 local $ENV{PI_CONFIG} = "$tmpdir/pi_config";
@@ -76,7 +76,7 @@ EOF
 	my $ibx = $config->lookup_name($v);
 	ok($ibx, 'found inbox by name');
 
-	my $w = PublicInbox::WatchMaildir->new($config);
+	my $w = PublicInbox::Watch->new($config);
 	for my $i (1..2) {
 		$w->scan('full');
 	}
diff --git a/t/watch_imap.t b/t/watch_imap.t
index 9433bb6f..fb71d3df 100644
--- a/t/watch_imap.t
+++ b/t/watch_imap.t
@@ -5,14 +5,14 @@ use Test::More;
 use PublicInbox::Config;
 # see t/imapd*.t for tests against a live IMAP server
 
-use_ok 'PublicInbox::WatchMaildir';
+use_ok 'PublicInbox::Watch';
 my $cfg = PublicInbox::Config->new(\<<EOF);
 publicinbox.i.address=i\@example.com
 publicinbox.i.inboxdir=/nonexistent
 publicinbox.i.watch=imap://example.com/INBOX.a
 publicinboxlearn.watchspam=imap://example.com/INBOX.spam
 EOF
-my $watch = PublicInbox::WatchMaildir->new($cfg);
+my $watch = PublicInbox::Watch->new($cfg);
 is($watch->{imap}->{'imap://example.com/INBOX.a'}->[0]->{name}, 'i',
 	'watched an inbox');
 is($watch->{imap}->{'imap://example.com/INBOX.spam'}, 'watchspam',
diff --git a/t/watch_maildir.t b/t/watch_maildir.t
index c44273f0..ae53caf9 100644
--- a/t/watch_maildir.t
+++ b/t/watch_maildir.t
@@ -11,7 +11,7 @@ my ($tmpdir, $for_destroy) = tmpdir();
 my $git_dir = "$tmpdir/test.git";
 my $maildir = "$tmpdir/md";
 my $spamdir = "$tmpdir/spam";
-use_ok 'PublicInbox::WatchMaildir';
+use_ok 'PublicInbox::Watch';
 use_ok 'PublicInbox::Emergency';
 my $cfgpfx = "publicinbox.test";
 my $addr = 'test-public@example.com';
@@ -40,7 +40,7 @@ $cfgpfx.inboxdir=$git_dir
 $cfgpfx.watch=maildir:$spamdir
 publicinboxlearn.watchspam=maildir:$spamdir
 EOF
-	my $wm = PublicInbox::WatchMaildir->new($config);
+	my $wm = PublicInbox::Watch->new($config);
 	is(scalar grep(/is a spam folder/, @w), 1, 'got warning about spam');
 	is_deeply($wm->{mdmap}, { "$spamdir/cur" => 'watchspam' },
 		'only got the spam folder to watch');
@@ -62,7 +62,7 @@ EOF
 }
 
 my $config = PublicInbox::Config->new($cfg_path);
-PublicInbox::WatchMaildir->new($config)->scan('full');
+PublicInbox::Watch->new($config)->scan('full');
 my $git = PublicInbox::Git->new($git_dir);
 my @list = $git->qx(qw(rev-list refs/heads/master));
 is(scalar @list, 1, 'one revision in rev-list');
@@ -79,7 +79,7 @@ my $write_spam = sub {
 };
 $write_spam->();
 is(unlink(glob("$maildir/new/*")), 1, 'unlinked old spam');
-PublicInbox::WatchMaildir->new($config)->scan('full');
+PublicInbox::Watch->new($config)->scan('full');
 @list = $git->qx(qw(rev-list refs/heads/master));
 is(scalar @list, 2, 'two revisions in rev-list');
 @list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
@@ -93,7 +93,7 @@ To unsubscribe from this list: send the line "unsubscribe git" in
 the body of a message to majordomo\@vger.kernel.org
 More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	PublicInbox::Emergency->new($maildir)->prepare(\$msg);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	@list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
 	is(scalar @list, 1, 'tree has one file');
 	my $mref = $git->cat_file('HEAD:'.$list[0]);
@@ -101,7 +101,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 
 	is(unlink(glob("$maildir/new/*")), 1, 'unlinked spam');
 	$write_spam->();
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	@list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
 	is(scalar @list, 0, 'tree is empty');
 	@list = $git->qx(qw(rev-list refs/heads/master));
@@ -118,7 +118,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	$config->{'publicinboxwatch.spamcheck'} = 'spamc';
 	{
 		local $SIG{__WARN__} = sub {}; # quiet spam check warning
-		PublicInbox::WatchMaildir->new($config)->scan('full');
+		PublicInbox::Watch->new($config)->scan('full');
 	}
 	@list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
 	is(scalar @list, 0, 'tree has no files spamc checked');
@@ -133,7 +133,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	PublicInbox::Emergency->new($maildir)->prepare(\$msg);
 	$config->{'publicinboxwatch.spamcheck'} = 'spamc';
 	@list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	@list = $git->qx(qw(ls-tree -r --name-only refs/heads/master));
 	is(scalar @list, 1, 'tree has one file after spamc checked');
 
@@ -211,7 +211,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 
 sub is_maildir {
 	my ($dir) = @_;
-	PublicInbox::WatchMaildir::is_maildir($dir);
+	PublicInbox::Watch::is_maildir($dir);
 }
 
 is(is_maildir('maildir:/hello//world'), '/hello/world', 'extra slash gone');
diff --git a/t/watch_maildir_v2.t b/t/watch_maildir_v2.t
index 59ec247e..ca1cf965 100644
--- a/t/watch_maildir_v2.t
+++ b/t/watch_maildir_v2.t
@@ -14,7 +14,7 @@ my ($tmpdir, $for_destroy) = tmpdir();
 my $inboxdir = "$tmpdir/v2";
 my $maildir = "$tmpdir/md";
 my $spamdir = "$tmpdir/spam";
-use_ok 'PublicInbox::WatchMaildir';
+use_ok 'PublicInbox::Watch';
 use_ok 'PublicInbox::Emergency';
 my $cfgpfx = "publicinbox.test";
 my $addr = 'test-public@example.com';
@@ -49,7 +49,7 @@ my $ibx = $config->lookup_name('test');
 ok($ibx, 'found inbox by name');
 my $srch = $ibx->search;
 
-PublicInbox::WatchMaildir->new($config)->scan('full');
+PublicInbox::Watch->new($config)->scan('full');
 my $total = scalar @{$srch->reopen->query('')};
 is($total, 1, 'got one revision');
 
@@ -69,7 +69,7 @@ my $write_spam = sub {
 };
 $write_spam->();
 is(unlink(glob("$maildir/new/*")), 1, 'unlinked old spam');
-PublicInbox::WatchMaildir->new($config)->scan('full');
+PublicInbox::Watch->new($config)->scan('full');
 is_deeply($srch->reopen->query(''), [], 'deleted file');
 is(unlink(glob("$spamdir/cur/*")), 1, 'unlinked trained spam');
 
@@ -80,7 +80,7 @@ To unsubscribe from this list: send the line "unsubscribe git" in
 the body of a message to majordomo\@vger.kernel.org
 More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	PublicInbox::Emergency->new($maildir)->prepare(\$msg);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	my $msgs = $srch->reopen->query('');
 	is(scalar(@$msgs), 1, 'got one file back');
 	my $mref = $ibx->msg_by_smsg($msgs->[0]);
@@ -88,7 +88,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 
 	is(unlink(glob("$maildir/new/*")), 1, 'unlinked spam');
 	$write_spam->();
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	$msgs = $srch->reopen->query('');
 	is(scalar(@$msgs), 0, 'inbox is empty again');
 	is(unlink(glob("$spamdir/cur/*")), 1, 'unlinked trained spam');
@@ -103,7 +103,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	$config->{'publicinboxwatch.spamcheck'} = 'spamc';
 	{
 		local $SIG{__WARN__} = sub {}; # quiet spam check warning
-		PublicInbox::WatchMaildir->new($config)->scan('full');
+		PublicInbox::Watch->new($config)->scan('full');
 	}
 	my $msgs = $srch->reopen->query('');
 	is(scalar(@$msgs), 0, 'inbox is still empty');
@@ -117,7 +117,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	local $ENV{PATH} = $main_path;
 	PublicInbox::Emergency->new($maildir)->prepare(\$msg);
 	$config->{'publicinboxwatch.spamcheck'} = 'spamc';
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	my $msgs = $srch->reopen->query('');
 	is(scalar(@$msgs), 1, 'inbox has one mail after spamc OK-ed a message');
 	my $mref = $ibx->msg_by_smsg($msgs->[0]);
@@ -130,7 +130,7 @@ More majordomo info at  http://vger.kernel.org/majordomo-info.html\n);
 	open my $fh, '<', $patch or die "failed to open $patch: $!\n";
 	$msg = do { local $/; <$fh> };
 	PublicInbox::Emergency->new($maildir)->prepare(\$msg);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	my $msgs = $srch->reopen->query('dfpost:6e006fd7');
 	is(scalar(@$msgs), 1, 'diff postimage found');
 	my $post = $msgs->[0];
@@ -161,7 +161,7 @@ Date: Sat, 18 Jun 2016 00:00:00 +0000
 both
 EOF
 	PublicInbox::Emergency->new($maildir)->prepare(\$both);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	my $msgs = $srch->reopen->query('m:both@b.com');
 	my $v1 = $config->lookup_name('v1');
 	my $msg = $v1->git->cat_file($msgs->[0]->{blob});
@@ -186,7 +186,7 @@ EOF
 	PublicInbox::Emergency->new($maildir)->prepare(\$want);
 	PublicInbox::Emergency->new($maildir)->prepare(\$do_not_want);
 	my $config = PublicInbox::Config->new(\$cfg);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	$ibx = $config->lookup_name('test');
 	my $num = $ibx->mm->num_for('do.want@example.com');
 	ok(defined $num, 'List-ID matched for watch');
@@ -195,7 +195,7 @@ EOF
 
 	$cfg = $orig."$cfgpfx.watchheader=X-Mailing-List:no\@example.com\n";
 	$config = PublicInbox::Config->new(\$cfg);
-	PublicInbox::WatchMaildir->new($config)->scan('full');
+	PublicInbox::Watch->new($config)->scan('full');
 	$ibx = $config->lookup_name('test');
 	$num = $ibx->mm->num_for('do.not.want@example.com');
 	ok(defined $num, 'X-Mailing-List matched');
diff --git a/t/watch_multiple_headers.t b/t/watch_multiple_headers.t
index 0ee96d5f..a0813532 100644
--- a/t/watch_multiple_headers.t
+++ b/t/watch_multiple_headers.t
@@ -9,7 +9,7 @@ require_mods(qw(Search::Xapian DBD::SQLite));
 my ($tmpdir, $for_destroy) = tmpdir();
 my $inboxdir = "$tmpdir/v2";
 my $maildir = "$tmpdir/md";
-use_ok 'PublicInbox::WatchMaildir';
+use_ok 'PublicInbox::Watch';
 use_ok 'PublicInbox::Emergency';
 my $cfgpfx = "publicinbox.test";
 my $addr = 'test-public@example.com';
@@ -62,7 +62,7 @@ $cfgpfx.watchheader=To:$addr
 $cfgpfx.watchheader=Cc:$addr
 EOF
 my $config = PublicInbox::Config->new(\$cfg);
-PublicInbox::WatchMaildir->new($config)->scan('full');
+PublicInbox::Watch->new($config)->scan('full');
 my $ibx = $config->lookup_name('test');
 ok($ibx, 'found inbox by name');
 
diff --git a/t/watch_nntp.t b/t/watch_nntp.t
index 98fb1161..ce1a3153 100644
--- a/t/watch_nntp.t
+++ b/t/watch_nntp.t
@@ -5,8 +5,8 @@ use Test::More;
 use PublicInbox::Config;
 # see t/nntpd*.t for tests against a live NNTP server
 
-use_ok 'PublicInbox::WatchMaildir';
-my $nntp_url = \&PublicInbox::WatchMaildir::nntp_url;
+use_ok 'PublicInbox::Watch';
+my $nntp_url = \&PublicInbox::Watch::nntp_url;
 is('news://example.com/inbox.foo',
 	$nntp_url->('NEWS://examplE.com/inbox.foo'), 'lowercased');
 is('nntps://example.com/inbox.foo',


* [PATCH 04/11] watch: log signal activities to STDERR
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (2 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 03/11] rename WatchMaildir => Watch Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 05/11] watch: avoid unnecessary spawning on spam removals Eric Wong
                   ` (6 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

Sometimes it may not be apparent when/if a signal is
processed; this hopefully improves the situation.

We'll also change the process title when we're quitting
to better inform users.
---
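For readers unfamiliar with the pattern, a minimal standalone
sketch follows.  It is only an assumption-laden illustration, not
the script in this patch: the choice of USR1 and QUIT, the @log
array and the $running flag are hypothetical, and warnings are
captured via $SIG{__WARN__} instead of going to STDERR so the
result is easy to inspect.

```perl
use strict;
use v5.10.1;

my @log; # collected here instead of STDERR for this sketch
local $SIG{__WARN__} = sub { push @log, $_[0] };
local $0 = $0; # restore the process title when this scope ends
my $running = 1;

local $SIG{USR1} = sub { warn "I: scanning\n" };
local $SIG{QUIT} = sub {
	$running = 0;
	$0 .= ' quitting'; # shows up in ps(1) on most systems
};

# deliver both signals to ourselves
kill('USR1', $$);
kill('QUIT', $$);

print "logged: $_" for @log;
say "title: $0";
```
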
 script/public-inbox-watch | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/script/public-inbox-watch b/script/public-inbox-watch
index 02491860..b6c6b202 100755
--- a/script/public-inbox-watch
+++ b/script/public-inbox-watch
@@ -11,18 +11,30 @@ use PublicInbox::Syscall qw($SFD_NONBLOCK);
 my $oldset = PublicInbox::Sigfd::block_signals();
 STDOUT->autoflush(1);
 STDERR->autoflush(1);
-my ($config, $watch);
+local $0 = $0; # local since this script may be eval-ed
+my $watch = PublicInbox::Watch->new(PublicInbox::Config->new);
 my $reload = sub {
-	$config = PublicInbox::Config->new;
-	$watch->quit if $watch;
-	$watch = PublicInbox::Watch->new($config);
+	my $prev = $watch or return; # SIGQUIT issued
+	$watch->quit;
+	$watch = PublicInbox::Watch->new(PublicInbox::Config->new);
+	if ($watch) {
+		warn("I: reloaded\n");
+	} else {
+		warn("E: reloading failed\n");
+		$watch = $prev;
+	}
 };
-$reload->();
+
 if ($watch) {
-	my $scan = sub { $watch->trigger_scan('full') if $watch };
+	my $scan = sub {
+		return if !$watch;
+		warn "I: scanning\n";
+		$watch->trigger_scan('full');
+	};
 	my $quit = sub {
 		$watch->quit if $watch;
 		$watch = undef;
+		$0 .= ' quitting';
 	};
 	my $sig = {
 		HUP => $reload,


* [PATCH 05/11] watch: avoid unnecessary spawning on spam removals
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (3 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 04/11] watch: log signal activities to STDERR Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 06/11] watch: block signals before fork on non-signalfd/kevent systems Eric Wong
                   ` (5 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

This should further mitigate lock contention problems
when -watch is configured to watch on a Maildir for spam
while performing a large NNTP import.

There is now a small risk a message won't get removed if
it's in the current (uncommitted) fast-import batch, but that is
unlikely given the batch size is now only 10 messages.

If that small window is hit, flipping the \Seen flag
(e.g. marking it unread, and then read again) will trigger
another removal attempt via IMAP or Maildir.
---
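The "check read-only state first, lock only when needed" idea
can be sketched in isolation as below.  This is a hypothetical
model, not the code in this patch: %inbox, its index hash (playing
the role of the read-only ->over lookup), lock_count and
remove_spam are all made up for illustration.

```perl
use strict;
use v5.10.1;

# Hypothetical inbox: {index} stands in for the read-only lookup,
# {lock_count} counts how often the writer lock would be taken.
my %inbox = (index => { 'a@example.com' => 1 }, lock_count => 0);

sub remove_spam {
	my ($ibx, $mid, $writer_active) = @_;
	# Only do the cheap check when no writer is active; with an
	# active writer the lock is held anyway and the index may lag
	# behind uncommitted messages.
	unless ($writer_active) {
		return 0 unless exists $ibx->{index}->{$mid};
	}
	$ibx->{lock_count}++; # stands in for lock + spawning a writer
	return defined(delete $ibx->{index}->{$mid}) ? 1 : 0;
}

say remove_spam(\%inbox, 'missing@example.com', 0); # no lock taken
say remove_spam(\%inbox, 'a@example.com', 0); # lock taken, removed
say "locks taken: $inbox{lock_count}";
```

The trade-off is the one described above: a message that exists
only in an uncommitted batch is invisible to the read-only check.
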
 lib/PublicInbox/Import.pm     |  3 +++
 lib/PublicInbox/V2Writable.pm |  3 +++
 lib/PublicInbox/Watch.pm      | 31 +++++++++++++++++++++++++------
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 700b4026..ee5ca2ea 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -461,6 +461,9 @@ sub init_bare {
 	}
 }
 
+# true if locked and active
+sub active { !!$_[0]->{out} }
+
 sub done {
 	my ($self) = @_;
 	my $w = delete $self->{out} or return;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index f2288904..553dd839 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -655,6 +655,9 @@ sub checkpoint ($;$) {
 # public
 sub barrier { checkpoint($_[0], 1) };
 
+# true if locked and active
+sub active { !!$_[0]->{im} }
+
 # public
 sub done {
 	my ($self) = @_;
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 5f786139..0bb92d0a 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -134,15 +134,34 @@ sub _done_for_now {
 sub remove_eml_i { # each_inbox callback
 	my ($ibx, $arg) = @_;
 	my ($self, $eml, $loc) = @$arg;
+
 	eval {
-		my $im = _importer_for($self, $ibx);
-		$im->remove($eml, 'spam');
-		if (my $scrub = $ibx->filter($im)) {
-			my $scrubbed = $scrub->scrub($eml, 1);
-			if ($scrubbed && $scrubbed != REJECT) {
-				$im->remove($scrubbed, 'spam');
+		# try to avoid taking a lock or unnecessary spawning
+		my $im = $self->{importers}->{"$ibx"};
+		my $scrubbed;
+		if ((!$im || !$im->active) && $ibx->over) {
+			if (content_exists($ibx, $eml)) {
+				# continue
+			} elsif (my $scrub = $ibx->filter($im)) {
+				$scrubbed = $scrub->scrub($eml, 1);
+				if ($scrubbed && $scrubbed != REJECT &&
+					  !content_exists($ibx, $scrubbed)) {
+					return;
+				}
+			} else {
+				return;
 			}
 		}
+
+		$im //= _importer_for($self, $ibx); # may spawn fast-import
+		$im->remove($eml, 'spam');
+		$scrubbed //= do {
+			my $scrub = $ibx->filter($im);
+			$scrub ? $scrub->scrub($eml, 1) : undef;
+		};
+		if ($scrubbed && $scrubbed != REJECT) {
+			$im->remove($scrubbed, 'spam');
+		}
 	};
 	if ($@) {
 		warn "error removing spam at: $loc from $ibx->{name}: $@\n";


* [PATCH 06/11] watch: block signals before fork on non-signalfd/kevent systems
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (4 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 05/11] watch: avoid unnecessary spawning on spam removals Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 07/11] watch: comments and tiny cleanups Eric Wong
                   ` (4 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

In case there are non-Linux users, or BSD users w/o IO::KQueue,
we shouldn't let signal handlers fire in the child processes.

The child processes always assumed signals were blocked by
the parent, so no changes were necessary there.
---
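A minimal standalone sketch of the block/fork/restore sequence,
using only the core POSIX module, might look like the following.
The helper names block_all_signals and fork_with_signals_blocked
are hypothetical and are not the PublicInbox::Sigfd functions used
in the diff below.

```perl
use strict;
use v5.10.1;
use POSIX qw(SIG_SETMASK sigprocmask _exit);

# Block every signal and return the previous mask.
sub block_all_signals {
	my $oldset = POSIX::SigSet->new;
	my $newset = POSIX::SigSet->new;
	$newset->fillset or die "fillset: $!";
	sigprocmask(SIG_SETMASK, $newset, $oldset) or
		die "sigprocmask: $!";
	$oldset;
}

sub fork_with_signals_blocked {
	my ($child_cb) = @_;
	my $oldset = block_all_signals();
	my $pid = fork;
	if (defined($pid) && $pid == 0) { # child: signals stay blocked
		$child_cb->();
		_exit(0);
	}
	# parent (or failed fork): restore the mask before reporting
	my $err = $!;
	sigprocmask(SIG_SETMASK, $oldset) or die "sigprocmask: $!";
	die "fork: $err" unless defined $pid;
	$pid;
}

my $pid = fork_with_signals_blocked(sub { });
waitpid($pid, 0);
say "child $pid exited with status ", $? >> 8;
```

Restoring the mask before checking for a failed fork keeps the
parent from being left with all signals blocked on error.
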
 lib/PublicInbox/Watch.pm | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 0bb92d0a..2698c44a 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -589,6 +589,7 @@ sub watch_atfork_child ($) {
 sub watch_atfork_parent ($) {
 	my ($self) = @_;
 	_done_for_now($self);
+	PublicInbox::Sigfd::block_signals();
 }
 
 sub imap_idle_requeue ($) { # DS::add_timer callback
@@ -628,10 +629,14 @@ sub event_step {
 	return if $self->{quit};
 	my $idle_todo = $self->{idle_todo};
 	if ($idle_todo && @$idle_todo) {
-		watch_atfork_parent($self);
-		while (my $url_intvl = shift(@$idle_todo)) {
-			imap_idle_fork($self, $url_intvl);
-		}
+		my $oldset = watch_atfork_parent($self);
+		eval {
+			while (my $url_intvl = shift(@$idle_todo)) {
+				imap_idle_fork($self, $url_intvl);
+			}
+		};
+		PublicInbox::Sigfd::sig_setmask($oldset);
+		die $@ if $@;
 	}
 	goto(&fs_scan_step) if $self->{mdre};
 }
@@ -684,9 +689,9 @@ sub watch_nntp_fetch_all ($$) {
 sub poll_fetch_fork ($) { # DS::add_timer callback
 	my ($self, $intvl, $urls) = @{$_[0]};
 	return if $self->{quit};
-	watch_atfork_parent($self);
-	defined(my $pid = fork) or die "fork: $!";
-	if ($pid == 0) {
+	my $oldset = watch_atfork_parent($self);
+	my $pid = fork;
+	if (defined($pid) && $pid == 0) {
 		watch_atfork_child($self);
 		if ($urls->[0] =~ m!\Aimaps?://!i) {
 			watch_imap_fetch_all($self, $urls);
@@ -695,6 +700,8 @@ sub poll_fetch_fork ($) { # DS::add_timer callback
 		}
 		_exit(0);
 	}
+	PublicInbox::Sigfd::sig_setmask($oldset);
+	die "fork: $!"  unless defined $pid;
 	$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
 	PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self);
 }


* [PATCH 07/11] watch: comments and tiny cleanups
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (5 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 06/11] watch: block signals before fork on non-signalfd/kevent systems Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 08/11] ds: avoid excessive queueing when reaping PIDs Eric Wong
                   ` (3 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta; +Cc: Eric Wong

From: Eric Wong <e@yhbt.net>

Get rid of an unused variable, prefix a warning and try to
better document control flow around various callbacks.
---
 lib/PublicInbox/Watch.pm | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 2698c44a..db8d0396 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -268,7 +268,7 @@ sub watch_fs_init ($) {
 		delete $self->{done_timer};
 		_done_for_now($self);
 	};
-	my $cb = sub {
+	my $cb = sub { # called by PublicInbox::DirIdle::event_step
 		_try_path($self, $_[0]->fullname);
 		$self->{done_timer} //= PublicInbox::DS::requeue($done);
 	};
@@ -411,7 +411,7 @@ sub imap_import_msg ($$$$$) {
 	if (ref($inboxes)) {
 		for my $ibx (@$inboxes) {
 			my $eml = PublicInbox::Eml->new($$raw);
-			my $x = import_eml($self, $ibx, $eml);
+			import_eml($self, $ibx, $eml);
 		}
 	} elsif ($inboxes eq 'watchspam') {
 		# we don't remove unseen messages
@@ -566,7 +566,7 @@ sub watch_imap_idle_1 ($$$) {
 			$err = imap_fetch_all($self, $mic, $url);
 			$err //= imap_idle_once($self, $mic, $intvl, $url);
 		} else {
-			$err = "not connected: $!";
+			$err = "E: not connected: $!";
 		}
 		if ($err && !$self->{quit}) {
 			warn $err, "\n";
@@ -984,7 +984,7 @@ sub watch_nntp_init ($$) {
 	}
 }
 
-sub watch {
+sub watch { # main entry point
 	my ($self, $sig, $oldset) = @_;
 	$self->{oldset} = $oldset;
 	$self->{sig} = $sig;
@@ -998,7 +998,7 @@ sub watch {
 	}
 	watch_fs_init($self) if $self->{mdre};
 	PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
-	PublicInbox::DS->EventLoop;
+	PublicInbox::DS->EventLoop; # calls ->event_step
 	_done_for_now($self);
 }
 
@@ -1083,7 +1083,7 @@ sub content_exists ($$) {
 
 sub _spamcheck_cb {
 	my ($sc) = @_;
-	sub {
+	sub { # this gets called by (V2Writable||Import)->add
 		my ($mime, $ibx) = @_;
 		return if content_exists($ibx, $mime);
 		my $tmp = '';

* [PATCH 08/11] ds: avoid excessive queueing when reaping PIDs
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (6 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 07/11] watch: comments and tiny cleanups Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 09/11] watch: use EOFpipe to reduce dwaitpid wakeups Eric Wong
                   ` (2 subsequent siblings)
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

We should not enqueue reap_pids() to run more than once per
EventLoop iteration.  We'll start reindenting reap_pids
with tabs, too, since we're no longer Danga::Socket.

We should also be able to remove timer usage for reaping
down the line once we stop abusing dwaitpid() in -watch.
---
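
Not part of the patch: a minimal standalone sketch of the coalescing
idea, assuming a toy @nextq and run_once() in place of the real
PublicInbox::DS event loop (both are stand-ins invented for
illustration).  The `//=' on $reap_armed is what keeps repeated
enqueue_reap() calls from queueing reap_pids more than once per
iteration:

```perl
use v5.10.1;
use strict;
use warnings;

my @nextq;         # toy stand-in for the DS next-iteration queue
my $reap_armed;    # defined while reap_pids is already queued
my $reap_runs = 0; # counts how often reap_pids actually ran

# push returns the new queue length, which is always defined,
# so the `//=' in enqueue_reap arms the flag on the first call
sub requeue { push @nextq, $_[0] }

sub reap_pids {
	$reap_armed = undef; # disarm first so later requests can queue again
	$reap_runs++;        # the real code would loop over waitpid() here
}

sub enqueue_reap { $reap_armed //= requeue(\&reap_pids) }

sub run_once { # one simulated event loop iteration
	my @q = splice(@nextq);
	$_->() for @q;
}

enqueue_reap() for 1..5; # e.g. five SIGCHLDs before the loop runs
run_once();
print "reap_pids ran $reap_runs time(s)\n"; # reap_pids ran 1 time(s)
```

Disarming at the top of reap_pids means a request arriving while it
runs can still queue another pass.
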
 lib/PublicInbox/DS.pm | 43 ++++++++++++++++++++++++++-----------------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index a3f2e76c..b252ea3c 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -40,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ]
 my $later_queue; # list of callbacks to run at some later interval
 my $EXPMAP; # fd -> idle_time
 our $EXPTIME = 180; # 3 minutes
-my ($later_timer, $reap_timer, $exp_timer);
+my ($later_timer, $reap_armed, $reap_timer, $exp_timer);
 my $ToClose; # sockets to close when event loop is done
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -68,7 +68,7 @@ Reset all state
 =cut
 sub Reset {
     %DescriptorMap = ();
-    $in_loop = $wait_pids = $later_queue = undef;
+    $in_loop = $wait_pids = $later_queue = $reap_armed = undef;
     $EXPMAP = {};
     $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
@@ -225,24 +225,33 @@ sub RunTimers {
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
 # until we've hit EOF when reading the stdout of the child.
+
 sub reap_pids {
-    my $tmp = $wait_pids or return;
-    $wait_pids = $reap_timer = undef;
-    foreach my $ary (@$tmp) {
-        my ($pid, $cb, $arg) = @$ary;
-        my $ret = waitpid($pid, WNOHANG);
-        if ($ret == 0) {
-            push @$wait_pids, $ary; # autovivifies @$wait_pids
-        } elsif ($cb) {
-            eval { $cb->($arg, $pid) };
-        }
-    }
-    # we may not be done, yet, and could've missed/masked a SIGCHLD:
-    $reap_timer = add_timer(1, \&reap_pids) if $wait_pids;
+	$reap_armed = undef;
+	my $tmp = $wait_pids or return;
+	$wait_pids = undef;
+	foreach my $ary (@$tmp) {
+		my ($pid, $cb, $arg) = @$ary;
+		my $ret = waitpid($pid, WNOHANG);
+		if ($ret == 0) {
+			push @$wait_pids, $ary; # autovivifies @$wait_pids
+		} elsif ($cb) {
+			eval { $cb->($arg, $pid) };
+		}
+	}
+	# we may not be done, yet, and could've missed/masked a SIGCHLD:
+	if ($wait_pids && !$reap_armed) {
+		$reap_timer //= add_timer(1, \&reap_pids_timed);
+	}
+}
+
+sub reap_pids_timed {
+	$reap_timer = undef;
+	goto \&reap_pids;
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)
-sub enqueue_reap ($) { push @$nextq, \&reap_pids }; # autovivifies
+sub enqueue_reap { $reap_armed //= requeue(\&reap_pids) }
 
 sub in_loop () { $in_loop }
 
@@ -627,7 +636,7 @@ sub dwaitpid ($$$) {
 	push @$wait_pids, [ @_ ]; # [ $pid, $cb, $arg ]
 
 	# We could've just missed our SIGCHLD, cover it, here:
-	requeue(\&reap_pids);
+	goto &enqueue_reap; # tail recursion
 }
 
 sub _run_later () {

* [PATCH 09/11] watch: use EOFpipe to reduce dwaitpid wakeups
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (7 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 08/11] ds: avoid excessive queueing when reaping PIDs Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 10/11] ds: avoid unnecessary timer for waitpid Eric Wong
  2020-08-31  4:41 ` [PATCH 11/11] replace ParentPipe with EOFpipe Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

Using a pipe here is a bit inefficient.  However, using
dwaitpid() on a process that's not expected to exit soon is
also inefficient: it causes excessive wakeups, since most of
our inbox-writing code expects synchronous waitpid().

This only affects -watch instances configured for NNTP and IMAP
clients.
---
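
For illustration only, a standalone sketch of the reaping pattern,
using a blocking sysread() where the patch uses PublicInbox::EOFpipe
under the DS event loop.  run_and_reap() and its callback argument are
hypothetical names, not part of public-inbox:

```perl
use strict;
use warnings;
use POSIX qw(_exit);

# Fork a child running $child_cb, wait for EOF on a pipe the child
# holds open, then reap it.  Returns [ $pid, $exit_code ].
sub run_and_reap {
	my ($child_cb) = @_;
	pipe(my ($r, $w)) or die "pipe: $!";
	defined(my $pid = fork) or die "fork: $!";
	if ($pid == 0) {
		close $r;
		my $code = $child_cb->();
		close $w; # EOF reaches the parent here (or at _exit)
		_exit($code);
	}
	close $w; # parent must drop its write end, or EOF never arrives

	# An event loop would wait for readability; this sketch just blocks.
	my $n = sysread($r, my $buf, 1);
	defined($n) or die "sysread: $!";
	$n == 0 or die "unexpected data on EOF-only pipe";

	# The child has exited or is about to, so this should not block long.
	my $ret = waitpid($pid, 0);
	$ret == $pid or die "waitpid($pid) => $ret ($!)";
	[ $pid, $? >> 8 ];
}

my ($pid, $status) = @{run_and_reap(sub { 3 })};
print "child exited with status $status\n"; # child exited with status 3
```

Nothing is ever written to the pipe; the parent only learns that the
last write end was closed, so one synchronous waitpid() per child is
enough.
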
 MANIFEST                   |  1 +
 lib/PublicInbox/EOFpipe.pm | 24 ++++++++++++++++++++++++
 lib/PublicInbox/Watch.pm   | 25 +++++++++++++++++++++----
 3 files changed, 46 insertions(+), 4 deletions(-)
 create mode 100644 lib/PublicInbox/EOFpipe.pm

diff --git a/MANIFEST b/MANIFEST
index f090175e..0b3835d8 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -114,6 +114,7 @@ lib/PublicInbox/DSPoll.pm
 lib/PublicInbox/Daemon.pm
 lib/PublicInbox/DirIdle.pm
 lib/PublicInbox/DummyInbox.pm
+lib/PublicInbox/EOFpipe.pm
 lib/PublicInbox/Emergency.pm
 lib/PublicInbox/Eml.pm
 lib/PublicInbox/EmlContentFoo.pm
diff --git a/lib/PublicInbox/EOFpipe.pm b/lib/PublicInbox/EOFpipe.pm
new file mode 100644
index 00000000..489caf82
--- /dev/null
+++ b/lib/PublicInbox/EOFpipe.pm
@@ -0,0 +1,24 @@
+# Copyright (C) 2020 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::EOFpipe;
+use strict;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+	my (undef, $rd, $cb, $arg) = @_;
+	my $self = bless {  cb => $cb, arg => $arg }, __PACKAGE__;
+	# 1031: F_SETPIPE_SZ, 4096: page size
+	fcntl($rd, 1031, 4096) if $^O eq 'linux';
+	$self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+	my ($self) = @_;
+	if ($self->do_read(my $buf, 1) == 0) { # auto-closed
+		$self->{cb}->($self->{arg});
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index db8d0396..17786377 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -14,7 +14,8 @@ use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now);
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentHash qw(content_hash);
-use POSIX qw(_exit);
+use PublicInbox::EOFpipe;
+use POSIX qw(_exit WNOHANG);
 
 sub compile_watchheaders ($) {
 	my ($ibx) = @_;
@@ -611,17 +612,30 @@ sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
 				\&imap_idle_requeue, [ $self, $url_intvl ]);
 }
 
+sub reap { # callback for EOFpipe
+	my ($pid, $cb, $self) = @{$_[0]};
+	my $ret = waitpid($pid, 0);
+	if ($ret == $pid) {
+		$cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
+	} else {
+		warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
+	}
+}
+
 sub imap_idle_fork ($$) {
 	my ($self, $url_intvl) = @_;
 	my ($url, $intvl) = @$url_intvl;
+	pipe(my ($r, $w)) or die "pipe: $!";
 	defined(my $pid = fork) or die "fork: $!";
 	if ($pid == 0) {
+		close $r;
 		watch_atfork_child($self);
 		watch_imap_idle_1($self, $url, $intvl);
+		close $w;
 		_exit(0);
 	}
 	$self->{idle_pids}->{$pid} = $url_intvl;
-	PublicInbox::DS::dwaitpid($pid, \&imap_idle_reap, $self);
+	PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
 }
 
 sub event_step {
@@ -689,24 +703,27 @@ sub watch_nntp_fetch_all ($$) {
 sub poll_fetch_fork ($) { # DS::add_timer callback
 	my ($self, $intvl, $urls) = @{$_[0]};
 	return if $self->{quit};
+	pipe(my ($r, $w)) or die "pipe: $!";
 	my $oldset = watch_atfork_parent($self);
 	my $pid = fork;
 	if (defined($pid) && $pid == 0) {
+		close $r;
 		watch_atfork_child($self);
 		if ($urls->[0] =~ m!\Aimaps?://!i) {
 			watch_imap_fetch_all($self, $urls);
 		} else {
 			watch_nntp_fetch_all($self, $urls);
 		}
+		close $w;
 		_exit(0);
 	}
 	PublicInbox::Sigfd::sig_setmask($oldset);
 	die "fork: $!"  unless defined $pid;
 	$self->{poll_pids}->{$pid} = [ $intvl, $urls ];
-	PublicInbox::DS::dwaitpid($pid, \&poll_fetch_reap, $self);
+	PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
 }
 
-sub poll_fetch_reap { # PublicInbox::DS::dwaitpid callback
+sub poll_fetch_reap {
 	my ($self, $pid) = @_;
 	my $intvl_urls = delete $self->{poll_pids}->{$pid} or
 		die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";

* [PATCH 10/11] ds: avoid unnecessary timer for waitpid
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (8 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 09/11] watch: use EOFpipe to reduce dwaitpid wakeups Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  2020-08-31  4:41 ` [PATCH 11/11] replace ParentPipe with EOFpipe Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

The reap timer doesn't seem necessary, since we won't call
dwaitpid() until we see an EOF.
---
 lib/PublicInbox/DS.pm | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index b252ea3c..661be1fd 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -40,7 +40,7 @@ my $wait_pids; # list of [ pid, callback, callback_arg ]
 my $later_queue; # list of callbacks to run at some later interval
 my $EXPMAP; # fd -> idle_time
 our $EXPTIME = 180; # 3 minutes
-my ($later_timer, $reap_armed, $reap_timer, $exp_timer);
+my ($later_timer, $reap_armed, $exp_timer);
 my $ToClose; # sockets to close when event loop is done
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
@@ -70,7 +70,7 @@ sub Reset {
     %DescriptorMap = ();
     $in_loop = $wait_pids = $later_queue = $reap_armed = undef;
     $EXPMAP = {};
-    $nextq = $ToClose = $reap_timer = $later_timer = $exp_timer = undef;
+    $nextq = $ToClose = $later_timer = $exp_timer = undef;
     $LoopTimeout = -1;  # no timeout by default
     @Timers = ();
 
@@ -240,14 +240,7 @@ sub reap_pids {
 		}
 	}
 	# we may not be done, yet, and could've missed/masked a SIGCHLD:
-	if ($wait_pids && !$reap_armed) {
-		$reap_timer //= add_timer(1, \&reap_pids_timed);
-	}
-}
-
-sub reap_pids_timed {
-	$reap_timer = undef;
-	goto \&reap_pids;
+	$reap_armed //= requeue(\&reap_pids) if $wait_pids;
 }
 
 # reentrant SIGCHLD handler (since reap_pids is not reentrant)

* [PATCH 11/11] replace ParentPipe with EOFpipe
  2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
                   ` (9 preceding siblings ...)
  2020-08-31  4:41 ` [PATCH 10/11] ds: avoid unnecessary timer for waitpid Eric Wong
@ 2020-08-31  4:41 ` Eric Wong
  10 siblings, 0 replies; 12+ messages in thread
From: Eric Wong @ 2020-08-31  4:41 UTC (permalink / raw)
  To: meta

ParentPipe was a subset of EOFpipe, except EOFpipe correctly
accounts for theoretical(*) spurious wakeups on the pipe.

(*) AFAIK, spurious wakeups are/were more likely on TCP sockets
    due to checksum failures, something that's not a problem on
    local pipes.  We're also not sharing pipes like we do with
    listen sockets on accept(2), so there's no chance of another
    process grabbing bytes (unless we have bugs in our code).
---
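
A rough sketch of the difference, outside of PublicInbox::DS:
ParentPipe::event_step fired its callback on any wakeup, while
EOFpipe::event_step only does so when the read returns 0.  eof_step()
below is a hypothetical helper on a plain nonblocking pipe, not
public-inbox code:

```perl
use strict;
use warnings;
use IO::Handle; # for ->blocking

# Classify one read attempt on a nonblocking, EOF-only pipe.
sub eof_step {
	my ($rd) = @_;
	my $n = sysread($rd, my $buf, 1);
	return 'eof' if defined($n) && $n == 0; # all writers are gone
	return 'data' if defined($n);           # somebody wrote a byte
	# woken up with nothing to read: wait for the next event
	return 'retry' if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
	die "sysread: $!";
}

pipe(my ($r, $w)) or die "pipe: $!";
$r->blocking(0);
print eof_step($r), "\n"; # retry (writer still open, nothing to read)
close $w;
print eof_step($r), "\n"; # eof (only now should the callback run)
```

Only the 'eof' result should trigger the quit callback; treating
'retry' the same way is how a spurious wakeup would get mistaken for
the master process dying.
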
 MANIFEST                      |  1 -
 lib/PublicInbox/Daemon.pm     |  6 ++----
 lib/PublicInbox/ParentPipe.pm | 23 -----------------------
 3 files changed, 2 insertions(+), 28 deletions(-)
 delete mode 100644 lib/PublicInbox/ParentPipe.pm

diff --git a/MANIFEST b/MANIFEST
index 0b3835d8..b65e96b0 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -169,7 +169,6 @@ lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
-lib/PublicInbox/ParentPipe.pm
 lib/PublicInbox/ProcessPipe.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 45475183..000ba169 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -17,7 +17,7 @@ STDERR->autoflush(1);
 use PublicInbox::DS qw(now);
 use PublicInbox::Syscall qw($SFD_NONBLOCK);
 require PublicInbox::Listener;
-require PublicInbox::ParentPipe;
+use PublicInbox::EOFpipe;
 use PublicInbox::Sigfd;
 my @CMD;
 my ($set_user, $oldset);
@@ -468,8 +468,6 @@ sub master_quit ($) {
 
 sub master_loop {
 	pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!";
-	# 1031: F_SETPIPE_SZ, 4096: page size
-	fcntl($p1, 1031, 4096) if $^O eq 'linux';
 	my $set_workers = $worker_processes;
 	reopen_logs();
 	my $ignore_winch;
@@ -603,7 +601,7 @@ sub daemon_loop ($$$$) {
 	if ($worker_processes > 0) {
 		$refresh->(); # preload by default
 		my $fh = master_loop(); # returns if in child process
-		PublicInbox::ParentPipe->new($fh, \&worker_quit);
+		PublicInbox::EOFpipe->new($fh, \&worker_quit, undef);
 	} else {
 		reopen_logs();
 		$set_user->() if $set_user;
diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm
deleted file mode 100644
index 538b5632..00000000
--- a/lib/PublicInbox/ParentPipe.pm
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (C) 2016-2020 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# only for PublicInbox::Daemon, allows worker processes to be
-# notified if the master process dies.
-package PublicInbox::ParentPipe;
-use strict;
-use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
-
-sub new ($$$) {
-	my ($class, $pipe, $worker_quit) = @_;
-	my $self = bless { cb => $worker_quit }, $class;
-	$self->SUPER::new($pipe, EPOLLIN|EPOLLONESHOT);
-}
-
-# master process died, time to call worker_quit ourselves
-sub event_step {
-	$_[0]->close; # PublicInbox::DS::close
-	$_[0]->{cb}->();
-}
-
-1;

Thread overview: 12+ messages
2020-08-31  4:41 [PATCH 00/11] watch: fix contention w/ Maildir & NNTP Eric Wong
2020-08-31  4:41 ` [PATCH 01/11] watch: limit batch size of NNTP and IMAP workers, too Eric Wong
2020-08-31  4:41 ` [PATCH 02/11] watchmaildir: use v5.10.1, drop warnings Eric Wong
2020-08-31  4:41 ` [PATCH 03/11] rename WatchMaildir => Watch Eric Wong
2020-08-31  4:41 ` [PATCH 04/11] watch: log signal activities to STDERR Eric Wong
2020-08-31  4:41 ` [PATCH 05/11] watch: avoid unnecessary spawning on spam removals Eric Wong
2020-08-31  4:41 ` [PATCH 06/11] watch: block signals before fork on non-signalfd/kevent systems Eric Wong
2020-08-31  4:41 ` [PATCH 07/11] watch: comments and tiny cleanups Eric Wong
2020-08-31  4:41 ` [PATCH 08/11] ds: avoid excessive queueing when reaping PIDs Eric Wong
2020-08-31  4:41 ` [PATCH 09/11] watch: use EOFpipe to reduce dwaitpid wakeups Eric Wong
2020-08-31  4:41 ` [PATCH 10/11] ds: avoid unnecessary timer for waitpid Eric Wong
2020-08-31  4:41 ` [PATCH 11/11] replace ParentPipe with EOFpipe Eric Wong